2022-05-31 15:39:53 +02:00
import {
Integration ,
QueryTypes ,
SqlQuery ,
DatasourceFieldTypes ,
} from "../definitions/datasource"
2022-05-31 12:58:03 +02:00
import {
SnowflakeError ,
Statement ,
createConnection ,
Connection ,
} from "snowflake-sdk"
2022-05-31 15:39:53 +02:00
import {
SqlClients ,
finaliseExternalTables ,
buildExternalTableId ,
convertSqlType ,
} from "./utils"
import { DatasourcePlus } from "./base/datasourcePlus"
import { Table , TableSchema } from "../definitions/common"
2022-05-30 22:13:45 +02:00
module SnowflakeModule {
2022-05-31 15:39:53 +02:00
const Sql = require ( "./base/sql" )
2022-05-30 22:13:45 +02:00
/**
 * Connection settings for a Snowflake datasource.
 *
 * The object is handed as-is to snowflake-sdk's `createConnection`, and its
 * keys mirror the fields declared under `SCHEMA.datasource`.
 */
interface SnowflakeConfig {
  /** Value of the "account" datasource field. */
  account: string
  /** Value of the "username" datasource field. */
  username: string
  /** Value of the "password" datasource field. */
  password: string
  /** Value of the "warehouse" datasource field. */
  warehouse: string
  /** Value of the "database" datasource field. */
  database: string
  /** Value of the "schema" datasource field. */
  schema: string
}
// Builds a mandatory datasource field descriptor for the given input type.
// A fresh object is returned on every call so no two fields share a reference.
const requiredField = <T>(type: T) => ({ type, required: true })

// Builds a fresh descriptor for a query verb that is written as SQL.
const sqlQueryDefinition = () => ({ type: QueryTypes.SQL })

/**
 * Integration definition for Snowflake (exported below as `schema`).
 *
 * Contains the descriptive metadata, the six connection fields (all
 * required, with `password` using the PASSWORD field type), and the
 * create/read/update/delete query verbs, each typed as SQL.
 */
const SCHEMA: Integration = {
  plus: true,
  docs: "https://developers.snowflake.com/",
  description:
    "Snowflake is a solution for data warehousing, data lakes, data engineering, data science, data application development, and securely sharing and consuming shared data.",
  friendlyName: "Snowflake",
  datasource: {
    account: requiredField(DatasourceFieldTypes.STRING),
    username: requiredField(DatasourceFieldTypes.STRING),
    password: requiredField(DatasourceFieldTypes.PASSWORD),
    warehouse: requiredField(DatasourceFieldTypes.STRING),
    database: requiredField(DatasourceFieldTypes.STRING),
    schema: requiredField(DatasourceFieldTypes.STRING),
  },
  query: {
    create: sqlQueryDefinition(),
    read: sqlQueryDefinition(),
    update: sqlQueryDefinition(),
    delete: sqlQueryDefinition(),
  },
}
2022-05-31 15:39:53 +02:00
/**
 * Snowflake datasource integration.
 *
 * Wraps a snowflake-sdk `Connection`, exposes the create/read/update/delete
 * query verbs (each one simply executes the supplied SQL), and can
 * introspect the connected schema into external table definitions via
 * `buildSchema`.
 */
class SnowflakeIntegration extends Sql implements DatasourcePlus {
  private client: Connection
  private config: SnowflakeConfig
  /** Table definitions produced by the last `buildSchema` call. */
  public tables: Record<string, Table> = {}
  /** Per-table errors reported by `finaliseExternalTables`. */
  public schemaErrors: Record<string, string> = {}

  /**
   * @param config connection settings; passed unchanged to
   *   snowflake-sdk's `createConnection`. No network connection is opened
   *   here — that happens lazily in `internalQuery`.
   */
  constructor(config: SnowflakeConfig) {
    super(SqlClients.SNOWFLAKE)
    this.config = config
    this.client = createConnection(config)
  }

  /** Placeholder used for bound parameters in generated SQL. */
  getBindingIdentifier(): string {
    return "?"
  }

  /** Builds a SQL `concat(...)` expression over the given parts. */
  getStringConcat(parts: string[]): string {
    return `concat(${parts.join(", ")})`
  }

  /**
   * Introspects the tables visible to the connection and stores the
   * resulting definitions on `this.tables` / `this.schemaErrors`.
   *
   * @param datasourceId id used to build each external table id
   * @param entities previously known tables, merged in by
   *   `finaliseExternalTables`
   */
  async buildSchema(datasourceId: string, entities: Record<string, Table>) {
    const tables: { [key: string]: Table } = {}

    // get the tables first
    const tablesResp: any[] =
      (await this.internalQuery({ sql: "SHOW TABLES;" })) ?? []
    const tableNames: string[] = tablesResp.map((row: any) => row.name)

    for (const tableName of tableNames) {
      const primaryKeys: string[] = []
      const schema: TableSchema = {}

      // SHOW TABLES reports the stored (case-sensitive) name, so quote it;
      // an unquoted identifier would be upper-cased and could miss the table.
      const quotedName = `"${String(tableName).replace(/"/g, '""')}"`
      const descResp: any[] =
        (await this.internalQuery({
          sql: `DESCRIBE TABLE ${quotedName};`,
        })) ?? []

      for (const column of descResp) {
        // Snowflake's DESCRIBE TABLE names this column "name"; "Field" is the
        // MySQL spelling and is only kept as a fallback.
        const columnName: string = column.name ?? column.Field
        if (
          column["primary key"] === "Y" &&
          !primaryKeys.includes(columnName)
        ) {
          primaryKeys.push(columnName)
        }
        const constraints = {
          presence: column["null?"] !== "Y",
        }
        // Treat any default expression mentioning "increment" as an
        // auto-generated column; always yields a real boolean.
        const isAuto: boolean =
          typeof column.default === "string" &&
          column.default.toLowerCase().includes("increment")
        schema[columnName] = {
          name: columnName,
          autocolumn: isAuto,
          constraints,
          ...convertSqlType(column["type"]),
        }
      }

      if (!tables[tableName]) {
        tables[tableName] = {
          _id: buildExternalTableId(datasourceId, tableName),
          primary: primaryKeys,
          name: tableName,
          schema,
        }
      }
    }

    const final = finaliseExternalTables(tables, entities)
    this.tables = final.tables
    this.schemaErrors = final.errors
  }

  /**
   * Promise wrapper around the SDK's callback-style `connect`.
   *
   * @returns a promise resolving with the connection passed to the callback,
   *   or rejecting with the SDK error.
   */
  async connectAsync() {
    return new Promise((resolve, reject) => {
      this.client.connect(function (err: any, conn: any) {
        if (err) {
          reject(err)
          return
        }
        resolve(conn)
      })
    })
  }

  /**
   * Executes a SQL statement, connecting first if the client is not up.
   *
   * @param query statement to run; only `query.sql` is sent to Snowflake
   * @returns the rows handed to the SDK's `complete` callback
   * @throws rejects with a message *string* (not an `Error`): the text
   *   between the first and second ":" of the SDK error message, or the
   *   whole message when that segment is missing or empty. The string form
   *   is kept so existing callers see the same value.
   */
  async internalQuery(query: SqlQuery) {
    if (!this.client.isUp()) {
      await this.connectAsync()
    }
    const response: any = await new Promise((resolve, reject) =>
      this.client.execute({
        sqlText: query.sql,
        streamResult: false,
        complete: (
          err: SnowflakeError | undefined,
          _stmt: Statement,
          rows: any[] | undefined
        ) => {
          if (err) {
            reject(err?.message.split(":")[1] || err?.message)
            return
          }
          resolve({ rows })
        },
      })
    )
    return response.rows
  }

  /** Runs a "create" query; delegates to `internalQuery`. */
  async create(query: SqlQuery) {
    return this.internalQuery(query)
  }

  /** Runs a "read" query; delegates to `internalQuery`. */
  async read(query: SqlQuery) {
    return this.internalQuery(query)
  }

  /** Runs an "update" query; delegates to `internalQuery`. */
  async update(query: SqlQuery) {
    return this.internalQuery(query)
  }

  /** Runs a "delete" query; delegates to `internalQuery`. */
  async delete(query: SqlQuery) {
    return this.internalQuery(query)
  }
}
// CommonJS export of this integration: the integration definition under
// `schema` and the implementing class under `integration`.
const snowflakeExports = {
  schema: SCHEMA,
  integration: SnowflakeIntegration,
}
module.exports = snowflakeExports
}