diff --git a/packages/backend-core/src/sql/sql.ts b/packages/backend-core/src/sql/sql.ts index 9bc2092b83..3224fc043e 100644 --- a/packages/backend-core/src/sql/sql.ts +++ b/packages/backend-core/src/sql/sql.ts @@ -571,6 +571,31 @@ class InternalBuilder { return query.insert(parsedBody) } + bulkUpsert(knex: Knex, json: QueryJson): Knex.QueryBuilder { + const { endpoint, body } = json + let query = this.knexWithAlias(knex, endpoint) + if (!Array.isArray(body)) { + return query + } + const parsedBody = body.map(row => parseBody(row)) + if ( + this.client === SqlClient.POSTGRES || + this.client === SqlClient.SQL_LITE || + this.client === SqlClient.MY_SQL + ) { + const primary = json.meta.table.primary + if (!primary) { + throw new Error("Primary key is required for upsert") + } + return query.insert(parsedBody).onConflict(primary).merge() + } else if (this.client === SqlClient.MS_SQL) { + // No upsert or onConflict support in MSSQL yet, see: + // https://github.com/knex/knex/pull/6050 + return query.insert(parsedBody) + } + return query.upsert(parsedBody) + } + read(knex: Knex, json: QueryJson, limit: number): Knex.QueryBuilder { let { endpoint, resource, filters, paginate, relationships, tableAliases } = json @@ -708,6 +733,9 @@ class SqlQueryBuilder extends SqlTableQueryBuilder { case Operation.BULK_CREATE: query = builder.bulkCreate(client, json) break + case Operation.BULK_UPSERT: + query = builder.bulkUpsert(client, json) + break case Operation.CREATE_TABLE: case Operation.UPDATE_TABLE: case Operation.DELETE_TABLE: diff --git a/packages/server/src/api/controllers/table/external.ts b/packages/server/src/api/controllers/table/external.ts index bd674d7d38..f1b186c233 100644 --- a/packages/server/src/api/controllers/table/external.ts +++ b/packages/server/src/api/controllers/table/external.ts @@ -98,7 +98,7 @@ export async function bulkImport( table = processed.table } - await handleRequest(Operation.BULK_CREATE, table._id!, { + await handleRequest(Operation.BULK_UPSERT, table._id!, { rows: parsedRows, }) await events.rows.imported(table, parsedRows.length) diff --git a/packages/server/src/api/routes/tests/row.spec.ts b/packages/server/src/api/routes/tests/row.spec.ts index e5599c6d81..4d1b4f028b 100644 --- a/packages/server/src/api/routes/tests/row.spec.ts +++ b/packages/server/src/api/routes/tests/row.spec.ts @@ -26,6 +26,7 @@ import { Table, TableSourceType, UpdatedRowEventEmitter, + TableSchema, } from "@budibase/types" import { generator, mocks } from "@budibase/backend-core/tests" import _, { merge } from "lodash" @@ -64,6 +65,7 @@ describe.each([ [DatabaseName.MARIADB, getDatasource(DatabaseName.MARIADB)], ])("/rows (%s)", (providerType, dsProvider) => { const isInternal = dsProvider === undefined + const isMSSQL = providerType === DatabaseName.SQL_SERVER const config = setup.getConfig() let table: Table @@ -88,6 +90,23 @@ describe.each([ // the table name they're writing to. ...overrides: Partial>[] ): SaveTableRequest { + const defaultSchema: TableSchema = { + id: { + type: FieldType.AUTO, + name: "id", + autocolumn: true, + constraints: { + presence: true, + }, + }, + } + + for (const override of overrides) { + if (override.primary) { + delete defaultSchema.id + } + } + const req: SaveTableRequest = { name: uuid.v4().substring(0, 10), type: "table", @@ -96,16 +115,7 @@ describe.each([ : TableSourceType.INTERNAL, sourceId: datasource ? datasource._id! : INTERNAL_TABLE_SOURCE_ID, primary: ["id"], - schema: { - id: { - type: FieldType.AUTO, - name: "id", - autocolumn: true, - constraints: { - presence: true, - }, - }, - }, + schema: defaultSchema, } return merge(req, ...overrides) } @@ -960,6 +970,121 @@ describe.each([ row = await config.api.row.save(table._id!, {}) expect(row.autoId).toEqual(3) }) + + it("should be able to bulkImport rows", async () => { + const table = await config.api.table.save( + saveTableRequest({ + schema: { + name: { + type: FieldType.STRING, + name: "name", + }, + description: { + type: FieldType.STRING, + name: "description", + }, + }, + }) + ) + + const rowUsage = await getRowUsage() + + await config.api.row.bulkImport(table._id!, { + rows: [ + { + name: "Row 1", + description: "Row 1 description", + }, + { + name: "Row 2", + description: "Row 2 description", + }, + ], + }) + + const rows = await config.api.row.fetch(table._id!) + expect(rows.length).toEqual(2) + + rows.sort((a, b) => a.name.localeCompare(b.name)) + expect(rows[0].name).toEqual("Row 1") + expect(rows[0].description).toEqual("Row 1 description") + expect(rows[1].name).toEqual("Row 2") + expect(rows[1].description).toEqual("Row 2 description") + + await assertRowUsage(isInternal ? rowUsage + 2 : rowUsage) + }) + + // Upserting isn't yet supported in MSSQL, see: + // https://github.com/knex/knex/pull/6050 + !isMSSQL && + it("should be able to update existing rows with bulkImport", async () => { + const table = await config.api.table.save( + saveTableRequest({ + primary: ["userId"], + schema: { + userId: { + type: FieldType.NUMBER, + name: "userId", + constraints: { + presence: true, + }, + }, + name: { + type: FieldType.STRING, + name: "name", + }, + description: { + type: FieldType.STRING, + name: "description", + }, + }, + }) + ) + + const row1 = await config.api.row.save(table._id!, { + userId: 1, + name: "Row 1", + description: "Row 1 description", + }) + + const row2 = await config.api.row.save(table._id!, { + userId: 2, + name: "Row 2", + description: "Row 2 description", + }) + + await config.api.row.bulkImport(table._id!, { + identifierFields: ["userId"], + rows: [ + { + userId: row1.userId, + name: "Row 1 updated", + description: "Row 1 description updated", + }, + { + userId: row2.userId, + name: "Row 2 updated", + description: "Row 2 description updated", + }, + { + userId: 3, + name: "Row 3", + description: "Row 3 description", + }, + ], + }) + + const rows = await config.api.row.fetch(table._id!) + expect(rows.length).toEqual(3) + + rows.sort((a, b) => a.name.localeCompare(b.name)) + expect(rows[0].name).toEqual("Row 1 updated") + expect(rows[0].description).toEqual("Row 1 description updated") + expect(rows[1].name).toEqual("Row 2 updated") + expect(rows[1].description).toEqual("Row 2 description updated") + expect(rows[2].name).toEqual("Row 3") + expect(rows[2].description).toEqual("Row 3 description") + }) }) describe("enrich", () => { diff --git a/packages/types/src/sdk/datasources.ts b/packages/types/src/sdk/datasources.ts index 3a788cbac6..23caa2c618 100644 --- a/packages/types/src/sdk/datasources.ts +++ b/packages/types/src/sdk/datasources.ts @@ -9,6 +9,7 @@ export enum Operation { UPDATE = "UPDATE", DELETE = "DELETE", BULK_CREATE = "BULK_CREATE", + BULK_UPSERT = "BULK_UPSERT", CREATE_TABLE = "CREATE_TABLE", UPDATE_TABLE = "UPDATE_TABLE", DELETE_TABLE = "DELETE_TABLE", @@ -20,6 +21,7 @@ export const RowOperations = [ Operation.UPDATE, Operation.DELETE, Operation.BULK_CREATE, + Operation.BULK_UPSERT, ] export enum QueryType {