From d03fdc8bb164eda06c51cba5b682e13226685e92 Mon Sep 17 00:00:00 2001 From: Sam Rose Date: Wed, 5 Feb 2025 09:22:37 +0000 Subject: [PATCH] Create a cron trigger test. --- .../backend-core/src/queue/inMemoryQueue.ts | 123 +++++++++++------- .../automations/tests/triggers/cron.spec.ts | 20 ++- 2 files changed, 91 insertions(+), 52 deletions(-) diff --git a/packages/backend-core/src/queue/inMemoryQueue.ts b/packages/backend-core/src/queue/inMemoryQueue.ts index 5c52c693fb..6833c9a306 100644 --- a/packages/backend-core/src/queue/inMemoryQueue.ts +++ b/packages/backend-core/src/queue/inMemoryQueue.ts @@ -1,32 +1,44 @@ import events from "events" import { newid } from "../utils" import { Queue, QueueOptions, JobOptions } from "./queue" +import { helpers } from "@budibase/shared-core" +import { Job, JobId, JobInformation } from "bull" -interface JobMessage { - id: string - timestamp: number - queue: string - data: any - opts?: JobOptions +function jobToJobInformation(job: Job): JobInformation { + let cron = "" + let every = -1 + let tz: string | undefined = undefined + let endDate: number | undefined = undefined + + const repeat = job.opts?.repeat + if (repeat) { + endDate = repeat.endDate ? new Date(repeat.endDate).getTime() : Date.now() + tz = repeat.tz + if ("cron" in repeat) { + cron = repeat.cron + } else { + every = repeat.every + } + } + + return { + id: job.id.toString(), + name: "", + key: job.id.toString(), + tz, + endDate, + cron, + every, + next: 0, + } } -/** - * Bull works with a Job wrapper around all messages that contains a lot more information about - * the state of the message, this object constructor implements the same schema of Bull jobs - * for the sake of maintaining API consistency. - * @param queue The name of the queue which the message will be carried on. - * @param message The JSON message which will be passed back to the consumer. - * @returns A new job which can now be put onto the queue, this is mostly an - * internal structure so that an in memory queue can be easily swapped for a Bull queue. - */ -function newJob(queue: string, message: any, opts?: JobOptions): JobMessage { - return { - id: newid(), - timestamp: Date.now(), - queue: queue, - data: message, - opts, - } +interface JobMessage extends Partial> { + id: string + timestamp: number + queue: Queue + data: any + opts?: JobOptions } /** @@ -40,7 +52,7 @@ class InMemoryQueue implements Partial { _opts?: QueueOptions _messages: JobMessage[] _queuedJobIds: Set - _emitter: NodeJS.EventEmitter + _emitter: NodeJS.EventEmitter<{ message: [JobMessage]; completed: [Job] }> _runCount: number _addCount: number @@ -70,34 +82,29 @@ class InMemoryQueue implements Partial { */ async process(concurrencyOrFunc: number | any, func?: any) { func = typeof concurrencyOrFunc === "number" ? func : concurrencyOrFunc - this._emitter.on("message", async () => { - if (this._messages.length <= 0) { - return - } - let msg = this._messages.shift() - - let resp = func(msg) + this._emitter.on("message", async message => { + let resp = func(message) async function retryFunc(fnc: any) { try { await fnc } catch (e: any) { - await new Promise(r => setTimeout(() => r(), 50)) - - await retryFunc(func(msg)) + await helpers.wait(50) + await retryFunc(func(message)) } } if (resp.then != null) { try { await retryFunc(resp) + this._emitter.emit("completed", message as Job) } catch (e: any) { console.error(e) } } this._runCount++ - const jobId = msg?.opts?.jobId?.toString() - if (jobId && msg?.opts?.removeOnComplete) { + const jobId = message.opts?.jobId?.toString() + if (jobId && message.opts?.removeOnComplete) { this._queuedJobIds.delete(jobId) } }) @@ -131,9 +138,16 @@ class InMemoryQueue implements Partial { } const pushMessage = () => { - this._messages.push(newJob(this._name, data, opts)) + const message: JobMessage = { + id: newid(), + timestamp: Date.now(), + queue: this as unknown as Queue, + data, + opts, + } + this._messages.push(message) this._addCount++ - this._emitter.emit("message") + this._emitter.emit("message", message) } const delay = opts?.delay @@ -159,13 +173,6 @@ class InMemoryQueue implements Partial { console.log(cronJobId) } - /** - * Implemented for tests - */ - async getRepeatableJobs() { - return [] - } - async removeJobs(_pattern: string) { // no-op } @@ -177,13 +184,31 @@ class InMemoryQueue implements Partial { return [] } - async getJob() { + async getJob(id: JobId) { + for (const message of this._messages) { + if (message.id === id) { + return message as Job + } + } return null } - on() { - // do nothing - return this as any + on(event: string, callback: (...args: any[]) => void): Queue { + // @ts-expect-error - this callback can be one of many types + this._emitter.on(event, callback) + return this as unknown as Queue + } + + async count() { + return this._messages.length + } + + async getCompletedCount() { + return this._runCount + } + + async getRepeatableJobs() { + return this._messages.map(job => jobToJobInformation(job as Job)) } } diff --git a/packages/server/src/automations/tests/triggers/cron.spec.ts b/packages/server/src/automations/tests/triggers/cron.spec.ts index af82eb3797..c377554df5 100644 --- a/packages/server/src/automations/tests/triggers/cron.spec.ts +++ b/packages/server/src/automations/tests/triggers/cron.spec.ts @@ -1,6 +1,7 @@ import { createAutomationBuilder } from "../utilities/AutomationTestBuilder" import TestConfiguration from "../../../tests/utilities/TestConfiguration" import { getQueue } from "../.." +import { Job } from "bull" describe("cron trigger", () => { const config = new TestConfiguration() @@ -13,9 +14,15 @@ describe("cron trigger", () => { config.end() }) - it("should run the webhook automation - checking for parameters", async () => { + it("should queue a Bull cron job", async () => { const queue = getQueue() - expect(await queue.count()).toEqual(0) + expect(await queue.getCompletedCount()).toEqual(0) + + const jobPromise = new Promise(resolve => { + queue.on("completed", async job => { + resolve(job) + }) + }) await createAutomationBuilder({ config }) .cron({ cron: "* * * * *" }) @@ -26,6 +33,13 @@ describe("cron trigger", () => { await config.publish() - expect(await queue.count()).toEqual(1) + expect(await queue.getCompletedCount()).toEqual(1) + + const job = await jobPromise + const repeat = job.opts?.repeat + if (!repeat || !("cron" in repeat)) { + throw new Error("Expected cron repeat") + } + expect(repeat.cron).toEqual("* * * * *") }) })