Major update to server workflow backend, implementing a very basic queue API in between triggers and orchestration and then moving everything around to keep it generally more concise, creating a directory to handle all workflow related matters as they do not pertain to the API (running of workflows is a completely separate operation).

This commit is contained in:
mike12345567 2020-09-10 15:00:21 +01:00
parent 834a62934c
commit 3a591c13d0
17 changed files with 302 additions and 233 deletions

View File

@ -1,24 +0,0 @@
const userController = require("../../user")
module.exports = async function createUser({ args, instanceId }) {
const ctx = {
params: {
instanceId,
},
request: {
body: args.user,
},
}
try {
const response = await userController.create(ctx)
return {
user: response,
}
} catch (err) {
console.error(err)
return {
user: null,
}
}
}

View File

@ -1,5 +0,0 @@
const wait = ms => new Promise(resolve => setTimeout(resolve, ms))
module.exports = async function delay({ args }) {
await wait(args.time)
}

View File

@ -1,10 +0,0 @@
module.exports = async function filter({ args }) {
const { field, condition, value } = args
switch (condition) {
case "equals":
if (field !== value) return
break
default:
return
}
}

View File

@ -1,29 +0,0 @@
const recordController = require("../../record")
module.exports = async function saveRecord({ args, context }) {
const { model, ...record } = args.record
const ctx = {
params: {
instanceId: context.instanceId,
modelId: model._id,
},
request: {
body: record,
},
user: { instanceId: context.instanceId },
}
try {
await recordController.save(ctx)
return {
record: ctx.body,
}
} catch (err) {
console.error(err)
return {
record: null,
error: err.message,
}
}
}

View File

@ -1,26 +0,0 @@
const sgMail = require("@sendgrid/mail")
sgMail.setApiKey(process.env.SENDGRID_API_KEY)
module.exports = async function sendEmail({ args }) {
const msg = {
to: args.to,
from: args.from,
subject: args.subject,
text: args.text,
}
try {
await sgMail.send(msg)
return {
success: true,
...args,
}
} catch (err) {
console.error(err)
return {
success: false,
error: err.message,
}
}
}

View File

@ -4,61 +4,76 @@ const ACTION = {
tagline: "<b>Save</b> a <b>{{record.model.name}}</b> record", tagline: "<b>Save</b> a <b>{{record.model.name}}</b> record",
icon: "ri-save-3-fill", icon: "ri-save-3-fill",
description: "Save a record to your database.", description: "Save a record to your database.",
environment: "SERVER",
params: { params: {
record: "record", record: "record",
}, },
args: { args: {
record: {}, record: {},
}, },
type: "ACTION",
}, },
DELETE_RECORD: { DELETE_RECORD: {
description: "Delete a record from your database.", description: "Delete a record from your database.",
icon: "ri-delete-bin-line", icon: "ri-delete-bin-line",
name: "Delete Record", name: "Delete Record",
tagline: "<b>Delete</b> a <b>{{record.model.name}}</b> record", tagline: "<b>Delete</b> a <b>{{record.model.name}}</b> record",
environment: "SERVER",
params: { params: {
record: "record", record: "record",
}, },
args: { args: {
record: {}, record: {},
}, },
type: "ACTION",
}, },
// FIND_RECORD: {
// description: "Find a record in your database.",
// tagline: "<b>Find</b> a <b>{{record.model.name}}</b> record",
// icon: "ri-search-line",
// name: "Find Record",
// environment: "SERVER",
// params: {
// record: "string",
// },
// },
CREATE_USER: { CREATE_USER: {
description: "Create a new user.", description: "Create a new user.",
tagline: "Create user <b>{{username}}</b>", tagline: "Create user <b>{{username}}</b>",
icon: "ri-user-add-fill", icon: "ri-user-add-fill",
name: "Create User", name: "Create User",
environment: "SERVER",
params: { params: {
username: "string", username: "string",
password: "password", password: "password",
accessLevelId: "accessLevel", accessLevelId: "accessLevel",
}, },
type: "ACTION",
}, },
SEND_EMAIL: { SEND_EMAIL: {
description: "Send an email.", description: "Send an email.",
tagline: "Send email to <b>{{to}}</b>", tagline: "Send email to <b>{{to}}</b>",
icon: "ri-mail-open-fill", icon: "ri-mail-open-fill",
name: "Send Email", name: "Send Email",
environment: "SERVER",
params: { params: {
to: "string", to: "string",
from: "string", from: "string",
subject: "longText", subject: "longText",
text: "longText", text: "longText",
}, },
type: "ACTION",
},
}
const LOGIC = {
FILTER: {
name: "Filter",
tagline: "{{field}} <b>{{condition}}</b> {{value}}",
icon: "ri-git-branch-line",
description: "Filter any workflows which do not meet certain conditions.",
params: {
filter: "string",
condition: ["equals"],
value: "string",
},
type: "LOGIC",
},
DELAY: {
name: "Delay",
icon: "ri-time-fill",
tagline: "Delay for <b>{{time}}</b> milliseconds",
description: "Delay the workflow until an amount of time has passed.",
params: {
time: "number",
},
type: "LOGIC",
}, },
} }
@ -69,10 +84,10 @@ const TRIGGER = {
icon: "ri-save-line", icon: "ri-save-line",
tagline: "Record is added to <b>{{model.name}}</b>", tagline: "Record is added to <b>{{model.name}}</b>",
description: "Save a record to your database.", description: "Save a record to your database.",
environment: "SERVER",
params: { params: {
model: "model", model: "model",
}, },
type: "TRIGGER",
}, },
RECORD_DELETED: { RECORD_DELETED: {
name: "Record Deleted", name: "Record Deleted",
@ -80,40 +95,17 @@ const TRIGGER = {
icon: "ri-delete-bin-line", icon: "ri-delete-bin-line",
tagline: "Record is deleted from <b>{{model.name}}</b>", tagline: "Record is deleted from <b>{{model.name}}</b>",
description: "Fired when a record is deleted from your database.", description: "Fired when a record is deleted from your database.",
environment: "SERVER",
params: { params: {
model: "model", model: "model",
}, },
type: "TRIGGER",
}, },
} }
const LOGIC = { // This contains the definitions for the steps and triggers that make up a workflow, a workflow comprises
FILTER: { // of many steps and a single trigger
name: "Filter",
tagline: "{{field}} <b>{{condition}}</b> {{value}}",
icon: "ri-git-branch-line",
description: "Filter any workflows which do not meet certain conditions.",
environment: "CLIENT",
params: {
filter: "string",
condition: ["equals"],
value: "string",
},
},
DELAY: {
name: "Delay",
icon: "ri-time-fill",
tagline: "Delay for <b>{{time}}</b> milliseconds",
description: "Delay the workflow until an amount of time has passed.",
environment: "CLIENT",
params: {
time: "number",
},
},
}
module.exports = { module.exports = {
ACTION, ACTION,
TRIGGER,
LOGIC, LOGIC,
TRIGGER,
} }

View File

@ -1,6 +1,7 @@
const CouchDB = require("../../../db") const CouchDB = require("../../../db")
const newid = require("../../../db/newid") const newid = require("../../../db/newid")
const blockDefinitions = require("./blockDefinitions") const blockDefinitions = require("./blockDefinitions")
const triggers = require("../../../workflows/triggers")
/************************* /*************************
* * * *
@ -60,24 +61,11 @@ exports.find = async function(ctx) {
ctx.body = await db.get(ctx.params.id) ctx.body = await db.get(ctx.params.id)
} }
exports.fetchActionScript = async function(ctx) {
ctx.body = require(`./actions/${ctx.action}`)
}
exports.destroy = async function(ctx) { exports.destroy = async function(ctx) {
const db = new CouchDB(ctx.user.instanceId) const db = new CouchDB(ctx.user.instanceId)
ctx.body = await db.remove(ctx.params.id, ctx.params.rev) ctx.body = await db.remove(ctx.params.id, ctx.params.rev)
} }
exports.executeAction = async function(ctx) {
const { args, action } = ctx.request.body
const workflowAction = require(`./actions/${action}`)
ctx.body = await workflowAction({
args,
instanceId: ctx.user.instanceId,
})
}
exports.getActionList = async function(ctx) { exports.getActionList = async function(ctx) {
ctx.body = blockDefinitions.ACTION ctx.body = blockDefinitions.ACTION
} }
@ -87,7 +75,7 @@ exports.getTriggerList = async function(ctx) {
} }
exports.getLogicList = async function(ctx) { exports.getLogicList = async function(ctx) {
ctx.body = blockDefinitions.ACTION ctx.body = blockDefinitions.LOGIC
} }
/********************* /*********************
@ -97,4 +85,7 @@ exports.getLogicList = async function(ctx) {
*********************/ *********************/
exports.trigger = async function(ctx) { exports.trigger = async function(ctx) {
const db = new CouchDB(ctx.user.instanceId)
let workflow = await db.get(ctx.params.id)
await triggers.externalTrigger(workflow, ctx.request.body)
} }

View File

@ -23,7 +23,7 @@ const TEST_WORKFLOW = {
], ],
next: { next: {
actionId: "abc123", stepId: "abc123",
type: "SERVER", type: "SERVER",
conditions: { conditions: {
} }

View File

@ -11,10 +11,9 @@ router
.get("/api/workflows/logic/list", authorized(BUILDER), controller.getLogicList) .get("/api/workflows/logic/list", authorized(BUILDER), controller.getLogicList)
.get("/api/workflows", authorized(BUILDER), controller.fetch) .get("/api/workflows", authorized(BUILDER), controller.fetch)
.get("/api/workflows/:id", authorized(BUILDER), controller.find) .get("/api/workflows/:id", authorized(BUILDER), controller.find)
.get("/api/workflows/:id/:action", authorized(BUILDER), controller.fetchActionScript)
.put("/api/workflows", authorized(BUILDER), controller.update) .put("/api/workflows", authorized(BUILDER), controller.update)
.post("/api/workflows", authorized(BUILDER), controller.create) .post("/api/workflows", authorized(BUILDER), controller.create)
.post("/api/workflows/trigger", controller.trigger) .post("/api/workflows/:id/trigger", controller.trigger)
.delete("/api/workflows/:id/:rev", authorized(BUILDER), controller.destroy) .delete("/api/workflows/:id/:rev", authorized(BUILDER), controller.destroy)
module.exports = router module.exports = router

View File

@ -6,6 +6,7 @@ const http = require("http")
const api = require("./api") const api = require("./api")
const env = require("./environment") const env = require("./environment")
const eventEmitter = require("./events") const eventEmitter = require("./events")
const workflows = require("./workflows/index")
const Sentry = require("@sentry/node") const Sentry = require("@sentry/node")
const app = new Koa() const app = new Koa()
@ -49,4 +50,5 @@ process.on("SIGINT", () => process.exit(1))
module.exports = server.listen(env.PORT || 4001, () => { module.exports = server.listen(env.PORT || 4001, () => {
console.log(`Budibase running on ${JSON.stringify(server.address())}`) console.log(`Budibase running on ${JSON.stringify(server.address())}`)
workflows.init()
}) })

View File

@ -1,33 +1,11 @@
const EventEmitter = require("events").EventEmitter const EventEmitter = require("events").EventEmitter
const CouchDB = require("../db")
const { Orchestrator, serverStrategy } = require("./workflow") /**
* keeping event emitter in one central location as it might be used for things other than
* workflows (what it was for originally) - having a central emitter will be useful in the
* future.
*/
const emitter = new EventEmitter() const emitter = new EventEmitter()
async function executeRelevantWorkflows(event, eventType) {
const db = new CouchDB(event.instanceId)
const workflowsToTrigger = await db.query("database/by_workflow_trigger", {
key: [eventType],
include_docs: true,
})
const workflows = workflowsToTrigger.rows.map(wf => wf.doc)
// Create orchestrator
const workflowOrchestrator = new Orchestrator()
workflowOrchestrator.strategy = serverStrategy
for (let workflow of workflows) {
workflowOrchestrator.execute(workflow, event)
}
}
emitter.on("record:save", async function(event) {
await executeRelevantWorkflows(event, "record:save")
})
emitter.on("record:delete", async function(event) {
await executeRelevantWorkflows(event, "record:delete")
})
module.exports = emitter module.exports = emitter

View File

@ -1,52 +0,0 @@
const mustache = require("mustache")
/**
* The workflow orchestrator is a class responsible for executing workflows.
* It relies on the strategy pattern, which allows composable behaviour to be
* passed into its execute() function. This allows custom execution behaviour based
* on where the orchestrator is run.
*
*/
exports.Orchestrator = class Orchestrator {
set strategy(strategy) {
this._strategy = strategy()
}
async execute(workflow, context) {
if (workflow.live) {
this._strategy.run(workflow.definition, context)
}
}
}
exports.serverStrategy = () => ({
context: {},
bindContextArgs: function(args) {
const mappedArgs = { ...args }
// bind the workflow action args to the workflow context, if required
for (let arg in args) {
const argValue = args[arg]
// We don't want to render mustache templates on non-strings
if (typeof argValue !== "string") continue
mappedArgs[arg] = mustache.render(argValue, { context: this.context })
}
return mappedArgs
},
run: async function(workflow, context) {
for (let block of workflow.steps) {
const action = require(`../api/controllers/workflow/actions/${block.actionId}`)
const response = await action({
args: this.bindContextArgs(block.args),
context,
})
this.context = {
...this.context,
[block.id]: response,
}
}
},
})

View File

@ -0,0 +1,86 @@
const userController = require("../api/controllers/user")
const recordController = require("../api/controllers/record")
const sgMail = require("@sendgrid/mail")
sgMail.setApiKey(process.env.SENDGRID_API_KEY)
let BUILTIN_ACTIONS = {
CREATE_USER: async function({ args, instanceId }) {
const ctx = {
params: {
instanceId,
},
request: {
body: args.user,
},
}
try {
const response = await userController.create(ctx)
return {
user: response,
}
} catch (err) {
console.error(err)
return {
user: null,
}
}
},
SAVE_RECORD: async function({ args, context }) {
const { model, ...record } = args.record
const ctx = {
params: {
instanceId: context.instanceId,
modelId: model._id,
},
request: {
body: record,
},
user: { instanceId: context.instanceId },
}
try {
await recordController.save(ctx)
return {
record: ctx.body,
}
} catch (err) {
console.error(err)
return {
record: null,
error: err.message,
}
}
},
SEND_EMAIL: async function({ args }) {
const msg = {
to: args.to,
from: args.from,
subject: args.subject,
text: args.text,
}
try {
await sgMail.send(msg)
return {
success: true,
...args,
}
} catch (err) {
console.error(err)
return {
success: false,
error: err.message,
}
}
},
}
module.exports.getAction = async function(actionName) {
if (BUILTIN_ACTIONS[actionName] != null) {
return BUILTIN_ACTIONS[actionName]
}
// TODO: load async actions here
}

View File

@ -0,0 +1,67 @@
const mustache = require("mustache")
const actions = require("./actions")
const logic = require("./logic")
const triggers = require("./triggers")
/**
* The workflow orchestrator is a class responsible for executing workflows.
* It relies on the strategy pattern, which allows composable behaviour to be
* passed into its execute() function. This allows custom execution behaviour based
* on where the orchestrator is run.
*
*/
class Orchestrator {
constructor(workflow) {
this._context = {}
this._workflow = workflow
}
async getStep(type, stepId) {
let step = null
if (type === "ACTION") {
step = await actions.getAction(stepId)
} else if (type === "LOGIC") {
step = logic.getLogic(stepId)
}
if (step == null) {
throw `Cannot find workflow step by name ${stepId}`
}
return step
}
async execute(context) {
let workflow = this._workflow
if (!workflow.live) {
return
}
for (let block of workflow.steps) {
let step = this.getStep(block.type, block.stepId)
let args = { ...block.args }
// bind the workflow action args to the workflow context, if required
for (let arg of Object.keys(args)) {
const argValue = args[arg]
// We don't want to render mustache templates on non-strings
if (typeof argValue !== "string") continue
args[arg] = mustache.render(argValue, { context: this._context })
}
const response = await step({
args,
context,
})
this._context = {
...this._context,
[block.id]: response,
}
}
}
}
module.exports.init = function() {
triggers.workflowQueue.process(async job => {
// Create orchestrator for each individual workflow (their own context)
const workflowOrchestrator = new Orchestrator(job.data.workflow)
await workflowOrchestrator.execute(job.data.event)
})
}

View File

@ -0,0 +1,24 @@
const wait = ms => new Promise(resolve => setTimeout(resolve, ms))
let LOGIC = {
DELAY: async function delay({ args }) {
await wait(args.time)
},
FILTER: async function filter({ args }) {
const { field, condition, value } = args
switch (condition) {
case "equals":
if (field !== value) return
break
default:
return
}
},
}
module.exports.getLogic = function(logicName) {
if (LOGIC[logicName] != null) {
return LOGIC[logicName]
}
}

View File

@ -0,0 +1,44 @@
let events = require("events")
// Bull works with a Job wrapper around all messages that contains a lot more information about
// the state of the message, implement this for the sake of maintaining API consistency
function newJob(queue, message) {
return {
timestamp: Date.now(),
queue: queue,
data: message,
}
}
// designed to replicate Bull (https://github.com/OptimalBits/bull) in memory as a sort of mock
class InMemoryQueue {
// opts is not used by this as there is no real use case when in memory, but is the same API as Bull
constructor(name, opts) {
this._name = name
this._opts = opts
this._messages = []
this._emitter = new events.EventEmitter()
}
// same API as bull, provide a callback and it will respond when messages are available
process(func) {
this._emitter.on("message", async () => {
if (this._messages.length <= 0) {
return
}
let msg = this._messages.shift()
let resp = func(msg)
if (resp.then != null) {
await resp
}
})
}
// simply puts a message to the queue and emits to the queue for processing
add(msg) {
this._messages.push(newJob(this._name, msg))
this._emitter.emit("message")
}
}
module.exports = InMemoryQueue

View File

@ -0,0 +1,32 @@
const CouchDB = require("../db")
const emitter = require("../events/index")
const InMemoryQueue = require("./queue/inMemoryQueue")
let workflowQueue = new InMemoryQueue()
async function queueRelevantWorkflows(event, eventType) {
const db = new CouchDB(event.instanceId)
const workflowsToTrigger = await db.query("database/by_workflow_trigger", {
key: [eventType],
include_docs: true,
})
const workflows = workflowsToTrigger.rows.map(wf => wf.doc)
for (let workflow of workflows) {
workflowQueue.add({ workflow, event })
}
}
emitter.on("record:save", async function(event) {
await queueRelevantWorkflows(event, "record:save")
})
emitter.on("record:delete", async function(event) {
await queueRelevantWorkflows(event, "record:delete")
})
module.exports.externalTrigger = async function(workflow, params) {
workflowQueue.add({ workflow, event: params })
}
module.exports.workflowQueue = workflowQueue