diff --git a/packages/backend-core/src/queue/inMemoryQueue.ts b/packages/backend-core/src/queue/inMemoryQueue.ts index 45d88a7a38..daf9efa054 100644 --- a/packages/backend-core/src/queue/inMemoryQueue.ts +++ b/packages/backend-core/src/queue/inMemoryQueue.ts @@ -3,6 +3,7 @@ import { newid } from "../utils" import { Queue, QueueOptions, JobOptions } from "./queue" import { helpers } from "@budibase/shared-core" import { Job, JobId, JobInformation } from "bull" +import { cloneDeep } from "lodash" function jobToJobInformation(job: Job): JobInformation { let cron = "" @@ -33,7 +34,7 @@ function jobToJobInformation(job: Job): JobInformation { } } -interface JobMessage extends Partial> { +export interface TestQueueMessage extends Partial> { id: string timestamp: number queue: Queue @@ -47,15 +48,15 @@ interface JobMessage extends Partial> { * internally to register when messages are available to the consumers - in can * support many inputs and many consumers. */ -class InMemoryQueue implements Partial { +export class InMemoryQueue implements Partial> { _name: string _opts?: QueueOptions - _messages: JobMessage[] + _messages: TestQueueMessage[] _queuedJobIds: Set _emitter: NodeJS.EventEmitter<{ - message: [JobMessage] - completed: [Job] - removed: [JobMessage] + message: [TestQueueMessage] + completed: [Job] + removed: [TestQueueMessage] }> _runCount: number _addCount: number @@ -86,10 +87,13 @@ class InMemoryQueue implements Partial { */ async process(concurrencyOrFunc: number | any, func?: any) { func = typeof concurrencyOrFunc === "number" ? func : concurrencyOrFunc - this._emitter.on("message", async message => { + this._emitter.on("message", async msg => { + const message = cloneDeep(msg) + + const isManualTrigger = (message as any).manualTrigger === true // For the purpose of testing, don't trigger cron jobs immediately. // Require the test to trigger them manually with timestamps. - if (message.opts?.repeat != null) { + if (!isManualTrigger && message.opts?.repeat != null) { return } @@ -107,7 +111,7 @@ class InMemoryQueue implements Partial { if (resp.then != null) { try { await retryFunc(resp) - this._emitter.emit("completed", message as Job) + this._emitter.emit("completed", message as Job) } catch (e: any) { console.error(e) } @@ -124,7 +128,6 @@ class InMemoryQueue implements Partial { return this as any } - // simply puts a message to the queue and emits to the queue for processing /** * Simple function to replicate the add message functionality of Bull, putting * a new message on the queue. This then emits an event which will be used to @@ -133,7 +136,13 @@ class InMemoryQueue implements Partial { * a JSON message as this is required by Bull. * @param repeat serves no purpose for the import queue. */ - async add(data: any, opts?: JobOptions) { + async add(data: T | string, optsOrT?: JobOptions | T) { + if (typeof data === "string") { + throw new Error("doesn't support named jobs") + } + + const opts = optsOrT as JobOptions + const jobId = opts?.jobId?.toString() if (jobId && this._queuedJobIds.has(jobId)) { console.log(`Ignoring already queued job ${jobId}`) @@ -148,7 +157,7 @@ class InMemoryQueue implements Partial { } const pushMessage = () => { - const message: JobMessage = { + const message: TestQueueMessage = { id: newid(), timestamp: Date.now(), queue: this as unknown as Queue, @@ -176,7 +185,7 @@ class InMemoryQueue implements Partial { async removeRepeatableByKey(id: string) { for (const [idx, message] of this._messages.entries()) { - if (message.opts?.jobId?.toString() === id) { + if (message.id === id) { this._messages.splice(idx, 1) this._emitter.emit("removed", message) return @@ -204,6 +213,17 @@ class InMemoryQueue implements Partial { return null } + manualTrigger(id: JobId) { + for (const message of this._messages) { + if (message.id === id) { + const forceMessage = { ...message, manualTrigger: true } + this._emitter.emit("message", forceMessage) + return + } + } + throw new Error(`Job with id ${id} not found`) + } + on(event: string, callback: (...args: any[]) => void): Queue { // @ts-expect-error - this callback can be one of many types this._emitter.on(event, callback) diff --git a/packages/backend-core/src/queue/index.ts b/packages/backend-core/src/queue/index.ts index b7d565ba13..5603c40513 100644 --- a/packages/backend-core/src/queue/index.ts +++ b/packages/backend-core/src/queue/index.ts @@ -1,2 +1,3 @@ export * from "./queue" export * from "./constants" +export * from "./inMemoryQueue" diff --git a/packages/server/src/automations/logging/index.ts b/packages/server/src/automations/logging/index.ts index 9d16f15a67..ed8dd9db88 100644 --- a/packages/server/src/automations/logging/index.ts +++ b/packages/server/src/automations/logging/index.ts @@ -1,7 +1,7 @@ import env from "../../environment" import { AutomationResults, Automation, App } from "@budibase/types" import { automations } from "@budibase/pro" -import { db as dbUtils } from "@budibase/backend-core" +import { db as dbUtils, logging } from "@budibase/backend-core" import sizeof from "object-sizeof" const MAX_LOG_SIZE_MB = 5 @@ -32,7 +32,16 @@ export async function storeLog( if (bytes / MB_IN_BYTES > MAX_LOG_SIZE_MB) { sanitiseResults(results) } - await automations.logs.storeLog(automation, results) + try { + await automations.logs.storeLog(automation, results) + } catch (e: any) { + if (e.status === 413 && e.request?.data) { + // if content is too large we shouldn't log it + delete e.request.data + e.request.data = { message: "removed due to large size" } + } + logging.logAlert("Error writing automation log", e) + } } export async function checkAppMetadata(apps: App[]) { diff --git a/packages/server/src/automations/tests/triggers/cron.spec.ts b/packages/server/src/automations/tests/triggers/cron.spec.ts index 61ed8b88c7..3250d29fc7 100644 --- a/packages/server/src/automations/tests/triggers/cron.spec.ts +++ b/packages/server/src/automations/tests/triggers/cron.spec.ts @@ -1,11 +1,15 @@ import { createAutomationBuilder } from "../utilities/AutomationTestBuilder" import TestConfiguration from "../../../tests/utilities/TestConfiguration" import { - captureAutomationQueueMessages, + captureAutomationMessages, + captureAutomationRemovals, captureAutomationResults, + triggerCron, } from "../utilities" import { automations } from "@budibase/pro" -import { AutomationStatus } from "@budibase/types" +import { AutomationData, AutomationStatus } from "@budibase/types" +import { MAX_AUTOMATION_RECURRING_ERRORS } from "../../../constants" +import { Job } from "bull" describe("cron trigger", () => { const config = new TestConfiguration() @@ -33,7 +37,7 @@ describe("cron trigger", () => { }) .save() - const messages = await captureAutomationQueueMessages(automation, () => + const messages = await captureAutomationMessages(automation, () => config.api.application.publish() ) expect(messages).toHaveLength(1) @@ -62,8 +66,8 @@ describe("cron trigger", () => { }) }) - it.only("should stop if the job fails more than 3 times", async () => { - const runner = await createAutomationBuilder(config) + it("should stop if the job fails more than N times", async () => { + const { automation } = await createAutomationBuilder(config) .onCron({ cron: "* * * * *" }) .queryRows({ // @ts-expect-error intentionally sending invalid data @@ -71,28 +75,31 @@ describe("cron trigger", () => { }) .save() - await config.api.application.publish() - - const results = await captureAutomationResults( - runner.automation, - async () => { - await runner.trigger({ timeout: 1000, fields: {} }) - await runner.trigger({ timeout: 1000, fields: {} }) - await runner.trigger({ timeout: 1000, fields: {} }) - await runner.trigger({ timeout: 1000, fields: {} }) - await runner.trigger({ timeout: 1000, fields: {} }) - } + const [message] = await captureAutomationMessages(automation, () => + config.api.application.publish() ) - expect(results).toHaveLength(5) - await config.withProdApp(async () => { - const logs = await automations.logs.logSearch({ - automationId: runner.automation._id, + let results: Job[] = [] + const removed = await captureAutomationRemovals(automation, async () => { + results = await captureAutomationResults(automation, async () => { + for (let i = 0; i < MAX_AUTOMATION_RECURRING_ERRORS; i++) { + triggerCron(message) + } + }) + }) + + expect(removed).toHaveLength(1) + expect(removed[0].id).toEqual(message.id) + + expect(results).toHaveLength(5) + + const search = await automations.logs.logSearch({ + automationId: automation._id, status: AutomationStatus.STOPPED_ERROR, }) - expect(logs.data).toHaveLength(1) - expect(logs.data[0].status).toEqual(AutomationStatus.STOPPED_ERROR) + expect(search.data).toHaveLength(1) + expect(search.data[0].status).toEqual(AutomationStatus.STOPPED_ERROR) }) }) diff --git a/packages/server/src/automations/tests/utilities/index.ts b/packages/server/src/automations/tests/utilities/index.ts index fcde6170f2..0648a5cb0a 100644 --- a/packages/server/src/automations/tests/utilities/index.ts +++ b/packages/server/src/automations/tests/utilities/index.ts @@ -6,6 +6,7 @@ import { Knex } from "knex" import { getQueue } from "../.." import { Job } from "bull" import { helpers } from "@budibase/shared-core" +import { queue } from "@budibase/backend-core" let config: TestConfiguration @@ -20,6 +21,17 @@ export function afterAll() { config.end() } +export function getTestQueue(): queue.InMemoryQueue { + return getQueue() as unknown as queue.InMemoryQueue +} + +export function triggerCron(message: Job) { + if (!message.opts?.repeat || !("cron" in message.opts.repeat)) { + throw new Error("Expected cron message") + } + getTestQueue().manualTrigger(message.id) +} + export async function runInProd(fn: any) { env._set("NODE_ENV", "production") let error @@ -34,9 +46,41 @@ export async function runInProd(fn: any) { } } -export async function captureAllAutomationQueueMessages( +export async function captureAllAutomationRemovals(f: () => Promise) { + const messages: Job[] = [] + const queue = getQueue() + + const messageListener = async (message: Job) => { + messages.push(message) + } + + queue.on("removed", messageListener) + try { + await f() + // Queue messages tend to be send asynchronously in API handlers, so there's + // no guarantee that awaiting this function will have queued anything yet. + // We wait here to make sure we're queued _after_ any existing async work. + await helpers.wait(100) + } finally { + queue.off("removed", messageListener) + } + + return messages +} + +export async function captureAutomationRemovals( + automation: Automation | string, f: () => Promise ) { + const messages = await captureAllAutomationRemovals(f) + return messages.filter( + m => + m.data.automation._id === + (typeof automation === "string" ? automation : automation._id) + ) +} + +export async function captureAllAutomationMessages(f: () => Promise) { const messages: Job[] = [] const queue = getQueue() @@ -58,11 +102,11 @@ export async function captureAllAutomationQueueMessages( return messages } -export async function captureAutomationQueueMessages( +export async function captureAutomationMessages( automation: Automation | string, f: () => Promise ) { - const messages = await captureAllAutomationQueueMessages(f) + const messages = await captureAllAutomationMessages(f) return messages.filter( m => m.data.automation._id === @@ -87,7 +131,8 @@ export async function captureAllAutomationResults( } const messageListener = async (message: Job) => { // Don't count cron messages, as they don't get triggered automatically. - if (message.opts?.repeat != null) { + const isManualTrigger = (message as any).manualTrigger === true + if (!isManualTrigger && message.opts?.repeat != null) { return } messagesOutstanding++ diff --git a/packages/server/src/threads/automation.ts b/packages/server/src/threads/automation.ts index 5257d7d0d0..6ee467023f 100644 --- a/packages/server/src/threads/automation.ts +++ b/packages/server/src/threads/automation.ts @@ -234,13 +234,6 @@ class Orchestrator { return this.job.data.event.appId! } - private async getMetadata(): Promise { - const id = generateAutomationMetadataID(this.automation._id!) - const db = context.getAppDB() - const doc = await db.tryGet(id) - return doc || { _id: id, errorCount: 0 } - } - isCron(): boolean { return this.automation.definition.trigger.stepId === CRON_STEP_ID } @@ -259,44 +252,47 @@ class Orchestrator { if (result) { setTriggerOutput(result, { success: false, - status: AutomationStatus.STOPPED, + status: AutomationStatus.STOPPED_ERROR, }) await this.logResult(result) } } private async logResult(result: AutomationResults) { - try { - await storeLog(this.automation, result) - } catch (e: any) { - if (e.status === 413 && e.request?.data) { - // if content is too large we shouldn't log it - delete e.request.data - e.request.data = { message: "removed due to large size" } - } - logging.logAlert("Error writing automation log", e) - } + await storeLog(this.automation, result) + } + + async getMetadata(): Promise { + const metadataId = generateAutomationMetadataID(this.automation._id!) + const db = context.getAppDB() + const metadata = await db.tryGet(metadataId) + return metadata || { _id: metadataId, errorCount: 0 } } async incrementErrorCount() { - for (let attempt = 0; attempt < 3; attempt++) { + const db = context.getAppDB() + let err: Error | undefined = undefined + for (let attempt = 0; attempt < 10; attempt++) { const metadata = await this.getMetadata() metadata.errorCount ||= 0 metadata.errorCount++ - const db = context.getAppDB() try { await db.put(metadata) return metadata.errorCount - } catch (err) { - logging.logAlertWithInfo( - "Failed to update error count in automation metadata", - db.name, - this.automation._id!, - err - ) + } catch (error: any) { + err = error + await helpers.wait(1000 + Math.random() * 1000) } } + + logging.logAlertWithInfo( + "Failed to update error count in automation metadata", + db.name, + this.automation._id!, + err + ) + return undefined } private isProdApp(): boolean { @@ -306,7 +302,7 @@ class Orchestrator { hasErrored(context: AutomationContext): boolean { const [_trigger, ...steps] = context.steps for (const step of steps) { - if (step.outputs?.success === false) { + if (step.success === false) { return true } } @@ -374,7 +370,7 @@ class Orchestrator { } let errorCount = 0 - if (isProdAppID(this.appId) && this.isCron() && this.hasErrored(ctx)) { + if (this.isProdApp() && this.isCron() && this.hasErrored(ctx)) { errorCount = (await this.incrementErrorCount()) || 0 } @@ -612,7 +608,7 @@ export async function executeInThread( }) } -export const removeStalled = async (job: Job) => { +export const removeStalled = async (job: Job) => { const appId = job.data.event.appId if (!appId) { throw new Error("Unable to execute, event doesn't contain app ID.")