Merge pull request #10739 from Budibase/fix/multi-dev-conflicts
Multidev lock and conflict management
This commit is contained in:
commit
1547280514
|
@ -72,16 +72,12 @@ describe("writethrough", () => {
|
|||
writethrough.put({ ...current, value: 4 }),
|
||||
])
|
||||
|
||||
// with a lock, this will work
|
||||
const newRev = responses.map(x => x.rev).find(x => x !== current._rev)
|
||||
expect(newRev).toBeDefined()
|
||||
expect(responses.map(x => x.rev)).toEqual(
|
||||
expect.arrayContaining([current._rev, current._rev, newRev])
|
||||
)
|
||||
expectFunctionWasCalledTimesWith(
|
||||
mocks.alerts.logWarn,
|
||||
2,
|
||||
"Ignoring redlock conflict in write-through cache"
|
||||
)
|
||||
|
||||
const output = await db.get(current._id)
|
||||
expect(output.value).toBe(4)
|
||||
|
|
|
@ -4,10 +4,10 @@ import { LockOptions, LockType } from "@budibase/types"
|
|||
import * as context from "../context"
|
||||
import env from "../environment"
|
||||
|
||||
const getClient = async (
|
||||
async function getClient(
|
||||
type: LockType,
|
||||
opts?: Redlock.Options
|
||||
): Promise<Redlock> => {
|
||||
): Promise<Redlock> {
|
||||
if (type === LockType.CUSTOM) {
|
||||
return newRedlock(opts)
|
||||
}
|
||||
|
@ -18,6 +18,9 @@ const getClient = async (
|
|||
case LockType.TRY_ONCE: {
|
||||
return newRedlock(OPTIONS.TRY_ONCE)
|
||||
}
|
||||
case LockType.TRY_TWICE: {
|
||||
return newRedlock(OPTIONS.TRY_TWICE)
|
||||
}
|
||||
case LockType.DEFAULT: {
|
||||
return newRedlock(OPTIONS.DEFAULT)
|
||||
}
|
||||
|
@ -35,6 +38,9 @@ const OPTIONS = {
|
|||
// immediately throws an error if the lock is already held
|
||||
retryCount: 0,
|
||||
},
|
||||
TRY_TWICE: {
|
||||
retryCount: 1,
|
||||
},
|
||||
TEST: {
|
||||
// higher retry count in unit tests
|
||||
// due to high contention.
|
||||
|
@ -62,7 +68,7 @@ const OPTIONS = {
|
|||
},
|
||||
}
|
||||
|
||||
const newRedlock = async (opts: Redlock.Options = {}) => {
|
||||
export async function newRedlock(opts: Redlock.Options = {}) {
|
||||
let options = { ...OPTIONS.DEFAULT, ...opts }
|
||||
const redisWrapper = await getLockClient()
|
||||
const client = redisWrapper.getClient()
|
||||
|
@ -81,22 +87,26 @@ type RedlockExecution<T> =
|
|||
| SuccessfulRedlockExecution<T>
|
||||
| UnsuccessfulRedlockExecution
|
||||
|
||||
export const doWithLock = async <T>(
|
||||
function getLockName(opts: LockOptions) {
|
||||
// determine lock name
|
||||
// by default use the tenantId for uniqueness, unless using a system lock
|
||||
const prefix = opts.systemLock ? "system" : context.getTenantId()
|
||||
let name: string = `lock:${prefix}_${opts.name}`
|
||||
// add additional unique name if required
|
||||
if (opts.resource) {
|
||||
name = name + `_${opts.resource}`
|
||||
}
|
||||
return name
|
||||
}
|
||||
|
||||
export async function doWithLock<T>(
|
||||
opts: LockOptions,
|
||||
task: () => Promise<T>
|
||||
): Promise<RedlockExecution<T>> => {
|
||||
): Promise<RedlockExecution<T>> {
|
||||
const redlock = await getClient(opts.type, opts.customOptions)
|
||||
let lock
|
||||
try {
|
||||
// determine lock name
|
||||
// by default use the tenantId for uniqueness, unless using a system lock
|
||||
const prefix = opts.systemLock ? "system" : context.getTenantId()
|
||||
let name: string = `lock:${prefix}_${opts.name}`
|
||||
|
||||
// add additional unique name if required
|
||||
if (opts.resource) {
|
||||
name = name + `_${opts.resource}`
|
||||
}
|
||||
const name = getLockName(opts)
|
||||
|
||||
// create the lock
|
||||
lock = await redlock.lock(name, opts.ttl)
|
||||
|
@ -112,7 +122,6 @@ export const doWithLock = async <T>(
|
|||
if (opts.type === LockType.TRY_ONCE) {
|
||||
// don't throw for try-once locks, they will always error
|
||||
// due to retry count (0) exceeded
|
||||
console.warn(e)
|
||||
return { executed: false }
|
||||
} else {
|
||||
console.error(e)
|
||||
|
|
|
@ -9,8 +9,8 @@ import {
|
|||
checkDebounce,
|
||||
setDebounce,
|
||||
} from "../utilities/redis"
|
||||
import { db as dbCore, cache, permissions } from "@budibase/backend-core"
|
||||
import { BBContext, Database } from "@budibase/types"
|
||||
import { db as dbCore, cache } from "@budibase/backend-core"
|
||||
import { UserCtx, Database } from "@budibase/types"
|
||||
|
||||
const DEBOUNCE_TIME_SEC = 30
|
||||
|
||||
|
@ -23,7 +23,7 @@ const DEBOUNCE_TIME_SEC = 30
|
|||
* through the authorized middleware *
|
||||
****************************************************/
|
||||
|
||||
async function checkDevAppLocks(ctx: BBContext) {
|
||||
async function checkDevAppLocks(ctx: UserCtx) {
|
||||
const appId = ctx.appId
|
||||
|
||||
// if any public usage, don't proceed
|
||||
|
@ -42,7 +42,7 @@ async function checkDevAppLocks(ctx: BBContext) {
|
|||
}
|
||||
}
|
||||
|
||||
async function updateAppUpdatedAt(ctx: BBContext) {
|
||||
async function updateAppUpdatedAt(ctx: UserCtx) {
|
||||
const appId = ctx.appId
|
||||
// if debouncing skip this update
|
||||
// get methods also aren't updating
|
||||
|
@ -50,20 +50,29 @@ async function updateAppUpdatedAt(ctx: BBContext) {
|
|||
return
|
||||
}
|
||||
await dbCore.doWithDB(appId, async (db: Database) => {
|
||||
const metadata = await db.get(DocumentType.APP_METADATA)
|
||||
metadata.updatedAt = new Date().toISOString()
|
||||
try {
|
||||
const metadata = await db.get(DocumentType.APP_METADATA)
|
||||
metadata.updatedAt = new Date().toISOString()
|
||||
|
||||
metadata.updatedBy = getGlobalIDFromUserMetadataID(ctx.user?.userId!)
|
||||
metadata.updatedBy = getGlobalIDFromUserMetadataID(ctx.user?.userId!)
|
||||
|
||||
const response = await db.put(metadata)
|
||||
metadata._rev = response.rev
|
||||
await cache.app.invalidateAppMetadata(appId, metadata)
|
||||
// set a new debounce record with a short TTL
|
||||
await setDebounce(appId, DEBOUNCE_TIME_SEC)
|
||||
const response = await db.put(metadata)
|
||||
metadata._rev = response.rev
|
||||
await cache.app.invalidateAppMetadata(appId, metadata)
|
||||
// set a new debounce record with a short TTL
|
||||
await setDebounce(appId, DEBOUNCE_TIME_SEC)
|
||||
} catch (err: any) {
|
||||
// if a 409 occurs, then multiple clients connected at the same time - ignore
|
||||
if (err?.status === 409) {
|
||||
return
|
||||
} else {
|
||||
throw err
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
export default async function builder(ctx: BBContext) {
|
||||
export default async function builder(ctx: UserCtx) {
|
||||
const appId = ctx.appId
|
||||
// this only functions within an app context
|
||||
if (!appId) {
|
||||
|
|
|
@ -35,10 +35,20 @@ export const getComponentLibraryManifest = async (library: string) => {
|
|||
const filename = "manifest.json"
|
||||
|
||||
if (env.isDev() || env.isTest()) {
|
||||
const path = join(TOP_LEVEL_PATH, "packages/client", filename)
|
||||
// always load from new so that updates are refreshed
|
||||
delete require.cache[require.resolve(path)]
|
||||
return require(path)
|
||||
const paths = [
|
||||
join(TOP_LEVEL_PATH, "packages/client", filename),
|
||||
join(process.cwd(), "client", filename),
|
||||
]
|
||||
for (let path of paths) {
|
||||
if (fs.existsSync(path)) {
|
||||
// always load from new so that updates are refreshed
|
||||
delete require.cache[require.resolve(path)]
|
||||
return require(path)
|
||||
}
|
||||
}
|
||||
throw new Error(
|
||||
`Unable to find ${filename} in development environment (may need to build).`
|
||||
)
|
||||
}
|
||||
|
||||
if (!appId) {
|
||||
|
|
|
@ -6,6 +6,7 @@ export enum LockType {
|
|||
* No retries will take place and no error will be thrown.
|
||||
*/
|
||||
TRY_ONCE = "try_once",
|
||||
TRY_TWICE = "try_twice",
|
||||
DEFAULT = "default",
|
||||
DELAY_500 = "delay_500",
|
||||
CUSTOM = "custom",
|
||||
|
|
Loading…
Reference in New Issue