Merge pull request #12474 from Budibase/chore/allow_redis_without_ttl
Smart locking mechanism for redis locks without TTL
This commit is contained in:
commit
28466a89ac
|
@ -72,7 +72,7 @@
|
|||
"@types/tar-fs": "2.0.1",
|
||||
"@types/uuid": "8.3.4",
|
||||
"chance": "1.1.8",
|
||||
"ioredis-mock": "8.7.0",
|
||||
"ioredis-mock": "8.9.0",
|
||||
"jest": "29.6.2",
|
||||
"jest-environment-node": "29.6.2",
|
||||
"jest-serial-runner": "1.2.1",
|
||||
|
|
|
@ -2,8 +2,9 @@ import Redlock from "redlock"
|
|||
import { getLockClient } from "./init"
|
||||
import { LockOptions, LockType } from "@budibase/types"
|
||||
import * as context from "../context"
|
||||
import env from "../environment"
|
||||
import { logWarn } from "../logging"
|
||||
import { utils } from "@budibase/shared-core"
|
||||
import { Duration } from "../utils"
|
||||
|
||||
async function getClient(
|
||||
type: LockType,
|
||||
|
@ -12,9 +13,7 @@ async function getClient(
|
|||
if (type === LockType.CUSTOM) {
|
||||
return newRedlock(opts)
|
||||
}
|
||||
if (env.isTest() && type !== LockType.TRY_ONCE) {
|
||||
return newRedlock(OPTIONS.TEST)
|
||||
}
|
||||
|
||||
switch (type) {
|
||||
case LockType.TRY_ONCE: {
|
||||
return newRedlock(OPTIONS.TRY_ONCE)
|
||||
|
@ -28,13 +27,16 @@ async function getClient(
|
|||
case LockType.DELAY_500: {
|
||||
return newRedlock(OPTIONS.DELAY_500)
|
||||
}
|
||||
case LockType.AUTO_EXTEND: {
|
||||
return newRedlock(OPTIONS.AUTO_EXTEND)
|
||||
}
|
||||
default: {
|
||||
throw new Error(`Could not get redlock client: ${type}`)
|
||||
throw utils.unreachable(type)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const OPTIONS = {
|
||||
const OPTIONS: Record<keyof typeof LockType, Redlock.Options> = {
|
||||
TRY_ONCE: {
|
||||
// immediately throws an error if the lock is already held
|
||||
retryCount: 0,
|
||||
|
@ -42,11 +44,6 @@ const OPTIONS = {
|
|||
TRY_TWICE: {
|
||||
retryCount: 1,
|
||||
},
|
||||
TEST: {
|
||||
// higher retry count in unit tests
|
||||
// due to high contention.
|
||||
retryCount: 100,
|
||||
},
|
||||
DEFAULT: {
|
||||
// the expected clock drift; for more details
|
||||
// see http://redis.io/topics/distlock
|
||||
|
@ -67,10 +64,14 @@ const OPTIONS = {
|
|||
DELAY_500: {
|
||||
retryDelay: 500,
|
||||
},
|
||||
CUSTOM: {},
|
||||
AUTO_EXTEND: {
|
||||
retryCount: -1,
|
||||
},
|
||||
}
|
||||
|
||||
export async function newRedlock(opts: Redlock.Options = {}) {
|
||||
let options = { ...OPTIONS.DEFAULT, ...opts }
|
||||
const options = { ...OPTIONS.DEFAULT, ...opts }
|
||||
const redisWrapper = await getLockClient()
|
||||
const client = redisWrapper.getClient()
|
||||
return new Redlock([client], options)
|
||||
|
@ -100,17 +101,36 @@ function getLockName(opts: LockOptions) {
|
|||
return name
|
||||
}
|
||||
|
||||
export const AUTO_EXTEND_POLLING_MS = Duration.fromSeconds(10).toMs()
|
||||
|
||||
export async function doWithLock<T>(
|
||||
opts: LockOptions,
|
||||
task: () => Promise<T>
|
||||
): Promise<RedlockExecution<T>> {
|
||||
const redlock = await getClient(opts.type, opts.customOptions)
|
||||
let lock
|
||||
let lock: Redlock.Lock | undefined
|
||||
let timeout: NodeJS.Timeout | undefined
|
||||
try {
|
||||
const name = getLockName(opts)
|
||||
|
||||
const ttl =
|
||||
opts.type === LockType.AUTO_EXTEND ? AUTO_EXTEND_POLLING_MS : opts.ttl
|
||||
|
||||
// create the lock
|
||||
lock = await redlock.lock(name, opts.ttl)
|
||||
lock = await redlock.lock(name, ttl)
|
||||
|
||||
if (opts.type === LockType.AUTO_EXTEND) {
|
||||
// We keep extending the lock while the task is running
|
||||
const extendInIntervals = (): void => {
|
||||
timeout = setTimeout(async () => {
|
||||
lock = await lock!.extend(ttl, () => opts.onExtend && opts.onExtend())
|
||||
|
||||
extendInIntervals()
|
||||
}, ttl / 2)
|
||||
}
|
||||
|
||||
extendInIntervals()
|
||||
}
|
||||
|
||||
// perform locked task
|
||||
// need to await to ensure completion before unlocking
|
||||
|
@ -131,8 +151,7 @@ export async function doWithLock<T>(
|
|||
throw e
|
||||
}
|
||||
} finally {
|
||||
if (lock) {
|
||||
await lock.unlock()
|
||||
}
|
||||
clearTimeout(timeout)
|
||||
await lock?.unlock()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,105 @@
|
|||
import { LockName, LockType, LockOptions } from "@budibase/types"
|
||||
import { AUTO_EXTEND_POLLING_MS, doWithLock } from "../redlockImpl"
|
||||
import { DBTestConfiguration, generator } from "../../../tests"
|
||||
|
||||
describe("redlockImpl", () => {
|
||||
beforeEach(() => {
|
||||
jest.useFakeTimers()
|
||||
})
|
||||
|
||||
describe("doWithLock", () => {
|
||||
const config = new DBTestConfiguration()
|
||||
const lockTtl = AUTO_EXTEND_POLLING_MS
|
||||
|
||||
function runLockWithExecutionTime({
|
||||
opts,
|
||||
task,
|
||||
executionTimeMs,
|
||||
}: {
|
||||
opts: LockOptions
|
||||
task: () => Promise<string>
|
||||
executionTimeMs: number
|
||||
}) {
|
||||
return config.doInTenant(() =>
|
||||
doWithLock(opts, async () => {
|
||||
// Run in multiple intervals until hitting the expected time
|
||||
const interval = lockTtl / 10
|
||||
for (let i = executionTimeMs; i > 0; i -= interval) {
|
||||
await jest.advanceTimersByTimeAsync(interval)
|
||||
}
|
||||
return task()
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
it.each(Object.values(LockType))(
|
||||
"should return the task value and release the lock",
|
||||
async (lockType: LockType) => {
|
||||
const expectedResult = generator.guid()
|
||||
const mockTask = jest.fn().mockResolvedValue(expectedResult)
|
||||
|
||||
const opts: LockOptions = {
|
||||
name: LockName.PERSIST_WRITETHROUGH,
|
||||
type: lockType,
|
||||
ttl: lockTtl,
|
||||
}
|
||||
|
||||
const result = await runLockWithExecutionTime({
|
||||
opts,
|
||||
task: mockTask,
|
||||
executionTimeMs: 0,
|
||||
})
|
||||
|
||||
expect(result.executed).toBe(true)
|
||||
expect(result.executed && result.result).toBe(expectedResult)
|
||||
expect(mockTask).toHaveBeenCalledTimes(1)
|
||||
}
|
||||
)
|
||||
|
||||
it("should extend when type is autoextend", async () => {
|
||||
const expectedResult = generator.guid()
|
||||
const mockTask = jest.fn().mockResolvedValue(expectedResult)
|
||||
const mockOnExtend = jest.fn()
|
||||
|
||||
const opts: LockOptions = {
|
||||
name: LockName.PERSIST_WRITETHROUGH,
|
||||
type: LockType.AUTO_EXTEND,
|
||||
onExtend: mockOnExtend,
|
||||
}
|
||||
|
||||
const result = await runLockWithExecutionTime({
|
||||
opts,
|
||||
task: mockTask,
|
||||
executionTimeMs: lockTtl * 2.5,
|
||||
})
|
||||
|
||||
expect(result.executed).toBe(true)
|
||||
expect(result.executed && result.result).toBe(expectedResult)
|
||||
expect(mockTask).toHaveBeenCalledTimes(1)
|
||||
expect(mockOnExtend).toHaveBeenCalledTimes(5)
|
||||
})
|
||||
|
||||
it.each(Object.values(LockType).filter(t => t !== LockType.AUTO_EXTEND))(
|
||||
"should timeout when type is %s",
|
||||
async (lockType: LockType) => {
|
||||
const mockTask = jest.fn().mockResolvedValue("mockResult")
|
||||
|
||||
const opts: LockOptions = {
|
||||
name: LockName.PERSIST_WRITETHROUGH,
|
||||
type: lockType,
|
||||
ttl: lockTtl,
|
||||
}
|
||||
|
||||
await expect(
|
||||
runLockWithExecutionTime({
|
||||
opts,
|
||||
task: mockTask,
|
||||
executionTimeMs: lockTtl * 2,
|
||||
})
|
||||
).rejects.toThrowError(
|
||||
`Unable to fully release the lock on resource \"lock:${config.tenantId}_persist_writethrough\".`
|
||||
)
|
||||
}
|
||||
)
|
||||
})
|
||||
})
|
|
@ -2152,7 +2152,7 @@
|
|||
"/applications/{appId}/publish": {
|
||||
"post": {
|
||||
"operationId": "appPublish",
|
||||
"summary": "Unpublish an application",
|
||||
"summary": "Publish an application",
|
||||
"tags": [
|
||||
"applications"
|
||||
],
|
||||
|
|
|
@ -1761,7 +1761,7 @@ paths:
|
|||
"/applications/{appId}/publish":
|
||||
post:
|
||||
operationId: appPublish
|
||||
summary: Unpublish an application
|
||||
summary: Publish an application
|
||||
tags:
|
||||
- applications
|
||||
parameters:
|
||||
|
|
|
@ -10,6 +10,7 @@ export enum LockType {
|
|||
DEFAULT = "default",
|
||||
DELAY_500 = "delay_500",
|
||||
CUSTOM = "custom",
|
||||
AUTO_EXTEND = "auto_extend",
|
||||
}
|
||||
|
||||
export enum LockName {
|
||||
|
@ -21,7 +22,7 @@ export enum LockName {
|
|||
QUOTA_USAGE_EVENT = "quota_usage_event",
|
||||
}
|
||||
|
||||
export interface LockOptions {
|
||||
export type LockOptions = {
|
||||
/**
|
||||
* The lock type determines which client to use
|
||||
*/
|
||||
|
@ -35,10 +36,6 @@ export interface LockOptions {
|
|||
* The name for the lock
|
||||
*/
|
||||
name: LockName
|
||||
/**
|
||||
* The ttl to auto-expire the lock if not unlocked manually
|
||||
*/
|
||||
ttl: number
|
||||
/**
|
||||
* The individual resource to lock. This is useful for locking around very specific identifiers, e.g. a document that is prone to conflicts
|
||||
*/
|
||||
|
@ -47,4 +44,16 @@ export interface LockOptions {
|
|||
* This is a system-wide lock - don't use tenancy in lock key
|
||||
*/
|
||||
systemLock?: boolean
|
||||
}
|
||||
} & (
|
||||
| {
|
||||
/**
|
||||
* The ttl to auto-expire the lock if not unlocked manually
|
||||
*/
|
||||
ttl: number
|
||||
type: Exclude<LockType, LockType.AUTO_EXTEND>
|
||||
}
|
||||
| {
|
||||
type: LockType.AUTO_EXTEND
|
||||
onExtend?: () => void
|
||||
}
|
||||
)
|
||||
|
|
10
yarn.lock
10
yarn.lock
|
@ -12667,16 +12667,16 @@ invert-kv@^2.0.0:
|
|||
resolved "https://registry.yarnpkg.com/invert-kv/-/invert-kv-2.0.0.tgz#7393f5afa59ec9ff5f67a27620d11c226e3eec02"
|
||||
integrity sha512-wPVv/y/QQ/Uiirj/vh3oP+1Ww+AWehmi1g5fFWGPF6IpCBCDVrhgHRMvrLfdYcwDh3QJbGXDW4JAuzxElLSqKA==
|
||||
|
||||
ioredis-mock@8.7.0:
|
||||
version "8.7.0"
|
||||
resolved "https://registry.yarnpkg.com/ioredis-mock/-/ioredis-mock-8.7.0.tgz#9877a85e0d233e1b49123d1c6e320df01e9a1d36"
|
||||
integrity sha512-BJcSjkR3sIMKbH93fpFzwlWi/jl1kd5I3vLvGQxnJ/W/6bD2ksrxnyQN186ljAp3Foz4p1ivViDE3rZeKEAluA==
|
||||
ioredis-mock@8.9.0:
|
||||
version "8.9.0"
|
||||
resolved "https://registry.yarnpkg.com/ioredis-mock/-/ioredis-mock-8.9.0.tgz#5d694c4b81d3835e4291e0b527f947e260981779"
|
||||
integrity sha512-yIglcCkI1lvhwJVoMsR51fotZVsPsSk07ecTCgRTRlicG0Vq3lke6aAaHklyjmRNRsdYAgswqC2A0bPtQK4LSw==
|
||||
dependencies:
|
||||
"@ioredis/as-callback" "^3.0.0"
|
||||
"@ioredis/commands" "^1.2.0"
|
||||
fengari "^0.1.4"
|
||||
fengari-interop "^0.1.3"
|
||||
semver "^7.3.8"
|
||||
semver "^7.5.4"
|
||||
|
||||
ioredis@5.3.2:
|
||||
version "5.3.2"
|
||||
|
|
Loading…
Reference in New Issue