Use autoextend as locktype
This commit is contained in:
parent
26a77298ac
commit
a32582eb8a
|
@ -4,8 +4,7 @@ import { LockOptions, LockType } from "@budibase/types"
|
||||||
import * as context from "../context"
|
import * as context from "../context"
|
||||||
import env from "../environment"
|
import env from "../environment"
|
||||||
import { logWarn } from "../logging"
|
import { logWarn } from "../logging"
|
||||||
import { Duration } from "../utils"
|
import { utils } from "@budibase/shared-core"
|
||||||
import { timers } from ".."
|
|
||||||
|
|
||||||
async function getClient(
|
async function getClient(
|
||||||
type: LockType,
|
type: LockType,
|
||||||
|
@ -14,7 +13,11 @@ async function getClient(
|
||||||
if (type === LockType.CUSTOM) {
|
if (type === LockType.CUSTOM) {
|
||||||
return newRedlock(opts)
|
return newRedlock(opts)
|
||||||
}
|
}
|
||||||
if (env.isTest() && type !== LockType.TRY_ONCE) {
|
if (
|
||||||
|
env.isTest() &&
|
||||||
|
type !== LockType.TRY_ONCE &&
|
||||||
|
type !== LockType.AUTO_EXTEND
|
||||||
|
) {
|
||||||
return newRedlock(OPTIONS.TEST)
|
return newRedlock(OPTIONS.TEST)
|
||||||
}
|
}
|
||||||
switch (type) {
|
switch (type) {
|
||||||
|
@ -30,13 +33,16 @@ async function getClient(
|
||||||
case LockType.DELAY_500: {
|
case LockType.DELAY_500: {
|
||||||
return newRedlock(OPTIONS.DELAY_500)
|
return newRedlock(OPTIONS.DELAY_500)
|
||||||
}
|
}
|
||||||
|
case LockType.AUTO_EXTEND: {
|
||||||
|
return newRedlock(OPTIONS.AUTO_EXTEND)
|
||||||
|
}
|
||||||
default: {
|
default: {
|
||||||
throw new Error(`Could not get redlock client: ${type}`)
|
throw utils.unreachable(type)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const OPTIONS = {
|
const OPTIONS: Record<keyof typeof LockType | "TEST", Redlock.Options> = {
|
||||||
TRY_ONCE: {
|
TRY_ONCE: {
|
||||||
// immediately throws an error if the lock is already held
|
// immediately throws an error if the lock is already held
|
||||||
retryCount: 0,
|
retryCount: 0,
|
||||||
|
@ -69,10 +75,14 @@ const OPTIONS = {
|
||||||
DELAY_500: {
|
DELAY_500: {
|
||||||
retryDelay: 500,
|
retryDelay: 500,
|
||||||
},
|
},
|
||||||
|
CUSTOM: {},
|
||||||
|
AUTO_EXTEND: {
|
||||||
|
retryCount: -1,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function newRedlock(opts: Redlock.Options = {}) {
|
export async function newRedlock(opts: Redlock.Options = {}) {
|
||||||
let options = { ...OPTIONS.DEFAULT, ...opts }
|
let options = { ...OPTIONS, ...opts }
|
||||||
const redisWrapper = await getLockClient()
|
const redisWrapper = await getLockClient()
|
||||||
const client = redisWrapper.getClient()
|
const client = redisWrapper.getClient()
|
||||||
return new Redlock([client], options)
|
return new Redlock([client], options)
|
||||||
|
@ -108,20 +118,36 @@ export async function doWithLock<T>(
|
||||||
): Promise<RedlockExecution<T>> {
|
): Promise<RedlockExecution<T>> {
|
||||||
const redlock = await getClient(opts.type, opts.customOptions)
|
const redlock = await getClient(opts.type, opts.customOptions)
|
||||||
let lock: Redlock.Lock | undefined
|
let lock: Redlock.Lock | undefined
|
||||||
let interval
|
let timeout
|
||||||
try {
|
try {
|
||||||
const name = getLockName(opts)
|
const name = getLockName(opts)
|
||||||
|
|
||||||
let ttl = opts.ttl || Duration.fromSeconds(30).toMs()
|
|
||||||
|
|
||||||
// create the lock
|
// create the lock
|
||||||
lock = await redlock.lock(name, ttl)
|
lock = await redlock.lock(name, opts.ttl)
|
||||||
|
|
||||||
if (!opts.ttl) {
|
if (opts.type === LockType.AUTO_EXTEND) {
|
||||||
// No TTL is provided, so we keep extending the lock while the task is running
|
// No TTL is provided, so we keep extending the lock while the task is running
|
||||||
interval = timers.set(async () => {
|
const extendInIntervals = (): void => {
|
||||||
await lock?.extend(ttl / 2)
|
timeout = setTimeout(async () => {
|
||||||
}, ttl / 2)
|
let isExpired = false
|
||||||
|
try {
|
||||||
|
lock = await lock!.extend(1000)
|
||||||
|
} catch (err: any) {
|
||||||
|
isExpired = err.message.includes("Cannot extend lock on resource")
|
||||||
|
if (isExpired) {
|
||||||
|
console.error("The lock expired", { name })
|
||||||
|
} else {
|
||||||
|
throw err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!isExpired) {
|
||||||
|
extendInIntervals()
|
||||||
|
}
|
||||||
|
}, opts.ttl / 2)
|
||||||
|
}
|
||||||
|
|
||||||
|
extendInIntervals()
|
||||||
}
|
}
|
||||||
|
|
||||||
// perform locked task
|
// perform locked task
|
||||||
|
@ -143,11 +169,11 @@ export async function doWithLock<T>(
|
||||||
throw e
|
throw e
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
|
if (timeout) {
|
||||||
|
clearTimeout(timeout)
|
||||||
|
}
|
||||||
if (lock) {
|
if (lock) {
|
||||||
await lock.unlock()
|
await lock.unlock()
|
||||||
}
|
}
|
||||||
if (interval) {
|
|
||||||
timers.clear(interval)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,6 +10,7 @@ export enum LockType {
|
||||||
DEFAULT = "default",
|
DEFAULT = "default",
|
||||||
DELAY_500 = "delay_500",
|
DELAY_500 = "delay_500",
|
||||||
CUSTOM = "custom",
|
CUSTOM = "custom",
|
||||||
|
AUTO_EXTEND = "auto_extend",
|
||||||
}
|
}
|
||||||
|
|
||||||
export enum LockName {
|
export enum LockName {
|
||||||
|
@ -36,9 +37,9 @@ export interface LockOptions {
|
||||||
*/
|
*/
|
||||||
name: LockName
|
name: LockName
|
||||||
/**
|
/**
|
||||||
* The ttl to auto-expire the lock if not unlocked manually. If undefined, the lock will be autoextending while the process is running.
|
* The ttl to auto-expire the lock if not unlocked manually.
|
||||||
*/
|
*/
|
||||||
ttl?: number
|
ttl: number
|
||||||
/**
|
/**
|
||||||
* The individual resource to lock. This is useful for locking around very specific identifiers, e.g. a document that is prone to conflicts
|
* The individual resource to lock. This is useful for locking around very specific identifiers, e.g. a document that is prone to conflicts
|
||||||
*/
|
*/
|
||||||
|
|
Loading…
Reference in New Issue