Adding the ability to get back the context.

This commit is contained in:
mike12345567 2021-09-07 19:06:20 +01:00
parent 777e243440
commit d2070f9061
7 changed files with 105 additions and 62 deletions

View File

@ -274,7 +274,6 @@ exports.trigger = async function (ctx) {
...ctx.request.body, ...ctx.request.body,
appId: ctx.appId, appId: ctx.appId,
}) })
ctx.status = 200
ctx.body = { ctx.body = {
message: `Automation ${automation._id} has been triggered.`, message: `Automation ${automation._id} has been triggered.`,
automation, automation,
@ -282,5 +281,17 @@ exports.trigger = async function (ctx) {
} }
exports.test = async function (ctx) { exports.test = async function (ctx) {
ctx.body = {} const db = new CouchDB(ctx.appId)
let automation = await db.get(ctx.params.id)
ctx.body = {
automation,
responses: await triggers.externalTrigger(
automation,
{
...ctx.request.body,
appId: ctx.appId,
},
{ getResponses: true }
),
}
} }

View File

@ -88,7 +88,7 @@ router
"/api/automations/:id/trigger", "/api/automations/:id/trigger",
paramResource("id"), paramResource("id"),
authorized(PermissionTypes.AUTOMATION, PermissionLevels.EXECUTE), authorized(PermissionTypes.AUTOMATION, PermissionLevels.EXECUTE),
controller.trigger controller.test
) )
.delete( .delete(
"/api/automations/:id/:rev", "/api/automations/:id/:rev",

View File

@ -88,8 +88,8 @@ module.exports = server.listen(env.PORT || 0, async () => {
env._set("PORT", server.address().port) env._set("PORT", server.address().port)
eventEmitter.emitPort(env.PORT) eventEmitter.emitPort(env.PORT)
fileSystem.init() fileSystem.init()
await automations.init()
await redis.init() await redis.init()
await automations.init()
}) })
process.on("uncaughtException", err => { process.on("uncaughtException", err => {

View File

@ -1,49 +1,12 @@
const triggers = require("./triggers") const triggers = require("./triggers")
const env = require("../environment") const { processEvent } = require("./utils")
const workerFarm = require("worker-farm")
const singleThread = require("./thread")
const { getAPIKey, update, Properties } = require("../utilities/usageQuota")
let workers = workerFarm(require.resolve("./thread"))
function runWorker(job) {
return new Promise((resolve, reject) => {
workers(job, err => {
if (err) {
reject(err)
} else {
resolve()
}
})
})
}
async function updateQuota(automation) {
const appId = automation.appId
const apiObj = await getAPIKey(appId)
// this will fail, causing automation to escape if limits reached
await update(apiObj.apiKey, Properties.AUTOMATION, 1)
return apiObj.apiKey
}
/** /**
* This module is built purely to kick off the worker farm and manage the inputs/outputs * This module is built purely to kick off the worker farm and manage the inputs/outputs
*/ */
module.exports.init = async function () { exports.init = async function () {
// don't wait this promise, it'll never end
triggers.automationQueue.process(async job => { triggers.automationQueue.process(async job => {
try { await processEvent(job)
if (env.USE_QUOTAS) {
job.data.automation.apiKey = await updateQuota(job.data.automation)
}
if (env.isProd()) {
await runWorker(job)
} else {
await singleThread(job)
}
} catch (err) {
console.error(
`${job.data.automation.appId} automation ${job.data.automation._id} was unable to run - ${err}`
)
}
}) })
} }

View File

@ -83,25 +83,28 @@ class Orchestrator {
this._context.steps.push(outputs) this._context.steps.push(outputs)
} catch (err) { } catch (err) {
console.error(`Automation error - ${step.stepId} - ${err}`) console.error(`Automation error - ${step.stepId} - ${err}`)
return err
} }
} }
return this._context
} }
} }
// callback is required for worker-farm to state that the worker thread has completed // callback is required for worker-farm to state that the worker thread has completed
module.exports = async (job, cb = null) => { module.exports = (job, cb) => {
try { if (!cb) {
const automationOrchestrator = new Orchestrator( throw "Callback must be defined."
job.data.automation,
job.data.event
)
await automationOrchestrator.execute()
if (cb) {
cb()
}
} catch (err) {
if (cb) {
cb(err)
}
} }
const automationOrchestrator = new Orchestrator(
job.data.automation,
job.data.event
)
automationOrchestrator
.execute()
.then(output => {
cb(null, output)
})
.catch(err => {
cb(err)
})
} }

View File

@ -10,6 +10,8 @@ const { utils } = require("@budibase/auth/redis")
const { JobQueues } = require("../constants") const { JobQueues } = require("../constants")
const { definitions } = require("./triggerInfo") const { definitions } = require("./triggerInfo")
const { isDevAppID } = require("../db/utils") const { isDevAppID } = require("../db/utils")
// need this to call directly, so we can get a response
const { processEvent } = require("./utils")
const { opts } = utils.getRedisOptions() const { opts } = utils.getRedisOptions()
let automationQueue = new Queue(JobQueues.AUTOMATIONS, { redis: opts }) let automationQueue = new Queue(JobQueues.AUTOMATIONS, { redis: opts })
@ -76,7 +78,11 @@ emitter.on("row:delete", async function (event) {
await queueRelevantRowAutomations(event, "row:delete") await queueRelevantRowAutomations(event, "row:delete")
}) })
exports.externalTrigger = async function (automation, params) { exports.externalTrigger = async function (
automation,
params,
{ getResponses } = {}
) {
if (automation.definition != null && automation.definition.trigger != null) { if (automation.definition != null && automation.definition.trigger != null) {
if (automation.definition.trigger.stepId === "APP") { if (automation.definition.trigger.stepId === "APP") {
// values are likely to be submitted as strings, so we shall convert to correct type // values are likely to be submitted as strings, so we shall convert to correct type
@ -88,8 +94,12 @@ exports.externalTrigger = async function (automation, params) {
params.fields = coercedFields params.fields = coercedFields
} }
} }
const data = { automation, event: params }
await automationQueue.add({ automation, event: params }) if (getResponses) {
return processEvent({ data })
} else {
return automationQueue.add(data)
}
} }
exports.getQueues = () => { exports.getQueues = () => {

View File

@ -0,0 +1,56 @@
const env = require("../environment")
const workerFarm = require("worker-farm")
const { getAPIKey, update, Properties } = require("../utilities/usageQuota")
const singleThread = require("./thread")
let workers = workerFarm(require.resolve("./thread"))
function runWorker(job) {
return new Promise((resolve, reject) => {
workers(job, (err, output) => {
if (err) {
reject(err)
} else {
resolve(output)
}
})
})
}
function runSingleThread(job) {
return new Promise((resolve, reject) => {
singleThread(job, (err, output) => {
if (err) {
reject(err)
} else {
resolve(output)
}
})
})
}
async function updateQuota(automation) {
const appId = automation.appId
const apiObj = await getAPIKey(appId)
// this will fail, causing automation to escape if limits reached
await update(apiObj.apiKey, Properties.AUTOMATION, 1)
return apiObj.apiKey
}
exports.processEvent = async job => {
try {
if (env.USE_QUOTAS) {
job.data.automation.apiKey = await updateQuota(job.data.automation)
}
if (!env.isProd()) {
return runSingleThread(job)
} else {
return runWorker(job)
}
} catch (err) {
console.error(
`${job.data.automation.appId} automation ${job.data.automation._id} was unable to run - ${err}`
)
return err
}
}