Merge pull request #9835 from Budibase/bug/budi-5901-usage-quota-document-conflicts-can-cause
Bug - budi-5901 usage quota document conflicts can cause
This commit is contained in:
commit
9d3c48ca52
|
@ -1,10 +1,13 @@
|
||||||
import { structures, DBTestConfiguration } from "../../../tests"
|
import {
|
||||||
|
structures,
|
||||||
|
DBTestConfiguration,
|
||||||
|
expectFunctionWasCalledTimesWith,
|
||||||
|
} from "../../../tests"
|
||||||
import { Writethrough } from "../writethrough"
|
import { Writethrough } from "../writethrough"
|
||||||
import { getDB } from "../../db"
|
import { getDB } from "../../db"
|
||||||
import tk from "timekeeper"
|
import tk from "timekeeper"
|
||||||
|
|
||||||
const START_DATE = Date.now()
|
tk.freeze(Date.now())
|
||||||
tk.freeze(START_DATE)
|
|
||||||
|
|
||||||
const DELAY = 5000
|
const DELAY = 5000
|
||||||
|
|
||||||
|
@ -17,34 +20,99 @@ describe("writethrough", () => {
|
||||||
const writethrough = new Writethrough(db, DELAY)
|
const writethrough = new Writethrough(db, DELAY)
|
||||||
const writethrough2 = new Writethrough(db2, DELAY)
|
const writethrough2 = new Writethrough(db2, DELAY)
|
||||||
|
|
||||||
|
const docId = structures.uuid()
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
jest.clearAllMocks()
|
||||||
|
})
|
||||||
|
|
||||||
describe("put", () => {
|
describe("put", () => {
|
||||||
let first: any
|
let current: any
|
||||||
|
|
||||||
it("should be able to store, will go to DB", async () => {
|
it("should be able to store, will go to DB", async () => {
|
||||||
await config.doInTenant(async () => {
|
await config.doInTenant(async () => {
|
||||||
const response = await writethrough.put({ _id: "test", value: 1 })
|
const response = await writethrough.put({
|
||||||
|
_id: docId,
|
||||||
|
value: 1,
|
||||||
|
})
|
||||||
const output = await db.get(response.id)
|
const output = await db.get(response.id)
|
||||||
first = output
|
current = output
|
||||||
expect(output.value).toBe(1)
|
expect(output.value).toBe(1)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
it("second put shouldn't update DB", async () => {
|
it("second put shouldn't update DB", async () => {
|
||||||
await config.doInTenant(async () => {
|
await config.doInTenant(async () => {
|
||||||
const response = await writethrough.put({ ...first, value: 2 })
|
const response = await writethrough.put({ ...current, value: 2 })
|
||||||
const output = await db.get(response.id)
|
const output = await db.get(response.id)
|
||||||
expect(first._rev).toBe(output._rev)
|
expect(current._rev).toBe(output._rev)
|
||||||
expect(output.value).toBe(1)
|
expect(output.value).toBe(1)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
it("should put it again after delay period", async () => {
|
it("should put it again after delay period", async () => {
|
||||||
await config.doInTenant(async () => {
|
await config.doInTenant(async () => {
|
||||||
tk.freeze(START_DATE + DELAY + 1)
|
tk.freeze(Date.now() + DELAY + 1)
|
||||||
const response = await writethrough.put({ ...first, value: 3 })
|
const response = await writethrough.put({ ...current, value: 3 })
|
||||||
const output = await db.get(response.id)
|
const output = await db.get(response.id)
|
||||||
expect(response.rev).not.toBe(first._rev)
|
expect(response.rev).not.toBe(current._rev)
|
||||||
expect(output.value).toBe(3)
|
expect(output.value).toBe(3)
|
||||||
|
|
||||||
|
current = output
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
it("should handle parallel DB updates ignoring conflicts", async () => {
|
||||||
|
await config.doInTenant(async () => {
|
||||||
|
tk.freeze(Date.now() + DELAY + 1)
|
||||||
|
const responses = await Promise.all([
|
||||||
|
writethrough.put({ ...current, value: 4 }),
|
||||||
|
writethrough.put({ ...current, value: 4 }),
|
||||||
|
writethrough.put({ ...current, value: 4 }),
|
||||||
|
])
|
||||||
|
|
||||||
|
const newRev = responses.map(x => x.rev).find(x => x !== current._rev)
|
||||||
|
expect(newRev).toBeDefined()
|
||||||
|
expect(responses.map(x => x.rev)).toEqual(
|
||||||
|
expect.arrayContaining([current._rev, current._rev, newRev])
|
||||||
|
)
|
||||||
|
expectFunctionWasCalledTimesWith(
|
||||||
|
console.warn,
|
||||||
|
2,
|
||||||
|
"bb-warn: Ignoring redlock conflict in write-through cache"
|
||||||
|
)
|
||||||
|
|
||||||
|
const output = await db.get(current._id)
|
||||||
|
expect(output.value).toBe(4)
|
||||||
|
expect(output._rev).toBe(newRev)
|
||||||
|
|
||||||
|
current = output
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
it("should handle updates with documents falling behind", async () => {
|
||||||
|
await config.doInTenant(async () => {
|
||||||
|
tk.freeze(Date.now() + DELAY + 1)
|
||||||
|
|
||||||
|
const id = structures.uuid()
|
||||||
|
await writethrough.put({ _id: id, value: 1 })
|
||||||
|
const doc = await writethrough.get(id)
|
||||||
|
|
||||||
|
// Updating document
|
||||||
|
tk.freeze(Date.now() + DELAY + 1)
|
||||||
|
await writethrough.put({ ...doc, value: 2 })
|
||||||
|
|
||||||
|
// Update with the old rev value
|
||||||
|
tk.freeze(Date.now() + DELAY + 1)
|
||||||
|
const res = await writethrough.put({
|
||||||
|
...doc,
|
||||||
|
value: 3,
|
||||||
|
})
|
||||||
|
expect(res.ok).toBe(true)
|
||||||
|
|
||||||
|
const output = await db.get(id)
|
||||||
|
expect(output.value).toBe(3)
|
||||||
|
expect(output._rev).toBe(res.rev)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
@ -52,8 +120,8 @@ describe("writethrough", () => {
|
||||||
describe("get", () => {
|
describe("get", () => {
|
||||||
it("should be able to retrieve", async () => {
|
it("should be able to retrieve", async () => {
|
||||||
await config.doInTenant(async () => {
|
await config.doInTenant(async () => {
|
||||||
const response = await writethrough.get("test")
|
const response = await writethrough.get(docId)
|
||||||
expect(response.value).toBe(3)
|
expect(response.value).toBe(4)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
|
@ -1,7 +1,8 @@
|
||||||
import BaseCache from "./base"
|
import BaseCache from "./base"
|
||||||
import { getWritethroughClient } from "../redis/init"
|
import { getWritethroughClient } from "../redis/init"
|
||||||
import { logWarn } from "../logging"
|
import { logWarn } from "../logging"
|
||||||
import { Database } from "@budibase/types"
|
import { Database, Document, LockName, LockType } from "@budibase/types"
|
||||||
|
import * as locks from "../redis/redlockImpl"
|
||||||
|
|
||||||
const DEFAULT_WRITE_RATE_MS = 10000
|
const DEFAULT_WRITE_RATE_MS = 10000
|
||||||
let CACHE: BaseCache | null = null
|
let CACHE: BaseCache | null = null
|
||||||
|
@ -27,20 +28,31 @@ function makeCacheItem(doc: any, lastWrite: number | null = null): CacheItem {
|
||||||
return { doc, lastWrite: lastWrite || Date.now() }
|
return { doc, lastWrite: lastWrite || Date.now() }
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function put(
|
async function put(
|
||||||
db: Database,
|
db: Database,
|
||||||
doc: any,
|
doc: Document,
|
||||||
writeRateMs: number = DEFAULT_WRITE_RATE_MS
|
writeRateMs: number = DEFAULT_WRITE_RATE_MS
|
||||||
) {
|
) {
|
||||||
const cache = await getCache()
|
const cache = await getCache()
|
||||||
const key = doc._id
|
const key = doc._id
|
||||||
let cacheItem: CacheItem | undefined = await cache.get(makeCacheKey(db, key))
|
let cacheItem: CacheItem | undefined
|
||||||
|
if (key) {
|
||||||
|
cacheItem = await cache.get(makeCacheKey(db, key))
|
||||||
|
}
|
||||||
const updateDb = !cacheItem || cacheItem.lastWrite < Date.now() - writeRateMs
|
const updateDb = !cacheItem || cacheItem.lastWrite < Date.now() - writeRateMs
|
||||||
let output = doc
|
let output = doc
|
||||||
if (updateDb) {
|
if (updateDb) {
|
||||||
|
const lockResponse = await locks.doWithLock(
|
||||||
|
{
|
||||||
|
type: LockType.TRY_ONCE,
|
||||||
|
name: LockName.PERSIST_WRITETHROUGH,
|
||||||
|
resource: key,
|
||||||
|
ttl: 1000,
|
||||||
|
},
|
||||||
|
async () => {
|
||||||
const writeDb = async (toWrite: any) => {
|
const writeDb = async (toWrite: any) => {
|
||||||
// doc should contain the _id and _rev
|
// doc should contain the _id and _rev
|
||||||
const response = await db.put(toWrite)
|
const response = await db.put(toWrite, { force: true })
|
||||||
output = {
|
output = {
|
||||||
...doc,
|
...doc,
|
||||||
_id: response.id,
|
_id: response.id,
|
||||||
|
@ -58,13 +70,20 @@ export async function put(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
)
|
||||||
|
if (!lockResponse.executed) {
|
||||||
|
logWarn(`Ignoring redlock conflict in write-through cache`)
|
||||||
|
}
|
||||||
|
}
|
||||||
// if we are updating the DB then need to set the lastWrite to now
|
// if we are updating the DB then need to set the lastWrite to now
|
||||||
cacheItem = makeCacheItem(output, updateDb ? null : cacheItem?.lastWrite)
|
cacheItem = makeCacheItem(output, updateDb ? null : cacheItem?.lastWrite)
|
||||||
await cache.store(makeCacheKey(db, key), cacheItem)
|
if (output._id) {
|
||||||
|
await cache.store(makeCacheKey(db, output._id), cacheItem)
|
||||||
|
}
|
||||||
return { ok: true, id: output._id, rev: output._rev }
|
return { ok: true, id: output._id, rev: output._rev }
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function get(db: Database, id: string): Promise<any> {
|
async function get(db: Database, id: string): Promise<any> {
|
||||||
const cache = await getCache()
|
const cache = await getCache()
|
||||||
const cacheKey = makeCacheKey(db, id)
|
const cacheKey = makeCacheKey(db, id)
|
||||||
let cacheItem: CacheItem = await cache.get(cacheKey)
|
let cacheItem: CacheItem = await cache.get(cacheKey)
|
||||||
|
@ -76,11 +95,7 @@ export async function get(db: Database, id: string): Promise<any> {
|
||||||
return cacheItem.doc
|
return cacheItem.doc
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function remove(
|
async function remove(db: Database, docOrId: any, rev?: any): Promise<void> {
|
||||||
db: Database,
|
|
||||||
docOrId: any,
|
|
||||||
rev?: any
|
|
||||||
): Promise<void> {
|
|
||||||
const cache = await getCache()
|
const cache = await getCache()
|
||||||
if (!docOrId) {
|
if (!docOrId) {
|
||||||
throw new Error("No ID/Rev provided.")
|
throw new Error("No ID/Rev provided.")
|
||||||
|
|
|
@ -24,7 +24,7 @@ const getClient = async (type: LockType): Promise<Redlock> => {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export const OPTIONS = {
|
const OPTIONS = {
|
||||||
TRY_ONCE: {
|
TRY_ONCE: {
|
||||||
// immediately throws an error if the lock is already held
|
// immediately throws an error if the lock is already held
|
||||||
retryCount: 0,
|
retryCount: 0,
|
||||||
|
@ -56,14 +56,29 @@ export const OPTIONS = {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
export const newRedlock = async (opts: Options = {}) => {
|
const newRedlock = async (opts: Options = {}) => {
|
||||||
let options = { ...OPTIONS.DEFAULT, ...opts }
|
let options = { ...OPTIONS.DEFAULT, ...opts }
|
||||||
const redisWrapper = await getLockClient()
|
const redisWrapper = await getLockClient()
|
||||||
const client = redisWrapper.getClient()
|
const client = redisWrapper.getClient()
|
||||||
return new Redlock([client], options)
|
return new Redlock([client], options)
|
||||||
}
|
}
|
||||||
|
|
||||||
export const doWithLock = async (opts: LockOptions, task: any) => {
|
type SuccessfulRedlockExecution<T> = {
|
||||||
|
executed: true
|
||||||
|
result: T
|
||||||
|
}
|
||||||
|
type UnsuccessfulRedlockExecution = {
|
||||||
|
executed: false
|
||||||
|
}
|
||||||
|
|
||||||
|
type RedlockExecution<T> =
|
||||||
|
| SuccessfulRedlockExecution<T>
|
||||||
|
| UnsuccessfulRedlockExecution
|
||||||
|
|
||||||
|
export const doWithLock = async <T>(
|
||||||
|
opts: LockOptions,
|
||||||
|
task: () => Promise<T>
|
||||||
|
): Promise<RedlockExecution<T>> => {
|
||||||
const redlock = await getClient(opts.type)
|
const redlock = await getClient(opts.type)
|
||||||
let lock
|
let lock
|
||||||
try {
|
try {
|
||||||
|
@ -73,8 +88,8 @@ export const doWithLock = async (opts: LockOptions, task: any) => {
|
||||||
let name: string = `lock:${prefix}_${opts.name}`
|
let name: string = `lock:${prefix}_${opts.name}`
|
||||||
|
|
||||||
// add additional unique name if required
|
// add additional unique name if required
|
||||||
if (opts.nameSuffix) {
|
if (opts.resource) {
|
||||||
name = name + `_${opts.nameSuffix}`
|
name = name + `_${opts.resource}`
|
||||||
}
|
}
|
||||||
|
|
||||||
// create the lock
|
// create the lock
|
||||||
|
@ -83,7 +98,7 @@ export const doWithLock = async (opts: LockOptions, task: any) => {
|
||||||
// perform locked task
|
// perform locked task
|
||||||
// need to await to ensure completion before unlocking
|
// need to await to ensure completion before unlocking
|
||||||
const result = await task()
|
const result = await task()
|
||||||
return result
|
return { executed: true, result }
|
||||||
} catch (e: any) {
|
} catch (e: any) {
|
||||||
console.warn("lock error")
|
console.warn("lock error")
|
||||||
// lock limit exceeded
|
// lock limit exceeded
|
||||||
|
@ -92,7 +107,7 @@ export const doWithLock = async (opts: LockOptions, task: any) => {
|
||||||
// don't throw for try-once locks, they will always error
|
// don't throw for try-once locks, they will always error
|
||||||
// due to retry count (0) exceeded
|
// due to retry count (0) exceeded
|
||||||
console.warn(e)
|
console.warn(e)
|
||||||
return
|
return { executed: false }
|
||||||
} else {
|
} else {
|
||||||
console.error(e)
|
console.error(e)
|
||||||
throw e
|
throw e
|
||||||
|
|
|
@ -4,4 +4,6 @@ export { generator } from "./structures"
|
||||||
export * as testEnv from "./testEnv"
|
export * as testEnv from "./testEnv"
|
||||||
export * as testContainerUtils from "./testContainerUtils"
|
export * as testContainerUtils from "./testContainerUtils"
|
||||||
|
|
||||||
|
export * from "./jestUtils"
|
||||||
|
|
||||||
export { default as DBTestConfiguration } from "./DBTestConfiguration"
|
export { default as DBTestConfiguration } from "./DBTestConfiguration"
|
||||||
|
|
|
@ -0,0 +1,9 @@
|
||||||
|
export function expectFunctionWasCalledTimesWith(
|
||||||
|
jestFunction: any,
|
||||||
|
times: number,
|
||||||
|
argument: any
|
||||||
|
) {
|
||||||
|
expect(
|
||||||
|
jestFunction.mock.calls.filter((call: any) => call[0] === argument).length
|
||||||
|
).toBe(times)
|
||||||
|
}
|
|
@ -1,5 +1,12 @@
|
||||||
|
import { structures } from ".."
|
||||||
import { newid } from "../../../src/newid"
|
import { newid } from "../../../src/newid"
|
||||||
|
|
||||||
export function id() {
|
export function id() {
|
||||||
return `db_${newid()}`
|
return `db_${newid()}`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function rev() {
|
||||||
|
return `${structures.generator.character({
|
||||||
|
numeric: true,
|
||||||
|
})}-${structures.uuid().replace(/-/, "")}`
|
||||||
|
}
|
||||||
|
|
|
@ -13,6 +13,7 @@ export enum LockName {
|
||||||
TRIGGER_QUOTA = "trigger_quota",
|
TRIGGER_QUOTA = "trigger_quota",
|
||||||
SYNC_ACCOUNT_LICENSE = "sync_account_license",
|
SYNC_ACCOUNT_LICENSE = "sync_account_license",
|
||||||
UPDATE_TENANTS_DOC = "update_tenants_doc",
|
UPDATE_TENANTS_DOC = "update_tenants_doc",
|
||||||
|
PERSIST_WRITETHROUGH = "persist_writethrough",
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface LockOptions {
|
export interface LockOptions {
|
||||||
|
@ -29,9 +30,9 @@ export interface LockOptions {
|
||||||
*/
|
*/
|
||||||
ttl: number
|
ttl: number
|
||||||
/**
|
/**
|
||||||
* The suffix to add to the lock name for additional uniqueness
|
* The individual resource to lock. This is useful for locking around very specific identifiers, e.g. a document that is prone to conflicts
|
||||||
*/
|
*/
|
||||||
nameSuffix?: string
|
resource?: string
|
||||||
/**
|
/**
|
||||||
* This is a system-wide lock - don't use tenancy in lock key
|
* This is a system-wide lock - don't use tenancy in lock key
|
||||||
*/
|
*/
|
||||||
|
|
Loading…
Reference in New Issue