From 642b75e0aeae5e50e9a38d847bd9efee091129fd Mon Sep 17 00:00:00 2001 From: Sam Rose Date: Tue, 2 Jan 2024 11:36:32 +0000 Subject: [PATCH] Revert "Remove all custom tracing to see if it's the cause of the memory leak." --- .../backend-core/src/db/couch/DatabaseImpl.ts | 4 +- packages/backend-core/src/db/db.ts | 3 +- .../backend-core/src/db/instrumentation.ts | 156 +++++ .../backend-core/src/logging/pino/logger.ts | 6 + packages/server/src/automations/utils.ts | 70 ++- packages/server/src/jsRunner.ts | 53 +- packages/server/src/middleware/currentapp.ts | 14 + packages/server/src/threads/automation.ts | 583 ++++++++++-------- .../src/utilities/rowProcessor/utils.ts | 56 +- 9 files changed, 622 insertions(+), 323 deletions(-) create mode 100644 packages/backend-core/src/db/instrumentation.ts diff --git a/packages/backend-core/src/db/couch/DatabaseImpl.ts b/packages/backend-core/src/db/couch/DatabaseImpl.ts index c2c0b6b21d..3fec573bb9 100644 --- a/packages/backend-core/src/db/couch/DatabaseImpl.ts +++ b/packages/backend-core/src/db/couch/DatabaseImpl.ts @@ -17,6 +17,7 @@ import { directCouchUrlCall } from "./utils" import { getPouchDB } from "./pouchDB" import { WriteStream, ReadStream } from "fs" import { newid } from "../../docIds/newid" +import { DDInstrumentedDatabase } from "../instrumentation" function buildNano(couchInfo: { url: string; cookie: string }) { return Nano({ @@ -35,7 +36,8 @@ export function DatabaseWithConnection( connection: string, opts?: DatabaseOpts ) { - return new DatabaseImpl(dbName, opts, connection) + const db = new DatabaseImpl(dbName, opts, connection) + return new DDInstrumentedDatabase(db) } export class DatabaseImpl implements Database { diff --git a/packages/backend-core/src/db/db.ts b/packages/backend-core/src/db/db.ts index 3e69d49f0e..197770298e 100644 --- a/packages/backend-core/src/db/db.ts +++ b/packages/backend-core/src/db/db.ts @@ -1,8 +1,9 @@ import { directCouchQuery, DatabaseImpl } from "./couch" import { CouchFindOptions, Database, DatabaseOpts } from "@budibase/types" +import { DDInstrumentedDatabase } from "./instrumentation" export function getDB(dbName: string, opts?: DatabaseOpts): Database { - return new DatabaseImpl(dbName, opts) + return new DDInstrumentedDatabase(new DatabaseImpl(dbName, opts)) } // we have to use a callback for this so that we can close diff --git a/packages/backend-core/src/db/instrumentation.ts b/packages/backend-core/src/db/instrumentation.ts new file mode 100644 index 0000000000..ba5febcba6 --- /dev/null +++ b/packages/backend-core/src/db/instrumentation.ts @@ -0,0 +1,156 @@ +import { + DocumentScope, + DocumentDestroyResponse, + DocumentInsertResponse, + DocumentBulkResponse, + OkResponse, +} from "@budibase/nano" +import { + AllDocsResponse, + AnyDocument, + Database, + DatabaseDumpOpts, + DatabasePutOpts, + DatabaseQueryOpts, + Document, +} from "@budibase/types" +import tracer from "dd-trace" +import { Writable } from "stream" + +export class DDInstrumentedDatabase implements Database { + constructor(private readonly db: Database) {} + + get name(): string { + return this.db.name + } + + exists(): Promise { + return tracer.trace("db.exists", span => { + span?.addTags({ db_name: this.name }) + return this.db.exists() + }) + } + + checkSetup(): Promise> { + return tracer.trace("db.checkSetup", span => { + span?.addTags({ db_name: this.name }) + return this.db.checkSetup() + }) + } + + get(id?: string | undefined): Promise { + return tracer.trace("db.get", span => { + span?.addTags({ db_name: this.name, doc_id: id }) + return this.db.get(id) + }) + } + + getMultiple( + ids: string[], + opts?: { allowMissing?: boolean | undefined } | undefined + ): Promise { + return tracer.trace("db.getMultiple", span => { + span?.addTags({ + db_name: this.name, + num_docs: ids.length, + allow_missing: opts?.allowMissing, + }) + return this.db.getMultiple(ids, opts) + }) + } + + remove( + id: string | Document, + rev?: string | undefined + ): Promise { + return tracer.trace("db.remove", span => { + span?.addTags({ db_name: this.name, doc_id: id }) + return this.db.remove(id, rev) + }) + } + + put( + document: AnyDocument, + opts?: DatabasePutOpts | undefined + ): Promise { + return tracer.trace("db.put", span => { + span?.addTags({ db_name: this.name, doc_id: document._id }) + return this.db.put(document, opts) + }) + } + + bulkDocs(documents: AnyDocument[]): Promise { + return tracer.trace("db.bulkDocs", span => { + span?.addTags({ db_name: this.name, num_docs: documents.length }) + return this.db.bulkDocs(documents) + }) + } + + allDocs( + params: DatabaseQueryOpts + ): Promise> { + return tracer.trace("db.allDocs", span => { + span?.addTags({ db_name: this.name }) + return this.db.allDocs(params) + }) + } + + query( + viewName: string, + params: DatabaseQueryOpts + ): Promise> { + return tracer.trace("db.query", span => { + span?.addTags({ db_name: this.name, view_name: viewName }) + return this.db.query(viewName, params) + }) + } + + destroy(): Promise { + return tracer.trace("db.destroy", span => { + span?.addTags({ db_name: this.name }) + return this.db.destroy() + }) + } + + compact(): Promise { + return tracer.trace("db.compact", span => { + span?.addTags({ db_name: this.name }) + return this.db.compact() + }) + } + + dump(stream: Writable, opts?: DatabaseDumpOpts | undefined): Promise { + return tracer.trace("db.dump", span => { + span?.addTags({ db_name: this.name }) + return this.db.dump(stream, opts) + }) + } + + load(...args: any[]): Promise { + return tracer.trace("db.load", span => { + span?.addTags({ db_name: this.name }) + return this.db.load(...args) + }) + } + + createIndex(...args: any[]): Promise { + return tracer.trace("db.createIndex", span => { + span?.addTags({ db_name: this.name }) + return this.db.createIndex(...args) + }) + } + + deleteIndex(...args: any[]): Promise { + return tracer.trace("db.deleteIndex", span => { + span?.addTags({ db_name: this.name }) + return this.db.deleteIndex(...args) + }) + } + + getIndexes(...args: any[]): Promise { + return tracer.trace("db.getIndexes", span => { + span?.addTags({ db_name: this.name }) + return this.db.getIndexes(...args) + }) + } +} diff --git a/packages/backend-core/src/logging/pino/logger.ts b/packages/backend-core/src/logging/pino/logger.ts index ad68bd300d..7a051e7f12 100644 --- a/packages/backend-core/src/logging/pino/logger.ts +++ b/packages/backend-core/src/logging/pino/logger.ts @@ -5,6 +5,7 @@ import { IdentityType } from "@budibase/types" import env from "../../environment" import * as context from "../../context" import * as correlation from "../correlation" +import tracer from "dd-trace" import { formats } from "dd-trace/ext" import { localFileDestination } from "../system" @@ -116,6 +117,11 @@ if (!env.DISABLE_PINO_LOGGER) { correlationId: correlation.getId(), } + const span = tracer.scope().active() + if (span) { + tracer.inject(span.context(), formats.LOG, contextObject) + } + const mergingObject: any = { err: error, pid: process.pid, diff --git a/packages/server/src/automations/utils.ts b/packages/server/src/automations/utils.ts index 04fd36e3d1..0c28787f67 100644 --- a/packages/server/src/automations/utils.ts +++ b/packages/server/src/automations/utils.ts @@ -16,6 +16,7 @@ import { } from "@budibase/types" import sdk from "../sdk" import { automationsEnabled } from "../features" +import tracer from "dd-trace" const REBOOT_CRON = "@reboot" const WH_STEP_ID = definitions.WEBHOOK.stepId @@ -39,27 +40,62 @@ function loggingArgs(job: AutomationJob) { } export async function processEvent(job: AutomationJob) { - const appId = job.data.event.appId! - const automationId = job.data.automation._id! + return tracer.trace( + "processEvent", + { resource: "automation" }, + async span => { + const appId = job.data.event.appId! + const automationId = job.data.automation._id! - const task = async () => { - try { - // need to actually await these so that an error can be captured properly - console.log("automation running", ...loggingArgs(job)) - - const runFn = () => Runner.run(job) - const result = await quotas.addAutomation(runFn, { + span?.addTags({ + appId, automationId, + job: { + id: job.id, + name: job.name, + attemptsMade: job.attemptsMade, + opts: { + attempts: job.opts.attempts, + priority: job.opts.priority, + delay: job.opts.delay, + repeat: job.opts.repeat, + backoff: job.opts.backoff, + lifo: job.opts.lifo, + timeout: job.opts.timeout, + jobId: job.opts.jobId, + removeOnComplete: job.opts.removeOnComplete, + removeOnFail: job.opts.removeOnFail, + stackTraceLimit: job.opts.stackTraceLimit, + preventParsingData: job.opts.preventParsingData, + }, + }, }) - console.log("automation completed", ...loggingArgs(job)) - return result - } catch (err) { - console.error(`automation was unable to run`, err, ...loggingArgs(job)) - return { err } - } - } - return await context.doInAutomationContext({ appId, automationId, task }) + const task = async () => { + try { + // need to actually await these so that an error can be captured properly + console.log("automation running", ...loggingArgs(job)) + + const runFn = () => Runner.run(job) + const result = await quotas.addAutomation(runFn, { + automationId, + }) + console.log("automation completed", ...loggingArgs(job)) + return result + } catch (err) { + span?.addTags({ error: true }) + console.error( + `automation was unable to run`, + err, + ...loggingArgs(job) + ) + return { err } + } + } + + return await context.doInAutomationContext({ appId, automationId, task }) + } + ) } export async function updateTestHistory( diff --git a/packages/server/src/jsRunner.ts b/packages/server/src/jsRunner.ts index a9301feb60..ab0381a399 100644 --- a/packages/server/src/jsRunner.ts +++ b/packages/server/src/jsRunner.ts @@ -2,35 +2,44 @@ import vm from "vm" import env from "./environment" import { setJSRunner } from "@budibase/string-templates" import { context, timers } from "@budibase/backend-core" +import tracer from "dd-trace" type TrackerFn = (f: () => T) => T export function init() { setJSRunner((js: string, ctx: vm.Context) => { - const perRequestLimit = env.JS_PER_REQUEST_TIME_LIMIT_MS - let track: TrackerFn = f => f() - if (perRequestLimit) { - const bbCtx = context.getCurrentContext() - if (bbCtx) { - if (!bbCtx.jsExecutionTracker) { - bbCtx.jsExecutionTracker = - timers.ExecutionTimeTracker.withLimit(perRequestLimit) + return tracer.trace("runJS", {}, span => { + const perRequestLimit = env.JS_PER_REQUEST_TIME_LIMIT_MS + let track: TrackerFn = f => f() + if (perRequestLimit) { + const bbCtx = context.getCurrentContext() + if (bbCtx) { + if (!bbCtx.jsExecutionTracker) { + bbCtx.jsExecutionTracker = + timers.ExecutionTimeTracker.withLimit(perRequestLimit) + } + track = bbCtx.jsExecutionTracker.track.bind(bbCtx.jsExecutionTracker) + span?.addTags({ + js: { + limitMS: bbCtx.jsExecutionTracker.limitMs, + elapsedMS: bbCtx.jsExecutionTracker.elapsedMS, + }, + }) } - track = bbCtx.jsExecutionTracker.track.bind(bbCtx.jsExecutionTracker) } - } - ctx = { - ...ctx, - alert: undefined, - setInterval: undefined, - setTimeout: undefined, - } - vm.createContext(ctx) - return track(() => - vm.runInNewContext(js, ctx, { - timeout: env.JS_PER_EXECUTION_TIME_LIMIT_MS, - }) - ) + ctx = { + ...ctx, + alert: undefined, + setInterval: undefined, + setTimeout: undefined, + } + vm.createContext(ctx) + return track(() => + vm.runInNewContext(js, ctx, { + timeout: env.JS_PER_EXECUTION_TIME_LIMIT_MS, + }) + ) + }) }) } diff --git a/packages/server/src/middleware/currentapp.ts b/packages/server/src/middleware/currentapp.ts index 984dd8e5e9..ad6f2afa18 100644 --- a/packages/server/src/middleware/currentapp.ts +++ b/packages/server/src/middleware/currentapp.ts @@ -12,6 +12,7 @@ import { getCachedSelf } from "../utilities/global" import env from "../environment" import { isWebhookEndpoint } from "./utils" import { UserCtx, ContextUser } from "@budibase/types" +import tracer from "dd-trace" export default async (ctx: UserCtx, next: any) => { // try to get the appID from the request @@ -20,6 +21,11 @@ export default async (ctx: UserCtx, next: any) => { return next() } + if (requestAppId) { + const span = tracer.scope().active() + span?.setTag("appId", requestAppId) + } + // deny access to application preview if (!env.isTest()) { if ( @@ -70,6 +76,14 @@ export default async (ctx: UserCtx, next: any) => { return next() } + if (ctx.user) { + const span = tracer.scope().active() + if (ctx.user._id) { + span?.setTag("userId", ctx.user._id) + } + span?.setTag("tenantId", ctx.user.tenantId) + } + const userId = ctx.user ? generateUserMetadataID(ctx.user._id!) : undefined // if the user is not in the right tenant then make sure to wipe their cookie diff --git a/packages/server/src/threads/automation.ts b/packages/server/src/threads/automation.ts index 9bb1717f3c..4447899f96 100644 --- a/packages/server/src/threads/automation.ts +++ b/packages/server/src/threads/automation.ts @@ -34,6 +34,7 @@ import { cloneDeep } from "lodash/fp" import { performance } from "perf_hooks" import * as sdkUtils from "../sdk/utils" import env from "../environment" +import tracer from "dd-trace" threadUtils.threadSetup() const FILTER_STEP_ID = actions.BUILTIN_ACTION_DEFINITIONS.FILTER.stepId @@ -242,281 +243,347 @@ class Orchestrator { } async execute(): Promise { - // this will retrieve from context created at start of thread - this._context.env = await sdkUtils.getEnvironmentVariables() - let automation = this._automation - let stopped = false - let loopStep: AutomationStep | undefined = undefined + return tracer.trace( + "Orchestrator.execute", + { resource: "automation" }, + async span => { + span?.addTags({ + appId: this._appId, + automationId: this._automation._id, + }) - let stepCount = 0 - let loopStepNumber: any = undefined - let loopSteps: LoopStep[] | undefined = [] - let metadata - let timeoutFlag = false - let wasLoopStep = false - let timeout = this._job.data.event.timeout - // check if this is a recurring automation, - if (isProdAppID(this._appId) && isRecurring(automation)) { - metadata = await this.getMetadata() - const shouldStop = await this.checkIfShouldStop(metadata) - if (shouldStop) { - return - } - } - const start = performance.now() - for (let step of automation.definition.steps) { - let input: any, - iterations = 1, - iterationCount = 0 + // this will retrieve from context created at start of thread + this._context.env = await sdkUtils.getEnvironmentVariables() + let automation = this._automation + let stopped = false + let loopStep: AutomationStep | undefined = undefined - if (timeoutFlag) { - break - } - - if (timeout) { - setTimeout(() => { - timeoutFlag = true - }, timeout || 12000) - } - - stepCount++ - if (step.stepId === LOOP_STEP_ID) { - loopStep = step - loopStepNumber = stepCount - continue - } - - if (loopStep) { - input = await processObject(loopStep.inputs, this._context) - iterations = getLoopIterations(loopStep as LoopStep) - } - for (let index = 0; index < iterations; index++) { - let originalStepInput = cloneDeep(step.inputs) - // Handle if the user has set a max iteration count or if it reaches the max limit set by us - if (loopStep && input.binding) { - let tempOutput = { - items: loopSteps, - iterations: iterationCount, - } - try { - loopStep.inputs.binding = automationUtils.typecastForLooping( - loopStep as LoopStep, - loopStep.inputs as LoopInput - ) - } catch (err) { - this.updateContextAndOutput(loopStepNumber, step, tempOutput, { - status: AutomationErrors.INCORRECT_TYPE, - success: false, - }) - loopSteps = undefined - loopStep = undefined - break - } - let item = [] - if ( - typeof loopStep.inputs.binding === "string" && - loopStep.inputs.option === "String" - ) { - item = automationUtils.stringSplit(loopStep.inputs.binding) - } else if (Array.isArray(loopStep.inputs.binding)) { - item = loopStep.inputs.binding - } - this._context.steps[loopStepNumber] = { - currentItem: item[index], - } - - // The "Loop" binding in the front end is "fake", so replace it here so the context can understand it - // Pretty hacky because we need to account for the row object - for (let [key, value] of Object.entries(originalStepInput)) { - if (typeof value === "object") { - for (let [innerKey, innerValue] of Object.entries( - originalStepInput[key] - )) { - if (typeof innerValue === "string") { - originalStepInput[key][innerKey] = - automationUtils.substituteLoopStep( - innerValue, - `steps.${loopStepNumber}` - ) - } else if (typeof value === "object") { - for (let [innerObject, innerValue] of Object.entries( - originalStepInput[key][innerKey] - )) { - originalStepInput[key][innerKey][innerObject] = - automationUtils.substituteLoopStep( - innerValue as string, - `steps.${loopStepNumber}` - ) - } - } - } - } else { - if (typeof value === "string") { - originalStepInput[key] = automationUtils.substituteLoopStep( - value, - `steps.${loopStepNumber}` - ) - } - } - } - - if ( - index === env.AUTOMATION_MAX_ITERATIONS || - index === parseInt(loopStep.inputs.iterations) - ) { - this.updateContextAndOutput(loopStepNumber, step, tempOutput, { - status: AutomationErrors.MAX_ITERATIONS, - success: true, - }) - loopSteps = undefined - loopStep = undefined - break - } - - let isFailure = false - const currentItem = this._context.steps[loopStepNumber]?.currentItem - if (currentItem && typeof currentItem === "object") { - isFailure = Object.keys(currentItem).some(value => { - return currentItem[value] === loopStep?.inputs.failure - }) - } else { - isFailure = currentItem && currentItem === loopStep.inputs.failure - } - - if (isFailure) { - this.updateContextAndOutput(loopStepNumber, step, tempOutput, { - status: AutomationErrors.FAILURE_CONDITION, - success: false, - }) - loopSteps = undefined - loopStep = undefined - break + let stepCount = 0 + let loopStepNumber: any = undefined + let loopSteps: LoopStep[] | undefined = [] + let metadata + let timeoutFlag = false + let wasLoopStep = false + let timeout = this._job.data.event.timeout + // check if this is a recurring automation, + if (isProdAppID(this._appId) && isRecurring(automation)) { + span?.addTags({ recurring: true }) + metadata = await this.getMetadata() + const shouldStop = await this.checkIfShouldStop(metadata) + if (shouldStop) { + span?.addTags({ shouldStop: true }) + return } } - - // execution stopped, record state for that - if (stopped) { - this.updateExecutionOutput(step.id, step.stepId, {}, STOPPED_STATUS) - continue - } - - // If it's a loop step, we need to manually add the bindings to the context - let stepFn = await this.getStepFunctionality(step.stepId) - let inputs = await processObject(originalStepInput, this._context) - inputs = automationUtils.cleanInputValues(inputs, step.schema.inputs) - - try { - // appId is always passed - const outputs = await stepFn({ - inputs: inputs, - appId: this._appId, - emitter: this._emitter, - context: this._context, + const start = performance.now() + for (let step of automation.definition.steps) { + const stepSpan = tracer.startSpan("Orchestrator.execute.step", { + childOf: span, + }) + stepSpan.addTags({ + resource: "automation", + step: { + stepId: step.stepId, + id: step.id, + name: step.name, + type: step.type, + title: step.stepTitle, + internal: step.internal, + deprecated: step.deprecated, + }, }) - this._context.steps[stepCount] = outputs - // if filter causes us to stop execution don't break the loop, set a var - // so that we can finish iterating through the steps and record that it stopped - if (step.stepId === FILTER_STEP_ID && !outputs.result) { - stopped = true - this.updateExecutionOutput(step.id, step.stepId, step.inputs, { - ...outputs, - ...STOPPED_STATUS, - }) - continue - } - if (loopStep && loopSteps) { - loopSteps.push(outputs) - } else { - this.updateExecutionOutput( - step.id, - step.stepId, - step.inputs, - outputs - ) - } - } catch (err) { - console.error(`Automation error - ${step.stepId} - ${err}`) - return err - } + let input: any, + iterations = 1, + iterationCount = 0 - if (loopStep) { - iterationCount++ - if (index === iterations - 1) { + try { + if (timeoutFlag) { + span?.addTags({ timedOut: true }) + break + } + + if (timeout) { + setTimeout(() => { + timeoutFlag = true + }, timeout || 12000) + } + + stepCount++ + if (step.stepId === LOOP_STEP_ID) { + loopStep = step + loopStepNumber = stepCount + continue + } + + if (loopStep) { + input = await processObject(loopStep.inputs, this._context) + iterations = getLoopIterations(loopStep as LoopStep) + stepSpan?.addTags({ step: { iterations } }) + } + for (let index = 0; index < iterations; index++) { + let originalStepInput = cloneDeep(step.inputs) + // Handle if the user has set a max iteration count or if it reaches the max limit set by us + if (loopStep && input.binding) { + let tempOutput = { + items: loopSteps, + iterations: iterationCount, + } + try { + loopStep.inputs.binding = automationUtils.typecastForLooping( + loopStep as LoopStep, + loopStep.inputs as LoopInput + ) + } catch (err) { + this.updateContextAndOutput( + loopStepNumber, + step, + tempOutput, + { + status: AutomationErrors.INCORRECT_TYPE, + success: false, + } + ) + loopSteps = undefined + loopStep = undefined + break + } + let item = [] + if ( + typeof loopStep.inputs.binding === "string" && + loopStep.inputs.option === "String" + ) { + item = automationUtils.stringSplit(loopStep.inputs.binding) + } else if (Array.isArray(loopStep.inputs.binding)) { + item = loopStep.inputs.binding + } + this._context.steps[loopStepNumber] = { + currentItem: item[index], + } + + // The "Loop" binding in the front end is "fake", so replace it here so the context can understand it + // Pretty hacky because we need to account for the row object + for (let [key, value] of Object.entries(originalStepInput)) { + if (typeof value === "object") { + for (let [innerKey, innerValue] of Object.entries( + originalStepInput[key] + )) { + if (typeof innerValue === "string") { + originalStepInput[key][innerKey] = + automationUtils.substituteLoopStep( + innerValue, + `steps.${loopStepNumber}` + ) + } else if (typeof value === "object") { + for (let [innerObject, innerValue] of Object.entries( + originalStepInput[key][innerKey] + )) { + originalStepInput[key][innerKey][innerObject] = + automationUtils.substituteLoopStep( + innerValue as string, + `steps.${loopStepNumber}` + ) + } + } + } + } else { + if (typeof value === "string") { + originalStepInput[key] = + automationUtils.substituteLoopStep( + value, + `steps.${loopStepNumber}` + ) + } + } + } + + if ( + index === env.AUTOMATION_MAX_ITERATIONS || + index === parseInt(loopStep.inputs.iterations) + ) { + this.updateContextAndOutput( + loopStepNumber, + step, + tempOutput, + { + status: AutomationErrors.MAX_ITERATIONS, + success: true, + } + ) + loopSteps = undefined + loopStep = undefined + break + } + + let isFailure = false + const currentItem = + this._context.steps[loopStepNumber]?.currentItem + if (currentItem && typeof currentItem === "object") { + isFailure = Object.keys(currentItem).some(value => { + return currentItem[value] === loopStep?.inputs.failure + }) + } else { + isFailure = + currentItem && currentItem === loopStep.inputs.failure + } + + if (isFailure) { + this.updateContextAndOutput( + loopStepNumber, + step, + tempOutput, + { + status: AutomationErrors.FAILURE_CONDITION, + success: false, + } + ) + loopSteps = undefined + loopStep = undefined + break + } + } + + // execution stopped, record state for that + if (stopped) { + this.updateExecutionOutput( + step.id, + step.stepId, + {}, + STOPPED_STATUS + ) + continue + } + + // If it's a loop step, we need to manually add the bindings to the context + let stepFn = await this.getStepFunctionality(step.stepId) + let inputs = await processObject(originalStepInput, this._context) + inputs = automationUtils.cleanInputValues( + inputs, + step.schema.inputs + ) + + try { + // appId is always passed + const outputs = await stepFn({ + inputs: inputs, + appId: this._appId, + emitter: this._emitter, + context: this._context, + }) + + this._context.steps[stepCount] = outputs + // if filter causes us to stop execution don't break the loop, set a var + // so that we can finish iterating through the steps and record that it stopped + if (step.stepId === FILTER_STEP_ID && !outputs.result) { + stopped = true + this.updateExecutionOutput( + step.id, + step.stepId, + step.inputs, + { + ...outputs, + ...STOPPED_STATUS, + } + ) + continue + } + if (loopStep && loopSteps) { + loopSteps.push(outputs) + } else { + this.updateExecutionOutput( + step.id, + step.stepId, + step.inputs, + outputs + ) + } + } catch (err) { + console.error(`Automation error - ${step.stepId} - ${err}`) + return err + } + + if (loopStep) { + iterationCount++ + if (index === iterations - 1) { + loopStep = undefined + this._context.steps.splice(loopStepNumber, 1) + break + } + } + } + } finally { + stepSpan?.finish() + } + + if (loopStep && iterations === 0) { loopStep = undefined + this.executionOutput.steps.splice(loopStepNumber + 1, 0, { + id: step.id, + stepId: step.stepId, + outputs: { + status: AutomationStepStatus.NO_ITERATIONS, + success: true, + }, + inputs: {}, + }) + this._context.steps.splice(loopStepNumber, 1) - break + iterations = 1 + } + + // Delete the step after the loop step as it's irrelevant, since information is included + // in the loop step + if (wasLoopStep && !loopStep) { + this._context.steps.splice(loopStepNumber + 1, 1) + wasLoopStep = false + } + if (loopSteps && loopSteps.length) { + let tempOutput = { + success: true, + items: loopSteps, + iterations: iterationCount, + } + this.executionOutput.steps.splice(loopStepNumber + 1, 0, { + id: step.id, + stepId: step.stepId, + outputs: tempOutput, + inputs: step.inputs, + }) + this._context.steps[loopStepNumber] = tempOutput + + wasLoopStep = true + loopSteps = [] } } - } - if (loopStep && iterations === 0) { - loopStep = undefined - this.executionOutput.steps.splice(loopStepNumber + 1, 0, { - id: step.id, - stepId: step.stepId, - outputs: { - status: AutomationStepStatus.NO_ITERATIONS, - success: true, - }, - inputs: {}, - }) + const end = performance.now() + const executionTime = end - start - this._context.steps.splice(loopStepNumber, 1) - iterations = 1 - } + console.info( + `Automation ID: ${automation._id} Execution time: ${executionTime} milliseconds`, + { + _logKey: "automation", + executionTime, + } + ) - // Delete the step after the loop step as it's irrelevant, since information is included - // in the loop step - if (wasLoopStep && !loopStep) { - this._context.steps.splice(loopStepNumber + 1, 1) - wasLoopStep = false - } - if (loopSteps && loopSteps.length) { - let tempOutput = { - success: true, - items: loopSteps, - iterations: iterationCount, + // store the logs for the automation run + try { + await storeLog(this._automation, this.executionOutput) + } catch (e: any) { + if (e.status === 413 && e.request?.data) { + // if content is too large we shouldn't log it + delete e.request.data + e.request.data = { message: "removed due to large size" } + } + logging.logAlert("Error writing automation log", e) } - this.executionOutput.steps.splice(loopStepNumber + 1, 0, { - id: step.id, - stepId: step.stepId, - outputs: tempOutput, - inputs: step.inputs, - }) - this._context.steps[loopStepNumber] = tempOutput - - wasLoopStep = true - loopSteps = [] - } - } - - const end = performance.now() - const executionTime = end - start - - console.info( - `Automation ID: ${automation._id} Execution time: ${executionTime} milliseconds`, - { - _logKey: "automation", - executionTime, + if (isProdAppID(this._appId) && isRecurring(automation) && metadata) { + await this.updateMetadata(metadata) + } + return this.executionOutput } ) - - // store the logs for the automation run - try { - await storeLog(this._automation, this.executionOutput) - } catch (e: any) { - if (e.status === 413 && e.request?.data) { - // if content is too large we shouldn't log it - delete e.request.data - e.request.data = { message: "removed due to large size" } - } - logging.logAlert("Error writing automation log", e) - } - if (isProdAppID(this._appId) && isRecurring(automation) && metadata) { - await this.updateMetadata(metadata) - } - return this.executionOutput } } diff --git a/packages/server/src/utilities/rowProcessor/utils.ts b/packages/server/src/utilities/rowProcessor/utils.ts index 22c099c58c..cafd366cae 100644 --- a/packages/server/src/utilities/rowProcessor/utils.ts +++ b/packages/server/src/utilities/rowProcessor/utils.ts @@ -11,6 +11,7 @@ import { Row, Table, } from "@budibase/types" +import tracer from "dd-trace" interface FormulaOpts { dynamic?: boolean @@ -50,35 +51,42 @@ export function processFormulas( inputRows: T, { dynamic, contextRows }: FormulaOpts = { dynamic: true } ): T { - const rows = Array.isArray(inputRows) ? inputRows : [inputRows] - if (rows) { - for (let [column, schema] of Object.entries(table.schema)) { - if (schema.type !== FieldTypes.FORMULA) { - continue - } + return tracer.trace("processFormulas", {}, span => { + const numRows = Array.isArray(inputRows) ? inputRows.length : 1 + span?.addTags({ table_id: table._id, dynamic, numRows }) + const rows = Array.isArray(inputRows) ? inputRows : [inputRows] + if (rows) { + for (let [column, schema] of Object.entries(table.schema)) { + if (schema.type !== FieldTypes.FORMULA) { + continue + } - const isStatic = schema.formulaType === FormulaTypes.STATIC + const isStatic = schema.formulaType === FormulaTypes.STATIC - if ( - schema.formula == null || - (dynamic && isStatic) || - (!dynamic && !isStatic) - ) { - continue - } - // iterate through rows and process formula - for (let i = 0; i < rows.length; i++) { - let row = rows[i] - let context = contextRows ? contextRows[i] : row - let formula = schema.formula - rows[i] = { - ...row, - [column]: processStringSync(formula, context), + if ( + schema.formula == null || + (dynamic && isStatic) || + (!dynamic && !isStatic) + ) { + continue + } + // iterate through rows and process formula + for (let i = 0; i < rows.length; i++) { + let row = rows[i] + let context = contextRows ? contextRows[i] : row + let formula = schema.formula + rows[i] = { + ...row, + [column]: tracer.trace("processStringSync", {}, span => { + span?.addTags({ table_id: table._id, column, static: isStatic }) + return processStringSync(formula, context) + }), + } } } } - } - return Array.isArray(inputRows) ? rows : rows[0] + return Array.isArray(inputRows) ? rows : rows[0] + }) } /**