Updating to manage completed events piling up, this will periodically clear out all events - repeats cannot be removed with the removeOnCompleted job option.
This commit is contained in:
parent
d7ef5e606c
commit
ba4c79895f
|
@ -9,12 +9,27 @@ const { JobQueues } = require("../constants")
|
||||||
const { utils } = require("@budibase/auth/redis")
|
const { utils } = require("@budibase/auth/redis")
|
||||||
const { opts, redisProtocolUrl } = utils.getRedisOptions()
|
const { opts, redisProtocolUrl } = utils.getRedisOptions()
|
||||||
|
|
||||||
const redisConfig = redisProtocolUrl || { redis: opts }
|
const CLEANUP_PERIOD_MS = 60 * 1000
|
||||||
let automationQueue = new Queue(JobQueues.AUTOMATIONS, redisConfig)
|
const queueConfig = redisProtocolUrl || { redis: opts }
|
||||||
|
let cleanupInternal = null
|
||||||
|
|
||||||
|
let automationQueue = new Queue(JobQueues.AUTOMATIONS, queueConfig)
|
||||||
|
|
||||||
|
async function cleanup() {
|
||||||
|
await automationQueue.clean(CLEANUP_PERIOD_MS, "completed")
|
||||||
|
}
|
||||||
|
|
||||||
exports.pathPrefix = "/bulladmin"
|
exports.pathPrefix = "/bulladmin"
|
||||||
|
|
||||||
exports.init = () => {
|
exports.init = () => {
|
||||||
|
// cleanup the events every 5 minutes
|
||||||
|
if (!cleanupInternal) {
|
||||||
|
cleanupInternal = setInterval(cleanup, CLEANUP_PERIOD_MS)
|
||||||
|
// fire off an initial cleanup
|
||||||
|
cleanup().catch(err => {
|
||||||
|
console.error(`Unable to cleanup automation queue initially - ${err}`)
|
||||||
|
})
|
||||||
|
}
|
||||||
const expressApp = express()
|
const expressApp = express()
|
||||||
// Set up queues for bull board admin
|
// Set up queues for bull board admin
|
||||||
const queues = [automationQueue]
|
const queues = [automationQueue]
|
||||||
|
|
|
@ -47,7 +47,13 @@ async function queueRelevantRowAutomations(event, eventType) {
|
||||||
automationTrigger.inputs &&
|
automationTrigger.inputs &&
|
||||||
automationTrigger.inputs.tableId === event.row.tableId
|
automationTrigger.inputs.tableId === event.row.tableId
|
||||||
) {
|
) {
|
||||||
await queue.add({ automation, event })
|
await queue.add(
|
||||||
|
{ automation, event },
|
||||||
|
{
|
||||||
|
removeOnComplete: true,
|
||||||
|
removeOnFail: true,
|
||||||
|
}
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,8 +8,10 @@ const { DocumentTypes, isDevAppID } = require("../db/utils")
|
||||||
const { doInTenant } = require("@budibase/auth/tenancy")
|
const { doInTenant } = require("@budibase/auth/tenancy")
|
||||||
const env = require("../environment")
|
const env = require("../environment")
|
||||||
const usage = require("../utilities/usageQuota")
|
const usage = require("../utilities/usageQuota")
|
||||||
|
const { definitions: triggerDefs } = require("../automations/triggerInfo")
|
||||||
|
|
||||||
const FILTER_STEP_ID = actions.ACTION_DEFINITIONS.FILTER.stepId
|
const FILTER_STEP_ID = actions.ACTION_DEFINITIONS.FILTER.stepId
|
||||||
|
const CRON_STEP_ID = triggerDefs.CRON.stepId
|
||||||
const STOPPED_STATUS = { success: false, status: "STOPPED" }
|
const STOPPED_STATUS = { success: false, status: "STOPPED" }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -23,6 +25,8 @@ class Orchestrator {
|
||||||
this._chainCount = this._metadata ? this._metadata.automationChainCount : 0
|
this._chainCount = this._metadata ? this._metadata.automationChainCount : 0
|
||||||
this._appId = triggerOutput.appId
|
this._appId = triggerOutput.appId
|
||||||
this._app = null
|
this._app = null
|
||||||
|
const triggerStepId = automation.definition.trigger.stepId
|
||||||
|
triggerOutput = this.cleanupTriggerOutputs(triggerStepId, triggerOutput)
|
||||||
// remove from context
|
// remove from context
|
||||||
delete triggerOutput.appId
|
delete triggerOutput.appId
|
||||||
delete triggerOutput.metadata
|
delete triggerOutput.metadata
|
||||||
|
@ -34,11 +38,17 @@ class Orchestrator {
|
||||||
this._emitter = new AutomationEmitter(this._chainCount + 1)
|
this._emitter = new AutomationEmitter(this._chainCount + 1)
|
||||||
this.executionOutput = { trigger: {}, steps: [] }
|
this.executionOutput = { trigger: {}, steps: [] }
|
||||||
// setup the execution output
|
// setup the execution output
|
||||||
const triggerStepId = automation.definition.trigger.stepId
|
|
||||||
const triggerId = automation.definition.trigger.id
|
const triggerId = automation.definition.trigger.id
|
||||||
this.updateExecutionOutput(triggerId, triggerStepId, null, triggerOutput)
|
this.updateExecutionOutput(triggerId, triggerStepId, null, triggerOutput)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cleanupTriggerOutputs(stepId, triggerOutput) {
|
||||||
|
if (stepId === CRON_STEP_ID) {
|
||||||
|
triggerOutput.timestamp = Date.now()
|
||||||
|
}
|
||||||
|
return triggerOutput
|
||||||
|
}
|
||||||
|
|
||||||
async getStepFunctionality(stepId) {
|
async getStepFunctionality(stepId) {
|
||||||
let step = await actions.getAction(stepId)
|
let step = await actions.getAction(stepId)
|
||||||
if (step == null) {
|
if (step == null) {
|
||||||
|
|
|
@ -89,6 +89,13 @@ class InMemoryQueue {
|
||||||
getRepeatableJobs() {
|
getRepeatableJobs() {
|
||||||
return []
|
return []
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Implemented for tests
|
||||||
|
*/
|
||||||
|
async clean() {
|
||||||
|
return []
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = InMemoryQueue
|
module.exports = InMemoryQueue
|
||||||
|
|
Loading…
Reference in New Issue