diff --git a/packages/backend-core/src/queue/queue.ts b/packages/backend-core/src/queue/queue.ts index 1838eed92f..f633d0885e 100644 --- a/packages/backend-core/src/queue/queue.ts +++ b/packages/backend-core/src/queue/queue.ts @@ -21,6 +21,7 @@ let cleanupInterval: NodeJS.Timeout async function cleanup() { for (let queue of QUEUES) { await queue.clean(CLEANUP_PERIOD_MS, "completed") + await queue.clean(CLEANUP_PERIOD_MS, "failed") } } diff --git a/packages/server/src/appMigrations/index.ts b/packages/server/src/appMigrations/index.ts index 8bd3ae7425..89c71ae26f 100644 --- a/packages/server/src/appMigrations/index.ts +++ b/packages/server/src/appMigrations/index.ts @@ -48,8 +48,6 @@ export async function checkMissingMigrations( }, { jobId: `${appId}_${latestMigration}`, - removeOnComplete: true, - removeOnFail: true, } ) diff --git a/packages/server/src/appMigrations/migrationsProcessor.ts b/packages/server/src/appMigrations/migrationsProcessor.ts index 0ca923230f..badd50f53c 100644 --- a/packages/server/src/appMigrations/migrationsProcessor.ts +++ b/packages/server/src/appMigrations/migrationsProcessor.ts @@ -12,54 +12,56 @@ export async function processMigrations( migrations: AppMigration[] ) { console.log(`Processing app migration for "${appId}"`) - - await locks.doWithLock( - { - name: LockName.APP_MIGRATION, - type: LockType.AUTO_EXTEND, - resource: appId, - }, - async () => { - await context.doInAppMigrationContext(appId, async () => { + // have to wrap in context, this gets the tenant from the app ID + await context.doInAppContext(appId, async () => { + await locks.doWithLock( + { + name: LockName.APP_MIGRATION, + type: LockType.AUTO_EXTEND, + resource: appId, + }, + async () => { try { - let currentVersion = await getAppMigrationVersion(appId) + await context.doInAppMigrationContext(appId, async () => { + let currentVersion = await getAppMigrationVersion(appId) - const pendingMigrations = migrations - .filter(m => m.id > currentVersion) - .sort((a, b) => a.id.localeCompare(b.id)) + const pendingMigrations = migrations + .filter(m => m.id > currentVersion) + .sort((a, b) => a.id.localeCompare(b.id)) - const migrationIds = migrations.map(m => m.id).sort() + const migrationIds = migrations.map(m => m.id).sort() - let index = 0 - for (const { id, func } of pendingMigrations) { - const expectedMigration = - migrationIds[migrationIds.indexOf(currentVersion) + 1] + let index = 0 + for (const { id, func } of pendingMigrations) { + const expectedMigration = + migrationIds[migrationIds.indexOf(currentVersion) + 1] - if (expectedMigration !== id) { - throw new Error( - `Migration ${id} could not run, update for "${id}" is running but ${expectedMigration} is expected` - ) + if (expectedMigration !== id) { + throw new Error( + `Migration ${id} could not run, update for "${id}" is running but ${expectedMigration} is expected` + ) + } + + const counter = `(${++index}/${pendingMigrations.length})` + console.info(`Running migration ${id}... ${counter}`, { + migrationId: id, + appId, + }) + await func() + await updateAppMigrationMetadata({ + appId, + version: id, + }) + currentVersion = id } - - const counter = `(${++index}/${pendingMigrations.length})` - console.info(`Running migration ${id}... ${counter}`, { - migrationId: id, - appId, - }) - await func() - await updateAppMigrationMetadata({ - appId, - version: id, - }) - currentVersion = id - } + }) } catch (err) { logging.logAlert("Failed to run app migration", err) throw err } - }) - } - ) + } + ) - console.log(`App migration for "${appId}" processed`) + console.log(`App migration for "${appId}" processed`) + }) } diff --git a/packages/server/src/appMigrations/queue.ts b/packages/server/src/appMigrations/queue.ts index 72bb2f9b12..9dbc732e82 100644 --- a/packages/server/src/appMigrations/queue.ts +++ b/packages/server/src/appMigrations/queue.ts @@ -1,9 +1,23 @@ -import { queue } from "@budibase/backend-core" +import { queue, logging } from "@budibase/backend-core" import { Job } from "bull" import { MIGRATIONS } from "./migrations" import { processMigrations } from "./migrationsProcessor" -const appMigrationQueue = queue.createQueue(queue.JobQueue.APP_MIGRATION) +const MAX_ATTEMPTS = 3 + +const appMigrationQueue = queue.createQueue(queue.JobQueue.APP_MIGRATION, { + jobOptions: { + attempts: MAX_ATTEMPTS, + removeOnComplete: true, + removeOnFail: true, + }, + maxStalledCount: MAX_ATTEMPTS, + removeStalledCb: async (job: Job) => { + logging.logAlert( + `App migration failed, queue job ID: ${job.id} - reason: ${job.failedReason}` + ) + }, +}) appMigrationQueue.process(processMessage) async function processMessage(job: Job) {