Simplify, use only queues

This commit is contained in:
Adria Navarro 2024-03-06 14:07:39 +01:00
parent 5109477e52
commit 4ce85cde1a
2 changed files with 79 additions and 201 deletions

View File

@ -1,100 +1,55 @@
import { AnyDocument, Database, LockName, LockType } from "@budibase/types"
import BaseCache from "./base"
import { getDocWritethroughClient } from "../redis/init"
import { JobQueue, createQueue } from "../queue"
import * as dbUtils from "../db"
import { Duration, newid } from "../utils"
import { locks } from ".."
let CACHE: BaseCache | null = null
async function getCache() {
if (!CACHE) {
const client = await getDocWritethroughClient()
CACHE = new BaseCache(client)
}
return CACHE
}
import { string } from "yargs"
import { db } from ".."
import { locks } from "../redis"
import { Duration } from "../utils"
interface ProcessDocMessage {
dbName: string
docId: string
cacheKeyPrefix: string
messageId: string
data: Record<string, any>
}
export const docWritethroughProcessorQueue = createQueue<ProcessDocMessage>(
JobQueue.DOC_WRITETHROUGH_QUEUE,
{
jobOptions: {
attempts: 5,
backoff: {
type: "fixed",
delay: 1000,
},
},
}
JobQueue.DOC_WRITETHROUGH_QUEUE
)
class DocWritethroughProcessor {
init() {
docWritethroughProcessorQueue.process(async message => {
const { cacheKeyPrefix, messageId } = message.data
const cache = await getCache()
const latestMessageId = await cache.get(
REDIS_KEYS(cacheKeyPrefix).LATEST_MESSAGE_ID
)
if (messageId !== latestMessageId) {
// Nothing to do, another message overrode it
return
}
const lockResponse = await locks.doWithLock(
const result = await locks.doWithLock(
{
type: LockType.TRY_TWICE,
type: LockType.DEFAULT,
name: LockName.PERSIST_DOC_WRITETHROUGH,
resource: cacheKeyPrefix,
resource: `${message.data.dbName}:${message.data.docId}`,
ttl: Duration.fromSeconds(60).toMs(),
},
async () => {
const latestMessageId = await cache.get(
REDIS_KEYS(cacheKeyPrefix).LATEST_MESSAGE_ID
)
if (messageId !== latestMessageId) {
// Nothing to do, another message overrode it
return
}
await this.persistToDb(cache, message.data)
console.log("DocWritethrough persisted", { data: message.data })
await cache.deleteIfValue(
REDIS_KEYS(cacheKeyPrefix).LATEST_MESSAGE_ID,
latestMessageId
)
await this.persistToDb(message.data)
}
)
if (!lockResponse.executed) {
throw new Error(`Ignoring redlock conflict in write-through cache`)
if (!result.executed) {
throw new Error(
`Error persisting docWritethrough message: ${message.id}`
)
}
})
return this
}
private async persistToDb(
cache: BaseCache,
{
dbName,
docId,
cacheKeyPrefix,
}: {
dbName: string
docId: string
cacheKeyPrefix: string
}
) {
private async persistToDb({
dbName,
docId,
data,
}: {
dbName: string
docId: string
data: Record<string, any>
}) {
const db = dbUtils.getDB(dbName)
let doc: AnyDocument | undefined
try {
@ -103,19 +58,8 @@ class DocWritethroughProcessor {
doc = { _id: docId }
}
const keysToPersist = await cache.keys(
REDIS_KEYS(cacheKeyPrefix).DATA.GET_ALL
)
for (const key of keysToPersist) {
const data = await cache.get(key, { useTenancy: false })
doc[data.key] = data.value
}
doc = { ...doc, ...data }
await db.put(doc)
for (const key of keysToPersist) {
await cache.delete(key, { useTenancy: false })
}
}
}
@ -124,15 +68,10 @@ export const processor = new DocWritethroughProcessor().init()
export class DocWritethrough {
private db: Database
private _docId: string
private writeRateMs: number
private cacheKeyPrefix: string
constructor(db: Database, docId: string, writeRateMs: number) {
constructor(db: Database, docId: string) {
this.db = db
this._docId = docId
this.writeRateMs = writeRateMs
this.cacheKeyPrefix = `${this.db.name}:${this.docId}`
}
get docId() {
@ -140,41 +79,10 @@ export class DocWritethrough {
}
async patch(data: Record<string, any>) {
const cache = await getCache()
await this.storeToCache(cache, data)
const messageId = newid()
await cache.store(
REDIS_KEYS(this.cacheKeyPrefix).LATEST_MESSAGE_ID,
messageId
)
docWritethroughProcessorQueue.add(
{
dbName: this.db.name,
docId: this.docId,
cacheKeyPrefix: this.cacheKeyPrefix,
messageId,
},
{
delay: this.writeRateMs,
}
)
}
private async storeToCache(cache: BaseCache, data: Record<string, any>) {
data = Object.entries(data).reduce((acc, [key, value]) => {
acc[REDIS_KEYS(this.cacheKeyPrefix).DATA.VALUE(key)] = { key, value }
return acc
}, {} as Record<string, any>)
await cache.bulkStore(data, null)
await docWritethroughProcessorQueue.add({
dbName: this.db.name,
docId: this.docId,
data,
})
}
}
const REDIS_KEYS = (prefix: string) => ({
DATA: {
VALUE: (key: string) => prefix + ":data:" + key,
GET_ALL: prefix + ":data:*",
},
LATEST_MESSAGE_ID: prefix + ":info:latestMessageId",
})

View File

@ -1,3 +1,5 @@
import tk from "timekeeper"
import _ from "lodash"
import { DBTestConfiguration, generator, structures } from "../../../tests"
import { getDB } from "../../db"
@ -7,24 +9,11 @@ import { DocWritethrough, processor } from "../docWritethrough"
import InMemoryQueue from "../../queue/inMemoryQueue"
import { docWritethroughProcessorQueue } from "../docWritethrough"
const WRITE_RATE_MS = 1000
const initialTime = Date.now()
jest.useFakeTimers({
now: initialTime,
})
function resetTime() {
jest.setSystemTime(initialTime)
}
async function travelForward(ms: number) {
await jest.advanceTimersByTimeAsync(ms)
async function waitForQueueCompletion() {
const queue: InMemoryQueue = docWritethroughProcessorQueue as never
while (queue.hasRunningJobs()) {
await jest.runOnlyPendingTimersAsync()
}
await queue.waitForCompletion()
}
describe("docWritethrough", () => {
@ -44,30 +33,28 @@ describe("docWritethrough", () => {
}
beforeEach(async () => {
resetTime()
jest.clearAllMocks()
documentId = structures.uuid()
docWritethrough = new DocWritethrough(db, documentId, WRITE_RATE_MS)
docWritethrough = new DocWritethrough(db, documentId)
})
it("patching will not persist if timeout does not hit", async () => {
it("patching will not persist until the messages are persisted", async () => {
await config.doInTenant(async () => {
await travelForward(WRITE_RATE_MS)
await docWritethrough.patch(generatePatchObject(2))
await docWritethrough.patch(generatePatchObject(2))
await travelForward(WRITE_RATE_MS - 1)
expect(await db.exists(documentId)).toBe(false)
})
})
it("patching will persist if timeout hits", async () => {
it("patching will persist when the messages are persisted", async () => {
await config.doInTenant(async () => {
const patch1 = generatePatchObject(2)
const patch2 = generatePatchObject(2)
await docWritethrough.patch(patch1)
await docWritethrough.patch(patch2)
await travelForward(WRITE_RATE_MS)
await waitForQueueCompletion()
// This will not be persisted
const patch3 = generatePatchObject(3)
@ -77,9 +64,9 @@ describe("docWritethrough", () => {
_id: documentId,
...patch1,
...patch2,
_rev: expect.stringMatching(/1-.+/),
createdAt: new Date(initialTime + WRITE_RATE_MS).toISOString(),
updatedAt: new Date(initialTime + WRITE_RATE_MS).toISOString(),
_rev: expect.stringMatching(/2-.+/),
createdAt: new Date(initialTime).toISOString(),
updatedAt: new Date(initialTime).toISOString(),
})
})
})
@ -91,12 +78,12 @@ describe("docWritethrough", () => {
await docWritethrough.patch(patch1)
await docWritethrough.patch(patch2)
await travelForward(WRITE_RATE_MS)
await waitForQueueCompletion()
const patch3 = generatePatchObject(3)
await docWritethrough.patch(patch3)
await travelForward(WRITE_RATE_MS)
await waitForQueueCompletion()
expect(await db.get(documentId)).toEqual(
expect.objectContaining({
@ -114,12 +101,13 @@ describe("docWritethrough", () => {
const patch1 = generatePatchObject(2)
const patch2 = generatePatchObject(2)
await docWritethrough.patch(patch1)
await travelForward(WRITE_RATE_MS)
const date1 = new Date()
await waitForQueueCompletion()
await docWritethrough.patch(patch2)
await travelForward(WRITE_RATE_MS)
tk.travel(Date.now() + 100)
const date2 = new Date()
await waitForQueueCompletion()
expect(date1).not.toEqual(date2)
expect(await db.get(documentId)).toEqual(
@ -135,7 +123,7 @@ describe("docWritethrough", () => {
await config.doInTenant(async () => {
const patch1 = generatePatchObject(2)
await docWritethrough.patch(patch1)
await travelForward(WRITE_RATE_MS)
await waitForQueueCompletion()
const patch2 = generatePatchObject(1)
await docWritethrough.patch(patch2)
@ -146,14 +134,14 @@ describe("docWritethrough", () => {
})
)
await travelForward(WRITE_RATE_MS)
await waitForQueueCompletion()
const patch3 = {
...generatePatchObject(3),
[keyToOverride]: generator.word(),
}
await docWritethrough.patch(patch3)
await travelForward(WRITE_RATE_MS)
await waitForQueueCompletion()
expect(await db.get(documentId)).toEqual(
expect.objectContaining({
@ -169,8 +157,7 @@ describe("docWritethrough", () => {
await config.doInTenant(async () => {
const secondDocWritethrough = new DocWritethrough(
db,
structures.db.id(),
WRITE_RATE_MS
structures.db.id()
)
const doc1Patch = generatePatchObject(2)
@ -178,13 +165,13 @@ describe("docWritethrough", () => {
const doc2Patch = generatePatchObject(1)
await secondDocWritethrough.patch(doc2Patch)
await travelForward(WRITE_RATE_MS)
await waitForQueueCompletion()
const doc1Patch2 = generatePatchObject(3)
await docWritethrough.patch(doc1Patch2)
const doc2Patch2 = generatePatchObject(3)
await secondDocWritethrough.patch(doc2Patch2)
await travelForward(WRITE_RATE_MS)
await waitForQueueCompletion()
expect(await db.get(docWritethrough.docId)).toEqual(
expect.objectContaining({
@ -207,9 +194,7 @@ describe("docWritethrough", () => {
const initialPatch = generatePatchObject(5)
await docWritethrough.patch(initialPatch)
await travelForward(WRITE_RATE_MS)
await docWritethrough.patch({})
await waitForQueueCompletion()
expect(await db.get(documentId)).toEqual(
expect.objectContaining(initialPatch)
@ -217,10 +202,10 @@ describe("docWritethrough", () => {
await db.remove(await db.get(documentId))
await travelForward(WRITE_RATE_MS)
await waitForQueueCompletion()
const extraPatch = generatePatchObject(5)
await docWritethrough.patch(extraPatch)
await travelForward(WRITE_RATE_MS)
await waitForQueueCompletion()
expect(await db.get(documentId)).toEqual(
expect.objectContaining(extraPatch)
@ -231,59 +216,44 @@ describe("docWritethrough", () => {
})
})
it("concurrent calls will not cause multiple saves", async () => {
it("concurrent calls will not cause conflicts", async () => {
async function parallelPatch(count: number) {
await Promise.all(
Array.from({ length: count }).map(() =>
docWritethrough.patch(generatePatchObject(1))
)
const patches = Array.from({ length: count }).map(() =>
generatePatchObject(1)
)
await Promise.all(patches.map(p => docWritethrough.patch(p)))
return patches.reduce((acc, c) => {
acc = { ...acc, ...c }
return acc
}, {})
}
const persistToDbSpy = jest.spyOn(processor as any, "persistToDb")
const storeToCacheSpy = jest.spyOn(docWritethrough as any, "storeToCache")
const queueMessageSpy = jest.spyOn(docWritethroughProcessorQueue, "add")
await config.doInTenant(async () => {
await parallelPatch(5)
expect(storeToCacheSpy).toBeCalledTimes(5)
expect(persistToDbSpy).not.toBeCalled()
expect(await db.exists(documentId)).toBe(false)
let patches = await parallelPatch(5)
expect(queueMessageSpy).toBeCalledTimes(5)
await travelForward(WRITE_RATE_MS)
await parallelPatch(40)
expect(storeToCacheSpy).toBeCalledTimes(45)
expect(persistToDbSpy).toBeCalledTimes(1)
// Ideally we want to spy on persistToDb from ./docWritethrough, but due our barrel files configuration required quite of a complex setup.
// We are relying on the document being stored only once (otherwise we would have _rev updated)
await waitForQueueCompletion()
expect(await db.get(documentId)).toEqual(
expect.objectContaining({
_id: documentId,
_rev: expect.stringMatching(/1-.+/),
})
expect.objectContaining(patches)
)
await parallelPatch(10)
patches = { ...patches, ...(await parallelPatch(40)) }
expect(queueMessageSpy).toBeCalledTimes(45)
expect(storeToCacheSpy).toBeCalledTimes(55)
await waitForQueueCompletion()
expect(await db.get(documentId)).toEqual(
expect.objectContaining({
_id: documentId,
_rev: expect.stringMatching(/1-.+/),
})
expect.objectContaining(patches)
)
await travelForward(WRITE_RATE_MS)
patches = { ...patches, ...(await parallelPatch(10)) }
expect(queueMessageSpy).toBeCalledTimes(55)
await parallelPatch(5)
await travelForward(WRITE_RATE_MS)
await waitForQueueCompletion()
expect(await db.get(documentId)).toEqual(
expect.objectContaining({
_id: documentId,
_rev: expect.stringMatching(/3-.+/),
})
expect.objectContaining(patches)
)
expect(storeToCacheSpy).toBeCalledTimes(60)
})
})
})