Adding a document update queue based on the events which can be used to track when certain documents have changed, using this for users and groups to detect when a re-sync is needed.

This commit is contained in:
Michael Drury 2023-04-04 00:25:15 +01:00
parent b841cec4c9
commit ff98ba5a0a
9 changed files with 142 additions and 1 deletions

View File

@ -0,0 +1 @@
export * from "./updates"

View File

@ -0,0 +1,62 @@
import { createQueue, JobQueue } from "../queue"
import BullQueue from "bull"
import { DocumentType, SEPARATOR } from "../constants"
import { doInContext, doInTenant } from "../context"
type DocUpdateEvent = {
id: string
tenantId: string
appId?: string
}
type Processor = (update: DocUpdateEvent) => Promise<void>
const processors: { types: DocumentType[]; processor: Processor }[] = []
let queue: BullQueue.Queue
let processingPromise: Promise<void>
export function init() {
queue = createQueue<DocUpdateEvent>(JobQueue.DOC_UPDATE)
}
export async function shutdown() {
if (queue) {
await queue.close()
}
}
export async function update(opts: DocUpdateEvent) {
if (!queue) {
init()
}
await queue.add(opts)
}
async function handleJob(data: DocUpdateEvent) {
for (let { types, processor } of processors) {
if (types.find(type => data.id.startsWith(`${type}${SEPARATOR}`))) {
const context = data.appId || data.tenantId
const contextFn = data.appId ? doInContext : doInTenant
await contextFn(context, async () => {
await processor(data)
})
}
}
}
export async function process(types: DocumentType[], processor: Processor) {
if (!queue) {
init()
}
// add to processor list
processors.push({
types,
processor,
})
// if not processing in this instance, kick it off
if (!processingPromise) {
processingPromise = queue.process(async job => {
await handleJob(job.data)
})
}
}

View File

@ -0,0 +1,42 @@
import { Event, Identity, Group, DocumentUpdateEvents } from "@budibase/types"
import { EventProcessor } from "./types"
import * as docUpdates from "../../docUpdates"
import { getTenantId } from "../../context"
export default class DocumentUpdateProcessor implements EventProcessor {
async processEvent(
event: Event,
identity: Identity,
properties: any,
timestamp?: string
): Promise<void> {
// only user and group IDs supported right now - no app documents yet
if (DocumentUpdateEvents.indexOf(event) !== -1 && identity.tenantId) {
await docUpdates.update({
id: this.getId(properties),
tenantId: getTenantId(),
})
}
}
getId(properties: any) {
let possibleProps = ["groupId", "userId"]
for (let prop of possibleProps) {
if (properties[prop]) {
return properties[prop]
}
}
}
async identify(identity: Identity, timestamp?: string | number) {
// no-op
}
async identifyGroup(group: Group, timestamp?: string | number) {
// no-op
}
shutdown(): void {
docUpdates.shutdown()
}
}

View File

@ -1,12 +1,14 @@
import AnalyticsProcessor from "./AnalyticsProcessor"
import LoggingProcessor from "./LoggingProcessor"
import AuditLogsProcessor from "./AuditLogsProcessor"
import DocumentUpdateProcessor from "./DocumentUpdateProcessor"
import Processors from "./Processors"
import { AuditLogFn } from "@budibase/types"
export const analyticsProcessor = new AnalyticsProcessor()
const loggingProcessor = new LoggingProcessor()
const auditLogsProcessor = new AuditLogsProcessor()
const documentUpdateProcessor = new DocumentUpdateProcessor()
export function init(auditingFn: AuditLogFn) {
return AuditLogsProcessor.init(auditingFn)
@ -16,4 +18,5 @@ export const processors = new Processors([
analyticsProcessor,
loggingProcessor,
auditLogsProcessor,
documentUpdateProcessor,
])

View File

@ -27,6 +27,7 @@ export * as errors from "./errors"
export * as timers from "./timers"
export { default as env } from "./environment"
export * as blacklist from "./blacklist"
export * as docUpdates from "./docUpdates"
export { SearchParams } from "./db"
// Add context to tenancy for backwards compatibility
// only do this for external usages to prevent internal

View File

@ -2,4 +2,5 @@ export enum JobQueue {
AUTOMATION = "automationQueue",
APP_BACKUP = "appBackupQueue",
AUDIT_LOG = "auditLogQueue",
DOC_UPDATE = "docUpdateQueue",
}

View File

@ -1,7 +1,21 @@
import env from "../../../environment"
import { db as dbCore, context } from "@budibase/backend-core"
import {
db as dbCore,
context,
docUpdates,
constants,
} from "@budibase/backend-core"
import sdk from "../../"
export function initUserGroupSync() {
const types = [constants.DocumentType.USER, constants.DocumentType.GROUP]
docUpdates.process(types, async update => {
console.log("syncing - ", JSON.stringify(update))
// TODO: make the sync smarter
await sdk.users.syncGlobalUsers()
})
}
export async function syncApp(
appId: string,
opts?: { automationOnly?: boolean }

View File

@ -64,6 +64,7 @@ export async function startup(app?: any, server?: any) {
eventEmitter.emitPort(env.PORT)
fileSystem.init()
await redis.init()
sdk.applications.initUserGroupSync()
// run migrations on startup if not done via http
// not recommended in a clustered environment

View File

@ -186,6 +186,22 @@ export enum Event {
AUDIT_LOGS_DOWNLOADED = "audit_log:downloaded",
}
export const DocumentUpdateEvents: Event[] = [
Event.USER_CREATED,
Event.USER_UPDATED,
Event.USER_DELETED,
Event.USER_PERMISSION_ADMIN_ASSIGNED,
Event.USER_PERMISSION_ADMIN_REMOVED,
Event.USER_PERMISSION_BUILDER_ASSIGNED,
Event.USER_PERMISSION_BUILDER_REMOVED,
Event.USER_GROUP_CREATED,
Event.USER_GROUP_UPDATED,
Event.USER_GROUP_DELETED,
Event.USER_GROUP_USERS_ADDED,
Event.USER_GROUP_USERS_REMOVED,
Event.USER_GROUP_PERMISSIONS_EDITED,
]
// all events that are not audited have been added to this record as undefined, this means
// that Typescript can protect us against new events being added and auditing of those
// events not being considered. This might be a little ugly, but provides a level of