Further PR comments.
This commit is contained in:
parent
d506ef52f8
commit
453d5cc0e6
|
@ -12,7 +12,7 @@ export interface EventPayload {
|
|||
export let asyncEventQueue: BullQueue.Queue
|
||||
|
||||
export function init() {
|
||||
asyncEventQueue = createQueue<EventPayload>(JobQueue.DOC_UPDATE)
|
||||
asyncEventQueue = createQueue<EventPayload>(JobQueue.SYSTEM_EVENT_QUEUE)
|
||||
}
|
||||
|
||||
export async function shutdown() {
|
||||
|
|
|
@ -13,17 +13,16 @@ export const publishEvent = async (
|
|||
const identity = await identification.getCurrentIdentity()
|
||||
|
||||
const backfilling = await backfill.isBackfillingEvent(event)
|
||||
|
||||
// send off async events if required
|
||||
await publishAsyncEvent({
|
||||
event,
|
||||
identity,
|
||||
properties,
|
||||
timestamp,
|
||||
})
|
||||
|
||||
// no backfill - send the event and exit
|
||||
if (!backfilling) {
|
||||
// send off async events if required
|
||||
await publishAsyncEvent({
|
||||
event,
|
||||
identity,
|
||||
properties,
|
||||
timestamp,
|
||||
})
|
||||
// now handle the main sync event processing pipeline
|
||||
await processors.processEvent(event, identity, properties, timestamp)
|
||||
return
|
||||
}
|
||||
|
|
|
@ -1,12 +1,11 @@
|
|||
import { EventProcessor } from "../types"
|
||||
import { Event, Identity, DocUpdateEvent } from "@budibase/types"
|
||||
import { DocumentType, SEPARATOR } from "../../../constants"
|
||||
import { doInTenant } from "../../../context"
|
||||
import { getDocumentId } from "../../documentId"
|
||||
import { shutdown } from "../../asyncEvents"
|
||||
|
||||
export type Processor = (update: DocUpdateEvent) => Promise<void>
|
||||
export type ProcessorMap = { types: DocumentType[]; processor: Processor }[]
|
||||
export type ProcessorMap = { events: Event[]; processor: Processor }[]
|
||||
|
||||
export default class DocumentUpdateProcessor implements EventProcessor {
|
||||
processors: ProcessorMap = []
|
||||
|
@ -26,8 +25,8 @@ export default class DocumentUpdateProcessor implements EventProcessor {
|
|||
if (!tenantId || !docId) {
|
||||
return
|
||||
}
|
||||
for (let { types, processor } of this.processors) {
|
||||
if (types.find(type => docId.startsWith(`${type}${SEPARATOR}`))) {
|
||||
for (let { events, processor } of this.processors) {
|
||||
if (events.includes(event)) {
|
||||
await doInTenant(tenantId, async () => {
|
||||
await processor({
|
||||
id: docId,
|
||||
|
|
|
@ -1,18 +1 @@
|
|||
import { Event, Identity, Group } from "@budibase/types"
|
||||
|
||||
export enum EventProcessorType {
|
||||
POSTHOG = "posthog",
|
||||
LOGGING = "logging",
|
||||
}
|
||||
|
||||
export interface EventProcessor {
|
||||
processEvent(
|
||||
event: Event,
|
||||
identity: Identity,
|
||||
properties: any,
|
||||
timestamp?: string | number
|
||||
): Promise<void>
|
||||
identify?(identity: Identity, timestamp?: string | number): Promise<void>
|
||||
identifyGroup?(group: Group, timestamp?: string | number): Promise<void>
|
||||
shutdown?(): void
|
||||
}
|
||||
export { EventProcessor } from "@budibase/types"
|
||||
|
|
|
@ -2,5 +2,5 @@ export enum JobQueue {
|
|||
AUTOMATION = "automationQueue",
|
||||
APP_BACKUP = "appBackupQueue",
|
||||
AUDIT_LOG = "auditLogQueue",
|
||||
DOC_UPDATE = "docUpdateQueue",
|
||||
SYSTEM_EVENT_QUEUE = "systemEventQueue",
|
||||
}
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
export * from "./processors"
|
|
@ -0,0 +1,9 @@
|
|||
import userGroupProcessor from "./syncUsers"
|
||||
import { docUpdates } from "@budibase/backend-core"
|
||||
|
||||
export type UpdateCallback = (docId: string) => void
|
||||
|
||||
export function init(updateCb?: UpdateCallback) {
|
||||
const processors = [userGroupProcessor(updateCb)]
|
||||
docUpdates.init(processors)
|
||||
}
|
|
@ -1,12 +1,10 @@
|
|||
import { constants, docUpdates, logging } from "@budibase/backend-core"
|
||||
import { constants, logging } from "@budibase/backend-core"
|
||||
import { sdk as proSdk } from "@budibase/pro"
|
||||
import { DocUpdateEvent } from "@budibase/types"
|
||||
import { syncUsersToAllApps } from "../sdk/app/applications/sync"
|
||||
import { DocUpdateEvent, UserGroupSyncEvents } from "@budibase/types"
|
||||
import { syncUsersToAllApps } from "../../sdk/app/applications/sync"
|
||||
import { UpdateCallback } from "./processors"
|
||||
|
||||
type UpdateCallback = (docId: string) => void
|
||||
|
||||
function userGroupUpdates(updateCb?: UpdateCallback) {
|
||||
const types = [constants.DocumentType.USER, constants.DocumentType.GROUP]
|
||||
export default function process(updateCb?: UpdateCallback) {
|
||||
const processor = async (update: DocUpdateEvent) => {
|
||||
try {
|
||||
const docId = update.id
|
||||
|
@ -33,10 +31,5 @@ function userGroupUpdates(updateCb?: UpdateCallback) {
|
|||
}
|
||||
}
|
||||
}
|
||||
return { types, processor }
|
||||
}
|
||||
|
||||
export function init(updateCb?: UpdateCallback) {
|
||||
const processors = [userGroupUpdates(updateCb)]
|
||||
docUpdates.init(processors)
|
||||
return { events: UserGroupSyncEvents, processor }
|
||||
}
|
|
@ -1,4 +1,5 @@
|
|||
import { Hosting } from "../hosting"
|
||||
import { Group, Identity } from "./identification"
|
||||
|
||||
export enum Event {
|
||||
// USER
|
||||
|
@ -186,7 +187,7 @@ export enum Event {
|
|||
AUDIT_LOGS_DOWNLOADED = "audit_log:downloaded",
|
||||
}
|
||||
|
||||
export const AsyncEvents: Event[] = [
|
||||
export const UserGroupSyncEvents: Event[] = [
|
||||
Event.USER_CREATED,
|
||||
Event.USER_UPDATED,
|
||||
Event.USER_DELETED,
|
||||
|
@ -202,6 +203,8 @@ export const AsyncEvents: Event[] = [
|
|||
Event.USER_GROUP_PERMISSIONS_EDITED,
|
||||
]
|
||||
|
||||
export const AsyncEvents: Event[] = [...UserGroupSyncEvents]
|
||||
|
||||
// all events that are not audited have been added to this record as undefined, this means
|
||||
// that Typescript can protect us against new events being added and auditing of those
|
||||
// events not being considered. This might be a little ugly, but provides a level of
|
||||
|
@ -405,3 +408,15 @@ export type DocUpdateEvent = {
|
|||
tenantId: string
|
||||
appId?: string
|
||||
}
|
||||
|
||||
export interface EventProcessor {
|
||||
processEvent(
|
||||
event: Event,
|
||||
identity: Identity,
|
||||
properties: any,
|
||||
timestamp?: string | number
|
||||
): Promise<void>
|
||||
identify?(identity: Identity, timestamp?: string | number): Promise<void>
|
||||
identifyGroup?(group: Group, timestamp?: string | number): Promise<void>
|
||||
shutdown?(): void
|
||||
}
|
||||
|
|
12
yarn.lock
12
yarn.lock
|
@ -1486,15 +1486,15 @@
|
|||
pouchdb-promise "^6.0.4"
|
||||
through2 "^2.0.0"
|
||||
|
||||
"@budibase/pro@2.5.5-alpha.0":
|
||||
version "2.5.5-alpha.0"
|
||||
resolved "https://registry.yarnpkg.com/@budibase/pro/-/pro-2.5.5-alpha.0.tgz#28b075a96efb564328a4972cae9ea6c9a5f3aabc"
|
||||
integrity sha512-98fLnvHWVy7ASEFC98bo6Qdd55SjC7yrJNuf7FUYZbeFwpmwwRxlWnWFTa0ctKWB5p2LToARWBns3TqgnUr/zQ==
|
||||
"@budibase/pro@2.5.5-alpha.1":
|
||||
version "2.5.5-alpha.1"
|
||||
resolved "https://registry.yarnpkg.com/@budibase/pro/-/pro-2.5.5-alpha.1.tgz#6d9476ce724bd3e405cb2e0198c4b83b168e65bc"
|
||||
integrity sha512-fEuropk/0aH1+lELX6wdFa8UUpE+SMtlvBbsgNR2ulvLgLLPEYhI2gINlhiItMWWgTtxNtele3hOs1VWhn0o2A==
|
||||
dependencies:
|
||||
"@budibase/backend-core" "2.5.5-alpha.0"
|
||||
"@budibase/backend-core" "2.5.5-alpha.1"
|
||||
"@budibase/shared-core" "2.4.44-alpha.1"
|
||||
"@budibase/string-templates" "2.4.44-alpha.1"
|
||||
"@budibase/types" "2.5.5-alpha.0"
|
||||
"@budibase/types" "2.5.5-alpha.1"
|
||||
"@koa/router" "8.0.8"
|
||||
bull "4.10.1"
|
||||
joi "17.6.0"
|
||||
|
|
Loading…
Reference in New Issue