Merge pull request #13294 from Budibase/BUDI-8100/budi-cli-broken-with-redis-errors-and-hanging

[Fix] CLI broken with redis errors and hanging
This commit is contained in:
Adria Navarro 2024-03-19 16:14:55 +01:00 committed by GitHub
commit 1da50a7400
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 24 additions and 15 deletions

View File

@ -1,6 +1,6 @@
import { AnyDocument, Database } from "@budibase/types"
import { JobQueue, createQueue } from "../queue"
import { JobQueue, Queue, createQueue } from "../queue"
import * as dbUtils from "../db"
interface ProcessDocMessage {
@ -12,18 +12,26 @@ interface ProcessDocMessage {
const PERSIST_MAX_ATTEMPTS = 100
let processor: DocWritethroughProcessor | undefined
export const docWritethroughProcessorQueue = createQueue<ProcessDocMessage>(
export class DocWritethroughProcessor {
private static _queue: Queue
public static get queue() {
if (!DocWritethroughProcessor._queue) {
DocWritethroughProcessor._queue = createQueue<ProcessDocMessage>(
JobQueue.DOC_WRITETHROUGH_QUEUE,
{
jobOptions: {
attempts: PERSIST_MAX_ATTEMPTS,
},
}
)
)
}
return DocWritethroughProcessor._queue
}
class DocWritethroughProcessor {
init() {
docWritethroughProcessorQueue.process(async message => {
DocWritethroughProcessor.queue.process(async message => {
try {
await this.persistToDb(message.data)
} catch (err: any) {
@ -76,7 +84,7 @@ export class DocWritethrough {
}
async patch(data: Record<string, any>) {
await docWritethroughProcessorQueue.add({
await DocWritethroughProcessor.queue.add({
dbName: this.db.name,
docId: this.docId,
data,

View File

@ -6,7 +6,7 @@ import { getDB } from "../../db"
import {
DocWritethrough,
docWritethroughProcessorQueue,
DocWritethroughProcessor,
init,
} from "../docWritethrough"
@ -15,7 +15,7 @@ import InMemoryQueue from "../../queue/inMemoryQueue"
const initialTime = Date.now()
async function waitForQueueCompletion() {
const queue: InMemoryQueue = docWritethroughProcessorQueue as never
const queue: InMemoryQueue = DocWritethroughProcessor.queue as never
await queue.waitForCompletion()
}
@ -235,7 +235,7 @@ describe("docWritethrough", () => {
return acc
}, {})
}
const queueMessageSpy = jest.spyOn(docWritethroughProcessorQueue, "add")
const queueMessageSpy = jest.spyOn(DocWritethroughProcessor.queue, "add")
await config.doInTenant(async () => {
let patches = await parallelPatch(5)

View File

@ -11,6 +11,7 @@
"types": ["node", "jest"],
"outDir": "dist",
"skipLibCheck": true,
"baseUrl": ".",
"paths": {
"@budibase/types": ["../types/src"],
"@budibase/backend-core": ["../backend-core/src"],