Merge pull request #7154 from Budibase/rate-limit-served-events
Rate limit served:X events
This commit is contained in:
commit
3a8c85e9ba
|
@ -9,6 +9,7 @@ exports.CacheKeys = {
|
|||
UNIQUE_TENANT_ID: "uniqueTenantId",
|
||||
EVENTS: "events",
|
||||
BACKFILL_METADATA: "backfillMetadata",
|
||||
EVENTS_RATE_LIMIT: "eventsRateLimit",
|
||||
}
|
||||
|
||||
exports.TTL = {
|
||||
|
|
|
@ -2,7 +2,7 @@ import { Event, Identity, Group, IdentityType } from "@budibase/types"
|
|||
import { EventProcessor } from "./types"
|
||||
import env from "../../environment"
|
||||
import * as analytics from "../analytics"
|
||||
import PosthogProcessor from "./PosthogProcessor"
|
||||
import PosthogProcessor from "./posthog"
|
||||
|
||||
/**
|
||||
* Events that are always captured.
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
import PostHog from "posthog-node"
|
||||
import { Event, Identity, Group, BaseEvent } from "@budibase/types"
|
||||
import { EventProcessor } from "./types"
|
||||
import env from "../../environment"
|
||||
import * as context from "../../context"
|
||||
const pkg = require("../../../package.json")
|
||||
import { EventProcessor } from "../types"
|
||||
import env from "../../../environment"
|
||||
import * as context from "../../../context"
|
||||
import * as rateLimiting from "./rateLimiting"
|
||||
const pkg = require("../../../../package.json")
|
||||
|
||||
const EXCLUDED_EVENTS: Event[] = [
|
||||
Event.USER_UPDATED,
|
||||
|
@ -42,6 +43,10 @@ export default class PosthogProcessor implements EventProcessor {
|
|||
return
|
||||
}
|
||||
|
||||
if (await rateLimiting.limited(event)) {
|
||||
return
|
||||
}
|
||||
|
||||
properties.version = pkg.version
|
||||
properties.service = env.SERVICE
|
||||
properties.environment = identity.environment
|
|
@ -0,0 +1,2 @@
|
|||
import PosthogProcessor from "./PosthogProcessor"
|
||||
export default PosthogProcessor
|
|
@ -0,0 +1,103 @@
|
|||
import { Event } from "@budibase/types"
|
||||
import { CacheKeys, TTL } from "../../../cache/generic"
|
||||
import * as cache from "../../../cache/generic"
|
||||
import * as context from "../../../context"
|
||||
|
||||
type RateLimitedEvent =
|
||||
| Event.SERVED_BUILDER
|
||||
| Event.SERVED_APP_PREVIEW
|
||||
| Event.SERVED_APP
|
||||
|
||||
const isRateLimited = (event: Event): event is RateLimitedEvent => {
|
||||
return (
|
||||
event === Event.SERVED_BUILDER ||
|
||||
event === Event.SERVED_APP_PREVIEW ||
|
||||
event === Event.SERVED_APP
|
||||
)
|
||||
}
|
||||
|
||||
const isPerApp = (event: RateLimitedEvent) => {
|
||||
return event === Event.SERVED_APP_PREVIEW || event === Event.SERVED_APP
|
||||
}
|
||||
|
||||
interface EventProperties {
|
||||
timestamp: number
|
||||
}
|
||||
|
||||
enum RateLimit {
|
||||
CALENDAR_DAY = "calendarDay",
|
||||
}
|
||||
|
||||
const RATE_LIMITS = {
|
||||
[Event.SERVED_APP]: RateLimit.CALENDAR_DAY,
|
||||
[Event.SERVED_APP_PREVIEW]: RateLimit.CALENDAR_DAY,
|
||||
[Event.SERVED_BUILDER]: RateLimit.CALENDAR_DAY,
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if this event should be sent right now
|
||||
* Return false to signal the event SHOULD be sent
|
||||
* Return true to signal the event should NOT be sent
|
||||
*/
|
||||
export const limited = async (event: Event): Promise<boolean> => {
|
||||
// not a rate limited event -- send
|
||||
if (!isRateLimited(event)) {
|
||||
return false
|
||||
}
|
||||
|
||||
const cachedEvent = (await readEvent(event)) as EventProperties
|
||||
if (cachedEvent) {
|
||||
const timestamp = new Date(cachedEvent.timestamp)
|
||||
const limit = RATE_LIMITS[event]
|
||||
switch (limit) {
|
||||
case RateLimit.CALENDAR_DAY: {
|
||||
// get midnight at the start of the next day for the timestamp
|
||||
timestamp.setDate(timestamp.getDate() + 1)
|
||||
timestamp.setHours(0, 0, 0, 0)
|
||||
|
||||
// if we have passed the threshold into the next day
|
||||
if (Date.now() > timestamp.getTime()) {
|
||||
// update the timestamp in the event -- send
|
||||
await recordEvent(event, { timestamp: Date.now() })
|
||||
return false
|
||||
} else {
|
||||
// still within the limited period -- don't send
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// no event present i.e. expired -- send
|
||||
await recordEvent(event, { timestamp: Date.now() })
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
const eventKey = (event: RateLimitedEvent) => {
|
||||
let key = `${CacheKeys.EVENTS_RATE_LIMIT}:${event}`
|
||||
if (isPerApp(event)) {
|
||||
key = key + context.getAppId()
|
||||
}
|
||||
return key
|
||||
}
|
||||
|
||||
const readEvent = async (event: RateLimitedEvent) => {
|
||||
const key = eventKey(event)
|
||||
return cache.get(key)
|
||||
}
|
||||
|
||||
const recordEvent = async (
|
||||
event: RateLimitedEvent,
|
||||
properties: EventProperties
|
||||
) => {
|
||||
const key = eventKey(event)
|
||||
const limit = RATE_LIMITS[event]
|
||||
let ttl
|
||||
switch (limit) {
|
||||
case RateLimit.CALENDAR_DAY: {
|
||||
ttl = TTL.ONE_DAY
|
||||
}
|
||||
}
|
||||
|
||||
await cache.store(key, properties, ttl)
|
||||
}
|
|
@ -0,0 +1,145 @@
|
|||
import "../../../../../tests/utilities/TestConfiguration"
|
||||
import PosthogProcessor from "../PosthogProcessor"
|
||||
import { Event, IdentityType, Hosting } from "@budibase/types"
|
||||
const tk = require("timekeeper")
|
||||
import * as cache from "../../../../cache/generic"
|
||||
import { CacheKeys } from "../../../../cache/generic"
|
||||
import * as context from "../../../../context"
|
||||
|
||||
const newIdentity = () => {
|
||||
return {
|
||||
id: "test",
|
||||
type: IdentityType.USER,
|
||||
hosting: Hosting.SELF,
|
||||
environment: "test",
|
||||
}
|
||||
}
|
||||
|
||||
describe("PosthogProcessor", () => {
|
||||
beforeEach(async () => {
|
||||
jest.clearAllMocks()
|
||||
await cache.bustCache(
|
||||
`${CacheKeys.EVENTS_RATE_LIMIT}:${Event.SERVED_BUILDER}`
|
||||
)
|
||||
})
|
||||
|
||||
describe("processEvent", () => {
|
||||
it("processes event", async () => {
|
||||
const processor = new PosthogProcessor("test")
|
||||
|
||||
const identity = newIdentity()
|
||||
const properties = {}
|
||||
|
||||
await processor.processEvent(Event.APP_CREATED, identity, properties)
|
||||
|
||||
expect(processor.posthog.capture).toHaveBeenCalledTimes(1)
|
||||
})
|
||||
|
||||
it("honours exclusions", async () => {
|
||||
const processor = new PosthogProcessor("test")
|
||||
|
||||
const identity = newIdentity()
|
||||
const properties = {}
|
||||
|
||||
await processor.processEvent(Event.AUTH_SSO_UPDATED, identity, properties)
|
||||
expect(processor.posthog.capture).toHaveBeenCalledTimes(0)
|
||||
})
|
||||
|
||||
describe("rate limiting", () => {
|
||||
it("sends daily event once in same day", async () => {
|
||||
const processor = new PosthogProcessor("test")
|
||||
const identity = newIdentity()
|
||||
const properties = {}
|
||||
|
||||
tk.freeze(new Date(2022, 0, 1, 14, 0))
|
||||
await processor.processEvent(Event.SERVED_BUILDER, identity, properties)
|
||||
// go forward one hour
|
||||
tk.freeze(new Date(2022, 0, 1, 15, 0))
|
||||
await processor.processEvent(Event.SERVED_BUILDER, identity, properties)
|
||||
|
||||
expect(processor.posthog.capture).toHaveBeenCalledTimes(1)
|
||||
})
|
||||
|
||||
it("sends daily event once per unique day", async () => {
|
||||
const processor = new PosthogProcessor("test")
|
||||
const identity = newIdentity()
|
||||
const properties = {}
|
||||
|
||||
tk.freeze(new Date(2022, 0, 1, 14, 0))
|
||||
await processor.processEvent(Event.SERVED_BUILDER, identity, properties)
|
||||
// go forward into next day
|
||||
tk.freeze(new Date(2022, 0, 2, 9, 0))
|
||||
await processor.processEvent(Event.SERVED_BUILDER, identity, properties)
|
||||
// go forward into next day
|
||||
tk.freeze(new Date(2022, 0, 3, 5, 0))
|
||||
await processor.processEvent(Event.SERVED_BUILDER, identity, properties)
|
||||
// go forward one hour
|
||||
tk.freeze(new Date(2022, 0, 3, 6, 0))
|
||||
await processor.processEvent(Event.SERVED_BUILDER, identity, properties)
|
||||
|
||||
expect(processor.posthog.capture).toHaveBeenCalledTimes(3)
|
||||
})
|
||||
|
||||
it("sends event again after cache expires", async () => {
|
||||
const processor = new PosthogProcessor("test")
|
||||
const identity = newIdentity()
|
||||
const properties = {}
|
||||
|
||||
tk.freeze(new Date(2022, 0, 1, 14, 0))
|
||||
await processor.processEvent(Event.SERVED_BUILDER, identity, properties)
|
||||
|
||||
await cache.bustCache(
|
||||
`${CacheKeys.EVENTS_RATE_LIMIT}:${Event.SERVED_BUILDER}`
|
||||
)
|
||||
|
||||
tk.freeze(new Date(2022, 0, 1, 14, 0))
|
||||
await processor.processEvent(Event.SERVED_BUILDER, identity, properties)
|
||||
|
||||
expect(processor.posthog.capture).toHaveBeenCalledTimes(2)
|
||||
})
|
||||
|
||||
it("sends per app events once per day per app", async () => {
|
||||
const processor = new PosthogProcessor("test")
|
||||
const identity = newIdentity()
|
||||
const properties = {}
|
||||
|
||||
const runAppEvents = async (appId: string) => {
|
||||
await context.doInAppContext(appId, async () => {
|
||||
tk.freeze(new Date(2022, 0, 1, 14, 0))
|
||||
await processor.processEvent(Event.SERVED_APP, identity, properties)
|
||||
await processor.processEvent(
|
||||
Event.SERVED_APP_PREVIEW,
|
||||
identity,
|
||||
properties
|
||||
)
|
||||
|
||||
// go forward one hour - should be ignored
|
||||
tk.freeze(new Date(2022, 0, 1, 15, 0))
|
||||
await processor.processEvent(Event.SERVED_APP, identity, properties)
|
||||
await processor.processEvent(
|
||||
Event.SERVED_APP_PREVIEW,
|
||||
identity,
|
||||
properties
|
||||
)
|
||||
|
||||
// go forward into next day
|
||||
tk.freeze(new Date(2022, 0, 2, 9, 0))
|
||||
|
||||
await processor.processEvent(Event.SERVED_APP, identity, properties)
|
||||
await processor.processEvent(
|
||||
Event.SERVED_APP_PREVIEW,
|
||||
identity,
|
||||
properties
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
await runAppEvents("app_1")
|
||||
expect(processor.posthog.capture).toHaveBeenCalledTimes(4)
|
||||
|
||||
await runAppEvents("app_2")
|
||||
expect(processor.posthog.capture).toHaveBeenCalledTimes(8)
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
|
@ -1,40 +0,0 @@
|
|||
import PosthogProcessor from "../PosthogProcessor"
|
||||
import { Event, IdentityType, Hosting } from "@budibase/types"
|
||||
|
||||
const newIdentity = () => {
|
||||
return {
|
||||
id: "test",
|
||||
type: IdentityType.USER,
|
||||
hosting: Hosting.SELF,
|
||||
environment: "test",
|
||||
}
|
||||
}
|
||||
|
||||
describe("PosthogProcessor", () => {
|
||||
beforeEach(() => {
|
||||
jest.clearAllMocks()
|
||||
})
|
||||
|
||||
describe("processEvent", () => {
|
||||
it("processes event", () => {
|
||||
const processor = new PosthogProcessor("test")
|
||||
|
||||
const identity = newIdentity()
|
||||
const properties = {}
|
||||
|
||||
processor.processEvent(Event.APP_CREATED, identity, properties)
|
||||
|
||||
expect(processor.posthog.capture).toHaveBeenCalledTimes(1)
|
||||
})
|
||||
|
||||
it("honours exclusions", () => {
|
||||
const processor = new PosthogProcessor("test")
|
||||
|
||||
const identity = newIdentity()
|
||||
const properties = {}
|
||||
|
||||
processor.processEvent(Event.AUTH_SSO_UPDATED, identity, properties)
|
||||
expect(processor.posthog.capture).toHaveBeenCalledTimes(0)
|
||||
})
|
||||
})
|
||||
})
|
Loading…
Reference in New Issue