diff --git a/packages/server/src/automations/automationUtils.ts b/packages/server/src/automations/automationUtils.ts index bb94df9131..eacf81ef92 100644 --- a/packages/server/src/automations/automationUtils.ts +++ b/packages/server/src/automations/automationUtils.ts @@ -294,3 +294,13 @@ export function typecastForLooping(input: LoopStepInputs) { } return input.binding } + +export function ensureMaxIterationsAsNumber( + value: number | string | undefined +): number | undefined { + if (typeof value === "number") return value + if (typeof value === "string") { + return parseInt(value) + } + return undefined +} diff --git a/packages/server/src/automations/tests/scenarios/scenarios.spec.ts b/packages/server/src/automations/tests/scenarios/scenarios.spec.ts index c0c30e46e4..a0dab7f177 100644 --- a/packages/server/src/automations/tests/scenarios/scenarios.spec.ts +++ b/packages/server/src/automations/tests/scenarios/scenarios.spec.ts @@ -23,42 +23,148 @@ describe("Automation Scenarios", () => { afterAll(setup.afterAll) - // eslint-disable-next-line jest/no-commented-out-tests - // describe("Branching automations", () => { - // eslint-disable-next-line jest/no-commented-out-tests - // it("should run an automation with a trigger, loop, and create row step", async () => { - // const builder = createAutomationBuilder({ - // name: "Test Trigger with Loop and Create Row", - // }) + describe("Branching automations", () => { + it("should run a multiple nested branching automation", async () => { + const builder = createAutomationBuilder({ + name: "Test Trigger with Loop and Create Row", + }) - // builder - // .serverLog({ text: "Starting automation" }) - // .branch({ - // topLevelBranch1: { - // steps: stepBuilder => - // stepBuilder.serverLog({ text: "Branch 1" }).branch({ - // branch1: { - // steps: stepBuilder => - // stepBuilder.serverLog({ text: "Branch 1.1" }), - // condition: { notEmpty: { column: 10 } }, - // }, - // branch2: { - // steps: stepBuilder => - // stepBuilder.serverLog({ text: "Branch 1.2" }), - // condition: { fuzzy: { column: "sadsd" } }, - // }, - // }), - // condition: { equal: { column: 10 } }, - // }, - // topLevelBranch2: { - // steps: stepBuilder => stepBuilder.serverLog({ text: "Branch 2" }), - // condition: { equal: { column: 20 } }, - // }, - // }) - // .run() - // }) + const results = await builder + .appAction({ fields: {} }) + .serverLog({ text: "Starting automation" }) + .branch({ + topLevelBranch1: { + steps: stepBuilder => + stepBuilder.serverLog({ text: "Branch 1" }).branch({ + branch1: { + steps: stepBuilder => + stepBuilder.serverLog({ text: "Branch 1.1" }), + condition: { + equal: { "steps.1.success": true }, + }, + }, + branch2: { + steps: stepBuilder => + stepBuilder.serverLog({ text: "Branch 1.2" }), + condition: { + equal: { "steps.1.success": false }, + }, + }, + }), + condition: { + equal: { "steps.1.success": true }, + }, + }, + topLevelBranch2: { + steps: stepBuilder => stepBuilder.serverLog({ text: "Branch 2" }), + condition: { + equal: { "steps.1.success": false }, + }, + }, + }) + .run() - // }) + expect(results.steps[2].outputs.message).toContain("Branch 1.1") + }) + + it("should execute correct branch based on string equality", async () => { + const builder = createAutomationBuilder({ + name: "String Equality Branching", + }) + + const results = await builder + .appAction({ fields: { status: "active" } }) + .branch({ + activeBranch: { + steps: stepBuilder => + stepBuilder.serverLog({ text: "Active user" }), + condition: { + equal: { "trigger.fields.status": "active" }, + }, + }, + inactiveBranch: { + steps: stepBuilder => + stepBuilder.serverLog({ text: "Inactive user" }), + condition: { + equal: { "trigger.fields.status": "inactive" }, + }, + }, + }) + .run() + + expect(results.steps[0].outputs.message).toContain("Active user") + }) + + it("should handle multiple conditions with AND operator", async () => { + const builder = createAutomationBuilder({ + name: "Multiple AND Conditions Branching", + }) + + const results = await builder + .appAction({ fields: { status: "active", role: "admin" } }) + .branch({ + activeAdminBranch: { + steps: stepBuilder => + stepBuilder.serverLog({ text: "Active admin user" }), + condition: { + $and: { + conditions: [ + { equal: { "trigger.fields.status": "active" } }, + { equal: { "trigger.fields.role": "admin" } }, + ], + }, + }, + }, + otherBranch: { + steps: stepBuilder => stepBuilder.serverLog({ text: "Other user" }), + condition: { + notEqual: { "trigger.fields.status": "active" }, + }, + }, + }) + .run() + + expect(results.steps[0].outputs.message).toContain("Active admin user") + }) + + it("should handle multiple conditions with OR operator", async () => { + const builder = createAutomationBuilder({ + name: "Multiple OR Conditions Branching", + }) + + const results = await builder + .appAction({ fields: { status: "test", role: "user" } }) + .branch({ + specialBranch: { + steps: stepBuilder => + stepBuilder.serverLog({ text: "Special user" }), + condition: { + $or: { + conditions: [ + { equal: { "trigger.fields.status": "test" } }, + { equal: { "trigger.fields.role": "admin" } }, + ], + }, + }, + }, + regularBranch: { + steps: stepBuilder => + stepBuilder.serverLog({ text: "Regular user" }), + condition: { + $and: { + conditions: [ + { notEqual: { "trigger.fields.status": "active" } }, + { notEqual: { "trigger.fields.role": "admin" } }, + ], + }, + }, + }, + }) + .run() + + expect(results.steps[0].outputs.message).toContain("Special user") + }) + }) describe("Loop automations", () => { it("should run an automation with a trigger, loop, and create row step", async () => { @@ -108,6 +214,89 @@ describe("Automation Scenarios", () => { }) }) + it("should run an automation where a loop step is between two normal steps to ensure context correctness", async () => { + const builder = createAutomationBuilder({ + name: "Test Trigger with Loop and Create Row", + }) + + const results = await builder + .rowSaved( + { tableId: table._id! }, + { + row: { + name: "Trigger Row", + description: "This row triggers the automation", + }, + id: "1234", + revision: "1", + } + ) + .queryRows({ + tableId: table._id!, + }) + .loop({ + option: LoopStepType.ARRAY, + binding: [1, 2, 3], + }) + .serverLog({ text: "Message {{loop.currentItem}}" }) + .serverLog({ text: "{{steps.1.rows.0._id}}" }) + .run() + + results.steps[1].outputs.items.forEach( + (output: ServerLogStepOutputs, index: number) => { + expect(output).toMatchObject({ + success: true, + }) + expect(output.message).toContain(`Message ${index + 1}`) + } + ) + + expect(results.steps[2].outputs.message).toContain("ro_ta") + }) + + it("if an incorrect type is passed to the loop it should return an error", async () => { + const builder = createAutomationBuilder({ + name: "Test Loop error", + }) + + const results = await builder + .appAction({ fields: {} }) + .loop({ + option: LoopStepType.ARRAY, + binding: "1, 2, 3", + }) + .serverLog({ text: "Message {{loop.currentItem}}" }) + .run() + + expect(results.steps[0].outputs).toEqual({ + success: false, + status: "INCORRECT_TYPE", + }) + }) + + it("ensure the loop stops if the failure condition is reached", async () => { + const builder = createAutomationBuilder({ + name: "Test Loop error", + }) + + const results = await builder + .appAction({ fields: {} }) + .loop({ + option: LoopStepType.ARRAY, + binding: ["test", "test2", "test3"], + failure: "test2", + }) + .serverLog({ text: "Message {{loop.currentItem}}" }) + .run() + + expect(results.steps[0].outputs).toEqual( + expect.objectContaining({ + status: "FAILURE_CONDITION_MET", + success: false, + }) + ) + }) + it("should run an automation where a loop is successfully run twice", async () => { const builder = createAutomationBuilder({ name: "Test Trigger with Loop and Create Row", diff --git a/packages/server/src/threads/automation.ts b/packages/server/src/threads/automation.ts index c2470e78d4..eff8407104 100644 --- a/packages/server/src/threads/automation.ts +++ b/packages/server/src/threads/automation.ts @@ -8,7 +8,7 @@ import { import * as actions from "../automations/actions" import * as automationUtils from "../automations/automationUtils" import { replaceFakeBindings } from "../automations/loopUtils" - +import { dataFilters, helpers } from "@budibase/shared-core" import { default as AutomationEmitter } from "../events/AutomationEmitter" import { generateAutomationMetadataID, isProdAppID } from "../db/utils" import { definitions as triggerDefs } from "../automations/triggerInfo" @@ -23,12 +23,14 @@ import { AutomationStatus, AutomationStep, AutomationStepStatus, + BranchStep, LoopStep, + SearchFilters, } from "@budibase/types" import { AutomationContext, TriggerOutput } from "../definitions/automations" import { WorkerCallback } from "./definitions" import { context, logging } from "@budibase/backend-core" -import { processObject } from "@budibase/string-templates" +import { processObject, processStringSync } from "@budibase/string-templates" import { cloneDeep } from "lodash/fp" import { performance } from "perf_hooks" import * as sdkUtils from "../sdk/utils" @@ -64,36 +66,40 @@ function getLoopIterations(loopStep: LoopStep) { * inputs and handles any outputs. */ class Orchestrator { - _chainCount: number - _appId: string - _automation: Automation - _emitter: any - _context: AutomationContext - _job: Job - executionOutput: AutomationContext + private chainCount: number + private appId: string + private automation: Automation + private emitter: any + private context: AutomationContext + private job: Job + private loopStepOutputs: LoopStep[] + private stopped: boolean + private executionOutput: AutomationContext constructor(job: AutomationJob) { let automation = job.data.automation let triggerOutput = job.data.event const metadata = triggerOutput.metadata - this._chainCount = metadata ? metadata.automationChainCount! : 0 - this._appId = triggerOutput.appId as string - this._job = job + this.chainCount = metadata ? metadata.automationChainCount! : 0 + this.appId = triggerOutput.appId as string + this.job = job const triggerStepId = automation.definition.trigger.stepId triggerOutput = this.cleanupTriggerOutputs(triggerStepId, triggerOutput) // remove from context delete triggerOutput.appId delete triggerOutput.metadata // step zero is never used as the template string is zero indexed for customer facing - this._context = { steps: [{}], trigger: triggerOutput } - this._automation = automation + this.context = { steps: [{}], trigger: triggerOutput } + this.automation = automation // create an emitter which has the chain count for this automation run in it, so it can block // excessive chaining if required - this._emitter = new AutomationEmitter(this._chainCount + 1) + this.emitter = new AutomationEmitter(this.chainCount + 1) this.executionOutput = { trigger: {}, steps: [] } // setup the execution output const triggerId = automation.definition.trigger.id this.updateExecutionOutput(triggerId, triggerStepId, null, triggerOutput) + this.loopStepOutputs = [] + this.stopped = false } cleanupTriggerOutputs(stepId: string, triggerOutput: TriggerOutput) { @@ -112,7 +118,7 @@ class Orchestrator { } async getMetadata(): Promise { - const metadataId = generateAutomationMetadataID(this._automation._id!) + const metadataId = generateAutomationMetadataID(this.automation._id!) const db = context.getAppDB() let metadata: AutomationMetadata try { @@ -127,15 +133,15 @@ class Orchestrator { } async stopCron(reason: string) { - if (!this._job.opts.repeat) { + if (!this.job.opts.repeat) { return } logging.logWarn( - `CRON disabled reason=${reason} - ${this._appId}/${this._automation._id}` + `CRON disabled reason=${reason} - ${this.appId}/${this.automation._id}` ) - const automation = this._automation + const automation = this.automation const trigger = automation.definition.trigger - await disableCronById(this._job.id) + await disableCronById(this.job.id) this.updateExecutionOutput( trigger.id, trigger.stepId, @@ -149,7 +155,7 @@ class Orchestrator { } async checkIfShouldStop(metadata: AutomationMetadata): Promise { - if (!metadata.errorCount || !this._job.opts.repeat) { + if (!metadata.errorCount || !this.job.opts.repeat) { return false } if (metadata.errorCount >= MAX_AUTOMATION_RECURRING_ERRORS) { @@ -161,7 +167,7 @@ class Orchestrator { async updateMetadata(metadata: AutomationMetadata) { const output = this.executionOutput, - automation = this._automation + automation = this.automation if (!output || !isRecurring(automation)) { return } @@ -216,7 +222,7 @@ class Orchestrator { output: any, result: { success: boolean; status: string } ) { - if (!currentLoopStepIndex) { + if (currentLoopStepIndex === undefined) { throw new Error("No loop step number provided.") } this.executionOutput.steps.splice(currentLoopStepIndex, 0, { @@ -229,7 +235,7 @@ class Orchestrator { }, inputs: step.inputs, }) - this._context.steps.splice(currentLoopStepIndex, 0, { + this.context.steps.splice(currentLoopStepIndex, 0, { ...output, success: result.success, status: result.status, @@ -242,25 +248,15 @@ class Orchestrator { { resource: "automation" }, async span => { span?.addTags({ - appId: this._appId, - automationId: this._automation._id, + appId: this.appId, + automationId: this.automation._id, }) + this.context.env = await sdkUtils.getEnvironmentVariables() - // this will retrieve from context created at start of thread - this._context.env = await sdkUtils.getEnvironmentVariables() - let automation = this._automation - let stopped = false - let loopStep: LoopStep | undefined - - let stepCount = 0 - let currentLoopStepIndex: number = 0 - let loopSteps: LoopStep[] | undefined = [] let metadata - let timeoutFlag = false - let wasLoopStep = false - let timeout = this._job.data.event.timeout + // check if this is a recurring automation, - if (isProdAppID(this._appId) && isRecurring(automation)) { + if (isProdAppID(this.appId) && isRecurring(this.automation)) { span?.addTags({ recurring: true }) metadata = await this.getMetadata() const shouldStop = await this.checkIfShouldStop(metadata) @@ -270,272 +266,22 @@ class Orchestrator { } } const start = performance.now() - for (const step of automation.definition.steps) { - const stepSpan = tracer.startSpan("Orchestrator.execute.step", { - childOf: span, - }) - stepSpan.addTags({ - resource: "automation", - step: { - stepId: step.stepId, - id: step.id, - name: step.name, - type: step.type, - title: step.stepTitle, - internal: step.internal, - deprecated: step.deprecated, - }, - }) - let input, - iterations = 1, - iterationCount = 0 - - try { - if (timeoutFlag) { - span?.addTags({ timedOut: true }) - break - } - - if (timeout) { - setTimeout(() => { - timeoutFlag = true - }, timeout || env.AUTOMATION_THREAD_TIMEOUT) - } - - stepCount++ - if (step.stepId === AutomationActionStepId.LOOP) { - loopStep = step - currentLoopStepIndex = stepCount - continue - } - - if (loopStep) { - input = await processObject(loopStep.inputs, this._context) - iterations = getLoopIterations(loopStep) - stepSpan?.addTags({ step: { iterations } }) - } - - for (let stepIndex = 0; stepIndex < iterations; stepIndex++) { - let originalStepInput = cloneDeep(step.inputs) - if (loopStep && input?.binding) { - let tempOutput = { - items: loopSteps, - iterations: iterationCount, - } - try { - loopStep.inputs.binding = automationUtils.typecastForLooping( - loopStep.inputs - ) - } catch (err) { - this.updateContextAndOutput( - currentLoopStepIndex, - step, - tempOutput, - { - status: AutomationErrors.INCORRECT_TYPE, - success: false, - } - ) - loopSteps = undefined - loopStep = undefined - break - } - let item: any[] = [] - if ( - typeof loopStep.inputs.binding === "string" && - loopStep.inputs.option === "String" - ) { - item = automationUtils.stringSplit(loopStep.inputs.binding) - } else if (Array.isArray(loopStep.inputs.binding)) { - item = loopStep.inputs.binding - } - this._context.steps[currentLoopStepIndex] = { - currentItem: item[stepIndex], - } - - originalStepInput = replaceFakeBindings( - originalStepInput, - currentLoopStepIndex - ) - - if ( - stepIndex === env.AUTOMATION_MAX_ITERATIONS || - (loopStep.inputs.iterations && - stepIndex === loopStep.inputs.iterations) - ) { - this.updateContextAndOutput( - currentLoopStepIndex, - step, - tempOutput, - { - status: AutomationErrors.MAX_ITERATIONS, - success: true, - } - ) - loopSteps = undefined - loopStep = undefined - break - } - - let isFailure = false - const currentItem = - this._context.steps[currentLoopStepIndex]?.currentItem - if (currentItem && typeof currentItem === "object") { - isFailure = Object.keys(currentItem).some(value => { - return currentItem[value] === loopStep?.inputs.failure - }) - } else { - isFailure = - currentItem && currentItem === loopStep.inputs.failure - } - - if (isFailure) { - this.updateContextAndOutput( - currentLoopStepIndex, - step, - tempOutput, - { - status: AutomationErrors.FAILURE_CONDITION, - success: false, - } - ) - loopSteps = undefined - loopStep = undefined - break - } - } - - // execution stopped, record state for that - if (stopped) { - this.updateExecutionOutput( - step.id, - step.stepId, - {}, - STOPPED_STATUS - ) - continue - } - - let stepFn = await this.getStepFunctionality( - step.stepId as AutomationActionStepId - ) - let inputs = await processObject(originalStepInput, this._context) - inputs = automationUtils.cleanInputValues( - inputs, - step.schema.inputs - ) - try { - // appId is always passed - const outputs = await stepFn({ - inputs: inputs, - appId: this._appId, - emitter: this._emitter, - context: this._context, - }) - - this._context.steps[stepCount] = outputs - // if filter causes us to stop execution don't break the loop, set a var - // so that we can finish iterating through the steps and record that it stopped - if ( - step.stepId === AutomationActionStepId.FILTER && - !outputs.result - ) { - stopped = true - this.updateExecutionOutput( - step.id, - step.stepId, - step.inputs, - { - ...outputs, - ...STOPPED_STATUS, - } - ) - continue - } - if (loopStep && loopSteps) { - loopSteps.push(outputs) - } else { - this.updateExecutionOutput( - step.id, - step.stepId, - step.inputs, - outputs - ) - } - } catch (err) { - console.error(`Automation error - ${step.stepId} - ${err}`) - return err - } - - if (loopStep) { - iterationCount++ - if (stepIndex === iterations - 1) { - loopStep = undefined - this._context.steps.splice(currentLoopStepIndex, 1) - break - } - } - } - } finally { - stepSpan?.finish() - } - - if (loopStep && iterations === 0) { - loopStep = undefined - this.executionOutput.steps.splice(currentLoopStepIndex + 1, 0, { - id: step.id, - stepId: step.stepId, - outputs: { - status: AutomationStepStatus.NO_ITERATIONS, - success: true, - }, - inputs: {}, - }) - - this._context.steps.splice(currentLoopStepIndex, 1) - iterations = 1 - } - - // Delete the step after the loop step as it's irrelevant, since information is included - // in the loop step - if (wasLoopStep && !loopStep) { - this._context.steps.splice(currentLoopStepIndex + 1, 1) - wasLoopStep = false - } - if (loopSteps && loopSteps.length) { - let tempOutput = { - success: true, - items: loopSteps, - iterations: iterationCount, - } - this.executionOutput.steps.splice(currentLoopStepIndex + 1, 0, { - id: step.id, - stepId: step.stepId, - outputs: tempOutput, - inputs: step.inputs, - }) - this._context.steps[currentLoopStepIndex] = tempOutput - - wasLoopStep = true - loopSteps = [] - } - } + await this.executeSteps(this.automation.definition.steps) const end = performance.now() const executionTime = end - start console.info( - `Automation ID: ${automation._id} Execution time: ${executionTime} milliseconds`, + `Automation ID: ${this.automation._id} Execution time: ${executionTime} milliseconds`, { _logKey: "automation", executionTime, } ) - // store the logs for the automation run try { - await storeLog(this._automation, this.executionOutput) + await storeLog(this.automation, this.executionOutput) } catch (e: any) { if (e.status === 413 && e.request?.data) { // if content is too large we shouldn't log it @@ -544,13 +290,288 @@ class Orchestrator { } logging.logAlert("Error writing automation log", e) } - if (isProdAppID(this._appId) && isRecurring(automation) && metadata) { + if ( + isProdAppID(this.appId) && + isRecurring(this.automation) && + metadata + ) { await this.updateMetadata(metadata) } return this.executionOutput } ) } + + private async executeSteps(steps: AutomationStep[]): Promise { + return tracer.trace( + "Orchestrator.executeSteps", + { resource: "automation" }, + async span => { + let stepIndex = 0 + const timeout = + this.job.data.event.timeout || env.AUTOMATION_THREAD_TIMEOUT + + try { + await helpers.withTimeout( + timeout, + (async () => { + while (stepIndex < steps.length) { + const step = steps[stepIndex] + if (step.stepId === AutomationActionStepId.BRANCH) { + await this.executeBranchStep(step) + stepIndex++ + } else if (step.stepId === AutomationActionStepId.LOOP) { + stepIndex = await this.executeLoopStep(step, steps, stepIndex) + } else { + await this.executeStep(step) + stepIndex++ + } + } + })() + ) + } catch (error: any) { + if (error.errno === "ETIME") { + span?.addTags({ timedOut: true }) + console.warn(`Automation execution timed out after ${timeout}ms`) + } + } + } + ) + } + + private async executeLoopStep( + loopStep: LoopStep, + steps: AutomationStep[], + currentIndex: number + ): Promise { + await processObject(loopStep.inputs, this.context) + const iterations = getLoopIterations(loopStep) + let stepToLoopIndex = currentIndex + 1 + let iterationCount = 0 + let shouldCleanup = true + + for (let loopStepIndex = 0; loopStepIndex < iterations; loopStepIndex++) { + try { + loopStep.inputs.binding = automationUtils.typecastForLooping( + loopStep.inputs + ) + } catch (err) { + this.updateContextAndOutput( + stepToLoopIndex, + steps[stepToLoopIndex], + {}, + { + status: AutomationErrors.INCORRECT_TYPE, + success: false, + } + ) + shouldCleanup = false + break + } + const maxIterations = automationUtils.ensureMaxIterationsAsNumber( + loopStep.inputs.iterations + ) + + if ( + loopStepIndex === env.AUTOMATION_MAX_ITERATIONS || + (loopStep.inputs.iterations && loopStepIndex === maxIterations) + ) { + this.updateContextAndOutput( + stepToLoopIndex, + steps[stepToLoopIndex], + { + items: this.loopStepOutputs, + iterations: loopStepIndex, + }, + { + status: AutomationErrors.MAX_ITERATIONS, + success: true, + } + ) + shouldCleanup = false + break + } + + let isFailure = false + const currentItem = this.getCurrentLoopItem(loopStep, loopStepIndex) + if (currentItem && typeof currentItem === "object") { + isFailure = Object.keys(currentItem).some(value => { + return currentItem[value] === loopStep?.inputs.failure + }) + } else { + isFailure = currentItem && currentItem === loopStep.inputs.failure + } + + if (isFailure) { + this.updateContextAndOutput( + loopStepIndex, + steps[stepToLoopIndex], + { + items: this.loopStepOutputs, + iterations: loopStepIndex, + }, + { + status: AutomationErrors.FAILURE_CONDITION, + success: false, + } + ) + shouldCleanup = false + break + } + + this.context.steps[currentIndex + 1] = { + currentItem: this.getCurrentLoopItem(loopStep, loopStepIndex), + } + + stepToLoopIndex = currentIndex + 1 + + await this.executeStep(steps[stepToLoopIndex], stepToLoopIndex) + iterationCount++ + } + + if (shouldCleanup) { + let tempOutput = + iterations === 0 + ? { + status: AutomationStepStatus.NO_ITERATIONS, + success: true, + } + : { + success: true, + items: this.loopStepOutputs, + iterations: iterationCount, + } + + // Loop Step clean up + this.executionOutput.steps.splice(currentIndex + 1, 0, { + id: steps[stepToLoopIndex].id, + stepId: steps[stepToLoopIndex].stepId, + outputs: tempOutput, + inputs: steps[stepToLoopIndex].inputs, + }) + this.context.steps[currentIndex + 1] = tempOutput + this.loopStepOutputs = [] + } + + return stepToLoopIndex + 1 + } + private async executeBranchStep(branchStep: BranchStep): Promise { + const { branches, children } = branchStep.inputs + + for (const branch of branches) { + const condition = await this.evaluateBranchCondition(branch.condition) + if (condition) { + const branchSteps = children?.[branch.name] || [] + await this.executeSteps(branchSteps) + break + } + } + } + + private async evaluateBranchCondition( + conditions: SearchFilters + ): Promise { + const toFilter: Record = {} + + const processedConditions = dataFilters.recurseSearchFilters( + conditions, + filter => { + Object.entries(filter).forEach(([_, value]) => { + Object.entries(value).forEach(([field, _]) => { + const fromContext = processStringSync( + `{{ literal ${field} }}`, + this.context + ) + toFilter[field] = fromContext + }) + }) + return filter + } + ) + + const result = dataFilters.runQuery([toFilter], processedConditions) + return result.length > 0 + } + private async executeStep( + step: AutomationStep, + loopIteration?: number + ): Promise { + return tracer.trace( + "Orchestrator.execute.step", + { resource: "automation" }, + async span => { + span?.addTags({ + resource: "automation", + step: { + stepId: step.stepId, + id: step.id, + name: step.name, + type: step.type, + title: step.stepTitle, + internal: step.internal, + deprecated: step.deprecated, + }, + }) + + if (this.stopped) { + this.updateExecutionOutput(step.id, step.stepId, {}, STOPPED_STATUS) + return + } + + let originalStepInput = cloneDeep(step.inputs) + if (loopIteration !== undefined) { + originalStepInput = replaceFakeBindings( + originalStepInput, + loopIteration + ) + } + const stepFn = await this.getStepFunctionality(step.stepId) + let inputs = await processObject(originalStepInput, this.context) + inputs = automationUtils.cleanInputValues(inputs, step.schema.inputs) + + const outputs = await stepFn({ + inputs: inputs, + appId: this.appId, + emitter: this.emitter, + context: this.context, + }) + this.handleStepOutput(step, outputs, loopIteration) + } + ) + } + + private getCurrentLoopItem(loopStep: LoopStep, index: number): any { + if (!loopStep) return null + if ( + typeof loopStep.inputs.binding === "string" && + loopStep.inputs.option === "String" + ) { + return automationUtils.stringSplit(loopStep.inputs.binding)[index] + } else if (Array.isArray(loopStep.inputs.binding)) { + return loopStep.inputs.binding[index] + } + return null + } + + private handleStepOutput( + step: AutomationStep, + outputs: any, + loopIteration: number | undefined + ): void { + if (step.stepId === AutomationActionStepId.FILTER && !outputs.result) { + this.stopped = true + this.updateExecutionOutput(step.id, step.stepId, step.inputs, { + ...outputs, + ...STOPPED_STATUS, + }) + } else if (loopIteration !== undefined) { + this.loopStepOutputs = this.loopStepOutputs || [] + this.loopStepOutputs.push(outputs) + } else { + this.updateExecutionOutput(step.id, step.stepId, step.inputs, outputs) + } + this.context.steps[this.context.steps.length] = outputs + } } export function execute(job: Job, callback: WorkerCallback) { diff --git a/packages/shared-core/src/filters.ts b/packages/shared-core/src/filters.ts index fa0b5c92ed..70fb24b373 100644 --- a/packages/shared-core/src/filters.ts +++ b/packages/shared-core/src/filters.ts @@ -127,6 +127,25 @@ export function recurseLogicalOperators( return filters } +export function recurseSearchFilters( + filters: SearchFilters, + processFn: (filter: SearchFilters) => SearchFilters +): SearchFilters { + // Process the current level + filters = processFn(filters) + + // Recurse through logical operators + for (const logical of Object.values(LogicalOperator)) { + if (filters[logical]) { + filters[logical]!.conditions = filters[logical]!.conditions.map( + condition => recurseSearchFilters(condition, processFn) + ) + } + } + + return filters +} + /** * Removes any fields that contain empty strings that would cause inconsistent * behaviour with how backend tables are filtered (no value means no filter). diff --git a/packages/shared-core/src/helpers/helpers.ts b/packages/shared-core/src/helpers/helpers.ts index 16891de35b..8dbdb7bbfd 100644 --- a/packages/shared-core/src/helpers/helpers.ts +++ b/packages/shared-core/src/helpers/helpers.ts @@ -83,3 +83,32 @@ export const getUserLabel = (user: User) => { return email } } + +export function cancelableTimeout( + timeout: number +): [Promise, () => void] { + let timeoutId: NodeJS.Timeout + return [ + new Promise((resolve, reject) => { + timeoutId = setTimeout(() => { + reject({ + status: 301, + errno: "ETIME", + }) + }, timeout) + }), + () => { + clearTimeout(timeoutId) + }, + ] +} + +export async function withTimeout( + timeout: number, + promise: Promise +): Promise { + const [timeoutPromise, cancel] = cancelableTimeout(timeout) + const result = (await Promise.race([promise, timeoutPromise])) as T + cancel() + return result +} diff --git a/packages/worker/src/api/routes/global/tests/realEmail.spec.ts b/packages/worker/src/api/routes/global/tests/realEmail.spec.ts index a18d8ee247..99dfb7f824 100644 --- a/packages/worker/src/api/routes/global/tests/realEmail.spec.ts +++ b/packages/worker/src/api/routes/global/tests/realEmail.spec.ts @@ -2,6 +2,8 @@ jest.unmock("node-fetch") import { TestConfiguration } from "../../../../tests" import { EmailTemplatePurpose } from "../../../../constants" import { objectStore } from "@budibase/backend-core" +import { helpers } from "@budibase/shared-core" + import tk from "timekeeper" import { EmailAttachment } from "@budibase/types" @@ -12,33 +14,6 @@ const nodemailer = require("nodemailer") // for the real email tests give them a long time to try complete/fail jest.setTimeout(30000) -function cancelableTimeout(timeout: number): [Promise, () => void] { - let timeoutId: NodeJS.Timeout - return [ - new Promise((resolve, reject) => { - timeoutId = setTimeout(() => { - reject({ - status: 301, - errno: "ETIME", - }) - }, timeout) - }), - () => { - clearTimeout(timeoutId) - }, - ] -} - -async function withTimeout( - timeout: number, - promise: Promise -): Promise { - const [timeoutPromise, cancel] = cancelableTimeout(timeout) - const result = (await Promise.race([promise, timeoutPromise])) as T - cancel() - return result -} - describe("/api/global/email", () => { const config = new TestConfiguration() @@ -57,8 +32,8 @@ describe("/api/global/email", () => { ) { let response, text try { - await withTimeout(20000, config.saveEtherealSmtpConfig()) - await withTimeout(20000, config.saveSettingsConfig()) + await helpers.withTimeout(20000, config.saveEtherealSmtpConfig()) + await helpers.withTimeout(20000, config.saveSettingsConfig()) let res if (attachments) { res = await config.api.emails