Main body of updates to support disabling the cron, some cleanup of metadata at deployment to make sure they can be re-ran.
This commit is contained in:
parent
9b2c6d6382
commit
caaf5dc3c9
|
@ -5,7 +5,11 @@ import {
|
||||||
getDevelopmentAppID,
|
getDevelopmentAppID,
|
||||||
} from "@budibase/backend-core/db"
|
} from "@budibase/backend-core/db"
|
||||||
import { DocumentTypes, getAutomationParams } from "../../../db/utils"
|
import { DocumentTypes, getAutomationParams } from "../../../db/utils"
|
||||||
import { disableAllCrons, enableCronTrigger } from "../../../automations/utils"
|
import {
|
||||||
|
disableAllCrons,
|
||||||
|
enableCronTrigger,
|
||||||
|
clearMetadata,
|
||||||
|
} from "../../../automations/utils"
|
||||||
import { app as appCache } from "@budibase/backend-core/cache"
|
import { app as appCache } from "@budibase/backend-core/cache"
|
||||||
import {
|
import {
|
||||||
getAppId,
|
getAppId,
|
||||||
|
@ -80,6 +84,7 @@ async function initDeployedApp(prodAppId: any) {
|
||||||
})
|
})
|
||||||
)
|
)
|
||||||
).rows.map((row: any) => row.doc)
|
).rows.map((row: any) => row.doc)
|
||||||
|
await clearMetadata()
|
||||||
console.log("You have " + automations.length + " automations")
|
console.log("You have " + automations.length + " automations")
|
||||||
const promises = []
|
const promises = []
|
||||||
console.log("Disabling prod crons..")
|
console.log("Disabling prod crons..")
|
||||||
|
|
|
@ -6,8 +6,13 @@ import newid from "../db/newid"
|
||||||
import { updateEntityMetadata } from "../utilities"
|
import { updateEntityMetadata } from "../utilities"
|
||||||
import { MetadataTypes, WebhookType } from "../constants"
|
import { MetadataTypes, WebhookType } from "../constants"
|
||||||
import { getProdAppID, doWithDB } from "@budibase/backend-core/db"
|
import { getProdAppID, doWithDB } from "@budibase/backend-core/db"
|
||||||
|
import { getAutomationMetadataParams } from "../db/utils"
|
||||||
import { cloneDeep } from "lodash/fp"
|
import { cloneDeep } from "lodash/fp"
|
||||||
import { getAppDB, getAppId } from "@budibase/backend-core/context"
|
import {
|
||||||
|
getAppDB,
|
||||||
|
getAppId,
|
||||||
|
getProdAppDB,
|
||||||
|
} from "@budibase/backend-core/context"
|
||||||
import { tenancy } from "@budibase/backend-core"
|
import { tenancy } from "@budibase/backend-core"
|
||||||
import { quotas } from "@budibase/pro"
|
import { quotas } from "@budibase/pro"
|
||||||
import { Automation } from "@budibase/types"
|
import { Automation } from "@budibase/types"
|
||||||
|
@ -83,6 +88,26 @@ export async function disableAllCrons(appId: any) {
|
||||||
return Promise.all(promises)
|
return Promise.all(promises)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export async function disableCron(jobId: string, jobKey: string) {
|
||||||
|
await queue.removeRepeatableByKey(jobKey)
|
||||||
|
await queue.removeJobs(jobId)
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function clearMetadata() {
|
||||||
|
const db = getProdAppDB()
|
||||||
|
const automationMetadata = (
|
||||||
|
await db.allDocs(
|
||||||
|
getAutomationMetadataParams({
|
||||||
|
include_docs: true,
|
||||||
|
})
|
||||||
|
)
|
||||||
|
).rows.map((row: any) => row.doc)
|
||||||
|
for (let metadata of automationMetadata) {
|
||||||
|
metadata._deleted = true
|
||||||
|
}
|
||||||
|
await db.bulkDocs(automationMetadata)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This function handles checking of any cron jobs that need to be enabled/updated.
|
* This function handles checking of any cron jobs that need to be enabled/updated.
|
||||||
* @param {string} appId The ID of the app in which we are checking for webhooks
|
* @param {string} appId The ID of the app in which we are checking for webhooks
|
||||||
|
@ -214,3 +239,21 @@ export async function cleanupAutomations(appId: any) {
|
||||||
export function isRecurring(automation: Automation) {
|
export function isRecurring(automation: Automation) {
|
||||||
return automation.definition.trigger.stepId === definitions.CRON.stepId
|
return automation.definition.trigger.stepId === definitions.CRON.stepId
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export function isErrorInOutput(output: {
|
||||||
|
steps: { outputs?: { success: boolean } }[]
|
||||||
|
}) {
|
||||||
|
let first = true,
|
||||||
|
error = false
|
||||||
|
for (let step of output.steps) {
|
||||||
|
// skip the trigger, its always successful if automation ran
|
||||||
|
if (first) {
|
||||||
|
first = false
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if (!step.outputs?.success) {
|
||||||
|
error = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return error
|
||||||
|
}
|
||||||
|
|
|
@ -320,6 +320,13 @@ exports.generateAutomationMetadataID = automationId => {
|
||||||
return `${DocumentTypes.AUTOMATION_METADATA}${SEPARATOR}${automationId}`
|
return `${DocumentTypes.AUTOMATION_METADATA}${SEPARATOR}${automationId}`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retrieve all automation metadata in an app database.
|
||||||
|
*/
|
||||||
|
exports.getAutomationMetadataParams = (otherProps = {}) => {
|
||||||
|
return getDocParams(DocumentTypes.AUTOMATION_METADATA, null, otherProps)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Gets parameters for retrieving a query, this is a utility function for the getDocParams function.
|
* Gets parameters for retrieving a query, this is a utility function for the getDocParams function.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -32,6 +32,11 @@ export interface AutomationEvent {
|
||||||
automation: Automation
|
automation: Automation
|
||||||
event: any
|
event: any
|
||||||
}
|
}
|
||||||
|
opts?: {
|
||||||
|
repeat?: {
|
||||||
|
jobId: string
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface AutomationContext extends AutomationResults {
|
export interface AutomationContext extends AutomationResults {
|
||||||
|
|
|
@ -1,10 +1,10 @@
|
||||||
import { threadSetup } from "./utils"
|
import { default as threadUtils } from "./utils"
|
||||||
threadSetup()
|
threadUtils.threadSetup()
|
||||||
import { isRecurring } from "../automations/utils"
|
import { isRecurring, disableCron, isErrorInOutput } from "../automations/utils"
|
||||||
import { default as actions } from "../automations/actions"
|
import { default as actions } from "../automations/actions"
|
||||||
import { default as automationUtils } from "../automations/automationUtils"
|
import { default as automationUtils } from "../automations/automationUtils"
|
||||||
import { default as AutomationEmitter } from "../events/AutomationEmitter"
|
import { default as AutomationEmitter } from "../events/AutomationEmitter"
|
||||||
import { generateAutomationMetadataID } from "../db/utils"
|
import { generateAutomationMetadataID, isProdAppID } from "../db/utils"
|
||||||
import { definitions as triggerDefs } from "../automations/triggerInfo"
|
import { definitions as triggerDefs } from "../automations/triggerInfo"
|
||||||
import { AutomationErrors, MAX_AUTOMATION_RECURRING_ERRORS } from "../constants"
|
import { AutomationErrors, MAX_AUTOMATION_RECURRING_ERRORS } from "../constants"
|
||||||
import { storeLog } from "../automations/logging"
|
import { storeLog } from "../automations/logging"
|
||||||
|
@ -18,8 +18,9 @@ import {
|
||||||
AutomationContext,
|
AutomationContext,
|
||||||
AutomationMetadata,
|
AutomationMetadata,
|
||||||
} from "../definitions/automations"
|
} from "../definitions/automations"
|
||||||
|
import { WorkerCallback } from "./definitions"
|
||||||
const { doInAppContext, getAppDB } = require("@budibase/backend-core/context")
|
const { doInAppContext, getAppDB } = require("@budibase/backend-core/context")
|
||||||
const { logAlertWithInfo } = require("@budibase/backend-core/logging")
|
const { logAlertWithInfo, logAlert } = require("@budibase/backend-core/logging")
|
||||||
const { processObject } = require("@budibase/string-templates")
|
const { processObject } = require("@budibase/string-templates")
|
||||||
const FILTER_STEP_ID = actions.ACTION_DEFINITIONS.FILTER.stepId
|
const FILTER_STEP_ID = actions.ACTION_DEFINITIONS.FILTER.stepId
|
||||||
const LOOP_STEP_ID = actions.ACTION_DEFINITIONS.LOOP.stepId
|
const LOOP_STEP_ID = actions.ACTION_DEFINITIONS.LOOP.stepId
|
||||||
|
@ -72,12 +73,19 @@ class Orchestrator {
|
||||||
_automation: Automation
|
_automation: Automation
|
||||||
_emitter: any
|
_emitter: any
|
||||||
_context: AutomationContext
|
_context: AutomationContext
|
||||||
|
_repeat?: { jobId: string; jobKey: string }
|
||||||
executionOutput: AutomationContext
|
executionOutput: AutomationContext
|
||||||
|
|
||||||
constructor(automation: Automation, triggerOutput: TriggerOutput) {
|
constructor(automation: Automation, triggerOutput: TriggerOutput, opts: any) {
|
||||||
const metadata = triggerOutput.metadata
|
const metadata = triggerOutput.metadata
|
||||||
this._chainCount = metadata ? metadata.automationChainCount : 0
|
this._chainCount = metadata ? metadata.automationChainCount : 0
|
||||||
this._appId = triggerOutput.appId as string
|
this._appId = triggerOutput.appId as string
|
||||||
|
if (opts?.repeat) {
|
||||||
|
this._repeat = {
|
||||||
|
jobId: opts.repeat.jobId,
|
||||||
|
jobKey: opts.repeat.key,
|
||||||
|
}
|
||||||
|
}
|
||||||
const triggerStepId = automation.definition.trigger.stepId
|
const triggerStepId = automation.definition.trigger.stepId
|
||||||
triggerOutput = this.cleanupTriggerOutputs(triggerStepId, triggerOutput)
|
triggerOutput = this.cleanupTriggerOutputs(triggerStepId, triggerOutput)
|
||||||
// remove from context
|
// remove from context
|
||||||
|
@ -126,13 +134,16 @@ class Orchestrator {
|
||||||
}
|
}
|
||||||
|
|
||||||
async checkIfShouldStop(metadata: AutomationMetadata): Promise<boolean> {
|
async checkIfShouldStop(metadata: AutomationMetadata): Promise<boolean> {
|
||||||
if (!metadata.errorCount) {
|
if (!metadata.errorCount || !this._repeat) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
const automation = this._automation
|
const automation = this._automation
|
||||||
const trigger = automation.definition.trigger
|
const trigger = automation.definition.trigger
|
||||||
if (metadata.errorCount >= MAX_AUTOMATION_RECURRING_ERRORS) {
|
if (metadata.errorCount >= MAX_AUTOMATION_RECURRING_ERRORS) {
|
||||||
// TODO: need to disable the recurring here
|
logAlert(
|
||||||
|
`CRON disabled due to errors - ${this._appId}/${this._automation._id}`
|
||||||
|
)
|
||||||
|
await disableCron(this._repeat?.jobId, this._repeat?.jobKey)
|
||||||
this.updateExecutionOutput(trigger.id, trigger.stepId, {}, STOPPED_STATUS)
|
this.updateExecutionOutput(trigger.id, trigger.stepId, {}, STOPPED_STATUS)
|
||||||
await storeLog(automation, this.executionOutput)
|
await storeLog(automation, this.executionOutput)
|
||||||
return true
|
return true
|
||||||
|
@ -147,7 +158,7 @@ class Orchestrator {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
const count = metadata.errorCount
|
const count = metadata.errorCount
|
||||||
const isError = output.status === AutomationStatus.ERROR
|
const isError = isErrorInOutput(output)
|
||||||
// nothing to do in this scenario, escape
|
// nothing to do in this scenario, escape
|
||||||
if (!count && !isError) {
|
if (!count && !isError) {
|
||||||
return
|
return
|
||||||
|
@ -216,7 +227,7 @@ class Orchestrator {
|
||||||
let metadata
|
let metadata
|
||||||
|
|
||||||
// check if this is a recurring automation,
|
// check if this is a recurring automation,
|
||||||
if (isRecurring(automation)) {
|
if (isProdAppID(this._appId) && isRecurring(automation)) {
|
||||||
metadata = await this.getMetadata()
|
metadata = await this.getMetadata()
|
||||||
const shouldStop = await this.checkIfShouldStop(metadata)
|
const shouldStop = await this.checkIfShouldStop(metadata)
|
||||||
if (shouldStop) {
|
if (shouldStop) {
|
||||||
|
@ -411,22 +422,20 @@ class Orchestrator {
|
||||||
|
|
||||||
// store the logs for the automation run
|
// store the logs for the automation run
|
||||||
await storeLog(this._automation, this.executionOutput)
|
await storeLog(this._automation, this.executionOutput)
|
||||||
if (isRecurring(automation) && metadata) {
|
if (isProdAppID(this._appId) && isRecurring(automation) && metadata) {
|
||||||
await this.updateMetadata(metadata)
|
await this.updateMetadata(metadata)
|
||||||
}
|
}
|
||||||
return this.executionOutput
|
return this.executionOutput
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export default (
|
export function execute(input: AutomationEvent, callback: WorkerCallback) {
|
||||||
input: AutomationEvent,
|
|
||||||
callback: (error: any, response?: any) => void
|
|
||||||
) => {
|
|
||||||
const appId = input.data.event.appId
|
const appId = input.data.event.appId
|
||||||
doInAppContext(appId, async () => {
|
doInAppContext(appId, async () => {
|
||||||
const automationOrchestrator = new Orchestrator(
|
const automationOrchestrator = new Orchestrator(
|
||||||
input.data.automation,
|
input.data.automation,
|
||||||
input.data.event
|
input.data.event,
|
||||||
|
input.opts
|
||||||
)
|
)
|
||||||
try {
|
try {
|
||||||
const response = await automationOrchestrator.execute()
|
const response = await automationOrchestrator.execute()
|
||||||
|
|
|
@ -0,0 +1,18 @@
|
||||||
|
export type WorkerCallback = (error: any, response?: any) => void
|
||||||
|
|
||||||
|
export interface QueryEvent {
|
||||||
|
appId?: string
|
||||||
|
datasource: any
|
||||||
|
queryVerb: string
|
||||||
|
fields: { [key: string]: any }
|
||||||
|
parameters: { [key: string]: any }
|
||||||
|
pagination?: any
|
||||||
|
transformer: any
|
||||||
|
queryId: string
|
||||||
|
ctx?: any
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface QueryVariable {
|
||||||
|
queryId: string
|
||||||
|
name: string
|
||||||
|
}
|
|
@ -18,26 +18,23 @@ function typeToFile(type: any) {
|
||||||
default:
|
default:
|
||||||
throw "Unknown thread type"
|
throw "Unknown thread type"
|
||||||
}
|
}
|
||||||
|
// have to use require here, to make it work with worker-farm
|
||||||
return require.resolve(filename)
|
return require.resolve(filename)
|
||||||
}
|
}
|
||||||
|
|
||||||
export class Thread {
|
export class Thread {
|
||||||
type: any
|
type: any
|
||||||
count: any
|
count: any
|
||||||
disableThreading: any
|
|
||||||
workers: any
|
workers: any
|
||||||
timeoutMs: any
|
timeoutMs: any
|
||||||
|
disableThreading: boolean
|
||||||
|
|
||||||
static workerRefs: any[] = []
|
static workerRefs: any[] = []
|
||||||
|
|
||||||
constructor(type: any, opts: any = { timeoutMs: null, count: 1 }) {
|
constructor(type: any, opts: any = { timeoutMs: null, count: 1 }) {
|
||||||
this.type = type
|
this.type = type
|
||||||
this.count = opts.count ? opts.count : 1
|
this.count = opts.count ? opts.count : 1
|
||||||
this.disableThreading =
|
this.disableThreading = this.shouldDisableThreading()
|
||||||
env.isTest() ||
|
|
||||||
env.DISABLE_THREADING ||
|
|
||||||
this.count === 0 ||
|
|
||||||
env.isInThread()
|
|
||||||
if (!this.disableThreading) {
|
if (!this.disableThreading) {
|
||||||
const workerOpts: any = {
|
const workerOpts: any = {
|
||||||
autoStart: true,
|
autoStart: true,
|
||||||
|
@ -47,33 +44,44 @@ export class Thread {
|
||||||
this.timeoutMs = opts.timeoutMs
|
this.timeoutMs = opts.timeoutMs
|
||||||
workerOpts.maxCallTime = opts.timeoutMs
|
workerOpts.maxCallTime = opts.timeoutMs
|
||||||
}
|
}
|
||||||
this.workers = workerFarm(workerOpts, typeToFile(type))
|
this.workers = workerFarm(workerOpts, typeToFile(type), ["execute"])
|
||||||
Thread.workerRefs.push(this.workers)
|
Thread.workerRefs.push(this.workers)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
shouldDisableThreading(): boolean {
|
||||||
|
return !!(
|
||||||
|
env.isTest() ||
|
||||||
|
env.DISABLE_THREADING ||
|
||||||
|
this.count === 0 ||
|
||||||
|
env.isInThread()
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
run(data: any) {
|
run(data: any) {
|
||||||
|
const timeout = this.timeoutMs
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
let fncToCall
|
function fire(worker: any) {
|
||||||
|
worker.execute(data, (err: any, response: any) => {
|
||||||
|
if (err && err.type === "TimeoutError") {
|
||||||
|
reject(
|
||||||
|
new Error(`Query response time exceeded ${timeout}ms timeout.`)
|
||||||
|
)
|
||||||
|
} else if (err) {
|
||||||
|
reject(err)
|
||||||
|
} else {
|
||||||
|
resolve(response)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
// if in test then don't use threading
|
// if in test then don't use threading
|
||||||
if (this.disableThreading) {
|
if (this.disableThreading) {
|
||||||
fncToCall = require(typeToFile(this.type))
|
import(typeToFile(this.type)).then((thread: any) => {
|
||||||
|
fire(thread)
|
||||||
|
})
|
||||||
} else {
|
} else {
|
||||||
fncToCall = this.workers
|
fire(this.workers)
|
||||||
}
|
}
|
||||||
fncToCall(data, (err: any, response: any) => {
|
|
||||||
if (err && err.type === "TimeoutError") {
|
|
||||||
reject(
|
|
||||||
new Error(
|
|
||||||
`Query response time exceeded ${this.timeoutMs}ms timeout.`
|
|
||||||
)
|
|
||||||
)
|
|
||||||
} else if (err) {
|
|
||||||
reject(err)
|
|
||||||
} else {
|
|
||||||
resolve(response)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
const threadUtils = require("./utils")
|
import { default as threadUtils } from "./utils"
|
||||||
threadUtils.threadSetup()
|
threadUtils.threadSetup()
|
||||||
|
import { WorkerCallback, QueryEvent, QueryVariable } from "./definitions"
|
||||||
const ScriptRunner = require("../utilities/scriptRunner")
|
const ScriptRunner = require("../utilities/scriptRunner")
|
||||||
const { integrations } = require("../integrations")
|
const { integrations } = require("../integrations")
|
||||||
const { processStringSync } = require("@budibase/string-templates")
|
const { processStringSync } = require("@budibase/string-templates")
|
||||||
|
@ -19,7 +20,22 @@ const {
|
||||||
} = require("../integrations/queries/sql")
|
} = require("../integrations/queries/sql")
|
||||||
|
|
||||||
class QueryRunner {
|
class QueryRunner {
|
||||||
constructor(input, flags = { noRecursiveQuery: false }) {
|
datasource: any
|
||||||
|
queryVerb: string
|
||||||
|
queryId: string
|
||||||
|
fields: any
|
||||||
|
parameters: any
|
||||||
|
pagination: any
|
||||||
|
transformer: any
|
||||||
|
cachedVariables: any[]
|
||||||
|
ctx: any
|
||||||
|
queryResponse: any
|
||||||
|
noRecursiveQuery: boolean
|
||||||
|
hasRerun: boolean
|
||||||
|
hasRefreshedOAuth: boolean
|
||||||
|
hasDynamicVariables: boolean
|
||||||
|
|
||||||
|
constructor(input: QueryEvent, flags = { noRecursiveQuery: false }) {
|
||||||
this.datasource = input.datasource
|
this.datasource = input.datasource
|
||||||
this.queryVerb = input.queryVerb
|
this.queryVerb = input.queryVerb
|
||||||
this.fields = input.fields
|
this.fields = input.fields
|
||||||
|
@ -37,9 +53,10 @@ class QueryRunner {
|
||||||
this.queryResponse = {}
|
this.queryResponse = {}
|
||||||
this.hasRerun = false
|
this.hasRerun = false
|
||||||
this.hasRefreshedOAuth = false
|
this.hasRefreshedOAuth = false
|
||||||
|
this.hasDynamicVariables = false
|
||||||
}
|
}
|
||||||
|
|
||||||
async execute() {
|
async execute(): Promise<any> {
|
||||||
let { datasource, fields, queryVerb, transformer } = this
|
let { datasource, fields, queryVerb, transformer } = this
|
||||||
|
|
||||||
let datasourceClone = cloneDeep(datasource)
|
let datasourceClone = cloneDeep(datasource)
|
||||||
|
@ -52,7 +69,7 @@ class QueryRunner {
|
||||||
|
|
||||||
if (datasourceClone.config.authConfigs) {
|
if (datasourceClone.config.authConfigs) {
|
||||||
datasourceClone.config.authConfigs =
|
datasourceClone.config.authConfigs =
|
||||||
datasourceClone.config.authConfigs.map(config => {
|
datasourceClone.config.authConfigs.map((config: any) => {
|
||||||
return enrichQueryFields(config, this.ctx)
|
return enrichQueryFields(config, this.ctx)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -138,8 +155,8 @@ class QueryRunner {
|
||||||
}
|
}
|
||||||
|
|
||||||
// map into JSON if just raw primitive here
|
// map into JSON if just raw primitive here
|
||||||
if (rows.find(row => typeof row !== "object")) {
|
if (rows.find((row: any) => typeof row !== "object")) {
|
||||||
rows = rows.map(value => ({ value }))
|
rows = rows.map((value: any) => ({ value }))
|
||||||
}
|
}
|
||||||
|
|
||||||
// get all the potential fields in the schema
|
// get all the potential fields in the schema
|
||||||
|
@ -152,7 +169,7 @@ class QueryRunner {
|
||||||
return { rows, keys, info, extra, pagination }
|
return { rows, keys, info, extra, pagination }
|
||||||
}
|
}
|
||||||
|
|
||||||
async runAnotherQuery(queryId, parameters) {
|
async runAnotherQuery(queryId: string, parameters: any) {
|
||||||
const db = getAppDB()
|
const db = getAppDB()
|
||||||
const query = await db.get(queryId)
|
const query = await db.get(queryId)
|
||||||
const datasource = await db.get(query.datasourceId)
|
const datasource = await db.get(query.datasourceId)
|
||||||
|
@ -163,12 +180,13 @@ class QueryRunner {
|
||||||
fields: query.fields,
|
fields: query.fields,
|
||||||
parameters,
|
parameters,
|
||||||
transformer: query.transformer,
|
transformer: query.transformer,
|
||||||
|
queryId,
|
||||||
},
|
},
|
||||||
{ noRecursiveQuery: true }
|
{ noRecursiveQuery: true }
|
||||||
).execute()
|
).execute()
|
||||||
}
|
}
|
||||||
|
|
||||||
async refreshOAuth2(ctx) {
|
async refreshOAuth2(ctx: any) {
|
||||||
const { oauth2, providerType, _id } = ctx.user
|
const { oauth2, providerType, _id } = ctx.user
|
||||||
const { configId } = ctx.auth
|
const { configId } = ctx.auth
|
||||||
|
|
||||||
|
@ -200,7 +218,7 @@ class QueryRunner {
|
||||||
return resp
|
return resp
|
||||||
}
|
}
|
||||||
|
|
||||||
async getDynamicVariable(variable) {
|
async getDynamicVariable(variable: QueryVariable) {
|
||||||
let { parameters } = this
|
let { parameters } = this
|
||||||
const queryId = variable.queryId,
|
const queryId = variable.queryId,
|
||||||
name = variable.name
|
name = variable.name
|
||||||
|
@ -233,7 +251,7 @@ class QueryRunner {
|
||||||
if (!this.noRecursiveQuery) {
|
if (!this.noRecursiveQuery) {
|
||||||
// need to see if this uses any variables
|
// need to see if this uses any variables
|
||||||
const stringFields = JSON.stringify(fields)
|
const stringFields = JSON.stringify(fields)
|
||||||
const foundVars = dynamicVars.filter(variable => {
|
const foundVars = dynamicVars.filter((variable: QueryVariable) => {
|
||||||
// don't allow a query to use its own dynamic variable (loop)
|
// don't allow a query to use its own dynamic variable (loop)
|
||||||
if (variable.queryId === this.queryId) {
|
if (variable.queryId === this.queryId) {
|
||||||
return false
|
return false
|
||||||
|
@ -242,7 +260,9 @@ class QueryRunner {
|
||||||
const regex = new RegExp(`{{[ ]*${variable.name}[ ]*}}`)
|
const regex = new RegExp(`{{[ ]*${variable.name}[ ]*}}`)
|
||||||
return regex.test(stringFields)
|
return regex.test(stringFields)
|
||||||
})
|
})
|
||||||
const dynamics = foundVars.map(dynVar => this.getDynamicVariable(dynVar))
|
const dynamics = foundVars.map((dynVar: QueryVariable) =>
|
||||||
|
this.getDynamicVariable(dynVar)
|
||||||
|
)
|
||||||
const responses = await Promise.all(dynamics)
|
const responses = await Promise.all(dynamics)
|
||||||
for (let i = 0; i < foundVars.length; i++) {
|
for (let i = 0; i < foundVars.length; i++) {
|
||||||
const variable = foundVars[i]
|
const variable = foundVars[i]
|
||||||
|
@ -264,7 +284,7 @@ class QueryRunner {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = (input, callback) => {
|
export function execute(input: QueryEvent, callback: WorkerCallback) {
|
||||||
doInAppContext(input.appId, async () => {
|
doInAppContext(input.appId, async () => {
|
||||||
const Runner = new QueryRunner(input)
|
const Runner = new QueryRunner(input)
|
||||||
try {
|
try {
|
|
@ -1,10 +1,11 @@
|
||||||
|
import { QueryVariable } from "./definitions"
|
||||||
const env = require("../environment")
|
const env = require("../environment")
|
||||||
const db = require("../db")
|
const db = require("../db")
|
||||||
const redis = require("@budibase/backend-core/redis")
|
const redis = require("@budibase/backend-core/redis")
|
||||||
const { SEPARATOR } = require("@budibase/backend-core/db")
|
const { SEPARATOR } = require("@budibase/backend-core/db")
|
||||||
|
|
||||||
const VARIABLE_TTL_SECONDS = 3600
|
const VARIABLE_TTL_SECONDS = 3600
|
||||||
let client
|
let client: any
|
||||||
|
|
||||||
async function getClient() {
|
async function getClient() {
|
||||||
if (!client) {
|
if (!client) {
|
||||||
|
@ -14,10 +15,16 @@ async function getClient() {
|
||||||
}
|
}
|
||||||
|
|
||||||
process.on("exit", async () => {
|
process.on("exit", async () => {
|
||||||
if (client) await client.finish()
|
if (client) {
|
||||||
|
await client.finish()
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
exports.threadSetup = () => {
|
function makeVariableKey(queryId: string, variable: string) {
|
||||||
|
return `${queryId}${SEPARATOR}${variable}`
|
||||||
|
}
|
||||||
|
|
||||||
|
export function threadSetup() {
|
||||||
// don't run this if not threading
|
// don't run this if not threading
|
||||||
if (env.isTest() || env.DISABLE_THREADING) {
|
if (env.isTest() || env.DISABLE_THREADING) {
|
||||||
return
|
return
|
||||||
|
@ -27,16 +34,15 @@ exports.threadSetup = () => {
|
||||||
db.init()
|
db.init()
|
||||||
}
|
}
|
||||||
|
|
||||||
function makeVariableKey(queryId, variable) {
|
export async function checkCacheForDynamicVariable(
|
||||||
return `${queryId}${SEPARATOR}${variable}`
|
queryId: string,
|
||||||
}
|
variable: string
|
||||||
|
) {
|
||||||
exports.checkCacheForDynamicVariable = async (queryId, variable) => {
|
|
||||||
const cache = await getClient()
|
const cache = await getClient()
|
||||||
return cache.get(makeVariableKey(queryId, variable))
|
return cache.get(makeVariableKey(queryId, variable))
|
||||||
}
|
}
|
||||||
|
|
||||||
exports.invalidateDynamicVariables = async cachedVars => {
|
export async function invalidateDynamicVariables(cachedVars: QueryVariable[]) {
|
||||||
const cache = await getClient()
|
const cache = await getClient()
|
||||||
let promises = []
|
let promises = []
|
||||||
for (let variable of cachedVars) {
|
for (let variable of cachedVars) {
|
||||||
|
@ -47,7 +53,11 @@ exports.invalidateDynamicVariables = async cachedVars => {
|
||||||
await Promise.all(promises)
|
await Promise.all(promises)
|
||||||
}
|
}
|
||||||
|
|
||||||
exports.storeDynamicVariable = async (queryId, variable, value) => {
|
export async function storeDynamicVariable(
|
||||||
|
queryId: string,
|
||||||
|
variable: string,
|
||||||
|
value: any
|
||||||
|
) {
|
||||||
const cache = await getClient()
|
const cache = await getClient()
|
||||||
await cache.store(
|
await cache.store(
|
||||||
makeVariableKey(queryId, variable),
|
makeVariableKey(queryId, variable),
|
||||||
|
@ -56,7 +66,7 @@ exports.storeDynamicVariable = async (queryId, variable, value) => {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
exports.formatResponse = resp => {
|
export function formatResponse(resp: any) {
|
||||||
if (typeof resp === "string") {
|
if (typeof resp === "string") {
|
||||||
try {
|
try {
|
||||||
resp = JSON.parse(resp)
|
resp = JSON.parse(resp)
|
||||||
|
@ -67,7 +77,7 @@ exports.formatResponse = resp => {
|
||||||
return resp
|
return resp
|
||||||
}
|
}
|
||||||
|
|
||||||
exports.hasExtraData = response => {
|
export function hasExtraData(response: any) {
|
||||||
return (
|
return (
|
||||||
typeof response === "object" &&
|
typeof response === "object" &&
|
||||||
!Array.isArray(response) &&
|
!Array.isArray(response) &&
|
||||||
|
@ -76,3 +86,12 @@ exports.hasExtraData = response => {
|
||||||
response.info != null
|
response.info != null
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export default {
|
||||||
|
hasExtraData,
|
||||||
|
formatResponse,
|
||||||
|
storeDynamicVariable,
|
||||||
|
invalidateDynamicVariables,
|
||||||
|
checkCacheForDynamicVariable,
|
||||||
|
threadSetup,
|
||||||
|
}
|
|
@ -109,6 +109,10 @@ class InMemoryQueue {
|
||||||
async clean() {
|
async clean() {
|
||||||
return []
|
return []
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async getJob() {
|
||||||
|
return {}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = InMemoryQueue
|
module.exports = InMemoryQueue
|
||||||
|
|
|
@ -1094,12 +1094,12 @@
|
||||||
resolved "https://registry.yarnpkg.com/@bcoe/v8-coverage/-/v8-coverage-0.2.3.tgz#75a2e8b51cb758a7553d6804a5932d7aace75c39"
|
resolved "https://registry.yarnpkg.com/@bcoe/v8-coverage/-/v8-coverage-0.2.3.tgz#75a2e8b51cb758a7553d6804a5932d7aace75c39"
|
||||||
integrity sha512-0hYQ8SB4Db5zvZB4axdMHGwEaQjkZzFjQiN9LVYvIFB2nSUHW9tYpxWriPrWDASIxiaXax83REcLxuSdnGPZtw==
|
integrity sha512-0hYQ8SB4Db5zvZB4axdMHGwEaQjkZzFjQiN9LVYvIFB2nSUHW9tYpxWriPrWDASIxiaXax83REcLxuSdnGPZtw==
|
||||||
|
|
||||||
"@budibase/backend-core@1.1.18-alpha.2":
|
"@budibase/backend-core@1.1.22-alpha.0":
|
||||||
version "1.1.18-alpha.2"
|
version "1.1.22-alpha.0"
|
||||||
resolved "https://registry.yarnpkg.com/@budibase/backend-core/-/backend-core-1.1.18-alpha.2.tgz#c3fe04afa11fcbba288c7ba2113469d203213a23"
|
resolved "https://registry.yarnpkg.com/@budibase/backend-core/-/backend-core-1.1.22-alpha.0.tgz#2a090a723d35ce4a2e377ac0927fe127213bcbb3"
|
||||||
integrity sha512-T6Cv2k/F59eYCHLOr76gDOPcRAhGIEemoqmKt22JwctrArdFQkICdcT0LDFt375oR+svuK6cGY9bWZsC2J9dqg==
|
integrity sha512-77gxcPrjejdqaMaMkbrCS0glYA1jdGo74NpCxdadWx+suU4SMTmOt2jgnEZg20aeKcky2xTpVbjDxIq+Fb2J+g==
|
||||||
dependencies:
|
dependencies:
|
||||||
"@budibase/types" "^1.1.18-alpha.2"
|
"@budibase/types" "^1.1.22-alpha.0"
|
||||||
"@techpass/passport-openidconnect" "0.3.2"
|
"@techpass/passport-openidconnect" "0.3.2"
|
||||||
aws-sdk "2.1030.0"
|
aws-sdk "2.1030.0"
|
||||||
bcrypt "5.0.1"
|
bcrypt "5.0.1"
|
||||||
|
@ -1177,13 +1177,13 @@
|
||||||
svelte-flatpickr "^3.2.3"
|
svelte-flatpickr "^3.2.3"
|
||||||
svelte-portal "^1.0.0"
|
svelte-portal "^1.0.0"
|
||||||
|
|
||||||
"@budibase/pro@1.1.18-alpha.2":
|
"@budibase/pro@1.1.22-alpha.0":
|
||||||
version "1.1.18-alpha.2"
|
version "1.1.22-alpha.0"
|
||||||
resolved "https://registry.yarnpkg.com/@budibase/pro/-/pro-1.1.18-alpha.2.tgz#d399fbc11d6d7ebee95e05d68c617581189830ed"
|
resolved "https://registry.yarnpkg.com/@budibase/pro/-/pro-1.1.22-alpha.0.tgz#5718188fbc76ce3c3cf3c633c99c6e59f5846310"
|
||||||
integrity sha512-z7kJ7PQehPbMjaUJk5xIvlGql0UxiYH1M/wrDhucpQa4FXgnjrY9eL17c3BiHNQe2+E2IbtL+pgCnXNgHxGTDQ==
|
integrity sha512-QL5bhT/BJnoKVV5XH5+s3jSCBzV/JGOy8YQObEjZgCrtTWUjdvMrm5pFinflFGriCfoArmvR3Q51FBpNJQfaag==
|
||||||
dependencies:
|
dependencies:
|
||||||
"@budibase/backend-core" "1.1.18-alpha.2"
|
"@budibase/backend-core" "1.1.22-alpha.0"
|
||||||
"@budibase/types" "1.1.18-alpha.2"
|
"@budibase/types" "1.1.22-alpha.0"
|
||||||
node-fetch "^2.6.1"
|
node-fetch "^2.6.1"
|
||||||
|
|
||||||
"@budibase/standard-components@^0.9.139":
|
"@budibase/standard-components@^0.9.139":
|
||||||
|
@ -1204,15 +1204,10 @@
|
||||||
svelte-apexcharts "^1.0.2"
|
svelte-apexcharts "^1.0.2"
|
||||||
svelte-flatpickr "^3.1.0"
|
svelte-flatpickr "^3.1.0"
|
||||||
|
|
||||||
"@budibase/types@1.1.18-alpha.2":
|
"@budibase/types@1.1.22-alpha.0", "@budibase/types@^1.1.22-alpha.0":
|
||||||
version "1.1.18-alpha.2"
|
version "1.1.22-alpha.0"
|
||||||
resolved "https://registry.yarnpkg.com/@budibase/types/-/types-1.1.18-alpha.2.tgz#55c585c0efa983a006b2b981dc5a7a6018a768fd"
|
resolved "https://registry.yarnpkg.com/@budibase/types/-/types-1.1.22-alpha.0.tgz#85fb7f37773c710e8232c95104095c26a8dd22ca"
|
||||||
integrity sha512-1PfwibyzpuCjeSzhbjb+KgO+pIzkntbhxYKUZY5u7HxrYxvfzwAiemD5ywmGY2/hgJwShLsNpZOFMucSkfTXhg==
|
integrity sha512-1gRwAtjEl7Ug1jrYwD9Iudbfgs37nndEBEB6yVdNPKA5SpjG+Fwx30zp6R961zlx1vsSu4iMdwM8IbMsCM8p1g==
|
||||||
|
|
||||||
"@budibase/types@^1.1.18-alpha.2":
|
|
||||||
version "1.1.18"
|
|
||||||
resolved "https://registry.yarnpkg.com/@budibase/types/-/types-1.1.18.tgz#06b4fc5029ea9a73f086d2d0fc3ed27543175797"
|
|
||||||
integrity sha512-10j8zbPsdE6k/DIJyDDNGR1yJ8WL+bSvId+Ev66hleSr2kc1ntP0hji+uFNHH/k709w8rzA0D4Ugx5bO9zA5Qw==
|
|
||||||
|
|
||||||
"@bull-board/api@3.7.0":
|
"@bull-board/api@3.7.0":
|
||||||
version "3.7.0"
|
version "3.7.0"
|
||||||
|
|
|
@ -291,12 +291,12 @@
|
||||||
resolved "https://registry.yarnpkg.com/@bcoe/v8-coverage/-/v8-coverage-0.2.3.tgz#75a2e8b51cb758a7553d6804a5932d7aace75c39"
|
resolved "https://registry.yarnpkg.com/@bcoe/v8-coverage/-/v8-coverage-0.2.3.tgz#75a2e8b51cb758a7553d6804a5932d7aace75c39"
|
||||||
integrity sha512-0hYQ8SB4Db5zvZB4axdMHGwEaQjkZzFjQiN9LVYvIFB2nSUHW9tYpxWriPrWDASIxiaXax83REcLxuSdnGPZtw==
|
integrity sha512-0hYQ8SB4Db5zvZB4axdMHGwEaQjkZzFjQiN9LVYvIFB2nSUHW9tYpxWriPrWDASIxiaXax83REcLxuSdnGPZtw==
|
||||||
|
|
||||||
"@budibase/backend-core@1.1.18-alpha.2":
|
"@budibase/backend-core@1.1.22-alpha.0":
|
||||||
version "1.1.18-alpha.2"
|
version "1.1.22-alpha.0"
|
||||||
resolved "https://registry.yarnpkg.com/@budibase/backend-core/-/backend-core-1.1.18-alpha.2.tgz#c3fe04afa11fcbba288c7ba2113469d203213a23"
|
resolved "https://registry.yarnpkg.com/@budibase/backend-core/-/backend-core-1.1.22-alpha.0.tgz#2a090a723d35ce4a2e377ac0927fe127213bcbb3"
|
||||||
integrity sha512-T6Cv2k/F59eYCHLOr76gDOPcRAhGIEemoqmKt22JwctrArdFQkICdcT0LDFt375oR+svuK6cGY9bWZsC2J9dqg==
|
integrity sha512-77gxcPrjejdqaMaMkbrCS0glYA1jdGo74NpCxdadWx+suU4SMTmOt2jgnEZg20aeKcky2xTpVbjDxIq+Fb2J+g==
|
||||||
dependencies:
|
dependencies:
|
||||||
"@budibase/types" "^1.1.18-alpha.2"
|
"@budibase/types" "^1.1.22-alpha.0"
|
||||||
"@techpass/passport-openidconnect" "0.3.2"
|
"@techpass/passport-openidconnect" "0.3.2"
|
||||||
aws-sdk "2.1030.0"
|
aws-sdk "2.1030.0"
|
||||||
bcrypt "5.0.1"
|
bcrypt "5.0.1"
|
||||||
|
@ -324,24 +324,19 @@
|
||||||
uuid "8.3.2"
|
uuid "8.3.2"
|
||||||
zlib "1.0.5"
|
zlib "1.0.5"
|
||||||
|
|
||||||
"@budibase/pro@1.1.18-alpha.2":
|
"@budibase/pro@1.1.22-alpha.0":
|
||||||
version "1.1.18-alpha.2"
|
version "1.1.22-alpha.0"
|
||||||
resolved "https://registry.yarnpkg.com/@budibase/pro/-/pro-1.1.18-alpha.2.tgz#d399fbc11d6d7ebee95e05d68c617581189830ed"
|
resolved "https://registry.yarnpkg.com/@budibase/pro/-/pro-1.1.22-alpha.0.tgz#5718188fbc76ce3c3cf3c633c99c6e59f5846310"
|
||||||
integrity sha512-z7kJ7PQehPbMjaUJk5xIvlGql0UxiYH1M/wrDhucpQa4FXgnjrY9eL17c3BiHNQe2+E2IbtL+pgCnXNgHxGTDQ==
|
integrity sha512-QL5bhT/BJnoKVV5XH5+s3jSCBzV/JGOy8YQObEjZgCrtTWUjdvMrm5pFinflFGriCfoArmvR3Q51FBpNJQfaag==
|
||||||
dependencies:
|
dependencies:
|
||||||
"@budibase/backend-core" "1.1.18-alpha.2"
|
"@budibase/backend-core" "1.1.22-alpha.0"
|
||||||
"@budibase/types" "1.1.18-alpha.2"
|
"@budibase/types" "1.1.22-alpha.0"
|
||||||
node-fetch "^2.6.1"
|
node-fetch "^2.6.1"
|
||||||
|
|
||||||
"@budibase/types@1.1.18-alpha.2":
|
"@budibase/types@1.1.22-alpha.0", "@budibase/types@^1.1.22-alpha.0":
|
||||||
version "1.1.18-alpha.2"
|
version "1.1.22-alpha.0"
|
||||||
resolved "https://registry.yarnpkg.com/@budibase/types/-/types-1.1.18-alpha.2.tgz#55c585c0efa983a006b2b981dc5a7a6018a768fd"
|
resolved "https://registry.yarnpkg.com/@budibase/types/-/types-1.1.22-alpha.0.tgz#85fb7f37773c710e8232c95104095c26a8dd22ca"
|
||||||
integrity sha512-1PfwibyzpuCjeSzhbjb+KgO+pIzkntbhxYKUZY5u7HxrYxvfzwAiemD5ywmGY2/hgJwShLsNpZOFMucSkfTXhg==
|
integrity sha512-1gRwAtjEl7Ug1jrYwD9Iudbfgs37nndEBEB6yVdNPKA5SpjG+Fwx30zp6R961zlx1vsSu4iMdwM8IbMsCM8p1g==
|
||||||
|
|
||||||
"@budibase/types@^1.1.18-alpha.2":
|
|
||||||
version "1.1.18"
|
|
||||||
resolved "https://registry.yarnpkg.com/@budibase/types/-/types-1.1.18.tgz#06b4fc5029ea9a73f086d2d0fc3ed27543175797"
|
|
||||||
integrity sha512-10j8zbPsdE6k/DIJyDDNGR1yJ8WL+bSvId+Ev66hleSr2kc1ntP0hji+uFNHH/k709w8rzA0D4Ugx5bO9zA5Qw==
|
|
||||||
|
|
||||||
"@cspotcode/source-map-consumer@0.8.0":
|
"@cspotcode/source-map-consumer@0.8.0":
|
||||||
version "0.8.0"
|
version "0.8.0"
|
||||||
|
|
Loading…
Reference in New Issue