diff --git a/packages/server/src/api/controllers/workflow/actions/CREATE_USER.js b/packages/server/src/api/controllers/workflow/actions/CREATE_USER.js deleted file mode 100644 index be78275133..0000000000 --- a/packages/server/src/api/controllers/workflow/actions/CREATE_USER.js +++ /dev/null @@ -1,24 +0,0 @@ -const userController = require("../../user") - -module.exports = async function createUser({ args, instanceId }) { - const ctx = { - params: { - instanceId, - }, - request: { - body: args.user, - }, - } - - try { - const response = await userController.create(ctx) - return { - user: response, - } - } catch (err) { - console.error(err) - return { - user: null, - } - } -} diff --git a/packages/server/src/api/controllers/workflow/actions/DELAY.js b/packages/server/src/api/controllers/workflow/actions/DELAY.js deleted file mode 100644 index 97c70db8ae..0000000000 --- a/packages/server/src/api/controllers/workflow/actions/DELAY.js +++ /dev/null @@ -1,5 +0,0 @@ -const wait = ms => new Promise(resolve => setTimeout(resolve, ms)) - -module.exports = async function delay({ args }) { - await wait(args.time) -} diff --git a/packages/server/src/api/controllers/workflow/actions/FILTER.js b/packages/server/src/api/controllers/workflow/actions/FILTER.js deleted file mode 100644 index 94b7d8de81..0000000000 --- a/packages/server/src/api/controllers/workflow/actions/FILTER.js +++ /dev/null @@ -1,10 +0,0 @@ -module.exports = async function filter({ args }) { - const { field, condition, value } = args - switch (condition) { - case "equals": - if (field !== value) return - break - default: - return - } -} diff --git a/packages/server/src/api/controllers/workflow/actions/SAVE_RECORD.js b/packages/server/src/api/controllers/workflow/actions/SAVE_RECORD.js deleted file mode 100644 index 11bbb94f4d..0000000000 --- a/packages/server/src/api/controllers/workflow/actions/SAVE_RECORD.js +++ /dev/null @@ -1,29 +0,0 @@ -const recordController = require("../../record") - -module.exports = async function saveRecord({ args, context }) { - const { model, ...record } = args.record - - const ctx = { - params: { - instanceId: context.instanceId, - modelId: model._id, - }, - request: { - body: record, - }, - user: { instanceId: context.instanceId }, - } - - try { - await recordController.save(ctx) - return { - record: ctx.body, - } - } catch (err) { - console.error(err) - return { - record: null, - error: err.message, - } - } -} diff --git a/packages/server/src/api/controllers/workflow/actions/SEND_EMAIL.js b/packages/server/src/api/controllers/workflow/actions/SEND_EMAIL.js deleted file mode 100644 index 39cb7a8432..0000000000 --- a/packages/server/src/api/controllers/workflow/actions/SEND_EMAIL.js +++ /dev/null @@ -1,26 +0,0 @@ -const sgMail = require("@sendgrid/mail") - -sgMail.setApiKey(process.env.SENDGRID_API_KEY) - -module.exports = async function sendEmail({ args }) { - const msg = { - to: args.to, - from: args.from, - subject: args.subject, - text: args.text, - } - - try { - await sgMail.send(msg) - return { - success: true, - ...args, - } - } catch (err) { - console.error(err) - return { - success: false, - error: err.message, - } - } -} diff --git a/packages/server/src/api/controllers/workflow/blockDefinitions.js b/packages/server/src/api/controllers/workflow/blockDefinitions.js index 5df44e307d..df1a7ffe9c 100644 --- a/packages/server/src/api/controllers/workflow/blockDefinitions.js +++ b/packages/server/src/api/controllers/workflow/blockDefinitions.js @@ -4,61 +4,76 @@ const ACTION = { tagline: "Save a {{record.model.name}} record", icon: "ri-save-3-fill", description: "Save a record to your database.", - environment: "SERVER", params: { record: "record", }, args: { record: {}, }, + type: "ACTION", }, DELETE_RECORD: { description: "Delete a record from your database.", icon: "ri-delete-bin-line", name: "Delete Record", tagline: "Delete a {{record.model.name}} record", - environment: "SERVER", params: { record: "record", }, args: { record: {}, }, + type: "ACTION", }, - // FIND_RECORD: { - // description: "Find a record in your database.", - // tagline: "Find a {{record.model.name}} record", - // icon: "ri-search-line", - // name: "Find Record", - // environment: "SERVER", - // params: { - // record: "string", - // }, - // }, CREATE_USER: { description: "Create a new user.", tagline: "Create user {{username}}", icon: "ri-user-add-fill", name: "Create User", - environment: "SERVER", params: { username: "string", password: "password", accessLevelId: "accessLevel", }, + type: "ACTION", }, SEND_EMAIL: { description: "Send an email.", tagline: "Send email to {{to}}", icon: "ri-mail-open-fill", name: "Send Email", - environment: "SERVER", params: { to: "string", from: "string", subject: "longText", text: "longText", }, + type: "ACTION", + }, +} + +const LOGIC = { + FILTER: { + name: "Filter", + tagline: "{{field}} {{condition}} {{value}}", + icon: "ri-git-branch-line", + description: "Filter any workflows which do not meet certain conditions.", + params: { + filter: "string", + condition: ["equals"], + value: "string", + }, + type: "LOGIC", + }, + DELAY: { + name: "Delay", + icon: "ri-time-fill", + tagline: "Delay for {{time}} milliseconds", + description: "Delay the workflow until an amount of time has passed.", + params: { + time: "number", + }, + type: "LOGIC", }, } @@ -69,10 +84,10 @@ const TRIGGER = { icon: "ri-save-line", tagline: "Record is added to {{model.name}}", description: "Save a record to your database.", - environment: "SERVER", params: { model: "model", }, + type: "TRIGGER", }, RECORD_DELETED: { name: "Record Deleted", @@ -80,40 +95,17 @@ const TRIGGER = { icon: "ri-delete-bin-line", tagline: "Record is deleted from {{model.name}}", description: "Fired when a record is deleted from your database.", - environment: "SERVER", params: { model: "model", }, + type: "TRIGGER", }, } -const LOGIC = { - FILTER: { - name: "Filter", - tagline: "{{field}} {{condition}} {{value}}", - icon: "ri-git-branch-line", - description: "Filter any workflows which do not meet certain conditions.", - environment: "CLIENT", - params: { - filter: "string", - condition: ["equals"], - value: "string", - }, - }, - DELAY: { - name: "Delay", - icon: "ri-time-fill", - tagline: "Delay for {{time}} milliseconds", - description: "Delay the workflow until an amount of time has passed.", - environment: "CLIENT", - params: { - time: "number", - }, - }, -} - +// This contains the definitions for the steps and triggers that make up a workflow, a workflow comprises +// of many steps and a single trigger module.exports = { ACTION, - TRIGGER, LOGIC, + TRIGGER, } diff --git a/packages/server/src/api/controllers/workflow/index.js b/packages/server/src/api/controllers/workflow/index.js index 3f489ceede..f22afe537c 100644 --- a/packages/server/src/api/controllers/workflow/index.js +++ b/packages/server/src/api/controllers/workflow/index.js @@ -1,6 +1,7 @@ const CouchDB = require("../../../db") const newid = require("../../../db/newid") const blockDefinitions = require("./blockDefinitions") +const triggers = require("../../../workflows/triggers") /************************* * * @@ -60,24 +61,11 @@ exports.find = async function(ctx) { ctx.body = await db.get(ctx.params.id) } -exports.fetchActionScript = async function(ctx) { - ctx.body = require(`./actions/${ctx.action}`) -} - exports.destroy = async function(ctx) { const db = new CouchDB(ctx.user.instanceId) ctx.body = await db.remove(ctx.params.id, ctx.params.rev) } -exports.executeAction = async function(ctx) { - const { args, action } = ctx.request.body - const workflowAction = require(`./actions/${action}`) - ctx.body = await workflowAction({ - args, - instanceId: ctx.user.instanceId, - }) -} - exports.getActionList = async function(ctx) { ctx.body = blockDefinitions.ACTION } @@ -87,7 +75,7 @@ exports.getTriggerList = async function(ctx) { } exports.getLogicList = async function(ctx) { - ctx.body = blockDefinitions.ACTION + ctx.body = blockDefinitions.LOGIC } /********************* @@ -97,4 +85,7 @@ exports.getLogicList = async function(ctx) { *********************/ exports.trigger = async function(ctx) { + const db = new CouchDB(ctx.user.instanceId) + let workflow = await db.get(ctx.params.id) + await triggers.externalTrigger(workflow, ctx.request.body) } diff --git a/packages/server/src/api/routes/tests/workflow.spec.js b/packages/server/src/api/routes/tests/workflow.spec.js index f8b38c53ec..5991d06d9d 100644 --- a/packages/server/src/api/routes/tests/workflow.spec.js +++ b/packages/server/src/api/routes/tests/workflow.spec.js @@ -23,7 +23,7 @@ const TEST_WORKFLOW = { ], next: { - actionId: "abc123", + stepId: "abc123", type: "SERVER", conditions: { } diff --git a/packages/server/src/api/routes/workflow.js b/packages/server/src/api/routes/workflow.js index a2191e44d4..de1d49d191 100644 --- a/packages/server/src/api/routes/workflow.js +++ b/packages/server/src/api/routes/workflow.js @@ -11,10 +11,9 @@ router .get("/api/workflows/logic/list", authorized(BUILDER), controller.getLogicList) .get("/api/workflows", authorized(BUILDER), controller.fetch) .get("/api/workflows/:id", authorized(BUILDER), controller.find) - .get("/api/workflows/:id/:action", authorized(BUILDER), controller.fetchActionScript) .put("/api/workflows", authorized(BUILDER), controller.update) .post("/api/workflows", authorized(BUILDER), controller.create) - .post("/api/workflows/trigger", controller.trigger) + .post("/api/workflows/:id/trigger", controller.trigger) .delete("/api/workflows/:id/:rev", authorized(BUILDER), controller.destroy) module.exports = router diff --git a/packages/server/src/app.js b/packages/server/src/app.js index ee6fb3172d..5f720ad865 100644 --- a/packages/server/src/app.js +++ b/packages/server/src/app.js @@ -6,6 +6,7 @@ const http = require("http") const api = require("./api") const env = require("./environment") const eventEmitter = require("./events") +const workflows = require("./workflows/index") const Sentry = require("@sentry/node") const app = new Koa() @@ -49,4 +50,5 @@ process.on("SIGINT", () => process.exit(1)) module.exports = server.listen(env.PORT || 4001, () => { console.log(`Budibase running on ${JSON.stringify(server.address())}`) + workflows.init() }) diff --git a/packages/server/src/events/index.js b/packages/server/src/events/index.js index 0ad4e986bd..f627532e09 100644 --- a/packages/server/src/events/index.js +++ b/packages/server/src/events/index.js @@ -1,33 +1,11 @@ const EventEmitter = require("events").EventEmitter -const CouchDB = require("../db") -const { Orchestrator, serverStrategy } = require("./workflow") + +/** + * keeping event emitter in one central location as it might be used for things other than + * workflows (what it was for originally) - having a central emitter will be useful in the + * future. + */ const emitter = new EventEmitter() -async function executeRelevantWorkflows(event, eventType) { - const db = new CouchDB(event.instanceId) - const workflowsToTrigger = await db.query("database/by_workflow_trigger", { - key: [eventType], - include_docs: true, - }) - - const workflows = workflowsToTrigger.rows.map(wf => wf.doc) - - // Create orchestrator - const workflowOrchestrator = new Orchestrator() - workflowOrchestrator.strategy = serverStrategy - - for (let workflow of workflows) { - workflowOrchestrator.execute(workflow, event) - } -} - -emitter.on("record:save", async function(event) { - await executeRelevantWorkflows(event, "record:save") -}) - -emitter.on("record:delete", async function(event) { - await executeRelevantWorkflows(event, "record:delete") -}) - module.exports = emitter diff --git a/packages/server/src/events/workflow.js b/packages/server/src/events/workflow.js deleted file mode 100644 index d76f8e0e24..0000000000 --- a/packages/server/src/events/workflow.js +++ /dev/null @@ -1,52 +0,0 @@ -const mustache = require("mustache") - -/** - * The workflow orchestrator is a class responsible for executing workflows. - * It relies on the strategy pattern, which allows composable behaviour to be - * passed into its execute() function. This allows custom execution behaviour based - * on where the orchestrator is run. - * - */ -exports.Orchestrator = class Orchestrator { - set strategy(strategy) { - this._strategy = strategy() - } - - async execute(workflow, context) { - if (workflow.live) { - this._strategy.run(workflow.definition, context) - } - } -} - -exports.serverStrategy = () => ({ - context: {}, - bindContextArgs: function(args) { - const mappedArgs = { ...args } - - // bind the workflow action args to the workflow context, if required - for (let arg in args) { - const argValue = args[arg] - // We don't want to render mustache templates on non-strings - if (typeof argValue !== "string") continue - - mappedArgs[arg] = mustache.render(argValue, { context: this.context }) - } - - return mappedArgs - }, - run: async function(workflow, context) { - for (let block of workflow.steps) { - const action = require(`../api/controllers/workflow/actions/${block.actionId}`) - const response = await action({ - args: this.bindContextArgs(block.args), - context, - }) - - this.context = { - ...this.context, - [block.id]: response, - } - } - }, -}) diff --git a/packages/server/src/workflows/actions.js b/packages/server/src/workflows/actions.js new file mode 100644 index 0000000000..b20465b24e --- /dev/null +++ b/packages/server/src/workflows/actions.js @@ -0,0 +1,86 @@ +const userController = require("../api/controllers/user") +const recordController = require("../api/controllers/record") +const sgMail = require("@sendgrid/mail") + +sgMail.setApiKey(process.env.SENDGRID_API_KEY) + +let BUILTIN_ACTIONS = { + CREATE_USER: async function({ args, instanceId }) { + const ctx = { + params: { + instanceId, + }, + request: { + body: args.user, + }, + } + + try { + const response = await userController.create(ctx) + return { + user: response, + } + } catch (err) { + console.error(err) + return { + user: null, + } + } + }, + SAVE_RECORD: async function({ args, context }) { + const { model, ...record } = args.record + + const ctx = { + params: { + instanceId: context.instanceId, + modelId: model._id, + }, + request: { + body: record, + }, + user: { instanceId: context.instanceId }, + } + + try { + await recordController.save(ctx) + return { + record: ctx.body, + } + } catch (err) { + console.error(err) + return { + record: null, + error: err.message, + } + } + }, + SEND_EMAIL: async function({ args }) { + const msg = { + to: args.to, + from: args.from, + subject: args.subject, + text: args.text, + } + + try { + await sgMail.send(msg) + return { + success: true, + ...args, + } + } catch (err) { + console.error(err) + return { + success: false, + error: err.message, + } + } + }, +} + +module.exports.getAction = async function(actionName) { + if (BUILTIN_ACTIONS[actionName] != null) { + return BUILTIN_ACTIONS[actionName] + } + // TODO: load async actions here +} diff --git a/packages/server/src/workflows/index.js b/packages/server/src/workflows/index.js new file mode 100644 index 0000000000..9459f325b2 --- /dev/null +++ b/packages/server/src/workflows/index.js @@ -0,0 +1,67 @@ +const mustache = require("mustache") +const actions = require("./actions") +const logic = require("./logic") +const triggers = require("./triggers") + +/** + * The workflow orchestrator is a class responsible for executing workflows. + * It relies on the strategy pattern, which allows composable behaviour to be + * passed into its execute() function. This allows custom execution behaviour based + * on where the orchestrator is run. + * + */ +class Orchestrator { + constructor(workflow) { + this._context = {} + this._workflow = workflow + } + + async getStep(type, stepId) { + let step = null + if (type === "ACTION") { + step = await actions.getAction(stepId) + } else if (type === "LOGIC") { + step = logic.getLogic(stepId) + } + if (step == null) { + throw `Cannot find workflow step by name ${stepId}` + } + return step + } + + async execute(context) { + let workflow = this._workflow + if (!workflow.live) { + return + } + for (let block of workflow.steps) { + let step = this.getStep(block.type, block.stepId) + let args = { ...block.args } + // bind the workflow action args to the workflow context, if required + for (let arg of Object.keys(args)) { + const argValue = args[arg] + // We don't want to render mustache templates on non-strings + if (typeof argValue !== "string") continue + + args[arg] = mustache.render(argValue, { context: this._context }) + } + const response = await step({ + args, + context, + }) + + this._context = { + ...this._context, + [block.id]: response, + } + } + } +} + +module.exports.init = function() { + triggers.workflowQueue.process(async job => { + // Create orchestrator for each individual workflow (their own context) + const workflowOrchestrator = new Orchestrator(job.data.workflow) + await workflowOrchestrator.execute(job.data.event) + }) +} diff --git a/packages/server/src/workflows/logic.js b/packages/server/src/workflows/logic.js new file mode 100644 index 0000000000..a1d71bc0f2 --- /dev/null +++ b/packages/server/src/workflows/logic.js @@ -0,0 +1,24 @@ +const wait = ms => new Promise(resolve => setTimeout(resolve, ms)) + +let LOGIC = { + DELAY: async function delay({ args }) { + await wait(args.time) + }, + + FILTER: async function filter({ args }) { + const { field, condition, value } = args + switch (condition) { + case "equals": + if (field !== value) return + break + default: + return + } + }, +} + +module.exports.getLogic = function(logicName) { + if (LOGIC[logicName] != null) { + return LOGIC[logicName] + } +} diff --git a/packages/server/src/workflows/queue/inMemoryQueue.js b/packages/server/src/workflows/queue/inMemoryQueue.js new file mode 100644 index 0000000000..927eeb60b6 --- /dev/null +++ b/packages/server/src/workflows/queue/inMemoryQueue.js @@ -0,0 +1,44 @@ +let events = require("events") + +// Bull works with a Job wrapper around all messages that contains a lot more information about +// the state of the message, implement this for the sake of maintaining API consistency +function newJob(queue, message) { + return { + timestamp: Date.now(), + queue: queue, + data: message, + } +} + +// designed to replicate Bull (https://github.com/OptimalBits/bull) in memory as a sort of mock +class InMemoryQueue { + // opts is not used by this as there is no real use case when in memory, but is the same API as Bull + constructor(name, opts) { + this._name = name + this._opts = opts + this._messages = [] + this._emitter = new events.EventEmitter() + } + + // same API as bull, provide a callback and it will respond when messages are available + process(func) { + this._emitter.on("message", async () => { + if (this._messages.length <= 0) { + return + } + let msg = this._messages.shift() + let resp = func(msg) + if (resp.then != null) { + await resp + } + }) + } + + // simply puts a message to the queue and emits to the queue for processing + add(msg) { + this._messages.push(newJob(this._name, msg)) + this._emitter.emit("message") + } +} + +module.exports = InMemoryQueue diff --git a/packages/server/src/workflows/triggers.js b/packages/server/src/workflows/triggers.js new file mode 100644 index 0000000000..c72afca301 --- /dev/null +++ b/packages/server/src/workflows/triggers.js @@ -0,0 +1,32 @@ +const CouchDB = require("../db") +const emitter = require("../events/index") +const InMemoryQueue = require("./queue/inMemoryQueue") + +let workflowQueue = new InMemoryQueue() + +async function queueRelevantWorkflows(event, eventType) { + const db = new CouchDB(event.instanceId) + const workflowsToTrigger = await db.query("database/by_workflow_trigger", { + key: [eventType], + include_docs: true, + }) + + const workflows = workflowsToTrigger.rows.map(wf => wf.doc) + for (let workflow of workflows) { + workflowQueue.add({ workflow, event }) + } +} + +emitter.on("record:save", async function(event) { + await queueRelevantWorkflows(event, "record:save") +}) + +emitter.on("record:delete", async function(event) { + await queueRelevantWorkflows(event, "record:delete") +}) + +module.exports.externalTrigger = async function(workflow, params) { + workflowQueue.add({ workflow, event: params }) +} + +module.exports.workflowQueue = workflowQueue