From bd6d6534bef7feb9e6fe083c7125c5f88d5f877e Mon Sep 17 00:00:00 2001 From: Sam Rose Date: Tue, 4 Feb 2025 15:14:58 +0000 Subject: [PATCH 1/7] Some typing improvements in automation queues. --- .../backend-core/src/queue/inMemoryQueue.ts | 7 +++-- packages/server/src/automations/bullboard.ts | 26 +++++++--------- packages/server/src/automations/index.ts | 5 ++- .../automations/tests/triggers/cron.spec.ts | 31 +++++++++++++++++++ packages/server/src/automations/utils.ts | 2 +- 5 files changed, 49 insertions(+), 22 deletions(-) create mode 100644 packages/server/src/automations/tests/triggers/cron.spec.ts diff --git a/packages/backend-core/src/queue/inMemoryQueue.ts b/packages/backend-core/src/queue/inMemoryQueue.ts index dd8d3daa37..5c52c693fb 100644 --- a/packages/backend-core/src/queue/inMemoryQueue.ts +++ b/packages/backend-core/src/queue/inMemoryQueue.ts @@ -30,9 +30,10 @@ function newJob(queue: string, message: any, opts?: JobOptions): JobMessage { } /** - * This is designed to replicate Bull (https://github.com/OptimalBits/bull) in memory as a sort of mock. - * It is relatively simple, using an event emitter internally to register when messages are available - * to the consumers - in can support many inputs and many consumers. + * This is designed to replicate Bull (https://github.com/OptimalBits/bull) in + * memory as a sort of mock. It is relatively simple, using an event emitter + * internally to register when messages are available to the consumers - in can + * support many inputs and many consumers. */ class InMemoryQueue implements Partial { _name: string diff --git a/packages/server/src/automations/bullboard.ts b/packages/server/src/automations/bullboard.ts index aa4287b2d0..349282e863 100644 --- a/packages/server/src/automations/bullboard.ts +++ b/packages/server/src/automations/bullboard.ts @@ -5,9 +5,9 @@ import * as automation from "../threads/automation" import { backups } from "@budibase/pro" import { getAppMigrationQueue } from "../appMigrations/queue" import { createBullBoard } from "@bull-board/api" -import BullQueue from "bull" +import { AutomationData } from "@budibase/types" -export const automationQueue: BullQueue.Queue = queue.createQueue( +export const automationQueue = queue.createQueue( queue.JobQueue.AUTOMATION, { removeStalledCb: automation.removeStalled } ) @@ -16,24 +16,20 @@ const PATH_PREFIX = "/bulladmin" export async function init() { // Set up queues for bull board admin + const queues = [new BullAdapter(automationQueue)] + const backupQueue = backups.getBackupQueue() - const appMigrationQueue = getAppMigrationQueue() - const queues = [automationQueue] if (backupQueue) { - queues.push(backupQueue) + queues.push(new BullAdapter(backupQueue)) } + + const appMigrationQueue = getAppMigrationQueue() if (appMigrationQueue) { - queues.push(appMigrationQueue) + queues.push(new BullAdapter(appMigrationQueue)) } - const adapters = [] - const serverAdapter: any = new KoaAdapter() - for (let queue of queues) { - adapters.push(new BullAdapter(queue)) - } - createBullBoard({ - queues: adapters, - serverAdapter, - }) + + const serverAdapter = new KoaAdapter() + createBullBoard({ queues, serverAdapter }) serverAdapter.setBasePath(PATH_PREFIX) return serverAdapter.registerPlugin() } diff --git a/packages/server/src/automations/index.ts b/packages/server/src/automations/index.ts index 4ef3210932..5f9ca1aa60 100644 --- a/packages/server/src/automations/index.ts +++ b/packages/server/src/automations/index.ts @@ -1,7 +1,6 @@ import { processEvent } from "./utils" import { automationQueue } from "./bullboard" import { rebootTrigger } from "./triggers" -import BullQueue from "bull" import { automationsEnabled } from "../features" export { automationQueue } from "./bullboard" @@ -25,6 +24,6 @@ export async function init() { return promise } -export function getQueues(): BullQueue.Queue[] { - return [automationQueue] +export function getQueue() { + return automationQueue } diff --git a/packages/server/src/automations/tests/triggers/cron.spec.ts b/packages/server/src/automations/tests/triggers/cron.spec.ts new file mode 100644 index 0000000000..af82eb3797 --- /dev/null +++ b/packages/server/src/automations/tests/triggers/cron.spec.ts @@ -0,0 +1,31 @@ +import { createAutomationBuilder } from "../utilities/AutomationTestBuilder" +import TestConfiguration from "../../../tests/utilities/TestConfiguration" +import { getQueue } from "../.." + +describe("cron trigger", () => { + const config = new TestConfiguration() + + beforeEach(async () => { + await config.init() + }) + + afterAll(() => { + config.end() + }) + + it("should run the webhook automation - checking for parameters", async () => { + const queue = getQueue() + expect(await queue.count()).toEqual(0) + + await createAutomationBuilder({ config }) + .cron({ cron: "* * * * *" }) + .serverLog({ + text: "Hello, world!", + }) + .save() + + await config.publish() + + expect(await queue.count()).toEqual(1) + }) +}) diff --git a/packages/server/src/automations/utils.ts b/packages/server/src/automations/utils.ts index 9bfcc6cf8a..83665fc975 100644 --- a/packages/server/src/automations/utils.ts +++ b/packages/server/src/automations/utils.ts @@ -230,7 +230,7 @@ export async function enableCronTrigger(appId: any, automation: Automation) { // can't use getAppDB here as this is likely to be called from dev app, // but this call could be for dev app or prod app, need to just use what // was passed in - await dbCore.doWithDB(appId, async (db: any) => { + await dbCore.doWithDB(appId, async db => { const response = await db.put(automation) automation._id = response.id automation._rev = response.rev From d03fdc8bb164eda06c51cba5b682e13226685e92 Mon Sep 17 00:00:00 2001 From: Sam Rose Date: Wed, 5 Feb 2025 09:22:37 +0000 Subject: [PATCH 2/7] Create a cron trigger test. --- .../backend-core/src/queue/inMemoryQueue.ts | 123 +++++++++++------- .../automations/tests/triggers/cron.spec.ts | 20 ++- 2 files changed, 91 insertions(+), 52 deletions(-) diff --git a/packages/backend-core/src/queue/inMemoryQueue.ts b/packages/backend-core/src/queue/inMemoryQueue.ts index 5c52c693fb..6833c9a306 100644 --- a/packages/backend-core/src/queue/inMemoryQueue.ts +++ b/packages/backend-core/src/queue/inMemoryQueue.ts @@ -1,32 +1,44 @@ import events from "events" import { newid } from "../utils" import { Queue, QueueOptions, JobOptions } from "./queue" +import { helpers } from "@budibase/shared-core" +import { Job, JobId, JobInformation } from "bull" -interface JobMessage { - id: string - timestamp: number - queue: string - data: any - opts?: JobOptions +function jobToJobInformation(job: Job): JobInformation { + let cron = "" + let every = -1 + let tz: string | undefined = undefined + let endDate: number | undefined = undefined + + const repeat = job.opts?.repeat + if (repeat) { + endDate = repeat.endDate ? new Date(repeat.endDate).getTime() : Date.now() + tz = repeat.tz + if ("cron" in repeat) { + cron = repeat.cron + } else { + every = repeat.every + } + } + + return { + id: job.id.toString(), + name: "", + key: job.id.toString(), + tz, + endDate, + cron, + every, + next: 0, + } } -/** - * Bull works with a Job wrapper around all messages that contains a lot more information about - * the state of the message, this object constructor implements the same schema of Bull jobs - * for the sake of maintaining API consistency. - * @param queue The name of the queue which the message will be carried on. - * @param message The JSON message which will be passed back to the consumer. - * @returns A new job which can now be put onto the queue, this is mostly an - * internal structure so that an in memory queue can be easily swapped for a Bull queue. - */ -function newJob(queue: string, message: any, opts?: JobOptions): JobMessage { - return { - id: newid(), - timestamp: Date.now(), - queue: queue, - data: message, - opts, - } +interface JobMessage extends Partial> { + id: string + timestamp: number + queue: Queue + data: any + opts?: JobOptions } /** @@ -40,7 +52,7 @@ class InMemoryQueue implements Partial { _opts?: QueueOptions _messages: JobMessage[] _queuedJobIds: Set - _emitter: NodeJS.EventEmitter + _emitter: NodeJS.EventEmitter<{ message: [JobMessage]; completed: [Job] }> _runCount: number _addCount: number @@ -70,34 +82,29 @@ class InMemoryQueue implements Partial { */ async process(concurrencyOrFunc: number | any, func?: any) { func = typeof concurrencyOrFunc === "number" ? func : concurrencyOrFunc - this._emitter.on("message", async () => { - if (this._messages.length <= 0) { - return - } - let msg = this._messages.shift() - - let resp = func(msg) + this._emitter.on("message", async message => { + let resp = func(message) async function retryFunc(fnc: any) { try { await fnc } catch (e: any) { - await new Promise(r => setTimeout(() => r(), 50)) - - await retryFunc(func(msg)) + await helpers.wait(50) + await retryFunc(func(message)) } } if (resp.then != null) { try { await retryFunc(resp) + this._emitter.emit("completed", message as Job) } catch (e: any) { console.error(e) } } this._runCount++ - const jobId = msg?.opts?.jobId?.toString() - if (jobId && msg?.opts?.removeOnComplete) { + const jobId = message.opts?.jobId?.toString() + if (jobId && message.opts?.removeOnComplete) { this._queuedJobIds.delete(jobId) } }) @@ -131,9 +138,16 @@ class InMemoryQueue implements Partial { } const pushMessage = () => { - this._messages.push(newJob(this._name, data, opts)) + const message: JobMessage = { + id: newid(), + timestamp: Date.now(), + queue: this as unknown as Queue, + data, + opts, + } + this._messages.push(message) this._addCount++ - this._emitter.emit("message") + this._emitter.emit("message", message) } const delay = opts?.delay @@ -159,13 +173,6 @@ class InMemoryQueue implements Partial { console.log(cronJobId) } - /** - * Implemented for tests - */ - async getRepeatableJobs() { - return [] - } - async removeJobs(_pattern: string) { // no-op } @@ -177,13 +184,31 @@ class InMemoryQueue implements Partial { return [] } - async getJob() { + async getJob(id: JobId) { + for (const message of this._messages) { + if (message.id === id) { + return message as Job + } + } return null } - on() { - // do nothing - return this as any + on(event: string, callback: (...args: any[]) => void): Queue { + // @ts-expect-error - this callback can be one of many types + this._emitter.on(event, callback) + return this as unknown as Queue + } + + async count() { + return this._messages.length + } + + async getCompletedCount() { + return this._runCount + } + + async getRepeatableJobs() { + return this._messages.map(job => jobToJobInformation(job as Job)) } } diff --git a/packages/server/src/automations/tests/triggers/cron.spec.ts b/packages/server/src/automations/tests/triggers/cron.spec.ts index af82eb3797..c377554df5 100644 --- a/packages/server/src/automations/tests/triggers/cron.spec.ts +++ b/packages/server/src/automations/tests/triggers/cron.spec.ts @@ -1,6 +1,7 @@ import { createAutomationBuilder } from "../utilities/AutomationTestBuilder" import TestConfiguration from "../../../tests/utilities/TestConfiguration" import { getQueue } from "../.." +import { Job } from "bull" describe("cron trigger", () => { const config = new TestConfiguration() @@ -13,9 +14,15 @@ describe("cron trigger", () => { config.end() }) - it("should run the webhook automation - checking for parameters", async () => { + it("should queue a Bull cron job", async () => { const queue = getQueue() - expect(await queue.count()).toEqual(0) + expect(await queue.getCompletedCount()).toEqual(0) + + const jobPromise = new Promise(resolve => { + queue.on("completed", async job => { + resolve(job) + }) + }) await createAutomationBuilder({ config }) .cron({ cron: "* * * * *" }) @@ -26,6 +33,13 @@ describe("cron trigger", () => { await config.publish() - expect(await queue.count()).toEqual(1) + expect(await queue.getCompletedCount()).toEqual(1) + + const job = await jobPromise + const repeat = job.opts?.repeat + if (!repeat || !("cron" in repeat)) { + throw new Error("Expected cron repeat") + } + expect(repeat.cron).toEqual("* * * * *") }) }) From b98a1931d0e368104ee685bd4f288726b934b4e2 Mon Sep 17 00:00:00 2001 From: Budibase Staging Release Bot <> Date: Wed, 5 Feb 2025 14:36:44 +0000 Subject: [PATCH 3/7] Bump version to 3.4.2 --- lerna.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lerna.json b/lerna.json index faedd55ccb..df0fbd8dfc 100644 --- a/lerna.json +++ b/lerna.json @@ -1,6 +1,6 @@ { "$schema": "node_modules/lerna/schemas/lerna-schema.json", - "version": "3.4.1", + "version": "3.4.2", "npmClient": "yarn", "concurrency": 20, "command": { From ae3367f44424ce4bc85cf24dca870865e16bae08 Mon Sep 17 00:00:00 2001 From: melohagan <101575380+melohagan@users.noreply.github.com> Date: Wed, 5 Feb 2025 15:18:01 +0000 Subject: [PATCH 4/7] Update type (#15490) * Add installId to activate license request * Add pro module * Revert "Add pro module" This reverts commit 468ff2afbf9dda0febeacc8a77c7627ddd89a505. --- packages/pro | 2 +- packages/types/src/api/account/license.ts | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/pro b/packages/pro index 43a5785ccb..af9648d61d 160000 --- a/packages/pro +++ b/packages/pro @@ -1 +1 @@ -Subproject commit 43a5785ccb4f83ce929b29f05ea0a62199fcdf23 +Subproject commit af9648d61d20f277023379b2e5b2ef8f360f0be0 diff --git a/packages/types/src/api/account/license.ts b/packages/types/src/api/account/license.ts index edb1267ecf..342e4882e6 100644 --- a/packages/types/src/api/account/license.ts +++ b/packages/types/src/api/account/license.ts @@ -20,7 +20,8 @@ export interface QuotaTriggeredRequest { } export interface LicenseActivateRequest { - installVersion?: string + installVersion: string + installId: string } export interface UpdateLicenseRequest { From 6ad9ebe63c5aee257a778d59ccdcbb14d25e77e1 Mon Sep 17 00:00:00 2001 From: Sam Rose Date: Wed, 5 Feb 2025 15:24:54 +0000 Subject: [PATCH 5/7] Add failing test. --- .../automations/tests/triggers/cron.spec.ts | 19 ++++++++++++- .../src/tests/utilities/api/application.ts | 28 +++++++++++++------ 2 files changed, 38 insertions(+), 9 deletions(-) diff --git a/packages/server/src/automations/tests/triggers/cron.spec.ts b/packages/server/src/automations/tests/triggers/cron.spec.ts index c377554df5..956f054ca4 100644 --- a/packages/server/src/automations/tests/triggers/cron.spec.ts +++ b/packages/server/src/automations/tests/triggers/cron.spec.ts @@ -31,7 +31,7 @@ describe("cron trigger", () => { }) .save() - await config.publish() + await config.api.application.publish(config.getAppId()) expect(await queue.getCompletedCount()).toEqual(1) @@ -42,4 +42,21 @@ describe("cron trigger", () => { } expect(repeat.cron).toEqual("* * * * *") }) + + it("should fail if the cron expression is invalid", async () => { + await createAutomationBuilder({ config }) + .cron({ cron: "* * * * * *" }) + .serverLog({ + text: "Hello, world!", + }) + .save() + + await config.api.application.publish(config.getAppId(), { + status: 500, + body: { + message: + 'Deployment Failed: Invalid automation CRON "* * * * * *" - Expected 5 values, but got 6.', + }, + }) + }) }) diff --git a/packages/server/src/tests/utilities/api/application.ts b/packages/server/src/tests/utilities/api/application.ts index 1fe9840c1d..d0fcb60804 100644 --- a/packages/server/src/tests/utilities/api/application.ts +++ b/packages/server/src/tests/utilities/api/application.ts @@ -33,7 +33,10 @@ export class ApplicationAPI extends TestAPI { await this._delete(`/api/applications/${appId}`, { expectations }) } - publish = async (appId: string): Promise => { + publish = async ( + appId: string, + expectations?: Expectations + ): Promise => { return await this._post( `/api/applications/${appId}/publish`, { @@ -42,14 +45,16 @@ export class ApplicationAPI extends TestAPI { headers: { [constants.Header.APP_ID]: appId, }, + expectations, } ) } - unpublish = async (appId: string): Promise => { - await this._post(`/api/applications/${appId}/unpublish`, { - expectations: { status: 200 }, - }) + unpublish = async ( + appId: string, + expectations?: Expectations + ): Promise => { + await this._post(`/api/applications/${appId}/unpublish`, { expectations }) } sync = async ( @@ -144,13 +149,20 @@ export class ApplicationAPI extends TestAPI { }) } - fetch = async ({ status }: { status?: AppStatus } = {}): Promise => { + fetch = async ( + { status }: { status?: AppStatus } = {}, + expectations?: Expectations + ): Promise => { return await this._get("/api/applications", { query: { status }, + expectations, }) } - addSampleData = async (appId: string): Promise => { - await this._post(`/api/applications/${appId}/sample`) + addSampleData = async ( + appId: string, + expectations?: Expectations + ): Promise => { + await this._post(`/api/applications/${appId}/sample`, { expectations }) } } From 690fa705cf4d778d14e3107d4dabc3da6eccd79a Mon Sep 17 00:00:00 2001 From: melohagan <101575380+melohagan@users.noreply.github.com> Date: Wed, 5 Feb 2025 15:45:38 +0000 Subject: [PATCH 6/7] Update pro ref (#15492) --- packages/pro | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/pro b/packages/pro index af9648d61d..8cbaa80a9c 160000 --- a/packages/pro +++ b/packages/pro @@ -1 +1 @@ -Subproject commit af9648d61d20f277023379b2e5b2ef8f360f0be0 +Subproject commit 8cbaa80a9cc1152c6cd53722e64da7d824da6e16 From 31664babfe11d69ab074d1f7293473a1a83d5d93 Mon Sep 17 00:00:00 2001 From: Budibase Staging Release Bot <> Date: Wed, 5 Feb 2025 16:33:48 +0000 Subject: [PATCH 7/7] Bump version to 3.4.3 --- lerna.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lerna.json b/lerna.json index df0fbd8dfc..bdb933c0c5 100644 --- a/lerna.json +++ b/lerna.json @@ -1,6 +1,6 @@ { "$schema": "node_modules/lerna/schemas/lerna-schema.json", - "version": "3.4.2", + "version": "3.4.3", "npmClient": "yarn", "concurrency": 20, "command": {