Add middleware to queue
This commit is contained in:
parent
75554d1bd2
commit
8ac9420e5b
|
@ -3,4 +3,5 @@ export enum JobQueue {
|
|||
APP_BACKUP = "appBackupQueue",
|
||||
AUDIT_LOG = "auditLogQueue",
|
||||
SYSTEM_EVENT_QUEUE = "systemEventQueue",
|
||||
APP_MIGRATION = "appMigration",
|
||||
}
|
||||
|
|
|
@ -87,6 +87,7 @@ enum QueueEventType {
|
|||
APP_BACKUP_EVENT = "app-backup-event",
|
||||
AUDIT_LOG_EVENT = "audit-log-event",
|
||||
SYSTEM_EVENT = "system-event",
|
||||
APP_MIGRATION = "app-migration",
|
||||
}
|
||||
|
||||
const EventTypeMap: { [key in JobQueue]: QueueEventType } = {
|
||||
|
@ -94,6 +95,7 @@ const EventTypeMap: { [key in JobQueue]: QueueEventType } = {
|
|||
[JobQueue.APP_BACKUP]: QueueEventType.APP_BACKUP_EVENT,
|
||||
[JobQueue.AUDIT_LOG]: QueueEventType.AUDIT_LOG_EVENT,
|
||||
[JobQueue.SYSTEM_EVENT_QUEUE]: QueueEventType.SYSTEM_EVENT,
|
||||
[JobQueue.APP_MIGRATION]: QueueEventType.APP_MIGRATION,
|
||||
}
|
||||
|
||||
function logging(queue: Queue, jobQueue: JobQueue) {
|
||||
|
|
|
@ -4,6 +4,7 @@ import currentApp from "../middleware/currentapp"
|
|||
import zlib from "zlib"
|
||||
import { mainRoutes, staticRoutes, publicRoutes } from "./routes"
|
||||
import { middleware as pro } from "@budibase/pro"
|
||||
import migrations from "../middleware/appMigrations"
|
||||
|
||||
export { shutdown } from "./routes/public"
|
||||
const compress = require("koa-compress")
|
||||
|
@ -47,6 +48,8 @@ router
|
|||
// @ts-ignore
|
||||
.use(currentApp)
|
||||
.use(auth.auditLog)
|
||||
// @ts-ignore
|
||||
.use(migrations)
|
||||
|
||||
// authenticated routes
|
||||
for (let route of mainRoutes) {
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
import queue from "./queue"
|
||||
import { getAppMigrationMetadata } from "./appMigrationMetadata"
|
||||
import { MIGRATIONS } from "./migrations"
|
||||
|
||||
const latestMigration = Object.keys(MIGRATIONS).sort().reverse()[0]
|
||||
|
||||
export async function checkMissingMigrations(appId: string) {
|
||||
const currentVersion = await getAppMigrationMetadata(appId)
|
||||
|
||||
if (currentVersion < latestMigration) {
|
||||
await queue.add(
|
||||
{
|
||||
appId,
|
||||
}
|
||||
// TODO: idempotency
|
||||
)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
import { queue } from "@budibase/backend-core"
|
||||
import { Job } from "bull"
|
||||
|
||||
const appMigrationQueue = queue.createQueue(queue.JobQueue.APP_MIGRATION)
|
||||
appMigrationQueue.process(processMessage)
|
||||
|
||||
async function processMessage(job: Job) {
|
||||
const { appId } = job.data
|
||||
|
||||
console.log(appId)
|
||||
}
|
||||
|
||||
export default appMigrationQueue
|
|
@ -0,0 +1,14 @@
|
|||
import { UserCtx } from "@budibase/types"
|
||||
import { checkMissingMigrations } from "../appMigrations"
|
||||
|
||||
export default async (ctx: UserCtx, next: any) => {
|
||||
const { appId } = ctx
|
||||
|
||||
if (!appId) {
|
||||
return next()
|
||||
}
|
||||
|
||||
await checkMissingMigrations(appId)
|
||||
|
||||
return next()
|
||||
}
|
|
@ -20,6 +20,7 @@ export enum LockName {
|
|||
UPDATE_TENANTS_DOC = "update_tenants_doc",
|
||||
PERSIST_WRITETHROUGH = "persist_writethrough",
|
||||
QUOTA_USAGE_EVENT = "quota_usage_event",
|
||||
APP_MIGRATION = "app_migrations",
|
||||
}
|
||||
|
||||
export type LockOptions = {
|
||||
|
|
Loading…
Reference in New Issue