Merge pull request #13968 from Budibase/budi-8220-support-updating-existing-rows-for-external-databases-using
Support external datasources in the bulkImport endpoint.
Commit: f9d46de7e1
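In practice this means the row import endpoint can now update existing rows in external SQL tables as well as create new ones. A rough sketch of the request shape, based on the tests in this diff (identifierFields names the columns used to match existing rows; the table and field names here are just the test fixtures):

    // Rows whose identifier fields match an existing row are updated in place;
    // the rest are inserted as new rows.
    await config.api.row.bulkImport(table._id!, {
      identifierFields: ["userId"],
      rows: [
        { userId: 1, name: "Row 1 updated" }, // matches an existing row -> update
        { userId: 3, name: "Row 3" }, // no match -> insert
      ],
    })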
@@ -571,6 +571,31 @@ class InternalBuilder {
     return query.insert(parsedBody)
   }
 
+  bulkUpsert(knex: Knex, json: QueryJson): Knex.QueryBuilder {
+    const { endpoint, body } = json
+    let query = this.knexWithAlias(knex, endpoint)
+    if (!Array.isArray(body)) {
+      return query
+    }
+    const parsedBody = body.map(row => parseBody(row))
+    if (
+      this.client === SqlClient.POSTGRES ||
+      this.client === SqlClient.SQL_LITE ||
+      this.client === SqlClient.MY_SQL
+    ) {
+      const primary = json.meta.table.primary
+      if (!primary) {
+        throw new Error("Primary key is required for upsert")
+      }
+      return query.insert(parsedBody).onConflict(primary).merge()
+    } else if (this.client === SqlClient.MS_SQL) {
+      // No upsert or onConflict support in MSSQL yet, see:
+      // https://github.com/knex/knex/pull/6050
+      return query.insert(parsedBody)
+    }
+    return query.upsert(parsedBody)
+  }
+
   read(knex: Knex, json: QueryJson, limit: number): Knex.QueryBuilder {
     let { endpoint, resource, filters, paginate, relationships, tableAliases } =
       json
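For context, the upsert here is built on knex's onConflict().merge(). A minimal sketch of what the Postgres branch compiles to, with illustrative table and column names (the exact SQL can vary by knex version):

    // Roughly what query.insert(parsedBody).onConflict(primary).merge() emits:
    knex("tasks")
      .insert([{ id: 1, name: "a" }])
      .onConflict("id")
      .merge()
      .toString()
    // insert into "tasks" ("id", "name") values (1, 'a')
    //   on conflict ("id") do update set "id" = excluded."id", "name" = excluded."name"

On MySQL the same builder call compiles to an "insert ... on duplicate key update" statement instead, which is why all three clients can share this branch.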
@@ -708,6 +733,9 @@ class SqlQueryBuilder extends SqlTableQueryBuilder {
       case Operation.BULK_CREATE:
         query = builder.bulkCreate(client, json)
         break
+      case Operation.BULK_UPSERT:
+        query = builder.bulkUpsert(client, json)
+        break
       case Operation.CREATE_TABLE:
       case Operation.UPDATE_TABLE:
       case Operation.DELETE_TABLE:
@@ -98,7 +98,7 @@ export async function bulkImport(
     table = processed.table
   }
 
-  await handleRequest(Operation.BULK_CREATE, table._id!, {
+  await handleRequest(Operation.BULK_UPSERT, table._id!, {
     rows: parsedRows,
   })
   await events.rows.imported(table, parsedRows.length)
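Note the behavioural change here: bulkImport now always issues BULK_UPSERT, so re-importing rows whose identifier fields match existing rows updates them in place rather than failing or creating duplicates. Per the builder above, MSSQL falls back to a plain insert, so updates via import are not yet supported there.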
@@ -26,6 +26,7 @@ import {
   Table,
   TableSourceType,
   UpdatedRowEventEmitter,
+  TableSchema,
 } from "@budibase/types"
 import { generator, mocks } from "@budibase/backend-core/tests"
 import _, { merge } from "lodash"
@@ -64,6 +65,7 @@ describe.each([
   [DatabaseName.MARIADB, getDatasource(DatabaseName.MARIADB)],
 ])("/rows (%s)", (providerType, dsProvider) => {
   const isInternal = dsProvider === undefined
+  const isMSSQL = providerType === DatabaseName.SQL_SERVER
   const config = setup.getConfig()
 
   let table: Table
@@ -88,6 +90,23 @@ describe.each([
     // the table name they're writing to.
     ...overrides: Partial<Omit<SaveTableRequest, "name">>[]
   ): SaveTableRequest {
+    const defaultSchema: TableSchema = {
+      id: {
+        type: FieldType.AUTO,
+        name: "id",
+        autocolumn: true,
+        constraints: {
+          presence: true,
+        },
+      },
+    }
+
+    for (const override of overrides) {
+      if (override.primary) {
+        delete defaultSchema.id
+      }
+    }
+
     const req: SaveTableRequest = {
       name: uuid.v4().substring(0, 10),
       type: "table",
@@ -96,16 +115,7 @@ describe.each([
         : TableSourceType.INTERNAL,
       sourceId: datasource ? datasource._id! : INTERNAL_TABLE_SOURCE_ID,
       primary: ["id"],
-      schema: {
-        id: {
-          type: FieldType.AUTO,
-          name: "id",
-          autocolumn: true,
-          constraints: {
-            presence: true,
-          },
-        },
-      },
+      schema: defaultSchema,
     }
     return merge(req, ...overrides)
   }
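The helper refactor above exists so that a test which supplies its own primary key does not also inherit the default auto id column. A usage sketch, matching the upsert test below:

    // Because primary is overridden, the default auto "id" column is dropped
    // and userId becomes the table's primary key.
    const table = await config.api.table.save(
      saveTableRequest({
        primary: ["userId"],
        schema: {
          userId: {
            type: FieldType.NUMBER,
            name: "userId",
            constraints: { presence: true },
          },
        },
      })
    )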
@@ -960,6 +970,121 @@ describe.each([
       row = await config.api.row.save(table._id!, {})
       expect(row.autoId).toEqual(3)
     })
+
+    it("should be able to bulkImport rows", async () => {
+      const table = await config.api.table.save(
+        saveTableRequest({
+          schema: {
+            name: {
+              type: FieldType.STRING,
+              name: "name",
+            },
+            description: {
+              type: FieldType.STRING,
+              name: "description",
+            },
+          },
+        })
+      )
+
+      const rowUsage = await getRowUsage()
+
+      await config.api.row.bulkImport(table._id!, {
+        rows: [
+          {
+            name: "Row 1",
+            description: "Row 1 description",
+          },
+          {
+            name: "Row 2",
+            description: "Row 2 description",
+          },
+        ],
+      })
+
+      const rows = await config.api.row.fetch(table._id!)
+      expect(rows.length).toEqual(2)
+
+      rows.sort((a, b) => a.name.localeCompare(b.name))
+      expect(rows[0].name).toEqual("Row 1")
+      expect(rows[0].description).toEqual("Row 1 description")
+      expect(rows[1].name).toEqual("Row 2")
+      expect(rows[1].description).toEqual("Row 2 description")
+
+      await assertRowUsage(isInternal ? rowUsage + 2 : rowUsage)
+    })
+
+    // Upserting isn't yet supported in MSSQL, see:
+    // https://github.com/knex/knex/pull/6050
+    !isMSSQL &&
+      it("should be able to update existing rows with bulkImport", async () => {
+        const table = await config.api.table.save(
+          saveTableRequest({
+            primary: ["userId"],
+            schema: {
+              userId: {
+                type: FieldType.NUMBER,
+                name: "userId",
+                constraints: {
+                  presence: true,
+                },
+              },
+              name: {
+                type: FieldType.STRING,
+                name: "name",
+              },
+              description: {
+                type: FieldType.STRING,
+                name: "description",
+              },
+            },
+          })
+        )
+
+        const row1 = await config.api.row.save(table._id!, {
+          userId: 1,
+          name: "Row 1",
+          description: "Row 1 description",
+        })
+
+        const row2 = await config.api.row.save(table._id!, {
+          userId: 2,
+          name: "Row 2",
+          description: "Row 2 description",
+        })
+
+        await config.api.row.bulkImport(table._id!, {
+          identifierFields: ["userId"],
+          rows: [
+            {
+              userId: row1.userId,
+              name: "Row 1 updated",
+              description: "Row 1 description updated",
+            },
+            {
+              userId: row2.userId,
+              name: "Row 2 updated",
+              description: "Row 2 description updated",
+            },
+            {
+              userId: 3,
+              name: "Row 3",
+              description: "Row 3 description",
+            },
+          ],
+        })
+
+        const rows = await config.api.row.fetch(table._id!)
+        expect(rows.length).toEqual(3)
+
+        rows.sort((a, b) => a.name.localeCompare(b.name))
+        expect(rows[0].name).toEqual("Row 1 updated")
+        expect(rows[0].description).toEqual("Row 1 description updated")
+        expect(rows[1].name).toEqual("Row 2 updated")
+        expect(rows[1].description).toEqual("Row 2 description updated")
+        expect(rows[2].name).toEqual("Row 3")
+        expect(rows[2].description).toEqual("Row 3 description")
+      })
   })
 
   describe("enrich", () => {
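A note on the !isMSSQL && prefix in the hunk above: it registers the test conditionally via short-circuit evaluation. When the provider is SQL Server the expression is false before it(...) is ever called, so the test is never declared for that database, matching the MSSQL limitation in the query builder.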
@@ -9,6 +9,7 @@ export enum Operation {
   UPDATE = "UPDATE",
   DELETE = "DELETE",
   BULK_CREATE = "BULK_CREATE",
+  BULK_UPSERT = "BULK_UPSERT",
   CREATE_TABLE = "CREATE_TABLE",
   UPDATE_TABLE = "UPDATE_TABLE",
   DELETE_TABLE = "DELETE_TABLE",
@@ -20,6 +21,7 @@ export const RowOperations = [
   Operation.UPDATE,
   Operation.DELETE,
   Operation.BULK_CREATE,
+  Operation.BULK_UPSERT,
 ]
 
 export enum QueryType {