Merge pull request #3338 from Budibase/feature/query-rbac-timeouts
Query RBAC, query and automation threading/timeouts
This commit is contained in:
commit
a726d158b8
|
@ -23,8 +23,6 @@
|
|||
// Show updated permissions in UI: REMOVE
|
||||
permissions = await permissionsStore.forResource(resourceId)
|
||||
notifications.success("Updated permissions.")
|
||||
// TODO: update permissions
|
||||
// permissions[]
|
||||
}
|
||||
</script>
|
||||
|
||||
|
|
|
@ -19,15 +19,24 @@
|
|||
import IntegrationQueryEditor from "components/integration/index.svelte"
|
||||
import ExternalDataSourceTable from "components/backend/DataTable/ExternalDataSourceTable.svelte"
|
||||
import ParameterBuilder from "components/integration/QueryParameterBuilder.svelte"
|
||||
import { datasources, integrations, queries } from "stores/backend"
|
||||
import {
|
||||
datasources,
|
||||
integrations,
|
||||
queries,
|
||||
roles,
|
||||
permissions,
|
||||
} from "stores/backend"
|
||||
import { capitalise } from "../../helpers"
|
||||
import CodeMirrorEditor from "components/common/CodeMirrorEditor.svelte"
|
||||
import { Roles } from "constants/backend"
|
||||
import { onMount } from "svelte"
|
||||
|
||||
export let query
|
||||
export let fields = []
|
||||
|
||||
let parameters
|
||||
let data = []
|
||||
let roleId
|
||||
const transformerDocs =
|
||||
"https://docs.budibase.com/building-apps/data/transformers"
|
||||
const typeOptions = [
|
||||
|
@ -70,7 +79,22 @@
|
|||
}
|
||||
|
||||
function resetDependentFields() {
|
||||
if (query.fields.extra) query.fields.extra = {}
|
||||
if (query.fields.extra) {
|
||||
query.fields.extra = {}
|
||||
}
|
||||
}
|
||||
|
||||
async function updateRole(role, id = null) {
|
||||
roleId = role
|
||||
if (query?._id || id) {
|
||||
for (let level of ["read", "write"]) {
|
||||
await permissions.save({
|
||||
level,
|
||||
role,
|
||||
resource: query?._id || id,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function populateExtraQuery(extraQueryFields) {
|
||||
|
@ -122,6 +146,7 @@
|
|||
async function saveQuery() {
|
||||
try {
|
||||
const { _id } = await queries.save(query.datasourceId, query)
|
||||
await updateRole(roleId, _id)
|
||||
notifications.success(`Query saved successfully.`)
|
||||
$goto(`../${_id}`)
|
||||
} catch (err) {
|
||||
|
@ -129,6 +154,18 @@
|
|||
notifications.error(`Error creating query. ${err.message}`)
|
||||
}
|
||||
}
|
||||
|
||||
onMount(async () => {
|
||||
if (!query || !query._id) {
|
||||
roleId = Roles.BASIC
|
||||
return
|
||||
}
|
||||
try {
|
||||
roleId = (await permissions.forResource(query._id))["read"]
|
||||
} catch (err) {
|
||||
roleId = Roles.BASIC
|
||||
}
|
||||
})
|
||||
</script>
|
||||
|
||||
<Layout gap="S" noPadding>
|
||||
|
@ -151,6 +188,16 @@
|
|||
queryConfig[verb]?.displayName || capitalise(verb)}
|
||||
/>
|
||||
</div>
|
||||
<div class="config-field">
|
||||
<Label>Access level</Label>
|
||||
<Select
|
||||
value={roleId}
|
||||
on:change={e => updateRole(e.detail)}
|
||||
options={$roles}
|
||||
getOptionLabel={x => x.name}
|
||||
getOptionValue={x => x._id}
|
||||
/>
|
||||
</div>
|
||||
{#if integrationInfo?.extra && query.queryVerb}
|
||||
<ExtraQueryConfig
|
||||
{query}
|
||||
|
|
|
@ -95,6 +95,7 @@ export function createDatasourcesStore() {
|
|||
return { list: sources, selected: null }
|
||||
})
|
||||
|
||||
await queries.fetch()
|
||||
return response
|
||||
},
|
||||
removeSchemaError: () => {
|
||||
|
|
|
@ -10,13 +10,11 @@ export function createPermissionStore() {
|
|||
const response = await api.post(
|
||||
`/api/permission/${role}/${resource}/${level}`
|
||||
)
|
||||
const json = await response.json()
|
||||
return json
|
||||
return await response.json()
|
||||
},
|
||||
forResource: async resourceId => {
|
||||
const response = await api.get(`/api/permission/${resourceId}`)
|
||||
const json = await response.json()
|
||||
return json
|
||||
return await response.json()
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
@ -120,6 +120,7 @@
|
|||
"uuid": "3.3.2",
|
||||
"validate.js": "0.13.1",
|
||||
"vm2": "^3.9.3",
|
||||
"worker-farm": "^1.7.0",
|
||||
"yargs": "13.2.4",
|
||||
"zlib": "1.0.5"
|
||||
},
|
||||
|
|
|
@ -119,8 +119,16 @@ exports.destroy = async function (ctx) {
|
|||
const db = new CouchDB(ctx.appId)
|
||||
|
||||
// Delete all queries for the datasource
|
||||
const rows = await db.allDocs(getQueryParams(ctx.params.datasourceId, null))
|
||||
await db.bulkDocs(rows.rows.map(row => ({ ...row.doc, _deleted: true })))
|
||||
const queries = await db.allDocs(
|
||||
getQueryParams(ctx.params.datasourceId, null)
|
||||
)
|
||||
await db.bulkDocs(
|
||||
queries.rows.map(row => ({
|
||||
_id: row.id,
|
||||
_rev: row.value.rev,
|
||||
_deleted: true,
|
||||
}))
|
||||
)
|
||||
|
||||
// delete the datasource
|
||||
await db.remove(ctx.params.datasourceId, ctx.params.revId)
|
||||
|
|
|
@ -1,10 +1,11 @@
|
|||
const { processString } = require("@budibase/string-templates")
|
||||
const CouchDB = require("../../db")
|
||||
const { generateQueryID, getQueryParams } = require("../../db/utils")
|
||||
const { integrations } = require("../../integrations")
|
||||
const { BaseQueryVerbs } = require("../../constants")
|
||||
const env = require("../../environment")
|
||||
const ScriptRunner = require("../../utilities/scriptRunner")
|
||||
const { Thread, ThreadType } = require("../../threads")
|
||||
|
||||
const Runner = new Thread(ThreadType.QUERY, { timeoutMs: 10000 })
|
||||
|
||||
// simple function to append "readable" to all read queries
|
||||
function enrichQueries(input) {
|
||||
|
@ -18,47 +19,6 @@ function enrichQueries(input) {
|
|||
return wasArray ? queries : queries[0]
|
||||
}
|
||||
|
||||
function formatResponse(resp) {
|
||||
if (typeof resp === "string") {
|
||||
try {
|
||||
resp = JSON.parse(resp)
|
||||
} catch (err) {
|
||||
resp = { response: resp }
|
||||
}
|
||||
}
|
||||
return resp
|
||||
}
|
||||
|
||||
async function runAndTransform(
|
||||
integration,
|
||||
queryVerb,
|
||||
enrichedQuery,
|
||||
transformer
|
||||
) {
|
||||
let rows = formatResponse(await integration[queryVerb](enrichedQuery))
|
||||
|
||||
// transform as required
|
||||
if (transformer) {
|
||||
const runner = new ScriptRunner(transformer, { data: rows })
|
||||
rows = runner.execute()
|
||||
}
|
||||
|
||||
// needs to an array for next step
|
||||
if (!Array.isArray(rows)) {
|
||||
rows = [rows]
|
||||
}
|
||||
|
||||
// map into JSON if just raw primitive here
|
||||
if (rows.find(row => typeof row !== "object")) {
|
||||
rows = rows.map(value => ({ value }))
|
||||
}
|
||||
|
||||
// get all the potential fields in the schema
|
||||
let keys = rows.flatMap(Object.keys)
|
||||
|
||||
return { rows, keys }
|
||||
}
|
||||
|
||||
exports.fetch = async function (ctx) {
|
||||
const db = new CouchDB(ctx.appId)
|
||||
|
||||
|
@ -143,30 +103,23 @@ exports.preview = async function (ctx) {
|
|||
|
||||
const datasource = await db.get(ctx.request.body.datasourceId)
|
||||
|
||||
const Integration = integrations[datasource.source]
|
||||
|
||||
if (!Integration) {
|
||||
ctx.throw(400, "Integration type does not exist.")
|
||||
}
|
||||
|
||||
const { fields, parameters, queryVerb, transformer } = ctx.request.body
|
||||
const enrichedQuery = await enrichQueryFields(fields, parameters)
|
||||
const integration = new Integration(datasource.config)
|
||||
|
||||
const { rows, keys } = await runAndTransform(
|
||||
integration,
|
||||
queryVerb,
|
||||
enrichedQuery,
|
||||
transformer
|
||||
)
|
||||
try {
|
||||
const { rows, keys } = await Runner.run({
|
||||
datasource,
|
||||
queryVerb,
|
||||
query: enrichedQuery,
|
||||
transformer,
|
||||
})
|
||||
|
||||
ctx.body = {
|
||||
rows,
|
||||
schemaFields: [...new Set(keys)],
|
||||
}
|
||||
// cleanup
|
||||
if (integration.end) {
|
||||
integration.end()
|
||||
ctx.body = {
|
||||
rows,
|
||||
schemaFields: [...new Set(keys)],
|
||||
}
|
||||
} catch (err) {
|
||||
ctx.throw(400, err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -176,29 +129,22 @@ exports.execute = async function (ctx) {
|
|||
const query = await db.get(ctx.params.queryId)
|
||||
const datasource = await db.get(query.datasourceId)
|
||||
|
||||
const Integration = integrations[datasource.source]
|
||||
|
||||
if (!Integration) {
|
||||
ctx.throw(400, "Integration type does not exist.")
|
||||
}
|
||||
|
||||
const enrichedQuery = await enrichQueryFields(
|
||||
query.fields,
|
||||
ctx.request.body.parameters
|
||||
)
|
||||
const integration = new Integration(datasource.config)
|
||||
|
||||
// call the relevant CRUD method on the integration class
|
||||
const { rows } = await runAndTransform(
|
||||
integration,
|
||||
query.queryVerb,
|
||||
enrichedQuery,
|
||||
query.transformer
|
||||
)
|
||||
ctx.body = rows
|
||||
// cleanup
|
||||
if (integration.end) {
|
||||
integration.end()
|
||||
try {
|
||||
const { rows } = await Runner.run({
|
||||
datasource,
|
||||
queryVerb: query.queryVerb,
|
||||
query: enrichedQuery,
|
||||
transformer: query.transformer,
|
||||
})
|
||||
ctx.body = rows
|
||||
} catch (err) {
|
||||
ctx.throw(400, err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -66,6 +66,7 @@ router
|
|||
)
|
||||
.get(
|
||||
"/api/queries/:queryId",
|
||||
paramResource("queryId"),
|
||||
authorized(PermissionTypes.QUERY, PermissionLevels.READ),
|
||||
queryController.find
|
||||
)
|
||||
|
|
|
@ -9,12 +9,27 @@ const { JobQueues } = require("../constants")
|
|||
const { utils } = require("@budibase/auth/redis")
|
||||
const { opts, redisProtocolUrl } = utils.getRedisOptions()
|
||||
|
||||
const redisConfig = redisProtocolUrl || { redis: opts }
|
||||
let automationQueue = new Queue(JobQueues.AUTOMATIONS, redisConfig)
|
||||
const CLEANUP_PERIOD_MS = 60 * 1000
|
||||
const queueConfig = redisProtocolUrl || { redis: opts }
|
||||
let cleanupInternal = null
|
||||
|
||||
let automationQueue = new Queue(JobQueues.AUTOMATIONS, queueConfig)
|
||||
|
||||
async function cleanup() {
|
||||
await automationQueue.clean(CLEANUP_PERIOD_MS, "completed")
|
||||
}
|
||||
|
||||
exports.pathPrefix = "/bulladmin"
|
||||
|
||||
exports.init = () => {
|
||||
// cleanup the events every 5 minutes
|
||||
if (!cleanupInternal) {
|
||||
cleanupInternal = setInterval(cleanup, CLEANUP_PERIOD_MS)
|
||||
// fire off an initial cleanup
|
||||
cleanup().catch(err => {
|
||||
console.error(`Unable to cleanup automation queue initially - ${err}`)
|
||||
})
|
||||
}
|
||||
const expressApp = express()
|
||||
// Set up queues for bull board admin
|
||||
const queues = [automationQueue]
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
jest.mock("../../utilities/usageQuota")
|
||||
jest.mock("../thread")
|
||||
jest.mock("../../threads/automation")
|
||||
jest.mock("../../utilities/redis", () => ({
|
||||
init: jest.fn(),
|
||||
checkTestFlag: () => {
|
||||
|
@ -11,8 +11,7 @@ jest.spyOn(global.console, "error")
|
|||
|
||||
require("../../environment")
|
||||
const automation = require("../index")
|
||||
const usageQuota = require("../../utilities/usageQuota")
|
||||
const thread = require("../thread")
|
||||
const thread = require("../../threads/automation")
|
||||
const triggers = require("../triggers")
|
||||
const { basicAutomation } = require("../../tests/utilities/structures")
|
||||
const { wait } = require("../../utilities")
|
||||
|
@ -62,7 +61,7 @@ describe("Run through some parts of the automations system", () => {
|
|||
}
|
||||
}
|
||||
}
|
||||
}))
|
||||
}), expect.any(Function))
|
||||
})
|
||||
|
||||
it("should be able to clean inputs with the utilities", () => {
|
||||
|
|
|
@ -11,6 +11,10 @@ const utils = require("./utils")
|
|||
const env = require("../environment")
|
||||
|
||||
const TRIGGER_DEFINITIONS = definitions
|
||||
const JOB_OPTS = {
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true,
|
||||
}
|
||||
|
||||
async function queueRelevantRowAutomations(event, eventType) {
|
||||
if (event.appId == null) {
|
||||
|
@ -47,7 +51,7 @@ async function queueRelevantRowAutomations(event, eventType) {
|
|||
automationTrigger.inputs &&
|
||||
automationTrigger.inputs.tableId === event.row.tableId
|
||||
) {
|
||||
await queue.add({ automation, event })
|
||||
await queue.add({ automation, event }, JOB_OPTS)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -86,7 +90,7 @@ exports.externalTrigger = async function (
|
|||
automation.definition.trigger != null &&
|
||||
automation.definition.trigger.stepId === definitions.APP.stepId &&
|
||||
automation.definition.trigger.stepId === "APP" &&
|
||||
!checkTestFlag(automation._id)
|
||||
!(await checkTestFlag(automation._id))
|
||||
) {
|
||||
// values are likely to be submitted as strings, so we shall convert to correct type
|
||||
const coercedFields = {}
|
||||
|
@ -100,7 +104,7 @@ exports.externalTrigger = async function (
|
|||
if (getResponses) {
|
||||
return utils.processEvent({ data })
|
||||
} else {
|
||||
return queue.add(data)
|
||||
return queue.add(data, JOB_OPTS)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
const runner = require("./thread")
|
||||
const { Thread, ThreadType } = require("../threads")
|
||||
const { definitions } = require("./triggerInfo")
|
||||
const webhooks = require("../api/controllers/webhook")
|
||||
const CouchDB = require("../db")
|
||||
|
@ -10,11 +10,12 @@ const { getDeployedAppID } = require("@budibase/auth/db")
|
|||
|
||||
const WH_STEP_ID = definitions.WEBHOOK.stepId
|
||||
const CRON_STEP_ID = definitions.CRON.stepId
|
||||
const Runner = new Thread(ThreadType.AUTOMATION)
|
||||
|
||||
exports.processEvent = async job => {
|
||||
try {
|
||||
// need to actually await these so that an error can be captured properly
|
||||
return await runner(job)
|
||||
return await Runner.run(job)
|
||||
} catch (err) {
|
||||
console.error(
|
||||
`${job.data.automation.appId} automation ${job.data.automation._id} was unable to run - ${err}`
|
||||
|
|
|
@ -3,6 +3,7 @@ import {
|
|||
DatasourceFieldTypes,
|
||||
QueryTypes,
|
||||
} from "../definitions/datasource"
|
||||
import { IntegrationBase } from "./base/IntegrationBase"
|
||||
|
||||
module AirtableModule {
|
||||
const Airtable = require("airtable")
|
||||
|
@ -73,7 +74,7 @@ module AirtableModule {
|
|||
},
|
||||
}
|
||||
|
||||
class AirtableIntegration {
|
||||
class AirtableIntegration implements IntegrationBase {
|
||||
private config: AirtableConfig
|
||||
private client: any
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@ import {
|
|||
DatasourceFieldTypes,
|
||||
QueryTypes,
|
||||
} from "../definitions/datasource"
|
||||
import { IntegrationBase } from "./base/IntegrationBase"
|
||||
|
||||
module ArangoModule {
|
||||
const { Database, aql } = require("arangojs")
|
||||
|
@ -55,7 +56,7 @@ module ArangoModule {
|
|||
},
|
||||
}
|
||||
|
||||
class ArangoDBIntegration {
|
||||
class ArangoDBIntegration implements IntegrationBase {
|
||||
private config: ArangodbConfig
|
||||
private client: any
|
||||
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
export interface IntegrationBase {
|
||||
create?(query: any): Promise<[any]>
|
||||
read?(query: any): Promise<[any]>
|
||||
update?(query: any): Promise<[any]>
|
||||
delete?(query: any): Promise<[any]>
|
||||
}
|
|
@ -1,6 +1,7 @@
|
|||
import { Table } from "../../definitions/common"
|
||||
import { IntegrationBase } from "./IntegrationBase"
|
||||
|
||||
export interface DatasourcePlus {
|
||||
export interface DatasourcePlus extends IntegrationBase {
|
||||
tables: Record<string, Table>
|
||||
schemaErrors: Record<string, string>
|
||||
|
||||
|
|
|
@ -13,22 +13,50 @@ import SqlTableQueryBuilder from "./sqlTable"
|
|||
const BASE_LIMIT = 5000
|
||||
|
||||
type KnexQuery = Knex.QueryBuilder | Knex
|
||||
// these are invalid dates sent by the client, need to convert them to a real max date
|
||||
const MIN_ISO_DATE = "0000-00-00T00:00:00.000Z"
|
||||
const MAX_ISO_DATE = "9999-00-00T00:00:00.000Z"
|
||||
|
||||
function parse(input: any) {
|
||||
if (Array.isArray(input)) {
|
||||
return JSON.stringify(input)
|
||||
}
|
||||
if (typeof input !== "string") {
|
||||
return input
|
||||
}
|
||||
if (input === MAX_ISO_DATE) {
|
||||
return new Date(8640000000000000)
|
||||
}
|
||||
if (input === MIN_ISO_DATE) {
|
||||
return new Date(-8640000000000000)
|
||||
}
|
||||
if (isIsoDateString(input)) {
|
||||
return new Date(input)
|
||||
}
|
||||
return input
|
||||
}
|
||||
|
||||
function parseBody(body: any) {
|
||||
for (let [key, value] of Object.entries(body)) {
|
||||
if (Array.isArray(value)) {
|
||||
body[key] = JSON.stringify(value)
|
||||
}
|
||||
if (typeof value !== "string") {
|
||||
continue
|
||||
}
|
||||
if (isIsoDateString(value)) {
|
||||
body[key] = new Date(value)
|
||||
}
|
||||
body[key] = parse(value)
|
||||
}
|
||||
return body
|
||||
}
|
||||
|
||||
function parseFilters(filters: SearchFilters): SearchFilters {
|
||||
for (let [key, value] of Object.entries(filters)) {
|
||||
let parsed
|
||||
if (typeof value === "object") {
|
||||
parsed = parseFilters(value)
|
||||
} else {
|
||||
parsed = parse(value)
|
||||
}
|
||||
// @ts-ignore
|
||||
filters[key] = parsed
|
||||
}
|
||||
return filters
|
||||
}
|
||||
|
||||
class InternalBuilder {
|
||||
private readonly client: string
|
||||
|
||||
|
@ -53,6 +81,7 @@ class InternalBuilder {
|
|||
if (!filters) {
|
||||
return query
|
||||
}
|
||||
filters = parseFilters(filters)
|
||||
// if all or specified in filters, then everything is an or
|
||||
const allOr = filters.allOr
|
||||
if (filters.oneOf) {
|
||||
|
|
|
@ -3,6 +3,7 @@ import {
|
|||
DatasourceFieldTypes,
|
||||
QueryTypes,
|
||||
} from "../definitions/datasource"
|
||||
import { IntegrationBase } from "./base/IntegrationBase"
|
||||
|
||||
module CouchDBModule {
|
||||
const PouchDB = require("pouchdb")
|
||||
|
@ -50,7 +51,7 @@ module CouchDBModule {
|
|||
},
|
||||
}
|
||||
|
||||
class CouchDBIntegration {
|
||||
class CouchDBIntegration implements IntegrationBase {
|
||||
private config: CouchDBConfig
|
||||
private client: any
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@ import {
|
|||
DatasourceFieldTypes,
|
||||
QueryTypes,
|
||||
} from "../definitions/datasource"
|
||||
import { IntegrationBase } from "./base/IntegrationBase"
|
||||
|
||||
module DynamoModule {
|
||||
const AWS = require("aws-sdk")
|
||||
|
@ -113,7 +114,7 @@ module DynamoModule {
|
|||
},
|
||||
}
|
||||
|
||||
class DynamoDBIntegration {
|
||||
class DynamoDBIntegration implements IntegrationBase {
|
||||
private config: DynamoDBConfig
|
||||
private client: any
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@ import {
|
|||
DatasourceFieldTypes,
|
||||
QueryTypes,
|
||||
} from "../definitions/datasource"
|
||||
import { IntegrationBase } from "./base/IntegrationBase"
|
||||
|
||||
module ElasticsearchModule {
|
||||
const { Client } = require("@elastic/elasticsearch")
|
||||
|
@ -74,7 +75,7 @@ module ElasticsearchModule {
|
|||
},
|
||||
}
|
||||
|
||||
class ElasticSearchIntegration {
|
||||
class ElasticSearchIntegration implements IntegrationBase {
|
||||
private config: ElasticsearchConfig
|
||||
private client: any
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@ import {
|
|||
DatasourceFieldTypes,
|
||||
QueryTypes,
|
||||
} from "../definitions/datasource"
|
||||
import { IntegrationBase } from "./base/IntegrationBase"
|
||||
|
||||
module MongoDBModule {
|
||||
const { MongoClient } = require("mongodb")
|
||||
|
@ -62,7 +63,7 @@ module MongoDBModule {
|
|||
},
|
||||
}
|
||||
|
||||
class MongoIntegration {
|
||||
class MongoIntegration implements IntegrationBase {
|
||||
private config: MongoDBConfig
|
||||
private client: any
|
||||
|
||||
|
|
|
@ -184,7 +184,7 @@ module MySQLModule {
|
|||
return results.length ? results : [{ created: true }]
|
||||
}
|
||||
|
||||
read(query: SqlQuery | string) {
|
||||
async read(query: SqlQuery | string) {
|
||||
return internalQuery(this.client, getSqlQuery(query))
|
||||
}
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@ import {
|
|||
DatasourceFieldTypes,
|
||||
QueryTypes,
|
||||
} from "../definitions/datasource"
|
||||
import { IntegrationBase } from "./base/IntegrationBase"
|
||||
|
||||
module RestModule {
|
||||
const fetch = require("node-fetch")
|
||||
|
@ -131,7 +132,7 @@ module RestModule {
|
|||
},
|
||||
}
|
||||
|
||||
class RestIntegration {
|
||||
class RestIntegration implements IntegrationBase {
|
||||
private config: RestConfig
|
||||
private headers: {
|
||||
[key: string]: string
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import { Integration, QueryTypes } from "../definitions/datasource"
|
||||
import { IntegrationBase } from "./base/IntegrationBase"
|
||||
|
||||
module S3Module {
|
||||
const AWS = require("aws-sdk")
|
||||
|
@ -42,7 +43,7 @@ module S3Module {
|
|||
},
|
||||
}
|
||||
|
||||
class S3Integration {
|
||||
class S3Integration implements IntegrationBase {
|
||||
private readonly config: S3Config
|
||||
private client: any
|
||||
private connectionPromise: Promise<any>
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
const actions = require("./actions")
|
||||
const automationUtils = require("./automationUtils")
|
||||
const actions = require("../automations/actions")
|
||||
const automationUtils = require("../automations/automationUtils")
|
||||
const AutomationEmitter = require("../events/AutomationEmitter")
|
||||
const { processObject } = require("@budibase/string-templates")
|
||||
const { DEFAULT_TENANT_ID } = require("@budibase/auth").constants
|
||||
|
@ -8,8 +8,10 @@ const { DocumentTypes, isDevAppID } = require("../db/utils")
|
|||
const { doInTenant } = require("@budibase/auth/tenancy")
|
||||
const env = require("../environment")
|
||||
const usage = require("../utilities/usageQuota")
|
||||
const { definitions: triggerDefs } = require("../automations/triggerInfo")
|
||||
|
||||
const FILTER_STEP_ID = actions.ACTION_DEFINITIONS.FILTER.stepId
|
||||
const CRON_STEP_ID = triggerDefs.CRON.stepId
|
||||
const STOPPED_STATUS = { success: false, status: "STOPPED" }
|
||||
|
||||
/**
|
||||
|
@ -23,6 +25,8 @@ class Orchestrator {
|
|||
this._chainCount = this._metadata ? this._metadata.automationChainCount : 0
|
||||
this._appId = triggerOutput.appId
|
||||
this._app = null
|
||||
const triggerStepId = automation.definition.trigger.stepId
|
||||
triggerOutput = this.cleanupTriggerOutputs(triggerStepId, triggerOutput)
|
||||
// remove from context
|
||||
delete triggerOutput.appId
|
||||
delete triggerOutput.metadata
|
||||
|
@ -34,11 +38,17 @@ class Orchestrator {
|
|||
this._emitter = new AutomationEmitter(this._chainCount + 1)
|
||||
this.executionOutput = { trigger: {}, steps: [] }
|
||||
// setup the execution output
|
||||
const triggerStepId = automation.definition.trigger.stepId
|
||||
const triggerId = automation.definition.trigger.id
|
||||
this.updateExecutionOutput(triggerId, triggerStepId, null, triggerOutput)
|
||||
}
|
||||
|
||||
cleanupTriggerOutputs(stepId, triggerOutput) {
|
||||
if (stepId === CRON_STEP_ID) {
|
||||
triggerOutput.timestamp = Date.now()
|
||||
}
|
||||
return triggerOutput
|
||||
}
|
||||
|
||||
async getStepFunctionality(stepId) {
|
||||
let step = await actions.getAction(stepId)
|
||||
if (step == null) {
|
||||
|
@ -119,10 +129,17 @@ class Orchestrator {
|
|||
}
|
||||
}
|
||||
|
||||
module.exports = async job => {
|
||||
module.exports = (input, callback) => {
|
||||
const automationOrchestrator = new Orchestrator(
|
||||
job.data.automation,
|
||||
job.data.event
|
||||
input.data.automation,
|
||||
input.data.event
|
||||
)
|
||||
return automationOrchestrator.execute()
|
||||
automationOrchestrator
|
||||
.execute()
|
||||
.then(response => {
|
||||
callback(null, response)
|
||||
})
|
||||
.catch(err => {
|
||||
callback(err)
|
||||
})
|
||||
}
|
|
@ -0,0 +1,60 @@
|
|||
const workerFarm = require("worker-farm")
|
||||
const env = require("../environment")
|
||||
|
||||
const ThreadType = {
|
||||
QUERY: "query",
|
||||
AUTOMATION: "automation",
|
||||
}
|
||||
|
||||
function typeToFile(type) {
|
||||
let filename = null
|
||||
switch (type) {
|
||||
case ThreadType.QUERY:
|
||||
filename = "./query"
|
||||
break
|
||||
case ThreadType.AUTOMATION:
|
||||
filename = "./automation"
|
||||
break
|
||||
default:
|
||||
throw "Unknown thread type"
|
||||
}
|
||||
return require.resolve(filename)
|
||||
}
|
||||
|
||||
class Thread {
|
||||
constructor(type, opts = { timeoutMs: null, count: 1 }) {
|
||||
this.type = type
|
||||
if (!env.isTest()) {
|
||||
const workerOpts = {
|
||||
autoStart: true,
|
||||
maxConcurrentWorkers: opts.count ? opts.count : 1,
|
||||
}
|
||||
if (opts.timeoutMs) {
|
||||
workerOpts.maxCallTime = opts.timeoutMs
|
||||
}
|
||||
this.workers = workerFarm(workerOpts, typeToFile(type))
|
||||
}
|
||||
}
|
||||
|
||||
run(data) {
|
||||
return new Promise((resolve, reject) => {
|
||||
let fncToCall
|
||||
// if in test then don't use threading
|
||||
if (env.isTest()) {
|
||||
fncToCall = require(typeToFile(this.type))
|
||||
} else {
|
||||
fncToCall = this.workers
|
||||
}
|
||||
fncToCall(data, (err, response) => {
|
||||
if (err) {
|
||||
reject(err)
|
||||
} else {
|
||||
resolve(response)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
module.exports.Thread = Thread
|
||||
module.exports.ThreadType = ThreadType
|
|
@ -0,0 +1,63 @@
|
|||
const ScriptRunner = require("../utilities/scriptRunner")
|
||||
const { integrations } = require("../integrations")
|
||||
|
||||
function formatResponse(resp) {
|
||||
if (typeof resp === "string") {
|
||||
try {
|
||||
resp = JSON.parse(resp)
|
||||
} catch (err) {
|
||||
resp = { response: resp }
|
||||
}
|
||||
}
|
||||
return resp
|
||||
}
|
||||
|
||||
async function runAndTransform(datasource, queryVerb, query, transformer) {
|
||||
const Integration = integrations[datasource.source]
|
||||
if (!Integration) {
|
||||
throw "Integration type does not exist."
|
||||
}
|
||||
const integration = new Integration(datasource.config)
|
||||
|
||||
let rows = formatResponse(await integration[queryVerb](query))
|
||||
|
||||
// transform as required
|
||||
if (transformer) {
|
||||
const runner = new ScriptRunner(transformer, { data: rows })
|
||||
rows = runner.execute()
|
||||
}
|
||||
|
||||
// needs to an array for next step
|
||||
if (!Array.isArray(rows)) {
|
||||
rows = [rows]
|
||||
}
|
||||
|
||||
// map into JSON if just raw primitive here
|
||||
if (rows.find(row => typeof row !== "object")) {
|
||||
rows = rows.map(value => ({ value }))
|
||||
}
|
||||
|
||||
// get all the potential fields in the schema
|
||||
let keys = rows.flatMap(Object.keys)
|
||||
|
||||
if (integration.end) {
|
||||
integration.end()
|
||||
}
|
||||
|
||||
return { rows, keys }
|
||||
}
|
||||
|
||||
module.exports = (input, callback) => {
|
||||
runAndTransform(
|
||||
input.datasource,
|
||||
input.queryVerb,
|
||||
input.query,
|
||||
input.transformer
|
||||
)
|
||||
.then(response => {
|
||||
callback(null, response)
|
||||
})
|
||||
.catch(err => {
|
||||
callback(err)
|
||||
})
|
||||
}
|
|
@ -89,6 +89,13 @@ class InMemoryQueue {
|
|||
getRepeatableJobs() {
|
||||
return []
|
||||
}
|
||||
|
||||
/**
|
||||
* Implemented for tests
|
||||
*/
|
||||
async clean() {
|
||||
return []
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = InMemoryQueue
|
||||
|
|
|
@ -1,10 +1,13 @@
|
|||
const fetch = require("node-fetch")
|
||||
const { VM, VMScript } = require("vm2")
|
||||
const JS_TIMEOUT_MS = 1000
|
||||
|
||||
class ScriptRunner {
|
||||
constructor(script, context) {
|
||||
const code = `let fn = () => {\n${script}\n}; results.out = fn();`
|
||||
this.vm = new VM()
|
||||
this.vm = new VM({
|
||||
timeout: JS_TIMEOUT_MS,
|
||||
})
|
||||
this.results = { out: "" }
|
||||
this.vm.setGlobals(context)
|
||||
this.vm.setGlobal("fetch", fetch)
|
||||
|
|
|
@ -2,7 +2,7 @@
|
|||
"compilerOptions": {
|
||||
"target": "es6",
|
||||
"module": "commonjs",
|
||||
"lib": ["es6"],
|
||||
"lib": ["es2019"],
|
||||
"allowJs": true,
|
||||
"outDir": "dist",
|
||||
"strict": true,
|
||||
|
|
|
@ -943,10 +943,10 @@
|
|||
resolved "https://registry.yarnpkg.com/@bcoe/v8-coverage/-/v8-coverage-0.2.3.tgz#75a2e8b51cb758a7553d6804a5932d7aace75c39"
|
||||
integrity sha512-0hYQ8SB4Db5zvZB4axdMHGwEaQjkZzFjQiN9LVYvIFB2nSUHW9tYpxWriPrWDASIxiaXax83REcLxuSdnGPZtw==
|
||||
|
||||
"@budibase/auth@^0.9.169-alpha.1":
|
||||
version "0.9.169"
|
||||
resolved "https://registry.yarnpkg.com/@budibase/auth/-/auth-0.9.169.tgz#fd2a8fc271782ba857259ace15118a4d53b3d161"
|
||||
integrity sha512-Q087k/54Nzx6Oeg5uL7YD/9BB+qkBWIv7h4ct+cNQJFNK/aKKN8JLQft+z3mBN5omHTkdJYFmbgXWFxtX+rR3Q==
|
||||
"@budibase/auth@^0.9.180-alpha.1":
|
||||
version "0.9.183"
|
||||
resolved "https://registry.yarnpkg.com/@budibase/auth/-/auth-0.9.183.tgz#da5a7e8b8ba9909d33399bbcd1b7164690ada257"
|
||||
integrity sha512-BNlD4f7YfQejaq1wgMiIPzkNB+fu0HFpg9lyPYaD/mDWpa0F3HdMK3LxYewda9uRy9LJf6LtR3NJxVFvo0zXHA==
|
||||
dependencies:
|
||||
"@techpass/passport-openidconnect" "^0.3.0"
|
||||
aws-sdk "^2.901.0"
|
||||
|
@ -956,6 +956,7 @@
|
|||
jsonwebtoken "^8.5.1"
|
||||
koa-passport "^4.1.4"
|
||||
lodash "^4.17.21"
|
||||
lodash.isarguments "^3.1.0"
|
||||
node-fetch "^2.6.1"
|
||||
passport-google-auth "^1.0.2"
|
||||
passport-google-oauth "^2.0.0"
|
||||
|
@ -1015,10 +1016,10 @@
|
|||
svelte-flatpickr "^3.1.0"
|
||||
svelte-portal "^1.0.0"
|
||||
|
||||
"@budibase/bbui@^0.9.169":
|
||||
version "0.9.169"
|
||||
resolved "https://registry.yarnpkg.com/@budibase/bbui/-/bbui-0.9.169.tgz#e8dac59b9792a7edf03c4301a9069760e2ebd2f4"
|
||||
integrity sha512-2hks6GEjcXbDUzC37WgJvgloiqTP5ZS7IuRjlHU9kStDr6dAnXuy8pO6JNJmKrTXt+rgtwhHHrVWzzcmNLIYxA==
|
||||
"@budibase/bbui@^0.9.183":
|
||||
version "0.9.183"
|
||||
resolved "https://registry.yarnpkg.com/@budibase/bbui/-/bbui-0.9.183.tgz#7e2ad9a34ec5ae9f32bc9d263199217b324f1b8c"
|
||||
integrity sha512-SFTb5rxfUB1rVYMASvtwVYb5XDhSdsQ1Fkr85Mn+ME284WQqBeJKRSz87jLVXJFQAnSpPEDUShOUTTFVByqpig==
|
||||
dependencies:
|
||||
"@adobe/spectrum-css-workflow-icons" "^1.2.1"
|
||||
"@spectrum-css/actionbutton" "^1.0.1"
|
||||
|
@ -1064,14 +1065,14 @@
|
|||
svelte-flatpickr "^3.2.3"
|
||||
svelte-portal "^1.0.0"
|
||||
|
||||
"@budibase/client@^0.9.169-alpha.1":
|
||||
version "0.9.169"
|
||||
resolved "https://registry.yarnpkg.com/@budibase/client/-/client-0.9.169.tgz#bec370b8f069b42f62483b281d6b9e2c7c8625f3"
|
||||
integrity sha512-/bDnwv2iRysZrcrBQJEKzuxdwkwoJ2FalmQFhsfj+V/MWBN/wpQSDbJZQwf/YcI5bQk8f7xIn95O+DMH/m5izg==
|
||||
"@budibase/client@^0.9.180-alpha.1":
|
||||
version "0.9.183"
|
||||
resolved "https://registry.yarnpkg.com/@budibase/client/-/client-0.9.183.tgz#cf86a2e0382d7e4a0898630f10f17d7640ce256d"
|
||||
integrity sha512-1gw8EVIwouNJtYPgByX97EyeegAm35+jSd6irjU0PQEKldtvw2vLI9hmatvUdkUqLFUCT5PeXq37xfkp2JCYLQ==
|
||||
dependencies:
|
||||
"@budibase/bbui" "^0.9.169"
|
||||
"@budibase/bbui" "^0.9.183"
|
||||
"@budibase/standard-components" "^0.9.139"
|
||||
"@budibase/string-templates" "^0.9.169"
|
||||
"@budibase/string-templates" "^0.9.183"
|
||||
regexparam "^1.3.0"
|
||||
shortid "^2.2.15"
|
||||
svelte-spa-router "^3.0.5"
|
||||
|
@ -1121,16 +1122,17 @@
|
|||
svelte-apexcharts "^1.0.2"
|
||||
svelte-flatpickr "^3.1.0"
|
||||
|
||||
"@budibase/string-templates@^0.9.169", "@budibase/string-templates@^0.9.169-alpha.1":
|
||||
version "0.9.169"
|
||||
resolved "https://registry.yarnpkg.com/@budibase/string-templates/-/string-templates-0.9.169.tgz#3c0be97718f39a92ff6b2dbb8b470aaa7851005e"
|
||||
integrity sha512-JUyg6XuUgFqnfdDSCAplo4cTtrqdSZ9NPrU3iGudZEQjO/Wk5sezWPznl3Yw/kFHKmPLjFHIveEa2+lODEAxIA==
|
||||
"@budibase/string-templates@^0.9.180-alpha.1", "@budibase/string-templates@^0.9.183":
|
||||
version "0.9.183"
|
||||
resolved "https://registry.yarnpkg.com/@budibase/string-templates/-/string-templates-0.9.183.tgz#c75dc298d8ec69e1717721b46c3c99448b5ee0a1"
|
||||
integrity sha512-S3Z81c2YGtG0hUXvOrDKn8Gj4iu1adxIDeNgHJAsesID3/SrI9KBhExx1HzIP14SLZlFEao5A12cVtpFBHC7LQ==
|
||||
dependencies:
|
||||
"@budibase/handlebars-helpers" "^0.11.7"
|
||||
dayjs "^1.10.4"
|
||||
handlebars "^4.7.6"
|
||||
handlebars-utils "^1.0.6"
|
||||
lodash "^4.17.20"
|
||||
vm2 "^3.9.4"
|
||||
|
||||
"@cnakazawa/watch@^1.0.3":
|
||||
version "1.0.4"
|
||||
|
@ -4476,7 +4478,7 @@ ent@^2.2.0:
|
|||
resolved "https://registry.yarnpkg.com/ent/-/ent-2.2.0.tgz#e964219325a21d05f44466a2f686ed6ce5f5dd1d"
|
||||
integrity sha1-6WQhkyWiHQX0RGai9obtbOX13R0=
|
||||
|
||||
errno@~0.1.1:
|
||||
errno@~0.1.1, errno@~0.1.7:
|
||||
version "0.1.8"
|
||||
resolved "https://registry.yarnpkg.com/errno/-/errno-0.1.8.tgz#8bb3e9c7d463be4976ff888f76b4809ebc2e811f"
|
||||
integrity sha512-dJ6oBr5SQ1VSd9qkk7ByRgb/1SH4JZjCHSW/mr63/QcXO9zLVxvJ6Oy13nio03rxpSnVDDjFor75SjVeZWPW/A==
|
||||
|
@ -11785,6 +11787,11 @@ vm2@^3.9.3:
|
|||
resolved "https://registry.yarnpkg.com/vm2/-/vm2-3.9.4.tgz#2e118290fefe7bd8ea09ebe2f5faf53730dbddaa"
|
||||
integrity sha512-sOdharrJ7KEePIpHekiWaY1DwgueuiBeX/ZBJUPgETsVlJsXuEx0K0/naATq2haFvJrvZnRiORQRubR0b7Ye6g==
|
||||
|
||||
vm2@^3.9.4:
|
||||
version "3.9.5"
|
||||
resolved "https://registry.yarnpkg.com/vm2/-/vm2-3.9.5.tgz#5288044860b4bbace443101fcd3bddb2a0aa2496"
|
||||
integrity sha512-LuCAHZN75H9tdrAiLFf030oW7nJV5xwNMuk1ymOZwopmuK3d2H4L1Kv4+GFHgarKiLfXXLFU+7LDABHnwOkWng==
|
||||
|
||||
vuvuzela@1.0.3:
|
||||
version "1.0.3"
|
||||
resolved "https://registry.yarnpkg.com/vuvuzela/-/vuvuzela-1.0.3.tgz#3be145e58271c73ca55279dd851f12a682114b0b"
|
||||
|
@ -11920,6 +11927,13 @@ wordwrap@^1.0.0:
|
|||
resolved "https://registry.yarnpkg.com/wordwrap/-/wordwrap-1.0.0.tgz#27584810891456a4171c8d0226441ade90cbcaeb"
|
||||
integrity sha1-J1hIEIkUVqQXHI0CJkQa3pDLyus=
|
||||
|
||||
worker-farm@^1.7.0:
|
||||
version "1.7.0"
|
||||
resolved "https://registry.yarnpkg.com/worker-farm/-/worker-farm-1.7.0.tgz#26a94c5391bbca926152002f69b84a4bf772e5a8"
|
||||
integrity sha512-rvw3QTZc8lAxyVrqcSGVm5yP/IJ2UcB3U0graE3LCFoZ0Yn2x4EoVSqJKdB/T5M+FLcRPjz4TDacRf3OCfNUzw==
|
||||
dependencies:
|
||||
errno "~0.1.7"
|
||||
|
||||
wrap-ansi@^5.1.0:
|
||||
version "5.1.0"
|
||||
resolved "https://registry.yarnpkg.com/wrap-ansi/-/wrap-ansi-5.1.0.tgz#1fd1f67235d5b6d0fee781056001bfb694c03b09"
|
||||
|
|
Loading…
Reference in New Issue