From 8c84a715cfb561688e232310fb991e050ccf051e Mon Sep 17 00:00:00 2001 From: Sam Rose Date: Mon, 17 Feb 2025 17:39:38 +0000 Subject: [PATCH 1/6] Add tests for cron stopping. --- .../backend-core/src/queue/inMemoryQueue.ts | 31 ++++-- .../tests/steps/cron-automations.spec.ts | 45 --------- .../automations/tests/triggers/cron.spec.ts | 98 ++++++++++++++++++- .../tests/utilities/AutomationTestBuilder.ts | 32 +++++- .../src/automations/tests/utilities/index.ts | 59 ++++++++++- packages/server/src/automations/utils.ts | 2 +- .../src/tests/utilities/TestConfiguration.ts | 12 ++- packages/server/src/threads/automation.ts | 96 +++++++----------- packages/types/src/api/web/app/automation.ts | 1 + 9 files changed, 245 insertions(+), 131 deletions(-) delete mode 100644 packages/server/src/automations/tests/steps/cron-automations.spec.ts diff --git a/packages/backend-core/src/queue/inMemoryQueue.ts b/packages/backend-core/src/queue/inMemoryQueue.ts index 867388ae07..45d88a7a38 100644 --- a/packages/backend-core/src/queue/inMemoryQueue.ts +++ b/packages/backend-core/src/queue/inMemoryQueue.ts @@ -52,7 +52,11 @@ class InMemoryQueue implements Partial { _opts?: QueueOptions _messages: JobMessage[] _queuedJobIds: Set - _emitter: NodeJS.EventEmitter<{ message: [JobMessage]; completed: [Job] }> + _emitter: NodeJS.EventEmitter<{ + message: [JobMessage] + completed: [Job] + removed: [JobMessage] + }> _runCount: number _addCount: number @@ -83,6 +87,12 @@ class InMemoryQueue implements Partial { async process(concurrencyOrFunc: number | any, func?: any) { func = typeof concurrencyOrFunc === "number" ? func : concurrencyOrFunc this._emitter.on("message", async message => { + // For the purpose of testing, don't trigger cron jobs immediately. + // Require the test to trigger them manually with timestamps. + if (message.opts?.repeat != null) { + return + } + let resp = func(message) async function retryFunc(fnc: any) { @@ -164,13 +174,14 @@ class InMemoryQueue implements Partial { */ async close() {} - /** - * This removes a cron which has been implemented, this is part of Bull API. - * @param cronJobId The cron which is to be removed. - */ - async removeRepeatableByKey(cronJobId: string) { - // TODO: implement for testing - console.log(cronJobId) + async removeRepeatableByKey(id: string) { + for (const [idx, message] of this._messages.entries()) { + if (message.opts?.jobId?.toString() === id) { + this._messages.splice(idx, 1) + this._emitter.emit("removed", message) + return + } + } } async removeJobs(_pattern: string) { @@ -214,7 +225,9 @@ class InMemoryQueue implements Partial { } async getRepeatableJobs() { - return this._messages.map(job => jobToJobInformation(job as Job)) + return this._messages + .filter(job => job.opts?.repeat != null) + .map(job => jobToJobInformation(job as Job)) } } diff --git a/packages/server/src/automations/tests/steps/cron-automations.spec.ts b/packages/server/src/automations/tests/steps/cron-automations.spec.ts deleted file mode 100644 index 41de957c52..0000000000 --- a/packages/server/src/automations/tests/steps/cron-automations.spec.ts +++ /dev/null @@ -1,45 +0,0 @@ -import tk from "timekeeper" -import "../../../environment" -import * as automations from "../../index" -import TestConfiguration from "../../../tests/utilities/TestConfiguration" -import { createAutomationBuilder } from "../utilities/AutomationTestBuilder" - -const initialTime = Date.now() -tk.freeze(initialTime) - -const oneMinuteInMs = 60 * 1000 - -describe("cron automations", () => { - const config = new TestConfiguration() - - beforeAll(async () => { - await automations.init() - await config.init() - }) - - afterAll(async () => { - await automations.shutdown() - config.end() - }) - - beforeEach(() => { - tk.freeze(initialTime) - }) - - it("should initialise the automation timestamp", async () => { - await createAutomationBuilder(config).onCron({ cron: "* * * * *" }).save() - - tk.travel(Date.now() + oneMinuteInMs) - await config.publish() - - const { data } = await config.getAutomationLogs() - expect(data).toHaveLength(1) - expect(data).toEqual([ - expect.objectContaining({ - trigger: expect.objectContaining({ - outputs: { timestamp: initialTime + oneMinuteInMs }, - }), - }), - ]) - }) -}) diff --git a/packages/server/src/automations/tests/triggers/cron.spec.ts b/packages/server/src/automations/tests/triggers/cron.spec.ts index 3bbbbf0b0b..b445b28820 100644 --- a/packages/server/src/automations/tests/triggers/cron.spec.ts +++ b/packages/server/src/automations/tests/triggers/cron.spec.ts @@ -1,6 +1,11 @@ import { createAutomationBuilder } from "../utilities/AutomationTestBuilder" import TestConfiguration from "../../../tests/utilities/TestConfiguration" -import { captureAutomationResults } from "../utilities" +import { + captureAutomationQueueMessages, + captureAutomationResults, +} from "../utilities" +import { automations } from "@budibase/pro" +import { AutomationStatus } from "@budibase/types" describe("cron trigger", () => { const config = new TestConfiguration() @@ -13,6 +18,13 @@ describe("cron trigger", () => { config.end() }) + beforeEach(async () => { + const { automations } = await config.api.automation.fetch() + for (const automation of automations) { + await config.api.automation.delete(automation) + } + }) + it("should queue a Bull cron job", async () => { const { automation } = await createAutomationBuilder(config) .onCron({ cron: "* * * * *" }) @@ -21,12 +33,12 @@ describe("cron trigger", () => { }) .save() - const jobs = await captureAutomationResults(automation, () => + const messages = await captureAutomationQueueMessages(automation, () => config.api.application.publish() ) - expect(jobs).toHaveLength(1) + expect(messages).toHaveLength(1) - const repeat = jobs[0].opts?.repeat + const repeat = messages[0].opts?.repeat if (!repeat || !("cron" in repeat)) { throw new Error("Expected cron repeat") } @@ -49,4 +61,82 @@ describe("cron trigger", () => { }, }) }) + + it("should stop if the job fails more than 3 times", async () => { + const runner = await createAutomationBuilder(config) + .onCron({ cron: "* * * * *" }) + .queryRows({ + // @ts-expect-error intentionally sending invalid data + tableId: null, + }) + .save() + + await config.api.application.publish() + + const results = await captureAutomationResults( + runner.automation, + async () => { + await runner.trigger({ timeout: 1000, fields: {} }) + await runner.trigger({ timeout: 1000, fields: {} }) + await runner.trigger({ timeout: 1000, fields: {} }) + await runner.trigger({ timeout: 1000, fields: {} }) + await runner.trigger({ timeout: 1000, fields: {} }) + } + ) + + expect(results).toHaveLength(5) + + await config.withProdApp(async () => { + const { + data: [latest, ..._], + } = await automations.logs.logSearch({ + automationId: runner.automation._id, + }) + expect(latest.status).toEqual(AutomationStatus.STOPPED_ERROR) + }) + }) + + it("should fill in the timestamp if one is not provided", async () => { + const runner = await createAutomationBuilder(config) + .onCron({ cron: "* * * * *" }) + .serverLog({ + text: "Hello, world!", + }) + .save() + + await config.api.application.publish() + + const results = await captureAutomationResults( + runner.automation, + async () => { + await runner.trigger({ timeout: 1000, fields: {} }) + } + ) + expect(results).toHaveLength(1) + expect(results[0].data.event.timestamp).toBeWithin( + Date.now() - 1000, + Date.now() + 1000 + ) + }) + + it("should use the given timestamp if one is given", async () => { + const timestamp = 1234 + const runner = await createAutomationBuilder(config) + .onCron({ cron: "* * * * *" }) + .serverLog({ + text: "Hello, world!", + }) + .save() + + await config.api.application.publish() + + const results = await captureAutomationResults( + runner.automation, + async () => { + await runner.trigger({ timeout: 1000, fields: {}, timestamp }) + } + ) + expect(results).toHaveLength(1) + expect(results[0].data.event.timestamp).toEqual(timestamp) + }) }) diff --git a/packages/server/src/automations/tests/utilities/AutomationTestBuilder.ts b/packages/server/src/automations/tests/utilities/AutomationTestBuilder.ts index d707430a35..a95e58f69d 100644 --- a/packages/server/src/automations/tests/utilities/AutomationTestBuilder.ts +++ b/packages/server/src/automations/tests/utilities/AutomationTestBuilder.ts @@ -220,10 +220,34 @@ class AutomationRunner { async trigger( request: TriggerAutomationRequest ): Promise { - return await this.config.api.automation.trigger( - this.automation._id!, - request - ) + if (!this.config.prodAppId) { + throw new Error( + "Automations can only be triggered in a production app context, call config.api.application.publish()" + ) + } + // Because you can only trigger automations in a production app context, we + // wrap the trigger call to make tests a bit cleaner. If you really want to + // test triggering an automation in a dev app context, you can use the + // automation API directly. + return await this.config.withProdApp(async () => { + try { + return await this.config.api.automation.trigger( + this.automation._id!, + request + ) + } catch (e: any) { + if (e.cause.status === 404) { + throw new Error( + `Automation with ID ${ + this.automation._id + } not found in app ${this.config.getAppId()}. You may have forgotten to call config.api.application.publish().`, + { cause: e } + ) + } else { + throw e + } + } + }) } } diff --git a/packages/server/src/automations/tests/utilities/index.ts b/packages/server/src/automations/tests/utilities/index.ts index 44f6d3b8ca..fcde6170f2 100644 --- a/packages/server/src/automations/tests/utilities/index.ts +++ b/packages/server/src/automations/tests/utilities/index.ts @@ -34,6 +34,42 @@ export async function runInProd(fn: any) { } } +export async function captureAllAutomationQueueMessages( + f: () => Promise +) { + const messages: Job[] = [] + const queue = getQueue() + + const messageListener = async (message: Job) => { + messages.push(message) + } + + queue.on("message", messageListener) + try { + await f() + // Queue messages tend to be send asynchronously in API handlers, so there's + // no guarantee that awaiting this function will have queued anything yet. + // We wait here to make sure we're queued _after_ any existing async work. + await helpers.wait(100) + } finally { + queue.off("message", messageListener) + } + + return messages +} + +export async function captureAutomationQueueMessages( + automation: Automation | string, + f: () => Promise +) { + const messages = await captureAllAutomationQueueMessages(f) + return messages.filter( + m => + m.data.automation._id === + (typeof automation === "string" ? automation : automation._id) + ) +} + /** * Capture all automation runs that occur during the execution of a function. * This function will wait for all messages to be processed before returning. @@ -43,14 +79,18 @@ export async function captureAllAutomationResults( ): Promise[]> { const runs: Job[] = [] const queue = getQueue() - let messagesReceived = 0 + let messagesOutstanding = 0 const completedListener = async (job: Job) => { runs.push(job) - messagesReceived-- + messagesOutstanding-- } - const messageListener = async () => { - messagesReceived++ + const messageListener = async (message: Job) => { + // Don't count cron messages, as they don't get triggered automatically. + if (message.opts?.repeat != null) { + return + } + messagesOutstanding++ } queue.on("message", messageListener) queue.on("completed", completedListener) @@ -61,9 +101,18 @@ export async function captureAllAutomationResults( // We wait here to make sure we're queued _after_ any existing async work. await helpers.wait(100) } finally { + const waitMax = 10000 + let waited = 0 // eslint-disable-next-line no-unmodified-loop-condition - while (messagesReceived > 0) { + while (messagesOutstanding > 0) { await helpers.wait(50) + waited += 50 + if (waited > waitMax) { + // eslint-disable-next-line no-unsafe-finally + throw new Error( + `Timed out waiting for automation runs to complete. ${messagesOutstanding} messages waiting for completion.` + ) + } } queue.off("completed", completedListener) queue.off("message", messageListener) diff --git a/packages/server/src/automations/utils.ts b/packages/server/src/automations/utils.ts index 83665fc975..9af166bcf9 100644 --- a/packages/server/src/automations/utils.ts +++ b/packages/server/src/automations/utils.ts @@ -72,7 +72,7 @@ export async function processEvent(job: AutomationJob) { const task = async () => { try { - if (isCronTrigger(job.data.automation)) { + if (isCronTrigger(job.data.automation) && !job.data.event.timestamp) { // Requires the timestamp at run time job.data.event.timestamp = Date.now() } diff --git a/packages/server/src/tests/utilities/TestConfiguration.ts b/packages/server/src/tests/utilities/TestConfiguration.ts index 1f464b2ea4..edb397169d 100644 --- a/packages/server/src/tests/utilities/TestConfiguration.ts +++ b/packages/server/src/tests/utilities/TestConfiguration.ts @@ -261,11 +261,13 @@ export default class TestConfiguration { async withApp(app: App | string, f: () => Promise) { const oldAppId = this.appId this.appId = typeof app === "string" ? app : app.appId - try { - return await f() - } finally { - this.appId = oldAppId - } + return await context.doInAppContext(this.appId, async () => { + try { + return await f() + } finally { + this.appId = oldAppId + } + }) } async withProdApp(f: () => Promise) { diff --git a/packages/server/src/threads/automation.ts b/packages/server/src/threads/automation.ts index 174efa0fe0..039c3636f0 100644 --- a/packages/server/src/threads/automation.ts +++ b/packages/server/src/threads/automation.ts @@ -155,23 +155,12 @@ class Orchestrator { return step } - async getMetadata(): Promise { - const metadataId = generateAutomationMetadataID(this.automation._id!) - const db = context.getAppDB() - let metadata: AutomationMetadata - try { - metadata = await db.get(metadataId) - } catch (err) { - metadata = { - _id: metadataId, - errorCount: 0, - } - } - return metadata + isCron(): boolean { + return isRecurring(this.automation) } async stopCron(reason: string) { - if (!this.job.opts.repeat) { + if (!this.isCron()) { return } logging.logWarn( @@ -192,44 +181,42 @@ class Orchestrator { await storeLog(automation, this.executionOutput) } - async checkIfShouldStop(metadata: AutomationMetadata): Promise { - if (!metadata.errorCount || !this.job.opts.repeat) { + async checkIfShouldStop(): Promise { + const metadata = await this.getMetadata() + if (!metadata.errorCount || !this.isCron()) { return false } if (metadata.errorCount >= MAX_AUTOMATION_RECURRING_ERRORS) { - await this.stopCron("errors") return true } return false } - async updateMetadata(metadata: AutomationMetadata) { - const output = this.executionOutput, - automation = this.automation - if (!output || !isRecurring(automation)) { - return - } - const count = metadata.errorCount - const isError = isErrorInOutput(output) - // nothing to do in this scenario, escape - if (!count && !isError) { - return - } - if (isError) { - metadata.errorCount = count ? count + 1 : 1 - } else { - metadata.errorCount = 0 - } + async getMetadata(): Promise { + const metadataId = generateAutomationMetadataID(this.automation._id!) const db = context.getAppDB() - try { - await db.put(metadata) - } catch (err) { - logging.logAlertWithInfo( - "Failed to write automation metadata", - db.name, - automation._id!, - err - ) + const metadata = await db.tryGet(metadataId) + return metadata || { _id: metadataId, errorCount: 0 } + } + + async incrementErrorCount() { + for (let attempt = 0; attempt < 3; attempt++) { + const metadata = await this.getMetadata() + metadata.errorCount ||= 0 + metadata.errorCount++ + + const db = context.getAppDB() + try { + await db.put(metadata) + return + } catch (err) { + logging.logAlertWithInfo( + "Failed to update error count in automation metadata", + db.name, + this.automation._id!, + err + ) + } } } @@ -293,18 +280,6 @@ class Orchestrator { await enrichBaseContext(this.context) this.context.user = this.currentUser - let metadata - - // check if this is a recurring automation, - if (isProdAppID(this.appId) && isRecurring(this.automation)) { - span?.addTags({ recurring: true }) - metadata = await this.getMetadata() - const shouldStop = await this.checkIfShouldStop(metadata) - if (shouldStop) { - span?.addTags({ shouldStop: true }) - return - } - } const start = performance.now() await this.executeSteps(this.automation.definition.steps) @@ -332,10 +307,15 @@ class Orchestrator { } if ( isProdAppID(this.appId) && - isRecurring(this.automation) && - metadata + this.isCron() && + isErrorInOutput(this.executionOutput) ) { - await this.updateMetadata(metadata) + await this.incrementErrorCount() + if (await this.checkIfShouldStop()) { + await this.stopCron("errors") + span?.addTags({ shouldStop: true }) + return + } } return this.executionOutput } diff --git a/packages/types/src/api/web/app/automation.ts b/packages/types/src/api/web/app/automation.ts index f72966d100..6b6c63a261 100644 --- a/packages/types/src/api/web/app/automation.ts +++ b/packages/types/src/api/web/app/automation.ts @@ -65,6 +65,7 @@ export interface ClearAutomationLogResponse { export interface TriggerAutomationRequest { fields: Record + timestamp?: number // time in seconds timeout: number } From a5709c9de5675cb87b48d29d8a3ad304e718a48e Mon Sep 17 00:00:00 2001 From: Sam Rose Date: Tue, 18 Feb 2025 10:48:16 +0000 Subject: [PATCH 2/6] Test cron removal. --- .../backend-core/src/queue/inMemoryQueue.ts | 47 ++++++++++---- packages/backend-core/src/queue/index.ts | 1 + .../server/src/automations/logging/index.ts | 13 +++- .../automations/tests/triggers/cron.spec.ts | 53 ++++++++------- .../src/automations/tests/utilities/index.ts | 53 +++++++++++++-- packages/server/src/threads/automation.ts | 64 ++++++++----------- 6 files changed, 151 insertions(+), 80 deletions(-) diff --git a/packages/backend-core/src/queue/inMemoryQueue.ts b/packages/backend-core/src/queue/inMemoryQueue.ts index 45d88a7a38..9a6375e485 100644 --- a/packages/backend-core/src/queue/inMemoryQueue.ts +++ b/packages/backend-core/src/queue/inMemoryQueue.ts @@ -3,6 +3,7 @@ import { newid } from "../utils" import { Queue, QueueOptions, JobOptions } from "./queue" import { helpers } from "@budibase/shared-core" import { Job, JobId, JobInformation } from "bull" +import { cloneDeep } from "lodash" function jobToJobInformation(job: Job): JobInformation { let cron = "" @@ -33,7 +34,7 @@ function jobToJobInformation(job: Job): JobInformation { } } -interface JobMessage extends Partial> { +export interface TestQueueMessage extends Partial> { id: string timestamp: number queue: Queue @@ -47,15 +48,15 @@ interface JobMessage extends Partial> { * internally to register when messages are available to the consumers - in can * support many inputs and many consumers. */ -class InMemoryQueue implements Partial { +export class InMemoryQueue implements Partial> { _name: string _opts?: QueueOptions - _messages: JobMessage[] + _messages: TestQueueMessage[] _queuedJobIds: Set _emitter: NodeJS.EventEmitter<{ - message: [JobMessage] - completed: [Job] - removed: [JobMessage] + message: [TestQueueMessage] + completed: [Job] + removed: [TestQueueMessage] }> _runCount: number _addCount: number @@ -86,10 +87,13 @@ class InMemoryQueue implements Partial { */ async process(concurrencyOrFunc: number | any, func?: any) { func = typeof concurrencyOrFunc === "number" ? func : concurrencyOrFunc - this._emitter.on("message", async message => { + this._emitter.on("message", async msg => { + const message = cloneDeep(msg) + + const isManualTrigger = (message as any).manualTrigger === true // For the purpose of testing, don't trigger cron jobs immediately. // Require the test to trigger them manually with timestamps. - if (message.opts?.repeat != null) { + if (!isManualTrigger && message.opts?.repeat != null) { return } @@ -107,7 +111,7 @@ class InMemoryQueue implements Partial { if (resp.then != null) { try { await retryFunc(resp) - this._emitter.emit("completed", message as Job) + this._emitter.emit("completed", message as Job) } catch (e: any) { console.error(e) } @@ -124,7 +128,6 @@ class InMemoryQueue implements Partial { return this as any } - // simply puts a message to the queue and emits to the queue for processing /** * Simple function to replicate the add message functionality of Bull, putting * a new message on the queue. This then emits an event which will be used to @@ -133,7 +136,14 @@ class InMemoryQueue implements Partial { * a JSON message as this is required by Bull. * @param repeat serves no purpose for the import queue. */ - async add(data: any, opts?: JobOptions) { + // add(name: string, data: T, opts?: JobOptions): Promise>; + async add(data: T | string, optsOrT?: JobOptions | T) { + if (typeof data === "string") { + throw new Error("doesn't support named jobs") + } + + const opts = optsOrT as JobOptions + const jobId = opts?.jobId?.toString() if (jobId && this._queuedJobIds.has(jobId)) { console.log(`Ignoring already queued job ${jobId}`) @@ -148,7 +158,7 @@ class InMemoryQueue implements Partial { } const pushMessage = () => { - const message: JobMessage = { + const message: TestQueueMessage = { id: newid(), timestamp: Date.now(), queue: this as unknown as Queue, @@ -176,7 +186,7 @@ class InMemoryQueue implements Partial { async removeRepeatableByKey(id: string) { for (const [idx, message] of this._messages.entries()) { - if (message.opts?.jobId?.toString() === id) { + if (message.id === id) { this._messages.splice(idx, 1) this._emitter.emit("removed", message) return @@ -204,6 +214,17 @@ class InMemoryQueue implements Partial { return null } + manualTrigger(id: JobId) { + for (const message of this._messages) { + if (message.id === id) { + const forceMessage = { ...message, manualTrigger: true } + this._emitter.emit("message", forceMessage) + return + } + } + throw new Error(`Job with id ${id} not found`) + } + on(event: string, callback: (...args: any[]) => void): Queue { // @ts-expect-error - this callback can be one of many types this._emitter.on(event, callback) diff --git a/packages/backend-core/src/queue/index.ts b/packages/backend-core/src/queue/index.ts index b7d565ba13..5603c40513 100644 --- a/packages/backend-core/src/queue/index.ts +++ b/packages/backend-core/src/queue/index.ts @@ -1,2 +1,3 @@ export * from "./queue" export * from "./constants" +export * from "./inMemoryQueue" diff --git a/packages/server/src/automations/logging/index.ts b/packages/server/src/automations/logging/index.ts index 9d16f15a67..ed8dd9db88 100644 --- a/packages/server/src/automations/logging/index.ts +++ b/packages/server/src/automations/logging/index.ts @@ -1,7 +1,7 @@ import env from "../../environment" import { AutomationResults, Automation, App } from "@budibase/types" import { automations } from "@budibase/pro" -import { db as dbUtils } from "@budibase/backend-core" +import { db as dbUtils, logging } from "@budibase/backend-core" import sizeof from "object-sizeof" const MAX_LOG_SIZE_MB = 5 @@ -32,7 +32,16 @@ export async function storeLog( if (bytes / MB_IN_BYTES > MAX_LOG_SIZE_MB) { sanitiseResults(results) } - await automations.logs.storeLog(automation, results) + try { + await automations.logs.storeLog(automation, results) + } catch (e: any) { + if (e.status === 413 && e.request?.data) { + // if content is too large we shouldn't log it + delete e.request.data + e.request.data = { message: "removed due to large size" } + } + logging.logAlert("Error writing automation log", e) + } } export async function checkAppMetadata(apps: App[]) { diff --git a/packages/server/src/automations/tests/triggers/cron.spec.ts b/packages/server/src/automations/tests/triggers/cron.spec.ts index b445b28820..8f29d2aff2 100644 --- a/packages/server/src/automations/tests/triggers/cron.spec.ts +++ b/packages/server/src/automations/tests/triggers/cron.spec.ts @@ -1,11 +1,15 @@ import { createAutomationBuilder } from "../utilities/AutomationTestBuilder" import TestConfiguration from "../../../tests/utilities/TestConfiguration" import { - captureAutomationQueueMessages, + captureAutomationMessages, + captureAutomationRemovals, captureAutomationResults, + triggerCron, } from "../utilities" import { automations } from "@budibase/pro" -import { AutomationStatus } from "@budibase/types" +import { AutomationData, AutomationStatus } from "@budibase/types" +import { MAX_AUTOMATION_RECURRING_ERRORS } from "../../../constants" +import { Job } from "bull" describe("cron trigger", () => { const config = new TestConfiguration() @@ -33,7 +37,7 @@ describe("cron trigger", () => { }) .save() - const messages = await captureAutomationQueueMessages(automation, () => + const messages = await captureAutomationMessages(automation, () => config.api.application.publish() ) expect(messages).toHaveLength(1) @@ -62,8 +66,8 @@ describe("cron trigger", () => { }) }) - it("should stop if the job fails more than 3 times", async () => { - const runner = await createAutomationBuilder(config) + it.only("should stop if the job fails more than 3 times", async () => { + const { automation } = await createAutomationBuilder(config) .onCron({ cron: "* * * * *" }) .queryRows({ // @ts-expect-error intentionally sending invalid data @@ -71,28 +75,31 @@ describe("cron trigger", () => { }) .save() - await config.api.application.publish() - - const results = await captureAutomationResults( - runner.automation, - async () => { - await runner.trigger({ timeout: 1000, fields: {} }) - await runner.trigger({ timeout: 1000, fields: {} }) - await runner.trigger({ timeout: 1000, fields: {} }) - await runner.trigger({ timeout: 1000, fields: {} }) - await runner.trigger({ timeout: 1000, fields: {} }) - } + const [message] = await captureAutomationMessages(automation, () => + config.api.application.publish() ) - expect(results).toHaveLength(5) - await config.withProdApp(async () => { - const { - data: [latest, ..._], - } = await automations.logs.logSearch({ - automationId: runner.automation._id, + let results: Job[] = [] + const removed = await captureAutomationRemovals(automation, async () => { + results = await captureAutomationResults(automation, async () => { + for (let i = 0; i < MAX_AUTOMATION_RECURRING_ERRORS; i++) { + triggerCron(message) + } + }) }) - expect(latest.status).toEqual(AutomationStatus.STOPPED_ERROR) + + expect(removed).toHaveLength(1) + expect(removed[0].id).toEqual(message.id) + + expect(results).toHaveLength(5) + + const search = await automations.logs.logSearch({ + automationId: automation._id, + status: AutomationStatus.STOPPED_ERROR, + }) + expect(search.data).toHaveLength(1) + expect(search.data[0].status).toEqual(AutomationStatus.STOPPED_ERROR) }) }) diff --git a/packages/server/src/automations/tests/utilities/index.ts b/packages/server/src/automations/tests/utilities/index.ts index fcde6170f2..0648a5cb0a 100644 --- a/packages/server/src/automations/tests/utilities/index.ts +++ b/packages/server/src/automations/tests/utilities/index.ts @@ -6,6 +6,7 @@ import { Knex } from "knex" import { getQueue } from "../.." import { Job } from "bull" import { helpers } from "@budibase/shared-core" +import { queue } from "@budibase/backend-core" let config: TestConfiguration @@ -20,6 +21,17 @@ export function afterAll() { config.end() } +export function getTestQueue(): queue.InMemoryQueue { + return getQueue() as unknown as queue.InMemoryQueue +} + +export function triggerCron(message: Job) { + if (!message.opts?.repeat || !("cron" in message.opts.repeat)) { + throw new Error("Expected cron message") + } + getTestQueue().manualTrigger(message.id) +} + export async function runInProd(fn: any) { env._set("NODE_ENV", "production") let error @@ -34,9 +46,41 @@ export async function runInProd(fn: any) { } } -export async function captureAllAutomationQueueMessages( +export async function captureAllAutomationRemovals(f: () => Promise) { + const messages: Job[] = [] + const queue = getQueue() + + const messageListener = async (message: Job) => { + messages.push(message) + } + + queue.on("removed", messageListener) + try { + await f() + // Queue messages tend to be send asynchronously in API handlers, so there's + // no guarantee that awaiting this function will have queued anything yet. + // We wait here to make sure we're queued _after_ any existing async work. + await helpers.wait(100) + } finally { + queue.off("removed", messageListener) + } + + return messages +} + +export async function captureAutomationRemovals( + automation: Automation | string, f: () => Promise ) { + const messages = await captureAllAutomationRemovals(f) + return messages.filter( + m => + m.data.automation._id === + (typeof automation === "string" ? automation : automation._id) + ) +} + +export async function captureAllAutomationMessages(f: () => Promise) { const messages: Job[] = [] const queue = getQueue() @@ -58,11 +102,11 @@ export async function captureAllAutomationQueueMessages( return messages } -export async function captureAutomationQueueMessages( +export async function captureAutomationMessages( automation: Automation | string, f: () => Promise ) { - const messages = await captureAllAutomationQueueMessages(f) + const messages = await captureAllAutomationMessages(f) return messages.filter( m => m.data.automation._id === @@ -87,7 +131,8 @@ export async function captureAllAutomationResults( } const messageListener = async (message: Job) => { // Don't count cron messages, as they don't get triggered automatically. - if (message.opts?.repeat != null) { + const isManualTrigger = (message as any).manualTrigger === true + if (!isManualTrigger && message.opts?.repeat != null) { return } messagesOutstanding++ diff --git a/packages/server/src/threads/automation.ts b/packages/server/src/threads/automation.ts index 039c3636f0..409065927b 100644 --- a/packages/server/src/threads/automation.ts +++ b/packages/server/src/threads/automation.ts @@ -181,17 +181,6 @@ class Orchestrator { await storeLog(automation, this.executionOutput) } - async checkIfShouldStop(): Promise { - const metadata = await this.getMetadata() - if (!metadata.errorCount || !this.isCron()) { - return false - } - if (metadata.errorCount >= MAX_AUTOMATION_RECURRING_ERRORS) { - return true - } - return false - } - async getMetadata(): Promise { const metadataId = generateAutomationMetadataID(this.automation._id!) const db = context.getAppDB() @@ -200,24 +189,29 @@ class Orchestrator { } async incrementErrorCount() { - for (let attempt = 0; attempt < 3; attempt++) { + const db = context.getAppDB() + let err: Error | undefined = undefined + for (let attempt = 0; attempt < 10; attempt++) { const metadata = await this.getMetadata() metadata.errorCount ||= 0 metadata.errorCount++ - const db = context.getAppDB() try { await db.put(metadata) - return - } catch (err) { - logging.logAlertWithInfo( - "Failed to update error count in automation metadata", - db.name, - this.automation._id!, - err - ) + return metadata.errorCount + } catch (error: any) { + err = error + await helpers.wait(Math.random() * 10) } } + + logging.logAlertWithInfo( + "Failed to update error count in automation metadata", + db.name, + this.automation._id!, + err + ) + return undefined } updateExecutionOutput(id: string, stepId: string, inputs: any, outputs: any) { @@ -295,28 +289,22 @@ class Orchestrator { } ) - try { - await storeLog(this.automation, this.executionOutput) - } catch (e: any) { - if (e.status === 413 && e.request?.data) { - // if content is too large we shouldn't log it - delete e.request.data - e.request.data = { message: "removed due to large size" } - } - logging.logAlert("Error writing automation log", e) - } + let errorCount = 0 if ( isProdAppID(this.appId) && this.isCron() && isErrorInOutput(this.executionOutput) ) { - await this.incrementErrorCount() - if (await this.checkIfShouldStop()) { - await this.stopCron("errors") - span?.addTags({ shouldStop: true }) - return - } + errorCount = (await this.incrementErrorCount()) || 0 } + + if (errorCount >= MAX_AUTOMATION_RECURRING_ERRORS) { + await this.stopCron("errors") + span?.addTags({ shouldStop: true }) + } else { + await storeLog(this.automation, this.executionOutput) + } + return this.executionOutput } ) @@ -743,7 +731,7 @@ export async function executeInThread( })) as AutomationResponse } -export const removeStalled = async (job: Job) => { +export const removeStalled = async (job: Job) => { const appId = job.data.event.appId if (!appId) { throw new Error("Unable to execute, event doesn't contain app ID.") From 318c96e9c0c56962251dce09f23e169a79218da4 Mon Sep 17 00:00:00 2001 From: Sam Rose Date: Tue, 18 Feb 2025 10:48:43 +0000 Subject: [PATCH 3/6] Remove .only --- packages/server/src/automations/tests/triggers/cron.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/server/src/automations/tests/triggers/cron.spec.ts b/packages/server/src/automations/tests/triggers/cron.spec.ts index 8f29d2aff2..b7a9f1f213 100644 --- a/packages/server/src/automations/tests/triggers/cron.spec.ts +++ b/packages/server/src/automations/tests/triggers/cron.spec.ts @@ -66,7 +66,7 @@ describe("cron trigger", () => { }) }) - it.only("should stop if the job fails more than 3 times", async () => { + it("should stop if the job fails more than 3 times", async () => { const { automation } = await createAutomationBuilder(config) .onCron({ cron: "* * * * *" }) .queryRows({ From 9c445c1a8c927173813524000282795d1267f9ab Mon Sep 17 00:00:00 2001 From: Sam Rose Date: Tue, 18 Feb 2025 11:07:48 +0000 Subject: [PATCH 4/6] Fix loop.spec.ts timeout failures. --- packages/server/src/automations/tests/steps/loop.spec.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/packages/server/src/automations/tests/steps/loop.spec.ts b/packages/server/src/automations/tests/steps/loop.spec.ts index f8af7dcf9f..88e641f5ff 100644 --- a/packages/server/src/automations/tests/steps/loop.spec.ts +++ b/packages/server/src/automations/tests/steps/loop.spec.ts @@ -21,6 +21,11 @@ describe("Attempt to run a basic loop automation", () => { }) beforeEach(async () => { + const { automations } = await config.api.automation.fetch() + for (const automation of automations) { + await config.api.automation.delete(automation) + } + table = await config.api.table.save(basicTable()) await config.api.row.save(table._id!, {}) }) From 5aeac61cd1da0f21df63c2041baafc59315aa261 Mon Sep 17 00:00:00 2001 From: Sam Rose Date: Tue, 18 Feb 2025 11:17:05 +0000 Subject: [PATCH 5/6] Cleanup. --- packages/backend-core/src/queue/inMemoryQueue.ts | 1 - packages/server/src/threads/automation.ts | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/backend-core/src/queue/inMemoryQueue.ts b/packages/backend-core/src/queue/inMemoryQueue.ts index 9a6375e485..daf9efa054 100644 --- a/packages/backend-core/src/queue/inMemoryQueue.ts +++ b/packages/backend-core/src/queue/inMemoryQueue.ts @@ -136,7 +136,6 @@ export class InMemoryQueue implements Partial> { * a JSON message as this is required by Bull. * @param repeat serves no purpose for the import queue. */ - // add(name: string, data: T, opts?: JobOptions): Promise>; async add(data: T | string, optsOrT?: JobOptions | T) { if (typeof data === "string") { throw new Error("doesn't support named jobs") diff --git a/packages/server/src/threads/automation.ts b/packages/server/src/threads/automation.ts index 409065927b..f854635559 100644 --- a/packages/server/src/threads/automation.ts +++ b/packages/server/src/threads/automation.ts @@ -201,7 +201,7 @@ class Orchestrator { return metadata.errorCount } catch (error: any) { err = error - await helpers.wait(Math.random() * 10) + await helpers.wait(1000 + Math.random() * 1000) } } From 6952ca325ace7894011b980d42ee94013317a794 Mon Sep 17 00:00:00 2001 From: Sam Rose Date: Tue, 18 Feb 2025 14:01:32 +0000 Subject: [PATCH 6/6] Respond to Adri's feedback. --- packages/backend-core/src/queue/inMemoryQueue.ts | 7 +++---- .../src/automations/tests/triggers/cron.spec.ts | 4 ++-- .../src/automations/tests/utilities/index.ts | 15 +++++++++------ 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/packages/backend-core/src/queue/inMemoryQueue.ts b/packages/backend-core/src/queue/inMemoryQueue.ts index daf9efa054..842d3243bc 100644 --- a/packages/backend-core/src/queue/inMemoryQueue.ts +++ b/packages/backend-core/src/queue/inMemoryQueue.ts @@ -40,6 +40,7 @@ export interface TestQueueMessage extends Partial> { queue: Queue data: any opts?: JobOptions + manualTrigger?: boolean } /** @@ -90,10 +91,9 @@ export class InMemoryQueue implements Partial> { this._emitter.on("message", async msg => { const message = cloneDeep(msg) - const isManualTrigger = (message as any).manualTrigger === true // For the purpose of testing, don't trigger cron jobs immediately. // Require the test to trigger them manually with timestamps. - if (!isManualTrigger && message.opts?.repeat != null) { + if (!message.manualTrigger && message.opts?.repeat != null) { return } @@ -216,8 +216,7 @@ export class InMemoryQueue implements Partial> { manualTrigger(id: JobId) { for (const message of this._messages) { if (message.id === id) { - const forceMessage = { ...message, manualTrigger: true } - this._emitter.emit("message", forceMessage) + this._emitter.emit("message", { ...message, manualTrigger: true }) return } } diff --git a/packages/server/src/automations/tests/triggers/cron.spec.ts b/packages/server/src/automations/tests/triggers/cron.spec.ts index b7a9f1f213..90d29a60c1 100644 --- a/packages/server/src/automations/tests/triggers/cron.spec.ts +++ b/packages/server/src/automations/tests/triggers/cron.spec.ts @@ -9,7 +9,7 @@ import { import { automations } from "@budibase/pro" import { AutomationData, AutomationStatus } from "@budibase/types" import { MAX_AUTOMATION_RECURRING_ERRORS } from "../../../constants" -import { Job } from "bull" +import { queue } from "@budibase/backend-core" describe("cron trigger", () => { const config = new TestConfiguration() @@ -80,7 +80,7 @@ describe("cron trigger", () => { ) await config.withProdApp(async () => { - let results: Job[] = [] + let results: queue.TestQueueMessage[] = [] const removed = await captureAutomationRemovals(automation, async () => { results = await captureAutomationResults(automation, async () => { for (let i = 0; i < MAX_AUTOMATION_RECURRING_ERRORS; i++) { diff --git a/packages/server/src/automations/tests/utilities/index.ts b/packages/server/src/automations/tests/utilities/index.ts index 0648a5cb0a..4b41f1d977 100644 --- a/packages/server/src/automations/tests/utilities/index.ts +++ b/packages/server/src/automations/tests/utilities/index.ts @@ -120,19 +120,22 @@ export async function captureAutomationMessages( */ export async function captureAllAutomationResults( f: () => Promise -): Promise[]> { - const runs: Job[] = [] +): Promise[]> { + const runs: queue.TestQueueMessage[] = [] const queue = getQueue() let messagesOutstanding = 0 - const completedListener = async (job: Job) => { + const completedListener = async ( + job: queue.TestQueueMessage + ) => { runs.push(job) messagesOutstanding-- } - const messageListener = async (message: Job) => { + const messageListener = async ( + message: queue.TestQueueMessage + ) => { // Don't count cron messages, as they don't get triggered automatically. - const isManualTrigger = (message as any).manualTrigger === true - if (!isManualTrigger && message.opts?.repeat != null) { + if (!message.manualTrigger && message.opts?.repeat != null) { return } messagesOutstanding++