workflow orchestrator

This commit is contained in:
Martin McKeaveney 2020-05-26 21:34:01 +01:00
parent 920b47d6aa
commit b9a6c3ec10
12 changed files with 237 additions and 37 deletions

View File

@ -1,43 +1,38 @@
import { ERROR } from "../state/standardState"
import { loadRecord } from "./loadRecord" import { loadRecord } from "./loadRecord"
import { listRecords } from "./listRecords" import { listRecords } from "./listRecords"
import { authenticate } from "./authenticate" import { authenticate } from "./authenticate"
import { saveRecord } from "./saveRecord" import { saveRecord } from "./saveRecord"
import { triggerWorkflow } from "./workflow";
export const createApi = ({ rootPath = "", setState, getState }) => { export const createApi = ({ rootPath = "", setState, getState }) => {
const apiCall = method => ({ const apiCall = method => async ({ url, body }) => {
url, const response = await fetch(`${rootPath}${url}`, {
body,
notFound,
badRequest,
forbidden,
}) => {
return fetch(`${rootPath}${url}`, {
method: method, method: method,
headers: { headers: {
"Content-Type": "application/json", "Content-Type": "application/json",
}, },
body: body && JSON.stringify(body), body: body && JSON.stringify(body),
credentials: "same-origin", credentials: "same-origin",
}).then(r => {
switch (r.status) {
case 200:
return r.json()
case 404:
return error(notFound || `${url} Not found`)
case 400:
return error(badRequest || `${url} Bad Request`)
case 403:
return error(forbidden || `${url} Forbidden`)
default:
if (
r.status.toString().startsWith("2") ||
r.status.toString().startsWith("3")
)
return r.json()
else return error(`${url} - ${r.statusText}`)
}
}) })
switch (response.status) {
case 200:
return response.json()
case 404:
return error(`${url} Not found`)
case 400:
return error(`${url} Bad Request`)
case 403:
return error(`${url} Forbidden`)
default:
if (response.status >= 200 && response.status < 400) {
return response.json()
}
return error(`${url} - ${response.statusText}`)
}
} }
const post = apiCall("POST") const post = apiCall("POST")
@ -47,10 +42,9 @@ export const createApi = ({ rootPath = "", setState, getState }) => {
const ERROR_MEMBER = "##error" const ERROR_MEMBER = "##error"
const error = message => { const error = message => {
const e = {} const err = { [ERROR_MEMBER]: message }
e[ERROR_MEMBER] = message setState("##error_message", message)
setState(ERROR, message) return err
return e
} }
const isSuccess = obj => !obj || !obj[ERROR_MEMBER] const isSuccess = obj => !obj || !obj[ERROR_MEMBER]
@ -72,5 +66,6 @@ export const createApi = ({ rootPath = "", setState, getState }) => {
listRecords: listRecords(apiOpts), listRecords: listRecords(apiOpts),
authenticate: authenticate(apiOpts), authenticate: authenticate(apiOpts),
saveRecord: saveRecord(apiOpts), saveRecord: saveRecord(apiOpts),
triggerWorkflow: triggerWorkflow(apiOpts)
} }
} }

View File

@ -0,0 +1,16 @@
import Orchestrator, { clientStrategy } from "./orchestrator";
export const triggerWorkflow = api => ({ workflow }) => {
console.log(workflow);
const workflowOrchestrator = new Orchestrator(
api,
"inst_60dd510_700f7dc06735403e81d5af91072d7241"
);
workflowOrchestrator.strategy = clientStrategy
workflowOrchestrator.execute(workflow);
// hit the API and get the workflow data back
}

View File

@ -0,0 +1,103 @@
import get from "lodash/fp/get";
/**
* The workflow orhestrator is a class responsible for executing workflows.
* It relies on the strategy pattern, which allows composable behaviour to be
* passed into its execute() function. This allows custom execution behaviour based
* on where the orchestrator is run.
*
*/
export default class Orchestrator {
constructor(api, instanceId) {
this.api = api
this.instanceId = instanceId
}
set strategy(strategy) {
this._strategy = strategy
}
async execute(workflowId) {
const EXECUTE_WORKFLOW_URL = `/api/${this.instanceId}/workflows/${workflowId}`;
const workflow = await this.api.get({ url: EXECUTE_WORKFLOW_URL })
this._strategy.run({
workflow: workflow.definition,
api: this.api,
instanceId: this.instanceId
});
}
}
// Execute a workflow from a running budibase app
export const clientStrategy = {
context: {},
bindContextArgs: function(args) {
const mappedArgs = { ...args };
console.log("original args", args)
// bind the workflow action args to the workflow context, if required
for (let arg in args) {
const argValue = args[arg];
// Means that it's bound to state or workflow context
if (argValue.startsWith("$")) {
// if value is bound to workflow context.
if (argValue.startsWith("$context")) {
const path = argValue.replace("$context.", "");
// pass in the value from context
mappedArgs[arg] = get(path, this.context);
}
// if the value is bound to state
if (argValue.startsWith("$state")) {
const path = argValue.match("$context.", "");
// pass in the value from context
mappedArgs[arg] = get(path, this.context);
}
}
}
return Object.values(mappedArgs);
},
run: async function({ workflow, api, instanceId }) {
const block = workflow.next;
console.log("Executing workflow block", block);
if (!block) return;
// This code gets run in the browser
if (block.type === "CLIENT") {
if (block.actionId === "SET_STATE") {
// get props from the workflow context if required
api.setState(...this.bindContextArgs(block.args))
// update the context with the data
this.context = {
...this.context,
SET_STATE: block.args
}
}
};
// this workflow block gets executed on the server
if (block.type === "SERVER") {
const EXECUTE_WORKFLOW_URL = `/api/${instanceId}/workflows/action`
const response = await api.post({
url: EXECUTE_WORKFLOW_URL,
body: {
action: block.actionId,
args: block.args
}
});
this.context = {
...this.context,
[block.actionId]: response
}
}
console.log("workflowContext", this.context)
// TODO: clean this up, don't pass all those args
this.run({ workflow: workflow.next, instanceId, api });
}
}

View File

@ -68,4 +68,4 @@ export const getNewRecordToState = (coreApi, setState) => ({
} }
} }
const errorHandler = setState => message => setState(ERROR, message) const errorHandler = setState => message => setState("##error_message", message)

View File

@ -33,7 +33,7 @@ export const eventHandlers = (store, rootPath, routeTo) => {
"List Records": handler(["indexKey", "statePath"], api.listRecords), "List Records": handler(["indexKey", "statePath"], api.listRecords),
"Save Record": handler(["statePath"], api.saveRecord), "Save Record": handler(["statePath"], api.saveRecord),
"Navigate To": handler(["url"], param => routeTo(param && param.url)), "Navigate To": handler(["url"], param => routeTo(param && param.url)),
"Trigger Workflow": handler(["workflow"], api.triggerWorkflow),
Authenticate: handler(["username", "password"], api.authenticate), Authenticate: handler(["username", "password"], api.authenticate),
} }
} }

View File

@ -1 +0,0 @@
export const ERROR = "##error_message"

View File

@ -0,0 +1,43 @@
const TEST_WORKFLOW = {
"_id": "8ebe79daf1c744c7ab204c0b964e309e",
"_rev": "37-94ae573300721c98267cc1d18822c94d",
"name": "Workflow",
"type": "workflow",
"definition": {
"next": {
"type": "CLIENT",
"actionId": "SET_STATE",
"args": {
"path": "myPath",
"value": "foo"
},
"next": {
"type": "SERVER",
"actionId": "SAVE_RECORD",
"args": {
"record": {
"modelId": "f452a2b9c3a94251b9ea7be1e20e3b19",
"name": "workflowRecord"
},
"next": {
"type": "CLIENT",
"actionId": "SET_STATE",
"args": {
"path": "myPath",
"value": "$context.SAVE_RECORD.record.name"
},
}
}
}
}
}
};
describe("Workflow Orchestrator", () => {
it("executes a workflow", () => {
});
it("", () => {
});
});

View File

@ -5,6 +5,7 @@ const newid = require("../../db/newid")
const ajv = new Ajv() const ajv = new Ajv()
exports.save = async function(ctx) { exports.save = async function(ctx) {
console.log("THIS INSTANCE", ctx.params.instanceId);
const db = new CouchDB(ctx.params.instanceId) const db = new CouchDB(ctx.params.instanceId)
const record = ctx.request.body const record = ctx.request.body
@ -43,7 +44,7 @@ exports.save = async function(ctx) {
record.type = "record" record.type = "record"
const response = await db.post(record) const response = await db.post(record)
record._rev = response.rev record._rev = response.rev
ctx.eventPublisher.emit("RECORD_CREATED", record) // ctx.eventPublisher.emit("RECORD_CREATED", record)
ctx.body = record ctx.body = record
ctx.status = 200 ctx.status = 200

View File

@ -0,0 +1,9 @@
export default async function () {
const response = await fetch("www.google.com");
console.log(response);
console.log("CUSTOM ACTION");
return {
message: "CUSTOM_WORKFLOW_SCRIPT",
response
}
}

View File

@ -0,0 +1,20 @@
const recordController = require("../../record");
module.exports = async function saveRecord(args) {
console.log("SAVING this record", args.record);
const ctx = {
params: {
instanceId: "inst_60dd510_700f7dc06735403e81d5af91072d7241",
},
request: {
body: args.record
}
}
await recordController.save(ctx);
return {
record: ctx.body
}
}

View File

@ -1,6 +1,6 @@
const CouchDB = require("../../db") const CouchDB = require("../../../db")
const Ajv = require("ajv") const Ajv = require("ajv")
const newid = require("../../db/newid") const newid = require("../../../db/newid")
const ajv = new Ajv() const ajv = new Ajv()
@ -76,6 +76,18 @@ exports.find = async function(ctx) {
ctx.body = await db.get(ctx.params.id) ctx.body = await db.get(ctx.params.id)
} }
exports.executeAction = async function(ctx) {
const workflowAction = require(`./actions/${ctx.request.body.action}`);
const response = await workflowAction(ctx.request.body.args);
ctx.body = response;
}
exports.fetchActionScript = async function(ctx) {
const workflowAction = require(`./actions/${ctx.action}`);
console.log(workflowAction);
ctx.body = workflowAction;
}
exports.destroy = async function(ctx) { exports.destroy = async function(ctx) {
const db = new CouchDB(ctx.params.instanceId) const db = new CouchDB(ctx.params.instanceId)
ctx.body = await db.remove(ctx.params.id, ctx.params.rev) ctx.body = await db.remove(ctx.params.id, ctx.params.rev)

View File

@ -6,7 +6,9 @@ const router = Router()
router router
.get("/api/:instanceId/workflows", controller.fetch) .get("/api/:instanceId/workflows", controller.fetch)
.get("/api/:instanceId/workflows/:id", controller.find) .get("/api/:instanceId/workflows/:id", controller.find)
.get("/api/:instanceId/workflows/:id/:action", controller.fetchActionScript)
.post("/api/:instanceId/workflows", controller.create) .post("/api/:instanceId/workflows", controller.create)
.post("/api/:instanceId/workflows/action", controller.executeAction)
.put("/api/:instanceId/workflows", controller.update) .put("/api/:instanceId/workflows", controller.update)
.delete("/api/:instanceId/workflows/:id/:rev", controller.destroy) .delete("/api/:instanceId/workflows/:id/:rev", controller.destroy)