Merge branch 'master' into cheeks-fixes

This commit is contained in:
Andrew Kingston 2024-06-11 15:04:44 +02:00 committed by GitHub
commit 1f0cf822f1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 242 additions and 75 deletions

View File

@ -63,12 +63,12 @@ class InMemoryQueue implements Partial<Queue> {
* Same callback API as Bull, each callback passed to this will consume messages as they are
* available. Please note this is a queue service, not a notification service, so each
* consumer will receive different messages.
* @param func The callback function which will return a "Job", the same
* as the Bull API, within this job the property "data" contains the JSON message. Please
* note this is incredibly limited compared to Bull as in reality the Job would contain
* a lot more information about the queue and current status of Bull cluster.
*/
async process(func: any) {
async process(concurrencyOrFunc: number | any, func?: any) {
func = typeof concurrencyOrFunc === "number" ? func : concurrencyOrFunc
this._emitter.on("message", async () => {
if (this._messages.length <= 0) {
return

View File

@ -1,6 +1,16 @@
import { getDB } from "../db/db"
import { getGlobalDBName } from "../context"
import { TenantInfo } from "@budibase/types"
export function getTenantDB(tenantId: string) {
return getDB(getGlobalDBName(tenantId))
}
export async function saveTenantInfo(tenantInfo: TenantInfo) {
const db = getTenantDB(tenantInfo.tenantId)
// save the tenant info to db
return await db.put({
_id: "tenant_info",
...tenantInfo,
})
}

View File

@ -1,4 +1,4 @@
import { Duration, cache, context, db, env } from "@budibase/backend-core"
import { Duration, cache, db, env } from "@budibase/backend-core"
import { Database, DocumentType, Document } from "@budibase/types"
export interface AppMigrationDoc extends Document {
@ -42,7 +42,10 @@ export async function getAppMigrationVersion(appId: string): Promise<string> {
version = ""
}
await cache.store(cacheKey, version, EXPIRY_SECONDS)
// only cache if we have a valid version
if (version) {
await cache.store(cacheKey, version, EXPIRY_SECONDS)
}
return version
}
@ -54,8 +57,7 @@ export async function updateAppMigrationMetadata({
appId: string
version: string
}): Promise<void> {
const db = context.getAppDB()
const appDb = db.getDB(appId)
let appMigrationDoc: AppMigrationDoc
try {
@ -70,7 +72,7 @@ export async function updateAppMigrationMetadata({
version: "",
history: {},
}
await db.put(appMigrationDoc)
await appDb.put(appMigrationDoc)
appMigrationDoc = await getFromDB(appId)
}
@ -82,7 +84,7 @@ export async function updateAppMigrationMetadata({
[version]: { runAt: new Date().toISOString() },
},
}
await db.put(updatedMigrationDoc)
await appDb.put(updatedMigrationDoc)
const cacheKey = getCacheKey(appId)

View File

@ -1,4 +1,4 @@
import queue from "./queue"
import { getAppMigrationQueue } from "./queue"
import { Next } from "koa"
import { getAppMigrationVersion } from "./appMigrationMetadata"
import { MIGRATIONS } from "./migrations"
@ -16,7 +16,10 @@ export type AppMigration = {
export function getLatestEnabledMigrationId(migrations?: AppMigration[]) {
let latestMigrationId: string | undefined
for (let migration of migrations || MIGRATIONS) {
if (!migrations) {
migrations = MIGRATIONS
}
for (let migration of migrations) {
// if a migration is disabled, all migrations after it are disabled
if (migration.disabled) {
break
@ -35,10 +38,18 @@ export async function checkMissingMigrations(
next: Next,
appId: string
) {
const currentVersion = await getAppMigrationVersion(appId)
const latestMigration = getLatestEnabledMigrationId()
// no migrations set - edge case, don't try to do anything
if (!latestMigration) {
return next()
}
const currentVersion = await getAppMigrationVersion(appId)
const queue = getAppMigrationQueue()
if (
queue &&
latestMigration &&
getTimestamp(currentVersion) < getTimestamp(latestMigration)
) {

View File

@ -13,8 +13,8 @@ export async function processMigrations(
) {
console.log(`Processing app migration for "${appId}"`)
try {
// have to wrap in context, this gets the tenant from the app ID
await context.doInAppContext(appId, async () => {
// first step - setup full context - tenancy, app and guards
await context.doInAppMigrationContext(appId, async () => {
console.log(`Acquiring app migration lock for "${appId}"`)
await locks.doWithLock(
{
@ -23,48 +23,45 @@ export async function processMigrations(
resource: appId,
},
async () => {
await context.doInAppMigrationContext(appId, async () => {
console.log(`Lock acquired starting app migration for "${appId}"`)
let currentVersion = await getAppMigrationVersion(appId)
console.log(`Lock acquired starting app migration for "${appId}"`)
let currentVersion = await getAppMigrationVersion(appId)
const pendingMigrations = migrations
.filter(m => m.id > currentVersion)
.sort((a, b) => a.id.localeCompare(b.id))
const pendingMigrations = migrations
.filter(m => m.id > currentVersion)
.sort((a, b) => a.id.localeCompare(b.id))
const migrationIds = migrations.map(m => m.id).sort()
console.log(
`App migrations to run for "${appId}" - ${migrationIds.join(",")}`
)
const migrationIds = migrations.map(m => m.id).sort()
console.log(
`App migrations to run for "${appId}" - ${migrationIds.join(",")}`
)
let index = 0
for (const { id, func } of pendingMigrations) {
const expectedMigration =
migrationIds[migrationIds.indexOf(currentVersion) + 1]
let index = 0
for (const { id, func } of pendingMigrations) {
const expectedMigration =
migrationIds[migrationIds.indexOf(currentVersion) + 1]
if (expectedMigration !== id) {
throw new Error(
`Migration ${id} could not run, update for "${id}" is running but ${expectedMigration} is expected`
)
}
const counter = `(${++index}/${pendingMigrations.length})`
console.info(`Running migration ${id}... ${counter}`, {
migrationId: id,
appId,
})
await func()
await updateAppMigrationMetadata({
appId,
version: id,
})
currentVersion = id
if (expectedMigration !== id) {
throw new Error(
`Migration ${id} could not run, update for "${id}" is running but ${expectedMigration} is expected`
)
}
})
const counter = `(${++index}/${pendingMigrations.length})`
console.info(`Running migration ${id}... ${counter}`, {
migrationId: id,
appId,
})
await func()
await updateAppMigrationMetadata({
appId,
version: id,
})
currentVersion = id
}
}
)
console.log(`App migration for "${appId}" processed`)
})
console.log(`App migration for "${appId}" processed`)
} catch (err) {
logging.logAlert("Failed to run app migration", err)
throw err

View File

@ -4,26 +4,42 @@ import { MIGRATIONS } from "./migrations"
import { processMigrations } from "./migrationsProcessor"
const MAX_ATTEMPTS = 3
// max number of migrations to run at same time, per node
const MIGRATION_CONCURRENCY = 5
const appMigrationQueue = queue.createQueue(queue.JobQueue.APP_MIGRATION, {
jobOptions: {
attempts: MAX_ATTEMPTS,
removeOnComplete: true,
removeOnFail: true,
},
maxStalledCount: MAX_ATTEMPTS,
removeStalledCb: async (job: Job) => {
logging.logAlert(
`App migration failed, queue job ID: ${job.id} - reason: ${job.failedReason}`
)
},
})
appMigrationQueue.process(processMessage)
export type AppMigrationJob = {
appId: string
}
async function processMessage(job: Job) {
let appMigrationQueue: queue.Queue<AppMigrationJob> | undefined
export function init() {
appMigrationQueue = queue.createQueue<AppMigrationJob>(
queue.JobQueue.APP_MIGRATION,
{
jobOptions: {
attempts: MAX_ATTEMPTS,
removeOnComplete: true,
removeOnFail: true,
},
maxStalledCount: MAX_ATTEMPTS,
removeStalledCb: async (job: Job) => {
logging.logAlert(
`App migration failed, queue job ID: ${job.id} - reason: ${job.failedReason}`
)
},
}
)
return appMigrationQueue.process(MIGRATION_CONCURRENCY, processMessage)
}
async function processMessage(job: Job<AppMigrationJob>) {
const { appId } = job.data
await processMigrations(appId, MIGRATIONS)
}
export default appMigrationQueue
export function getAppMigrationQueue() {
return appMigrationQueue
}

View File

@ -3,6 +3,7 @@ import { KoaAdapter } from "@bull-board/koa"
import { queue } from "@budibase/backend-core"
import * as automation from "../threads/automation"
import { backups } from "@budibase/pro"
import { getAppMigrationQueue } from "../appMigrations/queue"
import { createBullBoard } from "@bull-board/api"
import BullQueue from "bull"
@ -16,10 +17,14 @@ const PATH_PREFIX = "/bulladmin"
export async function init() {
// Set up queues for bull board admin
const backupQueue = backups.getBackupQueue()
const appMigrationQueue = getAppMigrationQueue()
const queues = [automationQueue]
if (backupQueue) {
queues.push(backupQueue)
}
if (appMigrationQueue) {
queues.push(appMigrationQueue)
}
const adapters = []
const serverAdapter: any = new KoaAdapter()
for (let queue of queues) {

View File

@ -15,6 +15,7 @@ import * as fileSystem from "../utilities/fileSystem"
import { default as eventEmitter, init as eventInit } from "../events"
import * as migrations from "../migrations"
import * as bullboard from "../automations/bullboard"
import * as appMigrations from "../appMigrations/queue"
import * as pro from "@budibase/pro"
import * as api from "../api"
import sdk from "../sdk"
@ -114,7 +115,9 @@ export async function startup(
// configure events to use the pro audit log write
// can't integrate directly into backend-core due to cyclic issues
queuePromises.push(events.processors.init(pro.sdk.auditLogs.write))
// app migrations and automations on other service
if (automationsEnabled()) {
queuePromises.push(appMigrations.init())
queuePromises.push(automations.init())
}
queuePromises.push(initPro())

View File

@ -7,3 +7,4 @@ export * from "./schedule"
export * from "./templates"
export * from "./environmentVariables"
export * from "./auditLogs"
export * from "./tenantInfo"

View File

@ -0,0 +1,13 @@
import { Document } from "../document"
export interface TenantInfo extends Document {
owner: {
email: string
password?: string
ssoId?: string
givenName?: string
familyName?: string
budibaseUserId?: string
}
tenantId: string
}

View File

@ -0,0 +1,10 @@
import { tenancy } from "@budibase/backend-core"
import { TenantInfo, Ctx } from "@budibase/types"
export const save = async (ctx: Ctx<TenantInfo>) => {
const response = await tenancy.saveTenantInfo(ctx.request.body)
ctx.body = {
_id: response.id,
_rev: response.rev,
}
}

View File

@ -76,6 +76,10 @@ const PUBLIC_ENDPOINTS = [
route: "/api/global/users/invite",
method: "GET",
},
{
route: "/api/global/tenant",
method: "POST",
},
]
const NO_TENANCY_ENDPOINTS = [
@ -121,6 +125,10 @@ const NO_TENANCY_ENDPOINTS = [
route: "/api/global/users/invite/:code",
method: "GET",
},
{
route: "/api/global/tenant",
method: "POST",
},
]
// most public endpoints are gets, but some are posts

View File

@ -0,0 +1,33 @@
import Router from "@koa/router"
import Joi from "joi"
import { auth } from "@budibase/backend-core"
import * as controller from "../../controllers/global/tenant"
import cloudRestricted from "../../../middleware/cloudRestricted"
const router: Router = new Router()
const OPTIONAL_STRING = Joi.string().optional().allow(null).allow("")
function buildTenantInfoValidation() {
return auth.joiValidator.body(
Joi.object({
owner: Joi.object({
email: Joi.string().required(),
password: OPTIONAL_STRING,
ssoId: OPTIONAL_STRING,
givenName: OPTIONAL_STRING,
familyName: OPTIONAL_STRING,
budibaseUserId: OPTIONAL_STRING,
}).required(),
tenantId: Joi.string().required(),
}).required()
)
}
router.post(
"/api/global/tenant",
cloudRestricted,
buildTenantInfoValidation(),
controller.save
)
export default router

View File

@ -0,0 +1,47 @@
import { TenantInfo } from "@budibase/types"
import { TestConfiguration } from "../../../../tests"
import { tenancy as _tenancy } from "@budibase/backend-core"
const tenancy = jest.mocked(_tenancy)
describe("/api/global/tenant", () => {
const config = new TestConfiguration()
beforeAll(async () => {
await config.beforeAll()
})
afterAll(async () => {
await config.afterAll()
})
beforeEach(() => {
jest.clearAllMocks()
})
describe("POST /api/global/tenant", () => {
it("should save the tenantInfo", async () => {
tenancy.saveTenantInfo = jest.fn().mockImplementation(async () => ({
id: "DOC_ID",
ok: true,
rev: "DOC_REV",
}))
const tenantInfo: TenantInfo = {
owner: {
email: "test@example.com",
password: "PASSWORD",
ssoId: "SSO_ID",
givenName: "Jane",
familyName: "Doe",
budibaseUserId: "USER_ID",
},
tenantId: "tenant123",
}
const response = await config.api.tenants.saveTenantInfo(tenantInfo)
expect(_tenancy.saveTenantInfo).toHaveBeenCalledTimes(1)
expect(_tenancy.saveTenantInfo).toHaveBeenCalledWith(tenantInfo)
expect(response.text).toEqual('{"_id":"DOC_ID","_rev":"DOC_REV"}')
})
})
})

View File

@ -1,6 +1,7 @@
import Router from "@koa/router"
import { api as pro } from "@budibase/pro"
import userRoutes from "./global/users"
import tenantRoutes from "./global/tenant"
import configRoutes from "./global/configs"
import workspaceRoutes from "./global/workspaces"
import templateRoutes from "./global/templates"
@ -40,6 +41,7 @@ export const routes: Router[] = [
accountRoutes,
restoreRoutes,
eventRoutes,
tenantRoutes,
pro.scim,
]

View File

@ -1,3 +1,4 @@
import { TenantInfo } from "@budibase/types"
import TestConfiguration from "../TestConfiguration"
import { TestAPI, TestAPIOpts } from "./base"
@ -14,4 +15,12 @@ export class TenantAPI extends TestAPI {
.set(opts?.headers)
.expect(opts?.status ? opts.status : 204)
}
saveTenantInfo = (tenantInfo: TenantInfo) => {
return this.request
.post("/api/global/tenant")
.set(this.config.internalAPIHeaders())
.send(tenantInfo)
.expect(200)
}
}

View File

@ -3483,10 +3483,10 @@
dependencies:
lodash "^4.17.21"
"@koa/cors@^3.1.0":
version "3.4.3"
resolved "https://registry.yarnpkg.com/@koa/cors/-/cors-3.4.3.tgz#d669ee6e8d6e4f0ec4a7a7b0a17e7a3ed3752ebb"
integrity sha512-WPXQUaAeAMVaLTEFpoq3T2O1C+FstkjJnDQqy95Ck1UdILajsRhu6mhJ8H2f4NFPRBoCNN+qywTJfq/gGki5mw==
"@koa/cors@^5.0.0":
version "5.0.0"
resolved "https://registry.yarnpkg.com/@koa/cors/-/cors-5.0.0.tgz#0029b5f057fa0d0ae0e37dd2c89ece315a0daffd"
integrity sha512-x/iUDjcS90W69PryLDIMgFyV21YLTnG9zOpPXS7Bkt2b8AsY3zZsIpOLBkYr9fBcF3HbkKaER5hOBZLfpLgYNw==
dependencies:
vary "^1.1.2"
@ -5797,10 +5797,10 @@
"@types/koa-compose" "*"
"@types/node" "*"
"@types/koa__cors@^3.1.1":
version "3.3.1"
resolved "https://registry.yarnpkg.com/@types/koa__cors/-/koa__cors-3.3.1.tgz#0ec7543c4c620fd23451bfdd3e21b9a6aadedccd"
integrity sha512-aFGYhTFW7651KhmZZ05VG0QZJre7QxBxDj2LF1lf6GA/wSXEfKVAJxiQQWzRV4ZoMzQIO8vJBXKsUcRuvYK9qw==
"@types/koa__cors@^5.0.0":
version "5.0.0"
resolved "https://registry.yarnpkg.com/@types/koa__cors/-/koa__cors-5.0.0.tgz#74567a045b599266e2cd3940cef96cedecc2ef1f"
integrity sha512-LCk/n25Obq5qlernGOK/2LUwa/2YJb2lxHUkkvYFDOpLXlVI6tKcdfCHRBQnOY4LwH6el5WOLs6PD/a8Uzau6g==
dependencies:
"@types/koa" "*"
@ -16299,10 +16299,10 @@ node-source-walk@^5.0.0:
dependencies:
"@babel/parser" "^7.0.0"
nodemailer@6.7.2:
version "6.7.2"
resolved "https://registry.yarnpkg.com/nodemailer/-/nodemailer-6.7.2.tgz#44b2ad5f7ed71b7067f7a21c4fedabaec62b85e0"
integrity sha512-Dz7zVwlef4k5R71fdmxwR8Q39fiboGbu3xgswkzGwczUfjp873rVxt1O46+Fh0j1ORnAC6L9+heI8uUpO6DT7Q==
nodemailer@6.9.13:
version "6.9.13"
resolved "https://registry.yarnpkg.com/nodemailer/-/nodemailer-6.9.13.tgz#5b292bf1e92645f4852ca872c56a6ba6c4a3d3d6"
integrity sha512-7o38Yogx6krdoBf3jCAqnIN4oSQFx+fMa0I7dK1D+me9kBxx12D+/33wSb+fhOCtIxvYJ+4x4IMEhmhCKfAiOA==
nodemailer@6.9.9:
version "6.9.9"