Fixing issue with server not shutting down correctly when an error occurs, making sure that everything clears up gracefully.

This commit is contained in:
mike12345567 2022-05-30 21:22:06 +01:00
parent b46ab2eb09
commit 06a32ed357
9 changed files with 83 additions and 12 deletions

View File

@ -23,7 +23,7 @@ function connectionError(timeout, err) {
if (CLOSED) { if (CLOSED) {
return return
} }
CLIENT.end() CLIENT.disconnect()
CLOSED = true CLOSED = true
// always clear this on error // always clear this on error
clearTimeout(timeout) clearTimeout(timeout)

View File

@ -12,6 +12,7 @@ const { mainRoutes, staticRoutes, publicRoutes } = require("./routes")
const pkg = require("../../package.json") const pkg = require("../../package.json")
const env = require("../environment") const env = require("../environment")
const { middleware: pro } = require("@budibase/pro") const { middleware: pro } = require("@budibase/pro")
const { shutdown } = require("./routes/public")
const router = new Router() const router = new Router()
@ -90,4 +91,5 @@ router.use(publicRoutes.allowedMethods())
router.use(staticRoutes.routes()) router.use(staticRoutes.routes())
router.use(staticRoutes.allowedMethods()) router.use(staticRoutes.allowedMethods())
module.exports = router module.exports.router = router
module.exports.shutdown = shutdown

View File

@ -29,6 +29,7 @@ function getApiLimitPerSecond(): number {
return parseInt(env.API_REQ_LIMIT_PER_SEC) return parseInt(env.API_REQ_LIMIT_PER_SEC)
} }
let rateLimitStore: any = null
if (!env.isTest()) { if (!env.isTest()) {
const REDIS_OPTS = getRedisOptions() const REDIS_OPTS = getRedisOptions()
let options let options
@ -47,8 +48,9 @@ if (!env.isTest()) {
database: 1, database: 1,
} }
} }
rateLimitStore = new Stores.Redis(options)
RateLimit.defaultOptions({ RateLimit.defaultOptions({
store: new Stores.Redis(options), store: rateLimitStore,
}) })
} }
// rate limiting, allows for 2 requests per second // rate limiting, allows for 2 requests per second
@ -128,3 +130,10 @@ applyRoutes(queryEndpoints, PermissionTypes.QUERY, "queryId")
applyRoutes(rowEndpoints, PermissionTypes.TABLE, "tableId", "rowId") applyRoutes(rowEndpoints, PermissionTypes.TABLE, "tableId", "rowId")
export default publicRouter export default publicRouter
export const shutdown = () => {
if (rateLimitStore) {
rateLimitStore.client.disconnect()
rateLimitStore = null
}
}

View File

@ -14,6 +14,8 @@ const automations = require("./automations/index")
const Sentry = require("@sentry/node") const Sentry = require("@sentry/node")
const fileSystem = require("./utilities/fileSystem") const fileSystem = require("./utilities/fileSystem")
const bullboard = require("./automations/bullboard") const bullboard = require("./automations/bullboard")
const context = require("@budibase/backend-core/context")
const { Thread } = require("./threads")
import redis from "./utilities/redis" import redis from "./utilities/redis"
import * as migrations from "./migrations" import * as migrations from "./migrations"
@ -49,7 +51,7 @@ app.context.eventEmitter = eventEmitter
app.context.auth = {} app.context.auth = {}
// api routes // api routes
app.use(api.routes()) app.use(api.router.routes())
if (env.isProd()) { if (env.isProd()) {
env._set("NODE_ENV", "production") env._set("NODE_ENV", "production")
@ -68,11 +70,20 @@ if (env.isProd()) {
const server = http.createServer(app.callback()) const server = http.createServer(app.callback())
destroyable(server) destroyable(server)
let shuttingDown = false
server.on("close", async () => { server.on("close", async () => {
if (env.NODE_ENV !== "jest") { // already in process
if (shuttingDown) {
return
}
shuttingDown = true
if (!env.isTest()) {
console.log("Server Closed") console.log("Server Closed")
} }
await automations.shutdown()
await redis.shutdown() await redis.shutdown()
await Thread.shutdown()
api.shutdown()
}) })
module.exports = server.listen(env.PORT || 0, async () => { module.exports = server.listen(env.PORT || 0, async () => {

View File

@ -45,4 +45,12 @@ exports.init = () => {
return serverAdapter.registerPlugin() return serverAdapter.registerPlugin()
} }
exports.shutdown = async () => {
if (automationQueue) {
clearInterval(cleanupInternal)
await automationQueue.close()
automationQueue = null
}
}
exports.queue = automationQueue exports.queue = automationQueue

View File

@ -1,5 +1,5 @@
const { processEvent } = require("./utils") const { processEvent } = require("./utils")
const { queue } = require("./bullboard") const { queue, shutdown } = require("./bullboard")
/** /**
* This module is built purely to kick off the worker farm and manage the inputs/outputs * This module is built purely to kick off the worker farm and manage the inputs/outputs
@ -14,4 +14,9 @@ exports.init = function () {
exports.getQueues = () => { exports.getQueues = () => {
return [queue] return [queue]
} }
exports.shutdown = () => {
return shutdown()
}
exports.queue = queue exports.queue = queue

View File

@ -28,6 +28,8 @@ export class Thread {
workers: any workers: any
timeoutMs: any timeoutMs: any
static workerRefs: any[] = []
constructor(type: any, opts: any = { timeoutMs: null, count: 1 }) { constructor(type: any, opts: any = { timeoutMs: null, count: 1 }) {
this.type = type this.type = type
this.count = opts.count ? opts.count : 1 this.count = opts.count ? opts.count : 1
@ -46,6 +48,7 @@ export class Thread {
workerOpts.maxCallTime = opts.timeoutMs workerOpts.maxCallTime = opts.timeoutMs
} }
this.workers = workerFarm(workerOpts, typeToFile(type)) this.workers = workerFarm(workerOpts, typeToFile(type))
Thread.workerRefs.push(this.workers)
} }
} }
@ -73,4 +76,23 @@ export class Thread {
}) })
}) })
} }
static shutdown() {
return new Promise<void>(resolve => {
if (Thread.workerRefs.length === 0) {
resolve()
}
let count = 0
function complete() {
count++
if (count >= Thread.workerRefs.length) {
resolve()
}
}
for (let worker of Thread.workerRefs) {
workerFarm.end(worker, complete)
}
Thread.workerRefs = []
})
}
} }

View File

@ -75,6 +75,13 @@ class InMemoryQueue {
this._emitter.emit("message") this._emitter.emit("message")
} }
/**
* replicating the close function from bull, which waits for jobs to finish.
*/
async close() {
return []
}
/** /**
* This removes a cron which has been implemented, this is part of Bull API. * This removes a cron which has been implemented, this is part of Bull API.
* @param {string} cronJobId The cron which is to be removed. * @param {string} cronJobId The cron which is to be removed.

View File

@ -28,7 +28,6 @@ app.keys = ["secret", "key"]
// set up top level koa middleware // set up top level koa middleware
app.use(koaBody({ multipart: true })) app.use(koaBody({ multipart: true }))
app.use(koaSession(app)) app.use(koaSession(app))
app.use( app.use(
logger({ logger({
prettyPrint: { prettyPrint: {
@ -62,13 +61,23 @@ if (env.isProd()) {
const server = http.createServer(app.callback()) const server = http.createServer(app.callback())
destroyable(server) destroyable(server)
let shuttingDown = false
server.on("close", async () => { server.on("close", async () => {
if (env.isProd()) { if (shuttingDown) {
return
}
shuttingDown = true
if (!env.isTest()) {
console.log("Server Closed") console.log("Server Closed")
} }
await redis.shutdown() await redis.shutdown()
}) })
const shutdown = () => {
server.close()
server.destroy()
}
module.exports = server.listen(parseInt(env.PORT || 4002), async () => { module.exports = server.listen(parseInt(env.PORT || 4002), async () => {
console.log(`Worker running on ${JSON.stringify(server.address())}`) console.log(`Worker running on ${JSON.stringify(server.address())}`)
await redis.init() await redis.init()
@ -76,11 +85,9 @@ module.exports = server.listen(parseInt(env.PORT || 4002), async () => {
process.on("uncaughtException", err => { process.on("uncaughtException", err => {
console.error(err) console.error(err)
server.close() shutdown()
server.destroy()
}) })
process.on("SIGTERM", () => { process.on("SIGTERM", () => {
server.close() shutdown()
server.destroy()
}) })