diff --git a/packages/backend-core/src/cache/docWritethrough.ts b/packages/backend-core/src/cache/docWritethrough.ts index e367c9e060..38a162435d 100644 --- a/packages/backend-core/src/cache/docWritethrough.ts +++ b/packages/backend-core/src/cache/docWritethrough.ts @@ -3,6 +3,9 @@ import { getDocWritethroughClient } from "../redis/init" import { AnyDocument, Database, LockName, LockType } from "@budibase/types" import * as locks from "../redis/redlockImpl" +import { JobQueue, createQueue } from "../queue" +import { context, db as dbUtils } from ".." + const DEFAULT_WRITE_RATE_MS = 10000 let CACHE: BaseCache | null = null @@ -14,17 +17,63 @@ async function getCache() { return CACHE } -interface CacheItem { - nextWrite: number +interface ProcessDocMessage { + tenantId: string + dbName: string + docId: string + cacheKeyPrefix: string } +export const docWritethroughProcessorQueue = createQueue( + JobQueue.DOC_WRITETHROUGH_QUEUE +) + +docWritethroughProcessorQueue.process(async message => { + const { dbName, tenantId, docId, cacheKeyPrefix } = message.data + const cache = await getCache() + await context.doInTenant(tenantId, async () => { + const lockResponse = await locks.doWithLock( + { + type: LockType.TRY_ONCE, + name: LockName.PERSIST_WRITETHROUGH, + resource: cacheKeyPrefix, + ttl: 15000, + }, + async () => { + const db = dbUtils.getDB(dbName) + let doc: AnyDocument | undefined + try { + doc = await db.get(docId) + } catch { + doc = { _id: docId } + } + + const keysToPersist = await cache.keys(`${cacheKeyPrefix}:data:*`) + for (const key of keysToPersist) { + const data = await cache.get(key, { useTenancy: false }) + doc[data.key] = data.value + } + + await db.put(doc) + + for (const key of keysToPersist) { + await cache.delete(key, { useTenancy: false }) + } + } + ) + + if (!lockResponse.executed) { + console.log(`Ignoring redlock conflict in write-through cache`) + } + }) +}) + export class DocWritethrough { private db: Database private _docId: string private writeRateMs: number private cacheKeyPrefix: string - private docInfoCacheKey: string constructor( db: Database, @@ -35,54 +84,31 @@ export class DocWritethrough { this._docId = docId this.writeRateMs = writeRateMs this.cacheKeyPrefix = `${this.db.name}:${this.docId}` - this.docInfoCacheKey = `${this.cacheKeyPrefix}:info` } get docId() { return this._docId } - private makeNextWriteInfoItem(): CacheItem { - return { nextWrite: Date.now() + this.writeRateMs } - } - async patch(data: Record) { const cache = await getCache() await this.storeToCache(cache, data) - const updateDb = await this.shouldUpdateDb(cache) - - if (updateDb) { - const lockResponse = await locks.doWithLock( - { - type: LockType.TRY_ONCE, - name: LockName.PERSIST_WRITETHROUGH, - resource: this.docInfoCacheKey, - ttl: 15000, - }, - async () => { - if (await this.shouldUpdateDb(cache)) { - await this.persistToDb(cache) - await cache.store( - this.docInfoCacheKey, - this.makeNextWriteInfoItem() - ) - } - } - ) - - if (!lockResponse.executed) { - console.log(`Ignoring redlock conflict in write-through cache`) + docWritethroughProcessorQueue.add( + { + tenantId: context.getTenantId(), + dbName: this.db.name, + docId: this.docId, + cacheKeyPrefix: this.cacheKeyPrefix, + }, + { + delay: this.writeRateMs - 1, + jobId: this.cacheKeyPrefix, + removeOnFail: true, + removeOnComplete: true, } - } - } - - private async shouldUpdateDb(cache: BaseCache) { - const cacheItem = await cache.withCache(this.docInfoCacheKey, null, () => - this.makeNextWriteInfoItem() ) - return Date.now() >= cacheItem.nextWrite } private async storeToCache(cache: BaseCache, data: Record) { @@ -91,25 +117,4 @@ export class DocWritethrough { await cache.store(cacheKey, { key, value }, undefined) } } - - private async persistToDb(cache: BaseCache) { - let doc: AnyDocument | undefined - try { - doc = await this.db.get(this.docId) - } catch { - doc = { _id: this.docId } - } - - const keysToPersist = await cache.keys(`${this.cacheKeyPrefix}:data:*`) - for (const key of keysToPersist) { - const data = await cache.get(key, { useTenancy: false }) - doc[data.key] = data.value - } - - await this.db.put(doc) - - for (const key of keysToPersist) { - await cache.delete(key, { useTenancy: false }) - } - } }