diff --git a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts
index 47b3f0672f..b72651e21f 100644
--- a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts
+++ b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts
@@ -1,7 +1,12 @@
 import tk from "timekeeper"
 import _ from "lodash"
 
-import { DBTestConfiguration, generator, structures } from "../../../tests"
+import {
+  DBTestConfiguration,
+  generator,
+  structures,
+  utils,
+} from "../../../tests"
 import { getDB } from "../../db"
 
 import {
@@ -10,15 +15,14 @@ import {
   init,
 } from "../docWritethrough"
 
-import InMemoryQueue from "../../queue/inMemoryQueue"
-
 const initialTime = Date.now()
 
 async function waitForQueueCompletion() {
-  const queue: InMemoryQueue = DocWritethroughProcessor.queue as never
-  await queue.waitForCompletion()
+  await utils.queue.processMessages(DocWritethroughProcessor.queue)
 }
 
+beforeAll(() => utils.queue.useRealQueues())
+
 describe("docWritethrough", () => {
   beforeAll(() => {
     init()
@@ -67,7 +71,7 @@ describe("docWritethrough", () => {
       const patch3 = generatePatchObject(3)
       await docWritethrough.patch(patch3)
 
-      expect(await db.get(documentId)).toEqual({
+      expect(await db.tryGet(documentId)).toEqual({
         _id: documentId,
         ...patch1,
         ...patch2,
@@ -92,7 +96,7 @@ describe("docWritethrough", () => {
 
       await waitForQueueCompletion()
 
-      expect(await db.get(documentId)).toEqual(
+      expect(await db.tryGet(documentId)).toEqual(
         expect.objectContaining({
           _id: documentId,
           ...patch1,
@@ -117,7 +121,7 @@ describe("docWritethrough", () => {
 
       await waitForQueueCompletion()
       expect(date1).not.toEqual(date2)
-      expect(await db.get(documentId)).toEqual(
+      expect(await db.tryGet(documentId)).toEqual(
         expect.objectContaining({
           createdAt: date1.toISOString(),
           updatedAt: date2.toISOString(),
@@ -135,7 +139,7 @@ describe("docWritethrough", () => {
 
      await docWritethrough.patch(patch2)
       const keyToOverride = _.sample(Object.keys(patch1))!
-      expect(await db.get(documentId)).toEqual(
+      expect(await db.tryGet(documentId)).toEqual(
         expect.objectContaining({
           [keyToOverride]: patch1[keyToOverride],
         })
@@ -150,7 +154,7 @@ describe("docWritethrough", () => {
       await docWritethrough.patch(patch3)
       await waitForQueueCompletion()
 
-      expect(await db.get(documentId)).toEqual(
+      expect(await db.tryGet(documentId)).toEqual(
         expect.objectContaining({
           ...patch1,
           ...patch2,
@@ -180,14 +184,14 @@ describe("docWritethrough", () => {
       await secondDocWritethrough.patch(doc2Patch2)
       await waitForQueueCompletion()
 
-      expect(await db.get(docWritethrough.docId)).toEqual(
+      expect(await db.tryGet(docWritethrough.docId)).toEqual(
         expect.objectContaining({
           ...doc1Patch,
           ...doc1Patch2,
         })
       )
 
-      expect(await db.get(secondDocWritethrough.docId)).toEqual(
+      expect(await db.tryGet(secondDocWritethrough.docId)).toEqual(
         expect.objectContaining({
           ...doc2Patch,
           ...doc2Patch2,
@@ -203,7 +207,7 @@ describe("docWritethrough", () => {
       await docWritethrough.patch(initialPatch)
       await waitForQueueCompletion()
 
-      expect(await db.get(documentId)).toEqual(
+      expect(await db.tryGet(documentId)).toEqual(
         expect.objectContaining(initialPatch)
       )
 
@@ -214,10 +218,10 @@ describe("docWritethrough", () => {
       await docWritethrough.patch(extraPatch)
       await waitForQueueCompletion()
 
-      expect(await db.get(documentId)).toEqual(
+      expect(await db.tryGet(documentId)).toEqual(
         expect.objectContaining(extraPatch)
       )
-      expect(await db.get(documentId)).not.toEqual(
+      expect(await db.tryGet(documentId)).not.toEqual(
         expect.objectContaining(initialPatch)
       )
     })
@@ -242,7 +246,7 @@ describe("docWritethrough", () => {
       expect(queueMessageSpy).toHaveBeenCalledTimes(5)
 
      await waitForQueueCompletion()
-      expect(await db.get(documentId)).toEqual(
+      expect(await db.tryGet(documentId)).toEqual(
         expect.objectContaining(patches)
       )
 
@@ -250,7 +254,7 @@ describe("docWritethrough", () => {
       expect(queueMessageSpy).toHaveBeenCalledTimes(45)
 
       await waitForQueueCompletion()
-      expect(await db.get(documentId)).toEqual(
+      expect(await db.tryGet(documentId)).toEqual(
         expect.objectContaining(patches)
       )
 
@@ -258,20 +262,18 @@ describe("docWritethrough", () => {
       expect(queueMessageSpy).toHaveBeenCalledTimes(55)
 
       await waitForQueueCompletion()
-      expect(await db.get(documentId)).toEqual(
+      expect(await db.tryGet(documentId)).toEqual(
         expect.objectContaining(patches)
       )
     })
   })
 
-  // This is not yet supported
-  // eslint-disable-next-line jest/no-disabled-tests
-  it.skip("patches will execute in order", async () => {
+  it("patches will execute in order", async () => {
     let incrementalValue = 0
     const keyToOverride = generator.word()
     async function incrementalPatches(count: number) {
       for (let i = 0; i < count; i++) {
-        await docWritethrough.patch({ [keyToOverride]: incrementalValue++ })
+        await docWritethrough.patch({ [keyToOverride]: ++incrementalValue })
       }
     }
 
@@ -279,13 +281,13 @@ describe("docWritethrough", () => {
 
     await incrementalPatches(5)
     await waitForQueueCompletion()
-    expect(await db.get(documentId)).toEqual(
+    expect(await db.tryGet(documentId)).toEqual(
       expect.objectContaining({ [keyToOverride]: 5 })
     )
 
     await incrementalPatches(40)
     await waitForQueueCompletion()
-    expect(await db.get(documentId)).toEqual(
+    expect(await db.tryGet(documentId)).toEqual(
       expect.objectContaining({ [keyToOverride]: 45 })
     )
   })
diff --git a/packages/backend-core/src/queue/inMemoryQueue.ts b/packages/backend-core/src/queue/inMemoryQueue.ts
index 1d8544828d..dd8d3daa37 100644
--- a/packages/backend-core/src/queue/inMemoryQueue.ts
+++ b/packages/backend-core/src/queue/inMemoryQueue.ts
@@ -1,5 +1,5 @@
 import events from "events"
-import { newid, timeout } from "../utils"
+import { newid } from "../utils"
 import { Queue, QueueOptions, JobOptions } from "./queue"
 
 interface JobMessage {
@@ -184,16 +184,6 @@ class InMemoryQueue implements Partial<Queue> {
     // do nothing
     return this as any
   }
-
-  async waitForCompletion() {
-    do {
-      await timeout(50)
-    } while (this.hasRunningJobs())
-  }
-
-  hasRunningJobs() {
-    return this._addCount > this._runCount
-  }
 }
 
 export default InMemoryQueue
diff --git a/packages/backend-core/src/queue/queue.ts b/packages/backend-core/src/queue/queue.ts
index f633d0885e..f5d710f02d 100644
--- a/packages/backend-core/src/queue/queue.ts
+++ b/packages/backend-core/src/queue/queue.ts
@@ -15,7 +15,7 @@ const QUEUE_LOCK_MS = Duration.fromMinutes(5).toMs()
 const QUEUE_LOCK_RENEW_INTERNAL_MS = Duration.fromSeconds(30).toMs()
 // cleanup the queue every 60 seconds
 const CLEANUP_PERIOD_MS = Duration.fromSeconds(60).toMs()
-let QUEUES: BullQueue.Queue[] | InMemoryQueue[] = []
+let QUEUES: BullQueue.Queue[] = []
 let cleanupInterval: NodeJS.Timeout
 
 async function cleanup() {
@@ -45,11 +45,18 @@ export function createQueue<T>(
   if (opts.jobOptions) {
     queueConfig.defaultJobOptions = opts.jobOptions
   }
-  let queue: any
+  let queue: BullQueue.Queue
   if (!env.isTest()) {
     queue = new BullQueue(jobQueue, queueConfig)
+  } else if (
+    process.env.BULL_TEST_REDIS_PORT &&
+    !isNaN(+process.env.BULL_TEST_REDIS_PORT)
+  ) {
+    queue = new BullQueue(jobQueue, {
+      redis: { host: "localhost", port: +process.env.BULL_TEST_REDIS_PORT },
+    })
   } else {
-    queue = new InMemoryQueue(jobQueue, queueConfig)
+    queue = new InMemoryQueue(jobQueue, queueConfig) as any
   }
   addListeners(queue, jobQueue, opts?.removeStalledCb)
   QUEUES.push(queue)
diff --git a/packages/backend-core/tests/core/utilities/index.ts b/packages/backend-core/tests/core/utilities/index.ts
index 787d69be2c..c3d81784c8 100644
--- a/packages/backend-core/tests/core/utilities/index.ts
+++ b/packages/backend-core/tests/core/utilities/index.ts
@@ -4,3 +4,4 @@ export { generator } from "./structures"
 export * as testContainerUtils from "./testContainerUtils"
 export * as utils from "./utils"
 export * from "./jestUtils"
+export * as queue from "./queue"
diff --git a/packages/backend-core/tests/core/utilities/queue.ts b/packages/backend-core/tests/core/utilities/queue.ts
new file mode 100644
index 0000000000..49dd33ca29
--- /dev/null
+++ b/packages/backend-core/tests/core/utilities/queue.ts
@@ -0,0 +1,9 @@
+import { Queue } from "bull"
+
+export async function processMessages(queue: Queue) {
+  do {
+    await queue.whenCurrentJobsFinished()
+  } while (await queue.count())
+
+  await queue.whenCurrentJobsFinished()
+}
diff --git a/packages/backend-core/tests/core/utilities/testContainerUtils.ts b/packages/backend-core/tests/core/utilities/testContainerUtils.ts
index 1a25bb28f4..71d7fa32db 100644
--- a/packages/backend-core/tests/core/utilities/testContainerUtils.ts
+++ b/packages/backend-core/tests/core/utilities/testContainerUtils.ts
@@ -1,4 +1,6 @@
 import { execSync } from "child_process"
+import { cloneDeep } from "lodash"
+import { GenericContainer, StartedTestContainer } from "testcontainers"
 
 const IPV4_PORT_REGEX = new RegExp(`0\\.0\\.0\\.0:(\\d+)->(\\d+)/tcp`, "g")
 
@@ -106,3 +108,58 @@ export function setupEnv(...envs: any[]) {
     }
   }
 }
+
+export async function startContainer(container: GenericContainer) {
+  const imageName = (container as any).imageName.string as string
+  let key: string = imageName
+  if (imageName.includes("@sha256")) {
+    key = imageName.split("@")[0]
+  }
+  key = key.replace(/\//g, "-").replace(/:/g, "-")
+
+  container = container
+    .withReuse()
+    .withLabels({ "com.budibase": "true" })
+    .withName(`${key}_testcontainer`)
+
+  let startedContainer: StartedTestContainer | undefined = undefined
+  let lastError = undefined
+  for (let i = 0; i < 10; i++) {
+    try {
+      // container.start() is not an idempotent operation, calling `start`
+      // modifies the internal state of a GenericContainer instance such that
+      // the hash it uses to determine reuse changes. We need to clone the
+      // container before calling start to ensure that we're using the same
+      // reuse hash every time.
+      const containerCopy = cloneDeep(container)
+      startedContainer = await containerCopy.start()
+      lastError = undefined
+      break
+    } catch (e: any) {
+      lastError = e
+      await new Promise(resolve => setTimeout(resolve, 1000))
+    }
+  }
+
+  if (!startedContainer) {
+    if (lastError) {
+      throw lastError
+    }
+    throw new Error(`failed to start container: ${imageName}`)
+  }
+
+  const info = getContainerById(startedContainer.getId())
+  if (!info) {
+    throw new Error("Container not found")
+  }
+
+  // Some Docker runtimes, when you expose a port, will bind it to both
+  // 127.0.0.1 and ::1, so ipv4 and ipv6. The port spaces of ipv4 and ipv6
+  // addresses are not shared, and testcontainers will sometimes give you back
+  // the ipv6 port. There's no way to know that this has happened, and if you
+  // try to then connect to `localhost:port` you may attempt to bind to the v4
+  // address which could be unbound or even an entirely different container. For
+  // that reason, we don't use testcontainers' `getExposedPort` function,
+  // preferring instead our own method that guaranteed v4 ports.
+  return getExposedV4Ports(info)
+}
diff --git a/packages/backend-core/tests/core/utilities/utils/index.ts b/packages/backend-core/tests/core/utilities/utils/index.ts
index 41a249c7e6..3d28189c53 100644
--- a/packages/backend-core/tests/core/utilities/utils/index.ts
+++ b/packages/backend-core/tests/core/utilities/utils/index.ts
@@ -1 +1,2 @@
 export * as time from "./time"
+export * as queue from "./queue"
diff --git a/packages/backend-core/tests/core/utilities/utils/queue.ts b/packages/backend-core/tests/core/utilities/utils/queue.ts
new file mode 100644
index 0000000000..3ad7d6b4b4
--- /dev/null
+++ b/packages/backend-core/tests/core/utilities/utils/queue.ts
@@ -0,0 +1,27 @@
+import { Queue } from "bull"
+import { GenericContainer, Wait } from "testcontainers"
+import { startContainer } from "../testContainerUtils"
+
+export async function useRealQueues() {
+  const ports = await startContainer(
+    new GenericContainer("redis")
+      .withExposedPorts(6379)
+      .withWaitStrategy(
+        Wait.forSuccessfulCommand(`redis-cli`).withStartupTimeout(10000)
+      )
+  )
+
+  const port = ports.find(x => x.container === 6379)?.host
+  if (!port) {
+    throw new Error("Redis port not found")
+  }
+  process.env.BULL_TEST_REDIS_PORT = port.toString()
+}
+
+export async function processMessages(queue: Queue) {
+  do {
+    await queue.whenCurrentJobsFinished()
+  } while (await queue.count())
+
+  await queue.whenCurrentJobsFinished()
+}
diff --git a/packages/pro b/packages/pro
index 04bee88597..80770215c6 160000
--- a/packages/pro
+++ b/packages/pro
@@ -1 +1 @@
-Subproject commit 04bee88597edb1edb88ed299d0597b587f0362ec
+Subproject commit 80770215c6159e4d47f3529fd02e74bc8ad07543
diff --git a/packages/server/src/integrations/tests/utils/index.ts b/packages/server/src/integrations/tests/utils/index.ts
index b6f8b5b92a..6313556df7 100644
--- a/packages/server/src/integrations/tests/utils/index.ts
+++ b/packages/server/src/integrations/tests/utils/index.ts
@@ -6,12 +6,12 @@ import * as mysql from "./mysql"
 import * as mssql from "./mssql"
 import * as mariadb from "./mariadb"
 import * as oracle from "./oracle"
-import { GenericContainer, StartedTestContainer } from "testcontainers"
 import { testContainerUtils } from "@budibase/backend-core/tests"
-import cloneDeep from "lodash/cloneDeep"
 
 export type DatasourceProvider = () => Promise<Datasource>
 
+export const { startContainer } = testContainerUtils
+
 export enum DatabaseName {
   POSTGRES = "postgres",
   MONGODB = "mongodb",
@@ -71,58 +71,3 @@ export async function knexClient(ds: Datasource) {
     }
   }
 }
-
-export async function startContainer(container: GenericContainer) {
-  const imageName = (container as any).imageName.string as string
-  let key: string = imageName
-  if (imageName.includes("@sha256")) {
-    key = imageName.split("@")[0]
-  }
-  key = key.replaceAll("/", "-").replaceAll(":", "-")
-
-  container = container
-    .withReuse()
-    .withLabels({ "com.budibase": "true" })
-    .withName(`${key}_testcontainer`)
-
-  let startedContainer: StartedTestContainer | undefined = undefined
-  let lastError = undefined
-  for (let i = 0; i < 10; i++) {
-    try {
-      // container.start() is not an idempotent operation, calling `start`
-      // modifies the internal state of a GenericContainer instance such that
-      // the hash it uses to determine reuse changes. We need to clone the
-      // container before calling start to ensure that we're using the same
-      // reuse hash every time.
-      const containerCopy = cloneDeep(container)
-      startedContainer = await containerCopy.start()
-      lastError = undefined
-      break
-    } catch (e: any) {
-      lastError = e
-      await new Promise(resolve => setTimeout(resolve, 1000))
-    }
-  }
-
-  if (!startedContainer) {
-    if (lastError) {
-      throw lastError
-    }
-    throw new Error(`failed to start container: ${imageName}`)
-  }
-
-  const info = testContainerUtils.getContainerById(startedContainer.getId())
-  if (!info) {
-    throw new Error("Container not found")
-  }
-
-  // Some Docker runtimes, when you expose a port, will bind it to both
-  // 127.0.0.1 and ::1, so ipv4 and ipv6. The port spaces of ipv4 and ipv6
-  // addresses are not shared, and testcontainers will sometimes give you back
-  // the ipv6 port. There's no way to know that this has happened, and if you
-  // try to then connect to `localhost:port` you may attempt to bind to the v4
-  // address which could be unbound or even an entirely different container. For
-  // that reason, we don't use testcontainers' `getExposedPort` function,
-  // preferring instead our own method that guaranteed v4 ports.
-  return testContainerUtils.getExposedV4Ports(info)
-}
diff --git a/packages/server/src/tests/utilities/TestConfiguration.ts b/packages/server/src/tests/utilities/TestConfiguration.ts
index 5ed60a59b6..abecf6df44 100644
--- a/packages/server/src/tests/utilities/TestConfiguration.ts
+++ b/packages/server/src/tests/utilities/TestConfiguration.ts
@@ -237,6 +237,7 @@ export default class TestConfiguration {
     if (!this) {
       return
     }
+
     if (this.server) {
       this.server.close()
     } else {
diff --git a/packages/worker/src/tests/TestConfiguration.ts b/packages/worker/src/tests/TestConfiguration.ts
index c2cf005308..440d6dc776 100644
--- a/packages/worker/src/tests/TestConfiguration.ts
+++ b/packages/worker/src/tests/TestConfiguration.ts
@@ -12,7 +12,7 @@ dbConfig.init()
 import env from "../environment"
 import * as controllers from "./controllers"
 
-const supertest = require("supertest")
+import supertest from "supertest"
 
 import { Config } from "../constants"
 import {