Merge branch 'linked-records' of https://github.com/Budibase/budibase into linked-records
This commit is contained in:
commit
b4dc0708d1
|
@ -28,9 +28,9 @@
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
{#if schemaFields.length}
|
{#if schemaFields.length}
|
||||||
<div class="bb-margin-xl block-field">
|
<div class="schema-fields">
|
||||||
{#each schemaFields as [field, schema]}
|
{#each schemaFields as [field, schema]}
|
||||||
<div class="bb-margin-xl capitalise">
|
<div class="capitalise">
|
||||||
{#if schemaHasOptions(schema)}
|
{#if schemaHasOptions(schema)}
|
||||||
<div class="field-label">{field}</div>
|
<div class="field-label">{field}</div>
|
||||||
<Select thin secondary bind:value={value[field]}>
|
<Select thin secondary bind:value={value[field]}>
|
||||||
|
@ -66,4 +66,10 @@
|
||||||
.field-label {
|
.field-label {
|
||||||
text-transform: capitalize;
|
text-transform: capitalize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
.schema-fields {
|
||||||
|
display: grid;
|
||||||
|
grid-gap: var(--spacing-xl);
|
||||||
|
margin-top: var(--spacing-xl);
|
||||||
|
}
|
||||||
</style>
|
</style>
|
||||||
|
|
|
@ -12,12 +12,12 @@ exports.fetch = async function(ctx) {
|
||||||
|
|
||||||
exports.find = async function(ctx) {
|
exports.find = async function(ctx) {
|
||||||
const db = new CouchDB(ctx.user.instanceId)
|
const db = new CouchDB(ctx.user.instanceId)
|
||||||
const model = await db.get(ctx.params.id)
|
ctx.body = await db.get(ctx.params.id)
|
||||||
ctx.body = model
|
|
||||||
}
|
}
|
||||||
|
|
||||||
exports.save = async function(ctx) {
|
exports.save = async function(ctx) {
|
||||||
const db = new CouchDB(ctx.user.instanceId)
|
const instanceId = ctx.user.instanceId
|
||||||
|
const db = new CouchDB(instanceId)
|
||||||
const modelToSave = {
|
const modelToSave = {
|
||||||
type: "model",
|
type: "model",
|
||||||
_id: newid(),
|
_id: newid(),
|
||||||
|
@ -54,7 +54,7 @@ exports.save = async function(ctx) {
|
||||||
modelToSave._rev = result.rev
|
modelToSave._rev = result.rev
|
||||||
|
|
||||||
const { schema } = ctx.request.body
|
const { schema } = ctx.request.body
|
||||||
for (let key in schema) {
|
for (let key of Object.keys(schema)) {
|
||||||
// model has a linked record
|
// model has a linked record
|
||||||
if (schema[key].type === "link") {
|
if (schema[key].type === "link") {
|
||||||
// create the link field in the other model
|
// create the link field in the other model
|
||||||
|
@ -84,13 +84,18 @@ exports.save = async function(ctx) {
|
||||||
}
|
}
|
||||||
await db.put(designDoc)
|
await db.put(designDoc)
|
||||||
|
|
||||||
|
// syntactic sugar for event emission
|
||||||
|
modelToSave.modelId = modelToSave._id
|
||||||
|
ctx.eventEmitter &&
|
||||||
|
ctx.eventEmitter.emitModel(`model:save`, instanceId, modelToSave)
|
||||||
ctx.status = 200
|
ctx.status = 200
|
||||||
ctx.message = `Model ${ctx.request.body.name} saved successfully.`
|
ctx.message = `Model ${ctx.request.body.name} saved successfully.`
|
||||||
ctx.body = modelToSave
|
ctx.body = modelToSave
|
||||||
}
|
}
|
||||||
|
|
||||||
exports.destroy = async function(ctx) {
|
exports.destroy = async function(ctx) {
|
||||||
const db = new CouchDB(ctx.user.instanceId)
|
const instanceId = ctx.user.instanceId
|
||||||
|
const db = new CouchDB(instanceId)
|
||||||
|
|
||||||
const modelToDelete = await db.get(ctx.params.modelId)
|
const modelToDelete = await db.get(ctx.params.modelId)
|
||||||
|
|
||||||
|
@ -105,7 +110,7 @@ exports.destroy = async function(ctx) {
|
||||||
)
|
)
|
||||||
|
|
||||||
// Delete linked record fields in dependent models
|
// Delete linked record fields in dependent models
|
||||||
for (let key in modelToDelete.schema) {
|
for (let key of Object.keys(modelToDelete.schema)) {
|
||||||
const { type, modelId } = modelToDelete.schema[key]
|
const { type, modelId } = modelToDelete.schema[key]
|
||||||
if (type === "link") {
|
if (type === "link") {
|
||||||
const linkedModel = await db.get(modelId)
|
const linkedModel = await db.get(modelId)
|
||||||
|
@ -119,6 +124,10 @@ exports.destroy = async function(ctx) {
|
||||||
delete designDoc.views[modelViewId]
|
delete designDoc.views[modelViewId]
|
||||||
await db.put(designDoc)
|
await db.put(designDoc)
|
||||||
|
|
||||||
|
// syntactic sugar for event emission
|
||||||
|
modelToDelete.modelId = modelToDelete._id
|
||||||
|
ctx.eventEmitter &&
|
||||||
|
ctx.eventEmitter.emitModel(`model:delete`, instanceId, modelToDelete)
|
||||||
ctx.status = 200
|
ctx.status = 200
|
||||||
ctx.message = `Model ${ctx.params.modelId} deleted.`
|
ctx.message = `Model ${ctx.params.modelId} deleted.`
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,21 +2,6 @@ const CouchDB = require("../../db")
|
||||||
const validateJs = require("validate.js")
|
const validateJs = require("validate.js")
|
||||||
const newid = require("../../db/newid")
|
const newid = require("../../db/newid")
|
||||||
|
|
||||||
function emitEvent(eventType, ctx, record) {
|
|
||||||
let event = {
|
|
||||||
record,
|
|
||||||
instanceId: ctx.user.instanceId,
|
|
||||||
}
|
|
||||||
// add syntactic sugar for mustache later
|
|
||||||
if (record._id) {
|
|
||||||
event.id = record._id
|
|
||||||
}
|
|
||||||
if (record._rev) {
|
|
||||||
event.revision = record._rev
|
|
||||||
}
|
|
||||||
ctx.eventEmitter && ctx.eventEmitter.emit(eventType, event)
|
|
||||||
}
|
|
||||||
|
|
||||||
validateJs.extend(validateJs.validators.datetime, {
|
validateJs.extend(validateJs.validators.datetime, {
|
||||||
parse: function(value) {
|
parse: function(value) {
|
||||||
return new Date(value).getTime()
|
return new Date(value).getTime()
|
||||||
|
@ -28,7 +13,8 @@ validateJs.extend(validateJs.validators.datetime, {
|
||||||
})
|
})
|
||||||
|
|
||||||
exports.patch = async function(ctx) {
|
exports.patch = async function(ctx) {
|
||||||
const db = new CouchDB(ctx.user.instanceId)
|
const instanceId = ctx.user.instanceId
|
||||||
|
const db = new CouchDB(instanceId)
|
||||||
const record = await db.get(ctx.params.id)
|
const record = await db.get(ctx.params.id)
|
||||||
const model = await db.get(record.modelId)
|
const model = await db.get(record.modelId)
|
||||||
const patchfields = ctx.request.body
|
const patchfields = ctx.request.body
|
||||||
|
@ -55,13 +41,16 @@ exports.patch = async function(ctx) {
|
||||||
const response = await db.put(record)
|
const response = await db.put(record)
|
||||||
record._rev = response.rev
|
record._rev = response.rev
|
||||||
record.type = "record"
|
record.type = "record"
|
||||||
|
ctx.eventEmitter &&
|
||||||
|
ctx.eventEmitter.emitRecord(`record:update`, instanceId, record, model)
|
||||||
ctx.body = record
|
ctx.body = record
|
||||||
ctx.status = 200
|
ctx.status = 200
|
||||||
ctx.message = `${model.name} updated successfully.`
|
ctx.message = `${model.name} updated successfully.`
|
||||||
}
|
}
|
||||||
|
|
||||||
exports.save = async function(ctx) {
|
exports.save = async function(ctx) {
|
||||||
const db = new CouchDB(ctx.user.instanceId)
|
const instanceId = ctx.user.instanceId
|
||||||
|
const db = new CouchDB(instanceId)
|
||||||
const record = ctx.request.body
|
const record = ctx.request.body
|
||||||
record.modelId = ctx.params.modelId
|
record.modelId = ctx.params.modelId
|
||||||
|
|
||||||
|
@ -124,7 +113,8 @@ exports.save = async function(ctx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
emitEvent(`record:save`, ctx, record)
|
ctx.eventEmitter &&
|
||||||
|
ctx.eventEmitter.emitRecord(`record:save`, instanceId, record, model)
|
||||||
ctx.body = record
|
ctx.body = record
|
||||||
ctx.status = 200
|
ctx.status = 200
|
||||||
ctx.message = `${model.name} created successfully`
|
ctx.message = `${model.name} created successfully`
|
||||||
|
@ -180,7 +170,8 @@ exports.find = async function(ctx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
exports.destroy = async function(ctx) {
|
exports.destroy = async function(ctx) {
|
||||||
const db = new CouchDB(ctx.user.instanceId)
|
const instanceId = ctx.user.instanceId
|
||||||
|
const db = new CouchDB()
|
||||||
const record = await db.get(ctx.params.recordId)
|
const record = await db.get(ctx.params.recordId)
|
||||||
if (record.modelId !== ctx.params.modelId) {
|
if (record.modelId !== ctx.params.modelId) {
|
||||||
ctx.throw(400, "Supplied modelId doesn't match the record's modelId")
|
ctx.throw(400, "Supplied modelId doesn't match the record's modelId")
|
||||||
|
@ -188,9 +179,10 @@ exports.destroy = async function(ctx) {
|
||||||
}
|
}
|
||||||
ctx.body = await db.remove(ctx.params.recordId, ctx.params.revId)
|
ctx.body = await db.remove(ctx.params.recordId, ctx.params.revId)
|
||||||
ctx.status = 200
|
ctx.status = 200
|
||||||
// for automations
|
// for automations include the record that was deleted
|
||||||
ctx.record = record
|
ctx.record = record
|
||||||
emitEvent(`record:delete`, ctx, record)
|
ctx.eventEmitter &&
|
||||||
|
ctx.eventEmitter.emitRecord(`record:delete`, instanceId, record)
|
||||||
}
|
}
|
||||||
|
|
||||||
exports.validate = async function(ctx) {
|
exports.validate = async function(ctx) {
|
||||||
|
|
|
@ -1,44 +0,0 @@
|
||||||
let events = require("events")
|
|
||||||
|
|
||||||
// Bull works with a Job wrapper around all messages that contains a lot more information about
|
|
||||||
// the state of the message, implement this for the sake of maintaining API consistency
|
|
||||||
function newJob(queue, message) {
|
|
||||||
return {
|
|
||||||
timestamp: Date.now(),
|
|
||||||
queue: queue,
|
|
||||||
data: message,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// designed to replicate Bull (https://github.com/OptimalBits/bull) in memory as a sort of mock
|
|
||||||
class InMemoryQueue {
|
|
||||||
// opts is not used by this as there is no real use case when in memory, but is the same API as Bull
|
|
||||||
constructor(name, opts) {
|
|
||||||
this._name = name
|
|
||||||
this._opts = opts
|
|
||||||
this._messages = []
|
|
||||||
this._emitter = new events.EventEmitter()
|
|
||||||
}
|
|
||||||
|
|
||||||
// same API as bull, provide a callback and it will respond when messages are available
|
|
||||||
process(func) {
|
|
||||||
this._emitter.on("message", async () => {
|
|
||||||
if (this._messages.length <= 0) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
let msg = this._messages.shift()
|
|
||||||
let resp = func(msg)
|
|
||||||
if (resp.then != null) {
|
|
||||||
await resp
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// simply puts a message to the queue and emits to the queue for processing
|
|
||||||
add(msg) {
|
|
||||||
this._messages.push(newJob(this._name, msg))
|
|
||||||
this._emitter.emit("message")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
module.exports = InMemoryQueue
|
|
|
@ -1,10 +1,10 @@
|
||||||
const recordController = require("../../api/controllers/record")
|
const recordController = require("../../api/controllers/record")
|
||||||
|
|
||||||
module.exports.definition = {
|
module.exports.definition = {
|
||||||
description: "Delete a record from your database",
|
description: "Delete a row from your database",
|
||||||
icon: "ri-delete-bin-line",
|
icon: "ri-delete-bin-line",
|
||||||
name: "Delete Record",
|
name: "Delete Row",
|
||||||
tagline: "Delete a {{inputs.enriched.model.name}} record",
|
tagline: "Delete a {{inputs.enriched.model.name}} row",
|
||||||
type: "ACTION",
|
type: "ACTION",
|
||||||
stepId: "DELETE_RECORD",
|
stepId: "DELETE_RECORD",
|
||||||
inputs: {},
|
inputs: {},
|
||||||
|
@ -18,11 +18,11 @@ module.exports.definition = {
|
||||||
},
|
},
|
||||||
id: {
|
id: {
|
||||||
type: "string",
|
type: "string",
|
||||||
title: "Record ID",
|
title: "Row ID",
|
||||||
},
|
},
|
||||||
revision: {
|
revision: {
|
||||||
type: "string",
|
type: "string",
|
||||||
title: "Record Revision",
|
title: "Row Revision",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
required: ["modelId", "id", "revision"],
|
required: ["modelId", "id", "revision"],
|
||||||
|
@ -32,7 +32,7 @@ module.exports.definition = {
|
||||||
record: {
|
record: {
|
||||||
type: "object",
|
type: "object",
|
||||||
customType: "record",
|
customType: "record",
|
||||||
description: "The deleted record",
|
description: "The deleted row",
|
||||||
},
|
},
|
||||||
response: {
|
response: {
|
||||||
type: "object",
|
type: "object",
|
||||||
|
|
|
@ -2,10 +2,10 @@ const recordController = require("../../api/controllers/record")
|
||||||
const automationUtils = require("../automationUtils")
|
const automationUtils = require("../automationUtils")
|
||||||
|
|
||||||
module.exports.definition = {
|
module.exports.definition = {
|
||||||
name: "Save Record",
|
name: "Create Row",
|
||||||
tagline: "Save a {{inputs.enriched.model.name}} record",
|
tagline: "Create a {{inputs.enriched.model.name}} row",
|
||||||
icon: "ri-save-3-fill",
|
icon: "ri-save-3-fill",
|
||||||
description: "Save a record to your database",
|
description: "Add a row to your database",
|
||||||
type: "ACTION",
|
type: "ACTION",
|
||||||
stepId: "SAVE_RECORD",
|
stepId: "SAVE_RECORD",
|
||||||
inputs: {},
|
inputs: {},
|
||||||
|
@ -32,7 +32,7 @@ module.exports.definition = {
|
||||||
record: {
|
record: {
|
||||||
type: "object",
|
type: "object",
|
||||||
customType: "record",
|
customType: "record",
|
||||||
description: "The new record",
|
description: "The new row",
|
||||||
},
|
},
|
||||||
response: {
|
response: {
|
||||||
type: "object",
|
type: "object",
|
||||||
|
@ -44,11 +44,11 @@ module.exports.definition = {
|
||||||
},
|
},
|
||||||
id: {
|
id: {
|
||||||
type: "string",
|
type: "string",
|
||||||
description: "The identifier of the new record",
|
description: "The identifier of the new row",
|
||||||
},
|
},
|
||||||
revision: {
|
revision: {
|
||||||
type: "string",
|
type: "string",
|
||||||
description: "The revision of the new record",
|
description: "The revision of the new row",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
required: ["success", "id", "revision"],
|
required: ["success", "id", "revision"],
|
||||||
|
|
|
@ -2,10 +2,10 @@ const recordController = require("../../api/controllers/record")
|
||||||
const automationUtils = require("../automationUtils")
|
const automationUtils = require("../automationUtils")
|
||||||
|
|
||||||
module.exports.definition = {
|
module.exports.definition = {
|
||||||
name: "Update Record",
|
name: "Update Row",
|
||||||
tagline: "Update a {{inputs.enriched.model.name}} record",
|
tagline: "Update a {{inputs.enriched.model.name}} record",
|
||||||
icon: "ri-refresh-fill",
|
icon: "ri-refresh-fill",
|
||||||
description: "Update a record to your database",
|
description: "Update a row in your database",
|
||||||
type: "ACTION",
|
type: "ACTION",
|
||||||
stepId: "UPDATE_RECORD",
|
stepId: "UPDATE_RECORD",
|
||||||
inputs: {},
|
inputs: {},
|
||||||
|
@ -15,11 +15,11 @@ module.exports.definition = {
|
||||||
record: {
|
record: {
|
||||||
type: "object",
|
type: "object",
|
||||||
customType: "record",
|
customType: "record",
|
||||||
title: "Record",
|
title: "Table",
|
||||||
},
|
},
|
||||||
recordId: {
|
recordId: {
|
||||||
type: "string",
|
type: "string",
|
||||||
title: "Record ID",
|
title: "Row ID",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
required: ["record", "recordId"],
|
required: ["record", "recordId"],
|
||||||
|
|
|
@ -1,8 +1,8 @@
|
||||||
const CouchDB = require("../db")
|
const CouchDB = require("../db")
|
||||||
const emitter = require("../events/index")
|
const emitter = require("../events/index")
|
||||||
const InMemoryQueue = require("./queue/inMemoryQueue")
|
const InMemoryQueue = require("../utilities/queue/inMemoryQueue")
|
||||||
|
|
||||||
let automationQueue = new InMemoryQueue()
|
let automationQueue = new InMemoryQueue("automationQueue")
|
||||||
|
|
||||||
const FAKE_STRING = "TEST"
|
const FAKE_STRING = "TEST"
|
||||||
const FAKE_BOOL = false
|
const FAKE_BOOL = false
|
||||||
|
@ -11,11 +11,11 @@ const FAKE_DATETIME = "1970-01-01T00:00:00.000Z"
|
||||||
|
|
||||||
const BUILTIN_DEFINITIONS = {
|
const BUILTIN_DEFINITIONS = {
|
||||||
RECORD_SAVED: {
|
RECORD_SAVED: {
|
||||||
name: "Record Saved",
|
name: "Row Saved",
|
||||||
event: "record:save",
|
event: "record:save",
|
||||||
icon: "ri-save-line",
|
icon: "ri-save-line",
|
||||||
tagline: "Record is added to {{inputs.enriched.model.name}}",
|
tagline: "Row is added to {{inputs.enriched.model.name}}",
|
||||||
description: "Fired when a record is saved to your database",
|
description: "Fired when a row is saved to your database",
|
||||||
stepId: "RECORD_SAVED",
|
stepId: "RECORD_SAVED",
|
||||||
inputs: {},
|
inputs: {},
|
||||||
schema: {
|
schema: {
|
||||||
|
@ -34,15 +34,15 @@ const BUILTIN_DEFINITIONS = {
|
||||||
record: {
|
record: {
|
||||||
type: "object",
|
type: "object",
|
||||||
customType: "record",
|
customType: "record",
|
||||||
description: "The new record that was saved",
|
description: "The new row that was saved",
|
||||||
},
|
},
|
||||||
id: {
|
id: {
|
||||||
type: "string",
|
type: "string",
|
||||||
description: "Record ID - can be used for updating",
|
description: "Row ID - can be used for updating",
|
||||||
},
|
},
|
||||||
revision: {
|
revision: {
|
||||||
type: "string",
|
type: "string",
|
||||||
description: "Revision of record",
|
description: "Revision of row",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
required: ["record", "id"],
|
required: ["record", "id"],
|
||||||
|
@ -51,11 +51,11 @@ const BUILTIN_DEFINITIONS = {
|
||||||
type: "TRIGGER",
|
type: "TRIGGER",
|
||||||
},
|
},
|
||||||
RECORD_DELETED: {
|
RECORD_DELETED: {
|
||||||
name: "Record Deleted",
|
name: "Row Deleted",
|
||||||
event: "record:delete",
|
event: "record:delete",
|
||||||
icon: "ri-delete-bin-line",
|
icon: "ri-delete-bin-line",
|
||||||
tagline: "Record is deleted from {{inputs.enriched.model.name}}",
|
tagline: "Row is deleted from {{inputs.enriched.model.name}}",
|
||||||
description: "Fired when a record is deleted from your database",
|
description: "Fired when a row is deleted from your database",
|
||||||
stepId: "RECORD_DELETED",
|
stepId: "RECORD_DELETED",
|
||||||
inputs: {},
|
inputs: {},
|
||||||
schema: {
|
schema: {
|
||||||
|
@ -74,7 +74,7 @@ const BUILTIN_DEFINITIONS = {
|
||||||
record: {
|
record: {
|
||||||
type: "object",
|
type: "object",
|
||||||
customType: "record",
|
customType: "record",
|
||||||
description: "The record that was deleted",
|
description: "The row that was deleted",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
required: ["record", "id"],
|
required: ["record", "id"],
|
||||||
|
|
|
@ -0,0 +1,73 @@
|
||||||
|
const CouchDB = require("./index")
|
||||||
|
const emitter = require("../events/index")
|
||||||
|
const InMemoryQueue = require("../utilities/queue/inMemoryQueue")
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This functionality makes sure that when records with links are created, updated or deleted they are processed
|
||||||
|
* correctly - making sure that no stale links are left around and that all links have been made successfully.
|
||||||
|
*/
|
||||||
|
|
||||||
|
const EventType = {
|
||||||
|
RECORD_SAVE: "record:save",
|
||||||
|
RECORD_UPDATE: "record:update",
|
||||||
|
RECORD_DELETE: "record:delete",
|
||||||
|
MODEL_SAVE: "model:save",
|
||||||
|
MODEL_DELETE: "model:delete",
|
||||||
|
}
|
||||||
|
const linkedRecordQueue = new InMemoryQueue("linkedRecordQueue")
|
||||||
|
|
||||||
|
function createEmitterCallback(eventName) {
|
||||||
|
emitter.on(eventName, function(event) {
|
||||||
|
if (!event || !event.record || !event.record.modelId) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
linkedRecordQueue.add({
|
||||||
|
type: eventName,
|
||||||
|
event,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
for (let typeKey of Object.keys(EventType)) {
|
||||||
|
createEmitterCallback(EventType[typeKey])
|
||||||
|
}
|
||||||
|
|
||||||
|
function doesModelHaveLinkedRecords(model) {
|
||||||
|
for (let key of Object.keys(model.schema)) {
|
||||||
|
const { type } = model.schema[key]
|
||||||
|
if (type === "link") {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
linkedRecordQueue.process(async job => {
|
||||||
|
let event = job.data
|
||||||
|
// can't operate without these properties
|
||||||
|
if (event.instanceId == null || event.modelId == null) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
const db = new CouchDB(event.instanceId)
|
||||||
|
let model = event.model == null ? await db.get(event.modelId) : event.model
|
||||||
|
// model doesn't have links, can stop here
|
||||||
|
if (!doesModelHaveLinkedRecords(model)) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// no linked records to operate on
|
||||||
|
if (model == null) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
switch (event.type) {
|
||||||
|
case EventType.RECORD_SAVE:
|
||||||
|
break
|
||||||
|
case EventType.RECORD_UPDATE:
|
||||||
|
break
|
||||||
|
case EventType.RECORD_DELETE:
|
||||||
|
break
|
||||||
|
case EventType.MODEL_SAVE:
|
||||||
|
break
|
||||||
|
case EventType.MODEL_DELETE:
|
||||||
|
break
|
||||||
|
}
|
||||||
|
})
|
|
@ -6,6 +6,45 @@ const EventEmitter = require("events").EventEmitter
|
||||||
* future.
|
* future.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
const emitter = new EventEmitter()
|
/**
|
||||||
|
* Extending the standard emitter to some syntactic sugar and standardisation to the emitted event.
|
||||||
|
* This is specifically quite important for mustache used in automations.
|
||||||
|
*/
|
||||||
|
class BudibaseEmitter extends EventEmitter {
|
||||||
|
emitRecord(eventName, instanceId, record, model = null) {
|
||||||
|
let event = {
|
||||||
|
record,
|
||||||
|
instanceId,
|
||||||
|
modelId: record.modelId,
|
||||||
|
}
|
||||||
|
if (model) {
|
||||||
|
event.model = model
|
||||||
|
}
|
||||||
|
if (record._id) {
|
||||||
|
event.id = record._id
|
||||||
|
}
|
||||||
|
if (record._rev) {
|
||||||
|
event.revision = record._rev
|
||||||
|
}
|
||||||
|
this.emit(eventName, event)
|
||||||
|
}
|
||||||
|
|
||||||
|
emitModel(eventName, instanceId, model = null) {
|
||||||
|
let event = {
|
||||||
|
model,
|
||||||
|
instanceId,
|
||||||
|
modelId: model._id,
|
||||||
|
}
|
||||||
|
if (model._id) {
|
||||||
|
event.id = model._id
|
||||||
|
}
|
||||||
|
if (model._rev) {
|
||||||
|
event.revision = model._rev
|
||||||
|
}
|
||||||
|
this.emit(eventName, event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const emitter = new BudibaseEmitter()
|
||||||
|
|
||||||
module.exports = emitter
|
module.exports = emitter
|
||||||
|
|
|
@ -0,0 +1,78 @@
|
||||||
|
let events = require("events")
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Bull works with a Job wrapper around all messages that contains a lot more information about
|
||||||
|
* the state of the message, this object constructor implements the same schema of Bull jobs
|
||||||
|
* for the sake of maintaining API consistency.
|
||||||
|
* @param {string} queue The name of the queue which the message will be carried on.
|
||||||
|
* @param {object} message The JSON message which will be passed back to the consumer.
|
||||||
|
* @returns {Object} A new job which can now be put onto the queue, this is mostly an
|
||||||
|
* internal structure so that an in memory queue can be easily swapped for a Bull queue.
|
||||||
|
*/
|
||||||
|
function newJob(queue, message) {
|
||||||
|
return {
|
||||||
|
timestamp: Date.now(),
|
||||||
|
queue: queue,
|
||||||
|
data: message,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is designed to replicate Bull (https://github.com/OptimalBits/bull) in memory as a sort of mock.
|
||||||
|
* It is relatively simple, using an event emitter internally to register when messages are available
|
||||||
|
* to the consumers - in can support many inputs and many consumers.
|
||||||
|
*/
|
||||||
|
class InMemoryQueue {
|
||||||
|
/**
|
||||||
|
* The constructor the queue, exactly the same as that of Bulls.
|
||||||
|
* @param {string} name The name of the queue which is being configured.
|
||||||
|
* @param {object|null} opts This is not used by the in memory queue as there is no real use
|
||||||
|
* case when in memory, but is the same API as Bull
|
||||||
|
*/
|
||||||
|
constructor(name, opts = null) {
|
||||||
|
this._name = name
|
||||||
|
this._opts = opts
|
||||||
|
this._messages = []
|
||||||
|
this._emitter = new events.EventEmitter()
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Same callback API as Bull, each callback passed to this will consume messages as they are
|
||||||
|
* available. Please note this is a queue service, not a notification service, so each
|
||||||
|
* consumer will receive different messages.
|
||||||
|
* @param {function<object>} func The callback function which will return a "Job", the same
|
||||||
|
* as the Bull API, within this job the property "data" contains the JSON message. Please
|
||||||
|
* note this is incredibly limited compared to Bull as in reality the Job would contain
|
||||||
|
* a lot more information about the queue and current status of Bull cluster.
|
||||||
|
*/
|
||||||
|
process(func) {
|
||||||
|
this._emitter.on("message", async () => {
|
||||||
|
if (this._messages.length <= 0) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
let msg = this._messages.shift()
|
||||||
|
let resp = func(msg)
|
||||||
|
if (resp.then != null) {
|
||||||
|
await resp
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// simply puts a message to the queue and emits to the queue for processing
|
||||||
|
/**
|
||||||
|
* Simple function to replicate the add message functionality of Bull, putting
|
||||||
|
* a new message on the queue. This then emits an event which will be used to
|
||||||
|
* return the message to a consumer (if one is attached).
|
||||||
|
* @param {object} msg A message to be transported over the queue, this should be
|
||||||
|
* a JSON message as this is required by Bull.
|
||||||
|
*/
|
||||||
|
add(msg) {
|
||||||
|
if (typeof msg !== "object") {
|
||||||
|
throw "Queue only supports carrying JSON."
|
||||||
|
}
|
||||||
|
this._messages.push(newJob(this._name, msg))
|
||||||
|
this._emitter.emit("message")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = InMemoryQueue
|
Loading…
Reference in New Issue