Some groundwork for the linked records, building up a much more in-depth emitter for models and records to drive the record cleanup.
This commit is contained in:
parent
403da250b9
commit
b24ff486d8
|
@ -12,12 +12,12 @@ exports.fetch = async function(ctx) {
|
||||||
|
|
||||||
exports.find = async function(ctx) {
|
exports.find = async function(ctx) {
|
||||||
const db = new CouchDB(ctx.user.instanceId)
|
const db = new CouchDB(ctx.user.instanceId)
|
||||||
const model = await db.get(ctx.params.id)
|
ctx.body = await db.get(ctx.params.id)
|
||||||
ctx.body = model
|
|
||||||
}
|
}
|
||||||
|
|
||||||
exports.save = async function(ctx) {
|
exports.save = async function(ctx) {
|
||||||
const db = new CouchDB(ctx.user.instanceId)
|
const instanceId = ctx.user.instanceId
|
||||||
|
const db = new CouchDB(instanceId)
|
||||||
const modelToSave = {
|
const modelToSave = {
|
||||||
type: "model",
|
type: "model",
|
||||||
_id: newid(),
|
_id: newid(),
|
||||||
|
@ -54,7 +54,7 @@ exports.save = async function(ctx) {
|
||||||
modelToSave._rev = result.rev
|
modelToSave._rev = result.rev
|
||||||
|
|
||||||
const { schema } = ctx.request.body
|
const { schema } = ctx.request.body
|
||||||
for (let key in schema) {
|
for (let key of Object.keys(schema)) {
|
||||||
// model has a linked record
|
// model has a linked record
|
||||||
if (schema[key].type === "link") {
|
if (schema[key].type === "link") {
|
||||||
// create the link field in the other model
|
// create the link field in the other model
|
||||||
|
@ -84,13 +84,18 @@ exports.save = async function(ctx) {
|
||||||
}
|
}
|
||||||
await db.put(designDoc)
|
await db.put(designDoc)
|
||||||
|
|
||||||
|
// syntactic sugar for event emission
|
||||||
|
modelToSave.modelId = modelToSave._id
|
||||||
|
ctx.eventEmitter &&
|
||||||
|
ctx.eventEmitter.emitModel(`model:save`, instanceId, modelToSave)
|
||||||
ctx.status = 200
|
ctx.status = 200
|
||||||
ctx.message = `Model ${ctx.request.body.name} saved successfully.`
|
ctx.message = `Model ${ctx.request.body.name} saved successfully.`
|
||||||
ctx.body = modelToSave
|
ctx.body = modelToSave
|
||||||
}
|
}
|
||||||
|
|
||||||
exports.destroy = async function(ctx) {
|
exports.destroy = async function(ctx) {
|
||||||
const db = new CouchDB(ctx.user.instanceId)
|
const instanceId = ctx.user.instanceId
|
||||||
|
const db = new CouchDB(instanceId)
|
||||||
|
|
||||||
const modelToDelete = await db.get(ctx.params.modelId)
|
const modelToDelete = await db.get(ctx.params.modelId)
|
||||||
|
|
||||||
|
@ -105,7 +110,7 @@ exports.destroy = async function(ctx) {
|
||||||
)
|
)
|
||||||
|
|
||||||
// Delete linked record fields in dependent models
|
// Delete linked record fields in dependent models
|
||||||
for (let key in modelToDelete.schema) {
|
for (let key of Object.keys(modelToDelete.schema)) {
|
||||||
const { type, modelId } = modelToDelete.schema[key]
|
const { type, modelId } = modelToDelete.schema[key]
|
||||||
if (type === "link") {
|
if (type === "link") {
|
||||||
const linkedModel = await db.get(modelId)
|
const linkedModel = await db.get(modelId)
|
||||||
|
@ -119,6 +124,10 @@ exports.destroy = async function(ctx) {
|
||||||
delete designDoc.views[modelViewId]
|
delete designDoc.views[modelViewId]
|
||||||
await db.put(designDoc)
|
await db.put(designDoc)
|
||||||
|
|
||||||
|
// syntactic sugar for event emission
|
||||||
|
modelToDelete.modelId = modelToDelete._id
|
||||||
|
ctx.eventEmitter &&
|
||||||
|
ctx.eventEmitter.emitModel(`model:delete`, instanceId, modelToDelete)
|
||||||
ctx.status = 200
|
ctx.status = 200
|
||||||
ctx.message = `Model ${ctx.params.modelId} deleted.`
|
ctx.message = `Model ${ctx.params.modelId} deleted.`
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,21 +2,6 @@ const CouchDB = require("../../db")
|
||||||
const validateJs = require("validate.js")
|
const validateJs = require("validate.js")
|
||||||
const newid = require("../../db/newid")
|
const newid = require("../../db/newid")
|
||||||
|
|
||||||
function emitEvent(eventType, ctx, record) {
|
|
||||||
let event = {
|
|
||||||
record,
|
|
||||||
instanceId: ctx.user.instanceId,
|
|
||||||
}
|
|
||||||
// add syntactic sugar for mustache later
|
|
||||||
if (record._id) {
|
|
||||||
event.id = record._id
|
|
||||||
}
|
|
||||||
if (record._rev) {
|
|
||||||
event.revision = record._rev
|
|
||||||
}
|
|
||||||
ctx.eventEmitter && ctx.eventEmitter.emit(eventType, event)
|
|
||||||
}
|
|
||||||
|
|
||||||
validateJs.extend(validateJs.validators.datetime, {
|
validateJs.extend(validateJs.validators.datetime, {
|
||||||
parse: function(value) {
|
parse: function(value) {
|
||||||
return new Date(value).getTime()
|
return new Date(value).getTime()
|
||||||
|
@ -28,7 +13,8 @@ validateJs.extend(validateJs.validators.datetime, {
|
||||||
})
|
})
|
||||||
|
|
||||||
exports.patch = async function(ctx) {
|
exports.patch = async function(ctx) {
|
||||||
const db = new CouchDB(ctx.user.instanceId)
|
const instanceId = ctx.user.instanceId
|
||||||
|
const db = new CouchDB(instanceId)
|
||||||
const record = await db.get(ctx.params.id)
|
const record = await db.get(ctx.params.id)
|
||||||
const model = await db.get(record.modelId)
|
const model = await db.get(record.modelId)
|
||||||
const patchfields = ctx.request.body
|
const patchfields = ctx.request.body
|
||||||
|
@ -55,13 +41,16 @@ exports.patch = async function(ctx) {
|
||||||
const response = await db.put(record)
|
const response = await db.put(record)
|
||||||
record._rev = response.rev
|
record._rev = response.rev
|
||||||
record.type = "record"
|
record.type = "record"
|
||||||
|
ctx.eventEmitter &&
|
||||||
|
ctx.eventEmitter.emitRecord(`record:update`, instanceId, record, model)
|
||||||
ctx.body = record
|
ctx.body = record
|
||||||
ctx.status = 200
|
ctx.status = 200
|
||||||
ctx.message = `${model.name} updated successfully.`
|
ctx.message = `${model.name} updated successfully.`
|
||||||
}
|
}
|
||||||
|
|
||||||
exports.save = async function(ctx) {
|
exports.save = async function(ctx) {
|
||||||
const db = new CouchDB(ctx.user.instanceId)
|
const instanceId = ctx.user.instanceId
|
||||||
|
const db = new CouchDB(instanceId)
|
||||||
const record = ctx.request.body
|
const record = ctx.request.body
|
||||||
record.modelId = ctx.params.modelId
|
record.modelId = ctx.params.modelId
|
||||||
|
|
||||||
|
@ -124,7 +113,8 @@ exports.save = async function(ctx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
emitEvent(`record:save`, ctx, record)
|
ctx.eventEmitter &&
|
||||||
|
ctx.eventEmitter.emitRecord(`record:save`, instanceId, record, model)
|
||||||
ctx.body = record
|
ctx.body = record
|
||||||
ctx.status = 200
|
ctx.status = 200
|
||||||
ctx.message = `${model.name} created successfully`
|
ctx.message = `${model.name} created successfully`
|
||||||
|
@ -180,7 +170,8 @@ exports.find = async function(ctx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
exports.destroy = async function(ctx) {
|
exports.destroy = async function(ctx) {
|
||||||
const db = new CouchDB(ctx.user.instanceId)
|
const instanceId = ctx.user.instanceId
|
||||||
|
const db = new CouchDB()
|
||||||
const record = await db.get(ctx.params.recordId)
|
const record = await db.get(ctx.params.recordId)
|
||||||
if (record.modelId !== ctx.params.modelId) {
|
if (record.modelId !== ctx.params.modelId) {
|
||||||
ctx.throw(400, "Supplied modelId doesn't match the record's modelId")
|
ctx.throw(400, "Supplied modelId doesn't match the record's modelId")
|
||||||
|
@ -188,9 +179,10 @@ exports.destroy = async function(ctx) {
|
||||||
}
|
}
|
||||||
ctx.body = await db.remove(ctx.params.recordId, ctx.params.revId)
|
ctx.body = await db.remove(ctx.params.recordId, ctx.params.revId)
|
||||||
ctx.status = 200
|
ctx.status = 200
|
||||||
// for automations
|
// for automations include the record that was deleted
|
||||||
ctx.record = record
|
ctx.record = record
|
||||||
emitEvent(`record:delete`, ctx, record)
|
ctx.eventEmitter &&
|
||||||
|
ctx.eventEmitter.emitRecord(`record:delete`, instanceId, record)
|
||||||
}
|
}
|
||||||
|
|
||||||
exports.validate = async function(ctx) {
|
exports.validate = async function(ctx) {
|
||||||
|
|
|
@ -1,44 +0,0 @@
|
||||||
let events = require("events")
|
|
||||||
|
|
||||||
// Bull works with a Job wrapper around all messages that contains a lot more information about
|
|
||||||
// the state of the message, implement this for the sake of maintaining API consistency
|
|
||||||
function newJob(queue, message) {
|
|
||||||
return {
|
|
||||||
timestamp: Date.now(),
|
|
||||||
queue: queue,
|
|
||||||
data: message,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// designed to replicate Bull (https://github.com/OptimalBits/bull) in memory as a sort of mock
|
|
||||||
class InMemoryQueue {
|
|
||||||
// opts is not used by this as there is no real use case when in memory, but is the same API as Bull
|
|
||||||
constructor(name, opts) {
|
|
||||||
this._name = name
|
|
||||||
this._opts = opts
|
|
||||||
this._messages = []
|
|
||||||
this._emitter = new events.EventEmitter()
|
|
||||||
}
|
|
||||||
|
|
||||||
// same API as bull, provide a callback and it will respond when messages are available
|
|
||||||
process(func) {
|
|
||||||
this._emitter.on("message", async () => {
|
|
||||||
if (this._messages.length <= 0) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
let msg = this._messages.shift()
|
|
||||||
let resp = func(msg)
|
|
||||||
if (resp.then != null) {
|
|
||||||
await resp
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// simply puts a message to the queue and emits to the queue for processing
|
|
||||||
add(msg) {
|
|
||||||
this._messages.push(newJob(this._name, msg))
|
|
||||||
this._emitter.emit("message")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
module.exports = InMemoryQueue
|
|
|
@ -1,8 +1,8 @@
|
||||||
const CouchDB = require("../db")
|
const CouchDB = require("../db")
|
||||||
const emitter = require("../events/index")
|
const emitter = require("../events/index")
|
||||||
const InMemoryQueue = require("./queue/inMemoryQueue")
|
const InMemoryQueue = require("../utilities/queue/inMemoryQueue")
|
||||||
|
|
||||||
let automationQueue = new InMemoryQueue()
|
let automationQueue = new InMemoryQueue("automationQueue")
|
||||||
|
|
||||||
const FAKE_STRING = "TEST"
|
const FAKE_STRING = "TEST"
|
||||||
const FAKE_BOOL = false
|
const FAKE_BOOL = false
|
||||||
|
|
|
@ -0,0 +1,73 @@
|
||||||
|
const CouchDB = require("./index")
|
||||||
|
const emitter = require("../events/index")
|
||||||
|
const InMemoryQueue = require("../utilities/queue/inMemoryQueue")
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This functionality makes sure that when records with links are created, updated or deleted they are processed
|
||||||
|
* correctly - making sure that no stale links are left around and that all links have been made successfully.
|
||||||
|
*/
|
||||||
|
|
||||||
|
const EventType = {
|
||||||
|
RECORD_SAVE: "record:save",
|
||||||
|
RECORD_UPDATE: "record:update",
|
||||||
|
RECORD_DELETE: "record:delete",
|
||||||
|
MODEL_SAVE: "model:save",
|
||||||
|
MODEL_DELETE: "model:delete",
|
||||||
|
}
|
||||||
|
const linkedRecordQueue = new InMemoryQueue("linkedRecordQueue")
|
||||||
|
|
||||||
|
function createEmitterCallback(eventName) {
|
||||||
|
emitter.on(eventName, function(event) {
|
||||||
|
if (!event || !event.record || !event.record.modelId) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
linkedRecordQueue.add({
|
||||||
|
type: eventName,
|
||||||
|
event,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
for (let typeKey of Object.keys(EventType)) {
|
||||||
|
createEmitterCallback(EventType[typeKey])
|
||||||
|
}
|
||||||
|
|
||||||
|
function doesModelHaveLinkedRecords(model) {
|
||||||
|
for (let key of Object.keys(model.schema)) {
|
||||||
|
const { type } = model.schema[key]
|
||||||
|
if (type === "link") {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
linkedRecordQueue.process(async job => {
|
||||||
|
let event = job.data
|
||||||
|
// can't operate without these properties
|
||||||
|
if (event.instanceId == null || event.modelId == null) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
const db = new CouchDB(event.instanceId)
|
||||||
|
let model = event.model == null ? await db.get(event.modelId) : event.model
|
||||||
|
// model doesn't have links, can stop here
|
||||||
|
if (!doesModelHaveLinkedRecords(model)) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// no linked records to operate on
|
||||||
|
if (model == null) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
switch (event.type) {
|
||||||
|
case EventType.RECORD_SAVE:
|
||||||
|
break
|
||||||
|
case EventType.RECORD_UPDATE:
|
||||||
|
break
|
||||||
|
case EventType.RECORD_DELETE:
|
||||||
|
break
|
||||||
|
case EventType.MODEL_SAVE:
|
||||||
|
break
|
||||||
|
case EventType.MODEL_DELETE:
|
||||||
|
break
|
||||||
|
}
|
||||||
|
})
|
|
@ -6,6 +6,45 @@ const EventEmitter = require("events").EventEmitter
|
||||||
* future.
|
* future.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
const emitter = new EventEmitter()
|
/**
|
||||||
|
* Extending the standard emitter to some syntactic sugar and standardisation to the emitted event.
|
||||||
|
* This is specifically quite important for mustache used in automations.
|
||||||
|
*/
|
||||||
|
class BudibaseEmitter extends EventEmitter {
|
||||||
|
emitRecord(eventName, instanceId, record, model = null) {
|
||||||
|
let event = {
|
||||||
|
record,
|
||||||
|
instanceId,
|
||||||
|
modelId: record.modelId,
|
||||||
|
}
|
||||||
|
if (model) {
|
||||||
|
event.model = model
|
||||||
|
}
|
||||||
|
if (record._id) {
|
||||||
|
event.id = record._id
|
||||||
|
}
|
||||||
|
if (record._rev) {
|
||||||
|
event.revision = record._rev
|
||||||
|
}
|
||||||
|
this.emit(eventName, event)
|
||||||
|
}
|
||||||
|
|
||||||
|
emitModel(eventName, instanceId, model = null) {
|
||||||
|
let event = {
|
||||||
|
model,
|
||||||
|
instanceId,
|
||||||
|
modelId: model._id,
|
||||||
|
}
|
||||||
|
if (model._id) {
|
||||||
|
event.id = model._id
|
||||||
|
}
|
||||||
|
if (model._rev) {
|
||||||
|
event.revision = model._rev
|
||||||
|
}
|
||||||
|
this.emit(eventName, event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const emitter = new BudibaseEmitter()
|
||||||
|
|
||||||
module.exports = emitter
|
module.exports = emitter
|
||||||
|
|
|
@ -0,0 +1,78 @@
|
||||||
|
let events = require("events")
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Bull works with a Job wrapper around all messages that contains a lot more information about
|
||||||
|
* the state of the message, this object constructor implements the same schema of Bull jobs
|
||||||
|
* for the sake of maintaining API consistency.
|
||||||
|
* @param {string} queue The name of the queue which the message will be carried on.
|
||||||
|
* @param {object} message The JSON message which will be passed back to the consumer.
|
||||||
|
* @returns {Object} A new job which can now be put onto the queue, this is mostly an
|
||||||
|
* internal structure so that an in memory queue can be easily swapped for a Bull queue.
|
||||||
|
*/
|
||||||
|
function newJob(queue, message) {
|
||||||
|
return {
|
||||||
|
timestamp: Date.now(),
|
||||||
|
queue: queue,
|
||||||
|
data: message,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is designed to replicate Bull (https://github.com/OptimalBits/bull) in memory as a sort of mock.
|
||||||
|
* It is relatively simple, using an event emitter internally to register when messages are available
|
||||||
|
* to the consumers - in can support many inputs and many consumers.
|
||||||
|
*/
|
||||||
|
class InMemoryQueue {
|
||||||
|
/**
|
||||||
|
* The constructor the queue, exactly the same as that of Bulls.
|
||||||
|
* @param {string} name The name of the queue which is being configured.
|
||||||
|
* @param {object|null} opts This is not used by the in memory queue as there is no real use
|
||||||
|
* case when in memory, but is the same API as Bull
|
||||||
|
*/
|
||||||
|
constructor(name, opts = null) {
|
||||||
|
this._name = name
|
||||||
|
this._opts = opts
|
||||||
|
this._messages = []
|
||||||
|
this._emitter = new events.EventEmitter()
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Same callback API as Bull, each callback passed to this will consume messages as they are
|
||||||
|
* available. Please note this is a queue service, not a notification service, so each
|
||||||
|
* consumer will receive different messages.
|
||||||
|
* @param {function<object>} func The callback function which will return a "Job", the same
|
||||||
|
* as the Bull API, within this job the property "data" contains the JSON message. Please
|
||||||
|
* note this is incredibly limited compared to Bull as in reality the Job would contain
|
||||||
|
* a lot more information about the queue and current status of Bull cluster.
|
||||||
|
*/
|
||||||
|
process(func) {
|
||||||
|
this._emitter.on("message", async () => {
|
||||||
|
if (this._messages.length <= 0) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
let msg = this._messages.shift()
|
||||||
|
let resp = func(msg)
|
||||||
|
if (resp.then != null) {
|
||||||
|
await resp
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// simply puts a message to the queue and emits to the queue for processing
|
||||||
|
/**
|
||||||
|
* Simple function to replicate the add message functionality of Bull, putting
|
||||||
|
* a new message on the queue. This then emits an event which will be used to
|
||||||
|
* return the message to a consumer (if one is attached).
|
||||||
|
* @param {object} msg A message to be transported over the queue, this should be
|
||||||
|
* a JSON message as this is required by Bull.
|
||||||
|
*/
|
||||||
|
add(msg) {
|
||||||
|
if (typeof msg !== "object") {
|
||||||
|
throw "Queue only supports carrying JSON."
|
||||||
|
}
|
||||||
|
this._messages.push(newJob(this._name, msg))
|
||||||
|
this._emitter.emit("message")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = InMemoryQueue
|
Loading…
Reference in New Issue