Added support for JSON array stages

This commit is contained in:
Mel O'Hagan 2022-10-04 11:03:24 +01:00
parent 91db8f10ab
commit 55c112677e
3 changed files with 35 additions and 23 deletions

View File

@ -41,6 +41,9 @@
flowEditors[i].update(query.fields.steps[i + 1].value.value) flowEditors[i].update(query.fields.steps[i + 1].value.value)
} }
} }
$: shouldDisplayJsonBox =
schema.type === QueryTypes.JSON && query.fields.extra?.actionType !== "flow"
</script> </script>
{#if schema} {#if schema}
@ -55,7 +58,7 @@
value={query.fields.sql} value={query.fields.sql}
parameters={query.parameters} parameters={query.parameters}
/> />
{:else if schema.type === QueryTypes.JSON} {:else if shouldDisplayJsonBox}
<Editor <Editor
editorHeight={height} editorHeight={height}
label="Query" label="Query"
@ -73,7 +76,7 @@
<Input thin outline disabled value={urlDisplay} /> <Input thin outline disabled value={urlDisplay} />
</div> </div>
{/if} {/if}
{:else if schema.type === QueryTypes.FLOW} {:else if query.fields.extra?.actionType === "flow"}
<br /> <br />
{#if (query.fields.steps?.length ?? 0) === 0} {#if (query.fields.steps?.length ?? 0) === 0}
<div class="controls"> <div class="controls">

View File

@ -57,7 +57,7 @@ const SCHEMA: Integration = {
type: QueryType.JSON, type: QueryType.JSON,
}, },
aggregate: { aggregate: {
type: QueryType.FLOW, type: QueryType.JSON,
}, },
}, },
extra: { extra: {
@ -66,8 +66,8 @@ const SCHEMA: Integration = {
type: DatasourceFieldType.STRING, type: DatasourceFieldType.STRING,
required: true, required: true,
}, },
actionTypes: { actionType: {
displayName: "Action Types", displayName: "Action Type",
type: DatasourceFieldType.LIST, type: DatasourceFieldType.LIST,
required: true, required: true,
data: { data: {
@ -75,6 +75,7 @@ const SCHEMA: Integration = {
create: ["insertOne", "insertMany"], create: ["insertOne", "insertMany"],
update: ["updateOne", "updateMany"], update: ["updateOne", "updateMany"],
delete: ["deleteOne", "deleteMany"], delete: ["deleteOne", "deleteMany"],
aggregate: ["json", "flow"],
}, },
}, },
}, },
@ -170,7 +171,7 @@ class MongoIntegration implements IntegrationBase {
// For mongodb we add an extra actionType to specify // For mongodb we add an extra actionType to specify
// which method we want to call on the collection // which method we want to call on the collection
switch (query.extra.actionTypes) { switch (query.extra.actionType) {
case "insertOne": { case "insertOne": {
return await collection.insertOne(json) return await collection.insertOne(json)
} }
@ -179,7 +180,7 @@ class MongoIntegration implements IntegrationBase {
} }
default: { default: {
throw new Error( throw new Error(
`actionType ${query.extra.actionTypes} does not exist on DB for create` `actionType ${query.extra.actionType} does not exist on DB for create`
) )
} }
} }
@ -198,7 +199,7 @@ class MongoIntegration implements IntegrationBase {
const collection = db.collection(query.extra.collection) const collection = db.collection(query.extra.collection)
let json = this.createObjectIds(query.json) let json = this.createObjectIds(query.json)
switch (query.extra.actionTypes) { switch (query.extra.actionType) {
case "find": { case "find": {
return await collection.find(json).toArray() return await collection.find(json).toArray()
} }
@ -228,7 +229,7 @@ class MongoIntegration implements IntegrationBase {
} }
default: { default: {
throw new Error( throw new Error(
`actionType ${query.extra.actionTypes} does not exist on DB for read` `actionType ${query.extra.actionType} does not exist on DB for read`
) )
} }
} }
@ -255,7 +256,7 @@ class MongoIntegration implements IntegrationBase {
options: object options: object
} }
switch (query.extra.actionTypes) { switch (query.extra.actionType) {
case "updateOne": { case "updateOne": {
return await collection.updateOne( return await collection.updateOne(
json.filter, json.filter,
@ -272,7 +273,7 @@ class MongoIntegration implements IntegrationBase {
} }
default: { default: {
throw new Error( throw new Error(
`actionType ${query.extra.actionTypes} does not exist on DB for update` `actionType ${query.extra.actionType} does not exist on DB for update`
) )
} }
} }
@ -304,7 +305,7 @@ class MongoIntegration implements IntegrationBase {
} }
} }
switch (query.extra.actionTypes) { switch (query.extra.actionType) {
case "deleteOne": { case "deleteOne": {
return await collection.deleteOne(json.filter, json.options) return await collection.deleteOne(json.filter, json.options)
} }
@ -313,7 +314,7 @@ class MongoIntegration implements IntegrationBase {
} }
default: { default: {
throw new Error( throw new Error(
`actionType ${query.extra.actionTypes} does not exist on DB for delete` `actionType ${query.extra.actionType} does not exist on DB for delete`
) )
} }
} }
@ -325,12 +326,13 @@ class MongoIntegration implements IntegrationBase {
} }
} }
async aggregate(query: { steps: any[]; extra: { [key: string]: string } }) { async aggregate(query: { json: object; steps: any[]; extra: { [key: string]: string } }) {
try { try {
await this.connect() await this.connect()
const db = this.client.db(this.config.db) const db = this.client.db(this.config.db)
const collection = db.collection(query.extra.collection) const collection = db.collection(query.extra.collection)
let response = [] let response = []
if (query.extra?.actionType === "flow") {
for await (const doc of collection.aggregate( for await (const doc of collection.aggregate(
query.steps.map(({ key, value }) => { query.steps.map(({ key, value }) => {
let temp: any = {} let temp: any = {}
@ -340,6 +342,14 @@ class MongoIntegration implements IntegrationBase {
)) { )) {
response.push(doc) response.push(doc)
} }
} else {
const stages: Array<any> = query.json as Array<any>
for await (const doc of collection.aggregate(
stages ? stages : []
)) {
response.push(doc)
}
}
return response return response
} catch (err) { } catch (err) {
console.error("Error writing to mongodb", err) console.error("Error writing to mongodb", err)

View File

@ -20,7 +20,6 @@ export enum QueryType {
SQL = "sql", SQL = "sql",
JSON = "json", JSON = "json",
FIELDS = "fields", FIELDS = "fields",
FLOW = "flow",
} }
export enum DatasourceFieldType { export enum DatasourceFieldType {