Fixes for cronjob stop - correctly handle this without stalled job handle.
This commit is contained in:
parent
d620e54fdb
commit
b702c7482a
|
@ -3,24 +3,35 @@ import { JobQueue } from "./constants"
|
|||
|
||||
export type StalledFn = (job: Job) => Promise<void>
|
||||
|
||||
export const addListeners = (
|
||||
export function addListeners(
|
||||
queue: Queue,
|
||||
jobQueue: JobQueue,
|
||||
removeStalled?: StalledFn
|
||||
) => {
|
||||
) {
|
||||
logging(queue, jobQueue)
|
||||
if (removeStalled) {
|
||||
handleStalled(queue, removeStalled)
|
||||
}
|
||||
}
|
||||
|
||||
const handleStalled = (queue: Queue, removeStalled: StalledFn) => {
|
||||
function handleStalled(queue: Queue, removeStalled?: StalledFn) {
|
||||
queue.on("stalled", async (job: Job) => {
|
||||
await removeStalled(job)
|
||||
if (removeStalled) {
|
||||
await removeStalled(job)
|
||||
} else if (job.opts.repeat) {
|
||||
const jobId = job.id
|
||||
const repeatJobs = await queue.getRepeatableJobs()
|
||||
for (let repeatJob of repeatJobs) {
|
||||
if (repeatJob.id === jobId) {
|
||||
await queue.removeRepeatableByKey(repeatJob.key)
|
||||
}
|
||||
}
|
||||
console.log(`jobId=${jobId} disabled`)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
const logging = (queue: Queue, jobQueue: JobQueue) => {
|
||||
function logging(queue: Queue, jobQueue: JobQueue) {
|
||||
let eventType: string
|
||||
switch (jobQueue) {
|
||||
case JobQueue.AUTOMATIONS:
|
||||
|
|
|
@ -18,7 +18,7 @@ async function cleanup() {
|
|||
|
||||
export function createQueue(
|
||||
jobQueue: JobQueue,
|
||||
removeStalled: StalledFn
|
||||
removeStalled?: StalledFn
|
||||
): BullQueue.Queue {
|
||||
const queueConfig: any = redisProtocolUrl || { redis: opts }
|
||||
let queue: any
|
||||
|
|
|
@ -91,9 +91,13 @@ export async function disableAllCrons(appId: any) {
|
|||
return Promise.all(promises)
|
||||
}
|
||||
|
||||
export async function disableCron(jobId: string, jobKey: string) {
|
||||
await automationQueue.removeRepeatableByKey(jobKey)
|
||||
await automationQueue.removeJobs(jobId)
|
||||
export async function disableCronById(jobId: number | string) {
|
||||
const repeatJobs = await automationQueue.getRepeatableJobs()
|
||||
for (let repeatJob of repeatJobs) {
|
||||
if (repeatJob.id === jobId) {
|
||||
await automationQueue.removeRepeatableByKey(repeatJob.key)
|
||||
}
|
||||
}
|
||||
console.log(`jobId=${jobId} disabled`)
|
||||
}
|
||||
|
||||
|
|
|
@ -27,18 +27,6 @@ export interface TriggerOutput {
|
|||
timestamp?: number
|
||||
}
|
||||
|
||||
export interface AutomationEvent {
|
||||
data: {
|
||||
automation: Automation
|
||||
event: any
|
||||
}
|
||||
opts?: {
|
||||
repeat?: {
|
||||
jobId: string
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export interface AutomationContext extends AutomationResults {
|
||||
steps: any[]
|
||||
trigger: any
|
||||
|
|
|
@ -1,6 +1,11 @@
|
|||
import { default as threadUtils } from "./utils"
|
||||
import { Job } from "bull"
|
||||
threadUtils.threadSetup()
|
||||
import { isRecurring, disableCron, isErrorInOutput } from "../automations/utils"
|
||||
import {
|
||||
isRecurring,
|
||||
disableCronById,
|
||||
isErrorInOutput,
|
||||
} from "../automations/utils"
|
||||
import { default as actions } from "../automations/actions"
|
||||
import { default as automationUtils } from "../automations/automationUtils"
|
||||
import { default as AutomationEmitter } from "../events/AutomationEmitter"
|
||||
|
@ -13,7 +18,6 @@ import {
|
|||
LoopStep,
|
||||
LoopStepType,
|
||||
LoopInput,
|
||||
AutomationEvent,
|
||||
TriggerOutput,
|
||||
AutomationContext,
|
||||
AutomationMetadata,
|
||||
|
@ -73,19 +77,16 @@ class Orchestrator {
|
|||
_automation: Automation
|
||||
_emitter: any
|
||||
_context: AutomationContext
|
||||
_repeat?: { jobId: string; jobKey: string }
|
||||
_job: Job
|
||||
executionOutput: AutomationContext
|
||||
|
||||
constructor(automation: Automation, triggerOutput: TriggerOutput, opts: any) {
|
||||
constructor(job: Job) {
|
||||
let automation = job.data.automation,
|
||||
triggerOutput = job.data.event
|
||||
const metadata = triggerOutput.metadata
|
||||
this._chainCount = metadata ? metadata.automationChainCount : 0
|
||||
this._appId = triggerOutput.appId as string
|
||||
if (opts?.repeat) {
|
||||
this._repeat = {
|
||||
jobId: opts.repeat.jobId,
|
||||
jobKey: opts.repeat.key,
|
||||
}
|
||||
}
|
||||
this._job = job
|
||||
const triggerStepId = automation.definition.trigger.stepId
|
||||
triggerOutput = this.cleanupTriggerOutputs(triggerStepId, triggerOutput)
|
||||
// remove from context
|
||||
|
@ -134,7 +135,7 @@ class Orchestrator {
|
|||
}
|
||||
|
||||
async stopCron(reason: string) {
|
||||
if (!this._repeat) {
|
||||
if (!this._job.opts.repeat) {
|
||||
return
|
||||
}
|
||||
logWarn(
|
||||
|
@ -142,7 +143,7 @@ class Orchestrator {
|
|||
)
|
||||
const automation = this._automation
|
||||
const trigger = automation.definition.trigger
|
||||
await disableCron(this._repeat?.jobId, this._repeat?.jobKey)
|
||||
await disableCronById(this._job.id)
|
||||
this.updateExecutionOutput(
|
||||
trigger.id,
|
||||
trigger.stepId,
|
||||
|
@ -156,7 +157,7 @@ class Orchestrator {
|
|||
}
|
||||
|
||||
async checkIfShouldStop(metadata: AutomationMetadata): Promise<boolean> {
|
||||
if (!metadata.errorCount || !this._repeat) {
|
||||
if (!metadata.errorCount || !this._job.opts.repeat) {
|
||||
return false
|
||||
}
|
||||
if (metadata.errorCount >= MAX_AUTOMATION_RECURRING_ERRORS) {
|
||||
|
@ -475,17 +476,13 @@ class Orchestrator {
|
|||
}
|
||||
}
|
||||
|
||||
export function execute(input: AutomationEvent, callback: WorkerCallback) {
|
||||
const appId = input.data.event.appId
|
||||
export function execute(job: Job, callback: WorkerCallback) {
|
||||
const appId = job.data.event.appId
|
||||
if (!appId) {
|
||||
throw new Error("Unable to execute, event doesn't contain app ID.")
|
||||
}
|
||||
doInAppContext(appId, async () => {
|
||||
const automationOrchestrator = new Orchestrator(
|
||||
input.data.automation,
|
||||
input.data.event,
|
||||
input.opts
|
||||
)
|
||||
const automationOrchestrator = new Orchestrator(job)
|
||||
try {
|
||||
const response = await automationOrchestrator.execute()
|
||||
callback(null, response)
|
||||
|
@ -495,17 +492,13 @@ export function execute(input: AutomationEvent, callback: WorkerCallback) {
|
|||
})
|
||||
}
|
||||
|
||||
export const removeStalled = async (input: AutomationEvent) => {
|
||||
const appId = input.data.event.appId
|
||||
export const removeStalled = async (job: Job) => {
|
||||
const appId = job.data.event.appId
|
||||
if (!appId) {
|
||||
throw new Error("Unable to execute, event doesn't contain app ID.")
|
||||
}
|
||||
await doInAppContext(appId, async () => {
|
||||
const automationOrchestrator = new Orchestrator(
|
||||
input.data.automation,
|
||||
input.data.event,
|
||||
input.opts
|
||||
)
|
||||
const automationOrchestrator = new Orchestrator(job)
|
||||
await automationOrchestrator.stopCron("stalled")
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue