Improve APM traces around automations.
This commit is contained in:
parent
07a9b881cd
commit
ddf64b9f70
|
@ -123,7 +123,7 @@ export async function doInAutomationContext<T>(params: {
|
|||
task: () => T
|
||||
}): Promise<T> {
|
||||
await ensureSnippetContext()
|
||||
return newContext(
|
||||
return await newContext(
|
||||
{
|
||||
tenantId: getTenantIDFromAppID(params.appId),
|
||||
appId: params.appId,
|
||||
|
|
|
@ -40,39 +40,35 @@ function loggingArgs(job: AutomationJob) {
|
|||
}
|
||||
|
||||
export async function processEvent(job: AutomationJob) {
|
||||
return tracer.trace(
|
||||
"processEvent",
|
||||
{ resource: "automation" },
|
||||
async span => {
|
||||
const appId = job.data.event.appId!
|
||||
const automationId = job.data.automation._id!
|
||||
return tracer.trace("processEvent", async span => {
|
||||
const appId = job.data.event.appId!
|
||||
const automationId = job.data.automation._id!
|
||||
|
||||
span?.addTags({
|
||||
appId,
|
||||
automationId,
|
||||
job: {
|
||||
id: job.id,
|
||||
name: job.name,
|
||||
attemptsMade: job.attemptsMade,
|
||||
opts: {
|
||||
attempts: job.opts.attempts,
|
||||
priority: job.opts.priority,
|
||||
delay: job.opts.delay,
|
||||
repeat: job.opts.repeat,
|
||||
backoff: job.opts.backoff,
|
||||
lifo: job.opts.lifo,
|
||||
timeout: job.opts.timeout,
|
||||
jobId: job.opts.jobId,
|
||||
removeOnComplete: job.opts.removeOnComplete,
|
||||
removeOnFail: job.opts.removeOnFail,
|
||||
stackTraceLimit: job.opts.stackTraceLimit,
|
||||
preventParsingData: job.opts.preventParsingData,
|
||||
},
|
||||
},
|
||||
})
|
||||
span.addTags({
|
||||
appId,
|
||||
automationId,
|
||||
job: {
|
||||
id: job.id,
|
||||
name: job.name,
|
||||
attemptsMade: job.attemptsMade,
|
||||
attempts: job.opts.attempts,
|
||||
priority: job.opts.priority,
|
||||
delay: job.opts.delay,
|
||||
repeat: job.opts.repeat,
|
||||
backoff: job.opts.backoff,
|
||||
lifo: job.opts.lifo,
|
||||
timeout: job.opts.timeout,
|
||||
jobId: job.opts.jobId,
|
||||
removeOnComplete: job.opts.removeOnComplete,
|
||||
removeOnFail: job.opts.removeOnFail,
|
||||
stackTraceLimit: job.opts.stackTraceLimit,
|
||||
preventParsingData: job.opts.preventParsingData,
|
||||
},
|
||||
})
|
||||
|
||||
const task = async () => {
|
||||
try {
|
||||
const task = async () => {
|
||||
try {
|
||||
return await tracer.trace("task", async () => {
|
||||
if (isCronTrigger(job.data.automation) && !job.data.event.timestamp) {
|
||||
// Requires the timestamp at run time
|
||||
job.data.event.timestamp = Date.now()
|
||||
|
@ -81,25 +77,19 @@ export async function processEvent(job: AutomationJob) {
|
|||
console.log("automation running", ...loggingArgs(job))
|
||||
|
||||
const runFn = () => Runner.run(job)
|
||||
const result = await quotas.addAutomation(runFn, {
|
||||
automationId,
|
||||
})
|
||||
const result = await quotas.addAutomation(runFn, { automationId })
|
||||
console.log("automation completed", ...loggingArgs(job))
|
||||
return result
|
||||
} catch (err) {
|
||||
span?.addTags({ error: true })
|
||||
console.error(
|
||||
`automation was unable to run`,
|
||||
err,
|
||||
...loggingArgs(job)
|
||||
)
|
||||
return { err }
|
||||
}
|
||||
})
|
||||
} catch (err) {
|
||||
span.addTags({ error: true })
|
||||
console.error(`automation was unable to run`, err, ...loggingArgs(job))
|
||||
return { err }
|
||||
}
|
||||
|
||||
return await context.doInAutomationContext({ appId, automationId, task })
|
||||
}
|
||||
)
|
||||
|
||||
return await context.doInAutomationContext({ appId, automationId, task })
|
||||
})
|
||||
}
|
||||
|
||||
export async function updateTestHistory(
|
||||
|
|
|
@ -310,87 +310,83 @@ class Orchestrator {
|
|||
}
|
||||
|
||||
async execute(): Promise<AutomationResults> {
|
||||
return tracer.trace(
|
||||
"Orchestrator.execute",
|
||||
{ resource: "automation" },
|
||||
async span => {
|
||||
span?.addTags({ appId: this.appId, automationId: this.automation._id })
|
||||
return await tracer.trace("execute", async span => {
|
||||
span.addTags({ appId: this.appId, automationId: this.automation._id })
|
||||
|
||||
const job = cloneDeep(this.job)
|
||||
delete job.data.event.appId
|
||||
delete job.data.event.metadata
|
||||
const job = cloneDeep(this.job)
|
||||
delete job.data.event.appId
|
||||
delete job.data.event.metadata
|
||||
|
||||
if (this.isCron() && !job.data.event.timestamp) {
|
||||
job.data.event.timestamp = Date.now()
|
||||
}
|
||||
|
||||
const trigger: AutomationTriggerResult = {
|
||||
id: job.data.automation.definition.trigger.id,
|
||||
stepId: job.data.automation.definition.trigger.stepId,
|
||||
inputs: null,
|
||||
outputs: job.data.event,
|
||||
}
|
||||
const result: AutomationResults = { trigger, steps: [trigger] }
|
||||
|
||||
const ctx: AutomationContext = {
|
||||
trigger: trigger.outputs,
|
||||
steps: [trigger.outputs],
|
||||
stepsById: {},
|
||||
stepsByName: {},
|
||||
user: trigger.outputs.user,
|
||||
}
|
||||
await enrichBaseContext(ctx)
|
||||
|
||||
const timeout =
|
||||
this.job.data.event.timeout || env.AUTOMATION_THREAD_TIMEOUT
|
||||
|
||||
try {
|
||||
await helpers.withTimeout(timeout, async () => {
|
||||
const [stepOutputs, executionTime] = await utils.time(() =>
|
||||
this.executeSteps(ctx, job.data.automation.definition.steps)
|
||||
)
|
||||
|
||||
result.steps.push(...stepOutputs)
|
||||
|
||||
console.info(
|
||||
`Automation ID: ${
|
||||
this.automation._id
|
||||
} Execution time: ${executionTime.toMs()} milliseconds`,
|
||||
{
|
||||
_logKey: "automation",
|
||||
executionTime,
|
||||
}
|
||||
)
|
||||
})
|
||||
} catch (e: any) {
|
||||
if (e.errno === "ETIME") {
|
||||
span?.addTags({ timedOut: true })
|
||||
console.warn(`Automation execution timed out after ${timeout}ms`)
|
||||
}
|
||||
}
|
||||
|
||||
let errorCount = 0
|
||||
if (this.isProdApp() && this.isCron() && this.hasErrored(ctx)) {
|
||||
errorCount = (await this.incrementErrorCount()) || 0
|
||||
}
|
||||
|
||||
if (errorCount >= MAX_AUTOMATION_RECURRING_ERRORS) {
|
||||
await this.stopCron("errors", { result })
|
||||
span?.addTags({ shouldStop: true })
|
||||
} else {
|
||||
await this.logResult(result)
|
||||
}
|
||||
|
||||
return result
|
||||
if (this.isCron() && !job.data.event.timestamp) {
|
||||
job.data.event.timestamp = Date.now()
|
||||
}
|
||||
)
|
||||
|
||||
const trigger: AutomationTriggerResult = {
|
||||
id: job.data.automation.definition.trigger.id,
|
||||
stepId: job.data.automation.definition.trigger.stepId,
|
||||
inputs: null,
|
||||
outputs: job.data.event,
|
||||
}
|
||||
const result: AutomationResults = { trigger, steps: [trigger] }
|
||||
|
||||
const ctx: AutomationContext = {
|
||||
trigger: trigger.outputs,
|
||||
steps: [trigger.outputs],
|
||||
stepsById: {},
|
||||
stepsByName: {},
|
||||
user: trigger.outputs.user,
|
||||
}
|
||||
await enrichBaseContext(ctx)
|
||||
|
||||
const timeout =
|
||||
this.job.data.event.timeout || env.AUTOMATION_THREAD_TIMEOUT
|
||||
|
||||
try {
|
||||
await helpers.withTimeout(timeout, async () => {
|
||||
const [stepOutputs, executionTime] = await utils.time(() =>
|
||||
this.executeSteps(ctx, job.data.automation.definition.steps)
|
||||
)
|
||||
|
||||
result.steps.push(...stepOutputs)
|
||||
|
||||
console.info(
|
||||
`Automation ID: ${
|
||||
this.automation._id
|
||||
} Execution time: ${executionTime.toMs()} milliseconds`,
|
||||
{
|
||||
_logKey: "automation",
|
||||
executionTime,
|
||||
}
|
||||
)
|
||||
})
|
||||
} catch (e: any) {
|
||||
if (e.errno === "ETIME") {
|
||||
span?.addTags({ timedOut: true })
|
||||
console.warn(`Automation execution timed out after ${timeout}ms`)
|
||||
}
|
||||
}
|
||||
|
||||
let errorCount = 0
|
||||
if (this.isProdApp() && this.isCron() && this.hasErrored(ctx)) {
|
||||
errorCount = (await this.incrementErrorCount()) || 0
|
||||
}
|
||||
|
||||
if (errorCount >= MAX_AUTOMATION_RECURRING_ERRORS) {
|
||||
await this.stopCron("errors", { result })
|
||||
span?.addTags({ shouldStop: true })
|
||||
} else {
|
||||
await this.logResult(result)
|
||||
}
|
||||
|
||||
return result
|
||||
})
|
||||
}
|
||||
|
||||
private async executeSteps(
|
||||
ctx: AutomationContext,
|
||||
steps: AutomationStep[]
|
||||
): Promise<AutomationStepResult[]> {
|
||||
return tracer.trace("Orchestrator.executeSteps", async () => {
|
||||
return await tracer.trace("executeSteps", async () => {
|
||||
let stepIndex = 0
|
||||
const results: AutomationStepResult[] = []
|
||||
|
||||
|
@ -446,74 +442,92 @@ class Orchestrator {
|
|||
step: LoopStep,
|
||||
stepToLoop: AutomationStep
|
||||
): Promise<AutomationStepResult> {
|
||||
await processObject(step.inputs, prepareContext(ctx))
|
||||
return await tracer.trace("executeLoopStep", async span => {
|
||||
await processObject(step.inputs, prepareContext(ctx))
|
||||
|
||||
const maxIterations = getLoopMaxIterations(step)
|
||||
const items: Record<string, any>[] = []
|
||||
let iterations = 0
|
||||
let iterable: any[] = []
|
||||
try {
|
||||
iterable = getLoopIterable(step)
|
||||
} catch (err) {
|
||||
return stepFailure(stepToLoop, {
|
||||
status: AutomationStepStatus.INCORRECT_TYPE,
|
||||
})
|
||||
}
|
||||
|
||||
for (; iterations < iterable.length; iterations++) {
|
||||
const currentItem = iterable[iterations]
|
||||
|
||||
if (iterations === maxIterations) {
|
||||
return stepFailure(stepToLoop, {
|
||||
status: AutomationStepStatus.MAX_ITERATIONS,
|
||||
const maxIterations = getLoopMaxIterations(step)
|
||||
const items: Record<string, any>[] = []
|
||||
let iterations = 0
|
||||
let iterable: any[] = []
|
||||
try {
|
||||
iterable = getLoopIterable(step)
|
||||
} catch (err) {
|
||||
span.addTags({
|
||||
status: AutomationStepStatus.INCORRECT_TYPE,
|
||||
iterations,
|
||||
})
|
||||
}
|
||||
|
||||
if (matchesLoopFailureCondition(step, currentItem)) {
|
||||
return stepFailure(stepToLoop, {
|
||||
status: AutomationStepStatus.FAILURE_CONDITION,
|
||||
status: AutomationStepStatus.INCORRECT_TYPE,
|
||||
})
|
||||
}
|
||||
|
||||
ctx.loop = { currentItem }
|
||||
const result = await this.executeStep(ctx, stepToLoop)
|
||||
items.push(result.outputs)
|
||||
ctx.loop = undefined
|
||||
}
|
||||
for (; iterations < iterable.length; iterations++) {
|
||||
const currentItem = iterable[iterations]
|
||||
|
||||
const status =
|
||||
iterations === 0 ? AutomationStatus.NO_CONDITION_MET : undefined
|
||||
return stepSuccess(stepToLoop, { status, iterations, items })
|
||||
if (iterations === maxIterations) {
|
||||
span.addTags({
|
||||
status: AutomationStepStatus.MAX_ITERATIONS,
|
||||
iterations,
|
||||
})
|
||||
return stepFailure(stepToLoop, {
|
||||
status: AutomationStepStatus.MAX_ITERATIONS,
|
||||
iterations,
|
||||
})
|
||||
}
|
||||
|
||||
if (matchesLoopFailureCondition(step, currentItem)) {
|
||||
span.addTags({
|
||||
status: AutomationStepStatus.FAILURE_CONDITION,
|
||||
iterations,
|
||||
})
|
||||
return stepFailure(stepToLoop, {
|
||||
status: AutomationStepStatus.FAILURE_CONDITION,
|
||||
})
|
||||
}
|
||||
|
||||
ctx.loop = { currentItem }
|
||||
const result = await this.executeStep(ctx, stepToLoop)
|
||||
items.push(result.outputs)
|
||||
ctx.loop = undefined
|
||||
}
|
||||
|
||||
const status =
|
||||
iterations === 0 ? AutomationStatus.NO_CONDITION_MET : undefined
|
||||
return stepSuccess(stepToLoop, { status, iterations, items })
|
||||
})
|
||||
}
|
||||
|
||||
private async executeBranchStep(
|
||||
ctx: AutomationContext,
|
||||
step: BranchStep
|
||||
): Promise<AutomationStepResult[]> {
|
||||
const { branches, children } = step.inputs
|
||||
return await tracer.trace("executeBranchStep", async span => {
|
||||
const { branches, children } = step.inputs
|
||||
|
||||
for (const branch of branches) {
|
||||
if (await branchMatches(ctx, branch)) {
|
||||
return [
|
||||
stepSuccess(step, {
|
||||
branchName: branch.name,
|
||||
status: `${branch.name} branch taken`,
|
||||
branchId: `${branch.id}`,
|
||||
}),
|
||||
...(await this.executeSteps(ctx, children?.[branch.id] || [])),
|
||||
]
|
||||
for (const branch of branches) {
|
||||
if (await branchMatches(ctx, branch)) {
|
||||
span.addTags({ branchName: branch.name, branchId: branch.id })
|
||||
return [
|
||||
stepSuccess(step, {
|
||||
branchName: branch.name,
|
||||
status: `${branch.name} branch taken`,
|
||||
branchId: `${branch.id}`,
|
||||
}),
|
||||
...(await this.executeSteps(ctx, children?.[branch.id] || [])),
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return [stepFailure(step, { status: AutomationStatus.NO_CONDITION_MET })]
|
||||
span.addTags({ status: AutomationStatus.NO_CONDITION_MET })
|
||||
return [stepFailure(step, { status: AutomationStatus.NO_CONDITION_MET })]
|
||||
})
|
||||
}
|
||||
|
||||
private async executeStep(
|
||||
ctx: AutomationContext,
|
||||
step: Readonly<AutomationStep>
|
||||
): Promise<AutomationStepResult> {
|
||||
return tracer.trace("Orchestrator.executeStep", async span => {
|
||||
return await tracer.trace(step.stepId, async span => {
|
||||
span.addTags({
|
||||
step: {
|
||||
stepId: step.stepId,
|
||||
|
@ -524,6 +538,7 @@ class Orchestrator {
|
|||
internal: step.internal,
|
||||
deprecated: step.deprecated,
|
||||
},
|
||||
inputsKeys: Object.keys(step.inputs),
|
||||
})
|
||||
|
||||
if (this.stopped) {
|
||||
|
@ -557,6 +572,7 @@ class Orchestrator {
|
|||
;(outputs as any).status = AutomationStatus.STOPPED
|
||||
}
|
||||
|
||||
span.addTags({ outputsKeys: Object.keys(outputs) })
|
||||
return stepSuccess(step, outputs, inputs)
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue