Moving around processors to separate audit logs out of central event handling.
This commit is contained in:
parent
58fab29fb4
commit
8853776c79
|
@ -1,38 +1,12 @@
|
||||||
import {
|
import { Event, AuditedEventFriendlyName } from "@budibase/types"
|
||||||
AuditLogFn,
|
|
||||||
Event,
|
|
||||||
IdentityType,
|
|
||||||
AuditedEventFriendlyName,
|
|
||||||
AuditLogQueueEvent,
|
|
||||||
} from "@budibase/types"
|
|
||||||
import { processors } from "./processors"
|
import { processors } from "./processors"
|
||||||
import identification from "./identification"
|
import identification from "./identification"
|
||||||
import { getAppId } from "../context"
|
|
||||||
import * as backfill from "./backfill"
|
import * as backfill from "./backfill"
|
||||||
import { createQueue, JobQueue } from "../queue"
|
|
||||||
import BullQueue from "bull"
|
|
||||||
|
|
||||||
export function isAudited(event: Event) {
|
export function isAudited(event: Event) {
|
||||||
return !!AuditedEventFriendlyName[event]
|
return !!AuditedEventFriendlyName[event]
|
||||||
}
|
}
|
||||||
|
|
||||||
let auditLogsEnabled = false
|
|
||||||
let auditLogQueue: BullQueue.Queue<AuditLogQueueEvent>
|
|
||||||
|
|
||||||
export const configure = (fn: AuditLogFn) => {
|
|
||||||
auditLogsEnabled = true
|
|
||||||
const writeAuditLogs = fn
|
|
||||||
auditLogQueue = createQueue<AuditLogQueueEvent>(JobQueue.AUDIT_LOG)
|
|
||||||
return auditLogQueue.process(async job => {
|
|
||||||
await writeAuditLogs(job.data.event, job.data.properties, {
|
|
||||||
userId: job.data.opts.userId,
|
|
||||||
timestamp: job.data.opts.timestamp,
|
|
||||||
appId: job.data.opts.appId,
|
|
||||||
hostInfo: job.data.opts.hostInfo,
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
export const publishEvent = async (
|
export const publishEvent = async (
|
||||||
event: Event,
|
event: Event,
|
||||||
properties: any,
|
properties: any,
|
||||||
|
@ -45,22 +19,6 @@ export const publishEvent = async (
|
||||||
// no backfill - send the event and exit
|
// no backfill - send the event and exit
|
||||||
if (!backfilling) {
|
if (!backfilling) {
|
||||||
await processors.processEvent(event, identity, properties, timestamp)
|
await processors.processEvent(event, identity, properties, timestamp)
|
||||||
if (auditLogsEnabled && isAudited(event)) {
|
|
||||||
// only audit log actual events, don't include backfills
|
|
||||||
const userId =
|
|
||||||
identity.type === IdentityType.USER ? identity.id : undefined
|
|
||||||
// add to the event queue, rather than just writing immediately
|
|
||||||
await auditLogQueue.add({
|
|
||||||
event,
|
|
||||||
properties,
|
|
||||||
opts: {
|
|
||||||
userId,
|
|
||||||
timestamp,
|
|
||||||
appId: getAppId(),
|
|
||||||
hostInfo: identity.hostInfo,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,7 +3,7 @@ export * as processors from "./processors"
|
||||||
export * as analytics from "./analytics"
|
export * as analytics from "./analytics"
|
||||||
export { default as identification } from "./identification"
|
export { default as identification } from "./identification"
|
||||||
export * as backfillCache from "./backfill"
|
export * as backfillCache from "./backfill"
|
||||||
export { configure, isAudited } from "./events"
|
export { isAudited } from "./events"
|
||||||
|
|
||||||
import { processors } from "./processors"
|
import { processors } from "./processors"
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,71 @@
|
||||||
|
import {
|
||||||
|
Event,
|
||||||
|
Identity,
|
||||||
|
Group,
|
||||||
|
IdentityType,
|
||||||
|
AuditLogQueueEvent,
|
||||||
|
AuditLogFn,
|
||||||
|
} from "@budibase/types"
|
||||||
|
import { EventProcessor } from "./types"
|
||||||
|
import { getAppId } from "../../context"
|
||||||
|
import { isAudited } from "../events"
|
||||||
|
import BullQueue from "bull"
|
||||||
|
import { createQueue, JobQueue } from "../../queue"
|
||||||
|
|
||||||
|
export default class AuditLogsProcessor implements EventProcessor {
|
||||||
|
static auditLogsEnabled = false
|
||||||
|
static auditLogQueue: BullQueue.Queue<AuditLogQueueEvent>
|
||||||
|
|
||||||
|
// can't use constructor as need to return promise
|
||||||
|
static init(fn: AuditLogFn) {
|
||||||
|
AuditLogsProcessor.auditLogsEnabled = true
|
||||||
|
const writeAuditLogs = fn
|
||||||
|
AuditLogsProcessor.auditLogQueue = createQueue<AuditLogQueueEvent>(
|
||||||
|
JobQueue.AUDIT_LOG
|
||||||
|
)
|
||||||
|
return AuditLogsProcessor.auditLogQueue.process(async job => {
|
||||||
|
await writeAuditLogs(job.data.event, job.data.properties, {
|
||||||
|
userId: job.data.opts.userId,
|
||||||
|
timestamp: job.data.opts.timestamp,
|
||||||
|
appId: job.data.opts.appId,
|
||||||
|
hostInfo: job.data.opts.hostInfo,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async processEvent(
|
||||||
|
event: Event,
|
||||||
|
identity: Identity,
|
||||||
|
properties: any,
|
||||||
|
timestamp?: string
|
||||||
|
): Promise<void> {
|
||||||
|
if (AuditLogsProcessor.auditLogsEnabled && isAudited(event)) {
|
||||||
|
// only audit log actual events, don't include backfills
|
||||||
|
const userId =
|
||||||
|
identity.type === IdentityType.USER ? identity.id : undefined
|
||||||
|
// add to the event queue, rather than just writing immediately
|
||||||
|
await AuditLogsProcessor.auditLogQueue.add({
|
||||||
|
event,
|
||||||
|
properties,
|
||||||
|
opts: {
|
||||||
|
userId,
|
||||||
|
timestamp,
|
||||||
|
appId: getAppId(),
|
||||||
|
hostInfo: identity.hostInfo,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async identify(identity: Identity, timestamp?: string | number) {
|
||||||
|
// no-op
|
||||||
|
}
|
||||||
|
|
||||||
|
async identifyGroup(group: Group, timestamp?: string | number) {
|
||||||
|
// no-op
|
||||||
|
}
|
||||||
|
|
||||||
|
shutdown(): void {
|
||||||
|
AuditLogsProcessor.auditLogQueue.close()
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,8 +1,19 @@
|
||||||
import AnalyticsProcessor from "./AnalyticsProcessor"
|
import AnalyticsProcessor from "./AnalyticsProcessor"
|
||||||
import LoggingProcessor from "./LoggingProcessor"
|
import LoggingProcessor from "./LoggingProcessor"
|
||||||
|
import AuditLogsProcessor from "./AuditLogsProcessor"
|
||||||
import Processors from "./Processors"
|
import Processors from "./Processors"
|
||||||
|
import { AuditLogFn } from "@budibase/types"
|
||||||
|
|
||||||
export const analyticsProcessor = new AnalyticsProcessor()
|
export const analyticsProcessor = new AnalyticsProcessor()
|
||||||
const loggingProcessor = new LoggingProcessor()
|
const loggingProcessor = new LoggingProcessor()
|
||||||
|
const auditLogsProcessor = new AuditLogsProcessor()
|
||||||
|
|
||||||
export const processors = new Processors([analyticsProcessor, loggingProcessor])
|
export function init(auditingFn: AuditLogFn) {
|
||||||
|
return AuditLogsProcessor.init(auditingFn)
|
||||||
|
}
|
||||||
|
|
||||||
|
export const processors = new Processors([
|
||||||
|
analyticsProcessor,
|
||||||
|
loggingProcessor,
|
||||||
|
auditLogsProcessor,
|
||||||
|
])
|
||||||
|
|
|
@ -127,7 +127,7 @@ export async function startup(app?: any, server?: any) {
|
||||||
let queuePromises = []
|
let queuePromises = []
|
||||||
// configure events to use the pro audit log write
|
// configure events to use the pro audit log write
|
||||||
// can't integrate directly into backend-core due to cyclic issues
|
// can't integrate directly into backend-core due to cyclic issues
|
||||||
queuePromises.push(events.configure(proSdk.auditLogs.write))
|
queuePromises.push(events.processors.init(proSdk.auditLogs.write))
|
||||||
queuePromises.push(automations.init())
|
queuePromises.push(automations.init())
|
||||||
queuePromises.push(initPro())
|
queuePromises.push(initPro())
|
||||||
if (app) {
|
if (app) {
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
import { Event, HostInfo } from "./events"
|
import { Event, HostInfo } from "./events"
|
||||||
|
import { AuditLogDoc } from "../documents"
|
||||||
|
|
||||||
export type AuditWriteOpts = {
|
export type AuditWriteOpts = {
|
||||||
appId?: string
|
appId?: string
|
||||||
|
@ -11,7 +12,7 @@ export type AuditLogFn = (
|
||||||
event: Event,
|
event: Event,
|
||||||
metadata: any,
|
metadata: any,
|
||||||
opts: AuditWriteOpts
|
opts: AuditWriteOpts
|
||||||
) => Promise<void>
|
) => Promise<AuditLogDoc | undefined>
|
||||||
|
|
||||||
export type AuditLogQueueEvent = {
|
export type AuditLogQueueEvent = {
|
||||||
event: Event
|
event: Event
|
||||||
|
|
|
@ -14,7 +14,7 @@ import Application from "koa"
|
||||||
import { bootstrap } from "global-agent"
|
import { bootstrap } from "global-agent"
|
||||||
import * as db from "./db"
|
import * as db from "./db"
|
||||||
import { auth, logging, events, middleware } from "@budibase/backend-core"
|
import { auth, logging, events, middleware } from "@budibase/backend-core"
|
||||||
import { sdk } from "@budibase/pro"
|
import { sdk as proSdk, sdk } from "@budibase/pro"
|
||||||
db.init()
|
db.init()
|
||||||
import Koa from "koa"
|
import Koa from "koa"
|
||||||
import koaBody from "koa-body"
|
import koaBody from "koa-body"
|
||||||
|
@ -30,7 +30,7 @@ import destroyable from "server-destroy"
|
||||||
|
|
||||||
// configure events to use the pro audit log write
|
// configure events to use the pro audit log write
|
||||||
// can't integrate directly into backend-core due to cyclic issues
|
// can't integrate directly into backend-core due to cyclic issues
|
||||||
events.configure(sdk.auditLogs.write)
|
events.processors.init(proSdk.auditLogs.write)
|
||||||
|
|
||||||
if (env.ENABLE_SSO_MAINTENANCE_MODE) {
|
if (env.ENABLE_SSO_MAINTENANCE_MODE) {
|
||||||
console.warn(
|
console.warn(
|
||||||
|
|
Loading…
Reference in New Issue