Main body of PR comments, switching event handling to be a publisher of async events, a generic event; then adding a new async document update processor, which can later be converted to be part of the event processing pipeline.

This commit is contained in:
Michael Drury 2023-04-15 00:37:22 +01:00
parent 0ba5887d9c
commit ef5bcc4b66
19 changed files with 245 additions and 151 deletions

View File

@ -1 +1,29 @@
export * from "./updates"
import { asyncEventQueue, init as initQueue } from "../events/asyncEvents"
import {
ProcessorMap,
default as DocumentUpdateProcessor,
} from "../events/processors/async/DocumentUpdateProcessor"
let processingPromise: Promise<void>
let documentProcessor: DocumentUpdateProcessor
export function init(processors: ProcessorMap) {
if (!asyncEventQueue) {
initQueue()
}
if (!documentProcessor) {
documentProcessor = new DocumentUpdateProcessor(processors)
}
// if not processing in this instance, kick it off
if (!processingPromise) {
processingPromise = asyncEventQueue.process(async job => {
const { event, identity, properties, timestamp } = job.data
await documentProcessor.processEvent(
event,
identity,
properties,
timestamp
)
})
}
}

View File

@ -1,62 +0,0 @@
import { createQueue, JobQueue } from "../queue"
import BullQueue from "bull"
import { DocumentType, SEPARATOR } from "../constants"
import { doInContext, doInTenant } from "../context"
type DocUpdateEvent = {
id: string
tenantId: string
appId?: string
}
type Processor = (update: DocUpdateEvent) => Promise<void>
const processors: { types: DocumentType[]; processor: Processor }[] = []
let queue: BullQueue.Queue
let processingPromise: Promise<void>
export function init() {
queue = createQueue<DocUpdateEvent>(JobQueue.DOC_UPDATE)
}
export async function shutdown() {
if (queue) {
await queue.close()
}
}
export async function update(opts: DocUpdateEvent) {
if (!queue) {
init()
}
await queue.add(opts)
}
async function handleJob(data: DocUpdateEvent) {
for (let { types, processor } of processors) {
if (types.find(type => data.id.startsWith(`${type}${SEPARATOR}`))) {
const context = data.appId || data.tenantId
const contextFn = data.appId ? doInContext : doInTenant
await contextFn(context, async () => {
await processor(data)
})
}
}
}
export async function process(types: DocumentType[], processor: Processor) {
if (!queue) {
init()
}
// add to processor list
processors.push({
types,
processor,
})
// if not processing in this instance, kick it off
if (!processingPromise) {
processingPromise = queue.process(async job => {
await handleJob(job.data)
})
}
}

View File

@ -0,0 +1,2 @@
export * from "./queue"
export * from "./publisher"

View File

@ -0,0 +1,12 @@
import { AsyncEvents } from "@budibase/types"
import { EventPayload, asyncEventQueue, init } from "./queue"
export async function publishAsyncEvent(payload: EventPayload) {
if (!asyncEventQueue) {
init()
}
const { event, identity } = payload
if (AsyncEvents.indexOf(event) !== -1 && identity.tenantId) {
await asyncEventQueue.add(payload)
}
}

View File

@ -0,0 +1,22 @@
import BullQueue from "bull"
import { createQueue, JobQueue } from "../../queue"
import { Event, Identity } from "@budibase/types"
export interface EventPayload {
event: Event
identity: Identity
properties: any
timestamp?: string | number
}
export let asyncEventQueue: BullQueue.Queue
export function init() {
asyncEventQueue = createQueue<EventPayload>(JobQueue.DOC_UPDATE)
}
export async function shutdown() {
if (asyncEventQueue) {
await asyncEventQueue.close()
}
}

View File

@ -0,0 +1,56 @@
import {
Event,
UserCreatedEvent,
UserUpdatedEvent,
UserDeletedEvent,
UserPermissionAssignedEvent,
UserPermissionRemovedEvent,
GroupCreatedEvent,
GroupUpdatedEvent,
GroupDeletedEvent,
GroupUsersAddedEvent,
GroupUsersDeletedEvent,
GroupPermissionsEditedEvent,
} from "@budibase/types"
const getEventProperties: Record<
string,
(properties: any) => string | undefined
> = {
[Event.USER_CREATED]: (properties: UserCreatedEvent) => properties.userId,
[Event.USER_UPDATED]: (properties: UserUpdatedEvent) => properties.userId,
[Event.USER_DELETED]: (properties: UserDeletedEvent) => properties.userId,
[Event.USER_PERMISSION_ADMIN_ASSIGNED]: (
properties: UserPermissionAssignedEvent
) => properties.userId,
[Event.USER_PERMISSION_ADMIN_REMOVED]: (
properties: UserPermissionRemovedEvent
) => properties.userId,
[Event.USER_PERMISSION_BUILDER_ASSIGNED]: (
properties: UserPermissionAssignedEvent
) => properties.userId,
[Event.USER_PERMISSION_BUILDER_REMOVED]: (
properties: UserPermissionRemovedEvent
) => properties.userId,
[Event.USER_GROUP_CREATED]: (properties: GroupCreatedEvent) =>
properties.groupId,
[Event.USER_GROUP_UPDATED]: (properties: GroupUpdatedEvent) =>
properties.groupId,
[Event.USER_GROUP_DELETED]: (properties: GroupDeletedEvent) =>
properties.groupId,
[Event.USER_GROUP_USERS_ADDED]: (properties: GroupUsersAddedEvent) =>
properties.groupId,
[Event.USER_GROUP_USERS_REMOVED]: (properties: GroupUsersDeletedEvent) =>
properties.groupId,
[Event.USER_GROUP_PERMISSIONS_EDITED]: (
properties: GroupPermissionsEditedEvent
) => properties.groupId,
}
export function getDocumentId(event: Event, properties: any) {
const extractor = getEventProperties[event]
if (!extractor) {
throw new Error("Event does not have a method of document ID extraction")
}
return extractor(properties)
}

View File

@ -1,7 +1,8 @@
import { Event, AuditedEventFriendlyName } from "@budibase/types"
import { Event } from "@budibase/types"
import { processors } from "./processors"
import identification from "./identification"
import * as backfill from "./backfill"
import { publishAsyncEvent } from "./asyncEvents"
export const publishEvent = async (
event: Event,
@ -12,6 +13,15 @@ export const publishEvent = async (
const identity = await identification.getCurrentIdentity()
const backfilling = await backfill.isBackfillingEvent(event)
// send off async events if required
await publishAsyncEvent({
event,
identity,
properties,
timestamp,
})
// no backfill - send the event and exit
if (!backfilling) {
await processors.processEvent(event, identity, properties, timestamp)

View File

@ -6,6 +6,8 @@ export * as backfillCache from "./backfill"
import { processors } from "./processors"
export function initAsyncEvents() {}
export const shutdown = () => {
processors.shutdown()
console.log("Events shutdown")

View File

@ -1,42 +0,0 @@
import { Event, Identity, Group, DocumentUpdateEvents } from "@budibase/types"
import { EventProcessor } from "./types"
import * as docUpdates from "../../docUpdates"
import { getTenantId } from "../../context"
export default class DocumentUpdateProcessor implements EventProcessor {
async processEvent(
event: Event,
identity: Identity,
properties: any,
timestamp?: string
): Promise<void> {
// only user and group IDs supported right now - no app documents yet
if (DocumentUpdateEvents.indexOf(event) !== -1 && identity.tenantId) {
await docUpdates.update({
id: this.getId(properties),
tenantId: getTenantId(),
})
}
}
getId(properties: any) {
let possibleProps = ["groupId", "userId"]
for (let prop of possibleProps) {
if (properties[prop]) {
return properties[prop]
}
}
}
async identify(identity: Identity, timestamp?: string | number) {
// no-op
}
async identifyGroup(group: Group, timestamp?: string | number) {
// no-op
}
shutdown(): void {
docUpdates.shutdown()
}
}

View File

@ -25,7 +25,9 @@ export default class Processor implements EventProcessor {
timestamp?: string | number
): Promise<void> {
for (const eventProcessor of this.processors) {
await eventProcessor.identify(identity, timestamp)
if (eventProcessor.identify) {
await eventProcessor.identify(identity, timestamp)
}
}
}
@ -34,13 +36,17 @@ export default class Processor implements EventProcessor {
timestamp?: string | number
): Promise<void> {
for (const eventProcessor of this.processors) {
await eventProcessor.identifyGroup(identity, timestamp)
if (eventProcessor.identifyGroup) {
await eventProcessor.identifyGroup(identity, timestamp)
}
}
}
shutdown() {
for (const eventProcessor of this.processors) {
eventProcessor.shutdown()
if (eventProcessor.shutdown) {
eventProcessor.shutdown()
}
}
}
}

View File

@ -0,0 +1,44 @@
import { EventProcessor } from "../types"
import { Event, Identity, DocUpdateEvent } from "@budibase/types"
import { DocumentType, SEPARATOR } from "../../../constants"
import { doInTenant } from "../../../context"
import { getDocumentId } from "../../documentId"
import { shutdown } from "../../asyncEvents"
export type Processor = (update: DocUpdateEvent) => Promise<void>
export type ProcessorMap = { types: DocumentType[]; processor: Processor }[]
export default class DocumentUpdateProcessor implements EventProcessor {
processors: ProcessorMap = []
constructor(processors: ProcessorMap) {
this.processors = processors
}
async processEvent(
event: Event,
identity: Identity,
properties: any,
timestamp?: string | number
) {
const tenantId = identity.tenantId
const docId = getDocumentId(event, properties)
if (!tenantId || !docId) {
return
}
for (let { types, processor } of this.processors) {
if (types.find(type => docId.startsWith(`${type}${SEPARATOR}`))) {
await doInTenant(tenantId, async () => {
await processor({
id: docId,
tenantId,
})
})
}
}
}
shutdown() {
return shutdown()
}
}

View File

@ -1,14 +1,12 @@
import AnalyticsProcessor from "./AnalyticsProcessor"
import LoggingProcessor from "./LoggingProcessor"
import AuditLogsProcessor from "./AuditLogsProcessor"
import DocumentUpdateProcessor from "./DocumentUpdateProcessor"
import Processors from "./Processors"
import { AuditLogFn } from "@budibase/types"
export const analyticsProcessor = new AnalyticsProcessor()
const loggingProcessor = new LoggingProcessor()
const auditLogsProcessor = new AuditLogsProcessor()
const documentUpdateProcessor = new DocumentUpdateProcessor()
export function init(auditingFn: AuditLogFn) {
return AuditLogsProcessor.init(auditingFn)
@ -18,5 +16,4 @@ export const processors = new Processors([
analyticsProcessor,
loggingProcessor,
auditLogsProcessor,
documentUpdateProcessor,
])

View File

@ -12,7 +12,7 @@ export interface EventProcessor {
properties: any,
timestamp?: string | number
): Promise<void>
identify(identity: Identity, timestamp?: string | number): Promise<void>
identifyGroup(group: Group, timestamp?: string | number): Promise<void>
shutdown(): void
identify?(identity: Identity, timestamp?: string | number): Promise<void>
identifyGroup?(group: Group, timestamp?: string | number): Promise<void>
shutdown?(): void
}

View File

@ -0,0 +1,42 @@
import { constants, docUpdates, logging } from "@budibase/backend-core"
import { sdk as proSdk } from "@budibase/pro"
import { DocUpdateEvent } from "@budibase/types"
import { syncUsersToAllApps } from "../sdk/app/applications/sync"
type UpdateCallback = (docId: string) => void
function userGroupUpdates(updateCb?: UpdateCallback) {
const types = [constants.DocumentType.USER, constants.DocumentType.GROUP]
const processor = async (update: DocUpdateEvent) => {
try {
const docId = update.id
const isGroup = docId.startsWith(constants.DocumentType.GROUP)
let userIds: string[]
if (isGroup) {
const group = await proSdk.groups.get(docId)
userIds = group.users?.map(user => user._id) || []
} else {
userIds = [docId]
}
if (userIds.length > 0) {
await syncUsersToAllApps(userIds)
}
if (updateCb) {
updateCb(docId)
}
} catch (err: any) {
// if something not found - no changes to perform
if (err?.status === 404) {
return
} else {
logging.logAlert("Failed to perform user/group app sync", err)
}
}
}
return { types, processor }
}
export function init(updateCb?: UpdateCallback) {
const processors = [userGroupUpdates(updateCb)]
docUpdates.init(processors)
}

View File

@ -2,4 +2,5 @@ import BudibaseEmitter from "./BudibaseEmitter"
const emitter = new BudibaseEmitter()
export { init } from "./docUpdates"
export default emitter

View File

@ -82,7 +82,7 @@ async function syncUsersToApp(
})
}
async function syncUsersToAllApps(userIds: string[]) {
export async function syncUsersToAllApps(userIds: string[]) {
// list of users, if one has been deleted it will be undefined in array
const users = (await getGlobalUsers(userIds, {
noProcessing: true,
@ -113,36 +113,6 @@ async function syncUsersToAllApps(userIds: string[]) {
}
}
export function initUserGroupSync(updateCb?: (docId: string) => void) {
const types = [constants.DocumentType.USER, constants.DocumentType.GROUP]
docUpdates.process(types, async update => {
try {
const docId = update.id
const isGroup = docId.startsWith(constants.DocumentType.GROUP)
let userIds: string[]
if (isGroup) {
const group = await proSdk.groups.get(docId)
userIds = group.users?.map(user => user._id) || []
} else {
userIds = [docId]
}
if (userIds.length > 0) {
await syncUsersToAllApps(userIds)
}
if (updateCb) {
updateCb(docId)
}
} catch (err: any) {
// if something not found - no changes to perform
if (err?.status === 404) {
return
} else {
logging.logAlert("Failed to perform user/group app sync", err)
}
}
})
}
export async function syncApp(
appId: string,
opts?: { automationOnly?: boolean }

View File

@ -1,6 +1,6 @@
import TestConfiguration from "../../../../tests/utilities/TestConfiguration"
import { events, context, roles, constants } from "@budibase/backend-core"
import { initUserGroupSync } from "../sync"
import { init } from "../../../../events"
import { rawUserMetadata } from "../../../users/utils"
import EventEmitter from "events"
import { UserGroup, UserMetadata, UserRoles, User } from "@budibase/types"
@ -35,7 +35,7 @@ function waitForUpdate(opts: { group?: boolean }) {
beforeAll(async () => {
app = await config.init("syncApp")
initUserGroupSync(updateCb)
init(updateCb)
})
async function createUser(email: string, roles: UserRoles, builder?: boolean) {

View File

@ -10,7 +10,7 @@ import fs from "fs"
import { watch } from "./watch"
import * as automations from "./automations"
import * as fileSystem from "./utilities/fileSystem"
import eventEmitter from "./events"
import { default as eventEmitter, init as eventInit } from "./events"
import * as migrations from "./migrations"
import * as bullboard from "./automations/bullboard"
import * as pro from "@budibase/pro"
@ -63,7 +63,7 @@ export async function startup(app?: any, server?: any) {
eventEmitter.emitPort(env.PORT)
fileSystem.init()
await redis.init()
sdk.applications.initUserGroupSync()
eventInit()
// run migrations on startup if not done via http
// not recommended in a clustered environment

View File

@ -186,7 +186,7 @@ export enum Event {
AUDIT_LOGS_DOWNLOADED = "audit_log:downloaded",
}
export const DocumentUpdateEvents: Event[] = [
export const AsyncEvents: Event[] = [
Event.USER_CREATED,
Event.USER_UPDATED,
Event.USER_DELETED,
@ -399,3 +399,9 @@ export interface BaseEvent {
}
export type TableExportFormat = "json" | "csv"
export type DocUpdateEvent = {
id: string
tenantId: string
appId?: string
}