Merge pull request #10626 from Budibase/chore/automation-logging-updates

Automation logging updates
This commit is contained in:
Rory Powell 2023-05-17 14:58:16 +01:00 committed by GitHub
commit 0aff89a479
20 changed files with 632 additions and 83 deletions

View File

@ -104,6 +104,22 @@ async function newContext(updates: ContextMap, task: any) {
return Context.run(context, task) return Context.run(context, task)
} }
export async function doInAutomationContext(params: {
appId: string,
automationId: string,
task: any
}): Promise<any> {
const tenantId = getTenantIDFromAppID(params.appId)
return newContext(
{
tenantId,
appId: params.appId,
automationId: params.automationId,
},
params.task
)
}
export async function doInContext(appId: string, task: any): Promise<any> { export async function doInContext(appId: string, task: any): Promise<any> {
const tenantId = getTenantIDFromAppID(appId) const tenantId = getTenantIDFromAppID(appId)
return newContext( return newContext(
@ -187,6 +203,11 @@ export function getTenantId(): string {
return tenantId return tenantId
} }
export function getAutomationId(): string | undefined {
const context = Context.get()
return context?.automationId
}
export function getAppId(): string | undefined { export function getAppId(): string | undefined {
const context = Context.get() const context = Context.get()
const foundId = context?.appId const foundId = context?.appId

View File

@ -7,4 +7,5 @@ export type ContextMap = {
identity?: IdentityContext identity?: IdentityContext
environmentVariables?: Record<string, string> environmentVariables?: Record<string, string>
isScim?: boolean isScim?: boolean
automationId?: string
} }

View File

@ -39,6 +39,7 @@ if (!env.DISABLE_PINO_LOGGER) {
objects?: any[] objects?: any[]
tenantId?: string tenantId?: string
appId?: string appId?: string
automationId?: string
identityId?: string identityId?: string
identityType?: IdentityType identityType?: IdentityType
correlationId?: string correlationId?: string
@ -86,6 +87,7 @@ if (!env.DISABLE_PINO_LOGGER) {
contextObject = { contextObject = {
tenantId: getTenantId(), tenantId: getTenantId(),
appId: getAppId(), appId: getAppId(),
automationId: getAutomationId(),
identityId: identity?._id, identityId: identity?._id,
identityType: identity?.type, identityType: identity?.type,
correlationId: correlation.getId(), correlationId: correlation.getId(),
@ -159,6 +161,16 @@ if (!env.DISABLE_PINO_LOGGER) {
return appId return appId
} }
const getAutomationId = () => {
let appId
try {
appId = context.getAutomationId()
} catch (e) {
// do nothing
}
return appId
}
const getIdentity = () => { const getIdentity = () => {
let identity let identity
try { try {

View File

@ -128,6 +128,7 @@ class InMemoryQueue {
on() { on() {
// do nothing // do nothing
return this
} }
async waitForCompletion() { async waitForCompletion() {

View File

@ -1,5 +1,6 @@
import { Job, JobId, Queue } from "bull" import { Job, JobId, Queue } from "bull"
import { JobQueue } from "./constants" import { JobQueue } from "./constants"
import * as context from "../context"
export type StalledFn = (job: Job) => Promise<void> export type StalledFn = (job: Job) => Promise<void>
@ -31,77 +32,153 @@ function handleStalled(queue: Queue, removeStalledCb?: StalledFn) {
}) })
} }
function logging(queue: Queue, jobQueue: JobQueue) { function getLogParams(
let eventType: string eventType: QueueEventType,
switch (jobQueue) { event: BullEvent,
case JobQueue.AUTOMATION: opts: {
eventType = "automation-event" job?: Job
break jobId?: JobId
case JobQueue.APP_BACKUP: error?: Error
eventType = "app-backup-event" } = {},
break extra: any = {}
case JobQueue.AUDIT_LOG: ) {
eventType = "audit-log-event" const message = `[BULL] ${eventType}=${event}`
break const err = opts.error
case JobQueue.SYSTEM_EVENT_QUEUE:
eventType = "system-event" const data = {
break eventType,
event,
job: opts.job,
jobId: opts.jobId || opts.job?.id,
...extra,
} }
return [message, err, data]
}
enum BullEvent {
ERROR = "error",
WAITING = "waiting",
ACTIVE = "active",
STALLED = "stalled",
PROGRESS = "progress",
COMPLETED = "completed",
FAILED = "failed",
PAUSED = "paused",
RESUMED = "resumed",
CLEANED = "cleaned",
DRAINED = "drained",
REMOVED = "removed",
}
enum QueueEventType {
AUTOMATION_EVENT = "automation-event",
APP_BACKUP_EVENT = "app-backup-event",
AUDIT_LOG_EVENT = "audit-log-event",
SYSTEM_EVENT = "system-event",
}
const EventTypeMap: { [key in JobQueue]: QueueEventType } = {
[JobQueue.AUTOMATION]: QueueEventType.AUTOMATION_EVENT,
[JobQueue.APP_BACKUP]: QueueEventType.APP_BACKUP_EVENT,
[JobQueue.AUDIT_LOG]: QueueEventType.AUDIT_LOG_EVENT,
[JobQueue.SYSTEM_EVENT_QUEUE]: QueueEventType.SYSTEM_EVENT,
}
function logging(queue: Queue, jobQueue: JobQueue) {
const eventType = EventTypeMap[jobQueue]
function doInJobContext(job: Job, task: any) {
// if this is an automation job try to get the app id
const appId = job.data.event?.appId
if (appId) {
return context.doInContext(appId, task)
} else {
task()
}
}
queue
.on(BullEvent.STALLED, async (job: Job) => {
// A job has been marked as stalled. This is useful for debugging job
// workers that crash or pause the event loop.
await doInJobContext(job, () => {
console.error(...getLogParams(eventType, BullEvent.STALLED, { job }))
})
})
.on(BullEvent.ERROR, (error: any) => {
// An error occurred.
console.error(...getLogParams(eventType, BullEvent.ERROR, { error }))
})
if (process.env.NODE_DEBUG?.includes("bull")) { if (process.env.NODE_DEBUG?.includes("bull")) {
queue queue
.on("error", (error: any) => { .on(BullEvent.WAITING, (jobId: JobId) => {
// An error occurred.
console.error(`${eventType}=error error=${JSON.stringify(error)}`)
})
.on("waiting", (jobId: JobId) => {
// A Job is waiting to be processed as soon as a worker is idling. // A Job is waiting to be processed as soon as a worker is idling.
console.log(`${eventType}=waiting jobId=${jobId}`) console.info(...getLogParams(eventType, BullEvent.WAITING, { jobId }))
}) })
.on("active", (job: Job, jobPromise: any) => { .on(BullEvent.ACTIVE, async (job: Job, jobPromise: any) => {
// A job has started. You can use `jobPromise.cancel()`` to abort it. // A job has started. You can use `jobPromise.cancel()`` to abort it.
console.log(`${eventType}=active jobId=${job.id}`) await doInJobContext(job, () => {
console.info(...getLogParams(eventType, BullEvent.ACTIVE, { job }))
})
}) })
.on("stalled", (job: Job) => { .on(BullEvent.PROGRESS, async (job: Job, progress: any) => {
// A job has been marked as stalled. This is useful for debugging job // A job's progress was updated
// workers that crash or pause the event loop. await doInJobContext(job, () => {
console.error( console.info(
`${eventType}=stalled jobId=${job.id} job=${JSON.stringify(job)}` ...getLogParams(
) eventType,
BullEvent.PROGRESS,
{ job },
{ progress }
)
)
})
}) })
.on("progress", (job: Job, progress: any) => { .on(BullEvent.COMPLETED, async (job: Job, result) => {
// A job's progress was updated!
console.log(
`${eventType}=progress jobId=${job.id} progress=${progress}`
)
})
.on("completed", (job: Job, result) => {
// A job successfully completed with a `result`. // A job successfully completed with a `result`.
console.log(`${eventType}=completed jobId=${job.id} result=${result}`) await doInJobContext(job, () => {
console.info(
...getLogParams(eventType, BullEvent.COMPLETED, { job }, { result })
)
})
}) })
.on("failed", (job, err: any) => { .on(BullEvent.FAILED, async (job: Job, error: any) => {
// A job failed with reason `err`! // A job failed with reason `err`!
console.log(`${eventType}=failed jobId=${job.id} error=${err}`) await doInJobContext(job, () => {
console.error(
...getLogParams(eventType, BullEvent.FAILED, { job, error })
)
})
}) })
.on("paused", () => { .on(BullEvent.PAUSED, () => {
// The queue has been paused. // The queue has been paused.
console.log(`${eventType}=paused`) console.info(...getLogParams(eventType, BullEvent.PAUSED))
}) })
.on("resumed", (job: Job) => { .on(BullEvent.RESUMED, () => {
// The queue has been resumed. // The queue has been resumed.
console.log(`${eventType}=paused jobId=${job.id}`) console.info(...getLogParams(eventType, BullEvent.RESUMED))
}) })
.on("cleaned", (jobs: Job[], type: string) => { .on(BullEvent.CLEANED, (jobs: Job[], type: string) => {
// Old jobs have been cleaned from the queue. `jobs` is an array of cleaned // Old jobs have been cleaned from the queue. `jobs` is an array of cleaned
// jobs, and `type` is the type of jobs cleaned. // jobs, and `type` is the type of jobs cleaned.
console.log(`${eventType}=cleaned length=${jobs.length} type=${type}`) console.info(
...getLogParams(
eventType,
BullEvent.CLEANED,
{},
{ length: jobs.length, type }
)
)
}) })
.on("drained", () => { .on(BullEvent.DRAINED, () => {
// Emitted every time the queue has processed all the waiting jobs (even if there can be some delayed jobs not yet processed) // Emitted every time the queue has processed all the waiting jobs (even if there can be some delayed jobs not yet processed)
console.log(`${eventType}=drained`) console.info(...getLogParams(eventType, BullEvent.DRAINED))
}) })
.on("removed", (job: Job) => { .on(BullEvent.REMOVED, (job: Job) => {
// A job successfully removed. // A job successfully removed.
console.log(`${eventType}=removed jobId=${job.id}`) console.info(...getLogParams(eventType, BullEvent.REMOVED, { job }))
}) })
} }
} }

View File

@ -0,0 +1,140 @@
# Console Control Strings
A library of cross-platform tested terminal/console command strings for
doing things like color and cursor positioning. This is a subset of both
ansi and vt100. All control codes included work on both Windows & Unix-like
OSes, except where noted.
## Usage
```js
var consoleControl = require('console-control-strings')
console.log(consoleControl.color('blue','bgRed', 'bold') + 'hi there' + consoleControl.color('reset'))
process.stdout.write(consoleControl.goto(75, 10))
```
## Why Another?
There are tons of libraries similar to this one. I wanted one that was:
1. Very clear about compatibility goals.
2. Could emit, for instance, a start color code without an end one.
3. Returned strings w/o writing to streams.
4. Was not weighed down with other unrelated baggage.
## Functions
### var code = consoleControl.up(_num = 1_)
Returns the escape sequence to move _num_ lines up.
### var code = consoleControl.down(_num = 1_)
Returns the escape sequence to move _num_ lines down.
### var code = consoleControl.forward(_num = 1_)
Returns the escape sequence to move _num_ lines righ.
### var code = consoleControl.back(_num = 1_)
Returns the escape sequence to move _num_ lines left.
### var code = consoleControl.nextLine(_num = 1_)
Returns the escape sequence to move _num_ lines down and to the beginning of
the line.
### var code = consoleControl.previousLine(_num = 1_)
Returns the escape sequence to move _num_ lines up and to the beginning of
the line.
### var code = consoleControl.eraseData()
Returns the escape sequence to erase everything from the current cursor
position to the bottom right of the screen. This is line based, so it
erases the remainder of the current line and all following lines.
### var code = consoleControl.eraseLine()
Returns the escape sequence to erase to the end of the current line.
### var code = consoleControl.goto(_x_, _y_)
Returns the escape sequence to move the cursor to the designated position.
Note that the origin is _1, 1_ not _0, 0_.
### var code = consoleControl.gotoSOL()
Returns the escape sequence to move the cursor to the beginning of the
current line. (That is, it returns a carriage return, `\r`.)
### var code = consoleControl.hideCursor()
Returns the escape sequence to hide the cursor.
### var code = consoleControl.showCursor()
Returns the escape sequence to show the cursor.
### var code = consoleControl.color(_colors = []_)
### var code = consoleControl.color(_color1_, _color2_, _…_, _colorn_)
Returns the escape sequence to set the current terminal display attributes
(mostly colors). Arguments can either be a list of attributes or an array
of attributes. The difference between passing in an array or list of colors
and calling `.color` separately for each one, is that in the former case a
single escape sequence will be produced where as in the latter each change
will have its own distinct escape sequence. Each attribute can be one of:
* Reset:
* **reset** Reset all attributes to the terminal default.
* Styles:
* **bold** Display text as bold. In some terminals this means using a
bold font, in others this means changing the color. In some it means
both.
* **italic** Display text as italic. This is not available in most Windows terminals.
* **underline** Underline text. This is not available in most Windows Terminals.
* **inverse** Invert the foreground and background colors.
* **stopBold** Do not display text as bold.
* **stopItalic** Do not display text as italic.
* **stopUnderline** Do not underline text.
* **stopInverse** Do not invert foreground and background.
* Colors:
* **white**
* **black**
* **blue**
* **cyan**
* **green**
* **magenta**
* **red**
* **yellow**
* **grey** / **brightBlack**
* **brightRed**
* **brightGreen**
* **brightYellow**
* **brightBlue**
* **brightMagenta**
* **brightCyan**
* **brightWhite**
* Background Colors:
* **bgWhite**
* **bgBlack**
* **bgBlue**
* **bgCyan**
* **bgGreen**
* **bgMagenta**
* **bgRed**
* **bgYellow**
* **bgGrey** / **bgBrightBlack**
* **bgBrightRed**
* **bgBrightGreen**
* **bgBrightYellow**
* **bgBrightBlue**
* **bgBrightMagenta**
* **bgBrightCyan**
* **bgBrightWhite**

127
packages/pro/node_modules/copy-concurrently/README.md~ generated vendored Normal file
View File

@ -0,0 +1,127 @@
# copy-concurrently
Copy files, directories and symlinks
```
const copy = require('copy-concurrently')
copy('/path/to/thing', '/new/path/thing').then(() => {
// this is now copied
}).catch(err => {
// oh noooo
})
```
Copies files, directories and symlinks. Ownership is maintained when
running as root, permissions are always maintained. On Windows, if symlinks
are unavailable then junctions will be used.
## PUBLIC INTERFACE
### copy(from, to, [options]) → Promise
Recursively copies `from` to `to` and resolves its promise when finished.
If `to` already exists then the promise will be rejected with an `EEXIST`
error.
Options are:
* maxConcurrency (Default: `1`) The maximum number of concurrent copies to do at once.
* recurseWith - (Default: `copy.item`) The function to call on each file after recursing into a directory.
* isWindows - (Default: `process.platform === 'win32'`) If true enables Windows symlink semantics. This requires
an extra `stat` to determine if the destination of a symlink is a file or directory. If symlinking a directory
fails then we'll try making a junction instead.
Options can also include dependency injection:
* Promise - (Default: `global.Promise`) The promise implementation to use, defaults to Node's.
* fs - (Default: `require('fs')`) The filesystem module to use. Can be used
to use `graceful-fs` or to inject a mock.
* writeStreamAtomic - (Default: `require('fs-write-stream-atomic')`) The
implementation of `writeStreamAtomic` to use. Used to inject a mock.
* getuid - (Default: `process.getuid`) A function that returns the current UID. Used to inject a mock.
## EXTENSION INTERFACE
Ordinarily you'd only call `copy` above. But it's possible to use it's
component functions directly. This is useful if, say, you're writing
[move-concurently](https://npmjs.com/package/move-concurrently).
### copy.file(from, to, options) → Promise
Copies a ordinary file `from` to destination `to`. Uses
`fs-write-stream-atomic` to ensure that the file is entirely copied or not
at all.
Options are:
* uid, gid - (Optional) If `getuid()` is `0` then this and gid will be used to
set the user and group of `to`. If uid is present then gid must be too.
* mode - (Optional) If set then `to` will have its perms set to `mode`.
* fs - (Default: `require('fs')`) The filesystem module to use. Can be used
to use `graceful-fs` or to inject a mock.
* Promise - (Default: `global.Promise`) The promise implementation to use, defaults to Node's.
* writeStreamAtomic - (Default `require('fs-write-stream-atomic')`) The
implementation of `writeStreamAtomic` to use. Used to inject a mock.
### copy.symlink(from, to, options) → Promise
Copies a symlink `from` to destination `to`. If on Windows then if
symlinking fails, a junction will be used instead.
Options are:
* top - The top level the copy is being run from. This is used to determine
if the symlink destination is within the set of files we're copying or
outside it.
* fs - (Default: `require('fs')`) The filesystem module to use. Can be used
to use `graceful-fs` or to inject a mock.
* Promise - (Default: `global.Promise`) The promise implementation to use, defaults to Node's.
* isWindows - (Default: `process.platform === 'win32'`) If true enables Windows symlink semantics. This requires
an extra `stat` to determine if the destination of a symlink is a file or directory. If symlinking a directory
fails then we'll try making a junction instead.
### copy.recurse(from, to, options) → Promise
Reads all of the files in directory `from` and adds them to the `queue`
using `recurseWith` (by default `copy.item`).
Options are:
* queue - A [`run-queue`](https://npmjs.com/package/run-queue) object to add files found inside `from` to.
* recurseWith - (Default: `copy.item`) The function to call on each file after recursing into a directory.
* uid, gid - (Optional) If `getuid()` is `0` then this and gid will be used to
set the user and group of `to`. If uid is present then gid must be too.
* mode - (Optional) If set then `to` will have its perms set to `mode`.
* fs - (Default: `require('fs')`) The filesystem module to use. Can be used
to use `graceful-fs` or to inject a mock.
* getuid - (Default: `process.getuid`) A function that returns the current UID. Used to inject a mock.
### copy.item(from, to, options) → Promise
Copies some kind of `from` to destination `to`. This looks at the filetype
and calls `copy.file`, `copy.symlink` or `copy.recurse` as appropriate.
Symlink copies are queued with a priority such that they happen after all
file and directory copies as you can't create a junction on windows to a
file that doesn't exist yet.
Options are:
* top - The top level the copy is being run from. This is used to determine
if the symlink destination is within the set of files we're copying or
outside it.
* queue - The [`run-queue`](https://npmjs.com/package/run-queue) object to
pass to `copy.recurse` if `from` is a directory.
* recurseWith - (Default: `copy.item`) The function to call on each file after recursing into a directory.
* uid, gid - (Optional) If `getuid()` is `0` then this and gid will be used to
set the user and group of `to`. If uid is present then gid must be too.
* mode - (Optional) If set then `to` will have its perms set to `mode`.
* fs - (Default: `require('fs')`) The filesystem module to use. Can be used
to use `graceful-fs` or to inject a mock.
* getuid - (Default: `process.getuid`) A function that returns the current UID. Used to inject a mock.
* isWindows - (Default: `process.platform === 'win32'`) If true enables Windows symlink semantics. This requires
an extra `stat` to determine if the destination of a symlink is a file or directory. If symlinking a directory
fails then we'll try making a junction instead.
* Promise - (Default: `global.Promise`) The promise implementation to use, defaults to Node's.
* writeStreamAtomic - (Default `require('fs-write-stream-atomic')`) The
implementation of `writeStreamAtomic` to use. Used to inject a mock.

52
packages/pro/node_modules/move-concurrently/README.md~ generated vendored Normal file
View File

@ -0,0 +1,52 @@
# move-concurrently
Move files and directories.
```
const move = require('move-concurrently')
move('/path/to/thing', '/new/path/thing'), err => {
if (err) throw err
// thing is now moved!
})
```
Uses `rename` to move things as fast as possible.
If you `move` across devices or on filesystems that don't support renaming
large directories. That is, situations that result in `rename` returning
the `EXDEV` error, then `move` will fallback to copy + delete.
When recursively copying directories it will first try to rename the
contents before falling back to copying. While this will be slightly slower
in true cross-device scenarios, it is MUCH faster in cases where the
filesystem can't handle directory renames.
When copying ownership is maintained when running as root. Permissions are
always maintained. On Windows, if symlinks are unavailable then junctions
will be used.
## INTERFACE
### move(from, to, options) → Promise
Recursively moves `from` to `to` and resolves its promise when finished.
If `to` already exists then the promise will be rejected with an `EEXIST`
error.
Starts by trying to rename `from` to `to`.
Options are:
* maxConcurrency (Default: `1`) The maximum number of concurrent copies to do at once.
* isWindows - (Default: `process.platform === 'win32'`) If true enables Windows symlink semantics. This requires
an extra `stat` to determine if the destination of a symlink is a file or directory. If symlinking a directory
fails then we'll try making a junction instead.
Options can also include dependency injection:
* Promise - (Default: `global.Promise`) The promise implementation to use, defaults to Node's.
* fs - (Default: `require('fs')`) The filesystem module to use. Can be used
to use `graceful-fs` or to inject a mock.
* writeStreamAtomic - (Default: `require('fs-write-stream-atomic')`) The
implementation of `writeStreamAtomic` to use. Used to inject a mock.
* getuid - (Default: `process.getuid`) A function that returns the current UID. Used to inject a mock.

68
packages/pro/node_modules/umask/index.js~ generated vendored Normal file
View File

@ -0,0 +1,68 @@
// commands for handling umask
var util = require("util")
var log = require("npmlog")
var defaultUmask = 0022
var defaultUmaskString = toString(defaultUmask)
exports.toString = toString
exports.fromString = fromString
exports.validate = validate
function toString(val) {
val = val.toString(8)
while (val.length < 4) val = "0" + val
return val
}
function validate (data, k, val) {
// must be either an integer or an octal string.
if (typeof val === "number") {
data[k] = val
return true
}
if (typeof val === "string") {
if (val.charAt(0) !== "0" || isNaN(val)) return false
data[k] = parseInt(val, 8)
return true
}
return false
}
function fromString(val, cb) {
// synchronous callback, no zalgo
_fromString(val, cb || function (err, result) {
if (err) log.warn("invalid umask", err.message)
val = result
})
return val
}
function _fromString(val, cb) {
if(typeof val === "string") {
if (!/^[0-7]+$/.test(val)) {
return cb(new Error(util.format("Expected octal string, got %j, defaulting to %j",
val, defaultUmaskString)),
defaultUmask)
}
val = parseInt(val, 8)
} else if(typeof val !== "number") {
return cb(new Error(util.format("Expected number or octal string, got %j, defaulting to %j",
val, defaultUmaskString)),
defaultUmask)
}
if ((val < 0) || (val > 511)) {
return cb(new Error(util.format("Must be in range 0..511 (0000..0777), got %j", val)),
defaultUmask)
}
cb(null, val)
}

5
packages/pro/node_modules/umask/test/ChangeLog~ generated vendored Normal file
View File

@ -0,0 +1,5 @@
2015-01-15 Sam Mikes <smikes@Svarog.local>
* index.js: (convert_fromString) accept decimal strings provided they
don't begin with '0'

14
packages/pro/node_modules/umask/test/simple.js~ generated vendored Normal file
View File

@ -0,0 +1,14 @@
'use strict';
var Purefts = require('..');
var Code = require('code');
var Lab = require('lab');
var lab = Lab.script();
exports.lab = lab;
var describe = lab.describe;
var it = lab.it;
var expect = Code.expect;
describe('create objects', function () {

View File

@ -9,7 +9,7 @@ import { checkTestFlag } from "../utilities/redis"
import * as utils from "./utils" import * as utils from "./utils"
import env from "../environment" import env from "../environment"
import { context, db as dbCore } from "@budibase/backend-core" import { context, db as dbCore } from "@budibase/backend-core"
import { Automation, Row } from "@budibase/types" import { Automation, Row, AutomationData, AutomationJob } from "@budibase/types"
export const TRIGGER_DEFINITIONS = definitions export const TRIGGER_DEFINITIONS = definitions
const JOB_OPTS = { const JOB_OPTS = {
@ -109,14 +109,16 @@ export async function externalTrigger(
} }
params.fields = coercedFields params.fields = coercedFields
} }
const data: Record<string, any> = { automation, event: params }
const data: AutomationData = { automation, event: params as any }
if (getResponses) { if (getResponses) {
data.event = { data.event = {
...data.event, ...data.event,
appId: context.getAppId(), appId: context.getAppId(),
automation, automation,
} }
return utils.processEvent({ data }) const job = { data } as AutomationJob
return utils.processEvent(job)
} else { } else {
return automationQueue.add(data, JOB_OPTS) return automationQueue.add(data, JOB_OPTS)
} }

View File

@ -8,7 +8,7 @@ import { db as dbCore, context } from "@budibase/backend-core"
import { getAutomationMetadataParams } from "../db/utils" import { getAutomationMetadataParams } from "../db/utils"
import { cloneDeep } from "lodash/fp" import { cloneDeep } from "lodash/fp"
import { quotas } from "@budibase/pro" import { quotas } from "@budibase/pro"
import { Automation, WebhookActionType } from "@budibase/types" import { Automation, AutomationJob, WebhookActionType } from "@budibase/types"
import sdk from "../sdk" import sdk from "../sdk"
const REBOOT_CRON = "@reboot" const REBOOT_CRON = "@reboot"
@ -16,27 +16,34 @@ const WH_STEP_ID = definitions.WEBHOOK.stepId
const CRON_STEP_ID = definitions.CRON.stepId const CRON_STEP_ID = definitions.CRON.stepId
const Runner = new Thread(ThreadType.AUTOMATION) const Runner = new Thread(ThreadType.AUTOMATION)
const jobMessage = (job: any, message: string) => { function loggingArgs(job: AutomationJob) {
return `app=${job.data.event.appId} automation=${job.data.automation._id} jobId=${job.id} trigger=${job.data.automation.definition.trigger.event} : ${message}` return {
jobId: job.id,
trigger: job.data.automation.definition.trigger.event,
}
} }
export async function processEvent(job: any) { export async function processEvent(job: AutomationJob) {
try { const appId = job.data.event.appId!
const automationId = job.data.automation._id const automationId = job.data.automation._id!
console.log(jobMessage(job, "running")) const task = async () => {
// need to actually await these so that an error can be captured properly try {
return await context.doInContext(job.data.event.appId, async () => { // need to actually await these so that an error can be captured properly
console.log("automation running", loggingArgs(job))
const runFn = () => Runner.run(job) const runFn = () => Runner.run(job)
return quotas.addAutomation(runFn, { const result = await quotas.addAutomation(runFn, {
automationId, automationId,
}) })
}) console.log("automation completed", loggingArgs(job))
} catch (err) { return result
const errJson = JSON.stringify(err) } catch (err) {
console.error(jobMessage(job, `was unable to run - ${errJson}`)) console.error(`automation was unable to run`, err, loggingArgs(job))
console.trace(err) return { err }
return { err } }
} }
return await context.doInAutomationContext({ appId, automationId, task })
} }
export async function updateTestHistory( export async function updateTestHistory(

View File

@ -1,4 +1,4 @@
import { AutomationResults, AutomationStep, Document } from "@budibase/types" import { AutomationResults, AutomationStep } from "@budibase/types"
export enum LoopStepType { export enum LoopStepType {
ARRAY = "Array", ARRAY = "Array",
@ -27,7 +27,3 @@ export interface AutomationContext extends AutomationResults {
env?: Record<string, string> env?: Record<string, string>
trigger: any trigger: any
} }
export interface AutomationMetadata extends Document {
errorCount?: number
}

View File

@ -13,13 +13,18 @@ import { generateAutomationMetadataID, isProdAppID } from "../db/utils"
import { definitions as triggerDefs } from "../automations/triggerInfo" import { definitions as triggerDefs } from "../automations/triggerInfo"
import { AutomationErrors, MAX_AUTOMATION_RECURRING_ERRORS } from "../constants" import { AutomationErrors, MAX_AUTOMATION_RECURRING_ERRORS } from "../constants"
import { storeLog } from "../automations/logging" import { storeLog } from "../automations/logging"
import { Automation, AutomationStep, AutomationStatus } from "@budibase/types" import {
Automation,
AutomationStep,
AutomationStatus,
AutomationMetadata,
AutomationJob,
} from "@budibase/types"
import { import {
LoopStep, LoopStep,
LoopInput, LoopInput,
TriggerOutput, TriggerOutput,
AutomationContext, AutomationContext,
AutomationMetadata,
} from "../definitions/automations" } from "../definitions/automations"
import { WorkerCallback } from "./definitions" import { WorkerCallback } from "./definitions"
import { context, logging } from "@budibase/backend-core" import { context, logging } from "@budibase/backend-core"
@ -60,11 +65,11 @@ class Orchestrator {
_job: Job _job: Job
executionOutput: AutomationContext executionOutput: AutomationContext
constructor(job: Job) { constructor(job: AutomationJob) {
let automation = job.data.automation, let automation = job.data.automation
triggerOutput = job.data.event let triggerOutput = job.data.event
const metadata = triggerOutput.metadata const metadata = triggerOutput.metadata
this._chainCount = metadata ? metadata.automationChainCount : 0 this._chainCount = metadata ? metadata.automationChainCount! : 0
this._appId = triggerOutput.appId as string this._appId = triggerOutput.appId as string
this._job = job this._job = job
const triggerStepId = automation.definition.trigger.stepId const triggerStepId = automation.definition.trigger.stepId

View File

@ -1,5 +1,3 @@
import { EnvironmentVariablesDecrypted } from "@budibase/types"
export type WorkerCallback = (error: any, response?: any) => void export type WorkerCallback = (error: any, response?: any) => void
export interface QueryEvent { export interface QueryEvent {

View File

@ -1,5 +1,7 @@
import workerFarm from "worker-farm" import workerFarm from "worker-farm"
import env from "../environment" import env from "../environment"
import { AutomationJob } from "@budibase/types"
import { QueryEvent } from "./definitions"
export const ThreadType = { export const ThreadType = {
QUERY: "query", QUERY: "query",
@ -64,11 +66,11 @@ export class Thread {
) )
} }
run(data: any) { run(job: AutomationJob | QueryEvent) {
const timeout = this.timeoutMs const timeout = this.timeoutMs
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
function fire(worker: any) { function fire(worker: any) {
worker.execute(data, (err: any, response: any) => { worker.execute(job, (err: any, response: any) => {
if (err && err.type === "TimeoutError") { if (err && err.type === "TimeoutError") {
reject( reject(
new Error(`Query response time exceeded ${timeout}ms timeout.`) new Error(`Query response time exceeded ${timeout}ms timeout.`)

View File

@ -177,3 +177,8 @@ export type AutomationStepInput = {
appId: string appId: string
apiKey?: string apiKey?: string
} }
export interface AutomationMetadata extends Document {
errorCount?: number
automationChainCount?: number
}

View File

@ -0,0 +1,15 @@
import { Automation, AutomationMetadata } from "../../documents"
import { Job } from "bull"
export interface AutomationDataEvent {
appId?: string
metadata?: AutomationMetadata
automation?: Automation
}
export interface AutomationData {
event: AutomationDataEvent
automation: Automation
}
export type AutomationJob = Job<AutomationData>

View File

@ -1,3 +1,4 @@
export * from "./automations"
export * from "./hosting" export * from "./hosting"
export * from "./context" export * from "./context"
export * from "./events" export * from "./events"