consolidate postgres datasources

This commit is contained in:
Martin McKeaveney 2021-06-16 15:45:57 +01:00
parent 2598af16bf
commit aabbbdecfe
8 changed files with 255 additions and 143 deletions

View File

@ -15,12 +15,22 @@
async function saveDatasource() { async function saveDatasource() {
try { try {
// Create datasource // Create datasource
await datasources.save(datasource, { refresh: true }) await datasources.save(datasource)
notifications.success(`Datasource ${name} saved successfully.`) notifications.success(`Datasource ${name} saved successfully.`)
unsaved = false unsaved = false
} catch (err) {
notifications.error(`Error saving datasource: ${err}`)
}
}
async function updateDatasourceSchema() {
try {
await datasources.updateSchema(datasource)
notifications.success(`Datasource ${name} schema saved successfully.`)
unsaved = false
await tables.fetch() await tables.fetch()
} catch (err) { } catch (err) {
notifications.error(`Error saving datasource: ${err}`) notifications.error(`Error updating datasource schema: ${err}`)
} }
} }
@ -71,7 +81,6 @@
on:change={setUnsaved} on:change={setUnsaved}
/> />
</div> </div>
{#if !integration.plus}
<Divider /> <Divider />
<div class="query-header"> <div class="query-header">
<Heading size="S">Queries</Heading> <Heading size="S">Queries</Heading>
@ -86,6 +95,8 @@
</div> </div>
{/each} {/each}
</div> </div>
{#if datasource.plus}
<Button cta on:click={updateDatasourceSchema}>Fetch Tables From Database</Button>
{/if} {/if}
</Layout> </Layout>
</section> </section>

View File

@ -28,14 +28,34 @@ export function createDatasourcesStore() {
update(state => ({ ...state, selected: datasourceId })) update(state => ({ ...state, selected: datasourceId }))
queries.update(state => ({ ...state, selected: null })) queries.update(state => ({ ...state, selected: null }))
}, },
save: async (datasource, opts = {}) => { updateSchema: async (datasource) => {
let url = "/api/datasources" let url = `/api/datasources/${datasource._id}/schema`
if (datasource.plus && opts.refresh) { const response = await api.post(url)
// Pull the latest tables from the datasource const json = await response.json()
url += "?refresh=1"
if (response.status !== 200) {
throw new Error(json.message)
} }
update(state => {
const currentIdx = state.list.findIndex(ds => ds._id === json._id)
const sources = state.list
if (currentIdx >= 0) {
sources.splice(currentIdx, 1, json)
} else {
sources.push(json)
}
return { list: sources, selected: json._id }
})
return json
},
save: async (datasource) => {
let url = "/api/datasources"
const response = await api.post(url, datasource) const response = await api.post(url, datasource)
const json = await response.json() const json = await response.json()

View File

@ -8,7 +8,6 @@ const {
getTableParams, getTableParams,
} = require("../../db/utils") } = require("../../db/utils")
const { integrations } = require("../../integrations") const { integrations } = require("../../integrations")
const plusIntegrations = require("../../integrations/plus")
const { makeExternalQuery } = require("./row/utils") const { makeExternalQuery } = require("./row/utils")
exports.fetch = async function (ctx) { exports.fetch = async function (ctx) {
@ -40,6 +39,24 @@ exports.fetch = async function (ctx) {
ctx.body = [bbInternalDb, ...datasources] ctx.body = [bbInternalDb, ...datasources]
} }
exports.buildSchemaFromDb = async function (ctx) {
const db = new CouchDB(ctx.appId)
const datasourceId = ctx.params.datasourceId
const datasource = await db.get(datasourceId)
const Connector = integrations[datasource.source]
// Connect to the DB and build the schema
const connector = new Connector(datasource.config)
await connector.buildSchema(datasource._id)
datasource.entities = connector.tables
const response = await db.post(datasource)
datasource._rev = response.rev
ctx.body = datasource
}
exports.save = async function (ctx) { exports.save = async function (ctx) {
const db = new CouchDB(ctx.appId) const db = new CouchDB(ctx.appId)
const plus = ctx.request.body.plus const plus = ctx.request.body.plus
@ -50,16 +67,6 @@ exports.save = async function (ctx) {
...ctx.request.body, ...ctx.request.body,
} }
// update the schema
if (ctx.query.refresh) {
const PlusConnector = plusIntegrations[datasource.source].integration
const connector = new PlusConnector(ctx.request.body.config)
await connector.init(datasource._id)
datasource.entities = connector.tables
}
const response = await db.post(datasource) const response = await db.post(datasource)
datasource._rev = response.rev datasource._rev = response.rev

View File

@ -33,7 +33,7 @@ exports.fetch = async function (ctx) {
) )
const external = externalTables.rows.flatMap(row => { const external = externalTables.rows.flatMap(row => {
return Object.values(row.doc.entities).map(entity => ({ return Object.values(row.doc.entities || {}).map(entity => ({
...entity, ...entity,
sourceId: row.doc._id, sourceId: row.doc._id,
})) }))

View File

@ -72,6 +72,11 @@ router
generateQueryDatasourceSchema(), generateQueryDatasourceSchema(),
datasourceController.query datasourceController.query
) )
.post(
"/api/datasources/:datasourceId/schema",
authorized(BUILDER),
datasourceController.buildSchemaFromDb
)
.post( .post(
"/api/datasources", "/api/datasources",
authorized(BUILDER), authorized(BUILDER),

View File

@ -24,7 +24,6 @@ const DEFINITIONS = {
MYSQL: mysql.schema, MYSQL: mysql.schema,
ARANGODB: arangodb.schema, ARANGODB: arangodb.schema,
REST: rest.schema, REST: rest.schema,
POSTGRES_PLUS: postgresPlus.schema,
} }
const INTEGRATIONS = { const INTEGRATIONS = {
@ -39,7 +38,6 @@ const INTEGRATIONS = {
MYSQL: mysql.integration, MYSQL: mysql.integration,
ARANGODB: arangodb.integration, ARANGODB: arangodb.integration,
REST: rest.integration, REST: rest.integration,
POSTGRES_PLUS: postgresPlus.integration,
} }
module.exports = { module.exports = {

View File

@ -1,135 +1,135 @@
const Sql = require("../base/sql") // const Sql = require("../base/sql")
const { Pool } = require("pg") // const { Pool } = require("pg")
const { FieldTypes } = require("../../constants") // const { FieldTypes } = require("../../constants")
const { FIELD_TYPES } = require("../Integration") // const { FIELD_TYPES } = require("../Integration")
const { SEPARATOR } = require("@budibase/auth/db") // const { SEPARATOR } = require("@budibase/auth/db")
const TYPE_MAP = { // const TYPE_MAP = {
text: FieldTypes.LONGFORM, // text: FieldTypes.LONGFORM,
varchar: FieldTypes.STRING, // varchar: FieldTypes.STRING,
integer: FieldTypes.NUMBER, // integer: FieldTypes.NUMBER,
bigint: FieldTypes.NUMBER, // bigint: FieldTypes.NUMBER,
decimal: FieldTypes.NUMBER, // decimal: FieldTypes.NUMBER,
smallint: FieldTypes.NUMBER, // smallint: FieldTypes.NUMBER,
timestamp: FieldTypes.DATETIME, // timestamp: FieldTypes.DATETIME,
time: FieldTypes.DATETIME, // time: FieldTypes.DATETIME,
boolean: FieldTypes.BOOLEAN, // boolean: FieldTypes.BOOLEAN,
json: FIELD_TYPES.JSON, // json: FIELD_TYPES.JSON,
} // }
const SCHEMA = { // const SCHEMA = {
friendlyName: "PostgreSQL", // friendlyName: "PostgreSQL",
description: // description:
"PostgreSQL, also known as Postgres, is a free and open-source relational database management system emphasizing extensibility and SQL compliance.", // "PostgreSQL, also known as Postgres, is a free and open-source relational database management system emphasizing extensibility and SQL compliance.",
plus: true, // plus: true,
datasource: { // datasource: {
host: { // host: {
type: FIELD_TYPES.STRING, // type: FIELD_TYPES.STRING,
default: "localhost", // default: "localhost",
required: true, // required: true,
}, // },
port: { // port: {
type: FIELD_TYPES.NUMBER, // type: FIELD_TYPES.NUMBER,
required: true, // required: true,
default: 5432, // default: 5432,
}, // },
database: { // database: {
type: FIELD_TYPES.STRING, // type: FIELD_TYPES.STRING,
default: "postgres", // default: "postgres",
required: true, // required: true,
}, // },
user: { // user: {
type: FIELD_TYPES.STRING, // type: FIELD_TYPES.STRING,
default: "root", // default: "root",
required: true, // required: true,
}, // },
password: { // password: {
type: FIELD_TYPES.PASSWORD, // type: FIELD_TYPES.PASSWORD,
default: "root", // default: "root",
required: true, // required: true,
}, // },
ssl: { // ssl: {
type: FIELD_TYPES.BOOLEAN, // type: FIELD_TYPES.BOOLEAN,
default: false, // default: false,
required: false, // required: false,
}, // },
}, // },
} // }
class PostgresPlus extends Sql { // class PostgresPlus extends Sql {
static pool // static pool
COLUMNS_SQL = // COLUMNS_SQL =
"select * from information_schema.columns where table_schema = 'public'" // "select * from information_schema.columns where table_schema = 'public'"
PRIMARY_KEYS_SQL = ` // PRIMARY_KEYS_SQL = `
select tc.table_schema, tc.table_name, kc.column_name as primary_key // select tc.table_schema, tc.table_name, kc.column_name as primary_key
from information_schema.table_constraints tc // from information_schema.table_constraints tc
join // join
information_schema.key_column_usage kc on kc.table_name = tc.table_name // information_schema.key_column_usage kc on kc.table_name = tc.table_name
and kc.table_schema = tc.table_schema // and kc.table_schema = tc.table_schema
and kc.constraint_name = tc.constraint_name // and kc.constraint_name = tc.constraint_name
where tc.constraint_type = 'PRIMARY KEY'; // where tc.constraint_type = 'PRIMARY KEY';
` // `
constructor(config, datasource) { // constructor(config, datasource) {
super("pg") // super("pg")
this.config = config // this.config = config
this.datasource = datasource // this.datasource = datasource
if (!this.pool) { // if (!this.pool) {
this.pool = new Pool(this.config) // this.pool = new Pool(this.config)
} // }
this.client = this.pool // this.client = this.pool
} // }
async init(datasourceId) { // async init(datasourceId) {
let keys = [] // let keys = []
try { // try {
const primaryKeysResponse = await this.client.query(this.PRIMARY_KEYS_SQL) // const primaryKeysResponse = await this.client.query(this.PRIMARY_KEYS_SQL)
for (let table of primaryKeysResponse.rows) { // for (let table of primaryKeysResponse.rows) {
keys.push(table.column_name || table.primary_key) // keys.push(table.column_name || table.primary_key)
} // }
} catch (err) { // } catch (err) {
// TODO: this try catch method isn't right // // TODO: this try catch method isn't right
keys = ["id"] // keys = ["id"]
} // }
const columnsResponse = await this.client.query(this.COLUMNS_SQL) // const columnsResponse = await this.client.query(this.COLUMNS_SQL)
const tables = {} // const tables = {}
for (let column of columnsResponse.rows) { // for (let column of columnsResponse.rows) {
const tableName = column.table_name // const tableName = column.table_name
const columnName = column.column_name // const columnName = column.column_name
// table key doesn't exist yet // // table key doesn't exist yet
if (!tables[tableName]) { // if (!tables[tableName]) {
tables[tableName] = { // tables[tableName] = {
_id: `${datasourceId}${SEPARATOR}${tableName}`, // _id: `${datasourceId}${SEPARATOR}${tableName}`,
// TODO: this needs to accommodate composite keys // // TODO: this needs to accommodate composite keys
primary: keys, // primary: keys,
name: tableName, // name: tableName,
schema: {}, // schema: {},
} // }
} // }
tables[tableName].schema[columnName] = { // tables[tableName].schema[columnName] = {
name: columnName, // name: columnName,
type: TYPE_MAP[column.data_type] || FIELD_TYPES.STRING, // type: TYPE_MAP[column.data_type] || FIELD_TYPES.STRING,
} // }
} // }
this.tables = tables // this.tables = tables
} // }
async query(json) { // async query(json) {
const operation = this._operation(json).toLowerCase() // const operation = this._operation(json).toLowerCase()
const sql = this._query(json) // const sql = this._query(json)
const response = await this.client.query(sql.sql, sql.bindings) // const response = await this.client.query(sql.sql, sql.bindings)
return response.rows.length ? response.rows : [{ [operation]: true }] // return response.rows.length ? response.rows : [{ [operation]: true }]
} // }
} // }
module.exports = { // module.exports = {
schema: SCHEMA, // schema: SCHEMA,
integration: PostgresPlus, // integration: PostgresPlus,
} // }

View File

@ -1,9 +1,12 @@
const { Pool } = require("pg") const { Pool } = require("pg")
const { FIELD_TYPES } = require("./Integration") const { FIELD_TYPES } = require("./Integration")
const Sql = require("./base/sql") const Sql = require("./base/sql")
const { FieldTypes } = require("../constants")
const { SEPARATOR } = require("@budibase/auth/db")
const SCHEMA = { const SCHEMA = {
docs: "https://node-postgres.com", docs: "https://node-postgres.com",
plus: true,
friendlyName: "PostgreSQL", friendlyName: "PostgreSQL",
description: description:
"PostgreSQL, also known as Postgres, is a free and open-source relational database management system emphasizing extensibility and SQL compliance.", "PostgreSQL, also known as Postgres, is a free and open-source relational database management system emphasizing extensibility and SQL compliance.",
@ -55,6 +58,19 @@ const SCHEMA = {
}, },
} }
const TYPE_MAP = {
text: FieldTypes.LONGFORM,
varchar: FieldTypes.STRING,
integer: FieldTypes.NUMBER,
bigint: FieldTypes.NUMBER,
decimal: FieldTypes.NUMBER,
smallint: FieldTypes.NUMBER,
timestamp: FieldTypes.DATETIME,
time: FieldTypes.DATETIME,
boolean: FieldTypes.BOOLEAN,
json: FIELD_TYPES.JSON,
}
async function internalQuery(client, sql) { async function internalQuery(client, sql) {
try { try {
return await client.query(sql) return await client.query(sql)
@ -66,6 +82,19 @@ async function internalQuery(client, sql) {
class PostgresIntegration extends Sql { class PostgresIntegration extends Sql {
static pool static pool
COLUMNS_SQL =
"select * from information_schema.columns where table_schema = 'public'"
PRIMARY_KEYS_SQL = `
select tc.table_schema, tc.table_name, kc.column_name as primary_key
from information_schema.table_constraints tc
join
information_schema.key_column_usage kc on kc.table_name = tc.table_name
and kc.table_schema = tc.table_schema
and kc.constraint_name = tc.constraint_name
where tc.constraint_type = 'PRIMARY KEY';
`
constructor(config) { constructor(config) {
super("pg") super("pg")
this.config = config this.config = config
@ -82,6 +111,48 @@ class PostgresIntegration extends Sql {
this.client = this.pool this.client = this.pool
} }
/**
* Fetches the tables from the postgres table and assigns them to the datasource.
* @param {*} datasourceId - datasourceId to fetch
*/
async buildSchema(datasourceId) {
let keys = []
try {
const primaryKeysResponse = await this.client.query(this.PRIMARY_KEYS_SQL)
for (let table of primaryKeysResponse.rows) {
keys.push(table.column_name || table.primary_key)
}
} catch (err) {
// TODO: this try catch method isn't right
keys = ["id"]
}
const columnsResponse = await this.client.query(this.COLUMNS_SQL)
const tables = {}
for (let column of columnsResponse.rows) {
const tableName = column.table_name
const columnName = column.column_name
// table key doesn't exist yet
if (!tables[tableName]) {
tables[tableName] = {
_id: `${datasourceId}${SEPARATOR}${tableName}`,
// TODO: this needs to accommodate composite keys
primary: keys,
name: tableName,
schema: {},
}
}
tables[tableName].schema[columnName] = {
name: columnName,
type: TYPE_MAP[column.data_type] || FIELD_TYPES.STRING,
}
}
this.tables = tables
}
async create({ sql }) { async create({ sql }) {
const response = await internalQuery(this.client, sql) const response = await internalQuery(this.client, sql)
return response.rows.length ? response.rows : [{ created: true }] return response.rows.length ? response.rows : [{ created: true }]