Merge branch 'master' into cheeks-lab-day-fields
This commit is contained in:
commit
1ca1f58272
|
@ -1,7 +1,12 @@
|
|||
import tk from "timekeeper"
|
||||
|
||||
import _ from "lodash"
|
||||
import { DBTestConfiguration, generator, structures } from "../../../tests"
|
||||
import {
|
||||
DBTestConfiguration,
|
||||
generator,
|
||||
structures,
|
||||
utils,
|
||||
} from "../../../tests"
|
||||
import { getDB } from "../../db"
|
||||
|
||||
import {
|
||||
|
@ -10,15 +15,14 @@ import {
|
|||
init,
|
||||
} from "../docWritethrough"
|
||||
|
||||
import InMemoryQueue from "../../queue/inMemoryQueue"
|
||||
|
||||
const initialTime = Date.now()
|
||||
|
||||
async function waitForQueueCompletion() {
|
||||
const queue: InMemoryQueue = DocWritethroughProcessor.queue as never
|
||||
await queue.waitForCompletion()
|
||||
await utils.queue.processMessages(DocWritethroughProcessor.queue)
|
||||
}
|
||||
|
||||
beforeAll(() => utils.queue.useRealQueues())
|
||||
|
||||
describe("docWritethrough", () => {
|
||||
beforeAll(() => {
|
||||
init()
|
||||
|
@ -67,7 +71,7 @@ describe("docWritethrough", () => {
|
|||
const patch3 = generatePatchObject(3)
|
||||
await docWritethrough.patch(patch3)
|
||||
|
||||
expect(await db.get(documentId)).toEqual({
|
||||
expect(await db.tryGet(documentId)).toEqual({
|
||||
_id: documentId,
|
||||
...patch1,
|
||||
...patch2,
|
||||
|
@ -92,7 +96,7 @@ describe("docWritethrough", () => {
|
|||
|
||||
await waitForQueueCompletion()
|
||||
|
||||
expect(await db.get(documentId)).toEqual(
|
||||
expect(await db.tryGet(documentId)).toEqual(
|
||||
expect.objectContaining({
|
||||
_id: documentId,
|
||||
...patch1,
|
||||
|
@ -117,7 +121,7 @@ describe("docWritethrough", () => {
|
|||
await waitForQueueCompletion()
|
||||
|
||||
expect(date1).not.toEqual(date2)
|
||||
expect(await db.get(documentId)).toEqual(
|
||||
expect(await db.tryGet(documentId)).toEqual(
|
||||
expect.objectContaining({
|
||||
createdAt: date1.toISOString(),
|
||||
updatedAt: date2.toISOString(),
|
||||
|
@ -135,7 +139,7 @@ describe("docWritethrough", () => {
|
|||
await docWritethrough.patch(patch2)
|
||||
|
||||
const keyToOverride = _.sample(Object.keys(patch1))!
|
||||
expect(await db.get(documentId)).toEqual(
|
||||
expect(await db.tryGet(documentId)).toEqual(
|
||||
expect.objectContaining({
|
||||
[keyToOverride]: patch1[keyToOverride],
|
||||
})
|
||||
|
@ -150,7 +154,7 @@ describe("docWritethrough", () => {
|
|||
await docWritethrough.patch(patch3)
|
||||
await waitForQueueCompletion()
|
||||
|
||||
expect(await db.get(documentId)).toEqual(
|
||||
expect(await db.tryGet(documentId)).toEqual(
|
||||
expect.objectContaining({
|
||||
...patch1,
|
||||
...patch2,
|
||||
|
@ -180,14 +184,14 @@ describe("docWritethrough", () => {
|
|||
await secondDocWritethrough.patch(doc2Patch2)
|
||||
await waitForQueueCompletion()
|
||||
|
||||
expect(await db.get(docWritethrough.docId)).toEqual(
|
||||
expect(await db.tryGet(docWritethrough.docId)).toEqual(
|
||||
expect.objectContaining({
|
||||
...doc1Patch,
|
||||
...doc1Patch2,
|
||||
})
|
||||
)
|
||||
|
||||
expect(await db.get(secondDocWritethrough.docId)).toEqual(
|
||||
expect(await db.tryGet(secondDocWritethrough.docId)).toEqual(
|
||||
expect.objectContaining({
|
||||
...doc2Patch,
|
||||
...doc2Patch2,
|
||||
|
@ -203,7 +207,7 @@ describe("docWritethrough", () => {
|
|||
await docWritethrough.patch(initialPatch)
|
||||
await waitForQueueCompletion()
|
||||
|
||||
expect(await db.get(documentId)).toEqual(
|
||||
expect(await db.tryGet(documentId)).toEqual(
|
||||
expect.objectContaining(initialPatch)
|
||||
)
|
||||
|
||||
|
@ -214,10 +218,10 @@ describe("docWritethrough", () => {
|
|||
await docWritethrough.patch(extraPatch)
|
||||
await waitForQueueCompletion()
|
||||
|
||||
expect(await db.get(documentId)).toEqual(
|
||||
expect(await db.tryGet(documentId)).toEqual(
|
||||
expect.objectContaining(extraPatch)
|
||||
)
|
||||
expect(await db.get(documentId)).not.toEqual(
|
||||
expect(await db.tryGet(documentId)).not.toEqual(
|
||||
expect.objectContaining(initialPatch)
|
||||
)
|
||||
})
|
||||
|
@ -242,7 +246,7 @@ describe("docWritethrough", () => {
|
|||
expect(queueMessageSpy).toHaveBeenCalledTimes(5)
|
||||
|
||||
await waitForQueueCompletion()
|
||||
expect(await db.get(documentId)).toEqual(
|
||||
expect(await db.tryGet(documentId)).toEqual(
|
||||
expect.objectContaining(patches)
|
||||
)
|
||||
|
||||
|
@ -250,7 +254,7 @@ describe("docWritethrough", () => {
|
|||
expect(queueMessageSpy).toHaveBeenCalledTimes(45)
|
||||
|
||||
await waitForQueueCompletion()
|
||||
expect(await db.get(documentId)).toEqual(
|
||||
expect(await db.tryGet(documentId)).toEqual(
|
||||
expect.objectContaining(patches)
|
||||
)
|
||||
|
||||
|
@ -258,20 +262,18 @@ describe("docWritethrough", () => {
|
|||
expect(queueMessageSpy).toHaveBeenCalledTimes(55)
|
||||
|
||||
await waitForQueueCompletion()
|
||||
expect(await db.get(documentId)).toEqual(
|
||||
expect(await db.tryGet(documentId)).toEqual(
|
||||
expect.objectContaining(patches)
|
||||
)
|
||||
})
|
||||
})
|
||||
|
||||
// This is not yet supported
|
||||
// eslint-disable-next-line jest/no-disabled-tests
|
||||
it.skip("patches will execute in order", async () => {
|
||||
it("patches will execute in order", async () => {
|
||||
let incrementalValue = 0
|
||||
const keyToOverride = generator.word()
|
||||
async function incrementalPatches(count: number) {
|
||||
for (let i = 0; i < count; i++) {
|
||||
await docWritethrough.patch({ [keyToOverride]: incrementalValue++ })
|
||||
await docWritethrough.patch({ [keyToOverride]: ++incrementalValue })
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -279,13 +281,13 @@ describe("docWritethrough", () => {
|
|||
await incrementalPatches(5)
|
||||
|
||||
await waitForQueueCompletion()
|
||||
expect(await db.get(documentId)).toEqual(
|
||||
expect(await db.tryGet(documentId)).toEqual(
|
||||
expect.objectContaining({ [keyToOverride]: 5 })
|
||||
)
|
||||
|
||||
await incrementalPatches(40)
|
||||
await waitForQueueCompletion()
|
||||
expect(await db.get(documentId)).toEqual(
|
||||
expect(await db.tryGet(documentId)).toEqual(
|
||||
expect.objectContaining({ [keyToOverride]: 45 })
|
||||
)
|
||||
})
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
import events from "events"
|
||||
import { newid, timeout } from "../utils"
|
||||
import { newid } from "../utils"
|
||||
import { Queue, QueueOptions, JobOptions } from "./queue"
|
||||
|
||||
interface JobMessage {
|
||||
|
@ -141,7 +141,7 @@ class InMemoryQueue implements Partial<Queue> {
|
|||
} else {
|
||||
pushMessage()
|
||||
}
|
||||
return {} as any
|
||||
return { id: jobId } as any
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -184,16 +184,6 @@ class InMemoryQueue implements Partial<Queue> {
|
|||
// do nothing
|
||||
return this as any
|
||||
}
|
||||
|
||||
async waitForCompletion() {
|
||||
do {
|
||||
await timeout(50)
|
||||
} while (this.hasRunningJobs())
|
||||
}
|
||||
|
||||
hasRunningJobs() {
|
||||
return this._addCount > this._runCount
|
||||
}
|
||||
}
|
||||
|
||||
export default InMemoryQueue
|
||||
|
|
|
@ -15,7 +15,7 @@ const QUEUE_LOCK_MS = Duration.fromMinutes(5).toMs()
|
|||
const QUEUE_LOCK_RENEW_INTERNAL_MS = Duration.fromSeconds(30).toMs()
|
||||
// cleanup the queue every 60 seconds
|
||||
const CLEANUP_PERIOD_MS = Duration.fromSeconds(60).toMs()
|
||||
let QUEUES: BullQueue.Queue[] | InMemoryQueue[] = []
|
||||
let QUEUES: BullQueue.Queue[] = []
|
||||
let cleanupInterval: NodeJS.Timeout
|
||||
|
||||
async function cleanup() {
|
||||
|
@ -45,11 +45,18 @@ export function createQueue<T>(
|
|||
if (opts.jobOptions) {
|
||||
queueConfig.defaultJobOptions = opts.jobOptions
|
||||
}
|
||||
let queue: any
|
||||
let queue: BullQueue.Queue<T>
|
||||
if (!env.isTest()) {
|
||||
queue = new BullQueue(jobQueue, queueConfig)
|
||||
} else if (
|
||||
process.env.BULL_TEST_REDIS_PORT &&
|
||||
!isNaN(+process.env.BULL_TEST_REDIS_PORT)
|
||||
) {
|
||||
queue = new BullQueue(jobQueue, {
|
||||
redis: { host: "localhost", port: +process.env.BULL_TEST_REDIS_PORT },
|
||||
})
|
||||
} else {
|
||||
queue = new InMemoryQueue(jobQueue, queueConfig)
|
||||
queue = new InMemoryQueue(jobQueue, queueConfig) as any
|
||||
}
|
||||
addListeners(queue, jobQueue, opts?.removeStalledCb)
|
||||
QUEUES.push(queue)
|
||||
|
|
|
@ -4,3 +4,4 @@ export { generator } from "./structures"
|
|||
export * as testContainerUtils from "./testContainerUtils"
|
||||
export * as utils from "./utils"
|
||||
export * from "./jestUtils"
|
||||
export * as queue from "./queue"
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
import { Queue } from "bull"
|
||||
|
||||
export async function processMessages(queue: Queue) {
|
||||
do {
|
||||
await queue.whenCurrentJobsFinished()
|
||||
} while (await queue.count())
|
||||
|
||||
await queue.whenCurrentJobsFinished()
|
||||
}
|
|
@ -1,4 +1,6 @@
|
|||
import { execSync } from "child_process"
|
||||
import { cloneDeep } from "lodash"
|
||||
import { GenericContainer, StartedTestContainer } from "testcontainers"
|
||||
|
||||
const IPV4_PORT_REGEX = new RegExp(`0\\.0\\.0\\.0:(\\d+)->(\\d+)/tcp`, "g")
|
||||
|
||||
|
@ -106,3 +108,58 @@ export function setupEnv(...envs: any[]) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
export async function startContainer(container: GenericContainer) {
|
||||
const imageName = (container as any).imageName.string as string
|
||||
let key: string = imageName
|
||||
if (imageName.includes("@sha256")) {
|
||||
key = imageName.split("@")[0]
|
||||
}
|
||||
key = key.replace(/\//g, "-").replace(/:/g, "-")
|
||||
|
||||
container = container
|
||||
.withReuse()
|
||||
.withLabels({ "com.budibase": "true" })
|
||||
.withName(`${key}_testcontainer`)
|
||||
|
||||
let startedContainer: StartedTestContainer | undefined = undefined
|
||||
let lastError = undefined
|
||||
for (let i = 0; i < 10; i++) {
|
||||
try {
|
||||
// container.start() is not an idempotent operation, calling `start`
|
||||
// modifies the internal state of a GenericContainer instance such that
|
||||
// the hash it uses to determine reuse changes. We need to clone the
|
||||
// container before calling start to ensure that we're using the same
|
||||
// reuse hash every time.
|
||||
const containerCopy = cloneDeep(container)
|
||||
startedContainer = await containerCopy.start()
|
||||
lastError = undefined
|
||||
break
|
||||
} catch (e: any) {
|
||||
lastError = e
|
||||
await new Promise(resolve => setTimeout(resolve, 1000))
|
||||
}
|
||||
}
|
||||
|
||||
if (!startedContainer) {
|
||||
if (lastError) {
|
||||
throw lastError
|
||||
}
|
||||
throw new Error(`failed to start container: ${imageName}`)
|
||||
}
|
||||
|
||||
const info = getContainerById(startedContainer.getId())
|
||||
if (!info) {
|
||||
throw new Error("Container not found")
|
||||
}
|
||||
|
||||
// Some Docker runtimes, when you expose a port, will bind it to both
|
||||
// 127.0.0.1 and ::1, so ipv4 and ipv6. The port spaces of ipv4 and ipv6
|
||||
// addresses are not shared, and testcontainers will sometimes give you back
|
||||
// the ipv6 port. There's no way to know that this has happened, and if you
|
||||
// try to then connect to `localhost:port` you may attempt to bind to the v4
|
||||
// address which could be unbound or even an entirely different container. For
|
||||
// that reason, we don't use testcontainers' `getExposedPort` function,
|
||||
// preferring instead our own method that guaranteed v4 ports.
|
||||
return getExposedV4Ports(info)
|
||||
}
|
||||
|
|
|
@ -1 +1,2 @@
|
|||
export * as time from "./time"
|
||||
export * as queue from "./queue"
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
import { Queue } from "bull"
|
||||
import { GenericContainer, Wait } from "testcontainers"
|
||||
import { startContainer } from "../testContainerUtils"
|
||||
|
||||
export async function useRealQueues() {
|
||||
const ports = await startContainer(
|
||||
new GenericContainer("redis")
|
||||
.withExposedPorts(6379)
|
||||
.withWaitStrategy(
|
||||
Wait.forSuccessfulCommand(`redis-cli`).withStartupTimeout(10000)
|
||||
)
|
||||
)
|
||||
|
||||
const port = ports.find(x => x.container === 6379)?.host
|
||||
if (!port) {
|
||||
throw new Error("Redis port not found")
|
||||
}
|
||||
process.env.BULL_TEST_REDIS_PORT = port.toString()
|
||||
}
|
||||
|
||||
export async function processMessages(queue: Queue) {
|
||||
do {
|
||||
await queue.whenCurrentJobsFinished()
|
||||
} while (await queue.count())
|
||||
|
||||
await queue.whenCurrentJobsFinished()
|
||||
}
|
|
@ -1 +1 @@
|
|||
Subproject commit 04bee88597edb1edb88ed299d0597b587f0362ec
|
||||
Subproject commit 80770215c6159e4d47f3529fd02e74bc8ad07543
|
|
@ -0,0 +1,49 @@
|
|||
import tk from "timekeeper"
|
||||
import "../../environment"
|
||||
import * as automations from "../index"
|
||||
import * as setup from "./utilities"
|
||||
import { basicCronAutomation } from "../../tests/utilities/structures"
|
||||
|
||||
const initialTime = Date.now()
|
||||
tk.freeze(initialTime)
|
||||
|
||||
const oneMinuteInMs = 60 * 1000
|
||||
|
||||
describe("cron automations", () => {
|
||||
let config = setup.getConfig()
|
||||
|
||||
beforeAll(async () => {
|
||||
await automations.init()
|
||||
await config.init()
|
||||
})
|
||||
|
||||
afterAll(async () => {
|
||||
await automations.shutdown()
|
||||
setup.afterAll()
|
||||
})
|
||||
|
||||
beforeEach(() => {
|
||||
tk.freeze(initialTime)
|
||||
})
|
||||
|
||||
async function travel(ms: number) {
|
||||
tk.travel(Date.now() + ms)
|
||||
}
|
||||
|
||||
it("should initialise the automation timestamp", async () => {
|
||||
const automation = basicCronAutomation(config.appId!, "* * * * *")
|
||||
await config.api.automation.post(automation)
|
||||
await travel(oneMinuteInMs)
|
||||
await config.publish()
|
||||
|
||||
const automationLogs = await config.getAutomationLogs()
|
||||
expect(automationLogs.data).toHaveLength(1)
|
||||
expect(automationLogs.data).toEqual([
|
||||
expect.objectContaining({
|
||||
trigger: expect.objectContaining({
|
||||
outputs: { timestamp: initialTime + oneMinuteInMs },
|
||||
}),
|
||||
}),
|
||||
])
|
||||
})
|
||||
})
|
|
@ -70,6 +70,10 @@ export async function processEvent(job: AutomationJob) {
|
|||
|
||||
const task = async () => {
|
||||
try {
|
||||
if (isCronTrigger(job.data.automation)) {
|
||||
// Requires the timestamp at run time
|
||||
job.data.event.timestamp = Date.now()
|
||||
}
|
||||
// need to actually await these so that an error can be captured properly
|
||||
console.log("automation running", ...loggingArgs(job))
|
||||
|
||||
|
@ -210,15 +214,15 @@ export async function enableCronTrigger(appId: any, automation: Automation) {
|
|||
}
|
||||
// make a job id rather than letting Bull decide, makes it easier to handle on way out
|
||||
const jobId = `${appId}_cron_${utils.newid()}`
|
||||
const job: any = await automationQueue.add(
|
||||
const job = await automationQueue.add(
|
||||
{
|
||||
automation,
|
||||
event: { appId, timestamp: Date.now() },
|
||||
event: { appId },
|
||||
},
|
||||
{ repeat: { cron: cronExp }, jobId }
|
||||
)
|
||||
// Assign cron job ID from bull so we can remove it later if the cron trigger is removed
|
||||
trigger.cronJobId = job.id
|
||||
trigger.cronJobId = job.id.toString()
|
||||
// can't use getAppDB here as this is likely to be called from dev app,
|
||||
// but this call could be for dev app or prod app, need to just use what
|
||||
// was passed in
|
||||
|
|
|
@ -6,12 +6,12 @@ import * as mysql from "./mysql"
|
|||
import * as mssql from "./mssql"
|
||||
import * as mariadb from "./mariadb"
|
||||
import * as oracle from "./oracle"
|
||||
import { GenericContainer, StartedTestContainer } from "testcontainers"
|
||||
import { testContainerUtils } from "@budibase/backend-core/tests"
|
||||
import cloneDeep from "lodash/cloneDeep"
|
||||
|
||||
export type DatasourceProvider = () => Promise<Datasource>
|
||||
|
||||
export const { startContainer } = testContainerUtils
|
||||
|
||||
export enum DatabaseName {
|
||||
POSTGRES = "postgres",
|
||||
MONGODB = "mongodb",
|
||||
|
@ -71,58 +71,3 @@ export async function knexClient(ds: Datasource) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
export async function startContainer(container: GenericContainer) {
|
||||
const imageName = (container as any).imageName.string as string
|
||||
let key: string = imageName
|
||||
if (imageName.includes("@sha256")) {
|
||||
key = imageName.split("@")[0]
|
||||
}
|
||||
key = key.replaceAll("/", "-").replaceAll(":", "-")
|
||||
|
||||
container = container
|
||||
.withReuse()
|
||||
.withLabels({ "com.budibase": "true" })
|
||||
.withName(`${key}_testcontainer`)
|
||||
|
||||
let startedContainer: StartedTestContainer | undefined = undefined
|
||||
let lastError = undefined
|
||||
for (let i = 0; i < 10; i++) {
|
||||
try {
|
||||
// container.start() is not an idempotent operation, calling `start`
|
||||
// modifies the internal state of a GenericContainer instance such that
|
||||
// the hash it uses to determine reuse changes. We need to clone the
|
||||
// container before calling start to ensure that we're using the same
|
||||
// reuse hash every time.
|
||||
const containerCopy = cloneDeep(container)
|
||||
startedContainer = await containerCopy.start()
|
||||
lastError = undefined
|
||||
break
|
||||
} catch (e: any) {
|
||||
lastError = e
|
||||
await new Promise(resolve => setTimeout(resolve, 1000))
|
||||
}
|
||||
}
|
||||
|
||||
if (!startedContainer) {
|
||||
if (lastError) {
|
||||
throw lastError
|
||||
}
|
||||
throw new Error(`failed to start container: ${imageName}`)
|
||||
}
|
||||
|
||||
const info = testContainerUtils.getContainerById(startedContainer.getId())
|
||||
if (!info) {
|
||||
throw new Error("Container not found")
|
||||
}
|
||||
|
||||
// Some Docker runtimes, when you expose a port, will bind it to both
|
||||
// 127.0.0.1 and ::1, so ipv4 and ipv6. The port spaces of ipv4 and ipv6
|
||||
// addresses are not shared, and testcontainers will sometimes give you back
|
||||
// the ipv6 port. There's no way to know that this has happened, and if you
|
||||
// try to then connect to `localhost:port` you may attempt to bind to the v4
|
||||
// address which could be unbound or even an entirely different container. For
|
||||
// that reason, we don't use testcontainers' `getExposedPort` function,
|
||||
// preferring instead our own method that guaranteed v4 ports.
|
||||
return testContainerUtils.getExposedV4Ports(info)
|
||||
}
|
||||
|
|
|
@ -237,6 +237,7 @@ export default class TestConfiguration {
|
|||
if (!this) {
|
||||
return
|
||||
}
|
||||
|
||||
if (this.server) {
|
||||
this.server.close()
|
||||
} else {
|
||||
|
|
|
@ -245,6 +245,38 @@ export function basicAutomation(appId?: string): Automation {
|
|||
}
|
||||
}
|
||||
|
||||
export function basicCronAutomation(appId: string, cron: string): Automation {
|
||||
const automation: Automation = {
|
||||
name: `Automation ${generator.guid()}`,
|
||||
definition: {
|
||||
trigger: {
|
||||
stepId: AutomationTriggerStepId.CRON,
|
||||
name: "test",
|
||||
tagline: "test",
|
||||
icon: "test",
|
||||
description: "test",
|
||||
type: AutomationStepType.TRIGGER,
|
||||
id: "test",
|
||||
inputs: {
|
||||
cron,
|
||||
},
|
||||
schema: {
|
||||
inputs: {
|
||||
properties: {},
|
||||
},
|
||||
outputs: {
|
||||
properties: {},
|
||||
},
|
||||
},
|
||||
},
|
||||
steps: [],
|
||||
},
|
||||
type: "automation",
|
||||
appId,
|
||||
}
|
||||
return automation
|
||||
}
|
||||
|
||||
export function serverLogAutomation(appId?: string): Automation {
|
||||
return {
|
||||
name: "My Automation",
|
||||
|
|
|
@ -14,6 +14,7 @@ export interface AutomationDataEvent {
|
|||
row?: Row
|
||||
oldRow?: Row
|
||||
user?: UserBindings
|
||||
timestamp?: number
|
||||
}
|
||||
|
||||
export interface AutomationData {
|
||||
|
|
|
@ -12,7 +12,7 @@ dbConfig.init()
|
|||
import env from "../environment"
|
||||
import * as controllers from "./controllers"
|
||||
|
||||
const supertest = require("supertest")
|
||||
import supertest from "supertest"
|
||||
|
||||
import { Config } from "../constants"
|
||||
import {
|
||||
|
|
Loading…
Reference in New Issue