Merge branch 'master' into feature/automation-sidebar
This commit is contained in:
commit
fac979f2a0
|
@ -56,6 +56,7 @@
|
|||
"koa-pino-logger": "4.0.0",
|
||||
"lodash": "4.17.21",
|
||||
"node-fetch": "2.6.7",
|
||||
"object-sizeof": "2.6.1",
|
||||
"passport-google-oauth": "2.0.0",
|
||||
"passport-local": "1.0.0",
|
||||
"passport-oauth2-refresh": "^2.1.0",
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import { AnyDocument, Database, Document, DocumentType } from "@budibase/types"
|
||||
|
||||
import { JobQueue, Queue, createQueue } from "../queue"
|
||||
import { BudibaseQueue, JobQueue } from "../queue"
|
||||
import * as dbUtils from "../db"
|
||||
|
||||
interface ProcessDocMessage {
|
||||
|
@ -13,11 +13,11 @@ const PERSIST_MAX_ATTEMPTS = 100
|
|||
let processor: DocWritethroughProcessor | undefined
|
||||
|
||||
export class DocWritethroughProcessor {
|
||||
private static _queue: Queue
|
||||
private static _queue: BudibaseQueue<ProcessDocMessage>
|
||||
|
||||
public static get queue() {
|
||||
if (!DocWritethroughProcessor._queue) {
|
||||
DocWritethroughProcessor._queue = createQueue<ProcessDocMessage>(
|
||||
DocWritethroughProcessor._queue = new BudibaseQueue<ProcessDocMessage>(
|
||||
JobQueue.DOC_WRITETHROUGH_QUEUE,
|
||||
{
|
||||
jobOptions: {
|
||||
|
|
|
@ -18,7 +18,9 @@ import {
|
|||
const initialTime = Date.now()
|
||||
|
||||
async function waitForQueueCompletion() {
|
||||
await utils.queue.processMessages(DocWritethroughProcessor.queue)
|
||||
await utils.queue.processMessages(
|
||||
DocWritethroughProcessor.queue.getBullQueue()
|
||||
)
|
||||
}
|
||||
|
||||
beforeAll(() => utils.queue.useRealQueues())
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
import BullQueue from "bull"
|
||||
import { createQueue, JobQueue } from "../../queue"
|
||||
import { BudibaseQueue, JobQueue } from "../../queue"
|
||||
import { Event, Identity } from "@budibase/types"
|
||||
|
||||
export interface EventPayload {
|
||||
|
@ -9,10 +8,19 @@ export interface EventPayload {
|
|||
timestamp?: string | number
|
||||
}
|
||||
|
||||
export let asyncEventQueue: BullQueue.Queue
|
||||
export let asyncEventQueue: BudibaseQueue<EventPayload>
|
||||
|
||||
export function init() {
|
||||
asyncEventQueue = createQueue<EventPayload>(JobQueue.SYSTEM_EVENT_QUEUE)
|
||||
asyncEventQueue = new BudibaseQueue<EventPayload>(
|
||||
JobQueue.SYSTEM_EVENT_QUEUE,
|
||||
{
|
||||
jobTags: (event: EventPayload) => {
|
||||
return {
|
||||
"event.name": event.event,
|
||||
}
|
||||
},
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
export async function shutdown() {
|
||||
|
|
|
@ -8,24 +8,30 @@ import {
|
|||
} from "@budibase/types"
|
||||
import { EventProcessor } from "./types"
|
||||
import { getAppId, doInTenant, getTenantId } from "../../context"
|
||||
import BullQueue from "bull"
|
||||
import { createQueue, JobQueue } from "../../queue"
|
||||
import { BudibaseQueue, JobQueue } from "../../queue"
|
||||
import { isAudited } from "../../utils"
|
||||
import env from "../../environment"
|
||||
|
||||
export default class AuditLogsProcessor implements EventProcessor {
|
||||
static auditLogsEnabled = false
|
||||
static auditLogQueue: BullQueue.Queue<AuditLogQueueEvent>
|
||||
static auditLogQueue: BudibaseQueue<AuditLogQueueEvent>
|
||||
|
||||
// can't use constructor as need to return promise
|
||||
static init(fn: AuditLogFn) {
|
||||
AuditLogsProcessor.auditLogsEnabled = true
|
||||
const writeAuditLogs = fn
|
||||
AuditLogsProcessor.auditLogQueue = createQueue<AuditLogQueueEvent>(
|
||||
JobQueue.AUDIT_LOG
|
||||
AuditLogsProcessor.auditLogQueue = new BudibaseQueue<AuditLogQueueEvent>(
|
||||
JobQueue.AUDIT_LOG,
|
||||
{
|
||||
jobTags: (event: AuditLogQueueEvent) => {
|
||||
return {
|
||||
"event.name": event.event,
|
||||
}
|
||||
},
|
||||
}
|
||||
)
|
||||
return AuditLogsProcessor.auditLogQueue.process(async job => {
|
||||
return doInTenant(job.data.tenantId, async () => {
|
||||
await doInTenant(job.data.tenantId, async () => {
|
||||
let properties = job.data.properties
|
||||
if (properties.audited) {
|
||||
properties = {
|
||||
|
|
|
@ -96,12 +96,17 @@ export class InMemoryQueue<T = any> implements Partial<Queue<T>> {
|
|||
|
||||
let resp = func(message)
|
||||
|
||||
async function retryFunc(fnc: any) {
|
||||
async function retryFunc(fnc: any, attempt = 0) {
|
||||
try {
|
||||
await fnc
|
||||
} catch (e: any) {
|
||||
await helpers.wait(50)
|
||||
await retryFunc(func(message))
|
||||
attempt++
|
||||
if (attempt < 3) {
|
||||
await helpers.wait(100 * attempt)
|
||||
await retryFunc(func(message), attempt)
|
||||
} else {
|
||||
throw e
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -2,10 +2,12 @@ import env from "../environment"
|
|||
import { getRedisOptions } from "../redis/utils"
|
||||
import { JobQueue } from "./constants"
|
||||
import InMemoryQueue from "./inMemoryQueue"
|
||||
import BullQueue, { QueueOptions, JobOptions } from "bull"
|
||||
import BullQueue, { Queue, QueueOptions, JobOptions, Job } from "bull"
|
||||
import { addListeners, StalledFn } from "./listeners"
|
||||
import { Duration } from "../utils"
|
||||
import * as timers from "../timers"
|
||||
import tracer from "dd-trace"
|
||||
import sizeof from "object-sizeof"
|
||||
|
||||
export type { QueueOptions, Queue, JobOptions } from "bull"
|
||||
|
||||
|
@ -15,7 +17,7 @@ const QUEUE_LOCK_MS = Duration.fromMinutes(5).toMs()
|
|||
const QUEUE_LOCK_RENEW_INTERNAL_MS = Duration.fromSeconds(30).toMs()
|
||||
// cleanup the queue every 60 seconds
|
||||
const CLEANUP_PERIOD_MS = Duration.fromSeconds(60).toMs()
|
||||
let QUEUES: BullQueue.Queue[] = []
|
||||
let QUEUES: Queue[] = []
|
||||
let cleanupInterval: NodeJS.Timeout
|
||||
|
||||
async function cleanup() {
|
||||
|
@ -25,49 +27,205 @@ async function cleanup() {
|
|||
}
|
||||
}
|
||||
|
||||
export function createQueue<T>(
|
||||
jobQueue: JobQueue,
|
||||
opts: {
|
||||
removeStalledCb?: StalledFn
|
||||
maxStalledCount?: number
|
||||
jobOptions?: JobOptions
|
||||
} = {}
|
||||
): BullQueue.Queue<T> {
|
||||
const redisOpts = getRedisOptions()
|
||||
const queueConfig: QueueOptions = {
|
||||
redis: redisOpts,
|
||||
settings: {
|
||||
maxStalledCount: opts.maxStalledCount ? opts.maxStalledCount : 0,
|
||||
lockDuration: QUEUE_LOCK_MS,
|
||||
lockRenewTime: QUEUE_LOCK_RENEW_INTERNAL_MS,
|
||||
},
|
||||
async function withMetrics<T>(
|
||||
name: string,
|
||||
cb: () => Promise<T>,
|
||||
tags?: Record<string, string | number>
|
||||
): Promise<T> {
|
||||
const start = performance.now()
|
||||
try {
|
||||
const result = await cb()
|
||||
tracer.dogstatsd.increment(`${name}.success`, 1, tags)
|
||||
return result
|
||||
} catch (err) {
|
||||
tracer.dogstatsd.increment(`${name}.error`, 1, tags)
|
||||
throw err
|
||||
} finally {
|
||||
const durationMs = performance.now() - start
|
||||
tracer.dogstatsd.distribution(`${name}.duration.ms`, durationMs, tags)
|
||||
tracer.dogstatsd.increment(name, 1, tags)
|
||||
}
|
||||
if (opts.jobOptions) {
|
||||
queueConfig.defaultJobOptions = opts.jobOptions
|
||||
}
|
||||
|
||||
function jobOptsTags(opts: JobOptions) {
|
||||
return {
|
||||
"job.opts.attempts": opts.attempts,
|
||||
"job.opts.backoff": opts.backoff,
|
||||
"job.opts.delay": opts.delay,
|
||||
"job.opts.jobId": opts.jobId,
|
||||
"job.opts.lifo": opts.lifo,
|
||||
"job.opts.preventParsingData": opts.preventParsingData,
|
||||
"job.opts.priority": opts.priority,
|
||||
"job.opts.removeOnComplete": opts.removeOnComplete,
|
||||
"job.opts.removeOnFail": opts.removeOnFail,
|
||||
"job.opts.repeat": opts.repeat,
|
||||
"job.opts.stackTraceLimit": opts.stackTraceLimit,
|
||||
"job.opts.timeout": opts.timeout,
|
||||
}
|
||||
let queue: BullQueue.Queue<T>
|
||||
if (!env.isTest()) {
|
||||
queue = new BullQueue(jobQueue, queueConfig)
|
||||
} else if (
|
||||
process.env.BULL_TEST_REDIS_PORT &&
|
||||
!isNaN(+process.env.BULL_TEST_REDIS_PORT)
|
||||
) {
|
||||
queue = new BullQueue(jobQueue, {
|
||||
redis: { host: "localhost", port: +process.env.BULL_TEST_REDIS_PORT },
|
||||
})
|
||||
} else {
|
||||
queue = new InMemoryQueue(jobQueue, queueConfig) as any
|
||||
}
|
||||
|
||||
function jobTags(job: Job) {
|
||||
return {
|
||||
"job.id": job.id,
|
||||
"job.attemptsMade": job.attemptsMade,
|
||||
"job.timestamp": job.timestamp,
|
||||
"job.data.sizeBytes": sizeof(job.data),
|
||||
...jobOptsTags(job.opts || {}),
|
||||
}
|
||||
addListeners(queue, jobQueue, opts?.removeStalledCb)
|
||||
QUEUES.push(queue)
|
||||
if (!cleanupInterval && !env.isTest()) {
|
||||
cleanupInterval = timers.set(cleanup, CLEANUP_PERIOD_MS)
|
||||
// fire off an initial cleanup
|
||||
cleanup().catch(err => {
|
||||
console.error(`Unable to cleanup ${jobQueue} initially - ${err}`)
|
||||
}
|
||||
|
||||
export interface BudibaseQueueOpts<T> {
|
||||
removeStalledCb?: StalledFn
|
||||
maxStalledCount?: number
|
||||
jobOptions?: JobOptions
|
||||
jobTags?: (job: T) => Record<string, any>
|
||||
}
|
||||
|
||||
export class BudibaseQueue<T> {
|
||||
private queue: Queue<T>
|
||||
private opts: BudibaseQueueOpts<T>
|
||||
private jobQueue: JobQueue
|
||||
|
||||
constructor(jobQueue: JobQueue, opts: BudibaseQueueOpts<T> = {}) {
|
||||
this.opts = opts
|
||||
this.jobQueue = jobQueue
|
||||
this.queue = this.initQueue()
|
||||
}
|
||||
|
||||
private initQueue() {
|
||||
const redisOpts = getRedisOptions()
|
||||
const queueConfig: QueueOptions = {
|
||||
redis: redisOpts,
|
||||
settings: {
|
||||
maxStalledCount: this.opts.maxStalledCount
|
||||
? this.opts.maxStalledCount
|
||||
: 0,
|
||||
lockDuration: QUEUE_LOCK_MS,
|
||||
lockRenewTime: QUEUE_LOCK_RENEW_INTERNAL_MS,
|
||||
},
|
||||
}
|
||||
if (this.opts.jobOptions) {
|
||||
queueConfig.defaultJobOptions = this.opts.jobOptions
|
||||
}
|
||||
let queue: Queue<T>
|
||||
if (!env.isTest()) {
|
||||
queue = new BullQueue(this.jobQueue, queueConfig)
|
||||
} else if (
|
||||
process.env.BULL_TEST_REDIS_PORT &&
|
||||
!isNaN(+process.env.BULL_TEST_REDIS_PORT)
|
||||
) {
|
||||
queue = new BullQueue(this.jobQueue, {
|
||||
redis: { host: "localhost", port: +process.env.BULL_TEST_REDIS_PORT },
|
||||
})
|
||||
} else {
|
||||
queue = new InMemoryQueue(this.jobQueue, queueConfig) as any
|
||||
}
|
||||
|
||||
addListeners(queue, this.jobQueue, this.opts.removeStalledCb)
|
||||
QUEUES.push(queue)
|
||||
if (!cleanupInterval && !env.isTest()) {
|
||||
cleanupInterval = timers.set(cleanup, CLEANUP_PERIOD_MS)
|
||||
// fire off an initial cleanup
|
||||
cleanup().catch(err => {
|
||||
console.error(`Unable to cleanup ${this.jobQueue} initially - ${err}`)
|
||||
})
|
||||
}
|
||||
return queue
|
||||
}
|
||||
|
||||
getBullQueue() {
|
||||
return this.queue
|
||||
}
|
||||
|
||||
process(
|
||||
concurrency: number,
|
||||
cb: (job: Job<T>) => Promise<void>
|
||||
): Promise<void>
|
||||
process(cb: (job: Job<T>) => Promise<void>): Promise<void>
|
||||
process(...args: any[]) {
|
||||
let concurrency: number | undefined = undefined
|
||||
let cb: (job: Job<T>) => Promise<void>
|
||||
if (args.length === 2) {
|
||||
concurrency = args[0]
|
||||
cb = args[1]
|
||||
} else {
|
||||
cb = args[0]
|
||||
}
|
||||
|
||||
const wrappedCb = async (job: Job<T>) => {
|
||||
await tracer.trace("queue.process", async span => {
|
||||
// @ts-expect-error monkey patching the parent span id
|
||||
if (job.data._parentSpanContext) {
|
||||
// @ts-expect-error monkey patching the parent span id
|
||||
const parentContext = job.data._parentSpanContext
|
||||
const parent = {
|
||||
traceId: parentContext.traceId,
|
||||
spanId: parentContext.spanId,
|
||||
toTraceId: () => parentContext.traceId,
|
||||
toSpanId: () => parentContext.spanId,
|
||||
toTraceparent: () => "",
|
||||
}
|
||||
span.addLink(parent)
|
||||
}
|
||||
span.addTags({ "queue.name": this.jobQueue, ...jobTags(job) })
|
||||
if (this.opts.jobTags) {
|
||||
span.addTags(this.opts.jobTags(job.data))
|
||||
}
|
||||
|
||||
tracer.dogstatsd.distribution(
|
||||
"queue.process.sizeBytes",
|
||||
sizeof(job.data),
|
||||
this.metricTags()
|
||||
)
|
||||
await this.withMetrics("queue.process", () => cb(job))
|
||||
})
|
||||
}
|
||||
|
||||
if (concurrency) {
|
||||
return this.queue.process(concurrency, wrappedCb)
|
||||
} else {
|
||||
return this.queue.process(wrappedCb)
|
||||
}
|
||||
}
|
||||
|
||||
async add(data: T, opts?: JobOptions): Promise<Job<T>> {
|
||||
return await tracer.trace("queue.add", async span => {
|
||||
span.addTags({
|
||||
"queue.name": this.jobQueue,
|
||||
"job.data.sizeBytes": sizeof(data),
|
||||
...jobOptsTags(opts || {}),
|
||||
})
|
||||
if (this.opts.jobTags) {
|
||||
span.addTags(this.opts.jobTags(data))
|
||||
}
|
||||
// @ts-expect-error monkey patching the parent span id
|
||||
data._parentSpanContext = {
|
||||
traceId: span.context().toTraceId(),
|
||||
spanId: span.context().toSpanId(),
|
||||
}
|
||||
|
||||
tracer.dogstatsd.distribution(
|
||||
"queue.add.sizeBytes",
|
||||
sizeof(data),
|
||||
this.metricTags()
|
||||
)
|
||||
return await this.withMetrics("queue.add", () =>
|
||||
this.queue.add(data, opts)
|
||||
)
|
||||
})
|
||||
}
|
||||
return queue
|
||||
|
||||
private withMetrics<T>(name: string, cb: () => Promise<T>) {
|
||||
return withMetrics(name, cb, this.metricTags())
|
||||
}
|
||||
|
||||
private metricTags() {
|
||||
return { queueName: this.jobQueue }
|
||||
}
|
||||
|
||||
close() {
|
||||
return this.queue.close()
|
||||
}
|
||||
}
|
||||
|
||||
export async function shutdown() {
|
||||
|
|
|
@ -1 +1 @@
|
|||
Subproject commit d9d2766261e02b11318ae76ad0d79698e14f89a9
|
||||
Subproject commit 538deccd0000090447d6cfe7f9716b16fdfefcc2
|
|
@ -16,7 +16,7 @@ export const router: Router = new Router()
|
|||
|
||||
router.get("/health", async ctx => {
|
||||
if (automationsEnabled()) {
|
||||
if (!(await automationQueue.isReady())) {
|
||||
if (!(await automationQueue.getBullQueue().isReady())) {
|
||||
ctx.status = 503
|
||||
return
|
||||
}
|
||||
|
|
|
@ -2276,6 +2276,25 @@ if (descriptions.length) {
|
|||
expect(updated.attachment.key).toBe(newAttachment.key)
|
||||
})
|
||||
|
||||
it("should allow updating signature row", async () => {
|
||||
const { table, row } = await coreAttachmentEnrichment(
|
||||
{
|
||||
signature: {
|
||||
type: FieldType.SIGNATURE_SINGLE,
|
||||
name: "signature",
|
||||
constraints: { presence: false },
|
||||
},
|
||||
},
|
||||
"signature",
|
||||
`${uuid.v4()}.png`
|
||||
)
|
||||
|
||||
const newSignature = generateAttachment(`${uuid.v4()}.png`)
|
||||
row["signature"] = newSignature
|
||||
const updated = await config.api.row.save(table._id!, row)
|
||||
expect(updated.signature.key).toBe(newSignature.key)
|
||||
})
|
||||
|
||||
it("should allow enriching attachment list rows", async () => {
|
||||
await coreAttachmentEnrichment(
|
||||
{
|
||||
|
|
|
@ -13,7 +13,7 @@ export type AppMigrationJob = {
|
|||
|
||||
// always create app migration queue - so that events can be pushed and read from it
|
||||
// across the different api and automation services
|
||||
const appMigrationQueue = queue.createQueue<AppMigrationJob>(
|
||||
const appMigrationQueue = new queue.BudibaseQueue<AppMigrationJob>(
|
||||
queue.JobQueue.APP_MIGRATION,
|
||||
{
|
||||
jobOptions: {
|
||||
|
|
|
@ -7,25 +7,36 @@ import { getAppMigrationQueue } from "../appMigrations/queue"
|
|||
import { createBullBoard } from "@bull-board/api"
|
||||
import { AutomationData } from "@budibase/types"
|
||||
|
||||
export const automationQueue = queue.createQueue<AutomationData>(
|
||||
export const automationQueue = new queue.BudibaseQueue<AutomationData>(
|
||||
queue.JobQueue.AUTOMATION,
|
||||
{ removeStalledCb: automation.removeStalled }
|
||||
{
|
||||
removeStalledCb: automation.removeStalled,
|
||||
jobTags: (job: AutomationData) => {
|
||||
return {
|
||||
"automation.id": job.automation._id,
|
||||
"automation.name": job.automation.name,
|
||||
"automation.appId": job.automation.appId,
|
||||
"automation.createdAt": job.automation.createdAt,
|
||||
"automation.trigger": job.automation.definition.trigger.stepId,
|
||||
}
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
const PATH_PREFIX = "/bulladmin"
|
||||
|
||||
export async function init() {
|
||||
// Set up queues for bull board admin
|
||||
const queues = [new BullAdapter(automationQueue)]
|
||||
const queues = [new BullAdapter(automationQueue.getBullQueue())]
|
||||
|
||||
const backupQueue = backups.getBackupQueue()
|
||||
if (backupQueue) {
|
||||
queues.push(new BullAdapter(backupQueue))
|
||||
queues.push(new BullAdapter(backupQueue.getBullQueue()))
|
||||
}
|
||||
|
||||
const appMigrationQueue = getAppMigrationQueue()
|
||||
if (appMigrationQueue) {
|
||||
queues.push(new BullAdapter(appMigrationQueue))
|
||||
queues.push(new BullAdapter(appMigrationQueue.getBullQueue()))
|
||||
}
|
||||
|
||||
const serverAdapter = new KoaAdapter()
|
||||
|
|
|
@ -22,7 +22,7 @@ export function afterAll() {
|
|||
}
|
||||
|
||||
export function getTestQueue(): queue.InMemoryQueue<AutomationData> {
|
||||
return getQueue() as unknown as queue.InMemoryQueue<AutomationData>
|
||||
return getQueue().getBullQueue() as unknown as queue.InMemoryQueue<AutomationData>
|
||||
}
|
||||
|
||||
export function triggerCron(message: Job<AutomationData>) {
|
||||
|
@ -48,7 +48,7 @@ export async function runInProd(fn: any) {
|
|||
|
||||
export async function captureAllAutomationRemovals(f: () => Promise<unknown>) {
|
||||
const messages: Job<AutomationData>[] = []
|
||||
const queue = getQueue()
|
||||
const queue = getQueue().getBullQueue()
|
||||
|
||||
const messageListener = async (message: Job<AutomationData>) => {
|
||||
messages.push(message)
|
||||
|
@ -82,7 +82,7 @@ export async function captureAutomationRemovals(
|
|||
|
||||
export async function captureAllAutomationMessages(f: () => Promise<unknown>) {
|
||||
const messages: Job<AutomationData>[] = []
|
||||
const queue = getQueue()
|
||||
const queue = getQueue().getBullQueue()
|
||||
|
||||
const messageListener = async (message: Job<AutomationData>) => {
|
||||
messages.push(message)
|
||||
|
@ -122,7 +122,7 @@ export async function captureAllAutomationResults(
|
|||
f: () => Promise<unknown>
|
||||
): Promise<queue.TestQueueMessage<AutomationData>[]> {
|
||||
const runs: queue.TestQueueMessage<AutomationData>[] = []
|
||||
const queue = getQueue()
|
||||
const queue = getQueue().getBullQueue()
|
||||
let messagesOutstanding = 0
|
||||
|
||||
const completedListener = async (
|
||||
|
|
|
@ -115,12 +115,14 @@ export async function updateTestHistory(
|
|||
// end the repetition and the job itself
|
||||
export async function disableAllCrons(appId: any) {
|
||||
const promises = []
|
||||
const jobs = await automationQueue.getRepeatableJobs()
|
||||
const jobs = await automationQueue.getBullQueue().getRepeatableJobs()
|
||||
for (let job of jobs) {
|
||||
if (job.key.includes(`${appId}_cron`)) {
|
||||
promises.push(automationQueue.removeRepeatableByKey(job.key))
|
||||
promises.push(
|
||||
automationQueue.getBullQueue().removeRepeatableByKey(job.key)
|
||||
)
|
||||
if (job.id) {
|
||||
promises.push(automationQueue.removeJobs(job.id))
|
||||
promises.push(automationQueue.getBullQueue().removeJobs(job.id))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -129,10 +131,10 @@ export async function disableAllCrons(appId: any) {
|
|||
}
|
||||
|
||||
export async function disableCronById(jobId: JobId) {
|
||||
const jobs = await automationQueue.getRepeatableJobs()
|
||||
const jobs = await automationQueue.getBullQueue().getRepeatableJobs()
|
||||
for (const job of jobs) {
|
||||
if (job.id === jobId) {
|
||||
await automationQueue.removeRepeatableByKey(job.key)
|
||||
await automationQueue.getBullQueue().removeRepeatableByKey(job.key)
|
||||
}
|
||||
}
|
||||
console.log(`jobId=${jobId} disabled`)
|
||||
|
|
|
@ -371,8 +371,7 @@ class SqlServerIntegration extends Sql implements DatasourcePlus {
|
|||
? `${query.sql}; SELECT SCOPE_IDENTITY() AS id;`
|
||||
: query.sql
|
||||
this.log(sql, query.bindings)
|
||||
const resp = await request.query(sql)
|
||||
return resp
|
||||
return await request.query(sql)
|
||||
} catch (err: any) {
|
||||
let readableMessage = getReadableErrorMessage(
|
||||
SourceName.SQL_SERVER,
|
||||
|
|
|
@ -190,7 +190,7 @@ describe("oauth2 utils", () => {
|
|||
|
||||
await config.doInContext(config.appId, () => getToken(oauthConfig._id))
|
||||
await testUtils.queue.processMessages(
|
||||
cache.docWritethrough.DocWritethroughProcessor.queue
|
||||
cache.docWritethrough.DocWritethroughProcessor.queue.getBullQueue()
|
||||
)
|
||||
|
||||
const usageLog = await config.doInContext(config.appId, () =>
|
||||
|
@ -216,7 +216,7 @@ describe("oauth2 utils", () => {
|
|||
config.doInContext(config.appId, () => getToken(oauthConfig._id))
|
||||
).rejects.toThrow()
|
||||
await testUtils.queue.processMessages(
|
||||
cache.docWritethrough.DocWritethroughProcessor.queue
|
||||
cache.docWritethrough.DocWritethroughProcessor.queue.getBullQueue()
|
||||
)
|
||||
|
||||
const usageLog = await config.doInContext(config.appId, () =>
|
||||
|
@ -247,7 +247,7 @@ describe("oauth2 utils", () => {
|
|||
getToken(oauthConfig._id)
|
||||
)
|
||||
await testUtils.queue.processMessages(
|
||||
cache.docWritethrough.DocWritethroughProcessor.queue
|
||||
cache.docWritethrough.DocWritethroughProcessor.queue.getBullQueue()
|
||||
)
|
||||
|
||||
for (const appId of [config.appId, config.prodAppId]) {
|
||||
|
|
|
@ -30,4 +30,12 @@ export const SWITCHABLE_TYPES: SwitchableTypes = {
|
|||
FieldType.LONGFORM,
|
||||
],
|
||||
[FieldType.NUMBER]: [FieldType.NUMBER, FieldType.BOOLEAN],
|
||||
[FieldType.JSON]: [
|
||||
FieldType.JSON,
|
||||
FieldType.ARRAY,
|
||||
FieldType.ATTACHMENTS,
|
||||
FieldType.ATTACHMENT_SINGLE,
|
||||
FieldType.BB_REFERENCE,
|
||||
FieldType.SIGNATURE_SINGLE,
|
||||
],
|
||||
}
|
||||
|
|
|
@ -128,6 +128,7 @@ export enum FieldType {
|
|||
export const JsonTypes = [
|
||||
FieldType.ATTACHMENT_SINGLE,
|
||||
FieldType.ATTACHMENTS,
|
||||
FieldType.SIGNATURE_SINGLE,
|
||||
// only BB_REFERENCE is JSON, it's an array, BB_REFERENCE_SINGLE is a string type
|
||||
FieldType.BB_REFERENCE,
|
||||
FieldType.JSON,
|
||||
|
|
Loading…
Reference in New Issue