diff --git a/packages/backend-core/src/docUpdates/index.ts b/packages/backend-core/src/docUpdates/index.ts index c6c80aad8a..3971f8de12 100644 --- a/packages/backend-core/src/docUpdates/index.ts +++ b/packages/backend-core/src/docUpdates/index.ts @@ -1 +1,29 @@ -export * from "./updates" +import { asyncEventQueue, init as initQueue } from "../events/asyncEvents" +import { + ProcessorMap, + default as DocumentUpdateProcessor, +} from "../events/processors/async/DocumentUpdateProcessor" + +let processingPromise: Promise +let documentProcessor: DocumentUpdateProcessor + +export function init(processors: ProcessorMap) { + if (!asyncEventQueue) { + initQueue() + } + if (!documentProcessor) { + documentProcessor = new DocumentUpdateProcessor(processors) + } + // if not processing in this instance, kick it off + if (!processingPromise) { + processingPromise = asyncEventQueue.process(async job => { + const { event, identity, properties, timestamp } = job.data + await documentProcessor.processEvent( + event, + identity, + properties, + timestamp + ) + }) + } +} diff --git a/packages/backend-core/src/docUpdates/updates.ts b/packages/backend-core/src/docUpdates/updates.ts deleted file mode 100644 index 478d29f4eb..0000000000 --- a/packages/backend-core/src/docUpdates/updates.ts +++ /dev/null @@ -1,62 +0,0 @@ -import { createQueue, JobQueue } from "../queue" -import BullQueue from "bull" -import { DocumentType, SEPARATOR } from "../constants" -import { doInContext, doInTenant } from "../context" - -type DocUpdateEvent = { - id: string - tenantId: string - appId?: string -} - -type Processor = (update: DocUpdateEvent) => Promise - -const processors: { types: DocumentType[]; processor: Processor }[] = [] -let queue: BullQueue.Queue -let processingPromise: Promise - -export function init() { - queue = createQueue(JobQueue.DOC_UPDATE) -} - -export async function shutdown() { - if (queue) { - await queue.close() - } -} - -export async function update(opts: DocUpdateEvent) { - if (!queue) { - init() - } - await queue.add(opts) -} - -async function handleJob(data: DocUpdateEvent) { - for (let { types, processor } of processors) { - if (types.find(type => data.id.startsWith(`${type}${SEPARATOR}`))) { - const context = data.appId || data.tenantId - const contextFn = data.appId ? doInContext : doInTenant - await contextFn(context, async () => { - await processor(data) - }) - } - } -} - -export async function process(types: DocumentType[], processor: Processor) { - if (!queue) { - init() - } - // add to processor list - processors.push({ - types, - processor, - }) - // if not processing in this instance, kick it off - if (!processingPromise) { - processingPromise = queue.process(async job => { - await handleJob(job.data) - }) - } -} diff --git a/packages/backend-core/src/events/asyncEvents/index.ts b/packages/backend-core/src/events/asyncEvents/index.ts new file mode 100644 index 0000000000..65f7f8f58b --- /dev/null +++ b/packages/backend-core/src/events/asyncEvents/index.ts @@ -0,0 +1,2 @@ +export * from "./queue" +export * from "./publisher" diff --git a/packages/backend-core/src/events/asyncEvents/publisher.ts b/packages/backend-core/src/events/asyncEvents/publisher.ts new file mode 100644 index 0000000000..4e44c4ddc5 --- /dev/null +++ b/packages/backend-core/src/events/asyncEvents/publisher.ts @@ -0,0 +1,12 @@ +import { AsyncEvents } from "@budibase/types" +import { EventPayload, asyncEventQueue, init } from "./queue" + +export async function publishAsyncEvent(payload: EventPayload) { + if (!asyncEventQueue) { + init() + } + const { event, identity } = payload + if (AsyncEvents.indexOf(event) !== -1 && identity.tenantId) { + await asyncEventQueue.add(payload) + } +} diff --git a/packages/backend-core/src/events/asyncEvents/queue.ts b/packages/backend-core/src/events/asyncEvents/queue.ts new file mode 100644 index 0000000000..c0b9b368b8 --- /dev/null +++ b/packages/backend-core/src/events/asyncEvents/queue.ts @@ -0,0 +1,22 @@ +import BullQueue from "bull" +import { createQueue, JobQueue } from "../../queue" +import { Event, Identity } from "@budibase/types" + +export interface EventPayload { + event: Event + identity: Identity + properties: any + timestamp?: string | number +} + +export let asyncEventQueue: BullQueue.Queue + +export function init() { + asyncEventQueue = createQueue(JobQueue.DOC_UPDATE) +} + +export async function shutdown() { + if (asyncEventQueue) { + await asyncEventQueue.close() + } +} diff --git a/packages/backend-core/src/events/documentId.ts b/packages/backend-core/src/events/documentId.ts new file mode 100644 index 0000000000..887e56f07b --- /dev/null +++ b/packages/backend-core/src/events/documentId.ts @@ -0,0 +1,56 @@ +import { + Event, + UserCreatedEvent, + UserUpdatedEvent, + UserDeletedEvent, + UserPermissionAssignedEvent, + UserPermissionRemovedEvent, + GroupCreatedEvent, + GroupUpdatedEvent, + GroupDeletedEvent, + GroupUsersAddedEvent, + GroupUsersDeletedEvent, + GroupPermissionsEditedEvent, +} from "@budibase/types" + +const getEventProperties: Record< + string, + (properties: any) => string | undefined +> = { + [Event.USER_CREATED]: (properties: UserCreatedEvent) => properties.userId, + [Event.USER_UPDATED]: (properties: UserUpdatedEvent) => properties.userId, + [Event.USER_DELETED]: (properties: UserDeletedEvent) => properties.userId, + [Event.USER_PERMISSION_ADMIN_ASSIGNED]: ( + properties: UserPermissionAssignedEvent + ) => properties.userId, + [Event.USER_PERMISSION_ADMIN_REMOVED]: ( + properties: UserPermissionRemovedEvent + ) => properties.userId, + [Event.USER_PERMISSION_BUILDER_ASSIGNED]: ( + properties: UserPermissionAssignedEvent + ) => properties.userId, + [Event.USER_PERMISSION_BUILDER_REMOVED]: ( + properties: UserPermissionRemovedEvent + ) => properties.userId, + [Event.USER_GROUP_CREATED]: (properties: GroupCreatedEvent) => + properties.groupId, + [Event.USER_GROUP_UPDATED]: (properties: GroupUpdatedEvent) => + properties.groupId, + [Event.USER_GROUP_DELETED]: (properties: GroupDeletedEvent) => + properties.groupId, + [Event.USER_GROUP_USERS_ADDED]: (properties: GroupUsersAddedEvent) => + properties.groupId, + [Event.USER_GROUP_USERS_REMOVED]: (properties: GroupUsersDeletedEvent) => + properties.groupId, + [Event.USER_GROUP_PERMISSIONS_EDITED]: ( + properties: GroupPermissionsEditedEvent + ) => properties.groupId, +} + +export function getDocumentId(event: Event, properties: any) { + const extractor = getEventProperties[event] + if (!extractor) { + throw new Error("Event does not have a method of document ID extraction") + } + return extractor(properties) +} diff --git a/packages/backend-core/src/events/events.ts b/packages/backend-core/src/events/events.ts index c2f7cf66ec..12cef21dff 100644 --- a/packages/backend-core/src/events/events.ts +++ b/packages/backend-core/src/events/events.ts @@ -1,7 +1,8 @@ -import { Event, AuditedEventFriendlyName } from "@budibase/types" +import { Event } from "@budibase/types" import { processors } from "./processors" import identification from "./identification" import * as backfill from "./backfill" +import { publishAsyncEvent } from "./asyncEvents" export const publishEvent = async ( event: Event, @@ -12,6 +13,15 @@ export const publishEvent = async ( const identity = await identification.getCurrentIdentity() const backfilling = await backfill.isBackfillingEvent(event) + + // send off async events if required + await publishAsyncEvent({ + event, + identity, + properties, + timestamp, + }) + // no backfill - send the event and exit if (!backfilling) { await processors.processEvent(event, identity, properties, timestamp) diff --git a/packages/backend-core/src/events/index.ts b/packages/backend-core/src/events/index.ts index d0d59a5b22..a238d72bac 100644 --- a/packages/backend-core/src/events/index.ts +++ b/packages/backend-core/src/events/index.ts @@ -6,6 +6,8 @@ export * as backfillCache from "./backfill" import { processors } from "./processors" +export function initAsyncEvents() {} + export const shutdown = () => { processors.shutdown() console.log("Events shutdown") diff --git a/packages/backend-core/src/events/processors/DocumentUpdateProcessor.ts b/packages/backend-core/src/events/processors/DocumentUpdateProcessor.ts deleted file mode 100644 index 496da7e923..0000000000 --- a/packages/backend-core/src/events/processors/DocumentUpdateProcessor.ts +++ /dev/null @@ -1,42 +0,0 @@ -import { Event, Identity, Group, DocumentUpdateEvents } from "@budibase/types" -import { EventProcessor } from "./types" -import * as docUpdates from "../../docUpdates" -import { getTenantId } from "../../context" - -export default class DocumentUpdateProcessor implements EventProcessor { - async processEvent( - event: Event, - identity: Identity, - properties: any, - timestamp?: string - ): Promise { - // only user and group IDs supported right now - no app documents yet - if (DocumentUpdateEvents.indexOf(event) !== -1 && identity.tenantId) { - await docUpdates.update({ - id: this.getId(properties), - tenantId: getTenantId(), - }) - } - } - - getId(properties: any) { - let possibleProps = ["groupId", "userId"] - for (let prop of possibleProps) { - if (properties[prop]) { - return properties[prop] - } - } - } - - async identify(identity: Identity, timestamp?: string | number) { - // no-op - } - - async identifyGroup(group: Group, timestamp?: string | number) { - // no-op - } - - shutdown(): void { - docUpdates.shutdown() - } -} diff --git a/packages/backend-core/src/events/processors/Processors.ts b/packages/backend-core/src/events/processors/Processors.ts index 4baedd909f..72de945d44 100644 --- a/packages/backend-core/src/events/processors/Processors.ts +++ b/packages/backend-core/src/events/processors/Processors.ts @@ -25,7 +25,9 @@ export default class Processor implements EventProcessor { timestamp?: string | number ): Promise { for (const eventProcessor of this.processors) { - await eventProcessor.identify(identity, timestamp) + if (eventProcessor.identify) { + await eventProcessor.identify(identity, timestamp) + } } } @@ -34,13 +36,17 @@ export default class Processor implements EventProcessor { timestamp?: string | number ): Promise { for (const eventProcessor of this.processors) { - await eventProcessor.identifyGroup(identity, timestamp) + if (eventProcessor.identifyGroup) { + await eventProcessor.identifyGroup(identity, timestamp) + } } } shutdown() { for (const eventProcessor of this.processors) { - eventProcessor.shutdown() + if (eventProcessor.shutdown) { + eventProcessor.shutdown() + } } } } diff --git a/packages/backend-core/src/events/processors/async/DocumentUpdateProcessor.ts b/packages/backend-core/src/events/processors/async/DocumentUpdateProcessor.ts new file mode 100644 index 0000000000..a65152cc70 --- /dev/null +++ b/packages/backend-core/src/events/processors/async/DocumentUpdateProcessor.ts @@ -0,0 +1,44 @@ +import { EventProcessor } from "../types" +import { Event, Identity, DocUpdateEvent } from "@budibase/types" +import { DocumentType, SEPARATOR } from "../../../constants" +import { doInTenant } from "../../../context" +import { getDocumentId } from "../../documentId" +import { shutdown } from "../../asyncEvents" + +export type Processor = (update: DocUpdateEvent) => Promise +export type ProcessorMap = { types: DocumentType[]; processor: Processor }[] + +export default class DocumentUpdateProcessor implements EventProcessor { + processors: ProcessorMap = [] + + constructor(processors: ProcessorMap) { + this.processors = processors + } + + async processEvent( + event: Event, + identity: Identity, + properties: any, + timestamp?: string | number + ) { + const tenantId = identity.tenantId + const docId = getDocumentId(event, properties) + if (!tenantId || !docId) { + return + } + for (let { types, processor } of this.processors) { + if (types.find(type => docId.startsWith(`${type}${SEPARATOR}`))) { + await doInTenant(tenantId, async () => { + await processor({ + id: docId, + tenantId, + }) + }) + } + } + } + + shutdown() { + return shutdown() + } +} diff --git a/packages/backend-core/src/events/processors/index.ts b/packages/backend-core/src/events/processors/index.ts index 3582838d31..6646764e47 100644 --- a/packages/backend-core/src/events/processors/index.ts +++ b/packages/backend-core/src/events/processors/index.ts @@ -1,14 +1,12 @@ import AnalyticsProcessor from "./AnalyticsProcessor" import LoggingProcessor from "./LoggingProcessor" import AuditLogsProcessor from "./AuditLogsProcessor" -import DocumentUpdateProcessor from "./DocumentUpdateProcessor" import Processors from "./Processors" import { AuditLogFn } from "@budibase/types" export const analyticsProcessor = new AnalyticsProcessor() const loggingProcessor = new LoggingProcessor() const auditLogsProcessor = new AuditLogsProcessor() -const documentUpdateProcessor = new DocumentUpdateProcessor() export function init(auditingFn: AuditLogFn) { return AuditLogsProcessor.init(auditingFn) @@ -18,5 +16,4 @@ export const processors = new Processors([ analyticsProcessor, loggingProcessor, auditLogsProcessor, - documentUpdateProcessor, ]) diff --git a/packages/backend-core/src/events/processors/types.ts b/packages/backend-core/src/events/processors/types.ts index f4066fe248..33db6d8932 100644 --- a/packages/backend-core/src/events/processors/types.ts +++ b/packages/backend-core/src/events/processors/types.ts @@ -12,7 +12,7 @@ export interface EventProcessor { properties: any, timestamp?: string | number ): Promise - identify(identity: Identity, timestamp?: string | number): Promise - identifyGroup(group: Group, timestamp?: string | number): Promise - shutdown(): void + identify?(identity: Identity, timestamp?: string | number): Promise + identifyGroup?(group: Group, timestamp?: string | number): Promise + shutdown?(): void } diff --git a/packages/server/src/events/docUpdates.ts b/packages/server/src/events/docUpdates.ts new file mode 100644 index 0000000000..fcd90f64c4 --- /dev/null +++ b/packages/server/src/events/docUpdates.ts @@ -0,0 +1,42 @@ +import { constants, docUpdates, logging } from "@budibase/backend-core" +import { sdk as proSdk } from "@budibase/pro" +import { DocUpdateEvent } from "@budibase/types" +import { syncUsersToAllApps } from "../sdk/app/applications/sync" + +type UpdateCallback = (docId: string) => void + +function userGroupUpdates(updateCb?: UpdateCallback) { + const types = [constants.DocumentType.USER, constants.DocumentType.GROUP] + const processor = async (update: DocUpdateEvent) => { + try { + const docId = update.id + const isGroup = docId.startsWith(constants.DocumentType.GROUP) + let userIds: string[] + if (isGroup) { + const group = await proSdk.groups.get(docId) + userIds = group.users?.map(user => user._id) || [] + } else { + userIds = [docId] + } + if (userIds.length > 0) { + await syncUsersToAllApps(userIds) + } + if (updateCb) { + updateCb(docId) + } + } catch (err: any) { + // if something not found - no changes to perform + if (err?.status === 404) { + return + } else { + logging.logAlert("Failed to perform user/group app sync", err) + } + } + } + return { types, processor } +} + +export function init(updateCb?: UpdateCallback) { + const processors = [userGroupUpdates(updateCb)] + docUpdates.init(processors) +} diff --git a/packages/server/src/events/index.ts b/packages/server/src/events/index.ts index fad7bcaa9a..23c3f3e512 100644 --- a/packages/server/src/events/index.ts +++ b/packages/server/src/events/index.ts @@ -2,4 +2,5 @@ import BudibaseEmitter from "./BudibaseEmitter" const emitter = new BudibaseEmitter() +export { init } from "./docUpdates" export default emitter diff --git a/packages/server/src/sdk/app/applications/sync.ts b/packages/server/src/sdk/app/applications/sync.ts index 66fd5d2d59..d0a1d78428 100644 --- a/packages/server/src/sdk/app/applications/sync.ts +++ b/packages/server/src/sdk/app/applications/sync.ts @@ -82,7 +82,7 @@ async function syncUsersToApp( }) } -async function syncUsersToAllApps(userIds: string[]) { +export async function syncUsersToAllApps(userIds: string[]) { // list of users, if one has been deleted it will be undefined in array const users = (await getGlobalUsers(userIds, { noProcessing: true, @@ -113,36 +113,6 @@ async function syncUsersToAllApps(userIds: string[]) { } } -export function initUserGroupSync(updateCb?: (docId: string) => void) { - const types = [constants.DocumentType.USER, constants.DocumentType.GROUP] - docUpdates.process(types, async update => { - try { - const docId = update.id - const isGroup = docId.startsWith(constants.DocumentType.GROUP) - let userIds: string[] - if (isGroup) { - const group = await proSdk.groups.get(docId) - userIds = group.users?.map(user => user._id) || [] - } else { - userIds = [docId] - } - if (userIds.length > 0) { - await syncUsersToAllApps(userIds) - } - if (updateCb) { - updateCb(docId) - } - } catch (err: any) { - // if something not found - no changes to perform - if (err?.status === 404) { - return - } else { - logging.logAlert("Failed to perform user/group app sync", err) - } - } - }) -} - export async function syncApp( appId: string, opts?: { automationOnly?: boolean } diff --git a/packages/server/src/sdk/app/applications/tests/sync.spec.ts b/packages/server/src/sdk/app/applications/tests/sync.spec.ts index 90493c541d..3b412a7fa7 100644 --- a/packages/server/src/sdk/app/applications/tests/sync.spec.ts +++ b/packages/server/src/sdk/app/applications/tests/sync.spec.ts @@ -1,6 +1,6 @@ import TestConfiguration from "../../../../tests/utilities/TestConfiguration" import { events, context, roles, constants } from "@budibase/backend-core" -import { initUserGroupSync } from "../sync" +import { init } from "../../../../events" import { rawUserMetadata } from "../../../users/utils" import EventEmitter from "events" import { UserGroup, UserMetadata, UserRoles, User } from "@budibase/types" @@ -35,7 +35,7 @@ function waitForUpdate(opts: { group?: boolean }) { beforeAll(async () => { app = await config.init("syncApp") - initUserGroupSync(updateCb) + init(updateCb) }) async function createUser(email: string, roles: UserRoles, builder?: boolean) { diff --git a/packages/server/src/startup.ts b/packages/server/src/startup.ts index 80cdc3c792..2fd59b7a8e 100644 --- a/packages/server/src/startup.ts +++ b/packages/server/src/startup.ts @@ -10,7 +10,7 @@ import fs from "fs" import { watch } from "./watch" import * as automations from "./automations" import * as fileSystem from "./utilities/fileSystem" -import eventEmitter from "./events" +import { default as eventEmitter, init as eventInit } from "./events" import * as migrations from "./migrations" import * as bullboard from "./automations/bullboard" import * as pro from "@budibase/pro" @@ -63,7 +63,7 @@ export async function startup(app?: any, server?: any) { eventEmitter.emitPort(env.PORT) fileSystem.init() await redis.init() - sdk.applications.initUserGroupSync() + eventInit() // run migrations on startup if not done via http // not recommended in a clustered environment diff --git a/packages/types/src/sdk/events/event.ts b/packages/types/src/sdk/events/event.ts index 92965fa533..0b2ed3ce7c 100644 --- a/packages/types/src/sdk/events/event.ts +++ b/packages/types/src/sdk/events/event.ts @@ -186,7 +186,7 @@ export enum Event { AUDIT_LOGS_DOWNLOADED = "audit_log:downloaded", } -export const DocumentUpdateEvents: Event[] = [ +export const AsyncEvents: Event[] = [ Event.USER_CREATED, Event.USER_UPDATED, Event.USER_DELETED, @@ -399,3 +399,9 @@ export interface BaseEvent { } export type TableExportFormat = "json" | "csv" + +export type DocUpdateEvent = { + id: string + tenantId: string + appId?: string +}