Merge pull request #14984 from Budibase/test/use-real-bullqueues

Use real Bull queues for testing
This commit is contained in:
Adria Navarro 2024-11-08 10:18:03 +01:00 committed by GitHub
commit f97b10707d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 137 additions and 97 deletions

View File

@ -1,7 +1,12 @@
import tk from "timekeeper" import tk from "timekeeper"
import _ from "lodash" import _ from "lodash"
import { DBTestConfiguration, generator, structures } from "../../../tests" import {
DBTestConfiguration,
generator,
structures,
utils,
} from "../../../tests"
import { getDB } from "../../db" import { getDB } from "../../db"
import { import {
@ -10,15 +15,14 @@ import {
init, init,
} from "../docWritethrough" } from "../docWritethrough"
import InMemoryQueue from "../../queue/inMemoryQueue"
const initialTime = Date.now() const initialTime = Date.now()
async function waitForQueueCompletion() { async function waitForQueueCompletion() {
const queue: InMemoryQueue = DocWritethroughProcessor.queue as never await utils.queue.processMessages(DocWritethroughProcessor.queue)
await queue.waitForCompletion()
} }
beforeAll(() => utils.queue.useRealQueues())
describe("docWritethrough", () => { describe("docWritethrough", () => {
beforeAll(() => { beforeAll(() => {
init() init()
@ -67,7 +71,7 @@ describe("docWritethrough", () => {
const patch3 = generatePatchObject(3) const patch3 = generatePatchObject(3)
await docWritethrough.patch(patch3) await docWritethrough.patch(patch3)
expect(await db.get(documentId)).toEqual({ expect(await db.tryGet(documentId)).toEqual({
_id: documentId, _id: documentId,
...patch1, ...patch1,
...patch2, ...patch2,
@ -92,7 +96,7 @@ describe("docWritethrough", () => {
await waitForQueueCompletion() await waitForQueueCompletion()
expect(await db.get(documentId)).toEqual( expect(await db.tryGet(documentId)).toEqual(
expect.objectContaining({ expect.objectContaining({
_id: documentId, _id: documentId,
...patch1, ...patch1,
@ -117,7 +121,7 @@ describe("docWritethrough", () => {
await waitForQueueCompletion() await waitForQueueCompletion()
expect(date1).not.toEqual(date2) expect(date1).not.toEqual(date2)
expect(await db.get(documentId)).toEqual( expect(await db.tryGet(documentId)).toEqual(
expect.objectContaining({ expect.objectContaining({
createdAt: date1.toISOString(), createdAt: date1.toISOString(),
updatedAt: date2.toISOString(), updatedAt: date2.toISOString(),
@ -135,7 +139,7 @@ describe("docWritethrough", () => {
await docWritethrough.patch(patch2) await docWritethrough.patch(patch2)
const keyToOverride = _.sample(Object.keys(patch1))! const keyToOverride = _.sample(Object.keys(patch1))!
expect(await db.get(documentId)).toEqual( expect(await db.tryGet(documentId)).toEqual(
expect.objectContaining({ expect.objectContaining({
[keyToOverride]: patch1[keyToOverride], [keyToOverride]: patch1[keyToOverride],
}) })
@ -150,7 +154,7 @@ describe("docWritethrough", () => {
await docWritethrough.patch(patch3) await docWritethrough.patch(patch3)
await waitForQueueCompletion() await waitForQueueCompletion()
expect(await db.get(documentId)).toEqual( expect(await db.tryGet(documentId)).toEqual(
expect.objectContaining({ expect.objectContaining({
...patch1, ...patch1,
...patch2, ...patch2,
@ -180,14 +184,14 @@ describe("docWritethrough", () => {
await secondDocWritethrough.patch(doc2Patch2) await secondDocWritethrough.patch(doc2Patch2)
await waitForQueueCompletion() await waitForQueueCompletion()
expect(await db.get(docWritethrough.docId)).toEqual( expect(await db.tryGet(docWritethrough.docId)).toEqual(
expect.objectContaining({ expect.objectContaining({
...doc1Patch, ...doc1Patch,
...doc1Patch2, ...doc1Patch2,
}) })
) )
expect(await db.get(secondDocWritethrough.docId)).toEqual( expect(await db.tryGet(secondDocWritethrough.docId)).toEqual(
expect.objectContaining({ expect.objectContaining({
...doc2Patch, ...doc2Patch,
...doc2Patch2, ...doc2Patch2,
@ -203,7 +207,7 @@ describe("docWritethrough", () => {
await docWritethrough.patch(initialPatch) await docWritethrough.patch(initialPatch)
await waitForQueueCompletion() await waitForQueueCompletion()
expect(await db.get(documentId)).toEqual( expect(await db.tryGet(documentId)).toEqual(
expect.objectContaining(initialPatch) expect.objectContaining(initialPatch)
) )
@ -214,10 +218,10 @@ describe("docWritethrough", () => {
await docWritethrough.patch(extraPatch) await docWritethrough.patch(extraPatch)
await waitForQueueCompletion() await waitForQueueCompletion()
expect(await db.get(documentId)).toEqual( expect(await db.tryGet(documentId)).toEqual(
expect.objectContaining(extraPatch) expect.objectContaining(extraPatch)
) )
expect(await db.get(documentId)).not.toEqual( expect(await db.tryGet(documentId)).not.toEqual(
expect.objectContaining(initialPatch) expect.objectContaining(initialPatch)
) )
}) })
@ -242,7 +246,7 @@ describe("docWritethrough", () => {
expect(queueMessageSpy).toHaveBeenCalledTimes(5) expect(queueMessageSpy).toHaveBeenCalledTimes(5)
await waitForQueueCompletion() await waitForQueueCompletion()
expect(await db.get(documentId)).toEqual( expect(await db.tryGet(documentId)).toEqual(
expect.objectContaining(patches) expect.objectContaining(patches)
) )
@ -250,7 +254,7 @@ describe("docWritethrough", () => {
expect(queueMessageSpy).toHaveBeenCalledTimes(45) expect(queueMessageSpy).toHaveBeenCalledTimes(45)
await waitForQueueCompletion() await waitForQueueCompletion()
expect(await db.get(documentId)).toEqual( expect(await db.tryGet(documentId)).toEqual(
expect.objectContaining(patches) expect.objectContaining(patches)
) )
@ -258,20 +262,18 @@ describe("docWritethrough", () => {
expect(queueMessageSpy).toHaveBeenCalledTimes(55) expect(queueMessageSpy).toHaveBeenCalledTimes(55)
await waitForQueueCompletion() await waitForQueueCompletion()
expect(await db.get(documentId)).toEqual( expect(await db.tryGet(documentId)).toEqual(
expect.objectContaining(patches) expect.objectContaining(patches)
) )
}) })
}) })
// This is not yet supported it("patches will execute in order", async () => {
// eslint-disable-next-line jest/no-disabled-tests
it.skip("patches will execute in order", async () => {
let incrementalValue = 0 let incrementalValue = 0
const keyToOverride = generator.word() const keyToOverride = generator.word()
async function incrementalPatches(count: number) { async function incrementalPatches(count: number) {
for (let i = 0; i < count; i++) { for (let i = 0; i < count; i++) {
await docWritethrough.patch({ [keyToOverride]: incrementalValue++ }) await docWritethrough.patch({ [keyToOverride]: ++incrementalValue })
} }
} }
@ -279,13 +281,13 @@ describe("docWritethrough", () => {
await incrementalPatches(5) await incrementalPatches(5)
await waitForQueueCompletion() await waitForQueueCompletion()
expect(await db.get(documentId)).toEqual( expect(await db.tryGet(documentId)).toEqual(
expect.objectContaining({ [keyToOverride]: 5 }) expect.objectContaining({ [keyToOverride]: 5 })
) )
await incrementalPatches(40) await incrementalPatches(40)
await waitForQueueCompletion() await waitForQueueCompletion()
expect(await db.get(documentId)).toEqual( expect(await db.tryGet(documentId)).toEqual(
expect.objectContaining({ [keyToOverride]: 45 }) expect.objectContaining({ [keyToOverride]: 45 })
) )
}) })

View File

@ -1,5 +1,5 @@
import events from "events" import events from "events"
import { newid, timeout } from "../utils" import { newid } from "../utils"
import { Queue, QueueOptions, JobOptions } from "./queue" import { Queue, QueueOptions, JobOptions } from "./queue"
interface JobMessage { interface JobMessage {
@ -184,16 +184,6 @@ class InMemoryQueue implements Partial<Queue> {
// do nothing // do nothing
return this as any return this as any
} }
async waitForCompletion() {
do {
await timeout(50)
} while (this.hasRunningJobs())
}
hasRunningJobs() {
return this._addCount > this._runCount
}
} }
export default InMemoryQueue export default InMemoryQueue

View File

@ -15,7 +15,7 @@ const QUEUE_LOCK_MS = Duration.fromMinutes(5).toMs()
const QUEUE_LOCK_RENEW_INTERNAL_MS = Duration.fromSeconds(30).toMs() const QUEUE_LOCK_RENEW_INTERNAL_MS = Duration.fromSeconds(30).toMs()
// cleanup the queue every 60 seconds // cleanup the queue every 60 seconds
const CLEANUP_PERIOD_MS = Duration.fromSeconds(60).toMs() const CLEANUP_PERIOD_MS = Duration.fromSeconds(60).toMs()
let QUEUES: BullQueue.Queue[] | InMemoryQueue[] = [] let QUEUES: BullQueue.Queue[] = []
let cleanupInterval: NodeJS.Timeout let cleanupInterval: NodeJS.Timeout
async function cleanup() { async function cleanup() {
@ -45,11 +45,18 @@ export function createQueue<T>(
if (opts.jobOptions) { if (opts.jobOptions) {
queueConfig.defaultJobOptions = opts.jobOptions queueConfig.defaultJobOptions = opts.jobOptions
} }
let queue: any let queue: BullQueue.Queue<T>
if (!env.isTest()) { if (!env.isTest()) {
queue = new BullQueue(jobQueue, queueConfig) queue = new BullQueue(jobQueue, queueConfig)
} else if (
process.env.BULL_TEST_REDIS_PORT &&
!isNaN(+process.env.BULL_TEST_REDIS_PORT)
) {
queue = new BullQueue(jobQueue, {
redis: { host: "localhost", port: +process.env.BULL_TEST_REDIS_PORT },
})
} else { } else {
queue = new InMemoryQueue(jobQueue, queueConfig) queue = new InMemoryQueue(jobQueue, queueConfig) as any
} }
addListeners(queue, jobQueue, opts?.removeStalledCb) addListeners(queue, jobQueue, opts?.removeStalledCb)
QUEUES.push(queue) QUEUES.push(queue)

View File

@ -4,3 +4,4 @@ export { generator } from "./structures"
export * as testContainerUtils from "./testContainerUtils" export * as testContainerUtils from "./testContainerUtils"
export * as utils from "./utils" export * as utils from "./utils"
export * from "./jestUtils" export * from "./jestUtils"
export * as queue from "./queue"

View File

@ -0,0 +1,9 @@
import { Queue } from "bull"
export async function processMessages(queue: Queue) {
do {
await queue.whenCurrentJobsFinished()
} while (await queue.count())
await queue.whenCurrentJobsFinished()
}

View File

@ -1,4 +1,6 @@
import { execSync } from "child_process" import { execSync } from "child_process"
import { cloneDeep } from "lodash"
import { GenericContainer, StartedTestContainer } from "testcontainers"
const IPV4_PORT_REGEX = new RegExp(`0\\.0\\.0\\.0:(\\d+)->(\\d+)/tcp`, "g") const IPV4_PORT_REGEX = new RegExp(`0\\.0\\.0\\.0:(\\d+)->(\\d+)/tcp`, "g")
@ -106,3 +108,58 @@ export function setupEnv(...envs: any[]) {
} }
} }
} }
export async function startContainer(container: GenericContainer) {
const imageName = (container as any).imageName.string as string
let key: string = imageName
if (imageName.includes("@sha256")) {
key = imageName.split("@")[0]
}
key = key.replace(/\//g, "-").replace(/:/g, "-")
container = container
.withReuse()
.withLabels({ "com.budibase": "true" })
.withName(`${key}_testcontainer`)
let startedContainer: StartedTestContainer | undefined = undefined
let lastError = undefined
for (let i = 0; i < 10; i++) {
try {
// container.start() is not an idempotent operation, calling `start`
// modifies the internal state of a GenericContainer instance such that
// the hash it uses to determine reuse changes. We need to clone the
// container before calling start to ensure that we're using the same
// reuse hash every time.
const containerCopy = cloneDeep(container)
startedContainer = await containerCopy.start()
lastError = undefined
break
} catch (e: any) {
lastError = e
await new Promise(resolve => setTimeout(resolve, 1000))
}
}
if (!startedContainer) {
if (lastError) {
throw lastError
}
throw new Error(`failed to start container: ${imageName}`)
}
const info = getContainerById(startedContainer.getId())
if (!info) {
throw new Error("Container not found")
}
// Some Docker runtimes, when you expose a port, will bind it to both
// 127.0.0.1 and ::1, so ipv4 and ipv6. The port spaces of ipv4 and ipv6
// addresses are not shared, and testcontainers will sometimes give you back
// the ipv6 port. There's no way to know that this has happened, and if you
// try to then connect to `localhost:port` you may attempt to bind to the v4
// address which could be unbound or even an entirely different container. For
// that reason, we don't use testcontainers' `getExposedPort` function,
// preferring instead our own method that guaranteed v4 ports.
return getExposedV4Ports(info)
}

View File

@ -1 +1,2 @@
export * as time from "./time" export * as time from "./time"
export * as queue from "./queue"

View File

@ -0,0 +1,27 @@
import { Queue } from "bull"
import { GenericContainer, Wait } from "testcontainers"
import { startContainer } from "../testContainerUtils"
export async function useRealQueues() {
const ports = await startContainer(
new GenericContainer("redis")
.withExposedPorts(6379)
.withWaitStrategy(
Wait.forSuccessfulCommand(`redis-cli`).withStartupTimeout(10000)
)
)
const port = ports.find(x => x.container === 6379)?.host
if (!port) {
throw new Error("Redis port not found")
}
process.env.BULL_TEST_REDIS_PORT = port.toString()
}
export async function processMessages(queue: Queue) {
do {
await queue.whenCurrentJobsFinished()
} while (await queue.count())
await queue.whenCurrentJobsFinished()
}

@ -1 +1 @@
Subproject commit 04bee88597edb1edb88ed299d0597b587f0362ec Subproject commit 80770215c6159e4d47f3529fd02e74bc8ad07543

View File

@ -6,12 +6,12 @@ import * as mysql from "./mysql"
import * as mssql from "./mssql" import * as mssql from "./mssql"
import * as mariadb from "./mariadb" import * as mariadb from "./mariadb"
import * as oracle from "./oracle" import * as oracle from "./oracle"
import { GenericContainer, StartedTestContainer } from "testcontainers"
import { testContainerUtils } from "@budibase/backend-core/tests" import { testContainerUtils } from "@budibase/backend-core/tests"
import cloneDeep from "lodash/cloneDeep"
export type DatasourceProvider = () => Promise<Datasource> export type DatasourceProvider = () => Promise<Datasource>
export const { startContainer } = testContainerUtils
export enum DatabaseName { export enum DatabaseName {
POSTGRES = "postgres", POSTGRES = "postgres",
MONGODB = "mongodb", MONGODB = "mongodb",
@ -71,58 +71,3 @@ export async function knexClient(ds: Datasource) {
} }
} }
} }
export async function startContainer(container: GenericContainer) {
const imageName = (container as any).imageName.string as string
let key: string = imageName
if (imageName.includes("@sha256")) {
key = imageName.split("@")[0]
}
key = key.replaceAll("/", "-").replaceAll(":", "-")
container = container
.withReuse()
.withLabels({ "com.budibase": "true" })
.withName(`${key}_testcontainer`)
let startedContainer: StartedTestContainer | undefined = undefined
let lastError = undefined
for (let i = 0; i < 10; i++) {
try {
// container.start() is not an idempotent operation, calling `start`
// modifies the internal state of a GenericContainer instance such that
// the hash it uses to determine reuse changes. We need to clone the
// container before calling start to ensure that we're using the same
// reuse hash every time.
const containerCopy = cloneDeep(container)
startedContainer = await containerCopy.start()
lastError = undefined
break
} catch (e: any) {
lastError = e
await new Promise(resolve => setTimeout(resolve, 1000))
}
}
if (!startedContainer) {
if (lastError) {
throw lastError
}
throw new Error(`failed to start container: ${imageName}`)
}
const info = testContainerUtils.getContainerById(startedContainer.getId())
if (!info) {
throw new Error("Container not found")
}
// Some Docker runtimes, when you expose a port, will bind it to both
// 127.0.0.1 and ::1, so ipv4 and ipv6. The port spaces of ipv4 and ipv6
// addresses are not shared, and testcontainers will sometimes give you back
// the ipv6 port. There's no way to know that this has happened, and if you
// try to then connect to `localhost:port` you may attempt to bind to the v4
// address which could be unbound or even an entirely different container. For
// that reason, we don't use testcontainers' `getExposedPort` function,
// preferring instead our own method that guaranteed v4 ports.
return testContainerUtils.getExposedV4Ports(info)
}

View File

@ -237,6 +237,7 @@ export default class TestConfiguration {
if (!this) { if (!this) {
return return
} }
if (this.server) { if (this.server) {
this.server.close() this.server.close()
} else { } else {

View File

@ -12,7 +12,7 @@ dbConfig.init()
import env from "../environment" import env from "../environment"
import * as controllers from "./controllers" import * as controllers from "./controllers"
const supertest = require("supertest") import supertest from "supertest"
import { Config } from "../constants" import { Config } from "../constants"
import { import {