Merge pull request #12270 from Budibase/revert-12237-fix/update-bull-queue-parameters
Reverting changes to bull parameters
This commit is contained in:
commit
2ffb4b96cb
|
@ -30,7 +30,6 @@ export * as timers from "./timers"
|
||||||
export { default as env } from "./environment"
|
export { default as env } from "./environment"
|
||||||
export * as blacklist from "./blacklist"
|
export * as blacklist from "./blacklist"
|
||||||
export * as docUpdates from "./docUpdates"
|
export * as docUpdates from "./docUpdates"
|
||||||
export * from "./utils/Duration"
|
|
||||||
export { SearchParams } from "./db"
|
export { SearchParams } from "./db"
|
||||||
// Add context to tenancy for backwards compatibility
|
// Add context to tenancy for backwards compatibility
|
||||||
// only do this for external usages to prevent internal
|
// only do this for external usages to prevent internal
|
||||||
|
|
|
@ -36,7 +36,7 @@ class InMemoryQueue {
|
||||||
* @param opts This is not used by the in memory queue as there is no real use
|
* @param opts This is not used by the in memory queue as there is no real use
|
||||||
* case when in memory, but is the same API as Bull
|
* case when in memory, but is the same API as Bull
|
||||||
*/
|
*/
|
||||||
constructor(name: string, opts?: any) {
|
constructor(name: string, opts = null) {
|
||||||
this._name = name
|
this._name = name
|
||||||
this._opts = opts
|
this._opts = opts
|
||||||
this._messages = []
|
this._messages = []
|
||||||
|
|
|
@ -2,18 +2,11 @@ import env from "../environment"
|
||||||
import { getRedisOptions } from "../redis/utils"
|
import { getRedisOptions } from "../redis/utils"
|
||||||
import { JobQueue } from "./constants"
|
import { JobQueue } from "./constants"
|
||||||
import InMemoryQueue from "./inMemoryQueue"
|
import InMemoryQueue from "./inMemoryQueue"
|
||||||
import BullQueue, { QueueOptions } from "bull"
|
import BullQueue from "bull"
|
||||||
import { addListeners, StalledFn } from "./listeners"
|
import { addListeners, StalledFn } from "./listeners"
|
||||||
import { Duration } from "../utils"
|
|
||||||
import * as timers from "../timers"
|
import * as timers from "../timers"
|
||||||
import * as Redis from "ioredis"
|
|
||||||
|
|
||||||
// the queue lock is held for 5 minutes
|
const CLEANUP_PERIOD_MS = 60 * 1000
|
||||||
const QUEUE_LOCK_MS = Duration.fromMinutes(5).toMs()
|
|
||||||
// queue lock is refreshed every 30 seconds
|
|
||||||
const QUEUE_LOCK_RENEW_INTERNAL_MS = Duration.fromSeconds(30).toMs()
|
|
||||||
// cleanup the queue every 60 seconds
|
|
||||||
const CLEANUP_PERIOD_MS = Duration.fromSeconds(60).toMs()
|
|
||||||
let QUEUES: BullQueue.Queue[] | InMemoryQueue[] = []
|
let QUEUES: BullQueue.Queue[] | InMemoryQueue[] = []
|
||||||
let cleanupInterval: NodeJS.Timeout
|
let cleanupInterval: NodeJS.Timeout
|
||||||
|
|
||||||
|
@ -28,14 +21,7 @@ export function createQueue<T>(
|
||||||
opts: { removeStalledCb?: StalledFn } = {}
|
opts: { removeStalledCb?: StalledFn } = {}
|
||||||
): BullQueue.Queue<T> {
|
): BullQueue.Queue<T> {
|
||||||
const { opts: redisOpts, redisProtocolUrl } = getRedisOptions()
|
const { opts: redisOpts, redisProtocolUrl } = getRedisOptions()
|
||||||
const queueConfig: QueueOptions = {
|
const queueConfig: any = redisProtocolUrl || { redis: redisOpts }
|
||||||
redis: redisProtocolUrl! || (redisOpts as Redis.RedisOptions),
|
|
||||||
settings: {
|
|
||||||
maxStalledCount: 0,
|
|
||||||
lockDuration: QUEUE_LOCK_MS,
|
|
||||||
lockRenewTime: QUEUE_LOCK_RENEW_INTERNAL_MS,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
let queue: any
|
let queue: any
|
||||||
if (!env.isTest()) {
|
if (!env.isTest()) {
|
||||||
queue = new BullQueue(jobQueue, queueConfig)
|
queue = new BullQueue(jobQueue, queueConfig)
|
||||||
|
|
|
@ -1,49 +0,0 @@
|
||||||
export enum DurationType {
|
|
||||||
MILLISECONDS = "milliseconds",
|
|
||||||
SECONDS = "seconds",
|
|
||||||
MINUTES = "minutes",
|
|
||||||
HOURS = "hours",
|
|
||||||
DAYS = "days",
|
|
||||||
}
|
|
||||||
|
|
||||||
const conversion: Record<DurationType, number> = {
|
|
||||||
milliseconds: 1,
|
|
||||||
seconds: 1000,
|
|
||||||
minutes: 60 * 1000,
|
|
||||||
hours: 60 * 60 * 1000,
|
|
||||||
days: 24 * 60 * 60 * 1000,
|
|
||||||
}
|
|
||||||
|
|
||||||
export class Duration {
|
|
||||||
static convert(from: DurationType, to: DurationType, duration: number) {
|
|
||||||
const milliseconds = duration * conversion[from]
|
|
||||||
return milliseconds / conversion[to]
|
|
||||||
}
|
|
||||||
|
|
||||||
static from(from: DurationType, duration: number) {
|
|
||||||
return {
|
|
||||||
to: (to: DurationType) => {
|
|
||||||
return Duration.convert(from, to, duration)
|
|
||||||
},
|
|
||||||
toMs: () => {
|
|
||||||
return Duration.convert(from, DurationType.MILLISECONDS, duration)
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static fromSeconds(duration: number) {
|
|
||||||
return Duration.from(DurationType.SECONDS, duration)
|
|
||||||
}
|
|
||||||
|
|
||||||
static fromMinutes(duration: number) {
|
|
||||||
return Duration.from(DurationType.MINUTES, duration)
|
|
||||||
}
|
|
||||||
|
|
||||||
static fromHours(duration: number) {
|
|
||||||
return Duration.from(DurationType.HOURS, duration)
|
|
||||||
}
|
|
||||||
|
|
||||||
static fromDays(duration: number) {
|
|
||||||
return Duration.from(DurationType.DAYS, duration)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,4 +1,3 @@
|
||||||
export * from "./hashing"
|
export * from "./hashing"
|
||||||
export * from "./utils"
|
export * from "./utils"
|
||||||
export * from "./stringUtils"
|
export * from "./stringUtils"
|
||||||
export * from "./Duration"
|
|
||||||
|
|
|
@ -1,19 +0,0 @@
|
||||||
import { Duration, DurationType } from "../Duration"
|
|
||||||
|
|
||||||
describe("duration", () => {
|
|
||||||
it("should convert minutes to milliseconds", () => {
|
|
||||||
expect(Duration.fromMinutes(5).toMs()).toBe(300000)
|
|
||||||
})
|
|
||||||
|
|
||||||
it("should convert seconds to milliseconds", () => {
|
|
||||||
expect(Duration.fromSeconds(30).toMs()).toBe(30000)
|
|
||||||
})
|
|
||||||
|
|
||||||
it("should convert days to milliseconds", () => {
|
|
||||||
expect(Duration.fromDays(1).toMs()).toBe(86400000)
|
|
||||||
})
|
|
||||||
|
|
||||||
it("should convert minutes to days", () => {
|
|
||||||
expect(Duration.fromMinutes(1440).to(DurationType.DAYS)).toBe(1)
|
|
||||||
})
|
|
||||||
})
|
|
Loading…
Reference in New Issue