Merge pull request #12280 from Budibase/fix/update-bull-queue-parameters

Restore bull parameter changes + reduce Redis config complexity
This commit is contained in:
Martin McKeaveney 2023-11-06 19:42:16 +00:00 committed by GitHub
commit 880e9a911f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 142 additions and 54 deletions

View File

@ -30,6 +30,7 @@ export * as timers from "./timers"
export { default as env } from "./environment" export { default as env } from "./environment"
export * as blacklist from "./blacklist" export * as blacklist from "./blacklist"
export * as docUpdates from "./docUpdates" export * as docUpdates from "./docUpdates"
export * from "./utils/Duration"
export { SearchParams } from "./db" export { SearchParams } from "./db"
// Add context to tenancy for backwards compatibility // Add context to tenancy for backwards compatibility
// only do this for external usages to prevent internal // only do this for external usages to prevent internal

View File

@ -36,7 +36,7 @@ class InMemoryQueue {
* @param opts This is not used by the in memory queue as there is no real use * @param opts This is not used by the in memory queue as there is no real use
* case when in memory, but is the same API as Bull * case when in memory, but is the same API as Bull
*/ */
constructor(name: string, opts = null) { constructor(name: string, opts?: any) {
this._name = name this._name = name
this._opts = opts this._opts = opts
this._messages = [] this._messages = []

View File

@ -2,11 +2,17 @@ import env from "../environment"
import { getRedisOptions } from "../redis/utils" import { getRedisOptions } from "../redis/utils"
import { JobQueue } from "./constants" import { JobQueue } from "./constants"
import InMemoryQueue from "./inMemoryQueue" import InMemoryQueue from "./inMemoryQueue"
import BullQueue from "bull" import BullQueue, { QueueOptions } from "bull"
import { addListeners, StalledFn } from "./listeners" import { addListeners, StalledFn } from "./listeners"
import { Duration } from "../utils"
import * as timers from "../timers" import * as timers from "../timers"
const CLEANUP_PERIOD_MS = 60 * 1000 // the queue lock is held for 5 minutes
const QUEUE_LOCK_MS = Duration.fromMinutes(5).toMs()
// queue lock is refreshed every 30 seconds
const QUEUE_LOCK_RENEW_INTERNAL_MS = Duration.fromSeconds(30).toMs()
// cleanup the queue every 60 seconds
const CLEANUP_PERIOD_MS = Duration.fromSeconds(60).toMs()
let QUEUES: BullQueue.Queue[] | InMemoryQueue[] = [] let QUEUES: BullQueue.Queue[] | InMemoryQueue[] = []
let cleanupInterval: NodeJS.Timeout let cleanupInterval: NodeJS.Timeout
@ -20,8 +26,15 @@ export function createQueue<T>(
jobQueue: JobQueue, jobQueue: JobQueue,
opts: { removeStalledCb?: StalledFn } = {} opts: { removeStalledCb?: StalledFn } = {}
): BullQueue.Queue<T> { ): BullQueue.Queue<T> {
const { opts: redisOpts, redisProtocolUrl } = getRedisOptions() const redisOpts = getRedisOptions()
const queueConfig: any = redisProtocolUrl || { redis: redisOpts } const queueConfig: QueueOptions = {
redis: redisOpts,
settings: {
maxStalledCount: 0,
lockDuration: QUEUE_LOCK_MS,
lockRenewTime: QUEUE_LOCK_RENEW_INTERNAL_MS,
},
}
let queue: any let queue: any
if (!env.isTest()) { if (!env.isTest()) {
queue = new BullQueue(jobQueue, queueConfig) queue = new BullQueue(jobQueue, queueConfig)

View File

@ -16,6 +16,7 @@ import {
getRedisOptions, getRedisOptions,
SEPARATOR, SEPARATOR,
SelectableDatabase, SelectableDatabase,
getRedisConnectionDetails,
} from "./utils" } from "./utils"
import * as timers from "../timers" import * as timers from "../timers"
@ -91,12 +92,11 @@ function init(selectDb = DEFAULT_SELECT_DB) {
if (client) { if (client) {
client.disconnect() client.disconnect()
} }
const { redisProtocolUrl, opts, host, port } = getRedisOptions() const { host, port } = getRedisConnectionDetails()
const opts = getRedisOptions()
if (CLUSTERED) { if (CLUSTERED) {
client = new RedisCore.Cluster([{ host, port }], opts) client = new RedisCore.Cluster([{ host, port }], opts)
} else if (redisProtocolUrl) {
client = new RedisCore(redisProtocolUrl)
} else { } else {
client = new RedisCore(opts) client = new RedisCore(opts)
} }

View File

@ -1,4 +1,5 @@
import env from "../environment" import env from "../environment"
import * as Redis from "ioredis"
const SLOT_REFRESH_MS = 2000 const SLOT_REFRESH_MS = 2000
const CONNECT_TIMEOUT_MS = 10000 const CONNECT_TIMEOUT_MS = 10000
@ -42,7 +43,7 @@ export enum Databases {
export enum SelectableDatabase { export enum SelectableDatabase {
DEFAULT = 0, DEFAULT = 0,
SOCKET_IO = 1, SOCKET_IO = 1,
UNUSED_1 = 2, RATE_LIMITING = 2,
UNUSED_2 = 3, UNUSED_2 = 3,
UNUSED_3 = 4, UNUSED_3 = 4,
UNUSED_4 = 5, UNUSED_4 = 5,
@ -58,7 +59,7 @@ export enum SelectableDatabase {
UNUSED_14 = 15, UNUSED_14 = 15,
} }
export function getRedisOptions() { export function getRedisConnectionDetails() {
let password = env.REDIS_PASSWORD let password = env.REDIS_PASSWORD
let url: string[] | string = env.REDIS_URL.split("//") let url: string[] | string = env.REDIS_URL.split("//")
// get rid of the protocol // get rid of the protocol
@ -74,28 +75,34 @@ export function getRedisOptions() {
} }
const [host, port] = url.split(":") const [host, port] = url.split(":")
let redisProtocolUrl return {
host,
// fully qualified redis URL password,
if (/rediss?:\/\//.test(env.REDIS_URL)) { port: parseInt(port),
redisProtocolUrl = env.REDIS_URL
} }
}
const opts: any = { export function getRedisOptions() {
const { host, password, port } = getRedisConnectionDetails()
let redisOpts: Redis.RedisOptions = {
connectTimeout: CONNECT_TIMEOUT_MS, connectTimeout: CONNECT_TIMEOUT_MS,
port: port,
host,
password,
} }
let opts: Redis.ClusterOptions | Redis.RedisOptions = redisOpts
if (env.REDIS_CLUSTERED) { if (env.REDIS_CLUSTERED) {
opts.redisOptions = {} opts = {
opts.redisOptions.tls = {} connectTimeout: CONNECT_TIMEOUT_MS,
opts.redisOptions.password = password redisOptions: {
opts.slotsRefreshTimeout = SLOT_REFRESH_MS ...redisOpts,
opts.dnsLookup = (address: string, callback: any) => callback(null, address) tls: {},
} else { },
opts.host = host slotsRefreshTimeout: SLOT_REFRESH_MS,
opts.port = port dnsLookup: (address: string, callback: any) => callback(null, address),
opts.password = password } as Redis.ClusterOptions
} }
return { opts, host, port: parseInt(port), redisProtocolUrl } return opts
} }
export function addDbPrefix(db: string, key: string) { export function addDbPrefix(db: string, key: string) {

View File

@ -0,0 +1,49 @@
export enum DurationType {
MILLISECONDS = "milliseconds",
SECONDS = "seconds",
MINUTES = "minutes",
HOURS = "hours",
DAYS = "days",
}
const conversion: Record<DurationType, number> = {
milliseconds: 1,
seconds: 1000,
minutes: 60 * 1000,
hours: 60 * 60 * 1000,
days: 24 * 60 * 60 * 1000,
}
export class Duration {
static convert(from: DurationType, to: DurationType, duration: number) {
const milliseconds = duration * conversion[from]
return milliseconds / conversion[to]
}
static from(from: DurationType, duration: number) {
return {
to: (to: DurationType) => {
return Duration.convert(from, to, duration)
},
toMs: () => {
return Duration.convert(from, DurationType.MILLISECONDS, duration)
},
}
}
static fromSeconds(duration: number) {
return Duration.from(DurationType.SECONDS, duration)
}
static fromMinutes(duration: number) {
return Duration.from(DurationType.MINUTES, duration)
}
static fromHours(duration: number) {
return Duration.from(DurationType.HOURS, duration)
}
static fromDays(duration: number) {
return Duration.from(DurationType.DAYS, duration)
}
}

View File

@ -1,3 +1,4 @@
export * from "./hashing" export * from "./hashing"
export * from "./utils" export * from "./utils"
export * from "./stringUtils" export * from "./stringUtils"
export * from "./Duration"

View File

@ -0,0 +1,19 @@
import { Duration, DurationType } from "../Duration"
describe("duration", () => {
it("should convert minutes to milliseconds", () => {
expect(Duration.fromMinutes(5).toMs()).toBe(300000)
})
it("should convert seconds to milliseconds", () => {
expect(Duration.fromSeconds(30).toMs()).toBe(30000)
})
it("should convert days to milliseconds", () => {
expect(Duration.fromDays(1).toMs()).toBe(86400000)
})
it("should convert minutes to days", () => {
expect(Duration.fromMinutes(1440).to(DurationType.DAYS)).toBe(1)
})
})

View File

@ -15,6 +15,16 @@ import env from "../../../environment"
const Router = require("@koa/router") const Router = require("@koa/router")
const { RateLimit, Stores } = require("koa2-ratelimit") const { RateLimit, Stores } = require("koa2-ratelimit")
import { middleware, redis } from "@budibase/backend-core" import { middleware, redis } from "@budibase/backend-core"
import { SelectableDatabase } from "@budibase/backend-core/src/redis/utils"
interface KoaRateLimitOptions {
socket: {
host: string
port: number
}
password?: string
database?: number
}
const PREFIX = "/api/public/v1" const PREFIX = "/api/public/v1"
// allow a lot more requests when in test // allow a lot more requests when in test
@ -29,32 +39,21 @@ function getApiLimitPerSecond(): number {
let rateLimitStore: any = null let rateLimitStore: any = null
if (!env.isTest()) { if (!env.isTest()) {
const REDIS_OPTS = redis.utils.getRedisOptions() const { password, host, port } = redis.utils.getRedisConnectionDetails()
let options let options: KoaRateLimitOptions = {
if (REDIS_OPTS.redisProtocolUrl) { socket: {
// fully qualified redis URL host: host,
options = { port: port,
url: REDIS_OPTS.redisProtocolUrl, },
} }
} else {
options = {
socket: {
host: REDIS_OPTS.host,
port: REDIS_OPTS.port,
},
}
if (REDIS_OPTS.opts?.password || REDIS_OPTS.opts.redisOptions?.password) { if (password) {
// @ts-ignore options.password = password
options.password = }
REDIS_OPTS.opts.password || REDIS_OPTS.opts.redisOptions.password
}
if (!env.REDIS_CLUSTERED) { if (!env.REDIS_CLUSTERED) {
// @ts-ignore // Can't set direct redis db in clustered env
// Can't set direct redis db in clustered env options.database = SelectableDatabase.RATE_LIMITING
options.database = 1
}
} }
rateLimitStore = new Stores.Redis(options) rateLimitStore = new Stores.Redis(options)
RateLimit.defaultOptions({ RateLimit.defaultOptions({

View File

@ -31,10 +31,6 @@ import destroyable from "server-destroy"
import { initPro } from "./initPro" import { initPro } from "./initPro"
import { handleScimBody } from "./middleware/handleScimBody" import { handleScimBody } from "./middleware/handleScimBody"
// configure events to use the pro audit log write
// can't integrate directly into backend-core due to cyclic issues
events.processors.init(proSdk.auditLogs.write)
if (coreEnv.ENABLE_SSO_MAINTENANCE_MODE) { if (coreEnv.ENABLE_SSO_MAINTENANCE_MODE) {
console.warn( console.warn(
"Warning: ENABLE_SSO_MAINTENANCE_MODE is set. It is recommended this flag is disabled if maintenance is not in progress" "Warning: ENABLE_SSO_MAINTENANCE_MODE is set. It is recommended this flag is disabled if maintenance is not in progress"
@ -93,6 +89,9 @@ export default server.listen(parseInt(env.PORT || "4002"), async () => {
console.log(`Worker running on ${JSON.stringify(server.address())}`) console.log(`Worker running on ${JSON.stringify(server.address())}`)
await initPro() await initPro()
await redis.init() await redis.init()
// configure events to use the pro audit log write
// can't integrate directly into backend-core due to cyclic issues
await events.processors.init(proSdk.auditLogs.write)
}) })
process.on("uncaughtException", err => { process.on("uncaughtException", err => {