Create a cron trigger test.
This commit is contained in:
parent
bd6d6534be
commit
d03fdc8bb1
|
@ -1,32 +1,44 @@
|
|||
import events from "events"
|
||||
import { newid } from "../utils"
|
||||
import { Queue, QueueOptions, JobOptions } from "./queue"
|
||||
import { helpers } from "@budibase/shared-core"
|
||||
import { Job, JobId, JobInformation } from "bull"
|
||||
|
||||
interface JobMessage {
|
||||
id: string
|
||||
timestamp: number
|
||||
queue: string
|
||||
data: any
|
||||
opts?: JobOptions
|
||||
function jobToJobInformation(job: Job): JobInformation {
|
||||
let cron = ""
|
||||
let every = -1
|
||||
let tz: string | undefined = undefined
|
||||
let endDate: number | undefined = undefined
|
||||
|
||||
const repeat = job.opts?.repeat
|
||||
if (repeat) {
|
||||
endDate = repeat.endDate ? new Date(repeat.endDate).getTime() : Date.now()
|
||||
tz = repeat.tz
|
||||
if ("cron" in repeat) {
|
||||
cron = repeat.cron
|
||||
} else {
|
||||
every = repeat.every
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
id: job.id.toString(),
|
||||
name: "",
|
||||
key: job.id.toString(),
|
||||
tz,
|
||||
endDate,
|
||||
cron,
|
||||
every,
|
||||
next: 0,
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Bull works with a Job wrapper around all messages that contains a lot more information about
|
||||
* the state of the message, this object constructor implements the same schema of Bull jobs
|
||||
* for the sake of maintaining API consistency.
|
||||
* @param queue The name of the queue which the message will be carried on.
|
||||
* @param message The JSON message which will be passed back to the consumer.
|
||||
* @returns A new job which can now be put onto the queue, this is mostly an
|
||||
* internal structure so that an in memory queue can be easily swapped for a Bull queue.
|
||||
*/
|
||||
function newJob(queue: string, message: any, opts?: JobOptions): JobMessage {
|
||||
return {
|
||||
id: newid(),
|
||||
timestamp: Date.now(),
|
||||
queue: queue,
|
||||
data: message,
|
||||
opts,
|
||||
}
|
||||
interface JobMessage<T = any> extends Partial<Job<T>> {
|
||||
id: string
|
||||
timestamp: number
|
||||
queue: Queue<T>
|
||||
data: any
|
||||
opts?: JobOptions
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -40,7 +52,7 @@ class InMemoryQueue implements Partial<Queue> {
|
|||
_opts?: QueueOptions
|
||||
_messages: JobMessage[]
|
||||
_queuedJobIds: Set<string>
|
||||
_emitter: NodeJS.EventEmitter
|
||||
_emitter: NodeJS.EventEmitter<{ message: [JobMessage]; completed: [Job] }>
|
||||
_runCount: number
|
||||
_addCount: number
|
||||
|
||||
|
@ -70,34 +82,29 @@ class InMemoryQueue implements Partial<Queue> {
|
|||
*/
|
||||
async process(concurrencyOrFunc: number | any, func?: any) {
|
||||
func = typeof concurrencyOrFunc === "number" ? func : concurrencyOrFunc
|
||||
this._emitter.on("message", async () => {
|
||||
if (this._messages.length <= 0) {
|
||||
return
|
||||
}
|
||||
let msg = this._messages.shift()
|
||||
|
||||
let resp = func(msg)
|
||||
this._emitter.on("message", async message => {
|
||||
let resp = func(message)
|
||||
|
||||
async function retryFunc(fnc: any) {
|
||||
try {
|
||||
await fnc
|
||||
} catch (e: any) {
|
||||
await new Promise<void>(r => setTimeout(() => r(), 50))
|
||||
|
||||
await retryFunc(func(msg))
|
||||
await helpers.wait(50)
|
||||
await retryFunc(func(message))
|
||||
}
|
||||
}
|
||||
|
||||
if (resp.then != null) {
|
||||
try {
|
||||
await retryFunc(resp)
|
||||
this._emitter.emit("completed", message as Job)
|
||||
} catch (e: any) {
|
||||
console.error(e)
|
||||
}
|
||||
}
|
||||
this._runCount++
|
||||
const jobId = msg?.opts?.jobId?.toString()
|
||||
if (jobId && msg?.opts?.removeOnComplete) {
|
||||
const jobId = message.opts?.jobId?.toString()
|
||||
if (jobId && message.opts?.removeOnComplete) {
|
||||
this._queuedJobIds.delete(jobId)
|
||||
}
|
||||
})
|
||||
|
@ -131,9 +138,16 @@ class InMemoryQueue implements Partial<Queue> {
|
|||
}
|
||||
|
||||
const pushMessage = () => {
|
||||
this._messages.push(newJob(this._name, data, opts))
|
||||
const message: JobMessage = {
|
||||
id: newid(),
|
||||
timestamp: Date.now(),
|
||||
queue: this as unknown as Queue,
|
||||
data,
|
||||
opts,
|
||||
}
|
||||
this._messages.push(message)
|
||||
this._addCount++
|
||||
this._emitter.emit("message")
|
||||
this._emitter.emit("message", message)
|
||||
}
|
||||
|
||||
const delay = opts?.delay
|
||||
|
@ -159,13 +173,6 @@ class InMemoryQueue implements Partial<Queue> {
|
|||
console.log(cronJobId)
|
||||
}
|
||||
|
||||
/**
|
||||
* Implemented for tests
|
||||
*/
|
||||
async getRepeatableJobs() {
|
||||
return []
|
||||
}
|
||||
|
||||
async removeJobs(_pattern: string) {
|
||||
// no-op
|
||||
}
|
||||
|
@ -177,13 +184,31 @@ class InMemoryQueue implements Partial<Queue> {
|
|||
return []
|
||||
}
|
||||
|
||||
async getJob() {
|
||||
async getJob(id: JobId) {
|
||||
for (const message of this._messages) {
|
||||
if (message.id === id) {
|
||||
return message as Job
|
||||
}
|
||||
}
|
||||
return null
|
||||
}
|
||||
|
||||
on() {
|
||||
// do nothing
|
||||
return this as any
|
||||
on(event: string, callback: (...args: any[]) => void): Queue {
|
||||
// @ts-expect-error - this callback can be one of many types
|
||||
this._emitter.on(event, callback)
|
||||
return this as unknown as Queue
|
||||
}
|
||||
|
||||
async count() {
|
||||
return this._messages.length
|
||||
}
|
||||
|
||||
async getCompletedCount() {
|
||||
return this._runCount
|
||||
}
|
||||
|
||||
async getRepeatableJobs() {
|
||||
return this._messages.map(job => jobToJobInformation(job as Job))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import { createAutomationBuilder } from "../utilities/AutomationTestBuilder"
|
||||
import TestConfiguration from "../../../tests/utilities/TestConfiguration"
|
||||
import { getQueue } from "../.."
|
||||
import { Job } from "bull"
|
||||
|
||||
describe("cron trigger", () => {
|
||||
const config = new TestConfiguration()
|
||||
|
@ -13,9 +14,15 @@ describe("cron trigger", () => {
|
|||
config.end()
|
||||
})
|
||||
|
||||
it("should run the webhook automation - checking for parameters", async () => {
|
||||
it("should queue a Bull cron job", async () => {
|
||||
const queue = getQueue()
|
||||
expect(await queue.count()).toEqual(0)
|
||||
expect(await queue.getCompletedCount()).toEqual(0)
|
||||
|
||||
const jobPromise = new Promise<Job>(resolve => {
|
||||
queue.on("completed", async job => {
|
||||
resolve(job)
|
||||
})
|
||||
})
|
||||
|
||||
await createAutomationBuilder({ config })
|
||||
.cron({ cron: "* * * * *" })
|
||||
|
@ -26,6 +33,13 @@ describe("cron trigger", () => {
|
|||
|
||||
await config.publish()
|
||||
|
||||
expect(await queue.count()).toEqual(1)
|
||||
expect(await queue.getCompletedCount()).toEqual(1)
|
||||
|
||||
const job = await jobPromise
|
||||
const repeat = job.opts?.repeat
|
||||
if (!repeat || !("cron" in repeat)) {
|
||||
throw new Error("Expected cron repeat")
|
||||
}
|
||||
expect(repeat.cron).toEqual("* * * * *")
|
||||
})
|
||||
})
|
||||
|
|
Loading…
Reference in New Issue