Handling listeners as part of queue creation, rather than external part.

This commit is contained in:
mike12345567 2022-10-13 17:55:05 +01:00
parent 9c57300030
commit 94188771df
3 changed files with 50 additions and 32 deletions

View File

@ -1,78 +1,90 @@
import { Queue, Job, JobId } from "bull" import { Job, JobId, Queue } from "bull"
import { AutomationEvent } from "../definitions/automations" import { JobQueue } from "./constants"
import * as automation from "../threads/automation"
export const addListeners = (queue: Queue) => { export type StalledFn = (job: Job) => Promise<void>
logging(queue)
handleStalled(queue) export const addListeners = (
queue: Queue,
jobQueue: JobQueue,
removeStalled?: StalledFn
) => {
logging(queue, jobQueue)
if (removeStalled) {
handleStalled(queue, removeStalled)
}
} }
const handleStalled = (queue: Queue) => { const handleStalled = (queue: Queue, removeStalled: StalledFn) => {
queue.on("stalled", async (job: Job) => { queue.on("stalled", async (job: Job) => {
await automation.removeStalled(job as AutomationEvent) await removeStalled(job)
}) })
} }
const logging = (queue: Queue) => { const logging = (queue: Queue, jobQueue: JobQueue) => {
let eventType: string
switch (jobQueue) {
case JobQueue.AUTOMATIONS:
eventType = "automation-event"
break
case JobQueue.APP_BACKUPS:
eventType = "app-backup-event"
break
}
if (process.env.NODE_DEBUG?.includes("bull")) { if (process.env.NODE_DEBUG?.includes("bull")) {
queue queue
.on("error", (error: any) => { .on("error", (error: any) => {
// An error occurred. // An error occurred.
console.error(`automation-event=error error=${JSON.stringify(error)}`) console.error(`${eventType}=error error=${JSON.stringify(error)}`)
}) })
.on("waiting", (jobId: JobId) => { .on("waiting", (jobId: JobId) => {
// A Job is waiting to be processed as soon as a worker is idling. // A Job is waiting to be processed as soon as a worker is idling.
console.log(`automation-event=waiting jobId=${jobId}`) console.log(`${eventType}=waiting jobId=${jobId}`)
}) })
.on("active", (job: Job, jobPromise: any) => { .on("active", (job: Job, jobPromise: any) => {
// A job has started. You can use `jobPromise.cancel()`` to abort it. // A job has started. You can use `jobPromise.cancel()`` to abort it.
console.log(`automation-event=active jobId=${job.id}`) console.log(`${eventType}=active jobId=${job.id}`)
}) })
.on("stalled", (job: Job) => { .on("stalled", (job: Job) => {
// A job has been marked as stalled. This is useful for debugging job // A job has been marked as stalled. This is useful for debugging job
// workers that crash or pause the event loop. // workers that crash or pause the event loop.
console.error( console.error(
`automation-event=stalled jobId=${job.id} job=${JSON.stringify(job)}` `${eventType}=stalled jobId=${job.id} job=${JSON.stringify(job)}`
) )
}) })
.on("progress", (job: Job, progress: any) => { .on("progress", (job: Job, progress: any) => {
// A job's progress was updated! // A job's progress was updated!
console.log( console.log(
`automation-event=progress jobId=${job.id} progress=${progress}` `${eventType}=progress jobId=${job.id} progress=${progress}`
) )
}) })
.on("completed", (job: Job, result) => { .on("completed", (job: Job, result) => {
// A job successfully completed with a `result`. // A job successfully completed with a `result`.
console.log( console.log(`${eventType}=completed jobId=${job.id} result=${result}`)
`automation-event=completed jobId=${job.id} result=${result}`
)
}) })
.on("failed", (job, err: any) => { .on("failed", (job, err: any) => {
// A job failed with reason `err`! // A job failed with reason `err`!
console.log(`automation-event=failed jobId=${job.id} error=${err}`) console.log(`${eventType}=failed jobId=${job.id} error=${err}`)
}) })
.on("paused", () => { .on("paused", () => {
// The queue has been paused. // The queue has been paused.
console.log(`automation-event=paused`) console.log(`${eventType}=paused`)
}) })
.on("resumed", (job: Job) => { .on("resumed", (job: Job) => {
// The queue has been resumed. // The queue has been resumed.
console.log(`automation-event=paused jobId=${job.id}`) console.log(`${eventType}=paused jobId=${job.id}`)
}) })
.on("cleaned", (jobs: Job[], type: string) => { .on("cleaned", (jobs: Job[], type: string) => {
// Old jobs have been cleaned from the queue. `jobs` is an array of cleaned // Old jobs have been cleaned from the queue. `jobs` is an array of cleaned
// jobs, and `type` is the type of jobs cleaned. // jobs, and `type` is the type of jobs cleaned.
console.log( console.log(`${eventType}=cleaned length=${jobs.length} type=${type}`)
`automation-event=cleaned length=${jobs.length} type=${type}`
)
}) })
.on("drained", () => { .on("drained", () => {
// Emitted every time the queue has processed all the waiting jobs (even if there can be some delayed jobs not yet processed) // Emitted every time the queue has processed all the waiting jobs (even if there can be some delayed jobs not yet processed)
console.log(`automation-event=drained`) console.log(`${eventType}=drained`)
}) })
.on("removed", (job: Job) => { .on("removed", (job: Job) => {
// A job successfully removed. // A job successfully removed.
console.log(`automation-event=removed jobId=${job.id}`) console.log(`${eventType}=removed jobId=${job.id}`)
}) })
} }
} }

View File

@ -1,9 +1,9 @@
import env from "../environment" import env from "../environment"
import { getRedisOptions } from "../redis/utils" import { getRedisOptions } from "../redis/utils"
import { JobQueue } from "./constants" import { JobQueue } from "./constants"
import inMemoryQueue from "./inMemoryQueue"
import BullQueue from "bull"
import InMemoryQueue from "./inMemoryQueue" import InMemoryQueue from "./inMemoryQueue"
import BullQueue from "bull"
import { addListeners, StalledFn } from "./listeners"
const { opts, redisProtocolUrl } = getRedisOptions() const { opts, redisProtocolUrl } = getRedisOptions()
const CLEANUP_PERIOD_MS = 60 * 1000 const CLEANUP_PERIOD_MS = 60 * 1000
@ -16,14 +16,18 @@ async function cleanup() {
} }
} }
export function createQueue(jobQueue: JobQueue) { export function createQueue(
jobQueue: JobQueue,
removeStalled: StalledFn
): BullQueue.Queue {
const queueConfig: any = redisProtocolUrl || { redis: opts } const queueConfig: any = redisProtocolUrl || { redis: opts }
let queue: any let queue: any
if (env.isTest()) { if (env.isTest()) {
queue = new BullQueue(jobQueue, queueConfig) queue = new BullQueue(jobQueue, queueConfig)
} else { } else {
queue = new inMemoryQueue(jobQueue, queueConfig) queue = new InMemoryQueue(jobQueue, queueConfig)
} }
addListeners(queue, jobQueue, removeStalled)
QUEUES.push(queue) QUEUES.push(queue)
if (!cleanupInterval) { if (!cleanupInterval) {
cleanupInterval = setInterval(cleanup, CLEANUP_PERIOD_MS) cleanupInterval = setInterval(cleanup, CLEANUP_PERIOD_MS)

View File

@ -2,10 +2,12 @@ const { createBullBoard } = require("@bull-board/api")
const { BullAdapter } = require("@bull-board/api/bullAdapter") const { BullAdapter } = require("@bull-board/api/bullAdapter")
const { KoaAdapter } = require("@bull-board/koa") const { KoaAdapter } = require("@bull-board/koa")
const { queue } = require("@budibase/backend-core") const { queue } = require("@budibase/backend-core")
const listeners = require("./listeners") const automation = require("../threads/automation")
let automationQueue = queue.createQueue(queue.JobQueue.AUTOMATIONS) let automationQueue = queue.createQueue(
listeners.addListeners(automationQueue) queue.JobQueue.AUTOMATIONS,
automation.removeStalled
)
const PATH_PREFIX = "/bulladmin" const PATH_PREFIX = "/bulladmin"