Merge pull request #7374 from Budibase/handle-stalled-crons

Add logging to all event listeners for automations + stalled handling
This commit is contained in:
Rory Powell 2022-08-20 09:26:10 +01:00 committed by GitHub
commit 5f278f6323
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 122 additions and 22 deletions

View File

@ -8,12 +8,14 @@ const Queue = env.isTest()
const { JobQueues } = require("../constants") const { JobQueues } = require("../constants")
const { utils } = require("@budibase/backend-core/redis") const { utils } = require("@budibase/backend-core/redis")
const { opts, redisProtocolUrl } = utils.getRedisOptions() const { opts, redisProtocolUrl } = utils.getRedisOptions()
const listeners = require("./listeners")
const CLEANUP_PERIOD_MS = 60 * 1000 const CLEANUP_PERIOD_MS = 60 * 1000
const queueConfig = redisProtocolUrl || { redis: opts } const queueConfig = redisProtocolUrl || { redis: opts }
let cleanupInternal = null let cleanupInternal = null
let automationQueue = new Queue(JobQueues.AUTOMATIONS, queueConfig) let automationQueue = new Queue(JobQueues.AUTOMATIONS, queueConfig)
listeners.addListeners(automationQueue)
async function cleanup() { async function cleanup() {
await automationQueue.clean(CLEANUP_PERIOD_MS, "completed") await automationQueue.clean(CLEANUP_PERIOD_MS, "completed")

View File

@ -0,0 +1,78 @@
import { Queue, Job, JobId } from "bull"
import { AutomationEvent } from "../definitions/automations"
import * as automation from "../threads/automation"
export const addListeners = (queue: Queue) => {
logging(queue)
// handleStalled(queue)
}
const handleStalled = (queue: Queue) => {
queue.on("active", async (job: Job) => {
await automation.removeStalled(job as AutomationEvent)
})
}
const logging = (queue: Queue) => {
if (process.env.NODE_DEBUG?.includes("bull")) {
queue
.on("error", (error: any) => {
// An error occurred.
console.error(`automation-event=error error=${JSON.stringify(error)}`)
})
.on("waiting", (jobId: JobId) => {
// A Job is waiting to be processed as soon as a worker is idling.
console.log(`automation-event=waiting jobId=${jobId}`)
})
.on("active", (job: Job, jobPromise: any) => {
// A job has started. You can use `jobPromise.cancel()`` to abort it.
console.log(`automation-event=active jobId=${job.id}`)
})
.on("stalled", (job: Job) => {
// A job has been marked as stalled. This is useful for debugging job
// workers that crash or pause the event loop.
console.error(
`automation-event=stalled jobId=${job.id} job=${JSON.stringify(job)}`
)
})
.on("progress", (job: Job, progress: any) => {
// A job's progress was updated!
console.log(
`automation-event=progress jobId=${job.id} progress=${progress}`
)
})
.on("completed", (job: Job, result) => {
// A job successfully completed with a `result`.
console.log(
`automation-event=completed jobId=${job.id} result=${result}`
)
})
.on("failed", (job, err: any) => {
// A job failed with reason `err`!
console.log(`automation-event=failed jobId=${job.id} error=${err}`)
})
.on("paused", () => {
// The queue has been paused.
console.log(`automation-event=paused`)
})
.on("resumed", (job: Job) => {
// The queue has been resumed.
console.log(`automation-event=paused jobId=${job.id}`)
})
.on("cleaned", (jobs: Job[], type: string) => {
// Old jobs have been cleaned from the queue. `jobs` is an array of cleaned
// jobs, and `type` is the type of jobs cleaned.
console.log(
`automation-event=cleaned length=${jobs.length} type=${type}`
)
})
.on("drained", () => {
// Emitted every time the queue has processed all the waiting jobs (even if there can be some delayed jobs not yet processed)
console.log(`automation-event=drained`)
})
.on("removed", (job: Job) => {
// A job successfully removed.
console.log(`automation-event=removed jobId=${job.id}`)
})
}
}

View File

@ -21,11 +21,13 @@ const WH_STEP_ID = definitions.WEBHOOK.stepId
const CRON_STEP_ID = definitions.CRON.stepId const CRON_STEP_ID = definitions.CRON.stepId
const Runner = new Thread(ThreadType.AUTOMATION) const Runner = new Thread(ThreadType.AUTOMATION)
const jobMessage = (job: any, message: string) => {
return `app=${job.data.event.appId} automation=${job.data.automation._id} jobId=${job.id} trigger=${job.data.automation.definition.trigger.event} : ${message}`
}
export async function processEvent(job: any) { export async function processEvent(job: any) {
try { try {
console.log( console.log(jobMessage(job, "running"))
`${job.data.automation.appId} automation ${job.data.automation._id} running. jobId=${job.id}`
)
// need to actually await these so that an error can be captured properly // need to actually await these so that an error can be captured properly
const tenantId = tenancy.getTenantIDFromAppID(job.data.event.appId) const tenantId = tenancy.getTenantIDFromAppID(job.data.event.appId)
return await tenancy.doInTenant(tenantId, async () => { return await tenancy.doInTenant(tenantId, async () => {
@ -34,9 +36,7 @@ export async function processEvent(job: any) {
}) })
} catch (err) { } catch (err) {
const errJson = JSON.stringify(err) const errJson = JSON.stringify(err)
console.error( console.error(jobMessage(job, `was unable to run - ${errJson}`))
`${job.data.automation.appId} automation ${job.data.automation._id} was unable to run - ${errJson}`
)
console.trace(err) console.trace(err)
return { err } return { err }
} }
@ -91,6 +91,7 @@ export async function disableAllCrons(appId: any) {
export async function disableCron(jobId: string, jobKey: string) { export async function disableCron(jobId: string, jobKey: string) {
await queue.removeRepeatableByKey(jobKey) await queue.removeRepeatableByKey(jobKey)
await queue.removeJobs(jobId) await queue.removeJobs(jobId)
console.log(`jobId=${jobId} disabled`)
} }
export async function clearMetadata() { export async function clearMetadata() {

View File

@ -133,27 +133,34 @@ class Orchestrator {
return metadata return metadata
} }
async stopCron(reason: string) {
if (!this._repeat) {
return
}
logWarn(
`CRON disabled reason=${reason} - ${this._appId}/${this._automation._id}`
)
const automation = this._automation
const trigger = automation.definition.trigger
await disableCron(this._repeat?.jobId, this._repeat?.jobKey)
this.updateExecutionOutput(
trigger.id,
trigger.stepId,
{},
{
status: AutomationStatus.STOPPED_ERROR,
success: false,
}
)
await storeLog(automation, this.executionOutput)
}
async checkIfShouldStop(metadata: AutomationMetadata): Promise<boolean> { async checkIfShouldStop(metadata: AutomationMetadata): Promise<boolean> {
if (!metadata.errorCount || !this._repeat) { if (!metadata.errorCount || !this._repeat) {
return false return false
} }
const automation = this._automation
const trigger = automation.definition.trigger
if (metadata.errorCount >= MAX_AUTOMATION_RECURRING_ERRORS) { if (metadata.errorCount >= MAX_AUTOMATION_RECURRING_ERRORS) {
logWarn( await this.stopCron("errors")
`CRON disabled due to errors - ${this._appId}/${this._automation._id}`
)
await disableCron(this._repeat?.jobId, this._repeat?.jobKey)
this.updateExecutionOutput(
trigger.id,
trigger.stepId,
{},
{
status: AutomationStatus.STOPPED_ERROR,
success: false,
}
)
await storeLog(automation, this.executionOutput)
return true return true
} }
return false return false
@ -465,3 +472,15 @@ export function execute(input: AutomationEvent, callback: WorkerCallback) {
} }
}) })
} }
export const removeStalled = (input: AutomationEvent) => {
const appId = input.data.event.appId
doInAppContext(appId, async () => {
const automationOrchestrator = new Orchestrator(
input.data.automation,
input.data.event,
input.opts
)
await automationOrchestrator.stopCron("stalled")
})
}