Adding the ability to get back the context.
This commit is contained in:
parent
1c24e3f520
commit
213bee1e3b
|
@ -274,7 +274,6 @@ exports.trigger = async function (ctx) {
|
|||
...ctx.request.body,
|
||||
appId: ctx.appId,
|
||||
})
|
||||
ctx.status = 200
|
||||
ctx.body = {
|
||||
message: `Automation ${automation._id} has been triggered.`,
|
||||
automation,
|
||||
|
@ -282,5 +281,17 @@ exports.trigger = async function (ctx) {
|
|||
}
|
||||
|
||||
exports.test = async function (ctx) {
|
||||
ctx.body = {}
|
||||
const db = new CouchDB(ctx.appId)
|
||||
let automation = await db.get(ctx.params.id)
|
||||
ctx.body = {
|
||||
automation,
|
||||
responses: await triggers.externalTrigger(
|
||||
automation,
|
||||
{
|
||||
...ctx.request.body,
|
||||
appId: ctx.appId,
|
||||
},
|
||||
{ getResponses: true }
|
||||
),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -88,7 +88,7 @@ router
|
|||
"/api/automations/:id/trigger",
|
||||
paramResource("id"),
|
||||
authorized(PermissionTypes.AUTOMATION, PermissionLevels.EXECUTE),
|
||||
controller.trigger
|
||||
controller.test
|
||||
)
|
||||
.delete(
|
||||
"/api/automations/:id/:rev",
|
||||
|
|
|
@ -88,8 +88,8 @@ module.exports = server.listen(env.PORT || 0, async () => {
|
|||
env._set("PORT", server.address().port)
|
||||
eventEmitter.emitPort(env.PORT)
|
||||
fileSystem.init()
|
||||
await automations.init()
|
||||
await redis.init()
|
||||
await automations.init()
|
||||
})
|
||||
|
||||
process.on("uncaughtException", err => {
|
||||
|
|
|
@ -1,49 +1,12 @@
|
|||
const triggers = require("./triggers")
|
||||
const env = require("../environment")
|
||||
const workerFarm = require("worker-farm")
|
||||
const singleThread = require("./thread")
|
||||
const { getAPIKey, update, Properties } = require("../utilities/usageQuota")
|
||||
|
||||
let workers = workerFarm(require.resolve("./thread"))
|
||||
|
||||
function runWorker(job) {
|
||||
return new Promise((resolve, reject) => {
|
||||
workers(job, err => {
|
||||
if (err) {
|
||||
reject(err)
|
||||
} else {
|
||||
resolve()
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
async function updateQuota(automation) {
|
||||
const appId = automation.appId
|
||||
const apiObj = await getAPIKey(appId)
|
||||
// this will fail, causing automation to escape if limits reached
|
||||
await update(apiObj.apiKey, Properties.AUTOMATION, 1)
|
||||
return apiObj.apiKey
|
||||
}
|
||||
const { processEvent } = require("./utils")
|
||||
|
||||
/**
|
||||
* This module is built purely to kick off the worker farm and manage the inputs/outputs
|
||||
*/
|
||||
module.exports.init = async function () {
|
||||
exports.init = async function () {
|
||||
// don't wait this promise, it'll never end
|
||||
triggers.automationQueue.process(async job => {
|
||||
try {
|
||||
if (env.USE_QUOTAS) {
|
||||
job.data.automation.apiKey = await updateQuota(job.data.automation)
|
||||
}
|
||||
if (env.isProd()) {
|
||||
await runWorker(job)
|
||||
} else {
|
||||
await singleThread(job)
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(
|
||||
`${job.data.automation.appId} automation ${job.data.automation._id} was unable to run - ${err}`
|
||||
)
|
||||
}
|
||||
await processEvent(job)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -83,25 +83,28 @@ class Orchestrator {
|
|||
this._context.steps.push(outputs)
|
||||
} catch (err) {
|
||||
console.error(`Automation error - ${step.stepId} - ${err}`)
|
||||
return err
|
||||
}
|
||||
}
|
||||
return this._context
|
||||
}
|
||||
}
|
||||
|
||||
// callback is required for worker-farm to state that the worker thread has completed
|
||||
module.exports = async (job, cb = null) => {
|
||||
try {
|
||||
const automationOrchestrator = new Orchestrator(
|
||||
job.data.automation,
|
||||
job.data.event
|
||||
)
|
||||
await automationOrchestrator.execute()
|
||||
if (cb) {
|
||||
cb()
|
||||
}
|
||||
} catch (err) {
|
||||
if (cb) {
|
||||
cb(err)
|
||||
}
|
||||
module.exports = (job, cb) => {
|
||||
if (!cb) {
|
||||
throw "Callback must be defined."
|
||||
}
|
||||
const automationOrchestrator = new Orchestrator(
|
||||
job.data.automation,
|
||||
job.data.event
|
||||
)
|
||||
automationOrchestrator
|
||||
.execute()
|
||||
.then(output => {
|
||||
cb(null, output)
|
||||
})
|
||||
.catch(err => {
|
||||
cb(err)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -10,6 +10,8 @@ const { utils } = require("@budibase/auth/redis")
|
|||
const { JobQueues } = require("../constants")
|
||||
const { definitions } = require("./triggerInfo")
|
||||
const { isDevAppID } = require("../db/utils")
|
||||
// need this to call directly, so we can get a response
|
||||
const { processEvent } = require("./utils")
|
||||
|
||||
const { opts } = utils.getRedisOptions()
|
||||
let automationQueue = new Queue(JobQueues.AUTOMATIONS, { redis: opts })
|
||||
|
@ -76,7 +78,11 @@ emitter.on("row:delete", async function (event) {
|
|||
await queueRelevantRowAutomations(event, "row:delete")
|
||||
})
|
||||
|
||||
exports.externalTrigger = async function (automation, params) {
|
||||
exports.externalTrigger = async function (
|
||||
automation,
|
||||
params,
|
||||
{ getResponses } = {}
|
||||
) {
|
||||
if (automation.definition != null && automation.definition.trigger != null) {
|
||||
if (automation.definition.trigger.stepId === "APP") {
|
||||
// values are likely to be submitted as strings, so we shall convert to correct type
|
||||
|
@ -88,8 +94,12 @@ exports.externalTrigger = async function (automation, params) {
|
|||
params.fields = coercedFields
|
||||
}
|
||||
}
|
||||
|
||||
await automationQueue.add({ automation, event: params })
|
||||
const data = { automation, event: params }
|
||||
if (getResponses) {
|
||||
return processEvent({ data })
|
||||
} else {
|
||||
return automationQueue.add(data)
|
||||
}
|
||||
}
|
||||
|
||||
exports.getQueues = () => {
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
const env = require("../environment")
|
||||
const workerFarm = require("worker-farm")
|
||||
const { getAPIKey, update, Properties } = require("../utilities/usageQuota")
|
||||
const singleThread = require("./thread")
|
||||
|
||||
let workers = workerFarm(require.resolve("./thread"))
|
||||
|
||||
function runWorker(job) {
|
||||
return new Promise((resolve, reject) => {
|
||||
workers(job, (err, output) => {
|
||||
if (err) {
|
||||
reject(err)
|
||||
} else {
|
||||
resolve(output)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
function runSingleThread(job) {
|
||||
return new Promise((resolve, reject) => {
|
||||
singleThread(job, (err, output) => {
|
||||
if (err) {
|
||||
reject(err)
|
||||
} else {
|
||||
resolve(output)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
async function updateQuota(automation) {
|
||||
const appId = automation.appId
|
||||
const apiObj = await getAPIKey(appId)
|
||||
// this will fail, causing automation to escape if limits reached
|
||||
await update(apiObj.apiKey, Properties.AUTOMATION, 1)
|
||||
return apiObj.apiKey
|
||||
}
|
||||
|
||||
exports.processEvent = async job => {
|
||||
try {
|
||||
if (env.USE_QUOTAS) {
|
||||
job.data.automation.apiKey = await updateQuota(job.data.automation)
|
||||
}
|
||||
if (!env.isProd()) {
|
||||
return runSingleThread(job)
|
||||
} else {
|
||||
return runWorker(job)
|
||||
}
|
||||
} catch (err) {
|
||||
console.error(
|
||||
`${job.data.automation.appId} automation ${job.data.automation._id} was unable to run - ${err}`
|
||||
)
|
||||
return err
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue