Adding threading for when not running against PouchDB.
This commit is contained in:
parent
ad48b9fdd2
commit
5067d93030
|
@ -74,6 +74,7 @@
|
||||||
"tar-fs": "^2.1.0",
|
"tar-fs": "^2.1.0",
|
||||||
"uuid": "^3.3.2",
|
"uuid": "^3.3.2",
|
||||||
"validate.js": "^0.13.1",
|
"validate.js": "^0.13.1",
|
||||||
|
"worker-farm": "^1.7.0",
|
||||||
"yargs": "^13.2.4",
|
"yargs": "^13.2.4",
|
||||||
"zlib": "^1.0.5"
|
"zlib": "^1.0.5"
|
||||||
},
|
},
|
||||||
|
|
|
@ -1,67 +1,31 @@
|
||||||
const mustache = require("mustache")
|
|
||||||
const actions = require("./actions")
|
|
||||||
const logic = require("./logic")
|
|
||||||
const triggers = require("./triggers")
|
const triggers = require("./triggers")
|
||||||
|
const workerFarm = require("worker-farm")
|
||||||
|
const CouchDB = require("../db/client")
|
||||||
|
const singleThread = require("./thread")
|
||||||
|
|
||||||
/**
|
let workers = workerFarm(require.resolve("./thread"))
|
||||||
* The workflow orchestrator is a class responsible for executing workflows.
|
|
||||||
* It relies on the strategy pattern, which allows composable behaviour to be
|
|
||||||
* passed into its execute() function. This allows custom execution behaviour based
|
|
||||||
* on where the orchestrator is run.
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
class Orchestrator {
|
|
||||||
constructor(workflow) {
|
|
||||||
this._context = {}
|
|
||||||
this._workflow = workflow
|
|
||||||
}
|
|
||||||
|
|
||||||
async getStep(type, stepId) {
|
function runWorker(job) {
|
||||||
let step = null
|
return new Promise((resolve, reject) => {
|
||||||
if (type === "ACTION") {
|
workers(job, err => {
|
||||||
step = await actions.getAction(stepId)
|
if (err) {
|
||||||
} else if (type === "LOGIC") {
|
reject(err)
|
||||||
step = logic.getLogic(stepId)
|
} else {
|
||||||
}
|
resolve()
|
||||||
if (step == null) {
|
|
||||||
throw `Cannot find workflow step by name ${stepId}`
|
|
||||||
}
|
|
||||||
return step
|
|
||||||
}
|
|
||||||
|
|
||||||
async execute(context) {
|
|
||||||
let workflow = this._workflow
|
|
||||||
if (!workflow.live) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for (let block of workflow.definition.steps) {
|
|
||||||
let step = await this.getStep(block.type, block.stepId)
|
|
||||||
let args = { ...block.args }
|
|
||||||
// bind the workflow action args to the workflow context, if required
|
|
||||||
for (let arg of Object.keys(args)) {
|
|
||||||
const argValue = args[arg]
|
|
||||||
// We don't want to render mustache templates on non-strings
|
|
||||||
if (typeof argValue !== "string") continue
|
|
||||||
|
|
||||||
args[arg] = mustache.render(argValue, { context: this._context })
|
|
||||||
}
|
}
|
||||||
const response = await step({
|
})
|
||||||
args,
|
})
|
||||||
context,
|
}
|
||||||
})
|
|
||||||
|
/**
|
||||||
this._context = {
|
* This module is built purely to kick off the worker farm and manage the inputs/outputs
|
||||||
...this._context,
|
*/
|
||||||
[block.id]: response,
|
module.exports.init = function() {
|
||||||
}
|
triggers.workflowQueue.process(async job => {
|
||||||
}
|
if (CouchDB.preferredAdapters != null && CouchDB.preferredAdapters[0] !== "leveldb") {
|
||||||
}
|
await runWorker(job)
|
||||||
}
|
} else {
|
||||||
|
await singleThread(job)
|
||||||
module.exports.init = function() {
|
}
|
||||||
triggers.workflowQueue.process(async job => {
|
|
||||||
// Create orchestrator for each individual workflow (their own context)
|
|
||||||
const workflowOrchestrator = new Orchestrator(job.data.workflow)
|
|
||||||
await workflowOrchestrator.execute(job.data.event)
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,67 @@
|
||||||
|
const mustache = require("mustache")
|
||||||
|
const actions = require("./actions")
|
||||||
|
const logic = require("./logic")
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The workflow orchestrator is a class responsible for executing workflows.
|
||||||
|
* It handles the context of the workflow and makes sure each step gets the correct
|
||||||
|
* inputs and handles any outputs.
|
||||||
|
*/
|
||||||
|
class Orchestrator {
|
||||||
|
constructor(workflow) {
|
||||||
|
this._context = {}
|
||||||
|
this._workflow = workflow
|
||||||
|
}
|
||||||
|
|
||||||
|
async getStep(type, stepId) {
|
||||||
|
let step = null
|
||||||
|
if (type === "ACTION") {
|
||||||
|
step = await actions.getAction(stepId)
|
||||||
|
} else if (type === "LOGIC") {
|
||||||
|
step = logic.getLogic(stepId)
|
||||||
|
}
|
||||||
|
if (step == null) {
|
||||||
|
throw `Cannot find workflow step by name ${stepId}`
|
||||||
|
}
|
||||||
|
return step
|
||||||
|
}
|
||||||
|
|
||||||
|
async execute(context) {
|
||||||
|
let workflow = this._workflow
|
||||||
|
for (let block of workflow.definition.steps) {
|
||||||
|
let step = await this.getStep(block.type, block.stepId)
|
||||||
|
let args = { ...block.args }
|
||||||
|
// bind the workflow action args to the workflow context, if required
|
||||||
|
for (let arg of Object.keys(args)) {
|
||||||
|
const argValue = args[arg]
|
||||||
|
// We don't want to render mustache templates on non-strings
|
||||||
|
if (typeof argValue !== "string") continue
|
||||||
|
|
||||||
|
args[arg] = mustache.render(argValue, { context: this._context })
|
||||||
|
}
|
||||||
|
const response = await step({
|
||||||
|
args,
|
||||||
|
context,
|
||||||
|
})
|
||||||
|
|
||||||
|
this._context = {
|
||||||
|
...this._context,
|
||||||
|
[block.id]: response,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = async (job, cb = null) => {
|
||||||
|
try {
|
||||||
|
const workflowOrchestrator = new Orchestrator(job.data.workflow)
|
||||||
|
await workflowOrchestrator.execute(job.data.event)
|
||||||
|
if (cb) {
|
||||||
|
cb()
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
if (cb) {
|
||||||
|
cb(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -13,6 +13,9 @@ async function queueRelevantWorkflows(event, eventType) {
|
||||||
|
|
||||||
const workflows = workflowsToTrigger.rows.map(wf => wf.doc)
|
const workflows = workflowsToTrigger.rows.map(wf => wf.doc)
|
||||||
for (let workflow of workflows) {
|
for (let workflow of workflows) {
|
||||||
|
if (!workflow.live) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
workflowQueue.add({ workflow, event })
|
workflowQueue.add({ workflow, event })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1912,7 +1912,7 @@ env-paths@^2.2.0:
|
||||||
version "2.2.0"
|
version "2.2.0"
|
||||||
resolved "https://registry.yarnpkg.com/env-paths/-/env-paths-2.2.0.tgz#cdca557dc009152917d6166e2febe1f039685e43"
|
resolved "https://registry.yarnpkg.com/env-paths/-/env-paths-2.2.0.tgz#cdca557dc009152917d6166e2febe1f039685e43"
|
||||||
|
|
||||||
errno@~0.1.1:
|
errno@~0.1.1, errno@~0.1.7:
|
||||||
version "0.1.7"
|
version "0.1.7"
|
||||||
resolved "https://registry.yarnpkg.com/errno/-/errno-0.1.7.tgz#4684d71779ad39af177e3f007996f7c67c852618"
|
resolved "https://registry.yarnpkg.com/errno/-/errno-0.1.7.tgz#4684d71779ad39af177e3f007996f7c67c852618"
|
||||||
dependencies:
|
dependencies:
|
||||||
|
@ -6238,6 +6238,13 @@ word-wrap@~1.2.3:
|
||||||
version "1.2.3"
|
version "1.2.3"
|
||||||
resolved "https://registry.yarnpkg.com/word-wrap/-/word-wrap-1.2.3.tgz#610636f6b1f703891bd34771ccb17fb93b47079c"
|
resolved "https://registry.yarnpkg.com/word-wrap/-/word-wrap-1.2.3.tgz#610636f6b1f703891bd34771ccb17fb93b47079c"
|
||||||
|
|
||||||
|
worker-farm@^1.7.0:
|
||||||
|
version "1.7.0"
|
||||||
|
resolved "https://registry.yarnpkg.com/worker-farm/-/worker-farm-1.7.0.tgz#26a94c5391bbca926152002f69b84a4bf772e5a8"
|
||||||
|
integrity sha512-rvw3QTZc8lAxyVrqcSGVm5yP/IJ2UcB3U0graE3LCFoZ0Yn2x4EoVSqJKdB/T5M+FLcRPjz4TDacRf3OCfNUzw==
|
||||||
|
dependencies:
|
||||||
|
errno "~0.1.7"
|
||||||
|
|
||||||
wrap-ansi@^5.1.0:
|
wrap-ansi@^5.1.0:
|
||||||
version "5.1.0"
|
version "5.1.0"
|
||||||
resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-5.1.0.tgz#1fd1f67235d5b6d0fee781056001bfb694c03b09"
|
resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-5.1.0.tgz#1fd1f67235d5b6d0fee781056001bfb694c03b09"
|
||||||
|
|
Loading…
Reference in New Issue