Added support for JSON array stages

This commit is contained in:
Mel O'Hagan 2022-10-04 11:03:24 +01:00
parent 7b90f56500
commit dbf58fcfe9
3 changed files with 35 additions and 23 deletions

View File

@ -41,6 +41,9 @@
flowEditors[i].update(query.fields.steps[i + 1].value.value)
}
}
$: shouldDisplayJsonBox =
schema.type === QueryTypes.JSON && query.fields.extra?.actionType !== "flow"
</script>
{#if schema}
@ -55,7 +58,7 @@
value={query.fields.sql}
parameters={query.parameters}
/>
{:else if schema.type === QueryTypes.JSON}
{:else if shouldDisplayJsonBox}
<Editor
editorHeight={height}
label="Query"
@ -73,7 +76,7 @@
<Input thin outline disabled value={urlDisplay} />
</div>
{/if}
{:else if schema.type === QueryTypes.FLOW}
{:else if query.fields.extra?.actionType === "flow"}
<br />
{#if (query.fields.steps?.length ?? 0) === 0}
<div class="controls">

View File

@ -57,7 +57,7 @@ const SCHEMA: Integration = {
type: QueryType.JSON,
},
aggregate: {
type: QueryType.FLOW,
type: QueryType.JSON,
},
},
extra: {
@ -66,8 +66,8 @@ const SCHEMA: Integration = {
type: DatasourceFieldType.STRING,
required: true,
},
actionTypes: {
displayName: "Action Types",
actionType: {
displayName: "Action Type",
type: DatasourceFieldType.LIST,
required: true,
data: {
@ -75,6 +75,7 @@ const SCHEMA: Integration = {
create: ["insertOne", "insertMany"],
update: ["updateOne", "updateMany"],
delete: ["deleteOne", "deleteMany"],
aggregate: ["json", "flow"],
},
},
},
@ -170,7 +171,7 @@ class MongoIntegration implements IntegrationBase {
// For mongodb we add an extra actionType to specify
// which method we want to call on the collection
switch (query.extra.actionTypes) {
switch (query.extra.actionType) {
case "insertOne": {
return await collection.insertOne(json)
}
@ -179,7 +180,7 @@ class MongoIntegration implements IntegrationBase {
}
default: {
throw new Error(
`actionType ${query.extra.actionTypes} does not exist on DB for create`
`actionType ${query.extra.actionType} does not exist on DB for create`
)
}
}
@ -198,7 +199,7 @@ class MongoIntegration implements IntegrationBase {
const collection = db.collection(query.extra.collection)
let json = this.createObjectIds(query.json)
switch (query.extra.actionTypes) {
switch (query.extra.actionType) {
case "find": {
return await collection.find(json).toArray()
}
@ -228,7 +229,7 @@ class MongoIntegration implements IntegrationBase {
}
default: {
throw new Error(
`actionType ${query.extra.actionTypes} does not exist on DB for read`
`actionType ${query.extra.actionType} does not exist on DB for read`
)
}
}
@ -255,7 +256,7 @@ class MongoIntegration implements IntegrationBase {
options: object
}
switch (query.extra.actionTypes) {
switch (query.extra.actionType) {
case "updateOne": {
return await collection.updateOne(
json.filter,
@ -272,7 +273,7 @@ class MongoIntegration implements IntegrationBase {
}
default: {
throw new Error(
`actionType ${query.extra.actionTypes} does not exist on DB for update`
`actionType ${query.extra.actionType} does not exist on DB for update`
)
}
}
@ -304,7 +305,7 @@ class MongoIntegration implements IntegrationBase {
}
}
switch (query.extra.actionTypes) {
switch (query.extra.actionType) {
case "deleteOne": {
return await collection.deleteOne(json.filter, json.options)
}
@ -313,7 +314,7 @@ class MongoIntegration implements IntegrationBase {
}
default: {
throw new Error(
`actionType ${query.extra.actionTypes} does not exist on DB for delete`
`actionType ${query.extra.actionType} does not exist on DB for delete`
)
}
}
@ -325,12 +326,13 @@ class MongoIntegration implements IntegrationBase {
}
}
async aggregate(query: { steps: any[]; extra: { [key: string]: string } }) {
async aggregate(query: { json: object; steps: any[]; extra: { [key: string]: string } }) {
try {
await this.connect()
const db = this.client.db(this.config.db)
const collection = db.collection(query.extra.collection)
let response = []
if (query.extra?.actionType === "flow") {
for await (const doc of collection.aggregate(
query.steps.map(({ key, value }) => {
let temp: any = {}
@ -340,6 +342,14 @@ class MongoIntegration implements IntegrationBase {
)) {
response.push(doc)
}
} else {
const stages: Array<any> = query.json as Array<any>
for await (const doc of collection.aggregate(
stages ? stages : []
)) {
response.push(doc)
}
}
return response
} catch (err) {
console.error("Error writing to mongodb", err)

View File

@ -20,7 +20,6 @@ export enum QueryType {
SQL = "sql",
JSON = "json",
FIELDS = "fields",
FLOW = "flow",
}
export enum DatasourceFieldType {