Merge pull request #13909 from Budibase/fix/some-potential-app-migration-issues

Some potential app migration failure causes
This commit is contained in:
Michael Drury 2024-06-10 23:45:04 +01:00 committed by GitHub
commit 51fad26be0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 61 additions and 55 deletions

View File

@ -63,12 +63,12 @@ class InMemoryQueue implements Partial<Queue> {
* Same callback API as Bull, each callback passed to this will consume messages as they are * Same callback API as Bull, each callback passed to this will consume messages as they are
* available. Please note this is a queue service, not a notification service, so each * available. Please note this is a queue service, not a notification service, so each
* consumer will receive different messages. * consumer will receive different messages.
* @param func The callback function which will return a "Job", the same
* as the Bull API, within this job the property "data" contains the JSON message. Please * as the Bull API, within this job the property "data" contains the JSON message. Please
* note this is incredibly limited compared to Bull as in reality the Job would contain * note this is incredibly limited compared to Bull as in reality the Job would contain
* a lot more information about the queue and current status of Bull cluster. * a lot more information about the queue and current status of Bull cluster.
*/ */
async process(func: any) { async process(concurrencyOrFunc: number | any, func?: any) {
func = typeof concurrencyOrFunc === "number" ? func : concurrencyOrFunc
this._emitter.on("message", async () => { this._emitter.on("message", async () => {
if (this._messages.length <= 0) { if (this._messages.length <= 0) {
return return

View File

@ -1,4 +1,4 @@
import { Duration, cache, context, db, env } from "@budibase/backend-core" import { Duration, cache, db, env } from "@budibase/backend-core"
import { Database, DocumentType, Document } from "@budibase/types" import { Database, DocumentType, Document } from "@budibase/types"
export interface AppMigrationDoc extends Document { export interface AppMigrationDoc extends Document {
@ -42,7 +42,10 @@ export async function getAppMigrationVersion(appId: string): Promise<string> {
version = "" version = ""
} }
await cache.store(cacheKey, version, EXPIRY_SECONDS) // only cache if we have a valid version
if (version) {
await cache.store(cacheKey, version, EXPIRY_SECONDS)
}
return version return version
} }
@ -54,8 +57,7 @@ export async function updateAppMigrationMetadata({
appId: string appId: string
version: string version: string
}): Promise<void> { }): Promise<void> {
const db = context.getAppDB() const appDb = db.getDB(appId)
let appMigrationDoc: AppMigrationDoc let appMigrationDoc: AppMigrationDoc
try { try {
@ -70,7 +72,7 @@ export async function updateAppMigrationMetadata({
version: "", version: "",
history: {}, history: {},
} }
await db.put(appMigrationDoc) await appDb.put(appMigrationDoc)
appMigrationDoc = await getFromDB(appId) appMigrationDoc = await getFromDB(appId)
} }
@ -82,7 +84,7 @@ export async function updateAppMigrationMetadata({
[version]: { runAt: new Date().toISOString() }, [version]: { runAt: new Date().toISOString() },
}, },
} }
await db.put(updatedMigrationDoc) await appDb.put(updatedMigrationDoc)
const cacheKey = getCacheKey(appId) const cacheKey = getCacheKey(appId)

View File

@ -16,7 +16,10 @@ export type AppMigration = {
export function getLatestEnabledMigrationId(migrations?: AppMigration[]) { export function getLatestEnabledMigrationId(migrations?: AppMigration[]) {
let latestMigrationId: string | undefined let latestMigrationId: string | undefined
for (let migration of migrations || MIGRATIONS) { if (!migrations) {
migrations = MIGRATIONS
}
for (let migration of migrations) {
// if a migration is disabled, all migrations after it are disabled // if a migration is disabled, all migrations after it are disabled
if (migration.disabled) { if (migration.disabled) {
break break
@ -35,8 +38,14 @@ export async function checkMissingMigrations(
next: Next, next: Next,
appId: string appId: string
) { ) {
const currentVersion = await getAppMigrationVersion(appId)
const latestMigration = getLatestEnabledMigrationId() const latestMigration = getLatestEnabledMigrationId()
// no migrations set - edge case, don't try to do anything
if (!latestMigration) {
return next()
}
const currentVersion = await getAppMigrationVersion(appId)
const queue = getAppMigrationQueue() const queue = getAppMigrationQueue()
if ( if (

View File

@ -13,8 +13,8 @@ export async function processMigrations(
) { ) {
console.log(`Processing app migration for "${appId}"`) console.log(`Processing app migration for "${appId}"`)
try { try {
// have to wrap in context, this gets the tenant from the app ID // first step - setup full context - tenancy, app and guards
await context.doInAppContext(appId, async () => { await context.doInAppMigrationContext(appId, async () => {
console.log(`Acquiring app migration lock for "${appId}"`) console.log(`Acquiring app migration lock for "${appId}"`)
await locks.doWithLock( await locks.doWithLock(
{ {
@ -23,48 +23,45 @@ export async function processMigrations(
resource: appId, resource: appId,
}, },
async () => { async () => {
await context.doInAppMigrationContext(appId, async () => { console.log(`Lock acquired starting app migration for "${appId}"`)
console.log(`Lock acquired starting app migration for "${appId}"`) let currentVersion = await getAppMigrationVersion(appId)
let currentVersion = await getAppMigrationVersion(appId)
const pendingMigrations = migrations const pendingMigrations = migrations
.filter(m => m.id > currentVersion) .filter(m => m.id > currentVersion)
.sort((a, b) => a.id.localeCompare(b.id)) .sort((a, b) => a.id.localeCompare(b.id))
const migrationIds = migrations.map(m => m.id).sort() const migrationIds = migrations.map(m => m.id).sort()
console.log( console.log(
`App migrations to run for "${appId}" - ${migrationIds.join(",")}` `App migrations to run for "${appId}" - ${migrationIds.join(",")}`
) )
let index = 0 let index = 0
for (const { id, func } of pendingMigrations) { for (const { id, func } of pendingMigrations) {
const expectedMigration = const expectedMigration =
migrationIds[migrationIds.indexOf(currentVersion) + 1] migrationIds[migrationIds.indexOf(currentVersion) + 1]
if (expectedMigration !== id) { if (expectedMigration !== id) {
throw new Error( throw new Error(
`Migration ${id} could not run, update for "${id}" is running but ${expectedMigration} is expected` `Migration ${id} could not run, update for "${id}" is running but ${expectedMigration} is expected`
) )
}
const counter = `(${++index}/${pendingMigrations.length})`
console.info(`Running migration ${id}... ${counter}`, {
migrationId: id,
appId,
})
await func()
await updateAppMigrationMetadata({
appId,
version: id,
})
currentVersion = id
} }
})
const counter = `(${++index}/${pendingMigrations.length})`
console.info(`Running migration ${id}... ${counter}`, {
migrationId: id,
appId,
})
await func()
await updateAppMigrationMetadata({
appId,
version: id,
})
currentVersion = id
}
} }
) )
console.log(`App migration for "${appId}" processed`)
}) })
console.log(`App migration for "${appId}" processed`)
} catch (err) { } catch (err) {
logging.logAlert("Failed to run app migration", err) logging.logAlert("Failed to run app migration", err)
throw err throw err

View File

@ -2,9 +2,10 @@ import { queue, logging } from "@budibase/backend-core"
import { Job } from "bull" import { Job } from "bull"
import { MIGRATIONS } from "./migrations" import { MIGRATIONS } from "./migrations"
import { processMigrations } from "./migrationsProcessor" import { processMigrations } from "./migrationsProcessor"
import { apiEnabled } from "../features"
const MAX_ATTEMPTS = 1 const MAX_ATTEMPTS = 3
// max number of migrations to run at same time, per node
const MIGRATION_CONCURRENCY = 5
export type AppMigrationJob = { export type AppMigrationJob = {
appId: string appId: string
@ -13,10 +14,6 @@ export type AppMigrationJob = {
let appMigrationQueue: queue.Queue<AppMigrationJob> | undefined let appMigrationQueue: queue.Queue<AppMigrationJob> | undefined
export function init() { export function init() {
// only run app migrations in main API services
if (!apiEnabled()) {
return
}
appMigrationQueue = queue.createQueue<AppMigrationJob>( appMigrationQueue = queue.createQueue<AppMigrationJob>(
queue.JobQueue.APP_MIGRATION, queue.JobQueue.APP_MIGRATION,
{ {
@ -34,10 +31,10 @@ export function init() {
} }
) )
return appMigrationQueue.process(processMessage) return appMigrationQueue.process(MIGRATION_CONCURRENCY, processMessage)
} }
async function processMessage(job: Job) { async function processMessage(job: Job<AppMigrationJob>) {
const { appId } = job.data const { appId } = job.data
await processMigrations(appId, MIGRATIONS) await processMigrations(appId, MIGRATIONS)

View File

@ -115,8 +115,9 @@ export async function startup(
// configure events to use the pro audit log write // configure events to use the pro audit log write
// can't integrate directly into backend-core due to cyclic issues // can't integrate directly into backend-core due to cyclic issues
queuePromises.push(events.processors.init(pro.sdk.auditLogs.write)) queuePromises.push(events.processors.init(pro.sdk.auditLogs.write))
queuePromises.push(appMigrations.init()) // app migrations and automations on other service
if (automationsEnabled()) { if (automationsEnabled()) {
queuePromises.push(appMigrations.init())
queuePromises.push(automations.init()) queuePromises.push(automations.init())
} }
queuePromises.push(initPro()) queuePromises.push(initPro())