diff --git a/packages/backend-core/src/context/mainContext.ts b/packages/backend-core/src/context/mainContext.ts index e5f20882d3..6a00c125ad 100644 --- a/packages/backend-core/src/context/mainContext.ts +++ b/packages/backend-core/src/context/mainContext.ts @@ -266,9 +266,9 @@ export const getProdAppId = () => { return conversions.getProdAppID(appId) } -export function doInEnvironmentContext( +export function doInEnvironmentContext( values: Record, - task: any + task: () => T ) { if (!values) { throw new Error("Must supply environment variables.") diff --git a/packages/backend-core/src/utils/Duration.ts b/packages/backend-core/src/utils/Duration.ts index 730b59d1dc..f1cefa5a1f 100644 --- a/packages/backend-core/src/utils/Duration.ts +++ b/packages/backend-core/src/utils/Duration.ts @@ -15,23 +15,27 @@ const conversion: Record = { } export class Duration { + constructor(public ms: number) {} + + to(type: DurationType) { + return this.ms / conversion[type] + } + + toMs() { + return this.ms + } + + toSeconds() { + return this.to(DurationType.SECONDS) + } + static convert(from: DurationType, to: DurationType, duration: number) { const milliseconds = duration * conversion[from] return milliseconds / conversion[to] } static from(from: DurationType, duration: number) { - return { - to: (to: DurationType) => { - return Duration.convert(from, to, duration) - }, - toMs: () => { - return Duration.convert(from, DurationType.MILLISECONDS, duration) - }, - toSeconds: () => { - return Duration.convert(from, DurationType.SECONDS, duration) - }, - } + return new Duration(duration * conversion[from]) } static fromSeconds(duration: number) { diff --git a/packages/backend-core/src/utils/index.ts b/packages/backend-core/src/utils/index.ts index ac17227459..14bc4ca231 100644 --- a/packages/backend-core/src/utils/index.ts +++ b/packages/backend-core/src/utils/index.ts @@ -2,3 +2,4 @@ export * from "./hashing" export * from "./utils" export * from "./stringUtils" export * from "./Duration" +export * from "./time" diff --git a/packages/backend-core/src/utils/time.ts b/packages/backend-core/src/utils/time.ts new file mode 100644 index 0000000000..8ee40dd29f --- /dev/null +++ b/packages/backend-core/src/utils/time.ts @@ -0,0 +1,7 @@ +import { Duration } from "./Duration" + +export async function time(f: () => Promise): Promise<[T, Duration]> { + const start = performance.now() + const result = await f() + return [result, Duration.fromMilliseconds(performance.now() - start)] +} diff --git a/packages/server/src/automations/triggers.ts b/packages/server/src/automations/triggers.ts index 2ac90f3f9c..16d5246a91 100644 --- a/packages/server/src/automations/triggers.ts +++ b/packages/server/src/automations/triggers.ts @@ -182,11 +182,12 @@ export async function externalTrigger( // values are likely to be submitted as strings, so we shall convert to correct type const coercedFields: any = {} const fields = automation.definition.trigger.inputs.fields - for (let key of Object.keys(fields || {})) { + for (const key of Object.keys(fields || {})) { coercedFields[key] = coerce(params.fields[key], fields[key]) } params.fields = coercedFields } + // row actions and webhooks flatten the fields down else if ( sdk.automations.isRowAction(automation) || @@ -198,6 +199,7 @@ export async function externalTrigger( fields: {}, } } + const data: AutomationData = { automation, event: params } const shouldTrigger = await checkTriggerFilters(automation, { diff --git a/packages/server/src/automations/utils.ts b/packages/server/src/automations/utils.ts index a01a760b93..e8b3703c75 100644 --- a/packages/server/src/automations/utils.ts +++ b/packages/server/src/automations/utils.ts @@ -10,7 +10,6 @@ import { Automation, AutomationActionStepId, AutomationJob, - AutomationResults, AutomationStepDefinition, AutomationTriggerDefinition, AutomationTriggerStepId, @@ -19,6 +18,7 @@ import { import { automationsEnabled } from "../features" import { helpers, REBOOT_CRON } from "@budibase/shared-core" import tracer from "dd-trace" +import { JobId } from "bull" const CRON_STEP_ID = automations.triggers.definitions.CRON.stepId let Runner: Thread @@ -156,11 +156,11 @@ export async function disableAllCrons(appId: any) { return { count: results.length / 2 } } -export async function disableCronById(jobId: number | string) { - const repeatJobs = await automationQueue.getRepeatableJobs() - for (let repeatJob of repeatJobs) { - if (repeatJob.id === jobId) { - await automationQueue.removeRepeatableByKey(repeatJob.key) +export async function disableCronById(jobId: JobId) { + const jobs = await automationQueue.getRepeatableJobs() + for (const job of jobs) { + if (job.id === jobId) { + await automationQueue.removeRepeatableByKey(job.key) } } console.log(`jobId=${jobId} disabled`) @@ -249,31 +249,3 @@ export async function enableCronTrigger(appId: any, automation: Automation) { export async function cleanupAutomations(appId: any) { await disableAllCrons(appId) } - -/** - * Checks if the supplied automation is of a recurring type. - * @param automation The automation to check. - * @return if it is recurring (cron). - */ -export function isRecurring(automation: Automation) { - return ( - automation.definition.trigger.stepId === - automations.triggers.definitions.CRON.stepId - ) -} - -export function isErrorInOutput(output: AutomationResults) { - let first = true, - error = false - for (let step of output.steps) { - // skip the trigger, its always successful if automation ran - if (first) { - first = false - continue - } - if (!step.outputs?.success) { - error = true - } - } - return error -} diff --git a/packages/server/src/threads/automation.ts b/packages/server/src/threads/automation.ts index 5e1764403b..ca98eb9856 100644 --- a/packages/server/src/threads/automation.ts +++ b/packages/server/src/threads/automation.ts @@ -1,10 +1,6 @@ import { default as threadUtils } from "./utils" import { Job } from "bull" -import { - disableCronById, - isErrorInOutput, - isRecurring, -} from "../automations/utils" +import { disableCronById } from "../automations/utils" import * as actions from "../automations/actions" import * as automationUtils from "../automations/automationUtils" import { dataFilters, helpers } from "@budibase/shared-core" @@ -25,7 +21,6 @@ import { BranchSearchFilters, BranchStep, LoopStep, - UserBindings, ContextEmitter, LoopStepType, AutomationTriggerResult, @@ -36,14 +31,13 @@ import { } from "@budibase/types" import { AutomationContext } from "../definitions/automations" import { WorkerCallback } from "./definitions" -import { context, logging, configs } from "@budibase/backend-core" +import { context, logging, configs, utils } from "@budibase/backend-core" import { findHBSBlocks, processObject, processStringSync, } from "@budibase/string-templates" import { cloneDeep } from "lodash/fp" -import { performance } from "perf_hooks" import * as sdkUtils from "../sdk/utils" import env from "../environment" import tracer from "dd-trace" @@ -53,21 +47,25 @@ threadUtils.threadSetup() const CRON_STEP_ID = automations.triggers.definitions.CRON.stepId const STOPPED_STATUS = { success: true, status: AutomationStatus.STOPPED } -function matchesLoopFailureCondition(loopStep: LoopStep, currentItem: any) { - if (!loopStep.inputs.failure) { +function matchesLoopFailureCondition(step: LoopStep, currentItem: any) { + const { failure } = step.inputs + if (!failure) { return false } if (isPlainObject(currentItem)) { - return Object.values(currentItem).some(e => e === loopStep.inputs.failure) + return Object.values(currentItem).some(e => e === failure) } - return currentItem === loopStep.inputs.failure + return currentItem === failure } -function getLoopIterable(loopStep: LoopStep): any[] { - const option = loopStep.inputs.option - let input: any = loopStep.inputs.binding +// Returns an array of the things to loop over for a given LoopStep. This +// function handles the various ways that a LoopStep can be configured, parsing +// the input and returning an array of items to loop over. +function getLoopIterable(step: LoopStep): any[] { + const option = step.inputs.option + let input = step.inputs.binding if (option === LoopStepType.ARRAY && typeof input === "string") { input = JSON.parse(input) @@ -84,26 +82,83 @@ function getLoopIterable(loopStep: LoopStep): any[] { return Array.isArray(input) ? input : [input] } -async function branchMatches(ctx: AutomationContext, branch: Branch) { - const toFilter: Record = {} +function getLoopMaxIterations(loopStep: LoopStep): number { + const loopMaxIterations = + typeof loopStep.inputs.iterations === "string" + ? parseInt(loopStep.inputs.iterations) + : loopStep.inputs.iterations + return Math.min( + loopMaxIterations || env.AUTOMATION_MAX_ITERATIONS, + env.AUTOMATION_MAX_ITERATIONS + ) +} - const recurseSearchFilters = ( - filters: BranchSearchFilters - ): BranchSearchFilters => { +function stepSuccess( + step: Readonly, + outputs: Readonly>, + inputs?: Readonly> +): AutomationStepResult { + return { + id: step.id, + stepId: step.stepId, + inputs: inputs || step.inputs, + outputs: { + success: true, + ...outputs, + }, + } +} + +function stepFailure( + step: Readonly, + outputs: Readonly>, + inputs?: Readonly> +): AutomationStepResult { + return { + id: step.id, + stepId: step.stepId, + inputs: inputs || step.inputs, + outputs: { + success: false, + ...outputs, + }, + } +} + +function stepStopped(step: AutomationStep): AutomationStepResult { + return { + id: step.id, + stepId: step.stepId, + inputs: step.inputs, + outputs: STOPPED_STATUS, + } +} + +async function branchMatches( + ctx: AutomationContext, + branch: Readonly +): Promise { + const toFilter: Record = {} + const preparedCtx = prepareContext(ctx) + + // Because we allow bindings on both the left and right of each condition in + // automation branches, we can't pass the BranchSearchFilters directly to + // dataFilters.runQuery as-is. We first need to walk the filter tree and + // evaluate all of the bindings. + const evaluateBindings = (fs: Readonly) => { + const filters = cloneDeep(fs) for (const filter of Object.values(filters)) { if (!filter) { continue } if (isLogicalFilter(filter)) { - filter.conditions = filter.conditions.map(condition => - recurseSearchFilters(condition) - ) + filter.conditions = filter.conditions.map(evaluateBindings) } else { for (const [field, value] of Object.entries(filter)) { - toFilter[field] = processStringSync(field, prepareContext(ctx)) + toFilter[field] = processStringSync(field, preparedCtx) if (typeof value === "string" && findHBSBlocks(value).length > 0) { - filter[field] = processStringSync(value, prepareContext(ctx)) + filter[field] = processStringSync(value, preparedCtx) } } } @@ -114,7 +169,7 @@ async function branchMatches(ctx: AutomationContext, branch: Branch) { const result = dataFilters.runQuery( [toFilter], - recurseSearchFilters(branch.condition) + evaluateBindings(branch.condition) ) return result.length > 0 } @@ -130,7 +185,7 @@ function prepareContext(context: AutomationContext) { } } -export async function enrichBaseContext(context: AutomationContext) { +async function enrichBaseContext(context: AutomationContext) { context.env = await sdkUtils.getEnvironmentVariables() try { @@ -141,181 +196,189 @@ export async function enrichBaseContext(context: AutomationContext) { company: config.company, } } catch (e) { - // if settings doc doesn't exist, make the settings blank context.settings = {} } } -/** - * The automation orchestrator is a class responsible for executing automations. - * It handles the context of the automation and makes sure each step gets the correct - * inputs and handles any outputs. - */ +// Because the trigger appears twice in an AutomationResult, once as .trigger +// and again as .steps[0], this function makes sure that the two are kept in +// sync when setting trigger output. +function setTriggerOutput(result: AutomationResults, outputs: any) { + result.trigger.outputs = { + ...result.trigger.outputs, + ...outputs, + } + result.steps[0] = result.trigger +} + class Orchestrator { - private chainCount: number - private appId: string - private automation: Automation + private readonly job: AutomationJob private emitter: ContextEmitter - private job: AutomationJob private stopped: boolean - private executionOutput: AutomationResults - private currentUser: UserBindings | undefined - constructor(job: AutomationJob) { - this.automation = job.data.automation - - const triggerOutput = job.data.event - if ( - this.automation.definition.trigger.stepId === CRON_STEP_ID && - !triggerOutput.timestamp - ) { - triggerOutput.timestamp = Date.now() - } - - this.chainCount = triggerOutput.metadata?.automationChainCount || 0 - this.appId = triggerOutput.appId as string + constructor(job: Readonly) { this.job = job - - // remove from context - delete triggerOutput.appId - delete triggerOutput.metadata + this.stopped = false // create an emitter which has the chain count for this automation run in // it, so it can block excessive chaining if required - this.emitter = new AutomationEmitter(this.chainCount + 1) - - const trigger: AutomationTriggerResult = { - id: this.automation.definition.trigger.id, - stepId: this.automation.definition.trigger.stepId, - outputs: triggerOutput, - } - - this.executionOutput = { trigger, steps: [trigger] } - - // setup the execution output - this.stopped = false - this.currentUser = triggerOutput.user + const chainCount = job.data.event.metadata?.automationChainCount || 0 + this.emitter = new AutomationEmitter(chainCount + 1) } - async getStepFunctionality(stepId: AutomationActionStepId) { - let step = await actions.getAction(stepId) - if (step == null) { - throw `Cannot find automation step by name ${stepId}` - } - return step + get automation(): Automation { + return this.job.data.automation } - async getMetadata(): Promise { - const metadataId = generateAutomationMetadataID(this.automation._id!) + get appId(): string { + return this.job.data.event.appId! + } + + private async getMetadata(): Promise { + const id = generateAutomationMetadataID(this.automation._id!) const db = context.getAppDB() - let metadata: AutomationMetadata - try { - metadata = await db.get(metadataId) - } catch (err) { - metadata = { - _id: metadataId, - errorCount: 0, - } - } - return metadata + const doc = await db.tryGet(id) + return doc || { _id: id, errorCount: 0 } } - async stopCron(reason: string) { - if (!this.job.opts.repeat) { - return + async stopCron(reason: string, opts?: { result: AutomationResults }) { + if (!this.isCron()) { + throw new Error("Not a cron automation") } - logging.logWarn( - `CRON disabled reason=${reason} - ${this.appId}/${this.automation._id}` - ) + + const msg = `CRON disabled reason=${reason} - ${this.appId}/${this.automation._id}` + logging.logWarn(msg) + await disableCronById(this.job.id) - this.executionOutput.trigger.outputs = { - ...this.executionOutput.trigger.outputs, - success: false, - status: AutomationStatus.STOPPED, + + const { result } = opts || {} + if (result) { + setTriggerOutput(result, { + success: false, + status: AutomationStatus.STOPPED, + }) + await this.logResult(result) } - this.executionOutput.steps[0] = this.executionOutput.trigger - await storeLog(this.automation, this.executionOutput) } - async checkIfShouldStop(metadata: AutomationMetadata): Promise { - if (!metadata.errorCount || !this.job.opts.repeat) { + private async logResult(result: AutomationResults) { + try { + await storeLog(this.automation, result) + } catch (e: any) { + if (e.status === 413 && e.request?.data) { + // if content is too large we shouldn't log it + delete e.request.data + e.request.data = { message: "removed due to large size" } + } + logging.logAlert("Error writing automation log", e) + } + } + + private async shouldStop(metadata: AutomationMetadata): Promise { + if (!metadata.errorCount || !this.isCron()) { return false } if (metadata.errorCount >= MAX_AUTOMATION_RECURRING_ERRORS) { - await this.stopCron("errors") return true } return false } - async execute(): Promise { + private isCron(): boolean { + return this.automation.definition.trigger.stepId === CRON_STEP_ID + } + + private isProdApp(): boolean { + return isProdAppID(this.appId) + } + + hasErrored(context: AutomationContext): boolean { + const [_trigger, ...steps] = context.steps + for (const step of steps) { + if (step.outputs?.success === false) { + return true + } + } + return false + } + + async execute(): Promise { return tracer.trace( "Orchestrator.execute", { resource: "automation" }, async span => { - span?.addTags({ - appId: this.appId, - automationId: this.automation._id, - }) + span?.addTags({ appId: this.appId, automationId: this.automation._id }) + + const job = cloneDeep(this.job) + delete job.data.event.appId + delete job.data.event.metadata + if (!job.data.event.timestamp) { + job.data.event.timestamp = Date.now() + } + + const trigger: AutomationTriggerResult = { + id: job.data.automation.definition.trigger.id, + stepId: job.data.automation.definition.trigger.stepId, + outputs: job.data.event, + } + const result: AutomationResults = { trigger, steps: [trigger] } let metadata: AutomationMetadata | undefined = undefined - // check if this is a recurring automation, - if (isProdAppID(this.appId) && isRecurring(this.automation)) { + if (this.isProdApp() && this.isCron()) { span?.addTags({ recurring: true }) metadata = await this.getMetadata() - const shouldStop = await this.checkIfShouldStop(metadata) - if (shouldStop) { + if (await this.shouldStop(metadata)) { + await this.stopCron("errors") span?.addTags({ shouldStop: true }) - return + return result } } const ctx: AutomationContext = { - trigger: this.executionOutput.trigger.outputs, - steps: [this.executionOutput.trigger.outputs], + trigger: trigger.outputs, + steps: [trigger.outputs], stepsById: {}, stepsByName: {}, - user: this.currentUser, + user: trigger.outputs.user, } await enrichBaseContext(ctx) - const start = performance.now() - - const stepOutputs = await this.executeSteps( - ctx, - this.automation.definition.steps - ) - - this.executionOutput.steps.push(...stepOutputs) - - const end = performance.now() - const executionTime = end - start - - console.info( - `Automation ID: ${this.automation._id} Execution time: ${executionTime} milliseconds`, - { - _logKey: "automation", - executionTime, - } - ) + const timeout = + this.job.data.event.timeout || env.AUTOMATION_THREAD_TIMEOUT try { - await storeLog(this.automation, this.executionOutput) + await helpers.withTimeout(timeout, async () => { + const [stepOutputs, executionTime] = await utils.time(() => + this.executeSteps(ctx, job.data.automation.definition.steps) + ) + + result.steps.push(...stepOutputs) + + console.info( + `Automation ID: ${ + this.automation._id + } Execution time: ${executionTime.toMs()} milliseconds`, + { + _logKey: "automation", + executionTime, + } + ) + }) } catch (e: any) { - if (e.status === 413 && e.request?.data) { - // if content is too large we shouldn't log it - delete e.request.data - e.request.data = { message: "removed due to large size" } + if (e.errno === "ETIME") { + span?.addTags({ timedOut: true }) + console.warn(`Automation execution timed out after ${timeout}ms`) } - logging.logAlert("Error writing automation log", e) } + await this.logResult(result) + if ( - isProdAppID(this.appId) && - isRecurring(this.automation) && + this.isProdApp() && + this.isCron() && metadata && - isErrorInOutput(this.executionOutput) + this.hasErrored(ctx) ) { metadata.errorCount ??= 0 metadata.errorCount++ @@ -327,12 +390,12 @@ class Orchestrator { logging.logAlertWithInfo( "Failed to write automation metadata", db.name, - this.automation._id!, + job.data.automation._id!, err ) } } - return this.executionOutput + return result } ) } @@ -341,155 +404,108 @@ class Orchestrator { ctx: AutomationContext, steps: AutomationStep[] ): Promise { - return tracer.trace( - "Orchestrator.executeSteps", - { resource: "automation" }, - async span => { - let stepIndex = 0 - const timeout = - this.job.data.event.timeout || env.AUTOMATION_THREAD_TIMEOUT - const stepOutputs: AutomationStepResult[] = [] + return tracer.trace("Orchestrator.executeSteps", async () => { + let stepIndex = 0 + const results: AutomationStepResult[] = [] - try { - await helpers.withTimeout(timeout, async () => { - while (stepIndex < steps.length) { - if (this.stopped) { - break - } + function addToContext( + step: AutomationStep, + result: AutomationStepResult + ) { + ctx.steps.push(result.outputs) + ctx.stepsById[step.id] = result.outputs + ctx.stepsByName[step.name || step.id] = result.outputs + results.push(result) + } - const step = steps[stepIndex] - if (step.stepId === AutomationActionStepId.BRANCH) { - const [result, ...childResults] = await this.executeBranchStep( - ctx, - step - ) - - stepOutputs.push(result) - stepOutputs.push(...childResults) - - stepIndex++ - } else if (step.stepId === AutomationActionStepId.LOOP) { - const stepToLoop = steps[stepIndex + 1] - const result = await this.executeLoopStep(ctx, step, stepToLoop) - - ctx.steps.push(result.outputs) - ctx.stepsById[stepToLoop.id] = result.outputs - ctx.stepsByName[stepToLoop.name || stepToLoop.id] = - result.outputs - - stepOutputs.push(result) - stepIndex += 2 - } else { - const result = await this.executeStep(ctx, step) - - ctx.steps.push(result.outputs) - ctx.stepsById[step.id] = result.outputs - ctx.stepsByName[step.name || step.id] = result.outputs - - stepOutputs.push(result) - stepIndex++ - } - } - }) - } catch (error: any) { - if (error.errno === "ETIME") { - span?.addTags({ timedOut: true }) - console.warn(`Automation execution timed out after ${timeout}ms`) - } + while (stepIndex < steps.length) { + if (this.stopped) { + break } - return stepOutputs + const step = steps[stepIndex] + switch (step.stepId) { + case AutomationActionStepId.BRANCH: { + results.push(...(await this.executeBranchStep(ctx, step))) + stepIndex++ + break + } + case AutomationActionStepId.LOOP: { + const stepToLoop = steps[stepIndex + 1] + addToContext( + stepToLoop, + await this.executeLoopStep(ctx, step, stepToLoop) + ) + // We increment by 2 here because the way loops work is that the + // step immediately following the loop step is what gets looped. + // So when we're done looping, to advance correctly we need to + // skip the step that was looped. + stepIndex += 2 + break + } + default: { + addToContext(step, await this.executeStep(ctx, step)) + stepIndex++ + break + } + } } - ) + + return results + }) } private async executeLoopStep( ctx: AutomationContext, - loopStep: LoopStep, + step: LoopStep, stepToLoop: AutomationStep ): Promise { - await processObject(loopStep.inputs, prepareContext(ctx)) - - const result = { - id: loopStep.id, - stepId: loopStep.stepId, - inputs: loopStep.inputs, - } - - const loopMaxIterations = - typeof loopStep.inputs.iterations === "string" - ? parseInt(loopStep.inputs.iterations) - : loopStep.inputs.iterations - const maxIterations = Math.min( - loopMaxIterations || env.AUTOMATION_MAX_ITERATIONS, - env.AUTOMATION_MAX_ITERATIONS - ) + await processObject(step.inputs, prepareContext(ctx)) + const maxIterations = getLoopMaxIterations(step) const items: Record[] = [] let iterations = 0 let iterable: any[] = [] try { - iterable = getLoopIterable(loopStep) + iterable = getLoopIterable(step) } catch (err) { - return { - ...result, - outputs: { - success: false, - status: AutomationStepStatus.INCORRECT_TYPE, - }, - } + return stepFailure(stepToLoop, { + status: AutomationStepStatus.INCORRECT_TYPE, + }) } for (; iterations < iterable.length; iterations++) { const currentItem = iterable[iterations] if (iterations === maxIterations) { - return { - ...result, - outputs: { - success: false, - iterations, - items, - status: AutomationStepStatus.MAX_ITERATIONS, - }, - } + return stepFailure(stepToLoop, { + status: AutomationStepStatus.MAX_ITERATIONS, + iterations, + }) } - if (matchesLoopFailureCondition(loopStep, currentItem)) { - return { - ...result, - outputs: { - success: false, - iterations, - items, - status: AutomationStepStatus.FAILURE_CONDITION, - }, - } + if (matchesLoopFailureCondition(step, currentItem)) { + return stepFailure(stepToLoop, { + status: AutomationStepStatus.FAILURE_CONDITION, + }) } ctx.loop = { currentItem } - const loopedStepResult = await this.executeStep(ctx, stepToLoop) + const result = await this.executeStep(ctx, stepToLoop) + items.push(result.outputs) ctx.loop = undefined - items.push(loopedStepResult.outputs) } - return { - ...result, - outputs: { - success: true, - status: - iterations === 0 ? AutomationStepStatus.NO_ITERATIONS : undefined, - iterations, - items, - }, - } + const status = + iterations === 0 ? AutomationStatus.NO_CONDITION_MET : undefined + return stepSuccess(stepToLoop, { status, iterations, items }) } private async executeBranchStep( ctx: AutomationContext, - branchStep: BranchStep + step: BranchStep ): Promise { - const { branches, children } = branchStep.inputs + const { branches, children } = step.inputs for (const branch of branches) { if (await branchMatches(ctx, branch)) { @@ -497,11 +513,11 @@ class Orchestrator { return [ { - id: branchStep.id, - stepId: branchStep.stepId, - inputs: branchStep.inputs, - success: true, + id: step.id, + stepId: step.stepId, + inputs: step.inputs, outputs: { + success: true, branchName: branch.name, status: `${branch.name} branch taken`, branchId: `${branch.id}`, @@ -515,75 +531,64 @@ class Orchestrator { this.stopped = true return [ { - id: branchStep.id, - stepId: branchStep.stepId, - inputs: branchStep.inputs, - success: false, - outputs: { status: AutomationStatus.NO_CONDITION_MET }, + id: step.id, + stepId: step.stepId, + inputs: step.inputs, + outputs: { success: false, status: AutomationStatus.NO_CONDITION_MET }, }, ] } private async executeStep( ctx: AutomationContext, - step: AutomationStep + step: Readonly ): Promise { - return tracer.trace( - "Orchestrator.execute.step", - { resource: "automation" }, - async span => { - span?.addTags({ - resource: "automation", - step: { - stepId: step.stepId, - id: step.id, - name: step.name, - type: step.type, - title: step.stepTitle, - internal: step.internal, - deprecated: step.deprecated, - }, - }) - - if (this.stopped) { - return { - id: step.id, - stepId: step.stepId, - inputs: step.inputs, - outputs: STOPPED_STATUS, - } - } - - const stepFn = await this.getStepFunctionality(step.stepId) - const inputs = automationUtils.cleanInputValues( - await processObject(cloneDeep(step.inputs), prepareContext(ctx)), - step.schema.inputs.properties - ) - - const outputs = await stepFn({ - inputs, - appId: this.appId, - emitter: this.emitter, - context: prepareContext(ctx), - }) - - if ( - step.stepId === AutomationActionStepId.FILTER && - "result" in outputs && - outputs.result === false - ) { - this.stopped = true - ;(outputs as any).status = AutomationStatus.STOPPED - } - - return { - id: step.id, + return tracer.trace("Orchestrator.executeStep", async span => { + span.addTags({ + step: { stepId: step.stepId, - inputs, - outputs, - } + id: step.id, + name: step.name, + type: step.type, + title: step.stepTitle, + internal: step.internal, + deprecated: step.deprecated, + }, + }) + + if (this.stopped) { + span.addTags({ stopped: true }) + return stepStopped(step) } - ) + + const fn = await actions.getAction(step.stepId) + if (fn == null) { + throw new Error(`Cannot find automation step by name ${step.stepId}`) + } + + const inputs = automationUtils.cleanInputValues( + await processObject(cloneDeep(step.inputs), prepareContext(ctx)), + step.schema.inputs.properties + ) + + const outputs = await fn({ + inputs, + appId: this.appId, + emitter: this.emitter, + context: prepareContext(ctx), + }) + + if ( + step.stepId === AutomationActionStepId.FILTER && + "result" in outputs && + outputs.result === false + ) { + this.stopped = true + ;(outputs as any).status = AutomationStatus.STOPPED + } + + return stepSuccess(step, outputs, inputs) + }) } } @@ -604,10 +609,9 @@ export function execute(job: Job, callback: WorkerCallback) { task: async () => { const envVars = await sdkUtils.getEnvironmentVariables() await context.doInEnvironmentContext(envVars, async () => { - const automationOrchestrator = new Orchestrator(job) + const orchestrator = new Orchestrator(job) try { - const response = await automationOrchestrator.execute() - callback(null, response) + callback(null, await orchestrator.execute()) } catch (err) { callback(err) } @@ -624,24 +628,14 @@ export async function executeInThread( throw new Error("Unable to execute, event doesn't contain app ID.") } - const timeoutPromise = new Promise((_resolve, reject) => { - setTimeout(() => { - reject(new Error("Timeout exceeded")) - }, job.data.event.timeout || env.AUTOMATION_THREAD_TIMEOUT) - }) - - return (await context.doInAppContext(appId, async () => { + return await context.doInAppContext(appId, async () => { await context.ensureSnippetContext() const envVars = await sdkUtils.getEnvironmentVariables() - // put into automation thread for whole context return await context.doInEnvironmentContext(envVars, async () => { - const automationOrchestrator = new Orchestrator(job) - return await Promise.race([ - automationOrchestrator.execute(), - timeoutPromise, - ]) + const orchestrator = new Orchestrator(job) + return orchestrator.execute() }) - })) as AutomationResults + }) } export const removeStalled = async (job: Job) => { @@ -650,7 +644,7 @@ export const removeStalled = async (job: Job) => { throw new Error("Unable to execute, event doesn't contain app ID.") } await context.doInAppContext(appId, async () => { - const automationOrchestrator = new Orchestrator(job) - await automationOrchestrator.stopCron("stalled") + const orchestrator = new Orchestrator(job) + await orchestrator.stopCron("stalled") }) } diff --git a/packages/types/src/documents/app/automation/automation.ts b/packages/types/src/documents/app/automation/automation.ts index ea21b5e23d..590e3e135f 100644 --- a/packages/types/src/documents/app/automation/automation.ts +++ b/packages/types/src/documents/app/automation/automation.ts @@ -192,13 +192,20 @@ export enum AutomationStoppedReason { TRIGGER_FILTER_NOT_MET = "Automation did not run. Filter conditions in trigger were not met.", } +export interface AutomationStepResultOutputs { + success: boolean + [key: string]: any +} + +export interface AutomationStepResultInputs { + [key: string]: any +} + export interface AutomationStepResult { id: string stepId: AutomationActionStepId - inputs: Record - outputs: Record - success?: boolean - message?: string + inputs: AutomationStepResultInputs + outputs: AutomationStepResultOutputs } export interface AutomationTriggerResult {