Removing the concept of worker-farm and fixing issues raised in review.
This commit is contained in:
parent
70bc68fb61
commit
7d1dec28da
|
@ -109,7 +109,6 @@
|
||||||
"to-json-schema": "0.2.5",
|
"to-json-schema": "0.2.5",
|
||||||
"uuid": "3.3.2",
|
"uuid": "3.3.2",
|
||||||
"validate.js": "0.13.1",
|
"validate.js": "0.13.1",
|
||||||
"worker-farm": "1.7.0",
|
|
||||||
"yargs": "13.2.4",
|
"yargs": "13.2.4",
|
||||||
"zlib": "1.0.5"
|
"zlib": "1.0.5"
|
||||||
},
|
},
|
||||||
|
|
|
@ -99,21 +99,10 @@ class Orchestrator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// callback is required for worker-farm to state that the worker thread has completed
|
module.exports = async job => {
|
||||||
module.exports = (job, cb) => {
|
|
||||||
if (!cb) {
|
|
||||||
throw "Callback must be defined."
|
|
||||||
}
|
|
||||||
const automationOrchestrator = new Orchestrator(
|
const automationOrchestrator = new Orchestrator(
|
||||||
job.data.automation,
|
job.data.automation,
|
||||||
job.data.event
|
job.data.event
|
||||||
)
|
)
|
||||||
automationOrchestrator
|
return automationOrchestrator.execute()
|
||||||
.execute()
|
|
||||||
.then(output => {
|
|
||||||
cb(null, output)
|
|
||||||
})
|
|
||||||
.catch(err => {
|
|
||||||
cb(err)
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
exports.defintion = {
|
exports.definition = {
|
||||||
name: "Cron Trigger",
|
name: "Cron Trigger",
|
||||||
event: "cron:trigger",
|
event: "cron:trigger",
|
||||||
icon: "ri-timer-line",
|
icon: "ri-timer-line",
|
||||||
|
|
|
@ -11,5 +11,5 @@ exports.definitions = {
|
||||||
ROW_DELETED: rowDeleted.definition,
|
ROW_DELETED: rowDeleted.definition,
|
||||||
WEBHOOK: webhook.definition,
|
WEBHOOK: webhook.definition,
|
||||||
APP: app.definition,
|
APP: app.definition,
|
||||||
CRON: cron.defintion,
|
CRON: cron.definition,
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
const env = require("../environment")
|
const env = require("../environment")
|
||||||
const workerFarm = require("worker-farm")
|
|
||||||
const { getAPIKey, update, Properties } = require("../utilities/usageQuota")
|
const { getAPIKey, update, Properties } = require("../utilities/usageQuota")
|
||||||
const singleThread = require("./thread")
|
const runner = require("./thread")
|
||||||
const { definitions } = require("./triggerInfo")
|
const { definitions } = require("./triggerInfo")
|
||||||
const webhooks = require("../api/controllers/webhook")
|
const webhooks = require("../api/controllers/webhook")
|
||||||
const CouchDB = require("../db")
|
const CouchDB = require("../db")
|
||||||
|
@ -13,32 +12,6 @@ const { MetadataTypes } = require("../constants")
|
||||||
const WH_STEP_ID = definitions.WEBHOOK.stepId
|
const WH_STEP_ID = definitions.WEBHOOK.stepId
|
||||||
const CRON_STEP_ID = definitions.CRON.stepId
|
const CRON_STEP_ID = definitions.CRON.stepId
|
||||||
|
|
||||||
let workers = workerFarm(require.resolve("./thread"))
|
|
||||||
|
|
||||||
function runWorker(job) {
|
|
||||||
return new Promise((resolve, reject) => {
|
|
||||||
workers(job, (err, output) => {
|
|
||||||
if (err) {
|
|
||||||
reject(err)
|
|
||||||
} else {
|
|
||||||
resolve(output)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
function runSingleThread(job) {
|
|
||||||
return new Promise((resolve, reject) => {
|
|
||||||
singleThread(job, (err, output) => {
|
|
||||||
if (err) {
|
|
||||||
reject(err)
|
|
||||||
} else {
|
|
||||||
resolve(output)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
async function updateQuota(automation) {
|
async function updateQuota(automation) {
|
||||||
const appId = automation.appId
|
const appId = automation.appId
|
||||||
const apiObj = await getAPIKey(appId)
|
const apiObj = await getAPIKey(appId)
|
||||||
|
@ -53,13 +26,7 @@ exports.processEvent = async job => {
|
||||||
job.data.automation.apiKey = await updateQuota(job.data.automation)
|
job.data.automation.apiKey = await updateQuota(job.data.automation)
|
||||||
}
|
}
|
||||||
// need to actually await these so that an error can be captured properly
|
// need to actually await these so that an error can be captured properly
|
||||||
let response
|
return await runner(job)
|
||||||
if (!env.isProd()) {
|
|
||||||
response = await runSingleThread(job)
|
|
||||||
} else {
|
|
||||||
response = await runWorker(job)
|
|
||||||
}
|
|
||||||
return response
|
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
console.error(
|
console.error(
|
||||||
`${job.data.automation.appId} automation ${job.data.automation._id} was unable to run - ${err}`
|
`${job.data.automation.appId} automation ${job.data.automation._id} was unable to run - ${err}`
|
||||||
|
|
Loading…
Reference in New Issue