Test with spies
This commit is contained in:
parent
8e8378d1be
commit
61c4b83650
|
@ -1,11 +1,11 @@
|
||||||
|
import { AnyDocument, Database, LockName, LockType } from "@budibase/types"
|
||||||
import BaseCache from "./base"
|
import BaseCache from "./base"
|
||||||
import { getDocWritethroughClient } from "../redis/init"
|
import { getDocWritethroughClient } from "../redis/init"
|
||||||
import { AnyDocument, Database, LockName, LockType } from "@budibase/types"
|
|
||||||
|
|
||||||
import { JobQueue, createQueue } from "../queue"
|
import { JobQueue, createQueue } from "../queue"
|
||||||
import * as dbUtils from "../db"
|
import * as dbUtils from "../db"
|
||||||
import { Duration, newid } from "../utils"
|
import { Duration, newid } from "../utils"
|
||||||
import { context, locks } from ".."
|
import { locks } from ".."
|
||||||
|
|
||||||
let CACHE: BaseCache | null = null
|
let CACHE: BaseCache | null = null
|
||||||
async function getCache() {
|
async function getCache() {
|
||||||
|
@ -36,26 +36,12 @@ export const docWritethroughProcessorQueue = createQueue<ProcessDocMessage>(
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
docWritethroughProcessorQueue.process(async message => {
|
class DocWritethroughProcessor {
|
||||||
const { cacheKeyPrefix, messageId } = message.data
|
init() {
|
||||||
|
docWritethroughProcessorQueue.process(async message => {
|
||||||
|
const { cacheKeyPrefix, messageId } = message.data
|
||||||
|
|
||||||
const cache = await getCache()
|
const cache = await getCache()
|
||||||
const latestMessageId = await cache.get(
|
|
||||||
REDIS_KEYS(cacheKeyPrefix).LATEST_MESSAGE_ID
|
|
||||||
)
|
|
||||||
if (messageId !== latestMessageId) {
|
|
||||||
// Nothing to do, another message overrode it
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
const lockResponse = await locks.doWithLock(
|
|
||||||
{
|
|
||||||
type: LockType.TRY_TWICE,
|
|
||||||
name: LockName.PERSIST_DOC_WRITETHROUGH,
|
|
||||||
resource: cacheKeyPrefix,
|
|
||||||
ttl: Duration.fromSeconds(60).toMs(),
|
|
||||||
},
|
|
||||||
async () => {
|
|
||||||
const latestMessageId = await cache.get(
|
const latestMessageId = await cache.get(
|
||||||
REDIS_KEYS(cacheKeyPrefix).LATEST_MESSAGE_ID
|
REDIS_KEYS(cacheKeyPrefix).LATEST_MESSAGE_ID
|
||||||
)
|
)
|
||||||
|
@ -64,56 +50,77 @@ docWritethroughProcessorQueue.process(async message => {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
await persistToDb(cache, message.data)
|
const lockResponse = await locks.doWithLock(
|
||||||
console.log("DocWritethrough persisted", { data: message.data })
|
{
|
||||||
|
type: LockType.TRY_TWICE,
|
||||||
|
name: LockName.PERSIST_DOC_WRITETHROUGH,
|
||||||
|
resource: cacheKeyPrefix,
|
||||||
|
ttl: Duration.fromSeconds(60).toMs(),
|
||||||
|
},
|
||||||
|
async () => {
|
||||||
|
const latestMessageId = await cache.get(
|
||||||
|
REDIS_KEYS(cacheKeyPrefix).LATEST_MESSAGE_ID
|
||||||
|
)
|
||||||
|
if (messageId !== latestMessageId) {
|
||||||
|
// Nothing to do, another message overrode it
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
await cache.deleteIfValue(
|
await this.persistToDb(cache, message.data)
|
||||||
REDIS_KEYS(cacheKeyPrefix).LATEST_MESSAGE_ID,
|
console.log("DocWritethrough persisted", { data: message.data })
|
||||||
latestMessageId
|
|
||||||
|
await cache.deleteIfValue(
|
||||||
|
REDIS_KEYS(cacheKeyPrefix).LATEST_MESSAGE_ID,
|
||||||
|
latestMessageId
|
||||||
|
)
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if (!lockResponse.executed) {
|
||||||
|
throw new Error(`Ignoring redlock conflict in write-through cache`)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
return this
|
||||||
|
}
|
||||||
|
|
||||||
|
private async persistToDb(
|
||||||
|
cache: BaseCache,
|
||||||
|
{
|
||||||
|
dbName,
|
||||||
|
docId,
|
||||||
|
cacheKeyPrefix,
|
||||||
|
}: {
|
||||||
|
dbName: string
|
||||||
|
docId: string
|
||||||
|
cacheKeyPrefix: string
|
||||||
|
}
|
||||||
|
) {
|
||||||
|
const db = dbUtils.getDB(dbName)
|
||||||
|
let doc: AnyDocument | undefined
|
||||||
|
try {
|
||||||
|
doc = await db.get(docId)
|
||||||
|
} catch {
|
||||||
|
doc = { _id: docId }
|
||||||
}
|
}
|
||||||
)
|
|
||||||
|
|
||||||
if (!lockResponse.executed) {
|
const keysToPersist = await cache.keys(
|
||||||
throw new Error(`Ignoring redlock conflict in write-through cache`)
|
REDIS_KEYS(cacheKeyPrefix).DATA.GET_ALL
|
||||||
}
|
)
|
||||||
})
|
for (const key of keysToPersist) {
|
||||||
|
const data = await cache.get(key, { useTenancy: false })
|
||||||
|
doc[data.key] = data.value
|
||||||
|
}
|
||||||
|
|
||||||
export async function persistToDb(
|
await db.put(doc)
|
||||||
cache: BaseCache,
|
|
||||||
{
|
|
||||||
dbName,
|
|
||||||
docId,
|
|
||||||
cacheKeyPrefix,
|
|
||||||
}: {
|
|
||||||
dbName: string
|
|
||||||
docId: string
|
|
||||||
cacheKeyPrefix: string
|
|
||||||
}
|
|
||||||
) {
|
|
||||||
const db = dbUtils.getDB(dbName)
|
|
||||||
let doc: AnyDocument | undefined
|
|
||||||
try {
|
|
||||||
doc = await db.get(docId)
|
|
||||||
} catch {
|
|
||||||
doc = { _id: docId }
|
|
||||||
}
|
|
||||||
|
|
||||||
const keysToPersist = await cache.keys(
|
for (const key of keysToPersist) {
|
||||||
REDIS_KEYS(cacheKeyPrefix).DATA.GET_ALL
|
await cache.delete(key, { useTenancy: false })
|
||||||
)
|
}
|
||||||
for (const key of keysToPersist) {
|
|
||||||
const data = await cache.get(key, { useTenancy: false })
|
|
||||||
doc[data.key] = data.value
|
|
||||||
}
|
|
||||||
|
|
||||||
await db.put(doc)
|
|
||||||
|
|
||||||
for (const key of keysToPersist) {
|
|
||||||
await cache.delete(key, { useTenancy: false })
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export const processor = new DocWritethroughProcessor().init()
|
||||||
|
|
||||||
export class DocWritethrough {
|
export class DocWritethrough {
|
||||||
private db: Database
|
private db: Database
|
||||||
private _docId: string
|
private _docId: string
|
||||||
|
|
|
@ -1,12 +1,11 @@
|
||||||
|
import _ from "lodash"
|
||||||
import { DBTestConfiguration, generator, structures } from "../../../tests"
|
import { DBTestConfiguration, generator, structures } from "../../../tests"
|
||||||
import { getDB } from "../../db"
|
import { getDB } from "../../db"
|
||||||
import _ from "lodash"
|
|
||||||
|
|
||||||
import {
|
import { DocWritethrough, processor } from "../docWritethrough"
|
||||||
DocWritethrough,
|
|
||||||
docWritethroughProcessorQueue,
|
|
||||||
} from "../docWritethrough"
|
|
||||||
import InMemoryQueue from "../../queue/inMemoryQueue"
|
import InMemoryQueue from "../../queue/inMemoryQueue"
|
||||||
|
import { docWritethroughProcessorQueue } from "../docWritethrough"
|
||||||
|
|
||||||
const WRITE_RATE_MS = 1000
|
const WRITE_RATE_MS = 1000
|
||||||
|
|
||||||
|
@ -240,12 +239,13 @@ describe("docWritethrough", () => {
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
const persistToDbSpy = jest.spyOn(processor as any, "persistToDb")
|
||||||
const storeToCacheSpy = jest.spyOn(docWritethrough as any, "storeToCache")
|
const storeToCacheSpy = jest.spyOn(docWritethrough as any, "storeToCache")
|
||||||
|
|
||||||
await config.doInTenant(async () => {
|
await config.doInTenant(async () => {
|
||||||
await parallelPatch(5)
|
await parallelPatch(5)
|
||||||
expect(storeToCacheSpy).toBeCalledTimes(5)
|
expect(storeToCacheSpy).toBeCalledTimes(5)
|
||||||
|
expect(persistToDbSpy).not.toBeCalled()
|
||||||
expect(await db.exists(documentId)).toBe(false)
|
expect(await db.exists(documentId)).toBe(false)
|
||||||
|
|
||||||
await travelForward(WRITE_RATE_MS)
|
await travelForward(WRITE_RATE_MS)
|
||||||
|
@ -253,7 +253,7 @@ describe("docWritethrough", () => {
|
||||||
await parallelPatch(40)
|
await parallelPatch(40)
|
||||||
|
|
||||||
expect(storeToCacheSpy).toBeCalledTimes(45)
|
expect(storeToCacheSpy).toBeCalledTimes(45)
|
||||||
|
expect(persistToDbSpy).toBeCalledTimes(1)
|
||||||
// Ideally we want to spy on persistToDb from ./docWritethrough, but due our barrel files configuration required quite of a complex setup.
|
// Ideally we want to spy on persistToDb from ./docWritethrough, but due our barrel files configuration required quite of a complex setup.
|
||||||
// We are relying on the document being stored only once (otherwise we would have _rev updated)
|
// We are relying on the document being stored only once (otherwise we would have _rev updated)
|
||||||
expect(await db.get(documentId)).toEqual(
|
expect(await db.get(documentId)).toEqual(
|
||||||
|
|
Loading…
Reference in New Issue