Updating bull parameters to see if this helps with queue stalling.

This commit is contained in:
mike12345567 2023-10-31 16:56:19 +00:00
parent a4ce8983e7
commit df6f8dad7e
2 changed files with 16 additions and 3 deletions

View File

@ -36,7 +36,7 @@ class InMemoryQueue {
* @param opts This is not used by the in memory queue as there is no real use * @param opts This is not used by the in memory queue as there is no real use
* case when in memory, but is the same API as Bull * case when in memory, but is the same API as Bull
*/ */
constructor(name: string, opts = null) { constructor(name: string, opts?: any) {
this._name = name this._name = name
this._opts = opts this._opts = opts
this._messages = [] this._messages = []

View File

@ -2,10 +2,16 @@ import env from "../environment"
import { getRedisOptions } from "../redis/utils" import { getRedisOptions } from "../redis/utils"
import { JobQueue } from "./constants" import { JobQueue } from "./constants"
import InMemoryQueue from "./inMemoryQueue" import InMemoryQueue from "./inMemoryQueue"
import BullQueue from "bull" import BullQueue, { QueueOptions } from "bull"
import { addListeners, StalledFn } from "./listeners" import { addListeners, StalledFn } from "./listeners"
import * as timers from "../timers" import * as timers from "../timers"
import * as Redis from "ioredis"
// the queue lock is held for 5 minutes
const QUEUE_LOCK_MS = 300000
// queue lock is refreshed every 30 seconds
const QUEUE_LOCK_RENEW_INTERNAL_MS = 30000
// cleanup the queue every 60 seconds
const CLEANUP_PERIOD_MS = 60 * 1000 const CLEANUP_PERIOD_MS = 60 * 1000
let QUEUES: BullQueue.Queue[] | InMemoryQueue[] = [] let QUEUES: BullQueue.Queue[] | InMemoryQueue[] = []
let cleanupInterval: NodeJS.Timeout let cleanupInterval: NodeJS.Timeout
@ -21,7 +27,14 @@ export function createQueue<T>(
opts: { removeStalledCb?: StalledFn } = {} opts: { removeStalledCb?: StalledFn } = {}
): BullQueue.Queue<T> { ): BullQueue.Queue<T> {
const { opts: redisOpts, redisProtocolUrl } = getRedisOptions() const { opts: redisOpts, redisProtocolUrl } = getRedisOptions()
const queueConfig: any = redisProtocolUrl || { redis: redisOpts } const queueConfig: QueueOptions = {
redis: redisProtocolUrl! || (redisOpts as Redis.RedisOptions),
settings: {
maxStalledCount: 0,
lockDuration: QUEUE_LOCK_MS,
lockRenewTime: QUEUE_LOCK_RENEW_INTERNAL_MS,
},
}
let queue: any let queue: any
if (!env.isTest()) { if (!env.isTest()) {
queue = new BullQueue(jobQueue, queueConfig) queue = new BullQueue(jobQueue, queueConfig)