Type inMemoryQueue
This commit is contained in:
parent
3c944073eb
commit
770aff4f5e
|
@ -1,5 +1,6 @@
|
|||
import events from "events"
|
||||
import { timeout } from "../utils"
|
||||
import { Queue, QueueOptions, JobOptions } from "./queue"
|
||||
|
||||
/**
|
||||
* Bull works with a Job wrapper around all messages that contains a lot more information about
|
||||
|
@ -24,9 +25,9 @@ function newJob(queue: string, message: any) {
|
|||
* It is relatively simple, using an event emitter internally to register when messages are available
|
||||
* to the consumers - in can support many inputs and many consumers.
|
||||
*/
|
||||
class InMemoryQueue {
|
||||
class InMemoryQueue implements Partial<Queue> {
|
||||
_name: string
|
||||
_opts?: any
|
||||
_opts?: QueueOptions
|
||||
_messages: any[]
|
||||
_emitter: EventEmitter
|
||||
_runCount: number
|
||||
|
@ -37,7 +38,7 @@ class InMemoryQueue {
|
|||
* @param opts This is not used by the in memory queue as there is no real use
|
||||
* case when in memory, but is the same API as Bull
|
||||
*/
|
||||
constructor(name: string, opts?: any) {
|
||||
constructor(name: string, opts?: QueueOptions) {
|
||||
this._name = name
|
||||
this._opts = opts
|
||||
this._messages = []
|
||||
|
@ -55,8 +56,12 @@ class InMemoryQueue {
|
|||
* note this is incredibly limited compared to Bull as in reality the Job would contain
|
||||
* a lot more information about the queue and current status of Bull cluster.
|
||||
*/
|
||||
process(func: any) {
|
||||
async process(func: any) {
|
||||
this._emitter.on("message", async () => {
|
||||
const delay = this._opts?.defaultJobOptions?.delay
|
||||
if (delay) {
|
||||
await new Promise<void>(r => setTimeout(() => r(), delay))
|
||||
}
|
||||
if (this._messages.length <= 0) {
|
||||
return
|
||||
}
|
||||
|
@ -70,7 +75,7 @@ class InMemoryQueue {
|
|||
}
|
||||
|
||||
async isReady() {
|
||||
return true
|
||||
return this as any
|
||||
}
|
||||
|
||||
// simply puts a message to the queue and emits to the queue for processing
|
||||
|
@ -83,27 +88,26 @@ class InMemoryQueue {
|
|||
* @param repeat serves no purpose for the import queue.
|
||||
*/
|
||||
// eslint-disable-next-line no-unused-vars
|
||||
add(msg: any, repeat: boolean) {
|
||||
if (typeof msg !== "object") {
|
||||
async add(data: any, opts?: JobOptions) {
|
||||
if (typeof data !== "object") {
|
||||
throw "Queue only supports carrying JSON."
|
||||
}
|
||||
this._messages.push(newJob(this._name, msg))
|
||||
this._messages.push(newJob(this._name, data))
|
||||
this._addCount++
|
||||
this._emitter.emit("message")
|
||||
return {} as any
|
||||
}
|
||||
|
||||
/**
|
||||
* replicating the close function from bull, which waits for jobs to finish.
|
||||
*/
|
||||
async close() {
|
||||
return []
|
||||
}
|
||||
async close() {}
|
||||
|
||||
/**
|
||||
* This removes a cron which has been implemented, this is part of Bull API.
|
||||
* @param cronJobId The cron which is to be removed.
|
||||
*/
|
||||
removeRepeatableByKey(cronJobId: string) {
|
||||
async removeRepeatableByKey(cronJobId: string) {
|
||||
// TODO: implement for testing
|
||||
console.log(cronJobId)
|
||||
}
|
||||
|
@ -111,12 +115,12 @@ class InMemoryQueue {
|
|||
/**
|
||||
* Implemented for tests
|
||||
*/
|
||||
getRepeatableJobs() {
|
||||
async getRepeatableJobs() {
|
||||
return []
|
||||
}
|
||||
|
||||
// eslint-disable-next-line no-unused-vars
|
||||
removeJobs(pattern: string) {
|
||||
async removeJobs(pattern: string) {
|
||||
// no-op
|
||||
}
|
||||
|
||||
|
@ -128,12 +132,12 @@ class InMemoryQueue {
|
|||
}
|
||||
|
||||
async getJob() {
|
||||
return {}
|
||||
return null
|
||||
}
|
||||
|
||||
on() {
|
||||
// do nothing
|
||||
return this
|
||||
return this as any
|
||||
}
|
||||
|
||||
async waitForCompletion() {
|
||||
|
|
|
@ -7,6 +7,8 @@ import { addListeners, StalledFn } from "./listeners"
|
|||
import { Duration } from "../utils"
|
||||
import * as timers from "../timers"
|
||||
|
||||
export { QueueOptions, Queue, JobOptions } from "bull"
|
||||
|
||||
// the queue lock is held for 5 minutes
|
||||
const QUEUE_LOCK_MS = Duration.fromMinutes(5).toMs()
|
||||
// queue lock is refreshed every 30 seconds
|
||||
|
|
Loading…
Reference in New Issue