// budibase/packages/server/src/automations/utils.ts
// 303 lines, 8.9 KiB, TypeScript

import { Thread, ThreadType } from "../threads"
import { definitions } from "./triggerInfo"
import { automationQueue } from "./bullboard"
import newid from "../db/newid"
import { updateEntityMetadata } from "../utilities"
import { MetadataTypes } from "../constants"
import { db as dbCore, context } from "@budibase/backend-core"
2022-09-28 09:56:45 +02:00
import { getAutomationMetadataParams } from "../db/utils"
import { cloneDeep } from "lodash/fp"
import { quotas } from "@budibase/pro"
2023-07-18 11:41:51 +02:00
import {
Automation,
AutomationJob,
Webhook,
WebhookActionType,
} from "@budibase/types"
import sdk from "../sdk"
// Step ID of the webhook trigger definition; used below to recognise
// automations whose trigger is a webhook.
const WH_STEP_ID = definitions.WEBHOOK.stepId
// Thread of the automation type; processEvent hands queued jobs to it via Runner.run.
const Runner = new Thread(ThreadType.AUTOMATION)
/**
 * Builds the structured arguments that are spread into the console calls made
 * while an automation job is processed. Every entry carries a `_logKey`
 * identifying what it describes.
 *
 * @param job The queue job being processed; its trigger event and job ID are logged.
 * @param timing Optional timing information. A truthy `start` adds a
 * `startTime` entry; when `complete` is also set, `endTime` and `duration`
 * entries (measured against the current time) are added as well.
 * @returns The list of log entries, in the order: automation, bull,
 * startTime, endTime, duration.
 */
function loggingArgs(
  job: AutomationJob,
  timing?: { start: number; complete?: boolean }
) {
  const logs: Record<string, unknown>[] = [
    {
      _logKey: "automation",
      trigger: job.data.automation.definition.trigger.event,
    },
    {
      _logKey: "bull",
      jobId: job.id,
    },
  ]
  if (timing?.start) {
    logs.push({
      _logKey: "startTime",
      start: timing.start,
    })
    // end/duration only make sense once the run has finished
    if (timing.complete) {
      const end = Date.now()
      logs.push(
        {
          _logKey: "endTime",
          end,
        },
        {
          _logKey: "duration",
          duration: end - timing.start,
        }
      )
    }
  }
  return logs
}
/**
 * Runs a queued automation job inside the automation context of its app.
 *
 * The run itself is passed to `quotas.addAutomation` and executed through the
 * automation thread (`Runner.run`). Start, completion and failure are logged
 * together with the structured arguments from `loggingArgs`.
 *
 * @param job The queue job; the app ID is read from `job.data.event` and the
 * automation ID from `job.data.automation`.
 * @returns Whatever `context.doInAutomationContext` resolves with for the
 * task. The task itself resolves to the run result, or to `{ err }` when the
 * run throws (errors are logged, not rethrown).
 */
export async function processEvent(job: AutomationJob) {
  const appId = job.data.event.appId!
  const automationId = job.data.automation._id!
  const start = Date.now()
  const task = async () => {
    try {
      console.log("automation running", ...loggingArgs(job, { start }))
      const runFn = () => Runner.run(job)
      // need to actually await this so that an error can be captured properly
      const result = await quotas.addAutomation(runFn, {
        automationId,
      })
      console.log(
        "automation completed",
        ...loggingArgs(job, { start, complete: true })
      )
      return result
    } catch (err) {
      console.error(
        `automation was unable to run`,
        err,
        ...loggingArgs(job, { start, complete: true })
      )
      return { err }
    }
  }
  return await context.doInAutomationContext({ appId, automationId, task })
}
/**
 * Appends an entry to the test history metadata of an automation.
 * @param appId Not used by this function.
 * @param automation The automation whose `_id` keys the metadata.
 * @param history The history entry to append.
 * @returns The result of `updateEntityMetadata`.
 */
export async function updateTestHistory(
  appId: any,
  automation: any,
  history: any
) {
  // push onto the existing history list, or start a fresh metadata object
  const appendHistory = (metadata: any) => {
    const hasHistory = metadata && Array.isArray(metadata.history)
    if (!hasHistory) {
      return { history: [history] }
    }
    metadata.history.push(history)
    return metadata
  }
  return updateEntityMetadata(
    MetadataTypes.AUTOMATION_TEST_HISTORY,
    automation._id,
    appendHistory
  )
}
/**
 * Returns a deep copy of the supplied definitions with every entry flagged
 * as `deprecated` removed. The input object is not modified.
 */
export function removeDeprecated(definitions: any) {
  const filtered = cloneDeep(definitions)
  Object.keys(filtered)
    .filter(name => filtered[name].deprecated)
    .forEach(name => {
      delete filtered[name]
    })
  return filtered
}
/**
 * Ends the repetition, and the job itself, for every repeatable job whose key
 * contains `<appId>_cron`.
 * @returns A promise resolving once all removals have settled successfully.
 */
export async function disableAllCrons(appId: any) {
  const repeatables = await automationQueue.getRepeatableJobs()
  const removals = []
  for (const { key, id } of repeatables) {
    if (!key.includes(`${appId}_cron`)) {
      continue
    }
    removals.push(automationQueue.removeRepeatableByKey(key))
    if (id) {
      removals.push(automationQueue.removeJobs(id))
    }
  }
  return Promise.all(removals)
}
/**
 * Removes the repeatable queue entries whose ID strictly equals the supplied
 * job ID.
 *
 * Note that the "disabled" log line is written whether or not a matching
 * repeatable job was found.
 *
 * @param jobId The ID of the cron job to disable.
 */
export async function disableCronById(jobId: number | string) {
  const repeatJobs = await automationQueue.getRepeatableJobs()
  for (const repeatJob of repeatJobs) {
    if (repeatJob.id === jobId) {
      await automationQueue.removeRepeatableByKey(repeatJob.key)
    }
  }
  console.log(`jobId=${jobId} disabled`)
}
/**
 * Deletes all automation metadata documents from the production app DB.
 *
 * The documents are fetched with `getAutomationMetadataParams`, flagged with
 * `_deleted` and written back in a single `bulkDocs` call.
 */
export async function clearMetadata() {
  const db = context.getProdAppDB()
  const response = await db.allDocs(
    getAutomationMetadataParams({
      include_docs: true,
    })
  )
  const automationMetadata = response.rows.map((row: any) => row.doc)
  for (const metadata of automationMetadata) {
    metadata._deleted = true
  }
  await db.bulkDocs(automationMetadata)
}
/**
 * This function handles checking of any cron jobs that need to be enabled/updated.
 * A repeatable job is only created when the automation is a cron automation with
 * a cron expression configured, and it is neither a reboot automation nor disabled.
 * @param {string} appId The ID of the app in which we are checking for cron triggers
 * @param {object|undefined} automation The automation object to be updated.
 * @returns The automation that was passed in. When a cron job was created, its
 * trigger carries the new `cronJobId` and its `_id`/`_rev` reflect the DB write.
 */
export async function enableCronTrigger(appId: any, automation: Automation) {
  const trigger = automation ? automation.definition.trigger : null
  const validCron = sdk.automations.isCron(automation) && trigger?.inputs.cron
  const needsCreated =
    !sdk.automations.isReboot(automation) &&
    !sdk.automations.disabled(automation)

  // need to create cron job
  if (validCron && needsCreated) {
    // make a job id rather than letting Bull decide, makes it easier to handle on way out
    const jobId = `${appId}_cron_${newid()}`
    const job: any = await automationQueue.add(
      {
        automation,
        event: { appId, timestamp: Date.now() },
      },
      { repeat: { cron: trigger.inputs.cron }, jobId }
    )
    // Assign cron job ID from bull so we can remove it later if the cron trigger is removed
    trigger.cronJobId = job.id
    // can't use getAppDB here as this is likely to be called from dev app,
    // but this call could be for dev app or prod app, need to just use what
    // was passed in
    await dbCore.doWithDB(appId, async (db: any) => {
      const response = await db.put(automation)
      automation._id = response.id
      automation._rev = response.rev
    })
  }
  return automation
}
/**
 * This function handles checking if any webhooks need to be created or deleted for automations.
 * The app ID is taken from the current context rather than passed in.
 * @param {object|undefined} oldAuto The old automation object if updating/deleting
 * @param {object|undefined} newAuto The new automation object if creating/updating
 * @returns {Promise<object|undefined>} After this is complete the new automation object may have been updated and should be
 * written to DB (this does not write to DB as it would be wasteful to repeat).
 * @throws {Error} if there is no app ID in the current context.
 */
export async function checkForWebhooks({ oldAuto, newAuto }: any) {
  const appId = context.getAppId()
  if (!appId) {
    throw new Error("Unable to check webhooks - no app ID in context.")
  }
  const oldTrigger = oldAuto ? oldAuto.definition.trigger : null
  const newTrigger = newAuto ? newAuto.definition.trigger : null
  const triggerChanged =
    oldTrigger && newTrigger && oldTrigger.id !== newTrigger.id

  function isWebhookTrigger(auto: any) {
    return (
      auto &&
      auto.definition.trigger &&
      auto.definition.trigger.stepId === WH_STEP_ID
    )
  }

  // need to delete webhook
  if (
    isWebhookTrigger(oldAuto) &&
    (!isWebhookTrigger(newAuto) || triggerChanged) &&
    oldTrigger.webhookId
  ) {
    try {
      const db = context.getAppDB()
      // need to get the webhook to get the rev
      const webhook = await db.get<Webhook>(oldTrigger.webhookId)
      // might be updating - reset the inputs to remove the URLs
      if (newTrigger) {
        delete newTrigger.webhookId
        newTrigger.inputs = {}
      }
      await sdk.automations.webhook.destroy(webhook._id!, webhook._rev!)
    } catch (err) {
      // deliberately best-effort: don't worry about not being able to delete,
      // if it doesn't exist all good
    }
  }

  // need to create webhook
  if (
    (!isWebhookTrigger(oldAuto) || triggerChanged) &&
    isWebhookTrigger(newAuto)
  ) {
    const webhook = await sdk.automations.webhook.save(
      sdk.automations.webhook.newDoc(
        "Automation webhook",
        WebhookActionType.AUTOMATION,
        newAuto._id
      )
    )
    const id = webhook._id
    newTrigger.webhookId = id
    // the app ID has to be development for this endpoint
    // it can only be used when building the app
    // but the trigger endpoint will always be used in production
    const prodAppId = dbCore.getProdAppID(appId)
    newTrigger.inputs = {
      schemaUrl: `api/webhooks/schema/${appId}/${id}`,
      triggerUrl: `api/webhooks/trigger/${prodAppId}/${id}`,
    }
  }
  return newAuto
}
/**
 * When removing or unpublishing an app, its automations need to be cleaned up;
 * this disables all cron jobs belonging to the app.
 * @param appId {string} the app that is being removed.
 * @return {Promise<void>} cleanup is complete once this resolves.
 */
export async function cleanupAutomations(appId: any) {
await disableAllCrons(appId)
}
/**
 * Checks if the supplied automation is of a recurring type, i.e. whether its
 * trigger step is the cron trigger.
 * @param automation The automation to check.
 * @return {boolean} if it is recurring (cron).
 */
export function isRecurring(automation: Automation): boolean {
  return automation.definition.trigger.stepId === definitions.CRON.stepId
}
/**
 * Reports whether any step after the first one lacks a successful output.
 * @param output The automation output; `steps[0]` is the trigger.
 * @returns `true` when at least one non-trigger step has no `outputs` or has
 * `outputs.success` set to a falsy value, otherwise `false`.
 */
export function isErrorInOutput(output: {
  steps: { outputs?: { success: boolean } }[]
}) {
  // skip the trigger, it's always successful if the automation ran
  const [, ...actionSteps] = output.steps
  return actionSteps.some(step => !step.outputs?.success)
}