diff --git a/packages/server/src/api/controllers/automation.js b/packages/server/src/api/controllers/automation.js index 953b510f26..7261c3a116 100644 --- a/packages/server/src/api/controllers/automation.js +++ b/packages/server/src/api/controllers/automation.js @@ -274,7 +274,6 @@ exports.trigger = async function (ctx) { ...ctx.request.body, appId: ctx.appId, }) - ctx.status = 200 ctx.body = { message: `Automation ${automation._id} has been triggered.`, automation, @@ -282,5 +281,17 @@ exports.trigger = async function (ctx) { } exports.test = async function (ctx) { - ctx.body = {} + const db = new CouchDB(ctx.appId) + let automation = await db.get(ctx.params.id) + ctx.body = { + automation, + responses: await triggers.externalTrigger( + automation, + { + ...ctx.request.body, + appId: ctx.appId, + }, + { getResponses: true } + ), + } } diff --git a/packages/server/src/api/routes/automation.js b/packages/server/src/api/routes/automation.js index c7b674b4e4..416d270ea9 100644 --- a/packages/server/src/api/routes/automation.js +++ b/packages/server/src/api/routes/automation.js @@ -88,7 +88,7 @@ router "/api/automations/:id/trigger", paramResource("id"), authorized(PermissionTypes.AUTOMATION, PermissionLevels.EXECUTE), - controller.trigger + controller.test ) .delete( "/api/automations/:id/:rev", diff --git a/packages/server/src/app.ts b/packages/server/src/app.ts index a32f241aa3..23e05e5e95 100644 --- a/packages/server/src/app.ts +++ b/packages/server/src/app.ts @@ -88,8 +88,8 @@ module.exports = server.listen(env.PORT || 0, async () => { env._set("PORT", server.address().port) eventEmitter.emitPort(env.PORT) fileSystem.init() - await automations.init() await redis.init() + await automations.init() }) process.on("uncaughtException", err => { diff --git a/packages/server/src/automations/index.js b/packages/server/src/automations/index.js index 0d4b07aff5..13948e9839 100644 --- a/packages/server/src/automations/index.js +++ b/packages/server/src/automations/index.js @@ -1,49 +1,12 @@ const triggers = require("./triggers") -const env = require("../environment") -const workerFarm = require("worker-farm") -const singleThread = require("./thread") -const { getAPIKey, update, Properties } = require("../utilities/usageQuota") - -let workers = workerFarm(require.resolve("./thread")) - -function runWorker(job) { - return new Promise((resolve, reject) => { - workers(job, err => { - if (err) { - reject(err) - } else { - resolve() - } - }) - }) -} - -async function updateQuota(automation) { - const appId = automation.appId - const apiObj = await getAPIKey(appId) - // this will fail, causing automation to escape if limits reached - await update(apiObj.apiKey, Properties.AUTOMATION, 1) - return apiObj.apiKey -} +const { processEvent } = require("./utils") /** * This module is built purely to kick off the worker farm and manage the inputs/outputs */ -module.exports.init = async function () { +exports.init = async function () { + // don't wait this promise, it'll never end triggers.automationQueue.process(async job => { - try { - if (env.USE_QUOTAS) { - job.data.automation.apiKey = await updateQuota(job.data.automation) - } - if (env.isProd()) { - await runWorker(job) - } else { - await singleThread(job) - } - } catch (err) { - console.error( - `${job.data.automation.appId} automation ${job.data.automation._id} was unable to run - ${err}` - ) - } + await processEvent(job) }) } diff --git a/packages/server/src/automations/thread.js b/packages/server/src/automations/thread.js index 87d3f98d16..0b8e2a981b 100644 --- a/packages/server/src/automations/thread.js +++ b/packages/server/src/automations/thread.js @@ -83,25 +83,28 @@ class Orchestrator { this._context.steps.push(outputs) } catch (err) { console.error(`Automation error - ${step.stepId} - ${err}`) + return err } } + return this._context } } // callback is required for worker-farm to state that the worker thread has completed -module.exports = async (job, cb = null) => { - try { - const automationOrchestrator = new Orchestrator( - job.data.automation, - job.data.event - ) - await automationOrchestrator.execute() - if (cb) { - cb() - } - } catch (err) { - if (cb) { - cb(err) - } +module.exports = (job, cb) => { + if (!cb) { + throw "Callback must be defined." } + const automationOrchestrator = new Orchestrator( + job.data.automation, + job.data.event + ) + automationOrchestrator + .execute() + .then(output => { + cb(null, output) + }) + .catch(err => { + cb(err) + }) } diff --git a/packages/server/src/automations/triggers.js b/packages/server/src/automations/triggers.js index b76cd4ce2a..a026ffe709 100644 --- a/packages/server/src/automations/triggers.js +++ b/packages/server/src/automations/triggers.js @@ -10,6 +10,8 @@ const { utils } = require("@budibase/auth/redis") const { JobQueues } = require("../constants") const { definitions } = require("./triggerInfo") const { isDevAppID } = require("../db/utils") +// need this to call directly, so we can get a response +const { processEvent } = require("./utils") const { opts } = utils.getRedisOptions() let automationQueue = new Queue(JobQueues.AUTOMATIONS, { redis: opts }) @@ -76,7 +78,11 @@ emitter.on("row:delete", async function (event) { await queueRelevantRowAutomations(event, "row:delete") }) -exports.externalTrigger = async function (automation, params) { +exports.externalTrigger = async function ( + automation, + params, + { getResponses } = {} +) { if (automation.definition != null && automation.definition.trigger != null) { if (automation.definition.trigger.stepId === "APP") { // values are likely to be submitted as strings, so we shall convert to correct type @@ -88,8 +94,12 @@ exports.externalTrigger = async function (automation, params) { params.fields = coercedFields } } - - await automationQueue.add({ automation, event: params }) + const data = { automation, event: params } + if (getResponses) { + return processEvent({ data }) + } else { + return automationQueue.add(data) + } } exports.getQueues = () => { diff --git a/packages/server/src/automations/utils.js b/packages/server/src/automations/utils.js new file mode 100644 index 0000000000..7f5b542fa6 --- /dev/null +++ b/packages/server/src/automations/utils.js @@ -0,0 +1,56 @@ +const env = require("../environment") +const workerFarm = require("worker-farm") +const { getAPIKey, update, Properties } = require("../utilities/usageQuota") +const singleThread = require("./thread") + +let workers = workerFarm(require.resolve("./thread")) + +function runWorker(job) { + return new Promise((resolve, reject) => { + workers(job, (err, output) => { + if (err) { + reject(err) + } else { + resolve(output) + } + }) + }) +} + +function runSingleThread(job) { + return new Promise((resolve, reject) => { + singleThread(job, (err, output) => { + if (err) { + reject(err) + } else { + resolve(output) + } + }) + }) +} + +async function updateQuota(automation) { + const appId = automation.appId + const apiObj = await getAPIKey(appId) + // this will fail, causing automation to escape if limits reached + await update(apiObj.apiKey, Properties.AUTOMATION, 1) + return apiObj.apiKey +} + +exports.processEvent = async job => { + try { + if (env.USE_QUOTAS) { + job.data.automation.apiKey = await updateQuota(job.data.automation) + } + if (!env.isProd()) { + return runSingleThread(job) + } else { + return runWorker(job) + } + } catch (err) { + console.error( + `${job.data.automation.appId} automation ${job.data.automation._id} was unable to run - ${err}` + ) + return err + } +}