Speed up server tests.

This commit is contained in:
Sam Rose 2025-03-04 15:42:48 +00:00
parent 2c43740500
commit 4abf0e8431
No known key found for this signature in database
11 changed files with 363 additions and 331 deletions

View File

@ -88,6 +88,16 @@ export default async function setup() {
content: ` content: `
[log] [log]
level = warn level = warn
[httpd]
socket_options = [{nodelay, true}]
[couchdb]
single_node = true
[cluster]
n = 1
q = 1
`, `,
target: "/opt/couchdb/etc/local.d/test-couchdb.ini", target: "/opt/couchdb/etc/local.d/test-couchdb.ini",
}, },

View File

@ -3,7 +3,6 @@ import { newid } from "../utils"
import { Queue, QueueOptions, JobOptions } from "./queue" import { Queue, QueueOptions, JobOptions } from "./queue"
import { helpers } from "@budibase/shared-core" import { helpers } from "@budibase/shared-core"
import { Job, JobId, JobInformation } from "bull" import { Job, JobId, JobInformation } from "bull"
import { cloneDeep } from "lodash"
function jobToJobInformation(job: Job): JobInformation { function jobToJobInformation(job: Job): JobInformation {
let cron = "" let cron = ""
@ -89,7 +88,7 @@ export class InMemoryQueue<T = any> implements Partial<Queue<T>> {
async process(concurrencyOrFunc: number | any, func?: any) { async process(concurrencyOrFunc: number | any, func?: any) {
func = typeof concurrencyOrFunc === "number" ? func : concurrencyOrFunc func = typeof concurrencyOrFunc === "number" ? func : concurrencyOrFunc
this._emitter.on("message", async msg => { this._emitter.on("message", async msg => {
const message = cloneDeep(msg) const message = msg
// For the purpose of testing, don't trigger cron jobs immediately. // For the purpose of testing, don't trigger cron jobs immediately.
// Require the test to trigger them manually with timestamps. // Require the test to trigger them manually with timestamps.
@ -165,6 +164,9 @@ export class InMemoryQueue<T = any> implements Partial<Queue<T>> {
opts, opts,
} }
this._messages.push(message) this._messages.push(message)
if (this._messages.length > 1000) {
this._messages.shift()
}
this._addCount++ this._addCount++
this._emitter.emit("message", message) this._emitter.emit("message", message)
} }

@ -1 +1 @@
Subproject commit b28dbd549284cf450be7f25ad85aadf614d08f0b Subproject commit f8af563b6a78391d6e19fd0c94fc78724c27ee83

View File

@ -166,17 +166,17 @@ if (descriptions.length) {
) )
} }
const resetRowUsage = async () => { // const resetRowUsage = async () => {
await config.doInContext( // await config.doInContext(
undefined, // undefined,
async () => // async () =>
await quotas.setUsage( // await quotas.setUsage(
0, // 0,
StaticQuotaName.ROWS, // StaticQuotaName.ROWS,
QuotaUsageType.STATIC // QuotaUsageType.STATIC
) // )
) // )
} // }
const getRowUsage = async () => { const getRowUsage = async () => {
const { total } = await config.doInContext(undefined, () => const { total } = await config.doInContext(undefined, () =>
@ -188,20 +188,30 @@ if (descriptions.length) {
return total return total
} }
const assertRowUsage = async (expected: number) => { async function expectRowUsage(expected: number, f: () => Promise<void>) {
const usage = await getRowUsage() return await quotas.withEnabled(async () => {
const before = await getRowUsage()
await f()
const after = await getRowUsage()
const usage = after - before
// Because our quota tracking is not perfect, we allow a 10% margin of // Because our quota tracking is not perfect, we allow a 10% margin of
// error. This is to account for the fact that parallel writes can result // error. This is to account for the fact that parallel writes can
// in some quota updates getting lost. We don't have any need to solve this // result in some quota updates getting lost. We don't have any need
// right now, so we just allow for some error. // to solve this right now, so we just allow for some error.
if (expected === 0) { if (expected === 0) {
expect(usage).toEqual(0) expect(usage).toEqual(0)
return return
} }
if (usage < 0) {
expect(usage).toBeGreaterThan(expected * 1.1)
expect(usage).toBeLessThan(expected * 0.9)
} else {
expect(usage).toBeGreaterThan(expected * 0.9) expect(usage).toBeGreaterThan(expected * 0.9)
expect(usage).toBeLessThan(expected * 1.1) expect(usage).toBeLessThan(expected * 1.1)
} }
})
}
const defaultRowFields = isInternal const defaultRowFields = isInternal
? { ? {
@ -216,28 +226,28 @@ if (descriptions.length) {
}) })
beforeEach(async () => { beforeEach(async () => {
await resetRowUsage() // await resetRowUsage()
}) })
describe("create", () => { describe("create", () => {
it("creates a new row successfully", async () => { it("creates a new row successfully", async () => {
const rowUsage = await getRowUsage() await expectRowUsage(isInternal ? 1 : 0, async () => {
const row = await config.api.row.save(table._id!, { const row = await config.api.row.save(table._id!, {
name: "Test Contact", name: "Test Contact",
}) })
expect(row.name).toEqual("Test Contact") expect(row.name).toEqual("Test Contact")
expect(row._rev).toBeDefined() expect(row._rev).toBeDefined()
await assertRowUsage(isInternal ? rowUsage + 1 : rowUsage) })
}) })
it("fails to create a row for a table that does not exist", async () => { it("fails to create a row for a table that does not exist", async () => {
const rowUsage = await getRowUsage() await expectRowUsage(0, async () => {
await config.api.row.save("1234567", {}, { status: 404 }) await config.api.row.save("1234567", {}, { status: 404 })
await assertRowUsage(rowUsage) })
}) })
it("fails to create a row if required fields are missing", async () => { it("fails to create a row if required fields are missing", async () => {
const rowUsage = await getRowUsage() await expectRowUsage(0, async () => {
const table = await config.api.table.save( const table = await config.api.table.save(
saveTableRequest({ saveTableRequest({
schema: { schema: {
@ -264,13 +274,12 @@ if (descriptions.length) {
}, },
} }
) )
await assertRowUsage(rowUsage) })
}) })
isInternal && isInternal &&
it("increment row autoId per create row request", async () => { it("increment row autoId per create row request", async () => {
const rowUsage = await getRowUsage() await expectRowUsage(isInternal ? 10 : 0, async () => {
const newTable = await config.api.table.save( const newTable = await config.api.table.save(
saveTableRequest({ saveTableRequest({
schema: { schema: {
@ -299,7 +308,7 @@ if (descriptions.length) {
expect(row["Row ID"]).toBeGreaterThan(previousId) expect(row["Row ID"]).toBeGreaterThan(previousId)
previousId = row["Row ID"] previousId = row["Row ID"]
} }
await assertRowUsage(isInternal ? rowUsage + 10 : rowUsage) })
}) })
isInternal && isInternal &&
@ -985,8 +994,8 @@ if (descriptions.length) {
describe("update", () => { describe("update", () => {
it("updates an existing row successfully", async () => { it("updates an existing row successfully", async () => {
const existing = await config.api.row.save(table._id!, {}) const existing = await config.api.row.save(table._id!, {})
const rowUsage = await getRowUsage()
await expectRowUsage(0, async () => {
const res = await config.api.row.save(table._id!, { const res = await config.api.row.save(table._id!, {
_id: existing._id, _id: existing._id,
_rev: existing._rev, _rev: existing._rev,
@ -994,7 +1003,7 @@ if (descriptions.length) {
}) })
expect(res.name).toEqual("Updated Name") expect(res.name).toEqual("Updated Name")
await assertRowUsage(rowUsage) })
}) })
!isInternal && !isInternal &&
@ -1177,8 +1186,7 @@ if (descriptions.length) {
it("should update only the fields that are supplied", async () => { it("should update only the fields that are supplied", async () => {
const existing = await config.api.row.save(table._id!, {}) const existing = await config.api.row.save(table._id!, {})
const rowUsage = await getRowUsage() await expectRowUsage(0, async () => {
const row = await config.api.row.patch(table._id!, { const row = await config.api.row.patch(table._id!, {
_id: existing._id!, _id: existing._id!,
_rev: existing._rev!, _rev: existing._rev!,
@ -1193,7 +1201,7 @@ if (descriptions.length) {
expect(savedRow.description).toEqual(existing.description) expect(savedRow.description).toEqual(existing.description)
expect(savedRow.name).toEqual("Updated Name") expect(savedRow.name).toEqual("Updated Name")
await assertRowUsage(rowUsage) })
}) })
it("should update only the fields that are supplied and emit the correct oldRow", async () => { it("should update only the fields that are supplied and emit the correct oldRow", async () => {
@ -1224,8 +1232,8 @@ if (descriptions.length) {
it("should throw an error when given improper types", async () => { it("should throw an error when given improper types", async () => {
const existing = await config.api.row.save(table._id!, {}) const existing = await config.api.row.save(table._id!, {})
const rowUsage = await getRowUsage()
await expectRowUsage(0, async () => {
await config.api.row.patch( await config.api.row.patch(
table._id!, table._id!,
{ {
@ -1236,8 +1244,7 @@ if (descriptions.length) {
}, },
{ status: 400 } { status: 400 }
) )
})
await assertRowUsage(rowUsage)
}) })
it("should not overwrite links if those links are not set", async () => { it("should not overwrite links if those links are not set", async () => {
@ -1452,25 +1459,25 @@ if (descriptions.length) {
it("should be able to delete a row", async () => { it("should be able to delete a row", async () => {
const createdRow = await config.api.row.save(table._id!, {}) const createdRow = await config.api.row.save(table._id!, {})
const rowUsage = await getRowUsage()
await expectRowUsage(isInternal ? -1 : 0, async () => {
const res = await config.api.row.bulkDelete(table._id!, { const res = await config.api.row.bulkDelete(table._id!, {
rows: [createdRow], rows: [createdRow],
}) })
expect(res[0]._id).toEqual(createdRow._id) expect(res[0]._id).toEqual(createdRow._id)
await assertRowUsage(isInternal ? rowUsage - 1 : rowUsage) })
}) })
it("should be able to delete a row with ID only", async () => { it("should be able to delete a row with ID only", async () => {
const createdRow = await config.api.row.save(table._id!, {}) const createdRow = await config.api.row.save(table._id!, {})
const rowUsage = await getRowUsage()
await expectRowUsage(isInternal ? -1 : 0, async () => {
const res = await config.api.row.bulkDelete(table._id!, { const res = await config.api.row.bulkDelete(table._id!, {
rows: [createdRow._id!], rows: [createdRow._id!],
}) })
expect(res[0]._id).toEqual(createdRow._id) expect(res[0]._id).toEqual(createdRow._id)
expect(res[0].tableId).toEqual(table._id!) expect(res[0].tableId).toEqual(table._id!)
await assertRowUsage(isInternal ? rowUsage - 1 : rowUsage) })
}) })
it("should be able to bulk delete rows, including a row that doesn't exist", async () => { it("should be able to bulk delete rows, including a row that doesn't exist", async () => {
@ -1560,20 +1567,18 @@ if (descriptions.length) {
}) })
it("should return no errors on valid row", async () => { it("should return no errors on valid row", async () => {
const rowUsage = await getRowUsage() await expectRowUsage(0, async () => {
const res = await config.api.row.validate(table._id!, { const res = await config.api.row.validate(table._id!, {
name: "ivan", name: "ivan",
}) })
expect(res.valid).toBe(true) expect(res.valid).toBe(true)
expect(Object.keys(res.errors)).toEqual([]) expect(Object.keys(res.errors)).toEqual([])
await assertRowUsage(rowUsage) })
}) })
it("should errors on invalid row", async () => { it("should errors on invalid row", async () => {
const rowUsage = await getRowUsage() await expectRowUsage(0, async () => {
const res = await config.api.row.validate(table._id!, { name: 1 }) const res = await config.api.row.validate(table._id!, { name: 1 })
if (isInternal) { if (isInternal) {
@ -1584,7 +1589,7 @@ if (descriptions.length) {
expect(res.valid).toBe(true) expect(res.valid).toBe(true)
expect(Object.keys(res.errors)).toEqual([]) expect(Object.keys(res.errors)).toEqual([])
} }
await assertRowUsage(rowUsage) })
}) })
}) })
@ -1596,15 +1601,15 @@ if (descriptions.length) {
it("should be able to delete a bulk set of rows", async () => { it("should be able to delete a bulk set of rows", async () => {
const row1 = await config.api.row.save(table._id!, {}) const row1 = await config.api.row.save(table._id!, {})
const row2 = await config.api.row.save(table._id!, {}) const row2 = await config.api.row.save(table._id!, {})
const rowUsage = await getRowUsage()
await expectRowUsage(isInternal ? -2 : 0, async () => {
const res = await config.api.row.bulkDelete(table._id!, { const res = await config.api.row.bulkDelete(table._id!, {
rows: [row1, row2], rows: [row1, row2],
}) })
expect(res.length).toEqual(2) expect(res.length).toEqual(2)
await config.api.row.get(table._id!, row1._id!, { status: 404 }) await config.api.row.get(table._id!, row1._id!, { status: 404 })
await assertRowUsage(isInternal ? rowUsage - 2 : rowUsage) })
}) })
it("should be able to delete a variety of row set types", async () => { it("should be able to delete a variety of row set types", async () => {
@ -1613,41 +1618,42 @@ if (descriptions.length) {
config.api.row.save(table._id!, {}), config.api.row.save(table._id!, {}),
config.api.row.save(table._id!, {}), config.api.row.save(table._id!, {}),
]) ])
const rowUsage = await getRowUsage()
await expectRowUsage(isInternal ? -3 : 0, async () => {
const res = await config.api.row.bulkDelete(table._id!, { const res = await config.api.row.bulkDelete(table._id!, {
rows: [row1, row2._id!, { _id: row3._id }], rows: [row1, row2._id!, { _id: row3._id }],
}) })
expect(res.length).toEqual(3) expect(res.length).toEqual(3)
await config.api.row.get(table._id!, row1._id!, { status: 404 }) await config.api.row.get(table._id!, row1._id!, { status: 404 })
await assertRowUsage(isInternal ? rowUsage - 3 : rowUsage) })
}) })
it("should accept a valid row object and delete the row", async () => { it("should accept a valid row object and delete the row", async () => {
const row1 = await config.api.row.save(table._id!, {}) const row1 = await config.api.row.save(table._id!, {})
const rowUsage = await getRowUsage()
const res = await config.api.row.delete(table._id!, row1 as DeleteRow) await expectRowUsage(isInternal ? -1 : 0, async () => {
const res = await config.api.row.delete(
table._id!,
row1 as DeleteRow
)
expect(res.id).toEqual(row1._id) expect(res.id).toEqual(row1._id)
await config.api.row.get(table._id!, row1._id!, { status: 404 }) await config.api.row.get(table._id!, row1._id!, { status: 404 })
await assertRowUsage(isInternal ? rowUsage - 1 : rowUsage) })
}) })
it.each([{ not: "valid" }, { rows: 123 }, "invalid"])( it.each([{ not: "valid" }, { rows: 123 }, "invalid"])(
"should ignore malformed/invalid delete request: %s", "should ignore malformed/invalid delete request: %s",
async (request: any) => { async (request: any) => {
const rowUsage = await getRowUsage() await expectRowUsage(0, async () => {
await config.api.row.delete(table._id!, request, { await config.api.row.delete(table._id!, request, {
status: 400, status: 400,
body: { body: {
message: "Invalid delete rows request", message: "Invalid delete rows request",
}, },
}) })
})
await assertRowUsage(rowUsage)
} }
) )
}) })
@ -1733,8 +1739,7 @@ if (descriptions.length) {
}) })
) )
const rowUsage = await getRowUsage() await expectRowUsage(isInternal ? 2 : 0, async () => {
await config.api.row.bulkImport(table._id!, { await config.api.row.bulkImport(table._id!, {
rows: [ rows: [
{ {
@ -1756,8 +1761,7 @@ if (descriptions.length) {
expect(rows[0].description).toEqual("Row 1 description") expect(rows[0].description).toEqual("Row 1 description")
expect(rows[1].name).toEqual("Row 2") expect(rows[1].name).toEqual("Row 2")
expect(rows[1].description).toEqual("Row 2 description") expect(rows[1].description).toEqual("Row 2 description")
})
await assertRowUsage(isInternal ? rowUsage + 2 : rowUsage)
}) })
isInternal && isInternal &&
@ -1782,8 +1786,7 @@ if (descriptions.length) {
description: "Existing description", description: "Existing description",
}) })
const rowUsage = await getRowUsage() await expectRowUsage(2, async () => {
await config.api.row.bulkImport(table._id!, { await config.api.row.bulkImport(table._id!, {
rows: [ rows: [
{ {
@ -1809,8 +1812,7 @@ if (descriptions.length) {
expect(rows[1].description).toEqual("Row 2 description") expect(rows[1].description).toEqual("Row 2 description")
expect(rows[2].name).toEqual("Updated existing row") expect(rows[2].name).toEqual("Updated existing row")
expect(rows[2].description).toEqual("Existing description") expect(rows[2].description).toEqual("Existing description")
})
await assertRowUsage(rowUsage + 2)
}) })
isInternal && isInternal &&
@ -1835,8 +1837,7 @@ if (descriptions.length) {
description: "Existing description", description: "Existing description",
}) })
const rowUsage = await getRowUsage() await expectRowUsage(3, async () => {
await config.api.row.bulkImport(table._id!, { await config.api.row.bulkImport(table._id!, {
rows: [ rows: [
{ {
@ -1863,8 +1864,7 @@ if (descriptions.length) {
expect(rows[2].description).toEqual("Row 2 description") expect(rows[2].description).toEqual("Row 2 description")
expect(rows[3].name).toEqual("Updated existing row") expect(rows[3].name).toEqual("Updated existing row")
expect(rows[3].description).toEqual("Existing description") expect(rows[3].description).toEqual("Existing description")
})
await assertRowUsage(rowUsage + 3)
}) })
// Upserting isn't yet supported in MSSQL / Oracle, see: // Upserting isn't yet supported in MSSQL / Oracle, see:
@ -2187,8 +2187,8 @@ if (descriptions.length) {
return { linkedTable, firstRow, secondRow } return { linkedTable, firstRow, secondRow }
} }
) )
const rowUsage = await getRowUsage()
await expectRowUsage(0, async () => {
// test basic enrichment // test basic enrichment
const resBasic = await config.api.row.get( const resBasic = await config.api.row.get(
linkedTable._id!, linkedTable._id!,
@ -2209,7 +2209,7 @@ if (descriptions.length) {
expect(resEnriched.link[0]._id).toBe(firstRow._id) expect(resEnriched.link[0]._id).toBe(firstRow._id)
expect(resEnriched.link[0].name).toBe("Test Contact") expect(resEnriched.link[0].name).toBe("Test Contact")
expect(resEnriched.link[0].description).toBe("original description") expect(resEnriched.link[0].description).toBe("original description")
await assertRowUsage(rowUsage) })
}) })
}) })

View File

@ -13,24 +13,26 @@ import { createAutomationBuilder } from "../utilities/AutomationTestBuilder"
import TestConfiguration from "../../../tests/utilities/TestConfiguration" import TestConfiguration from "../../../tests/utilities/TestConfiguration"
describe("Attempt to run a basic loop automation", () => { describe("Attempt to run a basic loop automation", () => {
const config = new TestConfiguration() let config: TestConfiguration
let table: Table let table: Table
beforeAll(async () => { beforeAll(async () => {
await config.init()
await automation.init() await automation.init()
}) })
beforeEach(async () => { beforeEach(async () => {
await config.api.automation.deleteAll() config = new TestConfiguration()
await config.init()
table = await config.api.table.save(basicTable()) table = await config.api.table.save(basicTable())
await config.api.row.save(table._id!, {}) await config.api.row.save(table._id!, {})
}) })
afterEach(async () => {
config.end()
})
afterAll(async () => { afterAll(async () => {
await automation.shutdown() await automation.shutdown()
config.end()
}) })
it("attempt to run a basic loop", async () => { it("attempt to run a basic loop", async () => {

View File

@ -44,11 +44,13 @@ describe("test the openai action", () => {
} }
const expectAIUsage = async <T>(expected: number, f: () => Promise<T>) => { const expectAIUsage = async <T>(expected: number, f: () => Promise<T>) => {
return await quotas.withEnabled(async () => {
const before = await getAIUsage() const before = await getAIUsage()
const result = await f() const result = await f()
const after = await getAIUsage() const after = await getAIUsage()
expect(after - before).toEqual(expected) expect(after - before).toEqual(expected)
return result return result
})
} }
it("should be able to receive a response from ChatGPT given a prompt", async () => { it("should be able to receive a response from ChatGPT given a prompt", async () => {

View File

@ -20,9 +20,12 @@ export interface TriggerOutput {
export interface AutomationContext { export interface AutomationContext {
trigger: AutomationTriggerResultOutputs trigger: AutomationTriggerResultOutputs
steps: [AutomationTriggerResultOutputs, ...AutomationStepResultOutputs[]] steps: Record<
stepsById: Record<string, AutomationStepResultOutputs> string,
AutomationStepResultOutputs | AutomationTriggerResultOutputs
>
stepsByName: Record<string, AutomationStepResultOutputs> stepsByName: Record<string, AutomationStepResultOutputs>
stepsById: Record<string, AutomationStepResultOutputs>
env?: Record<string, string> env?: Record<string, string>
user?: UserBindings user?: UserBindings
settings?: { settings?: {
@ -31,4 +34,6 @@ export interface AutomationContext {
company?: string company?: string
} }
loop?: { currentItem: any } loop?: { currentItem: any }
_stepIndex: number
_error: boolean
} }

View File

@ -32,6 +32,8 @@ class AutomationEmitter implements ContextEmitter {
if (chainAutomations === true) { if (chainAutomations === true) {
return MAX_AUTOMATIONS_ALLOWED return MAX_AUTOMATIONS_ALLOWED
} else if (env.isTest()) {
return 0
} else if (chainAutomations === undefined && env.SELF_HOSTED) { } else if (chainAutomations === undefined && env.SELF_HOSTED) {
return MAX_AUTOMATIONS_ALLOWED return MAX_AUTOMATIONS_ALLOWED
} else { } else {

View File

@ -3,6 +3,7 @@ import * as matchers from "jest-extended"
import { env as coreEnv, timers } from "@budibase/backend-core" import { env as coreEnv, timers } from "@budibase/backend-core"
import { testContainerUtils } from "@budibase/backend-core/tests" import { testContainerUtils } from "@budibase/backend-core/tests"
import nock from "nock" import nock from "nock"
import { quotas } from "@budibase/pro"
expect.extend(matchers) expect.extend(matchers)
if (!process.env.CI) { if (!process.env.CI) {
@ -23,6 +24,10 @@ nock.enableNetConnect(host => {
testContainerUtils.setupEnv(env, coreEnv) testContainerUtils.setupEnv(env, coreEnv)
afterAll(() => { beforeAll(async () => {
quotas.disable()
})
afterAll(async () => {
timers.cleanup() timers.cleanup()
}) })

View File

@ -146,8 +146,9 @@ export abstract class TestAPI {
} }
} }
let resp: Response | undefined = undefined
try { try {
return await req resp = await req
} catch (e: any) { } catch (e: any) {
// We've found that occasionally the connection between supertest and the // We've found that occasionally the connection between supertest and the
// server supertest starts gets reset. Not sure why, but retrying it // server supertest starts gets reset. Not sure why, but retrying it
@ -161,6 +162,7 @@ export abstract class TestAPI {
} }
throw e throw e
} }
return resp
} }
protected async getHeaders( protected async getHeaders(

View File

@ -143,7 +143,6 @@ async function branchMatches(
branch: Readonly<Branch> branch: Readonly<Branch>
): Promise<boolean> { ): Promise<boolean> {
const toFilter: Record<string, any> = {} const toFilter: Record<string, any> = {}
const preparedCtx = prepareContext(ctx)
// Because we allow bindings on both the left and right of each condition in // Because we allow bindings on both the left and right of each condition in
// automation branches, we can't pass the BranchSearchFilters directly to // automation branches, we can't pass the BranchSearchFilters directly to
@ -160,9 +159,9 @@ async function branchMatches(
filter.conditions = filter.conditions.map(evaluateBindings) filter.conditions = filter.conditions.map(evaluateBindings)
} else { } else {
for (const [field, value] of Object.entries(filter)) { for (const [field, value] of Object.entries(filter)) {
toFilter[field] = processStringSync(field, preparedCtx) toFilter[field] = processStringSync(field, ctx)
if (typeof value === "string" && findHBSBlocks(value).length > 0) { if (typeof value === "string" && findHBSBlocks(value).length > 0) {
filter[field] = processStringSync(value, preparedCtx) filter[field] = processStringSync(value, ctx)
} }
} }
} }
@ -178,17 +177,6 @@ async function branchMatches(
return result.length > 0 return result.length > 0
} }
function prepareContext(context: AutomationContext) {
return {
...context,
steps: {
...context.steps,
...context.stepsById,
...context.stepsByName,
},
}
}
async function enrichBaseContext(context: AutomationContext) { async function enrichBaseContext(context: AutomationContext) {
context.env = await sdkUtils.getEnvironmentVariables() context.env = await sdkUtils.getEnvironmentVariables()
@ -304,41 +292,44 @@ class Orchestrator {
} }
hasErrored(context: AutomationContext): boolean { hasErrored(context: AutomationContext): boolean {
const [_trigger, ...steps] = context.steps return context._error === true
for (const step of steps) { // const [_trigger, ...steps] = context.steps
if (step.success === false) { // for (const step of steps) {
return true // if (step.success === false) {
} // return true
} // }
return false // }
// return false
} }
async execute(): Promise<AutomationResults> { async execute(): Promise<AutomationResults> {
return await tracer.trace("execute", async span => { return await tracer.trace("execute", async span => {
span.addTags({ appId: this.appId, automationId: this.automation._id }) span.addTags({ appId: this.appId, automationId: this.automation._id })
const job = cloneDeep(this.job) const data = cloneDeep(this.job.data)
delete job.data.event.appId delete data.event.appId
delete job.data.event.metadata delete data.event.metadata
if (this.isCron() && !job.data.event.timestamp) { if (this.isCron() && !data.event.timestamp) {
job.data.event.timestamp = Date.now() data.event.timestamp = Date.now()
} }
const trigger: AutomationTriggerResult = { const trigger: AutomationTriggerResult = {
id: job.data.automation.definition.trigger.id, id: data.automation.definition.trigger.id,
stepId: job.data.automation.definition.trigger.stepId, stepId: data.automation.definition.trigger.stepId,
inputs: null, inputs: null,
outputs: job.data.event, outputs: data.event,
} }
const result: AutomationResults = { trigger, steps: [trigger] } const result: AutomationResults = { trigger, steps: [trigger] }
const ctx: AutomationContext = { const ctx: AutomationContext = {
trigger: trigger.outputs, trigger: trigger.outputs,
steps: [trigger.outputs], steps: { "0": trigger.outputs },
stepsById: {},
stepsByName: {}, stepsByName: {},
stepsById: {},
user: trigger.outputs.user, user: trigger.outputs.user,
_error: false,
_stepIndex: 1,
} }
await enrichBaseContext(ctx) await enrichBaseContext(ctx)
@ -348,7 +339,7 @@ class Orchestrator {
try { try {
await helpers.withTimeout(timeout, async () => { await helpers.withTimeout(timeout, async () => {
const [stepOutputs, executionTime] = await utils.time(() => const [stepOutputs, executionTime] = await utils.time(() =>
this.executeSteps(ctx, job.data.automation.definition.steps) this.executeSteps(ctx, data.automation.definition.steps)
) )
result.steps.push(...stepOutputs) result.steps.push(...stepOutputs)
@ -400,9 +391,20 @@ class Orchestrator {
step: AutomationStep, step: AutomationStep,
result: AutomationStepResult result: AutomationStepResult
) { ) {
ctx.steps.push(result.outputs) ctx.steps[step.id] = result.outputs
ctx.steps[step.name || step.id] = result.outputs
ctx.stepsById[step.id] = result.outputs ctx.stepsById[step.id] = result.outputs
ctx.stepsByName[step.name || step.id] = result.outputs ctx.stepsByName[step.name || step.id] = result.outputs
ctx._stepIndex ||= 0
ctx.steps[ctx._stepIndex] = result.outputs
ctx._stepIndex++
if (result.outputs.success === false) {
ctx._error = true
}
results.push(result) results.push(result)
} }
@ -449,7 +451,7 @@ class Orchestrator {
stepToLoop: AutomationStep stepToLoop: AutomationStep
): Promise<AutomationStepResult> { ): Promise<AutomationStepResult> {
return await tracer.trace("executeLoopStep", async span => { return await tracer.trace("executeLoopStep", async span => {
await processObject(step.inputs, prepareContext(ctx)) await processObject(step.inputs, ctx)
const maxIterations = getLoopMaxIterations(step) const maxIterations = getLoopMaxIterations(step)
const items: Record<string, any>[] = [] const items: Record<string, any>[] = []
@ -558,7 +560,7 @@ class Orchestrator {
} }
const inputs = automationUtils.cleanInputValues( const inputs = automationUtils.cleanInputValues(
await processObject(cloneDeep(step.inputs), prepareContext(ctx)), await processObject(cloneDeep(step.inputs), ctx),
step.schema.inputs.properties step.schema.inputs.properties
) )
@ -566,7 +568,7 @@ class Orchestrator {
inputs, inputs,
appId: this.appId, appId: this.appId,
emitter: this.emitter, emitter: this.emitter,
context: prepareContext(ctx), context: ctx,
}) })
if ( if (