From 406410d9168377cf84d3a6f61b8dfe1357721f8e Mon Sep 17 00:00:00 2001 From: Sam Rose Date: Tue, 19 Dec 2023 15:57:56 +0000 Subject: [PATCH 1/2] Add DataDog tracing to automations. --- packages/server/src/automations/utils.ts | 69 ++- packages/server/src/threads/automation.ts | 580 ++++++++++++---------- 2 files changed, 378 insertions(+), 271 deletions(-) diff --git a/packages/server/src/automations/utils.ts b/packages/server/src/automations/utils.ts index 53c4d9d3b7..0c28787f67 100644 --- a/packages/server/src/automations/utils.ts +++ b/packages/server/src/automations/utils.ts @@ -16,6 +16,7 @@ import { } from "@budibase/types" import sdk from "../sdk" import { automationsEnabled } from "../features" +import tracer from "dd-trace" const REBOOT_CRON = "@reboot" const WH_STEP_ID = definitions.WEBHOOK.stepId @@ -39,26 +40,62 @@ function loggingArgs(job: AutomationJob) { } export async function processEvent(job: AutomationJob) { - const appId = job.data.event.appId! - const automationId = job.data.automation._id! - const task = async () => { - try { - // need to actually await these so that an error can be captured properly - console.log("automation running", ...loggingArgs(job)) + return tracer.trace( + "processEvent", + { resource: "automation" }, + async span => { + const appId = job.data.event.appId! + const automationId = job.data.automation._id! - const runFn = () => Runner.run(job) - const result = await quotas.addAutomation(runFn, { + span?.addTags({ + appId, automationId, + job: { + id: job.id, + name: job.name, + attemptsMade: job.attemptsMade, + opts: { + attempts: job.opts.attempts, + priority: job.opts.priority, + delay: job.opts.delay, + repeat: job.opts.repeat, + backoff: job.opts.backoff, + lifo: job.opts.lifo, + timeout: job.opts.timeout, + jobId: job.opts.jobId, + removeOnComplete: job.opts.removeOnComplete, + removeOnFail: job.opts.removeOnFail, + stackTraceLimit: job.opts.stackTraceLimit, + preventParsingData: job.opts.preventParsingData, + }, + }, }) - console.log("automation completed", ...loggingArgs(job)) - return result - } catch (err) { - console.error(`automation was unable to run`, err, ...loggingArgs(job)) - return { err } - } - } - return await context.doInAutomationContext({ appId, automationId, task }) + const task = async () => { + try { + // need to actually await these so that an error can be captured properly + console.log("automation running", ...loggingArgs(job)) + + const runFn = () => Runner.run(job) + const result = await quotas.addAutomation(runFn, { + automationId, + }) + console.log("automation completed", ...loggingArgs(job)) + return result + } catch (err) { + span?.addTags({ error: true }) + console.error( + `automation was unable to run`, + err, + ...loggingArgs(job) + ) + return { err } + } + } + + return await context.doInAutomationContext({ appId, automationId, task }) + } + ) } export async function updateTestHistory( diff --git a/packages/server/src/threads/automation.ts b/packages/server/src/threads/automation.ts index d1fcc2be72..4447899f96 100644 --- a/packages/server/src/threads/automation.ts +++ b/packages/server/src/threads/automation.ts @@ -34,6 +34,7 @@ import { cloneDeep } from "lodash/fp" import { performance } from "perf_hooks" import * as sdkUtils from "../sdk/utils" import env from "../environment" +import tracer from "dd-trace" threadUtils.threadSetup() const FILTER_STEP_ID = actions.BUILTIN_ACTION_DEFINITIONS.FILTER.stepId @@ -242,278 +243,347 @@ class Orchestrator { } async execute(): Promise { - // this will retrieve from context created at start of thread - this._context.env = await sdkUtils.getEnvironmentVariables() - let automation = this._automation - let stopped = false - let loopStep: AutomationStep | undefined = undefined + return tracer.trace( + "Orchestrator.execute", + { resource: "automation" }, + async span => { + span?.addTags({ + appId: this._appId, + automationId: this._automation._id, + }) - let stepCount = 0 - let loopStepNumber: any = undefined - let loopSteps: LoopStep[] | undefined = [] - let metadata - let timeoutFlag = false - let wasLoopStep = false - let timeout = this._job.data.event.timeout - // check if this is a recurring automation, - if (isProdAppID(this._appId) && isRecurring(automation)) { - metadata = await this.getMetadata() - const shouldStop = await this.checkIfShouldStop(metadata) - if (shouldStop) { - return - } - } - const start = performance.now() - for (let step of automation.definition.steps) { - if (timeoutFlag) { - break - } + // this will retrieve from context created at start of thread + this._context.env = await sdkUtils.getEnvironmentVariables() + let automation = this._automation + let stopped = false + let loopStep: AutomationStep | undefined = undefined - if (timeout) { - setTimeout(() => { - timeoutFlag = true - }, timeout || 12000) - } - - stepCount++ - let input: any, - iterations = 1, - iterationCount = 0 - - if (step.stepId === LOOP_STEP_ID) { - loopStep = step - loopStepNumber = stepCount - continue - } - - if (loopStep) { - input = await processObject(loopStep.inputs, this._context) - iterations = getLoopIterations(loopStep as LoopStep) - } - for (let index = 0; index < iterations; index++) { - let originalStepInput = cloneDeep(step.inputs) - // Handle if the user has set a max iteration count or if it reaches the max limit set by us - if (loopStep && input.binding) { - let tempOutput = { items: loopSteps, iterations: iterationCount } - try { - loopStep.inputs.binding = automationUtils.typecastForLooping( - loopStep as LoopStep, - loopStep.inputs as LoopInput - ) - } catch (err) { - this.updateContextAndOutput(loopStepNumber, step, tempOutput, { - status: AutomationErrors.INCORRECT_TYPE, - success: false, - }) - loopSteps = undefined - loopStep = undefined - break - } - let item = [] - if ( - typeof loopStep.inputs.binding === "string" && - loopStep.inputs.option === "String" - ) { - item = automationUtils.stringSplit(loopStep.inputs.binding) - } else if (Array.isArray(loopStep.inputs.binding)) { - item = loopStep.inputs.binding - } - this._context.steps[loopStepNumber] = { - currentItem: item[index], - } - - // The "Loop" binding in the front end is "fake", so replace it here so the context can understand it - // Pretty hacky because we need to account for the row object - for (let [key, value] of Object.entries(originalStepInput)) { - if (typeof value === "object") { - for (let [innerKey, innerValue] of Object.entries( - originalStepInput[key] - )) { - if (typeof innerValue === "string") { - originalStepInput[key][innerKey] = - automationUtils.substituteLoopStep( - innerValue, - `steps.${loopStepNumber}` - ) - } else if (typeof value === "object") { - for (let [innerObject, innerValue] of Object.entries( - originalStepInput[key][innerKey] - )) { - originalStepInput[key][innerKey][innerObject] = - automationUtils.substituteLoopStep( - innerValue as string, - `steps.${loopStepNumber}` - ) - } - } - } - } else { - if (typeof value === "string") { - originalStepInput[key] = automationUtils.substituteLoopStep( - value, - `steps.${loopStepNumber}` - ) - } - } - } - - if ( - index === env.AUTOMATION_MAX_ITERATIONS || - index === parseInt(loopStep.inputs.iterations) - ) { - this.updateContextAndOutput(loopStepNumber, step, tempOutput, { - status: AutomationErrors.MAX_ITERATIONS, - success: true, - }) - loopSteps = undefined - loopStep = undefined - break - } - - let isFailure = false - const currentItem = this._context.steps[loopStepNumber]?.currentItem - if (currentItem && typeof currentItem === "object") { - isFailure = Object.keys(currentItem).some(value => { - return currentItem[value] === loopStep?.inputs.failure - }) - } else { - isFailure = currentItem && currentItem === loopStep.inputs.failure - } - - if (isFailure) { - this.updateContextAndOutput(loopStepNumber, step, tempOutput, { - status: AutomationErrors.FAILURE_CONDITION, - success: false, - }) - loopSteps = undefined - loopStep = undefined - break + let stepCount = 0 + let loopStepNumber: any = undefined + let loopSteps: LoopStep[] | undefined = [] + let metadata + let timeoutFlag = false + let wasLoopStep = false + let timeout = this._job.data.event.timeout + // check if this is a recurring automation, + if (isProdAppID(this._appId) && isRecurring(automation)) { + span?.addTags({ recurring: true }) + metadata = await this.getMetadata() + const shouldStop = await this.checkIfShouldStop(metadata) + if (shouldStop) { + span?.addTags({ shouldStop: true }) + return } } - - // execution stopped, record state for that - if (stopped) { - this.updateExecutionOutput(step.id, step.stepId, {}, STOPPED_STATUS) - continue - } - - // If it's a loop step, we need to manually add the bindings to the context - let stepFn = await this.getStepFunctionality(step.stepId) - let inputs = await processObject(originalStepInput, this._context) - inputs = automationUtils.cleanInputValues(inputs, step.schema.inputs) - - try { - // appId is always passed - const outputs = await stepFn({ - inputs: inputs, - appId: this._appId, - emitter: this._emitter, - context: this._context, + const start = performance.now() + for (let step of automation.definition.steps) { + const stepSpan = tracer.startSpan("Orchestrator.execute.step", { + childOf: span, + }) + stepSpan.addTags({ + resource: "automation", + step: { + stepId: step.stepId, + id: step.id, + name: step.name, + type: step.type, + title: step.stepTitle, + internal: step.internal, + deprecated: step.deprecated, + }, }) - this._context.steps[stepCount] = outputs - // if filter causes us to stop execution don't break the loop, set a var - // so that we can finish iterating through the steps and record that it stopped - if (step.stepId === FILTER_STEP_ID && !outputs.result) { - stopped = true - this.updateExecutionOutput(step.id, step.stepId, step.inputs, { - ...outputs, - ...STOPPED_STATUS, - }) - continue - } - if (loopStep && loopSteps) { - loopSteps.push(outputs) - } else { - this.updateExecutionOutput( - step.id, - step.stepId, - step.inputs, - outputs - ) - } - } catch (err) { - console.error(`Automation error - ${step.stepId} - ${err}`) - return err - } + let input: any, + iterations = 1, + iterationCount = 0 - if (loopStep) { - iterationCount++ - if (index === iterations - 1) { + try { + if (timeoutFlag) { + span?.addTags({ timedOut: true }) + break + } + + if (timeout) { + setTimeout(() => { + timeoutFlag = true + }, timeout || 12000) + } + + stepCount++ + if (step.stepId === LOOP_STEP_ID) { + loopStep = step + loopStepNumber = stepCount + continue + } + + if (loopStep) { + input = await processObject(loopStep.inputs, this._context) + iterations = getLoopIterations(loopStep as LoopStep) + stepSpan?.addTags({ step: { iterations } }) + } + for (let index = 0; index < iterations; index++) { + let originalStepInput = cloneDeep(step.inputs) + // Handle if the user has set a max iteration count or if it reaches the max limit set by us + if (loopStep && input.binding) { + let tempOutput = { + items: loopSteps, + iterations: iterationCount, + } + try { + loopStep.inputs.binding = automationUtils.typecastForLooping( + loopStep as LoopStep, + loopStep.inputs as LoopInput + ) + } catch (err) { + this.updateContextAndOutput( + loopStepNumber, + step, + tempOutput, + { + status: AutomationErrors.INCORRECT_TYPE, + success: false, + } + ) + loopSteps = undefined + loopStep = undefined + break + } + let item = [] + if ( + typeof loopStep.inputs.binding === "string" && + loopStep.inputs.option === "String" + ) { + item = automationUtils.stringSplit(loopStep.inputs.binding) + } else if (Array.isArray(loopStep.inputs.binding)) { + item = loopStep.inputs.binding + } + this._context.steps[loopStepNumber] = { + currentItem: item[index], + } + + // The "Loop" binding in the front end is "fake", so replace it here so the context can understand it + // Pretty hacky because we need to account for the row object + for (let [key, value] of Object.entries(originalStepInput)) { + if (typeof value === "object") { + for (let [innerKey, innerValue] of Object.entries( + originalStepInput[key] + )) { + if (typeof innerValue === "string") { + originalStepInput[key][innerKey] = + automationUtils.substituteLoopStep( + innerValue, + `steps.${loopStepNumber}` + ) + } else if (typeof value === "object") { + for (let [innerObject, innerValue] of Object.entries( + originalStepInput[key][innerKey] + )) { + originalStepInput[key][innerKey][innerObject] = + automationUtils.substituteLoopStep( + innerValue as string, + `steps.${loopStepNumber}` + ) + } + } + } + } else { + if (typeof value === "string") { + originalStepInput[key] = + automationUtils.substituteLoopStep( + value, + `steps.${loopStepNumber}` + ) + } + } + } + + if ( + index === env.AUTOMATION_MAX_ITERATIONS || + index === parseInt(loopStep.inputs.iterations) + ) { + this.updateContextAndOutput( + loopStepNumber, + step, + tempOutput, + { + status: AutomationErrors.MAX_ITERATIONS, + success: true, + } + ) + loopSteps = undefined + loopStep = undefined + break + } + + let isFailure = false + const currentItem = + this._context.steps[loopStepNumber]?.currentItem + if (currentItem && typeof currentItem === "object") { + isFailure = Object.keys(currentItem).some(value => { + return currentItem[value] === loopStep?.inputs.failure + }) + } else { + isFailure = + currentItem && currentItem === loopStep.inputs.failure + } + + if (isFailure) { + this.updateContextAndOutput( + loopStepNumber, + step, + tempOutput, + { + status: AutomationErrors.FAILURE_CONDITION, + success: false, + } + ) + loopSteps = undefined + loopStep = undefined + break + } + } + + // execution stopped, record state for that + if (stopped) { + this.updateExecutionOutput( + step.id, + step.stepId, + {}, + STOPPED_STATUS + ) + continue + } + + // If it's a loop step, we need to manually add the bindings to the context + let stepFn = await this.getStepFunctionality(step.stepId) + let inputs = await processObject(originalStepInput, this._context) + inputs = automationUtils.cleanInputValues( + inputs, + step.schema.inputs + ) + + try { + // appId is always passed + const outputs = await stepFn({ + inputs: inputs, + appId: this._appId, + emitter: this._emitter, + context: this._context, + }) + + this._context.steps[stepCount] = outputs + // if filter causes us to stop execution don't break the loop, set a var + // so that we can finish iterating through the steps and record that it stopped + if (step.stepId === FILTER_STEP_ID && !outputs.result) { + stopped = true + this.updateExecutionOutput( + step.id, + step.stepId, + step.inputs, + { + ...outputs, + ...STOPPED_STATUS, + } + ) + continue + } + if (loopStep && loopSteps) { + loopSteps.push(outputs) + } else { + this.updateExecutionOutput( + step.id, + step.stepId, + step.inputs, + outputs + ) + } + } catch (err) { + console.error(`Automation error - ${step.stepId} - ${err}`) + return err + } + + if (loopStep) { + iterationCount++ + if (index === iterations - 1) { + loopStep = undefined + this._context.steps.splice(loopStepNumber, 1) + break + } + } + } + } finally { + stepSpan?.finish() + } + + if (loopStep && iterations === 0) { loopStep = undefined + this.executionOutput.steps.splice(loopStepNumber + 1, 0, { + id: step.id, + stepId: step.stepId, + outputs: { + status: AutomationStepStatus.NO_ITERATIONS, + success: true, + }, + inputs: {}, + }) + this._context.steps.splice(loopStepNumber, 1) - break + iterations = 1 + } + + // Delete the step after the loop step as it's irrelevant, since information is included + // in the loop step + if (wasLoopStep && !loopStep) { + this._context.steps.splice(loopStepNumber + 1, 1) + wasLoopStep = false + } + if (loopSteps && loopSteps.length) { + let tempOutput = { + success: true, + items: loopSteps, + iterations: iterationCount, + } + this.executionOutput.steps.splice(loopStepNumber + 1, 0, { + id: step.id, + stepId: step.stepId, + outputs: tempOutput, + inputs: step.inputs, + }) + this._context.steps[loopStepNumber] = tempOutput + + wasLoopStep = true + loopSteps = [] } } - } - if (loopStep && iterations === 0) { - loopStep = undefined - this.executionOutput.steps.splice(loopStepNumber + 1, 0, { - id: step.id, - stepId: step.stepId, - outputs: { - status: AutomationStepStatus.NO_ITERATIONS, - success: true, - }, - inputs: {}, - }) + const end = performance.now() + const executionTime = end - start - this._context.steps.splice(loopStepNumber, 1) - iterations = 1 - } + console.info( + `Automation ID: ${automation._id} Execution time: ${executionTime} milliseconds`, + { + _logKey: "automation", + executionTime, + } + ) - // Delete the step after the loop step as it's irrelevant, since information is included - // in the loop step - if (wasLoopStep && !loopStep) { - this._context.steps.splice(loopStepNumber + 1, 1) - wasLoopStep = false - } - if (loopSteps && loopSteps.length) { - let tempOutput = { - success: true, - items: loopSteps, - iterations: iterationCount, + // store the logs for the automation run + try { + await storeLog(this._automation, this.executionOutput) + } catch (e: any) { + if (e.status === 413 && e.request?.data) { + // if content is too large we shouldn't log it + delete e.request.data + e.request.data = { message: "removed due to large size" } + } + logging.logAlert("Error writing automation log", e) } - this.executionOutput.steps.splice(loopStepNumber + 1, 0, { - id: step.id, - stepId: step.stepId, - outputs: tempOutput, - inputs: step.inputs, - }) - this._context.steps[loopStepNumber] = tempOutput - - wasLoopStep = true - loopSteps = [] - } - } - - const end = performance.now() - const executionTime = end - start - - console.info( - `Automation ID: ${automation._id} Execution time: ${executionTime} milliseconds`, - { - _logKey: "automation", - executionTime, + if (isProdAppID(this._appId) && isRecurring(automation) && metadata) { + await this.updateMetadata(metadata) + } + return this.executionOutput } ) - - // store the logs for the automation run - try { - await storeLog(this._automation, this.executionOutput) - } catch (e: any) { - if (e.status === 413 && e.request?.data) { - // if content is too large we shouldn't log it - delete e.request.data - e.request.data = { message: "removed due to large size" } - } - logging.logAlert("Error writing automation log", e) - } - if (isProdAppID(this._appId) && isRecurring(automation) && metadata) { - await this.updateMetadata(metadata) - } - return this.executionOutput } } From fef3107c0e75c89a145f38bd4b3522fd33980d71 Mon Sep 17 00:00:00 2001 From: Sam Rose Date: Tue, 19 Dec 2023 16:17:34 +0000 Subject: [PATCH 2/2] Fix automation tests. --- packages/backend-core/src/queue/inMemoryQueue.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/backend-core/src/queue/inMemoryQueue.ts b/packages/backend-core/src/queue/inMemoryQueue.ts index ac7cdf550b..c05bbffbe9 100644 --- a/packages/backend-core/src/queue/inMemoryQueue.ts +++ b/packages/backend-core/src/queue/inMemoryQueue.ts @@ -15,6 +15,7 @@ function newJob(queue: string, message: any) { timestamp: Date.now(), queue: queue, data: message, + opts: {}, } }