Updating to manage completed events piling up, this will periodically clear out all events - repeats cannot be removed with the removeOnCompleted job option.

This commit is contained in:
mike12345567 2021-11-16 18:58:24 +00:00
parent 599702bfe9
commit 11debac115
4 changed files with 42 additions and 4 deletions

View File

@ -9,12 +9,27 @@ const { JobQueues } = require("../constants")
const { utils } = require("@budibase/auth/redis") const { utils } = require("@budibase/auth/redis")
const { opts, redisProtocolUrl } = utils.getRedisOptions() const { opts, redisProtocolUrl } = utils.getRedisOptions()
const redisConfig = redisProtocolUrl || { redis: opts } const CLEANUP_PERIOD_MS = 60 * 1000
let automationQueue = new Queue(JobQueues.AUTOMATIONS, redisConfig) const queueConfig = redisProtocolUrl || { redis: opts }
let cleanupInternal = null
let automationQueue = new Queue(JobQueues.AUTOMATIONS, queueConfig)
async function cleanup() {
await automationQueue.clean(CLEANUP_PERIOD_MS, "completed")
}
exports.pathPrefix = "/bulladmin" exports.pathPrefix = "/bulladmin"
exports.init = () => { exports.init = () => {
// cleanup the events every 5 minutes
if (!cleanupInternal) {
cleanupInternal = setInterval(cleanup, CLEANUP_PERIOD_MS)
// fire off an initial cleanup
cleanup().catch(err => {
console.error(`Unable to cleanup automation queue initially - ${err}`)
})
}
const expressApp = express() const expressApp = express()
// Set up queues for bull board admin // Set up queues for bull board admin
const queues = [automationQueue] const queues = [automationQueue]

View File

@ -47,7 +47,13 @@ async function queueRelevantRowAutomations(event, eventType) {
automationTrigger.inputs && automationTrigger.inputs &&
automationTrigger.inputs.tableId === event.row.tableId automationTrigger.inputs.tableId === event.row.tableId
) { ) {
await queue.add({ automation, event }) await queue.add(
{ automation, event },
{
removeOnComplete: true,
removeOnFail: true,
}
)
} }
} }
} }

View File

@ -8,8 +8,10 @@ const { DocumentTypes, isDevAppID } = require("../db/utils")
const { doInTenant } = require("@budibase/auth/tenancy") const { doInTenant } = require("@budibase/auth/tenancy")
const env = require("../environment") const env = require("../environment")
const usage = require("../utilities/usageQuota") const usage = require("../utilities/usageQuota")
const { definitions: triggerDefs } = require("../automations/triggerInfo")
const FILTER_STEP_ID = actions.ACTION_DEFINITIONS.FILTER.stepId const FILTER_STEP_ID = actions.ACTION_DEFINITIONS.FILTER.stepId
const CRON_STEP_ID = triggerDefs.CRON.stepId
const STOPPED_STATUS = { success: false, status: "STOPPED" } const STOPPED_STATUS = { success: false, status: "STOPPED" }
/** /**
@ -23,6 +25,8 @@ class Orchestrator {
this._chainCount = this._metadata ? this._metadata.automationChainCount : 0 this._chainCount = this._metadata ? this._metadata.automationChainCount : 0
this._appId = triggerOutput.appId this._appId = triggerOutput.appId
this._app = null this._app = null
const triggerStepId = automation.definition.trigger.stepId
triggerOutput = this.cleanupTriggerOutputs(triggerStepId, triggerOutput)
// remove from context // remove from context
delete triggerOutput.appId delete triggerOutput.appId
delete triggerOutput.metadata delete triggerOutput.metadata
@ -34,11 +38,17 @@ class Orchestrator {
this._emitter = new AutomationEmitter(this._chainCount + 1) this._emitter = new AutomationEmitter(this._chainCount + 1)
this.executionOutput = { trigger: {}, steps: [] } this.executionOutput = { trigger: {}, steps: [] }
// setup the execution output // setup the execution output
const triggerStepId = automation.definition.trigger.stepId
const triggerId = automation.definition.trigger.id const triggerId = automation.definition.trigger.id
this.updateExecutionOutput(triggerId, triggerStepId, null, triggerOutput) this.updateExecutionOutput(triggerId, triggerStepId, null, triggerOutput)
} }
cleanupTriggerOutputs(stepId, triggerOutput) {
if (stepId === CRON_STEP_ID) {
triggerOutput.timestamp = Date.now()
}
return triggerOutput
}
async getStepFunctionality(stepId) { async getStepFunctionality(stepId) {
let step = await actions.getAction(stepId) let step = await actions.getAction(stepId)
if (step == null) { if (step == null) {

View File

@ -89,6 +89,13 @@ class InMemoryQueue {
getRepeatableJobs() { getRepeatableJobs() {
return [] return []
} }
/**
* Implemented for tests
*/
async clean() {
return []
}
} }
module.exports = InMemoryQueue module.exports = InMemoryQueue