From fc38871b666e80e4f6cdc370d6ec04733305a212 Mon Sep 17 00:00:00 2001 From: mike12345567 Date: Fri, 11 Sep 2020 18:47:22 +0100 Subject: [PATCH] Adding threading for when not running against PouchDB. --- packages/server/package.json | 1 + packages/server/src/workflows/index.js | 86 +++++++---------------- packages/server/src/workflows/thread.js | 67 ++++++++++++++++++ packages/server/src/workflows/triggers.js | 3 + packages/server/yarn.lock | 9 ++- 5 files changed, 104 insertions(+), 62 deletions(-) create mode 100644 packages/server/src/workflows/thread.js diff --git a/packages/server/package.json b/packages/server/package.json index ba6097d436..31f6e7638b 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -74,6 +74,7 @@ "tar-fs": "^2.1.0", "uuid": "^3.3.2", "validate.js": "^0.13.1", + "worker-farm": "^1.7.0", "yargs": "^13.2.4", "zlib": "^1.0.5" }, diff --git a/packages/server/src/workflows/index.js b/packages/server/src/workflows/index.js index 3f54b9a58e..0ea885454f 100644 --- a/packages/server/src/workflows/index.js +++ b/packages/server/src/workflows/index.js @@ -1,67 +1,31 @@ -const mustache = require("mustache") -const actions = require("./actions") -const logic = require("./logic") const triggers = require("./triggers") +const workerFarm = require("worker-farm") +const CouchDB = require("../db/client") +const singleThread = require("./thread") -/** - * The workflow orchestrator is a class responsible for executing workflows. - * It relies on the strategy pattern, which allows composable behaviour to be - * passed into its execute() function. This allows custom execution behaviour based - * on where the orchestrator is run. - * - */ -class Orchestrator { - constructor(workflow) { - this._context = {} - this._workflow = workflow - } +let workers = workerFarm(require.resolve("./thread")) - async getStep(type, stepId) { - let step = null - if (type === "ACTION") { - step = await actions.getAction(stepId) - } else if (type === "LOGIC") { - step = logic.getLogic(stepId) - } - if (step == null) { - throw `Cannot find workflow step by name ${stepId}` - } - return step - } - - async execute(context) { - let workflow = this._workflow - if (!workflow.live) { - return - } - for (let block of workflow.definition.steps) { - let step = await this.getStep(block.type, block.stepId) - let args = { ...block.args } - // bind the workflow action args to the workflow context, if required - for (let arg of Object.keys(args)) { - const argValue = args[arg] - // We don't want to render mustache templates on non-strings - if (typeof argValue !== "string") continue - - args[arg] = mustache.render(argValue, { context: this._context }) +function runWorker(job) { + return new Promise((resolve, reject) => { + workers(job, err => { + if (err) { + reject(err) + } else { + resolve() } - const response = await step({ - args, - context, - }) - - this._context = { - ...this._context, - [block.id]: response, - } - } - } -} - -module.exports.init = function() { - triggers.workflowQueue.process(async job => { - // Create orchestrator for each individual workflow (their own context) - const workflowOrchestrator = new Orchestrator(job.data.workflow) - await workflowOrchestrator.execute(job.data.event) + }) + }) +} + +/** + * This module is built purely to kick off the worker farm and manage the inputs/outputs + */ +module.exports.init = function() { + triggers.workflowQueue.process(async job => { + if (CouchDB.preferredAdapters != null && CouchDB.preferredAdapters[0] !== "leveldb") { + await runWorker(job) + } else { + await singleThread(job) + } }) } diff --git a/packages/server/src/workflows/thread.js b/packages/server/src/workflows/thread.js new file mode 100644 index 0000000000..bf1fe41a3f --- /dev/null +++ b/packages/server/src/workflows/thread.js @@ -0,0 +1,67 @@ +const mustache = require("mustache") +const actions = require("./actions") +const logic = require("./logic") + +/** + * The workflow orchestrator is a class responsible for executing workflows. + * It handles the context of the workflow and makes sure each step gets the correct + * inputs and handles any outputs. + */ +class Orchestrator { + constructor(workflow) { + this._context = {} + this._workflow = workflow + } + + async getStep(type, stepId) { + let step = null + if (type === "ACTION") { + step = await actions.getAction(stepId) + } else if (type === "LOGIC") { + step = logic.getLogic(stepId) + } + if (step == null) { + throw `Cannot find workflow step by name ${stepId}` + } + return step + } + + async execute(context) { + let workflow = this._workflow + for (let block of workflow.definition.steps) { + let step = await this.getStep(block.type, block.stepId) + let args = { ...block.args } + // bind the workflow action args to the workflow context, if required + for (let arg of Object.keys(args)) { + const argValue = args[arg] + // We don't want to render mustache templates on non-strings + if (typeof argValue !== "string") continue + + args[arg] = mustache.render(argValue, { context: this._context }) + } + const response = await step({ + args, + context, + }) + + this._context = { + ...this._context, + [block.id]: response, + } + } + } +} + +module.exports = async (job, cb = null) => { + try { + const workflowOrchestrator = new Orchestrator(job.data.workflow) + await workflowOrchestrator.execute(job.data.event) + if (cb) { + cb() + } + } catch (err) { + if (cb) { + cb(err) + } + } +} diff --git a/packages/server/src/workflows/triggers.js b/packages/server/src/workflows/triggers.js index c72afca301..808093d75d 100644 --- a/packages/server/src/workflows/triggers.js +++ b/packages/server/src/workflows/triggers.js @@ -13,6 +13,9 @@ async function queueRelevantWorkflows(event, eventType) { const workflows = workflowsToTrigger.rows.map(wf => wf.doc) for (let workflow of workflows) { + if (!workflow.live) { + continue + } workflowQueue.add({ workflow, event }) } } diff --git a/packages/server/yarn.lock b/packages/server/yarn.lock index 0303280138..be975e6fc5 100644 --- a/packages/server/yarn.lock +++ b/packages/server/yarn.lock @@ -1912,7 +1912,7 @@ env-paths@^2.2.0: version "2.2.0" resolved "https://registry.yarnpkg.com/env-paths/-/env-paths-2.2.0.tgz#cdca557dc009152917d6166e2febe1f039685e43" -errno@~0.1.1: +errno@~0.1.1, errno@~0.1.7: version "0.1.7" resolved "https://registry.yarnpkg.com/errno/-/errno-0.1.7.tgz#4684d71779ad39af177e3f007996f7c67c852618" dependencies: @@ -6238,6 +6238,13 @@ word-wrap@~1.2.3: version "1.2.3" resolved "https://registry.yarnpkg.com/word-wrap/-/word-wrap-1.2.3.tgz#610636f6b1f703891bd34771ccb17fb93b47079c" +worker-farm@^1.7.0: + version "1.7.0" + resolved "https://registry.yarnpkg.com/worker-farm/-/worker-farm-1.7.0.tgz#26a94c5391bbca926152002f69b84a4bf772e5a8" + integrity sha512-rvw3QTZc8lAxyVrqcSGVm5yP/IJ2UcB3U0graE3LCFoZ0Yn2x4EoVSqJKdB/T5M+FLcRPjz4TDacRf3OCfNUzw== + dependencies: + errno "~0.1.7" + wrap-ansi@^5.1.0: version "5.1.0" resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-5.1.0.tgz#1fd1f67235d5b6d0fee781056001bfb694c03b09"