Making worker thread decision based on environment variable (BUDIBASE_ENVIRONMENT) and some general tidy up, as well as fixing delete event emitter
This commit is contained in:
parent
e79d4f11c9
commit
535c4ca5aa
|
@ -4,6 +4,7 @@ WORKDIR /app
|
||||||
|
|
||||||
ENV CLOUD=1
|
ENV CLOUD=1
|
||||||
ENV COUCH_DB_URL=https://couchdb.budi.live:5984
|
ENV COUCH_DB_URL=https://couchdb.budi.live:5984
|
||||||
|
env BUDIBASE_ENVIRONMENT=PRODUCTION
|
||||||
|
|
||||||
# copy files and install dependencies
|
# copy files and install dependencies
|
||||||
COPY . ./
|
COPY . ./
|
||||||
|
|
|
@ -2,6 +2,16 @@ const CouchDB = require("../../db")
|
||||||
const validateJs = require("validate.js")
|
const validateJs = require("validate.js")
|
||||||
const newid = require("../../db/newid")
|
const newid = require("../../db/newid")
|
||||||
|
|
||||||
|
function emitEvent(eventType, ctx, record) {
|
||||||
|
ctx.eventEmitter &&
|
||||||
|
ctx.eventEmitter.emit(eventType, {
|
||||||
|
args: {
|
||||||
|
record,
|
||||||
|
},
|
||||||
|
instanceId: ctx.user.instanceId,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
validateJs.extend(validateJs.validators.datetime, {
|
validateJs.extend(validateJs.validators.datetime, {
|
||||||
parse: function(value) {
|
parse: function(value) {
|
||||||
return new Date(value).getTime()
|
return new Date(value).getTime()
|
||||||
|
@ -66,9 +76,7 @@ exports.save = async function(ctx) {
|
||||||
const doc = row.doc
|
const doc = row.doc
|
||||||
return {
|
return {
|
||||||
...doc,
|
...doc,
|
||||||
[model.name]: doc[model.name]
|
[model.name]: doc[model.name] ? [...doc[model.name], record._id] : [record._id],
|
||||||
? [...doc[model.name], record._id]
|
|
||||||
: [record._id],
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -76,13 +84,7 @@ exports.save = async function(ctx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx.eventEmitter &&
|
emitEvent(`record:save`, ctx, record)
|
||||||
ctx.eventEmitter.emit(`record:save`, {
|
|
||||||
args: {
|
|
||||||
record,
|
|
||||||
},
|
|
||||||
instanceId: ctx.user.instanceId,
|
|
||||||
})
|
|
||||||
ctx.body = record
|
ctx.body = record
|
||||||
ctx.status = 200
|
ctx.status = 200
|
||||||
ctx.message = `${model.name} created successfully`
|
ctx.message = `${model.name} created successfully`
|
||||||
|
@ -145,7 +147,7 @@ exports.destroy = async function(ctx) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
ctx.body = await db.remove(ctx.params.recordId, ctx.params.revId)
|
ctx.body = await db.remove(ctx.params.recordId, ctx.params.revId)
|
||||||
ctx.eventEmitter && ctx.eventEmitter.emit(`record:delete`, record)
|
emitEvent(`record:delete`, ctx, record)
|
||||||
}
|
}
|
||||||
|
|
||||||
exports.validate = async function(ctx) {
|
exports.validate = async function(ctx) {
|
||||||
|
@ -165,10 +167,7 @@ async function validate({ instanceId, modelId, record, model }) {
|
||||||
}
|
}
|
||||||
const errors = {}
|
const errors = {}
|
||||||
for (let fieldName in model.schema) {
|
for (let fieldName in model.schema) {
|
||||||
const res = validateJs.single(
|
const res = validateJs.single(record[fieldName], model.schema[fieldName].constraints)
|
||||||
record[fieldName],
|
|
||||||
model.schema[fieldName].constraints
|
|
||||||
)
|
|
||||||
if (res) errors[fieldName] = res
|
if (res) errors[fieldName] = res
|
||||||
}
|
}
|
||||||
return { valid: Object.keys(errors).length === 0, errors }
|
return { valid: Object.keys(errors).length === 0, errors }
|
||||||
|
|
|
@ -91,4 +91,9 @@ exports.trigger = async function(ctx) {
|
||||||
...ctx.request.body,
|
...ctx.request.body,
|
||||||
instanceId: ctx.user.instanceId,
|
instanceId: ctx.user.instanceId,
|
||||||
})
|
})
|
||||||
|
ctx.status = 200
|
||||||
|
ctx.body = {
|
||||||
|
message: `Workflow ${workflow._id} has been triggered.`,
|
||||||
|
workflow,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
const triggers = require("./triggers")
|
const triggers = require("./triggers")
|
||||||
const workerFarm = require("worker-farm")
|
const workerFarm = require("worker-farm")
|
||||||
const CouchDB = require("../db/client")
|
|
||||||
const singleThread = require("./thread")
|
const singleThread = require("./thread")
|
||||||
|
|
||||||
let workers = workerFarm(require.resolve("./thread"))
|
let workers = workerFarm(require.resolve("./thread"))
|
||||||
|
@ -22,7 +21,7 @@ function runWorker(job) {
|
||||||
*/
|
*/
|
||||||
module.exports.init = function() {
|
module.exports.init = function() {
|
||||||
triggers.workflowQueue.process(async job => {
|
triggers.workflowQueue.process(async job => {
|
||||||
if (CouchDB.preferredAdapters != null && CouchDB.preferredAdapters[0] !== "leveldb") {
|
if (process.env.BUDIBASE_ENVIRONMENT === "PRODUCTION") {
|
||||||
await runWorker(job)
|
await runWorker(job)
|
||||||
} else {
|
} else {
|
||||||
await singleThread(job)
|
await singleThread(job)
|
||||||
|
|
|
@ -52,6 +52,7 @@ class Orchestrator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// callback is required for worker-farm to state that the worker thread has completed
|
||||||
module.exports = async (job, cb = null) => {
|
module.exports = async (job, cb = null) => {
|
||||||
try {
|
try {
|
||||||
const workflowOrchestrator = new Orchestrator(job.data.workflow)
|
const workflowOrchestrator = new Orchestrator(job.data.workflow)
|
||||||
|
|
|
@ -5,6 +5,9 @@ const InMemoryQueue = require("./queue/inMemoryQueue")
|
||||||
let workflowQueue = new InMemoryQueue()
|
let workflowQueue = new InMemoryQueue()
|
||||||
|
|
||||||
async function queueRelevantWorkflows(event, eventType) {
|
async function queueRelevantWorkflows(event, eventType) {
|
||||||
|
if (event.instanceId == null) {
|
||||||
|
throw `No instanceId specified for ${eventType} - check event emitters.`
|
||||||
|
}
|
||||||
const db = new CouchDB(event.instanceId)
|
const db = new CouchDB(event.instanceId)
|
||||||
const workflowsToTrigger = await db.query("database/by_workflow_trigger", {
|
const workflowsToTrigger = await db.query("database/by_workflow_trigger", {
|
||||||
key: [eventType],
|
key: [eventType],
|
||||||
|
|
Loading…
Reference in New Issue