Merge pull request #15566 from Budibase/automation-tests-9

Add tests for cron stopping.
This commit is contained in:
Sam Rose 2025-02-18 09:28:11 +00:00 committed by GitHub
commit 5e21278268
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 245 additions and 131 deletions

View File

@ -52,7 +52,11 @@ class InMemoryQueue implements Partial<Queue> {
_opts?: QueueOptions _opts?: QueueOptions
_messages: JobMessage[] _messages: JobMessage[]
_queuedJobIds: Set<string> _queuedJobIds: Set<string>
_emitter: NodeJS.EventEmitter<{ message: [JobMessage]; completed: [Job] }> _emitter: NodeJS.EventEmitter<{
message: [JobMessage]
completed: [Job]
removed: [JobMessage]
}>
_runCount: number _runCount: number
_addCount: number _addCount: number
@ -83,6 +87,12 @@ class InMemoryQueue implements Partial<Queue> {
async process(concurrencyOrFunc: number | any, func?: any) { async process(concurrencyOrFunc: number | any, func?: any) {
func = typeof concurrencyOrFunc === "number" ? func : concurrencyOrFunc func = typeof concurrencyOrFunc === "number" ? func : concurrencyOrFunc
this._emitter.on("message", async message => { this._emitter.on("message", async message => {
// For the purpose of testing, don't trigger cron jobs immediately.
// Require the test to trigger them manually with timestamps.
if (message.opts?.repeat != null) {
return
}
let resp = func(message) let resp = func(message)
async function retryFunc(fnc: any) { async function retryFunc(fnc: any) {
@ -164,13 +174,14 @@ class InMemoryQueue implements Partial<Queue> {
*/ */
async close() {} async close() {}
/** async removeRepeatableByKey(id: string) {
* This removes a cron which has been implemented, this is part of Bull API. for (const [idx, message] of this._messages.entries()) {
* @param cronJobId The cron which is to be removed. if (message.opts?.jobId?.toString() === id) {
*/ this._messages.splice(idx, 1)
async removeRepeatableByKey(cronJobId: string) { this._emitter.emit("removed", message)
// TODO: implement for testing return
console.log(cronJobId) }
}
} }
async removeJobs(_pattern: string) { async removeJobs(_pattern: string) {
@ -214,7 +225,9 @@ class InMemoryQueue implements Partial<Queue> {
} }
async getRepeatableJobs() { async getRepeatableJobs() {
return this._messages.map(job => jobToJobInformation(job as Job)) return this._messages
.filter(job => job.opts?.repeat != null)
.map(job => jobToJobInformation(job as Job))
} }
} }

View File

@ -1,45 +0,0 @@
import tk from "timekeeper"
import "../../../environment"
import * as automations from "../../index"
import TestConfiguration from "../../../tests/utilities/TestConfiguration"
import { createAutomationBuilder } from "../utilities/AutomationTestBuilder"
const initialTime = Date.now()
tk.freeze(initialTime)
const oneMinuteInMs = 60 * 1000
describe("cron automations", () => {
const config = new TestConfiguration()
beforeAll(async () => {
await automations.init()
await config.init()
})
afterAll(async () => {
await automations.shutdown()
config.end()
})
beforeEach(() => {
tk.freeze(initialTime)
})
it("should initialise the automation timestamp", async () => {
await createAutomationBuilder(config).onCron({ cron: "* * * * *" }).save()
tk.travel(Date.now() + oneMinuteInMs)
await config.publish()
const { data } = await config.getAutomationLogs()
expect(data).toHaveLength(1)
expect(data).toEqual([
expect.objectContaining({
trigger: expect.objectContaining({
outputs: { timestamp: initialTime + oneMinuteInMs },
}),
}),
])
})
})

View File

@ -1,6 +1,11 @@
import { createAutomationBuilder } from "../utilities/AutomationTestBuilder" import { createAutomationBuilder } from "../utilities/AutomationTestBuilder"
import TestConfiguration from "../../../tests/utilities/TestConfiguration" import TestConfiguration from "../../../tests/utilities/TestConfiguration"
import { captureAutomationResults } from "../utilities" import {
captureAutomationQueueMessages,
captureAutomationResults,
} from "../utilities"
import { automations } from "@budibase/pro"
import { AutomationStatus } from "@budibase/types"
describe("cron trigger", () => { describe("cron trigger", () => {
const config = new TestConfiguration() const config = new TestConfiguration()
@ -13,6 +18,13 @@ describe("cron trigger", () => {
config.end() config.end()
}) })
beforeEach(async () => {
const { automations } = await config.api.automation.fetch()
for (const automation of automations) {
await config.api.automation.delete(automation)
}
})
it("should queue a Bull cron job", async () => { it("should queue a Bull cron job", async () => {
const { automation } = await createAutomationBuilder(config) const { automation } = await createAutomationBuilder(config)
.onCron({ cron: "* * * * *" }) .onCron({ cron: "* * * * *" })
@ -21,12 +33,12 @@ describe("cron trigger", () => {
}) })
.save() .save()
const jobs = await captureAutomationResults(automation, () => const messages = await captureAutomationQueueMessages(automation, () =>
config.api.application.publish() config.api.application.publish()
) )
expect(jobs).toHaveLength(1) expect(messages).toHaveLength(1)
const repeat = jobs[0].opts?.repeat const repeat = messages[0].opts?.repeat
if (!repeat || !("cron" in repeat)) { if (!repeat || !("cron" in repeat)) {
throw new Error("Expected cron repeat") throw new Error("Expected cron repeat")
} }
@ -49,4 +61,82 @@ describe("cron trigger", () => {
}, },
}) })
}) })
it("should stop if the job fails more than 3 times", async () => {
const runner = await createAutomationBuilder(config)
.onCron({ cron: "* * * * *" })
.queryRows({
// @ts-expect-error intentionally sending invalid data
tableId: null,
})
.save()
await config.api.application.publish()
const results = await captureAutomationResults(
runner.automation,
async () => {
await runner.trigger({ timeout: 1000, fields: {} })
await runner.trigger({ timeout: 1000, fields: {} })
await runner.trigger({ timeout: 1000, fields: {} })
await runner.trigger({ timeout: 1000, fields: {} })
await runner.trigger({ timeout: 1000, fields: {} })
}
)
expect(results).toHaveLength(5)
await config.withProdApp(async () => {
const {
data: [latest, ..._],
} = await automations.logs.logSearch({
automationId: runner.automation._id,
})
expect(latest.status).toEqual(AutomationStatus.STOPPED_ERROR)
})
})
it("should fill in the timestamp if one is not provided", async () => {
const runner = await createAutomationBuilder(config)
.onCron({ cron: "* * * * *" })
.serverLog({
text: "Hello, world!",
})
.save()
await config.api.application.publish()
const results = await captureAutomationResults(
runner.automation,
async () => {
await runner.trigger({ timeout: 1000, fields: {} })
}
)
expect(results).toHaveLength(1)
expect(results[0].data.event.timestamp).toBeWithin(
Date.now() - 1000,
Date.now() + 1000
)
})
it("should use the given timestamp if one is given", async () => {
const timestamp = 1234
const runner = await createAutomationBuilder(config)
.onCron({ cron: "* * * * *" })
.serverLog({
text: "Hello, world!",
})
.save()
await config.api.application.publish()
const results = await captureAutomationResults(
runner.automation,
async () => {
await runner.trigger({ timeout: 1000, fields: {}, timestamp })
}
)
expect(results).toHaveLength(1)
expect(results[0].data.event.timestamp).toEqual(timestamp)
})
}) })

View File

@ -220,10 +220,34 @@ class AutomationRunner<TStep extends AutomationTriggerStepId> {
async trigger( async trigger(
request: TriggerAutomationRequest request: TriggerAutomationRequest
): Promise<TriggerAutomationResponse> { ): Promise<TriggerAutomationResponse> {
if (!this.config.prodAppId) {
throw new Error(
"Automations can only be triggered in a production app context, call config.api.application.publish()"
)
}
// Because you can only trigger automations in a production app context, we
// wrap the trigger call to make tests a bit cleaner. If you really want to
// test triggering an automation in a dev app context, you can use the
// automation API directly.
return await this.config.withProdApp(async () => {
try {
return await this.config.api.automation.trigger( return await this.config.api.automation.trigger(
this.automation._id!, this.automation._id!,
request request
) )
} catch (e: any) {
if (e.cause.status === 404) {
throw new Error(
`Automation with ID ${
this.automation._id
} not found in app ${this.config.getAppId()}. You may have forgotten to call config.api.application.publish().`,
{ cause: e }
)
} else {
throw e
}
}
})
} }
} }

View File

@ -34,6 +34,42 @@ export async function runInProd(fn: any) {
} }
} }
export async function captureAllAutomationQueueMessages(
f: () => Promise<unknown>
) {
const messages: Job<AutomationData>[] = []
const queue = getQueue()
const messageListener = async (message: Job<AutomationData>) => {
messages.push(message)
}
queue.on("message", messageListener)
try {
await f()
// Queue messages tend to be send asynchronously in API handlers, so there's
// no guarantee that awaiting this function will have queued anything yet.
// We wait here to make sure we're queued _after_ any existing async work.
await helpers.wait(100)
} finally {
queue.off("message", messageListener)
}
return messages
}
export async function captureAutomationQueueMessages(
automation: Automation | string,
f: () => Promise<unknown>
) {
const messages = await captureAllAutomationQueueMessages(f)
return messages.filter(
m =>
m.data.automation._id ===
(typeof automation === "string" ? automation : automation._id)
)
}
/** /**
* Capture all automation runs that occur during the execution of a function. * Capture all automation runs that occur during the execution of a function.
* This function will wait for all messages to be processed before returning. * This function will wait for all messages to be processed before returning.
@ -43,14 +79,18 @@ export async function captureAllAutomationResults(
): Promise<Job<AutomationData>[]> { ): Promise<Job<AutomationData>[]> {
const runs: Job<AutomationData>[] = [] const runs: Job<AutomationData>[] = []
const queue = getQueue() const queue = getQueue()
let messagesReceived = 0 let messagesOutstanding = 0
const completedListener = async (job: Job<AutomationData>) => { const completedListener = async (job: Job<AutomationData>) => {
runs.push(job) runs.push(job)
messagesReceived-- messagesOutstanding--
} }
const messageListener = async () => { const messageListener = async (message: Job<AutomationData>) => {
messagesReceived++ // Don't count cron messages, as they don't get triggered automatically.
if (message.opts?.repeat != null) {
return
}
messagesOutstanding++
} }
queue.on("message", messageListener) queue.on("message", messageListener)
queue.on("completed", completedListener) queue.on("completed", completedListener)
@ -61,9 +101,18 @@ export async function captureAllAutomationResults(
// We wait here to make sure we're queued _after_ any existing async work. // We wait here to make sure we're queued _after_ any existing async work.
await helpers.wait(100) await helpers.wait(100)
} finally { } finally {
const waitMax = 10000
let waited = 0
// eslint-disable-next-line no-unmodified-loop-condition // eslint-disable-next-line no-unmodified-loop-condition
while (messagesReceived > 0) { while (messagesOutstanding > 0) {
await helpers.wait(50) await helpers.wait(50)
waited += 50
if (waited > waitMax) {
// eslint-disable-next-line no-unsafe-finally
throw new Error(
`Timed out waiting for automation runs to complete. ${messagesOutstanding} messages waiting for completion.`
)
}
} }
queue.off("completed", completedListener) queue.off("completed", completedListener)
queue.off("message", messageListener) queue.off("message", messageListener)

View File

@ -72,7 +72,7 @@ export async function processEvent(job: AutomationJob) {
const task = async () => { const task = async () => {
try { try {
if (isCronTrigger(job.data.automation)) { if (isCronTrigger(job.data.automation) && !job.data.event.timestamp) {
// Requires the timestamp at run time // Requires the timestamp at run time
job.data.event.timestamp = Date.now() job.data.event.timestamp = Date.now()
} }

View File

@ -261,11 +261,13 @@ export default class TestConfiguration {
async withApp<R>(app: App | string, f: () => Promise<R>) { async withApp<R>(app: App | string, f: () => Promise<R>) {
const oldAppId = this.appId const oldAppId = this.appId
this.appId = typeof app === "string" ? app : app.appId this.appId = typeof app === "string" ? app : app.appId
return await context.doInAppContext(this.appId, async () => {
try { try {
return await f() return await f()
} finally { } finally {
this.appId = oldAppId this.appId = oldAppId
} }
})
} }
async withProdApp<R>(f: () => Promise<R>) { async withProdApp<R>(f: () => Promise<R>) {

View File

@ -155,23 +155,12 @@ class Orchestrator {
return step return step
} }
async getMetadata(): Promise<AutomationMetadata> { isCron(): boolean {
const metadataId = generateAutomationMetadataID(this.automation._id!) return isRecurring(this.automation)
const db = context.getAppDB()
let metadata: AutomationMetadata
try {
metadata = await db.get(metadataId)
} catch (err) {
metadata = {
_id: metadataId,
errorCount: 0,
}
}
return metadata
} }
async stopCron(reason: string) { async stopCron(reason: string) {
if (!this.job.opts.repeat) { if (!this.isCron()) {
return return
} }
logging.logWarn( logging.logWarn(
@ -192,46 +181,44 @@ class Orchestrator {
await storeLog(automation, this.executionOutput) await storeLog(automation, this.executionOutput)
} }
async checkIfShouldStop(metadata: AutomationMetadata): Promise<boolean> { async checkIfShouldStop(): Promise<boolean> {
if (!metadata.errorCount || !this.job.opts.repeat) { const metadata = await this.getMetadata()
if (!metadata.errorCount || !this.isCron()) {
return false return false
} }
if (metadata.errorCount >= MAX_AUTOMATION_RECURRING_ERRORS) { if (metadata.errorCount >= MAX_AUTOMATION_RECURRING_ERRORS) {
await this.stopCron("errors")
return true return true
} }
return false return false
} }
async updateMetadata(metadata: AutomationMetadata) { async getMetadata(): Promise<AutomationMetadata> {
const output = this.executionOutput, const metadataId = generateAutomationMetadataID(this.automation._id!)
automation = this.automation const db = context.getAppDB()
if (!output || !isRecurring(automation)) { const metadata = await db.tryGet<AutomationMetadata>(metadataId)
return return metadata || { _id: metadataId, errorCount: 0 }
}
const count = metadata.errorCount
const isError = isErrorInOutput(output)
// nothing to do in this scenario, escape
if (!count && !isError) {
return
}
if (isError) {
metadata.errorCount = count ? count + 1 : 1
} else {
metadata.errorCount = 0
} }
async incrementErrorCount() {
for (let attempt = 0; attempt < 3; attempt++) {
const metadata = await this.getMetadata()
metadata.errorCount ||= 0
metadata.errorCount++
const db = context.getAppDB() const db = context.getAppDB()
try { try {
await db.put(metadata) await db.put(metadata)
return
} catch (err) { } catch (err) {
logging.logAlertWithInfo( logging.logAlertWithInfo(
"Failed to write automation metadata", "Failed to update error count in automation metadata",
db.name, db.name,
automation._id!, this.automation._id!,
err err
) )
} }
} }
}
updateExecutionOutput(id: string, stepId: string, inputs: any, outputs: any) { updateExecutionOutput(id: string, stepId: string, inputs: any, outputs: any) {
const stepObj = { id, stepId, inputs, outputs } const stepObj = { id, stepId, inputs, outputs }
@ -293,18 +280,6 @@ class Orchestrator {
await enrichBaseContext(this.context) await enrichBaseContext(this.context)
this.context.user = this.currentUser this.context.user = this.currentUser
let metadata
// check if this is a recurring automation,
if (isProdAppID(this.appId) && isRecurring(this.automation)) {
span?.addTags({ recurring: true })
metadata = await this.getMetadata()
const shouldStop = await this.checkIfShouldStop(metadata)
if (shouldStop) {
span?.addTags({ shouldStop: true })
return
}
}
const start = performance.now() const start = performance.now()
await this.executeSteps(this.automation.definition.steps) await this.executeSteps(this.automation.definition.steps)
@ -332,10 +307,15 @@ class Orchestrator {
} }
if ( if (
isProdAppID(this.appId) && isProdAppID(this.appId) &&
isRecurring(this.automation) && this.isCron() &&
metadata isErrorInOutput(this.executionOutput)
) { ) {
await this.updateMetadata(metadata) await this.incrementErrorCount()
if (await this.checkIfShouldStop()) {
await this.stopCron("errors")
span?.addTags({ shouldStop: true })
return
}
} }
return this.executionOutput return this.executionOutput
} }

View File

@ -65,6 +65,7 @@ export interface ClearAutomationLogResponse {
export interface TriggerAutomationRequest { export interface TriggerAutomationRequest {
fields: Record<string, any> fields: Record<string, any>
timestamp?: number
// time in seconds // time in seconds
timeout: number timeout: number
} }