Use bull
This commit is contained in:
parent
91468d2569
commit
2d84bc5da2
|
@ -3,6 +3,9 @@ import { getDocWritethroughClient } from "../redis/init"
|
||||||
import { AnyDocument, Database, LockName, LockType } from "@budibase/types"
|
import { AnyDocument, Database, LockName, LockType } from "@budibase/types"
|
||||||
import * as locks from "../redis/redlockImpl"
|
import * as locks from "../redis/redlockImpl"
|
||||||
|
|
||||||
|
import { JobQueue, createQueue } from "../queue"
|
||||||
|
import { context, db as dbUtils } from ".."
|
||||||
|
|
||||||
const DEFAULT_WRITE_RATE_MS = 10000
|
const DEFAULT_WRITE_RATE_MS = 10000
|
||||||
|
|
||||||
let CACHE: BaseCache | null = null
|
let CACHE: BaseCache | null = null
|
||||||
|
@ -14,17 +17,63 @@ async function getCache() {
|
||||||
return CACHE
|
return CACHE
|
||||||
}
|
}
|
||||||
|
|
||||||
interface CacheItem {
|
interface ProcessDocMessage {
|
||||||
nextWrite: number
|
tenantId: string
|
||||||
|
dbName: string
|
||||||
|
docId: string
|
||||||
|
cacheKeyPrefix: string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export const docWritethroughProcessorQueue = createQueue<ProcessDocMessage>(
|
||||||
|
JobQueue.DOC_WRITETHROUGH_QUEUE
|
||||||
|
)
|
||||||
|
|
||||||
|
docWritethroughProcessorQueue.process(async message => {
|
||||||
|
const { dbName, tenantId, docId, cacheKeyPrefix } = message.data
|
||||||
|
const cache = await getCache()
|
||||||
|
await context.doInTenant(tenantId, async () => {
|
||||||
|
const lockResponse = await locks.doWithLock(
|
||||||
|
{
|
||||||
|
type: LockType.TRY_ONCE,
|
||||||
|
name: LockName.PERSIST_WRITETHROUGH,
|
||||||
|
resource: cacheKeyPrefix,
|
||||||
|
ttl: 15000,
|
||||||
|
},
|
||||||
|
async () => {
|
||||||
|
const db = dbUtils.getDB(dbName)
|
||||||
|
let doc: AnyDocument | undefined
|
||||||
|
try {
|
||||||
|
doc = await db.get(docId)
|
||||||
|
} catch {
|
||||||
|
doc = { _id: docId }
|
||||||
|
}
|
||||||
|
|
||||||
|
const keysToPersist = await cache.keys(`${cacheKeyPrefix}:data:*`)
|
||||||
|
for (const key of keysToPersist) {
|
||||||
|
const data = await cache.get(key, { useTenancy: false })
|
||||||
|
doc[data.key] = data.value
|
||||||
|
}
|
||||||
|
|
||||||
|
await db.put(doc)
|
||||||
|
|
||||||
|
for (const key of keysToPersist) {
|
||||||
|
await cache.delete(key, { useTenancy: false })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
if (!lockResponse.executed) {
|
||||||
|
console.log(`Ignoring redlock conflict in write-through cache`)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
export class DocWritethrough {
|
export class DocWritethrough {
|
||||||
private db: Database
|
private db: Database
|
||||||
private _docId: string
|
private _docId: string
|
||||||
private writeRateMs: number
|
private writeRateMs: number
|
||||||
|
|
||||||
private cacheKeyPrefix: string
|
private cacheKeyPrefix: string
|
||||||
private docInfoCacheKey: string
|
|
||||||
|
|
||||||
constructor(
|
constructor(
|
||||||
db: Database,
|
db: Database,
|
||||||
|
@ -35,54 +84,31 @@ export class DocWritethrough {
|
||||||
this._docId = docId
|
this._docId = docId
|
||||||
this.writeRateMs = writeRateMs
|
this.writeRateMs = writeRateMs
|
||||||
this.cacheKeyPrefix = `${this.db.name}:${this.docId}`
|
this.cacheKeyPrefix = `${this.db.name}:${this.docId}`
|
||||||
this.docInfoCacheKey = `${this.cacheKeyPrefix}:info`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
get docId() {
|
get docId() {
|
||||||
return this._docId
|
return this._docId
|
||||||
}
|
}
|
||||||
|
|
||||||
private makeNextWriteInfoItem(): CacheItem {
|
|
||||||
return { nextWrite: Date.now() + this.writeRateMs }
|
|
||||||
}
|
|
||||||
|
|
||||||
async patch(data: Record<string, any>) {
|
async patch(data: Record<string, any>) {
|
||||||
const cache = await getCache()
|
const cache = await getCache()
|
||||||
|
|
||||||
await this.storeToCache(cache, data)
|
await this.storeToCache(cache, data)
|
||||||
|
|
||||||
const updateDb = await this.shouldUpdateDb(cache)
|
docWritethroughProcessorQueue.add(
|
||||||
|
|
||||||
if (updateDb) {
|
|
||||||
const lockResponse = await locks.doWithLock(
|
|
||||||
{
|
{
|
||||||
type: LockType.TRY_ONCE,
|
tenantId: context.getTenantId(),
|
||||||
name: LockName.PERSIST_WRITETHROUGH,
|
dbName: this.db.name,
|
||||||
resource: this.docInfoCacheKey,
|
docId: this.docId,
|
||||||
ttl: 15000,
|
cacheKeyPrefix: this.cacheKeyPrefix,
|
||||||
},
|
},
|
||||||
async () => {
|
{
|
||||||
if (await this.shouldUpdateDb(cache)) {
|
delay: this.writeRateMs - 1,
|
||||||
await this.persistToDb(cache)
|
jobId: this.cacheKeyPrefix,
|
||||||
await cache.store(
|
removeOnFail: true,
|
||||||
this.docInfoCacheKey,
|
removeOnComplete: true,
|
||||||
this.makeNextWriteInfoItem()
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
if (!lockResponse.executed) {
|
|
||||||
console.log(`Ignoring redlock conflict in write-through cache`)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private async shouldUpdateDb(cache: BaseCache) {
|
|
||||||
const cacheItem = await cache.withCache(this.docInfoCacheKey, null, () =>
|
|
||||||
this.makeNextWriteInfoItem()
|
|
||||||
)
|
|
||||||
return Date.now() >= cacheItem.nextWrite
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private async storeToCache(cache: BaseCache, data: Record<string, any>) {
|
private async storeToCache(cache: BaseCache, data: Record<string, any>) {
|
||||||
|
@ -91,25 +117,4 @@ export class DocWritethrough {
|
||||||
await cache.store(cacheKey, { key, value }, undefined)
|
await cache.store(cacheKey, { key, value }, undefined)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async persistToDb(cache: BaseCache) {
|
|
||||||
let doc: AnyDocument | undefined
|
|
||||||
try {
|
|
||||||
doc = await this.db.get(this.docId)
|
|
||||||
} catch {
|
|
||||||
doc = { _id: this.docId }
|
|
||||||
}
|
|
||||||
|
|
||||||
const keysToPersist = await cache.keys(`${this.cacheKeyPrefix}:data:*`)
|
|
||||||
for (const key of keysToPersist) {
|
|
||||||
const data = await cache.get(key, { useTenancy: false })
|
|
||||||
doc[data.key] = data.value
|
|
||||||
}
|
|
||||||
|
|
||||||
await this.db.put(doc)
|
|
||||||
|
|
||||||
for (const key of keysToPersist) {
|
|
||||||
await cache.delete(key, { useTenancy: false })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue