From ba4c79895fdf056a3e7261dee590fc81e596da0e Mon Sep 17 00:00:00 2001 From: mike12345567 Date: Tue, 16 Nov 2021 18:58:24 +0000 Subject: [PATCH] Updating to manage completed events piling up, this will periodically clear out all events - repeats cannot be removed with the removeOnCompleted job option. --- packages/server/src/automations/bullboard.js | 19 +++++++++++++++++-- packages/server/src/automations/triggers.js | 8 +++++++- packages/server/src/threads/automation.js | 12 +++++++++++- .../src/utilities/queue/inMemoryQueue.js | 7 +++++++ 4 files changed, 42 insertions(+), 4 deletions(-) diff --git a/packages/server/src/automations/bullboard.js b/packages/server/src/automations/bullboard.js index f9204dacf6..50a177781b 100644 --- a/packages/server/src/automations/bullboard.js +++ b/packages/server/src/automations/bullboard.js @@ -9,12 +9,27 @@ const { JobQueues } = require("../constants") const { utils } = require("@budibase/auth/redis") const { opts, redisProtocolUrl } = utils.getRedisOptions() -const redisConfig = redisProtocolUrl || { redis: opts } -let automationQueue = new Queue(JobQueues.AUTOMATIONS, redisConfig) +const CLEANUP_PERIOD_MS = 60 * 1000 +const queueConfig = redisProtocolUrl || { redis: opts } +let cleanupInternal = null + +let automationQueue = new Queue(JobQueues.AUTOMATIONS, queueConfig) + +async function cleanup() { + await automationQueue.clean(CLEANUP_PERIOD_MS, "completed") +} exports.pathPrefix = "/bulladmin" exports.init = () => { + // cleanup the events every 5 minutes + if (!cleanupInternal) { + cleanupInternal = setInterval(cleanup, CLEANUP_PERIOD_MS) + // fire off an initial cleanup + cleanup().catch(err => { + console.error(`Unable to cleanup automation queue initially - ${err}`) + }) + } const expressApp = express() // Set up queues for bull board admin const queues = [automationQueue] diff --git a/packages/server/src/automations/triggers.js b/packages/server/src/automations/triggers.js index e6c722ce3e..0457e17335 100644 --- a/packages/server/src/automations/triggers.js +++ b/packages/server/src/automations/triggers.js @@ -47,7 +47,13 @@ async function queueRelevantRowAutomations(event, eventType) { automationTrigger.inputs && automationTrigger.inputs.tableId === event.row.tableId ) { - await queue.add({ automation, event }) + await queue.add( + { automation, event }, + { + removeOnComplete: true, + removeOnFail: true, + } + ) } } } diff --git a/packages/server/src/threads/automation.js b/packages/server/src/threads/automation.js index c798fefbe0..1cd751e895 100644 --- a/packages/server/src/threads/automation.js +++ b/packages/server/src/threads/automation.js @@ -8,8 +8,10 @@ const { DocumentTypes, isDevAppID } = require("../db/utils") const { doInTenant } = require("@budibase/auth/tenancy") const env = require("../environment") const usage = require("../utilities/usageQuota") +const { definitions: triggerDefs } = require("../automations/triggerInfo") const FILTER_STEP_ID = actions.ACTION_DEFINITIONS.FILTER.stepId +const CRON_STEP_ID = triggerDefs.CRON.stepId const STOPPED_STATUS = { success: false, status: "STOPPED" } /** @@ -23,6 +25,8 @@ class Orchestrator { this._chainCount = this._metadata ? this._metadata.automationChainCount : 0 this._appId = triggerOutput.appId this._app = null + const triggerStepId = automation.definition.trigger.stepId + triggerOutput = this.cleanupTriggerOutputs(triggerStepId, triggerOutput) // remove from context delete triggerOutput.appId delete triggerOutput.metadata @@ -34,11 +38,17 @@ class Orchestrator { this._emitter = new AutomationEmitter(this._chainCount + 1) this.executionOutput = { trigger: {}, steps: [] } // setup the execution output - const triggerStepId = automation.definition.trigger.stepId const triggerId = automation.definition.trigger.id this.updateExecutionOutput(triggerId, triggerStepId, null, triggerOutput) } + cleanupTriggerOutputs(stepId, triggerOutput) { + if (stepId === CRON_STEP_ID) { + triggerOutput.timestamp = Date.now() + } + return triggerOutput + } + async getStepFunctionality(stepId) { let step = await actions.getAction(stepId) if (step == null) { diff --git a/packages/server/src/utilities/queue/inMemoryQueue.js b/packages/server/src/utilities/queue/inMemoryQueue.js index c57bac978c..dbd49c43c0 100644 --- a/packages/server/src/utilities/queue/inMemoryQueue.js +++ b/packages/server/src/utilities/queue/inMemoryQueue.js @@ -89,6 +89,13 @@ class InMemoryQueue { getRepeatableJobs() { return [] } + + /** + * Implemented for tests + */ + async clean() { + return [] + } } module.exports = InMemoryQueue