From 5a7f0ba58693bde1aa319318a6ed41bb02fa68ce Mon Sep 17 00:00:00 2001 From: Peter Clement Date: Tue, 30 May 2023 14:25:28 +0100 Subject: [PATCH 1/7] Fix performance issue with looping and context --- packages/server/src/threads/automation.ts | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/packages/server/src/threads/automation.ts b/packages/server/src/threads/automation.ts index 9db647b693..b9ef30cbca 100644 --- a/packages/server/src/threads/automation.ts +++ b/packages/server/src/threads/automation.ts @@ -37,8 +37,8 @@ const LOOP_STEP_ID = actions.BUILTIN_ACTION_DEFINITIONS.LOOP.stepId const CRON_STEP_ID = triggerDefs.CRON.stepId const STOPPED_STATUS = { success: true, status: AutomationStatus.STOPPED } -function getLoopIterations(loopStep: LoopStep, input: LoopInput) { - const binding = automationUtils.typecastForLooping(loopStep, input) +function getLoopIterations(loopStep: LoopStep) { + let binding = loopStep.inputs.binding if (!binding) { return 0 } @@ -68,7 +68,6 @@ class Orchestrator { constructor(job: AutomationJob) { let automation = job.data.automation let triggerOutput = job.data.event - let timeout = job.data.event.timeout const metadata = triggerOutput.metadata this._chainCount = metadata ? metadata.automationChainCount! : 0 this._appId = triggerOutput.appId as string @@ -277,22 +276,17 @@ class Orchestrator { if (loopStep) { input = await processObject(loopStep.inputs, this._context) - iterations = getLoopIterations(loopStep as LoopStep, input) + iterations = getLoopIterations(loopStep as LoopStep) } for (let index = 0; index < iterations; index++) { let originalStepInput = cloneDeep(step.inputs) // Handle if the user has set a max iteration count or if it reaches the max limit set by us if (loopStep && input.binding) { - let newInput: any = await processObject( - loopStep.inputs, - cloneDeep(this._context) - ) - let tempOutput = { items: loopSteps, iterations: iterationCount } try { - newInput.binding = automationUtils.typecastForLooping( + loopStep.inputs.binding = automationUtils.typecastForLooping( loopStep as LoopStep, - newInput + loopStep.inputs as LoopInput ) } catch (err) { this.updateContextAndOutput(loopStepNumber, step, tempOutput, { @@ -303,13 +297,12 @@ class Orchestrator { loopStep = undefined break } - let item = [] if ( typeof loopStep.inputs.binding === "string" && loopStep.inputs.option === "String" ) { - item = automationUtils.stringSplit(newInput.binding) + item = automationUtils.stringSplit(loopStep.inputs.binding) } else if (Array.isArray(loopStep.inputs.binding)) { item = loopStep.inputs.binding } @@ -351,6 +344,7 @@ class Orchestrator { } } } + if ( index === env.AUTOMATION_MAX_ITERATIONS || index === parseInt(loopStep.inputs.iterations) @@ -439,6 +433,7 @@ class Orchestrator { break } } + console.log("end of loop!") } if (loopStep && iterations === 0) { From 0a91e5bed1c8a9d73dfdefbceb8e8aa72aa70131 Mon Sep 17 00:00:00 2001 From: Peter Clement Date: Tue, 30 May 2023 14:26:49 +0100 Subject: [PATCH 2/7] update let to const --- packages/server/src/threads/automation.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/server/src/threads/automation.ts b/packages/server/src/threads/automation.ts index b9ef30cbca..af61f3bf0a 100644 --- a/packages/server/src/threads/automation.ts +++ b/packages/server/src/threads/automation.ts @@ -38,7 +38,7 @@ const CRON_STEP_ID = triggerDefs.CRON.stepId const STOPPED_STATUS = { success: true, status: AutomationStatus.STOPPED } function getLoopIterations(loopStep: LoopStep) { - let binding = loopStep.inputs.binding + const binding = loopStep.inputs.binding if (!binding) { return 0 } From 3119ba5a8db8245179437f9f4c04b46c1612edc3 Mon Sep 17 00:00:00 2001 From: Peter Clement Date: Tue, 30 May 2023 14:41:04 +0100 Subject: [PATCH 3/7] remove log --- packages/server/src/threads/automation.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/server/src/threads/automation.ts b/packages/server/src/threads/automation.ts index af61f3bf0a..a24def349f 100644 --- a/packages/server/src/threads/automation.ts +++ b/packages/server/src/threads/automation.ts @@ -344,7 +344,6 @@ class Orchestrator { } } } - if ( index === env.AUTOMATION_MAX_ITERATIONS || index === parseInt(loopStep.inputs.iterations) @@ -433,7 +432,6 @@ class Orchestrator { break } } - console.log("end of loop!") } if (loopStep && iterations === 0) { From 6230e62b9be5c99df928591195181e8ce09217d1 Mon Sep 17 00:00:00 2001 From: Rory Powell Date: Thu, 25 May 2023 12:09:12 +0100 Subject: [PATCH 4/7] Automation investigation logging + max doc size fix for automation log --- .../backend-core/src/logging/pino/logger.ts | 1 + packages/pro | 2 +- packages/server/package.json | 1 + .../server/src/automations/logging/index.ts | 21 +++++++++ packages/server/src/environment.ts | 1 + packages/server/src/threads/automation.ts | 44 ++++++++++++------- packages/server/src/threads/index.ts | 6 ++- packages/server/src/threads/utils.ts | 2 + yarn.lock | 7 +++ 9 files changed, 68 insertions(+), 17 deletions(-) diff --git a/packages/backend-core/src/logging/pino/logger.ts b/packages/backend-core/src/logging/pino/logger.ts index cebc78ffc7..c96bc83e04 100644 --- a/packages/backend-core/src/logging/pino/logger.ts +++ b/packages/backend-core/src/logging/pino/logger.ts @@ -96,6 +96,7 @@ if (!env.DISABLE_PINO_LOGGER) { const mergingObject: any = { err: error, + pid: process.pid, ...contextObject, } diff --git a/packages/pro b/packages/pro index 2adc101c1e..86c32b80e0 160000 --- a/packages/pro +++ b/packages/pro @@ -1 +1 @@ -Subproject commit 2adc101c1ede13f861f282d702f45b94ab91fd41 +Subproject commit 86c32b80e08d2f19b57dcc2a3159667ac5a86c21 diff --git a/packages/server/package.json b/packages/server/package.json index 9f053401d8..cb53a1618e 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -100,6 +100,7 @@ "mssql": "6.2.3", "mysql2": "2.3.3", "node-fetch": "2.6.7", + "object-sizeof": "2.6.1", "open": "8.4.0", "openai": "^3.2.1", "pg": "8.10.0", diff --git a/packages/server/src/automations/logging/index.ts b/packages/server/src/automations/logging/index.ts index e3cc9d273c..fd47990488 100644 --- a/packages/server/src/automations/logging/index.ts +++ b/packages/server/src/automations/logging/index.ts @@ -2,6 +2,23 @@ import env from "../../environment" import { AutomationResults, Automation, App } from "@budibase/types" import { automations } from "@budibase/pro" import { db as dbUtils } from "@budibase/backend-core" +import sizeof from "object-sizeof" + +const MAX_LOG_SIZE_MB = 5 +const MB_IN_BYTES = 1024 * 1024 + +function sanitiseResults(results: AutomationResults) { + const message = `[removed] - max results size of ${MAX_LOG_SIZE_MB}MB exceeded` + for (let step of results.steps) { + step.inputs = { + message, + } + step.outputs = { + message, + success: step.outputs.success + } + } +} export async function storeLog( automation: Automation, @@ -11,6 +28,10 @@ export async function storeLog( if (env.DISABLE_AUTOMATION_LOGS) { return } + const bytes = sizeof(results) + if ((bytes / MB_IN_BYTES) > MAX_LOG_SIZE_MB) { + sanitiseResults(results) + } await automations.logs.storeLog(automation, results) } diff --git a/packages/server/src/environment.ts b/packages/server/src/environment.ts index 3e5e475360..0ba708e7bb 100644 --- a/packages/server/src/environment.ts +++ b/packages/server/src/environment.ts @@ -80,6 +80,7 @@ const environment = { ENABLE_ANALYTICS: process.env.ENABLE_ANALYTICS, SELF_HOSTED: process.env.SELF_HOSTED, HTTP_MB_LIMIT: process.env.HTTP_MB_LIMIT, + FORKED_PROCESS_NAME: process.env.FORKED_PROCESS_NAME || "main", // old CLIENT_ID: process.env.CLIENT_ID, _set(key: string, value: any) { diff --git a/packages/server/src/threads/automation.ts b/packages/server/src/threads/automation.ts index 9db647b693..7959f60724 100644 --- a/packages/server/src/threads/automation.ts +++ b/packages/server/src/threads/automation.ts @@ -19,6 +19,7 @@ import { AutomationStatus, AutomationMetadata, AutomationJob, + AutomationData, } from "@budibase/types" import { LoopStep, @@ -480,7 +481,16 @@ class Orchestrator { } // store the logs for the automation run - await storeLog(this._automation, this.executionOutput) + try { + await storeLog(this._automation, this.executionOutput) + } catch (e: any) { + if (e.status === 413 && e.request?.data) { + // if content is too large we shouldn't log it + delete e.request.data + e.request.data = { message: "removed due to large size" } + } + logging.logAlert("Error writing automation log", e) + } if (isProdAppID(this._appId) && isRecurring(automation) && metadata) { await this.updateMetadata(metadata) } @@ -488,24 +498,28 @@ class Orchestrator { } } -export function execute(job: Job, callback: WorkerCallback) { +export function execute(job: Job, callback: WorkerCallback) { const appId = job.data.event.appId + const automationId = job.data.automation._id if (!appId) { throw new Error("Unable to execute, event doesn't contain app ID.") } - return context.doInAppContext(appId, async () => { - const envVars = await sdkUtils.getEnvironmentVariables() - // put into automation thread for whole context - await context.doInEnvironmentContext(envVars, async () => { - const automationOrchestrator = new Orchestrator(job) - try { - const response = await automationOrchestrator.execute() - callback(null, response) - } catch (err) { - callback(err) - } - }) - }) + if (!automationId) { + throw new Error("Unable to execute, event doesn't contain automation ID.") + } + return context.doInAutomationContext({ appId, automationId, task: async () => { + const envVars = await sdkUtils.getEnvironmentVariables() + // put into automation thread for whole context + await context.doInEnvironmentContext(envVars, async () => { + const automationOrchestrator = new Orchestrator(job) + try { + const response = await automationOrchestrator.execute() + callback(null, response) + } catch (err) { + callback(err) + } + }) + }}) } export function executeSynchronously(job: Job) { diff --git a/packages/server/src/threads/index.ts b/packages/server/src/threads/index.ts index 9b6bffa867..6c03e5b464 100644 --- a/packages/server/src/threads/index.ts +++ b/packages/server/src/threads/index.ts @@ -38,6 +38,7 @@ export class Thread { this.count = opts.count ? opts.count : 1 this.disableThreading = this.shouldDisableThreading() if (!this.disableThreading) { + console.debug(`[${env.FORKED_PROCESS_NAME}] initialising worker farm type=${type}`) const workerOpts: any = { autoStart: true, maxConcurrentWorkers: this.count, @@ -45,6 +46,7 @@ export class Thread { env: { ...process.env, FORKED_PROCESS: "1", + FORKED_PROCESS_NAME: type, }, }, } @@ -54,6 +56,8 @@ export class Thread { } this.workers = workerFarm(workerOpts, typeToFile(type), ["execute"]) Thread.workerRefs.push(this.workers) + } else { + console.debug(`[${env.FORKED_PROCESS_NAME}] skipping worker farm type=${type}`) } } @@ -73,7 +77,7 @@ export class Thread { worker.execute(job, (err: any, response: any) => { if (err && err.type === "TimeoutError") { reject( - new Error(`Query response time exceeded ${timeout}ms timeout.`) + new Error(`Thread timeout exceeded ${timeout}ms timeout.`) ) } else if (err) { reject(err) diff --git a/packages/server/src/threads/utils.ts b/packages/server/src/threads/utils.ts index 37c7a6d3f9..a0f3bbdc47 100644 --- a/packages/server/src/threads/utils.ts +++ b/packages/server/src/threads/utils.ts @@ -26,8 +26,10 @@ function makeVariableKey(queryId: string, variable: string) { export function threadSetup() { // don't run this if not threading if (env.isTest() || env.DISABLE_THREADING || !env.isInThread()) { + console.debug(`[${env.FORKED_PROCESS_NAME}] thread setup skipped`) return } + console.debug(`[${env.FORKED_PROCESS_NAME}] thread setup running`) db.init() } diff --git a/yarn.lock b/yarn.lock index c5aad94d5b..c365222ce3 100644 --- a/yarn.lock +++ b/yarn.lock @@ -19043,6 +19043,13 @@ object-keys@~0.4.0: resolved "https://registry.yarnpkg.com/object-keys/-/object-keys-0.4.0.tgz#28a6aae7428dd2c3a92f3d95f21335dd204e0336" integrity sha512-ncrLw+X55z7bkl5PnUvHwFK9FcGuFYo9gtjws2XtSzL+aZ8tm830P60WJ0dSmFVaSalWieW5MD7kEdnXda9yJw== +object-sizeof@2.6.1: + version "2.6.1" + resolved "https://registry.yarnpkg.com/object-sizeof/-/object-sizeof-2.6.1.tgz#1e2b6a01d182c268dbb07ee3403f539de45f63d3" + integrity sha512-a7VJ1Zx7ZuHceKwjgfsSqzV/X0PVGvpZz7ho3Dn4Cs0LLcR5e5WuV+gsbizmplD8s0nAXMJmckKB2rkSiPm/Gg== + dependencies: + buffer "^6.0.3" + object-visit@^1.0.0: version "1.0.1" resolved "https://registry.yarnpkg.com/object-visit/-/object-visit-1.0.1.tgz#f79c4493af0c5377b59fe39d395e41042dd045bb" From ed6cd8144bb8bb0cffe595ecc1956bb728cf0361 Mon Sep 17 00:00:00 2001 From: Peter Clement Date: Tue, 30 May 2023 16:52:22 +0100 Subject: [PATCH 5/7] lint --- packages/server/src/threads/automation.ts | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/packages/server/src/threads/automation.ts b/packages/server/src/threads/automation.ts index a24def349f..6dde6a5c99 100644 --- a/packages/server/src/threads/automation.ts +++ b/packages/server/src/threads/automation.ts @@ -38,7 +38,7 @@ const CRON_STEP_ID = triggerDefs.CRON.stepId const STOPPED_STATUS = { success: true, status: AutomationStatus.STOPPED } function getLoopIterations(loopStep: LoopStep) { - const binding = loopStep.inputs.binding + let binding = loopStep.inputs.binding if (!binding) { return 0 } @@ -251,7 +251,7 @@ class Orchestrator { return } } - + const start = performance.now() for (let step of automation.definition.steps) { if (timeoutFlag) { break @@ -344,6 +344,7 @@ class Orchestrator { } } } + if ( index === env.AUTOMATION_MAX_ITERATIONS || index === parseInt(loopStep.inputs.iterations) @@ -432,6 +433,7 @@ class Orchestrator { break } } + console.log("end of loop!") } if (loopStep && iterations === 0) { @@ -472,6 +474,11 @@ class Orchestrator { } } + const end = performance.now() + const executionTime = end - start + + console.log(`Execution time: ${executionTime} milliseconds`) + // store the logs for the automation run await storeLog(this._automation, this.executionOutput) if (isProdAppID(this._appId) && isRecurring(automation) && metadata) { From b0783d373ac2b3376059623b0c8538969ddd0751 Mon Sep 17 00:00:00 2001 From: Rory Powell Date: Tue, 30 May 2023 19:16:36 +0100 Subject: [PATCH 6/7] Lint --- packages/server/src/automations/logging/index.ts | 4 ++-- packages/server/src/threads/automation.ts | 8 ++++++-- packages/server/src/threads/index.ts | 12 +++++++----- 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/packages/server/src/automations/logging/index.ts b/packages/server/src/automations/logging/index.ts index fd47990488..9d16f15a67 100644 --- a/packages/server/src/automations/logging/index.ts +++ b/packages/server/src/automations/logging/index.ts @@ -15,7 +15,7 @@ function sanitiseResults(results: AutomationResults) { } step.outputs = { message, - success: step.outputs.success + success: step.outputs.success, } } } @@ -29,7 +29,7 @@ export async function storeLog( return } const bytes = sizeof(results) - if ((bytes / MB_IN_BYTES) > MAX_LOG_SIZE_MB) { + if (bytes / MB_IN_BYTES > MAX_LOG_SIZE_MB) { sanitiseResults(results) } await automations.logs.storeLog(automation, results) diff --git a/packages/server/src/threads/automation.ts b/packages/server/src/threads/automation.ts index 0d5586c235..60a07dab09 100644 --- a/packages/server/src/threads/automation.ts +++ b/packages/server/src/threads/automation.ts @@ -507,7 +507,10 @@ export function execute(job: Job, callback: WorkerCallback) { if (!automationId) { throw new Error("Unable to execute, event doesn't contain automation ID.") } - return context.doInAutomationContext({ appId, automationId, task: async () => { + return context.doInAutomationContext({ + appId, + automationId, + task: async () => { const envVars = await sdkUtils.getEnvironmentVariables() // put into automation thread for whole context await context.doInEnvironmentContext(envVars, async () => { @@ -519,7 +522,8 @@ export function execute(job: Job, callback: WorkerCallback) { callback(err) } }) - }}) + }, + }) } export function executeSynchronously(job: Job) { diff --git a/packages/server/src/threads/index.ts b/packages/server/src/threads/index.ts index 6c03e5b464..6afaa9bb4e 100644 --- a/packages/server/src/threads/index.ts +++ b/packages/server/src/threads/index.ts @@ -38,7 +38,9 @@ export class Thread { this.count = opts.count ? opts.count : 1 this.disableThreading = this.shouldDisableThreading() if (!this.disableThreading) { - console.debug(`[${env.FORKED_PROCESS_NAME}] initialising worker farm type=${type}`) + console.debug( + `[${env.FORKED_PROCESS_NAME}] initialising worker farm type=${type}` + ) const workerOpts: any = { autoStart: true, maxConcurrentWorkers: this.count, @@ -57,7 +59,9 @@ export class Thread { this.workers = workerFarm(workerOpts, typeToFile(type), ["execute"]) Thread.workerRefs.push(this.workers) } else { - console.debug(`[${env.FORKED_PROCESS_NAME}] skipping worker farm type=${type}`) + console.debug( + `[${env.FORKED_PROCESS_NAME}] skipping worker farm type=${type}` + ) } } @@ -76,9 +80,7 @@ export class Thread { function fire(worker: any) { worker.execute(job, (err: any, response: any) => { if (err && err.type === "TimeoutError") { - reject( - new Error(`Thread timeout exceeded ${timeout}ms timeout.`) - ) + reject(new Error(`Thread timeout exceeded ${timeout}ms timeout.`)) } else if (err) { reject(err) } else { From a1dbd6753509f1f904668ddca006b46cc005d870 Mon Sep 17 00:00:00 2001 From: Rory Powell Date: Tue, 30 May 2023 19:23:19 +0100 Subject: [PATCH 7/7] Remove debug log --- packages/server/src/threads/automation.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/server/src/threads/automation.ts b/packages/server/src/threads/automation.ts index 60a07dab09..563af93303 100644 --- a/packages/server/src/threads/automation.ts +++ b/packages/server/src/threads/automation.ts @@ -434,7 +434,6 @@ class Orchestrator { break } } - console.log("end of loop!") } if (loopStep && iterations === 0) {