Some groundwork for the linked records, building up a much more in-depth emitter for models and records to drive the record cleanup.
This commit is contained in:
parent
ebf256f09f
commit
7e3715d88a
|
@ -12,12 +12,12 @@ exports.fetch = async function(ctx) {
|
|||
|
||||
exports.find = async function(ctx) {
|
||||
const db = new CouchDB(ctx.user.instanceId)
|
||||
const model = await db.get(ctx.params.id)
|
||||
ctx.body = model
|
||||
ctx.body = await db.get(ctx.params.id)
|
||||
}
|
||||
|
||||
exports.save = async function(ctx) {
|
||||
const db = new CouchDB(ctx.user.instanceId)
|
||||
const instanceId = ctx.user.instanceId
|
||||
const db = new CouchDB(instanceId)
|
||||
const modelToSave = {
|
||||
type: "model",
|
||||
_id: newid(),
|
||||
|
@ -54,7 +54,7 @@ exports.save = async function(ctx) {
|
|||
modelToSave._rev = result.rev
|
||||
|
||||
const { schema } = ctx.request.body
|
||||
for (let key in schema) {
|
||||
for (let key of Object.keys(schema)) {
|
||||
// model has a linked record
|
||||
if (schema[key].type === "link") {
|
||||
// create the link field in the other model
|
||||
|
@ -84,13 +84,18 @@ exports.save = async function(ctx) {
|
|||
}
|
||||
await db.put(designDoc)
|
||||
|
||||
// syntactic sugar for event emission
|
||||
modelToSave.modelId = modelToSave._id
|
||||
ctx.eventEmitter &&
|
||||
ctx.eventEmitter.emitModel(`model:save`, instanceId, modelToSave)
|
||||
ctx.status = 200
|
||||
ctx.message = `Model ${ctx.request.body.name} saved successfully.`
|
||||
ctx.body = modelToSave
|
||||
}
|
||||
|
||||
exports.destroy = async function(ctx) {
|
||||
const db = new CouchDB(ctx.user.instanceId)
|
||||
const instanceId = ctx.user.instanceId
|
||||
const db = new CouchDB(instanceId)
|
||||
|
||||
const modelToDelete = await db.get(ctx.params.modelId)
|
||||
|
||||
|
@ -105,7 +110,7 @@ exports.destroy = async function(ctx) {
|
|||
)
|
||||
|
||||
// Delete linked record fields in dependent models
|
||||
for (let key in modelToDelete.schema) {
|
||||
for (let key of Object.keys(modelToDelete.schema)) {
|
||||
const { type, modelId } = modelToDelete.schema[key]
|
||||
if (type === "link") {
|
||||
const linkedModel = await db.get(modelId)
|
||||
|
@ -119,6 +124,10 @@ exports.destroy = async function(ctx) {
|
|||
delete designDoc.views[modelViewId]
|
||||
await db.put(designDoc)
|
||||
|
||||
// syntactic sugar for event emission
|
||||
modelToDelete.modelId = modelToDelete._id
|
||||
ctx.eventEmitter &&
|
||||
ctx.eventEmitter.emitModel(`model:delete`, instanceId, modelToDelete)
|
||||
ctx.status = 200
|
||||
ctx.message = `Model ${ctx.params.modelId} deleted.`
|
||||
}
|
||||
|
|
|
@ -2,21 +2,6 @@ const CouchDB = require("../../db")
|
|||
const validateJs = require("validate.js")
|
||||
const newid = require("../../db/newid")
|
||||
|
||||
function emitEvent(eventType, ctx, record) {
|
||||
let event = {
|
||||
record,
|
||||
instanceId: ctx.user.instanceId,
|
||||
}
|
||||
// add syntactic sugar for mustache later
|
||||
if (record._id) {
|
||||
event.id = record._id
|
||||
}
|
||||
if (record._rev) {
|
||||
event.revision = record._rev
|
||||
}
|
||||
ctx.eventEmitter && ctx.eventEmitter.emit(eventType, event)
|
||||
}
|
||||
|
||||
validateJs.extend(validateJs.validators.datetime, {
|
||||
parse: function(value) {
|
||||
return new Date(value).getTime()
|
||||
|
@ -28,7 +13,8 @@ validateJs.extend(validateJs.validators.datetime, {
|
|||
})
|
||||
|
||||
exports.patch = async function(ctx) {
|
||||
const db = new CouchDB(ctx.user.instanceId)
|
||||
const instanceId = ctx.user.instanceId
|
||||
const db = new CouchDB(instanceId)
|
||||
const record = await db.get(ctx.params.id)
|
||||
const model = await db.get(record.modelId)
|
||||
const patchfields = ctx.request.body
|
||||
|
@ -55,13 +41,16 @@ exports.patch = async function(ctx) {
|
|||
const response = await db.put(record)
|
||||
record._rev = response.rev
|
||||
record.type = "record"
|
||||
ctx.eventEmitter &&
|
||||
ctx.eventEmitter.emitRecord(`record:update`, instanceId, record, model)
|
||||
ctx.body = record
|
||||
ctx.status = 200
|
||||
ctx.message = `${model.name} updated successfully.`
|
||||
}
|
||||
|
||||
exports.save = async function(ctx) {
|
||||
const db = new CouchDB(ctx.user.instanceId)
|
||||
const instanceId = ctx.user.instanceId
|
||||
const db = new CouchDB(instanceId)
|
||||
const record = ctx.request.body
|
||||
record.modelId = ctx.params.modelId
|
||||
|
||||
|
@ -124,7 +113,8 @@ exports.save = async function(ctx) {
|
|||
}
|
||||
}
|
||||
|
||||
emitEvent(`record:save`, ctx, record)
|
||||
ctx.eventEmitter &&
|
||||
ctx.eventEmitter.emitRecord(`record:save`, instanceId, record, model)
|
||||
ctx.body = record
|
||||
ctx.status = 200
|
||||
ctx.message = `${model.name} created successfully`
|
||||
|
@ -180,7 +170,8 @@ exports.find = async function(ctx) {
|
|||
}
|
||||
|
||||
exports.destroy = async function(ctx) {
|
||||
const db = new CouchDB(ctx.user.instanceId)
|
||||
const instanceId = ctx.user.instanceId
|
||||
const db = new CouchDB()
|
||||
const record = await db.get(ctx.params.recordId)
|
||||
if (record.modelId !== ctx.params.modelId) {
|
||||
ctx.throw(400, "Supplied modelId doesn't match the record's modelId")
|
||||
|
@ -188,9 +179,10 @@ exports.destroy = async function(ctx) {
|
|||
}
|
||||
ctx.body = await db.remove(ctx.params.recordId, ctx.params.revId)
|
||||
ctx.status = 200
|
||||
// for automations
|
||||
// for automations include the record that was deleted
|
||||
ctx.record = record
|
||||
emitEvent(`record:delete`, ctx, record)
|
||||
ctx.eventEmitter &&
|
||||
ctx.eventEmitter.emitRecord(`record:delete`, instanceId, record)
|
||||
}
|
||||
|
||||
exports.validate = async function(ctx) {
|
||||
|
|
|
@ -1,44 +0,0 @@
|
|||
let events = require("events")
|
||||
|
||||
// Bull works with a Job wrapper around all messages that contains a lot more information about
|
||||
// the state of the message, implement this for the sake of maintaining API consistency
|
||||
function newJob(queue, message) {
|
||||
return {
|
||||
timestamp: Date.now(),
|
||||
queue: queue,
|
||||
data: message,
|
||||
}
|
||||
}
|
||||
|
||||
// designed to replicate Bull (https://github.com/OptimalBits/bull) in memory as a sort of mock
|
||||
class InMemoryQueue {
|
||||
// opts is not used by this as there is no real use case when in memory, but is the same API as Bull
|
||||
constructor(name, opts) {
|
||||
this._name = name
|
||||
this._opts = opts
|
||||
this._messages = []
|
||||
this._emitter = new events.EventEmitter()
|
||||
}
|
||||
|
||||
// same API as bull, provide a callback and it will respond when messages are available
|
||||
process(func) {
|
||||
this._emitter.on("message", async () => {
|
||||
if (this._messages.length <= 0) {
|
||||
return
|
||||
}
|
||||
let msg = this._messages.shift()
|
||||
let resp = func(msg)
|
||||
if (resp.then != null) {
|
||||
await resp
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// simply puts a message to the queue and emits to the queue for processing
|
||||
add(msg) {
|
||||
this._messages.push(newJob(this._name, msg))
|
||||
this._emitter.emit("message")
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = InMemoryQueue
|
|
@ -1,8 +1,8 @@
|
|||
const CouchDB = require("../db")
|
||||
const emitter = require("../events/index")
|
||||
const InMemoryQueue = require("./queue/inMemoryQueue")
|
||||
const InMemoryQueue = require("../utilities/queue/inMemoryQueue")
|
||||
|
||||
let automationQueue = new InMemoryQueue()
|
||||
let automationQueue = new InMemoryQueue("automationQueue")
|
||||
|
||||
const FAKE_STRING = "TEST"
|
||||
const FAKE_BOOL = false
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
const CouchDB = require("./index")
|
||||
const emitter = require("../events/index")
|
||||
const InMemoryQueue = require("../utilities/queue/inMemoryQueue")
|
||||
|
||||
/**
|
||||
* This functionality makes sure that when records with links are created, updated or deleted they are processed
|
||||
* correctly - making sure that no stale links are left around and that all links have been made successfully.
|
||||
*/
|
||||
|
||||
const EventType = {
|
||||
RECORD_SAVE: "record:save",
|
||||
RECORD_UPDATE: "record:update",
|
||||
RECORD_DELETE: "record:delete",
|
||||
MODEL_SAVE: "model:save",
|
||||
MODEL_DELETE: "model:delete",
|
||||
}
|
||||
const linkedRecordQueue = new InMemoryQueue("linkedRecordQueue")
|
||||
|
||||
function createEmitterCallback(eventName) {
|
||||
emitter.on(eventName, function(event) {
|
||||
if (!event || !event.record || !event.record.modelId) {
|
||||
return
|
||||
}
|
||||
linkedRecordQueue.add({
|
||||
type: eventName,
|
||||
event,
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
for (let typeKey of Object.keys(EventType)) {
|
||||
createEmitterCallback(EventType[typeKey])
|
||||
}
|
||||
|
||||
function doesModelHaveLinkedRecords(model) {
|
||||
for (let key of Object.keys(model.schema)) {
|
||||
const { type } = model.schema[key]
|
||||
if (type === "link") {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
linkedRecordQueue.process(async job => {
|
||||
let event = job.data
|
||||
// can't operate without these properties
|
||||
if (event.instanceId == null || event.modelId == null) {
|
||||
return
|
||||
}
|
||||
const db = new CouchDB(event.instanceId)
|
||||
let model = event.model == null ? await db.get(event.modelId) : event.model
|
||||
// model doesn't have links, can stop here
|
||||
if (!doesModelHaveLinkedRecords(model)) {
|
||||
return
|
||||
}
|
||||
// no linked records to operate on
|
||||
if (model == null) {
|
||||
return
|
||||
}
|
||||
switch (event.type) {
|
||||
case EventType.RECORD_SAVE:
|
||||
break
|
||||
case EventType.RECORD_UPDATE:
|
||||
break
|
||||
case EventType.RECORD_DELETE:
|
||||
break
|
||||
case EventType.MODEL_SAVE:
|
||||
break
|
||||
case EventType.MODEL_DELETE:
|
||||
break
|
||||
}
|
||||
})
|
|
@ -6,6 +6,45 @@ const EventEmitter = require("events").EventEmitter
|
|||
* future.
|
||||
*/
|
||||
|
||||
const emitter = new EventEmitter()
|
||||
/**
|
||||
* Extending the standard emitter to some syntactic sugar and standardisation to the emitted event.
|
||||
* This is specifically quite important for mustache used in automations.
|
||||
*/
|
||||
class BudibaseEmitter extends EventEmitter {
|
||||
emitRecord(eventName, instanceId, record, model = null) {
|
||||
let event = {
|
||||
record,
|
||||
instanceId,
|
||||
modelId: record.modelId,
|
||||
}
|
||||
if (model) {
|
||||
event.model = model
|
||||
}
|
||||
if (record._id) {
|
||||
event.id = record._id
|
||||
}
|
||||
if (record._rev) {
|
||||
event.revision = record._rev
|
||||
}
|
||||
this.emit(eventName, event)
|
||||
}
|
||||
|
||||
emitModel(eventName, instanceId, model = null) {
|
||||
let event = {
|
||||
model,
|
||||
instanceId,
|
||||
modelId: model._id,
|
||||
}
|
||||
if (model._id) {
|
||||
event.id = model._id
|
||||
}
|
||||
if (model._rev) {
|
||||
event.revision = model._rev
|
||||
}
|
||||
this.emit(eventName, event)
|
||||
}
|
||||
}
|
||||
|
||||
const emitter = new BudibaseEmitter()
|
||||
|
||||
module.exports = emitter
|
||||
|
|
|
@ -0,0 +1,78 @@
|
|||
let events = require("events")
|
||||
|
||||
/**
|
||||
* Bull works with a Job wrapper around all messages that contains a lot more information about
|
||||
* the state of the message, this object constructor implements the same schema of Bull jobs
|
||||
* for the sake of maintaining API consistency.
|
||||
* @param {string} queue The name of the queue which the message will be carried on.
|
||||
* @param {object} message The JSON message which will be passed back to the consumer.
|
||||
* @returns {Object} A new job which can now be put onto the queue, this is mostly an
|
||||
* internal structure so that an in memory queue can be easily swapped for a Bull queue.
|
||||
*/
|
||||
function newJob(queue, message) {
|
||||
return {
|
||||
timestamp: Date.now(),
|
||||
queue: queue,
|
||||
data: message,
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This is designed to replicate Bull (https://github.com/OptimalBits/bull) in memory as a sort of mock.
|
||||
* It is relatively simple, using an event emitter internally to register when messages are available
|
||||
* to the consumers - in can support many inputs and many consumers.
|
||||
*/
|
||||
class InMemoryQueue {
|
||||
/**
|
||||
* The constructor the queue, exactly the same as that of Bulls.
|
||||
* @param {string} name The name of the queue which is being configured.
|
||||
* @param {object|null} opts This is not used by the in memory queue as there is no real use
|
||||
* case when in memory, but is the same API as Bull
|
||||
*/
|
||||
constructor(name, opts = null) {
|
||||
this._name = name
|
||||
this._opts = opts
|
||||
this._messages = []
|
||||
this._emitter = new events.EventEmitter()
|
||||
}
|
||||
|
||||
/**
|
||||
* Same callback API as Bull, each callback passed to this will consume messages as they are
|
||||
* available. Please note this is a queue service, not a notification service, so each
|
||||
* consumer will receive different messages.
|
||||
* @param {function<object>} func The callback function which will return a "Job", the same
|
||||
* as the Bull API, within this job the property "data" contains the JSON message. Please
|
||||
* note this is incredibly limited compared to Bull as in reality the Job would contain
|
||||
* a lot more information about the queue and current status of Bull cluster.
|
||||
*/
|
||||
process(func) {
|
||||
this._emitter.on("message", async () => {
|
||||
if (this._messages.length <= 0) {
|
||||
return
|
||||
}
|
||||
let msg = this._messages.shift()
|
||||
let resp = func(msg)
|
||||
if (resp.then != null) {
|
||||
await resp
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// simply puts a message to the queue and emits to the queue for processing
|
||||
/**
|
||||
* Simple function to replicate the add message functionality of Bull, putting
|
||||
* a new message on the queue. This then emits an event which will be used to
|
||||
* return the message to a consumer (if one is attached).
|
||||
* @param {object} msg A message to be transported over the queue, this should be
|
||||
* a JSON message as this is required by Bull.
|
||||
*/
|
||||
add(msg) {
|
||||
if (typeof msg !== "object") {
|
||||
throw "Queue only supports carrying JSON."
|
||||
}
|
||||
this._messages.push(newJob(this._name, msg))
|
||||
this._emitter.emit("message")
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = InMemoryQueue
|
Loading…
Reference in New Issue