Do not use lock
This commit is contained in:
parent
e584d82e6f
commit
f5e2dc7a27
|
@ -1,7 +1,6 @@
|
||||||
import BaseCache from "./base"
|
import BaseCache from "./base"
|
||||||
import { getDocWritethroughClient } from "../redis/init"
|
import { getDocWritethroughClient } from "../redis/init"
|
||||||
import { AnyDocument, Database, LockName, LockType } from "@budibase/types"
|
import { AnyDocument, Database } from "@budibase/types"
|
||||||
import * as locks from "../redis/redlockImpl"
|
|
||||||
|
|
||||||
import { JobQueue, createQueue } from "../queue"
|
import { JobQueue, createQueue } from "../queue"
|
||||||
import * as context from "../context"
|
import * as context from "../context"
|
||||||
|
@ -17,7 +16,6 @@ async function getCache() {
|
||||||
}
|
}
|
||||||
|
|
||||||
interface ProcessDocMessage {
|
interface ProcessDocMessage {
|
||||||
tenantId: string
|
|
||||||
dbName: string
|
dbName: string
|
||||||
docId: string
|
docId: string
|
||||||
cacheKeyPrefix: string
|
cacheKeyPrefix: string
|
||||||
|
@ -28,34 +26,8 @@ export const docWritethroughProcessorQueue = createQueue<ProcessDocMessage>(
|
||||||
)
|
)
|
||||||
|
|
||||||
docWritethroughProcessorQueue.process(async message => {
|
docWritethroughProcessorQueue.process(async message => {
|
||||||
const { tenantId, cacheKeyPrefix } = message.data
|
await persistToDb(message.data)
|
||||||
await context.doInTenant(tenantId, async () => {
|
console.log("DocWritethrough persisted", { data: message.data })
|
||||||
const lockResponse = await locks.doWithLock(
|
|
||||||
{
|
|
||||||
type: LockType.TRY_ONCE,
|
|
||||||
name: LockName.PERSIST_WRITETHROUGH,
|
|
||||||
resource: cacheKeyPrefix,
|
|
||||||
ttl: 15000,
|
|
||||||
},
|
|
||||||
async () => {
|
|
||||||
await persistToDb(message.data)
|
|
||||||
console.log("DocWritethrough persisted", { data: message.data })
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
if (!lockResponse.executed) {
|
|
||||||
if (
|
|
||||||
lockResponse.reason !==
|
|
||||||
locks.UnsuccessfulRedlockExecutionReason.LockTakenWithTryOnce
|
|
||||||
) {
|
|
||||||
console.error("Error persisting docWritethrough", {
|
|
||||||
data: message.data,
|
|
||||||
})
|
|
||||||
throw "Error persisting docWritethrough"
|
|
||||||
}
|
|
||||||
console.log(`Ignoring redlock conflict in write-through cache`)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
})
|
})
|
||||||
|
|
||||||
export async function persistToDb({
|
export async function persistToDb({
|
||||||
|
@ -94,7 +66,6 @@ export class DocWritethrough {
|
||||||
private db: Database
|
private db: Database
|
||||||
private _docId: string
|
private _docId: string
|
||||||
private writeRateMs: number
|
private writeRateMs: number
|
||||||
private tenantId: string
|
|
||||||
|
|
||||||
private cacheKeyPrefix: string
|
private cacheKeyPrefix: string
|
||||||
|
|
||||||
|
@ -103,7 +74,6 @@ export class DocWritethrough {
|
||||||
this._docId = docId
|
this._docId = docId
|
||||||
this.writeRateMs = writeRateMs
|
this.writeRateMs = writeRateMs
|
||||||
this.cacheKeyPrefix = `${this.db.name}:${this.docId}`
|
this.cacheKeyPrefix = `${this.db.name}:${this.docId}`
|
||||||
this.tenantId = context.getTenantId()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
get docId() {
|
get docId() {
|
||||||
|
@ -117,7 +87,6 @@ export class DocWritethrough {
|
||||||
|
|
||||||
docWritethroughProcessorQueue.add(
|
docWritethroughProcessorQueue.add(
|
||||||
{
|
{
|
||||||
tenantId: this.tenantId,
|
|
||||||
dbName: this.db.name,
|
dbName: this.db.name,
|
||||||
docId: this.docId,
|
docId: this.docId,
|
||||||
cacheKeyPrefix: this.cacheKeyPrefix,
|
cacheKeyPrefix: this.cacheKeyPrefix,
|
||||||
|
|
|
@ -47,9 +47,7 @@ describe("docWritethrough", () => {
|
||||||
beforeEach(async () => {
|
beforeEach(async () => {
|
||||||
resetTime()
|
resetTime()
|
||||||
documentId = structures.uuid()
|
documentId = structures.uuid()
|
||||||
await config.doInTenant(async () => {
|
docWritethrough = new DocWritethrough(db, documentId, WRITE_RATE_MS)
|
||||||
docWritethrough = new DocWritethrough(db, documentId, WRITE_RATE_MS)
|
|
||||||
})
|
|
||||||
})
|
})
|
||||||
|
|
||||||
it("patching will not persist if timeout does not hit", async () => {
|
it("patching will not persist if timeout does not hit", async () => {
|
||||||
|
|
Loading…
Reference in New Issue