From 82dac5c5882d8957f8b6b962f13a8fcb183b325a Mon Sep 17 00:00:00 2001 From: mike12345567 Date: Thu, 11 Nov 2021 12:11:09 +0000 Subject: [PATCH] Adding worker controls to both automations and queries. --- .../components/integration/QueryViewer.svelte | 1 + packages/server/src/api/controllers/query.js | 22 +++++---- .../src/automations/tests/automation.spec.js | 5 +- packages/server/src/automations/utils.js | 5 +- .../thread.js => threads/automation.js} | 19 ++++--- packages/server/src/threads/index.js | 49 +++++++++++++++++++ .../runner.js => threads/query.js} | 4 +- .../server/src/utilities/queryRunner/index.js | 31 ------------ packages/server/src/utilities/scriptRunner.js | 5 +- 9 files changed, 86 insertions(+), 55 deletions(-) rename packages/server/src/{automations/thread.js => threads/automation.js} (92%) create mode 100644 packages/server/src/threads/index.js rename packages/server/src/{utilities/queryRunner/runner.js => threads/query.js} (92%) delete mode 100644 packages/server/src/utilities/queryRunner/index.js diff --git a/packages/builder/src/components/integration/QueryViewer.svelte b/packages/builder/src/components/integration/QueryViewer.svelte index 7d32db9e4c..54e455b92f 100644 --- a/packages/builder/src/components/integration/QueryViewer.svelte +++ b/packages/builder/src/components/integration/QueryViewer.svelte @@ -157,6 +157,7 @@ onMount(async () => { if (!query || !query._id) { + roleId = Roles.BASIC return } try { diff --git a/packages/server/src/api/controllers/query.js b/packages/server/src/api/controllers/query.js index f1d4a7a91c..7323355c64 100644 --- a/packages/server/src/api/controllers/query.js +++ b/packages/server/src/api/controllers/query.js @@ -3,7 +3,9 @@ const CouchDB = require("../../db") const { generateQueryID, getQueryParams } = require("../../db/utils") const { BaseQueryVerbs } = require("../../constants") const env = require("../../environment") -const queryRunner = require("../../utilities/queryRunner") +const { Thread, ThreadType } = require("../../threads") + +const Runner = new Thread(ThreadType.QUERY, { timeoutMs: 10000 }) // simple function to append "readable" to all read queries function enrichQueries(input) { @@ -104,12 +106,12 @@ exports.preview = async function (ctx) { const { fields, parameters, queryVerb, transformer } = ctx.request.body const enrichedQuery = await enrichQueryFields(fields, parameters) - const { rows, keys } = await queryRunner( + const { rows, keys } = await Runner.run({ datasource, queryVerb, - enrichedQuery, - transformer - ) + query: enrichedQuery, + transformer, + }) ctx.body = { rows, @@ -129,12 +131,12 @@ exports.execute = async function (ctx) { ) // call the relevant CRUD method on the integration class - const { rows } = await queryRunner( + const { rows } = await Runner.run({ datasource, - query.queryVerb, - enrichedQuery, - query.transformer - ) + queryVerb: query.queryVerb, + query: enrichedQuery, + transformer: query.transformer, + }) ctx.body = rows } diff --git a/packages/server/src/automations/tests/automation.spec.js b/packages/server/src/automations/tests/automation.spec.js index b338b391c7..81d34cea54 100644 --- a/packages/server/src/automations/tests/automation.spec.js +++ b/packages/server/src/automations/tests/automation.spec.js @@ -11,8 +11,7 @@ jest.spyOn(global.console, "error") require("../../environment") const automation = require("../index") -const usageQuota = require("../../utilities/usageQuota") -const thread = require("../thread") +const thread = require("../../threads/automation") const triggers = require("../triggers") const { basicAutomation } = require("../../tests/utilities/structures") const { wait } = require("../../utilities") @@ -54,7 +53,7 @@ describe("Run through some parts of the automations system", () => { } }) await wait(100) - expect(thread).toHaveBeenCalledWith(makePartial({ + expect().toHaveBeenCalledWith(makePartial({ data: { event: { fields: { diff --git a/packages/server/src/automations/utils.js b/packages/server/src/automations/utils.js index f2d1bf5699..58a2829a7a 100644 --- a/packages/server/src/automations/utils.js +++ b/packages/server/src/automations/utils.js @@ -1,4 +1,4 @@ -const runner = require("./thread") +const { Thread, ThreadType } = require("../threads") const { definitions } = require("./triggerInfo") const webhooks = require("../api/controllers/webhook") const CouchDB = require("../db") @@ -10,11 +10,12 @@ const { getDeployedAppID } = require("@budibase/auth/db") const WH_STEP_ID = definitions.WEBHOOK.stepId const CRON_STEP_ID = definitions.CRON.stepId +const Runner = new Thread(ThreadType.AUTOMATION) exports.processEvent = async job => { try { // need to actually await these so that an error can be captured properly - return await runner(job) + return await Runner.run(job) } catch (err) { console.error( `${job.data.automation.appId} automation ${job.data.automation._id} was unable to run - ${err}` diff --git a/packages/server/src/automations/thread.js b/packages/server/src/threads/automation.js similarity index 92% rename from packages/server/src/automations/thread.js rename to packages/server/src/threads/automation.js index f63a32ebb1..c798fefbe0 100644 --- a/packages/server/src/automations/thread.js +++ b/packages/server/src/threads/automation.js @@ -1,5 +1,5 @@ -const actions = require("./actions") -const automationUtils = require("./automationUtils") +const actions = require("../automations/actions") +const automationUtils = require("../automations/automationUtils") const AutomationEmitter = require("../events/AutomationEmitter") const { processObject } = require("@budibase/string-templates") const { DEFAULT_TENANT_ID } = require("@budibase/auth").constants @@ -119,10 +119,17 @@ class Orchestrator { } } -module.exports = async job => { +module.exports = (input, callback) => { const automationOrchestrator = new Orchestrator( - job.data.automation, - job.data.event + input.data.automation, + input.data.event ) - return automationOrchestrator.execute() + automationOrchestrator + .execute() + .then(response => { + callback(null, response) + }) + .catch(err => { + callback(err) + }) } diff --git a/packages/server/src/threads/index.js b/packages/server/src/threads/index.js new file mode 100644 index 0000000000..909f381fd8 --- /dev/null +++ b/packages/server/src/threads/index.js @@ -0,0 +1,49 @@ +const workerFarm = require("worker-farm") + +const ThreadType = { + QUERY: "query", + AUTOMATION: "automation", +} + +function typeToFile(type) { + let filename = null + switch (type) { + case ThreadType.QUERY: + filename = "./query" + break + case ThreadType.AUTOMATION: + filename = "./automation" + break + default: + throw "Unknown thread type" + } + return require.resolve(filename) +} + +class Thread { + constructor(type, opts = { timeoutMs: null, count: 1 }) { + const workerOpts = { + autoStart: true, + maxConcurrentWorkers: opts.count ? opts.count : 1, + } + if (opts.timeoutMs) { + workerOpts.maxCallTime = opts.timeoutMs + } + this.workers = workerFarm(workerOpts, typeToFile(type)) + } + + run(data) { + return new Promise((resolve, reject) => { + this.workers(data, (err, response) => { + if (err) { + reject(err) + } else { + resolve(response) + } + }) + }) + } +} + +module.exports.Thread = Thread +module.exports.ThreadType = ThreadType diff --git a/packages/server/src/utilities/queryRunner/runner.js b/packages/server/src/threads/query.js similarity index 92% rename from packages/server/src/utilities/queryRunner/runner.js rename to packages/server/src/threads/query.js index 11ffbaa33d..ca8bf2161c 100644 --- a/packages/server/src/utilities/queryRunner/runner.js +++ b/packages/server/src/threads/query.js @@ -1,5 +1,5 @@ -const ScriptRunner = require("../scriptRunner") -const { integrations } = require("../../integrations") +const ScriptRunner = require("../utilities/scriptRunner") +const { integrations } = require("../integrations") function formatResponse(resp) { if (typeof resp === "string") { diff --git a/packages/server/src/utilities/queryRunner/index.js b/packages/server/src/utilities/queryRunner/index.js deleted file mode 100644 index 0660037a84..0000000000 --- a/packages/server/src/utilities/queryRunner/index.js +++ /dev/null @@ -1,31 +0,0 @@ -const workerFarm = require("worker-farm") -const MAX_WORKER_TIME_MS = 10000 -const workers = workerFarm( - { - autoStart: true, - maxConcurrentWorkers: 1, - maxCallTime: MAX_WORKER_TIME_MS, - }, - require.resolve("./runner") -) - -function runService(data) { - return new Promise((resolve, reject) => { - workers(data, (err, response) => { - if (err) { - reject(err) - } else { - resolve(response) - } - }) - }) -} - -module.exports = async (datasource, queryVerb, query, transformer) => { - return runService({ - datasource, - queryVerb, - query, - transformer, - }) -} diff --git a/packages/server/src/utilities/scriptRunner.js b/packages/server/src/utilities/scriptRunner.js index 9b362e1451..f65e0e1b7b 100644 --- a/packages/server/src/utilities/scriptRunner.js +++ b/packages/server/src/utilities/scriptRunner.js @@ -1,10 +1,13 @@ const fetch = require("node-fetch") const { VM, VMScript } = require("vm2") +const JS_TIMEOUT_MS = 1000 class ScriptRunner { constructor(script, context) { const code = `let fn = () => {\n${script}\n}; results.out = fn();` - this.vm = new VM() + this.vm = new VM({ + timeout: JS_TIMEOUT_MS, + }) this.results = { out: "" } this.vm.setGlobals(context) this.vm.setGlobal("fetch", fetch)