From 9d8c18337dc8d438da07a90307ceaff9d9a8951b Mon Sep 17 00:00:00 2001 From: Sam Rose Date: Mon, 4 Mar 2024 16:42:41 +0000 Subject: [PATCH 01/44] Type role controller. --- packages/backend-core/src/security/roles.ts | 57 +++++++++--------- packages/pro | 2 +- packages/server/src/api/controllers/role.ts | 58 ++++++++++--------- .../server/src/api/controllers/routing.ts | 2 +- .../src/api/routes/tests/application.spec.ts | 11 +++- .../src/api/routes/tests/utilities/index.ts | 7 +-- .../src/tests/utilities/TestConfiguration.ts | 10 ++++ packages/types/src/api/web/index.ts | 1 + packages/types/src/api/web/role.ts | 22 +++++++ 9 files changed, 102 insertions(+), 68 deletions(-) create mode 100644 packages/types/src/api/web/role.ts diff --git a/packages/backend-core/src/security/roles.ts b/packages/backend-core/src/security/roles.ts index 4f048c0a11..01473ad991 100644 --- a/packages/backend-core/src/security/roles.ts +++ b/packages/backend-core/src/security/roles.ts @@ -84,16 +84,18 @@ export function getBuiltinRoles(): { [key: string]: RoleDoc } { return cloneDeep(BUILTIN_ROLES) } -export const BUILTIN_ROLE_ID_ARRAY = Object.values(BUILTIN_ROLES).map( - role => role._id -) +export function isBuiltin(role: string) { + return getBuiltinRole(role) !== undefined +} -export const BUILTIN_ROLE_NAME_ARRAY = Object.values(BUILTIN_ROLES).map( - role => role.name -) - -export function isBuiltin(role?: string) { - return BUILTIN_ROLE_ID_ARRAY.some(builtin => role?.includes(builtin)) +export function getBuiltinRole(roleId: string): Role | undefined { + const role = Object.values(BUILTIN_ROLES).find(role => + roleId.includes(role._id) + ) + if (!role) { + return undefined + } + return cloneDeep(role) } /** @@ -123,7 +125,7 @@ export function builtinRoleToNumber(id?: string) { /** * Converts any role to a number, but has to be async to get the roles from db. */ -export async function roleToNumber(id?: string) { +export async function roleToNumber(id: string) { if (isBuiltin(id)) { return builtinRoleToNumber(id) } @@ -131,7 +133,7 @@ export async function roleToNumber(id?: string) { defaultPublic: true, })) as RoleDoc[] for (let role of hierarchy) { - if (isBuiltin(role?.inherits)) { + if (role?.inherits && isBuiltin(role.inherits)) { return builtinRoleToNumber(role.inherits) + 1 } } @@ -161,35 +163,28 @@ export function lowerBuiltinRoleID(roleId1?: string, roleId2?: string): string { * @returns The role object, which may contain an "inherits" property. */ export async function getRole( - roleId?: string, + roleId: string, opts?: { defaultPublic?: boolean } -): Promise { - if (!roleId) { - return undefined - } - let role: any = {} +): Promise { // built in roles mostly come from the in-code implementation, // but can be extended by a doc stored about them (e.g. permissions) - if (isBuiltin(roleId)) { - role = cloneDeep( - Object.values(BUILTIN_ROLES).find(role => role._id === roleId) - ) - } else { + let role: RoleDoc | undefined = getBuiltinRole(roleId) + if (!role) { // make sure has the prefix (if it has it then it won't be added) roleId = prefixRoleID(roleId) } try { const db = getAppDB() - const dbRole = await db.get(getDBRoleID(roleId)) - role = Object.assign(role, dbRole) + const dbRole = await db.get(getDBRoleID(roleId)) + role = Object.assign(role || {}, dbRole) // finalise the ID - role._id = getExternalRoleID(role._id, role.version) + role._id = getExternalRoleID(role._id!, role.version) } catch (err) { if (!isBuiltin(roleId) && opts?.defaultPublic) { return cloneDeep(BUILTIN_ROLES.PUBLIC) } // only throw an error if there is no role at all - if (Object.keys(role).length === 0) { + if (!role || Object.keys(role).length === 0) { throw err } } @@ -200,7 +195,7 @@ export async function getRole( * Simple function to get all the roles based on the top level user role ID. */ async function getAllUserRoles( - userRoleId?: string, + userRoleId: string, opts?: { defaultPublic?: boolean } ): Promise { // admins have access to all roles @@ -226,7 +221,7 @@ async function getAllUserRoles( } export async function getUserRoleIdHierarchy( - userRoleId?: string + userRoleId: string ): Promise { const roles = await getUserRoleHierarchy(userRoleId) return roles.map(role => role._id!) @@ -241,7 +236,7 @@ export async function getUserRoleIdHierarchy( * highest level of access and the last being the lowest level. */ export async function getUserRoleHierarchy( - userRoleId?: string, + userRoleId: string, opts?: { defaultPublic?: boolean } ) { // special case, if they don't have a role then they are a public user @@ -265,9 +260,9 @@ export function checkForRoleResourceArray( return rolePerms } -export async function getAllRoleIds(appId?: string) { +export async function getAllRoleIds(appId: string): Promise { const roles = await getAllRoles(appId) - return roles.map(role => role._id) + return roles.map(role => role._id!) } /** diff --git a/packages/pro b/packages/pro index 183b35d3ac..22a278da72 160000 --- a/packages/pro +++ b/packages/pro @@ -1 +1 @@ -Subproject commit 183b35d3acd42433dcb2d32bcd89a36abe13afec +Subproject commit 22a278da720d92991dabdcd4cb6c96e7abe29781 diff --git a/packages/server/src/api/controllers/role.ts b/packages/server/src/api/controllers/role.ts index ae6b89e6d4..ffc1d74209 100644 --- a/packages/server/src/api/controllers/role.ts +++ b/packages/server/src/api/controllers/role.ts @@ -7,8 +7,14 @@ import { } from "@budibase/backend-core" import { getUserMetadataParams, InternalTables } from "../../db/utils" import { + AccessibleRolesResponse, Database, + DestroyRoleResponse, + FetchRolesResponse, + FindRoleResponse, Role, + SaveRoleRequest, + SaveRoleResponse, UserCtx, UserMetadata, UserRoles, @@ -25,43 +31,35 @@ async function updateRolesOnUserTable( db: Database, roleId: string, updateOption: string, - roleVersion: string | undefined + roleVersion?: string ) { const table = await sdk.tables.getTable(InternalTables.USER_METADATA) - const schema = table.schema - const remove = updateOption === UpdateRolesOptions.REMOVED - let updated = false - for (let prop of Object.keys(schema)) { - if (prop === "roleId") { - updated = true - const constraints = schema[prop].constraints! - const updatedRoleId = - roleVersion === roles.RoleIDVersion.NAME - ? roles.getExternalRoleID(roleId, roleVersion) - : roleId - const indexOfRoleId = constraints.inclusion!.indexOf(updatedRoleId) - if (remove && indexOfRoleId !== -1) { - constraints.inclusion!.splice(indexOfRoleId, 1) - } else if (!remove && indexOfRoleId === -1) { - constraints.inclusion!.push(updatedRoleId) - } - break + const constraints = table.schema.roleId?.constraints + if (constraints) { + const updatedRoleId = + roleVersion === roles.RoleIDVersion.NAME + ? roles.getExternalRoleID(roleId, roleVersion) + : roleId + const indexOfRoleId = constraints.inclusion!.indexOf(updatedRoleId) + const remove = updateOption === UpdateRolesOptions.REMOVED + if (remove && indexOfRoleId !== -1) { + constraints.inclusion!.splice(indexOfRoleId, 1) + } else if (!remove && indexOfRoleId === -1) { + constraints.inclusion!.push(updatedRoleId) } - } - if (updated) { await db.put(table) } } -export async function fetch(ctx: UserCtx) { +export async function fetch(ctx: UserCtx) { ctx.body = await roles.getAllRoles() } -export async function find(ctx: UserCtx) { +export async function find(ctx: UserCtx) { ctx.body = await roles.getRole(ctx.params.roleId) } -export async function save(ctx: UserCtx) { +export async function save(ctx: UserCtx) { const db = context.getAppDB() let { _id, name, inherits, permissionId, version } = ctx.request.body let isCreate = false @@ -109,9 +107,9 @@ export async function save(ctx: UserCtx) { ctx.body = role } -export async function destroy(ctx: UserCtx) { +export async function destroy(ctx: UserCtx) { const db = context.getAppDB() - let roleId = ctx.params.roleId + let roleId = ctx.params.roleId as string if (roles.isBuiltin(roleId)) { ctx.throw(400, "Cannot delete builtin role.") } else { @@ -144,14 +142,18 @@ export async function destroy(ctx: UserCtx) { ctx.status = 200 } -export async function accessible(ctx: UserCtx) { +export async function accessible(ctx: UserCtx) { let roleId = ctx.user?.roleId if (!roleId) { roleId = roles.BUILTIN_ROLE_IDS.PUBLIC } if (ctx.user && sharedSdk.users.isAdminOrBuilder(ctx.user)) { const appId = context.getAppId() - ctx.body = await roles.getAllRoleIds(appId) + if (!appId) { + ctx.body = [] + } else { + ctx.body = await roles.getAllRoleIds(appId) + } } else { ctx.body = await roles.getUserRoleIdHierarchy(roleId!) } diff --git a/packages/server/src/api/controllers/routing.ts b/packages/server/src/api/controllers/routing.ts index 4154c6b597..040cda4dd0 100644 --- a/packages/server/src/api/controllers/routing.ts +++ b/packages/server/src/api/controllers/routing.ts @@ -63,7 +63,7 @@ export async function fetch(ctx: UserCtx) { export async function clientFetch(ctx: UserCtx) { const routing = await getRoutingStructure() let roleId = ctx.user?.role?._id - const roleIds = await roles.getUserRoleIdHierarchy(roleId) + const roleIds = roleId ? await roles.getUserRoleIdHierarchy(roleId) : [] for (let topLevel of Object.values(routing.routes) as any) { for (let subpathKey of Object.keys(topLevel.subpaths)) { let found = false diff --git a/packages/server/src/api/routes/tests/application.spec.ts b/packages/server/src/api/routes/tests/application.spec.ts index 3e4ad693db..5a3be462e8 100644 --- a/packages/server/src/api/routes/tests/application.spec.ts +++ b/packages/server/src/api/routes/tests/application.spec.ts @@ -251,10 +251,15 @@ describe("/applications", () => { describe("permissions", () => { it("should only return apps a user has access to", async () => { - const user = await config.createUser() + const user = await config.createUser({ + builder: { global: false }, + admin: { global: false }, + }) - const apps = await config.api.application.fetch() - expect(apps.length).toBeGreaterThan(0) + await config.withUser(user, async () => { + const apps = await config.api.application.fetch() + expect(apps).toHaveLength(0) + }) }) }) }) diff --git a/packages/server/src/api/routes/tests/utilities/index.ts b/packages/server/src/api/routes/tests/utilities/index.ts index 915ff5d970..dcb8ccd6c0 100644 --- a/packages/server/src/api/routes/tests/utilities/index.ts +++ b/packages/server/src/api/routes/tests/utilities/index.ts @@ -1,5 +1,4 @@ -import TestConfig from "../../../../tests/utilities/TestConfiguration" -import env from "../../../../environment" +import TestConfiguration from "../../../../tests/utilities/TestConfiguration" import supertest from "supertest" export * as structures from "../../../../tests/utilities/structures" @@ -47,10 +46,10 @@ export function delay(ms: number) { } let request: supertest.SuperTest | undefined | null, - config: TestConfig | null + config: TestConfiguration | null export function beforeAll() { - config = new TestConfig() + config = new TestConfiguration() request = config.getRequest() } diff --git a/packages/server/src/tests/utilities/TestConfiguration.ts b/packages/server/src/tests/utilities/TestConfiguration.ts index 35ca2982c0..2127e9d1cd 100644 --- a/packages/server/src/tests/utilities/TestConfiguration.ts +++ b/packages/server/src/tests/utilities/TestConfiguration.ts @@ -299,6 +299,16 @@ export default class TestConfiguration { } } + withUser(user: User, f: () => Promise) { + const oldUser = this.user + this.user = user + try { + return f() + } finally { + this.user = oldUser + } + } + // UTILS _req | void, Res>( diff --git a/packages/types/src/api/web/index.ts b/packages/types/src/api/web/index.ts index 9a688a17a5..8a091afdba 100644 --- a/packages/types/src/api/web/index.ts +++ b/packages/types/src/api/web/index.ts @@ -14,3 +14,4 @@ export * from "./cookies" export * from "./automation" export * from "./layout" export * from "./query" +export * from "./role" diff --git a/packages/types/src/api/web/role.ts b/packages/types/src/api/web/role.ts new file mode 100644 index 0000000000..c37dee60e0 --- /dev/null +++ b/packages/types/src/api/web/role.ts @@ -0,0 +1,22 @@ +import { Role } from "../../documents" + +export interface SaveRoleRequest { + _id?: string + _rev?: string + name: string + inherits: string + permissionId: string + version: string +} + +export interface SaveRoleResponse extends Role {} + +export interface FindRoleResponse extends Role {} + +export type FetchRolesResponse = Role[] + +export interface DestroyRoleResponse { + message: string +} + +export type AccessibleRolesResponse = string[] From fced2f369649410d9e8ea7e4ca381fef3d0afb12 Mon Sep 17 00:00:00 2001 From: Sam Rose Date: Tue, 5 Mar 2024 09:23:48 +0000 Subject: [PATCH 02/44] Respond to PR feedback. --- packages/backend-core/src/security/roles.ts | 2 +- packages/server/src/api/controllers/role.ts | 27 +++++++++++---------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/packages/backend-core/src/security/roles.ts b/packages/backend-core/src/security/roles.ts index 01473ad991..213c65e18e 100644 --- a/packages/backend-core/src/security/roles.ts +++ b/packages/backend-core/src/security/roles.ts @@ -184,7 +184,7 @@ export async function getRole( return cloneDeep(BUILTIN_ROLES.PUBLIC) } // only throw an error if there is no role at all - if (!role || Object.keys(role).length === 0) { + if (Object.keys(role || {}).length === 0) { throw err } } diff --git a/packages/server/src/api/controllers/role.ts b/packages/server/src/api/controllers/role.ts index ffc1d74209..b3eb61a255 100644 --- a/packages/server/src/api/controllers/role.ts +++ b/packages/server/src/api/controllers/role.ts @@ -35,20 +35,21 @@ async function updateRolesOnUserTable( ) { const table = await sdk.tables.getTable(InternalTables.USER_METADATA) const constraints = table.schema.roleId?.constraints - if (constraints) { - const updatedRoleId = - roleVersion === roles.RoleIDVersion.NAME - ? roles.getExternalRoleID(roleId, roleVersion) - : roleId - const indexOfRoleId = constraints.inclusion!.indexOf(updatedRoleId) - const remove = updateOption === UpdateRolesOptions.REMOVED - if (remove && indexOfRoleId !== -1) { - constraints.inclusion!.splice(indexOfRoleId, 1) - } else if (!remove && indexOfRoleId === -1) { - constraints.inclusion!.push(updatedRoleId) - } - await db.put(table) + if (!constraints) { + return } + const updatedRoleId = + roleVersion === roles.RoleIDVersion.NAME + ? roles.getExternalRoleID(roleId, roleVersion) + : roleId + const indexOfRoleId = constraints.inclusion!.indexOf(updatedRoleId) + const remove = updateOption === UpdateRolesOptions.REMOVED + if (remove && indexOfRoleId !== -1) { + constraints.inclusion!.splice(indexOfRoleId, 1) + } else if (!remove && indexOfRoleId === -1) { + constraints.inclusion!.push(updatedRoleId) + } + await db.put(table) } export async function fetch(ctx: UserCtx) { From e076c0e5f53d8696d65287fca328427b52de928a Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Tue, 5 Mar 2024 15:10:51 +0100 Subject: [PATCH 03/44] Use typed redis clients --- packages/backend-core/src/redis/redis.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/backend-core/src/redis/redis.ts b/packages/backend-core/src/redis/redis.ts index d15453ba62..2280c3f6df 100644 --- a/packages/backend-core/src/redis/redis.ts +++ b/packages/backend-core/src/redis/redis.ts @@ -28,7 +28,7 @@ const DEFAULT_SELECT_DB = SelectableDatabase.DEFAULT // for testing just generate the client once let CLOSED = false -let CLIENTS: { [key: number]: any } = {} +const CLIENTS: Record = {} let CONNECTED = false // mock redis always connected @@ -36,7 +36,7 @@ if (env.MOCK_REDIS) { CONNECTED = true } -function pickClient(selectDb: number): any { +function pickClient(selectDb: number) { return CLIENTS[selectDb] } From 488cfea1f432c1dad91680d71afafeacdee5229b Mon Sep 17 00:00:00 2001 From: Sam Rose Date: Tue, 5 Mar 2024 14:40:29 +0000 Subject: [PATCH 04/44] Fix typing. --- packages/backend-core/src/security/roles.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/backend-core/src/security/roles.ts b/packages/backend-core/src/security/roles.ts index 213c65e18e..01473ad991 100644 --- a/packages/backend-core/src/security/roles.ts +++ b/packages/backend-core/src/security/roles.ts @@ -184,7 +184,7 @@ export async function getRole( return cloneDeep(BUILTIN_ROLES.PUBLIC) } // only throw an error if there is no role at all - if (Object.keys(role || {}).length === 0) { + if (!role || Object.keys(role).length === 0) { throw err } } From a5d6d094e63def29122ce5d3b73470681523688f Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Tue, 5 Mar 2024 16:28:31 +0100 Subject: [PATCH 05/44] Update types --- packages/backend-core/package.json | 2 +- packages/types/package.json | 2 +- yarn.lock | 11 ++++++----- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/packages/backend-core/package.json b/packages/backend-core/package.json index 3f8c34f823..90b3316c3f 100644 --- a/packages/backend-core/package.json +++ b/packages/backend-core/package.json @@ -67,7 +67,7 @@ "@types/lodash": "4.14.200", "@types/node-fetch": "2.6.4", "@types/pouchdb": "6.4.0", - "@types/redlock": "4.0.3", + "@types/redlock": "4.0.7", "@types/semver": "7.3.7", "@types/tar-fs": "2.0.1", "@types/uuid": "8.3.4", diff --git a/packages/types/package.json b/packages/types/package.json index ce4fce95fb..558e55a632 100644 --- a/packages/types/package.json +++ b/packages/types/package.json @@ -18,7 +18,7 @@ "@budibase/nano": "10.1.5", "@types/koa": "2.13.4", "@types/pouchdb": "6.4.0", - "@types/redlock": "4.0.3", + "@types/redlock": "4.0.7", "rimraf": "3.0.2", "typescript": "5.2.2" }, diff --git a/yarn.lock b/yarn.lock index 260ae3870a..2f9f558e2c 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5408,7 +5408,7 @@ resolved "https://registry.yarnpkg.com/@types/http-errors/-/http-errors-2.0.1.tgz#20172f9578b225f6c7da63446f56d4ce108d5a65" integrity sha512-/K3ds8TRAfBvi5vfjuz8y6+GiAYBZ0x4tXv1Av6CWBWn0IlADc+ZX9pMq7oU0fNQPnBwIZl3rmeLp6SBApbxSQ== -"@types/ioredis@4.28.10": +"@types/ioredis@4.28.10", "@types/ioredis@^4.28.10": version "4.28.10" resolved "https://registry.yarnpkg.com/@types/ioredis/-/ioredis-4.28.10.tgz#40ceb157a4141088d1394bb87c98ed09a75a06ff" integrity sha512-69LyhUgrXdgcNDv7ogs1qXZomnfOEnSmrmMFqKgt1XMJxmoOSG/u3wYy13yACIfKuMJ8IhKgHafDO3sx19zVQQ== @@ -5896,12 +5896,13 @@ dependencies: "@types/node" "*" -"@types/redlock@4.0.3": - version "4.0.3" - resolved "https://registry.yarnpkg.com/@types/redlock/-/redlock-4.0.3.tgz#aeab5fe5f0d433a125f6dcf9a884372ac0cddd4b" - integrity sha512-mcvvrquwREbAqyZALNBIlf49AL9Aa324BG+J/Dv4TAP8g+nxQMBI4/APNqqS99QEY7VTNT9XvsaczCVGK8uNnQ== +"@types/redlock@4.0.7": + version "4.0.7" + resolved "https://registry.yarnpkg.com/@types/redlock/-/redlock-4.0.7.tgz#33ed56f22a38d6b2f2e6ae5ed1b3fc1875a08e6b" + integrity sha512-5D6egBv0fCfdbmnCETjEynVuiwFMEFFc3YFjh9EwhaaVTAi0YmB6UI1swq1S1rjIu+n27ppmlTFDK3D3cadJqg== dependencies: "@types/bluebird" "*" + "@types/ioredis" "^4.28.10" "@types/redis" "^2.8.0" "@types/request@^2.48.7": From 56870bed5b0bc2c4479c4107e78193e36195a795 Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Tue, 5 Mar 2024 16:30:45 +0100 Subject: [PATCH 06/44] Typings --- packages/backend-core/src/redis/redis.ts | 21 ++++++++++++------- .../backend-core/src/redis/redlockImpl.ts | 2 +- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/packages/backend-core/src/redis/redis.ts b/packages/backend-core/src/redis/redis.ts index 2280c3f6df..f8f0c9f3d7 100644 --- a/packages/backend-core/src/redis/redis.ts +++ b/packages/backend-core/src/redis/redis.ts @@ -1,5 +1,5 @@ import env from "../environment" -import Redis from "ioredis" +import Redis, { Cluster } from "ioredis" // mock-redis doesn't have any typing let MockRedis: any | undefined if (env.MOCK_REDIS) { @@ -28,7 +28,7 @@ const DEFAULT_SELECT_DB = SelectableDatabase.DEFAULT // for testing just generate the client once let CLOSED = false -const CLIENTS: Record = {} +const CLIENTS: Record = {} let CONNECTED = false // mock redis always connected @@ -201,12 +201,15 @@ class RedisWrapper { key = `${db}${SEPARATOR}${key}` let stream if (CLUSTERED) { - let node = this.getClient().nodes("master") + let node = (this.getClient() as Cluster).nodes("master") stream = node[0].scanStream({ match: key + "*", count: 100 }) } else { - stream = this.getClient().scanStream({ match: key + "*", count: 100 }) + stream = (this.getClient() as Redis).scanStream({ + match: key + "*", + count: 100, + }) } - return promisifyStream(stream, this.getClient()) + return promisifyStream(stream, this.getClient() as any) } async keys(pattern: string) { @@ -221,14 +224,16 @@ class RedisWrapper { async get(key: string) { const db = this._db - let response = await this.getClient().get(addDbPrefix(db, key)) + const response = await this.getClient().get(addDbPrefix(db, key)) // overwrite the prefixed key + // @ts-ignore if (response != null && response.key) { + // @ts-ignore response.key = key } // if its not an object just return the response try { - return JSON.parse(response) + return JSON.parse(response!) } catch (err) { return response } @@ -280,7 +285,7 @@ class RedisWrapper { return this.getClient().ttl(prefixedKey) } - async setExpiry(key: string, expirySeconds: number | null) { + async setExpiry(key: string, expirySeconds: number) { const db = this._db const prefixedKey = addDbPrefix(db, key) await this.getClient().expire(prefixedKey, expirySeconds) diff --git a/packages/backend-core/src/redis/redlockImpl.ts b/packages/backend-core/src/redis/redlockImpl.ts index 7009dc6f55..adeb5b12ec 100644 --- a/packages/backend-core/src/redis/redlockImpl.ts +++ b/packages/backend-core/src/redis/redlockImpl.ts @@ -72,7 +72,7 @@ const OPTIONS: Record = { export async function newRedlock(opts: Redlock.Options = {}) { const options = { ...OPTIONS.DEFAULT, ...opts } const redisWrapper = await getLockClient() - const client = redisWrapper.getClient() + const client = redisWrapper.getClient() as any return new Redlock([client], options) } From a4288a9dd3e21dd99c3ac825effa7b1a40dadb63 Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Tue, 5 Mar 2024 16:41:49 +0100 Subject: [PATCH 07/44] Basic test --- .../src/redis/tests/redis.spec.ts | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 packages/backend-core/src/redis/tests/redis.spec.ts diff --git a/packages/backend-core/src/redis/tests/redis.spec.ts b/packages/backend-core/src/redis/tests/redis.spec.ts new file mode 100644 index 0000000000..d082b6b617 --- /dev/null +++ b/packages/backend-core/src/redis/tests/redis.spec.ts @@ -0,0 +1,21 @@ +import { generator, structures } from "../../../tests" +import RedisWrapper from "../redis" + +describe("redis", () => { + const redis = new RedisWrapper(structures.db.id()) + + beforeAll(async () => { + await redis.init() + }) + + describe("store", () => { + it("a basic value can be persisted", async () => { + const key = structures.uuid() + const value = generator.word() + + await redis.store(key, value) + + expect(await redis.get(key)).toEqual(value) + }) + }) +}) From 49db47e1fd75b63b7bf06794e5418a54c266db3d Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Tue, 5 Mar 2024 16:46:33 +0100 Subject: [PATCH 08/44] Add bulk store --- packages/backend-core/src/redis/redis.ts | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/packages/backend-core/src/redis/redis.ts b/packages/backend-core/src/redis/redis.ts index f8f0c9f3d7..076f64b1ea 100644 --- a/packages/backend-core/src/redis/redis.ts +++ b/packages/backend-core/src/redis/redis.ts @@ -279,6 +279,19 @@ class RedisWrapper { } } + async bulkStore( + data: Record, + expirySeconds: number | null = null + ) { + const client = this.getClient() + + const dataToStore = Object.entries(data).reduce((acc, [key, value]) => { + acc[addDbPrefix(this._db, key)] = value + return acc + }, {} as Record) + await client.mset(dataToStore) + } + async getTTL(key: string) { const db = this._db const prefixedKey = addDbPrefix(db, key) From d9a5899b2770e7140c360f0eaf24b9e3ccb84d07 Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Tue, 5 Mar 2024 16:49:45 +0100 Subject: [PATCH 09/44] Bulk store test --- .../src/redis/tests/redis.spec.ts | 24 +++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/packages/backend-core/src/redis/tests/redis.spec.ts b/packages/backend-core/src/redis/tests/redis.spec.ts index d082b6b617..6333573e6e 100644 --- a/packages/backend-core/src/redis/tests/redis.spec.ts +++ b/packages/backend-core/src/redis/tests/redis.spec.ts @@ -2,9 +2,10 @@ import { generator, structures } from "../../../tests" import RedisWrapper from "../redis" describe("redis", () => { - const redis = new RedisWrapper(structures.db.id()) + let redis: RedisWrapper - beforeAll(async () => { + beforeEach(async () => { + redis = new RedisWrapper(structures.db.id()) await redis.init() }) @@ -18,4 +19,23 @@ describe("redis", () => { expect(await redis.get(key)).toEqual(value) }) }) + + describe("bulkStore", () => { + it("a basic object can be persisted", async () => { + const data = generator + .unique(() => generator.word(), 10) + .reduce((acc, key) => { + acc[key] = generator.word() + return acc + }, {} as Record) + + await redis.bulkStore(data) + + for (const [key, value] of Object.entries(data)) { + expect(await redis.get(key)).toEqual(value) + } + + expect(await redis.keys("*")).toHaveLength(10) + }) + }) }) From 1b0a943e13fcb46cc91be18f794c3ff2a4c95684 Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Tue, 5 Mar 2024 17:04:46 +0100 Subject: [PATCH 10/44] Atomic expires --- packages/backend-core/src/redis/redis.ts | 19 ++++++++++++++++++- .../src/redis/tests/redis.spec.ts | 19 +++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/packages/backend-core/src/redis/redis.ts b/packages/backend-core/src/redis/redis.ts index 076f64b1ea..18152aac72 100644 --- a/packages/backend-core/src/redis/redis.ts +++ b/packages/backend-core/src/redis/redis.ts @@ -289,7 +289,24 @@ class RedisWrapper { acc[addDbPrefix(this._db, key)] = value return acc }, {} as Record) - await client.mset(dataToStore) + + const luaScript = ` + for i, key in ipairs(KEYS) do + redis.call('MSET', key, ARGV[i]) + ${ + expirySeconds !== null + ? `redis.call('EXPIRE', key, ARGV[#ARGV])` + : "" + } + end + ` + const keys = Object.keys(dataToStore) + let values = Object.values(dataToStore) + if (expirySeconds !== null) { + values.push(expirySeconds) + } + + await client.eval(luaScript, keys.length, ...keys, ...values) } async getTTL(key: string) { diff --git a/packages/backend-core/src/redis/tests/redis.spec.ts b/packages/backend-core/src/redis/tests/redis.spec.ts index 6333573e6e..e3e4ae7247 100644 --- a/packages/backend-core/src/redis/tests/redis.spec.ts +++ b/packages/backend-core/src/redis/tests/redis.spec.ts @@ -37,5 +37,24 @@ describe("redis", () => { expect(await redis.keys("*")).toHaveLength(10) }) + + it("a bulk store can be persisted with TTL", async () => { + const ttl = 500 + const data = generator + .unique(() => generator.word(), 10) + .reduce((acc, key) => { + acc[key] = generator.word() + return acc + }, {} as Record) + + await redis.bulkStore(data, ttl) + + for (const [key, value] of Object.entries(data)) { + expect(await redis.get(key)).toEqual(value) + expect(await redis.getTTL(key)).toEqual(ttl) + } + + expect(await redis.keys("*")).toHaveLength(10) + }) }) }) From 3baf981d4826c68360261a1c57c9a97c1a4ed267 Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Tue, 5 Mar 2024 17:08:08 +0100 Subject: [PATCH 11/44] Add TTL tests --- .../src/redis/tests/redis.spec.ts | 43 ++++++++++++++----- 1 file changed, 33 insertions(+), 10 deletions(-) diff --git a/packages/backend-core/src/redis/tests/redis.spec.ts b/packages/backend-core/src/redis/tests/redis.spec.ts index e3e4ae7247..13f2c676c9 100644 --- a/packages/backend-core/src/redis/tests/redis.spec.ts +++ b/packages/backend-core/src/redis/tests/redis.spec.ts @@ -21,13 +21,17 @@ describe("redis", () => { }) describe("bulkStore", () => { - it("a basic object can be persisted", async () => { - const data = generator - .unique(() => generator.word(), 10) + function createRandomObject(keyLength: number) { + return generator + .unique(() => generator.word(), keyLength) .reduce((acc, key) => { acc[key] = generator.word() return acc }, {} as Record) + } + + it("a basic object can be persisted", async () => { + const data = createRandomObject(10) await redis.bulkStore(data) @@ -38,14 +42,20 @@ describe("redis", () => { expect(await redis.keys("*")).toHaveLength(10) }) + it("no TTL is set by default", async () => { + const data = createRandomObject(10) + + await redis.bulkStore(data) + + for (const [key, value] of Object.entries(data)) { + expect(await redis.get(key)).toEqual(value) + expect(await redis.getTTL(key)).toEqual(-1) + } + }) + it("a bulk store can be persisted with TTL", async () => { const ttl = 500 - const data = generator - .unique(() => generator.word(), 10) - .reduce((acc, key) => { - acc[key] = generator.word() - return acc - }, {} as Record) + const data = createRandomObject(8) await redis.bulkStore(data, ttl) @@ -54,7 +64,20 @@ describe("redis", () => { expect(await redis.getTTL(key)).toEqual(ttl) } - expect(await redis.keys("*")).toHaveLength(10) + expect(await redis.keys("*")).toHaveLength(8) + }) + + it("setting a TTL of -1 will not persist the key", async () => { + const ttl = -1 + const data = createRandomObject(5) + + await redis.bulkStore(data, ttl) + + for (const [key, value] of Object.entries(data)) { + expect(await redis.get(key)).toBe(null) + } + + expect(await redis.keys("*")).toHaveLength(0) }) }) }) From 8f9e8b60c328af4f635c98759b475f08668d7eed Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Tue, 5 Mar 2024 17:19:26 +0100 Subject: [PATCH 12/44] Fix types --- packages/backend-core/src/redis/redis.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/backend-core/src/redis/redis.ts b/packages/backend-core/src/redis/redis.ts index 18152aac72..99613e7c32 100644 --- a/packages/backend-core/src/redis/redis.ts +++ b/packages/backend-core/src/redis/redis.ts @@ -28,7 +28,7 @@ const DEFAULT_SELECT_DB = SelectableDatabase.DEFAULT // for testing just generate the client once let CLOSED = false -const CLIENTS: Record = {} +const CLIENTS: Record = {} let CONNECTED = false // mock redis always connected @@ -201,7 +201,7 @@ class RedisWrapper { key = `${db}${SEPARATOR}${key}` let stream if (CLUSTERED) { - let node = (this.getClient() as Cluster).nodes("master") + let node = (this.getClient() as never as Cluster).nodes("master") stream = node[0].scanStream({ match: key + "*", count: 100 }) } else { stream = (this.getClient() as Redis).scanStream({ From f2330144de5bfdddc3d9b4021540e0cc9976c3ca Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Tue, 5 Mar 2024 17:57:28 +0100 Subject: [PATCH 13/44] Clean --- packages/backend-core/src/redis/redis.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/backend-core/src/redis/redis.ts b/packages/backend-core/src/redis/redis.ts index 99613e7c32..59583da366 100644 --- a/packages/backend-core/src/redis/redis.ts +++ b/packages/backend-core/src/redis/redis.ts @@ -301,7 +301,7 @@ class RedisWrapper { end ` const keys = Object.keys(dataToStore) - let values = Object.values(dataToStore) + const values = Object.values(dataToStore) if (expirySeconds !== null) { values.push(expirySeconds) } From 82ff748fd950e79e60daad95268d5c34490d25f7 Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Tue, 5 Mar 2024 18:10:33 +0100 Subject: [PATCH 14/44] Add complex object tests --- .../src/redis/tests/redis.spec.ts | 31 +++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/packages/backend-core/src/redis/tests/redis.spec.ts b/packages/backend-core/src/redis/tests/redis.spec.ts index 13f2c676c9..eb32172edd 100644 --- a/packages/backend-core/src/redis/tests/redis.spec.ts +++ b/packages/backend-core/src/redis/tests/redis.spec.ts @@ -18,14 +18,26 @@ describe("redis", () => { expect(await redis.get(key)).toEqual(value) }) + + it("objects can be persisted", async () => { + const key = structures.uuid() + const value = { [generator.word()]: generator.word() } + + await redis.store(key, value) + + expect(await redis.get(key)).toEqual(value) + }) }) describe("bulkStore", () => { - function createRandomObject(keyLength: number) { + function createRandomObject( + keyLength: number, + valueGenerator: () => any = () => generator.word() + ) { return generator .unique(() => generator.word(), keyLength) .reduce((acc, key) => { - acc[key] = generator.word() + acc[key] = valueGenerator() return acc }, {} as Record) } @@ -42,6 +54,21 @@ describe("redis", () => { expect(await redis.keys("*")).toHaveLength(10) }) + it("a complex object can be persisted", async () => { + const data = { + ...createRandomObject(10, () => createRandomObject(5)), + ...createRandomObject(5), + } + + await redis.bulkStore(data) + + for (const [key, value] of Object.entries(data)) { + expect(await redis.get(key)).toEqual(value) + } + + expect(await redis.keys("*")).toHaveLength(10) + }) + it("no TTL is set by default", async () => { const data = createRandomObject(10) From de0527384aebd2703fd7eec58818366627da8b10 Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Tue, 5 Mar 2024 18:10:45 +0100 Subject: [PATCH 15/44] Support complex objects --- packages/backend-core/src/redis/redis.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/backend-core/src/redis/redis.ts b/packages/backend-core/src/redis/redis.ts index 59583da366..8cfa3db5c1 100644 --- a/packages/backend-core/src/redis/redis.ts +++ b/packages/backend-core/src/redis/redis.ts @@ -286,7 +286,8 @@ class RedisWrapper { const client = this.getClient() const dataToStore = Object.entries(data).reduce((acc, [key, value]) => { - acc[addDbPrefix(this._db, key)] = value + acc[addDbPrefix(this._db, key)] = + typeof value === "object" ? JSON.stringify(value) : value return acc }, {} as Record) From a093cfca993d585ae5a83e933dcacccee35c4bf5 Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Tue, 5 Mar 2024 18:11:12 +0100 Subject: [PATCH 16/44] Fix test --- packages/backend-core/src/redis/tests/redis.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/backend-core/src/redis/tests/redis.spec.ts b/packages/backend-core/src/redis/tests/redis.spec.ts index eb32172edd..1fd40acc37 100644 --- a/packages/backend-core/src/redis/tests/redis.spec.ts +++ b/packages/backend-core/src/redis/tests/redis.spec.ts @@ -66,7 +66,7 @@ describe("redis", () => { expect(await redis.get(key)).toEqual(value) } - expect(await redis.keys("*")).toHaveLength(10) + expect(await redis.keys("*")).toHaveLength(15) }) it("no TTL is set by default", async () => { From 40cc383c0140fa3d960938162e7924aaacd079f5 Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Thu, 29 Feb 2024 11:22:27 +0100 Subject: [PATCH 17/44] Create docWriteThrough redis cache --- packages/backend-core/src/redis/init.ts | 13 ++++++++++++- packages/backend-core/src/redis/utils.ts | 1 + 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/packages/backend-core/src/redis/init.ts b/packages/backend-core/src/redis/init.ts index f3bcee3209..7920dfed2d 100644 --- a/packages/backend-core/src/redis/init.ts +++ b/packages/backend-core/src/redis/init.ts @@ -9,7 +9,8 @@ let userClient: Client, lockClient: Client, socketClient: Client, inviteClient: Client, - passwordResetClient: Client + passwordResetClient: Client, + docWritethroughClient: Client export async function init() { userClient = await new Client(utils.Databases.USER_CACHE).init() @@ -24,6 +25,9 @@ export async function init() { utils.Databases.SOCKET_IO, utils.SelectableDatabase.SOCKET_IO ).init() + docWritethroughClient = await new Client( + utils.Databases.DOC_WRITE_THROUGH + ).init() } export async function shutdown() { @@ -104,3 +108,10 @@ export async function getPasswordResetClient() { } return passwordResetClient } + +export async function getDocWritethroughClient() { + if (!writethroughClient) { + await init() + } + return writethroughClient +} diff --git a/packages/backend-core/src/redis/utils.ts b/packages/backend-core/src/redis/utils.ts index 7b93458b52..7f84f11467 100644 --- a/packages/backend-core/src/redis/utils.ts +++ b/packages/backend-core/src/redis/utils.ts @@ -30,6 +30,7 @@ export enum Databases { LOCKS = "locks", SOCKET_IO = "socket_io", BPM_EVENTS = "bpmEvents", + DOC_WRITE_THROUGH = "docWriteThrough", } /** From 9f42ea6bbf2b8247e988b16ae8f3b84a9beb1f9e Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Thu, 29 Feb 2024 13:44:52 +0100 Subject: [PATCH 18/44] DocWritethrough --- .../backend-core/src/cache/docWritethrough.ts | 102 ++++++++++++++++++ .../backend-core/src/db/couch/DatabaseImpl.ts | 9 ++ .../backend-core/src/db/instrumentation.ts | 7 ++ packages/types/src/sdk/db.ts | 1 + 4 files changed, 119 insertions(+) create mode 100644 packages/backend-core/src/cache/docWritethrough.ts diff --git a/packages/backend-core/src/cache/docWritethrough.ts b/packages/backend-core/src/cache/docWritethrough.ts new file mode 100644 index 0000000000..9e1977f797 --- /dev/null +++ b/packages/backend-core/src/cache/docWritethrough.ts @@ -0,0 +1,102 @@ +import BaseCache from "./base" +import { getDocWritethroughClient } from "../redis/init" +import { AnyDocument, Database, LockName, LockType } from "@budibase/types" +import * as locks from "../redis/redlockImpl" + +const DEFAULT_WRITE_RATE_MS = 10000 + +let CACHE: BaseCache | null = null +async function getCache() { + if (!CACHE) { + const client = await getDocWritethroughClient() + CACHE = new BaseCache(client) + } + return CACHE +} + +interface CacheItem { + lastWrite: number +} + +export class DocWritethrough { + db: Database + docId: string + writeRateMs: number + + constructor( + db: Database, + docId: string, + writeRateMs: number = DEFAULT_WRITE_RATE_MS + ) { + this.db = db + this.docId = docId + this.writeRateMs = writeRateMs + } + + private makeCacheItem(): CacheItem { + return { lastWrite: Date.now() } + } + + async patch(data: Record) { + const cache = await getCache() + + const key = `${this.docId}:info` + const cacheItem = await cache.withCache( + key, + null, + () => this.makeCacheItem(), + { + useTenancy: false, + } + ) + + await this.storeToCache(cache, data) + + const updateDb = + !cacheItem || cacheItem.lastWrite <= Date.now() - this.writeRateMs + // let output = this.doc + if (updateDb) { + await this.persistToDb(cache) + } + } + + private async storeToCache(cache: BaseCache, data: Record) { + for (const [key, value] of Object.entries(data)) { + const cacheKey = this.docId + ":data:" + key + await cache.store(cacheKey, { key, value }, undefined) + } + } + + private async persistToDb(cache: BaseCache) { + const key = `${this.db.name}_${this.docId}` + + const lockResponse = await locks.doWithLock( + { + type: LockType.TRY_ONCE, + name: LockName.PERSIST_WRITETHROUGH, + resource: key, + ttl: 15000, + }, + async () => { + let doc: AnyDocument | undefined + try { + doc = await this.db.get(this.docId) + } catch { + doc = { _id: this.docId } + } + + const keysToPersist = await cache.keys(`${this.docId}:data:*`) + for (const key of keysToPersist) { + const data = await cache.get(key, { useTenancy: false }) + doc[data.key] = data.value + } + + await this.db.put(doc) + } + ) + + if (!lockResponse.executed) { + throw `DocWriteThrough could not be persisted to db for ${key}` + } + } +} diff --git a/packages/backend-core/src/db/couch/DatabaseImpl.ts b/packages/backend-core/src/db/couch/DatabaseImpl.ts index 7e7c997cbe..d4d17f6127 100644 --- a/packages/backend-core/src/db/couch/DatabaseImpl.ts +++ b/packages/backend-core/src/db/couch/DatabaseImpl.ts @@ -135,6 +135,15 @@ export class DatabaseImpl implements Database { }) } + async docExists(id: string): Promise { + try { + await this.get(id) + return true + } catch { + return false + } + } + async getMultiple( ids: string[], opts?: { allowMissing?: boolean } diff --git a/packages/backend-core/src/db/instrumentation.ts b/packages/backend-core/src/db/instrumentation.ts index 03010d4c92..87af0e3127 100644 --- a/packages/backend-core/src/db/instrumentation.ts +++ b/packages/backend-core/src/db/instrumentation.ts @@ -38,6 +38,13 @@ export class DDInstrumentedDatabase implements Database { }) } + docExists(id: string): Promise { + return tracer.trace("db.docExists", span => { + span?.addTags({ db_name: this.name, doc_id: id }) + return this.db.docExists(id) + }) + } + getMultiple( ids: string[], opts?: { allowMissing?: boolean | undefined } | undefined diff --git a/packages/types/src/sdk/db.ts b/packages/types/src/sdk/db.ts index c4e4a4f02f..dafc9ced57 100644 --- a/packages/types/src/sdk/db.ts +++ b/packages/types/src/sdk/db.ts @@ -128,6 +128,7 @@ export interface Database { exists(): Promise get(id?: string): Promise + docExists(id: string): Promise getMultiple( ids: string[], opts?: { allowMissing?: boolean } From 10568cccff8e4d342a03484f04be299fe4868917 Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Thu, 29 Feb 2024 13:47:18 +0100 Subject: [PATCH 19/44] USe get for doc exists --- packages/backend-core/src/cache/base/index.ts | 2 +- packages/backend-core/src/db/couch/DatabaseImpl.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/backend-core/src/cache/base/index.ts b/packages/backend-core/src/cache/base/index.ts index 264984c6a5..23c952c7b2 100644 --- a/packages/backend-core/src/cache/base/index.ts +++ b/packages/backend-core/src/cache/base/index.ts @@ -60,7 +60,7 @@ export default class BaseCache { */ async withCache( key: string, - ttl: number, + ttl: number | null = null, fetchFn: any, opts = { useTenancy: true } ) { diff --git a/packages/backend-core/src/db/couch/DatabaseImpl.ts b/packages/backend-core/src/db/couch/DatabaseImpl.ts index d4d17f6127..9d198e4307 100644 --- a/packages/backend-core/src/db/couch/DatabaseImpl.ts +++ b/packages/backend-core/src/db/couch/DatabaseImpl.ts @@ -137,7 +137,7 @@ export class DatabaseImpl implements Database { async docExists(id: string): Promise { try { - await this.get(id) + await this.performCall(db => () => db.head(id)) return true } catch { return false From 82132d539d2c535be99a8aee58360fff288a1907 Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Thu, 29 Feb 2024 15:17:18 +0100 Subject: [PATCH 20/44] DatabaseImpl.docExists test --- .../src/db/tests/DatabaseImpl.spec.ts | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 packages/backend-core/src/db/tests/DatabaseImpl.spec.ts diff --git a/packages/backend-core/src/db/tests/DatabaseImpl.spec.ts b/packages/backend-core/src/db/tests/DatabaseImpl.spec.ts new file mode 100644 index 0000000000..140ecf4f2c --- /dev/null +++ b/packages/backend-core/src/db/tests/DatabaseImpl.spec.ts @@ -0,0 +1,55 @@ +import _ from "lodash" +import { AnyDocument } from "@budibase/types" +import { generator } from "../../../tests" +import { DatabaseImpl } from "../couch" +import { newid } from "../../utils" + +describe("DatabaseImpl", () => { + const database = new DatabaseImpl(generator.word()) + const documents: AnyDocument[] = [] + + beforeAll(async () => { + const docsToCreate = Array.from({ length: 10 }).map(() => ({ + _id: newid(), + })) + const createdDocs = await database.bulkDocs(docsToCreate) + + documents.push(...createdDocs.map((x: any) => ({ _id: x.id, _rev: x.rev }))) + }) + + describe("docExists", () => { + it("can check existing docs by id", async () => { + const existingDoc = _.sample(documents) + const result = await database.docExists(existingDoc!._id!) + + expect(result).toBe(true) + }) + + it("can check non existing docs by id", async () => { + const result = await database.docExists(newid()) + + expect(result).toBe(false) + }) + + it("can check an existing doc by id multiple times", async () => { + const existingDoc = _.sample(documents) + const id = existingDoc!._id! + + const results = [] + results.push(await database.docExists(id)) + results.push(await database.docExists(id)) + results.push(await database.docExists(id)) + + expect(results).toEqual([true, true, true]) + }) + + it("returns false after the doc is deleted", async () => { + const existingDoc = _.sample(documents) + const id = existingDoc!._id! + expect(await database.docExists(id)).toBe(true) + + await database.remove(existingDoc!) + expect(await database.docExists(id)).toBe(false) + }) + }) +}) From 74aae19a7ebdd9fcb040679c2aeca40e991a8456 Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Thu, 29 Feb 2024 15:23:32 +0100 Subject: [PATCH 21/44] docWritethrough test --- .../src/cache/tests/docWritethrough.spec.ts | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 packages/backend-core/src/cache/tests/docWritethrough.spec.ts diff --git a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts new file mode 100644 index 0000000000..bfb1da5f1c --- /dev/null +++ b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts @@ -0,0 +1,47 @@ +import tk from "timekeeper" +import { env } from "../.." +import { DBTestConfiguration, generator, structures } from "../../../tests" +import { getDB } from "../../db" +import { DocWritethrough } from "../docWritethrough" +import _ from "lodash" + +env._set("MOCK_REDIS", null) + +const initialTime = Date.now() + +const WRITE_RATE_MS = 500 + +describe("docWritethrough", () => { + const config = new DBTestConfiguration() + + const db = getDB(structures.db.id()) + let documentId: string + let docWritethrough: DocWritethrough + + describe("patch", () => { + function generatePatchObject(fieldCount: number) { + const keys = generator.unique(() => generator.word(), fieldCount) + return keys.reduce((acc, c) => { + acc[c] = generator.word() + return acc + }, {} as Record) + } + + beforeEach(() => { + tk.freeze(initialTime) + documentId = structures.db.id() + docWritethrough = new DocWritethrough(db, documentId, WRITE_RATE_MS) + }) + + it("patching will not persist until timeout is hit", async () => { + await config.doInTenant(async () => { + await docWritethrough.patch(generatePatchObject(2)) + await docWritethrough.patch(generatePatchObject(2)) + tk.travel(Date.now() + WRITE_RATE_MS - 1) + await docWritethrough.patch(generatePatchObject(2)) + + expect(await db.docExists(documentId)).toBe(false) + }) + }) + }) +}) From bfde028e9b8dcae7ed81d34542acfcef32cf791c Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Thu, 29 Feb 2024 15:28:35 +0100 Subject: [PATCH 22/44] Add persisting tests --- .../src/cache/tests/docWritethrough.spec.ts | 39 ++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts index bfb1da5f1c..ab0de53bee 100644 --- a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts +++ b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts @@ -33,7 +33,7 @@ describe("docWritethrough", () => { docWritethrough = new DocWritethrough(db, documentId, WRITE_RATE_MS) }) - it("patching will not persist until timeout is hit", async () => { + it("patching will not persist if timeout does not hit", async () => { await config.doInTenant(async () => { await docWritethrough.patch(generatePatchObject(2)) await docWritethrough.patch(generatePatchObject(2)) @@ -43,5 +43,42 @@ describe("docWritethrough", () => { expect(await db.docExists(documentId)).toBe(false) }) }) + + it("patching will persist if timeout hits and next patch is called", async () => { + await config.doInTenant(async () => { + const patch1 = generatePatchObject(2) + const patch2 = generatePatchObject(2) + await docWritethrough.patch(patch1) + await docWritethrough.patch(patch2) + + tk.travel(Date.now() + WRITE_RATE_MS) + + const patch3 = generatePatchObject(3) + await docWritethrough.patch(patch3) + + expect(await db.get(documentId)).toEqual({ + _id: documentId, + ...patch1, + ...patch2, + ...patch3, + _rev: expect.stringMatching(/1-.+/), + createdAt: new Date(initialTime + 500).toISOString(), + updatedAt: new Date(initialTime + 500).toISOString(), + }) + }) + }) + + it("patching will not persist even if timeout hits but next patch is not callec", async () => { + await config.doInTenant(async () => { + const patch1 = generatePatchObject(2) + const patch2 = generatePatchObject(2) + await docWritethrough.patch(patch1) + await docWritethrough.patch(patch2) + + tk.travel(Date.now() + WRITE_RATE_MS) + + expect(await db.docExists(documentId)).toBe(false) + }) + }) }) }) From 35536592e6558176e48960063ab71ddfebd2f8d1 Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Thu, 29 Feb 2024 15:41:26 +0100 Subject: [PATCH 23/44] Add extra tests --- .../src/cache/tests/docWritethrough.spec.ts | 86 ++++++++++++++++--- 1 file changed, 75 insertions(+), 11 deletions(-) diff --git a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts index ab0de53bee..16e47ce3c3 100644 --- a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts +++ b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts @@ -7,9 +7,17 @@ import _ from "lodash" env._set("MOCK_REDIS", null) +const WRITE_RATE_MS = 500 + const initialTime = Date.now() -const WRITE_RATE_MS = 500 +function resetTime() { + tk.travel(initialTime) +} +function travelForward(ms: number) { + const updatedTime = Date.now() + ms + tk.travel(updatedTime) +} describe("docWritethrough", () => { const config = new DBTestConfiguration() @@ -28,7 +36,7 @@ describe("docWritethrough", () => { } beforeEach(() => { - tk.freeze(initialTime) + resetTime() documentId = structures.db.id() docWritethrough = new DocWritethrough(db, documentId, WRITE_RATE_MS) }) @@ -37,7 +45,7 @@ describe("docWritethrough", () => { await config.doInTenant(async () => { await docWritethrough.patch(generatePatchObject(2)) await docWritethrough.patch(generatePatchObject(2)) - tk.travel(Date.now() + WRITE_RATE_MS - 1) + travelForward(WRITE_RATE_MS - 1) await docWritethrough.patch(generatePatchObject(2)) expect(await db.docExists(documentId)).toBe(false) @@ -51,7 +59,7 @@ describe("docWritethrough", () => { await docWritethrough.patch(patch1) await docWritethrough.patch(patch2) - tk.travel(Date.now() + WRITE_RATE_MS) + travelForward(WRITE_RATE_MS) const patch3 = generatePatchObject(3) await docWritethrough.patch(patch3) @@ -62,23 +70,79 @@ describe("docWritethrough", () => { ...patch2, ...patch3, _rev: expect.stringMatching(/1-.+/), - createdAt: new Date(initialTime + 500).toISOString(), - updatedAt: new Date(initialTime + 500).toISOString(), + createdAt: new Date(initialTime + WRITE_RATE_MS).toISOString(), + updatedAt: new Date(initialTime + WRITE_RATE_MS).toISOString(), }) }) }) + it("date audit fields are set correctly when persisting", async () => { + await config.doInTenant(async () => { + const patch1 = generatePatchObject(2) + const patch2 = generatePatchObject(2) + await docWritethrough.patch(patch1) + travelForward(WRITE_RATE_MS) + const date1 = new Date() + await docWritethrough.patch(patch2) + + travelForward(WRITE_RATE_MS) + const date2 = new Date() + + const patch3 = generatePatchObject(3) + await docWritethrough.patch(patch3) + + expect(date1).not.toEqual(date2) + expect(await db.get(documentId)).toEqual( + expect.objectContaining({ + createdAt: date1.toISOString(), + updatedAt: date2.toISOString(), + }) + ) + }) + }) + it("patching will not persist even if timeout hits but next patch is not callec", async () => { await config.doInTenant(async () => { - const patch1 = generatePatchObject(2) - const patch2 = generatePatchObject(2) - await docWritethrough.patch(patch1) - await docWritethrough.patch(patch2) + await docWritethrough.patch(generatePatchObject(2)) + await docWritethrough.patch(generatePatchObject(2)) - tk.travel(Date.now() + WRITE_RATE_MS) + travelForward(WRITE_RATE_MS) expect(await db.docExists(documentId)).toBe(false) }) }) + + it("concurrent patches will override keys", async () => { + await config.doInTenant(async () => { + const patch1 = generatePatchObject(2) + await docWritethrough.patch(patch1) + const time1 = travelForward(WRITE_RATE_MS) + const patch2 = generatePatchObject(1) + await docWritethrough.patch(patch2) + + const keyToOverride = _.sample(Object.keys(patch1))! + expect(await db.get(documentId)).toEqual( + expect.objectContaining({ + [keyToOverride]: patch1[keyToOverride], + }) + ) + + travelForward(WRITE_RATE_MS) + + const patch3 = { + ...generatePatchObject(3), + [keyToOverride]: generator.word(), + } + await docWritethrough.patch(patch3) + + expect(await db.get(documentId)).toEqual( + expect.objectContaining({ + ...patch1, + ...patch2, + ...patch3, + }) + ) + }) + }) }) }) From 41dde9722f57f12d03450c4bc98e929c7133086d Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Thu, 29 Feb 2024 15:51:42 +0100 Subject: [PATCH 24/44] Test concurrency --- .../backend-core/src/cache/docWritethrough.ts | 12 ++++-- .../src/cache/tests/docWritethrough.spec.ts | 41 ++++++++++++++++++- 2 files changed, 47 insertions(+), 6 deletions(-) diff --git a/packages/backend-core/src/cache/docWritethrough.ts b/packages/backend-core/src/cache/docWritethrough.ts index 9e1977f797..13a85a0d84 100644 --- a/packages/backend-core/src/cache/docWritethrough.ts +++ b/packages/backend-core/src/cache/docWritethrough.ts @@ -19,9 +19,9 @@ interface CacheItem { } export class DocWritethrough { - db: Database - docId: string - writeRateMs: number + private db: Database + private _docId: string + private writeRateMs: number constructor( db: Database, @@ -29,10 +29,14 @@ export class DocWritethrough { writeRateMs: number = DEFAULT_WRITE_RATE_MS ) { this.db = db - this.docId = docId + this._docId = docId this.writeRateMs = writeRateMs } + get docId() { + return this._docId + } + private makeCacheItem(): CacheItem { return { lastWrite: Date.now() } } diff --git a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts index 16e47ce3c3..aed87499ee 100644 --- a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts +++ b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts @@ -41,8 +41,9 @@ describe("docWritethrough", () => { docWritethrough = new DocWritethrough(db, documentId, WRITE_RATE_MS) }) - it("patching will not persist if timeout does not hit", async () => { + it("patching will not persist if timeout from the creation does not hit", async () => { await config.doInTenant(async () => { + travelForward(WRITE_RATE_MS) await docWritethrough.patch(generatePatchObject(2)) await docWritethrough.patch(generatePatchObject(2)) travelForward(WRITE_RATE_MS - 1) @@ -116,7 +117,7 @@ describe("docWritethrough", () => { await config.doInTenant(async () => { const patch1 = generatePatchObject(2) await docWritethrough.patch(patch1) - const time1 = travelForward(WRITE_RATE_MS) + travelForward(WRITE_RATE_MS) const patch2 = generatePatchObject(1) await docWritethrough.patch(patch2) @@ -144,5 +145,41 @@ describe("docWritethrough", () => { ) }) }) + + it("concurrent patches to multiple DocWritethrough will not contaminate each other", async () => { + await config.doInTenant(async () => { + const secondDocWritethrough = new DocWritethrough( + db, + structures.db.id(), + WRITE_RATE_MS + ) + + const doc1Patch = generatePatchObject(2) + await docWritethrough.patch(doc1Patch) + const doc2Patch = generatePatchObject(1) + await secondDocWritethrough.patch(doc2Patch) + + travelForward(WRITE_RATE_MS) + + const doc1Patch2 = generatePatchObject(3) + await docWritethrough.patch(doc1Patch2) + const doc2Patch2 = generatePatchObject(3) + await secondDocWritethrough.patch(doc2Patch2) + + expect(await db.get(docWritethrough.docId)).toEqual( + expect.objectContaining({ + ...doc1Patch, + ...doc1Patch2, + }) + ) + + expect(await db.get(secondDocWritethrough.docId)).toEqual( + expect.objectContaining({ + ...doc2Patch, + ...doc2Patch2, + }) + ) + }) + }) }) }) From 223637999a4679536ca68ca0a0115376753abfa1 Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Thu, 29 Feb 2024 16:48:16 +0100 Subject: [PATCH 25/44] Ensure keys are removed --- .../backend-core/src/cache/docWritethrough.ts | 4 +++ .../src/cache/tests/docWritethrough.spec.ts | 28 +++++++++++++++++++ 2 files changed, 32 insertions(+) diff --git a/packages/backend-core/src/cache/docWritethrough.ts b/packages/backend-core/src/cache/docWritethrough.ts index 13a85a0d84..bde93182a9 100644 --- a/packages/backend-core/src/cache/docWritethrough.ts +++ b/packages/backend-core/src/cache/docWritethrough.ts @@ -96,6 +96,10 @@ export class DocWritethrough { } await this.db.put(doc) + + for (const key of keysToPersist) { + await cache.delete(key, { useTenancy: false }) + } } ) diff --git a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts index aed87499ee..65e9450f62 100644 --- a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts +++ b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts @@ -181,5 +181,33 @@ describe("docWritethrough", () => { ) }) }) + + it("cached values are persisted only once", async () => { + await config.doInTenant(async () => { + const initialPatch = generatePatchObject(5) + + await docWritethrough.patch(initialPatch) + travelForward(WRITE_RATE_MS) + + await docWritethrough.patch({}) + + expect(await db.get(documentId)).toEqual( + expect.objectContaining(initialPatch) + ) + + await db.remove(await db.get(documentId)) + + travelForward(WRITE_RATE_MS) + const extraPatch = generatePatchObject(5) + await docWritethrough.patch(extraPatch) + + expect(await db.get(documentId)).toEqual( + expect.objectContaining(extraPatch) + ) + expect(await db.get(documentId)).not.toEqual( + expect.objectContaining(initialPatch) + ) + }) + }) }) }) From 04fb27962390d79fe2fe3b65fe7ee44a48d6dbd8 Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Thu, 29 Feb 2024 17:01:16 +0100 Subject: [PATCH 26/44] Extra tests --- .../src/cache/tests/docWritethrough.spec.ts | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts index 65e9450f62..974494d1c9 100644 --- a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts +++ b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts @@ -77,6 +77,35 @@ describe("docWritethrough", () => { }) }) + it("patching will persist keeping the previous data", async () => { + await config.doInTenant(async () => { + const patch1 = generatePatchObject(2) + const patch2 = generatePatchObject(2) + await docWritethrough.patch(patch1) + await docWritethrough.patch(patch2) + + travelForward(WRITE_RATE_MS) + + const patch3 = generatePatchObject(3) + await docWritethrough.patch(patch3) + + travelForward(WRITE_RATE_MS) + + const patch4 = generatePatchObject(3) + await docWritethrough.patch(patch4) + + expect(await db.get(documentId)).toEqual( + expect.objectContaining({ + _id: documentId, + ...patch1, + ...patch2, + ...patch3, + ...patch4, + }) + ) + }) + }) + it("date audit fields are set correctly when persisting", async () => { await config.doInTenant(async () => { const patch1 = generatePatchObject(2) From fd93eb79d5b96c7cf0c71a9d8501dfe189771d56 Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Fri, 1 Mar 2024 10:53:18 +0100 Subject: [PATCH 27/44] Fixes and tests --- .../backend-core/src/cache/docWritethrough.ts | 88 +++++++++---------- .../src/cache/tests/docWritethrough.spec.ts | 41 ++++++++- 2 files changed, 82 insertions(+), 47 deletions(-) diff --git a/packages/backend-core/src/cache/docWritethrough.ts b/packages/backend-core/src/cache/docWritethrough.ts index bde93182a9..80063e4772 100644 --- a/packages/backend-core/src/cache/docWritethrough.ts +++ b/packages/backend-core/src/cache/docWritethrough.ts @@ -23,6 +23,8 @@ export class DocWritethrough { private _docId: string private writeRateMs: number + private docInfoCacheKey: string + constructor( db: Database, docId: string, @@ -31,6 +33,7 @@ export class DocWritethrough { this.db = db this._docId = docId this.writeRateMs = writeRateMs + this.docInfoCacheKey = `${this.docId}:info` } get docId() { @@ -44,26 +47,39 @@ export class DocWritethrough { async patch(data: Record) { const cache = await getCache() - const key = `${this.docId}:info` - const cacheItem = await cache.withCache( - key, - null, - () => this.makeCacheItem(), - { - useTenancy: false, - } - ) - await this.storeToCache(cache, data) - const updateDb = - !cacheItem || cacheItem.lastWrite <= Date.now() - this.writeRateMs - // let output = this.doc + const updateDb = await this.shouldUpdateDb(cache) + if (updateDb) { - await this.persistToDb(cache) + const lockResponse = await locks.doWithLock( + { + type: LockType.TRY_ONCE, + name: LockName.PERSIST_WRITETHROUGH, + resource: this.docInfoCacheKey, + ttl: 15000, + }, + async () => { + if (await this.shouldUpdateDb(cache)) { + await this.persistToDb(cache) + await cache.store(this.docInfoCacheKey, this.makeCacheItem()) + } + } + ) + + if (!lockResponse.executed) { + console.log(`Ignoring redlock conflict in write-through cache`) + } } } + private async shouldUpdateDb(cache: BaseCache) { + const cacheItem = await cache.withCache(this.docInfoCacheKey, null, () => + this.makeCacheItem() + ) + return cacheItem.lastWrite <= Date.now() - this.writeRateMs + } + private async storeToCache(cache: BaseCache, data: Record) { for (const [key, value] of Object.entries(data)) { const cacheKey = this.docId + ":data:" + key @@ -72,39 +88,23 @@ export class DocWritethrough { } private async persistToDb(cache: BaseCache) { - const key = `${this.db.name}_${this.docId}` + let doc: AnyDocument | undefined + try { + doc = await this.db.get(this.docId) + } catch { + doc = { _id: this.docId } + } - const lockResponse = await locks.doWithLock( - { - type: LockType.TRY_ONCE, - name: LockName.PERSIST_WRITETHROUGH, - resource: key, - ttl: 15000, - }, - async () => { - let doc: AnyDocument | undefined - try { - doc = await this.db.get(this.docId) - } catch { - doc = { _id: this.docId } - } + const keysToPersist = await cache.keys(`${this.docId}:data:*`) + for (const key of keysToPersist) { + const data = await cache.get(key, { useTenancy: false }) + doc[data.key] = data.value + } - const keysToPersist = await cache.keys(`${this.docId}:data:*`) - for (const key of keysToPersist) { - const data = await cache.get(key, { useTenancy: false }) - doc[data.key] = data.value - } + await this.db.put(doc) - await this.db.put(doc) - - for (const key of keysToPersist) { - await cache.delete(key, { useTenancy: false }) - } - } - ) - - if (!lockResponse.executed) { - throw `DocWriteThrough could not be persisted to db for ${key}` + for (const key of keysToPersist) { + await cache.delete(key, { useTenancy: false }) } } } diff --git a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts index 974494d1c9..bca781e377 100644 --- a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts +++ b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts @@ -1,12 +1,10 @@ import tk from "timekeeper" -import { env } from "../.." + import { DBTestConfiguration, generator, structures } from "../../../tests" import { getDB } from "../../db" import { DocWritethrough } from "../docWritethrough" import _ from "lodash" -env._set("MOCK_REDIS", null) - const WRITE_RATE_MS = 500 const initialTime = Date.now() @@ -238,5 +236,42 @@ describe("docWritethrough", () => { ) }) }) + + it("concurrent calls will not cause multiple saves", async () => { + async function parallelPatch(count: number) { + await Promise.all( + Array.from({ length: count }).map(() => + docWritethrough.patch(generatePatchObject(1)) + ) + ) + } + + const persistToDbSpy = jest.spyOn(docWritethrough as any, "persistToDb") + const storeToCacheSpy = jest.spyOn(docWritethrough as any, "storeToCache") + + await config.doInTenant(async () => { + await parallelPatch(5) + expect(persistToDbSpy).not.toBeCalled() + expect(storeToCacheSpy).toBeCalledTimes(5) + + travelForward(WRITE_RATE_MS) + + await parallelPatch(40) + + expect(persistToDbSpy).toBeCalledTimes(1) + expect(storeToCacheSpy).toBeCalledTimes(45) + + await parallelPatch(10) + + expect(persistToDbSpy).toBeCalledTimes(1) + expect(storeToCacheSpy).toBeCalledTimes(55) + + travelForward(WRITE_RATE_MS) + + await parallelPatch(5) + expect(persistToDbSpy).toBeCalledTimes(2) + expect(storeToCacheSpy).toBeCalledTimes(60) + }) + }) }) }) From eb9a1633944d84cbefa727b18a129feff27c9f56 Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Fri, 1 Mar 2024 11:04:30 +0100 Subject: [PATCH 28/44] Making code more readable --- .../backend-core/src/cache/docWritethrough.ts | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/packages/backend-core/src/cache/docWritethrough.ts b/packages/backend-core/src/cache/docWritethrough.ts index 80063e4772..5148950c1d 100644 --- a/packages/backend-core/src/cache/docWritethrough.ts +++ b/packages/backend-core/src/cache/docWritethrough.ts @@ -15,7 +15,7 @@ async function getCache() { } interface CacheItem { - lastWrite: number + nextWrite: number } export class DocWritethrough { @@ -40,8 +40,8 @@ export class DocWritethrough { return this._docId } - private makeCacheItem(): CacheItem { - return { lastWrite: Date.now() } + private makeNextWriteInfoItem(): CacheItem { + return { nextWrite: Date.now() + this.writeRateMs } } async patch(data: Record) { @@ -62,7 +62,10 @@ export class DocWritethrough { async () => { if (await this.shouldUpdateDb(cache)) { await this.persistToDb(cache) - await cache.store(this.docInfoCacheKey, this.makeCacheItem()) + await cache.store( + this.docInfoCacheKey, + this.makeNextWriteInfoItem() + ) } } ) @@ -75,9 +78,9 @@ export class DocWritethrough { private async shouldUpdateDb(cache: BaseCache) { const cacheItem = await cache.withCache(this.docInfoCacheKey, null, () => - this.makeCacheItem() + this.makeNextWriteInfoItem() ) - return cacheItem.lastWrite <= Date.now() - this.writeRateMs + return Date.now() >= cacheItem.nextWrite } private async storeToCache(cache: BaseCache, data: Record) { From dc84eb4e806684c438ab18005bb14836720cc57b Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Fri, 1 Mar 2024 11:04:55 +0100 Subject: [PATCH 29/44] Type caches --- packages/backend-core/src/cache/base/index.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/backend-core/src/cache/base/index.ts b/packages/backend-core/src/cache/base/index.ts index 23c952c7b2..911bd6a831 100644 --- a/packages/backend-core/src/cache/base/index.ts +++ b/packages/backend-core/src/cache/base/index.ts @@ -58,12 +58,12 @@ export default class BaseCache { /** * Read from the cache. Write to the cache if not exists. */ - async withCache( + async withCache( key: string, ttl: number | null = null, - fetchFn: any, + fetchFn: () => Promise | T, opts = { useTenancy: true } - ) { + ): Promise { const cachedValue = await this.get(key, opts) if (cachedValue) { return cachedValue From e986d34b8739258e81c6acc385afdd4cbe133a7b Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Fri, 1 Mar 2024 11:12:31 +0100 Subject: [PATCH 30/44] Fix types --- packages/backend-core/src/cache/generic.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/packages/backend-core/src/cache/generic.ts b/packages/backend-core/src/cache/generic.ts index 3ac323a8d4..2d6d8b9472 100644 --- a/packages/backend-core/src/cache/generic.ts +++ b/packages/backend-core/src/cache/generic.ts @@ -26,7 +26,8 @@ export const store = (...args: Parameters) => GENERIC.store(...args) export const destroy = (...args: Parameters) => GENERIC.delete(...args) -export const withCache = (...args: Parameters) => - GENERIC.withCache(...args) +export const withCache = ( + ...args: Parameters> +) => GENERIC.withCache(...args) export const bustCache = (...args: Parameters) => GENERIC.bustCache(...args) From da012c0f082d1bf44b6837e69da05d0a13db7fea Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Fri, 1 Mar 2024 13:38:48 +0100 Subject: [PATCH 31/44] Namespace key in redis by db --- packages/backend-core/src/cache/docWritethrough.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/backend-core/src/cache/docWritethrough.ts b/packages/backend-core/src/cache/docWritethrough.ts index 5148950c1d..e46c763906 100644 --- a/packages/backend-core/src/cache/docWritethrough.ts +++ b/packages/backend-core/src/cache/docWritethrough.ts @@ -33,7 +33,7 @@ export class DocWritethrough { this.db = db this._docId = docId this.writeRateMs = writeRateMs - this.docInfoCacheKey = `${this.docId}:info` + this.docInfoCacheKey = `${this.db.name}:${this.docId}:info` } get docId() { From 82a6f9027e5df55b113d550d5e26a8b958f87219 Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Fri, 1 Mar 2024 13:41:40 +0100 Subject: [PATCH 32/44] Namespace key in redis by db --- packages/backend-core/src/cache/docWritethrough.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/packages/backend-core/src/cache/docWritethrough.ts b/packages/backend-core/src/cache/docWritethrough.ts index e46c763906..e367c9e060 100644 --- a/packages/backend-core/src/cache/docWritethrough.ts +++ b/packages/backend-core/src/cache/docWritethrough.ts @@ -23,6 +23,7 @@ export class DocWritethrough { private _docId: string private writeRateMs: number + private cacheKeyPrefix: string private docInfoCacheKey: string constructor( @@ -33,7 +34,8 @@ export class DocWritethrough { this.db = db this._docId = docId this.writeRateMs = writeRateMs - this.docInfoCacheKey = `${this.db.name}:${this.docId}:info` + this.cacheKeyPrefix = `${this.db.name}:${this.docId}` + this.docInfoCacheKey = `${this.cacheKeyPrefix}:info` } get docId() { @@ -85,7 +87,7 @@ export class DocWritethrough { private async storeToCache(cache: BaseCache, data: Record) { for (const [key, value] of Object.entries(data)) { - const cacheKey = this.docId + ":data:" + key + const cacheKey = this.cacheKeyPrefix + ":data:" + key await cache.store(cacheKey, { key, value }, undefined) } } @@ -98,7 +100,7 @@ export class DocWritethrough { doc = { _id: this.docId } } - const keysToPersist = await cache.keys(`${this.docId}:data:*`) + const keysToPersist = await cache.keys(`${this.cacheKeyPrefix}:data:*`) for (const key of keysToPersist) { const data = await cache.get(key, { useTenancy: false }) doc[data.key] = data.value From 774ff42f0c926eb91c84d8a467a9047947274573 Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Fri, 1 Mar 2024 13:59:51 +0100 Subject: [PATCH 33/44] Use overloads --- .../src/cache/tests/docWritethrough.spec.ts | 6 ++-- .../backend-core/src/db/couch/DatabaseImpl.ts | 28 ++++++++++++------- .../backend-core/src/db/instrumentation.ts | 14 ++++------ .../src/db/tests/DatabaseImpl.spec.ts | 16 +++++------ packages/types/src/sdk/db.ts | 2 +- 5 files changed, 35 insertions(+), 31 deletions(-) diff --git a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts index bca781e377..4c4a4b2b60 100644 --- a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts +++ b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts @@ -35,7 +35,7 @@ describe("docWritethrough", () => { beforeEach(() => { resetTime() - documentId = structures.db.id() + documentId = structures.uuid() docWritethrough = new DocWritethrough(db, documentId, WRITE_RATE_MS) }) @@ -47,7 +47,7 @@ describe("docWritethrough", () => { travelForward(WRITE_RATE_MS - 1) await docWritethrough.patch(generatePatchObject(2)) - expect(await db.docExists(documentId)).toBe(false) + expect(await db.exists(documentId)).toBe(false) }) }) @@ -136,7 +136,7 @@ describe("docWritethrough", () => { travelForward(WRITE_RATE_MS) - expect(await db.docExists(documentId)).toBe(false) + expect(await db.exists(documentId)).toBe(false) }) }) diff --git a/packages/backend-core/src/db/couch/DatabaseImpl.ts b/packages/backend-core/src/db/couch/DatabaseImpl.ts index 9d198e4307..416313f520 100644 --- a/packages/backend-core/src/db/couch/DatabaseImpl.ts +++ b/packages/backend-core/src/db/couch/DatabaseImpl.ts @@ -70,7 +70,15 @@ export class DatabaseImpl implements Database { DatabaseImpl.nano = buildNano(couchInfo) } - async exists() { + exists(docId?: string) { + if (docId === undefined) { + return this.dbExists() + } + + return this.docExists(docId) + } + + private async dbExists() { const response = await directCouchUrlCall({ url: `${this.couchInfo.url}/${this.name}`, method: "HEAD", @@ -79,6 +87,15 @@ export class DatabaseImpl implements Database { return response.status === 200 } + private async docExists(id: string): Promise { + try { + await this.performCall(db => () => db.head(id)) + return true + } catch { + return false + } + } + private nano() { return this.instanceNano || DatabaseImpl.nano } @@ -135,15 +152,6 @@ export class DatabaseImpl implements Database { }) } - async docExists(id: string): Promise { - try { - await this.performCall(db => () => db.head(id)) - return true - } catch { - return false - } - } - async getMultiple( ids: string[], opts?: { allowMissing?: boolean } diff --git a/packages/backend-core/src/db/instrumentation.ts b/packages/backend-core/src/db/instrumentation.ts index 87af0e3127..795f30d7cd 100644 --- a/packages/backend-core/src/db/instrumentation.ts +++ b/packages/backend-core/src/db/instrumentation.ts @@ -24,9 +24,12 @@ export class DDInstrumentedDatabase implements Database { return this.db.name } - exists(): Promise { + exists(docId?: string): Promise { return tracer.trace("db.exists", span => { - span?.addTags({ db_name: this.name }) + span?.addTags({ db_name: this.name, doc_id: docId }) + if (docId) { + return this.db.exists(docId) + } return this.db.exists() }) } @@ -38,13 +41,6 @@ export class DDInstrumentedDatabase implements Database { }) } - docExists(id: string): Promise { - return tracer.trace("db.docExists", span => { - span?.addTags({ db_name: this.name, doc_id: id }) - return this.db.docExists(id) - }) - } - getMultiple( ids: string[], opts?: { allowMissing?: boolean | undefined } | undefined diff --git a/packages/backend-core/src/db/tests/DatabaseImpl.spec.ts b/packages/backend-core/src/db/tests/DatabaseImpl.spec.ts index 140ecf4f2c..586f13f417 100644 --- a/packages/backend-core/src/db/tests/DatabaseImpl.spec.ts +++ b/packages/backend-core/src/db/tests/DatabaseImpl.spec.ts @@ -17,16 +17,16 @@ describe("DatabaseImpl", () => { documents.push(...createdDocs.map((x: any) => ({ _id: x.id, _rev: x.rev }))) }) - describe("docExists", () => { + describe("document exists", () => { it("can check existing docs by id", async () => { const existingDoc = _.sample(documents) - const result = await database.docExists(existingDoc!._id!) + const result = await database.exists(existingDoc!._id!) expect(result).toBe(true) }) it("can check non existing docs by id", async () => { - const result = await database.docExists(newid()) + const result = await database.exists(newid()) expect(result).toBe(false) }) @@ -36,9 +36,9 @@ describe("DatabaseImpl", () => { const id = existingDoc!._id! const results = [] - results.push(await database.docExists(id)) - results.push(await database.docExists(id)) - results.push(await database.docExists(id)) + results.push(await database.exists(id)) + results.push(await database.exists(id)) + results.push(await database.exists(id)) expect(results).toEqual([true, true, true]) }) @@ -46,10 +46,10 @@ describe("DatabaseImpl", () => { it("returns false after the doc is deleted", async () => { const existingDoc = _.sample(documents) const id = existingDoc!._id! - expect(await database.docExists(id)).toBe(true) + expect(await database.exists(id)).toBe(true) await database.remove(existingDoc!) - expect(await database.docExists(id)).toBe(false) + expect(await database.exists(id)).toBe(false) }) }) }) diff --git a/packages/types/src/sdk/db.ts b/packages/types/src/sdk/db.ts index dafc9ced57..4d103d5be6 100644 --- a/packages/types/src/sdk/db.ts +++ b/packages/types/src/sdk/db.ts @@ -128,7 +128,7 @@ export interface Database { exists(): Promise get(id?: string): Promise - docExists(id: string): Promise + exists(docId: string): Promise getMultiple( ids: string[], opts?: { allowMissing?: boolean } From 2412d75cacbe36f27d0f8c4d02804eb371bb292d Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Mon, 4 Mar 2024 15:38:45 +0100 Subject: [PATCH 34/44] Type inMemoryQueue --- .../backend-core/src/queue/inMemoryQueue.ts | 36 ++++++++++--------- packages/backend-core/src/queue/queue.ts | 2 ++ 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/packages/backend-core/src/queue/inMemoryQueue.ts b/packages/backend-core/src/queue/inMemoryQueue.ts index c05bbffbe9..3205b6f383 100644 --- a/packages/backend-core/src/queue/inMemoryQueue.ts +++ b/packages/backend-core/src/queue/inMemoryQueue.ts @@ -1,5 +1,6 @@ import events from "events" import { timeout } from "../utils" +import { Queue, QueueOptions, JobOptions } from "./queue" /** * Bull works with a Job wrapper around all messages that contains a lot more information about @@ -24,9 +25,9 @@ function newJob(queue: string, message: any) { * It is relatively simple, using an event emitter internally to register when messages are available * to the consumers - in can support many inputs and many consumers. */ -class InMemoryQueue { +class InMemoryQueue implements Partial { _name: string - _opts?: any + _opts?: QueueOptions _messages: any[] _emitter: EventEmitter _runCount: number @@ -37,7 +38,7 @@ class InMemoryQueue { * @param opts This is not used by the in memory queue as there is no real use * case when in memory, but is the same API as Bull */ - constructor(name: string, opts?: any) { + constructor(name: string, opts?: QueueOptions) { this._name = name this._opts = opts this._messages = [] @@ -55,8 +56,12 @@ class InMemoryQueue { * note this is incredibly limited compared to Bull as in reality the Job would contain * a lot more information about the queue and current status of Bull cluster. */ - process(func: any) { + async process(func: any) { this._emitter.on("message", async () => { + const delay = this._opts?.defaultJobOptions?.delay + if (delay) { + await new Promise(r => setTimeout(() => r(), delay)) + } if (this._messages.length <= 0) { return } @@ -70,7 +75,7 @@ class InMemoryQueue { } async isReady() { - return true + return this as any } // simply puts a message to the queue and emits to the queue for processing @@ -83,27 +88,26 @@ class InMemoryQueue { * @param repeat serves no purpose for the import queue. */ // eslint-disable-next-line no-unused-vars - add(msg: any, repeat: boolean) { - if (typeof msg !== "object") { + async add(data: any, opts?: JobOptions) { + if (typeof data !== "object") { throw "Queue only supports carrying JSON." } - this._messages.push(newJob(this._name, msg)) + this._messages.push(newJob(this._name, data)) this._addCount++ this._emitter.emit("message") + return {} as any } /** * replicating the close function from bull, which waits for jobs to finish. */ - async close() { - return [] - } + async close() {} /** * This removes a cron which has been implemented, this is part of Bull API. * @param cronJobId The cron which is to be removed. */ - removeRepeatableByKey(cronJobId: string) { + async removeRepeatableByKey(cronJobId: string) { // TODO: implement for testing console.log(cronJobId) } @@ -111,12 +115,12 @@ class InMemoryQueue { /** * Implemented for tests */ - getRepeatableJobs() { + async getRepeatableJobs() { return [] } // eslint-disable-next-line no-unused-vars - removeJobs(pattern: string) { + async removeJobs(pattern: string) { // no-op } @@ -128,12 +132,12 @@ class InMemoryQueue { } async getJob() { - return {} + return null } on() { // do nothing - return this + return this as any } async waitForCompletion() { diff --git a/packages/backend-core/src/queue/queue.ts b/packages/backend-core/src/queue/queue.ts index 0bcb25a35f..1838eed92f 100644 --- a/packages/backend-core/src/queue/queue.ts +++ b/packages/backend-core/src/queue/queue.ts @@ -7,6 +7,8 @@ import { addListeners, StalledFn } from "./listeners" import { Duration } from "../utils" import * as timers from "../timers" +export { QueueOptions, Queue, JobOptions } from "bull" + // the queue lock is held for 5 minutes const QUEUE_LOCK_MS = Duration.fromMinutes(5).toMs() // queue lock is refreshed every 30 seconds From b39400f08c5145a818aadd602f74c2a7a41e895c Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Mon, 4 Mar 2024 15:43:47 +0100 Subject: [PATCH 35/44] Clean --- packages/worker/src/initPro.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/worker/src/initPro.ts b/packages/worker/src/initPro.ts index ddc8d2562a..b34d514992 100644 --- a/packages/worker/src/initPro.ts +++ b/packages/worker/src/initPro.ts @@ -1,5 +1,4 @@ import { sdk as proSdk } from "@budibase/pro" -import * as userSdk from "./sdk/users" export const initPro = async () => { await proSdk.init({}) From df325e21c30fae69940ed04bc3eb9f2d2f8b160d Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Mon, 4 Mar 2024 16:18:01 +0100 Subject: [PATCH 36/44] Add doc-writethrough queue --- packages/backend-core/src/queue/constants.ts | 1 + packages/backend-core/src/queue/listeners.ts | 2 ++ 2 files changed, 3 insertions(+) diff --git a/packages/backend-core/src/queue/constants.ts b/packages/backend-core/src/queue/constants.ts index eb4f21aced..a095c6c769 100644 --- a/packages/backend-core/src/queue/constants.ts +++ b/packages/backend-core/src/queue/constants.ts @@ -4,4 +4,5 @@ export enum JobQueue { AUDIT_LOG = "auditLogQueue", SYSTEM_EVENT_QUEUE = "systemEventQueue", APP_MIGRATION = "appMigration", + DOC_WRITETHROUGH_QUEUE = "docWritethroughQueue", } diff --git a/packages/backend-core/src/queue/listeners.ts b/packages/backend-core/src/queue/listeners.ts index 063a01bd2f..14dce5fe8d 100644 --- a/packages/backend-core/src/queue/listeners.ts +++ b/packages/backend-core/src/queue/listeners.ts @@ -88,6 +88,7 @@ enum QueueEventType { AUDIT_LOG_EVENT = "audit-log-event", SYSTEM_EVENT = "system-event", APP_MIGRATION = "app-migration", + DOC_WRITETHROUGH = "doc-writethrough", } const EventTypeMap: { [key in JobQueue]: QueueEventType } = { @@ -96,6 +97,7 @@ const EventTypeMap: { [key in JobQueue]: QueueEventType } = { [JobQueue.AUDIT_LOG]: QueueEventType.AUDIT_LOG_EVENT, [JobQueue.SYSTEM_EVENT_QUEUE]: QueueEventType.SYSTEM_EVENT, [JobQueue.APP_MIGRATION]: QueueEventType.APP_MIGRATION, + [JobQueue.DOC_WRITETHROUGH_QUEUE]: QueueEventType.DOC_WRITETHROUGH, } function logging(queue: Queue, jobQueue: JobQueue) { From 936ddafee7c21aa939c2842e793e6865741054a5 Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Mon, 4 Mar 2024 16:34:05 +0100 Subject: [PATCH 37/44] Use bull --- .../backend-core/src/cache/docWritethrough.ts | 123 +++++++++--------- 1 file changed, 64 insertions(+), 59 deletions(-) diff --git a/packages/backend-core/src/cache/docWritethrough.ts b/packages/backend-core/src/cache/docWritethrough.ts index e367c9e060..38a162435d 100644 --- a/packages/backend-core/src/cache/docWritethrough.ts +++ b/packages/backend-core/src/cache/docWritethrough.ts @@ -3,6 +3,9 @@ import { getDocWritethroughClient } from "../redis/init" import { AnyDocument, Database, LockName, LockType } from "@budibase/types" import * as locks from "../redis/redlockImpl" +import { JobQueue, createQueue } from "../queue" +import { context, db as dbUtils } from ".." + const DEFAULT_WRITE_RATE_MS = 10000 let CACHE: BaseCache | null = null @@ -14,17 +17,63 @@ async function getCache() { return CACHE } -interface CacheItem { - nextWrite: number +interface ProcessDocMessage { + tenantId: string + dbName: string + docId: string + cacheKeyPrefix: string } +export const docWritethroughProcessorQueue = createQueue( + JobQueue.DOC_WRITETHROUGH_QUEUE +) + +docWritethroughProcessorQueue.process(async message => { + const { dbName, tenantId, docId, cacheKeyPrefix } = message.data + const cache = await getCache() + await context.doInTenant(tenantId, async () => { + const lockResponse = await locks.doWithLock( + { + type: LockType.TRY_ONCE, + name: LockName.PERSIST_WRITETHROUGH, + resource: cacheKeyPrefix, + ttl: 15000, + }, + async () => { + const db = dbUtils.getDB(dbName) + let doc: AnyDocument | undefined + try { + doc = await db.get(docId) + } catch { + doc = { _id: docId } + } + + const keysToPersist = await cache.keys(`${cacheKeyPrefix}:data:*`) + for (const key of keysToPersist) { + const data = await cache.get(key, { useTenancy: false }) + doc[data.key] = data.value + } + + await db.put(doc) + + for (const key of keysToPersist) { + await cache.delete(key, { useTenancy: false }) + } + } + ) + + if (!lockResponse.executed) { + console.log(`Ignoring redlock conflict in write-through cache`) + } + }) +}) + export class DocWritethrough { private db: Database private _docId: string private writeRateMs: number private cacheKeyPrefix: string - private docInfoCacheKey: string constructor( db: Database, @@ -35,54 +84,31 @@ export class DocWritethrough { this._docId = docId this.writeRateMs = writeRateMs this.cacheKeyPrefix = `${this.db.name}:${this.docId}` - this.docInfoCacheKey = `${this.cacheKeyPrefix}:info` } get docId() { return this._docId } - private makeNextWriteInfoItem(): CacheItem { - return { nextWrite: Date.now() + this.writeRateMs } - } - async patch(data: Record) { const cache = await getCache() await this.storeToCache(cache, data) - const updateDb = await this.shouldUpdateDb(cache) - - if (updateDb) { - const lockResponse = await locks.doWithLock( - { - type: LockType.TRY_ONCE, - name: LockName.PERSIST_WRITETHROUGH, - resource: this.docInfoCacheKey, - ttl: 15000, - }, - async () => { - if (await this.shouldUpdateDb(cache)) { - await this.persistToDb(cache) - await cache.store( - this.docInfoCacheKey, - this.makeNextWriteInfoItem() - ) - } - } - ) - - if (!lockResponse.executed) { - console.log(`Ignoring redlock conflict in write-through cache`) + docWritethroughProcessorQueue.add( + { + tenantId: context.getTenantId(), + dbName: this.db.name, + docId: this.docId, + cacheKeyPrefix: this.cacheKeyPrefix, + }, + { + delay: this.writeRateMs - 1, + jobId: this.cacheKeyPrefix, + removeOnFail: true, + removeOnComplete: true, } - } - } - - private async shouldUpdateDb(cache: BaseCache) { - const cacheItem = await cache.withCache(this.docInfoCacheKey, null, () => - this.makeNextWriteInfoItem() ) - return Date.now() >= cacheItem.nextWrite } private async storeToCache(cache: BaseCache, data: Record) { @@ -91,25 +117,4 @@ export class DocWritethrough { await cache.store(cacheKey, { key, value }, undefined) } } - - private async persistToDb(cache: BaseCache) { - let doc: AnyDocument | undefined - try { - doc = await this.db.get(this.docId) - } catch { - doc = { _id: this.docId } - } - - const keysToPersist = await cache.keys(`${this.cacheKeyPrefix}:data:*`) - for (const key of keysToPersist) { - const data = await cache.get(key, { useTenancy: false }) - doc[data.key] = data.value - } - - await this.db.put(doc) - - for (const key of keysToPersist) { - await cache.delete(key, { useTenancy: false }) - } - } } From 420b0ffc03386fdf896b11ff0cc5a0f01741ef9f Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Tue, 5 Mar 2024 13:50:58 +0100 Subject: [PATCH 38/44] Tests --- .../backend-core/src/cache/docWritethrough.ts | 99 +++++++++------ .../src/cache/tests/docWritethrough.spec.ts | 120 ++++++++++-------- .../backend-core/src/queue/inMemoryQueue.ts | 76 ++++++++--- 3 files changed, 186 insertions(+), 109 deletions(-) diff --git a/packages/backend-core/src/cache/docWritethrough.ts b/packages/backend-core/src/cache/docWritethrough.ts index 38a162435d..f53cfbfe5f 100644 --- a/packages/backend-core/src/cache/docWritethrough.ts +++ b/packages/backend-core/src/cache/docWritethrough.ts @@ -4,7 +4,8 @@ import { AnyDocument, Database, LockName, LockType } from "@budibase/types" import * as locks from "../redis/redlockImpl" import { JobQueue, createQueue } from "../queue" -import { context, db as dbUtils } from ".." +import * as context from "../context" +import * as dbUtils from "../db" const DEFAULT_WRITE_RATE_MS = 10000 @@ -28,50 +29,71 @@ export const docWritethroughProcessorQueue = createQueue( JobQueue.DOC_WRITETHROUGH_QUEUE ) -docWritethroughProcessorQueue.process(async message => { - const { dbName, tenantId, docId, cacheKeyPrefix } = message.data - const cache = await getCache() - await context.doInTenant(tenantId, async () => { - const lockResponse = await locks.doWithLock( - { - type: LockType.TRY_ONCE, - name: LockName.PERSIST_WRITETHROUGH, - resource: cacheKeyPrefix, - ttl: 15000, - }, - async () => { - const db = dbUtils.getDB(dbName) - let doc: AnyDocument | undefined - try { - doc = await db.get(docId) - } catch { - doc = { _id: docId } +let _init = false +export const init = () => { + if (_init) { + return + } + docWritethroughProcessorQueue.process(async message => { + const { tenantId, cacheKeyPrefix } = message.data + await context.doInTenant(tenantId, async () => { + const lockResponse = await locks.doWithLock( + { + type: LockType.TRY_ONCE, + name: LockName.PERSIST_WRITETHROUGH, + resource: cacheKeyPrefix, + ttl: 15000, + }, + async () => { + await persistToDb(message.data) } + ) - const keysToPersist = await cache.keys(`${cacheKeyPrefix}:data:*`) - for (const key of keysToPersist) { - const data = await cache.get(key, { useTenancy: false }) - doc[data.key] = data.value - } - - await db.put(doc) - - for (const key of keysToPersist) { - await cache.delete(key, { useTenancy: false }) - } + if (!lockResponse.executed) { + console.log(`Ignoring redlock conflict in write-through cache`) } - ) - - if (!lockResponse.executed) { - console.log(`Ignoring redlock conflict in write-through cache`) - } + }) }) -}) + _init = true +} + +export async function persistToDb({ + dbName, + docId, + cacheKeyPrefix, +}: { + dbName: string + docId: string + cacheKeyPrefix: string +}) { + const cache = await getCache() + + const db = dbUtils.getDB(dbName) + let doc: AnyDocument | undefined + try { + doc = await db.get(docId) + } catch { + doc = { _id: docId } + } + + const keysToPersist = await cache.keys(`${cacheKeyPrefix}:data:*`) + for (const key of keysToPersist) { + const data = await cache.get(key, { useTenancy: false }) + doc[data.key] = data.value + } + + await db.put(doc) + + for (const key of keysToPersist) { + await cache.delete(key, { useTenancy: false }) + } +} export class DocWritethrough { private db: Database private _docId: string private writeRateMs: number + private tenantId: string private cacheKeyPrefix: string @@ -84,6 +106,7 @@ export class DocWritethrough { this._docId = docId this.writeRateMs = writeRateMs this.cacheKeyPrefix = `${this.db.name}:${this.docId}` + this.tenantId = context.getTenantId() } get docId() { @@ -97,13 +120,13 @@ export class DocWritethrough { docWritethroughProcessorQueue.add( { - tenantId: context.getTenantId(), + tenantId: this.tenantId, dbName: this.db.name, docId: this.docId, cacheKeyPrefix: this.cacheKeyPrefix, }, { - delay: this.writeRateMs - 1, + delay: this.writeRateMs, jobId: this.cacheKeyPrefix, removeOnFail: true, removeOnComplete: true, diff --git a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts index 4c4a4b2b60..83af66a9d2 100644 --- a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts +++ b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts @@ -1,20 +1,32 @@ -import tk from "timekeeper" - import { DBTestConfiguration, generator, structures } from "../../../tests" import { getDB } from "../../db" -import { DocWritethrough } from "../docWritethrough" import _ from "lodash" -const WRITE_RATE_MS = 500 +import { + DocWritethrough, + docWritethroughProcessorQueue, + init, +} from "../docWritethrough" +import InMemoryQueue from "../../queue/inMemoryQueue" + +const WRITE_RATE_MS = 1000 const initialTime = Date.now() +jest.useFakeTimers({ + now: initialTime, +}) + function resetTime() { - tk.travel(initialTime) + jest.setSystemTime(initialTime) } -function travelForward(ms: number) { - const updatedTime = Date.now() + ms - tk.travel(updatedTime) +async function travelForward(ms: number) { + await jest.advanceTimersByTimeAsync(ms) + + const queue: InMemoryQueue = docWritethroughProcessorQueue as never + while (queue.hasRunningJobs()) { + await jest.runOnlyPendingTimersAsync() + } } describe("docWritethrough", () => { @@ -33,33 +45,37 @@ describe("docWritethrough", () => { }, {} as Record) } - beforeEach(() => { + beforeAll(() => init()) + + beforeEach(async () => { resetTime() documentId = structures.uuid() - docWritethrough = new DocWritethrough(db, documentId, WRITE_RATE_MS) + await config.doInTenant(async () => { + docWritethrough = new DocWritethrough(db, documentId, WRITE_RATE_MS) + }) }) - it("patching will not persist if timeout from the creation does not hit", async () => { + it("patching will not persist if timeout does not hit", async () => { await config.doInTenant(async () => { - travelForward(WRITE_RATE_MS) + await travelForward(WRITE_RATE_MS) await docWritethrough.patch(generatePatchObject(2)) await docWritethrough.patch(generatePatchObject(2)) - travelForward(WRITE_RATE_MS - 1) - await docWritethrough.patch(generatePatchObject(2)) + await travelForward(WRITE_RATE_MS - 1) expect(await db.exists(documentId)).toBe(false) }) }) - it("patching will persist if timeout hits and next patch is called", async () => { + it("patching will persist if timeout hits", async () => { await config.doInTenant(async () => { const patch1 = generatePatchObject(2) const patch2 = generatePatchObject(2) await docWritethrough.patch(patch1) await docWritethrough.patch(patch2) - travelForward(WRITE_RATE_MS) + await travelForward(WRITE_RATE_MS) + // This will not be persisted const patch3 = generatePatchObject(3) await docWritethrough.patch(patch3) @@ -67,7 +83,6 @@ describe("docWritethrough", () => { _id: documentId, ...patch1, ...patch2, - ...patch3, _rev: expect.stringMatching(/1-.+/), createdAt: new Date(initialTime + WRITE_RATE_MS).toISOString(), updatedAt: new Date(initialTime + WRITE_RATE_MS).toISOString(), @@ -82,15 +97,12 @@ describe("docWritethrough", () => { await docWritethrough.patch(patch1) await docWritethrough.patch(patch2) - travelForward(WRITE_RATE_MS) + await travelForward(WRITE_RATE_MS) const patch3 = generatePatchObject(3) await docWritethrough.patch(patch3) - travelForward(WRITE_RATE_MS) - - const patch4 = generatePatchObject(3) - await docWritethrough.patch(patch4) + await travelForward(WRITE_RATE_MS) expect(await db.get(documentId)).toEqual( expect.objectContaining({ @@ -98,7 +110,6 @@ describe("docWritethrough", () => { ...patch1, ...patch2, ...patch3, - ...patch4, }) ) }) @@ -109,16 +120,13 @@ describe("docWritethrough", () => { const patch1 = generatePatchObject(2) const patch2 = generatePatchObject(2) await docWritethrough.patch(patch1) - travelForward(WRITE_RATE_MS) + await travelForward(WRITE_RATE_MS) const date1 = new Date() await docWritethrough.patch(patch2) - travelForward(WRITE_RATE_MS) + await travelForward(WRITE_RATE_MS) const date2 = new Date() - const patch3 = generatePatchObject(3) - await docWritethrough.patch(patch3) - expect(date1).not.toEqual(date2) expect(await db.get(documentId)).toEqual( expect.objectContaining({ @@ -129,22 +137,11 @@ describe("docWritethrough", () => { }) }) - it("patching will not persist even if timeout hits but next patch is not callec", async () => { - await config.doInTenant(async () => { - await docWritethrough.patch(generatePatchObject(2)) - await docWritethrough.patch(generatePatchObject(2)) - - travelForward(WRITE_RATE_MS) - - expect(await db.exists(documentId)).toBe(false) - }) - }) - it("concurrent patches will override keys", async () => { await config.doInTenant(async () => { const patch1 = generatePatchObject(2) await docWritethrough.patch(patch1) - travelForward(WRITE_RATE_MS) + await travelForward(WRITE_RATE_MS) const patch2 = generatePatchObject(1) await docWritethrough.patch(patch2) @@ -155,13 +152,14 @@ describe("docWritethrough", () => { }) ) - travelForward(WRITE_RATE_MS) + await travelForward(WRITE_RATE_MS) const patch3 = { ...generatePatchObject(3), [keyToOverride]: generator.word(), } await docWritethrough.patch(patch3) + await travelForward(WRITE_RATE_MS) expect(await db.get(documentId)).toEqual( expect.objectContaining({ @@ -173,7 +171,7 @@ describe("docWritethrough", () => { }) }) - it("concurrent patches to multiple DocWritethrough will not contaminate each other", async () => { + it("concurrent patches to different docWritethrough will not pollute each other", async () => { await config.doInTenant(async () => { const secondDocWritethrough = new DocWritethrough( db, @@ -186,12 +184,13 @@ describe("docWritethrough", () => { const doc2Patch = generatePatchObject(1) await secondDocWritethrough.patch(doc2Patch) - travelForward(WRITE_RATE_MS) + await travelForward(WRITE_RATE_MS) const doc1Patch2 = generatePatchObject(3) await docWritethrough.patch(doc1Patch2) const doc2Patch2 = generatePatchObject(3) await secondDocWritethrough.patch(doc2Patch2) + await travelForward(WRITE_RATE_MS) expect(await db.get(docWritethrough.docId)).toEqual( expect.objectContaining({ @@ -214,7 +213,7 @@ describe("docWritethrough", () => { const initialPatch = generatePatchObject(5) await docWritethrough.patch(initialPatch) - travelForward(WRITE_RATE_MS) + await travelForward(WRITE_RATE_MS) await docWritethrough.patch({}) @@ -224,9 +223,10 @@ describe("docWritethrough", () => { await db.remove(await db.get(documentId)) - travelForward(WRITE_RATE_MS) + await travelForward(WRITE_RATE_MS) const extraPatch = generatePatchObject(5) await docWritethrough.patch(extraPatch) + await travelForward(WRITE_RATE_MS) expect(await db.get(documentId)).toEqual( expect.objectContaining(extraPatch) @@ -246,30 +246,46 @@ describe("docWritethrough", () => { ) } - const persistToDbSpy = jest.spyOn(docWritethrough as any, "persistToDb") const storeToCacheSpy = jest.spyOn(docWritethrough as any, "storeToCache") await config.doInTenant(async () => { await parallelPatch(5) - expect(persistToDbSpy).not.toBeCalled() expect(storeToCacheSpy).toBeCalledTimes(5) + expect(await db.exists(documentId)).toBe(false) - travelForward(WRITE_RATE_MS) + await travelForward(WRITE_RATE_MS) await parallelPatch(40) - expect(persistToDbSpy).toBeCalledTimes(1) expect(storeToCacheSpy).toBeCalledTimes(45) + expect(await db.get(documentId)).toEqual( + expect.objectContaining({ + _id: documentId, + _rev: expect.stringMatching(/1-.+/), + }) + ) + await parallelPatch(10) - expect(persistToDbSpy).toBeCalledTimes(1) expect(storeToCacheSpy).toBeCalledTimes(55) + expect(await db.get(documentId)).toEqual( + expect.objectContaining({ + _id: documentId, + _rev: expect.stringMatching(/1-.+/), + }) + ) - travelForward(WRITE_RATE_MS) + await travelForward(WRITE_RATE_MS) await parallelPatch(5) - expect(persistToDbSpy).toBeCalledTimes(2) + await travelForward(WRITE_RATE_MS) + expect(await db.get(documentId)).toEqual( + expect.objectContaining({ + _id: documentId, + _rev: expect.stringMatching(/3-.+/), + }) + ) expect(storeToCacheSpy).toBeCalledTimes(60) }) }) diff --git a/packages/backend-core/src/queue/inMemoryQueue.ts b/packages/backend-core/src/queue/inMemoryQueue.ts index 3205b6f383..f201714903 100644 --- a/packages/backend-core/src/queue/inMemoryQueue.ts +++ b/packages/backend-core/src/queue/inMemoryQueue.ts @@ -2,6 +2,13 @@ import events from "events" import { timeout } from "../utils" import { Queue, QueueOptions, JobOptions } from "./queue" +interface JobMessage { + timestamp: number + queue: string + data: any + opts?: JobOptions +} + /** * Bull works with a Job wrapper around all messages that contains a lot more information about * the state of the message, this object constructor implements the same schema of Bull jobs @@ -11,12 +18,12 @@ import { Queue, QueueOptions, JobOptions } from "./queue" * @returns A new job which can now be put onto the queue, this is mostly an * internal structure so that an in memory queue can be easily swapped for a Bull queue. */ -function newJob(queue: string, message: any) { +function newJob(queue: string, message: any, opts?: JobOptions): JobMessage { return { timestamp: Date.now(), queue: queue, data: message, - opts: {}, + opts, } } @@ -28,10 +35,12 @@ function newJob(queue: string, message: any) { class InMemoryQueue implements Partial { _name: string _opts?: QueueOptions - _messages: any[] + _messages: JobMessage[] + _queuedJobIds: Set _emitter: EventEmitter _runCount: number _addCount: number + /** * The constructor the queue, exactly the same as that of Bulls. * @param name The name of the queue which is being configured. @@ -45,6 +54,7 @@ class InMemoryQueue implements Partial { this._emitter = new events.EventEmitter() this._runCount = 0 this._addCount = 0 + this._queuedJobIds = new Set() } /** @@ -58,19 +68,24 @@ class InMemoryQueue implements Partial { */ async process(func: any) { this._emitter.on("message", async () => { - const delay = this._opts?.defaultJobOptions?.delay - if (delay) { - await new Promise(r => setTimeout(() => r(), delay)) + try { + if (this._messages.length <= 0) { + return + } + let msg = this._messages.shift() + + let resp = func(msg) + if (resp.then != null) { + await resp + } + this._runCount++ + const jobId = msg?.opts?.jobId?.toString() + if (jobId && msg?.opts?.removeOnComplete) { + this._queuedJobIds.delete(jobId) + } + } catch (e: any) { + throw e } - if (this._messages.length <= 0) { - return - } - let msg = this._messages.shift() - let resp = func(msg) - if (resp.then != null) { - await resp - } - this._runCount++ }) } @@ -89,12 +104,31 @@ class InMemoryQueue implements Partial { */ // eslint-disable-next-line no-unused-vars async add(data: any, opts?: JobOptions) { + const jobId = opts?.jobId?.toString() + if (jobId && this._queuedJobIds.has(jobId)) { + console.log(`Ignoring already queued job ${jobId}`) + return + } + if (typeof data !== "object") { throw "Queue only supports carrying JSON." } - this._messages.push(newJob(this._name, data)) - this._addCount++ - this._emitter.emit("message") + if (jobId) { + this._queuedJobIds.add(jobId) + } + + const pushMessage = () => { + this._messages.push(newJob(this._name, data, opts)) + this._addCount++ + this._emitter.emit("message") + } + + const delay = opts?.delay + if (delay) { + setTimeout(pushMessage, delay) + } else { + pushMessage() + } return {} as any } @@ -143,7 +177,11 @@ class InMemoryQueue implements Partial { async waitForCompletion() { do { await timeout(50) - } while (this._addCount < this._runCount) + } while (this.hasRunningJobs) + } + + hasRunningJobs() { + return this._addCount > this._runCount } } From b94d28b7d63caa6061ff55f623be1f76c9665578 Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Tue, 5 Mar 2024 13:55:07 +0100 Subject: [PATCH 39/44] Clean --- .../backend-core/src/queue/inMemoryQueue.ts | 28 ++++++++----------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/packages/backend-core/src/queue/inMemoryQueue.ts b/packages/backend-core/src/queue/inMemoryQueue.ts index f201714903..6c8107c7a4 100644 --- a/packages/backend-core/src/queue/inMemoryQueue.ts +++ b/packages/backend-core/src/queue/inMemoryQueue.ts @@ -68,23 +68,19 @@ class InMemoryQueue implements Partial { */ async process(func: any) { this._emitter.on("message", async () => { - try { - if (this._messages.length <= 0) { - return - } - let msg = this._messages.shift() + if (this._messages.length <= 0) { + return + } + let msg = this._messages.shift() - let resp = func(msg) - if (resp.then != null) { - await resp - } - this._runCount++ - const jobId = msg?.opts?.jobId?.toString() - if (jobId && msg?.opts?.removeOnComplete) { - this._queuedJobIds.delete(jobId) - } - } catch (e: any) { - throw e + let resp = func(msg) + if (resp.then != null) { + await resp + } + this._runCount++ + const jobId = msg?.opts?.jobId?.toString() + if (jobId && msg?.opts?.removeOnComplete) { + this._queuedJobIds.delete(jobId) } }) } From 8d87850765efdea50d4127cc46743eed2c57a511 Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Tue, 5 Mar 2024 14:19:05 +0100 Subject: [PATCH 40/44] Remove defaults and init --- .../backend-core/src/cache/docWritethrough.ts | 52 +++++++------------ .../src/cache/tests/docWritethrough.spec.ts | 3 -- 2 files changed, 20 insertions(+), 35 deletions(-) diff --git a/packages/backend-core/src/cache/docWritethrough.ts b/packages/backend-core/src/cache/docWritethrough.ts index f53cfbfe5f..1a16f60eb9 100644 --- a/packages/backend-core/src/cache/docWritethrough.ts +++ b/packages/backend-core/src/cache/docWritethrough.ts @@ -7,8 +7,6 @@ import { JobQueue, createQueue } from "../queue" import * as context from "../context" import * as dbUtils from "../db" -const DEFAULT_WRITE_RATE_MS = 10000 - let CACHE: BaseCache | null = null async function getCache() { if (!CACHE) { @@ -29,33 +27,27 @@ export const docWritethroughProcessorQueue = createQueue( JobQueue.DOC_WRITETHROUGH_QUEUE ) -let _init = false -export const init = () => { - if (_init) { - return - } - docWritethroughProcessorQueue.process(async message => { - const { tenantId, cacheKeyPrefix } = message.data - await context.doInTenant(tenantId, async () => { - const lockResponse = await locks.doWithLock( - { - type: LockType.TRY_ONCE, - name: LockName.PERSIST_WRITETHROUGH, - resource: cacheKeyPrefix, - ttl: 15000, - }, - async () => { - await persistToDb(message.data) - } - ) - - if (!lockResponse.executed) { - console.log(`Ignoring redlock conflict in write-through cache`) +docWritethroughProcessorQueue.process(async message => { + const { tenantId, cacheKeyPrefix } = message.data + await context.doInTenant(tenantId, async () => { + const lockResponse = await locks.doWithLock( + { + type: LockType.TRY_ONCE, + name: LockName.PERSIST_WRITETHROUGH, + resource: cacheKeyPrefix, + ttl: 15000, + }, + async () => { + await persistToDb(message.data) + console.log("DocWritethrough persisted", { data: message.data }) } - }) + ) + + if (!lockResponse.executed) { + console.log(`Ignoring redlock conflict in write-through cache`) + } }) - _init = true -} +}) export async function persistToDb({ dbName, @@ -97,11 +89,7 @@ export class DocWritethrough { private cacheKeyPrefix: string - constructor( - db: Database, - docId: string, - writeRateMs: number = DEFAULT_WRITE_RATE_MS - ) { + constructor(db: Database, docId: string, writeRateMs: number) { this.db = db this._docId = docId this.writeRateMs = writeRateMs diff --git a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts index 83af66a9d2..a5765171cb 100644 --- a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts +++ b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts @@ -5,7 +5,6 @@ import _ from "lodash" import { DocWritethrough, docWritethroughProcessorQueue, - init, } from "../docWritethrough" import InMemoryQueue from "../../queue/inMemoryQueue" @@ -45,8 +44,6 @@ describe("docWritethrough", () => { }, {} as Record) } - beforeAll(() => init()) - beforeEach(async () => { resetTime() documentId = structures.uuid() From 0649497ab53a1d73bac39f3c4ec8ba2cb8e88c3c Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Tue, 5 Mar 2024 14:47:23 +0100 Subject: [PATCH 41/44] Add comment --- packages/backend-core/src/cache/tests/docWritethrough.spec.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts index a5765171cb..3e638a4eec 100644 --- a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts +++ b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts @@ -256,6 +256,8 @@ describe("docWritethrough", () => { expect(storeToCacheSpy).toBeCalledTimes(45) + // Ideally we want to spy on persistToDb from ./docWritethrough, but due our barrel files configuration required quite of a complex setup. + // We are relying on the document being stored only once (otherwise we would have _rev updated) expect(await db.get(documentId)).toEqual( expect.objectContaining({ _id: documentId, From 2b25f9f0cb75ae1925db074348dbdaab521747c6 Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Tue, 5 Mar 2024 15:02:02 +0100 Subject: [PATCH 42/44] Improve redlock non executed response --- packages/backend-core/src/cache/docWritethrough.ts | 9 +++++++++ packages/backend-core/src/redis/redlockImpl.ts | 10 +++++++++- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/packages/backend-core/src/cache/docWritethrough.ts b/packages/backend-core/src/cache/docWritethrough.ts index 1a16f60eb9..ebb64ee9e5 100644 --- a/packages/backend-core/src/cache/docWritethrough.ts +++ b/packages/backend-core/src/cache/docWritethrough.ts @@ -44,6 +44,15 @@ docWritethroughProcessorQueue.process(async message => { ) if (!lockResponse.executed) { + if ( + lockResponse.reason !== + locks.UnsuccessfulRedlockExecutionReason.LockTakenWithTryOnce + ) { + console.error("Error persisting docWritethrough", { + data: message.data, + }) + throw "Error persisting docWritethrough" + } console.log(`Ignoring redlock conflict in write-through cache`) } }) diff --git a/packages/backend-core/src/redis/redlockImpl.ts b/packages/backend-core/src/redis/redlockImpl.ts index adeb5b12ec..28babb9405 100644 --- a/packages/backend-core/src/redis/redlockImpl.ts +++ b/packages/backend-core/src/redis/redlockImpl.ts @@ -82,6 +82,11 @@ type SuccessfulRedlockExecution = { } type UnsuccessfulRedlockExecution = { executed: false + reason: UnsuccessfulRedlockExecutionReason +} + +export const enum UnsuccessfulRedlockExecutionReason { + LockTakenWithTryOnce = "LOCK_TAKEN_WITH_TRY_ONCE", } type RedlockExecution = @@ -141,7 +146,10 @@ export async function doWithLock( if (opts.type === LockType.TRY_ONCE) { // don't throw for try-once locks, they will always error // due to retry count (0) exceeded - return { executed: false } + return { + executed: false, + reason: UnsuccessfulRedlockExecutionReason.LockTakenWithTryOnce, + } } else { throw e } From 4fe7e67dd51617c36356ccc79343a8d12f261ea4 Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Tue, 5 Mar 2024 17:15:50 +0100 Subject: [PATCH 43/44] Do not use lock --- .../backend-core/src/cache/docWritethrough.ts | 37 ++----------------- .../src/cache/tests/docWritethrough.spec.ts | 4 +- 2 files changed, 4 insertions(+), 37 deletions(-) diff --git a/packages/backend-core/src/cache/docWritethrough.ts b/packages/backend-core/src/cache/docWritethrough.ts index ebb64ee9e5..d4d651c688 100644 --- a/packages/backend-core/src/cache/docWritethrough.ts +++ b/packages/backend-core/src/cache/docWritethrough.ts @@ -1,7 +1,6 @@ import BaseCache from "./base" import { getDocWritethroughClient } from "../redis/init" -import { AnyDocument, Database, LockName, LockType } from "@budibase/types" -import * as locks from "../redis/redlockImpl" +import { AnyDocument, Database } from "@budibase/types" import { JobQueue, createQueue } from "../queue" import * as context from "../context" @@ -17,7 +16,6 @@ async function getCache() { } interface ProcessDocMessage { - tenantId: string dbName: string docId: string cacheKeyPrefix: string @@ -28,34 +26,8 @@ export const docWritethroughProcessorQueue = createQueue( ) docWritethroughProcessorQueue.process(async message => { - const { tenantId, cacheKeyPrefix } = message.data - await context.doInTenant(tenantId, async () => { - const lockResponse = await locks.doWithLock( - { - type: LockType.TRY_ONCE, - name: LockName.PERSIST_WRITETHROUGH, - resource: cacheKeyPrefix, - ttl: 15000, - }, - async () => { - await persistToDb(message.data) - console.log("DocWritethrough persisted", { data: message.data }) - } - ) - - if (!lockResponse.executed) { - if ( - lockResponse.reason !== - locks.UnsuccessfulRedlockExecutionReason.LockTakenWithTryOnce - ) { - console.error("Error persisting docWritethrough", { - data: message.data, - }) - throw "Error persisting docWritethrough" - } - console.log(`Ignoring redlock conflict in write-through cache`) - } - }) + await persistToDb(message.data) + console.log("DocWritethrough persisted", { data: message.data }) }) export async function persistToDb({ @@ -94,7 +66,6 @@ export class DocWritethrough { private db: Database private _docId: string private writeRateMs: number - private tenantId: string private cacheKeyPrefix: string @@ -103,7 +74,6 @@ export class DocWritethrough { this._docId = docId this.writeRateMs = writeRateMs this.cacheKeyPrefix = `${this.db.name}:${this.docId}` - this.tenantId = context.getTenantId() } get docId() { @@ -117,7 +87,6 @@ export class DocWritethrough { docWritethroughProcessorQueue.add( { - tenantId: this.tenantId, dbName: this.db.name, docId: this.docId, cacheKeyPrefix: this.cacheKeyPrefix, diff --git a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts index 3e638a4eec..9bbcd6af44 100644 --- a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts +++ b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts @@ -47,9 +47,7 @@ describe("docWritethrough", () => { beforeEach(async () => { resetTime() documentId = structures.uuid() - await config.doInTenant(async () => { - docWritethrough = new DocWritethrough(db, documentId, WRITE_RATE_MS) - }) + docWritethrough = new DocWritethrough(db, documentId, WRITE_RATE_MS) }) it("patching will not persist if timeout does not hit", async () => { From ebcb7718b8f6e60e88c1ca4bbcb7cf0f18857efa Mon Sep 17 00:00:00 2001 From: Adria Navarro Date: Tue, 5 Mar 2024 18:06:14 +0100 Subject: [PATCH 44/44] Use bulk --- packages/backend-core/src/cache/base/index.ts | 19 +++++++++++++++++++ .../backend-core/src/cache/docWritethrough.ts | 10 +++++----- 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/packages/backend-core/src/cache/base/index.ts b/packages/backend-core/src/cache/base/index.ts index 911bd6a831..942d70ae72 100644 --- a/packages/backend-core/src/cache/base/index.ts +++ b/packages/backend-core/src/cache/base/index.ts @@ -46,6 +46,25 @@ export default class BaseCache { await client.store(key, value, ttl) } + /** + * Bulk write to the cache. + */ + async bulkStore( + data: Record, + ttl: number | null = null, + opts = { useTenancy: true } + ) { + if (opts.useTenancy) { + data = Object.entries(data).reduce((acc, [key, value]) => { + acc[generateTenantKey(key)] = value + return acc + }, {} as Record) + } + + const client = await this.getClient() + await client.bulkStore(data, ttl) + } + /** * Remove from cache. */ diff --git a/packages/backend-core/src/cache/docWritethrough.ts b/packages/backend-core/src/cache/docWritethrough.ts index d4d651c688..a0bc14ec5c 100644 --- a/packages/backend-core/src/cache/docWritethrough.ts +++ b/packages/backend-core/src/cache/docWritethrough.ts @@ -3,7 +3,6 @@ import { getDocWritethroughClient } from "../redis/init" import { AnyDocument, Database } from "@budibase/types" import { JobQueue, createQueue } from "../queue" -import * as context from "../context" import * as dbUtils from "../db" let CACHE: BaseCache | null = null @@ -101,9 +100,10 @@ export class DocWritethrough { } private async storeToCache(cache: BaseCache, data: Record) { - for (const [key, value] of Object.entries(data)) { - const cacheKey = this.cacheKeyPrefix + ":data:" + key - await cache.store(cacheKey, { key, value }, undefined) - } + data = Object.entries(data).reduce((acc, [key, value]) => { + acc[this.cacheKeyPrefix + ":data:" + key] = { key, value } + return acc + }, {} as Record) + await cache.bulkStore(data, null) } }