Merge pull request #6120 from Budibase/fix/graceful-shutdown
Graceful server shutdown and cleaning up hooks which keep instance alive
This commit is contained in:
commit
351e91a5bc
|
@ -0,0 +1 @@
|
||||||
|
module.exports = require("./src/logging")
|
|
@ -0,0 +1,16 @@
|
||||||
|
const NonErrors = ["AccountError"]
|
||||||
|
|
||||||
|
function isSuppressed(e) {
|
||||||
|
return e && e["suppressAlert"]
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports.logAlert = (message, e = null) => {
|
||||||
|
if (e && NonErrors.includes(e.name) && isSuppressed(e)) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
let errorJson = ""
|
||||||
|
if (e) {
|
||||||
|
errorJson = ": " + JSON.stringify(e, Object.getOwnPropertyNames(e))
|
||||||
|
}
|
||||||
|
console.error(`bb-alert: ${message} ${errorJson}`)
|
||||||
|
}
|
|
@ -23,7 +23,7 @@ function connectionError(timeout, err) {
|
||||||
if (CLOSED) {
|
if (CLOSED) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
CLIENT.end()
|
CLIENT.disconnect()
|
||||||
CLOSED = true
|
CLOSED = true
|
||||||
// always clear this on error
|
// always clear this on error
|
||||||
clearTimeout(timeout)
|
clearTimeout(timeout)
|
||||||
|
|
|
@ -12,6 +12,7 @@ const { mainRoutes, staticRoutes, publicRoutes } = require("./routes")
|
||||||
const pkg = require("../../package.json")
|
const pkg = require("../../package.json")
|
||||||
const env = require("../environment")
|
const env = require("../environment")
|
||||||
const { middleware: pro } = require("@budibase/pro")
|
const { middleware: pro } = require("@budibase/pro")
|
||||||
|
const { shutdown } = require("./routes/public")
|
||||||
|
|
||||||
const router = new Router()
|
const router = new Router()
|
||||||
|
|
||||||
|
@ -90,4 +91,5 @@ router.use(publicRoutes.allowedMethods())
|
||||||
router.use(staticRoutes.routes())
|
router.use(staticRoutes.routes())
|
||||||
router.use(staticRoutes.allowedMethods())
|
router.use(staticRoutes.allowedMethods())
|
||||||
|
|
||||||
module.exports = router
|
module.exports.router = router
|
||||||
|
module.exports.shutdown = shutdown
|
||||||
|
|
|
@ -29,6 +29,7 @@ function getApiLimitPerSecond(): number {
|
||||||
return parseInt(env.API_REQ_LIMIT_PER_SEC)
|
return parseInt(env.API_REQ_LIMIT_PER_SEC)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let rateLimitStore: any = null
|
||||||
if (!env.isTest()) {
|
if (!env.isTest()) {
|
||||||
const REDIS_OPTS = getRedisOptions()
|
const REDIS_OPTS = getRedisOptions()
|
||||||
let options
|
let options
|
||||||
|
@ -47,8 +48,9 @@ if (!env.isTest()) {
|
||||||
database: 1,
|
database: 1,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
rateLimitStore = new Stores.Redis(options)
|
||||||
RateLimit.defaultOptions({
|
RateLimit.defaultOptions({
|
||||||
store: new Stores.Redis(options),
|
store: rateLimitStore,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
// rate limiting, allows for 2 requests per second
|
// rate limiting, allows for 2 requests per second
|
||||||
|
@ -128,3 +130,10 @@ applyRoutes(queryEndpoints, PermissionTypes.QUERY, "queryId")
|
||||||
applyRoutes(rowEndpoints, PermissionTypes.TABLE, "tableId", "rowId")
|
applyRoutes(rowEndpoints, PermissionTypes.TABLE, "tableId", "rowId")
|
||||||
|
|
||||||
export default publicRouter
|
export default publicRouter
|
||||||
|
|
||||||
|
export const shutdown = () => {
|
||||||
|
if (rateLimitStore) {
|
||||||
|
rateLimitStore.client.disconnect()
|
||||||
|
rateLimitStore = null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -14,6 +14,8 @@ const automations = require("./automations/index")
|
||||||
const Sentry = require("@sentry/node")
|
const Sentry = require("@sentry/node")
|
||||||
const fileSystem = require("./utilities/fileSystem")
|
const fileSystem = require("./utilities/fileSystem")
|
||||||
const bullboard = require("./automations/bullboard")
|
const bullboard = require("./automations/bullboard")
|
||||||
|
const { logAlert } = require("@budibase/backend-core/logging")
|
||||||
|
const { Thread } = require("./threads")
|
||||||
import redis from "./utilities/redis"
|
import redis from "./utilities/redis"
|
||||||
import * as migrations from "./migrations"
|
import * as migrations from "./migrations"
|
||||||
|
|
||||||
|
@ -49,7 +51,7 @@ app.context.eventEmitter = eventEmitter
|
||||||
app.context.auth = {}
|
app.context.auth = {}
|
||||||
|
|
||||||
// api routes
|
// api routes
|
||||||
app.use(api.routes())
|
app.use(api.router.routes())
|
||||||
|
|
||||||
if (env.isProd()) {
|
if (env.isProd()) {
|
||||||
env._set("NODE_ENV", "production")
|
env._set("NODE_ENV", "production")
|
||||||
|
@ -68,11 +70,24 @@ if (env.isProd()) {
|
||||||
const server = http.createServer(app.callback())
|
const server = http.createServer(app.callback())
|
||||||
destroyable(server)
|
destroyable(server)
|
||||||
|
|
||||||
|
let shuttingDown = false,
|
||||||
|
errCode = 0
|
||||||
server.on("close", async () => {
|
server.on("close", async () => {
|
||||||
if (env.NODE_ENV !== "jest") {
|
// already in process
|
||||||
|
if (shuttingDown) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
shuttingDown = true
|
||||||
|
if (!env.isTest()) {
|
||||||
console.log("Server Closed")
|
console.log("Server Closed")
|
||||||
}
|
}
|
||||||
|
await automations.shutdown()
|
||||||
await redis.shutdown()
|
await redis.shutdown()
|
||||||
|
await Thread.shutdown()
|
||||||
|
api.shutdown()
|
||||||
|
if (!env.isTest()) {
|
||||||
|
process.exit(errCode)
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
module.exports = server.listen(env.PORT || 0, async () => {
|
module.exports = server.listen(env.PORT || 0, async () => {
|
||||||
|
@ -90,7 +105,13 @@ const shutdown = () => {
|
||||||
}
|
}
|
||||||
|
|
||||||
process.on("uncaughtException", err => {
|
process.on("uncaughtException", err => {
|
||||||
console.error(err)
|
// @ts-ignore
|
||||||
|
// don't worry about this error, comes from zlib isn't important
|
||||||
|
if (err && err["code"] === "ERR_INVALID_CHAR") {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
errCode = -1
|
||||||
|
logAlert("Uncaught exception.", err)
|
||||||
shutdown()
|
shutdown()
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -102,7 +123,7 @@ process.on("SIGTERM", () => {
|
||||||
// not recommended in a clustered environment
|
// not recommended in a clustered environment
|
||||||
if (!env.HTTP_MIGRATIONS) {
|
if (!env.HTTP_MIGRATIONS) {
|
||||||
migrations.migrate().catch(err => {
|
migrations.migrate().catch(err => {
|
||||||
console.error("Error performing migrations. Exiting.\n", err)
|
logAlert("Error performing migrations. Exiting.", err)
|
||||||
shutdown()
|
shutdown()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -45,4 +45,12 @@ exports.init = () => {
|
||||||
return serverAdapter.registerPlugin()
|
return serverAdapter.registerPlugin()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
exports.shutdown = async () => {
|
||||||
|
if (automationQueue) {
|
||||||
|
clearInterval(cleanupInternal)
|
||||||
|
await automationQueue.close()
|
||||||
|
automationQueue = null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
exports.queue = automationQueue
|
exports.queue = automationQueue
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
const { processEvent } = require("./utils")
|
const { processEvent } = require("./utils")
|
||||||
const { queue } = require("./bullboard")
|
const { queue, shutdown } = require("./bullboard")
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This module is built purely to kick off the worker farm and manage the inputs/outputs
|
* This module is built purely to kick off the worker farm and manage the inputs/outputs
|
||||||
|
@ -14,4 +14,9 @@ exports.init = function () {
|
||||||
exports.getQueues = () => {
|
exports.getQueues = () => {
|
||||||
return [queue]
|
return [queue]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
exports.shutdown = () => {
|
||||||
|
return shutdown()
|
||||||
|
}
|
||||||
|
|
||||||
exports.queue = queue
|
exports.queue = queue
|
||||||
|
|
|
@ -28,6 +28,8 @@ export class Thread {
|
||||||
workers: any
|
workers: any
|
||||||
timeoutMs: any
|
timeoutMs: any
|
||||||
|
|
||||||
|
static workerRefs: any[] = []
|
||||||
|
|
||||||
constructor(type: any, opts: any = { timeoutMs: null, count: 1 }) {
|
constructor(type: any, opts: any = { timeoutMs: null, count: 1 }) {
|
||||||
this.type = type
|
this.type = type
|
||||||
this.count = opts.count ? opts.count : 1
|
this.count = opts.count ? opts.count : 1
|
||||||
|
@ -46,6 +48,7 @@ export class Thread {
|
||||||
workerOpts.maxCallTime = opts.timeoutMs
|
workerOpts.maxCallTime = opts.timeoutMs
|
||||||
}
|
}
|
||||||
this.workers = workerFarm(workerOpts, typeToFile(type))
|
this.workers = workerFarm(workerOpts, typeToFile(type))
|
||||||
|
Thread.workerRefs.push(this.workers)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -73,4 +76,23 @@ export class Thread {
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static shutdown() {
|
||||||
|
return new Promise<void>(resolve => {
|
||||||
|
if (Thread.workerRefs.length === 0) {
|
||||||
|
resolve()
|
||||||
|
}
|
||||||
|
let count = 0
|
||||||
|
function complete() {
|
||||||
|
count++
|
||||||
|
if (count >= Thread.workerRefs.length) {
|
||||||
|
resolve()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (let worker of Thread.workerRefs) {
|
||||||
|
workerFarm.end(worker, complete)
|
||||||
|
}
|
||||||
|
Thread.workerRefs = []
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -75,6 +75,13 @@ class InMemoryQueue {
|
||||||
this._emitter.emit("message")
|
this._emitter.emit("message")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* replicating the close function from bull, which waits for jobs to finish.
|
||||||
|
*/
|
||||||
|
async close() {
|
||||||
|
return []
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This removes a cron which has been implemented, this is part of Bull API.
|
* This removes a cron which has been implemented, this is part of Bull API.
|
||||||
* @param {string} cronJobId The cron which is to be removed.
|
* @param {string} cronJobId The cron which is to be removed.
|
||||||
|
|
|
@ -12,6 +12,7 @@ const destroyable = require("server-destroy")
|
||||||
const koaBody = require("koa-body")
|
const koaBody = require("koa-body")
|
||||||
const koaSession = require("koa-session")
|
const koaSession = require("koa-session")
|
||||||
const { passport } = require("@budibase/backend-core/auth")
|
const { passport } = require("@budibase/backend-core/auth")
|
||||||
|
const { logAlert } = require("@budibase/backend-core/logging")
|
||||||
const logger = require("koa-pino-logger")
|
const logger = require("koa-pino-logger")
|
||||||
const http = require("http")
|
const http = require("http")
|
||||||
const api = require("./api")
|
const api = require("./api")
|
||||||
|
@ -28,7 +29,6 @@ app.keys = ["secret", "key"]
|
||||||
// set up top level koa middleware
|
// set up top level koa middleware
|
||||||
app.use(koaBody({ multipart: true }))
|
app.use(koaBody({ multipart: true }))
|
||||||
app.use(koaSession(app))
|
app.use(koaSession(app))
|
||||||
|
|
||||||
app.use(
|
app.use(
|
||||||
logger({
|
logger({
|
||||||
prettyPrint: {
|
prettyPrint: {
|
||||||
|
@ -62,25 +62,38 @@ if (env.isProd()) {
|
||||||
const server = http.createServer(app.callback())
|
const server = http.createServer(app.callback())
|
||||||
destroyable(server)
|
destroyable(server)
|
||||||
|
|
||||||
|
let shuttingDown = false,
|
||||||
|
errCode = 0
|
||||||
server.on("close", async () => {
|
server.on("close", async () => {
|
||||||
if (env.isProd()) {
|
if (shuttingDown) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
shuttingDown = true
|
||||||
|
if (!env.isTest()) {
|
||||||
console.log("Server Closed")
|
console.log("Server Closed")
|
||||||
}
|
}
|
||||||
await redis.shutdown()
|
await redis.shutdown()
|
||||||
|
if (!env.isTest()) {
|
||||||
|
process.exit(errCode)
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
const shutdown = () => {
|
||||||
|
server.close()
|
||||||
|
server.destroy()
|
||||||
|
}
|
||||||
|
|
||||||
module.exports = server.listen(parseInt(env.PORT || 4002), async () => {
|
module.exports = server.listen(parseInt(env.PORT || 4002), async () => {
|
||||||
console.log(`Worker running on ${JSON.stringify(server.address())}`)
|
console.log(`Worker running on ${JSON.stringify(server.address())}`)
|
||||||
await redis.init()
|
await redis.init()
|
||||||
})
|
})
|
||||||
|
|
||||||
process.on("uncaughtException", err => {
|
process.on("uncaughtException", err => {
|
||||||
console.error(err)
|
errCode = -1
|
||||||
server.close()
|
logAlert("Uncaught exception.", err)
|
||||||
server.destroy()
|
shutdown()
|
||||||
})
|
})
|
||||||
|
|
||||||
process.on("SIGTERM", () => {
|
process.on("SIGTERM", () => {
|
||||||
server.close()
|
shutdown()
|
||||||
server.destroy()
|
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in New Issue