diff --git a/packages/backend-core/src/index.ts b/packages/backend-core/src/index.ts index ffffd8240a..c7cf9f56cc 100644 --- a/packages/backend-core/src/index.ts +++ b/packages/backend-core/src/index.ts @@ -30,6 +30,7 @@ export * as timers from "./timers" export { default as env } from "./environment" export * as blacklist from "./blacklist" export * as docUpdates from "./docUpdates" +export * from "./utils/Duration" export { SearchParams } from "./db" // Add context to tenancy for backwards compatibility // only do this for external usages to prevent internal diff --git a/packages/backend-core/src/queue/inMemoryQueue.ts b/packages/backend-core/src/queue/inMemoryQueue.ts index af2ec6dbaa..a8add7ecb6 100644 --- a/packages/backend-core/src/queue/inMemoryQueue.ts +++ b/packages/backend-core/src/queue/inMemoryQueue.ts @@ -36,7 +36,7 @@ class InMemoryQueue { * @param opts This is not used by the in memory queue as there is no real use * case when in memory, but is the same API as Bull */ - constructor(name: string, opts = null) { + constructor(name: string, opts?: any) { this._name = name this._opts = opts this._messages = [] diff --git a/packages/backend-core/src/queue/queue.ts b/packages/backend-core/src/queue/queue.ts index 0658147709..c0d1861de3 100644 --- a/packages/backend-core/src/queue/queue.ts +++ b/packages/backend-core/src/queue/queue.ts @@ -2,11 +2,18 @@ import env from "../environment" import { getRedisOptions } from "../redis/utils" import { JobQueue } from "./constants" import InMemoryQueue from "./inMemoryQueue" -import BullQueue from "bull" +import BullQueue, { QueueOptions } from "bull" import { addListeners, StalledFn } from "./listeners" +import { Duration } from "../utils" import * as timers from "../timers" +import * as Redis from "ioredis" -const CLEANUP_PERIOD_MS = 60 * 1000 +// the queue lock is held for 5 minutes +const QUEUE_LOCK_MS = Duration.fromMinutes(5).toMs() +// queue lock is refreshed every 30 seconds +const QUEUE_LOCK_RENEW_INTERNAL_MS = Duration.fromSeconds(30).toMs() +// cleanup the queue every 60 seconds +const CLEANUP_PERIOD_MS = Duration.fromSeconds(60).toMs() let QUEUES: BullQueue.Queue[] | InMemoryQueue[] = [] let cleanupInterval: NodeJS.Timeout @@ -21,7 +28,14 @@ export function createQueue( opts: { removeStalledCb?: StalledFn } = {} ): BullQueue.Queue { const { opts: redisOpts, redisProtocolUrl } = getRedisOptions() - const queueConfig: any = redisProtocolUrl || { redis: redisOpts } + const queueConfig: QueueOptions = { + redis: redisProtocolUrl! || (redisOpts as Redis.RedisOptions), + settings: { + maxStalledCount: 0, + lockDuration: QUEUE_LOCK_MS, + lockRenewTime: QUEUE_LOCK_RENEW_INTERNAL_MS, + }, + } let queue: any if (!env.isTest()) { queue = new BullQueue(jobQueue, queueConfig) diff --git a/packages/backend-core/src/utils/Duration.ts b/packages/backend-core/src/utils/Duration.ts new file mode 100644 index 0000000000..f376c2f7c7 --- /dev/null +++ b/packages/backend-core/src/utils/Duration.ts @@ -0,0 +1,49 @@ +export enum DurationType { + MILLISECONDS = "milliseconds", + SECONDS = "seconds", + MINUTES = "minutes", + HOURS = "hours", + DAYS = "days", +} + +const conversion: Record = { + milliseconds: 1, + seconds: 1000, + minutes: 60 * 1000, + hours: 60 * 60 * 1000, + days: 24 * 60 * 60 * 1000, +} + +export class Duration { + static convert(from: DurationType, to: DurationType, duration: number) { + const milliseconds = duration * conversion[from] + return milliseconds / conversion[to] + } + + static from(from: DurationType, duration: number) { + return { + to: (to: DurationType) => { + return Duration.convert(from, to, duration) + }, + toMs: () => { + return Duration.convert(from, DurationType.MILLISECONDS, duration) + }, + } + } + + static fromSeconds(duration: number) { + return Duration.from(DurationType.SECONDS, duration) + } + + static fromMinutes(duration: number) { + return Duration.from(DurationType.MINUTES, duration) + } + + static fromHours(duration: number) { + return Duration.from(DurationType.HOURS, duration) + } + + static fromDays(duration: number) { + return Duration.from(DurationType.DAYS, duration) + } +} diff --git a/packages/backend-core/src/utils/index.ts b/packages/backend-core/src/utils/index.ts index 318a7f13ba..ac17227459 100644 --- a/packages/backend-core/src/utils/index.ts +++ b/packages/backend-core/src/utils/index.ts @@ -1,3 +1,4 @@ export * from "./hashing" export * from "./utils" export * from "./stringUtils" +export * from "./Duration" diff --git a/packages/backend-core/src/utils/tests/Duration.spec.ts b/packages/backend-core/src/utils/tests/Duration.spec.ts new file mode 100644 index 0000000000..46b996f788 --- /dev/null +++ b/packages/backend-core/src/utils/tests/Duration.spec.ts @@ -0,0 +1,19 @@ +import { Duration, DurationType } from "../Duration" + +describe("duration", () => { + it("should convert minutes to milliseconds", () => { + expect(Duration.fromMinutes(5).toMs()).toBe(300000) + }) + + it("should convert seconds to milliseconds", () => { + expect(Duration.fromSeconds(30).toMs()).toBe(30000) + }) + + it("should convert days to milliseconds", () => { + expect(Duration.fromDays(1).toMs()).toBe(86400000) + }) + + it("should convert minutes to days", () => { + expect(Duration.fromMinutes(1440).to(DurationType.DAYS)).toBe(1) + }) +})