Merge pull request #11548 from Budibase/fix/automation-improvements

Automation improvements
This commit is contained in:
Michael Drury 2023-08-18 13:30:12 +01:00 committed by GitHub
commit 2059c258fa
20 changed files with 233 additions and 124 deletions

View File

@ -1,5 +1,6 @@
import env from "../environment"
import * as context from "../context"
export * from "./installation"
/**
* Read the TENANT_FEATURE_FLAGS env var and return an array of features flags for each tenant.

View File

@ -0,0 +1,17 @@
export function processFeatureEnvVar<T>(
fullList: string[],
featureList?: string
) {
let list
if (!featureList) {
list = fullList
} else {
list = featureList.split(",")
}
for (let feature of list) {
if (!fullList.includes(feature)) {
throw new Error(`Feature: ${feature} is not an allowed option`)
}
}
return list as unknown as T[]
}

View File

@ -6,7 +6,8 @@ export * as roles from "./security/roles"
export * as permissions from "./security/permissions"
export * as accounts from "./accounts"
export * as installation from "./installation"
export * as featureFlags from "./featureFlags"
export * as featureFlags from "./features"
export * as features from "./features/installation"
export * as sessions from "./security/sessions"
export * as platform from "./platform"
export * as auth from "./auth"

View File

@ -4,11 +4,13 @@
$: isError = !value || value.toLowerCase() === "error"
$: isStoppedError = value?.toLowerCase() === "stopped_error"
$: isStopped = value?.toLowerCase() === "stopped" || isStoppedError
$: info = getInfo(isError, isStopped)
$: isStopped = value?.toLowerCase() === "stopped"
$: info = getInfo(isError, isStopped, isStoppedError)
const getInfo = (error, stopped) => {
if (error) {
function getInfo(error, stopped, stoppedError) {
if (stoppedError) {
return { color: "red", message: "Stopped - Error" }
} else if (error) {
return { color: "red", message: "Error" }
} else if (stopped) {
return { color: "yellow", message: "Stopped" }

View File

@ -22,7 +22,8 @@
const ERROR = "error",
SUCCESS = "success",
STOPPED = "stopped"
STOPPED = "stopped",
STOPPED_ERROR = "stopped_error"
const sidePanel = getContext("side-panel")
let pageInfo = createPaginationStore()
@ -52,6 +53,7 @@
{ value: SUCCESS, label: "Success" },
{ value: ERROR, label: "Error" },
{ value: STOPPED, label: "Stopped" },
{ value: STOPPED_ERROR, label: "Stopped - Error" },
]
const runHistorySchema = {

@ -1 +1 @@
Subproject commit 02626390cde905a248cb60729968667c9e49fae9
Subproject commit 9b9c8cc08f271bfc5dd401860f344f6eb336ab35

View File

@ -77,18 +77,19 @@ async function initDeployedApp(prodAppId: any) {
)
).rows.map((row: any) => row.doc)
await clearMetadata()
console.log("You have " + automations.length + " automations")
const { count } = await disableAllCrons(prodAppId)
const promises = []
console.log("Disabling prod crons..")
await disableAllCrons(prodAppId)
console.log("Prod Cron triggers disabled..")
console.log("Enabling cron triggers for deployed app..")
for (let automation of automations) {
promises.push(enableCronTrigger(prodAppId, automation))
}
await Promise.all(promises)
console.log("Enabled cron triggers for deployed app..")
// sync the automations back to the dev DB - since there is now cron
const results = await Promise.all(promises)
const enabledCount = results
.map(result => result.enabled)
.filter(result => result).length
console.log(
`Cleared ${count} old CRON, enabled ${enabledCount} new CRON triggers for app deployment`
)
// sync the automations back to the dev DB - since there is now CRON
// information attached
await sdk.applications.syncApp(dbCore.getDevAppID(prodAppId), {
automationOnly: true,

View File

@ -1,120 +1,41 @@
import Sentry from "@sentry/node"
if (process.env.DD_APM_ENABLED) {
require("./ddApm")
}
// need to load environment first
import env from "./environment"
import { ExtendableContext } from "koa"
import * as db from "./db"
db.init()
import Koa from "koa"
import koaBody from "koa-body"
import http from "http"
import * as api from "./api"
import * as automations from "./automations"
import { Thread } from "./threads"
import * as redis from "./utilities/redis"
import { ServiceType } from "@budibase/types"
import {
events,
logging,
middleware,
timers,
env as coreEnv,
} from "@budibase/backend-core"
import { env as coreEnv } from "@budibase/backend-core"
coreEnv._set("SERVICE_TYPE", ServiceType.APPS)
import { apiEnabled } from "./features"
import createKoaApp from "./koa"
import Koa from "koa"
import { Server } from "http"
import { startup } from "./startup"
const Sentry = require("@sentry/node")
const destroyable = require("server-destroy")
const { userAgent } = require("koa-useragent")
const app = new Koa()
let app: Koa, server: Server
let mbNumber = parseInt(env.HTTP_MB_LIMIT || "10")
if (!mbNumber || isNaN(mbNumber)) {
mbNumber = 10
}
// set up top level koa middleware
app.use(
koaBody({
multipart: true,
formLimit: `${mbNumber}mb`,
jsonLimit: `${mbNumber}mb`,
textLimit: `${mbNumber}mb`,
// @ts-ignore
enableTypes: ["json", "form", "text"],
parsedMethods: ["POST", "PUT", "PATCH", "DELETE"],
})
)
app.use(middleware.correlation)
app.use(middleware.pino)
app.use(userAgent)
if (env.isProd()) {
async function start() {
if (apiEnabled()) {
const koa = createKoaApp()
app = koa.app
server = koa.server
}
await startup(app, server)
if (env.isProd()) {
env._set("NODE_ENV", "production")
Sentry.init()
app.on("error", (err: any, ctx: ExtendableContext) => {
Sentry.withScope(function (scope: any) {
scope.addEventProcessor(function (event: any) {
return Sentry.Handlers.parseRequest(event, ctx.request)
})
Sentry.captureException(err)
})
})
}
}
const server = http.createServer(app.callback())
destroyable(server)
let shuttingDown = false,
errCode = 0
server.on("close", async () => {
// already in process
if (shuttingDown) {
return
}
shuttingDown = true
console.log("Server Closed")
timers.cleanup()
await automations.shutdown()
await redis.shutdown()
events.shutdown()
await Thread.shutdown()
api.shutdown()
if (!env.isTest()) {
process.exit(errCode)
}
start().catch(err => {
console.error(`Failed server startup - ${err.message}`)
})
export default server.listen(env.PORT || 0, async () => {
await startup(app, server)
})
const shutdown = () => {
server.close()
// @ts-ignore
server.destroy()
export function getServer() {
return server
}
process.on("uncaughtException", err => {
// @ts-ignore
// don't worry about this error, comes from zlib isn't important
if (err && err["code"] === "ERR_INVALID_CHAR") {
return
}
errCode = -1
logging.logAlert("Uncaught exception.", err)
shutdown()
})
process.on("SIGTERM", () => {
shutdown()
})
process.on("SIGINT", () => {
shutdown()
})

View File

@ -2,6 +2,7 @@ import { processEvent } from "./utils"
import { automationQueue } from "./bullboard"
import { rebootTrigger } from "./triggers"
import BullQueue from "bull"
import { automationsEnabled } from "../features"
export { automationQueue } from "./bullboard"
export { shutdown } from "./bullboard"
@ -12,6 +13,9 @@ export { BUILTIN_ACTION_DEFINITIONS, getActionDefinitions } from "./actions"
* This module is built purely to kick off the worker farm and manage the inputs/outputs
*/
export async function init() {
if (!automationsEnabled()) {
return
}
// this promise will not complete
const promise = automationQueue.process(async job => {
await processEvent(job)

View File

@ -15,9 +15,13 @@ import {
WebhookActionType,
} from "@budibase/types"
import sdk from "../sdk"
import { automationsEnabled } from "../features"
const WH_STEP_ID = definitions.WEBHOOK.stepId
const Runner = new Thread(ThreadType.AUTOMATION)
let Runner: Thread
if (automationsEnabled()) {
Runner = new Thread(ThreadType.AUTOMATION)
}
function loggingArgs(
job: AutomationJob,
@ -130,7 +134,8 @@ export async function disableAllCrons(appId: any) {
}
}
}
return Promise.all(promises)
const results = await Promise.all(promises)
return { count: results.length / 2 }
}
export async function disableCronById(jobId: number | string) {
@ -169,6 +174,7 @@ export async function enableCronTrigger(appId: any, automation: Automation) {
const needsCreated =
!sdk.automations.isReboot(automation) &&
!sdk.automations.disabled(automation)
let enabled = false
// need to create cron job
if (validCron && needsCreated) {
@ -191,8 +197,9 @@ export async function enableCronTrigger(appId: any, automation: Automation) {
automation._id = response.id
automation._rev = response.rev
})
enabled = true
}
return automation
return { enabled, automation }
}
/**

View File

@ -38,6 +38,8 @@ function parseIntSafe(number?: string) {
}
const environment = {
// features
APP_FEATURES: process.env.APP_FEATURES,
// important - prefer app port to generic port
PORT: process.env.APP_PORT || process.env.PORT,
COUCH_DB_URL: process.env.COUCH_DB_URL,

View File

@ -0,0 +1,24 @@
import { features } from "@budibase/backend-core"
import env from "./environment"
enum AppFeature {
API = "api",
AUTOMATIONS = "automations",
}
const featureList = features.processFeatureEnvVar<AppFeature>(
Object.values(AppFeature),
env.APP_FEATURES
)
export function isFeatureEnabled(feature: AppFeature) {
return featureList.includes(feature)
}
export function automationsEnabled() {
return featureList.includes(AppFeature.AUTOMATIONS)
}
export function apiEnabled() {
return featureList.includes(AppFeature.API)
}

102
packages/server/src/koa.ts Normal file
View File

@ -0,0 +1,102 @@
import env from "./environment"
import { ExtendableContext } from "koa"
import Koa from "koa"
import koaBody from "koa-body"
import http from "http"
import * as api from "./api"
import * as automations from "./automations"
import { Thread } from "./threads"
import * as redis from "./utilities/redis"
import { events, logging, middleware, timers } from "@budibase/backend-core"
const Sentry = require("@sentry/node")
const destroyable = require("server-destroy")
const { userAgent } = require("koa-useragent")
export default function createKoaApp() {
const app = new Koa()
let mbNumber = parseInt(env.HTTP_MB_LIMIT || "10")
if (!mbNumber || isNaN(mbNumber)) {
mbNumber = 10
}
// set up top level koa middleware
app.use(
koaBody({
multipart: true,
formLimit: `${mbNumber}mb`,
jsonLimit: `${mbNumber}mb`,
textLimit: `${mbNumber}mb`,
// @ts-ignore
enableTypes: ["json", "form", "text"],
parsedMethods: ["POST", "PUT", "PATCH", "DELETE"],
})
)
app.use(middleware.correlation)
app.use(middleware.pino)
app.use(userAgent)
if (env.isProd()) {
app.on("error", (err: any, ctx: ExtendableContext) => {
Sentry.withScope(function (scope: any) {
scope.addEventProcessor(function (event: any) {
return Sentry.Handlers.parseRequest(event, ctx.request)
})
Sentry.captureException(err)
})
})
}
const server = http.createServer(app.callback())
destroyable(server)
let shuttingDown = false,
errCode = 0
server.on("close", async () => {
// already in process
if (shuttingDown) {
return
}
shuttingDown = true
console.log("Server Closed")
timers.cleanup()
await automations.shutdown()
await redis.shutdown()
events.shutdown()
await Thread.shutdown()
api.shutdown()
if (!env.isTest()) {
process.exit(errCode)
}
})
const listener = server.listen(env.PORT || 0)
const shutdown = () => {
server.close()
// @ts-ignore
server.destroy()
}
process.on("uncaughtException", err => {
// @ts-ignore
// don't worry about this error, comes from zlib isn't important
if (err && err["code"] === "ERR_INVALID_CHAR") {
return
}
errCode = -1
logging.logAlert("Uncaught exception.", err)
shutdown()
})
process.on("SIGTERM", () => {
shutdown()
})
process.on("SIGINT", () => {
shutdown()
})
return { app, server: listener }
}

View File

@ -17,6 +17,7 @@ import * as pro from "@budibase/pro"
import * as api from "./api"
import sdk from "./sdk"
import { initialise as initialiseWebsockets } from "./websockets"
import { automationsEnabled } from "./features"
let STARTUP_RAN = false
@ -97,7 +98,9 @@ export async function startup(app?: any, server?: any) {
// configure events to use the pro audit log write
// can't integrate directly into backend-core due to cyclic issues
queuePromises.push(events.processors.init(pro.sdk.auditLogs.write))
if (automationsEnabled()) {
queuePromises.push(automations.init())
}
queuePromises.push(initPro())
if (app) {
// bring routes online as final step once everything ready

View File

@ -87,7 +87,7 @@ class TestConfiguration {
if (openServer) {
// use a random port because it doesn't matter
env.PORT = "0"
this.server = require("../../app").default
this.server = require("../../app").getServer()
// we need the request for logging in, involves cookies, hard to fake
this.request = supertest(this.server)
this.started = true
@ -178,7 +178,7 @@ class TestConfiguration {
if (this.server) {
this.server.close()
} else {
require("../../app").default.close()
require("../../app").getServer().close()
}
if (this.allApps) {
cleanup(this.allApps.map(app => app.appId))

View File

@ -20,6 +20,7 @@ import {
AutomationMetadata,
AutomationStatus,
AutomationStep,
AutomationStepStatus,
} from "@budibase/types"
import {
AutomationContext,
@ -452,7 +453,10 @@ class Orchestrator {
this.executionOutput.steps.splice(loopStepNumber + 1, 0, {
id: step.id,
stepId: step.stepId,
outputs: { status: AutomationStatus.NO_ITERATIONS, success: true },
outputs: {
status: AutomationStepStatus.NO_ITERATIONS,
success: true,
},
inputs: {},
})

View File

@ -179,12 +179,15 @@ export interface AutomationTrigger extends AutomationTriggerSchema {
id: string
}
export enum AutomationStepStatus {
NO_ITERATIONS = "no_iterations",
}
export enum AutomationStatus {
SUCCESS = "success",
ERROR = "error",
STOPPED = "stopped",
STOPPED_ERROR = "stopped_error",
NO_ITERATIONS = "no_iterations",
}
export interface AutomationResults {

View File

@ -31,6 +31,8 @@ function parseIntSafe(number: any) {
}
const environment = {
// features
WORKER_FEATURES: process.env.WORKER_FEATURES,
// auth
MINIO_ACCESS_KEY: process.env.MINIO_ACCESS_KEY,
MINIO_SECRET_KEY: process.env.MINIO_SECRET_KEY,

View File

@ -0,0 +1,13 @@
import { features } from "@budibase/backend-core"
import env from "./environment"
enum WorkerFeature {}
const featureList: WorkerFeature[] = features.processFeatureEnvVar(
Object.values(WorkerFeature),
env.WORKER_FEATURES
)
export function isFeatureEnabled(feature: WorkerFeature) {
return featureList.includes(feature)
}