diff --git a/packages/backend-core/src/context/mainContext.ts b/packages/backend-core/src/context/mainContext.ts index 861777b679..9aa887166e 100644 --- a/packages/backend-core/src/context/mainContext.ts +++ b/packages/backend-core/src/context/mainContext.ts @@ -104,6 +104,18 @@ async function newContext(updates: ContextMap, task: any) { return Context.run(context, task) } +export async function doInAutomationContext(appId: string, automationId: string, task: any): Promise { + const tenantId = getTenantIDFromAppID(appId) + return newContext( + { + tenantId, + appId, + automationId + }, + task + ) +} + export async function doInContext(appId: string, task: any): Promise { const tenantId = getTenantIDFromAppID(appId) return newContext( @@ -187,6 +199,11 @@ export function getTenantId(): string { return tenantId } +export function getAutomationId(): string | undefined { + const context = Context.get() + return context?.automationId +} + export function getAppId(): string | undefined { const context = Context.get() const foundId = context?.appId diff --git a/packages/backend-core/src/context/types.ts b/packages/backend-core/src/context/types.ts index 727dad80bc..d687a93594 100644 --- a/packages/backend-core/src/context/types.ts +++ b/packages/backend-core/src/context/types.ts @@ -7,4 +7,5 @@ export type ContextMap = { identity?: IdentityContext environmentVariables?: Record isScim?: boolean + automationId?: string } diff --git a/packages/backend-core/src/logging/pino/logger.ts b/packages/backend-core/src/logging/pino/logger.ts index 276377eb00..14c8e7e5a9 100644 --- a/packages/backend-core/src/logging/pino/logger.ts +++ b/packages/backend-core/src/logging/pino/logger.ts @@ -39,6 +39,7 @@ if (!env.DISABLE_PINO_LOGGER) { objects?: any[] tenantId?: string appId?: string + automationId?: string identityId?: string identityType?: IdentityType correlationId?: string @@ -86,6 +87,7 @@ if (!env.DISABLE_PINO_LOGGER) { contextObject = { tenantId: getTenantId(), appId: getAppId(), + automationId: getAutomationId(), identityId: identity?._id, identityType: identity?.type, correlationId: correlation.getId(), @@ -159,6 +161,16 @@ if (!env.DISABLE_PINO_LOGGER) { return appId } + const getAutomationId = () => { + let appId + try { + appId = context.getAutomationId() + } catch (e) { + // do nothing + } + return appId + } + const getIdentity = () => { let identity try { diff --git a/packages/backend-core/src/queue/listeners.ts b/packages/backend-core/src/queue/listeners.ts index 331b690fe9..fd59bec911 100644 --- a/packages/backend-core/src/queue/listeners.ts +++ b/packages/backend-core/src/queue/listeners.ts @@ -1,5 +1,6 @@ import { Job, JobId, Queue } from "bull" import { JobQueue } from "./constants" +import * as context from "../context" export type StalledFn = (job: Job) => Promise @@ -31,77 +32,135 @@ function handleStalled(queue: Queue, removeStalledCb?: StalledFn) { }) } -function logging(queue: Queue, jobQueue: JobQueue) { - let eventType: string - switch (jobQueue) { - case JobQueue.AUTOMATION: - eventType = "automation-event" - break - case JobQueue.APP_BACKUP: - eventType = "app-backup-event" - break - case JobQueue.AUDIT_LOG: - eventType = "audit-log-event" - break - case JobQueue.SYSTEM_EVENT_QUEUE: - eventType = "system-event" - break +function getLogParams( + eventType: QueueEventType, + event: BullEvent, + opts: { + job?: Job, + jobId?: JobId, + error?: Error + } = {}, + extra: any = {} +) { + const message = `[BULL] ${eventType}=${event}` + const err = opts.error + + const data = { + eventType, + event, + job: opts.job, + jobId: opts.jobId || opts.job?.id, + ...extra } + + return [message, err, data] +} + +enum BullEvent { + ERROR="error", + WAITING="waiting", + ACTIVE="active", + STALLED="stalled", + PROGRESS="progress", + COMPLETED="completed", + FAILED="failed", + PAUSED="paused", + RESUMED="resumed", + CLEANED="cleaned", + DRAINED="drained", + REMOVED="removed", +} + +enum QueueEventType { + AUTOMATION_EVENT="automation-event", + APP_BACKUP_EVENT="app-backup-event", + AUDIT_LOG_EVENT="audit-log-event", + SYSTEM_EVENT="system-event" +} + +const EventTypeMap: { [key in JobQueue]: QueueEventType } = { + [JobQueue.AUTOMATION]: QueueEventType.AUTOMATION_EVENT, + [JobQueue.APP_BACKUP]: QueueEventType.APP_BACKUP_EVENT, + [JobQueue.AUDIT_LOG]: QueueEventType.AUDIT_LOG_EVENT, + [JobQueue.SYSTEM_EVENT_QUEUE]: QueueEventType.SYSTEM_EVENT, +} + +function logging(queue: Queue, jobQueue: JobQueue) { + const eventType = EventTypeMap[jobQueue] + + function doInJobContext(job: Job, task: any) { + // if this is an automation job try to get the app id + const appId = job.data.event?.appId + if (appId) { + return context.doInContext(appId, task) + } else { + task() + } + } + + queue + .on(BullEvent.STALLED, async (job: Job) => { + // A job has been marked as stalled. This is useful for debugging job + // workers that crash or pause the event loop. + await doInJobContext(job, () => { + console.error(...getLogParams(eventType, BullEvent.STALLED, { job })) + }) + }) + .on(BullEvent.ERROR, (error: any) => { + // An error occurred. + console.error(...getLogParams(eventType, BullEvent.ERROR, { error })) + }) + if (process.env.NODE_DEBUG?.includes("bull")) { queue - .on("error", (error: any) => { - // An error occurred. - console.error(`${eventType}=error error=${JSON.stringify(error)}`) - }) - .on("waiting", (jobId: JobId) => { + .on(BullEvent.WAITING, (jobId: JobId) => { // A Job is waiting to be processed as soon as a worker is idling. - console.log(`${eventType}=waiting jobId=${jobId}`) + console.info(...getLogParams(eventType, BullEvent.WAITING, { jobId })) }) - .on("active", (job: Job, jobPromise: any) => { + .on(BullEvent.ACTIVE, async (job: Job, jobPromise: any) => { // A job has started. You can use `jobPromise.cancel()`` to abort it. - console.log(`${eventType}=active jobId=${job.id}`) + await doInJobContext(job, () => { + console.info(...getLogParams(eventType, BullEvent.ACTIVE, { job })) + }) }) - .on("stalled", (job: Job) => { - // A job has been marked as stalled. This is useful for debugging job - // workers that crash or pause the event loop. - console.error( - `${eventType}=stalled jobId=${job.id} job=${JSON.stringify(job)}` - ) + .on(BullEvent.PROGRESS, async (job: Job, progress: any) => { + // A job's progress was updated + await doInJobContext(job, () => { + console.info(...getLogParams(eventType, BullEvent.PROGRESS, { job }, { progress })) + }) }) - .on("progress", (job: Job, progress: any) => { - // A job's progress was updated! - console.log( - `${eventType}=progress jobId=${job.id} progress=${progress}` - ) - }) - .on("completed", (job: Job, result) => { + .on(BullEvent.COMPLETED, async (job: Job, result) => { // A job successfully completed with a `result`. - console.log(`${eventType}=completed jobId=${job.id} result=${result}`) + await doInJobContext(job, () => { + console.info(...getLogParams(eventType, BullEvent.COMPLETED, { job }, { result })) + }) }) - .on("failed", (job, err: any) => { + .on(BullEvent.FAILED, async (job: Job, error: any) => { // A job failed with reason `err`! - console.log(`${eventType}=failed jobId=${job.id} error=${err}`) + await doInJobContext(job, () => { + console.error(...getLogParams(eventType, BullEvent.FAILED, { job, error })) + }) }) - .on("paused", () => { + .on(BullEvent.PAUSED, () => { // The queue has been paused. - console.log(`${eventType}=paused`) + console.info(...getLogParams(eventType, BullEvent.PAUSED)) }) - .on("resumed", (job: Job) => { + .on(BullEvent.RESUMED, () => { // The queue has been resumed. - console.log(`${eventType}=paused jobId=${job.id}`) + console.info(...getLogParams(eventType, BullEvent.RESUMED)) }) - .on("cleaned", (jobs: Job[], type: string) => { + .on(BullEvent.CLEANED, (jobs: Job[], type: string) => { // Old jobs have been cleaned from the queue. `jobs` is an array of cleaned // jobs, and `type` is the type of jobs cleaned. - console.log(`${eventType}=cleaned length=${jobs.length} type=${type}`) + console.info(...getLogParams(eventType, BullEvent.CLEANED, {}, { length: jobs.length, type } )) }) - .on("drained", () => { + .on(BullEvent.DRAINED, () => { // Emitted every time the queue has processed all the waiting jobs (even if there can be some delayed jobs not yet processed) - console.log(`${eventType}=drained`) + console.info(...getLogParams(eventType, BullEvent.DRAINED )) }) - .on("removed", (job: Job) => { + .on(BullEvent.REMOVED, (job: Job) => { // A job successfully removed. - console.log(`${eventType}=removed jobId=${job.id}`) + console.info(...getLogParams(eventType, BullEvent.REMOVED, { job } )) }) } } diff --git a/packages/server/src/automations/utils.ts b/packages/server/src/automations/utils.ts index 5296a0fa50..50c9b77818 100644 --- a/packages/server/src/automations/utils.ts +++ b/packages/server/src/automations/utils.ts @@ -8,7 +8,7 @@ import { db as dbCore, context } from "@budibase/backend-core" import { getAutomationMetadataParams } from "../db/utils" import { cloneDeep } from "lodash/fp" import { quotas } from "@budibase/pro" -import { Automation, WebhookActionType } from "@budibase/types" +import { Automation, AutomationJob, WebhookActionType } from "@budibase/types" import sdk from "../sdk" const REBOOT_CRON = "@reboot" @@ -16,27 +16,32 @@ const WH_STEP_ID = definitions.WEBHOOK.stepId const CRON_STEP_ID = definitions.CRON.stepId const Runner = new Thread(ThreadType.AUTOMATION) -const jobMessage = (job: any, message: string) => { - return `app=${job.data.event.appId} automation=${job.data.automation._id} jobId=${job.id} trigger=${job.data.automation.definition.trigger.event} : ${message}` +function loggingArgs(job: AutomationJob) { + return { + jobId: job.id, + trigger: job.data.automation.definition.trigger.event + } } -export async function processEvent(job: any) { - try { - const automationId = job.data.automation._id - console.log(jobMessage(job, "running")) - // need to actually await these so that an error can be captured properly - return await context.doInContext(job.data.event.appId, async () => { +export async function processEvent(job: AutomationJob) { + const appId = job.data.event.appId! + const automationId = job.data.automation._id! + return await context.doInAutomationContext(appId, automationId, async () => { + try { + // need to actually await these so that an error can be captured properly + console.log("automation running", loggingArgs(job)) + const runFn = () => Runner.run(job) - return quotas.addAutomation(runFn, { + const result = await quotas.addAutomation(runFn, { automationId, }) - }) - } catch (err) { - const errJson = JSON.stringify(err) - console.error(jobMessage(job, `was unable to run - ${errJson}`)) - console.trace(err) - return { err } - } + console.log("automation completed", loggingArgs(job)) + return result + } catch (err) { + console.error(`automation was unable to run`, err, loggingArgs(job)) + return { err } + } + }) } export async function updateTestHistory(