This commit is contained in:
Adria Navarro 2024-03-05 13:50:58 +01:00
parent 2d84bc5da2
commit e648503e4f
3 changed files with 186 additions and 109 deletions

View File

@ -4,7 +4,8 @@ import { AnyDocument, Database, LockName, LockType } from "@budibase/types"
import * as locks from "../redis/redlockImpl" import * as locks from "../redis/redlockImpl"
import { JobQueue, createQueue } from "../queue" import { JobQueue, createQueue } from "../queue"
import { context, db as dbUtils } from ".." import * as context from "../context"
import * as dbUtils from "../db"
const DEFAULT_WRITE_RATE_MS = 10000 const DEFAULT_WRITE_RATE_MS = 10000
@ -28,9 +29,13 @@ export const docWritethroughProcessorQueue = createQueue<ProcessDocMessage>(
JobQueue.DOC_WRITETHROUGH_QUEUE JobQueue.DOC_WRITETHROUGH_QUEUE
) )
let _init = false
export const init = () => {
if (_init) {
return
}
docWritethroughProcessorQueue.process(async message => { docWritethroughProcessorQueue.process(async message => {
const { dbName, tenantId, docId, cacheKeyPrefix } = message.data const { tenantId, cacheKeyPrefix } = message.data
const cache = await getCache()
await context.doInTenant(tenantId, async () => { await context.doInTenant(tenantId, async () => {
const lockResponse = await locks.doWithLock( const lockResponse = await locks.doWithLock(
{ {
@ -40,6 +45,29 @@ docWritethroughProcessorQueue.process(async message => {
ttl: 15000, ttl: 15000,
}, },
async () => { async () => {
await persistToDb(message.data)
}
)
if (!lockResponse.executed) {
console.log(`Ignoring redlock conflict in write-through cache`)
}
})
})
_init = true
}
export async function persistToDb({
dbName,
docId,
cacheKeyPrefix,
}: {
dbName: string
docId: string
cacheKeyPrefix: string
}) {
const cache = await getCache()
const db = dbUtils.getDB(dbName) const db = dbUtils.getDB(dbName)
let doc: AnyDocument | undefined let doc: AnyDocument | undefined
try { try {
@ -60,18 +88,12 @@ docWritethroughProcessorQueue.process(async message => {
await cache.delete(key, { useTenancy: false }) await cache.delete(key, { useTenancy: false })
} }
} }
)
if (!lockResponse.executed) {
console.log(`Ignoring redlock conflict in write-through cache`)
}
})
})
export class DocWritethrough { export class DocWritethrough {
private db: Database private db: Database
private _docId: string private _docId: string
private writeRateMs: number private writeRateMs: number
private tenantId: string
private cacheKeyPrefix: string private cacheKeyPrefix: string
@ -84,6 +106,7 @@ export class DocWritethrough {
this._docId = docId this._docId = docId
this.writeRateMs = writeRateMs this.writeRateMs = writeRateMs
this.cacheKeyPrefix = `${this.db.name}:${this.docId}` this.cacheKeyPrefix = `${this.db.name}:${this.docId}`
this.tenantId = context.getTenantId()
} }
get docId() { get docId() {
@ -97,13 +120,13 @@ export class DocWritethrough {
docWritethroughProcessorQueue.add( docWritethroughProcessorQueue.add(
{ {
tenantId: context.getTenantId(), tenantId: this.tenantId,
dbName: this.db.name, dbName: this.db.name,
docId: this.docId, docId: this.docId,
cacheKeyPrefix: this.cacheKeyPrefix, cacheKeyPrefix: this.cacheKeyPrefix,
}, },
{ {
delay: this.writeRateMs - 1, delay: this.writeRateMs,
jobId: this.cacheKeyPrefix, jobId: this.cacheKeyPrefix,
removeOnFail: true, removeOnFail: true,
removeOnComplete: true, removeOnComplete: true,

View File

@ -1,20 +1,32 @@
import tk from "timekeeper"
import { DBTestConfiguration, generator, structures } from "../../../tests" import { DBTestConfiguration, generator, structures } from "../../../tests"
import { getDB } from "../../db" import { getDB } from "../../db"
import { DocWritethrough } from "../docWritethrough"
import _ from "lodash" import _ from "lodash"
const WRITE_RATE_MS = 500 import {
DocWritethrough,
docWritethroughProcessorQueue,
init,
} from "../docWritethrough"
import InMemoryQueue from "../../queue/inMemoryQueue"
const WRITE_RATE_MS = 1000
const initialTime = Date.now() const initialTime = Date.now()
jest.useFakeTimers({
now: initialTime,
})
function resetTime() { function resetTime() {
tk.travel(initialTime) jest.setSystemTime(initialTime)
}
async function travelForward(ms: number) {
await jest.advanceTimersByTimeAsync(ms)
const queue: InMemoryQueue = docWritethroughProcessorQueue as never
while (queue.hasRunningJobs()) {
await jest.runOnlyPendingTimersAsync()
} }
function travelForward(ms: number) {
const updatedTime = Date.now() + ms
tk.travel(updatedTime)
} }
describe("docWritethrough", () => { describe("docWritethrough", () => {
@ -33,33 +45,37 @@ describe("docWritethrough", () => {
}, {} as Record<string, any>) }, {} as Record<string, any>)
} }
beforeEach(() => { beforeAll(() => init())
beforeEach(async () => {
resetTime() resetTime()
documentId = structures.uuid() documentId = structures.uuid()
await config.doInTenant(async () => {
docWritethrough = new DocWritethrough(db, documentId, WRITE_RATE_MS) docWritethrough = new DocWritethrough(db, documentId, WRITE_RATE_MS)
}) })
})
it("patching will not persist if timeout from the creation does not hit", async () => { it("patching will not persist if timeout does not hit", async () => {
await config.doInTenant(async () => { await config.doInTenant(async () => {
travelForward(WRITE_RATE_MS) await travelForward(WRITE_RATE_MS)
await docWritethrough.patch(generatePatchObject(2)) await docWritethrough.patch(generatePatchObject(2))
await docWritethrough.patch(generatePatchObject(2)) await docWritethrough.patch(generatePatchObject(2))
travelForward(WRITE_RATE_MS - 1) await travelForward(WRITE_RATE_MS - 1)
await docWritethrough.patch(generatePatchObject(2))
expect(await db.exists(documentId)).toBe(false) expect(await db.exists(documentId)).toBe(false)
}) })
}) })
it("patching will persist if timeout hits and next patch is called", async () => { it("patching will persist if timeout hits", async () => {
await config.doInTenant(async () => { await config.doInTenant(async () => {
const patch1 = generatePatchObject(2) const patch1 = generatePatchObject(2)
const patch2 = generatePatchObject(2) const patch2 = generatePatchObject(2)
await docWritethrough.patch(patch1) await docWritethrough.patch(patch1)
await docWritethrough.patch(patch2) await docWritethrough.patch(patch2)
travelForward(WRITE_RATE_MS) await travelForward(WRITE_RATE_MS)
// This will not be persisted
const patch3 = generatePatchObject(3) const patch3 = generatePatchObject(3)
await docWritethrough.patch(patch3) await docWritethrough.patch(patch3)
@ -67,7 +83,6 @@ describe("docWritethrough", () => {
_id: documentId, _id: documentId,
...patch1, ...patch1,
...patch2, ...patch2,
...patch3,
_rev: expect.stringMatching(/1-.+/), _rev: expect.stringMatching(/1-.+/),
createdAt: new Date(initialTime + WRITE_RATE_MS).toISOString(), createdAt: new Date(initialTime + WRITE_RATE_MS).toISOString(),
updatedAt: new Date(initialTime + WRITE_RATE_MS).toISOString(), updatedAt: new Date(initialTime + WRITE_RATE_MS).toISOString(),
@ -82,15 +97,12 @@ describe("docWritethrough", () => {
await docWritethrough.patch(patch1) await docWritethrough.patch(patch1)
await docWritethrough.patch(patch2) await docWritethrough.patch(patch2)
travelForward(WRITE_RATE_MS) await travelForward(WRITE_RATE_MS)
const patch3 = generatePatchObject(3) const patch3 = generatePatchObject(3)
await docWritethrough.patch(patch3) await docWritethrough.patch(patch3)
travelForward(WRITE_RATE_MS) await travelForward(WRITE_RATE_MS)
const patch4 = generatePatchObject(3)
await docWritethrough.patch(patch4)
expect(await db.get(documentId)).toEqual( expect(await db.get(documentId)).toEqual(
expect.objectContaining({ expect.objectContaining({
@ -98,7 +110,6 @@ describe("docWritethrough", () => {
...patch1, ...patch1,
...patch2, ...patch2,
...patch3, ...patch3,
...patch4,
}) })
) )
}) })
@ -109,16 +120,13 @@ describe("docWritethrough", () => {
const patch1 = generatePatchObject(2) const patch1 = generatePatchObject(2)
const patch2 = generatePatchObject(2) const patch2 = generatePatchObject(2)
await docWritethrough.patch(patch1) await docWritethrough.patch(patch1)
travelForward(WRITE_RATE_MS) await travelForward(WRITE_RATE_MS)
const date1 = new Date() const date1 = new Date()
await docWritethrough.patch(patch2) await docWritethrough.patch(patch2)
travelForward(WRITE_RATE_MS) await travelForward(WRITE_RATE_MS)
const date2 = new Date() const date2 = new Date()
const patch3 = generatePatchObject(3)
await docWritethrough.patch(patch3)
expect(date1).not.toEqual(date2) expect(date1).not.toEqual(date2)
expect(await db.get(documentId)).toEqual( expect(await db.get(documentId)).toEqual(
expect.objectContaining({ expect.objectContaining({
@ -129,22 +137,11 @@ describe("docWritethrough", () => {
}) })
}) })
it("patching will not persist even if timeout hits but next patch is not callec", async () => {
await config.doInTenant(async () => {
await docWritethrough.patch(generatePatchObject(2))
await docWritethrough.patch(generatePatchObject(2))
travelForward(WRITE_RATE_MS)
expect(await db.exists(documentId)).toBe(false)
})
})
it("concurrent patches will override keys", async () => { it("concurrent patches will override keys", async () => {
await config.doInTenant(async () => { await config.doInTenant(async () => {
const patch1 = generatePatchObject(2) const patch1 = generatePatchObject(2)
await docWritethrough.patch(patch1) await docWritethrough.patch(patch1)
travelForward(WRITE_RATE_MS) await travelForward(WRITE_RATE_MS)
const patch2 = generatePatchObject(1) const patch2 = generatePatchObject(1)
await docWritethrough.patch(patch2) await docWritethrough.patch(patch2)
@ -155,13 +152,14 @@ describe("docWritethrough", () => {
}) })
) )
travelForward(WRITE_RATE_MS) await travelForward(WRITE_RATE_MS)
const patch3 = { const patch3 = {
...generatePatchObject(3), ...generatePatchObject(3),
[keyToOverride]: generator.word(), [keyToOverride]: generator.word(),
} }
await docWritethrough.patch(patch3) await docWritethrough.patch(patch3)
await travelForward(WRITE_RATE_MS)
expect(await db.get(documentId)).toEqual( expect(await db.get(documentId)).toEqual(
expect.objectContaining({ expect.objectContaining({
@ -173,7 +171,7 @@ describe("docWritethrough", () => {
}) })
}) })
it("concurrent patches to multiple DocWritethrough will not contaminate each other", async () => { it("concurrent patches to different docWritethrough will not pollute each other", async () => {
await config.doInTenant(async () => { await config.doInTenant(async () => {
const secondDocWritethrough = new DocWritethrough( const secondDocWritethrough = new DocWritethrough(
db, db,
@ -186,12 +184,13 @@ describe("docWritethrough", () => {
const doc2Patch = generatePatchObject(1) const doc2Patch = generatePatchObject(1)
await secondDocWritethrough.patch(doc2Patch) await secondDocWritethrough.patch(doc2Patch)
travelForward(WRITE_RATE_MS) await travelForward(WRITE_RATE_MS)
const doc1Patch2 = generatePatchObject(3) const doc1Patch2 = generatePatchObject(3)
await docWritethrough.patch(doc1Patch2) await docWritethrough.patch(doc1Patch2)
const doc2Patch2 = generatePatchObject(3) const doc2Patch2 = generatePatchObject(3)
await secondDocWritethrough.patch(doc2Patch2) await secondDocWritethrough.patch(doc2Patch2)
await travelForward(WRITE_RATE_MS)
expect(await db.get(docWritethrough.docId)).toEqual( expect(await db.get(docWritethrough.docId)).toEqual(
expect.objectContaining({ expect.objectContaining({
@ -214,7 +213,7 @@ describe("docWritethrough", () => {
const initialPatch = generatePatchObject(5) const initialPatch = generatePatchObject(5)
await docWritethrough.patch(initialPatch) await docWritethrough.patch(initialPatch)
travelForward(WRITE_RATE_MS) await travelForward(WRITE_RATE_MS)
await docWritethrough.patch({}) await docWritethrough.patch({})
@ -224,9 +223,10 @@ describe("docWritethrough", () => {
await db.remove(await db.get(documentId)) await db.remove(await db.get(documentId))
travelForward(WRITE_RATE_MS) await travelForward(WRITE_RATE_MS)
const extraPatch = generatePatchObject(5) const extraPatch = generatePatchObject(5)
await docWritethrough.patch(extraPatch) await docWritethrough.patch(extraPatch)
await travelForward(WRITE_RATE_MS)
expect(await db.get(documentId)).toEqual( expect(await db.get(documentId)).toEqual(
expect.objectContaining(extraPatch) expect.objectContaining(extraPatch)
@ -246,30 +246,46 @@ describe("docWritethrough", () => {
) )
} }
const persistToDbSpy = jest.spyOn(docWritethrough as any, "persistToDb")
const storeToCacheSpy = jest.spyOn(docWritethrough as any, "storeToCache") const storeToCacheSpy = jest.spyOn(docWritethrough as any, "storeToCache")
await config.doInTenant(async () => { await config.doInTenant(async () => {
await parallelPatch(5) await parallelPatch(5)
expect(persistToDbSpy).not.toBeCalled()
expect(storeToCacheSpy).toBeCalledTimes(5) expect(storeToCacheSpy).toBeCalledTimes(5)
expect(await db.exists(documentId)).toBe(false)
travelForward(WRITE_RATE_MS) await travelForward(WRITE_RATE_MS)
await parallelPatch(40) await parallelPatch(40)
expect(persistToDbSpy).toBeCalledTimes(1)
expect(storeToCacheSpy).toBeCalledTimes(45) expect(storeToCacheSpy).toBeCalledTimes(45)
expect(await db.get(documentId)).toEqual(
expect.objectContaining({
_id: documentId,
_rev: expect.stringMatching(/1-.+/),
})
)
await parallelPatch(10) await parallelPatch(10)
expect(persistToDbSpy).toBeCalledTimes(1)
expect(storeToCacheSpy).toBeCalledTimes(55) expect(storeToCacheSpy).toBeCalledTimes(55)
expect(await db.get(documentId)).toEqual(
expect.objectContaining({
_id: documentId,
_rev: expect.stringMatching(/1-.+/),
})
)
travelForward(WRITE_RATE_MS) await travelForward(WRITE_RATE_MS)
await parallelPatch(5) await parallelPatch(5)
expect(persistToDbSpy).toBeCalledTimes(2) await travelForward(WRITE_RATE_MS)
expect(await db.get(documentId)).toEqual(
expect.objectContaining({
_id: documentId,
_rev: expect.stringMatching(/3-.+/),
})
)
expect(storeToCacheSpy).toBeCalledTimes(60) expect(storeToCacheSpy).toBeCalledTimes(60)
}) })
}) })

View File

@ -2,6 +2,13 @@ import events from "events"
import { timeout } from "../utils" import { timeout } from "../utils"
import { Queue, QueueOptions, JobOptions } from "./queue" import { Queue, QueueOptions, JobOptions } from "./queue"
interface JobMessage {
timestamp: number
queue: string
data: any
opts?: JobOptions
}
/** /**
* Bull works with a Job wrapper around all messages that contains a lot more information about * Bull works with a Job wrapper around all messages that contains a lot more information about
* the state of the message, this object constructor implements the same schema of Bull jobs * the state of the message, this object constructor implements the same schema of Bull jobs
@ -11,12 +18,12 @@ import { Queue, QueueOptions, JobOptions } from "./queue"
* @returns A new job which can now be put onto the queue, this is mostly an * @returns A new job which can now be put onto the queue, this is mostly an
* internal structure so that an in memory queue can be easily swapped for a Bull queue. * internal structure so that an in memory queue can be easily swapped for a Bull queue.
*/ */
function newJob(queue: string, message: any) { function newJob(queue: string, message: any, opts?: JobOptions): JobMessage {
return { return {
timestamp: Date.now(), timestamp: Date.now(),
queue: queue, queue: queue,
data: message, data: message,
opts: {}, opts,
} }
} }
@ -28,10 +35,12 @@ function newJob(queue: string, message: any) {
class InMemoryQueue implements Partial<Queue> { class InMemoryQueue implements Partial<Queue> {
_name: string _name: string
_opts?: QueueOptions _opts?: QueueOptions
_messages: any[] _messages: JobMessage[]
_queuedJobIds: Set<string>
_emitter: EventEmitter _emitter: EventEmitter
_runCount: number _runCount: number
_addCount: number _addCount: number
/** /**
* The constructor the queue, exactly the same as that of Bulls. * The constructor the queue, exactly the same as that of Bulls.
* @param name The name of the queue which is being configured. * @param name The name of the queue which is being configured.
@ -45,6 +54,7 @@ class InMemoryQueue implements Partial<Queue> {
this._emitter = new events.EventEmitter() this._emitter = new events.EventEmitter()
this._runCount = 0 this._runCount = 0
this._addCount = 0 this._addCount = 0
this._queuedJobIds = new Set<string>()
} }
/** /**
@ -58,19 +68,24 @@ class InMemoryQueue implements Partial<Queue> {
*/ */
async process(func: any) { async process(func: any) {
this._emitter.on("message", async () => { this._emitter.on("message", async () => {
const delay = this._opts?.defaultJobOptions?.delay try {
if (delay) {
await new Promise<void>(r => setTimeout(() => r(), delay))
}
if (this._messages.length <= 0) { if (this._messages.length <= 0) {
return return
} }
let msg = this._messages.shift() let msg = this._messages.shift()
let resp = func(msg) let resp = func(msg)
if (resp.then != null) { if (resp.then != null) {
await resp await resp
} }
this._runCount++ this._runCount++
const jobId = msg?.opts?.jobId?.toString()
if (jobId && msg?.opts?.removeOnComplete) {
this._queuedJobIds.delete(jobId)
}
} catch (e: any) {
throw e
}
}) })
} }
@ -89,12 +104,31 @@ class InMemoryQueue implements Partial<Queue> {
*/ */
// eslint-disable-next-line no-unused-vars // eslint-disable-next-line no-unused-vars
async add(data: any, opts?: JobOptions) { async add(data: any, opts?: JobOptions) {
const jobId = opts?.jobId?.toString()
if (jobId && this._queuedJobIds.has(jobId)) {
console.log(`Ignoring already queued job ${jobId}`)
return
}
if (typeof data !== "object") { if (typeof data !== "object") {
throw "Queue only supports carrying JSON." throw "Queue only supports carrying JSON."
} }
this._messages.push(newJob(this._name, data)) if (jobId) {
this._queuedJobIds.add(jobId)
}
const pushMessage = () => {
this._messages.push(newJob(this._name, data, opts))
this._addCount++ this._addCount++
this._emitter.emit("message") this._emitter.emit("message")
}
const delay = opts?.delay
if (delay) {
setTimeout(pushMessage, delay)
} else {
pushMessage()
}
return {} as any return {} as any
} }
@ -143,7 +177,11 @@ class InMemoryQueue implements Partial<Queue> {
async waitForCompletion() { async waitForCompletion() {
do { do {
await timeout(50) await timeout(50)
} while (this._addCount < this._runCount) } while (this.hasRunningJobs)
}
hasRunningJobs() {
return this._addCount > this._runCount
} }
} }