Merge pull request #10637 from Budibase/merge-master

Merge master -> develop
This commit is contained in:
Rory Powell 2023-05-18 10:13:35 +01:00 committed by GitHub
commit cea7865ad8
15 changed files with 271 additions and 86 deletions

View File

@ -1,5 +1,5 @@
{ {
"version": "2.6.16-alpha.5", "version": "2.6.18",
"npmClient": "yarn", "npmClient": "yarn",
"packages": [ "packages": [
"packages/backend-core", "packages/backend-core",

View File

@ -104,6 +104,22 @@ async function newContext(updates: ContextMap, task: any) {
return Context.run(context, task) return Context.run(context, task)
} }
export async function doInAutomationContext(params: {
appId: string
automationId: string
task: any
}): Promise<any> {
const tenantId = getTenantIDFromAppID(params.appId)
return newContext(
{
tenantId,
appId: params.appId,
automationId: params.automationId,
},
params.task
)
}
export async function doInContext(appId: string, task: any): Promise<any> { export async function doInContext(appId: string, task: any): Promise<any> {
const tenantId = getTenantIDFromAppID(appId) const tenantId = getTenantIDFromAppID(appId)
return newContext( return newContext(
@ -187,6 +203,11 @@ export function getTenantId(): string {
return tenantId return tenantId
} }
export function getAutomationId(): string | undefined {
const context = Context.get()
return context?.automationId
}
export function getAppId(): string | undefined { export function getAppId(): string | undefined {
const context = Context.get() const context = Context.get()
const foundId = context?.appId const foundId = context?.appId

View File

@ -7,4 +7,5 @@ export type ContextMap = {
identity?: IdentityContext identity?: IdentityContext
environmentVariables?: Record<string, string> environmentVariables?: Record<string, string>
isScim?: boolean isScim?: boolean
automationId?: string
} }

View File

@ -39,6 +39,7 @@ if (!env.DISABLE_PINO_LOGGER) {
objects?: any[] objects?: any[]
tenantId?: string tenantId?: string
appId?: string appId?: string
automationId?: string
identityId?: string identityId?: string
identityType?: IdentityType identityType?: IdentityType
correlationId?: string correlationId?: string
@ -86,18 +87,44 @@ if (!env.DISABLE_PINO_LOGGER) {
contextObject = { contextObject = {
tenantId: getTenantId(), tenantId: getTenantId(),
appId: getAppId(), appId: getAppId(),
automationId: getAutomationId(),
identityId: identity?._id, identityId: identity?._id,
identityType: identity?.type, identityType: identity?.type,
correlationId: correlation.getId(), correlationId: correlation.getId(),
} }
} }
const mergingObject = { const mergingObject: any = {
objects: objects.length ? objects : undefined,
err: error, err: error,
...contextObject, ...contextObject,
} }
if (objects.length) {
// init generic data object for params supplied that don't have a
// '_logKey' field. This prints an object using argument index as the key
// e.g. { 0: {}, 1: {} }
const data: any = {}
let dataIndex = 0
for (let i = 0; i < objects.length; i++) {
const object = objects[i]
// the object has specified a log key
// use this instead of generic key
const logKey = object._logKey
if (logKey) {
delete object._logKey
mergingObject[logKey] = object
} else {
data[dataIndex] = object
dataIndex++
}
}
if (Object.keys(data).length) {
mergingObject.data = data
}
}
return [mergingObject, message] return [mergingObject, message]
} }
@ -159,6 +186,16 @@ if (!env.DISABLE_PINO_LOGGER) {
return appId return appId
} }
const getAutomationId = () => {
let appId
try {
appId = context.getAutomationId()
} catch (e) {
// do nothing
}
return appId
}
const getIdentity = () => { const getIdentity = () => {
let identity let identity
try { try {

View File

@ -128,6 +128,7 @@ class InMemoryQueue {
on() { on() {
// do nothing // do nothing
return this
} }
async waitForCompletion() { async waitForCompletion() {

View File

@ -1,5 +1,6 @@
import { Job, JobId, Queue } from "bull" import { Job, JobId, Queue } from "bull"
import { JobQueue } from "./constants" import { JobQueue } from "./constants"
import * as context from "../context"
export type StalledFn = (job: Job) => Promise<void> export type StalledFn = (job: Job) => Promise<void>
@ -31,77 +32,164 @@ function handleStalled(queue: Queue, removeStalledCb?: StalledFn) {
}) })
} }
function logging(queue: Queue, jobQueue: JobQueue) { function getLogParams(
let eventType: string eventType: QueueEventType,
switch (jobQueue) { event: BullEvent,
case JobQueue.AUTOMATION: opts: {
eventType = "automation-event" job?: Job
break jobId?: JobId
case JobQueue.APP_BACKUP: error?: Error
eventType = "app-backup-event" } = {},
break extra: any = {}
case JobQueue.AUDIT_LOG: ) {
eventType = "audit-log-event" const message = `[BULL] ${eventType}=${event}`
break const err = opts.error
case JobQueue.SYSTEM_EVENT_QUEUE:
eventType = "system-event" const bullLog = {
break _logKey: "bull",
eventType,
event,
job: opts.job,
jobId: opts.jobId || opts.job?.id,
...extra,
} }
if (process.env.NODE_DEBUG?.includes("bull")) {
let automationLog
if (opts.job?.data?.automation) {
automationLog = {
_logKey: "automation",
trigger: opts.job
? opts.job.data.automation.definition.trigger.event
: undefined,
}
}
return [message, err, bullLog, automationLog]
}
enum BullEvent {
ERROR = "error",
WAITING = "waiting",
ACTIVE = "active",
STALLED = "stalled",
PROGRESS = "progress",
COMPLETED = "completed",
FAILED = "failed",
PAUSED = "paused",
RESUMED = "resumed",
CLEANED = "cleaned",
DRAINED = "drained",
REMOVED = "removed",
}
enum QueueEventType {
AUTOMATION_EVENT = "automation-event",
APP_BACKUP_EVENT = "app-backup-event",
AUDIT_LOG_EVENT = "audit-log-event",
SYSTEM_EVENT = "system-event",
}
const EventTypeMap: { [key in JobQueue]: QueueEventType } = {
[JobQueue.AUTOMATION]: QueueEventType.AUTOMATION_EVENT,
[JobQueue.APP_BACKUP]: QueueEventType.APP_BACKUP_EVENT,
[JobQueue.AUDIT_LOG]: QueueEventType.AUDIT_LOG_EVENT,
[JobQueue.SYSTEM_EVENT_QUEUE]: QueueEventType.SYSTEM_EVENT,
}
function logging(queue: Queue, jobQueue: JobQueue) {
const eventType = EventTypeMap[jobQueue]
function doInJobContext(job: Job, task: any) {
// if this is an automation job try to get the app id
const appId = job.data.event?.appId
if (appId) {
return context.doInContext(appId, task)
} else {
task()
}
}
queue queue
.on("error", (error: any) => { .on(BullEvent.STALLED, async (job: Job) => {
// An error occurred.
console.error(`${eventType}=error error=${JSON.stringify(error)}`)
})
.on("waiting", (jobId: JobId) => {
// A Job is waiting to be processed as soon as a worker is idling.
console.log(`${eventType}=waiting jobId=${jobId}`)
})
.on("active", (job: Job, jobPromise: any) => {
// A job has started. You can use `jobPromise.cancel()`` to abort it.
console.log(`${eventType}=active jobId=${job.id}`)
})
.on("stalled", (job: Job) => {
// A job has been marked as stalled. This is useful for debugging job // A job has been marked as stalled. This is useful for debugging job
// workers that crash or pause the event loop. // workers that crash or pause the event loop.
console.error( await doInJobContext(job, () => {
`${eventType}=stalled jobId=${job.id} job=${JSON.stringify(job)}` console.error(...getLogParams(eventType, BullEvent.STALLED, { job }))
})
})
.on(BullEvent.ERROR, (error: any) => {
// An error occurred.
console.error(...getLogParams(eventType, BullEvent.ERROR, { error }))
})
if (process.env.NODE_DEBUG?.includes("bull")) {
queue
.on(BullEvent.WAITING, (jobId: JobId) => {
// A Job is waiting to be processed as soon as a worker is idling.
console.info(...getLogParams(eventType, BullEvent.WAITING, { jobId }))
})
.on(BullEvent.ACTIVE, async (job: Job, jobPromise: any) => {
// A job has started. You can use `jobPromise.cancel()`` to abort it.
await doInJobContext(job, () => {
console.info(...getLogParams(eventType, BullEvent.ACTIVE, { job }))
})
})
.on(BullEvent.PROGRESS, async (job: Job, progress: any) => {
// A job's progress was updated
await doInJobContext(job, () => {
console.info(
...getLogParams(
eventType,
BullEvent.PROGRESS,
{ job },
{ progress }
)
) )
}) })
.on("progress", (job: Job, progress: any) => {
// A job's progress was updated!
console.log(
`${eventType}=progress jobId=${job.id} progress=${progress}`
)
}) })
.on("completed", (job: Job, result) => { .on(BullEvent.COMPLETED, async (job: Job, result) => {
// A job successfully completed with a `result`. // A job successfully completed with a `result`.
console.log(`${eventType}=completed jobId=${job.id} result=${result}`) await doInJobContext(job, () => {
console.info(
...getLogParams(eventType, BullEvent.COMPLETED, { job }, { result })
)
}) })
.on("failed", (job, err: any) => { })
.on(BullEvent.FAILED, async (job: Job, error: any) => {
// A job failed with reason `err`! // A job failed with reason `err`!
console.log(`${eventType}=failed jobId=${job.id} error=${err}`) await doInJobContext(job, () => {
console.error(
...getLogParams(eventType, BullEvent.FAILED, { job, error })
)
}) })
.on("paused", () => { })
.on(BullEvent.PAUSED, () => {
// The queue has been paused. // The queue has been paused.
console.log(`${eventType}=paused`) console.info(...getLogParams(eventType, BullEvent.PAUSED))
}) })
.on("resumed", (job: Job) => { .on(BullEvent.RESUMED, () => {
// The queue has been resumed. // The queue has been resumed.
console.log(`${eventType}=paused jobId=${job.id}`) console.info(...getLogParams(eventType, BullEvent.RESUMED))
}) })
.on("cleaned", (jobs: Job[], type: string) => { .on(BullEvent.CLEANED, (jobs: Job[], type: string) => {
// Old jobs have been cleaned from the queue. `jobs` is an array of cleaned // Old jobs have been cleaned from the queue. `jobs` is an array of cleaned
// jobs, and `type` is the type of jobs cleaned. // jobs, and `type` is the type of jobs cleaned.
console.log(`${eventType}=cleaned length=${jobs.length} type=${type}`) console.info(
...getLogParams(
eventType,
BullEvent.CLEANED,
{},
{ length: jobs.length, type }
)
)
}) })
.on("drained", () => { .on(BullEvent.DRAINED, () => {
// Emitted every time the queue has processed all the waiting jobs (even if there can be some delayed jobs not yet processed) // Emitted every time the queue has processed all the waiting jobs (even if there can be some delayed jobs not yet processed)
console.log(`${eventType}=drained`) console.info(...getLogParams(eventType, BullEvent.DRAINED))
}) })
.on("removed", (job: Job) => { .on(BullEvent.REMOVED, (job: Job) => {
// A job successfully removed. // A job successfully removed.
console.log(`${eventType}=removed jobId=${job.id}`) console.info(...getLogParams(eventType, BullEvent.REMOVED, { job }))
}) })
} }
} }

View File

@ -9,7 +9,7 @@ import { checkTestFlag } from "../utilities/redis"
import * as utils from "./utils" import * as utils from "./utils"
import env from "../environment" import env from "../environment"
import { context, db as dbCore } from "@budibase/backend-core" import { context, db as dbCore } from "@budibase/backend-core"
import { Automation, Row } from "@budibase/types" import { Automation, Row, AutomationData, AutomationJob } from "@budibase/types"
export const TRIGGER_DEFINITIONS = definitions export const TRIGGER_DEFINITIONS = definitions
const JOB_OPTS = { const JOB_OPTS = {
@ -109,14 +109,16 @@ export async function externalTrigger(
} }
params.fields = coercedFields params.fields = coercedFields
} }
const data: Record<string, any> = { automation, event: params }
const data: AutomationData = { automation, event: params as any }
if (getResponses) { if (getResponses) {
data.event = { data.event = {
...data.event, ...data.event,
appId: context.getAppId(), appId: context.getAppId(),
automation, automation,
} }
return utils.processEvent({ data }) const job = { data } as AutomationJob
return utils.processEvent(job)
} else { } else {
return automationQueue.add(data, JOB_OPTS) return automationQueue.add(data, JOB_OPTS)
} }

View File

@ -8,7 +8,7 @@ import { db as dbCore, context } from "@budibase/backend-core"
import { getAutomationMetadataParams } from "../db/utils" import { getAutomationMetadataParams } from "../db/utils"
import { cloneDeep } from "lodash/fp" import { cloneDeep } from "lodash/fp"
import { quotas } from "@budibase/pro" import { quotas } from "@budibase/pro"
import { Automation, WebhookActionType } from "@budibase/types" import { Automation, AutomationJob, WebhookActionType } from "@budibase/types"
import sdk from "../sdk" import sdk from "../sdk"
const REBOOT_CRON = "@reboot" const REBOOT_CRON = "@reboot"
@ -16,29 +16,42 @@ const WH_STEP_ID = definitions.WEBHOOK.stepId
const CRON_STEP_ID = definitions.CRON.stepId const CRON_STEP_ID = definitions.CRON.stepId
const Runner = new Thread(ThreadType.AUTOMATION) const Runner = new Thread(ThreadType.AUTOMATION)
const jobMessage = (job: any, message: string) => { function loggingArgs(job: AutomationJob) {
return `app=${job.data.event.appId} automation=${job.data.automation._id} jobId=${job.id} trigger=${job.data.automation.definition.trigger.event} : ${message}` return [
{
_logKey: "automation",
trigger: job.data.automation.definition.trigger.event,
},
{
_logKey: "bull",
jobId: job.id,
},
]
} }
export async function processEvent(job: any) { export async function processEvent(job: AutomationJob) {
const appId = job.data.event.appId!
const automationId = job.data.automation._id!
const task = async () => {
try { try {
const automationId = job.data.automation._id
console.log(jobMessage(job, "running"))
// need to actually await these so that an error can be captured properly // need to actually await these so that an error can be captured properly
return await context.doInContext(job.data.event.appId, async () => { console.log("automation running", ...loggingArgs(job))
const runFn = () => Runner.run(job) const runFn = () => Runner.run(job)
return quotas.addAutomation(runFn, { const result = await quotas.addAutomation(runFn, {
automationId, automationId,
}) })
}) console.log("automation completed", ...loggingArgs(job))
return result
} catch (err) { } catch (err) {
const errJson = JSON.stringify(err) console.error(`automation was unable to run`, err, ...loggingArgs(job))
console.error(jobMessage(job, `was unable to run - ${errJson}`))
console.trace(err)
return { err } return { err }
} }
} }
return await context.doInAutomationContext({ appId, automationId, task })
}
export async function updateTestHistory( export async function updateTestHistory(
appId: any, appId: any,
automation: any, automation: any,

View File

@ -1,4 +1,4 @@
import { AutomationResults, AutomationStep, Document } from "@budibase/types" import { AutomationResults, AutomationStep } from "@budibase/types"
export enum LoopStepType { export enum LoopStepType {
ARRAY = "Array", ARRAY = "Array",
@ -27,7 +27,3 @@ export interface AutomationContext extends AutomationResults {
env?: Record<string, string> env?: Record<string, string>
trigger: any trigger: any
} }
export interface AutomationMetadata extends Document {
errorCount?: number
}

View File

@ -13,13 +13,18 @@ import { generateAutomationMetadataID, isProdAppID } from "../db/utils"
import { definitions as triggerDefs } from "../automations/triggerInfo" import { definitions as triggerDefs } from "../automations/triggerInfo"
import { AutomationErrors, MAX_AUTOMATION_RECURRING_ERRORS } from "../constants" import { AutomationErrors, MAX_AUTOMATION_RECURRING_ERRORS } from "../constants"
import { storeLog } from "../automations/logging" import { storeLog } from "../automations/logging"
import { Automation, AutomationStep, AutomationStatus } from "@budibase/types" import {
Automation,
AutomationStep,
AutomationStatus,
AutomationMetadata,
AutomationJob,
} from "@budibase/types"
import { import {
LoopStep, LoopStep,
LoopInput, LoopInput,
TriggerOutput, TriggerOutput,
AutomationContext, AutomationContext,
AutomationMetadata,
} from "../definitions/automations" } from "../definitions/automations"
import { WorkerCallback } from "./definitions" import { WorkerCallback } from "./definitions"
import { context, logging } from "@budibase/backend-core" import { context, logging } from "@budibase/backend-core"
@ -60,11 +65,11 @@ class Orchestrator {
_job: Job _job: Job
executionOutput: AutomationContext executionOutput: AutomationContext
constructor(job: Job) { constructor(job: AutomationJob) {
let automation = job.data.automation, let automation = job.data.automation
triggerOutput = job.data.event let triggerOutput = job.data.event
const metadata = triggerOutput.metadata const metadata = triggerOutput.metadata
this._chainCount = metadata ? metadata.automationChainCount : 0 this._chainCount = metadata ? metadata.automationChainCount! : 0
this._appId = triggerOutput.appId as string this._appId = triggerOutput.appId as string
this._job = job this._job = job
const triggerStepId = automation.definition.trigger.stepId const triggerStepId = automation.definition.trigger.stepId

View File

@ -1,5 +1,3 @@
import { EnvironmentVariablesDecrypted } from "@budibase/types"
export type WorkerCallback = (error: any, response?: any) => void export type WorkerCallback = (error: any, response?: any) => void
export interface QueryEvent { export interface QueryEvent {

View File

@ -1,5 +1,7 @@
import workerFarm from "worker-farm" import workerFarm from "worker-farm"
import env from "../environment" import env from "../environment"
import { AutomationJob } from "@budibase/types"
import { QueryEvent } from "./definitions"
export const ThreadType = { export const ThreadType = {
QUERY: "query", QUERY: "query",
@ -64,11 +66,11 @@ export class Thread {
) )
} }
run(data: any) { run(job: AutomationJob | QueryEvent) {
const timeout = this.timeoutMs const timeout = this.timeoutMs
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
function fire(worker: any) { function fire(worker: any) {
worker.execute(data, (err: any, response: any) => { worker.execute(job, (err: any, response: any) => {
if (err && err.type === "TimeoutError") { if (err && err.type === "TimeoutError") {
reject( reject(
new Error(`Query response time exceeded ${timeout}ms timeout.`) new Error(`Query response time exceeded ${timeout}ms timeout.`)

View File

@ -178,3 +178,8 @@ export type AutomationStepInput = {
appId: string appId: string
apiKey?: string apiKey?: string
} }
export interface AutomationMetadata extends Document {
errorCount?: number
automationChainCount?: number
}

View File

@ -0,0 +1,15 @@
import { Automation, AutomationMetadata } from "../../documents"
import { Job } from "bull"
export interface AutomationDataEvent {
appId?: string
metadata?: AutomationMetadata
automation?: Automation
}
export interface AutomationData {
event: AutomationDataEvent
automation: Automation
}
export type AutomationJob = Job<AutomationData>

View File

@ -1,3 +1,4 @@
export * from "./automations"
export * from "./hosting" export * from "./hosting"
export * from "./context" export * from "./context"
export * from "./events" export * from "./events"