Merge branch 'BUDI-8064/doc-writethrough' into BUDI-8046/scim-logger
This commit is contained in:
commit
a6666f662c
|
@ -1,41 +1,38 @@
|
||||||
import { AnyDocument, Database, LockName, LockType } from "@budibase/types"
|
import { AnyDocument, Database } from "@budibase/types"
|
||||||
|
|
||||||
import { JobQueue, createQueue } from "../queue"
|
import { JobQueue, createQueue } from "../queue"
|
||||||
import * as dbUtils from "../db"
|
import * as dbUtils from "../db"
|
||||||
import { string } from "yargs"
|
import { logWarn } from "../logging"
|
||||||
import { db } from ".."
|
|
||||||
import { locks } from "../redis"
|
|
||||||
import { Duration } from "../utils"
|
|
||||||
|
|
||||||
interface ProcessDocMessage {
|
interface ProcessDocMessage {
|
||||||
dbName: string
|
dbName: string
|
||||||
docId: string
|
docId: string
|
||||||
|
|
||||||
data: Record<string, any>
|
data: Record<string, any>
|
||||||
}
|
}
|
||||||
|
|
||||||
export const docWritethroughProcessorQueue = createQueue<ProcessDocMessage>(
|
export const docWritethroughProcessorQueue = createQueue<ProcessDocMessage>(
|
||||||
JobQueue.DOC_WRITETHROUGH_QUEUE
|
JobQueue.DOC_WRITETHROUGH_QUEUE,
|
||||||
|
{
|
||||||
|
jobOptions: {
|
||||||
|
// We might have plenty of 409, we want to allow running almost infinitely
|
||||||
|
attempts: Number.MAX_SAFE_INTEGER,
|
||||||
|
},
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
class DocWritethroughProcessor {
|
class DocWritethroughProcessor {
|
||||||
init() {
|
init() {
|
||||||
docWritethroughProcessorQueue.process(async message => {
|
docWritethroughProcessorQueue.process(async message => {
|
||||||
const result = await locks.doWithLock(
|
try {
|
||||||
{
|
await this.persistToDb(message.data)
|
||||||
type: LockType.TRY_ONCE,
|
} catch (err: any) {
|
||||||
name: LockName.PERSIST_DOC_WRITETHROUGH,
|
if (err.status === 409) {
|
||||||
resource: `${message.data.dbName}:${message.data.docId}`,
|
logWarn(`409 conflict in doc-writethrough cache`)
|
||||||
ttl: Duration.fromSeconds(60).toMs(),
|
// If we get a 409, it means that another job updated it meanwhile. We want to retry it to persist it again.
|
||||||
},
|
throw new Error(`Conflict persisting message ${message.id}`)
|
||||||
async () => {
|
|
||||||
await this.persistToDb(message.data)
|
|
||||||
}
|
}
|
||||||
)
|
|
||||||
if (!result.executed) {
|
throw err
|
||||||
throw new Error(
|
|
||||||
`Error persisting docWritethrough message: ${message.id}`
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
return this
|
return this
|
||||||
|
|
|
@ -82,11 +82,6 @@ type SuccessfulRedlockExecution<T> = {
|
||||||
}
|
}
|
||||||
type UnsuccessfulRedlockExecution = {
|
type UnsuccessfulRedlockExecution = {
|
||||||
executed: false
|
executed: false
|
||||||
reason: UnsuccessfulRedlockExecutionReason
|
|
||||||
}
|
|
||||||
|
|
||||||
export const enum UnsuccessfulRedlockExecutionReason {
|
|
||||||
LockTakenWithTryOnce = "LOCK_TAKEN_WITH_TRY_ONCE",
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type RedlockExecution<T> =
|
type RedlockExecution<T> =
|
||||||
|
@ -146,10 +141,7 @@ export async function doWithLock<T>(
|
||||||
if (opts.type === LockType.TRY_ONCE) {
|
if (opts.type === LockType.TRY_ONCE) {
|
||||||
// don't throw for try-once locks, they will always error
|
// don't throw for try-once locks, they will always error
|
||||||
// due to retry count (0) exceeded
|
// due to retry count (0) exceeded
|
||||||
return {
|
return { executed: false }
|
||||||
executed: false,
|
|
||||||
reason: UnsuccessfulRedlockExecutionReason.LockTakenWithTryOnce,
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
throw e
|
throw e
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,7 +24,7 @@ export interface GroupUser {
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface UserGroupRoles {
|
export interface UserGroupRoles {
|
||||||
[key: string]: string
|
[key: string]: string | undefined
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface SearchGroupRequest {}
|
export interface SearchGroupRequest {}
|
||||||
|
|
|
@ -48,7 +48,7 @@ export interface GroupAddedOnboardingEvent extends BaseEvent {
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface GroupPermissionsEditedEvent extends BaseEvent {
|
export interface GroupPermissionsEditedEvent extends BaseEvent {
|
||||||
permissions: Record<string, string>
|
permissions: Record<string, string | undefined>
|
||||||
groupId: string
|
groupId: string
|
||||||
audited: {
|
audited: {
|
||||||
name: string
|
name: string
|
||||||
|
|
|
@ -23,7 +23,6 @@ export enum LockName {
|
||||||
APP_MIGRATION = "app_migrations",
|
APP_MIGRATION = "app_migrations",
|
||||||
PROCESS_AUTO_COLUMNS = "process_auto_columns",
|
PROCESS_AUTO_COLUMNS = "process_auto_columns",
|
||||||
PROCESS_USER_INVITE = "process_user_invite",
|
PROCESS_USER_INVITE = "process_user_invite",
|
||||||
PERSIST_DOC_WRITETHROUGH = "persist_doc_writethrough",
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export type LockOptions = {
|
export type LockOptions = {
|
||||||
|
|
Loading…
Reference in New Issue