Fixes and tests
This commit is contained in:
parent
6b8f67ed41
commit
66751728bb
|
@ -23,6 +23,8 @@ export class DocWritethrough {
|
|||
private _docId: string
|
||||
private writeRateMs: number
|
||||
|
||||
private docInfoCacheKey: string
|
||||
|
||||
constructor(
|
||||
db: Database,
|
||||
docId: string,
|
||||
|
@ -31,6 +33,7 @@ export class DocWritethrough {
|
|||
this.db = db
|
||||
this._docId = docId
|
||||
this.writeRateMs = writeRateMs
|
||||
this.docInfoCacheKey = `${this.docId}:info`
|
||||
}
|
||||
|
||||
get docId() {
|
||||
|
@ -44,26 +47,39 @@ export class DocWritethrough {
|
|||
async patch(data: Record<string, any>) {
|
||||
const cache = await getCache()
|
||||
|
||||
const key = `${this.docId}:info`
|
||||
const cacheItem = await cache.withCache(
|
||||
key,
|
||||
null,
|
||||
() => this.makeCacheItem(),
|
||||
{
|
||||
useTenancy: false,
|
||||
}
|
||||
)
|
||||
|
||||
await this.storeToCache(cache, data)
|
||||
|
||||
const updateDb =
|
||||
!cacheItem || cacheItem.lastWrite <= Date.now() - this.writeRateMs
|
||||
// let output = this.doc
|
||||
const updateDb = await this.shouldUpdateDb(cache)
|
||||
|
||||
if (updateDb) {
|
||||
await this.persistToDb(cache)
|
||||
const lockResponse = await locks.doWithLock(
|
||||
{
|
||||
type: LockType.TRY_ONCE,
|
||||
name: LockName.PERSIST_WRITETHROUGH,
|
||||
resource: this.docInfoCacheKey,
|
||||
ttl: 15000,
|
||||
},
|
||||
async () => {
|
||||
if (await this.shouldUpdateDb(cache)) {
|
||||
await this.persistToDb(cache)
|
||||
await cache.store(this.docInfoCacheKey, this.makeCacheItem())
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
if (!lockResponse.executed) {
|
||||
console.log(`Ignoring redlock conflict in write-through cache`)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async shouldUpdateDb(cache: BaseCache) {
|
||||
const cacheItem = await cache.withCache(this.docInfoCacheKey, null, () =>
|
||||
this.makeCacheItem()
|
||||
)
|
||||
return cacheItem.lastWrite <= Date.now() - this.writeRateMs
|
||||
}
|
||||
|
||||
private async storeToCache(cache: BaseCache, data: Record<string, any>) {
|
||||
for (const [key, value] of Object.entries(data)) {
|
||||
const cacheKey = this.docId + ":data:" + key
|
||||
|
@ -72,39 +88,23 @@ export class DocWritethrough {
|
|||
}
|
||||
|
||||
private async persistToDb(cache: BaseCache) {
|
||||
const key = `${this.db.name}_${this.docId}`
|
||||
let doc: AnyDocument | undefined
|
||||
try {
|
||||
doc = await this.db.get(this.docId)
|
||||
} catch {
|
||||
doc = { _id: this.docId }
|
||||
}
|
||||
|
||||
const lockResponse = await locks.doWithLock(
|
||||
{
|
||||
type: LockType.TRY_ONCE,
|
||||
name: LockName.PERSIST_WRITETHROUGH,
|
||||
resource: key,
|
||||
ttl: 15000,
|
||||
},
|
||||
async () => {
|
||||
let doc: AnyDocument | undefined
|
||||
try {
|
||||
doc = await this.db.get(this.docId)
|
||||
} catch {
|
||||
doc = { _id: this.docId }
|
||||
}
|
||||
const keysToPersist = await cache.keys(`${this.docId}:data:*`)
|
||||
for (const key of keysToPersist) {
|
||||
const data = await cache.get(key, { useTenancy: false })
|
||||
doc[data.key] = data.value
|
||||
}
|
||||
|
||||
const keysToPersist = await cache.keys(`${this.docId}:data:*`)
|
||||
for (const key of keysToPersist) {
|
||||
const data = await cache.get(key, { useTenancy: false })
|
||||
doc[data.key] = data.value
|
||||
}
|
||||
await this.db.put(doc)
|
||||
|
||||
await this.db.put(doc)
|
||||
|
||||
for (const key of keysToPersist) {
|
||||
await cache.delete(key, { useTenancy: false })
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
if (!lockResponse.executed) {
|
||||
throw `DocWriteThrough could not be persisted to db for ${key}`
|
||||
for (const key of keysToPersist) {
|
||||
await cache.delete(key, { useTenancy: false })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,12 +1,10 @@
|
|||
import tk from "timekeeper"
|
||||
import { env } from "../.."
|
||||
|
||||
import { DBTestConfiguration, generator, structures } from "../../../tests"
|
||||
import { getDB } from "../../db"
|
||||
import { DocWritethrough } from "../docWritethrough"
|
||||
import _ from "lodash"
|
||||
|
||||
env._set("MOCK_REDIS", null)
|
||||
|
||||
const WRITE_RATE_MS = 500
|
||||
|
||||
const initialTime = Date.now()
|
||||
|
@ -238,5 +236,42 @@ describe("docWritethrough", () => {
|
|||
)
|
||||
})
|
||||
})
|
||||
|
||||
it("concurrent calls will not cause multiple saves", async () => {
|
||||
async function parallelPatch(count: number) {
|
||||
await Promise.all(
|
||||
Array.from({ length: count }).map(() =>
|
||||
docWritethrough.patch(generatePatchObject(1))
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
const persistToDbSpy = jest.spyOn(docWritethrough as any, "persistToDb")
|
||||
const storeToCacheSpy = jest.spyOn(docWritethrough as any, "storeToCache")
|
||||
|
||||
await config.doInTenant(async () => {
|
||||
await parallelPatch(5)
|
||||
expect(persistToDbSpy).not.toBeCalled()
|
||||
expect(storeToCacheSpy).toBeCalledTimes(5)
|
||||
|
||||
travelForward(WRITE_RATE_MS)
|
||||
|
||||
await parallelPatch(40)
|
||||
|
||||
expect(persistToDbSpy).toBeCalledTimes(1)
|
||||
expect(storeToCacheSpy).toBeCalledTimes(45)
|
||||
|
||||
await parallelPatch(10)
|
||||
|
||||
expect(persistToDbSpy).toBeCalledTimes(1)
|
||||
expect(storeToCacheSpy).toBeCalledTimes(55)
|
||||
|
||||
travelForward(WRITE_RATE_MS)
|
||||
|
||||
await parallelPatch(5)
|
||||
expect(persistToDbSpy).toBeCalledTimes(2)
|
||||
expect(storeToCacheSpy).toBeCalledTimes(60)
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
|
|
Loading…
Reference in New Issue