From 4ce85cde1afd34872bfe7c401d73cbf77651a660 Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Wed, 6 Mar 2024 14:07:39 +0100 Subject: [PATCH] Simplify, use only queues --- .../backend-core/src/cache/docWritethrough.ts | 154 ++++-------------- .../src/cache/tests/docWritethrough.spec.ts | 126 ++++++-------- 2 files changed, 79 insertions(+), 201 deletions(-) diff --git a/packages/backend-core/src/cache/docWritethrough.ts b/packages/backend-core/src/cache/docWritethrough.ts index af3df11a9c..cee272cef6 100644 --- a/packages/backend-core/src/cache/docWritethrough.ts +++ b/packages/backend-core/src/cache/docWritethrough.ts @@ -1,100 +1,55 @@ import { AnyDocument, Database, LockName, LockType } from "@budibase/types" -import BaseCache from "./base" -import { getDocWritethroughClient } from "../redis/init" import { JobQueue, createQueue } from "../queue" import * as dbUtils from "../db" -import { Duration, newid } from "../utils" -import { locks } from ".." - -let CACHE: BaseCache | null = null -async function getCache() { - if (!CACHE) { - const client = await getDocWritethroughClient() - CACHE = new BaseCache(client) - } - return CACHE -} +import { string } from "yargs" +import { db } from ".." +import { locks } from "../redis" +import { Duration } from "../utils" interface ProcessDocMessage { dbName: string docId: string - cacheKeyPrefix: string - messageId: string + + data: Record } export const docWritethroughProcessorQueue = createQueue( - JobQueue.DOC_WRITETHROUGH_QUEUE, - { - jobOptions: { - attempts: 5, - backoff: { - type: "fixed", - delay: 1000, - }, - }, - } + JobQueue.DOC_WRITETHROUGH_QUEUE ) class DocWritethroughProcessor { init() { docWritethroughProcessorQueue.process(async message => { - const { cacheKeyPrefix, messageId } = message.data - - const cache = await getCache() - const latestMessageId = await cache.get( - REDIS_KEYS(cacheKeyPrefix).LATEST_MESSAGE_ID - ) - if (messageId !== latestMessageId) { - // Nothing to do, another message overrode it - return - } - - const lockResponse = await locks.doWithLock( + const result = await locks.doWithLock( { - type: LockType.TRY_TWICE, + type: LockType.DEFAULT, name: LockName.PERSIST_DOC_WRITETHROUGH, - resource: cacheKeyPrefix, + resource: `${message.data.dbName}:${message.data.docId}`, ttl: Duration.fromSeconds(60).toMs(), }, async () => { - const latestMessageId = await cache.get( - REDIS_KEYS(cacheKeyPrefix).LATEST_MESSAGE_ID - ) - if (messageId !== latestMessageId) { - // Nothing to do, another message overrode it - return - } - - await this.persistToDb(cache, message.data) - console.log("DocWritethrough persisted", { data: message.data }) - - await cache.deleteIfValue( - REDIS_KEYS(cacheKeyPrefix).LATEST_MESSAGE_ID, - latestMessageId - ) + await this.persistToDb(message.data) } ) - - if (!lockResponse.executed) { - throw new Error(`Ignoring redlock conflict in write-through cache`) + if (!result.executed) { + throw new Error( + `Error persisting docWritethrough message: ${message.id}` + ) } }) return this } - private async persistToDb( - cache: BaseCache, - { - dbName, - docId, - cacheKeyPrefix, - }: { - dbName: string - docId: string - cacheKeyPrefix: string - } - ) { + private async persistToDb({ + dbName, + docId, + data, + }: { + dbName: string + docId: string + data: Record + }) { const db = dbUtils.getDB(dbName) let doc: AnyDocument | undefined try { @@ -103,19 +58,8 @@ class DocWritethroughProcessor { doc = { _id: docId } } - const keysToPersist = await cache.keys( - REDIS_KEYS(cacheKeyPrefix).DATA.GET_ALL - ) - for (const key of keysToPersist) { - const data = await cache.get(key, { useTenancy: false }) - doc[data.key] = data.value - } - + doc = { ...doc, ...data } await db.put(doc) - - for (const key of keysToPersist) { - await cache.delete(key, { useTenancy: false }) - } } } @@ -124,15 +68,10 @@ export const processor = new DocWritethroughProcessor().init() export class DocWritethrough { private db: Database private _docId: string - private writeRateMs: number - private cacheKeyPrefix: string - - constructor(db: Database, docId: string, writeRateMs: number) { + constructor(db: Database, docId: string) { this.db = db this._docId = docId - this.writeRateMs = writeRateMs - this.cacheKeyPrefix = `${this.db.name}:${this.docId}` } get docId() { @@ -140,41 +79,10 @@ export class DocWritethrough { } async patch(data: Record) { - const cache = await getCache() - - await this.storeToCache(cache, data) - const messageId = newid() - await cache.store( - REDIS_KEYS(this.cacheKeyPrefix).LATEST_MESSAGE_ID, - messageId - ) - - docWritethroughProcessorQueue.add( - { - dbName: this.db.name, - docId: this.docId, - cacheKeyPrefix: this.cacheKeyPrefix, - messageId, - }, - { - delay: this.writeRateMs, - } - ) - } - - private async storeToCache(cache: BaseCache, data: Record) { - data = Object.entries(data).reduce((acc, [key, value]) => { - acc[REDIS_KEYS(this.cacheKeyPrefix).DATA.VALUE(key)] = { key, value } - return acc - }, {} as Record) - await cache.bulkStore(data, null) + await docWritethroughProcessorQueue.add({ + dbName: this.db.name, + docId: this.docId, + data, + }) } } - -const REDIS_KEYS = (prefix: string) => ({ - DATA: { - VALUE: (key: string) => prefix + ":data:" + key, - GET_ALL: prefix + ":data:*", - }, - LATEST_MESSAGE_ID: prefix + ":info:latestMessageId", -}) diff --git a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts index b909f4624f..9beb25df93 100644 --- a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts +++ b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts @@ -1,3 +1,5 @@ +import tk from "timekeeper" + import _ from "lodash" import { DBTestConfiguration, generator, structures } from "../../../tests" import { getDB } from "../../db" @@ -7,24 +9,11 @@ import { DocWritethrough, processor } from "../docWritethrough" import InMemoryQueue from "../../queue/inMemoryQueue" import { docWritethroughProcessorQueue } from "../docWritethrough" -const WRITE_RATE_MS = 1000 - const initialTime = Date.now() -jest.useFakeTimers({ - now: initialTime, -}) - -function resetTime() { - jest.setSystemTime(initialTime) -} -async function travelForward(ms: number) { - await jest.advanceTimersByTimeAsync(ms) - +async function waitForQueueCompletion() { const queue: InMemoryQueue = docWritethroughProcessorQueue as never - while (queue.hasRunningJobs()) { - await jest.runOnlyPendingTimersAsync() - } + await queue.waitForCompletion() } describe("docWritethrough", () => { @@ -44,30 +33,28 @@ describe("docWritethrough", () => { } beforeEach(async () => { - resetTime() + jest.clearAllMocks() documentId = structures.uuid() - docWritethrough = new DocWritethrough(db, documentId, WRITE_RATE_MS) + docWritethrough = new DocWritethrough(db, documentId) }) - it("patching will not persist if timeout does not hit", async () => { + it("patching will not persist until the messages are persisted", async () => { await config.doInTenant(async () => { - await travelForward(WRITE_RATE_MS) await docWritethrough.patch(generatePatchObject(2)) await docWritethrough.patch(generatePatchObject(2)) - await travelForward(WRITE_RATE_MS - 1) expect(await db.exists(documentId)).toBe(false) }) }) - it("patching will persist if timeout hits", async () => { + it("patching will persist when the messages are persisted", async () => { await config.doInTenant(async () => { const patch1 = generatePatchObject(2) const patch2 = generatePatchObject(2) await docWritethrough.patch(patch1) await docWritethrough.patch(patch2) - await travelForward(WRITE_RATE_MS) + await waitForQueueCompletion() // This will not be persisted const patch3 = generatePatchObject(3) @@ -77,9 +64,9 @@ describe("docWritethrough", () => { _id: documentId, ...patch1, ...patch2, - _rev: expect.stringMatching(/1-.+/), - createdAt: new Date(initialTime + WRITE_RATE_MS).toISOString(), - updatedAt: new Date(initialTime + WRITE_RATE_MS).toISOString(), + _rev: expect.stringMatching(/2-.+/), + createdAt: new Date(initialTime).toISOString(), + updatedAt: new Date(initialTime).toISOString(), }) }) }) @@ -91,12 +78,12 @@ describe("docWritethrough", () => { await docWritethrough.patch(patch1) await docWritethrough.patch(patch2) - await travelForward(WRITE_RATE_MS) + await waitForQueueCompletion() const patch3 = generatePatchObject(3) await docWritethrough.patch(patch3) - await travelForward(WRITE_RATE_MS) + await waitForQueueCompletion() expect(await db.get(documentId)).toEqual( expect.objectContaining({ @@ -114,12 +101,13 @@ describe("docWritethrough", () => { const patch1 = generatePatchObject(2) const patch2 = generatePatchObject(2) await docWritethrough.patch(patch1) - await travelForward(WRITE_RATE_MS) const date1 = new Date() + await waitForQueueCompletion() await docWritethrough.patch(patch2) - await travelForward(WRITE_RATE_MS) + tk.travel(Date.now() + 100) const date2 = new Date() + await waitForQueueCompletion() expect(date1).not.toEqual(date2) expect(await db.get(documentId)).toEqual( @@ -135,7 +123,7 @@ describe("docWritethrough", () => { await config.doInTenant(async () => { const patch1 = generatePatchObject(2) await docWritethrough.patch(patch1) - await travelForward(WRITE_RATE_MS) + await waitForQueueCompletion() const patch2 = generatePatchObject(1) await docWritethrough.patch(patch2) @@ -146,14 +134,14 @@ describe("docWritethrough", () => { }) ) - await travelForward(WRITE_RATE_MS) + await waitForQueueCompletion() const patch3 = { ...generatePatchObject(3), [keyToOverride]: generator.word(), } await docWritethrough.patch(patch3) - await travelForward(WRITE_RATE_MS) + await waitForQueueCompletion() expect(await db.get(documentId)).toEqual( expect.objectContaining({ @@ -169,8 +157,7 @@ describe("docWritethrough", () => { await config.doInTenant(async () => { const secondDocWritethrough = new DocWritethrough( db, - structures.db.id(), - WRITE_RATE_MS + structures.db.id() ) const doc1Patch = generatePatchObject(2) @@ -178,13 +165,13 @@ describe("docWritethrough", () => { const doc2Patch = generatePatchObject(1) await secondDocWritethrough.patch(doc2Patch) - await travelForward(WRITE_RATE_MS) + await waitForQueueCompletion() const doc1Patch2 = generatePatchObject(3) await docWritethrough.patch(doc1Patch2) const doc2Patch2 = generatePatchObject(3) await secondDocWritethrough.patch(doc2Patch2) - await travelForward(WRITE_RATE_MS) + await waitForQueueCompletion() expect(await db.get(docWritethrough.docId)).toEqual( expect.objectContaining({ @@ -207,9 +194,7 @@ describe("docWritethrough", () => { const initialPatch = generatePatchObject(5) await docWritethrough.patch(initialPatch) - await travelForward(WRITE_RATE_MS) - - await docWritethrough.patch({}) + await waitForQueueCompletion() expect(await db.get(documentId)).toEqual( expect.objectContaining(initialPatch) @@ -217,10 +202,10 @@ describe("docWritethrough", () => { await db.remove(await db.get(documentId)) - await travelForward(WRITE_RATE_MS) + await waitForQueueCompletion() const extraPatch = generatePatchObject(5) await docWritethrough.patch(extraPatch) - await travelForward(WRITE_RATE_MS) + await waitForQueueCompletion() expect(await db.get(documentId)).toEqual( expect.objectContaining(extraPatch) @@ -231,59 +216,44 @@ describe("docWritethrough", () => { }) }) - it("concurrent calls will not cause multiple saves", async () => { + it("concurrent calls will not cause conflicts", async () => { async function parallelPatch(count: number) { - await Promise.all( - Array.from({ length: count }).map(() => - docWritethrough.patch(generatePatchObject(1)) - ) + const patches = Array.from({ length: count }).map(() => + generatePatchObject(1) ) + await Promise.all(patches.map(p => docWritethrough.patch(p))) + + return patches.reduce((acc, c) => { + acc = { ...acc, ...c } + return acc + }, {}) } - const persistToDbSpy = jest.spyOn(processor as any, "persistToDb") - const storeToCacheSpy = jest.spyOn(docWritethrough as any, "storeToCache") + const queueMessageSpy = jest.spyOn(docWritethroughProcessorQueue, "add") await config.doInTenant(async () => { - await parallelPatch(5) - expect(storeToCacheSpy).toBeCalledTimes(5) - expect(persistToDbSpy).not.toBeCalled() - expect(await db.exists(documentId)).toBe(false) + let patches = await parallelPatch(5) + expect(queueMessageSpy).toBeCalledTimes(5) - await travelForward(WRITE_RATE_MS) - - await parallelPatch(40) - - expect(storeToCacheSpy).toBeCalledTimes(45) - expect(persistToDbSpy).toBeCalledTimes(1) - // Ideally we want to spy on persistToDb from ./docWritethrough, but due our barrel files configuration required quite of a complex setup. - // We are relying on the document being stored only once (otherwise we would have _rev updated) + await waitForQueueCompletion() expect(await db.get(documentId)).toEqual( - expect.objectContaining({ - _id: documentId, - _rev: expect.stringMatching(/1-.+/), - }) + expect.objectContaining(patches) ) - await parallelPatch(10) + patches = { ...patches, ...(await parallelPatch(40)) } + expect(queueMessageSpy).toBeCalledTimes(45) - expect(storeToCacheSpy).toBeCalledTimes(55) + await waitForQueueCompletion() expect(await db.get(documentId)).toEqual( - expect.objectContaining({ - _id: documentId, - _rev: expect.stringMatching(/1-.+/), - }) + expect.objectContaining(patches) ) - await travelForward(WRITE_RATE_MS) + patches = { ...patches, ...(await parallelPatch(10)) } + expect(queueMessageSpy).toBeCalledTimes(55) - await parallelPatch(5) - await travelForward(WRITE_RATE_MS) + await waitForQueueCompletion() expect(await db.get(documentId)).toEqual( - expect.objectContaining({ - _id: documentId, - _rev: expect.stringMatching(/3-.+/), - }) + expect.objectContaining(patches) ) - expect(storeToCacheSpy).toBeCalledTimes(60) }) }) })