Merge branch 'master' into fix/build-memory-usage

This commit is contained in:
Michael Drury 2023-11-02 11:31:25 +00:00 committed by GitHub
commit 2c70cf57bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 88 additions and 4 deletions

View File

@ -30,6 +30,7 @@ export * as timers from "./timers"
export { default as env } from "./environment" export { default as env } from "./environment"
export * as blacklist from "./blacklist" export * as blacklist from "./blacklist"
export * as docUpdates from "./docUpdates" export * as docUpdates from "./docUpdates"
export * from "./utils/Duration"
export { SearchParams } from "./db" export { SearchParams } from "./db"
// Add context to tenancy for backwards compatibility // Add context to tenancy for backwards compatibility
// only do this for external usages to prevent internal // only do this for external usages to prevent internal

View File

@ -36,7 +36,7 @@ class InMemoryQueue {
* @param opts This is not used by the in memory queue as there is no real use * @param opts This is not used by the in memory queue as there is no real use
* case when in memory, but is the same API as Bull * case when in memory, but is the same API as Bull
*/ */
constructor(name: string, opts = null) { constructor(name: string, opts?: any) {
this._name = name this._name = name
this._opts = opts this._opts = opts
this._messages = [] this._messages = []

View File

@ -2,11 +2,18 @@ import env from "../environment"
import { getRedisOptions } from "../redis/utils" import { getRedisOptions } from "../redis/utils"
import { JobQueue } from "./constants" import { JobQueue } from "./constants"
import InMemoryQueue from "./inMemoryQueue" import InMemoryQueue from "./inMemoryQueue"
import BullQueue from "bull" import BullQueue, { QueueOptions } from "bull"
import { addListeners, StalledFn } from "./listeners" import { addListeners, StalledFn } from "./listeners"
import { Duration } from "../utils"
import * as timers from "../timers" import * as timers from "../timers"
import * as Redis from "ioredis"
const CLEANUP_PERIOD_MS = 60 * 1000 // the queue lock is held for 5 minutes
const QUEUE_LOCK_MS = Duration.fromMinutes(5).toMs()
// queue lock is refreshed every 30 seconds
const QUEUE_LOCK_RENEW_INTERNAL_MS = Duration.fromSeconds(30).toMs()
// cleanup the queue every 60 seconds
const CLEANUP_PERIOD_MS = Duration.fromSeconds(60).toMs()
let QUEUES: BullQueue.Queue[] | InMemoryQueue[] = [] let QUEUES: BullQueue.Queue[] | InMemoryQueue[] = []
let cleanupInterval: NodeJS.Timeout let cleanupInterval: NodeJS.Timeout
@ -21,7 +28,14 @@ export function createQueue<T>(
opts: { removeStalledCb?: StalledFn } = {} opts: { removeStalledCb?: StalledFn } = {}
): BullQueue.Queue<T> { ): BullQueue.Queue<T> {
const { opts: redisOpts, redisProtocolUrl } = getRedisOptions() const { opts: redisOpts, redisProtocolUrl } = getRedisOptions()
const queueConfig: any = redisProtocolUrl || { redis: redisOpts } const queueConfig: QueueOptions = {
redis: redisProtocolUrl! || (redisOpts as Redis.RedisOptions),
settings: {
maxStalledCount: 0,
lockDuration: QUEUE_LOCK_MS,
lockRenewTime: QUEUE_LOCK_RENEW_INTERNAL_MS,
},
}
let queue: any let queue: any
if (!env.isTest()) { if (!env.isTest()) {
queue = new BullQueue(jobQueue, queueConfig) queue = new BullQueue(jobQueue, queueConfig)

View File

@ -0,0 +1,49 @@
export enum DurationType {
MILLISECONDS = "milliseconds",
SECONDS = "seconds",
MINUTES = "minutes",
HOURS = "hours",
DAYS = "days",
}
const conversion: Record<DurationType, number> = {
milliseconds: 1,
seconds: 1000,
minutes: 60 * 1000,
hours: 60 * 60 * 1000,
days: 24 * 60 * 60 * 1000,
}
export class Duration {
static convert(from: DurationType, to: DurationType, duration: number) {
const milliseconds = duration * conversion[from]
return milliseconds / conversion[to]
}
static from(from: DurationType, duration: number) {
return {
to: (to: DurationType) => {
return Duration.convert(from, to, duration)
},
toMs: () => {
return Duration.convert(from, DurationType.MILLISECONDS, duration)
},
}
}
static fromSeconds(duration: number) {
return Duration.from(DurationType.SECONDS, duration)
}
static fromMinutes(duration: number) {
return Duration.from(DurationType.MINUTES, duration)
}
static fromHours(duration: number) {
return Duration.from(DurationType.HOURS, duration)
}
static fromDays(duration: number) {
return Duration.from(DurationType.DAYS, duration)
}
}

View File

@ -1,3 +1,4 @@
export * from "./hashing" export * from "./hashing"
export * from "./utils" export * from "./utils"
export * from "./stringUtils" export * from "./stringUtils"
export * from "./Duration"

View File

@ -0,0 +1,19 @@
import { Duration, DurationType } from "../Duration"
describe("duration", () => {
it("should convert minutes to milliseconds", () => {
expect(Duration.fromMinutes(5).toMs()).toBe(300000)
})
it("should convert seconds to milliseconds", () => {
expect(Duration.fromSeconds(30).toMs()).toBe(30000)
})
it("should convert days to milliseconds", () => {
expect(Duration.fromDays(1).toMs()).toBe(86400000)
})
it("should convert minutes to days", () => {
expect(Duration.fromMinutes(1440).to(DurationType.DAYS)).toBe(1)
})
})