DocWritethrough
This commit is contained in:
parent
76b9cbcc5f
commit
ff7c8d3b95
|
@ -0,0 +1,102 @@
|
||||||
|
import BaseCache from "./base"
|
||||||
|
import { getDocWritethroughClient } from "../redis/init"
|
||||||
|
import { AnyDocument, Database, LockName, LockType } from "@budibase/types"
|
||||||
|
import * as locks from "../redis/redlockImpl"
|
||||||
|
|
||||||
|
const DEFAULT_WRITE_RATE_MS = 10000
|
||||||
|
|
||||||
|
let CACHE: BaseCache | null = null
|
||||||
|
async function getCache() {
|
||||||
|
if (!CACHE) {
|
||||||
|
const client = await getDocWritethroughClient()
|
||||||
|
CACHE = new BaseCache(client)
|
||||||
|
}
|
||||||
|
return CACHE
|
||||||
|
}
|
||||||
|
|
||||||
|
interface CacheItem {
|
||||||
|
lastWrite: number
|
||||||
|
}
|
||||||
|
|
||||||
|
export class DocWritethrough {
|
||||||
|
db: Database
|
||||||
|
docId: string
|
||||||
|
writeRateMs: number
|
||||||
|
|
||||||
|
constructor(
|
||||||
|
db: Database,
|
||||||
|
docId: string,
|
||||||
|
writeRateMs: number = DEFAULT_WRITE_RATE_MS
|
||||||
|
) {
|
||||||
|
this.db = db
|
||||||
|
this.docId = docId
|
||||||
|
this.writeRateMs = writeRateMs
|
||||||
|
}
|
||||||
|
|
||||||
|
private makeCacheItem(): CacheItem {
|
||||||
|
return { lastWrite: Date.now() }
|
||||||
|
}
|
||||||
|
|
||||||
|
async patch(data: Record<string, any>) {
|
||||||
|
const cache = await getCache()
|
||||||
|
|
||||||
|
const key = `${this.docId}:info`
|
||||||
|
const cacheItem = await cache.withCache(
|
||||||
|
key,
|
||||||
|
null,
|
||||||
|
() => this.makeCacheItem(),
|
||||||
|
{
|
||||||
|
useTenancy: false,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
await this.storeToCache(cache, data)
|
||||||
|
|
||||||
|
const updateDb =
|
||||||
|
!cacheItem || cacheItem.lastWrite <= Date.now() - this.writeRateMs
|
||||||
|
// let output = this.doc
|
||||||
|
if (updateDb) {
|
||||||
|
await this.persistToDb(cache)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async storeToCache(cache: BaseCache, data: Record<string, any>) {
|
||||||
|
for (const [key, value] of Object.entries(data)) {
|
||||||
|
const cacheKey = this.docId + ":data:" + key
|
||||||
|
await cache.store(cacheKey, { key, value }, undefined)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async persistToDb(cache: BaseCache) {
|
||||||
|
const key = `${this.db.name}_${this.docId}`
|
||||||
|
|
||||||
|
const lockResponse = await locks.doWithLock(
|
||||||
|
{
|
||||||
|
type: LockType.TRY_ONCE,
|
||||||
|
name: LockName.PERSIST_WRITETHROUGH,
|
||||||
|
resource: key,
|
||||||
|
ttl: 15000,
|
||||||
|
},
|
||||||
|
async () => {
|
||||||
|
let doc: AnyDocument | undefined
|
||||||
|
try {
|
||||||
|
doc = await this.db.get(this.docId)
|
||||||
|
} catch {
|
||||||
|
doc = { _id: this.docId }
|
||||||
|
}
|
||||||
|
|
||||||
|
const keysToPersist = await cache.keys(`${this.docId}:data:*`)
|
||||||
|
for (const key of keysToPersist) {
|
||||||
|
const data = await cache.get(key, { useTenancy: false })
|
||||||
|
doc[data.key] = data.value
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.db.put(doc)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
if (!lockResponse.executed) {
|
||||||
|
throw `DocWriteThrough could not be persisted to db for ${key}`
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -134,6 +134,15 @@ export class DatabaseImpl implements Database {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async docExists(id: string): Promise<boolean> {
|
||||||
|
try {
|
||||||
|
await this.get(id)
|
||||||
|
return true
|
||||||
|
} catch {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async getMultiple<T extends Document>(
|
async getMultiple<T extends Document>(
|
||||||
ids: string[],
|
ids: string[],
|
||||||
opts?: { allowMissing?: boolean }
|
opts?: { allowMissing?: boolean }
|
||||||
|
|
|
@ -38,6 +38,13 @@ export class DDInstrumentedDatabase implements Database {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
docExists(id: string): Promise<boolean> {
|
||||||
|
return tracer.trace("db.docExists", span => {
|
||||||
|
span?.addTags({ db_name: this.name, doc_id: id })
|
||||||
|
return this.db.docExists(id)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
getMultiple<T extends Document>(
|
getMultiple<T extends Document>(
|
||||||
ids: string[],
|
ids: string[],
|
||||||
opts?: { allowMissing?: boolean | undefined } | undefined
|
opts?: { allowMissing?: boolean | undefined } | undefined
|
||||||
|
|
|
@ -122,6 +122,7 @@ export interface Database {
|
||||||
|
|
||||||
exists(): Promise<boolean>
|
exists(): Promise<boolean>
|
||||||
get<T extends Document>(id?: string): Promise<T>
|
get<T extends Document>(id?: string): Promise<T>
|
||||||
|
docExists(id: string): Promise<boolean>
|
||||||
getMultiple<T extends Document>(
|
getMultiple<T extends Document>(
|
||||||
ids: string[],
|
ids: string[],
|
||||||
opts?: { allowMissing?: boolean }
|
opts?: { allowMissing?: boolean }
|
||||||
|
|
Loading…
Reference in New Issue