Fixes and tests
This commit is contained in:
parent
00bf88c5bf
commit
6a81d21cb7
|
@ -23,6 +23,8 @@ export class DocWritethrough {
|
||||||
private _docId: string
|
private _docId: string
|
||||||
private writeRateMs: number
|
private writeRateMs: number
|
||||||
|
|
||||||
|
private docInfoCacheKey: string
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
db: Database,
|
db: Database,
|
||||||
docId: string,
|
docId: string,
|
||||||
|
@ -31,6 +33,7 @@ export class DocWritethrough {
|
||||||
this.db = db
|
this.db = db
|
||||||
this._docId = docId
|
this._docId = docId
|
||||||
this.writeRateMs = writeRateMs
|
this.writeRateMs = writeRateMs
|
||||||
|
this.docInfoCacheKey = `${this.docId}:info`
|
||||||
}
|
}
|
||||||
|
|
||||||
get docId() {
|
get docId() {
|
||||||
|
@ -44,26 +47,39 @@ export class DocWritethrough {
|
||||||
async patch(data: Record<string, any>) {
|
async patch(data: Record<string, any>) {
|
||||||
const cache = await getCache()
|
const cache = await getCache()
|
||||||
|
|
||||||
const key = `${this.docId}:info`
|
|
||||||
const cacheItem = await cache.withCache(
|
|
||||||
key,
|
|
||||||
null,
|
|
||||||
() => this.makeCacheItem(),
|
|
||||||
{
|
|
||||||
useTenancy: false,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
await this.storeToCache(cache, data)
|
await this.storeToCache(cache, data)
|
||||||
|
|
||||||
const updateDb =
|
const updateDb = await this.shouldUpdateDb(cache)
|
||||||
!cacheItem || cacheItem.lastWrite <= Date.now() - this.writeRateMs
|
|
||||||
// let output = this.doc
|
|
||||||
if (updateDb) {
|
if (updateDb) {
|
||||||
await this.persistToDb(cache)
|
const lockResponse = await locks.doWithLock(
|
||||||
|
{
|
||||||
|
type: LockType.TRY_ONCE,
|
||||||
|
name: LockName.PERSIST_WRITETHROUGH,
|
||||||
|
resource: this.docInfoCacheKey,
|
||||||
|
ttl: 15000,
|
||||||
|
},
|
||||||
|
async () => {
|
||||||
|
if (await this.shouldUpdateDb(cache)) {
|
||||||
|
await this.persistToDb(cache)
|
||||||
|
await cache.store(this.docInfoCacheKey, this.makeCacheItem())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
if (!lockResponse.executed) {
|
||||||
|
console.log(`Ignoring redlock conflict in write-through cache`)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private async shouldUpdateDb(cache: BaseCache) {
|
||||||
|
const cacheItem = await cache.withCache(this.docInfoCacheKey, null, () =>
|
||||||
|
this.makeCacheItem()
|
||||||
|
)
|
||||||
|
return cacheItem.lastWrite <= Date.now() - this.writeRateMs
|
||||||
|
}
|
||||||
|
|
||||||
private async storeToCache(cache: BaseCache, data: Record<string, any>) {
|
private async storeToCache(cache: BaseCache, data: Record<string, any>) {
|
||||||
for (const [key, value] of Object.entries(data)) {
|
for (const [key, value] of Object.entries(data)) {
|
||||||
const cacheKey = this.docId + ":data:" + key
|
const cacheKey = this.docId + ":data:" + key
|
||||||
|
@ -72,39 +88,23 @@ export class DocWritethrough {
|
||||||
}
|
}
|
||||||
|
|
||||||
private async persistToDb(cache: BaseCache) {
|
private async persistToDb(cache: BaseCache) {
|
||||||
const key = `${this.db.name}_${this.docId}`
|
let doc: AnyDocument | undefined
|
||||||
|
try {
|
||||||
|
doc = await this.db.get(this.docId)
|
||||||
|
} catch {
|
||||||
|
doc = { _id: this.docId }
|
||||||
|
}
|
||||||
|
|
||||||
const lockResponse = await locks.doWithLock(
|
const keysToPersist = await cache.keys(`${this.docId}:data:*`)
|
||||||
{
|
for (const key of keysToPersist) {
|
||||||
type: LockType.TRY_ONCE,
|
const data = await cache.get(key, { useTenancy: false })
|
||||||
name: LockName.PERSIST_WRITETHROUGH,
|
doc[data.key] = data.value
|
||||||
resource: key,
|
}
|
||||||
ttl: 15000,
|
|
||||||
},
|
|
||||||
async () => {
|
|
||||||
let doc: AnyDocument | undefined
|
|
||||||
try {
|
|
||||||
doc = await this.db.get(this.docId)
|
|
||||||
} catch {
|
|
||||||
doc = { _id: this.docId }
|
|
||||||
}
|
|
||||||
|
|
||||||
const keysToPersist = await cache.keys(`${this.docId}:data:*`)
|
await this.db.put(doc)
|
||||||
for (const key of keysToPersist) {
|
|
||||||
const data = await cache.get(key, { useTenancy: false })
|
|
||||||
doc[data.key] = data.value
|
|
||||||
}
|
|
||||||
|
|
||||||
await this.db.put(doc)
|
for (const key of keysToPersist) {
|
||||||
|
await cache.delete(key, { useTenancy: false })
|
||||||
for (const key of keysToPersist) {
|
|
||||||
await cache.delete(key, { useTenancy: false })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
if (!lockResponse.executed) {
|
|
||||||
throw `DocWriteThrough could not be persisted to db for ${key}`
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,12 +1,10 @@
|
||||||
import tk from "timekeeper"
|
import tk from "timekeeper"
|
||||||
import { env } from "../.."
|
|
||||||
import { DBTestConfiguration, generator, structures } from "../../../tests"
|
import { DBTestConfiguration, generator, structures } from "../../../tests"
|
||||||
import { getDB } from "../../db"
|
import { getDB } from "../../db"
|
||||||
import { DocWritethrough } from "../docWritethrough"
|
import { DocWritethrough } from "../docWritethrough"
|
||||||
import _ from "lodash"
|
import _ from "lodash"
|
||||||
|
|
||||||
env._set("MOCK_REDIS", null)
|
|
||||||
|
|
||||||
const WRITE_RATE_MS = 500
|
const WRITE_RATE_MS = 500
|
||||||
|
|
||||||
const initialTime = Date.now()
|
const initialTime = Date.now()
|
||||||
|
@ -238,5 +236,42 @@ describe("docWritethrough", () => {
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
it("concurrent calls will not cause multiple saves", async () => {
|
||||||
|
async function parallelPatch(count: number) {
|
||||||
|
await Promise.all(
|
||||||
|
Array.from({ length: count }).map(() =>
|
||||||
|
docWritethrough.patch(generatePatchObject(1))
|
||||||
|
)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
const persistToDbSpy = jest.spyOn(docWritethrough as any, "persistToDb")
|
||||||
|
const storeToCacheSpy = jest.spyOn(docWritethrough as any, "storeToCache")
|
||||||
|
|
||||||
|
await config.doInTenant(async () => {
|
||||||
|
await parallelPatch(5)
|
||||||
|
expect(persistToDbSpy).not.toBeCalled()
|
||||||
|
expect(storeToCacheSpy).toBeCalledTimes(5)
|
||||||
|
|
||||||
|
travelForward(WRITE_RATE_MS)
|
||||||
|
|
||||||
|
await parallelPatch(40)
|
||||||
|
|
||||||
|
expect(persistToDbSpy).toBeCalledTimes(1)
|
||||||
|
expect(storeToCacheSpy).toBeCalledTimes(45)
|
||||||
|
|
||||||
|
await parallelPatch(10)
|
||||||
|
|
||||||
|
expect(persistToDbSpy).toBeCalledTimes(1)
|
||||||
|
expect(storeToCacheSpy).toBeCalledTimes(55)
|
||||||
|
|
||||||
|
travelForward(WRITE_RATE_MS)
|
||||||
|
|
||||||
|
await parallelPatch(5)
|
||||||
|
expect(persistToDbSpy).toBeCalledTimes(2)
|
||||||
|
expect(storeToCacheSpy).toBeCalledTimes(60)
|
||||||
|
})
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in New Issue