Merge branch 'master' into ts/builder-components-helpers
This commit is contained in:
commit
bda6a24a9d
|
@ -3,6 +3,7 @@ import { newid } from "../utils"
|
||||||
import { Queue, QueueOptions, JobOptions } from "./queue"
|
import { Queue, QueueOptions, JobOptions } from "./queue"
|
||||||
import { helpers } from "@budibase/shared-core"
|
import { helpers } from "@budibase/shared-core"
|
||||||
import { Job, JobId, JobInformation } from "bull"
|
import { Job, JobId, JobInformation } from "bull"
|
||||||
|
import { cloneDeep } from "lodash"
|
||||||
|
|
||||||
function jobToJobInformation(job: Job): JobInformation {
|
function jobToJobInformation(job: Job): JobInformation {
|
||||||
let cron = ""
|
let cron = ""
|
||||||
|
@ -33,12 +34,13 @@ function jobToJobInformation(job: Job): JobInformation {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
interface JobMessage<T = any> extends Partial<Job<T>> {
|
export interface TestQueueMessage<T = any> extends Partial<Job<T>> {
|
||||||
id: string
|
id: string
|
||||||
timestamp: number
|
timestamp: number
|
||||||
queue: Queue<T>
|
queue: Queue<T>
|
||||||
data: any
|
data: any
|
||||||
opts?: JobOptions
|
opts?: JobOptions
|
||||||
|
manualTrigger?: boolean
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -47,12 +49,16 @@ interface JobMessage<T = any> extends Partial<Job<T>> {
|
||||||
* internally to register when messages are available to the consumers - in can
|
* internally to register when messages are available to the consumers - in can
|
||||||
* support many inputs and many consumers.
|
* support many inputs and many consumers.
|
||||||
*/
|
*/
|
||||||
class InMemoryQueue implements Partial<Queue> {
|
export class InMemoryQueue<T = any> implements Partial<Queue<T>> {
|
||||||
_name: string
|
_name: string
|
||||||
_opts?: QueueOptions
|
_opts?: QueueOptions
|
||||||
_messages: JobMessage[]
|
_messages: TestQueueMessage<T>[]
|
||||||
_queuedJobIds: Set<string>
|
_queuedJobIds: Set<string>
|
||||||
_emitter: NodeJS.EventEmitter<{ message: [JobMessage]; completed: [Job] }>
|
_emitter: NodeJS.EventEmitter<{
|
||||||
|
message: [TestQueueMessage<T>]
|
||||||
|
completed: [Job<T>]
|
||||||
|
removed: [TestQueueMessage<T>]
|
||||||
|
}>
|
||||||
_runCount: number
|
_runCount: number
|
||||||
_addCount: number
|
_addCount: number
|
||||||
|
|
||||||
|
@ -82,7 +88,15 @@ class InMemoryQueue implements Partial<Queue> {
|
||||||
*/
|
*/
|
||||||
async process(concurrencyOrFunc: number | any, func?: any) {
|
async process(concurrencyOrFunc: number | any, func?: any) {
|
||||||
func = typeof concurrencyOrFunc === "number" ? func : concurrencyOrFunc
|
func = typeof concurrencyOrFunc === "number" ? func : concurrencyOrFunc
|
||||||
this._emitter.on("message", async message => {
|
this._emitter.on("message", async msg => {
|
||||||
|
const message = cloneDeep(msg)
|
||||||
|
|
||||||
|
// For the purpose of testing, don't trigger cron jobs immediately.
|
||||||
|
// Require the test to trigger them manually with timestamps.
|
||||||
|
if (!message.manualTrigger && message.opts?.repeat != null) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
let resp = func(message)
|
let resp = func(message)
|
||||||
|
|
||||||
async function retryFunc(fnc: any) {
|
async function retryFunc(fnc: any) {
|
||||||
|
@ -97,7 +111,7 @@ class InMemoryQueue implements Partial<Queue> {
|
||||||
if (resp.then != null) {
|
if (resp.then != null) {
|
||||||
try {
|
try {
|
||||||
await retryFunc(resp)
|
await retryFunc(resp)
|
||||||
this._emitter.emit("completed", message as Job)
|
this._emitter.emit("completed", message as Job<T>)
|
||||||
} catch (e: any) {
|
} catch (e: any) {
|
||||||
console.error(e)
|
console.error(e)
|
||||||
}
|
}
|
||||||
|
@ -114,7 +128,6 @@ class InMemoryQueue implements Partial<Queue> {
|
||||||
return this as any
|
return this as any
|
||||||
}
|
}
|
||||||
|
|
||||||
// simply puts a message to the queue and emits to the queue for processing
|
|
||||||
/**
|
/**
|
||||||
* Simple function to replicate the add message functionality of Bull, putting
|
* Simple function to replicate the add message functionality of Bull, putting
|
||||||
* a new message on the queue. This then emits an event which will be used to
|
* a new message on the queue. This then emits an event which will be used to
|
||||||
|
@ -123,7 +136,13 @@ class InMemoryQueue implements Partial<Queue> {
|
||||||
* a JSON message as this is required by Bull.
|
* a JSON message as this is required by Bull.
|
||||||
* @param repeat serves no purpose for the import queue.
|
* @param repeat serves no purpose for the import queue.
|
||||||
*/
|
*/
|
||||||
async add(data: any, opts?: JobOptions) {
|
async add(data: T | string, optsOrT?: JobOptions | T) {
|
||||||
|
if (typeof data === "string") {
|
||||||
|
throw new Error("doesn't support named jobs")
|
||||||
|
}
|
||||||
|
|
||||||
|
const opts = optsOrT as JobOptions
|
||||||
|
|
||||||
const jobId = opts?.jobId?.toString()
|
const jobId = opts?.jobId?.toString()
|
||||||
if (jobId && this._queuedJobIds.has(jobId)) {
|
if (jobId && this._queuedJobIds.has(jobId)) {
|
||||||
console.log(`Ignoring already queued job ${jobId}`)
|
console.log(`Ignoring already queued job ${jobId}`)
|
||||||
|
@ -138,7 +157,7 @@ class InMemoryQueue implements Partial<Queue> {
|
||||||
}
|
}
|
||||||
|
|
||||||
const pushMessage = () => {
|
const pushMessage = () => {
|
||||||
const message: JobMessage = {
|
const message: TestQueueMessage = {
|
||||||
id: newid(),
|
id: newid(),
|
||||||
timestamp: Date.now(),
|
timestamp: Date.now(),
|
||||||
queue: this as unknown as Queue,
|
queue: this as unknown as Queue,
|
||||||
|
@ -164,13 +183,14 @@ class InMemoryQueue implements Partial<Queue> {
|
||||||
*/
|
*/
|
||||||
async close() {}
|
async close() {}
|
||||||
|
|
||||||
/**
|
async removeRepeatableByKey(id: string) {
|
||||||
* This removes a cron which has been implemented, this is part of Bull API.
|
for (const [idx, message] of this._messages.entries()) {
|
||||||
* @param cronJobId The cron which is to be removed.
|
if (message.id === id) {
|
||||||
*/
|
this._messages.splice(idx, 1)
|
||||||
async removeRepeatableByKey(cronJobId: string) {
|
this._emitter.emit("removed", message)
|
||||||
// TODO: implement for testing
|
return
|
||||||
console.log(cronJobId)
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async removeJobs(_pattern: string) {
|
async removeJobs(_pattern: string) {
|
||||||
|
@ -193,6 +213,16 @@ class InMemoryQueue implements Partial<Queue> {
|
||||||
return null
|
return null
|
||||||
}
|
}
|
||||||
|
|
||||||
|
manualTrigger(id: JobId) {
|
||||||
|
for (const message of this._messages) {
|
||||||
|
if (message.id === id) {
|
||||||
|
this._emitter.emit("message", { ...message, manualTrigger: true })
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw new Error(`Job with id ${id} not found`)
|
||||||
|
}
|
||||||
|
|
||||||
on(event: string, callback: (...args: any[]) => void): Queue {
|
on(event: string, callback: (...args: any[]) => void): Queue {
|
||||||
// @ts-expect-error - this callback can be one of many types
|
// @ts-expect-error - this callback can be one of many types
|
||||||
this._emitter.on(event, callback)
|
this._emitter.on(event, callback)
|
||||||
|
@ -214,7 +244,9 @@ class InMemoryQueue implements Partial<Queue> {
|
||||||
}
|
}
|
||||||
|
|
||||||
async getRepeatableJobs() {
|
async getRepeatableJobs() {
|
||||||
return this._messages.map(job => jobToJobInformation(job as Job))
|
return this._messages
|
||||||
|
.filter(job => job.opts?.repeat != null)
|
||||||
|
.map(job => jobToJobInformation(job as Job))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,2 +1,3 @@
|
||||||
export * from "./queue"
|
export * from "./queue"
|
||||||
export * from "./constants"
|
export * from "./constants"
|
||||||
|
export * from "./inMemoryQueue"
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import env from "../../environment"
|
import env from "../../environment"
|
||||||
import { AutomationResults, Automation, App } from "@budibase/types"
|
import { AutomationResults, Automation, App } from "@budibase/types"
|
||||||
import { automations } from "@budibase/pro"
|
import { automations } from "@budibase/pro"
|
||||||
import { db as dbUtils } from "@budibase/backend-core"
|
import { db as dbUtils, logging } from "@budibase/backend-core"
|
||||||
import sizeof from "object-sizeof"
|
import sizeof from "object-sizeof"
|
||||||
|
|
||||||
const MAX_LOG_SIZE_MB = 5
|
const MAX_LOG_SIZE_MB = 5
|
||||||
|
@ -32,7 +32,16 @@ export async function storeLog(
|
||||||
if (bytes / MB_IN_BYTES > MAX_LOG_SIZE_MB) {
|
if (bytes / MB_IN_BYTES > MAX_LOG_SIZE_MB) {
|
||||||
sanitiseResults(results)
|
sanitiseResults(results)
|
||||||
}
|
}
|
||||||
|
try {
|
||||||
await automations.logs.storeLog(automation, results)
|
await automations.logs.storeLog(automation, results)
|
||||||
|
} catch (e: any) {
|
||||||
|
if (e.status === 413 && e.request?.data) {
|
||||||
|
// if content is too large we shouldn't log it
|
||||||
|
delete e.request.data
|
||||||
|
e.request.data = { message: "removed due to large size" }
|
||||||
|
}
|
||||||
|
logging.logAlert("Error writing automation log", e)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function checkAppMetadata(apps: App[]) {
|
export async function checkAppMetadata(apps: App[]) {
|
||||||
|
|
|
@ -1,45 +0,0 @@
|
||||||
import tk from "timekeeper"
|
|
||||||
import "../../../environment"
|
|
||||||
import * as automations from "../../index"
|
|
||||||
import TestConfiguration from "../../../tests/utilities/TestConfiguration"
|
|
||||||
import { createAutomationBuilder } from "../utilities/AutomationTestBuilder"
|
|
||||||
|
|
||||||
const initialTime = Date.now()
|
|
||||||
tk.freeze(initialTime)
|
|
||||||
|
|
||||||
const oneMinuteInMs = 60 * 1000
|
|
||||||
|
|
||||||
describe("cron automations", () => {
|
|
||||||
const config = new TestConfiguration()
|
|
||||||
|
|
||||||
beforeAll(async () => {
|
|
||||||
await automations.init()
|
|
||||||
await config.init()
|
|
||||||
})
|
|
||||||
|
|
||||||
afterAll(async () => {
|
|
||||||
await automations.shutdown()
|
|
||||||
config.end()
|
|
||||||
})
|
|
||||||
|
|
||||||
beforeEach(() => {
|
|
||||||
tk.freeze(initialTime)
|
|
||||||
})
|
|
||||||
|
|
||||||
it("should initialise the automation timestamp", async () => {
|
|
||||||
await createAutomationBuilder(config).onCron({ cron: "* * * * *" }).save()
|
|
||||||
|
|
||||||
tk.travel(Date.now() + oneMinuteInMs)
|
|
||||||
await config.publish()
|
|
||||||
|
|
||||||
const { data } = await config.getAutomationLogs()
|
|
||||||
expect(data).toHaveLength(1)
|
|
||||||
expect(data).toEqual([
|
|
||||||
expect.objectContaining({
|
|
||||||
trigger: expect.objectContaining({
|
|
||||||
outputs: { timestamp: initialTime + oneMinuteInMs },
|
|
||||||
}),
|
|
||||||
}),
|
|
||||||
])
|
|
||||||
})
|
|
||||||
})
|
|
|
@ -21,6 +21,11 @@ describe("Attempt to run a basic loop automation", () => {
|
||||||
})
|
})
|
||||||
|
|
||||||
beforeEach(async () => {
|
beforeEach(async () => {
|
||||||
|
const { automations } = await config.api.automation.fetch()
|
||||||
|
for (const automation of automations) {
|
||||||
|
await config.api.automation.delete(automation)
|
||||||
|
}
|
||||||
|
|
||||||
table = await config.api.table.save(basicTable())
|
table = await config.api.table.save(basicTable())
|
||||||
await config.api.row.save(table._id!, {})
|
await config.api.row.save(table._id!, {})
|
||||||
})
|
})
|
||||||
|
|
|
@ -1,6 +1,15 @@
|
||||||
import { createAutomationBuilder } from "../utilities/AutomationTestBuilder"
|
import { createAutomationBuilder } from "../utilities/AutomationTestBuilder"
|
||||||
import TestConfiguration from "../../../tests/utilities/TestConfiguration"
|
import TestConfiguration from "../../../tests/utilities/TestConfiguration"
|
||||||
import { captureAutomationResults } from "../utilities"
|
import {
|
||||||
|
captureAutomationMessages,
|
||||||
|
captureAutomationRemovals,
|
||||||
|
captureAutomationResults,
|
||||||
|
triggerCron,
|
||||||
|
} from "../utilities"
|
||||||
|
import { automations } from "@budibase/pro"
|
||||||
|
import { AutomationData, AutomationStatus } from "@budibase/types"
|
||||||
|
import { MAX_AUTOMATION_RECURRING_ERRORS } from "../../../constants"
|
||||||
|
import { queue } from "@budibase/backend-core"
|
||||||
|
|
||||||
describe("cron trigger", () => {
|
describe("cron trigger", () => {
|
||||||
const config = new TestConfiguration()
|
const config = new TestConfiguration()
|
||||||
|
@ -13,6 +22,13 @@ describe("cron trigger", () => {
|
||||||
config.end()
|
config.end()
|
||||||
})
|
})
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
const { automations } = await config.api.automation.fetch()
|
||||||
|
for (const automation of automations) {
|
||||||
|
await config.api.automation.delete(automation)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
it("should queue a Bull cron job", async () => {
|
it("should queue a Bull cron job", async () => {
|
||||||
const { automation } = await createAutomationBuilder(config)
|
const { automation } = await createAutomationBuilder(config)
|
||||||
.onCron({ cron: "* * * * *" })
|
.onCron({ cron: "* * * * *" })
|
||||||
|
@ -21,12 +37,12 @@ describe("cron trigger", () => {
|
||||||
})
|
})
|
||||||
.save()
|
.save()
|
||||||
|
|
||||||
const jobs = await captureAutomationResults(automation, () =>
|
const messages = await captureAutomationMessages(automation, () =>
|
||||||
config.api.application.publish()
|
config.api.application.publish()
|
||||||
)
|
)
|
||||||
expect(jobs).toHaveLength(1)
|
expect(messages).toHaveLength(1)
|
||||||
|
|
||||||
const repeat = jobs[0].opts?.repeat
|
const repeat = messages[0].opts?.repeat
|
||||||
if (!repeat || !("cron" in repeat)) {
|
if (!repeat || !("cron" in repeat)) {
|
||||||
throw new Error("Expected cron repeat")
|
throw new Error("Expected cron repeat")
|
||||||
}
|
}
|
||||||
|
@ -49,4 +65,85 @@ describe("cron trigger", () => {
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
it("should stop if the job fails more than 3 times", async () => {
|
||||||
|
const { automation } = await createAutomationBuilder(config)
|
||||||
|
.onCron({ cron: "* * * * *" })
|
||||||
|
.queryRows({
|
||||||
|
// @ts-expect-error intentionally sending invalid data
|
||||||
|
tableId: null,
|
||||||
|
})
|
||||||
|
.save()
|
||||||
|
|
||||||
|
const [message] = await captureAutomationMessages(automation, () =>
|
||||||
|
config.api.application.publish()
|
||||||
|
)
|
||||||
|
|
||||||
|
await config.withProdApp(async () => {
|
||||||
|
let results: queue.TestQueueMessage<AutomationData>[] = []
|
||||||
|
const removed = await captureAutomationRemovals(automation, async () => {
|
||||||
|
results = await captureAutomationResults(automation, async () => {
|
||||||
|
for (let i = 0; i < MAX_AUTOMATION_RECURRING_ERRORS; i++) {
|
||||||
|
triggerCron(message)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
expect(removed).toHaveLength(1)
|
||||||
|
expect(removed[0].id).toEqual(message.id)
|
||||||
|
|
||||||
|
expect(results).toHaveLength(5)
|
||||||
|
|
||||||
|
const search = await automations.logs.logSearch({
|
||||||
|
automationId: automation._id,
|
||||||
|
status: AutomationStatus.STOPPED_ERROR,
|
||||||
|
})
|
||||||
|
expect(search.data).toHaveLength(1)
|
||||||
|
expect(search.data[0].status).toEqual(AutomationStatus.STOPPED_ERROR)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
it("should fill in the timestamp if one is not provided", async () => {
|
||||||
|
const runner = await createAutomationBuilder(config)
|
||||||
|
.onCron({ cron: "* * * * *" })
|
||||||
|
.serverLog({
|
||||||
|
text: "Hello, world!",
|
||||||
|
})
|
||||||
|
.save()
|
||||||
|
|
||||||
|
await config.api.application.publish()
|
||||||
|
|
||||||
|
const results = await captureAutomationResults(
|
||||||
|
runner.automation,
|
||||||
|
async () => {
|
||||||
|
await runner.trigger({ timeout: 1000, fields: {} })
|
||||||
|
}
|
||||||
|
)
|
||||||
|
expect(results).toHaveLength(1)
|
||||||
|
expect(results[0].data.event.timestamp).toBeWithin(
|
||||||
|
Date.now() - 1000,
|
||||||
|
Date.now() + 1000
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
|
it("should use the given timestamp if one is given", async () => {
|
||||||
|
const timestamp = 1234
|
||||||
|
const runner = await createAutomationBuilder(config)
|
||||||
|
.onCron({ cron: "* * * * *" })
|
||||||
|
.serverLog({
|
||||||
|
text: "Hello, world!",
|
||||||
|
})
|
||||||
|
.save()
|
||||||
|
|
||||||
|
await config.api.application.publish()
|
||||||
|
|
||||||
|
const results = await captureAutomationResults(
|
||||||
|
runner.automation,
|
||||||
|
async () => {
|
||||||
|
await runner.trigger({ timeout: 1000, fields: {}, timestamp })
|
||||||
|
}
|
||||||
|
)
|
||||||
|
expect(results).toHaveLength(1)
|
||||||
|
expect(results[0].data.event.timestamp).toEqual(timestamp)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
|
@ -220,10 +220,34 @@ class AutomationRunner<TStep extends AutomationTriggerStepId> {
|
||||||
async trigger(
|
async trigger(
|
||||||
request: TriggerAutomationRequest
|
request: TriggerAutomationRequest
|
||||||
): Promise<TriggerAutomationResponse> {
|
): Promise<TriggerAutomationResponse> {
|
||||||
|
if (!this.config.prodAppId) {
|
||||||
|
throw new Error(
|
||||||
|
"Automations can only be triggered in a production app context, call config.api.application.publish()"
|
||||||
|
)
|
||||||
|
}
|
||||||
|
// Because you can only trigger automations in a production app context, we
|
||||||
|
// wrap the trigger call to make tests a bit cleaner. If you really want to
|
||||||
|
// test triggering an automation in a dev app context, you can use the
|
||||||
|
// automation API directly.
|
||||||
|
return await this.config.withProdApp(async () => {
|
||||||
|
try {
|
||||||
return await this.config.api.automation.trigger(
|
return await this.config.api.automation.trigger(
|
||||||
this.automation._id!,
|
this.automation._id!,
|
||||||
request
|
request
|
||||||
)
|
)
|
||||||
|
} catch (e: any) {
|
||||||
|
if (e.cause.status === 404) {
|
||||||
|
throw new Error(
|
||||||
|
`Automation with ID ${
|
||||||
|
this.automation._id
|
||||||
|
} not found in app ${this.config.getAppId()}. You may have forgotten to call config.api.application.publish().`,
|
||||||
|
{ cause: e }
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
throw e
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,7 @@ import { Knex } from "knex"
|
||||||
import { getQueue } from "../.."
|
import { getQueue } from "../.."
|
||||||
import { Job } from "bull"
|
import { Job } from "bull"
|
||||||
import { helpers } from "@budibase/shared-core"
|
import { helpers } from "@budibase/shared-core"
|
||||||
|
import { queue } from "@budibase/backend-core"
|
||||||
|
|
||||||
let config: TestConfiguration
|
let config: TestConfiguration
|
||||||
|
|
||||||
|
@ -20,6 +21,17 @@ export function afterAll() {
|
||||||
config.end()
|
config.end()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function getTestQueue(): queue.InMemoryQueue<AutomationData> {
|
||||||
|
return getQueue() as unknown as queue.InMemoryQueue<AutomationData>
|
||||||
|
}
|
||||||
|
|
||||||
|
export function triggerCron(message: Job<AutomationData>) {
|
||||||
|
if (!message.opts?.repeat || !("cron" in message.opts.repeat)) {
|
||||||
|
throw new Error("Expected cron message")
|
||||||
|
}
|
||||||
|
getTestQueue().manualTrigger(message.id)
|
||||||
|
}
|
||||||
|
|
||||||
export async function runInProd(fn: any) {
|
export async function runInProd(fn: any) {
|
||||||
env._set("NODE_ENV", "production")
|
env._set("NODE_ENV", "production")
|
||||||
let error
|
let error
|
||||||
|
@ -34,23 +46,99 @@ export async function runInProd(fn: any) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function captureAllAutomationRemovals(f: () => Promise<unknown>) {
|
||||||
|
const messages: Job<AutomationData>[] = []
|
||||||
|
const queue = getQueue()
|
||||||
|
|
||||||
|
const messageListener = async (message: Job<AutomationData>) => {
|
||||||
|
messages.push(message)
|
||||||
|
}
|
||||||
|
|
||||||
|
queue.on("removed", messageListener)
|
||||||
|
try {
|
||||||
|
await f()
|
||||||
|
// Queue messages tend to be send asynchronously in API handlers, so there's
|
||||||
|
// no guarantee that awaiting this function will have queued anything yet.
|
||||||
|
// We wait here to make sure we're queued _after_ any existing async work.
|
||||||
|
await helpers.wait(100)
|
||||||
|
} finally {
|
||||||
|
queue.off("removed", messageListener)
|
||||||
|
}
|
||||||
|
|
||||||
|
return messages
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function captureAutomationRemovals(
|
||||||
|
automation: Automation | string,
|
||||||
|
f: () => Promise<unknown>
|
||||||
|
) {
|
||||||
|
const messages = await captureAllAutomationRemovals(f)
|
||||||
|
return messages.filter(
|
||||||
|
m =>
|
||||||
|
m.data.automation._id ===
|
||||||
|
(typeof automation === "string" ? automation : automation._id)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function captureAllAutomationMessages(f: () => Promise<unknown>) {
|
||||||
|
const messages: Job<AutomationData>[] = []
|
||||||
|
const queue = getQueue()
|
||||||
|
|
||||||
|
const messageListener = async (message: Job<AutomationData>) => {
|
||||||
|
messages.push(message)
|
||||||
|
}
|
||||||
|
|
||||||
|
queue.on("message", messageListener)
|
||||||
|
try {
|
||||||
|
await f()
|
||||||
|
// Queue messages tend to be send asynchronously in API handlers, so there's
|
||||||
|
// no guarantee that awaiting this function will have queued anything yet.
|
||||||
|
// We wait here to make sure we're queued _after_ any existing async work.
|
||||||
|
await helpers.wait(100)
|
||||||
|
} finally {
|
||||||
|
queue.off("message", messageListener)
|
||||||
|
}
|
||||||
|
|
||||||
|
return messages
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function captureAutomationMessages(
|
||||||
|
automation: Automation | string,
|
||||||
|
f: () => Promise<unknown>
|
||||||
|
) {
|
||||||
|
const messages = await captureAllAutomationMessages(f)
|
||||||
|
return messages.filter(
|
||||||
|
m =>
|
||||||
|
m.data.automation._id ===
|
||||||
|
(typeof automation === "string" ? automation : automation._id)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Capture all automation runs that occur during the execution of a function.
|
* Capture all automation runs that occur during the execution of a function.
|
||||||
* This function will wait for all messages to be processed before returning.
|
* This function will wait for all messages to be processed before returning.
|
||||||
*/
|
*/
|
||||||
export async function captureAllAutomationResults(
|
export async function captureAllAutomationResults(
|
||||||
f: () => Promise<unknown>
|
f: () => Promise<unknown>
|
||||||
): Promise<Job<AutomationData>[]> {
|
): Promise<queue.TestQueueMessage<AutomationData>[]> {
|
||||||
const runs: Job<AutomationData>[] = []
|
const runs: queue.TestQueueMessage<AutomationData>[] = []
|
||||||
const queue = getQueue()
|
const queue = getQueue()
|
||||||
let messagesReceived = 0
|
let messagesOutstanding = 0
|
||||||
|
|
||||||
const completedListener = async (job: Job<AutomationData>) => {
|
const completedListener = async (
|
||||||
|
job: queue.TestQueueMessage<AutomationData>
|
||||||
|
) => {
|
||||||
runs.push(job)
|
runs.push(job)
|
||||||
messagesReceived--
|
messagesOutstanding--
|
||||||
}
|
}
|
||||||
const messageListener = async () => {
|
const messageListener = async (
|
||||||
messagesReceived++
|
message: queue.TestQueueMessage<AutomationData>
|
||||||
|
) => {
|
||||||
|
// Don't count cron messages, as they don't get triggered automatically.
|
||||||
|
if (!message.manualTrigger && message.opts?.repeat != null) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
messagesOutstanding++
|
||||||
}
|
}
|
||||||
queue.on("message", messageListener)
|
queue.on("message", messageListener)
|
||||||
queue.on("completed", completedListener)
|
queue.on("completed", completedListener)
|
||||||
|
@ -61,9 +149,18 @@ export async function captureAllAutomationResults(
|
||||||
// We wait here to make sure we're queued _after_ any existing async work.
|
// We wait here to make sure we're queued _after_ any existing async work.
|
||||||
await helpers.wait(100)
|
await helpers.wait(100)
|
||||||
} finally {
|
} finally {
|
||||||
|
const waitMax = 10000
|
||||||
|
let waited = 0
|
||||||
// eslint-disable-next-line no-unmodified-loop-condition
|
// eslint-disable-next-line no-unmodified-loop-condition
|
||||||
while (messagesReceived > 0) {
|
while (messagesOutstanding > 0) {
|
||||||
await helpers.wait(50)
|
await helpers.wait(50)
|
||||||
|
waited += 50
|
||||||
|
if (waited > waitMax) {
|
||||||
|
// eslint-disable-next-line no-unsafe-finally
|
||||||
|
throw new Error(
|
||||||
|
`Timed out waiting for automation runs to complete. ${messagesOutstanding} messages waiting for completion.`
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
queue.off("completed", completedListener)
|
queue.off("completed", completedListener)
|
||||||
queue.off("message", messageListener)
|
queue.off("message", messageListener)
|
||||||
|
|
|
@ -72,7 +72,7 @@ export async function processEvent(job: AutomationJob) {
|
||||||
|
|
||||||
const task = async () => {
|
const task = async () => {
|
||||||
try {
|
try {
|
||||||
if (isCronTrigger(job.data.automation)) {
|
if (isCronTrigger(job.data.automation) && !job.data.event.timestamp) {
|
||||||
// Requires the timestamp at run time
|
// Requires the timestamp at run time
|
||||||
job.data.event.timestamp = Date.now()
|
job.data.event.timestamp = Date.now()
|
||||||
}
|
}
|
||||||
|
|
|
@ -261,11 +261,13 @@ export default class TestConfiguration {
|
||||||
async withApp<R>(app: App | string, f: () => Promise<R>) {
|
async withApp<R>(app: App | string, f: () => Promise<R>) {
|
||||||
const oldAppId = this.appId
|
const oldAppId = this.appId
|
||||||
this.appId = typeof app === "string" ? app : app.appId
|
this.appId = typeof app === "string" ? app : app.appId
|
||||||
|
return await context.doInAppContext(this.appId, async () => {
|
||||||
try {
|
try {
|
||||||
return await f()
|
return await f()
|
||||||
} finally {
|
} finally {
|
||||||
this.appId = oldAppId
|
this.appId = oldAppId
|
||||||
}
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async withProdApp<R>(f: () => Promise<R>) {
|
async withProdApp<R>(f: () => Promise<R>) {
|
||||||
|
|
|
@ -155,23 +155,12 @@ class Orchestrator {
|
||||||
return step
|
return step
|
||||||
}
|
}
|
||||||
|
|
||||||
async getMetadata(): Promise<AutomationMetadata> {
|
isCron(): boolean {
|
||||||
const metadataId = generateAutomationMetadataID(this.automation._id!)
|
return isRecurring(this.automation)
|
||||||
const db = context.getAppDB()
|
|
||||||
let metadata: AutomationMetadata
|
|
||||||
try {
|
|
||||||
metadata = await db.get(metadataId)
|
|
||||||
} catch (err) {
|
|
||||||
metadata = {
|
|
||||||
_id: metadataId,
|
|
||||||
errorCount: 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return metadata
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async stopCron(reason: string) {
|
async stopCron(reason: string) {
|
||||||
if (!this.job.opts.repeat) {
|
if (!this.isCron()) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
logging.logWarn(
|
logging.logWarn(
|
||||||
|
@ -192,45 +181,37 @@ class Orchestrator {
|
||||||
await storeLog(automation, this.executionOutput)
|
await storeLog(automation, this.executionOutput)
|
||||||
}
|
}
|
||||||
|
|
||||||
async checkIfShouldStop(metadata: AutomationMetadata): Promise<boolean> {
|
async getMetadata(): Promise<AutomationMetadata> {
|
||||||
if (!metadata.errorCount || !this.job.opts.repeat) {
|
const metadataId = generateAutomationMetadataID(this.automation._id!)
|
||||||
return false
|
const db = context.getAppDB()
|
||||||
}
|
const metadata = await db.tryGet<AutomationMetadata>(metadataId)
|
||||||
if (metadata.errorCount >= MAX_AUTOMATION_RECURRING_ERRORS) {
|
return metadata || { _id: metadataId, errorCount: 0 }
|
||||||
await this.stopCron("errors")
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async updateMetadata(metadata: AutomationMetadata) {
|
async incrementErrorCount() {
|
||||||
const output = this.executionOutput,
|
|
||||||
automation = this.automation
|
|
||||||
if (!output || !isRecurring(automation)) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
const count = metadata.errorCount
|
|
||||||
const isError = isErrorInOutput(output)
|
|
||||||
// nothing to do in this scenario, escape
|
|
||||||
if (!count && !isError) {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if (isError) {
|
|
||||||
metadata.errorCount = count ? count + 1 : 1
|
|
||||||
} else {
|
|
||||||
metadata.errorCount = 0
|
|
||||||
}
|
|
||||||
const db = context.getAppDB()
|
const db = context.getAppDB()
|
||||||
|
let err: Error | undefined = undefined
|
||||||
|
for (let attempt = 0; attempt < 10; attempt++) {
|
||||||
|
const metadata = await this.getMetadata()
|
||||||
|
metadata.errorCount ||= 0
|
||||||
|
metadata.errorCount++
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await db.put(metadata)
|
await db.put(metadata)
|
||||||
} catch (err) {
|
return metadata.errorCount
|
||||||
|
} catch (error: any) {
|
||||||
|
err = error
|
||||||
|
await helpers.wait(1000 + Math.random() * 1000)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
logging.logAlertWithInfo(
|
logging.logAlertWithInfo(
|
||||||
"Failed to write automation metadata",
|
"Failed to update error count in automation metadata",
|
||||||
db.name,
|
db.name,
|
||||||
automation._id!,
|
this.automation._id!,
|
||||||
err
|
err
|
||||||
)
|
)
|
||||||
}
|
return undefined
|
||||||
}
|
}
|
||||||
|
|
||||||
updateExecutionOutput(id: string, stepId: string, inputs: any, outputs: any) {
|
updateExecutionOutput(id: string, stepId: string, inputs: any, outputs: any) {
|
||||||
|
@ -293,18 +274,6 @@ class Orchestrator {
|
||||||
await enrichBaseContext(this.context)
|
await enrichBaseContext(this.context)
|
||||||
this.context.user = this.currentUser
|
this.context.user = this.currentUser
|
||||||
|
|
||||||
let metadata
|
|
||||||
|
|
||||||
// check if this is a recurring automation,
|
|
||||||
if (isProdAppID(this.appId) && isRecurring(this.automation)) {
|
|
||||||
span?.addTags({ recurring: true })
|
|
||||||
metadata = await this.getMetadata()
|
|
||||||
const shouldStop = await this.checkIfShouldStop(metadata)
|
|
||||||
if (shouldStop) {
|
|
||||||
span?.addTags({ shouldStop: true })
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
const start = performance.now()
|
const start = performance.now()
|
||||||
|
|
||||||
await this.executeSteps(this.automation.definition.steps)
|
await this.executeSteps(this.automation.definition.steps)
|
||||||
|
@ -320,23 +289,22 @@ class Orchestrator {
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
try {
|
let errorCount = 0
|
||||||
await storeLog(this.automation, this.executionOutput)
|
|
||||||
} catch (e: any) {
|
|
||||||
if (e.status === 413 && e.request?.data) {
|
|
||||||
// if content is too large we shouldn't log it
|
|
||||||
delete e.request.data
|
|
||||||
e.request.data = { message: "removed due to large size" }
|
|
||||||
}
|
|
||||||
logging.logAlert("Error writing automation log", e)
|
|
||||||
}
|
|
||||||
if (
|
if (
|
||||||
isProdAppID(this.appId) &&
|
isProdAppID(this.appId) &&
|
||||||
isRecurring(this.automation) &&
|
this.isCron() &&
|
||||||
metadata
|
isErrorInOutput(this.executionOutput)
|
||||||
) {
|
) {
|
||||||
await this.updateMetadata(metadata)
|
errorCount = (await this.incrementErrorCount()) || 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (errorCount >= MAX_AUTOMATION_RECURRING_ERRORS) {
|
||||||
|
await this.stopCron("errors")
|
||||||
|
span?.addTags({ shouldStop: true })
|
||||||
|
} else {
|
||||||
|
await storeLog(this.automation, this.executionOutput)
|
||||||
|
}
|
||||||
|
|
||||||
return this.executionOutput
|
return this.executionOutput
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
@ -763,7 +731,7 @@ export async function executeInThread(
|
||||||
})) as AutomationResponse
|
})) as AutomationResponse
|
||||||
}
|
}
|
||||||
|
|
||||||
export const removeStalled = async (job: Job) => {
|
export const removeStalled = async (job: Job<AutomationData>) => {
|
||||||
const appId = job.data.event.appId
|
const appId = job.data.event.appId
|
||||||
if (!appId) {
|
if (!appId) {
|
||||||
throw new Error("Unable to execute, event doesn't contain app ID.")
|
throw new Error("Unable to execute, event doesn't contain app ID.")
|
||||||
|
|
|
@ -65,6 +65,7 @@ export interface ClearAutomationLogResponse {
|
||||||
|
|
||||||
export interface TriggerAutomationRequest {
|
export interface TriggerAutomationRequest {
|
||||||
fields: Record<string, any>
|
fields: Record<string, any>
|
||||||
|
timestamp?: number
|
||||||
// time in seconds
|
// time in seconds
|
||||||
timeout: number
|
timeout: number
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue