Merge pull request #10737 from Budibase/fix/looping-performance-bug
Fix performance issue with looping in automations
This commit is contained in:
commit
3564724741
|
@ -96,6 +96,7 @@ if (!env.DISABLE_PINO_LOGGER) {
|
||||||
|
|
||||||
const mergingObject: any = {
|
const mergingObject: any = {
|
||||||
err: error,
|
err: error,
|
||||||
|
pid: process.pid,
|
||||||
...contextObject,
|
...contextObject,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
Subproject commit 2adc101c1ede13f861f282d702f45b94ab91fd41
|
Subproject commit 86c32b80e08d2f19b57dcc2a3159667ac5a86c21
|
|
@ -100,6 +100,7 @@
|
||||||
"mssql": "6.2.3",
|
"mssql": "6.2.3",
|
||||||
"mysql2": "2.3.3",
|
"mysql2": "2.3.3",
|
||||||
"node-fetch": "2.6.7",
|
"node-fetch": "2.6.7",
|
||||||
|
"object-sizeof": "2.6.1",
|
||||||
"open": "8.4.0",
|
"open": "8.4.0",
|
||||||
"openai": "^3.2.1",
|
"openai": "^3.2.1",
|
||||||
"pg": "8.10.0",
|
"pg": "8.10.0",
|
||||||
|
|
|
@ -2,6 +2,23 @@ import env from "../../environment"
|
||||||
import { AutomationResults, Automation, App } from "@budibase/types"
|
import { AutomationResults, Automation, App } from "@budibase/types"
|
||||||
import { automations } from "@budibase/pro"
|
import { automations } from "@budibase/pro"
|
||||||
import { db as dbUtils } from "@budibase/backend-core"
|
import { db as dbUtils } from "@budibase/backend-core"
|
||||||
|
import sizeof from "object-sizeof"
|
||||||
|
|
||||||
|
const MAX_LOG_SIZE_MB = 5
|
||||||
|
const MB_IN_BYTES = 1024 * 1024
|
||||||
|
|
||||||
|
function sanitiseResults(results: AutomationResults) {
|
||||||
|
const message = `[removed] - max results size of ${MAX_LOG_SIZE_MB}MB exceeded`
|
||||||
|
for (let step of results.steps) {
|
||||||
|
step.inputs = {
|
||||||
|
message,
|
||||||
|
}
|
||||||
|
step.outputs = {
|
||||||
|
message,
|
||||||
|
success: step.outputs.success,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
export async function storeLog(
|
export async function storeLog(
|
||||||
automation: Automation,
|
automation: Automation,
|
||||||
|
@ -11,6 +28,10 @@ export async function storeLog(
|
||||||
if (env.DISABLE_AUTOMATION_LOGS) {
|
if (env.DISABLE_AUTOMATION_LOGS) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
const bytes = sizeof(results)
|
||||||
|
if (bytes / MB_IN_BYTES > MAX_LOG_SIZE_MB) {
|
||||||
|
sanitiseResults(results)
|
||||||
|
}
|
||||||
await automations.logs.storeLog(automation, results)
|
await automations.logs.storeLog(automation, results)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -80,6 +80,7 @@ const environment = {
|
||||||
ENABLE_ANALYTICS: process.env.ENABLE_ANALYTICS,
|
ENABLE_ANALYTICS: process.env.ENABLE_ANALYTICS,
|
||||||
SELF_HOSTED: process.env.SELF_HOSTED,
|
SELF_HOSTED: process.env.SELF_HOSTED,
|
||||||
HTTP_MB_LIMIT: process.env.HTTP_MB_LIMIT,
|
HTTP_MB_LIMIT: process.env.HTTP_MB_LIMIT,
|
||||||
|
FORKED_PROCESS_NAME: process.env.FORKED_PROCESS_NAME || "main",
|
||||||
// old
|
// old
|
||||||
CLIENT_ID: process.env.CLIENT_ID,
|
CLIENT_ID: process.env.CLIENT_ID,
|
||||||
_set(key: string, value: any) {
|
_set(key: string, value: any) {
|
||||||
|
|
|
@ -19,6 +19,7 @@ import {
|
||||||
AutomationStatus,
|
AutomationStatus,
|
||||||
AutomationMetadata,
|
AutomationMetadata,
|
||||||
AutomationJob,
|
AutomationJob,
|
||||||
|
AutomationData,
|
||||||
} from "@budibase/types"
|
} from "@budibase/types"
|
||||||
import {
|
import {
|
||||||
LoopStep,
|
LoopStep,
|
||||||
|
@ -37,8 +38,8 @@ const LOOP_STEP_ID = actions.BUILTIN_ACTION_DEFINITIONS.LOOP.stepId
|
||||||
const CRON_STEP_ID = triggerDefs.CRON.stepId
|
const CRON_STEP_ID = triggerDefs.CRON.stepId
|
||||||
const STOPPED_STATUS = { success: true, status: AutomationStatus.STOPPED }
|
const STOPPED_STATUS = { success: true, status: AutomationStatus.STOPPED }
|
||||||
|
|
||||||
function getLoopIterations(loopStep: LoopStep, input: LoopInput) {
|
function getLoopIterations(loopStep: LoopStep) {
|
||||||
const binding = automationUtils.typecastForLooping(loopStep, input)
|
let binding = loopStep.inputs.binding
|
||||||
if (!binding) {
|
if (!binding) {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
@ -68,7 +69,6 @@ class Orchestrator {
|
||||||
constructor(job: AutomationJob) {
|
constructor(job: AutomationJob) {
|
||||||
let automation = job.data.automation
|
let automation = job.data.automation
|
||||||
let triggerOutput = job.data.event
|
let triggerOutput = job.data.event
|
||||||
let timeout = job.data.event.timeout
|
|
||||||
const metadata = triggerOutput.metadata
|
const metadata = triggerOutput.metadata
|
||||||
this._chainCount = metadata ? metadata.automationChainCount! : 0
|
this._chainCount = metadata ? metadata.automationChainCount! : 0
|
||||||
this._appId = triggerOutput.appId as string
|
this._appId = triggerOutput.appId as string
|
||||||
|
@ -252,7 +252,7 @@ class Orchestrator {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
const start = performance.now()
|
||||||
for (let step of automation.definition.steps) {
|
for (let step of automation.definition.steps) {
|
||||||
if (timeoutFlag) {
|
if (timeoutFlag) {
|
||||||
break
|
break
|
||||||
|
@ -277,22 +277,17 @@ class Orchestrator {
|
||||||
|
|
||||||
if (loopStep) {
|
if (loopStep) {
|
||||||
input = await processObject(loopStep.inputs, this._context)
|
input = await processObject(loopStep.inputs, this._context)
|
||||||
iterations = getLoopIterations(loopStep as LoopStep, input)
|
iterations = getLoopIterations(loopStep as LoopStep)
|
||||||
}
|
}
|
||||||
for (let index = 0; index < iterations; index++) {
|
for (let index = 0; index < iterations; index++) {
|
||||||
let originalStepInput = cloneDeep(step.inputs)
|
let originalStepInput = cloneDeep(step.inputs)
|
||||||
// Handle if the user has set a max iteration count or if it reaches the max limit set by us
|
// Handle if the user has set a max iteration count or if it reaches the max limit set by us
|
||||||
if (loopStep && input.binding) {
|
if (loopStep && input.binding) {
|
||||||
let newInput: any = await processObject(
|
|
||||||
loopStep.inputs,
|
|
||||||
cloneDeep(this._context)
|
|
||||||
)
|
|
||||||
|
|
||||||
let tempOutput = { items: loopSteps, iterations: iterationCount }
|
let tempOutput = { items: loopSteps, iterations: iterationCount }
|
||||||
try {
|
try {
|
||||||
newInput.binding = automationUtils.typecastForLooping(
|
loopStep.inputs.binding = automationUtils.typecastForLooping(
|
||||||
loopStep as LoopStep,
|
loopStep as LoopStep,
|
||||||
newInput
|
loopStep.inputs as LoopInput
|
||||||
)
|
)
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
this.updateContextAndOutput(loopStepNumber, step, tempOutput, {
|
this.updateContextAndOutput(loopStepNumber, step, tempOutput, {
|
||||||
|
@ -303,13 +298,12 @@ class Orchestrator {
|
||||||
loopStep = undefined
|
loopStep = undefined
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
let item = []
|
let item = []
|
||||||
if (
|
if (
|
||||||
typeof loopStep.inputs.binding === "string" &&
|
typeof loopStep.inputs.binding === "string" &&
|
||||||
loopStep.inputs.option === "String"
|
loopStep.inputs.option === "String"
|
||||||
) {
|
) {
|
||||||
item = automationUtils.stringSplit(newInput.binding)
|
item = automationUtils.stringSplit(loopStep.inputs.binding)
|
||||||
} else if (Array.isArray(loopStep.inputs.binding)) {
|
} else if (Array.isArray(loopStep.inputs.binding)) {
|
||||||
item = loopStep.inputs.binding
|
item = loopStep.inputs.binding
|
||||||
}
|
}
|
||||||
|
@ -351,6 +345,7 @@ class Orchestrator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (
|
if (
|
||||||
index === env.AUTOMATION_MAX_ITERATIONS ||
|
index === env.AUTOMATION_MAX_ITERATIONS ||
|
||||||
index === parseInt(loopStep.inputs.iterations)
|
index === parseInt(loopStep.inputs.iterations)
|
||||||
|
@ -479,8 +474,22 @@ class Orchestrator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const end = performance.now()
|
||||||
|
const executionTime = end - start
|
||||||
|
|
||||||
|
console.log(`Execution time: ${executionTime} milliseconds`)
|
||||||
|
|
||||||
// store the logs for the automation run
|
// store the logs for the automation run
|
||||||
|
try {
|
||||||
await storeLog(this._automation, this.executionOutput)
|
await storeLog(this._automation, this.executionOutput)
|
||||||
|
} catch (e: any) {
|
||||||
|
if (e.status === 413 && e.request?.data) {
|
||||||
|
// if content is too large we shouldn't log it
|
||||||
|
delete e.request.data
|
||||||
|
e.request.data = { message: "removed due to large size" }
|
||||||
|
}
|
||||||
|
logging.logAlert("Error writing automation log", e)
|
||||||
|
}
|
||||||
if (isProdAppID(this._appId) && isRecurring(automation) && metadata) {
|
if (isProdAppID(this._appId) && isRecurring(automation) && metadata) {
|
||||||
await this.updateMetadata(metadata)
|
await this.updateMetadata(metadata)
|
||||||
}
|
}
|
||||||
|
@ -488,12 +497,19 @@ class Orchestrator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export function execute(job: Job, callback: WorkerCallback) {
|
export function execute(job: Job<AutomationData>, callback: WorkerCallback) {
|
||||||
const appId = job.data.event.appId
|
const appId = job.data.event.appId
|
||||||
|
const automationId = job.data.automation._id
|
||||||
if (!appId) {
|
if (!appId) {
|
||||||
throw new Error("Unable to execute, event doesn't contain app ID.")
|
throw new Error("Unable to execute, event doesn't contain app ID.")
|
||||||
}
|
}
|
||||||
return context.doInAppContext(appId, async () => {
|
if (!automationId) {
|
||||||
|
throw new Error("Unable to execute, event doesn't contain automation ID.")
|
||||||
|
}
|
||||||
|
return context.doInAutomationContext({
|
||||||
|
appId,
|
||||||
|
automationId,
|
||||||
|
task: async () => {
|
||||||
const envVars = await sdkUtils.getEnvironmentVariables()
|
const envVars = await sdkUtils.getEnvironmentVariables()
|
||||||
// put into automation thread for whole context
|
// put into automation thread for whole context
|
||||||
await context.doInEnvironmentContext(envVars, async () => {
|
await context.doInEnvironmentContext(envVars, async () => {
|
||||||
|
@ -505,6 +521,7 @@ export function execute(job: Job, callback: WorkerCallback) {
|
||||||
callback(err)
|
callback(err)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -38,6 +38,9 @@ export class Thread {
|
||||||
this.count = opts.count ? opts.count : 1
|
this.count = opts.count ? opts.count : 1
|
||||||
this.disableThreading = this.shouldDisableThreading()
|
this.disableThreading = this.shouldDisableThreading()
|
||||||
if (!this.disableThreading) {
|
if (!this.disableThreading) {
|
||||||
|
console.debug(
|
||||||
|
`[${env.FORKED_PROCESS_NAME}] initialising worker farm type=${type}`
|
||||||
|
)
|
||||||
const workerOpts: any = {
|
const workerOpts: any = {
|
||||||
autoStart: true,
|
autoStart: true,
|
||||||
maxConcurrentWorkers: this.count,
|
maxConcurrentWorkers: this.count,
|
||||||
|
@ -45,6 +48,7 @@ export class Thread {
|
||||||
env: {
|
env: {
|
||||||
...process.env,
|
...process.env,
|
||||||
FORKED_PROCESS: "1",
|
FORKED_PROCESS: "1",
|
||||||
|
FORKED_PROCESS_NAME: type,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -54,6 +58,10 @@ export class Thread {
|
||||||
}
|
}
|
||||||
this.workers = workerFarm(workerOpts, typeToFile(type), ["execute"])
|
this.workers = workerFarm(workerOpts, typeToFile(type), ["execute"])
|
||||||
Thread.workerRefs.push(this.workers)
|
Thread.workerRefs.push(this.workers)
|
||||||
|
} else {
|
||||||
|
console.debug(
|
||||||
|
`[${env.FORKED_PROCESS_NAME}] skipping worker farm type=${type}`
|
||||||
|
)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -72,9 +80,7 @@ export class Thread {
|
||||||
function fire(worker: any) {
|
function fire(worker: any) {
|
||||||
worker.execute(job, (err: any, response: any) => {
|
worker.execute(job, (err: any, response: any) => {
|
||||||
if (err && err.type === "TimeoutError") {
|
if (err && err.type === "TimeoutError") {
|
||||||
reject(
|
reject(new Error(`Thread timeout exceeded ${timeout}ms timeout.`))
|
||||||
new Error(`Query response time exceeded ${timeout}ms timeout.`)
|
|
||||||
)
|
|
||||||
} else if (err) {
|
} else if (err) {
|
||||||
reject(err)
|
reject(err)
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -26,8 +26,10 @@ function makeVariableKey(queryId: string, variable: string) {
|
||||||
export function threadSetup() {
|
export function threadSetup() {
|
||||||
// don't run this if not threading
|
// don't run this if not threading
|
||||||
if (env.isTest() || env.DISABLE_THREADING || !env.isInThread()) {
|
if (env.isTest() || env.DISABLE_THREADING || !env.isInThread()) {
|
||||||
|
console.debug(`[${env.FORKED_PROCESS_NAME}] thread setup skipped`)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
console.debug(`[${env.FORKED_PROCESS_NAME}] thread setup running`)
|
||||||
db.init()
|
db.init()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19043,6 +19043,13 @@ object-keys@~0.4.0:
|
||||||
resolved "https://registry.yarnpkg.com/object-keys/-/object-keys-0.4.0.tgz#28a6aae7428dd2c3a92f3d95f21335dd204e0336"
|
resolved "https://registry.yarnpkg.com/object-keys/-/object-keys-0.4.0.tgz#28a6aae7428dd2c3a92f3d95f21335dd204e0336"
|
||||||
integrity sha512-ncrLw+X55z7bkl5PnUvHwFK9FcGuFYo9gtjws2XtSzL+aZ8tm830P60WJ0dSmFVaSalWieW5MD7kEdnXda9yJw==
|
integrity sha512-ncrLw+X55z7bkl5PnUvHwFK9FcGuFYo9gtjws2XtSzL+aZ8tm830P60WJ0dSmFVaSalWieW5MD7kEdnXda9yJw==
|
||||||
|
|
||||||
|
object-sizeof@2.6.1:
|
||||||
|
version "2.6.1"
|
||||||
|
resolved "https://registry.yarnpkg.com/object-sizeof/-/object-sizeof-2.6.1.tgz#1e2b6a01d182c268dbb07ee3403f539de45f63d3"
|
||||||
|
integrity sha512-a7VJ1Zx7ZuHceKwjgfsSqzV/X0PVGvpZz7ho3Dn4Cs0LLcR5e5WuV+gsbizmplD8s0nAXMJmckKB2rkSiPm/Gg==
|
||||||
|
dependencies:
|
||||||
|
buffer "^6.0.3"
|
||||||
|
|
||||||
object-visit@^1.0.0:
|
object-visit@^1.0.0:
|
||||||
version "1.0.1"
|
version "1.0.1"
|
||||||
resolved "https://registry.yarnpkg.com/object-visit/-/object-visit-1.0.1.tgz#f79c4493af0c5377b59fe39d395e41042dd045bb"
|
resolved "https://registry.yarnpkg.com/object-visit/-/object-visit-1.0.1.tgz#f79c4493af0c5377b59fe39d395e41042dd045bb"
|
||||||
|
|
Loading…
Reference in New Issue