More refactoring.

This commit is contained in:
Sam Rose 2025-02-13 17:44:48 +00:00
parent 4e97f72a43
commit 2eb3a9fcd8
No known key found for this signature in database
8 changed files with 380 additions and 393 deletions

View File

@ -266,9 +266,9 @@ export const getProdAppId = () => {
return conversions.getProdAppID(appId) return conversions.getProdAppID(appId)
} }
export function doInEnvironmentContext( export function doInEnvironmentContext<T>(
values: Record<string, string>, values: Record<string, string>,
task: any task: () => T
) { ) {
if (!values) { if (!values) {
throw new Error("Must supply environment variables.") throw new Error("Must supply environment variables.")

View File

@ -15,23 +15,27 @@ const conversion: Record<DurationType, number> = {
} }
export class Duration { export class Duration {
constructor(public ms: number) {}
to(type: DurationType) {
return this.ms / conversion[type]
}
toMs() {
return this.ms
}
toSeconds() {
return this.to(DurationType.SECONDS)
}
static convert(from: DurationType, to: DurationType, duration: number) { static convert(from: DurationType, to: DurationType, duration: number) {
const milliseconds = duration * conversion[from] const milliseconds = duration * conversion[from]
return milliseconds / conversion[to] return milliseconds / conversion[to]
} }
static from(from: DurationType, duration: number) { static from(from: DurationType, duration: number) {
return { return new Duration(duration * conversion[from])
to: (to: DurationType) => {
return Duration.convert(from, to, duration)
},
toMs: () => {
return Duration.convert(from, DurationType.MILLISECONDS, duration)
},
toSeconds: () => {
return Duration.convert(from, DurationType.SECONDS, duration)
},
}
} }
static fromSeconds(duration: number) { static fromSeconds(duration: number) {

View File

@ -2,3 +2,4 @@ export * from "./hashing"
export * from "./utils" export * from "./utils"
export * from "./stringUtils" export * from "./stringUtils"
export * from "./Duration" export * from "./Duration"
export * from "./time"

View File

@ -0,0 +1,7 @@
import { Duration } from "./Duration"
export async function time<T>(f: () => Promise<T>): Promise<[T, Duration]> {
const start = performance.now()
const result = await f()
return [result, Duration.fromMilliseconds(performance.now() - start)]
}

View File

@ -182,11 +182,12 @@ export async function externalTrigger(
// values are likely to be submitted as strings, so we shall convert to correct type // values are likely to be submitted as strings, so we shall convert to correct type
const coercedFields: any = {} const coercedFields: any = {}
const fields = automation.definition.trigger.inputs.fields const fields = automation.definition.trigger.inputs.fields
for (let key of Object.keys(fields || {})) { for (const key of Object.keys(fields || {})) {
coercedFields[key] = coerce(params.fields[key], fields[key]) coercedFields[key] = coerce(params.fields[key], fields[key])
} }
params.fields = coercedFields params.fields = coercedFields
} }
// row actions and webhooks flatten the fields down // row actions and webhooks flatten the fields down
else if ( else if (
sdk.automations.isRowAction(automation) || sdk.automations.isRowAction(automation) ||
@ -198,6 +199,7 @@ export async function externalTrigger(
fields: {}, fields: {},
} }
} }
const data: AutomationData = { automation, event: params } const data: AutomationData = { automation, event: params }
const shouldTrigger = await checkTriggerFilters(automation, { const shouldTrigger = await checkTriggerFilters(automation, {

View File

@ -10,7 +10,6 @@ import {
Automation, Automation,
AutomationActionStepId, AutomationActionStepId,
AutomationJob, AutomationJob,
AutomationResults,
AutomationStepDefinition, AutomationStepDefinition,
AutomationTriggerDefinition, AutomationTriggerDefinition,
AutomationTriggerStepId, AutomationTriggerStepId,
@ -19,6 +18,7 @@ import {
import { automationsEnabled } from "../features" import { automationsEnabled } from "../features"
import { helpers, REBOOT_CRON } from "@budibase/shared-core" import { helpers, REBOOT_CRON } from "@budibase/shared-core"
import tracer from "dd-trace" import tracer from "dd-trace"
import { JobId } from "bull"
const CRON_STEP_ID = automations.triggers.definitions.CRON.stepId const CRON_STEP_ID = automations.triggers.definitions.CRON.stepId
let Runner: Thread let Runner: Thread
@ -156,11 +156,11 @@ export async function disableAllCrons(appId: any) {
return { count: results.length / 2 } return { count: results.length / 2 }
} }
export async function disableCronById(jobId: number | string) { export async function disableCronById(jobId: JobId) {
const repeatJobs = await automationQueue.getRepeatableJobs() const jobs = await automationQueue.getRepeatableJobs()
for (let repeatJob of repeatJobs) { for (const job of jobs) {
if (repeatJob.id === jobId) { if (job.id === jobId) {
await automationQueue.removeRepeatableByKey(repeatJob.key) await automationQueue.removeRepeatableByKey(job.key)
} }
} }
console.log(`jobId=${jobId} disabled`) console.log(`jobId=${jobId} disabled`)
@ -249,31 +249,3 @@ export async function enableCronTrigger(appId: any, automation: Automation) {
export async function cleanupAutomations(appId: any) { export async function cleanupAutomations(appId: any) {
await disableAllCrons(appId) await disableAllCrons(appId)
} }
/**
* Checks if the supplied automation is of a recurring type.
* @param automation The automation to check.
* @return if it is recurring (cron).
*/
export function isRecurring(automation: Automation) {
return (
automation.definition.trigger.stepId ===
automations.triggers.definitions.CRON.stepId
)
}
export function isErrorInOutput(output: AutomationResults) {
let first = true,
error = false
for (let step of output.steps) {
// skip the trigger, its always successful if automation ran
if (first) {
first = false
continue
}
if (!step.outputs?.success) {
error = true
}
}
return error
}

View File

@ -1,10 +1,6 @@
import { default as threadUtils } from "./utils" import { default as threadUtils } from "./utils"
import { Job } from "bull" import { Job } from "bull"
import { import { disableCronById } from "../automations/utils"
disableCronById,
isErrorInOutput,
isRecurring,
} from "../automations/utils"
import * as actions from "../automations/actions" import * as actions from "../automations/actions"
import * as automationUtils from "../automations/automationUtils" import * as automationUtils from "../automations/automationUtils"
import { dataFilters, helpers } from "@budibase/shared-core" import { dataFilters, helpers } from "@budibase/shared-core"
@ -25,7 +21,6 @@ import {
BranchSearchFilters, BranchSearchFilters,
BranchStep, BranchStep,
LoopStep, LoopStep,
UserBindings,
ContextEmitter, ContextEmitter,
LoopStepType, LoopStepType,
AutomationTriggerResult, AutomationTriggerResult,
@ -36,14 +31,13 @@ import {
} from "@budibase/types" } from "@budibase/types"
import { AutomationContext } from "../definitions/automations" import { AutomationContext } from "../definitions/automations"
import { WorkerCallback } from "./definitions" import { WorkerCallback } from "./definitions"
import { context, logging, configs } from "@budibase/backend-core" import { context, logging, configs, utils } from "@budibase/backend-core"
import { import {
findHBSBlocks, findHBSBlocks,
processObject, processObject,
processStringSync, processStringSync,
} from "@budibase/string-templates" } from "@budibase/string-templates"
import { cloneDeep } from "lodash/fp" import { cloneDeep } from "lodash/fp"
import { performance } from "perf_hooks"
import * as sdkUtils from "../sdk/utils" import * as sdkUtils from "../sdk/utils"
import env from "../environment" import env from "../environment"
import tracer from "dd-trace" import tracer from "dd-trace"
@ -53,21 +47,25 @@ threadUtils.threadSetup()
const CRON_STEP_ID = automations.triggers.definitions.CRON.stepId const CRON_STEP_ID = automations.triggers.definitions.CRON.stepId
const STOPPED_STATUS = { success: true, status: AutomationStatus.STOPPED } const STOPPED_STATUS = { success: true, status: AutomationStatus.STOPPED }
function matchesLoopFailureCondition(loopStep: LoopStep, currentItem: any) { function matchesLoopFailureCondition(step: LoopStep, currentItem: any) {
if (!loopStep.inputs.failure) { const { failure } = step.inputs
if (!failure) {
return false return false
} }
if (isPlainObject(currentItem)) { if (isPlainObject(currentItem)) {
return Object.values(currentItem).some(e => e === loopStep.inputs.failure) return Object.values(currentItem).some(e => e === failure)
} }
return currentItem === loopStep.inputs.failure return currentItem === failure
} }
function getLoopIterable(loopStep: LoopStep): any[] { // Returns an array of the things to loop over for a given LoopStep. This
const option = loopStep.inputs.option // function handles the various ways that a LoopStep can be configured, parsing
let input: any = loopStep.inputs.binding // the input and returning an array of items to loop over.
function getLoopIterable(step: LoopStep): any[] {
const option = step.inputs.option
let input = step.inputs.binding
if (option === LoopStepType.ARRAY && typeof input === "string") { if (option === LoopStepType.ARRAY && typeof input === "string") {
input = JSON.parse(input) input = JSON.parse(input)
@ -84,26 +82,83 @@ function getLoopIterable(loopStep: LoopStep): any[] {
return Array.isArray(input) ? input : [input] return Array.isArray(input) ? input : [input]
} }
async function branchMatches(ctx: AutomationContext, branch: Branch) { function getLoopMaxIterations(loopStep: LoopStep): number {
const toFilter: Record<string, any> = {} const loopMaxIterations =
typeof loopStep.inputs.iterations === "string"
? parseInt(loopStep.inputs.iterations)
: loopStep.inputs.iterations
return Math.min(
loopMaxIterations || env.AUTOMATION_MAX_ITERATIONS,
env.AUTOMATION_MAX_ITERATIONS
)
}
const recurseSearchFilters = ( function stepSuccess(
filters: BranchSearchFilters step: Readonly<AutomationStep>,
): BranchSearchFilters => { outputs: Readonly<Record<string, any>>,
inputs?: Readonly<Record<string, any>>
): AutomationStepResult {
return {
id: step.id,
stepId: step.stepId,
inputs: inputs || step.inputs,
outputs: {
success: true,
...outputs,
},
}
}
function stepFailure(
step: Readonly<AutomationStep>,
outputs: Readonly<Record<string, any>>,
inputs?: Readonly<Record<string, any>>
): AutomationStepResult {
return {
id: step.id,
stepId: step.stepId,
inputs: inputs || step.inputs,
outputs: {
success: false,
...outputs,
},
}
}
function stepStopped(step: AutomationStep): AutomationStepResult {
return {
id: step.id,
stepId: step.stepId,
inputs: step.inputs,
outputs: STOPPED_STATUS,
}
}
async function branchMatches(
ctx: AutomationContext,
branch: Readonly<Branch>
): Promise<boolean> {
const toFilter: Record<string, any> = {}
const preparedCtx = prepareContext(ctx)
// Because we allow bindings on both the left and right of each condition in
// automation branches, we can't pass the BranchSearchFilters directly to
// dataFilters.runQuery as-is. We first need to walk the filter tree and
// evaluate all of the bindings.
const evaluateBindings = (fs: Readonly<BranchSearchFilters>) => {
const filters = cloneDeep(fs)
for (const filter of Object.values(filters)) { for (const filter of Object.values(filters)) {
if (!filter) { if (!filter) {
continue continue
} }
if (isLogicalFilter(filter)) { if (isLogicalFilter(filter)) {
filter.conditions = filter.conditions.map(condition => filter.conditions = filter.conditions.map(evaluateBindings)
recurseSearchFilters(condition)
)
} else { } else {
for (const [field, value] of Object.entries(filter)) { for (const [field, value] of Object.entries(filter)) {
toFilter[field] = processStringSync(field, prepareContext(ctx)) toFilter[field] = processStringSync(field, preparedCtx)
if (typeof value === "string" && findHBSBlocks(value).length > 0) { if (typeof value === "string" && findHBSBlocks(value).length > 0) {
filter[field] = processStringSync(value, prepareContext(ctx)) filter[field] = processStringSync(value, preparedCtx)
} }
} }
} }
@ -114,7 +169,7 @@ async function branchMatches(ctx: AutomationContext, branch: Branch) {
const result = dataFilters.runQuery( const result = dataFilters.runQuery(
[toFilter], [toFilter],
recurseSearchFilters(branch.condition) evaluateBindings(branch.condition)
) )
return result.length > 0 return result.length > 0
} }
@ -130,7 +185,7 @@ function prepareContext(context: AutomationContext) {
} }
} }
export async function enrichBaseContext(context: AutomationContext) { async function enrichBaseContext(context: AutomationContext) {
context.env = await sdkUtils.getEnvironmentVariables() context.env = await sdkUtils.getEnvironmentVariables()
try { try {
@ -141,181 +196,189 @@ export async function enrichBaseContext(context: AutomationContext) {
company: config.company, company: config.company,
} }
} catch (e) { } catch (e) {
// if settings doc doesn't exist, make the settings blank
context.settings = {} context.settings = {}
} }
} }
/** // Because the trigger appears twice in an AutomationResult, once as .trigger
* The automation orchestrator is a class responsible for executing automations. // and again as .steps[0], this function makes sure that the two are kept in
* It handles the context of the automation and makes sure each step gets the correct // sync when setting trigger output.
* inputs and handles any outputs. function setTriggerOutput(result: AutomationResults, outputs: any) {
*/ result.trigger.outputs = {
...result.trigger.outputs,
...outputs,
}
result.steps[0] = result.trigger
}
class Orchestrator { class Orchestrator {
private chainCount: number private readonly job: AutomationJob
private appId: string
private automation: Automation
private emitter: ContextEmitter private emitter: ContextEmitter
private job: AutomationJob
private stopped: boolean private stopped: boolean
private executionOutput: AutomationResults
private currentUser: UserBindings | undefined
constructor(job: AutomationJob) { constructor(job: Readonly<AutomationJob>) {
this.automation = job.data.automation
const triggerOutput = job.data.event
if (
this.automation.definition.trigger.stepId === CRON_STEP_ID &&
!triggerOutput.timestamp
) {
triggerOutput.timestamp = Date.now()
}
this.chainCount = triggerOutput.metadata?.automationChainCount || 0
this.appId = triggerOutput.appId as string
this.job = job this.job = job
this.stopped = false
// remove from context
delete triggerOutput.appId
delete triggerOutput.metadata
// create an emitter which has the chain count for this automation run in // create an emitter which has the chain count for this automation run in
// it, so it can block excessive chaining if required // it, so it can block excessive chaining if required
this.emitter = new AutomationEmitter(this.chainCount + 1) const chainCount = job.data.event.metadata?.automationChainCount || 0
this.emitter = new AutomationEmitter(chainCount + 1)
const trigger: AutomationTriggerResult = {
id: this.automation.definition.trigger.id,
stepId: this.automation.definition.trigger.stepId,
outputs: triggerOutput,
}
this.executionOutput = { trigger, steps: [trigger] }
// setup the execution output
this.stopped = false
this.currentUser = triggerOutput.user
} }
async getStepFunctionality(stepId: AutomationActionStepId) { get automation(): Automation {
let step = await actions.getAction(stepId) return this.job.data.automation
if (step == null) {
throw `Cannot find automation step by name ${stepId}`
}
return step
} }
async getMetadata(): Promise<AutomationMetadata> { get appId(): string {
const metadataId = generateAutomationMetadataID(this.automation._id!) return this.job.data.event.appId!
}
private async getMetadata(): Promise<AutomationMetadata> {
const id = generateAutomationMetadataID(this.automation._id!)
const db = context.getAppDB() const db = context.getAppDB()
let metadata: AutomationMetadata const doc = await db.tryGet<AutomationMetadata>(id)
try { return doc || { _id: id, errorCount: 0 }
metadata = await db.get(metadataId)
} catch (err) {
metadata = {
_id: metadataId,
errorCount: 0,
}
}
return metadata
} }
async stopCron(reason: string) { async stopCron(reason: string, opts?: { result: AutomationResults }) {
if (!this.job.opts.repeat) { if (!this.isCron()) {
return throw new Error("Not a cron automation")
} }
logging.logWarn(
`CRON disabled reason=${reason} - ${this.appId}/${this.automation._id}` const msg = `CRON disabled reason=${reason} - ${this.appId}/${this.automation._id}`
) logging.logWarn(msg)
await disableCronById(this.job.id) await disableCronById(this.job.id)
this.executionOutput.trigger.outputs = {
...this.executionOutput.trigger.outputs, const { result } = opts || {}
success: false, if (result) {
status: AutomationStatus.STOPPED, setTriggerOutput(result, {
success: false,
status: AutomationStatus.STOPPED,
})
await this.logResult(result)
} }
this.executionOutput.steps[0] = this.executionOutput.trigger
await storeLog(this.automation, this.executionOutput)
} }
async checkIfShouldStop(metadata: AutomationMetadata): Promise<boolean> { private async logResult(result: AutomationResults) {
if (!metadata.errorCount || !this.job.opts.repeat) { try {
await storeLog(this.automation, result)
} catch (e: any) {
if (e.status === 413 && e.request?.data) {
// if content is too large we shouldn't log it
delete e.request.data
e.request.data = { message: "removed due to large size" }
}
logging.logAlert("Error writing automation log", e)
}
}
private async shouldStop(metadata: AutomationMetadata): Promise<boolean> {
if (!metadata.errorCount || !this.isCron()) {
return false return false
} }
if (metadata.errorCount >= MAX_AUTOMATION_RECURRING_ERRORS) { if (metadata.errorCount >= MAX_AUTOMATION_RECURRING_ERRORS) {
await this.stopCron("errors")
return true return true
} }
return false return false
} }
async execute(): Promise<AutomationResults | undefined> { private isCron(): boolean {
return this.automation.definition.trigger.stepId === CRON_STEP_ID
}
private isProdApp(): boolean {
return isProdAppID(this.appId)
}
hasErrored(context: AutomationContext): boolean {
const [_trigger, ...steps] = context.steps
for (const step of steps) {
if (step.outputs?.success === false) {
return true
}
}
return false
}
async execute(): Promise<AutomationResults> {
return tracer.trace( return tracer.trace(
"Orchestrator.execute", "Orchestrator.execute",
{ resource: "automation" }, { resource: "automation" },
async span => { async span => {
span?.addTags({ span?.addTags({ appId: this.appId, automationId: this.automation._id })
appId: this.appId,
automationId: this.automation._id, const job = cloneDeep(this.job)
}) delete job.data.event.appId
delete job.data.event.metadata
if (!job.data.event.timestamp) {
job.data.event.timestamp = Date.now()
}
const trigger: AutomationTriggerResult = {
id: job.data.automation.definition.trigger.id,
stepId: job.data.automation.definition.trigger.stepId,
outputs: job.data.event,
}
const result: AutomationResults = { trigger, steps: [trigger] }
let metadata: AutomationMetadata | undefined = undefined let metadata: AutomationMetadata | undefined = undefined
// check if this is a recurring automation, if (this.isProdApp() && this.isCron()) {
if (isProdAppID(this.appId) && isRecurring(this.automation)) {
span?.addTags({ recurring: true }) span?.addTags({ recurring: true })
metadata = await this.getMetadata() metadata = await this.getMetadata()
const shouldStop = await this.checkIfShouldStop(metadata) if (await this.shouldStop(metadata)) {
if (shouldStop) { await this.stopCron("errors")
span?.addTags({ shouldStop: true }) span?.addTags({ shouldStop: true })
return return result
} }
} }
const ctx: AutomationContext = { const ctx: AutomationContext = {
trigger: this.executionOutput.trigger.outputs, trigger: trigger.outputs,
steps: [this.executionOutput.trigger.outputs], steps: [trigger.outputs],
stepsById: {}, stepsById: {},
stepsByName: {}, stepsByName: {},
user: this.currentUser, user: trigger.outputs.user,
} }
await enrichBaseContext(ctx) await enrichBaseContext(ctx)
const start = performance.now() const timeout =
this.job.data.event.timeout || env.AUTOMATION_THREAD_TIMEOUT
const stepOutputs = await this.executeSteps(
ctx,
this.automation.definition.steps
)
this.executionOutput.steps.push(...stepOutputs)
const end = performance.now()
const executionTime = end - start
console.info(
`Automation ID: ${this.automation._id} Execution time: ${executionTime} milliseconds`,
{
_logKey: "automation",
executionTime,
}
)
try { try {
await storeLog(this.automation, this.executionOutput) await helpers.withTimeout(timeout, async () => {
const [stepOutputs, executionTime] = await utils.time(() =>
this.executeSteps(ctx, job.data.automation.definition.steps)
)
result.steps.push(...stepOutputs)
console.info(
`Automation ID: ${
this.automation._id
} Execution time: ${executionTime.toMs()} milliseconds`,
{
_logKey: "automation",
executionTime,
}
)
})
} catch (e: any) { } catch (e: any) {
if (e.status === 413 && e.request?.data) { if (e.errno === "ETIME") {
// if content is too large we shouldn't log it span?.addTags({ timedOut: true })
delete e.request.data console.warn(`Automation execution timed out after ${timeout}ms`)
e.request.data = { message: "removed due to large size" }
} }
logging.logAlert("Error writing automation log", e)
} }
await this.logResult(result)
if ( if (
isProdAppID(this.appId) && this.isProdApp() &&
isRecurring(this.automation) && this.isCron() &&
metadata && metadata &&
isErrorInOutput(this.executionOutput) this.hasErrored(ctx)
) { ) {
metadata.errorCount ??= 0 metadata.errorCount ??= 0
metadata.errorCount++ metadata.errorCount++
@ -327,12 +390,12 @@ class Orchestrator {
logging.logAlertWithInfo( logging.logAlertWithInfo(
"Failed to write automation metadata", "Failed to write automation metadata",
db.name, db.name,
this.automation._id!, job.data.automation._id!,
err err
) )
} }
} }
return this.executionOutput return result
} }
) )
} }
@ -341,155 +404,108 @@ class Orchestrator {
ctx: AutomationContext, ctx: AutomationContext,
steps: AutomationStep[] steps: AutomationStep[]
): Promise<AutomationStepResult[]> { ): Promise<AutomationStepResult[]> {
return tracer.trace( return tracer.trace("Orchestrator.executeSteps", async () => {
"Orchestrator.executeSteps", let stepIndex = 0
{ resource: "automation" }, const results: AutomationStepResult[] = []
async span => {
let stepIndex = 0
const timeout =
this.job.data.event.timeout || env.AUTOMATION_THREAD_TIMEOUT
const stepOutputs: AutomationStepResult[] = []
try { function addToContext(
await helpers.withTimeout(timeout, async () => { step: AutomationStep,
while (stepIndex < steps.length) { result: AutomationStepResult
if (this.stopped) { ) {
break ctx.steps.push(result.outputs)
} ctx.stepsById[step.id] = result.outputs
ctx.stepsByName[step.name || step.id] = result.outputs
results.push(result)
}
const step = steps[stepIndex] while (stepIndex < steps.length) {
if (step.stepId === AutomationActionStepId.BRANCH) { if (this.stopped) {
const [result, ...childResults] = await this.executeBranchStep( break
ctx,
step
)
stepOutputs.push(result)
stepOutputs.push(...childResults)
stepIndex++
} else if (step.stepId === AutomationActionStepId.LOOP) {
const stepToLoop = steps[stepIndex + 1]
const result = await this.executeLoopStep(ctx, step, stepToLoop)
ctx.steps.push(result.outputs)
ctx.stepsById[stepToLoop.id] = result.outputs
ctx.stepsByName[stepToLoop.name || stepToLoop.id] =
result.outputs
stepOutputs.push(result)
stepIndex += 2
} else {
const result = await this.executeStep(ctx, step)
ctx.steps.push(result.outputs)
ctx.stepsById[step.id] = result.outputs
ctx.stepsByName[step.name || step.id] = result.outputs
stepOutputs.push(result)
stepIndex++
}
}
})
} catch (error: any) {
if (error.errno === "ETIME") {
span?.addTags({ timedOut: true })
console.warn(`Automation execution timed out after ${timeout}ms`)
}
} }
return stepOutputs const step = steps[stepIndex]
switch (step.stepId) {
case AutomationActionStepId.BRANCH: {
results.push(...(await this.executeBranchStep(ctx, step)))
stepIndex++
break
}
case AutomationActionStepId.LOOP: {
const stepToLoop = steps[stepIndex + 1]
addToContext(
stepToLoop,
await this.executeLoopStep(ctx, step, stepToLoop)
)
// We increment by 2 here because the way loops work is that the
// step immediately following the loop step is what gets looped.
// So when we're done looping, to advance correctly we need to
// skip the step that was looped.
stepIndex += 2
break
}
default: {
addToContext(step, await this.executeStep(ctx, step))
stepIndex++
break
}
}
} }
)
return results
})
} }
private async executeLoopStep( private async executeLoopStep(
ctx: AutomationContext, ctx: AutomationContext,
loopStep: LoopStep, step: LoopStep,
stepToLoop: AutomationStep stepToLoop: AutomationStep
): Promise<AutomationStepResult> { ): Promise<AutomationStepResult> {
await processObject(loopStep.inputs, prepareContext(ctx)) await processObject(step.inputs, prepareContext(ctx))
const result = {
id: loopStep.id,
stepId: loopStep.stepId,
inputs: loopStep.inputs,
}
const loopMaxIterations =
typeof loopStep.inputs.iterations === "string"
? parseInt(loopStep.inputs.iterations)
: loopStep.inputs.iterations
const maxIterations = Math.min(
loopMaxIterations || env.AUTOMATION_MAX_ITERATIONS,
env.AUTOMATION_MAX_ITERATIONS
)
const maxIterations = getLoopMaxIterations(step)
const items: Record<string, any>[] = [] const items: Record<string, any>[] = []
let iterations = 0 let iterations = 0
let iterable: any[] = [] let iterable: any[] = []
try { try {
iterable = getLoopIterable(loopStep) iterable = getLoopIterable(step)
} catch (err) { } catch (err) {
return { return stepFailure(stepToLoop, {
...result, status: AutomationStepStatus.INCORRECT_TYPE,
outputs: { })
success: false,
status: AutomationStepStatus.INCORRECT_TYPE,
},
}
} }
for (; iterations < iterable.length; iterations++) { for (; iterations < iterable.length; iterations++) {
const currentItem = iterable[iterations] const currentItem = iterable[iterations]
if (iterations === maxIterations) { if (iterations === maxIterations) {
return { return stepFailure(stepToLoop, {
...result, status: AutomationStepStatus.MAX_ITERATIONS,
outputs: { iterations,
success: false, })
iterations,
items,
status: AutomationStepStatus.MAX_ITERATIONS,
},
}
} }
if (matchesLoopFailureCondition(loopStep, currentItem)) { if (matchesLoopFailureCondition(step, currentItem)) {
return { return stepFailure(stepToLoop, {
...result, status: AutomationStepStatus.FAILURE_CONDITION,
outputs: { })
success: false,
iterations,
items,
status: AutomationStepStatus.FAILURE_CONDITION,
},
}
} }
ctx.loop = { currentItem } ctx.loop = { currentItem }
const loopedStepResult = await this.executeStep(ctx, stepToLoop) const result = await this.executeStep(ctx, stepToLoop)
items.push(result.outputs)
ctx.loop = undefined ctx.loop = undefined
items.push(loopedStepResult.outputs)
} }
return { const status =
...result, iterations === 0 ? AutomationStatus.NO_CONDITION_MET : undefined
outputs: { return stepSuccess(stepToLoop, { status, iterations, items })
success: true,
status:
iterations === 0 ? AutomationStepStatus.NO_ITERATIONS : undefined,
iterations,
items,
},
}
} }
private async executeBranchStep( private async executeBranchStep(
ctx: AutomationContext, ctx: AutomationContext,
branchStep: BranchStep step: BranchStep
): Promise<AutomationStepResult[]> { ): Promise<AutomationStepResult[]> {
const { branches, children } = branchStep.inputs const { branches, children } = step.inputs
for (const branch of branches) { for (const branch of branches) {
if (await branchMatches(ctx, branch)) { if (await branchMatches(ctx, branch)) {
@ -497,11 +513,11 @@ class Orchestrator {
return [ return [
{ {
id: branchStep.id, id: step.id,
stepId: branchStep.stepId, stepId: step.stepId,
inputs: branchStep.inputs, inputs: step.inputs,
success: true,
outputs: { outputs: {
success: true,
branchName: branch.name, branchName: branch.name,
status: `${branch.name} branch taken`, status: `${branch.name} branch taken`,
branchId: `${branch.id}`, branchId: `${branch.id}`,
@ -515,75 +531,64 @@ class Orchestrator {
this.stopped = true this.stopped = true
return [ return [
{ {
id: branchStep.id, id: step.id,
stepId: branchStep.stepId, stepId: step.stepId,
inputs: branchStep.inputs, inputs: step.inputs,
success: false, outputs: { success: false, status: AutomationStatus.NO_CONDITION_MET },
outputs: { status: AutomationStatus.NO_CONDITION_MET },
}, },
] ]
} }
private async executeStep( private async executeStep(
ctx: AutomationContext, ctx: AutomationContext,
step: AutomationStep step: Readonly<AutomationStep>
): Promise<AutomationStepResult> { ): Promise<AutomationStepResult> {
return tracer.trace( return tracer.trace("Orchestrator.executeStep", async span => {
"Orchestrator.execute.step", span.addTags({
{ resource: "automation" }, step: {
async span => {
span?.addTags({
resource: "automation",
step: {
stepId: step.stepId,
id: step.id,
name: step.name,
type: step.type,
title: step.stepTitle,
internal: step.internal,
deprecated: step.deprecated,
},
})
if (this.stopped) {
return {
id: step.id,
stepId: step.stepId,
inputs: step.inputs,
outputs: STOPPED_STATUS,
}
}
const stepFn = await this.getStepFunctionality(step.stepId)
const inputs = automationUtils.cleanInputValues(
await processObject(cloneDeep(step.inputs), prepareContext(ctx)),
step.schema.inputs.properties
)
const outputs = await stepFn({
inputs,
appId: this.appId,
emitter: this.emitter,
context: prepareContext(ctx),
})
if (
step.stepId === AutomationActionStepId.FILTER &&
"result" in outputs &&
outputs.result === false
) {
this.stopped = true
;(outputs as any).status = AutomationStatus.STOPPED
}
return {
id: step.id,
stepId: step.stepId, stepId: step.stepId,
inputs, id: step.id,
outputs, name: step.name,
} type: step.type,
title: step.stepTitle,
internal: step.internal,
deprecated: step.deprecated,
},
})
if (this.stopped) {
span.addTags({ stopped: true })
return stepStopped(step)
} }
)
const fn = await actions.getAction(step.stepId)
if (fn == null) {
throw new Error(`Cannot find automation step by name ${step.stepId}`)
}
const inputs = automationUtils.cleanInputValues(
await processObject(cloneDeep(step.inputs), prepareContext(ctx)),
step.schema.inputs.properties
)
const outputs = await fn({
inputs,
appId: this.appId,
emitter: this.emitter,
context: prepareContext(ctx),
})
if (
step.stepId === AutomationActionStepId.FILTER &&
"result" in outputs &&
outputs.result === false
) {
this.stopped = true
;(outputs as any).status = AutomationStatus.STOPPED
}
return stepSuccess(step, outputs, inputs)
})
} }
} }
@ -604,10 +609,9 @@ export function execute(job: Job<AutomationData>, callback: WorkerCallback) {
task: async () => { task: async () => {
const envVars = await sdkUtils.getEnvironmentVariables() const envVars = await sdkUtils.getEnvironmentVariables()
await context.doInEnvironmentContext(envVars, async () => { await context.doInEnvironmentContext(envVars, async () => {
const automationOrchestrator = new Orchestrator(job) const orchestrator = new Orchestrator(job)
try { try {
const response = await automationOrchestrator.execute() callback(null, await orchestrator.execute())
callback(null, response)
} catch (err) { } catch (err) {
callback(err) callback(err)
} }
@ -624,24 +628,14 @@ export async function executeInThread(
throw new Error("Unable to execute, event doesn't contain app ID.") throw new Error("Unable to execute, event doesn't contain app ID.")
} }
const timeoutPromise = new Promise((_resolve, reject) => { return await context.doInAppContext(appId, async () => {
setTimeout(() => {
reject(new Error("Timeout exceeded"))
}, job.data.event.timeout || env.AUTOMATION_THREAD_TIMEOUT)
})
return (await context.doInAppContext(appId, async () => {
await context.ensureSnippetContext() await context.ensureSnippetContext()
const envVars = await sdkUtils.getEnvironmentVariables() const envVars = await sdkUtils.getEnvironmentVariables()
// put into automation thread for whole context
return await context.doInEnvironmentContext(envVars, async () => { return await context.doInEnvironmentContext(envVars, async () => {
const automationOrchestrator = new Orchestrator(job) const orchestrator = new Orchestrator(job)
return await Promise.race([ return orchestrator.execute()
automationOrchestrator.execute(),
timeoutPromise,
])
}) })
})) as AutomationResults })
} }
export const removeStalled = async (job: Job) => { export const removeStalled = async (job: Job) => {
@ -650,7 +644,7 @@ export const removeStalled = async (job: Job) => {
throw new Error("Unable to execute, event doesn't contain app ID.") throw new Error("Unable to execute, event doesn't contain app ID.")
} }
await context.doInAppContext(appId, async () => { await context.doInAppContext(appId, async () => {
const automationOrchestrator = new Orchestrator(job) const orchestrator = new Orchestrator(job)
await automationOrchestrator.stopCron("stalled") await orchestrator.stopCron("stalled")
}) })
} }

View File

@ -192,13 +192,20 @@ export enum AutomationStoppedReason {
TRIGGER_FILTER_NOT_MET = "Automation did not run. Filter conditions in trigger were not met.", TRIGGER_FILTER_NOT_MET = "Automation did not run. Filter conditions in trigger were not met.",
} }
export interface AutomationStepResultOutputs {
success: boolean
[key: string]: any
}
export interface AutomationStepResultInputs {
[key: string]: any
}
export interface AutomationStepResult { export interface AutomationStepResult {
id: string id: string
stepId: AutomationActionStepId stepId: AutomationActionStepId
inputs: Record<string, any> inputs: AutomationStepResultInputs
outputs: Record<string, any> outputs: AutomationStepResultOutputs
success?: boolean
message?: string
} }
export interface AutomationTriggerResult { export interface AutomationTriggerResult {