Adding feature flagging, the option to only start the automations, or the API, meaning we can split the service if needed.
This commit is contained in:
parent
e068e301ff
commit
95b8a4ea10
|
@ -1,120 +1,41 @@
|
||||||
|
import Sentry from "@sentry/node"
|
||||||
|
|
||||||
if (process.env.DD_APM_ENABLED) {
|
if (process.env.DD_APM_ENABLED) {
|
||||||
require("./ddApm")
|
require("./ddApm")
|
||||||
}
|
}
|
||||||
|
|
||||||
// need to load environment first
|
// need to load environment first
|
||||||
import env from "./environment"
|
import env from "./environment"
|
||||||
|
|
||||||
import { ExtendableContext } from "koa"
|
|
||||||
import * as db from "./db"
|
import * as db from "./db"
|
||||||
db.init()
|
db.init()
|
||||||
import Koa from "koa"
|
|
||||||
import koaBody from "koa-body"
|
|
||||||
import http from "http"
|
|
||||||
import * as api from "./api"
|
|
||||||
import * as automations from "./automations"
|
|
||||||
import { Thread } from "./threads"
|
|
||||||
import * as redis from "./utilities/redis"
|
|
||||||
import { ServiceType } from "@budibase/types"
|
import { ServiceType } from "@budibase/types"
|
||||||
import {
|
import { env as coreEnv } from "@budibase/backend-core"
|
||||||
events,
|
|
||||||
logging,
|
|
||||||
middleware,
|
|
||||||
timers,
|
|
||||||
env as coreEnv,
|
|
||||||
} from "@budibase/backend-core"
|
|
||||||
coreEnv._set("SERVICE_TYPE", ServiceType.APPS)
|
coreEnv._set("SERVICE_TYPE", ServiceType.APPS)
|
||||||
|
import { apiEnabled } from "./features"
|
||||||
|
import createKoaApp from "./koa"
|
||||||
|
import Koa from "koa"
|
||||||
|
import { Server } from "http"
|
||||||
import { startup } from "./startup"
|
import { startup } from "./startup"
|
||||||
const Sentry = require("@sentry/node")
|
|
||||||
const destroyable = require("server-destroy")
|
|
||||||
const { userAgent } = require("koa-useragent")
|
|
||||||
|
|
||||||
const app = new Koa()
|
let app: Koa, server: Server
|
||||||
|
|
||||||
let mbNumber = parseInt(env.HTTP_MB_LIMIT || "10")
|
async function start() {
|
||||||
if (!mbNumber || isNaN(mbNumber)) {
|
if (apiEnabled()) {
|
||||||
mbNumber = 10
|
const koa = createKoaApp()
|
||||||
}
|
app = koa.app
|
||||||
// set up top level koa middleware
|
server = koa.server
|
||||||
app.use(
|
|
||||||
koaBody({
|
|
||||||
multipart: true,
|
|
||||||
formLimit: `${mbNumber}mb`,
|
|
||||||
jsonLimit: `${mbNumber}mb`,
|
|
||||||
textLimit: `${mbNumber}mb`,
|
|
||||||
// @ts-ignore
|
|
||||||
enableTypes: ["json", "form", "text"],
|
|
||||||
parsedMethods: ["POST", "PUT", "PATCH", "DELETE"],
|
|
||||||
})
|
|
||||||
)
|
|
||||||
|
|
||||||
app.use(middleware.correlation)
|
|
||||||
app.use(middleware.pino)
|
|
||||||
app.use(userAgent)
|
|
||||||
|
|
||||||
if (env.isProd()) {
|
|
||||||
env._set("NODE_ENV", "production")
|
|
||||||
Sentry.init()
|
|
||||||
|
|
||||||
app.on("error", (err: any, ctx: ExtendableContext) => {
|
|
||||||
Sentry.withScope(function (scope: any) {
|
|
||||||
scope.addEventProcessor(function (event: any) {
|
|
||||||
return Sentry.Handlers.parseRequest(event, ctx.request)
|
|
||||||
})
|
|
||||||
Sentry.captureException(err)
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
const server = http.createServer(app.callback())
|
|
||||||
destroyable(server)
|
|
||||||
|
|
||||||
let shuttingDown = false,
|
|
||||||
errCode = 0
|
|
||||||
|
|
||||||
server.on("close", async () => {
|
|
||||||
// already in process
|
|
||||||
if (shuttingDown) {
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
shuttingDown = true
|
|
||||||
console.log("Server Closed")
|
|
||||||
timers.cleanup()
|
|
||||||
await automations.shutdown()
|
|
||||||
await redis.shutdown()
|
|
||||||
events.shutdown()
|
|
||||||
await Thread.shutdown()
|
|
||||||
api.shutdown()
|
|
||||||
if (!env.isTest()) {
|
|
||||||
process.exit(errCode)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
export default server.listen(env.PORT || 0, async () => {
|
|
||||||
await startup(app, server)
|
await startup(app, server)
|
||||||
})
|
if (env.isProd()) {
|
||||||
|
env._set("NODE_ENV", "production")
|
||||||
const shutdown = () => {
|
Sentry.init()
|
||||||
server.close()
|
}
|
||||||
// @ts-ignore
|
|
||||||
server.destroy()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
process.on("uncaughtException", err => {
|
start().catch(err => {
|
||||||
// @ts-ignore
|
console.error(`Failed server startup - ${err.message}`)
|
||||||
// don't worry about this error, comes from zlib isn't important
|
|
||||||
if (err && err["code"] === "ERR_INVALID_CHAR") {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
errCode = -1
|
|
||||||
logging.logAlert("Uncaught exception.", err)
|
|
||||||
shutdown()
|
|
||||||
})
|
})
|
||||||
|
|
||||||
process.on("SIGTERM", () => {
|
export function getServer() {
|
||||||
shutdown()
|
return app
|
||||||
})
|
}
|
||||||
|
|
||||||
process.on("SIGINT", () => {
|
|
||||||
shutdown()
|
|
||||||
})
|
|
||||||
|
|
|
@ -2,6 +2,7 @@ import { processEvent } from "./utils"
|
||||||
import { automationQueue } from "./bullboard"
|
import { automationQueue } from "./bullboard"
|
||||||
import { rebootTrigger } from "./triggers"
|
import { rebootTrigger } from "./triggers"
|
||||||
import BullQueue from "bull"
|
import BullQueue from "bull"
|
||||||
|
import { automationsEnabled } from "../features"
|
||||||
|
|
||||||
export { automationQueue } from "./bullboard"
|
export { automationQueue } from "./bullboard"
|
||||||
export { shutdown } from "./bullboard"
|
export { shutdown } from "./bullboard"
|
||||||
|
@ -12,6 +13,9 @@ export { BUILTIN_ACTION_DEFINITIONS, getActionDefinitions } from "./actions"
|
||||||
* This module is built purely to kick off the worker farm and manage the inputs/outputs
|
* This module is built purely to kick off the worker farm and manage the inputs/outputs
|
||||||
*/
|
*/
|
||||||
export async function init() {
|
export async function init() {
|
||||||
|
if (!automationsEnabled()) {
|
||||||
|
return
|
||||||
|
}
|
||||||
// this promise will not complete
|
// this promise will not complete
|
||||||
const promise = automationQueue.process(async job => {
|
const promise = automationQueue.process(async job => {
|
||||||
await processEvent(job)
|
await processEvent(job)
|
||||||
|
|
|
@ -15,9 +15,13 @@ import {
|
||||||
WebhookActionType,
|
WebhookActionType,
|
||||||
} from "@budibase/types"
|
} from "@budibase/types"
|
||||||
import sdk from "../sdk"
|
import sdk from "../sdk"
|
||||||
|
import { automationsEnabled } from "../features"
|
||||||
|
|
||||||
const WH_STEP_ID = definitions.WEBHOOK.stepId
|
const WH_STEP_ID = definitions.WEBHOOK.stepId
|
||||||
const Runner = new Thread(ThreadType.AUTOMATION)
|
let Runner: Thread
|
||||||
|
if (automationsEnabled()) {
|
||||||
|
Runner = new Thread(ThreadType.AUTOMATION)
|
||||||
|
}
|
||||||
|
|
||||||
function loggingArgs(
|
function loggingArgs(
|
||||||
job: AutomationJob,
|
job: AutomationJob,
|
||||||
|
|
|
@ -38,6 +38,8 @@ function parseIntSafe(number?: string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
const environment = {
|
const environment = {
|
||||||
|
// features
|
||||||
|
APP_FEATURES: process.env.APP_FEATURES,
|
||||||
// important - prefer app port to generic port
|
// important - prefer app port to generic port
|
||||||
PORT: process.env.APP_PORT || process.env.PORT,
|
PORT: process.env.APP_PORT || process.env.PORT,
|
||||||
COUCH_DB_URL: process.env.COUCH_DB_URL,
|
COUCH_DB_URL: process.env.COUCH_DB_URL,
|
||||||
|
|
|
@ -0,0 +1,36 @@
|
||||||
|
import env from "./environment"
|
||||||
|
|
||||||
|
enum AppFeature {
|
||||||
|
API = "api",
|
||||||
|
AUTOMATIONS = "automations",
|
||||||
|
}
|
||||||
|
|
||||||
|
const featureList = processFeatureList()
|
||||||
|
|
||||||
|
function processFeatureList() {
|
||||||
|
const fullList = Object.values(AppFeature) as string[]
|
||||||
|
let list
|
||||||
|
if (!env.APP_FEATURES) {
|
||||||
|
list = fullList
|
||||||
|
} else {
|
||||||
|
list = env.APP_FEATURES.split(",")
|
||||||
|
}
|
||||||
|
for (let feature of list) {
|
||||||
|
if (!fullList.includes(feature)) {
|
||||||
|
throw new Error(`Feature: ${feature} is not an allowed option`)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return list
|
||||||
|
}
|
||||||
|
|
||||||
|
export function isFeatureEnabled(feature: AppFeature) {
|
||||||
|
return featureList.includes(feature)
|
||||||
|
}
|
||||||
|
|
||||||
|
export function automationsEnabled() {
|
||||||
|
return featureList.includes(AppFeature.AUTOMATIONS)
|
||||||
|
}
|
||||||
|
|
||||||
|
export function apiEnabled() {
|
||||||
|
return featureList.includes(AppFeature.API)
|
||||||
|
}
|
|
@ -0,0 +1,102 @@
|
||||||
|
import env from "./environment"
|
||||||
|
import { ExtendableContext } from "koa"
|
||||||
|
import Koa from "koa"
|
||||||
|
import koaBody from "koa-body"
|
||||||
|
import http from "http"
|
||||||
|
import * as api from "./api"
|
||||||
|
import * as automations from "./automations"
|
||||||
|
import { Thread } from "./threads"
|
||||||
|
import * as redis from "./utilities/redis"
|
||||||
|
import { events, logging, middleware, timers } from "@budibase/backend-core"
|
||||||
|
const Sentry = require("@sentry/node")
|
||||||
|
const destroyable = require("server-destroy")
|
||||||
|
const { userAgent } = require("koa-useragent")
|
||||||
|
|
||||||
|
export default function createKoaApp() {
|
||||||
|
const app = new Koa()
|
||||||
|
|
||||||
|
let mbNumber = parseInt(env.HTTP_MB_LIMIT || "10")
|
||||||
|
if (!mbNumber || isNaN(mbNumber)) {
|
||||||
|
mbNumber = 10
|
||||||
|
}
|
||||||
|
// set up top level koa middleware
|
||||||
|
app.use(
|
||||||
|
koaBody({
|
||||||
|
multipart: true,
|
||||||
|
formLimit: `${mbNumber}mb`,
|
||||||
|
jsonLimit: `${mbNumber}mb`,
|
||||||
|
textLimit: `${mbNumber}mb`,
|
||||||
|
// @ts-ignore
|
||||||
|
enableTypes: ["json", "form", "text"],
|
||||||
|
parsedMethods: ["POST", "PUT", "PATCH", "DELETE"],
|
||||||
|
})
|
||||||
|
)
|
||||||
|
|
||||||
|
app.use(middleware.correlation)
|
||||||
|
app.use(middleware.pino)
|
||||||
|
app.use(userAgent)
|
||||||
|
|
||||||
|
if (env.isProd()) {
|
||||||
|
app.on("error", (err: any, ctx: ExtendableContext) => {
|
||||||
|
Sentry.withScope(function (scope: any) {
|
||||||
|
scope.addEventProcessor(function (event: any) {
|
||||||
|
return Sentry.Handlers.parseRequest(event, ctx.request)
|
||||||
|
})
|
||||||
|
Sentry.captureException(err)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
const server = http.createServer(app.callback())
|
||||||
|
destroyable(server)
|
||||||
|
|
||||||
|
let shuttingDown = false,
|
||||||
|
errCode = 0
|
||||||
|
|
||||||
|
server.on("close", async () => {
|
||||||
|
// already in process
|
||||||
|
if (shuttingDown) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
shuttingDown = true
|
||||||
|
console.log("Server Closed")
|
||||||
|
timers.cleanup()
|
||||||
|
await automations.shutdown()
|
||||||
|
await redis.shutdown()
|
||||||
|
events.shutdown()
|
||||||
|
await Thread.shutdown()
|
||||||
|
api.shutdown()
|
||||||
|
if (!env.isTest()) {
|
||||||
|
process.exit(errCode)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
const listener = server.listen(env.PORT || 0)
|
||||||
|
|
||||||
|
const shutdown = () => {
|
||||||
|
server.close()
|
||||||
|
// @ts-ignore
|
||||||
|
server.destroy()
|
||||||
|
}
|
||||||
|
|
||||||
|
process.on("uncaughtException", err => {
|
||||||
|
// @ts-ignore
|
||||||
|
// don't worry about this error, comes from zlib isn't important
|
||||||
|
if (err && err["code"] === "ERR_INVALID_CHAR") {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
errCode = -1
|
||||||
|
logging.logAlert("Uncaught exception.", err)
|
||||||
|
shutdown()
|
||||||
|
})
|
||||||
|
|
||||||
|
process.on("SIGTERM", () => {
|
||||||
|
shutdown()
|
||||||
|
})
|
||||||
|
|
||||||
|
process.on("SIGINT", () => {
|
||||||
|
shutdown()
|
||||||
|
})
|
||||||
|
|
||||||
|
return { app, server: listener }
|
||||||
|
}
|
|
@ -17,6 +17,7 @@ import * as pro from "@budibase/pro"
|
||||||
import * as api from "./api"
|
import * as api from "./api"
|
||||||
import sdk from "./sdk"
|
import sdk from "./sdk"
|
||||||
import { initialise as initialiseWebsockets } from "./websockets"
|
import { initialise as initialiseWebsockets } from "./websockets"
|
||||||
|
import { automationsEnabled } from "./features"
|
||||||
|
|
||||||
let STARTUP_RAN = false
|
let STARTUP_RAN = false
|
||||||
|
|
||||||
|
@ -97,7 +98,9 @@ export async function startup(app?: any, server?: any) {
|
||||||
// configure events to use the pro audit log write
|
// configure events to use the pro audit log write
|
||||||
// can't integrate directly into backend-core due to cyclic issues
|
// can't integrate directly into backend-core due to cyclic issues
|
||||||
queuePromises.push(events.processors.init(pro.sdk.auditLogs.write))
|
queuePromises.push(events.processors.init(pro.sdk.auditLogs.write))
|
||||||
queuePromises.push(automations.init())
|
if (automationsEnabled()) {
|
||||||
|
queuePromises.push(automations.init())
|
||||||
|
}
|
||||||
queuePromises.push(initPro())
|
queuePromises.push(initPro())
|
||||||
if (app) {
|
if (app) {
|
||||||
// bring routes online as final step once everything ready
|
// bring routes online as final step once everything ready
|
||||||
|
|
|
@ -87,7 +87,7 @@ class TestConfiguration {
|
||||||
if (openServer) {
|
if (openServer) {
|
||||||
// use a random port because it doesn't matter
|
// use a random port because it doesn't matter
|
||||||
env.PORT = "0"
|
env.PORT = "0"
|
||||||
this.server = require("../../app").default
|
this.server = require("../../app").getServer()
|
||||||
// we need the request for logging in, involves cookies, hard to fake
|
// we need the request for logging in, involves cookies, hard to fake
|
||||||
this.request = supertest(this.server)
|
this.request = supertest(this.server)
|
||||||
this.started = true
|
this.started = true
|
||||||
|
@ -178,7 +178,7 @@ class TestConfiguration {
|
||||||
if (this.server) {
|
if (this.server) {
|
||||||
this.server.close()
|
this.server.close()
|
||||||
} else {
|
} else {
|
||||||
require("../../app").default.close()
|
require("../../app").getServer().close()
|
||||||
}
|
}
|
||||||
if (this.allApps) {
|
if (this.allApps) {
|
||||||
cleanup(this.allApps.map(app => app.appId))
|
cleanup(this.allApps.map(app => app.appId))
|
||||||
|
|
|
@ -31,6 +31,8 @@ function parseIntSafe(number: any) {
|
||||||
}
|
}
|
||||||
|
|
||||||
const environment = {
|
const environment = {
|
||||||
|
// features
|
||||||
|
WORKER_FEATURES: process.env.WORKER_FEATURES,
|
||||||
// auth
|
// auth
|
||||||
MINIO_ACCESS_KEY: process.env.MINIO_ACCESS_KEY,
|
MINIO_ACCESS_KEY: process.env.MINIO_ACCESS_KEY,
|
||||||
MINIO_SECRET_KEY: process.env.MINIO_SECRET_KEY,
|
MINIO_SECRET_KEY: process.env.MINIO_SECRET_KEY,
|
||||||
|
|
|
@ -0,0 +1,26 @@
|
||||||
|
import env from "./environment"
|
||||||
|
|
||||||
|
enum WorkerFeature {}
|
||||||
|
|
||||||
|
const featureList: WorkerFeature[] = processFeatureList()
|
||||||
|
|
||||||
|
function processFeatureList() {
|
||||||
|
const fullList = Object.values(WorkerFeature) as string[]
|
||||||
|
let list
|
||||||
|
if (!env.WORKER_FEATURES) {
|
||||||
|
list = fullList
|
||||||
|
} else {
|
||||||
|
list = env.WORKER_FEATURES.split(",")
|
||||||
|
}
|
||||||
|
for (let feature of list) {
|
||||||
|
if (!fullList.includes(feature)) {
|
||||||
|
throw new Error(`Feature: ${feature} is not an allowed option`)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// casting ok - confirmed definitely is a list of worker features
|
||||||
|
return list as unknown as WorkerFeature[]
|
||||||
|
}
|
||||||
|
|
||||||
|
export function isFeatureEnabled(feature: WorkerFeature) {
|
||||||
|
return featureList.includes(feature)
|
||||||
|
}
|
Loading…
Reference in New Issue