diff --git a/packages/backend-core/src/events/asyncEvents/queue.ts b/packages/backend-core/src/events/asyncEvents/queue.ts index c0b9b368b8..196fd359b3 100644 --- a/packages/backend-core/src/events/asyncEvents/queue.ts +++ b/packages/backend-core/src/events/asyncEvents/queue.ts @@ -12,7 +12,7 @@ export interface EventPayload { export let asyncEventQueue: BullQueue.Queue export function init() { - asyncEventQueue = createQueue(JobQueue.DOC_UPDATE) + asyncEventQueue = createQueue(JobQueue.SYSTEM_EVENT_QUEUE) } export async function shutdown() { diff --git a/packages/backend-core/src/events/events.ts b/packages/backend-core/src/events/events.ts index 12cef21dff..f02b9fdf32 100644 --- a/packages/backend-core/src/events/events.ts +++ b/packages/backend-core/src/events/events.ts @@ -13,17 +13,16 @@ export const publishEvent = async ( const identity = await identification.getCurrentIdentity() const backfilling = await backfill.isBackfillingEvent(event) - - // send off async events if required - await publishAsyncEvent({ - event, - identity, - properties, - timestamp, - }) - // no backfill - send the event and exit if (!backfilling) { + // send off async events if required + await publishAsyncEvent({ + event, + identity, + properties, + timestamp, + }) + // now handle the main sync event processing pipeline await processors.processEvent(event, identity, properties, timestamp) return } diff --git a/packages/backend-core/src/events/processors/async/DocumentUpdateProcessor.ts b/packages/backend-core/src/events/processors/async/DocumentUpdateProcessor.ts index a65152cc70..d64ac1d41d 100644 --- a/packages/backend-core/src/events/processors/async/DocumentUpdateProcessor.ts +++ b/packages/backend-core/src/events/processors/async/DocumentUpdateProcessor.ts @@ -1,12 +1,11 @@ import { EventProcessor } from "../types" import { Event, Identity, DocUpdateEvent } from "@budibase/types" -import { DocumentType, SEPARATOR } from "../../../constants" import { doInTenant } from "../../../context" import { getDocumentId } from "../../documentId" import { shutdown } from "../../asyncEvents" export type Processor = (update: DocUpdateEvent) => Promise -export type ProcessorMap = { types: DocumentType[]; processor: Processor }[] +export type ProcessorMap = { events: Event[]; processor: Processor }[] export default class DocumentUpdateProcessor implements EventProcessor { processors: ProcessorMap = [] @@ -26,8 +25,8 @@ export default class DocumentUpdateProcessor implements EventProcessor { if (!tenantId || !docId) { return } - for (let { types, processor } of this.processors) { - if (types.find(type => docId.startsWith(`${type}${SEPARATOR}`))) { + for (let { events, processor } of this.processors) { + if (events.includes(event)) { await doInTenant(tenantId, async () => { await processor({ id: docId, diff --git a/packages/backend-core/src/events/processors/types.ts b/packages/backend-core/src/events/processors/types.ts index 33db6d8932..5256a1bc62 100644 --- a/packages/backend-core/src/events/processors/types.ts +++ b/packages/backend-core/src/events/processors/types.ts @@ -1,18 +1 @@ -import { Event, Identity, Group } from "@budibase/types" - -export enum EventProcessorType { - POSTHOG = "posthog", - LOGGING = "logging", -} - -export interface EventProcessor { - processEvent( - event: Event, - identity: Identity, - properties: any, - timestamp?: string | number - ): Promise - identify?(identity: Identity, timestamp?: string | number): Promise - identifyGroup?(group: Group, timestamp?: string | number): Promise - shutdown?(): void -} +export { EventProcessor } from "@budibase/types" diff --git a/packages/backend-core/src/queue/constants.ts b/packages/backend-core/src/queue/constants.ts index 72d2e4742c..e1ffcfee36 100644 --- a/packages/backend-core/src/queue/constants.ts +++ b/packages/backend-core/src/queue/constants.ts @@ -2,5 +2,5 @@ export enum JobQueue { AUTOMATION = "automationQueue", APP_BACKUP = "appBackupQueue", AUDIT_LOG = "auditLogQueue", - DOC_UPDATE = "docUpdateQueue", + SYSTEM_EVENT_QUEUE = "systemEventQueue", } diff --git a/packages/server/src/events/docUpdates/index.ts b/packages/server/src/events/docUpdates/index.ts new file mode 100644 index 0000000000..fa7a623108 --- /dev/null +++ b/packages/server/src/events/docUpdates/index.ts @@ -0,0 +1 @@ +export * from "./processors" diff --git a/packages/server/src/events/docUpdates/processors.ts b/packages/server/src/events/docUpdates/processors.ts new file mode 100644 index 0000000000..8f3738f0dc --- /dev/null +++ b/packages/server/src/events/docUpdates/processors.ts @@ -0,0 +1,9 @@ +import userGroupProcessor from "./syncUsers" +import { docUpdates } from "@budibase/backend-core" + +export type UpdateCallback = (docId: string) => void + +export function init(updateCb?: UpdateCallback) { + const processors = [userGroupProcessor(updateCb)] + docUpdates.init(processors) +} diff --git a/packages/server/src/events/docUpdates.ts b/packages/server/src/events/docUpdates/syncUsers.ts similarity index 60% rename from packages/server/src/events/docUpdates.ts rename to packages/server/src/events/docUpdates/syncUsers.ts index fcd90f64c4..7957178168 100644 --- a/packages/server/src/events/docUpdates.ts +++ b/packages/server/src/events/docUpdates/syncUsers.ts @@ -1,12 +1,10 @@ -import { constants, docUpdates, logging } from "@budibase/backend-core" +import { constants, logging } from "@budibase/backend-core" import { sdk as proSdk } from "@budibase/pro" -import { DocUpdateEvent } from "@budibase/types" -import { syncUsersToAllApps } from "../sdk/app/applications/sync" +import { DocUpdateEvent, UserGroupSyncEvents } from "@budibase/types" +import { syncUsersToAllApps } from "../../sdk/app/applications/sync" +import { UpdateCallback } from "./processors" -type UpdateCallback = (docId: string) => void - -function userGroupUpdates(updateCb?: UpdateCallback) { - const types = [constants.DocumentType.USER, constants.DocumentType.GROUP] +export default function process(updateCb?: UpdateCallback) { const processor = async (update: DocUpdateEvent) => { try { const docId = update.id @@ -33,10 +31,5 @@ function userGroupUpdates(updateCb?: UpdateCallback) { } } } - return { types, processor } -} - -export function init(updateCb?: UpdateCallback) { - const processors = [userGroupUpdates(updateCb)] - docUpdates.init(processors) + return { events: UserGroupSyncEvents, processor } } diff --git a/packages/types/src/sdk/events/event.ts b/packages/types/src/sdk/events/event.ts index 0b2ed3ce7c..c4990f869b 100644 --- a/packages/types/src/sdk/events/event.ts +++ b/packages/types/src/sdk/events/event.ts @@ -1,4 +1,5 @@ import { Hosting } from "../hosting" +import { Group, Identity } from "./identification" export enum Event { // USER @@ -186,7 +187,7 @@ export enum Event { AUDIT_LOGS_DOWNLOADED = "audit_log:downloaded", } -export const AsyncEvents: Event[] = [ +export const UserGroupSyncEvents: Event[] = [ Event.USER_CREATED, Event.USER_UPDATED, Event.USER_DELETED, @@ -202,6 +203,8 @@ export const AsyncEvents: Event[] = [ Event.USER_GROUP_PERMISSIONS_EDITED, ] +export const AsyncEvents: Event[] = [...UserGroupSyncEvents] + // all events that are not audited have been added to this record as undefined, this means // that Typescript can protect us against new events being added and auditing of those // events not being considered. This might be a little ugly, but provides a level of @@ -405,3 +408,15 @@ export type DocUpdateEvent = { tenantId: string appId?: string } + +export interface EventProcessor { + processEvent( + event: Event, + identity: Identity, + properties: any, + timestamp?: string | number + ): Promise + identify?(identity: Identity, timestamp?: string | number): Promise + identifyGroup?(group: Group, timestamp?: string | number): Promise + shutdown?(): void +} diff --git a/yarn.lock b/yarn.lock index c1b5be3b11..f5cd450ddc 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1486,15 +1486,15 @@ pouchdb-promise "^6.0.4" through2 "^2.0.0" -"@budibase/pro@2.5.5-alpha.0": - version "2.5.5-alpha.0" - resolved "https://registry.yarnpkg.com/@budibase/pro/-/pro-2.5.5-alpha.0.tgz#28b075a96efb564328a4972cae9ea6c9a5f3aabc" - integrity sha512-98fLnvHWVy7ASEFC98bo6Qdd55SjC7yrJNuf7FUYZbeFwpmwwRxlWnWFTa0ctKWB5p2LToARWBns3TqgnUr/zQ== +"@budibase/pro@2.5.5-alpha.1": + version "2.5.5-alpha.1" + resolved "https://registry.yarnpkg.com/@budibase/pro/-/pro-2.5.5-alpha.1.tgz#6d9476ce724bd3e405cb2e0198c4b83b168e65bc" + integrity sha512-fEuropk/0aH1+lELX6wdFa8UUpE+SMtlvBbsgNR2ulvLgLLPEYhI2gINlhiItMWWgTtxNtele3hOs1VWhn0o2A== dependencies: - "@budibase/backend-core" "2.5.5-alpha.0" + "@budibase/backend-core" "2.5.5-alpha.1" "@budibase/shared-core" "2.4.44-alpha.1" "@budibase/string-templates" "2.4.44-alpha.1" - "@budibase/types" "2.5.5-alpha.0" + "@budibase/types" "2.5.5-alpha.1" "@koa/router" "8.0.8" bull "4.10.1" joi "17.6.0"