Handling listeners as part of queue creation, rather than external part.
This commit is contained in:
parent
b6ca14aa85
commit
d620e54fdb
|
@ -1,78 +1,90 @@
|
|||
import { Queue, Job, JobId } from "bull"
|
||||
import { AutomationEvent } from "../definitions/automations"
|
||||
import * as automation from "../threads/automation"
|
||||
import { Job, JobId, Queue } from "bull"
|
||||
import { JobQueue } from "./constants"
|
||||
|
||||
export const addListeners = (queue: Queue) => {
|
||||
logging(queue)
|
||||
handleStalled(queue)
|
||||
export type StalledFn = (job: Job) => Promise<void>
|
||||
|
||||
export const addListeners = (
|
||||
queue: Queue,
|
||||
jobQueue: JobQueue,
|
||||
removeStalled?: StalledFn
|
||||
) => {
|
||||
logging(queue, jobQueue)
|
||||
if (removeStalled) {
|
||||
handleStalled(queue, removeStalled)
|
||||
}
|
||||
}
|
||||
|
||||
const handleStalled = (queue: Queue) => {
|
||||
const handleStalled = (queue: Queue, removeStalled: StalledFn) => {
|
||||
queue.on("stalled", async (job: Job) => {
|
||||
await automation.removeStalled(job as AutomationEvent)
|
||||
await removeStalled(job)
|
||||
})
|
||||
}
|
||||
|
||||
const logging = (queue: Queue) => {
|
||||
const logging = (queue: Queue, jobQueue: JobQueue) => {
|
||||
let eventType: string
|
||||
switch (jobQueue) {
|
||||
case JobQueue.AUTOMATIONS:
|
||||
eventType = "automation-event"
|
||||
break
|
||||
case JobQueue.APP_BACKUPS:
|
||||
eventType = "app-backup-event"
|
||||
break
|
||||
}
|
||||
if (process.env.NODE_DEBUG?.includes("bull")) {
|
||||
queue
|
||||
.on("error", (error: any) => {
|
||||
// An error occurred.
|
||||
console.error(`automation-event=error error=${JSON.stringify(error)}`)
|
||||
console.error(`${eventType}=error error=${JSON.stringify(error)}`)
|
||||
})
|
||||
.on("waiting", (jobId: JobId) => {
|
||||
// A Job is waiting to be processed as soon as a worker is idling.
|
||||
console.log(`automation-event=waiting jobId=${jobId}`)
|
||||
console.log(`${eventType}=waiting jobId=${jobId}`)
|
||||
})
|
||||
.on("active", (job: Job, jobPromise: any) => {
|
||||
// A job has started. You can use `jobPromise.cancel()`` to abort it.
|
||||
console.log(`automation-event=active jobId=${job.id}`)
|
||||
console.log(`${eventType}=active jobId=${job.id}`)
|
||||
})
|
||||
.on("stalled", (job: Job) => {
|
||||
// A job has been marked as stalled. This is useful for debugging job
|
||||
// workers that crash or pause the event loop.
|
||||
console.error(
|
||||
`automation-event=stalled jobId=${job.id} job=${JSON.stringify(job)}`
|
||||
`${eventType}=stalled jobId=${job.id} job=${JSON.stringify(job)}`
|
||||
)
|
||||
})
|
||||
.on("progress", (job: Job, progress: any) => {
|
||||
// A job's progress was updated!
|
||||
console.log(
|
||||
`automation-event=progress jobId=${job.id} progress=${progress}`
|
||||
`${eventType}=progress jobId=${job.id} progress=${progress}`
|
||||
)
|
||||
})
|
||||
.on("completed", (job: Job, result) => {
|
||||
// A job successfully completed with a `result`.
|
||||
console.log(
|
||||
`automation-event=completed jobId=${job.id} result=${result}`
|
||||
)
|
||||
console.log(`${eventType}=completed jobId=${job.id} result=${result}`)
|
||||
})
|
||||
.on("failed", (job, err: any) => {
|
||||
// A job failed with reason `err`!
|
||||
console.log(`automation-event=failed jobId=${job.id} error=${err}`)
|
||||
console.log(`${eventType}=failed jobId=${job.id} error=${err}`)
|
||||
})
|
||||
.on("paused", () => {
|
||||
// The queue has been paused.
|
||||
console.log(`automation-event=paused`)
|
||||
console.log(`${eventType}=paused`)
|
||||
})
|
||||
.on("resumed", (job: Job) => {
|
||||
// The queue has been resumed.
|
||||
console.log(`automation-event=paused jobId=${job.id}`)
|
||||
console.log(`${eventType}=paused jobId=${job.id}`)
|
||||
})
|
||||
.on("cleaned", (jobs: Job[], type: string) => {
|
||||
// Old jobs have been cleaned from the queue. `jobs` is an array of cleaned
|
||||
// jobs, and `type` is the type of jobs cleaned.
|
||||
console.log(
|
||||
`automation-event=cleaned length=${jobs.length} type=${type}`
|
||||
)
|
||||
console.log(`${eventType}=cleaned length=${jobs.length} type=${type}`)
|
||||
})
|
||||
.on("drained", () => {
|
||||
// Emitted every time the queue has processed all the waiting jobs (even if there can be some delayed jobs not yet processed)
|
||||
console.log(`automation-event=drained`)
|
||||
console.log(`${eventType}=drained`)
|
||||
})
|
||||
.on("removed", (job: Job) => {
|
||||
// A job successfully removed.
|
||||
console.log(`automation-event=removed jobId=${job.id}`)
|
||||
console.log(`${eventType}=removed jobId=${job.id}`)
|
||||
})
|
||||
}
|
||||
}
|
|
@ -1,9 +1,9 @@
|
|||
import env from "../environment"
|
||||
import { getRedisOptions } from "../redis/utils"
|
||||
import { JobQueue } from "./constants"
|
||||
import inMemoryQueue from "./inMemoryQueue"
|
||||
import BullQueue from "bull"
|
||||
import InMemoryQueue from "./inMemoryQueue"
|
||||
import BullQueue from "bull"
|
||||
import { addListeners, StalledFn } from "./listeners"
|
||||
const { opts, redisProtocolUrl } = getRedisOptions()
|
||||
|
||||
const CLEANUP_PERIOD_MS = 60 * 1000
|
||||
|
@ -16,14 +16,18 @@ async function cleanup() {
|
|||
}
|
||||
}
|
||||
|
||||
export function createQueue(jobQueue: JobQueue) {
|
||||
export function createQueue(
|
||||
jobQueue: JobQueue,
|
||||
removeStalled: StalledFn
|
||||
): BullQueue.Queue {
|
||||
const queueConfig: any = redisProtocolUrl || { redis: opts }
|
||||
let queue: any
|
||||
if (env.isTest()) {
|
||||
queue = new BullQueue(jobQueue, queueConfig)
|
||||
} else {
|
||||
queue = new inMemoryQueue(jobQueue, queueConfig)
|
||||
queue = new InMemoryQueue(jobQueue, queueConfig)
|
||||
}
|
||||
addListeners(queue, jobQueue, removeStalled)
|
||||
QUEUES.push(queue)
|
||||
if (!cleanupInterval) {
|
||||
cleanupInterval = setInterval(cleanup, CLEANUP_PERIOD_MS)
|
||||
|
|
|
@ -2,10 +2,12 @@ const { createBullBoard } = require("@bull-board/api")
|
|||
const { BullAdapter } = require("@bull-board/api/bullAdapter")
|
||||
const { KoaAdapter } = require("@bull-board/koa")
|
||||
const { queue } = require("@budibase/backend-core")
|
||||
const listeners = require("./listeners")
|
||||
const automation = require("../threads/automation")
|
||||
|
||||
let automationQueue = queue.createQueue(queue.JobQueue.AUTOMATIONS)
|
||||
listeners.addListeners(automationQueue)
|
||||
let automationQueue = queue.createQueue(
|
||||
queue.JobQueue.AUTOMATIONS,
|
||||
automation.removeStalled
|
||||
)
|
||||
|
||||
const PATH_PREFIX = "/bulladmin"
|
||||
|
||||
|
|
Loading…
Reference in New Issue