diff --git a/packages/backend-core/package.json b/packages/backend-core/package.json index abcf72491a..404b7da346 100644 --- a/packages/backend-core/package.json +++ b/packages/backend-core/package.json @@ -26,6 +26,7 @@ "aws-sdk": "2.1030.0", "bcrypt": "5.0.1", "bcryptjs": "2.4.3", + "bull": "^4.10.1", "dotenv": "16.0.1", "emitter-listener": "1.1.2", "ioredis": "4.28.0", diff --git a/packages/backend-core/src/index.ts b/packages/backend-core/src/index.ts index 42cad17620..659a56c051 100644 --- a/packages/backend-core/src/index.ts +++ b/packages/backend-core/src/index.ts @@ -19,6 +19,7 @@ import pino from "./pino" import * as middleware from "./middleware" import plugins from "./plugin" import encryption from "./security/encryption" +import * as queue from "./queue" // mimic the outer package exports import * as db from "./pkg/db" @@ -63,6 +64,7 @@ const core = { ...errorClasses, middleware, encryption, + queue, } export = core diff --git a/packages/backend-core/src/queue/constants.ts b/packages/backend-core/src/queue/constants.ts new file mode 100644 index 0000000000..d8fb3121a3 --- /dev/null +++ b/packages/backend-core/src/queue/constants.ts @@ -0,0 +1,4 @@ +export enum JobQueue { + AUTOMATIONS = "automationQueue", + APP_BACKUPS = "appBackupQueue", +} diff --git a/packages/backend-core/src/queue/inMemoryQueue.ts b/packages/backend-core/src/queue/inMemoryQueue.ts new file mode 100644 index 0000000000..80ee7362e4 --- /dev/null +++ b/packages/backend-core/src/queue/inMemoryQueue.ts @@ -0,0 +1,127 @@ +import events from "events" + +/** + * Bull works with a Job wrapper around all messages that contains a lot more information about + * the state of the message, this object constructor implements the same schema of Bull jobs + * for the sake of maintaining API consistency. + * @param {string} queue The name of the queue which the message will be carried on. + * @param {object} message The JSON message which will be passed back to the consumer. + * @returns {Object} A new job which can now be put onto the queue, this is mostly an + * internal structure so that an in memory queue can be easily swapped for a Bull queue. + */ +function newJob(queue: string, message: any) { + return { + timestamp: Date.now(), + queue: queue, + data: message, + } +} + +/** + * This is designed to replicate Bull (https://github.com/OptimalBits/bull) in memory as a sort of mock. + * It is relatively simple, using an event emitter internally to register when messages are available + * to the consumers - in can support many inputs and many consumers. + */ +class InMemoryQueue { + _name: string + _opts?: any + _messages: any[] + _emitter: EventEmitter + /** + * The constructor the queue, exactly the same as that of Bulls. + * @param {string} name The name of the queue which is being configured. + * @param {object|null} opts This is not used by the in memory queue as there is no real use + * case when in memory, but is the same API as Bull + */ + constructor(name: string, opts = null) { + this._name = name + this._opts = opts + this._messages = [] + this._emitter = new events.EventEmitter() + } + + /** + * Same callback API as Bull, each callback passed to this will consume messages as they are + * available. Please note this is a queue service, not a notification service, so each + * consumer will receive different messages. + * @param {function} func The callback function which will return a "Job", the same + * as the Bull API, within this job the property "data" contains the JSON message. Please + * note this is incredibly limited compared to Bull as in reality the Job would contain + * a lot more information about the queue and current status of Bull cluster. + */ + process(func: any) { + this._emitter.on("message", async () => { + if (this._messages.length <= 0) { + return + } + let msg = this._messages.shift() + let resp = func(msg) + if (resp.then != null) { + await resp + } + }) + } + + // simply puts a message to the queue and emits to the queue for processing + /** + * Simple function to replicate the add message functionality of Bull, putting + * a new message on the queue. This then emits an event which will be used to + * return the message to a consumer (if one is attached). + * @param {object} msg A message to be transported over the queue, this should be + * a JSON message as this is required by Bull. + * @param {boolean} repeat serves no purpose for the import queue. + */ + // eslint-disable-next-line no-unused-vars + add(msg: any, repeat: boolean) { + if (typeof msg !== "object") { + throw "Queue only supports carrying JSON." + } + this._messages.push(newJob(this._name, msg)) + this._emitter.emit("message") + } + + /** + * replicating the close function from bull, which waits for jobs to finish. + */ + async close() { + return [] + } + + /** + * This removes a cron which has been implemented, this is part of Bull API. + * @param {string} cronJobId The cron which is to be removed. + */ + removeRepeatableByKey(cronJobId: string) { + // TODO: implement for testing + console.log(cronJobId) + } + + /** + * Implemented for tests + */ + getRepeatableJobs() { + return [] + } + + // eslint-disable-next-line no-unused-vars + removeJobs(pattern: string) { + // no-op + } + + /** + * Implemented for tests + */ + async clean() { + return [] + } + + async getJob() { + return {} + } + + on() { + // do nothing + } +} + +export = InMemoryQueue diff --git a/packages/backend-core/src/queue/index.ts b/packages/backend-core/src/queue/index.ts new file mode 100644 index 0000000000..b7d565ba13 --- /dev/null +++ b/packages/backend-core/src/queue/index.ts @@ -0,0 +1,2 @@ +export * from "./queue" +export * from "./constants" diff --git a/packages/backend-core/src/queue/queue.ts b/packages/backend-core/src/queue/queue.ts new file mode 100644 index 0000000000..de2c738ca4 --- /dev/null +++ b/packages/backend-core/src/queue/queue.ts @@ -0,0 +1,47 @@ +import env from "../environment" +import { getRedisOptions } from "../redis/utils" +import { JobQueue } from "./constants" +import inMemoryQueue from "./inMemoryQueue" +import BullQueue from "bull" +import InMemoryQueue from "./inMemoryQueue" +const { opts, redisProtocolUrl } = getRedisOptions() + +const CLEANUP_PERIOD_MS = 60 * 1000 +let QUEUES: BullQueue.Queue[] | InMemoryQueue[] = [] +let cleanupInterval: NodeJS.Timeout + +async function cleanup() { + for (let queue of QUEUES) { + await queue.clean(CLEANUP_PERIOD_MS, "completed") + } +} + +export function createQueue(jobQueue: JobQueue) { + const queueConfig: any = redisProtocolUrl || { redis: opts } + let queue: any + if (env.isTest()) { + queue = new BullQueue(jobQueue, queueConfig) + } else { + queue = new inMemoryQueue(jobQueue, queueConfig) + } + QUEUES.push(queue) + if (!cleanupInterval) { + cleanupInterval = setInterval(cleanup, CLEANUP_PERIOD_MS) + // fire off an initial cleanup + cleanup().catch(err => { + console.error(`Unable to cleanup automation queue initially - ${err}`) + }) + } + return queue +} + +exports.shutdown = async () => { + if (QUEUES.length) { + clearInterval(cleanupInterval) + for (let queue of QUEUES) { + await queue.close() + } + QUEUES = [] + } + console.log("Queues shutdown") +} diff --git a/packages/backend-core/yarn.lock b/packages/backend-core/yarn.lock index 6bc9b63728..31d25320c7 100644 --- a/packages/backend-core/yarn.lock +++ b/packages/backend-core/yarn.lock @@ -291,6 +291,11 @@ resolved "https://registry.yarnpkg.com/@bcoe/v8-coverage/-/v8-coverage-0.2.3.tgz#75a2e8b51cb758a7553d6804a5932d7aace75c39" integrity sha512-0hYQ8SB4Db5zvZB4axdMHGwEaQjkZzFjQiN9LVYvIFB2nSUHW9tYpxWriPrWDASIxiaXax83REcLxuSdnGPZtw== +"@budibase/types@2.0.30-alpha.3": + version "2.0.30-alpha.3" + resolved "https://registry.yarnpkg.com/@budibase/types/-/types-2.0.30-alpha.3.tgz#cb55bcced75b711cc8a675284fbacaa8ebf1c0f2" + integrity sha512-rHeFVuNbSSE4fMnX6uyrM2r47m+neqFXlVNOkhHU9i7KoIcIZbEYInU8CjUFR2da3ruST9ajXjJ5UenX2+MnTg== + "@hapi/hoek@^9.0.0": version "9.3.0" resolved "https://registry.yarnpkg.com/@hapi/hoek/-/hoek-9.3.0.tgz#8368869dcb735be2e7f5cb7647de78e167a251fb" @@ -543,6 +548,36 @@ semver "^7.3.5" tar "^6.1.11" +"@msgpackr-extract/msgpackr-extract-darwin-arm64@2.1.2": + version "2.1.2" + resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-darwin-arm64/-/msgpackr-extract-darwin-arm64-2.1.2.tgz#9571b87be3a3f2c46de05585470bc4f3af2f6f00" + integrity sha512-TyVLn3S/+ikMDsh0gbKv2YydKClN8HaJDDpONlaZR+LVJmsxLFUgA+O7zu59h9+f9gX1aj/ahw9wqa6rosmrYQ== + +"@msgpackr-extract/msgpackr-extract-darwin-x64@2.1.2": + version "2.1.2" + resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-darwin-x64/-/msgpackr-extract-darwin-x64-2.1.2.tgz#bfbc6936ede2955218f5621a675679a5fe8e6f4c" + integrity sha512-YPXtcVkhmVNoMGlqp81ZHW4dMxK09msWgnxtsDpSiZwTzUBG2N+No2bsr7WMtBKCVJMSD6mbAl7YhKUqkp/Few== + +"@msgpackr-extract/msgpackr-extract-linux-arm64@2.1.2": + version "2.1.2" + resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-linux-arm64/-/msgpackr-extract-linux-arm64-2.1.2.tgz#22555e28382af2922e7450634c8a2f240bb9eb82" + integrity sha512-vHZ2JiOWF2+DN9lzltGbhtQNzDo8fKFGrf37UJrgqxU0yvtERrzUugnfnX1wmVfFhSsF8OxrfqiNOUc5hko1Zg== + +"@msgpackr-extract/msgpackr-extract-linux-arm@2.1.2": + version "2.1.2" + resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-linux-arm/-/msgpackr-extract-linux-arm-2.1.2.tgz#ffb6ae1beea7ac572b6be6bf2a8e8162ebdd8be7" + integrity sha512-42R4MAFeIeNn+L98qwxAt360bwzX2Kf0ZQkBBucJ2Ircza3asoY4CDbgiu9VWklq8gWJVSJSJBwDI+c/THiWkA== + +"@msgpackr-extract/msgpackr-extract-linux-x64@2.1.2": + version "2.1.2" + resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-linux-x64/-/msgpackr-extract-linux-x64-2.1.2.tgz#7caf62eebbfb1345de40f75e89666b3d4194755f" + integrity sha512-RjRoRxg7Q3kPAdUSC5EUUPlwfMkIVhmaRTIe+cqHbKrGZ4M6TyCA/b5qMaukQ/1CHWrqYY2FbKOAU8Hg0pQFzg== + +"@msgpackr-extract/msgpackr-extract-win32-x64@2.1.2": + version "2.1.2" + resolved "https://registry.yarnpkg.com/@msgpackr-extract/msgpackr-extract-win32-x64/-/msgpackr-extract-win32-x64-2.1.2.tgz#f2d8b9ddd8d191205ed26ce54aba3dfc5ae3e7c9" + integrity sha512-rIZVR48zA8hGkHIK7ED6+ZiXsjRCcAVBJbm8o89OKAMTmEAQ2QvoOxoiu3w2isAaWwzgtQIOFIqHwvZDyLKCvw== + "@shopify/jest-koa-mocks@5.0.1": version "5.0.1" resolved "https://registry.yarnpkg.com/@shopify/jest-koa-mocks/-/jest-koa-mocks-5.0.1.tgz#fba490b6b7985fbb571eb9974897d396a3642e94" @@ -1497,6 +1532,21 @@ buffer@^5.5.0, buffer@^5.6.0: base64-js "^1.3.1" ieee754 "^1.1.13" +bull@^4.10.1: + version "4.10.1" + resolved "https://registry.yarnpkg.com/bull/-/bull-4.10.1.tgz#f14974b6089358b62b495a2cbf838aadc098e43f" + integrity sha512-Fp21tRPb2EaZPVfmM+ONZKVz2RA+to+zGgaTLyCKt3JMSU8OOBqK8143OQrnGuGpsyE5G+9FevFAGhdZZfQP2g== + dependencies: + cron-parser "^4.2.1" + debuglog "^1.0.0" + get-port "^5.1.1" + ioredis "^4.28.5" + lodash "^4.17.21" + msgpackr "^1.5.2" + p-timeout "^3.2.0" + semver "^7.3.2" + uuid "^8.3.0" + cache-content-type@^1.0.0: version "1.0.1" resolved "https://registry.yarnpkg.com/cache-content-type/-/cache-content-type-1.0.1.tgz#035cde2b08ee2129f4a8315ea8f00a00dba1453c" @@ -1764,6 +1814,13 @@ core-util-is@~1.0.0: resolved "https://registry.yarnpkg.com/core-util-is/-/core-util-is-1.0.3.tgz#a6042d3634c2b27e9328f837b965fac83808db85" integrity sha512-ZQBvi1DcpJ4GDqanjucZ2Hj3wEO5pZDS89BWbkcrvdxksJorwUDDZamX9ldFkp9aw2lmBDLgkObEA4DWNJ9FYQ== +cron-parser@^4.2.1: + version "4.6.0" + resolved "https://registry.yarnpkg.com/cron-parser/-/cron-parser-4.6.0.tgz#404c3fdbff10ae80eef6b709555d577ef2fd2e0d" + integrity sha512-guZNLMGUgg6z4+eGhmHGw7ft+v6OQeuHzd1gcLxCo9Yg/qoxmG3nindp2/uwGCLizEisf2H0ptqeVXeoCpP6FA== + dependencies: + luxon "^3.0.1" + cross-spawn@^7.0.3: version "7.0.3" resolved "https://registry.yarnpkg.com/cross-spawn/-/cross-spawn-7.0.3.tgz#f73a85b9d5d41d045551c177e2882d4ac85728a6" @@ -1837,6 +1894,11 @@ debug@~3.1.0: dependencies: ms "2.0.0" +debuglog@^1.0.0: + version "1.0.1" + resolved "https://registry.yarnpkg.com/debuglog/-/debuglog-1.0.1.tgz#aa24ffb9ac3df9a2351837cfb2d279360cd78492" + integrity sha512-syBZ+rnAK3EgMsH2aYEOLUW7mZSY9Gb+0wUMCFsZvcmiz+HigA0LOcq/HoQqVuGG+EKykunc7QG2bzrponfaSw== + decimal.js@^10.2.1: version "10.3.1" resolved "https://registry.yarnpkg.com/decimal.js/-/decimal.js-10.3.1.tgz#d8c3a444a9c6774ba60ca6ad7261c3a94fd5e783" @@ -2318,6 +2380,11 @@ get-package-type@^0.1.0: resolved "https://registry.yarnpkg.com/get-package-type/-/get-package-type-0.1.0.tgz#8de2d803cff44df3bc6c456e6668b36c3926e11a" integrity sha512-pjzuKtY64GYfWizNAJ0fr9VqttZkNiK2iS430LtIHzjBEr6bX8Am2zm4sW4Ro5wjWW5cAlRL1qAMTcXbjNAO2Q== +get-port@^5.1.1: + version "5.1.1" + resolved "https://registry.yarnpkg.com/get-port/-/get-port-5.1.1.tgz#0469ed07563479de6efb986baf053dcd7d4e3193" + integrity sha512-g/Q1aTSDOxFpchXC4i8ZWvxA1lnPqx/JHqcpIw0/LX9T8x/GBbi6YnlN5nhaKIFkT8oFsscUKgDJYxfwfS6QsQ== + get-stream@^4.1.0: version "4.1.0" resolved "https://registry.yarnpkg.com/get-stream/-/get-stream-4.1.0.tgz#c1b255575f3dc21d59bfc79cd3d2b46b1c3a54b5" @@ -2652,6 +2719,23 @@ ioredis@4.28.0: redis-parser "^3.0.0" standard-as-callback "^2.1.0" +ioredis@^4.28.5: + version "4.28.5" + resolved "https://registry.yarnpkg.com/ioredis/-/ioredis-4.28.5.tgz#5c149e6a8d76a7f8fa8a504ffc85b7d5b6797f9f" + integrity sha512-3GYo0GJtLqgNXj4YhrisLaNNvWSNwSS2wS4OELGfGxH8I69+XfNdnmV1AyN+ZqMh0i7eX+SWjrwFKDBDgfBC1A== + dependencies: + cluster-key-slot "^1.1.0" + debug "^4.3.1" + denque "^1.1.0" + lodash.defaults "^4.2.0" + lodash.flatten "^4.4.0" + lodash.isarguments "^3.1.0" + p-map "^2.1.0" + redis-commands "1.7.0" + redis-errors "^1.2.0" + redis-parser "^3.0.0" + standard-as-callback "^2.1.0" + is-arrayish@^0.2.1: version "0.2.1" resolved "https://registry.yarnpkg.com/is-arrayish/-/is-arrayish-0.2.1.tgz#77c99840527aa8ecb1a8ba697b80645a7a926a9d" @@ -3725,6 +3809,11 @@ ltgt@2.2.1, ltgt@^2.1.2, ltgt@~2.2.0: resolved "https://registry.yarnpkg.com/ltgt/-/ltgt-2.2.1.tgz#f35ca91c493f7b73da0e07495304f17b31f87ee5" integrity sha512-AI2r85+4MquTw9ZYqabu4nMwy9Oftlfa/e/52t9IjtfG+mGBbTNdAoZ3RQKLHR6r0wQnwZnPIEh/Ya6XTWAKNA== +luxon@^3.0.1: + version "3.0.4" + resolved "https://registry.yarnpkg.com/luxon/-/luxon-3.0.4.tgz#d179e4e9f05e092241e7044f64aaa54796b03929" + integrity sha512-aV48rGUwP/Vydn8HT+5cdr26YYQiUZ42NM6ToMoaGKwYfWbfLeRkEu1wXWMHBZT6+KyLfcbbtVcoQFCbbPjKlw== + make-dir@^3.0.0, make-dir@^3.1.0: version "3.1.0" resolved "https://registry.yarnpkg.com/make-dir/-/make-dir-3.1.0.tgz#415e967046b3a7f1d185277d84aa58203726a13f" @@ -3872,6 +3961,27 @@ ms@^2.1.1, ms@^2.1.3: resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.3.tgz#574c8138ce1d2b5861f0b44579dbadd60c6615b2" integrity sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA== +msgpackr-extract@^2.1.2: + version "2.1.2" + resolved "https://registry.yarnpkg.com/msgpackr-extract/-/msgpackr-extract-2.1.2.tgz#56272030f3e163e1b51964ef8b1cd5e7240c03ed" + integrity sha512-cmrmERQFb19NX2JABOGtrKdHMyI6RUyceaPBQ2iRz9GnDkjBWFjNJC0jyyoOfZl2U/LZE3tQCCQc4dlRyA8mcA== + dependencies: + node-gyp-build-optional-packages "5.0.3" + optionalDependencies: + "@msgpackr-extract/msgpackr-extract-darwin-arm64" "2.1.2" + "@msgpackr-extract/msgpackr-extract-darwin-x64" "2.1.2" + "@msgpackr-extract/msgpackr-extract-linux-arm" "2.1.2" + "@msgpackr-extract/msgpackr-extract-linux-arm64" "2.1.2" + "@msgpackr-extract/msgpackr-extract-linux-x64" "2.1.2" + "@msgpackr-extract/msgpackr-extract-win32-x64" "2.1.2" + +msgpackr@^1.5.2: + version "1.7.2" + resolved "https://registry.yarnpkg.com/msgpackr/-/msgpackr-1.7.2.tgz#68d6debf5999d6b61abb6e7046a689991ebf7261" + integrity sha512-mWScyHTtG6TjivXX9vfIy2nBtRupaiAj0HQ2mtmpmYujAmqZmaaEVPaSZ1NKLMvicaMLFzEaMk0ManxMRg8rMQ== + optionalDependencies: + msgpackr-extract "^2.1.2" + napi-macros@~2.0.0: version "2.0.0" resolved "https://registry.yarnpkg.com/napi-macros/-/napi-macros-2.0.0.tgz#2b6bae421e7b96eb687aa6c77a7858640670001b" @@ -3919,6 +4029,11 @@ node-forge@^0.7.1: resolved "https://registry.yarnpkg.com/node-forge/-/node-forge-0.7.6.tgz#fdf3b418aee1f94f0ef642cd63486c77ca9724ac" integrity sha512-sol30LUpz1jQFBjOKwbjxijiE3b6pjd74YwfD0fJOKPjF+fONKb2Yg8rYgS6+bK6VDl+/wfr4IYpC7jDzLUIfw== +node-gyp-build-optional-packages@5.0.3: + version "5.0.3" + resolved "https://registry.yarnpkg.com/node-gyp-build-optional-packages/-/node-gyp-build-optional-packages-5.0.3.tgz#92a89d400352c44ad3975010368072b41ad66c17" + integrity sha512-k75jcVzk5wnnc/FMxsf4udAoTEUv2jY3ycfdSd3yWu6Cnd1oee6/CfZJApyscA4FJOmdoixWwiwOyf16RzD5JA== + node-gyp-build@~4.1.0: version "4.1.1" resolved "https://registry.yarnpkg.com/node-gyp-build/-/node-gyp-build-4.1.1.tgz#d7270b5d86717068d114cc57fff352f96d745feb" @@ -4075,6 +4190,11 @@ p-cancelable@^1.0.0: resolved "https://registry.yarnpkg.com/p-cancelable/-/p-cancelable-1.1.0.tgz#d078d15a3af409220c886f1d9a0ca2e441ab26cc" integrity sha512-s73XxOZ4zpt1edZYZzvhqFa6uvQc1vwUa0K0BdtIZgQMAJj9IbebH+JkgKZc9h+B05PKHLOTl4ajG1BmNrVZlw== +p-finally@^1.0.0: + version "1.0.0" + resolved "https://registry.yarnpkg.com/p-finally/-/p-finally-1.0.0.tgz#3fbcfb15b899a44123b34b6dcc18b724336a2cae" + integrity sha512-LICb2p9CB7FS+0eR1oqWnHhp0FljGLZCWBE9aix0Uye9W8LTQPwMTYVGWQWIw9RdQiDg4+epXQODwIYJtSJaow== + p-limit@^2.2.0: version "2.3.0" resolved "https://registry.yarnpkg.com/p-limit/-/p-limit-2.3.0.tgz#3dd33c647a214fdfffd835933eb086da0dc21db1" @@ -4094,6 +4214,13 @@ p-map@^2.1.0: resolved "https://registry.yarnpkg.com/p-map/-/p-map-2.1.0.tgz#310928feef9c9ecc65b68b17693018a665cea175" integrity sha512-y3b8Kpd8OAN444hxfBbFfj1FY/RjtTd8tzYwhUqNYXx0fXx2iX4maP4Qr6qhIKbQXI02wTLAda4fYUbDagTUFw== +p-timeout@^3.2.0: + version "3.2.0" + resolved "https://registry.yarnpkg.com/p-timeout/-/p-timeout-3.2.0.tgz#c7e17abc971d2a7962ef83626b35d635acf23dfe" + integrity sha512-rhIwUycgwwKcP9yTOOFK/AKsAopjjCakVqLHePO3CC6Mir1Z99xT+R63jZxAT5lFZLa2inS5h+ZS2GvR99/FBg== + dependencies: + p-finally "^1.0.0" + p-try@^2.0.0: version "2.2.0" resolved "https://registry.yarnpkg.com/p-try/-/p-try-2.2.0.tgz#cb2868540e313d61de58fafbe35ce9004d5540e6" @@ -5360,7 +5487,7 @@ uuid@8.1.0: resolved "https://registry.yarnpkg.com/uuid/-/uuid-8.1.0.tgz#6f1536eb43249f473abc6bd58ff983da1ca30d8d" integrity sha512-CI18flHDznR0lq54xBycOVmphdCYnQLKn8abKn7PXUiKUGdEd+/l9LWNJmugXel4hXq7S+RMNl34ecyC9TntWg== -uuid@8.3.2, uuid@^8.3.2: +uuid@8.3.2, uuid@^8.3.0, uuid@^8.3.2: version "8.3.2" resolved "https://registry.yarnpkg.com/uuid/-/uuid-8.3.2.tgz#80d5b5ced271bb9af6c445f21a1a04c606cefbe2" integrity sha512-+NYs2QeMWy+GWFOEm9xnn6HCDp0l7QBD7ml8zLUmJ+93Q5NF0NocErnwkTkXVFNiX3/fpC6afS8Dhb/gz7R7eg== diff --git a/packages/server/src/automations/bullboard.js b/packages/server/src/automations/bullboard.js index 3aac6c4fed..b0ccf14634 100644 --- a/packages/server/src/automations/bullboard.js +++ b/packages/server/src/automations/bullboard.js @@ -1,37 +1,15 @@ const { createBullBoard } = require("@bull-board/api") const { BullAdapter } = require("@bull-board/api/bullAdapter") const { KoaAdapter } = require("@bull-board/koa") -const env = require("../environment") -const Queue = env.isTest() - ? require("../utilities/queue/inMemoryQueue") - : require("bull") -const { JobQueues } = require("../constants") -const { utils } = require("@budibase/backend-core/redis") -const { opts, redisProtocolUrl } = utils.getRedisOptions() +const { queue } = require("@budibase/backend-core") const listeners = require("./listeners") -const CLEANUP_PERIOD_MS = 60 * 1000 -const queueConfig = redisProtocolUrl || { redis: opts } -let cleanupInternal = null - -let automationQueue = new Queue(JobQueues.AUTOMATIONS, queueConfig) +let automationQueue = queue.createQueue(queue.JobQueues.AUTOMATIONS) listeners.addListeners(automationQueue) -async function cleanup() { - await automationQueue.clean(CLEANUP_PERIOD_MS, "completed") -} - const PATH_PREFIX = "/bulladmin" exports.init = () => { - // cleanup the events every 5 minutes - if (!cleanupInternal) { - cleanupInternal = setInterval(cleanup, CLEANUP_PERIOD_MS) - // fire off an initial cleanup - cleanup().catch(err => { - console.error(`Unable to cleanup automation queue initially - ${err}`) - }) - } // Set up queues for bull board admin const queues = [automationQueue] const adapters = [] @@ -48,12 +26,7 @@ exports.init = () => { } exports.shutdown = async () => { - if (automationQueue) { - clearInterval(cleanupInternal) - await automationQueue.close() - automationQueue = null - } - console.log("Bull shutdown") + await queue.shutdown() } exports.queue = automationQueue diff --git a/packages/server/src/constants/index.js b/packages/server/src/constants/index.js index b9362cecf6..73bffeabd6 100644 --- a/packages/server/src/constants/index.js +++ b/packages/server/src/constants/index.js @@ -2,10 +2,6 @@ const { BUILTIN_ROLE_IDS } = require("@budibase/backend-core/roles") const { UserStatus } = require("@budibase/backend-core/constants") const { objectStore } = require("@budibase/backend-core") -exports.JobQueues = { - AUTOMATIONS: "automationQueue", -} - const FilterTypes = { STRING: "string", FUZZY: "fuzzy",