diff --git a/packages/backend-core/src/queue/inMemoryQueue.ts b/packages/backend-core/src/queue/inMemoryQueue.ts index 6833c9a306..867388ae07 100644 --- a/packages/backend-core/src/queue/inMemoryQueue.ts +++ b/packages/backend-core/src/queue/inMemoryQueue.ts @@ -199,6 +199,12 @@ class InMemoryQueue implements Partial { return this as unknown as Queue } + off(event: string, callback: (...args: any[]) => void): Queue { + // @ts-expect-error - this callback can be one of many types + this._emitter.off(event, callback) + return this as unknown as Queue + } + async count() { return this._messages.length } diff --git a/packages/server/src/api/routes/tests/queries/generic-sql.spec.ts b/packages/server/src/api/routes/tests/queries/generic-sql.spec.ts index 13dd1b719b..863f5b65e0 100644 --- a/packages/server/src/api/routes/tests/queries/generic-sql.spec.ts +++ b/packages/server/src/api/routes/tests/queries/generic-sql.spec.ts @@ -182,7 +182,7 @@ if (descriptions.length) { }, }) - await config.api.application.publish(config.getAppId()) + await config.api.application.publish() const prodQuery = await config.api.query.getProd(query._id!) expect(prodQuery._id).toEqual(query._id) diff --git a/packages/server/src/automations/tests/triggers/cron.spec.ts b/packages/server/src/automations/tests/triggers/cron.spec.ts index 28ef49e5f5..61ef906f65 100644 --- a/packages/server/src/automations/tests/triggers/cron.spec.ts +++ b/packages/server/src/automations/tests/triggers/cron.spec.ts @@ -1,7 +1,6 @@ import { createAutomationBuilder } from "../utilities/AutomationTestBuilder" import TestConfiguration from "../../../tests/utilities/TestConfiguration" -import { getQueue } from "../.." -import { Job } from "bull" +import { captureAutomationRuns } from "../utilities" describe("cron trigger", () => { const config = new TestConfiguration() @@ -15,15 +14,6 @@ describe("cron trigger", () => { }) it("should queue a Bull cron job", async () => { - const queue = getQueue() - expect(await queue.getCompletedCount()).toEqual(0) - - const jobPromise = new Promise(resolve => { - queue.on("completed", async job => { - resolve(job) - }) - }) - await createAutomationBuilder(config) .onCron({ cron: "* * * * *" }) .serverLog({ @@ -31,12 +21,12 @@ describe("cron trigger", () => { }) .save() - await config.api.application.publish(config.getAppId()) + const jobs = await captureAutomationRuns(() => + config.api.application.publish() + ) + expect(jobs).toHaveLength(1) - expect(await queue.getCompletedCount()).toEqual(1) - - const job = await jobPromise - const repeat = job.opts?.repeat + const repeat = jobs[0].opts?.repeat if (!repeat || !("cron" in repeat)) { throw new Error("Expected cron repeat") } diff --git a/packages/server/src/automations/tests/triggers/rowSaved.spec.ts b/packages/server/src/automations/tests/triggers/rowSaved.spec.ts new file mode 100644 index 0000000000..4bbd1b74df --- /dev/null +++ b/packages/server/src/automations/tests/triggers/rowSaved.spec.ts @@ -0,0 +1,52 @@ +import { createAutomationBuilder } from "../utilities/AutomationTestBuilder" +import TestConfiguration from "../../../tests/utilities/TestConfiguration" +import { Table } from "@budibase/types" +import { basicTable } from "../../../tests/utilities/structures" +import { captureAutomationRuns } from "../utilities" + +describe("row saved trigger", () => { + const config = new TestConfiguration() + let table: Table + + beforeAll(async () => { + await config.init() + table = await config.api.table.save(basicTable()) + await createAutomationBuilder(config) + .onRowSaved({ tableId: table._id! }) + .serverLog({ text: "Row created!" }) + .save() + + await config.api.application.publish() + }) + + afterAll(() => { + config.end() + }) + + it("should queue a Bull job when a row is created", async () => { + const jobs = await captureAutomationRuns(() => + config.withProdApp(() => config.api.row.save(table._id!, { name: "foo" })) + ) + + expect(jobs).toHaveLength(1) + expect(jobs[0].data.event).toEqual( + expect.objectContaining({ + tableId: table._id!, + row: expect.objectContaining({ name: "foo" }), + }) + ) + }) + + it("should not fire for rows created in other tables", async () => { + const otherTable = await config.api.table.save(basicTable()) + await config.api.application.publish() + + const jobs = await captureAutomationRuns(() => + config.withProdApp(() => + config.api.row.save(otherTable._id!, { name: "foo" }) + ) + ) + + expect(jobs).toBeEmpty() + }) +}) diff --git a/packages/server/src/automations/tests/utilities/index.ts b/packages/server/src/automations/tests/utilities/index.ts index 16c23f5db1..ffecab8680 100644 --- a/packages/server/src/automations/tests/utilities/index.ts +++ b/packages/server/src/automations/tests/utilities/index.ts @@ -3,8 +3,15 @@ import { context } from "@budibase/backend-core" import { BUILTIN_ACTION_DEFINITIONS, getAction } from "../../actions" import emitter from "../../../events/index" import env from "../../../environment" -import { AutomationActionStepId, Datasource } from "@budibase/types" +import { + AutomationActionStepId, + AutomationData, + Datasource, +} from "@budibase/types" import { Knex } from "knex" +import { getQueue } from "../.." +import { Job } from "bull" +import { helpers } from "@budibase/shared-core" let config: TestConfiguration @@ -63,6 +70,44 @@ export async function runStep( } } +/** + * Capture all automation runs that occur during the execution of a function. + * This function will wait for all messages to be processed before returning. + */ +export async function captureAutomationRuns( + f: () => Promise +): Promise[]> { + const runs: Job[] = [] + const queue = getQueue() + let messagesReceived = 0 + + const completedListener = async (job: Job) => { + runs.push(job) + messagesReceived-- + } + const messageListener = async () => { + messagesReceived++ + } + queue.on("message", messageListener) + queue.on("completed", completedListener) + try { + await f() + // Queue messages tend to be send asynchronously in API handlers, so there's + // no guarantee that awaiting this function will have queued anything yet. + // We wait here to make sure we're queued _after_ any existing async work. + await helpers.wait(100) + } finally { + // eslint-disable-next-line no-unmodified-loop-condition + while (messagesReceived > 0) { + await helpers.wait(50) + } + queue.off("completed", completedListener) + queue.off("message", messageListener) + } + + return runs +} + export async function saveTestQuery( config: TestConfiguration, client: Knex, diff --git a/packages/server/src/tests/utilities/api/application.ts b/packages/server/src/tests/utilities/api/application.ts index d0fcb60804..48877196fa 100644 --- a/packages/server/src/tests/utilities/api/application.ts +++ b/packages/server/src/tests/utilities/api/application.ts @@ -34,9 +34,12 @@ export class ApplicationAPI extends TestAPI { } publish = async ( - appId: string, + appId?: string, expectations?: Expectations ): Promise => { + if (!appId) { + appId = this.config.getAppId() + } return await this._post( `/api/applications/${appId}/publish`, {