This commit is contained in:
Adria Navarro 2024-03-05 13:50:58 +01:00
parent 151bfd103b
commit caff2876dd
3 changed files with 186 additions and 109 deletions

View File

@ -4,7 +4,8 @@ import { AnyDocument, Database, LockName, LockType } from "@budibase/types"
import * as locks from "../redis/redlockImpl"
import { JobQueue, createQueue } from "../queue"
import { context, db as dbUtils } from ".."
import * as context from "../context"
import * as dbUtils from "../db"
const DEFAULT_WRITE_RATE_MS = 10000
@ -28,50 +29,71 @@ export const docWritethroughProcessorQueue = createQueue<ProcessDocMessage>(
JobQueue.DOC_WRITETHROUGH_QUEUE
)
docWritethroughProcessorQueue.process(async message => {
const { dbName, tenantId, docId, cacheKeyPrefix } = message.data
const cache = await getCache()
await context.doInTenant(tenantId, async () => {
const lockResponse = await locks.doWithLock(
{
type: LockType.TRY_ONCE,
name: LockName.PERSIST_WRITETHROUGH,
resource: cacheKeyPrefix,
ttl: 15000,
},
async () => {
const db = dbUtils.getDB(dbName)
let doc: AnyDocument | undefined
try {
doc = await db.get(docId)
} catch {
doc = { _id: docId }
let _init = false
export const init = () => {
if (_init) {
return
}
docWritethroughProcessorQueue.process(async message => {
const { tenantId, cacheKeyPrefix } = message.data
await context.doInTenant(tenantId, async () => {
const lockResponse = await locks.doWithLock(
{
type: LockType.TRY_ONCE,
name: LockName.PERSIST_WRITETHROUGH,
resource: cacheKeyPrefix,
ttl: 15000,
},
async () => {
await persistToDb(message.data)
}
)
const keysToPersist = await cache.keys(`${cacheKeyPrefix}:data:*`)
for (const key of keysToPersist) {
const data = await cache.get(key, { useTenancy: false })
doc[data.key] = data.value
}
await db.put(doc)
for (const key of keysToPersist) {
await cache.delete(key, { useTenancy: false })
}
if (!lockResponse.executed) {
console.log(`Ignoring redlock conflict in write-through cache`)
}
)
if (!lockResponse.executed) {
console.log(`Ignoring redlock conflict in write-through cache`)
}
})
})
})
_init = true
}
/**
 * Flushes every cached field of a single document into the database.
 *
 * Steps, in order:
 *  1. Load the stored document; if the read fails for any reason, start from
 *     a stub that only carries the `_id`.
 *  2. List the cache keys under `<cacheKeyPrefix>:data:*` and copy each cached
 *     `{ key, value }` pair onto the document (later keys win on collisions).
 *  3. Write the document back.
 *  4. Only after the write has succeeded, delete the cache keys listed in
 *     step 2.
 *
 * @param dbName name of the database the document lives in
 * @param docId id of the document to create or update
 * @param cacheKeyPrefix prefix under which the pending field values are cached
 */
export async function persistToDb({
  dbName,
  docId,
  cacheKeyPrefix,
}: {
  dbName: string
  docId: string
  cacheKeyPrefix: string
}) {
  const cache = await getCache()
  const db = dbUtils.getDB(dbName)

  let target: AnyDocument | undefined
  try {
    target = await db.get(docId)
  } catch {
    // Any read failure is treated as "document does not exist yet"
    target = { _id: docId }
  }

  const pendingKeys = await cache.keys(`${cacheKeyPrefix}:data:*`)

  // Apply the cached entries one at a time, in the order the cache listed them
  for (const cacheKey of pendingKeys) {
    const { key: field, value } = await cache.get(cacheKey, {
      useTenancy: false,
    })
    target[field] = value
  }

  await db.put(target)

  // The cache is cleared only once the document has been written
  for (const cacheKey of pendingKeys) {
    await cache.delete(cacheKey, { useTenancy: false })
  }
}
export class DocWritethrough {
private db: Database
private _docId: string
private writeRateMs: number
private tenantId: string
private cacheKeyPrefix: string
@ -84,6 +106,7 @@ export class DocWritethrough {
this._docId = docId
this.writeRateMs = writeRateMs
this.cacheKeyPrefix = `${this.db.name}:${this.docId}`
this.tenantId = context.getTenantId()
}
get docId() {
@ -97,13 +120,13 @@ export class DocWritethrough {
docWritethroughProcessorQueue.add(
{
tenantId: context.getTenantId(),
tenantId: this.tenantId,
dbName: this.db.name,
docId: this.docId,
cacheKeyPrefix: this.cacheKeyPrefix,
},
{
delay: this.writeRateMs - 1,
delay: this.writeRateMs,
jobId: this.cacheKeyPrefix,
removeOnFail: true,
removeOnComplete: true,

View File

@ -1,20 +1,32 @@
import tk from "timekeeper"
import { DBTestConfiguration, generator, structures } from "../../../tests"
import { getDB } from "../../db"
import { DocWritethrough } from "../docWritethrough"
import _ from "lodash"
const WRITE_RATE_MS = 500
import {
DocWritethrough,
docWritethroughProcessorQueue,
init,
} from "../docWritethrough"
import InMemoryQueue from "../../queue/inMemoryQueue"
const WRITE_RATE_MS = 1000
const initialTime = Date.now()
jest.useFakeTimers({
now: initialTime,
})
function resetTime() {
tk.travel(initialTime)
jest.setSystemTime(initialTime)
}
function travelForward(ms: number) {
const updatedTime = Date.now() + ms
tk.travel(updatedTime)
async function travelForward(ms: number) {
await jest.advanceTimersByTimeAsync(ms)
const queue: InMemoryQueue = docWritethroughProcessorQueue as never
while (queue.hasRunningJobs()) {
await jest.runOnlyPendingTimersAsync()
}
}
describe("docWritethrough", () => {
@ -33,33 +45,37 @@ describe("docWritethrough", () => {
}, {} as Record<string, any>)
}
beforeEach(() => {
beforeAll(() => init())
beforeEach(async () => {
resetTime()
documentId = structures.uuid()
docWritethrough = new DocWritethrough(db, documentId, WRITE_RATE_MS)
await config.doInTenant(async () => {
docWritethrough = new DocWritethrough(db, documentId, WRITE_RATE_MS)
})
})
it("patching will not persist if timeout from the creation does not hit", async () => {
it("patching will not persist if timeout does not hit", async () => {
await config.doInTenant(async () => {
travelForward(WRITE_RATE_MS)
await travelForward(WRITE_RATE_MS)
await docWritethrough.patch(generatePatchObject(2))
await docWritethrough.patch(generatePatchObject(2))
travelForward(WRITE_RATE_MS - 1)
await docWritethrough.patch(generatePatchObject(2))
await travelForward(WRITE_RATE_MS - 1)
expect(await db.exists(documentId)).toBe(false)
})
})
it("patching will persist if timeout hits and next patch is called", async () => {
it("patching will persist if timeout hits", async () => {
await config.doInTenant(async () => {
const patch1 = generatePatchObject(2)
const patch2 = generatePatchObject(2)
await docWritethrough.patch(patch1)
await docWritethrough.patch(patch2)
travelForward(WRITE_RATE_MS)
await travelForward(WRITE_RATE_MS)
// This will not be persisted
const patch3 = generatePatchObject(3)
await docWritethrough.patch(patch3)
@ -67,7 +83,6 @@ describe("docWritethrough", () => {
_id: documentId,
...patch1,
...patch2,
...patch3,
_rev: expect.stringMatching(/1-.+/),
createdAt: new Date(initialTime + WRITE_RATE_MS).toISOString(),
updatedAt: new Date(initialTime + WRITE_RATE_MS).toISOString(),
@ -82,15 +97,12 @@ describe("docWritethrough", () => {
await docWritethrough.patch(patch1)
await docWritethrough.patch(patch2)
travelForward(WRITE_RATE_MS)
await travelForward(WRITE_RATE_MS)
const patch3 = generatePatchObject(3)
await docWritethrough.patch(patch3)
travelForward(WRITE_RATE_MS)
const patch4 = generatePatchObject(3)
await docWritethrough.patch(patch4)
await travelForward(WRITE_RATE_MS)
expect(await db.get(documentId)).toEqual(
expect.objectContaining({
@ -98,7 +110,6 @@ describe("docWritethrough", () => {
...patch1,
...patch2,
...patch3,
...patch4,
})
)
})
@ -109,16 +120,13 @@ describe("docWritethrough", () => {
const patch1 = generatePatchObject(2)
const patch2 = generatePatchObject(2)
await docWritethrough.patch(patch1)
travelForward(WRITE_RATE_MS)
await travelForward(WRITE_RATE_MS)
const date1 = new Date()
await docWritethrough.patch(patch2)
travelForward(WRITE_RATE_MS)
await travelForward(WRITE_RATE_MS)
const date2 = new Date()
const patch3 = generatePatchObject(3)
await docWritethrough.patch(patch3)
expect(date1).not.toEqual(date2)
expect(await db.get(documentId)).toEqual(
expect.objectContaining({
@ -129,22 +137,11 @@ describe("docWritethrough", () => {
})
})
it("patching will not persist even if timeout hits but next patch is not callec", async () => {
await config.doInTenant(async () => {
await docWritethrough.patch(generatePatchObject(2))
await docWritethrough.patch(generatePatchObject(2))
travelForward(WRITE_RATE_MS)
expect(await db.exists(documentId)).toBe(false)
})
})
it("concurrent patches will override keys", async () => {
await config.doInTenant(async () => {
const patch1 = generatePatchObject(2)
await docWritethrough.patch(patch1)
travelForward(WRITE_RATE_MS)
await travelForward(WRITE_RATE_MS)
const patch2 = generatePatchObject(1)
await docWritethrough.patch(patch2)
@ -155,13 +152,14 @@ describe("docWritethrough", () => {
})
)
travelForward(WRITE_RATE_MS)
await travelForward(WRITE_RATE_MS)
const patch3 = {
...generatePatchObject(3),
[keyToOverride]: generator.word(),
}
await docWritethrough.patch(patch3)
await travelForward(WRITE_RATE_MS)
expect(await db.get(documentId)).toEqual(
expect.objectContaining({
@ -173,7 +171,7 @@ describe("docWritethrough", () => {
})
})
it("concurrent patches to multiple DocWritethrough will not contaminate each other", async () => {
it("concurrent patches to different docWritethrough will not pollute each other", async () => {
await config.doInTenant(async () => {
const secondDocWritethrough = new DocWritethrough(
db,
@ -186,12 +184,13 @@ describe("docWritethrough", () => {
const doc2Patch = generatePatchObject(1)
await secondDocWritethrough.patch(doc2Patch)
travelForward(WRITE_RATE_MS)
await travelForward(WRITE_RATE_MS)
const doc1Patch2 = generatePatchObject(3)
await docWritethrough.patch(doc1Patch2)
const doc2Patch2 = generatePatchObject(3)
await secondDocWritethrough.patch(doc2Patch2)
await travelForward(WRITE_RATE_MS)
expect(await db.get(docWritethrough.docId)).toEqual(
expect.objectContaining({
@ -214,7 +213,7 @@ describe("docWritethrough", () => {
const initialPatch = generatePatchObject(5)
await docWritethrough.patch(initialPatch)
travelForward(WRITE_RATE_MS)
await travelForward(WRITE_RATE_MS)
await docWritethrough.patch({})
@ -224,9 +223,10 @@ describe("docWritethrough", () => {
await db.remove(await db.get(documentId))
travelForward(WRITE_RATE_MS)
await travelForward(WRITE_RATE_MS)
const extraPatch = generatePatchObject(5)
await docWritethrough.patch(extraPatch)
await travelForward(WRITE_RATE_MS)
expect(await db.get(documentId)).toEqual(
expect.objectContaining(extraPatch)
@ -246,30 +246,46 @@ describe("docWritethrough", () => {
)
}
const persistToDbSpy = jest.spyOn(docWritethrough as any, "persistToDb")
const storeToCacheSpy = jest.spyOn(docWritethrough as any, "storeToCache")
await config.doInTenant(async () => {
await parallelPatch(5)
expect(persistToDbSpy).not.toBeCalled()
expect(storeToCacheSpy).toBeCalledTimes(5)
expect(await db.exists(documentId)).toBe(false)
travelForward(WRITE_RATE_MS)
await travelForward(WRITE_RATE_MS)
await parallelPatch(40)
expect(persistToDbSpy).toBeCalledTimes(1)
expect(storeToCacheSpy).toBeCalledTimes(45)
expect(await db.get(documentId)).toEqual(
expect.objectContaining({
_id: documentId,
_rev: expect.stringMatching(/1-.+/),
})
)
await parallelPatch(10)
expect(persistToDbSpy).toBeCalledTimes(1)
expect(storeToCacheSpy).toBeCalledTimes(55)
expect(await db.get(documentId)).toEqual(
expect.objectContaining({
_id: documentId,
_rev: expect.stringMatching(/1-.+/),
})
)
travelForward(WRITE_RATE_MS)
await travelForward(WRITE_RATE_MS)
await parallelPatch(5)
expect(persistToDbSpy).toBeCalledTimes(2)
await travelForward(WRITE_RATE_MS)
expect(await db.get(documentId)).toEqual(
expect.objectContaining({
_id: documentId,
_rev: expect.stringMatching(/3-.+/),
})
)
expect(storeToCacheSpy).toBeCalledTimes(60)
})
})

View File

@ -2,6 +2,13 @@ import events from "events"
import { timeout } from "../utils"
import { Queue, QueueOptions, JobOptions } from "./queue"
/**
 * Shape of the job wrapper that the in-memory queue stores for each message,
 * as built by `newJob`.
 */
interface JobMessage {
// Creation time of the job, taken from `Date.now()` when the job is built
timestamp: number
// Name of the queue the job was created for
queue: string
// The message payload handed to the processing function
data: any
// Options supplied when the job was added, if any
opts?: JobOptions
}
/**
* Bull works with a Job wrapper around all messages that contains a lot more information about
* the state of the message, this object constructor implements the same schema of Bull jobs
@ -11,12 +18,12 @@ import { Queue, QueueOptions, JobOptions } from "./queue"
* @returns A new job which can now be put onto the queue, this is mostly an
* internal structure so that an in memory queue can be easily swapped for a Bull queue.
*/
function newJob(queue: string, message: any) {
function newJob(queue: string, message: any, opts?: JobOptions): JobMessage {
return {
timestamp: Date.now(),
queue: queue,
data: message,
opts: {},
opts,
}
}
@ -28,10 +35,12 @@ function newJob(queue: string, message: any) {
class InMemoryQueue implements Partial<Queue> {
_name: string
_opts?: QueueOptions
_messages: any[]
_messages: JobMessage[]
_queuedJobIds: Set<string>
_emitter: EventEmitter
_runCount: number
_addCount: number
/**
* The constructor for the queue, exactly the same as that of Bull's.
* @param name The name of the queue which is being configured.
@ -45,6 +54,7 @@ class InMemoryQueue implements Partial<Queue> {
this._emitter = new events.EventEmitter()
this._runCount = 0
this._addCount = 0
this._queuedJobIds = new Set<string>()
}
/**
@ -58,19 +68,24 @@ class InMemoryQueue implements Partial<Queue> {
*/
async process(func: any) {
this._emitter.on("message", async () => {
const delay = this._opts?.defaultJobOptions?.delay
if (delay) {
await new Promise<void>(r => setTimeout(() => r(), delay))
try {
if (this._messages.length <= 0) {
return
}
let msg = this._messages.shift()
let resp = func(msg)
if (resp.then != null) {
await resp
}
this._runCount++
const jobId = msg?.opts?.jobId?.toString()
if (jobId && msg?.opts?.removeOnComplete) {
this._queuedJobIds.delete(jobId)
}
} catch (e: any) {
throw e
}
if (this._messages.length <= 0) {
return
}
let msg = this._messages.shift()
let resp = func(msg)
if (resp.then != null) {
await resp
}
this._runCount++
})
}
@ -89,12 +104,31 @@ class InMemoryQueue implements Partial<Queue> {
*/
// eslint-disable-next-line no-unused-vars
async add(data: any, opts?: JobOptions) {
const jobId = opts?.jobId?.toString()
if (jobId && this._queuedJobIds.has(jobId)) {
console.log(`Ignoring already queued job ${jobId}`)
return
}
if (typeof data !== "object") {
throw "Queue only supports carrying JSON."
}
this._messages.push(newJob(this._name, data))
this._addCount++
this._emitter.emit("message")
if (jobId) {
this._queuedJobIds.add(jobId)
}
const pushMessage = () => {
this._messages.push(newJob(this._name, data, opts))
this._addCount++
this._emitter.emit("message")
}
const delay = opts?.delay
if (delay) {
setTimeout(pushMessage, delay)
} else {
pushMessage()
}
return {} as any
}
@ -143,7 +177,11 @@ class InMemoryQueue implements Partial<Queue> {
/**
 * Resolves once the queue reports no outstanding jobs.
 *
 * Polls in 50ms steps and always waits one step before the first check.
 */
async waitForCompletion() {
  do {
    await timeout(50)
    // hasRunningJobs is a method and has to be invoked: a bare reference to
    // it is a function object, which is always truthy and would keep this
    // loop spinning forever.
  } while (this.hasRunningJobs())
}
/**
 * Tells whether the queue still has work outstanding, i.e. more jobs have
 * been counted as added than have been counted as run.
 *
 * @returns true while `_addCount` is ahead of `_runCount`
 */
hasRunningJobs() {
  const added = this._addCount
  const completed = this._runCount
  return completed < added
}
}