Adding concurrency, and changing how context is set.
This commit is contained in:
parent
a40baf5111
commit
8c1735a1bd
|
@ -13,8 +13,8 @@ export async function processMigrations(
|
|||
) {
|
||||
console.log(`Processing app migration for "${appId}"`)
|
||||
try {
|
||||
// have to wrap in context, this gets the tenant from the app ID
|
||||
await context.doInAppContext(appId, async () => {
|
||||
// first step - setup full context - tenancy, app and guards
|
||||
await context.doInAppMigrationContext(appId, async () => {
|
||||
console.log(`Acquiring app migration lock for "${appId}"`)
|
||||
await locks.doWithLock(
|
||||
{
|
||||
|
@ -23,48 +23,45 @@ export async function processMigrations(
|
|||
resource: appId,
|
||||
},
|
||||
async () => {
|
||||
await context.doInAppMigrationContext(appId, async () => {
|
||||
console.log(`Lock acquired starting app migration for "${appId}"`)
|
||||
let currentVersion = await getAppMigrationVersion(appId)
|
||||
console.log(`Lock acquired starting app migration for "${appId}"`)
|
||||
let currentVersion = await getAppMigrationVersion(appId)
|
||||
|
||||
const pendingMigrations = migrations
|
||||
.filter(m => m.id > currentVersion)
|
||||
.sort((a, b) => a.id.localeCompare(b.id))
|
||||
const pendingMigrations = migrations
|
||||
.filter(m => m.id > currentVersion)
|
||||
.sort((a, b) => a.id.localeCompare(b.id))
|
||||
|
||||
const migrationIds = migrations.map(m => m.id).sort()
|
||||
console.log(
|
||||
`App migrations to run for "${appId}" - ${migrationIds.join(",")}`
|
||||
)
|
||||
const migrationIds = migrations.map(m => m.id).sort()
|
||||
console.log(
|
||||
`App migrations to run for "${appId}" - ${migrationIds.join(",")}`
|
||||
)
|
||||
|
||||
let index = 0
|
||||
for (const { id, func } of pendingMigrations) {
|
||||
const expectedMigration =
|
||||
migrationIds[migrationIds.indexOf(currentVersion) + 1]
|
||||
let index = 0
|
||||
for (const { id, func } of pendingMigrations) {
|
||||
const expectedMigration =
|
||||
migrationIds[migrationIds.indexOf(currentVersion) + 1]
|
||||
|
||||
if (expectedMigration !== id) {
|
||||
throw new Error(
|
||||
`Migration ${id} could not run, update for "${id}" is running but ${expectedMigration} is expected`
|
||||
)
|
||||
}
|
||||
|
||||
const counter = `(${++index}/${pendingMigrations.length})`
|
||||
console.info(`Running migration ${id}... ${counter}`, {
|
||||
migrationId: id,
|
||||
appId,
|
||||
})
|
||||
await func()
|
||||
await updateAppMigrationMetadata({
|
||||
appId,
|
||||
version: id,
|
||||
})
|
||||
currentVersion = id
|
||||
if (expectedMigration !== id) {
|
||||
throw new Error(
|
||||
`Migration ${id} could not run, update for "${id}" is running but ${expectedMigration} is expected`
|
||||
)
|
||||
}
|
||||
})
|
||||
|
||||
const counter = `(${++index}/${pendingMigrations.length})`
|
||||
console.info(`Running migration ${id}... ${counter}`, {
|
||||
migrationId: id,
|
||||
appId,
|
||||
})
|
||||
await func()
|
||||
await updateAppMigrationMetadata({
|
||||
appId,
|
||||
version: id,
|
||||
})
|
||||
currentVersion = id
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
console.log(`App migration for "${appId}" processed`)
|
||||
})
|
||||
console.log(`App migration for "${appId}" processed`)
|
||||
} catch (err) {
|
||||
logging.logAlert("Failed to run app migration", err)
|
||||
throw err
|
||||
|
|
|
@ -2,9 +2,10 @@ import { queue, logging } from "@budibase/backend-core"
|
|||
import { Job } from "bull"
|
||||
import { MIGRATIONS } from "./migrations"
|
||||
import { processMigrations } from "./migrationsProcessor"
|
||||
import { apiEnabled } from "../features"
|
||||
|
||||
const MAX_ATTEMPTS = 1
|
||||
const MAX_ATTEMPTS = 3
|
||||
// max number of migrations to run at same time, per node
|
||||
const MIGRATION_CONCURRENCY = 5
|
||||
|
||||
export type AppMigrationJob = {
|
||||
appId: string
|
||||
|
@ -13,10 +14,6 @@ export type AppMigrationJob = {
|
|||
let appMigrationQueue: queue.Queue<AppMigrationJob> | undefined
|
||||
|
||||
export function init() {
|
||||
// only run app migrations in main API services
|
||||
if (!apiEnabled()) {
|
||||
return
|
||||
}
|
||||
appMigrationQueue = queue.createQueue<AppMigrationJob>(
|
||||
queue.JobQueue.APP_MIGRATION,
|
||||
{
|
||||
|
@ -34,10 +31,10 @@ export function init() {
|
|||
}
|
||||
)
|
||||
|
||||
return appMigrationQueue.process(processMessage)
|
||||
return appMigrationQueue.process(MIGRATION_CONCURRENCY, processMessage)
|
||||
}
|
||||
|
||||
async function processMessage(job: Job) {
|
||||
async function processMessage(job: Job<AppMigrationJob>) {
|
||||
const { appId } = job.data
|
||||
|
||||
await processMigrations(appId, MIGRATIONS)
|
||||
|
|
|
@ -115,8 +115,9 @@ export async function startup(
|
|||
// configure events to use the pro audit log write
|
||||
// can't integrate directly into backend-core due to cyclic issues
|
||||
queuePromises.push(events.processors.init(pro.sdk.auditLogs.write))
|
||||
queuePromises.push(appMigrations.init())
|
||||
// app migrations and automations on other service
|
||||
if (automationsEnabled()) {
|
||||
queuePromises.push(appMigrations.init())
|
||||
queuePromises.push(automations.init())
|
||||
}
|
||||
queuePromises.push(initPro())
|
||||
|
|
Loading…
Reference in New Issue