diff --git a/packages/backend-core/package.json b/packages/backend-core/package.json
index 8da06718ab..7510655ed7 100644
--- a/packages/backend-core/package.json
+++ b/packages/backend-core/package.json
@@ -56,6 +56,7 @@
     "koa-pino-logger": "4.0.0",
     "lodash": "4.17.21",
     "node-fetch": "2.6.7",
+    "object-sizeof": "2.6.1",
     "passport-google-oauth": "2.0.0",
     "passport-local": "1.0.0",
     "passport-oauth2-refresh": "^2.1.0",
diff --git a/packages/backend-core/src/cache/docWritethrough.ts b/packages/backend-core/src/cache/docWritethrough.ts
index c73fd0a46a..6f2f3cbc97 100644
--- a/packages/backend-core/src/cache/docWritethrough.ts
+++ b/packages/backend-core/src/cache/docWritethrough.ts
@@ -1,6 +1,6 @@
 import { AnyDocument, Database, Document, DocumentType } from "@budibase/types"
 
-import { JobQueue, Queue, createQueue } from "../queue"
+import { BudibaseQueue, JobQueue } from "../queue"
 import * as dbUtils from "../db"
 
 interface ProcessDocMessage {
@@ -13,11 +13,11 @@ const PERSIST_MAX_ATTEMPTS = 100
 let processor: DocWritethroughProcessor | undefined
 
 export class DocWritethroughProcessor {
-  private static _queue: Queue
+  private static _queue: BudibaseQueue<ProcessDocMessage>
 
   public static get queue() {
     if (!DocWritethroughProcessor._queue) {
-      DocWritethroughProcessor._queue = createQueue(
+      DocWritethroughProcessor._queue = new BudibaseQueue<ProcessDocMessage>(
         JobQueue.DOC_WRITETHROUGH_QUEUE,
         {
           jobOptions: {
diff --git a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts
index b72651e21f..01944e68a8 100644
--- a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts
+++ b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts
@@ -18,7 +18,9 @@ import {
 const initialTime = Date.now()
 
 async function waitForQueueCompletion() {
-  await utils.queue.processMessages(DocWritethroughProcessor.queue)
+  await utils.queue.processMessages(
+    DocWritethroughProcessor.queue.getBullQueue()
+  )
 }
 
 beforeAll(() => utils.queue.useRealQueues())
diff --git a/packages/backend-core/src/events/asyncEvents/queue.ts b/packages/backend-core/src/events/asyncEvents/queue.ts
index 196fd359b3..a393918625 100644
--- a/packages/backend-core/src/events/asyncEvents/queue.ts
+++ b/packages/backend-core/src/events/asyncEvents/queue.ts
@@ -1,5 +1,4 @@
-import BullQueue from "bull"
-import { createQueue, JobQueue } from "../../queue"
+import { BudibaseQueue, JobQueue } from "../../queue"
 import { Event, Identity } from "@budibase/types"
 
 export interface EventPayload {
   event: Event
   identity: Identity
   properties: any
   timestamp?: string | number
 }
 
-export let asyncEventQueue: BullQueue.Queue
+export let asyncEventQueue: BudibaseQueue<EventPayload>
 
 export function init() {
-  asyncEventQueue = createQueue(JobQueue.SYSTEM_EVENT_QUEUE)
+  asyncEventQueue = new BudibaseQueue<EventPayload>(
+    JobQueue.SYSTEM_EVENT_QUEUE,
+    {
+      jobTags: (event: EventPayload) => {
+        return {
+          "event.name": event.event,
+        }
+      },
+    }
+  )
 }
 
 export async function shutdown() {
diff --git a/packages/backend-core/src/events/processors/AuditLogsProcessor.ts b/packages/backend-core/src/events/processors/AuditLogsProcessor.ts
index 3dd2ab9d10..9a8d7d3cc7 100644
--- a/packages/backend-core/src/events/processors/AuditLogsProcessor.ts
+++ b/packages/backend-core/src/events/processors/AuditLogsProcessor.ts
@@ -8,24 +8,30 @@
 } from "@budibase/types"
 import { EventProcessor } from "./types"
 import { getAppId, doInTenant, getTenantId } from "../../context"
-import BullQueue from "bull"
-import { createQueue, JobQueue } from "../../queue"
+import { BudibaseQueue, JobQueue } from "../../queue"
 import { isAudited } from "../../utils"
 import env from "../../environment"
 
 export default class AuditLogsProcessor implements EventProcessor {
   static auditLogsEnabled = false
-  static auditLogQueue: BullQueue.Queue
+  static auditLogQueue: BudibaseQueue<AuditLogQueueEvent>
 
   // can't use constructor as need to return promise
   static init(fn: AuditLogFn) {
     AuditLogsProcessor.auditLogsEnabled = true
     const writeAuditLogs = fn
-    AuditLogsProcessor.auditLogQueue = createQueue(
-      JobQueue.AUDIT_LOG
+    AuditLogsProcessor.auditLogQueue = new BudibaseQueue<AuditLogQueueEvent>(
+      JobQueue.AUDIT_LOG,
+      {
+        jobTags: (event: AuditLogQueueEvent) => {
+          return {
+            "event.name": event.event,
+          }
+        },
+      }
     )
     return AuditLogsProcessor.auditLogQueue.process(async job => {
-      return doInTenant(job.data.tenantId, async () => {
+      await doInTenant(job.data.tenantId, async () => {
         let properties = job.data.properties
         if (properties.audited) {
           properties = {
diff --git a/packages/backend-core/src/queue/inMemoryQueue.ts b/packages/backend-core/src/queue/inMemoryQueue.ts
index ebab5c7392..6afbf6c4cf 100644
--- a/packages/backend-core/src/queue/inMemoryQueue.ts
+++ b/packages/backend-core/src/queue/inMemoryQueue.ts
@@ -96,12 +96,17 @@ export class InMemoryQueue<T = any> implements Partial<Queue<T>> {
 
       let resp = func(message)
 
-      async function retryFunc(fnc: any) {
+      async function retryFunc(fnc: any, attempt = 0) {
         try {
           await fnc
         } catch (e: any) {
-          await helpers.wait(50)
-          await retryFunc(func(message))
+          attempt++
+          if (attempt < 3) {
+            await helpers.wait(100 * attempt)
+            await retryFunc(func(message), attempt)
+          } else {
+            throw e
+          }
         }
       }
 
diff --git a/packages/backend-core/src/queue/queue.ts b/packages/backend-core/src/queue/queue.ts
index 4e1086823b..5365080ed1 100644
--- a/packages/backend-core/src/queue/queue.ts
+++ b/packages/backend-core/src/queue/queue.ts
@@ -2,10 +2,12 @@ import env from "../environment"
 import { getRedisOptions } from "../redis/utils"
 import { JobQueue } from "./constants"
 import InMemoryQueue from "./inMemoryQueue"
-import BullQueue, { QueueOptions, JobOptions } from "bull"
+import BullQueue, { Queue, QueueOptions, JobOptions, Job } from "bull"
 import { addListeners, StalledFn } from "./listeners"
 import { Duration } from "../utils"
 import * as timers from "../timers"
+import tracer from "dd-trace"
+import sizeof from "object-sizeof"
 
 export type { QueueOptions, Queue, JobOptions } from "bull"
 
@@ -15,7 +17,7 @@ const QUEUE_LOCK_MS = Duration.fromMinutes(5).toMs()
 const QUEUE_LOCK_RENEW_INTERNAL_MS = Duration.fromSeconds(30).toMs()
 // cleanup the queue every 60 seconds
 const CLEANUP_PERIOD_MS = Duration.fromSeconds(60).toMs()
-let QUEUES: BullQueue.Queue[] = []
+let QUEUES: Queue[] = []
 let cleanupInterval: NodeJS.Timeout
 
 async function cleanup() {
@@ -25,49 +27,205 @@ async function cleanup() {
   }
 }
 
-export function createQueue(
-  jobQueue: JobQueue,
-  opts: {
-    removeStalledCb?: StalledFn
-    maxStalledCount?: number
-    jobOptions?: JobOptions
-  } = {}
-): BullQueue.Queue {
-  const redisOpts = getRedisOptions()
-  const queueConfig: QueueOptions = {
-    redis: redisOpts,
-    settings: {
-      maxStalledCount: opts.maxStalledCount ? opts.maxStalledCount : 0,
-      lockDuration: QUEUE_LOCK_MS,
-      lockRenewTime: QUEUE_LOCK_RENEW_INTERNAL_MS,
-    },
+async function withMetrics<T>(
+  name: string,
+  cb: () => Promise<T>,
+  tags?: Record<string, string>
+): Promise<T> {
+  const start = performance.now()
+  try {
+    const result = await cb()
+    tracer.dogstatsd.increment(`${name}.success`, 1, tags)
+    return result
+  } catch (err) {
+    tracer.dogstatsd.increment(`${name}.error`, 1, tags)
+    throw err
+  } finally {
+    const durationMs = performance.now() - start
+    tracer.dogstatsd.distribution(`${name}.duration.ms`, durationMs, tags)
+    tracer.dogstatsd.increment(name, 1, tags)
   }
-  if (opts.jobOptions) {
-    queueConfig.defaultJobOptions = opts.jobOptions
+}
+
+function jobOptsTags(opts: JobOptions) {
+  return {
+    "job.opts.attempts": opts.attempts,
+    "job.opts.backoff": opts.backoff,
+    "job.opts.delay": opts.delay,
+    "job.opts.jobId": opts.jobId,
+    "job.opts.lifo": opts.lifo,
+    "job.opts.preventParsingData": opts.preventParsingData,
+    "job.opts.priority": opts.priority,
+    "job.opts.removeOnComplete": opts.removeOnComplete,
+    "job.opts.removeOnFail": opts.removeOnFail,
+    "job.opts.repeat": opts.repeat,
+    "job.opts.stackTraceLimit": opts.stackTraceLimit,
+    "job.opts.timeout": opts.timeout,
   }
-  let queue: BullQueue.Queue
-  if (!env.isTest()) {
-    queue = new BullQueue(jobQueue, queueConfig)
-  } else if (
-    process.env.BULL_TEST_REDIS_PORT &&
-    !isNaN(+process.env.BULL_TEST_REDIS_PORT)
-  ) {
-    queue = new BullQueue(jobQueue, {
-      redis: { host: "localhost", port: +process.env.BULL_TEST_REDIS_PORT },
-    })
-  } else {
-    queue = new InMemoryQueue(jobQueue, queueConfig) as any
+}
+
+function jobTags(job: Job) {
+  return {
+    "job.id": job.id,
+    "job.attemptsMade": job.attemptsMade,
+    "job.timestamp": job.timestamp,
+    "job.data.sizeBytes": sizeof(job.data),
+    ...jobOptsTags(job.opts || {}),
   }
-  addListeners(queue, jobQueue, opts?.removeStalledCb)
-  QUEUES.push(queue)
-  if (!cleanupInterval && !env.isTest()) {
-    cleanupInterval = timers.set(cleanup, CLEANUP_PERIOD_MS)
-    // fire off an initial cleanup
-    cleanup().catch(err => {
-      console.error(`Unable to cleanup ${jobQueue} initially - ${err}`)
+}
+
+export interface BudibaseQueueOpts<T> {
+  removeStalledCb?: StalledFn
+  maxStalledCount?: number
+  jobOptions?: JobOptions
+  jobTags?: (job: T) => Record<string, any>
+}
+
+export class BudibaseQueue<T> {
+  private queue: Queue<T>
+  private opts: BudibaseQueueOpts<T>
+  private jobQueue: JobQueue
+
+  constructor(jobQueue: JobQueue, opts: BudibaseQueueOpts<T> = {}) {
+    this.opts = opts
+    this.jobQueue = jobQueue
+    this.queue = this.initQueue()
+  }
+
+  private initQueue() {
+    const redisOpts = getRedisOptions()
+    const queueConfig: QueueOptions = {
+      redis: redisOpts,
+      settings: {
+        maxStalledCount: this.opts.maxStalledCount
+          ? this.opts.maxStalledCount
+          : 0,
+        lockDuration: QUEUE_LOCK_MS,
+        lockRenewTime: QUEUE_LOCK_RENEW_INTERNAL_MS,
+      },
+    }
+    if (this.opts.jobOptions) {
+      queueConfig.defaultJobOptions = this.opts.jobOptions
+    }
+    let queue: Queue<T>
+    if (!env.isTest()) {
+      queue = new BullQueue(this.jobQueue, queueConfig)
+    } else if (
+      process.env.BULL_TEST_REDIS_PORT &&
+      !isNaN(+process.env.BULL_TEST_REDIS_PORT)
+    ) {
+      queue = new BullQueue(this.jobQueue, {
+        redis: { host: "localhost", port: +process.env.BULL_TEST_REDIS_PORT },
+      })
+    } else {
+      queue = new InMemoryQueue(this.jobQueue, queueConfig) as any
+    }
+
+    addListeners(queue, this.jobQueue, this.opts.removeStalledCb)
+    QUEUES.push(queue)
+    if (!cleanupInterval && !env.isTest()) {
+      cleanupInterval = timers.set(cleanup, CLEANUP_PERIOD_MS)
+      // fire off an initial cleanup
+      cleanup().catch(err => {
+        console.error(`Unable to cleanup ${this.jobQueue} initially - ${err}`)
+      })
+    }
+    return queue
+  }
+
+  getBullQueue() {
+    return this.queue
+  }
+
+  process(
+    concurrency: number,
+    cb: (job: Job<T>) => Promise<void>
+  ): Promise<void>
+  process(cb: (job: Job<T>) => Promise<void>): Promise<void>
+  process(...args: any[]) {
+    let concurrency: number | undefined = undefined
+    let cb: (job: Job<T>) => Promise<void>
+    if (args.length === 2) {
+      concurrency = args[0]
+      cb = args[1]
+    } else {
+      cb = args[0]
+    }
+
+    const wrappedCb = async (job: Job<T>) => {
+      await tracer.trace("queue.process", async span => {
+        // @ts-expect-error monkey patching the parent span id
+        if (job.data._parentSpanContext) {
+          // @ts-expect-error monkey patching the parent span id
+          const parentContext = job.data._parentSpanContext
+          const parent = {
+            traceId: parentContext.traceId,
+            spanId: parentContext.spanId,
+            toTraceId: () => parentContext.traceId,
+            toSpanId: () => parentContext.spanId,
+            toTraceparent: () => "",
+          }
+          span.addLink(parent)
+        }
+        span.addTags({ "queue.name": this.jobQueue, ...jobTags(job) })
+        if (this.opts.jobTags) {
+          span.addTags(this.opts.jobTags(job.data))
+        }
+
+        tracer.dogstatsd.distribution(
+          "queue.process.sizeBytes",
+          sizeof(job.data),
+          this.metricTags()
+        )
+        await this.withMetrics("queue.process", () => cb(job))
+      })
+    }
+
+    if (concurrency) {
+      return this.queue.process(concurrency, wrappedCb)
+    } else {
+      return this.queue.process(wrappedCb)
+    }
+  }
+
+  async add(data: T, opts?: JobOptions): Promise<Job<T>> {
+    return await tracer.trace("queue.add", async span => {
+      span.addTags({
+        "queue.name": this.jobQueue,
+        "job.data.sizeBytes": sizeof(data),
+        ...jobOptsTags(opts || {}),
+      })
+      if (this.opts.jobTags) {
+        span.addTags(this.opts.jobTags(data))
+      }
+      // @ts-expect-error monkey patching the parent span id
+      data._parentSpanContext = {
+        traceId: span.context().toTraceId(),
+        spanId: span.context().toSpanId(),
+      }
+
+      tracer.dogstatsd.distribution(
+        "queue.add.sizeBytes",
+        sizeof(data),
+        this.metricTags()
+      )
+      return await this.withMetrics("queue.add", () =>
+        this.queue.add(data, opts)
+      )
     })
   }
-  return queue
+
+  private withMetrics<T>(name: string, cb: () => Promise<T>) {
+    return withMetrics(name, cb, this.metricTags())
+  }
+
+  private metricTags() {
+    return { queueName: this.jobQueue }
+  }
+
+  close() {
+    return this.queue.close()
+  }
 }
 
 export async function shutdown() {
diff --git a/packages/pro b/packages/pro
index d9d2766261..538deccd00 160000
--- a/packages/pro
+++ b/packages/pro
@@ -1 +1 @@
-Subproject commit d9d2766261e02b11318ae76ad0d79698e14f89a9
+Subproject commit 538deccd0000090447d6cfe7f9716b16fdfefcc2
diff --git a/packages/server/src/api/index.ts b/packages/server/src/api/index.ts
index 230e5079b4..53be2b30a9 100644
--- a/packages/server/src/api/index.ts
+++ b/packages/server/src/api/index.ts
@@ -16,7 +16,7 @@ export const router: Router = new Router()
 
 router.get("/health", async ctx => {
   if (automationsEnabled()) {
-    if (!(await automationQueue.isReady())) {
+    if (!(await automationQueue.getBullQueue().isReady())) {
       ctx.status = 503
       return
     }
diff --git a/packages/server/src/api/routes/tests/row.spec.ts b/packages/server/src/api/routes/tests/row.spec.ts
index cf24430f82..c55db8640c 100644
--- a/packages/server/src/api/routes/tests/row.spec.ts
+++ b/packages/server/src/api/routes/tests/row.spec.ts
@@ -2276,6 +2276,25 @@ if (descriptions.length) {
         expect(updated.attachment.key).toBe(newAttachment.key)
       })
 
+      it("should allow updating signature row", async () => {
+        const { table, row } = await coreAttachmentEnrichment(
+          {
+            signature: {
+              type: FieldType.SIGNATURE_SINGLE,
+              name: "signature",
+              constraints: { presence: false },
+            },
+          },
+          "signature",
+          `${uuid.v4()}.png`
+        )
+
+        const newSignature = generateAttachment(`${uuid.v4()}.png`)
+        row["signature"] = newSignature
+        const updated = await config.api.row.save(table._id!, row)
+        expect(updated.signature.key).toBe(newSignature.key)
+      })
+
       it("should allow enriching attachment list rows", async () => {
         await coreAttachmentEnrichment(
           {
diff --git a/packages/server/src/appMigrations/queue.ts b/packages/server/src/appMigrations/queue.ts
index d06d039fd0..94be32d45d 100644
--- a/packages/server/src/appMigrations/queue.ts
+++ b/packages/server/src/appMigrations/queue.ts
@@ -13,7 +13,7 @@ export type AppMigrationJob = {
 
 // always create app migration queue - so that events can be pushed and read from it
 // across the different api and automation services
-const appMigrationQueue = queue.createQueue(
+const appMigrationQueue = new queue.BudibaseQueue<AppMigrationJob>(
   queue.JobQueue.APP_MIGRATION,
   {
     jobOptions: {
diff --git a/packages/server/src/automations/bullboard.ts b/packages/server/src/automations/bullboard.ts
index 349282e863..cf89eb5a74 100644
--- a/packages/server/src/automations/bullboard.ts
+++ b/packages/server/src/automations/bullboard.ts
@@ -7,25 +7,36 @@ import { getAppMigrationQueue } from "../appMigrations/queue"
 import { createBullBoard } from "@bull-board/api"
 import { AutomationData } from "@budibase/types"
 
-export const automationQueue = queue.createQueue(
+export const automationQueue = new queue.BudibaseQueue<AutomationData>(
   queue.JobQueue.AUTOMATION,
-  { removeStalledCb: automation.removeStalled }
+  {
+    removeStalledCb: automation.removeStalled,
+    jobTags: (job: AutomationData) => {
+      return {
+        "automation.id": job.automation._id,
+        "automation.name": job.automation.name,
+        "automation.appId": job.automation.appId,
+        "automation.createdAt": job.automation.createdAt,
+        "automation.trigger": job.automation.definition.trigger.stepId,
+      }
+    },
+  }
 )
 
 const PATH_PREFIX = "/bulladmin"
 
 export async function init() {
   // Set up queues for bull board admin
-  const queues = [new BullAdapter(automationQueue)]
+  const queues = [new BullAdapter(automationQueue.getBullQueue())]
   const backupQueue = backups.getBackupQueue()
   if (backupQueue) {
-    queues.push(new BullAdapter(backupQueue))
+    queues.push(new BullAdapter(backupQueue.getBullQueue()))
   }
 
   const appMigrationQueue = getAppMigrationQueue()
   if (appMigrationQueue) {
-    queues.push(new BullAdapter(appMigrationQueue))
+    queues.push(new BullAdapter(appMigrationQueue.getBullQueue()))
   }
 
   const serverAdapter = new KoaAdapter()
diff --git a/packages/server/src/automations/tests/utilities/index.ts b/packages/server/src/automations/tests/utilities/index.ts
index 4b41f1d977..f97c976aa0 100644
--- a/packages/server/src/automations/tests/utilities/index.ts
+++ b/packages/server/src/automations/tests/utilities/index.ts
@@ -22,7 +22,7 @@ export function afterAll() {
 }
 
 export function getTestQueue(): queue.InMemoryQueue {
-  return getQueue() as unknown as queue.InMemoryQueue
+  return getQueue().getBullQueue() as unknown as queue.InMemoryQueue
 }
 
 export function triggerCron(message: Job) {
@@ -48,7 +48,7 @@ export async function runInProd(fn: any) {
 
 export async function captureAllAutomationRemovals(f: () => Promise<unknown>) {
   const messages: Job[] = []
-  const queue = getQueue()
+  const queue = getQueue().getBullQueue()
 
   const messageListener = async (message: Job) => {
     messages.push(message)
@@ -82,7 +82,7 @@ export async function captureAutomationRemovals(
 
 export async function captureAllAutomationMessages(f: () => Promise<unknown>) {
   const messages: Job[] = []
-  const queue = getQueue()
+  const queue = getQueue().getBullQueue()
 
   const messageListener = async (message: Job) => {
     messages.push(message)
@@ -122,7 +122,7 @@ export async function captureAllAutomationResults(
   f: () => Promise<unknown>
 ): Promise<Job<AutomationData>[]> {
   const runs: queue.TestQueueMessage[] = []
-  const queue = getQueue()
+  const queue = getQueue().getBullQueue()
   let messagesOutstanding = 0
 
   const completedListener = async (
diff --git a/packages/server/src/automations/utils.ts b/packages/server/src/automations/utils.ts
index fe2ae66b26..1fa346be19 100644
--- a/packages/server/src/automations/utils.ts
+++ b/packages/server/src/automations/utils.ts
@@ -115,12 +115,14 @@ export async function updateTestHistory(
 // end the repetition and the job itself
 export async function disableAllCrons(appId: any) {
   const promises = []
-  const jobs = await automationQueue.getRepeatableJobs()
+  const jobs = await automationQueue.getBullQueue().getRepeatableJobs()
   for (let job of jobs) {
     if (job.key.includes(`${appId}_cron`)) {
-      promises.push(automationQueue.removeRepeatableByKey(job.key))
+      promises.push(
+        automationQueue.getBullQueue().removeRepeatableByKey(job.key)
+      )
       if (job.id) {
-        promises.push(automationQueue.removeJobs(job.id))
+        promises.push(automationQueue.getBullQueue().removeJobs(job.id))
       }
     }
   }
@@ -129,10 +131,10 @@ export async function disableAllCrons(appId: any) {
 }
 
 export async function disableCronById(jobId: JobId) {
-  const jobs = await automationQueue.getRepeatableJobs()
+  const jobs = await automationQueue.getBullQueue().getRepeatableJobs()
   for (const job of jobs) {
     if (job.id === jobId) {
-      await automationQueue.removeRepeatableByKey(job.key)
+      await automationQueue.getBullQueue().removeRepeatableByKey(job.key)
     }
   }
   console.log(`jobId=${jobId} disabled`)
diff --git a/packages/server/src/integrations/microsoftSqlServer.ts b/packages/server/src/integrations/microsoftSqlServer.ts
index 8548d57f15..9bc0eb3855 100644
--- a/packages/server/src/integrations/microsoftSqlServer.ts
+++ b/packages/server/src/integrations/microsoftSqlServer.ts
@@ -371,8 +371,7 @@ class SqlServerIntegration extends Sql implements DatasourcePlus {
         ? `${query.sql}; SELECT SCOPE_IDENTITY() AS id;`
         : query.sql
       this.log(sql, query.bindings)
-      const resp = await request.query(sql)
-      return resp
+      return await request.query(sql)
     } catch (err: any) {
       let readableMessage = getReadableErrorMessage(
         SourceName.SQL_SERVER,
diff --git a/packages/server/src/sdk/app/oauth2/tests/utils.spec.ts b/packages/server/src/sdk/app/oauth2/tests/utils.spec.ts
index 5739f1a794..e03abf1f5a 100644
--- a/packages/server/src/sdk/app/oauth2/tests/utils.spec.ts
+++ b/packages/server/src/sdk/app/oauth2/tests/utils.spec.ts
@@ -190,7 +190,7 @@ describe("oauth2 utils", () => {
       await config.doInContext(config.appId, () => getToken(oauthConfig._id))
 
       await testUtils.queue.processMessages(
-        cache.docWritethrough.DocWritethroughProcessor.queue
+        cache.docWritethrough.DocWritethroughProcessor.queue.getBullQueue()
       )
 
       const usageLog = await config.doInContext(config.appId, () =>
@@ -216,7 +216,7 @@ describe("oauth2 utils", () => {
         config.doInContext(config.appId, () => getToken(oauthConfig._id))
       ).rejects.toThrow()
       await testUtils.queue.processMessages(
-        cache.docWritethrough.DocWritethroughProcessor.queue
+        cache.docWritethrough.DocWritethroughProcessor.queue.getBullQueue()
       )
 
       const usageLog = await config.doInContext(config.appId, () =>
@@ -247,7 +247,7 @@ describe("oauth2 utils", () => {
         getToken(oauthConfig._id)
       )
       await testUtils.queue.processMessages(
-        cache.docWritethrough.DocWritethroughProcessor.queue
+        cache.docWritethrough.DocWritethroughProcessor.queue.getBullQueue()
      )
 
       for (const appId of [config.appId, config.prodAppId]) {
diff --git a/packages/shared-core/src/constants/fields.ts b/packages/shared-core/src/constants/fields.ts
index 5acf07d863..e169877eb8 100644
--- a/packages/shared-core/src/constants/fields.ts
+++ b/packages/shared-core/src/constants/fields.ts
@@ -30,4 +30,12 @@ export const SWITCHABLE_TYPES: SwitchableTypes = {
     FieldType.LONGFORM,
   ],
   [FieldType.NUMBER]: [FieldType.NUMBER, FieldType.BOOLEAN],
+  [FieldType.JSON]: [
+    FieldType.JSON,
+    FieldType.ARRAY,
+    FieldType.ATTACHMENTS,
+    FieldType.ATTACHMENT_SINGLE,
+    FieldType.BB_REFERENCE,
+    FieldType.SIGNATURE_SINGLE,
+  ],
 }
diff --git a/packages/types/src/documents/app/row.ts b/packages/types/src/documents/app/row.ts
index bb58933b65..4c32e45a8c 100644
--- a/packages/types/src/documents/app/row.ts
+++ b/packages/types/src/documents/app/row.ts
@@ -128,6 +128,7 @@ export enum FieldType {
 export const JsonTypes = [
   FieldType.ATTACHMENT_SINGLE,
   FieldType.ATTACHMENTS,
+  FieldType.SIGNATURE_SINGLE,
   // only BB_REFERENCE is JSON, it's an array, BB_REFERENCE_SINGLE is a string type
   FieldType.BB_REFERENCE,
   FieldType.JSON,
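Usage note (illustrative only, not part of the change set): the sketch below shows how a consumer might use the BudibaseQueue wrapper introduced in packages/backend-core/src/queue/queue.ts, assuming the usual `import { queue } from "@budibase/backend-core"` namespace import used by the server package. The ExportJob payload type, the requestExport/healthCheck helpers, and the "export.appId" tag key are hypothetical names invented for this example, and JobQueue.APP_MIGRATION is borrowed purely for illustration; the wrapper API itself (constructor options jobOptions/jobTags, process, add, getBullQueue, close) is taken from the diff above.

import { queue } from "@budibase/backend-core"

// Hypothetical job payload - not defined anywhere in this PR.
interface ExportJob {
  appId: string
  userId: string
}

// Construct a wrapped queue. jobOptions become Bull's defaultJobOptions,
// and jobTags adds per-job tags to the "queue.process" span.
const exportQueue = new queue.BudibaseQueue<ExportJob>(
  queue.JobQueue.APP_MIGRATION, // illustrative reuse of an existing queue name
  {
    jobOptions: { attempts: 3, removeOnComplete: true },
    jobTags: (job: ExportJob) => ({ "export.appId": job.appId }),
  }
)

// Processing goes through the wrapper, so each job runs inside a
// "queue.process" trace span with success/error/duration metrics.
exportQueue.process(async job => {
  console.log(`exporting app ${job.data.appId} for user ${job.data.userId}`)
})

// Enqueueing also goes through the wrapper, which records payload size and
// stores the parent span context on the job data for trace linking.
export async function requestExport(appId: string, userId: string) {
  await exportQueue.add({ appId, userId })
}

// Code that still needs the raw Bull queue (bull-board adapters, isReady()
// health checks, repeatable-job management) unwraps it explicitly.
export async function healthCheck() {
  return await exportQueue.getBullQueue().isReady()
}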