From 335c1746439e12c404d1dd12f6126aa1bcba744a Mon Sep 17 00:00:00 2001 From: Sam Rose Date: Tue, 18 Jun 2024 16:39:35 +0100 Subject: [PATCH 1/4] Create a failing test. --- .../server/src/api/routes/tests/row.spec.ts | 119 +++++++++++++++++- 1 file changed, 115 insertions(+), 4 deletions(-) diff --git a/packages/server/src/api/routes/tests/row.spec.ts b/packages/server/src/api/routes/tests/row.spec.ts index e5599c6d81..ba935fc6ae 100644 --- a/packages/server/src/api/routes/tests/row.spec.ts +++ b/packages/server/src/api/routes/tests/row.spec.ts @@ -59,9 +59,9 @@ async function waitForEvent( describe.each([ ["internal", undefined], [DatabaseName.POSTGRES, getDatasource(DatabaseName.POSTGRES)], - [DatabaseName.MYSQL, getDatasource(DatabaseName.MYSQL)], - [DatabaseName.SQL_SERVER, getDatasource(DatabaseName.SQL_SERVER)], - [DatabaseName.MARIADB, getDatasource(DatabaseName.MARIADB)], + // [DatabaseName.MYSQL, getDatasource(DatabaseName.MYSQL)], + // [DatabaseName.SQL_SERVER, getDatasource(DatabaseName.SQL_SERVER)], + // [DatabaseName.MARIADB, getDatasource(DatabaseName.MARIADB)], ])("/rows (%s)", (providerType, dsProvider) => { const isInternal = dsProvider === undefined const config = setup.getConfig() @@ -929,7 +929,7 @@ describe.each([ }) }) - describe("bulkImport", () => { + describe.only("bulkImport", () => { isInternal && it("should update Auto ID field after bulk import", async () => { const table = await config.api.table.save( @@ -960,6 +960,117 @@ describe.each([ row = await config.api.row.save(table._id!, {}) expect(row.autoId).toEqual(3) }) + + it("should be able to bulkImport rows", async () => { + const table = await config.api.table.save( + saveTableRequest({ + schema: { + name: { + type: FieldType.STRING, + name: "name", + }, + description: { + type: FieldType.STRING, + name: "description", + }, + }, + }) + ) + + const rowUsage = await getRowUsage() + + await config.api.row.bulkImport(table._id!, { + rows: [ + { + name: "Row 1", + description: "Row 1 description", + }, + { + name: "Row 2", + description: "Row 2 description", + }, + ], + }) + + const rows = await config.api.row.fetch(table._id!) + expect(rows.length).toEqual(2) + + rows.sort((a, b) => a.name.localeCompare(b.name)) + expect(rows[0].name).toEqual("Row 1") + expect(rows[0].description).toEqual("Row 1 description") + expect(rows[1].name).toEqual("Row 2") + expect(rows[1].description).toEqual("Row 2 description") + + await assertRowUsage(isInternal ? rowUsage + 2 : rowUsage) + }) + + it.only("should be able to update existing rows with bulkImport", async () => { + const table = await config.api.table.save( + saveTableRequest({ + schema: { + userId: { + type: FieldType.NUMBER, + name: "userId", + constraints: { + presence: true, + }, + }, + name: { + type: FieldType.STRING, + name: "name", + }, + description: { + type: FieldType.STRING, + name: "description", + }, + }, + }) + ) + + const row1 = await config.api.row.save(table._id!, { + userId: 1, + name: "Row 1", + description: "Row 1 description", + }) + + const row2 = await config.api.row.save(table._id!, { + userId: 2, + name: "Row 2", + description: "Row 2 description", + }) + + await config.api.row.bulkImport(table._id!, { + identifierFields: ["userId"], + rows: [ + { + userId: row1.userId, + name: "Row 1 updated", + description: "Row 1 description updated", + }, + { + userId: row2.userId, + name: "Row 2 updated", + description: "Row 2 description updated", + }, + { + userId: 3, + name: "Row 3", + description: "Row 3 description", + }, + ], + }) + + const rows = await config.api.row.fetch(table._id!) + expect(rows.length).toEqual(3) + + rows.sort((a, b) => a.name.localeCompare(b.name)) + expect(rows[0].name).toEqual("Row 1 updated") + expect(rows[0].description).toEqual("Row 1 description updated") + expect(rows[1].name).toEqual("Row 2 updated") + expect(rows[1].description).toEqual("Row 2 description updated") + expect(rows[2].name).toEqual("Row 3") + expect(rows[2].description).toEqual("Row 3 description") + }) }) describe("enrich", () => { From 5ac8a7d514c2574b4481fcc8de80f410277ac208 Mon Sep 17 00:00:00 2001 From: Sam Rose Date: Tue, 18 Jun 2024 17:43:25 +0100 Subject: [PATCH 2/4] bulkImport upsert working everywhere excpet mssql --- packages/backend-core/src/sql/sql.ts | 24 +++++++++++ .../src/api/controllers/table/external.ts | 2 +- .../server/src/api/routes/tests/row.spec.ts | 40 ++++++++++++------- packages/types/src/sdk/datasources.ts | 1 + 4 files changed, 51 insertions(+), 16 deletions(-) diff --git a/packages/backend-core/src/sql/sql.ts b/packages/backend-core/src/sql/sql.ts index 9bc2092b83..c4f0d7015f 100644 --- a/packages/backend-core/src/sql/sql.ts +++ b/packages/backend-core/src/sql/sql.ts @@ -571,6 +571,27 @@ class InternalBuilder { return query.insert(parsedBody) } + bulkUpsert(knex: Knex, json: QueryJson): Knex.QueryBuilder { + const { endpoint, body } = json + let query = this.knexWithAlias(knex, endpoint) + if (!Array.isArray(body)) { + return query + } + const parsedBody = body.map(row => parseBody(row)) + if ( + this.client === SqlClient.POSTGRES || + this.client === SqlClient.SQL_LITE || + this.client === SqlClient.MY_SQL + ) { + const primary = json.meta.table.primary + if (!primary) { + throw new Error("Primary key is required for upsert") + } + return query.insert(parsedBody).onConflict(primary).merge() + } + return query.upsert(parsedBody) + } + read(knex: Knex, json: QueryJson, limit: number): Knex.QueryBuilder { let { endpoint, resource, filters, paginate, relationships, tableAliases } = json @@ -708,6 +729,9 @@ class SqlQueryBuilder extends SqlTableQueryBuilder { case Operation.BULK_CREATE: query = builder.bulkCreate(client, json) break + case Operation.BULK_UPSERT: + query = builder.bulkUpsert(client, json) + break case Operation.CREATE_TABLE: case Operation.UPDATE_TABLE: case Operation.DELETE_TABLE: diff --git a/packages/server/src/api/controllers/table/external.ts b/packages/server/src/api/controllers/table/external.ts index bd674d7d38..f1b186c233 100644 --- a/packages/server/src/api/controllers/table/external.ts +++ b/packages/server/src/api/controllers/table/external.ts @@ -98,7 +98,7 @@ export async function bulkImport( table = processed.table } - await handleRequest(Operation.BULK_CREATE, table._id!, { + await handleRequest(Operation.BULK_UPSERT, table._id!, { rows: parsedRows, }) await events.rows.imported(table, parsedRows.length) diff --git a/packages/server/src/api/routes/tests/row.spec.ts b/packages/server/src/api/routes/tests/row.spec.ts index ba935fc6ae..09b1f0fe01 100644 --- a/packages/server/src/api/routes/tests/row.spec.ts +++ b/packages/server/src/api/routes/tests/row.spec.ts @@ -26,6 +26,7 @@ import { Table, TableSourceType, UpdatedRowEventEmitter, + TableSchema, } from "@budibase/types" import { generator, mocks } from "@budibase/backend-core/tests" import _, { merge } from "lodash" @@ -59,9 +60,9 @@ async function waitForEvent( describe.each([ ["internal", undefined], [DatabaseName.POSTGRES, getDatasource(DatabaseName.POSTGRES)], - // [DatabaseName.MYSQL, getDatasource(DatabaseName.MYSQL)], - // [DatabaseName.SQL_SERVER, getDatasource(DatabaseName.SQL_SERVER)], - // [DatabaseName.MARIADB, getDatasource(DatabaseName.MARIADB)], + [DatabaseName.MYSQL, getDatasource(DatabaseName.MYSQL)], + [DatabaseName.SQL_SERVER, getDatasource(DatabaseName.SQL_SERVER)], + [DatabaseName.MARIADB, getDatasource(DatabaseName.MARIADB)], ])("/rows (%s)", (providerType, dsProvider) => { const isInternal = dsProvider === undefined const config = setup.getConfig() @@ -88,6 +89,23 @@ describe.each([ // the table name they're writing to. ...overrides: Partial>[] ): SaveTableRequest { + const defaultSchema: TableSchema = { + id: { + type: FieldType.AUTO, + name: "id", + autocolumn: true, + constraints: { + presence: true, + }, + }, + } + + for (const override of overrides) { + if (override.primary) { + delete defaultSchema.id + } + } + const req: SaveTableRequest = { name: uuid.v4().substring(0, 10), type: "table", @@ -96,16 +114,7 @@ describe.each([ : TableSourceType.INTERNAL, sourceId: datasource ? datasource._id! : INTERNAL_TABLE_SOURCE_ID, primary: ["id"], - schema: { - id: { - type: FieldType.AUTO, - name: "id", - autocolumn: true, - constraints: { - presence: true, - }, - }, - }, + schema: defaultSchema, } return merge(req, ...overrides) } @@ -929,7 +938,7 @@ describe.each([ }) }) - describe.only("bulkImport", () => { + describe("bulkImport", () => { isInternal && it("should update Auto ID field after bulk import", async () => { const table = await config.api.table.save( @@ -1004,9 +1013,10 @@ describe.each([ await assertRowUsage(isInternal ? rowUsage + 2 : rowUsage) }) - it.only("should be able to update existing rows with bulkImport", async () => { + it("should be able to update existing rows with bulkImport", async () => { const table = await config.api.table.save( saveTableRequest({ + primary: ["userId"], schema: { userId: { type: FieldType.NUMBER, diff --git a/packages/types/src/sdk/datasources.ts b/packages/types/src/sdk/datasources.ts index 3a788cbac6..da10bf26c6 100644 --- a/packages/types/src/sdk/datasources.ts +++ b/packages/types/src/sdk/datasources.ts @@ -9,6 +9,7 @@ export enum Operation { UPDATE = "UPDATE", DELETE = "DELETE", BULK_CREATE = "BULK_CREATE", + BULK_UPSERT = "BULK_UPSERT", CREATE_TABLE = "CREATE_TABLE", UPDATE_TABLE = "UPDATE_TABLE", DELETE_TABLE = "DELETE_TABLE", From e288fc87956f5767e2f522c5fd5553809ca604e9 Mon Sep 17 00:00:00 2001 From: Sam Rose Date: Tue, 18 Jun 2024 18:02:20 +0100 Subject: [PATCH 3/4] Disable upserting for MSSQL in bulkImport for now. --- packages/backend-core/src/sql/sql.ts | 4 + .../server/src/api/routes/tests/row.spec.ts | 130 +++++++++--------- 2 files changed, 71 insertions(+), 63 deletions(-) diff --git a/packages/backend-core/src/sql/sql.ts b/packages/backend-core/src/sql/sql.ts index c4f0d7015f..3224fc043e 100644 --- a/packages/backend-core/src/sql/sql.ts +++ b/packages/backend-core/src/sql/sql.ts @@ -588,6 +588,10 @@ class InternalBuilder { throw new Error("Primary key is required for upsert") } return query.insert(parsedBody).onConflict(primary).merge() + } else if (this.client === SqlClient.MS_SQL) { + // No upsert or onConflict support in MSSQL yet, see: + // https://github.com/knex/knex/pull/6050 + return query.insert(parsedBody) } return query.upsert(parsedBody) } diff --git a/packages/server/src/api/routes/tests/row.spec.ts b/packages/server/src/api/routes/tests/row.spec.ts index 09b1f0fe01..4d1b4f028b 100644 --- a/packages/server/src/api/routes/tests/row.spec.ts +++ b/packages/server/src/api/routes/tests/row.spec.ts @@ -65,6 +65,7 @@ describe.each([ [DatabaseName.MARIADB, getDatasource(DatabaseName.MARIADB)], ])("/rows (%s)", (providerType, dsProvider) => { const isInternal = dsProvider === undefined + const isMSSQL = providerType === DatabaseName.SQL_SERVER const config = setup.getConfig() let table: Table @@ -1013,74 +1014,77 @@ describe.each([ await assertRowUsage(isInternal ? rowUsage + 2 : rowUsage) }) - it("should be able to update existing rows with bulkImport", async () => { - const table = await config.api.table.save( - saveTableRequest({ - primary: ["userId"], - schema: { - userId: { - type: FieldType.NUMBER, - name: "userId", - constraints: { - presence: true, + // Upserting isn't yet supported in MSSQL, see: + // https://github.com/knex/knex/pull/6050 + !isMSSQL && + it("should be able to update existing rows with bulkImport", async () => { + const table = await config.api.table.save( + saveTableRequest({ + primary: ["userId"], + schema: { + userId: { + type: FieldType.NUMBER, + name: "userId", + constraints: { + presence: true, + }, + }, + name: { + type: FieldType.STRING, + name: "name", + }, + description: { + type: FieldType.STRING, + name: "description", }, }, - name: { - type: FieldType.STRING, - name: "name", - }, - description: { - type: FieldType.STRING, - name: "description", - }, - }, + }) + ) + + const row1 = await config.api.row.save(table._id!, { + userId: 1, + name: "Row 1", + description: "Row 1 description", }) - ) - const row1 = await config.api.row.save(table._id!, { - userId: 1, - name: "Row 1", - description: "Row 1 description", + const row2 = await config.api.row.save(table._id!, { + userId: 2, + name: "Row 2", + description: "Row 2 description", + }) + + await config.api.row.bulkImport(table._id!, { + identifierFields: ["userId"], + rows: [ + { + userId: row1.userId, + name: "Row 1 updated", + description: "Row 1 description updated", + }, + { + userId: row2.userId, + name: "Row 2 updated", + description: "Row 2 description updated", + }, + { + userId: 3, + name: "Row 3", + description: "Row 3 description", + }, + ], + }) + + const rows = await config.api.row.fetch(table._id!) + expect(rows.length).toEqual(3) + + rows.sort((a, b) => a.name.localeCompare(b.name)) + expect(rows[0].name).toEqual("Row 1 updated") + expect(rows[0].description).toEqual("Row 1 description updated") + expect(rows[1].name).toEqual("Row 2 updated") + expect(rows[1].description).toEqual("Row 2 description updated") + expect(rows[2].name).toEqual("Row 3") + expect(rows[2].description).toEqual("Row 3 description") }) - - const row2 = await config.api.row.save(table._id!, { - userId: 2, - name: "Row 2", - description: "Row 2 description", - }) - - await config.api.row.bulkImport(table._id!, { - identifierFields: ["userId"], - rows: [ - { - userId: row1.userId, - name: "Row 1 updated", - description: "Row 1 description updated", - }, - { - userId: row2.userId, - name: "Row 2 updated", - description: "Row 2 description updated", - }, - { - userId: 3, - name: "Row 3", - description: "Row 3 description", - }, - ], - }) - - const rows = await config.api.row.fetch(table._id!) - expect(rows.length).toEqual(3) - - rows.sort((a, b) => a.name.localeCompare(b.name)) - expect(rows[0].name).toEqual("Row 1 updated") - expect(rows[0].description).toEqual("Row 1 description updated") - expect(rows[1].name).toEqual("Row 2 updated") - expect(rows[1].description).toEqual("Row 2 description updated") - expect(rows[2].name).toEqual("Row 3") - expect(rows[2].description).toEqual("Row 3 description") - }) }) describe("enrich", () => { From 9866aabd398c06ef0893be7b8a7dcebd12671f24 Mon Sep 17 00:00:00 2001 From: Sam Rose Date: Wed, 19 Jun 2024 10:49:39 +0100 Subject: [PATCH 4/4] Add bulk upsert to row operations list. --- packages/types/src/sdk/datasources.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/types/src/sdk/datasources.ts b/packages/types/src/sdk/datasources.ts index da10bf26c6..23caa2c618 100644 --- a/packages/types/src/sdk/datasources.ts +++ b/packages/types/src/sdk/datasources.ts @@ -21,6 +21,7 @@ export const RowOperations = [ Operation.UPDATE, Operation.DELETE, Operation.BULK_CREATE, + Operation.BULK_UPSERT, ] export enum QueryType {