Fix served events
This commit is contained in:
parent
56bcd46886
commit
cf37563c89
|
@ -0,0 +1,27 @@
|
|||
import { EventEmitter } from "events"
|
||||
import * as context from "../../context"
|
||||
import { Identity, Event } from "@budibase/types"
|
||||
|
||||
export interface EmittedEvent {
|
||||
tenantId: string
|
||||
identity: Identity
|
||||
appId: string | undefined
|
||||
properties: any
|
||||
}
|
||||
|
||||
class BBEventEmitter extends EventEmitter {
|
||||
emitEvent(event: Event, properties: any, identity: Identity) {
|
||||
const tenantId = context.getTenantId()
|
||||
const appId = context.getAppId()
|
||||
|
||||
const emittedEvent: EmittedEvent = {
|
||||
tenantId,
|
||||
identity,
|
||||
appId,
|
||||
properties,
|
||||
}
|
||||
this.emit(event, emittedEvent)
|
||||
}
|
||||
}
|
||||
|
||||
export const emitter = new BBEventEmitter()
|
|
@ -0,0 +1 @@
|
|||
export * from "./BBEventEmitter"
|
|
@ -2,6 +2,41 @@ import { Event } from "@budibase/types"
|
|||
import { processors } from "./processors"
|
||||
import * as identification from "./identification"
|
||||
import * as backfill from "./backfill"
|
||||
import { emitter, EmittedEvent } from "./async"
|
||||
import * as context from "../context"
|
||||
import * as logging from "../logging"
|
||||
|
||||
const USE_ASYNC: any[] = [
|
||||
Event.SERVED_BUILDER,
|
||||
Event.SERVED_APP,
|
||||
Event.SERVED_APP_PREVIEW,
|
||||
]
|
||||
|
||||
for (let event of USE_ASYNC) {
|
||||
emitter.on(event, async (props: EmittedEvent) => {
|
||||
try {
|
||||
await context.doInTenant(props.tenantId, async () => {
|
||||
if (props.appId) {
|
||||
await context.doInAppContext(props.appId, async () => {
|
||||
await processors.processEvent(
|
||||
event as Event,
|
||||
props.identity,
|
||||
props.properties
|
||||
)
|
||||
})
|
||||
} else {
|
||||
await processors.processEvent(
|
||||
event as Event,
|
||||
props.identity,
|
||||
props.properties
|
||||
)
|
||||
}
|
||||
})
|
||||
} catch (e) {
|
||||
logging.logAlert(`Unable to process async event ${event}`, e)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
export const publishEvent = async (
|
||||
event: Event,
|
||||
|
@ -11,6 +46,11 @@ export const publishEvent = async (
|
|||
// in future this should use async events via a distributed queue.
|
||||
const identity = await identification.getCurrentIdentity()
|
||||
|
||||
if (USE_ASYNC.includes(event)) {
|
||||
emitter.emitEvent(event, properties, identity)
|
||||
return
|
||||
}
|
||||
|
||||
const backfilling = await backfill.isBackfillingEvent(event)
|
||||
// no backfill - send the event and exit
|
||||
if (!backfilling) {
|
||||
|
|
|
@ -32,7 +32,7 @@ export default class AnalyticsProcessor implements EventProcessor {
|
|||
return
|
||||
}
|
||||
if (this.posthog) {
|
||||
this.posthog.processEvent(event, identity, properties, timestamp)
|
||||
await this.posthog.processEvent(event, identity, properties, timestamp)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -45,14 +45,14 @@ export default class AnalyticsProcessor implements EventProcessor {
|
|||
return
|
||||
}
|
||||
if (this.posthog) {
|
||||
this.posthog.identify(identity, timestamp)
|
||||
await this.posthog.identify(identity, timestamp)
|
||||
}
|
||||
}
|
||||
|
||||
async identifyGroup(group: Group, timestamp?: string | number) {
|
||||
// Group indentifications (tenant and installation) always on
|
||||
if (this.posthog) {
|
||||
this.posthog.identifyGroup(group, timestamp)
|
||||
await this.posthog.identifyGroup(group, timestamp)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -45,7 +45,7 @@ export const limited = async (event: Event): Promise<boolean> => {
|
|||
return false
|
||||
}
|
||||
|
||||
const cachedEvent = (await readEvent(event)) as EventProperties
|
||||
const cachedEvent = await readEvent(event)
|
||||
if (cachedEvent) {
|
||||
const timestamp = new Date(cachedEvent.timestamp)
|
||||
const limit = RATE_LIMITS[event]
|
||||
|
@ -76,14 +76,17 @@ export const limited = async (event: Event): Promise<boolean> => {
|
|||
const eventKey = (event: RateLimitedEvent) => {
|
||||
let key = `${CacheKeys.EVENTS_RATE_LIMIT}:${event}`
|
||||
if (isPerApp(event)) {
|
||||
key = key + context.getAppId()
|
||||
key = key + ":" + context.getAppId()
|
||||
}
|
||||
return key
|
||||
}
|
||||
|
||||
const readEvent = async (event: RateLimitedEvent) => {
|
||||
const readEvent = async (
|
||||
event: RateLimitedEvent
|
||||
): Promise<EventProperties | undefined> => {
|
||||
const key = eventKey(event)
|
||||
return cache.get(key)
|
||||
const result = await cache.get(key)
|
||||
return result as EventProperties
|
||||
}
|
||||
|
||||
const recordEvent = async (
|
||||
|
|
|
@ -18,14 +18,14 @@ const { DocumentTypes, isDevAppID } = require("../../../db/utils")
|
|||
const { getAppDB, getAppId } = require("@budibase/backend-core/context")
|
||||
const { setCookie, clearCookie } = require("@budibase/backend-core/utils")
|
||||
const AWS = require("aws-sdk")
|
||||
const { events } = require("@budibase/backend-core")
|
||||
import { events } from "@budibase/backend-core"
|
||||
|
||||
const fs = require("fs")
|
||||
const {
|
||||
downloadTarballDirect,
|
||||
} = require("../../../utilities/fileSystem/utilities")
|
||||
|
||||
async function prepareUpload({ s3Key, bucket, metadata, file }) {
|
||||
async function prepareUpload({ s3Key, bucket, metadata, file }: any) {
|
||||
const response = await upload({
|
||||
bucket,
|
||||
metadata,
|
||||
|
@ -44,7 +44,7 @@ async function prepareUpload({ s3Key, bucket, metadata, file }) {
|
|||
}
|
||||
}
|
||||
|
||||
exports.toggleBetaUiFeature = async function (ctx) {
|
||||
export const toggleBetaUiFeature = async function (ctx: any) {
|
||||
const cookieName = `beta:${ctx.params.feature}`
|
||||
|
||||
if (ctx.cookies.get(cookieName)) {
|
||||
|
@ -72,21 +72,21 @@ exports.toggleBetaUiFeature = async function (ctx) {
|
|||
}
|
||||
}
|
||||
|
||||
exports.serveBuilder = async function (ctx) {
|
||||
export const serveBuilder = async function (ctx: any) {
|
||||
const builderPath = resolve(TOP_LEVEL_PATH, "builder")
|
||||
await send(ctx, ctx.file, { root: builderPath })
|
||||
if (!ctx.file.includes("assets/")) {
|
||||
if (ctx.file === "index.html") {
|
||||
await events.serve.servedBuilder()
|
||||
}
|
||||
}
|
||||
|
||||
exports.uploadFile = async function (ctx) {
|
||||
export const uploadFile = async function (ctx: any) {
|
||||
let files =
|
||||
ctx.request.files.file.length > 1
|
||||
? Array.from(ctx.request.files.file)
|
||||
: [ctx.request.files.file]
|
||||
|
||||
const uploads = files.map(async file => {
|
||||
const uploads = files.map(async (file: any) => {
|
||||
const fileExtension = [...file.name.split(".")].pop()
|
||||
// filenames converted to UUIDs so they are unique
|
||||
const processedFileName = `${uuid.v4()}.${fileExtension}`
|
||||
|
@ -101,7 +101,7 @@ exports.uploadFile = async function (ctx) {
|
|||
ctx.body = await Promise.all(uploads)
|
||||
}
|
||||
|
||||
exports.serveApp = async function (ctx) {
|
||||
export const serveApp = async function (ctx: any) {
|
||||
const db = getAppDB({ skip_setup: true })
|
||||
const appInfo = await db.get(DocumentTypes.APP_METADATA)
|
||||
let appId = getAppId()
|
||||
|
@ -134,13 +134,13 @@ exports.serveApp = async function (ctx) {
|
|||
}
|
||||
}
|
||||
|
||||
exports.serveClientLibrary = async function (ctx) {
|
||||
export const serveClientLibrary = async function (ctx: any) {
|
||||
return send(ctx, "budibase-client.js", {
|
||||
root: join(NODE_MODULES_PATH, "@budibase", "client", "dist"),
|
||||
})
|
||||
}
|
||||
|
||||
exports.getSignedUploadURL = async function (ctx) {
|
||||
export const getSignedUploadURL = async function (ctx: any) {
|
||||
const database = getAppDB()
|
||||
|
||||
// Ensure datasource is valid
|
Loading…
Reference in New Issue