Merge pull request #15488 from Budibase/automation-tests-3
Create an automation cron trigger test.
This commit is contained in:
commit
25e2b380a6
|
@ -1,45 +1,58 @@
|
||||||
import events from "events"
|
import events from "events"
|
||||||
import { newid } from "../utils"
|
import { newid } from "../utils"
|
||||||
import { Queue, QueueOptions, JobOptions } from "./queue"
|
import { Queue, QueueOptions, JobOptions } from "./queue"
|
||||||
|
import { helpers } from "@budibase/shared-core"
|
||||||
|
import { Job, JobId, JobInformation } from "bull"
|
||||||
|
|
||||||
interface JobMessage {
|
function jobToJobInformation(job: Job): JobInformation {
|
||||||
|
let cron = ""
|
||||||
|
let every = -1
|
||||||
|
let tz: string | undefined = undefined
|
||||||
|
let endDate: number | undefined = undefined
|
||||||
|
|
||||||
|
const repeat = job.opts?.repeat
|
||||||
|
if (repeat) {
|
||||||
|
endDate = repeat.endDate ? new Date(repeat.endDate).getTime() : Date.now()
|
||||||
|
tz = repeat.tz
|
||||||
|
if ("cron" in repeat) {
|
||||||
|
cron = repeat.cron
|
||||||
|
} else {
|
||||||
|
every = repeat.every
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
id: job.id.toString(),
|
||||||
|
name: "",
|
||||||
|
key: job.id.toString(),
|
||||||
|
tz,
|
||||||
|
endDate,
|
||||||
|
cron,
|
||||||
|
every,
|
||||||
|
next: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
interface JobMessage<T = any> extends Partial<Job<T>> {
|
||||||
id: string
|
id: string
|
||||||
timestamp: number
|
timestamp: number
|
||||||
queue: string
|
queue: Queue<T>
|
||||||
data: any
|
data: any
|
||||||
opts?: JobOptions
|
opts?: JobOptions
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Bull works with a Job wrapper around all messages that contains a lot more information about
|
* This is designed to replicate Bull (https://github.com/OptimalBits/bull) in
|
||||||
* the state of the message, this object constructor implements the same schema of Bull jobs
|
* memory as a sort of mock. It is relatively simple, using an event emitter
|
||||||
* for the sake of maintaining API consistency.
|
* internally to register when messages are available to the consumers - in can
|
||||||
* @param queue The name of the queue which the message will be carried on.
|
* support many inputs and many consumers.
|
||||||
* @param message The JSON message which will be passed back to the consumer.
|
|
||||||
* @returns A new job which can now be put onto the queue, this is mostly an
|
|
||||||
* internal structure so that an in memory queue can be easily swapped for a Bull queue.
|
|
||||||
*/
|
|
||||||
function newJob(queue: string, message: any, opts?: JobOptions): JobMessage {
|
|
||||||
return {
|
|
||||||
id: newid(),
|
|
||||||
timestamp: Date.now(),
|
|
||||||
queue: queue,
|
|
||||||
data: message,
|
|
||||||
opts,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This is designed to replicate Bull (https://github.com/OptimalBits/bull) in memory as a sort of mock.
|
|
||||||
* It is relatively simple, using an event emitter internally to register when messages are available
|
|
||||||
* to the consumers - in can support many inputs and many consumers.
|
|
||||||
*/
|
*/
|
||||||
class InMemoryQueue implements Partial<Queue> {
|
class InMemoryQueue implements Partial<Queue> {
|
||||||
_name: string
|
_name: string
|
||||||
_opts?: QueueOptions
|
_opts?: QueueOptions
|
||||||
_messages: JobMessage[]
|
_messages: JobMessage[]
|
||||||
_queuedJobIds: Set<string>
|
_queuedJobIds: Set<string>
|
||||||
_emitter: NodeJS.EventEmitter
|
_emitter: NodeJS.EventEmitter<{ message: [JobMessage]; completed: [Job] }>
|
||||||
_runCount: number
|
_runCount: number
|
||||||
_addCount: number
|
_addCount: number
|
||||||
|
|
||||||
|
@ -69,34 +82,29 @@ class InMemoryQueue implements Partial<Queue> {
|
||||||
*/
|
*/
|
||||||
async process(concurrencyOrFunc: number | any, func?: any) {
|
async process(concurrencyOrFunc: number | any, func?: any) {
|
||||||
func = typeof concurrencyOrFunc === "number" ? func : concurrencyOrFunc
|
func = typeof concurrencyOrFunc === "number" ? func : concurrencyOrFunc
|
||||||
this._emitter.on("message", async () => {
|
this._emitter.on("message", async message => {
|
||||||
if (this._messages.length <= 0) {
|
let resp = func(message)
|
||||||
return
|
|
||||||
}
|
|
||||||
let msg = this._messages.shift()
|
|
||||||
|
|
||||||
let resp = func(msg)
|
|
||||||
|
|
||||||
async function retryFunc(fnc: any) {
|
async function retryFunc(fnc: any) {
|
||||||
try {
|
try {
|
||||||
await fnc
|
await fnc
|
||||||
} catch (e: any) {
|
} catch (e: any) {
|
||||||
await new Promise<void>(r => setTimeout(() => r(), 50))
|
await helpers.wait(50)
|
||||||
|
await retryFunc(func(message))
|
||||||
await retryFunc(func(msg))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (resp.then != null) {
|
if (resp.then != null) {
|
||||||
try {
|
try {
|
||||||
await retryFunc(resp)
|
await retryFunc(resp)
|
||||||
|
this._emitter.emit("completed", message as Job)
|
||||||
} catch (e: any) {
|
} catch (e: any) {
|
||||||
console.error(e)
|
console.error(e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this._runCount++
|
this._runCount++
|
||||||
const jobId = msg?.opts?.jobId?.toString()
|
const jobId = message.opts?.jobId?.toString()
|
||||||
if (jobId && msg?.opts?.removeOnComplete) {
|
if (jobId && message.opts?.removeOnComplete) {
|
||||||
this._queuedJobIds.delete(jobId)
|
this._queuedJobIds.delete(jobId)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
@ -130,9 +138,16 @@ class InMemoryQueue implements Partial<Queue> {
|
||||||
}
|
}
|
||||||
|
|
||||||
const pushMessage = () => {
|
const pushMessage = () => {
|
||||||
this._messages.push(newJob(this._name, data, opts))
|
const message: JobMessage = {
|
||||||
|
id: newid(),
|
||||||
|
timestamp: Date.now(),
|
||||||
|
queue: this as unknown as Queue,
|
||||||
|
data,
|
||||||
|
opts,
|
||||||
|
}
|
||||||
|
this._messages.push(message)
|
||||||
this._addCount++
|
this._addCount++
|
||||||
this._emitter.emit("message")
|
this._emitter.emit("message", message)
|
||||||
}
|
}
|
||||||
|
|
||||||
const delay = opts?.delay
|
const delay = opts?.delay
|
||||||
|
@ -158,13 +173,6 @@ class InMemoryQueue implements Partial<Queue> {
|
||||||
console.log(cronJobId)
|
console.log(cronJobId)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Implemented for tests
|
|
||||||
*/
|
|
||||||
async getRepeatableJobs() {
|
|
||||||
return []
|
|
||||||
}
|
|
||||||
|
|
||||||
async removeJobs(_pattern: string) {
|
async removeJobs(_pattern: string) {
|
||||||
// no-op
|
// no-op
|
||||||
}
|
}
|
||||||
|
@ -176,13 +184,31 @@ class InMemoryQueue implements Partial<Queue> {
|
||||||
return []
|
return []
|
||||||
}
|
}
|
||||||
|
|
||||||
async getJob() {
|
async getJob(id: JobId) {
|
||||||
|
for (const message of this._messages) {
|
||||||
|
if (message.id === id) {
|
||||||
|
return message as Job
|
||||||
|
}
|
||||||
|
}
|
||||||
return null
|
return null
|
||||||
}
|
}
|
||||||
|
|
||||||
on() {
|
on(event: string, callback: (...args: any[]) => void): Queue {
|
||||||
// do nothing
|
// @ts-expect-error - this callback can be one of many types
|
||||||
return this as any
|
this._emitter.on(event, callback)
|
||||||
|
return this as unknown as Queue
|
||||||
|
}
|
||||||
|
|
||||||
|
async count() {
|
||||||
|
return this._messages.length
|
||||||
|
}
|
||||||
|
|
||||||
|
async getCompletedCount() {
|
||||||
|
return this._runCount
|
||||||
|
}
|
||||||
|
|
||||||
|
async getRepeatableJobs() {
|
||||||
|
return this._messages.map(job => jobToJobInformation(job as Job))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,9 +5,9 @@ import * as automation from "../threads/automation"
|
||||||
import { backups } from "@budibase/pro"
|
import { backups } from "@budibase/pro"
|
||||||
import { getAppMigrationQueue } from "../appMigrations/queue"
|
import { getAppMigrationQueue } from "../appMigrations/queue"
|
||||||
import { createBullBoard } from "@bull-board/api"
|
import { createBullBoard } from "@bull-board/api"
|
||||||
import BullQueue from "bull"
|
import { AutomationData } from "@budibase/types"
|
||||||
|
|
||||||
export const automationQueue: BullQueue.Queue = queue.createQueue(
|
export const automationQueue = queue.createQueue<AutomationData>(
|
||||||
queue.JobQueue.AUTOMATION,
|
queue.JobQueue.AUTOMATION,
|
||||||
{ removeStalledCb: automation.removeStalled }
|
{ removeStalledCb: automation.removeStalled }
|
||||||
)
|
)
|
||||||
|
@ -16,24 +16,20 @@ const PATH_PREFIX = "/bulladmin"
|
||||||
|
|
||||||
export async function init() {
|
export async function init() {
|
||||||
// Set up queues for bull board admin
|
// Set up queues for bull board admin
|
||||||
|
const queues = [new BullAdapter(automationQueue)]
|
||||||
|
|
||||||
const backupQueue = backups.getBackupQueue()
|
const backupQueue = backups.getBackupQueue()
|
||||||
const appMigrationQueue = getAppMigrationQueue()
|
|
||||||
const queues = [automationQueue]
|
|
||||||
if (backupQueue) {
|
if (backupQueue) {
|
||||||
queues.push(backupQueue)
|
queues.push(new BullAdapter(backupQueue))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const appMigrationQueue = getAppMigrationQueue()
|
||||||
if (appMigrationQueue) {
|
if (appMigrationQueue) {
|
||||||
queues.push(appMigrationQueue)
|
queues.push(new BullAdapter(appMigrationQueue))
|
||||||
}
|
}
|
||||||
const adapters = []
|
|
||||||
const serverAdapter: any = new KoaAdapter()
|
const serverAdapter = new KoaAdapter()
|
||||||
for (let queue of queues) {
|
createBullBoard({ queues, serverAdapter })
|
||||||
adapters.push(new BullAdapter(queue))
|
|
||||||
}
|
|
||||||
createBullBoard({
|
|
||||||
queues: adapters,
|
|
||||||
serverAdapter,
|
|
||||||
})
|
|
||||||
serverAdapter.setBasePath(PATH_PREFIX)
|
serverAdapter.setBasePath(PATH_PREFIX)
|
||||||
return serverAdapter.registerPlugin()
|
return serverAdapter.registerPlugin()
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
import { processEvent } from "./utils"
|
import { processEvent } from "./utils"
|
||||||
import { automationQueue } from "./bullboard"
|
import { automationQueue } from "./bullboard"
|
||||||
import { rebootTrigger } from "./triggers"
|
import { rebootTrigger } from "./triggers"
|
||||||
import BullQueue from "bull"
|
|
||||||
import { automationsEnabled } from "../features"
|
import { automationsEnabled } from "../features"
|
||||||
|
|
||||||
export { automationQueue } from "./bullboard"
|
export { automationQueue } from "./bullboard"
|
||||||
|
@ -25,6 +24,6 @@ export async function init() {
|
||||||
return promise
|
return promise
|
||||||
}
|
}
|
||||||
|
|
||||||
export function getQueues(): BullQueue.Queue[] {
|
export function getQueue() {
|
||||||
return [automationQueue]
|
return automationQueue
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,62 @@
|
||||||
|
import { createAutomationBuilder } from "../utilities/AutomationTestBuilder"
|
||||||
|
import TestConfiguration from "../../../tests/utilities/TestConfiguration"
|
||||||
|
import { getQueue } from "../.."
|
||||||
|
import { Job } from "bull"
|
||||||
|
|
||||||
|
describe("cron trigger", () => {
|
||||||
|
const config = new TestConfiguration()
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
await config.init()
|
||||||
|
})
|
||||||
|
|
||||||
|
afterAll(() => {
|
||||||
|
config.end()
|
||||||
|
})
|
||||||
|
|
||||||
|
it("should queue a Bull cron job", async () => {
|
||||||
|
const queue = getQueue()
|
||||||
|
expect(await queue.getCompletedCount()).toEqual(0)
|
||||||
|
|
||||||
|
const jobPromise = new Promise<Job>(resolve => {
|
||||||
|
queue.on("completed", async job => {
|
||||||
|
resolve(job)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
await createAutomationBuilder({ config })
|
||||||
|
.cron({ cron: "* * * * *" })
|
||||||
|
.serverLog({
|
||||||
|
text: "Hello, world!",
|
||||||
|
})
|
||||||
|
.save()
|
||||||
|
|
||||||
|
await config.api.application.publish(config.getAppId())
|
||||||
|
|
||||||
|
expect(await queue.getCompletedCount()).toEqual(1)
|
||||||
|
|
||||||
|
const job = await jobPromise
|
||||||
|
const repeat = job.opts?.repeat
|
||||||
|
if (!repeat || !("cron" in repeat)) {
|
||||||
|
throw new Error("Expected cron repeat")
|
||||||
|
}
|
||||||
|
expect(repeat.cron).toEqual("* * * * *")
|
||||||
|
})
|
||||||
|
|
||||||
|
it("should fail if the cron expression is invalid", async () => {
|
||||||
|
await createAutomationBuilder({ config })
|
||||||
|
.cron({ cron: "* * * * * *" })
|
||||||
|
.serverLog({
|
||||||
|
text: "Hello, world!",
|
||||||
|
})
|
||||||
|
.save()
|
||||||
|
|
||||||
|
await config.api.application.publish(config.getAppId(), {
|
||||||
|
status: 500,
|
||||||
|
body: {
|
||||||
|
message:
|
||||||
|
'Deployment Failed: Invalid automation CRON "* * * * * *" - Expected 5 values, but got 6.',
|
||||||
|
},
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
|
@ -230,7 +230,7 @@ export async function enableCronTrigger(appId: any, automation: Automation) {
|
||||||
// can't use getAppDB here as this is likely to be called from dev app,
|
// can't use getAppDB here as this is likely to be called from dev app,
|
||||||
// but this call could be for dev app or prod app, need to just use what
|
// but this call could be for dev app or prod app, need to just use what
|
||||||
// was passed in
|
// was passed in
|
||||||
await dbCore.doWithDB(appId, async (db: any) => {
|
await dbCore.doWithDB(appId, async db => {
|
||||||
const response = await db.put(automation)
|
const response = await db.put(automation)
|
||||||
automation._id = response.id
|
automation._id = response.id
|
||||||
automation._rev = response.rev
|
automation._rev = response.rev
|
||||||
|
|
|
@ -33,7 +33,10 @@ export class ApplicationAPI extends TestAPI {
|
||||||
await this._delete(`/api/applications/${appId}`, { expectations })
|
await this._delete(`/api/applications/${appId}`, { expectations })
|
||||||
}
|
}
|
||||||
|
|
||||||
publish = async (appId: string): Promise<PublishResponse> => {
|
publish = async (
|
||||||
|
appId: string,
|
||||||
|
expectations?: Expectations
|
||||||
|
): Promise<PublishResponse> => {
|
||||||
return await this._post<PublishResponse>(
|
return await this._post<PublishResponse>(
|
||||||
`/api/applications/${appId}/publish`,
|
`/api/applications/${appId}/publish`,
|
||||||
{
|
{
|
||||||
|
@ -42,14 +45,16 @@ export class ApplicationAPI extends TestAPI {
|
||||||
headers: {
|
headers: {
|
||||||
[constants.Header.APP_ID]: appId,
|
[constants.Header.APP_ID]: appId,
|
||||||
},
|
},
|
||||||
|
expectations,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
unpublish = async (appId: string): Promise<void> => {
|
unpublish = async (
|
||||||
await this._post(`/api/applications/${appId}/unpublish`, {
|
appId: string,
|
||||||
expectations: { status: 200 },
|
expectations?: Expectations
|
||||||
})
|
): Promise<void> => {
|
||||||
|
await this._post(`/api/applications/${appId}/unpublish`, { expectations })
|
||||||
}
|
}
|
||||||
|
|
||||||
sync = async (
|
sync = async (
|
||||||
|
@ -144,13 +149,20 @@ export class ApplicationAPI extends TestAPI {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fetch = async ({ status }: { status?: AppStatus } = {}): Promise<App[]> => {
|
fetch = async (
|
||||||
|
{ status }: { status?: AppStatus } = {},
|
||||||
|
expectations?: Expectations
|
||||||
|
): Promise<App[]> => {
|
||||||
return await this._get<App[]>("/api/applications", {
|
return await this._get<App[]>("/api/applications", {
|
||||||
query: { status },
|
query: { status },
|
||||||
|
expectations,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
addSampleData = async (appId: string): Promise<void> => {
|
addSampleData = async (
|
||||||
await this._post(`/api/applications/${appId}/sample`)
|
appId: string,
|
||||||
|
expectations?: Expectations
|
||||||
|
): Promise<void> => {
|
||||||
|
await this._post(`/api/applications/${appId}/sample`, { expectations })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue