Updating file structure so that each built in step has its own file containing the definition and the function of it, with the intention of keeping definitions together as they will be handled in the async actions.

This commit is contained in:
mike12345567 2020-09-16 14:00:04 +01:00
parent 7986016183
commit 1610f483b0
12 changed files with 549 additions and 497 deletions

View File

@ -1,7 +1,8 @@
const CouchDB = require("../../../db")
const newid = require("../../../db/newid")
const blockDefinitions = require("./blockDefinitions")
const triggers = require("../../../workflows/triggers")
const CouchDB = require("../../db")
const newid = require("../../db/newid")
const actions = require("../../workflows/actions")
const logic = require("../../workflows/logic")
const triggers = require("../../workflows/triggers")
/*************************
* *
@ -67,22 +68,22 @@ exports.destroy = async function(ctx) {
}
exports.getActionList = async function(ctx) {
ctx.body = blockDefinitions.ACTION
ctx.body = actions.BUILTIN_DEFINITIONS
}
exports.getTriggerList = async function(ctx) {
ctx.body = blockDefinitions.TRIGGER
ctx.body = triggers.BUILTIN_DEFINITIONS
}
exports.getLogicList = async function(ctx) {
ctx.body = blockDefinitions.LOGIC
ctx.body = logic.BUILTIN_DEFINITIONS
}
module.exports.getDefinitionList = async function(ctx) {
ctx.body = {
logic: blockDefinitions.LOGIC,
trigger: blockDefinitions.TRIGGER,
action: blockDefinitions.ACTION,
logic: logic.BUILTIN_DEFINITIONS,
trigger: triggers.BUILTIN_DEFINITIONS,
action: actions.BUILTIN_DEFINITIONS,
}
}

View File

@ -1,330 +0,0 @@
let accessLevels = require("../../../utilities/accessLevels")
let conditions = require("../../../workflows/logic").LogicConditions
const ACTION = {
SAVE_RECORD: {
name: "Save Record",
tagline: "Save a {{inputs.enriched.model.name}} record",
icon: "ri-save-3-fill",
description: "Save a record to your database",
type: "ACTION",
inputs: {},
schema: {
inputs: {
properties: {
record: {
type: "object",
properties: {
modelId: {
type: "string",
customType: "model",
title: "Table",
},
},
customType: "record",
title: "Table",
default: {},
required: ["modelId"],
},
},
required: ["record"],
},
outputs: {
properties: {
response: {
type: "object",
description: "The response from the table",
},
success: {
type: "boolean",
description: "Whether the action was successful",
},
id: {
type: "string",
description: "The identifier of the new record",
},
revision: {
type: "string",
description: "The revision of the new record",
},
},
required: ["success", "id", "revision"],
},
},
},
DELETE_RECORD: {
description: "Delete a record from your database",
icon: "ri-delete-bin-line",
name: "Delete Record",
tagline: "Delete a {{inputs.enriched.model.name}} record",
type: "ACTION",
inputs: {},
schema: {
inputs: {
properties: {
modelId: {
type: "string",
customType: "model",
title: "Table",
},
id: {
type: "string",
title: "Record ID",
},
revision: {
type: "string",
title: "Record Revision",
},
},
required: ["modelId", "id", "revision"],
},
outputs: {
properties: {
record: {
type: "object",
customType: "record",
description: "The deleted record",
},
response: {
type: "object",
description: "The response from the table",
},
success: {
type: "boolean",
description: "Whether the action was successful",
},
},
required: ["record", "success"],
},
},
},
CREATE_USER: {
description: "Create a new user",
tagline: "Create user {{inputs.username}}",
icon: "ri-user-add-fill",
name: "Create User",
type: "ACTION",
inputs: {},
schema: {
inputs: {
properties: {
username: {
type: "string",
title: "Username",
},
password: {
type: "string",
customType: "password",
title: "Password",
},
accessLevelId: {
type: "string",
title: "Access Level",
default: accessLevels.POWERUSER_LEVEL_ID,
enum: accessLevels.ACCESS_LEVELS,
},
},
required: ["username", "password", "accessLevelId"],
},
outputs: {
properties: {
id: {
type: "string",
description: "The identifier of the new user",
},
revision: {
type: "string",
description: "The revision of the new user",
},
response: {
type: "object",
description: "The response from the user table",
},
success: {
type: "boolean",
description: "Whether the action was successful",
},
},
required: ["id", "revision", "success"],
},
},
},
SEND_EMAIL: {
description: "Send an email.",
tagline: "Send email to {{inputs.to}}",
icon: "ri-mail-open-fill",
name: "Send Email",
type: "ACTION",
inputs: {},
schema: {
inputs: {
properties: {
to: {
type: "string",
title: "Send To",
},
from: {
type: "string",
title: "Send From",
},
subject: {
type: "string",
title: "Email Subject",
},
contents: {
type: "string",
title: "Email Contents",
},
},
required: ["to", "from", "subject", "contents"],
},
outputs: {
properties: {
success: {
type: "boolean",
description: "Whether the email was sent",
},
response: {
type: "object",
description:
"A response from the email client, this may be an error",
},
},
required: ["success"],
},
},
},
}
const LOGIC = {
FILTER: {
name: "Filter",
tagline: "{{inputs.filter}} {{inputs.condition}} {{inputs.value}}",
icon: "ri-git-branch-line",
description: "Filter any workflows which do not meet certain conditions",
type: "LOGIC",
inputs: {},
schema: {
inputs: {
properties: {
filter: {
type: "string",
title: "Reference Value",
},
condition: {
type: "string",
title: "Condition",
enum: conditions,
default: "equals",
},
value: {
type: "string",
title: "Comparison Value",
},
},
required: ["filter", "condition", "value"],
},
outputs: {
properties: {
success: {
type: "boolean",
description: "Whether the logic block passed",
},
},
required: ["success"],
},
},
},
DELAY: {
name: "Delay",
icon: "ri-time-fill",
tagline: "Delay for {{inputs.time}} milliseconds",
description: "Delay the workflow until an amount of time has passed",
inputs: {},
schema: {
inputs: {
properties: {
time: {
type: "number",
title: "Delay in milliseconds",
},
},
required: ["time"],
},
},
type: "LOGIC",
},
}
const TRIGGER = {
RECORD_SAVED: {
name: "Record Saved",
event: "record:save",
icon: "ri-save-line",
tagline: "Record is added to {{inputs.enriched.model.name}}",
description: "Fired when a record is saved to your database",
inputs: {},
schema: {
inputs: {
properties: {
modelId: {
type: "string",
customType: "model",
title: "Table",
},
},
required: ["modelId"],
},
outputs: {
properties: {
record: {
type: "object",
customType: "record",
description: "The new record that was saved",
},
},
required: ["record"],
},
},
type: "TRIGGER",
},
RECORD_DELETED: {
name: "Record Deleted",
event: "record:delete",
icon: "ri-delete-bin-line",
tagline: "Record is deleted from {{inputs.enriched.model.name}}",
description: "Fired when a record is deleted from your database",
inputs: {},
schema: {
inputs: {
properties: {
modelId: {
type: "string",
customType: "model",
title: "Table",
},
},
required: ["modelId"],
},
outputs: {
properties: {
record: {
type: "object",
customType: "record",
description: "The record that was deleted",
},
},
required: ["record"],
},
},
type: "TRIGGER",
},
}
// This contains the definitions for the steps and triggers that make up a workflow, a workflow comprises
// of many steps and a single trigger
module.exports = {
ACTION,
LOGIC,
TRIGGER,
}

View File

@ -1,115 +1,20 @@
const userController = require("../api/controllers/user")
const recordController = require("../api/controllers/record")
const sgMail = require("@sendgrid/mail")
const sendEmail = require("./steps/sendEmail")
const saveRecord = require("./steps/saveRecord")
const deleteRecord = require("./steps/deleteRecord")
const createUser = require("./steps/createUser")
sgMail.setApiKey(process.env.SENDGRID_API_KEY)
const BUILTIN_ACTIONS = {
SEND_EMAIL: sendEmail.run,
SAVE_RECORD: saveRecord.run,
DELETE_RECORD: deleteRecord.run,
CREATE_USER: createUser.run,
}
let BUILTIN_ACTIONS = {
CREATE_USER: async function(inputs) {
const { username, password, accessLevelId } = inputs
const ctx = {
user: {
instanceId: inputs.instanceId,
},
request: {
body: { username, password, accessLevelId },
},
}
try {
await userController.create(ctx)
return {
response: ctx.body,
id: ctx.body._id,
revision: ctx.body._rev,
success: ctx.status === 200,
}
} catch (err) {
console.error(err)
return {
success: false,
response: err,
}
}
},
SAVE_RECORD: async function(inputs) {
const ctx = {
params: {
instanceId: inputs.instanceId,
modelId: inputs.model._id,
},
request: {
body: inputs.record,
},
user: { instanceId: inputs.instanceId },
}
try {
await recordController.save(ctx)
return {
response: ctx.body,
id: ctx.body._id,
revision: ctx.body._rev,
success: ctx.status === 200,
}
} catch (err) {
console.error(err)
return {
success: false,
response: err,
}
}
},
SEND_EMAIL: async function(inputs) {
const msg = {
to: inputs.to,
from: inputs.from,
subject: inputs.subject,
text: inputs.text,
}
try {
await sgMail.send(msg)
return {
success: true,
}
} catch (err) {
console.error(err)
return {
success: false,
response: err,
}
}
},
DELETE_RECORD: async function(inputs) {
const { model, ...record } = inputs.record
// TODO: better logging of when actions are missed due to missing parameters
if (record.recordId == null || record.revId == null) {
return
}
let ctx = {
params: {
modelId: model._id,
recordId: record.recordId,
revId: record.revId,
},
user: { instanceId: inputs.instanceId },
}
try {
await recordController.destroy(ctx)
return {
response: ctx.body,
success: ctx.status === 200,
}
} catch (err) {
console.error(err)
return {
success: false,
response: err,
}
}
},
const BUILTIN_DEFINITIONS = {
SEND_EMAIL: sendEmail.definition,
SAVE_RECORD: saveRecord.definition,
DELETE_RECORD: deleteRecord.definition,
CREATE_USER: createUser.definition,
}
module.exports.getAction = async function(actionName) {
@ -118,3 +23,5 @@ module.exports.getAction = async function(actionName) {
}
// TODO: load async actions here
}
module.exports.BUILTIN_DEFINITIONS = BUILTIN_DEFINITIONS

View File

@ -1,31 +1,20 @@
const wait = ms => new Promise(resolve => setTimeout(resolve, ms))
let filter = require("./steps/filter")
let delay = require("./steps/delay")
let LOGIC = {
DELAY: async function delay(inputs) {
await wait(inputs.time)
},
let BUILTIN_LOGIC = {
DELAY: delay.run,
FILTER: filter.run,
}
FILTER: async function filter(inputs) {
const { field, condition, value } = inputs
switch (condition) {
case "equals":
if (field !== value) return
break
default:
return
}
},
let BUILTIN_DEFINITIONS = {
DELAY: delay.definition,
FILTER: filter.definition,
}
module.exports.getLogic = function(logicName) {
if (LOGIC[logicName] != null) {
return LOGIC[logicName]
if (BUILTIN_LOGIC[logicName] != null) {
return BUILTIN_LOGIC[logicName]
}
}
module.exports.LogicConditions = [
"Equals",
"Not equals",
"Greater than",
"Less than",
]
module.exports.BUILTIN_DEFINITIONS = BUILTIN_DEFINITIONS

View File

@ -0,0 +1,82 @@
const userController = require("../../api/controllers/user")
let accessLevels = require("../../utilities/accessLevels")
module.exports.definition = {
description: "Create a new user",
tagline: "Create user {{inputs.username}}",
icon: "ri-user-add-fill",
name: "Create User",
type: "ACTION",
inputs: {},
schema: {
inputs: {
properties: {
username: {
type: "string",
title: "Username",
},
password: {
type: "string",
customType: "password",
title: "Password",
},
accessLevelId: {
type: "string",
title: "Access Level",
default: accessLevels.POWERUSER_LEVEL_ID,
enum: accessLevels.ACCESS_LEVELS,
},
},
required: ["username", "password", "accessLevelId"],
},
outputs: {
properties: {
id: {
type: "string",
description: "The identifier of the new user",
},
revision: {
type: "string",
description: "The revision of the new user",
},
response: {
type: "object",
description: "The response from the user table",
},
success: {
type: "boolean",
description: "Whether the action was successful",
},
},
required: ["id", "revision", "success"],
},
},
}
module.exports.run = async function({ inputs, instanceId }) {
const { username, password, accessLevelId } = inputs
const ctx = {
user: {
instanceId: instanceId,
},
request: {
body: { username, password, accessLevelId },
},
}
try {
await userController.create(ctx)
return {
response: ctx.body,
id: ctx.body._id,
revision: ctx.body._rev,
success: ctx.status === 200,
}
} catch (err) {
console.error(err)
return {
success: false,
response: err,
}
}
}

View File

@ -0,0 +1,25 @@
const wait = ms => new Promise(resolve => setTimeout(resolve, ms))
module.exports.definition = {
name: "Delay",
icon: "ri-time-fill",
tagline: "Delay for {{inputs.time}} milliseconds",
description: "Delay the workflow until an amount of time has passed",
inputs: {},
schema: {
inputs: {
properties: {
time: {
type: "number",
title: "Delay in milliseconds",
},
},
required: ["time"],
},
},
type: "LOGIC",
}
module.exports.run = async function delay({ inputs }) {
await wait(inputs.time)
}

View File

@ -0,0 +1,77 @@
const recordController = require("../../api/controllers/record")
module.exports.definition = {
description: "Delete a record from your database",
icon: "ri-delete-bin-line",
name: "Delete Record",
tagline: "Delete a {{inputs.enriched.model.name}} record",
type: "ACTION",
inputs: {},
schema: {
inputs: {
properties: {
modelId: {
type: "string",
customType: "model",
title: "Table",
},
id: {
type: "string",
title: "Record ID",
},
revision: {
type: "string",
title: "Record Revision",
},
},
required: ["modelId", "id", "revision"],
},
outputs: {
properties: {
record: {
type: "object",
customType: "record",
description: "The deleted record",
},
response: {
type: "object",
description: "The response from the table",
},
success: {
type: "boolean",
description: "Whether the action was successful",
},
},
required: ["record", "success"],
},
},
}
module.exports.run = async function({ inputs, instanceId }) {
// TODO: better logging of when actions are missed due to missing parameters
if (inputs.id == null || inputs.revision == null) {
return
}
let ctx = {
params: {
modelId: inputs.modelId,
recordId: inputs.id,
revId: inputs.revision,
},
user: { instanceId },
}
try {
await recordController.destroy(ctx)
return {
response: ctx.body,
success: ctx.status === 200,
}
} catch (err) {
console.error(err)
return {
success: false,
response: err,
}
}
}

View File

@ -0,0 +1,71 @@
const LogicConditions = {
EQUALS: "Equals",
NOT_EQUALS: "Not equals",
GREATER_THAN: "Greater than",
LESS_THAN: "Less than",
}
module.exports.definition = {
name: "Filter",
tagline: "{{inputs.filter}} {{inputs.condition}} {{inputs.value}}",
icon: "ri-git-branch-line",
description: "Filter any workflows which do not meet certain conditions",
type: "LOGIC",
inputs: {},
schema: {
inputs: {
properties: {
filter: {
type: "string",
title: "Reference Value",
},
condition: {
type: "string",
title: "Condition",
enum: Object.values(LogicConditions),
default: "equals",
},
value: {
type: "string",
title: "Comparison Value",
},
},
required: ["filter", "condition", "value"],
},
outputs: {
properties: {
success: {
type: "boolean",
description: "Whether the logic block passed",
},
},
required: ["success"],
},
},
}
module.exports.run = async function filter({ inputs }) {
const { field, condition, value } = inputs
let success
if (typeof field !== "object" && typeof value !== "object") {
switch (condition) {
case LogicConditions.EQUALS:
success = field === value
break
case LogicConditions.NOT_EQUALS:
success = field !== value
break
case LogicConditions.GREATER_THAN:
success = field > value
break
case LogicConditions.LESS_THAN:
success = field < value
break
default:
return
}
} else {
success = false
}
return { success }
}

View File

@ -0,0 +1,85 @@
const recordController = require("../../api/controllers/record")
module.exports.definition = {
name: "Save Record",
tagline: "Save a {{inputs.enriched.model.name}} record",
icon: "ri-save-3-fill",
description: "Save a record to your database",
type: "ACTION",
inputs: {},
schema: {
inputs: {
properties: {
record: {
type: "object",
properties: {
modelId: {
type: "string",
customType: "model",
title: "Table",
},
},
customType: "record",
title: "The record to be written",
default: {},
required: ["modelId"],
},
},
required: ["record"],
},
outputs: {
properties: {
response: {
type: "object",
description: "The response from the table",
},
success: {
type: "boolean",
description: "Whether the action was successful",
},
id: {
type: "string",
description: "The identifier of the new record",
},
revision: {
type: "string",
description: "The revision of the new record",
},
},
required: ["success", "id", "revision"],
},
},
}
module.exports.run = async function({ inputs, instanceId }) {
// TODO: better logging of when actions are missed due to missing parameters
if (inputs.record == null || inputs.record.modelId == null) {
return
}
// have to clean up the record, remove the model from it
const ctx = {
params: {
modelId: inputs.record.modelId,
},
request: {
body: inputs.record,
},
user: { instanceId },
}
try {
await recordController.save(ctx)
return {
response: ctx.body,
id: ctx.body._id,
revision: ctx.body._rev,
success: ctx.status === 200,
}
} catch (err) {
console.error(err)
return {
success: false,
response: err,
}
}
}

View File

@ -0,0 +1,70 @@
const sgMail = require("@sendgrid/mail")
sgMail.setApiKey(process.env.SENDGRID_API_KEY)
module.exports.definition = {
description: "Send an email.",
tagline: "Send email to {{inputs.to}}",
icon: "ri-mail-open-fill",
name: "Send Email",
type: "ACTION",
inputs: {},
schema: {
inputs: {
properties: {
to: {
type: "string",
title: "Send To",
},
from: {
type: "string",
title: "Send From",
},
subject: {
type: "string",
title: "Email Subject",
},
contents: {
type: "string",
title: "Email Contents",
},
},
required: ["to", "from", "subject", "contents"],
},
outputs: {
properties: {
success: {
type: "boolean",
description: "Whether the email was sent",
},
response: {
type: "object",
description: "A response from the email client, this may be an error",
},
},
required: ["success"],
},
},
}
module.exports.run = async function({ inputs }) {
const msg = {
to: inputs.to,
from: inputs.from,
subject: inputs.subject,
text: inputs.text,
}
try {
let response = await sgMail.send(msg)
return {
success: true,
response,
}
} catch (err) {
console.error(err)
return {
success: false,
response: err,
}
}
}

View File

@ -2,18 +2,34 @@ const mustache = require("mustache")
const actions = require("./actions")
const logic = require("./logic")
function recurseMustache(inputs, context) {
for (let key in Object.keys(inputs)) {
let val = inputs[key]
if (typeof val === "string") {
inputs[key] = mustache.render(val, { context })
}
// this covers objects and arrays
else if (typeof val === "object") {
inputs[key] = recurseMustache(inputs[key], context)
}
}
return inputs
}
/**
* The workflow orchestrator is a class responsible for executing workflows.
* It handles the context of the workflow and makes sure each step gets the correct
* inputs and handles any outputs.
*/
class Orchestrator {
constructor(workflow) {
this._context = {}
constructor(workflow, triggerOutput) {
this._instanceId = triggerOutput.instanceId
// block zero is never used as the mustache is zero indexed for customer facing
this._context = { blocks: [{}, triggerOutput] }
this._workflow = workflow
}
async getStep(type, stepId) {
async getStepFunctionality(type, stepId) {
let step = null
if (type === "ACTION") {
step = await actions.getAction(stepId)
@ -26,28 +42,17 @@ class Orchestrator {
return step
}
async execute(context) {
async execute() {
let workflow = this._workflow
for (let block of workflow.definition.steps) {
let step = await this.getStep(block.type, block.stepId)
let args = { ...block.args }
// bind the workflow action args to the workflow context, if required
for (let arg of Object.keys(args)) {
const argValue = args[arg]
// We don't want to render mustache templates on non-strings
if (typeof argValue !== "string") continue
args[arg] = mustache.render(argValue, { context: this._context })
}
const response = await step({
args,
context,
let stepFn = await this.getStepFunctionality(block.type, block.stepId)
block.inputs = recurseMustache(block.inputs, this._context)
// instanceId is always passed
const outputs = await stepFn({
inputs: block.inputs,
instanceId: this._instanceId,
})
this._context = {
...this._context,
[block.id]: response,
}
this._context.blocks.push(outputs)
}
}
}
@ -55,8 +60,11 @@ class Orchestrator {
// callback is required for worker-farm to state that the worker thread has completed
module.exports = async (job, cb = null) => {
try {
const workflowOrchestrator = new Orchestrator(job.data.workflow)
await workflowOrchestrator.execute(job.data.event)
const workflowOrchestrator = new Orchestrator(
job.data.workflow,
job.data.event
)
await workflowOrchestrator.execute()
if (cb) {
cb()
}

View File

@ -4,6 +4,71 @@ const InMemoryQueue = require("./queue/inMemoryQueue")
let workflowQueue = new InMemoryQueue()
const BUILTIN_DEFINITIONS = {
RECORD_SAVED: {
name: "Record Saved",
event: "record:save",
icon: "ri-save-line",
tagline: "Record is added to {{inputs.enriched.model.name}}",
description: "Fired when a record is saved to your database",
inputs: {},
schema: {
inputs: {
properties: {
modelId: {
type: "string",
customType: "model",
title: "Table",
},
},
required: ["modelId"],
},
outputs: {
properties: {
record: {
type: "object",
customType: "record",
description: "The new record that was saved",
},
},
required: ["record"],
},
},
type: "TRIGGER",
},
RECORD_DELETED: {
name: "Record Deleted",
event: "record:delete",
icon: "ri-delete-bin-line",
tagline: "Record is deleted from {{inputs.enriched.model.name}}",
description: "Fired when a record is deleted from your database",
inputs: {},
schema: {
inputs: {
properties: {
modelId: {
type: "string",
customType: "model",
title: "Table",
},
},
required: ["modelId"],
},
outputs: {
properties: {
record: {
type: "object",
customType: "record",
description: "The record that was deleted",
},
},
required: ["record"],
},
},
type: "TRIGGER",
},
}
async function queueRelevantWorkflows(event, eventType) {
if (event.instanceId == null) {
throw `No instanceId specified for ${eventType} - check event emitters.`
@ -36,3 +101,5 @@ module.exports.externalTrigger = async function(workflow, params) {
}
module.exports.workflowQueue = workflowQueue
module.exports.BUILTIN_DEFINITIONS = BUILTIN_DEFINITIONS