Merge pull request #13689 from Budibase/feature/sqs-table-cleanup

SQS cleanup (tables and apps)
This commit is contained in:
Michael Drury 2024-05-17 11:27:50 +01:00 committed by GitHub
commit 8547b9b66e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 204 additions and 62 deletions

View File

@ -55,7 +55,9 @@
} }
], ],
"no-redeclare": "off", "no-redeclare": "off",
"@typescript-eslint/no-redeclare": "error" "@typescript-eslint/no-redeclare": "error",
// have to turn this off to allow function overloading in typescript
"no-dupe-class-members": "off"
} }
}, },
{ {
@ -88,7 +90,9 @@
"jest/expect-expect": "off", "jest/expect-expect": "off",
// We do this in some tests where the behaviour of internal tables // We do this in some tests where the behaviour of internal tables
// differs to external, but the API is broadly the same // differs to external, but the API is broadly the same
"jest/no-conditional-expect": "off" "jest/no-conditional-expect": "off",
// have to turn this off to allow function overloading in typescript
"no-dupe-class-members": "off"
} }
}, },
{ {

View File

@ -1,14 +1,31 @@
import PouchDB from "pouchdb" import PouchDB from "pouchdb"
import { getPouchDB, closePouchDB } from "./couch" import { getPouchDB, closePouchDB } from "./couch"
import { DocumentType } from "../constants" import { DocumentType } from "@budibase/types"
enum ReplicationDirection {
TO_PRODUCTION = "toProduction",
TO_DEV = "toDev",
}
class Replication { class Replication {
source: PouchDB.Database source: PouchDB.Database
target: PouchDB.Database target: PouchDB.Database
direction: ReplicationDirection | undefined
constructor({ source, target }: { source: string; target: string }) { constructor({ source, target }: { source: string; target: string }) {
this.source = getPouchDB(source) this.source = getPouchDB(source)
this.target = getPouchDB(target) this.target = getPouchDB(target)
if (
source.startsWith(DocumentType.APP_DEV) &&
target.startsWith(DocumentType.APP)
) {
this.direction = ReplicationDirection.TO_PRODUCTION
} else if (
source.startsWith(DocumentType.APP) &&
target.startsWith(DocumentType.APP_DEV)
) {
this.direction = ReplicationDirection.TO_DEV
}
} }
async close() { async close() {
@ -40,12 +57,18 @@ class Replication {
} }
const filter = opts.filter const filter = opts.filter
const direction = this.direction
const toDev = direction === ReplicationDirection.TO_DEV
delete opts.filter delete opts.filter
return { return {
...opts, ...opts,
filter: (doc: any, params: any) => { filter: (doc: any, params: any) => {
if (doc._id && doc._id.startsWith(DocumentType.AUTOMATION_LOG)) { // don't sync design documents
if (toDev && doc._id?.startsWith("_design")) {
return false
}
if (doc._id?.startsWith(DocumentType.AUTOMATION_LOG)) {
return false return false
} }
if (doc._id === DocumentType.APP_METADATA) { if (doc._id === DocumentType.APP_METADATA) {

View File

@ -12,6 +12,7 @@ import {
isDocument, isDocument,
RowResponse, RowResponse,
RowValue, RowValue,
SQLiteDefinition,
SqlQueryBinding, SqlQueryBinding,
} from "@budibase/types" } from "@budibase/types"
import { getCouchInfo } from "./connections" import { getCouchInfo } from "./connections"
@ -21,6 +22,8 @@ import { ReadStream, WriteStream } from "fs"
import { newid } from "../../docIds/newid" import { newid } from "../../docIds/newid"
import { SQLITE_DESIGN_DOC_ID } from "../../constants" import { SQLITE_DESIGN_DOC_ID } from "../../constants"
import { DDInstrumentedDatabase } from "../instrumentation" import { DDInstrumentedDatabase } from "../instrumentation"
import { checkSlashesInUrl } from "../../helpers"
import env from "../../environment"
const DATABASE_NOT_FOUND = "Database does not exist." const DATABASE_NOT_FOUND = "Database does not exist."
@ -281,25 +284,61 @@ export class DatabaseImpl implements Database {
}) })
} }
async _sqlQuery<T>(
url: string,
method: "POST" | "GET",
body?: Record<string, any>
): Promise<T> {
url = checkSlashesInUrl(`${this.couchInfo.sqlUrl}/${url}`)
const args: { url: string; method: string; cookie: string; body?: any } = {
url,
method,
cookie: this.couchInfo.cookie,
}
if (body) {
args.body = body
}
return this.performCall(() => {
return async () => {
const response = await directCouchUrlCall(args)
const json = await response.json()
if (response.status > 300) {
throw json
}
return json as T
}
})
}
async sql<T extends Document>( async sql<T extends Document>(
sql: string, sql: string,
parameters?: SqlQueryBinding parameters?: SqlQueryBinding
): Promise<T[]> { ): Promise<T[]> {
const dbName = this.name const dbName = this.name
const url = `/${dbName}/${SQLITE_DESIGN_DOC_ID}` const url = `/${dbName}/${SQLITE_DESIGN_DOC_ID}`
const response = await directCouchUrlCall({ return await this._sqlQuery<T[]>(url, "POST", {
url: `${this.couchInfo.sqlUrl}/${url}`,
method: "POST",
cookie: this.couchInfo.cookie,
body: {
query: sql, query: sql,
args: parameters, args: parameters,
},
}) })
if (response.status > 300) {
throw new Error(await response.text())
} }
return (await response.json()) as T[]
// checks design document is accurate (cleans up tables)
// this will check the design document and remove anything from
// disk which is not supposed to be there
async sqlDiskCleanup(): Promise<void> {
const dbName = this.name
const url = `/${dbName}/_cleanup`
return await this._sqlQuery<void>(url, "POST")
}
// removes a document from sqlite
async sqlPurgeDocument(docIds: string[] | string): Promise<void> {
if (!Array.isArray(docIds)) {
docIds = [docIds]
}
const dbName = this.name
const url = `/${dbName}/_purge`
return await this._sqlQuery<void>(url, "POST", { docs: docIds })
} }
async query<T extends Document>( async query<T extends Document>(
@ -314,6 +353,17 @@ export class DatabaseImpl implements Database {
async destroy() { async destroy() {
try { try {
if (env.SQS_SEARCH_ENABLE) {
// delete the design document, then run the cleanup operation
try {
const definition = await this.get<SQLiteDefinition>(
SQLITE_DESIGN_DOC_ID
)
await this.remove(SQLITE_DESIGN_DOC_ID, definition._rev)
} finally {
await this.sqlDiskCleanup()
}
}
return await this.nano().db.destroy(this.name) return await this.nano().db.destroy(this.name)
} catch (err: any) { } catch (err: any) {
// didn't exist, don't worry // didn't exist, don't worry

View File

@ -21,7 +21,7 @@ export async function directCouchUrlCall({
url: string url: string
cookie: string cookie: string
method: string method: string
body?: any body?: Record<string, any>
}) { }) {
const params: any = { const params: any = {
method: method, method: method,

View File

@ -56,12 +56,17 @@ export class DDInstrumentedDatabase implements Database {
}) })
} }
remove(idOrDoc: Document): Promise<DocumentDestroyResponse>
remove(idOrDoc: string, rev?: string): Promise<DocumentDestroyResponse>
remove( remove(
id: string | Document, idOrDoc: string | Document,
rev?: string | undefined rev?: string
): Promise<DocumentDestroyResponse> { ): Promise<DocumentDestroyResponse> {
return tracer.trace("db.remove", span => { return tracer.trace("db.remove", span => {
span?.addTags({ db_name: this.name, doc_id: id }) span?.addTags({ db_name: this.name, doc_id: idOrDoc })
const isDocument = typeof idOrDoc === "object"
const id = isDocument ? idOrDoc._id! : idOrDoc
rev = isDocument ? idOrDoc._rev : rev
return this.db.remove(id, rev) return this.db.remove(id, rev)
}) })
} }
@ -160,4 +165,18 @@ export class DDInstrumentedDatabase implements Database {
return this.db.sql(sql, parameters) return this.db.sql(sql, parameters)
}) })
} }
sqlPurgeDocument(docIds: string[] | string): Promise<void> {
return tracer.trace("db.sqlPurgeDocument", span => {
span?.addTags({ db_name: this.name })
return this.db.sqlPurgeDocument(docIds)
})
}
sqlDiskCleanup(): Promise<void> {
return tracer.trace("db.sqlDiskCleanup", span => {
span?.addTags({ db_name: this.name })
return this.db.sqlDiskCleanup()
})
}
} }

View File

@ -109,6 +109,7 @@ const environment = {
API_ENCRYPTION_KEY: getAPIEncryptionKey(), API_ENCRYPTION_KEY: getAPIEncryptionKey(),
COUCH_DB_URL: process.env.COUCH_DB_URL || "http://localhost:4005", COUCH_DB_URL: process.env.COUCH_DB_URL || "http://localhost:4005",
COUCH_DB_SQL_URL: process.env.COUCH_DB_SQL_URL || "http://localhost:4006", COUCH_DB_SQL_URL: process.env.COUCH_DB_SQL_URL || "http://localhost:4006",
SQS_SEARCH_ENABLE: process.env.SQS_SEARCH_ENABLE,
COUCH_DB_USERNAME: process.env.COUCH_DB_USER, COUCH_DB_USERNAME: process.env.COUCH_DB_USER,
COUCH_DB_PASSWORD: process.env.COUCH_DB_PASSWORD, COUCH_DB_PASSWORD: process.env.COUCH_DB_PASSWORD,
GOOGLE_CLIENT_ID: process.env.GOOGLE_CLIENT_ID, GOOGLE_CLIENT_ID: process.env.GOOGLE_CLIENT_ID,

View File

@ -492,7 +492,7 @@ export class UserDB {
await platform.users.removeUser(dbUser) await platform.users.removeUser(dbUser)
await db.remove(userId, dbUser._rev) await db.remove(userId, dbUser._rev!)
const creatorsToDelete = (await isCreator(dbUser)) ? 1 : 0 const creatorsToDelete = (await isCreator(dbUser)) ? 1 : 0
await UserDB.quotas.removeUsers(1, creatorsToDelete) await UserDB.quotas.removeUsers(1, creatorsToDelete)

View File

@ -33,6 +33,7 @@ import {
} from "@budibase/types" } from "@budibase/types"
import sdk from "../../../sdk" import sdk from "../../../sdk"
import env from "../../../environment" import env from "../../../environment"
import { runStaticFormulaChecks } from "./bulkFormula"
export async function clearColumns(table: Table, columnNames: string[]) { export async function clearColumns(table: Table, columnNames: string[]) {
const db = context.getAppDB() const db = context.getAppDB()
@ -324,7 +325,7 @@ class TableSaveFunctions {
user: this.user, user: this.user,
}) })
if (env.SQS_SEARCH_ENABLE) { if (env.SQS_SEARCH_ENABLE) {
await sdk.tables.sqs.addTableToSqlite(table) await sdk.tables.sqs.addTable(table)
} }
return table return table
} }
@ -496,5 +497,31 @@ export function setStaticSchemas(datasource: Datasource, table: Table) {
return table return table
} }
export async function internalTableCleanup(table: Table, rows?: Row[]) {
const db = context.getAppDB()
const tableId = table._id!
// remove table search index
if (!env.isTest() || env.COUCH_DB_URL) {
const currentIndexes = await db.getIndexes()
const existingIndex = currentIndexes.indexes.find(
(existing: any) => existing.name === `search:${tableId}`
)
if (existingIndex) {
await db.deleteIndex(existingIndex)
}
}
// has to run after, make sure it has _id
await runStaticFormulaChecks(table, {
deletion: true,
})
if (rows) {
await AttachmentCleanup.tableDelete(table, rows)
}
if (env.SQS_SEARCH_ENABLE) {
await sdk.tables.sqs.removeTable(table)
}
}
const _TableSaveFunctions = TableSaveFunctions const _TableSaveFunctions = TableSaveFunctions
export { _TableSaveFunctions as TableSaveFunctions } export { _TableSaveFunctions as TableSaveFunctions }

View File

@ -19,7 +19,7 @@ import {
sqlOutputProcessing, sqlOutputProcessing,
} from "../../../../api/controllers/row/utils" } from "../../../../api/controllers/row/utils"
import sdk from "../../../index" import sdk from "../../../index"
import { context } from "@budibase/backend-core" import { context, SQLITE_DESIGN_DOC_ID } from "@budibase/backend-core"
import { import {
CONSTANT_INTERNAL_ROW_COLS, CONSTANT_INTERNAL_ROW_COLS,
SQS_DATASOURCE_INTERNAL, SQS_DATASOURCE_INTERNAL,
@ -195,6 +195,10 @@ export async function search(
} }
} catch (err: any) { } catch (err: any) {
const msg = typeof err === "string" ? err : err.message const msg = typeof err === "string" ? err : err.message
if (err.status === 404 && err.message?.includes(SQLITE_DESIGN_DOC_ID)) {
await sdk.tables.sqs.syncDefinition()
return search(options, table)
}
throw new Error(`Unable to search by SQL - ${msg}`, { cause: err }) throw new Error(`Unable to search by SQL - ${msg}`, { cause: err })
} }
} }

View File

@ -10,6 +10,7 @@ import {
import { import {
hasTypeChanged, hasTypeChanged,
TableSaveFunctions, TableSaveFunctions,
internalTableCleanup,
} from "../../../../api/controllers/table/utils" } from "../../../../api/controllers/table/utils"
import { EventType, updateLinks } from "../../../../db/linkedRows" import { EventType, updateLinks } from "../../../../db/linkedRows"
import { cloneDeep } from "lodash/fp" import { cloneDeep } from "lodash/fp"
@ -21,8 +22,6 @@ import { checkAutoColumns } from "./utils"
import * as viewsSdk from "../../views" import * as viewsSdk from "../../views"
import { getRowParams } from "../../../../db/utils" import { getRowParams } from "../../../../db/utils"
import { quotas } from "@budibase/pro" import { quotas } from "@budibase/pro"
import env from "../../../../environment"
import { AttachmentCleanup } from "../../../../utilities/rowProcessor"
export async function save( export async function save(
table: Table, table: Table,
@ -128,16 +127,20 @@ export async function destroy(table: Table) {
const db = context.getAppDB() const db = context.getAppDB()
const tableId = table._id! const tableId = table._id!
// Delete all rows for that table // Delete all rows for that table - we have to retrieve the full rows for
const rowsData = await db.allDocs( // attachment cleanup, this may be worth investigating if there is a better
// way - we could delete all rows without the `include_docs` which would be faster
const rows = (
await db.allDocs<Row>(
getRowParams(tableId, null, { getRowParams(tableId, null, {
include_docs: true, include_docs: true,
}) })
) )
await db.bulkDocs( ).rows.map(data => data.doc!)
rowsData.rows.map((row: any) => ({ ...row.doc, _deleted: true })) await db.bulkDocs(rows.map((row: Row) => ({ ...row, _deleted: true })))
)
await quotas.removeRows(rowsData.rows.length, { // remove rows from quota
await quotas.removeRows(rows.length, {
tableId, tableId,
}) })
@ -150,25 +153,8 @@ export async function destroy(table: Table) {
// don't remove the table itself until very end // don't remove the table itself until very end
await db.remove(tableId, table._rev) await db.remove(tableId, table._rev)
// remove table search index // final cleanup, attachments, indexes, SQS
if (!env.isTest() || env.COUCH_DB_URL) { await internalTableCleanup(table, rows)
const currentIndexes = await db.getIndexes()
const existingIndex = currentIndexes.indexes.find(
(existing: any) => existing.name === `search:${tableId}`
)
if (existingIndex) {
await db.deleteIndex(existingIndex)
}
}
// has to run after, make sure it has _id
await runStaticFormulaChecks(table, {
deletion: true,
})
await AttachmentCleanup.tableDelete(
table,
rowsData.rows.map((row: any) => row.doc)
)
return { table } return { table }
} }

View File

@ -15,7 +15,9 @@ import {
generateJunctionTableID, generateJunctionTableID,
} from "../../../../db/utils" } from "../../../../db/utils"
const BASIC_SQLITE_DOC: SQLiteDefinition = { type PreSaveSQLiteDefinition = Omit<SQLiteDefinition, "_rev">
const BASIC_SQLITE_DOC: PreSaveSQLiteDefinition = {
_id: SQLITE_DESIGN_DOC_ID, _id: SQLITE_DESIGN_DOC_ID,
language: "sqlite", language: "sqlite",
sql: { sql: {
@ -103,7 +105,7 @@ function mapTable(table: Table): SQLiteTables {
} }
// nothing exists, need to iterate though existing tables // nothing exists, need to iterate though existing tables
async function buildBaseDefinition(): Promise<SQLiteDefinition> { async function buildBaseDefinition(): Promise<PreSaveSQLiteDefinition> {
const tables = await tablesSdk.getAllInternalTables() const tables = await tablesSdk.getAllInternalTables()
const definition = cloneDeep(BASIC_SQLITE_DOC) const definition = cloneDeep(BASIC_SQLITE_DOC)
for (let table of tables) { for (let table of tables) {
@ -115,11 +117,17 @@ async function buildBaseDefinition(): Promise<SQLiteDefinition> {
return definition return definition
} }
export async function addTableToSqlite(table: Table) { export async function syncDefinition(): Promise<void> {
const db = context.getAppDB() const db = context.getAppDB()
let definition: SQLiteDefinition const definition = await buildBaseDefinition()
await db.put(definition)
}
export async function addTable(table: Table) {
const db = context.getAppDB()
let definition: PreSaveSQLiteDefinition | SQLiteDefinition
try { try {
definition = await db.get(SQLITE_DESIGN_DOC_ID) definition = await db.get<SQLiteDefinition>(SQLITE_DESIGN_DOC_ID)
} catch (err) { } catch (err) {
definition = await buildBaseDefinition() definition = await buildBaseDefinition()
} }
@ -129,3 +137,22 @@ export async function addTableToSqlite(table: Table) {
} }
await db.put(definition) await db.put(definition)
} }
export async function removeTable(table: Table) {
const db = context.getAppDB()
try {
const definition = await db.get<SQLiteDefinition>(SQLITE_DESIGN_DOC_ID)
if (definition.sql?.tables?.[table._id!]) {
delete definition.sql.tables[table._id!]
await db.put(definition)
// make sure SQS is cleaned up, tables removed
await db.sqlDiskCleanup()
}
} catch (err: any) {
if (err?.status === 404) {
return
} else {
throw err
}
}
}

View File

@ -20,6 +20,7 @@ export type SQLiteTables = Record<
export interface SQLiteDefinition { export interface SQLiteDefinition {
_id: string _id: string
_rev: string
language: string language: string
sql: { sql: {
tables: SQLiteTables tables: SQLiteTables

View File

@ -135,10 +135,8 @@ export interface Database {
ids: string[], ids: string[],
opts?: { allowMissing?: boolean } opts?: { allowMissing?: boolean }
): Promise<T[]> ): Promise<T[]>
remove( remove(idOrDoc: Document): Promise<Nano.DocumentDestroyResponse>
id: string | Document, remove(idOrDoc: string, rev?: string): Promise<Nano.DocumentDestroyResponse>
rev?: string
): Promise<Nano.DocumentDestroyResponse>
put( put(
document: AnyDocument, document: AnyDocument,
opts?: DatabasePutOpts opts?: DatabasePutOpts
@ -148,6 +146,8 @@ export interface Database {
sql: string, sql: string,
parameters?: SqlQueryBinding parameters?: SqlQueryBinding
): Promise<T[]> ): Promise<T[]>
sqlPurgeDocument(docIds: string[] | string): Promise<void>
sqlDiskCleanup(): Promise<void>
allDocs<T extends Document | RowValue>( allDocs<T extends Document | RowValue>(
params: DatabaseQueryOpts params: DatabaseQueryOpts
): Promise<AllDocsResponse<T>> ): Promise<AllDocsResponse<T>>