Adding worker controls to both automations and queries.

This commit is contained in:
mike12345567 2021-11-11 12:11:09 +00:00
parent 2e61209291
commit 82dac5c588
9 changed files with 86 additions and 55 deletions

View File

@ -157,6 +157,7 @@
onMount(async () => { onMount(async () => {
if (!query || !query._id) { if (!query || !query._id) {
roleId = Roles.BASIC
return return
} }
try { try {

View File

@ -3,7 +3,9 @@ const CouchDB = require("../../db")
const { generateQueryID, getQueryParams } = require("../../db/utils") const { generateQueryID, getQueryParams } = require("../../db/utils")
const { BaseQueryVerbs } = require("../../constants") const { BaseQueryVerbs } = require("../../constants")
const env = require("../../environment") const env = require("../../environment")
const queryRunner = require("../../utilities/queryRunner") const { Thread, ThreadType } = require("../../threads")
const Runner = new Thread(ThreadType.QUERY, { timeoutMs: 10000 })
// simple function to append "readable" to all read queries // simple function to append "readable" to all read queries
function enrichQueries(input) { function enrichQueries(input) {
@ -104,12 +106,12 @@ exports.preview = async function (ctx) {
const { fields, parameters, queryVerb, transformer } = ctx.request.body const { fields, parameters, queryVerb, transformer } = ctx.request.body
const enrichedQuery = await enrichQueryFields(fields, parameters) const enrichedQuery = await enrichQueryFields(fields, parameters)
const { rows, keys } = await queryRunner( const { rows, keys } = await Runner.run({
datasource, datasource,
queryVerb, queryVerb,
enrichedQuery, query: enrichedQuery,
transformer transformer,
) })
ctx.body = { ctx.body = {
rows, rows,
@ -129,12 +131,12 @@ exports.execute = async function (ctx) {
) )
// call the relevant CRUD method on the integration class // call the relevant CRUD method on the integration class
const { rows } = await queryRunner( const { rows } = await Runner.run({
datasource, datasource,
query.queryVerb, queryVerb: query.queryVerb,
enrichedQuery, query: enrichedQuery,
query.transformer transformer: query.transformer,
) })
ctx.body = rows ctx.body = rows
} }

View File

@ -11,8 +11,7 @@ jest.spyOn(global.console, "error")
require("../../environment") require("../../environment")
const automation = require("../index") const automation = require("../index")
const usageQuota = require("../../utilities/usageQuota") const thread = require("../../threads/automation")
const thread = require("../thread")
const triggers = require("../triggers") const triggers = require("../triggers")
const { basicAutomation } = require("../../tests/utilities/structures") const { basicAutomation } = require("../../tests/utilities/structures")
const { wait } = require("../../utilities") const { wait } = require("../../utilities")
@ -54,7 +53,7 @@ describe("Run through some parts of the automations system", () => {
} }
}) })
await wait(100) await wait(100)
expect(thread).toHaveBeenCalledWith(makePartial({ expect().toHaveBeenCalledWith(makePartial({
data: { data: {
event: { event: {
fields: { fields: {

View File

@ -1,4 +1,4 @@
const runner = require("./thread") const { Thread, ThreadType } = require("../threads")
const { definitions } = require("./triggerInfo") const { definitions } = require("./triggerInfo")
const webhooks = require("../api/controllers/webhook") const webhooks = require("../api/controllers/webhook")
const CouchDB = require("../db") const CouchDB = require("../db")
@ -10,11 +10,12 @@ const { getDeployedAppID } = require("@budibase/auth/db")
const WH_STEP_ID = definitions.WEBHOOK.stepId const WH_STEP_ID = definitions.WEBHOOK.stepId
const CRON_STEP_ID = definitions.CRON.stepId const CRON_STEP_ID = definitions.CRON.stepId
const Runner = new Thread(ThreadType.AUTOMATION)
exports.processEvent = async job => { exports.processEvent = async job => {
try { try {
// need to actually await these so that an error can be captured properly // need to actually await these so that an error can be captured properly
return await runner(job) return await Runner.run(job)
} catch (err) { } catch (err) {
console.error( console.error(
`${job.data.automation.appId} automation ${job.data.automation._id} was unable to run - ${err}` `${job.data.automation.appId} automation ${job.data.automation._id} was unable to run - ${err}`

View File

@ -1,5 +1,5 @@
const actions = require("./actions") const actions = require("../automations/actions")
const automationUtils = require("./automationUtils") const automationUtils = require("../automations/automationUtils")
const AutomationEmitter = require("../events/AutomationEmitter") const AutomationEmitter = require("../events/AutomationEmitter")
const { processObject } = require("@budibase/string-templates") const { processObject } = require("@budibase/string-templates")
const { DEFAULT_TENANT_ID } = require("@budibase/auth").constants const { DEFAULT_TENANT_ID } = require("@budibase/auth").constants
@ -119,10 +119,17 @@ class Orchestrator {
} }
} }
module.exports = async job => { module.exports = (input, callback) => {
const automationOrchestrator = new Orchestrator( const automationOrchestrator = new Orchestrator(
job.data.automation, input.data.automation,
job.data.event input.data.event
) )
return automationOrchestrator.execute() automationOrchestrator
.execute()
.then(response => {
callback(null, response)
})
.catch(err => {
callback(err)
})
} }

View File

@ -0,0 +1,49 @@
const workerFarm = require("worker-farm")
const ThreadType = {
QUERY: "query",
AUTOMATION: "automation",
}
function typeToFile(type) {
let filename = null
switch (type) {
case ThreadType.QUERY:
filename = "./query"
break
case ThreadType.AUTOMATION:
filename = "./automation"
break
default:
throw "Unknown thread type"
}
return require.resolve(filename)
}
class Thread {
constructor(type, opts = { timeoutMs: null, count: 1 }) {
const workerOpts = {
autoStart: true,
maxConcurrentWorkers: opts.count ? opts.count : 1,
}
if (opts.timeoutMs) {
workerOpts.maxCallTime = opts.timeoutMs
}
this.workers = workerFarm(workerOpts, typeToFile(type))
}
run(data) {
return new Promise((resolve, reject) => {
this.workers(data, (err, response) => {
if (err) {
reject(err)
} else {
resolve(response)
}
})
})
}
}
module.exports.Thread = Thread
module.exports.ThreadType = ThreadType

View File

@ -1,5 +1,5 @@
const ScriptRunner = require("../scriptRunner") const ScriptRunner = require("../utilities/scriptRunner")
const { integrations } = require("../../integrations") const { integrations } = require("../integrations")
function formatResponse(resp) { function formatResponse(resp) {
if (typeof resp === "string") { if (typeof resp === "string") {

View File

@ -1,31 +0,0 @@
const workerFarm = require("worker-farm")
const MAX_WORKER_TIME_MS = 10000
const workers = workerFarm(
{
autoStart: true,
maxConcurrentWorkers: 1,
maxCallTime: MAX_WORKER_TIME_MS,
},
require.resolve("./runner")
)
function runService(data) {
return new Promise((resolve, reject) => {
workers(data, (err, response) => {
if (err) {
reject(err)
} else {
resolve(response)
}
})
})
}
module.exports = async (datasource, queryVerb, query, transformer) => {
return runService({
datasource,
queryVerb,
query,
transformer,
})
}

View File

@ -1,10 +1,13 @@
const fetch = require("node-fetch") const fetch = require("node-fetch")
const { VM, VMScript } = require("vm2") const { VM, VMScript } = require("vm2")
const JS_TIMEOUT_MS = 1000
class ScriptRunner { class ScriptRunner {
constructor(script, context) { constructor(script, context) {
const code = `let fn = () => {\n${script}\n}; results.out = fn();` const code = `let fn = () => {\n${script}\n}; results.out = fn();`
this.vm = new VM() this.vm = new VM({
timeout: JS_TIMEOUT_MS,
})
this.results = { out: "" } this.results = { out: "" }
this.vm.setGlobals(context) this.vm.setGlobals(context)
this.vm.setGlobal("fetch", fetch) this.vm.setGlobal("fetch", fetch)