Use redlock for writethrough
This commit is contained in:
parent
66217d6b08
commit
c254c565e4
|
@ -1,10 +1,13 @@
|
|||
import { structures, DBTestConfiguration } from "../../../tests"
|
||||
import {
|
||||
structures,
|
||||
DBTestConfiguration,
|
||||
expectFunctionWasCalledTimesWith,
|
||||
} from "../../../tests"
|
||||
import { Writethrough } from "../writethrough"
|
||||
import { getDB } from "../../db"
|
||||
import tk from "timekeeper"
|
||||
|
||||
const START_DATE = Date.now()
|
||||
tk.freeze(START_DATE)
|
||||
tk.freeze(Date.now())
|
||||
|
||||
const DELAY = 5000
|
||||
|
||||
|
@ -17,34 +20,67 @@ describe("writethrough", () => {
|
|||
const writethrough = new Writethrough(db, DELAY)
|
||||
const writethrough2 = new Writethrough(db2, DELAY)
|
||||
|
||||
const docId = structures.uuid()
|
||||
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks()
|
||||
})
|
||||
|
||||
describe("put", () => {
|
||||
let first: any
|
||||
let current: any
|
||||
|
||||
it("should be able to store, will go to DB", async () => {
|
||||
await config.doInTenant(async () => {
|
||||
const response = await writethrough.put({ _id: "test", value: 1 })
|
||||
const response = await writethrough.put({
|
||||
_id: docId,
|
||||
value: 1,
|
||||
})
|
||||
const output = await db.get(response.id)
|
||||
first = output
|
||||
current = output
|
||||
expect(output.value).toBe(1)
|
||||
})
|
||||
})
|
||||
|
||||
it("second put shouldn't update DB", async () => {
|
||||
await config.doInTenant(async () => {
|
||||
const response = await writethrough.put({ ...first, value: 2 })
|
||||
const response = await writethrough.put({ ...current, value: 2 })
|
||||
const output = await db.get(response.id)
|
||||
expect(first._rev).toBe(output._rev)
|
||||
expect(current._rev).toBe(output._rev)
|
||||
expect(output.value).toBe(1)
|
||||
})
|
||||
})
|
||||
|
||||
it("should put it again after delay period", async () => {
|
||||
await config.doInTenant(async () => {
|
||||
tk.freeze(START_DATE + DELAY + 1)
|
||||
const response = await writethrough.put({ ...first, value: 3 })
|
||||
tk.freeze(Date.now() + DELAY + 1)
|
||||
const response = await writethrough.put({ ...current, value: 3 })
|
||||
const output = await db.get(response.id)
|
||||
expect(response.rev).not.toBe(first._rev)
|
||||
expect(response.rev).not.toBe(current._rev)
|
||||
expect(output.value).toBe(3)
|
||||
|
||||
current = output
|
||||
})
|
||||
})
|
||||
|
||||
it("should handle parallel DB updates ignoring conflicts", async () => {
|
||||
await config.doInTenant(async () => {
|
||||
tk.freeze(Date.now() + DELAY + 1)
|
||||
const responses = await Promise.all([
|
||||
writethrough.put({ ...current, value: 4 }),
|
||||
writethrough.put({ ...current, value: 4 }),
|
||||
writethrough.put({ ...current, value: 4 }),
|
||||
])
|
||||
|
||||
const newRev = responses.map(x => x.rev).find(x => x !== current._rev)
|
||||
expect(newRev).toBeDefined()
|
||||
expect(responses.map(x => x.rev)).toEqual(
|
||||
expect.arrayContaining([current._rev, current._rev, newRev])
|
||||
)
|
||||
expectFunctionWasCalledTimesWith(
|
||||
console.warn,
|
||||
2,
|
||||
"bb-warn: Ignoring redlock conflict in write-through cache"
|
||||
)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
@ -52,8 +88,8 @@ describe("writethrough", () => {
|
|||
describe("get", () => {
|
||||
it("should be able to retrieve", async () => {
|
||||
await config.doInTenant(async () => {
|
||||
const response = await writethrough.get("test")
|
||||
expect(response.value).toBe(3)
|
||||
const response = await writethrough.get(docId)
|
||||
expect(response.value).toBe(4)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
import BaseCache from "./base"
|
||||
import { getWritethroughClient } from "../redis/init"
|
||||
import { logWarn } from "../logging"
|
||||
import { Database } from "@budibase/types"
|
||||
import { Database, Document, LockName, LockType } from "@budibase/types"
|
||||
import * as locks from "../redis/redlockImpl"
|
||||
|
||||
const DEFAULT_WRITE_RATE_MS = 10000
|
||||
let CACHE: BaseCache | null = null
|
||||
|
@ -29,38 +30,55 @@ function makeCacheItem(doc: any, lastWrite: number | null = null): CacheItem {
|
|||
|
||||
export async function put(
|
||||
db: Database,
|
||||
doc: any,
|
||||
doc: Document,
|
||||
writeRateMs: number = DEFAULT_WRITE_RATE_MS
|
||||
) {
|
||||
const cache = await getCache()
|
||||
const key = doc._id
|
||||
let cacheItem: CacheItem | undefined = await cache.get(makeCacheKey(db, key))
|
||||
let cacheItem: CacheItem | undefined
|
||||
if (key) {
|
||||
cacheItem = await cache.get(makeCacheKey(db, key))
|
||||
}
|
||||
const updateDb = !cacheItem || cacheItem.lastWrite < Date.now() - writeRateMs
|
||||
let output = doc
|
||||
if (updateDb) {
|
||||
const writeDb = async (toWrite: any) => {
|
||||
// doc should contain the _id and _rev
|
||||
const response = await db.put(toWrite)
|
||||
output = {
|
||||
...doc,
|
||||
_id: response.id,
|
||||
_rev: response.rev,
|
||||
}
|
||||
}
|
||||
try {
|
||||
await writeDb(doc)
|
||||
} catch (err: any) {
|
||||
if (err.status !== 409) {
|
||||
throw err
|
||||
} else {
|
||||
// Swallow 409s but log them
|
||||
logWarn(`Ignoring conflict in write-through cache`)
|
||||
const lockResponse = await locks.doWithLock(
|
||||
{
|
||||
type: LockType.TRY_ONCE,
|
||||
name: LockName.PERSIST_WRITETHROUGH(key!),
|
||||
ttl: 1000,
|
||||
},
|
||||
async () => {
|
||||
const writeDb = async (toWrite: any) => {
|
||||
// doc should contain the _id and _rev
|
||||
const response = await db.put(toWrite)
|
||||
output = {
|
||||
...doc,
|
||||
_id: response.id,
|
||||
_rev: response.rev,
|
||||
}
|
||||
}
|
||||
try {
|
||||
await writeDb(doc)
|
||||
} catch (err: any) {
|
||||
if (err.status !== 409) {
|
||||
throw err
|
||||
} else {
|
||||
// Swallow 409s but log them
|
||||
logWarn(`Ignoring conflict in write-through cache`)
|
||||
}
|
||||
}
|
||||
}
|
||||
)
|
||||
if (!lockResponse.executed) {
|
||||
logWarn(`Ignoring redlock conflict in write-through cache`)
|
||||
}
|
||||
}
|
||||
// if we are updating the DB then need to set the lastWrite to now
|
||||
cacheItem = makeCacheItem(output, updateDb ? null : cacheItem?.lastWrite)
|
||||
await cache.store(makeCacheKey(db, key), cacheItem)
|
||||
if (output._id) {
|
||||
await cache.store(makeCacheKey(db, output._id), cacheItem)
|
||||
}
|
||||
return { ok: true, id: output._id, rev: output._rev }
|
||||
}
|
||||
|
||||
|
|
|
@ -4,4 +4,6 @@ export { generator } from "./structures"
|
|||
export * as testEnv from "./testEnv"
|
||||
export * as testContainerUtils from "./testContainerUtils"
|
||||
|
||||
export * from "./jestUtils"
|
||||
|
||||
export { default as DBTestConfiguration } from "./DBTestConfiguration"
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
export function expectFunctionWasCalledTimesWith(
|
||||
jestFunction: any,
|
||||
times: number,
|
||||
argument: any
|
||||
) {
|
||||
expect(
|
||||
jestFunction.mock.calls.filter((call: any) => call[0] === argument).length
|
||||
).toBe(times)
|
||||
}
|
|
@ -8,11 +8,22 @@ export enum LockType {
|
|||
DELAY_500 = "delay_500",
|
||||
}
|
||||
|
||||
export enum LockName {
|
||||
MIGRATIONS = "migrations",
|
||||
TRIGGER_QUOTA = "trigger_quota",
|
||||
SYNC_ACCOUNT_LICENSE = "sync_account_license",
|
||||
UPDATE_TENANTS_DOC = "update_tenants_doc",
|
||||
export class LockName {
|
||||
static readonly MIGRATIONS = new LockName("migrations")
|
||||
static readonly TRIGGER_QUOTA = new LockName("trigger_quota")
|
||||
static readonly SYNC_ACCOUNT_LICENSE = new LockName("sync_account_license")
|
||||
static readonly UPDATE_TENANTS_DOC = new LockName("update_tenants_doc")
|
||||
static readonly PERSIST_WRITETHROUGH = (key: string) =>
|
||||
new LockName(`persist_writethrough_${key}`)
|
||||
|
||||
constructor(public readonly value: string) {}
|
||||
|
||||
valueOf() {
|
||||
return this.value
|
||||
}
|
||||
toString() {
|
||||
return this.valueOf()
|
||||
}
|
||||
}
|
||||
|
||||
export interface LockOptions {
|
||||
|
|
Loading…
Reference in New Issue