diff --git a/lerna.json b/lerna.json
index 5f1bcc6bf1..45eead5a5a 100644
--- a/lerna.json
+++ b/lerna.json
@@ -1,6 +1,6 @@
 {
   "$schema": "node_modules/lerna/schemas/lerna-schema.json",
-  "version": "3.7.0",
+  "version": "3.7.4",
   "npmClient": "yarn",
   "concurrency": 20,
   "command": {
diff --git a/packages/backend-core/package.json b/packages/backend-core/package.json
index 8da06718ab..7510655ed7 100644
--- a/packages/backend-core/package.json
+++ b/packages/backend-core/package.json
@@ -56,6 +56,7 @@
     "koa-pino-logger": "4.0.0",
     "lodash": "4.17.21",
     "node-fetch": "2.6.7",
+    "object-sizeof": "2.6.1",
     "passport-google-oauth": "2.0.0",
     "passport-local": "1.0.0",
     "passport-oauth2-refresh": "^2.1.0",
diff --git a/packages/backend-core/src/cache/docWritethrough.ts b/packages/backend-core/src/cache/docWritethrough.ts
index 30af16c977..6f2f3cbc97 100644
--- a/packages/backend-core/src/cache/docWritethrough.ts
+++ b/packages/backend-core/src/cache/docWritethrough.ts
@@ -1,6 +1,6 @@
-import { AnyDocument, Database, Document } from "@budibase/types"
+import { AnyDocument, Database, Document, DocumentType } from "@budibase/types"
 
-import { JobQueue, Queue, createQueue } from "../queue"
+import { BudibaseQueue, JobQueue } from "../queue"
 import * as dbUtils from "../db"
 
 interface ProcessDocMessage {
@@ -13,11 +13,11 @@ const PERSIST_MAX_ATTEMPTS = 100
 let processor: DocWritethroughProcessor | undefined
 
 export class DocWritethroughProcessor {
-  private static _queue: Queue
+  private static _queue: BudibaseQueue<ProcessDocMessage>
 
   public static get queue() {
     if (!DocWritethroughProcessor._queue) {
-      DocWritethroughProcessor._queue = createQueue(
+      DocWritethroughProcessor._queue = new BudibaseQueue<ProcessDocMessage>(
         JobQueue.DOC_WRITETHROUGH_QUEUE,
         {
           jobOptions: {
@@ -57,6 +57,10 @@ export class DocWritethroughProcessor {
     docId: string
     data: Record<string, any>
   }) {
+    // HACK - for now drop SCIM events
+    if (docId.startsWith(DocumentType.SCIM_LOG)) {
+      return
+    }
     const db = dbUtils.getDB(dbName)
     let doc: AnyDocument | undefined
     try {
diff --git a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts
index b72651e21f..01944e68a8 100644
--- a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts
+++ b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts
@@ -18,7 +18,9 @@ import {
 const initialTime = Date.now()
 
 async function waitForQueueCompletion() {
-  await utils.queue.processMessages(DocWritethroughProcessor.queue)
+  await utils.queue.processMessages(
+    DocWritethroughProcessor.queue.getBullQueue()
+  )
 }
 
 beforeAll(() => utils.queue.useRealQueues())
diff --git a/packages/backend-core/src/events/asyncEvents/queue.ts b/packages/backend-core/src/events/asyncEvents/queue.ts
index 196fd359b3..a393918625 100644
--- a/packages/backend-core/src/events/asyncEvents/queue.ts
+++ b/packages/backend-core/src/events/asyncEvents/queue.ts
@@ -1,5 +1,4 @@
-import BullQueue from "bull"
-import { createQueue, JobQueue } from "../../queue"
+import { BudibaseQueue, JobQueue } from "../../queue"
 import { Event, Identity } from "@budibase/types"
 
 export interface EventPayload {
@@ -9,10 +8,19 @@ export interface EventPayload {
   timestamp?: string | number
 }
 
-export let asyncEventQueue: BullQueue.Queue
+export let asyncEventQueue: BudibaseQueue<EventPayload>
 
 export function init() {
-  asyncEventQueue = createQueue(JobQueue.SYSTEM_EVENT_QUEUE)
+  asyncEventQueue = new BudibaseQueue<EventPayload>(
+    JobQueue.SYSTEM_EVENT_QUEUE,
+    {
+      jobTags: (event: EventPayload) => {
+        return {
+          "event.name": event.event,
+        }
+      },
+    }
+  )
 }
 
 export async function shutdown() {
diff --git a/packages/backend-core/src/events/processors/AuditLogsProcessor.ts b/packages/backend-core/src/events/processors/AuditLogsProcessor.ts
index 3dd2ab9d10..9a8d7d3cc7 100644
--- a/packages/backend-core/src/events/processors/AuditLogsProcessor.ts
+++ b/packages/backend-core/src/events/processors/AuditLogsProcessor.ts
@@ -8,24 +8,30 @@
 } from "@budibase/types"
 import { EventProcessor } from "./types"
 import { getAppId, doInTenant, getTenantId } from "../../context"
-import BullQueue from "bull"
-import { createQueue, JobQueue } from "../../queue"
+import { BudibaseQueue, JobQueue } from "../../queue"
 import { isAudited } from "../../utils"
 import env from "../../environment"
 
 export default class AuditLogsProcessor implements EventProcessor {
   static auditLogsEnabled = false
-  static auditLogQueue: BullQueue.Queue
+  static auditLogQueue: BudibaseQueue<AuditLogQueueEvent>
 
   // can't use constructor as need to return promise
   static init(fn: AuditLogFn) {
     AuditLogsProcessor.auditLogsEnabled = true
     const writeAuditLogs = fn
-    AuditLogsProcessor.auditLogQueue = createQueue(
-      JobQueue.AUDIT_LOG
+    AuditLogsProcessor.auditLogQueue = new BudibaseQueue<AuditLogQueueEvent>(
+      JobQueue.AUDIT_LOG,
+      {
+        jobTags: (event: AuditLogQueueEvent) => {
+          return {
+            "event.name": event.event,
+          }
+        },
+      }
     )
     return AuditLogsProcessor.auditLogQueue.process(async job => {
-      return doInTenant(job.data.tenantId, async () => {
+      await doInTenant(job.data.tenantId, async () => {
        let properties = job.data.properties
        if (properties.audited) {
          properties = {
diff --git a/packages/backend-core/src/queue/inMemoryQueue.ts b/packages/backend-core/src/queue/inMemoryQueue.ts
index ebab5c7392..6afbf6c4cf 100644
--- a/packages/backend-core/src/queue/inMemoryQueue.ts
+++ b/packages/backend-core/src/queue/inMemoryQueue.ts
@@ -96,12 +96,17 @@ export class InMemoryQueue<T = any> implements Partial<Queue<T>> {
       let resp = func(message)
 
-      async function retryFunc(fnc: any) {
+      async function retryFunc(fnc: any, attempt = 0) {
        try {
          await fnc
        } catch (e: any) {
-          await helpers.wait(50)
-          await retryFunc(func(message))
+          attempt++
+          if (attempt < 3) {
+            await helpers.wait(100 * attempt)
+            await retryFunc(func(message), attempt)
+          } else {
+            throw e
+          }
        }
      }
diff --git a/packages/backend-core/src/queue/queue.ts b/packages/backend-core/src/queue/queue.ts
index 4e1086823b..5365080ed1 100644
--- a/packages/backend-core/src/queue/queue.ts
+++ b/packages/backend-core/src/queue/queue.ts
@@ -2,10 +2,12 @@ import env from "../environment"
 import { getRedisOptions } from "../redis/utils"
 import { JobQueue } from "./constants"
 import InMemoryQueue from "./inMemoryQueue"
-import BullQueue, { QueueOptions, JobOptions } from "bull"
+import BullQueue, { Queue, QueueOptions, JobOptions, Job } from "bull"
 import { addListeners, StalledFn } from "./listeners"
 import { Duration } from "../utils"
 import * as timers from "../timers"
+import tracer from "dd-trace"
+import sizeof from "object-sizeof"
 
 export type { QueueOptions, Queue, JobOptions } from "bull"
 
@@ -15,7 +17,7 @@ const QUEUE_LOCK_MS = Duration.fromMinutes(5).toMs()
 const QUEUE_LOCK_RENEW_INTERNAL_MS = Duration.fromSeconds(30).toMs()
 // cleanup the queue every 60 seconds
 const CLEANUP_PERIOD_MS = Duration.fromSeconds(60).toMs()
-let QUEUES: BullQueue.Queue[] = []
+let QUEUES: Queue[] = []
 let cleanupInterval: NodeJS.Timeout
 
 async function cleanup() {
@@ -25,49 +27,205 @@ async function cleanup() {
   }
 }
 
-export function createQueue(
-  jobQueue: JobQueue,
-  opts: {
-    removeStalledCb?: StalledFn
-    maxStalledCount?: number
-    jobOptions?: JobOptions
-  } = {}
-): BullQueue.Queue {
-  const redisOpts = getRedisOptions()
-  const queueConfig: QueueOptions = {
-    redis: redisOpts,
-    settings: {
-      maxStalledCount: opts.maxStalledCount ? opts.maxStalledCount : 0,
-      lockDuration: QUEUE_LOCK_MS,
-      lockRenewTime: QUEUE_LOCK_RENEW_INTERNAL_MS,
-    },
+async function withMetrics<T>(
+  name: string,
+  cb: () => Promise<T>,
+  tags?: Record<string, string | number>
+): Promise<T> {
+  const start = performance.now()
+  try {
+    const result = await cb()
+    tracer.dogstatsd.increment(`${name}.success`, 1, tags)
+    return result
+  } catch (err) {
+    tracer.dogstatsd.increment(`${name}.error`, 1, tags)
+    throw err
+  } finally {
+    const durationMs = performance.now() - start
+    tracer.dogstatsd.distribution(`${name}.duration.ms`, durationMs, tags)
+    tracer.dogstatsd.increment(name, 1, tags)
   }
-  if (opts.jobOptions) {
-    queueConfig.defaultJobOptions = opts.jobOptions
+}
+
+function jobOptsTags(opts: JobOptions) {
+  return {
+    "job.opts.attempts": opts.attempts,
+    "job.opts.backoff": opts.backoff,
+    "job.opts.delay": opts.delay,
+    "job.opts.jobId": opts.jobId,
+    "job.opts.lifo": opts.lifo,
+    "job.opts.preventParsingData": opts.preventParsingData,
+    "job.opts.priority": opts.priority,
+    "job.opts.removeOnComplete": opts.removeOnComplete,
+    "job.opts.removeOnFail": opts.removeOnFail,
+    "job.opts.repeat": opts.repeat,
+    "job.opts.stackTraceLimit": opts.stackTraceLimit,
+    "job.opts.timeout": opts.timeout,
   }
-  let queue: BullQueue.Queue
-  if (!env.isTest()) {
-    queue = new BullQueue(jobQueue, queueConfig)
-  } else if (
-    process.env.BULL_TEST_REDIS_PORT &&
-    !isNaN(+process.env.BULL_TEST_REDIS_PORT)
-  ) {
-    queue = new BullQueue(jobQueue, {
-      redis: { host: "localhost", port: +process.env.BULL_TEST_REDIS_PORT },
-    })
-  } else {
-    queue = new InMemoryQueue(jobQueue, queueConfig) as any
+}
+
+function jobTags(job: Job) {
+  return {
+    "job.id": job.id,
+    "job.attemptsMade": job.attemptsMade,
+    "job.timestamp": job.timestamp,
+    "job.data.sizeBytes": sizeof(job.data),
+    ...jobOptsTags(job.opts || {}),
   }
-  addListeners(queue, jobQueue, opts?.removeStalledCb)
-  QUEUES.push(queue)
-  if (!cleanupInterval && !env.isTest()) {
-    cleanupInterval = timers.set(cleanup, CLEANUP_PERIOD_MS)
-    // fire off an initial cleanup
-    cleanup().catch(err => {
-      console.error(`Unable to cleanup ${jobQueue} initially - ${err}`)
+}
+
+export interface BudibaseQueueOpts<T> {
+  removeStalledCb?: StalledFn
+  maxStalledCount?: number
+  jobOptions?: JobOptions
+  jobTags?: (job: T) => Record<string, any>
+}
+
+export class BudibaseQueue<T> {
+  private queue: Queue<T>
+  private opts: BudibaseQueueOpts<T>
+  private jobQueue: JobQueue
+
+  constructor(jobQueue: JobQueue, opts: BudibaseQueueOpts<T> = {}) {
+    this.opts = opts
+    this.jobQueue = jobQueue
+    this.queue = this.initQueue()
+  }
+
+  private initQueue() {
+    const redisOpts = getRedisOptions()
+    const queueConfig: QueueOptions = {
+      redis: redisOpts,
+      settings: {
+        maxStalledCount: this.opts.maxStalledCount
+          ? this.opts.maxStalledCount
+          : 0,
+        lockDuration: QUEUE_LOCK_MS,
+        lockRenewTime: QUEUE_LOCK_RENEW_INTERNAL_MS,
+      },
+    }
+    if (this.opts.jobOptions) {
+      queueConfig.defaultJobOptions = this.opts.jobOptions
+    }
+    let queue: Queue
+    if (!env.isTest()) {
+      queue = new BullQueue(this.jobQueue, queueConfig)
+    } else if (
+      process.env.BULL_TEST_REDIS_PORT &&
+      !isNaN(+process.env.BULL_TEST_REDIS_PORT)
+    ) {
+      queue = new BullQueue(this.jobQueue, {
+        redis: { host: "localhost", port: +process.env.BULL_TEST_REDIS_PORT },
+      })
+    } else {
+      queue = new InMemoryQueue(this.jobQueue, queueConfig) as any
+    }
+
+    addListeners(queue, this.jobQueue, this.opts.removeStalledCb)
+    QUEUES.push(queue)
+    if (!cleanupInterval && !env.isTest()) {
+      cleanupInterval = timers.set(cleanup, CLEANUP_PERIOD_MS)
+      // fire off an initial cleanup
+      cleanup().catch(err => {
+        console.error(`Unable to cleanup ${this.jobQueue} initially - ${err}`)
+      })
+    }
+    return queue
+  }
+
+  getBullQueue() {
+    return this.queue
+  }
+
+  process(
+    concurrency: number,
+    cb: (job: Job<T>) => Promise<void>
+  ): Promise<void>
+  process(cb: (job: Job<T>) => Promise<void>): Promise<void>
+  process(...args: any[]) {
+    let concurrency: number | undefined = undefined
+    let cb: (job: Job<T>) => Promise<void>
+    if (args.length === 2) {
+      concurrency = args[0]
+      cb = args[1]
+    } else {
+      cb = args[0]
+    }
+
+    const wrappedCb = async (job: Job<T>) => {
+      await tracer.trace("queue.process", async span => {
+        // @ts-expect-error monkey patching the parent span id
+        if (job.data._parentSpanContext) {
+          // @ts-expect-error monkey patching the parent span id
+          const parentContext = job.data._parentSpanContext
+          const parent = {
+            traceId: parentContext.traceId,
+            spanId: parentContext.spanId,
+            toTraceId: () => parentContext.traceId,
+            toSpanId: () => parentContext.spanId,
+            toTraceparent: () => "",
+          }
+          span.addLink(parent)
+        }
+        span.addTags({ "queue.name": this.jobQueue, ...jobTags(job) })
+        if (this.opts.jobTags) {
+          span.addTags(this.opts.jobTags(job.data))
+        }
+
+        tracer.dogstatsd.distribution(
+          "queue.process.sizeBytes",
+          sizeof(job.data),
+          this.metricTags()
+        )
+        await this.withMetrics("queue.process", () => cb(job))
+      })
+    }
+
+    if (concurrency) {
+      return this.queue.process(concurrency, wrappedCb)
+    } else {
+      return this.queue.process(wrappedCb)
+    }
+  }
+
+  async add(data: T, opts?: JobOptions): Promise<Job<T>> {
+    return await tracer.trace("queue.add", async span => {
+      span.addTags({
+        "queue.name": this.jobQueue,
+        "job.data.sizeBytes": sizeof(data),
+        ...jobOptsTags(opts || {}),
+      })
+      if (this.opts.jobTags) {
+        span.addTags(this.opts.jobTags(data))
+      }
+      // @ts-expect-error monkey patching the parent span id
+      data._parentSpanContext = {
+        traceId: span.context().toTraceId(),
+        spanId: span.context().toSpanId(),
+      }
+
+      tracer.dogstatsd.distribution(
+        "queue.add.sizeBytes",
+        sizeof(data),
+        this.metricTags()
+      )
+      return await this.withMetrics("queue.add", () =>
+        this.queue.add(data, opts)
+      )
     })
   }
-  return queue
+
+  private withMetrics<R>(name: string, cb: () => Promise<R>) {
+    return withMetrics(name, cb, this.metricTags())
+  }
+
+  private metricTags() {
+    return { queueName: this.jobQueue }
+  }
+
+  close() {
+    return this.queue.close()
+  }
 }
 
 export async function shutdown() {
diff --git a/packages/backend-core/src/sql/sql.ts b/packages/backend-core/src/sql/sql.ts
index 937f88c846..fa4742f23f 100644
--- a/packages/backend-core/src/sql/sql.ts
+++ b/packages/backend-core/src/sql/sql.ts
@@ -6,6 +6,7 @@ import {
   isInvalidISODateString,
   isValidFilter,
   isValidISODateString,
+  isValidTime,
   sqlLog,
   validateManyToMany,
 } from "./utils"
@@ -417,11 +418,17 @@ class InternalBuilder {
     }
 
     if (typeof input === "string" && schema.type === FieldType.DATETIME) {
-      if (isInvalidISODateString(input)) {
-        return null
-      }
-      if (isValidISODateString(input)) {
-        return new Date(input.trim())
+      if (schema.timeOnly) {
+        if (!isValidTime(input)) {
+          return null
+        }
+      } else {
+        if (isInvalidISODateString(input)) {
+          return null
+        }
+        if (isValidISODateString(input)) {
+          return new Date(input.trim())
+        }
       }
     }
     return input
diff --git a/packages/backend-core/src/sql/tests/utils.spec.ts b/packages/backend-core/src/sql/tests/utils.spec.ts
new file mode 100644
index 0000000000..819a0aa581
--- /dev/null
+++ b/packages/backend-core/src/sql/tests/utils.spec.ts
@@ -0,0 +1,35 @@
+import { isValidISODateString, isInvalidISODateString } from "../utils"
+
+describe("ISO date string validity checks", () => {
+  it("accepts a valid ISO date string without a time", () => {
+    const str = "2013-02-01"
+    const valid = isValidISODateString(str)
+    const invalid = isInvalidISODateString(str)
+    expect(valid).toEqual(true)
+    expect(invalid).toEqual(false)
+  })
+
+  it("accepts a valid ISO date string with a time", () => {
+    const str = "2013-02-01T01:23:45Z"
+    const valid = isValidISODateString(str)
+    const invalid = isInvalidISODateString(str)
+    expect(valid).toEqual(true)
+    expect(invalid).toEqual(false)
+  })
+
+  it("accepts a valid ISO date string with a time and millis", () => {
+    const str = "2013-02-01T01:23:45.678Z"
+    const valid = isValidISODateString(str)
+    const invalid = isInvalidISODateString(str)
+    expect(valid).toEqual(true)
+    expect(invalid).toEqual(false)
+  })
+
+  it("rejects an invalid ISO date string", () => {
+    const str = "2013-523-814T444:22:11Z"
+    const valid = isValidISODateString(str)
+    const invalid = isInvalidISODateString(str)
+    expect(valid).toEqual(false)
+    expect(invalid).toEqual(true)
+  })
+})
diff --git a/packages/backend-core/src/sql/utils.ts b/packages/backend-core/src/sql/utils.ts
index 746a949ef3..5bd9e4d12a 100644
--- a/packages/backend-core/src/sql/utils.ts
+++ b/packages/backend-core/src/sql/utils.ts
@@ -14,7 +14,7 @@ import environment from "../environment"
 const DOUBLE_SEPARATOR = `${SEPARATOR}${SEPARATOR}`
 const ROW_ID_REGEX = /^\[.*]$/g
 const ENCODED_SPACE = encodeURIComponent(" ")
-const ISO_DATE_REGEX = /^\d{4}-\d{2}-\d{2}(?:T\d{2}:\d{2}:\d{2}.\d{3}Z)?$/
+const ISO_DATE_REGEX = /^\d{4}-\d{2}-\d{2}(?:T\d{2}:\d{2}:\d{2}(?:.\d{3})?Z)?$/
 const TIME_REGEX = /^(?:\d{2}:)?(?:\d{2}:)(?:\d{2})$/
 
 export function isExternalTableID(tableId: string) {
@@ -139,17 +139,17 @@ export function breakRowIdField(_id: string | { _id: string }): any[] {
   }
 }
 
-export function isInvalidISODateString(str: string) {
+export function isValidISODateString(str: string) {
   const trimmedValue = str.trim()
   if (!ISO_DATE_REGEX.test(trimmedValue)) {
     return false
   }
-  let d = new Date(trimmedValue)
-  return isNaN(d.getTime())
+  const d = new Date(trimmedValue)
+  return !isNaN(d.getTime())
 }
 
-export function isValidISODateString(str: string) {
-  return ISO_DATE_REGEX.test(str.trim())
+export function isInvalidISODateString(str: string) {
+  return !isValidISODateString(str)
 }
 
 export function isValidFilter(value: any) {
diff --git a/packages/bbui/src/Form/Core/TextArea.svelte b/packages/bbui/src/Form/Core/TextArea.svelte
index 30c1993f9b..15cc13a7fa 100644
--- a/packages/bbui/src/Form/Core/TextArea.svelte
+++ b/packages/bbui/src/Form/Core/TextArea.svelte
@@ -75,6 +75,7 @@
   class:is-disabled={disabled}
   class:is-focused={isFocused}
 >
+
+