Merge pull request #11489 from Budibase/fix/automation-disable
Automation disabling and cron improvements
This commit is contained in:
commit
e6319cce67
|
@ -8,6 +8,7 @@ import {
|
|||
DatabasePutOpts,
|
||||
DatabaseCreateIndexOpts,
|
||||
DatabaseDeleteIndexOpts,
|
||||
DocExistsResponse,
|
||||
Document,
|
||||
isDocument,
|
||||
} from "@budibase/types"
|
||||
|
@ -120,6 +121,19 @@ export class DatabaseImpl implements Database {
|
|||
return this.updateOutput(() => db.get(id))
|
||||
}
|
||||
|
||||
async docExists(docId: string): Promise<DocExistsResponse> {
|
||||
const db = await this.checkSetup()
|
||||
let _rev, exists
|
||||
try {
|
||||
const { etag } = await db.head(docId)
|
||||
_rev = etag
|
||||
exists = true
|
||||
} catch (err) {
|
||||
exists = false
|
||||
}
|
||||
return { _rev, exists }
|
||||
}
|
||||
|
||||
async remove(idOrDoc: string | Document, rev?: string) {
|
||||
const db = await this.checkSetup()
|
||||
let _id: string
|
||||
|
|
|
@ -39,9 +39,8 @@ import {
|
|||
} from "../../db/defaultData/datasource_bb_default"
|
||||
import { removeAppFromUserRoles } from "../../utilities/workerRequests"
|
||||
import { stringToReadStream } from "../../utilities"
|
||||
import { doesUserHaveLock, getLocksById } from "../../utilities/redis"
|
||||
import { doesUserHaveLock } from "../../utilities/redis"
|
||||
import { cleanupAutomations } from "../../automations/utils"
|
||||
import { checkAppMetadata } from "../../automations/logging"
|
||||
import { getUniqueRows } from "../../utilities/usageQuota/rows"
|
||||
import { groups, licensing, quotas } from "@budibase/pro"
|
||||
import {
|
||||
|
@ -51,7 +50,6 @@ import {
|
|||
PlanType,
|
||||
Screen,
|
||||
UserCtx,
|
||||
ContextUser,
|
||||
} from "@budibase/types"
|
||||
import { BASE_LAYOUT_PROP_IDS } from "../../constants/layouts"
|
||||
import sdk from "../../sdk"
|
||||
|
|
|
@ -20,7 +20,7 @@ import {
|
|||
Automation,
|
||||
AutomationActionStepId,
|
||||
AutomationResults,
|
||||
BBContext,
|
||||
Ctx,
|
||||
} from "@budibase/types"
|
||||
import { getActionDefinitions as actionDefs } from "../../automations/actions"
|
||||
import sdk from "../../sdk"
|
||||
|
@ -73,7 +73,7 @@ function cleanAutomationInputs(automation: Automation) {
|
|||
return automation
|
||||
}
|
||||
|
||||
export async function create(ctx: BBContext) {
|
||||
export async function create(ctx: Ctx) {
|
||||
const db = context.getAppDB()
|
||||
let automation = ctx.request.body
|
||||
automation.appId = ctx.appId
|
||||
|
@ -142,7 +142,7 @@ export async function handleStepEvents(
|
|||
}
|
||||
}
|
||||
|
||||
export async function update(ctx: BBContext) {
|
||||
export async function update(ctx: Ctx) {
|
||||
const db = context.getAppDB()
|
||||
let automation = ctx.request.body
|
||||
automation.appId = ctx.appId
|
||||
|
@ -193,7 +193,7 @@ export async function update(ctx: BBContext) {
|
|||
builderSocket?.emitAutomationUpdate(ctx, automation)
|
||||
}
|
||||
|
||||
export async function fetch(ctx: BBContext) {
|
||||
export async function fetch(ctx: Ctx) {
|
||||
const db = context.getAppDB()
|
||||
const response = await db.allDocs(
|
||||
getAutomationParams(null, {
|
||||
|
@ -203,12 +203,11 @@ export async function fetch(ctx: BBContext) {
|
|||
ctx.body = response.rows.map(row => row.doc)
|
||||
}
|
||||
|
||||
export async function find(ctx: BBContext) {
|
||||
const db = context.getAppDB()
|
||||
ctx.body = await db.get(ctx.params.id)
|
||||
export async function find(ctx: Ctx) {
|
||||
ctx.body = await sdk.automations.get(ctx.params.id)
|
||||
}
|
||||
|
||||
export async function destroy(ctx: BBContext) {
|
||||
export async function destroy(ctx: Ctx) {
|
||||
const db = context.getAppDB()
|
||||
const automationId = ctx.params.id
|
||||
const oldAutomation = await db.get<Automation>(automationId)
|
||||
|
@ -222,11 +221,11 @@ export async function destroy(ctx: BBContext) {
|
|||
builderSocket?.emitAutomationDeletion(ctx, automationId)
|
||||
}
|
||||
|
||||
export async function logSearch(ctx: BBContext) {
|
||||
export async function logSearch(ctx: Ctx) {
|
||||
ctx.body = await automations.logs.logSearch(ctx.request.body)
|
||||
}
|
||||
|
||||
export async function clearLogError(ctx: BBContext) {
|
||||
export async function clearLogError(ctx: Ctx) {
|
||||
const { automationId, appId } = ctx.request.body
|
||||
await context.doInAppContext(appId, async () => {
|
||||
const db = context.getProdAppDB()
|
||||
|
@ -245,15 +244,15 @@ export async function clearLogError(ctx: BBContext) {
|
|||
})
|
||||
}
|
||||
|
||||
export async function getActionList(ctx: BBContext) {
|
||||
export async function getActionList(ctx: Ctx) {
|
||||
ctx.body = await getActionDefinitions()
|
||||
}
|
||||
|
||||
export async function getTriggerList(ctx: BBContext) {
|
||||
export async function getTriggerList(ctx: Ctx) {
|
||||
ctx.body = getTriggerDefinitions()
|
||||
}
|
||||
|
||||
export async function getDefinitionList(ctx: BBContext) {
|
||||
export async function getDefinitionList(ctx: Ctx) {
|
||||
ctx.body = {
|
||||
trigger: getTriggerDefinitions(),
|
||||
action: await getActionDefinitions(),
|
||||
|
@ -266,7 +265,7 @@ export async function getDefinitionList(ctx: BBContext) {
|
|||
* *
|
||||
*********************/
|
||||
|
||||
export async function trigger(ctx: BBContext) {
|
||||
export async function trigger(ctx: Ctx) {
|
||||
const db = context.getAppDB()
|
||||
let automation = await db.get<Automation>(ctx.params.id)
|
||||
|
||||
|
@ -311,7 +310,7 @@ function prepareTestInput(input: any) {
|
|||
return input
|
||||
}
|
||||
|
||||
export async function test(ctx: BBContext) {
|
||||
export async function test(ctx: Ctx) {
|
||||
const db = context.getAppDB()
|
||||
let automation = await db.get<Automation>(ctx.params.id)
|
||||
await setTestFlag(automation._id!)
|
||||
|
|
|
@ -6,11 +6,11 @@ import { isDevAppID } from "../db/utils"
|
|||
// need this to call directly, so we can get a response
|
||||
import { automationQueue } from "./bullboard"
|
||||
import { checkTestFlag } from "../utilities/redis"
|
||||
import * as utils from "./utils"
|
||||
import env from "../environment"
|
||||
import { context, db as dbCore } from "@budibase/backend-core"
|
||||
import { Automation, Row, AutomationData, AutomationJob } from "@budibase/types"
|
||||
import { executeSynchronously } from "../threads/automation"
|
||||
import sdk from "../sdk"
|
||||
|
||||
export const TRIGGER_DEFINITIONS = definitions
|
||||
const JOB_OPTS = {
|
||||
|
@ -142,7 +142,7 @@ export async function rebootTrigger() {
|
|||
let automations = await getAllAutomations()
|
||||
let rebootEvents = []
|
||||
for (let automation of automations) {
|
||||
if (utils.isRebootTrigger(automation)) {
|
||||
if (sdk.automations.isReboot(automation)) {
|
||||
const job = {
|
||||
automation,
|
||||
event: {
|
||||
|
|
|
@ -16,13 +16,14 @@ import {
|
|||
} from "@budibase/types"
|
||||
import sdk from "../sdk"
|
||||
|
||||
const REBOOT_CRON = "@reboot"
|
||||
const WH_STEP_ID = definitions.WEBHOOK.stepId
|
||||
const CRON_STEP_ID = definitions.CRON.stepId
|
||||
const Runner = new Thread(ThreadType.AUTOMATION)
|
||||
|
||||
function loggingArgs(job: AutomationJob) {
|
||||
return [
|
||||
function loggingArgs(
|
||||
job: AutomationJob,
|
||||
timing?: { start: number; complete?: boolean }
|
||||
) {
|
||||
const logs: any[] = [
|
||||
{
|
||||
_logKey: "automation",
|
||||
trigger: job.data.automation.definition.trigger.event,
|
||||
|
@ -32,24 +33,53 @@ function loggingArgs(job: AutomationJob) {
|
|||
jobId: job.id,
|
||||
},
|
||||
]
|
||||
if (timing?.start) {
|
||||
logs.push({
|
||||
_logKey: "startTime",
|
||||
start: timing.start,
|
||||
})
|
||||
}
|
||||
if (timing?.start && timing?.complete) {
|
||||
const end = new Date().getTime()
|
||||
const duration = end - timing.start
|
||||
logs.push({
|
||||
_logKey: "endTime",
|
||||
end,
|
||||
})
|
||||
logs.push({
|
||||
_logKey: "duration",
|
||||
duration,
|
||||
})
|
||||
}
|
||||
return logs
|
||||
}
|
||||
|
||||
export async function processEvent(job: AutomationJob) {
|
||||
const appId = job.data.event.appId!
|
||||
const automationId = job.data.automation._id!
|
||||
const start = new Date().getTime()
|
||||
const task = async () => {
|
||||
try {
|
||||
// need to actually await these so that an error can be captured properly
|
||||
console.log("automation running", ...loggingArgs(job))
|
||||
console.log("automation running", ...loggingArgs(job, { start }))
|
||||
|
||||
const runFn = () => Runner.run(job)
|
||||
const result = await quotas.addAutomation(runFn, {
|
||||
automationId,
|
||||
})
|
||||
console.log("automation completed", ...loggingArgs(job))
|
||||
const end = new Date().getTime()
|
||||
const duration = end - start
|
||||
console.log(
|
||||
"automation completed",
|
||||
...loggingArgs(job, { start, complete: true })
|
||||
)
|
||||
return result
|
||||
} catch (err) {
|
||||
console.error(`automation was unable to run`, err, ...loggingArgs(job))
|
||||
console.error(
|
||||
`automation was unable to run`,
|
||||
err,
|
||||
...loggingArgs(job, { start, complete: true })
|
||||
)
|
||||
return { err }
|
||||
}
|
||||
}
|
||||
|
@ -128,19 +158,6 @@ export async function clearMetadata() {
|
|||
await db.bulkDocs(automationMetadata)
|
||||
}
|
||||
|
||||
export function isCronTrigger(auto: Automation) {
|
||||
return (
|
||||
auto &&
|
||||
auto.definition.trigger &&
|
||||
auto.definition.trigger.stepId === CRON_STEP_ID
|
||||
)
|
||||
}
|
||||
|
||||
export function isRebootTrigger(auto: Automation) {
|
||||
const trigger = auto ? auto.definition.trigger : null
|
||||
return isCronTrigger(auto) && trigger?.inputs.cron === REBOOT_CRON
|
||||
}
|
||||
|
||||
/**
|
||||
* This function handles checking of any cron jobs that need to be enabled/updated.
|
||||
* @param {string} appId The ID of the app in which we are checking for webhooks
|
||||
|
@ -148,13 +165,13 @@ export function isRebootTrigger(auto: Automation) {
|
|||
*/
|
||||
export async function enableCronTrigger(appId: any, automation: Automation) {
|
||||
const trigger = automation ? automation.definition.trigger : null
|
||||
const validCron = sdk.automations.isCron(automation) && trigger?.inputs.cron
|
||||
const needsCreated =
|
||||
!sdk.automations.isReboot(automation) &&
|
||||
!sdk.automations.disabled(automation)
|
||||
|
||||
// need to create cron job
|
||||
if (
|
||||
isCronTrigger(automation) &&
|
||||
!isRebootTrigger(automation) &&
|
||||
trigger?.inputs.cron
|
||||
) {
|
||||
if (validCron && needsCreated) {
|
||||
// make a job id rather than letting Bull decide, makes it easier to handle on way out
|
||||
const jobId = `${appId}_cron_${newid()}`
|
||||
const job: any = await automationQueue.add(
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
import { context } from "@budibase/backend-core"
|
||||
import { Automation, AutomationState, DocumentType } from "@budibase/types"
|
||||
import { definitions } from "../../../automations/triggerInfo"
|
||||
|
||||
const REBOOT_CRON = "@reboot"
|
||||
|
||||
export async function exists(automationId: string) {
|
||||
if (!automationId?.startsWith(DocumentType.AUTOMATION)) {
|
||||
throw new Error("Invalid automation ID.")
|
||||
}
|
||||
const db = context.getAppDB()
|
||||
return db.docExists(automationId)
|
||||
}
|
||||
|
||||
export async function get(automationId: string) {
|
||||
const db = context.getAppDB()
|
||||
return (await db.get(automationId)) as Automation
|
||||
}
|
||||
|
||||
export function disabled(automation: Automation) {
|
||||
return automation.state === AutomationState.DISABLED || !hasSteps(automation)
|
||||
}
|
||||
|
||||
export function isCron(automation: Automation) {
|
||||
return (
|
||||
automation?.definition.trigger &&
|
||||
automation?.definition.trigger.stepId === definitions.CRON.stepId
|
||||
)
|
||||
}
|
||||
|
||||
export function isReboot(automation: Automation) {
|
||||
const trigger = automation?.definition.trigger
|
||||
return isCron(automation) && trigger?.inputs.cron === REBOOT_CRON
|
||||
}
|
||||
|
||||
export function hasSteps(automation: Automation) {
|
||||
return automation?.definition?.steps?.length > 0
|
||||
}
|
|
@ -1,7 +1,9 @@
|
|||
import * as webhook from "./webhook"
|
||||
import * as utils from "./utils"
|
||||
import * as automations from "./automations"
|
||||
|
||||
export default {
|
||||
webhook,
|
||||
utils,
|
||||
...automations,
|
||||
}
|
||||
|
|
|
@ -2,9 +2,9 @@ import { default as threadUtils } from "./utils"
|
|||
import { Job } from "bull"
|
||||
threadUtils.threadSetup()
|
||||
import {
|
||||
isRecurring,
|
||||
disableCronById,
|
||||
isErrorInOutput,
|
||||
isRecurring,
|
||||
} from "../automations/utils"
|
||||
import * as actions from "../automations/actions"
|
||||
import * as automationUtils from "../automations/automationUtils"
|
||||
|
@ -15,17 +15,17 @@ import { AutomationErrors, MAX_AUTOMATION_RECURRING_ERRORS } from "../constants"
|
|||
import { storeLog } from "../automations/logging"
|
||||
import {
|
||||
Automation,
|
||||
AutomationStep,
|
||||
AutomationStatus,
|
||||
AutomationMetadata,
|
||||
AutomationJob,
|
||||
AutomationData,
|
||||
AutomationJob,
|
||||
AutomationMetadata,
|
||||
AutomationStatus,
|
||||
AutomationStep,
|
||||
} from "@budibase/types"
|
||||
import {
|
||||
LoopStep,
|
||||
LoopInput,
|
||||
TriggerOutput,
|
||||
AutomationContext,
|
||||
LoopInput,
|
||||
LoopStep,
|
||||
TriggerOutput,
|
||||
} from "../definitions/automations"
|
||||
import { WorkerCallback } from "./definitions"
|
||||
import { context, logging } from "@budibase/backend-core"
|
||||
|
@ -34,6 +34,8 @@ import { cloneDeep } from "lodash/fp"
|
|||
import { performance } from "perf_hooks"
|
||||
import * as sdkUtils from "../sdk/utils"
|
||||
import env from "../environment"
|
||||
import sdk from "../sdk"
|
||||
|
||||
const FILTER_STEP_ID = actions.BUILTIN_ACTION_DEFINITIONS.FILTER.stepId
|
||||
const LOOP_STEP_ID = actions.BUILTIN_ACTION_DEFINITIONS.LOOP.stepId
|
||||
const CRON_STEP_ID = triggerDefs.CRON.stepId
|
||||
|
@ -514,7 +516,8 @@ class Orchestrator {
|
|||
|
||||
export function execute(job: Job<AutomationData>, callback: WorkerCallback) {
|
||||
const appId = job.data.event.appId
|
||||
const automationId = job.data.automation._id
|
||||
const automation = job.data.automation
|
||||
const automationId = automation._id
|
||||
if (!appId) {
|
||||
throw new Error("Unable to execute, event doesn't contain app ID.")
|
||||
}
|
||||
|
@ -525,10 +528,30 @@ export function execute(job: Job<AutomationData>, callback: WorkerCallback) {
|
|||
appId,
|
||||
automationId,
|
||||
task: async () => {
|
||||
let automation = job.data.automation,
|
||||
isCron = sdk.automations.isCron(job.data.automation),
|
||||
notFound = false
|
||||
try {
|
||||
automation = await sdk.automations.get(automationId)
|
||||
} catch (err: any) {
|
||||
// automation no longer exists
|
||||
notFound = err
|
||||
}
|
||||
const disabled = sdk.automations.disabled(automation)
|
||||
const stopAutomation = disabled || notFound
|
||||
const envVars = await sdkUtils.getEnvironmentVariables()
|
||||
// put into automation thread for whole context
|
||||
await context.doInEnvironmentContext(envVars, async () => {
|
||||
const automationOrchestrator = new Orchestrator(job)
|
||||
// hard stop on automations
|
||||
if (isCron && stopAutomation) {
|
||||
await automationOrchestrator.stopCron(
|
||||
disabled ? "disabled" : "not_found"
|
||||
)
|
||||
}
|
||||
if (stopAutomation) {
|
||||
return
|
||||
}
|
||||
try {
|
||||
const response = await automationOrchestrator.execute()
|
||||
callback(null, response)
|
||||
|
@ -557,11 +580,10 @@ export function executeSynchronously(job: Job) {
|
|||
// put into automation thread for whole context
|
||||
return context.doInEnvironmentContext(envVars, async () => {
|
||||
const automationOrchestrator = new Orchestrator(job)
|
||||
const response = await Promise.race([
|
||||
return await Promise.race([
|
||||
automationOrchestrator.execute(),
|
||||
timeoutPromise,
|
||||
])
|
||||
return response
|
||||
})
|
||||
})
|
||||
}
|
||||
|
|
|
@ -100,6 +100,10 @@ export const AutomationStepIdArray = [
|
|||
...Object.values(AutomationTriggerStepId),
|
||||
]
|
||||
|
||||
export enum AutomationState {
|
||||
DISABLED = "disabled",
|
||||
}
|
||||
|
||||
export interface Automation extends Document {
|
||||
definition: {
|
||||
steps: AutomationStep[]
|
||||
|
@ -112,6 +116,7 @@ export interface Automation extends Document {
|
|||
name: string
|
||||
internal?: boolean
|
||||
type?: string
|
||||
state?: AutomationState
|
||||
}
|
||||
|
||||
interface BaseIOStructure {
|
||||
|
|
|
@ -40,6 +40,11 @@ export type DatabasePutOpts = {
|
|||
force?: boolean
|
||||
}
|
||||
|
||||
export type DocExistsResponse = {
|
||||
_rev?: string
|
||||
exists: boolean
|
||||
}
|
||||
|
||||
export type DatabaseCreateIndexOpts = {
|
||||
index: {
|
||||
fields: string[]
|
||||
|
@ -90,6 +95,7 @@ export interface Database {
|
|||
exists(): Promise<boolean>
|
||||
checkSetup(): Promise<Nano.DocumentScope<any>>
|
||||
get<T>(id?: string): Promise<T>
|
||||
docExists(id: string): Promise<DocExistsResponse>
|
||||
remove(
|
||||
id: string | Document,
|
||||
rev?: string
|
||||
|
|
Loading…
Reference in New Issue