Merge branch 'master' into BUDI-9038/validate-js-helpers
This commit is contained in:
commit
871b9b2b95
|
@ -1,6 +1,6 @@
|
||||||
{
|
{
|
||||||
"$schema": "node_modules/lerna/schemas/lerna-schema.json",
|
"$schema": "node_modules/lerna/schemas/lerna-schema.json",
|
||||||
"version": "3.4.13",
|
"version": "3.4.15",
|
||||||
"npmClient": "yarn",
|
"npmClient": "yarn",
|
||||||
"concurrency": 20,
|
"concurrency": 20,
|
||||||
"command": {
|
"command": {
|
||||||
|
|
|
@ -123,7 +123,7 @@ export async function doInAutomationContext<T>(params: {
|
||||||
task: () => T
|
task: () => T
|
||||||
}): Promise<T> {
|
}): Promise<T> {
|
||||||
await ensureSnippetContext()
|
await ensureSnippetContext()
|
||||||
return newContext(
|
return await newContext(
|
||||||
{
|
{
|
||||||
tenantId: getTenantIDFromAppID(params.appId),
|
tenantId: getTenantIDFromAppID(params.appId),
|
||||||
appId: params.appId,
|
appId: params.appId,
|
||||||
|
|
|
@ -5,10 +5,10 @@ import {
|
||||||
SqlQuery,
|
SqlQuery,
|
||||||
Table,
|
Table,
|
||||||
TableSourceType,
|
TableSourceType,
|
||||||
|
SEPARATOR,
|
||||||
} from "@budibase/types"
|
} from "@budibase/types"
|
||||||
import { DEFAULT_BB_DATASOURCE_ID } from "../constants"
|
import { DEFAULT_BB_DATASOURCE_ID } from "../constants"
|
||||||
import { Knex } from "knex"
|
import { Knex } from "knex"
|
||||||
import { SEPARATOR } from "../db"
|
|
||||||
import environment from "../environment"
|
import environment from "../environment"
|
||||||
|
|
||||||
const DOUBLE_SEPARATOR = `${SEPARATOR}${SEPARATOR}`
|
const DOUBLE_SEPARATOR = `${SEPARATOR}${SEPARATOR}`
|
||||||
|
|
|
@ -8,6 +8,7 @@ import {
|
||||||
UIComponentError,
|
UIComponentError,
|
||||||
ComponentDefinition,
|
ComponentDefinition,
|
||||||
DependsOnComponentSetting,
|
DependsOnComponentSetting,
|
||||||
|
Screen,
|
||||||
} from "@budibase/types"
|
} from "@budibase/types"
|
||||||
import { queries } from "./queries"
|
import { queries } from "./queries"
|
||||||
import { views } from "./views"
|
import { views } from "./views"
|
||||||
|
@ -66,6 +67,7 @@ export const screenComponentErrorList = derived(
|
||||||
if (!$selectedScreen) {
|
if (!$selectedScreen) {
|
||||||
return []
|
return []
|
||||||
}
|
}
|
||||||
|
const screen = $selectedScreen
|
||||||
|
|
||||||
const datasources = {
|
const datasources = {
|
||||||
...reduceBy("_id", $tables.list),
|
...reduceBy("_id", $tables.list),
|
||||||
|
@ -79,7 +81,9 @@ export const screenComponentErrorList = derived(
|
||||||
const errors: UIComponentError[] = []
|
const errors: UIComponentError[] = []
|
||||||
|
|
||||||
function checkComponentErrors(component: Component, ancestors: string[]) {
|
function checkComponentErrors(component: Component, ancestors: string[]) {
|
||||||
errors.push(...getInvalidDatasources(component, datasources, definitions))
|
errors.push(
|
||||||
|
...getInvalidDatasources(screen, component, datasources, definitions)
|
||||||
|
)
|
||||||
errors.push(...getMissingRequiredSettings(component, definitions))
|
errors.push(...getMissingRequiredSettings(component, definitions))
|
||||||
errors.push(...getMissingAncestors(component, definitions, ancestors))
|
errors.push(...getMissingAncestors(component, definitions, ancestors))
|
||||||
|
|
||||||
|
@ -95,6 +99,7 @@ export const screenComponentErrorList = derived(
|
||||||
)
|
)
|
||||||
|
|
||||||
function getInvalidDatasources(
|
function getInvalidDatasources(
|
||||||
|
screen: Screen,
|
||||||
component: Component,
|
component: Component,
|
||||||
datasources: Record<string, any>,
|
datasources: Record<string, any>,
|
||||||
definitions: Record<string, ComponentDefinition>
|
definitions: Record<string, ComponentDefinition>
|
||||||
|
|
|
@ -7,6 +7,7 @@ import {
|
||||||
CreateRowStepOutputs,
|
CreateRowStepOutputs,
|
||||||
FieldType,
|
FieldType,
|
||||||
FilterCondition,
|
FilterCondition,
|
||||||
|
AutomationStepStatus,
|
||||||
} from "@budibase/types"
|
} from "@budibase/types"
|
||||||
import { createAutomationBuilder } from "../utilities/AutomationTestBuilder"
|
import { createAutomationBuilder } from "../utilities/AutomationTestBuilder"
|
||||||
import TestConfiguration from "../../../tests/utilities/TestConfiguration"
|
import TestConfiguration from "../../../tests/utilities/TestConfiguration"
|
||||||
|
@ -560,5 +561,25 @@ describe("Attempt to run a basic loop automation", () => {
|
||||||
status: "stopped",
|
status: "stopped",
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
it("should not fail if queryRows returns nothing", async () => {
|
||||||
|
const table = await config.api.table.save(basicTable())
|
||||||
|
const results = await createAutomationBuilder(config)
|
||||||
|
.onAppAction()
|
||||||
|
.queryRows({
|
||||||
|
tableId: table._id!,
|
||||||
|
})
|
||||||
|
.loop({
|
||||||
|
option: LoopStepType.ARRAY,
|
||||||
|
binding: "{{ steps.1.rows }}",
|
||||||
|
})
|
||||||
|
.serverLog({ text: "Message {{loop.currentItem}}" })
|
||||||
|
.test({ fields: {} })
|
||||||
|
|
||||||
|
expect(results.steps[1].outputs.success).toBe(true)
|
||||||
|
expect(results.steps[1].outputs.status).toBe(
|
||||||
|
AutomationStepStatus.NO_ITERATIONS
|
||||||
|
)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
|
@ -40,39 +40,35 @@ function loggingArgs(job: AutomationJob) {
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function processEvent(job: AutomationJob) {
|
export async function processEvent(job: AutomationJob) {
|
||||||
return tracer.trace(
|
return tracer.trace("processEvent", async span => {
|
||||||
"processEvent",
|
const appId = job.data.event.appId!
|
||||||
{ resource: "automation" },
|
const automationId = job.data.automation._id!
|
||||||
async span => {
|
|
||||||
const appId = job.data.event.appId!
|
|
||||||
const automationId = job.data.automation._id!
|
|
||||||
|
|
||||||
span?.addTags({
|
span.addTags({
|
||||||
appId,
|
appId,
|
||||||
automationId,
|
automationId,
|
||||||
job: {
|
job: {
|
||||||
id: job.id,
|
id: job.id,
|
||||||
name: job.name,
|
name: job.name,
|
||||||
attemptsMade: job.attemptsMade,
|
attemptsMade: job.attemptsMade,
|
||||||
opts: {
|
attempts: job.opts.attempts,
|
||||||
attempts: job.opts.attempts,
|
priority: job.opts.priority,
|
||||||
priority: job.opts.priority,
|
delay: job.opts.delay,
|
||||||
delay: job.opts.delay,
|
repeat: job.opts.repeat,
|
||||||
repeat: job.opts.repeat,
|
backoff: job.opts.backoff,
|
||||||
backoff: job.opts.backoff,
|
lifo: job.opts.lifo,
|
||||||
lifo: job.opts.lifo,
|
timeout: job.opts.timeout,
|
||||||
timeout: job.opts.timeout,
|
jobId: job.opts.jobId,
|
||||||
jobId: job.opts.jobId,
|
removeOnComplete: job.opts.removeOnComplete,
|
||||||
removeOnComplete: job.opts.removeOnComplete,
|
removeOnFail: job.opts.removeOnFail,
|
||||||
removeOnFail: job.opts.removeOnFail,
|
stackTraceLimit: job.opts.stackTraceLimit,
|
||||||
stackTraceLimit: job.opts.stackTraceLimit,
|
preventParsingData: job.opts.preventParsingData,
|
||||||
preventParsingData: job.opts.preventParsingData,
|
},
|
||||||
},
|
})
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
const task = async () => {
|
const task = async () => {
|
||||||
try {
|
try {
|
||||||
|
return await tracer.trace("task", async () => {
|
||||||
if (isCronTrigger(job.data.automation) && !job.data.event.timestamp) {
|
if (isCronTrigger(job.data.automation) && !job.data.event.timestamp) {
|
||||||
// Requires the timestamp at run time
|
// Requires the timestamp at run time
|
||||||
job.data.event.timestamp = Date.now()
|
job.data.event.timestamp = Date.now()
|
||||||
|
@ -81,25 +77,19 @@ export async function processEvent(job: AutomationJob) {
|
||||||
console.log("automation running", ...loggingArgs(job))
|
console.log("automation running", ...loggingArgs(job))
|
||||||
|
|
||||||
const runFn = () => Runner.run(job)
|
const runFn = () => Runner.run(job)
|
||||||
const result = await quotas.addAutomation(runFn, {
|
const result = await quotas.addAutomation(runFn, { automationId })
|
||||||
automationId,
|
|
||||||
})
|
|
||||||
console.log("automation completed", ...loggingArgs(job))
|
console.log("automation completed", ...loggingArgs(job))
|
||||||
return result
|
return result
|
||||||
} catch (err) {
|
})
|
||||||
span?.addTags({ error: true })
|
} catch (err) {
|
||||||
console.error(
|
span.addTags({ error: true })
|
||||||
`automation was unable to run`,
|
console.error(`automation was unable to run`, err, ...loggingArgs(job))
|
||||||
err,
|
return { err }
|
||||||
...loggingArgs(job)
|
|
||||||
)
|
|
||||||
return { err }
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return await context.doInAutomationContext({ appId, automationId, task })
|
|
||||||
}
|
}
|
||||||
)
|
|
||||||
|
return await context.doInAutomationContext({ appId, automationId, task })
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function updateTestHistory(
|
export async function updateTestHistory(
|
||||||
|
|
|
@ -62,12 +62,16 @@ const SCHEMA: Integration = {
|
||||||
type: DatasourceFieldType.STRING,
|
type: DatasourceFieldType.STRING,
|
||||||
required: true,
|
required: true,
|
||||||
},
|
},
|
||||||
|
rev: {
|
||||||
|
type: DatasourceFieldType.STRING,
|
||||||
|
required: true,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
class CouchDBIntegration implements IntegrationBase {
|
export class CouchDBIntegration implements IntegrationBase {
|
||||||
private readonly client: Database
|
private readonly client: Database
|
||||||
|
|
||||||
constructor(config: CouchDBConfig) {
|
constructor(config: CouchDBConfig) {
|
||||||
|
@ -82,7 +86,8 @@ class CouchDBIntegration implements IntegrationBase {
|
||||||
connected: false,
|
connected: false,
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
response.connected = await this.client.exists()
|
await this.client.allDocs({ limit: 1 })
|
||||||
|
response.connected = true
|
||||||
} catch (e: any) {
|
} catch (e: any) {
|
||||||
response.error = e.message as string
|
response.error = e.message as string
|
||||||
}
|
}
|
||||||
|
@ -99,13 +104,9 @@ class CouchDBIntegration implements IntegrationBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
async read(query: { json: string | object }) {
|
async read(query: { json: string | object }) {
|
||||||
const parsed = this.parse(query)
|
const params = { include_docs: true, ...this.parse(query) }
|
||||||
const params = {
|
|
||||||
include_docs: true,
|
|
||||||
...parsed,
|
|
||||||
}
|
|
||||||
const result = await this.client.allDocs(params)
|
const result = await this.client.allDocs(params)
|
||||||
return result.rows.map(row => row.doc)
|
return result.rows.map(row => row.doc!)
|
||||||
}
|
}
|
||||||
|
|
||||||
async update(query: { json: string | object }) {
|
async update(query: { json: string | object }) {
|
||||||
|
@ -121,8 +122,8 @@ class CouchDBIntegration implements IntegrationBase {
|
||||||
return await this.client.get(query.id)
|
return await this.client.get(query.id)
|
||||||
}
|
}
|
||||||
|
|
||||||
async delete(query: { id: string }) {
|
async delete(query: { id: string; rev: string }) {
|
||||||
return await this.client.remove(query.id)
|
return await this.client.remove(query.id, query.rev)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,84 +1,87 @@
|
||||||
jest.mock("@budibase/backend-core", () => {
|
import { env } from "@budibase/backend-core"
|
||||||
const core = jest.requireActual("@budibase/backend-core")
|
import { CouchDBIntegration } from "../couchdb"
|
||||||
return {
|
import { generator } from "@budibase/backend-core/tests"
|
||||||
...core,
|
|
||||||
db: {
|
|
||||||
...core.db,
|
|
||||||
DatabaseWithConnection: function () {
|
|
||||||
return {
|
|
||||||
allDocs: jest.fn().mockReturnValue({ rows: [] }),
|
|
||||||
put: jest.fn(),
|
|
||||||
get: jest.fn().mockReturnValue({ _rev: "a" }),
|
|
||||||
remove: jest.fn(),
|
|
||||||
}
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
import { default as CouchDBIntegration } from "../couchdb"
|
function couchSafeID(): string {
|
||||||
|
// CouchDB IDs must start with a letter, so we prepend an 'a'.
|
||||||
|
return `a${generator.guid()}`
|
||||||
|
}
|
||||||
|
|
||||||
class TestConfiguration {
|
function doc(data: Record<string, any>): string {
|
||||||
integration: any
|
return JSON.stringify({ _id: couchSafeID(), ...data })
|
||||||
|
}
|
||||||
|
|
||||||
constructor(
|
function query(data?: Record<string, any>): { json: string } {
|
||||||
config: any = { url: "http://somewhere", database: "something" }
|
return { json: doc(data || {}) }
|
||||||
) {
|
|
||||||
this.integration = new CouchDBIntegration.integration(config)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
describe("CouchDB Integration", () => {
|
describe("CouchDB Integration", () => {
|
||||||
let config: any
|
let couchdb: CouchDBIntegration
|
||||||
|
|
||||||
beforeEach(() => {
|
beforeEach(() => {
|
||||||
config = new TestConfiguration()
|
couchdb = new CouchDBIntegration({
|
||||||
})
|
url: env.COUCH_DB_URL,
|
||||||
|
database: couchSafeID(),
|
||||||
it("calls the create method with the correct params", async () => {
|
|
||||||
const doc = {
|
|
||||||
test: 1,
|
|
||||||
}
|
|
||||||
await config.integration.create({
|
|
||||||
json: JSON.stringify(doc),
|
|
||||||
})
|
|
||||||
expect(config.integration.client.put).toHaveBeenCalledWith(doc)
|
|
||||||
})
|
|
||||||
|
|
||||||
it("calls the read method with the correct params", async () => {
|
|
||||||
const doc = {
|
|
||||||
name: "search",
|
|
||||||
}
|
|
||||||
|
|
||||||
await config.integration.read({
|
|
||||||
json: JSON.stringify(doc),
|
|
||||||
})
|
|
||||||
|
|
||||||
expect(config.integration.client.allDocs).toHaveBeenCalledWith({
|
|
||||||
include_docs: true,
|
|
||||||
name: "search",
|
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
it("calls the update method with the correct params", async () => {
|
it("successfully connects", async () => {
|
||||||
const doc = {
|
const { connected } = await couchdb.testConnection()
|
||||||
_id: "1234",
|
expect(connected).toBe(true)
|
||||||
name: "search",
|
|
||||||
}
|
|
||||||
|
|
||||||
await config.integration.update({
|
|
||||||
json: JSON.stringify(doc),
|
|
||||||
})
|
|
||||||
|
|
||||||
expect(config.integration.client.put).toHaveBeenCalledWith({
|
|
||||||
...doc,
|
|
||||||
_rev: "a",
|
|
||||||
})
|
|
||||||
})
|
})
|
||||||
|
|
||||||
it("calls the delete method with the correct params", async () => {
|
it("can create documents", async () => {
|
||||||
const id = "1234"
|
const { id, ok, rev } = await couchdb.create(query({ test: 1 }))
|
||||||
await config.integration.delete({ id })
|
expect(id).toBeDefined()
|
||||||
expect(config.integration.client.remove).toHaveBeenCalledWith(id)
|
expect(ok).toBe(true)
|
||||||
|
expect(rev).toBeDefined()
|
||||||
|
})
|
||||||
|
|
||||||
|
it("can read created documents", async () => {
|
||||||
|
const { id, ok, rev } = await couchdb.create(query({ test: 1 }))
|
||||||
|
expect(id).toBeDefined()
|
||||||
|
expect(ok).toBe(true)
|
||||||
|
expect(rev).toBeDefined()
|
||||||
|
|
||||||
|
const docs = await couchdb.read(query())
|
||||||
|
expect(docs).toEqual([
|
||||||
|
{
|
||||||
|
_id: id,
|
||||||
|
_rev: rev,
|
||||||
|
test: 1,
|
||||||
|
createdAt: expect.any(String),
|
||||||
|
updatedAt: expect.any(String),
|
||||||
|
},
|
||||||
|
])
|
||||||
|
})
|
||||||
|
|
||||||
|
it("can update documents", async () => {
|
||||||
|
const { id, ok, rev } = await couchdb.create(query({ test: 1 }))
|
||||||
|
expect(ok).toBe(true)
|
||||||
|
|
||||||
|
const { id: newId, rev: newRev } = await couchdb.update(
|
||||||
|
query({ _id: id, _rev: rev, test: 2 })
|
||||||
|
)
|
||||||
|
const docs = await couchdb.read(query())
|
||||||
|
expect(docs).toEqual([
|
||||||
|
{
|
||||||
|
_id: newId,
|
||||||
|
_rev: newRev,
|
||||||
|
test: 2,
|
||||||
|
createdAt: expect.any(String),
|
||||||
|
updatedAt: expect.any(String),
|
||||||
|
},
|
||||||
|
])
|
||||||
|
})
|
||||||
|
|
||||||
|
it("can delete documents", async () => {
|
||||||
|
const { id, ok, rev } = await couchdb.create(query({ test: 1 }))
|
||||||
|
expect(ok).toBe(true)
|
||||||
|
|
||||||
|
const deleteResponse = await couchdb.delete({ id, rev })
|
||||||
|
expect(deleteResponse.ok).toBe(true)
|
||||||
|
|
||||||
|
const docs = await couchdb.read(query())
|
||||||
|
expect(docs).toBeEmpty()
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
|
@ -68,7 +68,11 @@ function getLoopIterable(step: LoopStep): any[] {
|
||||||
let input = step.inputs.binding
|
let input = step.inputs.binding
|
||||||
|
|
||||||
if (option === LoopStepType.ARRAY && typeof input === "string") {
|
if (option === LoopStepType.ARRAY && typeof input === "string") {
|
||||||
input = JSON.parse(input)
|
if (input === "") {
|
||||||
|
input = []
|
||||||
|
} else {
|
||||||
|
input = JSON.parse(input)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (option === LoopStepType.STRING && Array.isArray(input)) {
|
if (option === LoopStepType.STRING && Array.isArray(input)) {
|
||||||
|
@ -310,87 +314,83 @@ class Orchestrator {
|
||||||
}
|
}
|
||||||
|
|
||||||
async execute(): Promise<AutomationResults> {
|
async execute(): Promise<AutomationResults> {
|
||||||
return tracer.trace(
|
return await tracer.trace("execute", async span => {
|
||||||
"Orchestrator.execute",
|
span.addTags({ appId: this.appId, automationId: this.automation._id })
|
||||||
{ resource: "automation" },
|
|
||||||
async span => {
|
|
||||||
span?.addTags({ appId: this.appId, automationId: this.automation._id })
|
|
||||||
|
|
||||||
const job = cloneDeep(this.job)
|
const job = cloneDeep(this.job)
|
||||||
delete job.data.event.appId
|
delete job.data.event.appId
|
||||||
delete job.data.event.metadata
|
delete job.data.event.metadata
|
||||||
|
|
||||||
if (this.isCron() && !job.data.event.timestamp) {
|
if (this.isCron() && !job.data.event.timestamp) {
|
||||||
job.data.event.timestamp = Date.now()
|
job.data.event.timestamp = Date.now()
|
||||||
}
|
|
||||||
|
|
||||||
const trigger: AutomationTriggerResult = {
|
|
||||||
id: job.data.automation.definition.trigger.id,
|
|
||||||
stepId: job.data.automation.definition.trigger.stepId,
|
|
||||||
inputs: null,
|
|
||||||
outputs: job.data.event,
|
|
||||||
}
|
|
||||||
const result: AutomationResults = { trigger, steps: [trigger] }
|
|
||||||
|
|
||||||
const ctx: AutomationContext = {
|
|
||||||
trigger: trigger.outputs,
|
|
||||||
steps: [trigger.outputs],
|
|
||||||
stepsById: {},
|
|
||||||
stepsByName: {},
|
|
||||||
user: trigger.outputs.user,
|
|
||||||
}
|
|
||||||
await enrichBaseContext(ctx)
|
|
||||||
|
|
||||||
const timeout =
|
|
||||||
this.job.data.event.timeout || env.AUTOMATION_THREAD_TIMEOUT
|
|
||||||
|
|
||||||
try {
|
|
||||||
await helpers.withTimeout(timeout, async () => {
|
|
||||||
const [stepOutputs, executionTime] = await utils.time(() =>
|
|
||||||
this.executeSteps(ctx, job.data.automation.definition.steps)
|
|
||||||
)
|
|
||||||
|
|
||||||
result.steps.push(...stepOutputs)
|
|
||||||
|
|
||||||
console.info(
|
|
||||||
`Automation ID: ${
|
|
||||||
this.automation._id
|
|
||||||
} Execution time: ${executionTime.toMs()} milliseconds`,
|
|
||||||
{
|
|
||||||
_logKey: "automation",
|
|
||||||
executionTime,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
})
|
|
||||||
} catch (e: any) {
|
|
||||||
if (e.errno === "ETIME") {
|
|
||||||
span?.addTags({ timedOut: true })
|
|
||||||
console.warn(`Automation execution timed out after ${timeout}ms`)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let errorCount = 0
|
|
||||||
if (this.isProdApp() && this.isCron() && this.hasErrored(ctx)) {
|
|
||||||
errorCount = (await this.incrementErrorCount()) || 0
|
|
||||||
}
|
|
||||||
|
|
||||||
if (errorCount >= MAX_AUTOMATION_RECURRING_ERRORS) {
|
|
||||||
await this.stopCron("errors", { result })
|
|
||||||
span?.addTags({ shouldStop: true })
|
|
||||||
} else {
|
|
||||||
await this.logResult(result)
|
|
||||||
}
|
|
||||||
|
|
||||||
return result
|
|
||||||
}
|
}
|
||||||
)
|
|
||||||
|
const trigger: AutomationTriggerResult = {
|
||||||
|
id: job.data.automation.definition.trigger.id,
|
||||||
|
stepId: job.data.automation.definition.trigger.stepId,
|
||||||
|
inputs: null,
|
||||||
|
outputs: job.data.event,
|
||||||
|
}
|
||||||
|
const result: AutomationResults = { trigger, steps: [trigger] }
|
||||||
|
|
||||||
|
const ctx: AutomationContext = {
|
||||||
|
trigger: trigger.outputs,
|
||||||
|
steps: [trigger.outputs],
|
||||||
|
stepsById: {},
|
||||||
|
stepsByName: {},
|
||||||
|
user: trigger.outputs.user,
|
||||||
|
}
|
||||||
|
await enrichBaseContext(ctx)
|
||||||
|
|
||||||
|
const timeout =
|
||||||
|
this.job.data.event.timeout || env.AUTOMATION_THREAD_TIMEOUT
|
||||||
|
|
||||||
|
try {
|
||||||
|
await helpers.withTimeout(timeout, async () => {
|
||||||
|
const [stepOutputs, executionTime] = await utils.time(() =>
|
||||||
|
this.executeSteps(ctx, job.data.automation.definition.steps)
|
||||||
|
)
|
||||||
|
|
||||||
|
result.steps.push(...stepOutputs)
|
||||||
|
|
||||||
|
console.info(
|
||||||
|
`Automation ID: ${
|
||||||
|
this.automation._id
|
||||||
|
} Execution time: ${executionTime.toMs()} milliseconds`,
|
||||||
|
{
|
||||||
|
_logKey: "automation",
|
||||||
|
executionTime,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
})
|
||||||
|
} catch (e: any) {
|
||||||
|
if (e.errno === "ETIME") {
|
||||||
|
span?.addTags({ timedOut: true })
|
||||||
|
console.warn(`Automation execution timed out after ${timeout}ms`)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let errorCount = 0
|
||||||
|
if (this.isProdApp() && this.isCron() && this.hasErrored(ctx)) {
|
||||||
|
errorCount = (await this.incrementErrorCount()) || 0
|
||||||
|
}
|
||||||
|
|
||||||
|
if (errorCount >= MAX_AUTOMATION_RECURRING_ERRORS) {
|
||||||
|
await this.stopCron("errors", { result })
|
||||||
|
span?.addTags({ shouldStop: true })
|
||||||
|
} else {
|
||||||
|
await this.logResult(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
private async executeSteps(
|
private async executeSteps(
|
||||||
ctx: AutomationContext,
|
ctx: AutomationContext,
|
||||||
steps: AutomationStep[]
|
steps: AutomationStep[]
|
||||||
): Promise<AutomationStepResult[]> {
|
): Promise<AutomationStepResult[]> {
|
||||||
return tracer.trace("Orchestrator.executeSteps", async () => {
|
return await tracer.trace("executeSteps", async () => {
|
||||||
let stepIndex = 0
|
let stepIndex = 0
|
||||||
const results: AutomationStepResult[] = []
|
const results: AutomationStepResult[] = []
|
||||||
|
|
||||||
|
@ -446,74 +446,92 @@ class Orchestrator {
|
||||||
step: LoopStep,
|
step: LoopStep,
|
||||||
stepToLoop: AutomationStep
|
stepToLoop: AutomationStep
|
||||||
): Promise<AutomationStepResult> {
|
): Promise<AutomationStepResult> {
|
||||||
await processObject(step.inputs, prepareContext(ctx))
|
return await tracer.trace("executeLoopStep", async span => {
|
||||||
|
await processObject(step.inputs, prepareContext(ctx))
|
||||||
|
|
||||||
const maxIterations = getLoopMaxIterations(step)
|
const maxIterations = getLoopMaxIterations(step)
|
||||||
const items: Record<string, any>[] = []
|
const items: Record<string, any>[] = []
|
||||||
let iterations = 0
|
let iterations = 0
|
||||||
let iterable: any[] = []
|
let iterable: any[] = []
|
||||||
try {
|
try {
|
||||||
iterable = getLoopIterable(step)
|
iterable = getLoopIterable(step)
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
return stepFailure(stepToLoop, {
|
span.addTags({
|
||||||
status: AutomationStepStatus.INCORRECT_TYPE,
|
status: AutomationStepStatus.INCORRECT_TYPE,
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
for (; iterations < iterable.length; iterations++) {
|
|
||||||
const currentItem = iterable[iterations]
|
|
||||||
|
|
||||||
if (iterations === maxIterations) {
|
|
||||||
return stepFailure(stepToLoop, {
|
|
||||||
status: AutomationStepStatus.MAX_ITERATIONS,
|
|
||||||
iterations,
|
iterations,
|
||||||
})
|
})
|
||||||
}
|
|
||||||
|
|
||||||
if (matchesLoopFailureCondition(step, currentItem)) {
|
|
||||||
return stepFailure(stepToLoop, {
|
return stepFailure(stepToLoop, {
|
||||||
status: AutomationStepStatus.FAILURE_CONDITION,
|
status: AutomationStepStatus.INCORRECT_TYPE,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx.loop = { currentItem }
|
for (; iterations < iterable.length; iterations++) {
|
||||||
const result = await this.executeStep(ctx, stepToLoop)
|
const currentItem = iterable[iterations]
|
||||||
items.push(result.outputs)
|
|
||||||
ctx.loop = undefined
|
|
||||||
}
|
|
||||||
|
|
||||||
const status =
|
if (iterations === maxIterations) {
|
||||||
iterations === 0 ? AutomationStatus.NO_CONDITION_MET : undefined
|
span.addTags({
|
||||||
return stepSuccess(stepToLoop, { status, iterations, items })
|
status: AutomationStepStatus.MAX_ITERATIONS,
|
||||||
|
iterations,
|
||||||
|
})
|
||||||
|
return stepFailure(stepToLoop, {
|
||||||
|
status: AutomationStepStatus.MAX_ITERATIONS,
|
||||||
|
iterations,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
if (matchesLoopFailureCondition(step, currentItem)) {
|
||||||
|
span.addTags({
|
||||||
|
status: AutomationStepStatus.FAILURE_CONDITION,
|
||||||
|
iterations,
|
||||||
|
})
|
||||||
|
return stepFailure(stepToLoop, {
|
||||||
|
status: AutomationStepStatus.FAILURE_CONDITION,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx.loop = { currentItem }
|
||||||
|
const result = await this.executeStep(ctx, stepToLoop)
|
||||||
|
items.push(result.outputs)
|
||||||
|
ctx.loop = undefined
|
||||||
|
}
|
||||||
|
|
||||||
|
const status =
|
||||||
|
iterations === 0 ? AutomationStepStatus.NO_ITERATIONS : undefined
|
||||||
|
return stepSuccess(stepToLoop, { status, iterations, items })
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
private async executeBranchStep(
|
private async executeBranchStep(
|
||||||
ctx: AutomationContext,
|
ctx: AutomationContext,
|
||||||
step: BranchStep
|
step: BranchStep
|
||||||
): Promise<AutomationStepResult[]> {
|
): Promise<AutomationStepResult[]> {
|
||||||
const { branches, children } = step.inputs
|
return await tracer.trace("executeBranchStep", async span => {
|
||||||
|
const { branches, children } = step.inputs
|
||||||
|
|
||||||
for (const branch of branches) {
|
for (const branch of branches) {
|
||||||
if (await branchMatches(ctx, branch)) {
|
if (await branchMatches(ctx, branch)) {
|
||||||
return [
|
span.addTags({ branchName: branch.name, branchId: branch.id })
|
||||||
stepSuccess(step, {
|
return [
|
||||||
branchName: branch.name,
|
stepSuccess(step, {
|
||||||
status: `${branch.name} branch taken`,
|
branchName: branch.name,
|
||||||
branchId: `${branch.id}`,
|
status: `${branch.name} branch taken`,
|
||||||
}),
|
branchId: `${branch.id}`,
|
||||||
...(await this.executeSteps(ctx, children?.[branch.id] || [])),
|
}),
|
||||||
]
|
...(await this.executeSteps(ctx, children?.[branch.id] || [])),
|
||||||
|
]
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return [stepFailure(step, { status: AutomationStatus.NO_CONDITION_MET })]
|
span.addTags({ status: AutomationStatus.NO_CONDITION_MET })
|
||||||
|
return [stepFailure(step, { status: AutomationStatus.NO_CONDITION_MET })]
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
private async executeStep(
|
private async executeStep(
|
||||||
ctx: AutomationContext,
|
ctx: AutomationContext,
|
||||||
step: Readonly<AutomationStep>
|
step: Readonly<AutomationStep>
|
||||||
): Promise<AutomationStepResult> {
|
): Promise<AutomationStepResult> {
|
||||||
return tracer.trace("Orchestrator.executeStep", async span => {
|
return await tracer.trace(step.stepId, async span => {
|
||||||
span.addTags({
|
span.addTags({
|
||||||
step: {
|
step: {
|
||||||
stepId: step.stepId,
|
stepId: step.stepId,
|
||||||
|
@ -524,6 +542,7 @@ class Orchestrator {
|
||||||
internal: step.internal,
|
internal: step.internal,
|
||||||
deprecated: step.deprecated,
|
deprecated: step.deprecated,
|
||||||
},
|
},
|
||||||
|
inputsKeys: Object.keys(step.inputs),
|
||||||
})
|
})
|
||||||
|
|
||||||
if (this.stopped) {
|
if (this.stopped) {
|
||||||
|
@ -557,6 +576,7 @@ class Orchestrator {
|
||||||
;(outputs as any).status = AutomationStatus.STOPPED
|
;(outputs as any).status = AutomationStatus.STOPPED
|
||||||
}
|
}
|
||||||
|
|
||||||
|
span.addTags({ outputsKeys: Object.keys(outputs) })
|
||||||
return stepSuccess(step, outputs, inputs)
|
return stepSuccess(step, outputs, inputs)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue