Merge branch 'BUDI-8064/doc-writethrough' into BUDI-8046/scim-logger
This commit is contained in:
commit
5bd5ea06e7
|
@ -7,8 +7,6 @@ import { JobQueue, createQueue } from "../queue"
|
|||
import * as context from "../context"
|
||||
import * as dbUtils from "../db"
|
||||
|
||||
const DEFAULT_WRITE_RATE_MS = 10000
|
||||
|
||||
let CACHE: BaseCache | null = null
|
||||
async function getCache() {
|
||||
if (!CACHE) {
|
||||
|
@ -29,33 +27,27 @@ export const docWritethroughProcessorQueue = createQueue<ProcessDocMessage>(
|
|||
JobQueue.DOC_WRITETHROUGH_QUEUE
|
||||
)
|
||||
|
||||
let _init = false
|
||||
export const init = () => {
|
||||
if (_init) {
|
||||
return
|
||||
}
|
||||
docWritethroughProcessorQueue.process(async message => {
|
||||
const { tenantId, cacheKeyPrefix } = message.data
|
||||
await context.doInTenant(tenantId, async () => {
|
||||
const lockResponse = await locks.doWithLock(
|
||||
{
|
||||
type: LockType.TRY_ONCE,
|
||||
name: LockName.PERSIST_WRITETHROUGH,
|
||||
resource: cacheKeyPrefix,
|
||||
ttl: 15000,
|
||||
},
|
||||
async () => {
|
||||
await persistToDb(message.data)
|
||||
}
|
||||
)
|
||||
|
||||
if (!lockResponse.executed) {
|
||||
console.log(`Ignoring redlock conflict in write-through cache`)
|
||||
docWritethroughProcessorQueue.process(async message => {
|
||||
const { tenantId, cacheKeyPrefix } = message.data
|
||||
await context.doInTenant(tenantId, async () => {
|
||||
const lockResponse = await locks.doWithLock(
|
||||
{
|
||||
type: LockType.TRY_ONCE,
|
||||
name: LockName.PERSIST_WRITETHROUGH,
|
||||
resource: cacheKeyPrefix,
|
||||
ttl: 15000,
|
||||
},
|
||||
async () => {
|
||||
await persistToDb(message.data)
|
||||
console.log("DocWritethrough persisted", { data: message.data })
|
||||
}
|
||||
})
|
||||
)
|
||||
|
||||
if (!lockResponse.executed) {
|
||||
console.log(`Ignoring redlock conflict in write-through cache`)
|
||||
}
|
||||
})
|
||||
_init = true
|
||||
}
|
||||
})
|
||||
|
||||
export async function persistToDb({
|
||||
dbName,
|
||||
|
@ -97,11 +89,7 @@ export class DocWritethrough {
|
|||
|
||||
private cacheKeyPrefix: string
|
||||
|
||||
constructor(
|
||||
db: Database,
|
||||
docId: string,
|
||||
writeRateMs: number = DEFAULT_WRITE_RATE_MS
|
||||
) {
|
||||
constructor(db: Database, docId: string, writeRateMs: number) {
|
||||
this.db = db
|
||||
this._docId = docId
|
||||
this.writeRateMs = writeRateMs
|
||||
|
|
|
@ -5,7 +5,6 @@ import _ from "lodash"
|
|||
import {
|
||||
DocWritethrough,
|
||||
docWritethroughProcessorQueue,
|
||||
init,
|
||||
} from "../docWritethrough"
|
||||
import InMemoryQueue from "../../queue/inMemoryQueue"
|
||||
|
||||
|
@ -45,8 +44,6 @@ describe("docWritethrough", () => {
|
|||
}, {} as Record<string, any>)
|
||||
}
|
||||
|
||||
beforeAll(() => init())
|
||||
|
||||
beforeEach(async () => {
|
||||
resetTime()
|
||||
documentId = structures.uuid()
|
||||
|
|
Loading…
Reference in New Issue