diff --git a/packages/backend-core/src/logging/pino/logger.ts b/packages/backend-core/src/logging/pino/logger.ts index cebc78ffc7..c96bc83e04 100644 --- a/packages/backend-core/src/logging/pino/logger.ts +++ b/packages/backend-core/src/logging/pino/logger.ts @@ -96,6 +96,7 @@ if (!env.DISABLE_PINO_LOGGER) { const mergingObject: any = { err: error, + pid: process.pid, ...contextObject, } diff --git a/packages/pro b/packages/pro index 2adc101c1e..86c32b80e0 160000 --- a/packages/pro +++ b/packages/pro @@ -1 +1 @@ -Subproject commit 2adc101c1ede13f861f282d702f45b94ab91fd41 +Subproject commit 86c32b80e08d2f19b57dcc2a3159667ac5a86c21 diff --git a/packages/server/package.json b/packages/server/package.json index 9f053401d8..cb53a1618e 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -100,6 +100,7 @@ "mssql": "6.2.3", "mysql2": "2.3.3", "node-fetch": "2.6.7", + "object-sizeof": "2.6.1", "open": "8.4.0", "openai": "^3.2.1", "pg": "8.10.0", diff --git a/packages/server/src/automations/logging/index.ts b/packages/server/src/automations/logging/index.ts index e3cc9d273c..9d16f15a67 100644 --- a/packages/server/src/automations/logging/index.ts +++ b/packages/server/src/automations/logging/index.ts @@ -2,6 +2,23 @@ import env from "../../environment" import { AutomationResults, Automation, App } from "@budibase/types" import { automations } from "@budibase/pro" import { db as dbUtils } from "@budibase/backend-core" +import sizeof from "object-sizeof" + +const MAX_LOG_SIZE_MB = 5 +const MB_IN_BYTES = 1024 * 1024 + +function sanitiseResults(results: AutomationResults) { + const message = `[removed] - max results size of ${MAX_LOG_SIZE_MB}MB exceeded` + for (let step of results.steps) { + step.inputs = { + message, + } + step.outputs = { + message, + success: step.outputs.success, + } + } +} export async function storeLog( automation: Automation, @@ -11,6 +28,10 @@ export async function storeLog( if (env.DISABLE_AUTOMATION_LOGS) { return } + const bytes = sizeof(results) + if (bytes / MB_IN_BYTES > MAX_LOG_SIZE_MB) { + sanitiseResults(results) + } await automations.logs.storeLog(automation, results) } diff --git a/packages/server/src/environment.ts b/packages/server/src/environment.ts index 3e5e475360..0ba708e7bb 100644 --- a/packages/server/src/environment.ts +++ b/packages/server/src/environment.ts @@ -80,6 +80,7 @@ const environment = { ENABLE_ANALYTICS: process.env.ENABLE_ANALYTICS, SELF_HOSTED: process.env.SELF_HOSTED, HTTP_MB_LIMIT: process.env.HTTP_MB_LIMIT, + FORKED_PROCESS_NAME: process.env.FORKED_PROCESS_NAME || "main", // old CLIENT_ID: process.env.CLIENT_ID, _set(key: string, value: any) { diff --git a/packages/server/src/threads/automation.ts b/packages/server/src/threads/automation.ts index 9db647b693..563af93303 100644 --- a/packages/server/src/threads/automation.ts +++ b/packages/server/src/threads/automation.ts @@ -19,6 +19,7 @@ import { AutomationStatus, AutomationMetadata, AutomationJob, + AutomationData, } from "@budibase/types" import { LoopStep, @@ -37,8 +38,8 @@ const LOOP_STEP_ID = actions.BUILTIN_ACTION_DEFINITIONS.LOOP.stepId const CRON_STEP_ID = triggerDefs.CRON.stepId const STOPPED_STATUS = { success: true, status: AutomationStatus.STOPPED } -function getLoopIterations(loopStep: LoopStep, input: LoopInput) { - const binding = automationUtils.typecastForLooping(loopStep, input) +function getLoopIterations(loopStep: LoopStep) { + let binding = loopStep.inputs.binding if (!binding) { return 0 } @@ -68,7 +69,6 @@ class Orchestrator { constructor(job: AutomationJob) { let automation = job.data.automation let triggerOutput = job.data.event - let timeout = job.data.event.timeout const metadata = triggerOutput.metadata this._chainCount = metadata ? metadata.automationChainCount! : 0 this._appId = triggerOutput.appId as string @@ -252,7 +252,7 @@ class Orchestrator { return } } - + const start = performance.now() for (let step of automation.definition.steps) { if (timeoutFlag) { break @@ -277,22 +277,17 @@ class Orchestrator { if (loopStep) { input = await processObject(loopStep.inputs, this._context) - iterations = getLoopIterations(loopStep as LoopStep, input) + iterations = getLoopIterations(loopStep as LoopStep) } for (let index = 0; index < iterations; index++) { let originalStepInput = cloneDeep(step.inputs) // Handle if the user has set a max iteration count or if it reaches the max limit set by us if (loopStep && input.binding) { - let newInput: any = await processObject( - loopStep.inputs, - cloneDeep(this._context) - ) - let tempOutput = { items: loopSteps, iterations: iterationCount } try { - newInput.binding = automationUtils.typecastForLooping( + loopStep.inputs.binding = automationUtils.typecastForLooping( loopStep as LoopStep, - newInput + loopStep.inputs as LoopInput ) } catch (err) { this.updateContextAndOutput(loopStepNumber, step, tempOutput, { @@ -303,13 +298,12 @@ class Orchestrator { loopStep = undefined break } - let item = [] if ( typeof loopStep.inputs.binding === "string" && loopStep.inputs.option === "String" ) { - item = automationUtils.stringSplit(newInput.binding) + item = automationUtils.stringSplit(loopStep.inputs.binding) } else if (Array.isArray(loopStep.inputs.binding)) { item = loopStep.inputs.binding } @@ -351,6 +345,7 @@ class Orchestrator { } } } + if ( index === env.AUTOMATION_MAX_ITERATIONS || index === parseInt(loopStep.inputs.iterations) @@ -479,8 +474,22 @@ class Orchestrator { } } + const end = performance.now() + const executionTime = end - start + + console.log(`Execution time: ${executionTime} milliseconds`) + // store the logs for the automation run - await storeLog(this._automation, this.executionOutput) + try { + await storeLog(this._automation, this.executionOutput) + } catch (e: any) { + if (e.status === 413 && e.request?.data) { + // if content is too large we shouldn't log it + delete e.request.data + e.request.data = { message: "removed due to large size" } + } + logging.logAlert("Error writing automation log", e) + } if (isProdAppID(this._appId) && isRecurring(automation) && metadata) { await this.updateMetadata(metadata) } @@ -488,23 +497,31 @@ class Orchestrator { } } -export function execute(job: Job, callback: WorkerCallback) { +export function execute(job: Job, callback: WorkerCallback) { const appId = job.data.event.appId + const automationId = job.data.automation._id if (!appId) { throw new Error("Unable to execute, event doesn't contain app ID.") } - return context.doInAppContext(appId, async () => { - const envVars = await sdkUtils.getEnvironmentVariables() - // put into automation thread for whole context - await context.doInEnvironmentContext(envVars, async () => { - const automationOrchestrator = new Orchestrator(job) - try { - const response = await automationOrchestrator.execute() - callback(null, response) - } catch (err) { - callback(err) - } - }) + if (!automationId) { + throw new Error("Unable to execute, event doesn't contain automation ID.") + } + return context.doInAutomationContext({ + appId, + automationId, + task: async () => { + const envVars = await sdkUtils.getEnvironmentVariables() + // put into automation thread for whole context + await context.doInEnvironmentContext(envVars, async () => { + const automationOrchestrator = new Orchestrator(job) + try { + const response = await automationOrchestrator.execute() + callback(null, response) + } catch (err) { + callback(err) + } + }) + }, }) } diff --git a/packages/server/src/threads/index.ts b/packages/server/src/threads/index.ts index 9b6bffa867..6afaa9bb4e 100644 --- a/packages/server/src/threads/index.ts +++ b/packages/server/src/threads/index.ts @@ -38,6 +38,9 @@ export class Thread { this.count = opts.count ? opts.count : 1 this.disableThreading = this.shouldDisableThreading() if (!this.disableThreading) { + console.debug( + `[${env.FORKED_PROCESS_NAME}] initialising worker farm type=${type}` + ) const workerOpts: any = { autoStart: true, maxConcurrentWorkers: this.count, @@ -45,6 +48,7 @@ export class Thread { env: { ...process.env, FORKED_PROCESS: "1", + FORKED_PROCESS_NAME: type, }, }, } @@ -54,6 +58,10 @@ export class Thread { } this.workers = workerFarm(workerOpts, typeToFile(type), ["execute"]) Thread.workerRefs.push(this.workers) + } else { + console.debug( + `[${env.FORKED_PROCESS_NAME}] skipping worker farm type=${type}` + ) } } @@ -72,9 +80,7 @@ export class Thread { function fire(worker: any) { worker.execute(job, (err: any, response: any) => { if (err && err.type === "TimeoutError") { - reject( - new Error(`Query response time exceeded ${timeout}ms timeout.`) - ) + reject(new Error(`Thread timeout exceeded ${timeout}ms timeout.`)) } else if (err) { reject(err) } else { diff --git a/packages/server/src/threads/utils.ts b/packages/server/src/threads/utils.ts index 37c7a6d3f9..a0f3bbdc47 100644 --- a/packages/server/src/threads/utils.ts +++ b/packages/server/src/threads/utils.ts @@ -26,8 +26,10 @@ function makeVariableKey(queryId: string, variable: string) { export function threadSetup() { // don't run this if not threading if (env.isTest() || env.DISABLE_THREADING || !env.isInThread()) { + console.debug(`[${env.FORKED_PROCESS_NAME}] thread setup skipped`) return } + console.debug(`[${env.FORKED_PROCESS_NAME}] thread setup running`) db.init() } diff --git a/yarn.lock b/yarn.lock index c5aad94d5b..c365222ce3 100644 --- a/yarn.lock +++ b/yarn.lock @@ -19043,6 +19043,13 @@ object-keys@~0.4.0: resolved "https://registry.yarnpkg.com/object-keys/-/object-keys-0.4.0.tgz#28a6aae7428dd2c3a92f3d95f21335dd204e0336" integrity sha512-ncrLw+X55z7bkl5PnUvHwFK9FcGuFYo9gtjws2XtSzL+aZ8tm830P60WJ0dSmFVaSalWieW5MD7kEdnXda9yJw== +object-sizeof@2.6.1: + version "2.6.1" + resolved "https://registry.yarnpkg.com/object-sizeof/-/object-sizeof-2.6.1.tgz#1e2b6a01d182c268dbb07ee3403f539de45f63d3" + integrity sha512-a7VJ1Zx7ZuHceKwjgfsSqzV/X0PVGvpZz7ho3Dn4Cs0LLcR5e5WuV+gsbizmplD8s0nAXMJmckKB2rkSiPm/Gg== + dependencies: + buffer "^6.0.3" + object-visit@^1.0.0: version "1.0.1" resolved "https://registry.yarnpkg.com/object-visit/-/object-visit-1.0.1.tgz#f79c4493af0c5377b59fe39d395e41042dd045bb"