Merge automation-tests-9

This commit is contained in:
Sam Rose 2025-02-18 11:59:14 +00:00
commit 2662e58f8b
No known key found for this signature in database
6 changed files with 149 additions and 71 deletions

View File

@ -3,6 +3,7 @@ import { newid } from "../utils"
import { Queue, QueueOptions, JobOptions } from "./queue" import { Queue, QueueOptions, JobOptions } from "./queue"
import { helpers } from "@budibase/shared-core" import { helpers } from "@budibase/shared-core"
import { Job, JobId, JobInformation } from "bull" import { Job, JobId, JobInformation } from "bull"
import { cloneDeep } from "lodash"
function jobToJobInformation(job: Job): JobInformation { function jobToJobInformation(job: Job): JobInformation {
let cron = "" let cron = ""
@ -33,7 +34,7 @@ function jobToJobInformation(job: Job): JobInformation {
} }
} }
interface JobMessage<T = any> extends Partial<Job<T>> { export interface TestQueueMessage<T = any> extends Partial<Job<T>> {
id: string id: string
timestamp: number timestamp: number
queue: Queue<T> queue: Queue<T>
@ -47,15 +48,15 @@ interface JobMessage<T = any> extends Partial<Job<T>> {
* internally to register when messages are available to the consumers - in can * internally to register when messages are available to the consumers - in can
* support many inputs and many consumers. * support many inputs and many consumers.
*/ */
class InMemoryQueue implements Partial<Queue> { export class InMemoryQueue<T = any> implements Partial<Queue<T>> {
_name: string _name: string
_opts?: QueueOptions _opts?: QueueOptions
_messages: JobMessage[] _messages: TestQueueMessage<T>[]
_queuedJobIds: Set<string> _queuedJobIds: Set<string>
_emitter: NodeJS.EventEmitter<{ _emitter: NodeJS.EventEmitter<{
message: [JobMessage] message: [TestQueueMessage<T>]
completed: [Job] completed: [Job<T>]
removed: [JobMessage] removed: [TestQueueMessage<T>]
}> }>
_runCount: number _runCount: number
_addCount: number _addCount: number
@ -86,10 +87,13 @@ class InMemoryQueue implements Partial<Queue> {
*/ */
async process(concurrencyOrFunc: number | any, func?: any) { async process(concurrencyOrFunc: number | any, func?: any) {
func = typeof concurrencyOrFunc === "number" ? func : concurrencyOrFunc func = typeof concurrencyOrFunc === "number" ? func : concurrencyOrFunc
this._emitter.on("message", async message => { this._emitter.on("message", async msg => {
const message = cloneDeep(msg)
const isManualTrigger = (message as any).manualTrigger === true
// For the purpose of testing, don't trigger cron jobs immediately. // For the purpose of testing, don't trigger cron jobs immediately.
// Require the test to trigger them manually with timestamps. // Require the test to trigger them manually with timestamps.
if (message.opts?.repeat != null) { if (!isManualTrigger && message.opts?.repeat != null) {
return return
} }
@ -107,7 +111,7 @@ class InMemoryQueue implements Partial<Queue> {
if (resp.then != null) { if (resp.then != null) {
try { try {
await retryFunc(resp) await retryFunc(resp)
this._emitter.emit("completed", message as Job) this._emitter.emit("completed", message as Job<T>)
} catch (e: any) { } catch (e: any) {
console.error(e) console.error(e)
} }
@ -124,7 +128,6 @@ class InMemoryQueue implements Partial<Queue> {
return this as any return this as any
} }
// simply puts a message to the queue and emits to the queue for processing
/** /**
* Simple function to replicate the add message functionality of Bull, putting * Simple function to replicate the add message functionality of Bull, putting
* a new message on the queue. This then emits an event which will be used to * a new message on the queue. This then emits an event which will be used to
@ -133,7 +136,13 @@ class InMemoryQueue implements Partial<Queue> {
* a JSON message as this is required by Bull. * a JSON message as this is required by Bull.
* @param repeat serves no purpose for the import queue. * @param repeat serves no purpose for the import queue.
*/ */
async add(data: any, opts?: JobOptions) { async add(data: T | string, optsOrT?: JobOptions | T) {
if (typeof data === "string") {
throw new Error("doesn't support named jobs")
}
const opts = optsOrT as JobOptions
const jobId = opts?.jobId?.toString() const jobId = opts?.jobId?.toString()
if (jobId && this._queuedJobIds.has(jobId)) { if (jobId && this._queuedJobIds.has(jobId)) {
console.log(`Ignoring already queued job ${jobId}`) console.log(`Ignoring already queued job ${jobId}`)
@ -148,7 +157,7 @@ class InMemoryQueue implements Partial<Queue> {
} }
const pushMessage = () => { const pushMessage = () => {
const message: JobMessage = { const message: TestQueueMessage = {
id: newid(), id: newid(),
timestamp: Date.now(), timestamp: Date.now(),
queue: this as unknown as Queue, queue: this as unknown as Queue,
@ -176,7 +185,7 @@ class InMemoryQueue implements Partial<Queue> {
async removeRepeatableByKey(id: string) { async removeRepeatableByKey(id: string) {
for (const [idx, message] of this._messages.entries()) { for (const [idx, message] of this._messages.entries()) {
if (message.opts?.jobId?.toString() === id) { if (message.id === id) {
this._messages.splice(idx, 1) this._messages.splice(idx, 1)
this._emitter.emit("removed", message) this._emitter.emit("removed", message)
return return
@ -204,6 +213,17 @@ class InMemoryQueue implements Partial<Queue> {
return null return null
} }
manualTrigger(id: JobId) {
for (const message of this._messages) {
if (message.id === id) {
const forceMessage = { ...message, manualTrigger: true }
this._emitter.emit("message", forceMessage)
return
}
}
throw new Error(`Job with id ${id} not found`)
}
on(event: string, callback: (...args: any[]) => void): Queue { on(event: string, callback: (...args: any[]) => void): Queue {
// @ts-expect-error - this callback can be one of many types // @ts-expect-error - this callback can be one of many types
this._emitter.on(event, callback) this._emitter.on(event, callback)

View File

@ -1,2 +1,3 @@
export * from "./queue" export * from "./queue"
export * from "./constants" export * from "./constants"
export * from "./inMemoryQueue"

View File

@ -1,7 +1,7 @@
import env from "../../environment" import env from "../../environment"
import { AutomationResults, Automation, App } from "@budibase/types" import { AutomationResults, Automation, App } from "@budibase/types"
import { automations } from "@budibase/pro" import { automations } from "@budibase/pro"
import { db as dbUtils } from "@budibase/backend-core" import { db as dbUtils, logging } from "@budibase/backend-core"
import sizeof from "object-sizeof" import sizeof from "object-sizeof"
const MAX_LOG_SIZE_MB = 5 const MAX_LOG_SIZE_MB = 5
@ -32,7 +32,16 @@ export async function storeLog(
if (bytes / MB_IN_BYTES > MAX_LOG_SIZE_MB) { if (bytes / MB_IN_BYTES > MAX_LOG_SIZE_MB) {
sanitiseResults(results) sanitiseResults(results)
} }
await automations.logs.storeLog(automation, results) try {
await automations.logs.storeLog(automation, results)
} catch (e: any) {
if (e.status === 413 && e.request?.data) {
// if content is too large we shouldn't log it
delete e.request.data
e.request.data = { message: "removed due to large size" }
}
logging.logAlert("Error writing automation log", e)
}
} }
export async function checkAppMetadata(apps: App[]) { export async function checkAppMetadata(apps: App[]) {

View File

@ -1,11 +1,15 @@
import { createAutomationBuilder } from "../utilities/AutomationTestBuilder" import { createAutomationBuilder } from "../utilities/AutomationTestBuilder"
import TestConfiguration from "../../../tests/utilities/TestConfiguration" import TestConfiguration from "../../../tests/utilities/TestConfiguration"
import { import {
captureAutomationQueueMessages, captureAutomationMessages,
captureAutomationRemovals,
captureAutomationResults, captureAutomationResults,
triggerCron,
} from "../utilities" } from "../utilities"
import { automations } from "@budibase/pro" import { automations } from "@budibase/pro"
import { AutomationStatus } from "@budibase/types" import { AutomationData, AutomationStatus } from "@budibase/types"
import { MAX_AUTOMATION_RECURRING_ERRORS } from "../../../constants"
import { Job } from "bull"
describe("cron trigger", () => { describe("cron trigger", () => {
const config = new TestConfiguration() const config = new TestConfiguration()
@ -33,7 +37,7 @@ describe("cron trigger", () => {
}) })
.save() .save()
const messages = await captureAutomationQueueMessages(automation, () => const messages = await captureAutomationMessages(automation, () =>
config.api.application.publish() config.api.application.publish()
) )
expect(messages).toHaveLength(1) expect(messages).toHaveLength(1)
@ -62,8 +66,8 @@ describe("cron trigger", () => {
}) })
}) })
it.only("should stop if the job fails more than 3 times", async () => { it("should stop if the job fails more than N times", async () => {
const runner = await createAutomationBuilder(config) const { automation } = await createAutomationBuilder(config)
.onCron({ cron: "* * * * *" }) .onCron({ cron: "* * * * *" })
.queryRows({ .queryRows({
// @ts-expect-error intentionally sending invalid data // @ts-expect-error intentionally sending invalid data
@ -71,28 +75,31 @@ describe("cron trigger", () => {
}) })
.save() .save()
await config.api.application.publish() const [message] = await captureAutomationMessages(automation, () =>
config.api.application.publish()
const results = await captureAutomationResults(
runner.automation,
async () => {
await runner.trigger({ timeout: 1000, fields: {} })
await runner.trigger({ timeout: 1000, fields: {} })
await runner.trigger({ timeout: 1000, fields: {} })
await runner.trigger({ timeout: 1000, fields: {} })
await runner.trigger({ timeout: 1000, fields: {} })
}
) )
expect(results).toHaveLength(5)
await config.withProdApp(async () => { await config.withProdApp(async () => {
const logs = await automations.logs.logSearch({ let results: Job<AutomationData>[] = []
automationId: runner.automation._id, const removed = await captureAutomationRemovals(automation, async () => {
results = await captureAutomationResults(automation, async () => {
for (let i = 0; i < MAX_AUTOMATION_RECURRING_ERRORS; i++) {
triggerCron(message)
}
})
})
expect(removed).toHaveLength(1)
expect(removed[0].id).toEqual(message.id)
expect(results).toHaveLength(5)
const search = await automations.logs.logSearch({
automationId: automation._id,
status: AutomationStatus.STOPPED_ERROR, status: AutomationStatus.STOPPED_ERROR,
}) })
expect(logs.data).toHaveLength(1) expect(search.data).toHaveLength(1)
expect(logs.data[0].status).toEqual(AutomationStatus.STOPPED_ERROR) expect(search.data[0].status).toEqual(AutomationStatus.STOPPED_ERROR)
}) })
}) })

View File

@ -6,6 +6,7 @@ import { Knex } from "knex"
import { getQueue } from "../.." import { getQueue } from "../.."
import { Job } from "bull" import { Job } from "bull"
import { helpers } from "@budibase/shared-core" import { helpers } from "@budibase/shared-core"
import { queue } from "@budibase/backend-core"
let config: TestConfiguration let config: TestConfiguration
@ -20,6 +21,17 @@ export function afterAll() {
config.end() config.end()
} }
export function getTestQueue(): queue.InMemoryQueue<AutomationData> {
return getQueue() as unknown as queue.InMemoryQueue<AutomationData>
}
export function triggerCron(message: Job<AutomationData>) {
if (!message.opts?.repeat || !("cron" in message.opts.repeat)) {
throw new Error("Expected cron message")
}
getTestQueue().manualTrigger(message.id)
}
export async function runInProd(fn: any) { export async function runInProd(fn: any) {
env._set("NODE_ENV", "production") env._set("NODE_ENV", "production")
let error let error
@ -34,9 +46,41 @@ export async function runInProd(fn: any) {
} }
} }
export async function captureAllAutomationQueueMessages( export async function captureAllAutomationRemovals(f: () => Promise<unknown>) {
const messages: Job<AutomationData>[] = []
const queue = getQueue()
const messageListener = async (message: Job<AutomationData>) => {
messages.push(message)
}
queue.on("removed", messageListener)
try {
await f()
// Queue messages tend to be send asynchronously in API handlers, so there's
// no guarantee that awaiting this function will have queued anything yet.
// We wait here to make sure we're queued _after_ any existing async work.
await helpers.wait(100)
} finally {
queue.off("removed", messageListener)
}
return messages
}
export async function captureAutomationRemovals(
automation: Automation | string,
f: () => Promise<unknown> f: () => Promise<unknown>
) { ) {
const messages = await captureAllAutomationRemovals(f)
return messages.filter(
m =>
m.data.automation._id ===
(typeof automation === "string" ? automation : automation._id)
)
}
export async function captureAllAutomationMessages(f: () => Promise<unknown>) {
const messages: Job<AutomationData>[] = [] const messages: Job<AutomationData>[] = []
const queue = getQueue() const queue = getQueue()
@ -58,11 +102,11 @@ export async function captureAllAutomationQueueMessages(
return messages return messages
} }
export async function captureAutomationQueueMessages( export async function captureAutomationMessages(
automation: Automation | string, automation: Automation | string,
f: () => Promise<unknown> f: () => Promise<unknown>
) { ) {
const messages = await captureAllAutomationQueueMessages(f) const messages = await captureAllAutomationMessages(f)
return messages.filter( return messages.filter(
m => m =>
m.data.automation._id === m.data.automation._id ===
@ -87,7 +131,8 @@ export async function captureAllAutomationResults(
} }
const messageListener = async (message: Job<AutomationData>) => { const messageListener = async (message: Job<AutomationData>) => {
// Don't count cron messages, as they don't get triggered automatically. // Don't count cron messages, as they don't get triggered automatically.
if (message.opts?.repeat != null) { const isManualTrigger = (message as any).manualTrigger === true
if (!isManualTrigger && message.opts?.repeat != null) {
return return
} }
messagesOutstanding++ messagesOutstanding++

View File

@ -234,13 +234,6 @@ class Orchestrator {
return this.job.data.event.appId! return this.job.data.event.appId!
} }
private async getMetadata(): Promise<AutomationMetadata> {
const id = generateAutomationMetadataID(this.automation._id!)
const db = context.getAppDB()
const doc = await db.tryGet<AutomationMetadata>(id)
return doc || { _id: id, errorCount: 0 }
}
isCron(): boolean { isCron(): boolean {
return this.automation.definition.trigger.stepId === CRON_STEP_ID return this.automation.definition.trigger.stepId === CRON_STEP_ID
} }
@ -259,44 +252,47 @@ class Orchestrator {
if (result) { if (result) {
setTriggerOutput(result, { setTriggerOutput(result, {
success: false, success: false,
status: AutomationStatus.STOPPED, status: AutomationStatus.STOPPED_ERROR,
}) })
await this.logResult(result) await this.logResult(result)
} }
} }
private async logResult(result: AutomationResults) { private async logResult(result: AutomationResults) {
try { await storeLog(this.automation, result)
await storeLog(this.automation, result) }
} catch (e: any) {
if (e.status === 413 && e.request?.data) { async getMetadata(): Promise<AutomationMetadata> {
// if content is too large we shouldn't log it const metadataId = generateAutomationMetadataID(this.automation._id!)
delete e.request.data const db = context.getAppDB()
e.request.data = { message: "removed due to large size" } const metadata = await db.tryGet<AutomationMetadata>(metadataId)
} return metadata || { _id: metadataId, errorCount: 0 }
logging.logAlert("Error writing automation log", e)
}
} }
async incrementErrorCount() { async incrementErrorCount() {
for (let attempt = 0; attempt < 3; attempt++) { const db = context.getAppDB()
let err: Error | undefined = undefined
for (let attempt = 0; attempt < 10; attempt++) {
const metadata = await this.getMetadata() const metadata = await this.getMetadata()
metadata.errorCount ||= 0 metadata.errorCount ||= 0
metadata.errorCount++ metadata.errorCount++
const db = context.getAppDB()
try { try {
await db.put(metadata) await db.put(metadata)
return metadata.errorCount return metadata.errorCount
} catch (err) { } catch (error: any) {
logging.logAlertWithInfo( err = error
"Failed to update error count in automation metadata", await helpers.wait(1000 + Math.random() * 1000)
db.name,
this.automation._id!,
err
)
} }
} }
logging.logAlertWithInfo(
"Failed to update error count in automation metadata",
db.name,
this.automation._id!,
err
)
return undefined
} }
private isProdApp(): boolean { private isProdApp(): boolean {
@ -306,7 +302,7 @@ class Orchestrator {
hasErrored(context: AutomationContext): boolean { hasErrored(context: AutomationContext): boolean {
const [_trigger, ...steps] = context.steps const [_trigger, ...steps] = context.steps
for (const step of steps) { for (const step of steps) {
if (step.outputs?.success === false) { if (step.success === false) {
return true return true
} }
} }
@ -374,7 +370,7 @@ class Orchestrator {
} }
let errorCount = 0 let errorCount = 0
if (isProdAppID(this.appId) && this.isCron() && this.hasErrored(ctx)) { if (this.isProdApp() && this.isCron() && this.hasErrored(ctx)) {
errorCount = (await this.incrementErrorCount()) || 0 errorCount = (await this.incrementErrorCount()) || 0
} }
@ -612,7 +608,7 @@ export async function executeInThread(
}) })
} }
export const removeStalled = async (job: Job) => { export const removeStalled = async (job: Job<AutomationData>) => {
const appId = job.data.event.appId const appId = job.data.event.appId
if (!appId) { if (!appId) {
throw new Error("Unable to execute, event doesn't contain app ID.") throw new Error("Unable to execute, event doesn't contain app ID.")