Adding metadata system and re-writing how Cron works, previously cron only worked in dev because it would never be enabled for the production app ID, this makes it so that it is never enabled for the dev app and when the production app is deployed it runs through all the automations and checks if any need cron jobs setup/disabled.
This commit is contained in:
parent
e84d30524b
commit
557bd2df9f
|
@ -2,16 +2,10 @@ const CouchDB = require("../../db")
|
||||||
const actions = require("../../automations/actions")
|
const actions = require("../../automations/actions")
|
||||||
const logic = require("../../automations/logic")
|
const logic = require("../../automations/logic")
|
||||||
const triggers = require("../../automations/triggers")
|
const triggers = require("../../automations/triggers")
|
||||||
const webhooks = require("./webhook")
|
const { getAutomationParams, generateAutomationID } = require("../../db/utils")
|
||||||
const {
|
const { saveEntityMetadata } = require("../../utilities")
|
||||||
getAutomationParams,
|
const { MetadataTypes } = require("../../constants")
|
||||||
generateAutomationID,
|
const { checkForWebhooks } = require("../../automations/utils")
|
||||||
isDevAppID,
|
|
||||||
isProdAppID,
|
|
||||||
} = require("../../db/utils")
|
|
||||||
|
|
||||||
const WH_STEP_ID = triggers.TRIGGER_DEFINITIONS.WEBHOOK.stepId
|
|
||||||
const CRON_STEP_ID = triggers.TRIGGER_DEFINITIONS.CRON.stepId
|
|
||||||
|
|
||||||
/*************************
|
/*************************
|
||||||
* *
|
* *
|
||||||
|
@ -26,6 +20,10 @@ function cleanAutomationInputs(automation) {
|
||||||
let steps = automation.definition.steps
|
let steps = automation.definition.steps
|
||||||
let trigger = automation.definition.trigger
|
let trigger = automation.definition.trigger
|
||||||
let allSteps = [...steps, trigger]
|
let allSteps = [...steps, trigger]
|
||||||
|
// live is not a property used anymore
|
||||||
|
if (automation.live != null) {
|
||||||
|
delete automation.live
|
||||||
|
}
|
||||||
for (let step of allSteps) {
|
for (let step of allSteps) {
|
||||||
if (step == null) {
|
if (step == null) {
|
||||||
continue
|
continue
|
||||||
|
@ -39,119 +37,6 @@ function cleanAutomationInputs(automation) {
|
||||||
return automation
|
return automation
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* This function handles checking of any cron jobs need to be created or deleted for automations.
|
|
||||||
* @param {string} appId The ID of the app in which we are checking for webhooks
|
|
||||||
* @param {object|undefined} oldAuto The old automation object if updating/deleting
|
|
||||||
* @param {object|undefined} newAuto The new automation object if creating/updating
|
|
||||||
*/
|
|
||||||
async function checkForCronTriggers({ appId, oldAuto, newAuto }) {
|
|
||||||
const oldTrigger = oldAuto ? oldAuto.definition.trigger : null
|
|
||||||
const newTrigger = newAuto ? newAuto.definition.trigger : null
|
|
||||||
function isCronTrigger(auto) {
|
|
||||||
return (
|
|
||||||
auto &&
|
|
||||||
auto.definition.trigger &&
|
|
||||||
auto.definition.trigger.stepId === CRON_STEP_ID
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
const isLive = auto => auto && auto.live
|
|
||||||
|
|
||||||
const cronTriggerRemoved =
|
|
||||||
isCronTrigger(oldAuto) && !isCronTrigger(newAuto) && oldTrigger.cronJobId
|
|
||||||
const cronTriggerDeactivated = !isLive(newAuto) && isLive(oldAuto)
|
|
||||||
|
|
||||||
const cronTriggerActivated = isLive(newAuto) && !isLive(oldAuto)
|
|
||||||
|
|
||||||
if (cronTriggerRemoved || (cronTriggerDeactivated && oldTrigger.cronJobId)) {
|
|
||||||
await triggers.automationQueue.removeRepeatableByKey(oldTrigger.cronJobId)
|
|
||||||
}
|
|
||||||
// need to create cron job
|
|
||||||
else if (isCronTrigger(newAuto) && cronTriggerActivated) {
|
|
||||||
const job = await triggers.automationQueue.add(
|
|
||||||
{
|
|
||||||
automation: newAuto,
|
|
||||||
event: { appId, timestamp: Date.now() },
|
|
||||||
},
|
|
||||||
{ repeat: { cron: newTrigger.inputs.cron } }
|
|
||||||
)
|
|
||||||
// Assign cron job ID from bull so we can remove it later if the cron trigger is removed
|
|
||||||
newTrigger.cronJobId = job.id
|
|
||||||
}
|
|
||||||
return newAuto
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This function handles checking if any webhooks need to be created or deleted for automations.
|
|
||||||
* @param {string} appId The ID of the app in which we are checking for webhooks
|
|
||||||
* @param {object|undefined} oldAuto The old automation object if updating/deleting
|
|
||||||
* @param {object|undefined} newAuto The new automation object if creating/updating
|
|
||||||
* @returns {Promise<object|undefined>} After this is complete the new automation object may have been updated and should be
|
|
||||||
* written to DB (this does not write to DB as it would be wasteful to repeat).
|
|
||||||
*/
|
|
||||||
async function checkForWebhooks({ appId, oldAuto, newAuto }) {
|
|
||||||
const oldTrigger = oldAuto ? oldAuto.definition.trigger : null
|
|
||||||
const newTrigger = newAuto ? newAuto.definition.trigger : null
|
|
||||||
const triggerChanged =
|
|
||||||
oldTrigger && newTrigger && oldTrigger.id !== newTrigger.id
|
|
||||||
function isWebhookTrigger(auto) {
|
|
||||||
return (
|
|
||||||
auto &&
|
|
||||||
auto.definition.trigger &&
|
|
||||||
auto.definition.trigger.stepId === WH_STEP_ID
|
|
||||||
)
|
|
||||||
}
|
|
||||||
// need to delete webhook
|
|
||||||
if (
|
|
||||||
isWebhookTrigger(oldAuto) &&
|
|
||||||
(!isWebhookTrigger(newAuto) || triggerChanged) &&
|
|
||||||
oldTrigger.webhookId
|
|
||||||
) {
|
|
||||||
try {
|
|
||||||
let db = new CouchDB(appId)
|
|
||||||
// need to get the webhook to get the rev
|
|
||||||
const webhook = await db.get(oldTrigger.webhookId)
|
|
||||||
const ctx = {
|
|
||||||
appId,
|
|
||||||
params: { id: webhook._id, rev: webhook._rev },
|
|
||||||
}
|
|
||||||
// might be updating - reset the inputs to remove the URLs
|
|
||||||
if (newTrigger) {
|
|
||||||
delete newTrigger.webhookId
|
|
||||||
newTrigger.inputs = {}
|
|
||||||
}
|
|
||||||
await webhooks.destroy(ctx)
|
|
||||||
} catch (err) {
|
|
||||||
// don't worry about not being able to delete, if it doesn't exist all good
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// need to create webhook
|
|
||||||
if (
|
|
||||||
(!isWebhookTrigger(oldAuto) || triggerChanged) &&
|
|
||||||
isWebhookTrigger(newAuto)
|
|
||||||
) {
|
|
||||||
const ctx = {
|
|
||||||
appId,
|
|
||||||
request: {
|
|
||||||
body: new webhooks.Webhook(
|
|
||||||
"Automation webhook",
|
|
||||||
webhooks.WebhookType.AUTOMATION,
|
|
||||||
newAuto._id
|
|
||||||
),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
await webhooks.save(ctx)
|
|
||||||
const id = ctx.body.webhook._id
|
|
||||||
newTrigger.webhookId = id
|
|
||||||
newTrigger.inputs = {
|
|
||||||
schemaUrl: `api/webhooks/schema/${appId}/${id}`,
|
|
||||||
triggerUrl: `api/webhooks/trigger/${appId}/${id}`,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return newAuto
|
|
||||||
}
|
|
||||||
|
|
||||||
exports.create = async function (ctx) {
|
exports.create = async function (ctx) {
|
||||||
const db = new CouchDB(ctx.appId)
|
const db = new CouchDB(ctx.appId)
|
||||||
let automation = ctx.request.body
|
let automation = ctx.request.body
|
||||||
|
@ -170,10 +55,6 @@ exports.create = async function (ctx) {
|
||||||
appId: ctx.appId,
|
appId: ctx.appId,
|
||||||
newAuto: automation,
|
newAuto: automation,
|
||||||
})
|
})
|
||||||
automation = await checkForCronTriggers({
|
|
||||||
appId: ctx.appId,
|
|
||||||
newAuto: automation,
|
|
||||||
})
|
|
||||||
const response = await db.put(automation)
|
const response = await db.put(automation)
|
||||||
automation._rev = response.rev
|
automation._rev = response.rev
|
||||||
|
|
||||||
|
@ -198,11 +79,6 @@ exports.update = async function (ctx) {
|
||||||
oldAuto: oldAutomation,
|
oldAuto: oldAutomation,
|
||||||
newAuto: automation,
|
newAuto: automation,
|
||||||
})
|
})
|
||||||
automation = await checkForCronTriggers({
|
|
||||||
appId: ctx.appId,
|
|
||||||
oldAuto: oldAutomation,
|
|
||||||
newAuto: automation,
|
|
||||||
})
|
|
||||||
const response = await db.put(automation)
|
const response = await db.put(automation)
|
||||||
automation._rev = response.rev
|
automation._rev = response.rev
|
||||||
|
|
||||||
|
@ -239,10 +115,6 @@ exports.destroy = async function (ctx) {
|
||||||
appId: ctx.appId,
|
appId: ctx.appId,
|
||||||
oldAuto: oldAutomation,
|
oldAuto: oldAutomation,
|
||||||
})
|
})
|
||||||
await checkForCronTriggers({
|
|
||||||
appId: ctx.appId,
|
|
||||||
oldAuto: oldAutomation,
|
|
||||||
})
|
|
||||||
ctx.body = await db.remove(ctx.params.id, ctx.params.rev)
|
ctx.body = await db.remove(ctx.params.id, ctx.params.rev)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -274,13 +146,6 @@ module.exports.getDefinitionList = async function (ctx) {
|
||||||
|
|
||||||
exports.trigger = async function (ctx) {
|
exports.trigger = async function (ctx) {
|
||||||
const appId = ctx.appId
|
const appId = ctx.appId
|
||||||
if (isDevAppID(appId)) {
|
|
||||||
// in dev apps don't throw an error, just don't trigger
|
|
||||||
ctx.body = {
|
|
||||||
message: "Automation not triggered, app in development.",
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
const db = new CouchDB(appId)
|
const db = new CouchDB(appId)
|
||||||
let automation = await db.get(ctx.params.id)
|
let automation = await db.get(ctx.params.id)
|
||||||
await triggers.externalTrigger(automation, {
|
await triggers.externalTrigger(automation, {
|
||||||
|
@ -295,12 +160,9 @@ exports.trigger = async function (ctx) {
|
||||||
|
|
||||||
exports.test = async function (ctx) {
|
exports.test = async function (ctx) {
|
||||||
const appId = ctx.appId
|
const appId = ctx.appId
|
||||||
if (isProdAppID(appId)) {
|
|
||||||
ctx.throw(400, "Cannot test automations in production app.")
|
|
||||||
}
|
|
||||||
const db = new CouchDB(appId)
|
const db = new CouchDB(appId)
|
||||||
let automation = await db.get(ctx.params.id)
|
let automation = await db.get(ctx.params.id)
|
||||||
ctx.body = await triggers.externalTrigger(
|
const response = await triggers.externalTrigger(
|
||||||
automation,
|
automation,
|
||||||
{
|
{
|
||||||
...ctx.request.body,
|
...ctx.request.body,
|
||||||
|
@ -308,4 +170,12 @@ exports.test = async function (ctx) {
|
||||||
},
|
},
|
||||||
{ getResponses: true }
|
{ getResponses: true }
|
||||||
)
|
)
|
||||||
|
// save a test history run
|
||||||
|
await saveEntityMetadata(
|
||||||
|
ctx.appId,
|
||||||
|
MetadataTypes.AUTOMATION_TEST_HISTORY,
|
||||||
|
automation._id,
|
||||||
|
ctx.request.body
|
||||||
|
)
|
||||||
|
ctx.body = response
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,11 @@
|
||||||
const CouchDB = require("../../../db")
|
const CouchDB = require("../../../db")
|
||||||
const Deployment = require("./Deployment")
|
const Deployment = require("./Deployment")
|
||||||
const { Replication } = require("@budibase/auth/db")
|
const { Replication } = require("@budibase/auth/db")
|
||||||
const { DocumentTypes } = require("../../../db/utils")
|
const { DocumentTypes, getAutomationParams } = require("../../../db/utils")
|
||||||
|
const {
|
||||||
|
disableAllCrons,
|
||||||
|
enableCronTrigger,
|
||||||
|
} = require("../../../automations/utils")
|
||||||
|
|
||||||
// the max time we can wait for an invalidation to complete before considering it failed
|
// the max time we can wait for an invalidation to complete before considering it failed
|
||||||
const MAX_PENDING_TIME_MS = 30 * 60000
|
const MAX_PENDING_TIME_MS = 30 * 60000
|
||||||
|
@ -58,6 +62,23 @@ async function storeDeploymentHistory(deployment) {
|
||||||
return deployment
|
return deployment
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function initDeployedApp(prodAppId) {
|
||||||
|
const db = new CouchDB(prodAppId)
|
||||||
|
const automations = (
|
||||||
|
await db.allDocs(
|
||||||
|
getAutomationParams(null, {
|
||||||
|
include_docs: true,
|
||||||
|
})
|
||||||
|
)
|
||||||
|
).rows.map(row => row.doc)
|
||||||
|
const promises = []
|
||||||
|
await disableAllCrons(prodAppId)
|
||||||
|
for (let automation of automations) {
|
||||||
|
promises.push(enableCronTrigger(prodAppId, automation))
|
||||||
|
}
|
||||||
|
await Promise.all(promises)
|
||||||
|
}
|
||||||
|
|
||||||
async function deployApp(deployment) {
|
async function deployApp(deployment) {
|
||||||
try {
|
try {
|
||||||
const productionAppId = deployment.appId.replace("_dev", "")
|
const productionAppId = deployment.appId.replace("_dev", "")
|
||||||
|
@ -85,6 +106,7 @@ async function deployApp(deployment) {
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
|
await initDeployedApp(productionAppId)
|
||||||
deployment.setStatus(DeploymentStatus.SUCCESS)
|
deployment.setStatus(DeploymentStatus.SUCCESS)
|
||||||
await storeDeploymentHistory(deployment)
|
await storeDeploymentHistory(deployment)
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
|
|
|
@ -0,0 +1,46 @@
|
||||||
|
const { MetadataTypes } = require("../../constants")
|
||||||
|
const CouchDB = require("../../db")
|
||||||
|
const { generateMetadataID } = require("../../db/utils")
|
||||||
|
const { saveEntityMetadata } = require("../../utilities")
|
||||||
|
|
||||||
|
exports.getTypes = async ctx => {
|
||||||
|
ctx.body = {
|
||||||
|
types: MetadataTypes,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
exports.saveMetadata = async ctx => {
|
||||||
|
const { type, entityId } = ctx.params
|
||||||
|
if (type === MetadataTypes.AUTOMATION_TEST_HISTORY) {
|
||||||
|
ctx.throw(400, "Cannot save automation history type")
|
||||||
|
}
|
||||||
|
await saveEntityMetadata(ctx.appId, type, entityId, ctx.request.body)
|
||||||
|
}
|
||||||
|
|
||||||
|
exports.deleteMetadata = async ctx => {
|
||||||
|
const { type, entityId } = ctx.params
|
||||||
|
const db = new CouchDB(ctx.appId)
|
||||||
|
const id = generateMetadataID(type, entityId)
|
||||||
|
let rev
|
||||||
|
try {
|
||||||
|
const metadata = await db.get(id)
|
||||||
|
if (metadata) {
|
||||||
|
rev = metadata._rev
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
// don't need to error if it doesn't exist
|
||||||
|
}
|
||||||
|
if (id && rev) {
|
||||||
|
await db.remove(id, rev)
|
||||||
|
}
|
||||||
|
ctx.body = {
|
||||||
|
message: "Metadata deleted successfully.",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
exports.getMetadata = async ctx => {
|
||||||
|
const { type, entityId } = ctx.params
|
||||||
|
const db = new CouchDB(ctx.appId)
|
||||||
|
const id = generateMetadataID(type, entityId)
|
||||||
|
ctx.body = await db.get(id)
|
||||||
|
}
|
|
@ -9,6 +9,10 @@ const {
|
||||||
} = require("@budibase/auth/permissions")
|
} = require("@budibase/auth/permissions")
|
||||||
const Joi = require("joi")
|
const Joi = require("joi")
|
||||||
const { bodyResource, paramResource } = require("../../middleware/resourceId")
|
const { bodyResource, paramResource } = require("../../middleware/resourceId")
|
||||||
|
const {
|
||||||
|
middleware: appInfoMiddleware,
|
||||||
|
AppType,
|
||||||
|
} = require("../../middleware/appInfo")
|
||||||
|
|
||||||
const router = Router()
|
const router = Router()
|
||||||
|
|
||||||
|
@ -84,23 +88,25 @@ router
|
||||||
generateValidator(false),
|
generateValidator(false),
|
||||||
controller.create
|
controller.create
|
||||||
)
|
)
|
||||||
.post(
|
|
||||||
"/api/automations/:id/trigger",
|
|
||||||
paramResource("id"),
|
|
||||||
authorized(PermissionTypes.AUTOMATION, PermissionLevels.EXECUTE),
|
|
||||||
controller.trigger
|
|
||||||
)
|
|
||||||
.post(
|
|
||||||
"/api/automations/:id/test",
|
|
||||||
paramResource("id"),
|
|
||||||
authorized(PermissionTypes.AUTOMATION, PermissionLevels.EXECUTE),
|
|
||||||
controller.test
|
|
||||||
)
|
|
||||||
.delete(
|
.delete(
|
||||||
"/api/automations/:id/:rev",
|
"/api/automations/:id/:rev",
|
||||||
paramResource("id"),
|
paramResource("id"),
|
||||||
authorized(BUILDER),
|
authorized(BUILDER),
|
||||||
controller.destroy
|
controller.destroy
|
||||||
)
|
)
|
||||||
|
.post(
|
||||||
|
"/api/automations/:id/trigger",
|
||||||
|
appInfoMiddleware({ appType: AppType.PROD }),
|
||||||
|
paramResource("id"),
|
||||||
|
authorized(PermissionTypes.AUTOMATION, PermissionLevels.EXECUTE),
|
||||||
|
controller.trigger
|
||||||
|
)
|
||||||
|
.post(
|
||||||
|
"/api/automations/:id/test",
|
||||||
|
appInfoMiddleware({ appType: AppType.DEV }),
|
||||||
|
paramResource("id"),
|
||||||
|
authorized(PermissionTypes.AUTOMATION, PermissionLevels.EXECUTE),
|
||||||
|
controller.test
|
||||||
|
)
|
||||||
|
|
||||||
module.exports = router
|
module.exports = router
|
||||||
|
|
|
@ -0,0 +1,32 @@
|
||||||
|
const Router = require("@koa/router")
|
||||||
|
const controller = require("../controllers/metadata")
|
||||||
|
const {
|
||||||
|
middleware: appInfoMiddleware,
|
||||||
|
AppType,
|
||||||
|
} = require("../../middleware/appInfo")
|
||||||
|
|
||||||
|
const router = Router()
|
||||||
|
|
||||||
|
router
|
||||||
|
.post(
|
||||||
|
"/api/metadata/:type/:entityId",
|
||||||
|
appInfoMiddleware({ appType: AppType.DEV }),
|
||||||
|
controller.saveMetadata
|
||||||
|
)
|
||||||
|
.delete(
|
||||||
|
"/api/metadata/:type/:entityId",
|
||||||
|
appInfoMiddleware({ appType: AppType.DEV }),
|
||||||
|
controller.deleteMetadata
|
||||||
|
)
|
||||||
|
.get(
|
||||||
|
"/api/metadata/type",
|
||||||
|
appInfoMiddleware({ appType: AppType.DEV }),
|
||||||
|
controller.getTypes
|
||||||
|
)
|
||||||
|
.get(
|
||||||
|
"/api/metadata/:type/:entityId",
|
||||||
|
appInfoMiddleware({ appType: AppType.DEV }),
|
||||||
|
controller.getMetadata
|
||||||
|
)
|
||||||
|
|
||||||
|
module.exports = router
|
|
@ -1,14 +1,22 @@
|
||||||
const { createBullBoard } = require("bull-board")
|
const { createBullBoard } = require("bull-board")
|
||||||
const { BullAdapter } = require("bull-board/bullAdapter")
|
const { BullAdapter } = require("bull-board/bullAdapter")
|
||||||
const { getQueues } = require("./triggers")
|
|
||||||
const express = require("express")
|
const express = require("express")
|
||||||
|
const env = require("../environment")
|
||||||
|
const Queue = env.isTest()
|
||||||
|
? require("../utilities/queue/inMemoryQueue")
|
||||||
|
: require("bull")
|
||||||
|
const { JobQueues } = require("../constants")
|
||||||
|
const { utils } = require("@budibase/auth/redis")
|
||||||
|
const { opts } = utils.getRedisOptions()
|
||||||
|
|
||||||
|
let automationQueue = new Queue(JobQueues.AUTOMATIONS, { redis: opts })
|
||||||
|
|
||||||
exports.pathPrefix = "/bulladmin"
|
exports.pathPrefix = "/bulladmin"
|
||||||
|
|
||||||
exports.init = () => {
|
exports.init = () => {
|
||||||
const expressApp = express()
|
const expressApp = express()
|
||||||
// Set up queues for bull board admin
|
// Set up queues for bull board admin
|
||||||
const queues = getQueues()
|
const queues = [automationQueue]
|
||||||
const adapters = []
|
const adapters = []
|
||||||
for (let queue of queues) {
|
for (let queue of queues) {
|
||||||
adapters.push(new BullAdapter(queue))
|
adapters.push(new BullAdapter(queue))
|
||||||
|
@ -18,3 +26,5 @@ exports.init = () => {
|
||||||
expressApp.use(exports.pathPrefix, router)
|
expressApp.use(exports.pathPrefix, router)
|
||||||
return expressApp
|
return expressApp
|
||||||
}
|
}
|
||||||
|
|
||||||
|
exports.queue = automationQueue
|
||||||
|
|
|
@ -1,12 +1,17 @@
|
||||||
const triggers = require("./triggers")
|
|
||||||
const { processEvent } = require("./utils")
|
const { processEvent } = require("./utils")
|
||||||
|
const { queue } = require("./bullboard")
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This module is built purely to kick off the worker farm and manage the inputs/outputs
|
* This module is built purely to kick off the worker farm and manage the inputs/outputs
|
||||||
*/
|
*/
|
||||||
exports.init = async function () {
|
exports.init = function () {
|
||||||
// don't wait this promise, it'll never end
|
// this promise will not complete
|
||||||
triggers.automationQueue.process(async job => {
|
return queue.process(async job => {
|
||||||
await processEvent(job)
|
await processEvent(job)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
exports.getQueues = () => {
|
||||||
|
return [queue]
|
||||||
|
}
|
||||||
|
exports.queue = queue
|
||||||
|
|
|
@ -1,20 +1,12 @@
|
||||||
const CouchDB = require("../db")
|
const CouchDB = require("../db")
|
||||||
const emitter = require("../events/index")
|
const emitter = require("../events/index")
|
||||||
const env = require("../environment")
|
|
||||||
const Queue = env.isTest()
|
|
||||||
? require("../utilities/queue/inMemoryQueue")
|
|
||||||
: require("bull")
|
|
||||||
const { getAutomationParams } = require("../db/utils")
|
const { getAutomationParams } = require("../db/utils")
|
||||||
const { coerce } = require("../utilities/rowProcessor")
|
const { coerce } = require("../utilities/rowProcessor")
|
||||||
const { utils } = require("@budibase/auth/redis")
|
|
||||||
const { JobQueues } = require("../constants")
|
|
||||||
const { definitions } = require("./triggerInfo")
|
const { definitions } = require("./triggerInfo")
|
||||||
const { isDevAppID } = require("../db/utils")
|
const { isDevAppID } = require("../db/utils")
|
||||||
// need this to call directly, so we can get a response
|
// need this to call directly, so we can get a response
|
||||||
const { processEvent } = require("./utils")
|
const { processEvent } = require("./utils")
|
||||||
|
const { queue } = require("./bullboard")
|
||||||
const { opts } = utils.getRedisOptions()
|
|
||||||
let automationQueue = new Queue(JobQueues.AUTOMATIONS, { redis: opts })
|
|
||||||
|
|
||||||
const TRIGGER_DEFINITIONS = definitions
|
const TRIGGER_DEFINITIONS = definitions
|
||||||
|
|
||||||
|
@ -44,13 +36,12 @@ async function queueRelevantRowAutomations(event, eventType) {
|
||||||
let automationDef = automation.definition
|
let automationDef = automation.definition
|
||||||
let automationTrigger = automationDef ? automationDef.trigger : {}
|
let automationTrigger = automationDef ? automationDef.trigger : {}
|
||||||
if (
|
if (
|
||||||
!automation.live ||
|
|
||||||
!automationTrigger.inputs ||
|
!automationTrigger.inputs ||
|
||||||
automationTrigger.inputs.tableId !== event.row.tableId
|
automationTrigger.inputs.tableId !== event.row.tableId
|
||||||
) {
|
) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
await automationQueue.add({ automation, event })
|
await queue.add({ automation, event })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -98,13 +89,8 @@ exports.externalTrigger = async function (
|
||||||
if (getResponses) {
|
if (getResponses) {
|
||||||
return processEvent({ data })
|
return processEvent({ data })
|
||||||
} else {
|
} else {
|
||||||
return automationQueue.add(data)
|
return queue.add(data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
exports.getQueues = () => {
|
|
||||||
return [automationQueue]
|
|
||||||
}
|
|
||||||
exports.automationQueue = automationQueue
|
|
||||||
|
|
||||||
exports.TRIGGER_DEFINITIONS = TRIGGER_DEFINITIONS
|
exports.TRIGGER_DEFINITIONS = TRIGGER_DEFINITIONS
|
||||||
|
|
|
@ -2,6 +2,14 @@ const env = require("../environment")
|
||||||
const workerFarm = require("worker-farm")
|
const workerFarm = require("worker-farm")
|
||||||
const { getAPIKey, update, Properties } = require("../utilities/usageQuota")
|
const { getAPIKey, update, Properties } = require("../utilities/usageQuota")
|
||||||
const singleThread = require("./thread")
|
const singleThread = require("./thread")
|
||||||
|
const { definitions } = require("./triggerInfo")
|
||||||
|
const webhooks = require("../api/controllers/webhook")
|
||||||
|
const CouchDB = require("../db")
|
||||||
|
const { queue } = require("./bullboard")
|
||||||
|
const newid = require("../db/newid")
|
||||||
|
|
||||||
|
const WH_STEP_ID = definitions.WEBHOOK.stepId
|
||||||
|
const CRON_STEP_ID = definitions.CRON.stepId
|
||||||
|
|
||||||
let workers = workerFarm(require.resolve("./thread"))
|
let workers = workerFarm(require.resolve("./thread"))
|
||||||
|
|
||||||
|
@ -54,3 +62,121 @@ exports.processEvent = async job => {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// end the repetition and the job itself
|
||||||
|
exports.disableAllCrons = async appId => {
|
||||||
|
const promises = []
|
||||||
|
const jobs = await queue.getRepeatableJobs()
|
||||||
|
for (let job of jobs) {
|
||||||
|
if (job.key.includes(`${appId}_cron`)) {
|
||||||
|
promises.push(queue.removeRepeatableByKey(job.key))
|
||||||
|
promises.push(queue.removeJobs(job.id))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Promise.all(promises)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This function handles checking of any cron jobs that need to be enabled/updated.
|
||||||
|
* @param {string} appId The ID of the app in which we are checking for webhooks
|
||||||
|
* @param {object|undefined} automation The automation object to be updated.
|
||||||
|
*/
|
||||||
|
exports.enableCronTrigger = async (appId, automation) => {
|
||||||
|
const trigger = automation ? automation.definition.trigger : null
|
||||||
|
function isCronTrigger(auto) {
|
||||||
|
return (
|
||||||
|
auto &&
|
||||||
|
auto.definition.trigger &&
|
||||||
|
auto.definition.trigger.stepId === CRON_STEP_ID
|
||||||
|
)
|
||||||
|
}
|
||||||
|
// need to create cron job
|
||||||
|
if (isCronTrigger(automation)) {
|
||||||
|
// make a job id rather than letting Bull decide, makes it easier to handle on way out
|
||||||
|
const jobId = `${appId}_cron_${newid()}`
|
||||||
|
const job = await queue.add(
|
||||||
|
{
|
||||||
|
automation,
|
||||||
|
event: { appId, timestamp: Date.now() },
|
||||||
|
},
|
||||||
|
{ repeat: { cron: trigger.inputs.cron }, jobId }
|
||||||
|
)
|
||||||
|
// Assign cron job ID from bull so we can remove it later if the cron trigger is removed
|
||||||
|
trigger.cronJobId = job.id
|
||||||
|
const db = new CouchDB(appId)
|
||||||
|
const response = await db.put(automation)
|
||||||
|
automation._id = response.id
|
||||||
|
automation._rev = response.rev
|
||||||
|
}
|
||||||
|
return automation
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This function handles checking if any webhooks need to be created or deleted for automations.
|
||||||
|
* @param {string} appId The ID of the app in which we are checking for webhooks
|
||||||
|
* @param {object|undefined} oldAuto The old automation object if updating/deleting
|
||||||
|
* @param {object|undefined} newAuto The new automation object if creating/updating
|
||||||
|
* @returns {Promise<object|undefined>} After this is complete the new automation object may have been updated and should be
|
||||||
|
* written to DB (this does not write to DB as it would be wasteful to repeat).
|
||||||
|
*/
|
||||||
|
exports.checkForWebhooks = async ({ appId, oldAuto, newAuto }) => {
|
||||||
|
const oldTrigger = oldAuto ? oldAuto.definition.trigger : null
|
||||||
|
const newTrigger = newAuto ? newAuto.definition.trigger : null
|
||||||
|
const triggerChanged =
|
||||||
|
oldTrigger && newTrigger && oldTrigger.id !== newTrigger.id
|
||||||
|
function isWebhookTrigger(auto) {
|
||||||
|
return (
|
||||||
|
auto &&
|
||||||
|
auto.definition.trigger &&
|
||||||
|
auto.definition.trigger.stepId === WH_STEP_ID
|
||||||
|
)
|
||||||
|
}
|
||||||
|
// need to delete webhook
|
||||||
|
if (
|
||||||
|
isWebhookTrigger(oldAuto) &&
|
||||||
|
(!isWebhookTrigger(newAuto) || triggerChanged) &&
|
||||||
|
oldTrigger.webhookId
|
||||||
|
) {
|
||||||
|
try {
|
||||||
|
let db = new CouchDB(appId)
|
||||||
|
// need to get the webhook to get the rev
|
||||||
|
const webhook = await db.get(oldTrigger.webhookId)
|
||||||
|
const ctx = {
|
||||||
|
appId,
|
||||||
|
params: { id: webhook._id, rev: webhook._rev },
|
||||||
|
}
|
||||||
|
// might be updating - reset the inputs to remove the URLs
|
||||||
|
if (newTrigger) {
|
||||||
|
delete newTrigger.webhookId
|
||||||
|
newTrigger.inputs = {}
|
||||||
|
}
|
||||||
|
await webhooks.destroy(ctx)
|
||||||
|
} catch (err) {
|
||||||
|
// don't worry about not being able to delete, if it doesn't exist all good
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// need to create webhook
|
||||||
|
if (
|
||||||
|
(!isWebhookTrigger(oldAuto) || triggerChanged) &&
|
||||||
|
isWebhookTrigger(newAuto)
|
||||||
|
) {
|
||||||
|
const ctx = {
|
||||||
|
appId,
|
||||||
|
request: {
|
||||||
|
body: new webhooks.Webhook(
|
||||||
|
"Automation webhook",
|
||||||
|
webhooks.WebhookType.AUTOMATION,
|
||||||
|
newAuto._id
|
||||||
|
),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
await webhooks.save(ctx)
|
||||||
|
const id = ctx.body.webhook._id
|
||||||
|
newTrigger.webhookId = id
|
||||||
|
newTrigger.inputs = {
|
||||||
|
schemaUrl: `api/webhooks/schema/${appId}/${id}`,
|
||||||
|
triggerUrl: `api/webhooks/trigger/${appId}/${id}`,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return newAuto
|
||||||
|
}
|
||||||
|
|
|
@ -123,5 +123,10 @@ exports.BaseQueryVerbs = {
|
||||||
DELETE: "delete",
|
DELETE: "delete",
|
||||||
}
|
}
|
||||||
|
|
||||||
|
exports.MetadataTypes = {
|
||||||
|
AUTOMATION_TEST_INPUT: "automationTestInput",
|
||||||
|
AUTOMATION_TEST_HISTORY: "automationTestHistory",
|
||||||
|
}
|
||||||
|
|
||||||
// pass through the list from the auth/core lib
|
// pass through the list from the auth/core lib
|
||||||
exports.ObjectStoreBuckets = ObjectStoreBuckets
|
exports.ObjectStoreBuckets = ObjectStoreBuckets
|
||||||
|
|
|
@ -37,6 +37,7 @@ const DocumentTypes = {
|
||||||
DATASOURCE_PLUS: "datasource_plus",
|
DATASOURCE_PLUS: "datasource_plus",
|
||||||
QUERY: "query",
|
QUERY: "query",
|
||||||
DEPLOYMENTS: "deployments",
|
DEPLOYMENTS: "deployments",
|
||||||
|
METADATA: "metadata",
|
||||||
}
|
}
|
||||||
|
|
||||||
const ViewNames = {
|
const ViewNames = {
|
||||||
|
@ -334,6 +335,18 @@ exports.getQueryParams = (datasourceId = null, otherProps = {}) => {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
exports.generateMetadataID = (type, entityId) => {
|
||||||
|
return `${DocumentTypes.METADATA}${SEPARATOR}${type}${SEPARATOR}${entityId}`
|
||||||
|
}
|
||||||
|
|
||||||
|
exports.getMetadataParams = (type, entityId = null, otherProps = {}) => {
|
||||||
|
let docId = `${type}${SEPARATOR}`
|
||||||
|
if (entityId != null) {
|
||||||
|
docId += entityId
|
||||||
|
}
|
||||||
|
return getDocParams(DocumentTypes.METADATA, docId, otherProps)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This can be used with the db.allDocs to get a list of IDs
|
* This can be used with the db.allDocs to get a list of IDs
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
const { isDevAppID, isProdAppID } = require("../db/utils")
|
||||||
|
|
||||||
|
exports.AppType = {
|
||||||
|
DEV: "dev",
|
||||||
|
PROD: "prod",
|
||||||
|
}
|
||||||
|
|
||||||
|
exports.middleware =
|
||||||
|
({ appType } = {}) =>
|
||||||
|
(ctx, next) => {
|
||||||
|
const appId = ctx.appId
|
||||||
|
if (appType === exports.AppType.DEV && appId && !isDevAppID(appId)) {
|
||||||
|
ctx.throw(400, "Only apps in development support this endpoint")
|
||||||
|
}
|
||||||
|
if (appType === exports.AppType.PROD && appId && !isProdAppID(appId)) {
|
||||||
|
ctx.throw(400, "Only apps in production support this endpoint")
|
||||||
|
}
|
||||||
|
return next()
|
||||||
|
}
|
|
@ -1,6 +1,8 @@
|
||||||
const env = require("../environment")
|
const env = require("../environment")
|
||||||
const { OBJ_STORE_DIRECTORY } = require("../constants")
|
const { OBJ_STORE_DIRECTORY } = require("../constants")
|
||||||
const { sanitizeKey } = require("@budibase/auth/src/objectStore")
|
const { sanitizeKey } = require("@budibase/auth/src/objectStore")
|
||||||
|
const CouchDB = require("../db")
|
||||||
|
const { generateMetadataID } = require("../db/utils")
|
||||||
|
|
||||||
const BB_CDN = "https://cdn.budi.live"
|
const BB_CDN = "https://cdn.budi.live"
|
||||||
|
|
||||||
|
@ -55,3 +57,26 @@ exports.attachmentsRelativeURL = attachmentKey => {
|
||||||
`${exports.objectStoreUrl()}/${attachmentKey}`
|
`${exports.objectStoreUrl()}/${attachmentKey}`
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
exports.saveEntityMetadata = async (appId, type, entityId, metadata) => {
|
||||||
|
const db = new CouchDB(appId)
|
||||||
|
const id = generateMetadataID(type, entityId)
|
||||||
|
// read it to see if it exists, we'll overwrite it no matter what
|
||||||
|
let rev
|
||||||
|
try {
|
||||||
|
const oldMetadata = await db.get(id)
|
||||||
|
rev = oldMetadata._rev
|
||||||
|
} catch (err) {
|
||||||
|
rev = null
|
||||||
|
}
|
||||||
|
metadata._id = id
|
||||||
|
if (rev) {
|
||||||
|
metadata._rev = rev
|
||||||
|
}
|
||||||
|
const response = await db.put(metadata)
|
||||||
|
return {
|
||||||
|
...metadata,
|
||||||
|
_id: id,
|
||||||
|
_rev: response.rev,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue