Add locking framework
This commit is contained in:
parent
783c012a26
commit
84685d3340
|
@ -37,6 +37,7 @@ const core = {
|
||||||
db,
|
db,
|
||||||
...dbConstants,
|
...dbConstants,
|
||||||
redis,
|
redis,
|
||||||
|
locks: redis.redlock,
|
||||||
objectStore,
|
objectStore,
|
||||||
utils,
|
utils,
|
||||||
users,
|
users,
|
||||||
|
|
|
@ -3,9 +3,11 @@
|
||||||
import Client from "../redis"
|
import Client from "../redis"
|
||||||
import utils from "../redis/utils"
|
import utils from "../redis/utils"
|
||||||
import clients from "../redis/init"
|
import clients from "../redis/init"
|
||||||
|
import * as redlock from "../redis/redlock"
|
||||||
|
|
||||||
export = {
|
export = {
|
||||||
Client,
|
Client,
|
||||||
utils,
|
utils,
|
||||||
clients,
|
clients,
|
||||||
|
redlock,
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,27 +1,23 @@
|
||||||
const Client = require("./index")
|
const Client = require("./index")
|
||||||
const utils = require("./utils")
|
const utils = require("./utils")
|
||||||
const { getRedlock } = require("./redlock")
|
|
||||||
|
|
||||||
let userClient, sessionClient, appClient, cacheClient, writethroughClient
|
let userClient,
|
||||||
let migrationsRedlock
|
sessionClient,
|
||||||
|
appClient,
|
||||||
// turn retry off so that only one instance can ever hold the lock
|
cacheClient,
|
||||||
const migrationsRedlockConfig = { retryCount: 0 }
|
writethroughClient,
|
||||||
|
lockClient
|
||||||
|
|
||||||
async function init() {
|
async function init() {
|
||||||
userClient = await new Client(utils.Databases.USER_CACHE).init()
|
userClient = await new Client(utils.Databases.USER_CACHE).init()
|
||||||
sessionClient = await new Client(utils.Databases.SESSIONS).init()
|
sessionClient = await new Client(utils.Databases.SESSIONS).init()
|
||||||
appClient = await new Client(utils.Databases.APP_METADATA).init()
|
appClient = await new Client(utils.Databases.APP_METADATA).init()
|
||||||
cacheClient = await new Client(utils.Databases.GENERIC_CACHE).init()
|
cacheClient = await new Client(utils.Databases.GENERIC_CACHE).init()
|
||||||
|
lockClient = await new Client(utils.Databases.LOCKS).init()
|
||||||
writethroughClient = await new Client(
|
writethroughClient = await new Client(
|
||||||
utils.Databases.WRITE_THROUGH,
|
utils.Databases.WRITE_THROUGH,
|
||||||
utils.SelectableDatabases.WRITE_THROUGH
|
utils.SelectableDatabases.WRITE_THROUGH
|
||||||
).init()
|
).init()
|
||||||
// pass the underlying ioredis client to redlock
|
|
||||||
migrationsRedlock = getRedlock(
|
|
||||||
cacheClient.getClient(),
|
|
||||||
migrationsRedlockConfig
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
process.on("exit", async () => {
|
process.on("exit", async () => {
|
||||||
|
@ -30,6 +26,7 @@ process.on("exit", async () => {
|
||||||
if (appClient) await appClient.finish()
|
if (appClient) await appClient.finish()
|
||||||
if (cacheClient) await cacheClient.finish()
|
if (cacheClient) await cacheClient.finish()
|
||||||
if (writethroughClient) await writethroughClient.finish()
|
if (writethroughClient) await writethroughClient.finish()
|
||||||
|
if (lockClient) await lockClient.finish()
|
||||||
})
|
})
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
|
@ -63,10 +60,10 @@ module.exports = {
|
||||||
}
|
}
|
||||||
return writethroughClient
|
return writethroughClient
|
||||||
},
|
},
|
||||||
getMigrationsRedlock: async () => {
|
getLockClient: async () => {
|
||||||
if (!migrationsRedlock) {
|
if (!lockClient) {
|
||||||
await init()
|
await init()
|
||||||
}
|
}
|
||||||
return migrationsRedlock
|
return lockClient
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,14 +1,37 @@
|
||||||
import Redlock from "redlock"
|
import Redlock, { Options } from "redlock"
|
||||||
|
import { getLockClient } from "./init"
|
||||||
|
import { LockOptions, LockType } from "@budibase/types"
|
||||||
|
import * as tenancy from "../tenancy"
|
||||||
|
|
||||||
export const getRedlock = (redisClient: any, opts = { retryCount: 10 }) => {
|
let noRetryRedlock: Redlock | undefined
|
||||||
return new Redlock([redisClient], {
|
|
||||||
|
const getClient = async (type: LockType): Promise<Redlock> => {
|
||||||
|
switch (type) {
|
||||||
|
case LockType.TRY_ONCE: {
|
||||||
|
if (!noRetryRedlock) {
|
||||||
|
noRetryRedlock = await newRedlock(OPTIONS.TRY_ONCE)
|
||||||
|
}
|
||||||
|
return noRetryRedlock
|
||||||
|
}
|
||||||
|
default: {
|
||||||
|
throw new Error(`Could not get redlock client: ${type}`)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export const OPTIONS = {
|
||||||
|
TRY_ONCE: {
|
||||||
|
// immediately throws an error if the lock is already held
|
||||||
|
retryCount: 0,
|
||||||
|
},
|
||||||
|
DEFAULT: {
|
||||||
// the expected clock drift; for more details
|
// the expected clock drift; for more details
|
||||||
// see http://redis.io/topics/distlock
|
// see http://redis.io/topics/distlock
|
||||||
driftFactor: 0.01, // multiplied by lock ttl to determine drift time
|
driftFactor: 0.01, // multiplied by lock ttl to determine drift time
|
||||||
|
|
||||||
// the max number of times Redlock will attempt
|
// the max number of times Redlock will attempt
|
||||||
// to lock a resource before erroring
|
// to lock a resource before erroring
|
||||||
retryCount: opts.retryCount,
|
retryCount: 10,
|
||||||
|
|
||||||
// the time in ms between attempts
|
// the time in ms between attempts
|
||||||
retryDelay: 200, // time in ms
|
retryDelay: 200, // time in ms
|
||||||
|
@ -16,6 +39,45 @@ export const getRedlock = (redisClient: any, opts = { retryCount: 10 }) => {
|
||||||
// the max time in ms randomly added to retries
|
// the max time in ms randomly added to retries
|
||||||
// to improve performance under high contention
|
// to improve performance under high contention
|
||||||
// see https://www.awsarchitectureblog.com/2015/03/backoff.html
|
// see https://www.awsarchitectureblog.com/2015/03/backoff.html
|
||||||
retryJitter: 200, // time in ms
|
retryJitter: 100, // time in ms
|
||||||
})
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
export const newRedlock = async (opts: Options = {}) => {
|
||||||
|
let options = { ...OPTIONS.DEFAULT, ...opts }
|
||||||
|
const redisWrapper = await getLockClient()
|
||||||
|
const client = redisWrapper.getClient()
|
||||||
|
return new Redlock([client], options)
|
||||||
|
}
|
||||||
|
|
||||||
|
export const doWithLock = async (opts: LockOptions, task: any) => {
|
||||||
|
const redlock = await getClient(opts.type)
|
||||||
|
let lock
|
||||||
|
try {
|
||||||
|
// aquire lock
|
||||||
|
let name: string = `${tenancy.getTenantId()}_${opts.name}`
|
||||||
|
if (opts.nameSuffix) {
|
||||||
|
name = name + `_${opts.nameSuffix}`
|
||||||
|
}
|
||||||
|
lock = await redlock.lock(name, opts.ttl)
|
||||||
|
// perform locked task
|
||||||
|
return task()
|
||||||
|
} catch (e: any) {
|
||||||
|
// lock limit exceeded
|
||||||
|
if (e.name === "LockError") {
|
||||||
|
if (opts.type === LockType.TRY_ONCE) {
|
||||||
|
// don't throw for try-once locks, they will always error
|
||||||
|
// due to retry count (0) exceeded
|
||||||
|
return
|
||||||
|
} else {
|
||||||
|
throw e
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
throw e
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (lock) {
|
||||||
|
await lock.unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,6 +28,7 @@ exports.Databases = {
|
||||||
LICENSES: "license",
|
LICENSES: "license",
|
||||||
GENERIC_CACHE: "data_cache",
|
GENERIC_CACHE: "data_cache",
|
||||||
WRITE_THROUGH: "writeThrough",
|
WRITE_THROUGH: "writeThrough",
|
||||||
|
LOCKS: "locks",
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -1,5 +1,11 @@
|
||||||
import { migrations, redis } from "@budibase/backend-core"
|
import { locks, migrations } from "@budibase/backend-core"
|
||||||
import { Migration, MigrationOptions, MigrationName } from "@budibase/types"
|
import {
|
||||||
|
Migration,
|
||||||
|
MigrationOptions,
|
||||||
|
MigrationName,
|
||||||
|
LockType,
|
||||||
|
LockName,
|
||||||
|
} from "@budibase/types"
|
||||||
import env from "../environment"
|
import env from "../environment"
|
||||||
|
|
||||||
// migration functions
|
// migration functions
|
||||||
|
@ -86,33 +92,14 @@ export const migrate = async (options?: MigrationOptions) => {
|
||||||
}
|
}
|
||||||
|
|
||||||
const migrateWithLock = async (options?: MigrationOptions) => {
|
const migrateWithLock = async (options?: MigrationOptions) => {
|
||||||
// get a new lock client
|
await locks.doWithLock(
|
||||||
const redlock = await redis.clients.getMigrationsRedlock()
|
{
|
||||||
// lock for 15 minutes
|
type: LockType.TRY_ONCE,
|
||||||
const ttl = 1000 * 60 * 15
|
name: LockName.MIGRATIONS,
|
||||||
|
ttl: 1000 * 60 * 15, // auto expire the migration lock after 15 minutes
|
||||||
let migrationLock
|
},
|
||||||
|
async () => {
|
||||||
// acquire lock
|
await migrations.runMigrations(MIGRATIONS, options)
|
||||||
try {
|
|
||||||
migrationLock = await redlock.lock("migrations", ttl)
|
|
||||||
} catch (e: any) {
|
|
||||||
if (e.name === "LockError") {
|
|
||||||
return
|
|
||||||
} else {
|
|
||||||
throw e
|
|
||||||
}
|
}
|
||||||
}
|
)
|
||||||
|
|
||||||
// run migrations
|
|
||||||
try {
|
|
||||||
await migrations.runMigrations(MIGRATIONS, options)
|
|
||||||
} finally {
|
|
||||||
// release lock
|
|
||||||
try {
|
|
||||||
await migrationLock.unlock()
|
|
||||||
} catch (e) {
|
|
||||||
console.error("unable to release migration lock")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,3 +7,4 @@ export * from "./datasources"
|
||||||
export * from "./search"
|
export * from "./search"
|
||||||
export * from "./koa"
|
export * from "./koa"
|
||||||
export * from "./auth"
|
export * from "./auth"
|
||||||
|
export * from "./locks"
|
||||||
|
|
|
@ -0,0 +1,31 @@
|
||||||
|
export enum LockType {
|
||||||
|
/**
|
||||||
|
* If this lock is already held the attempted operation will not be performed.
|
||||||
|
* No retries will take place and no error will be thrown.
|
||||||
|
*/
|
||||||
|
TRY_ONCE = "try_once",
|
||||||
|
}
|
||||||
|
|
||||||
|
export enum LockName {
|
||||||
|
MIGRATIONS = "migrations",
|
||||||
|
TRIGGER_QUOTA = "trigger_quota",
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface LockOptions {
|
||||||
|
/**
|
||||||
|
* The lock type determines which client to use
|
||||||
|
*/
|
||||||
|
type: LockType
|
||||||
|
/**
|
||||||
|
* The name for the lock
|
||||||
|
*/
|
||||||
|
name: LockName
|
||||||
|
/**
|
||||||
|
* The ttl to auto-expire the lock if not unlocked manually
|
||||||
|
*/
|
||||||
|
ttl: number
|
||||||
|
/**
|
||||||
|
* The suffix to add to the lock name for additional uniqueness
|
||||||
|
*/
|
||||||
|
nameSuffix?: string
|
||||||
|
}
|
|
@ -1,5 +1,11 @@
|
||||||
import { migrations, redis } from "@budibase/backend-core"
|
import { migrations, locks } from "@budibase/backend-core"
|
||||||
import { Migration, MigrationOptions, MigrationName } from "@budibase/types"
|
import {
|
||||||
|
Migration,
|
||||||
|
MigrationOptions,
|
||||||
|
MigrationName,
|
||||||
|
LockType,
|
||||||
|
LockName,
|
||||||
|
} from "@budibase/types"
|
||||||
import env from "../environment"
|
import env from "../environment"
|
||||||
|
|
||||||
// migration functions
|
// migration functions
|
||||||
|
@ -42,33 +48,14 @@ export const migrate = async (options?: MigrationOptions) => {
|
||||||
}
|
}
|
||||||
|
|
||||||
const migrateWithLock = async (options?: MigrationOptions) => {
|
const migrateWithLock = async (options?: MigrationOptions) => {
|
||||||
// get a new lock client
|
await locks.doWithLock(
|
||||||
const redlock = await redis.clients.getMigrationsRedlock()
|
{
|
||||||
// lock for 15 minutes
|
type: LockType.TRY_ONCE,
|
||||||
const ttl = 1000 * 60 * 15
|
name: LockName.MIGRATIONS,
|
||||||
|
ttl: 1000 * 60 * 15, // auto expire the migration lock after 15 minutes
|
||||||
let migrationLock
|
},
|
||||||
|
async () => {
|
||||||
// acquire lock
|
await migrations.runMigrations(MIGRATIONS, options)
|
||||||
try {
|
|
||||||
migrationLock = await redlock.lock("migrations", ttl)
|
|
||||||
} catch (e: any) {
|
|
||||||
if (e.name === "LockError") {
|
|
||||||
return
|
|
||||||
} else {
|
|
||||||
throw e
|
|
||||||
}
|
}
|
||||||
}
|
)
|
||||||
|
|
||||||
// run migrations
|
|
||||||
try {
|
|
||||||
await migrations.runMigrations(MIGRATIONS, options)
|
|
||||||
} finally {
|
|
||||||
// release lock
|
|
||||||
try {
|
|
||||||
await migrationLock.unlock()
|
|
||||||
} catch (e) {
|
|
||||||
console.error("unable to release migration lock")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue