From d3fdb52bf5b0cbe09bcee0568a4f7ad47ea81196 Mon Sep 17 00:00:00 2001 From: Andrew Kingston Date: Wed, 31 May 2023 10:21:50 +0100 Subject: [PATCH 1/9] Store all socket users in redis to allow all server instances to be aware of all users --- .../builder/src/builderStore/websocket.js | 13 +- .../src/components/grid/layout/Grid.svelte | 1 + .../src/components/grid/lib/websocket.js | 17 +-- packages/frontend-core/src/utils/websocket.js | 2 +- packages/server/src/utilities/redis.ts | 11 +- packages/server/src/websockets/builder.ts | 79 +++++------ packages/server/src/websockets/client.ts | 4 +- packages/server/src/websockets/grid.ts | 66 +++------ packages/server/src/websockets/websocket.ts | 132 ++++++++++++++++-- packages/shared-core/src/constants.ts | 18 +++ packages/types/src/sdk/index.ts | 1 + packages/types/src/sdk/websocket.ts | 9 ++ 12 files changed, 243 insertions(+), 110 deletions(-) create mode 100644 packages/types/src/sdk/websocket.ts diff --git a/packages/builder/src/builderStore/websocket.js b/packages/builder/src/builderStore/websocket.js index d58eb24bbf..79138b0767 100644 --- a/packages/builder/src/builderStore/websocket.js +++ b/packages/builder/src/builderStore/websocket.js @@ -1,13 +1,14 @@ import { createWebsocket } from "@budibase/frontend-core" import { userStore } from "builderStore" import { datasources, tables } from "stores/backend" +import { SocketEvents, BuilderSocketEvents } from "@budibase/shared-core" export const createBuilderWebsocket = () => { const socket = createWebsocket("/socket/builder") - // Connection events + // Built-in events socket.on("connect", () => { - socket.emit("get-users", null, response => { + socket.emit(SocketEvents.GetUsers, null, response => { userStore.actions.init(response.users) }) }) @@ -16,16 +17,16 @@ export const createBuilderWebsocket = () => { }) // User events - socket.on("user-update", userStore.actions.updateUser) - socket.on("user-disconnect", userStore.actions.removeUser) + socket.on(SocketEvents.UserUpdate, userStore.actions.updateUser) + socket.on(SocketEvents.UserDisconnect, userStore.actions.removeUser) // Table events - socket.on("table-change", ({ id, table }) => { + socket.on(BuilderSocketEvents.TableChange, ({ id, table }) => { tables.replaceTable(id, table) }) // Datasource events - socket.on("datasource-change", ({ id, datasource }) => { + socket.on(BuilderSocketEvents.DatasourceChange, ({ id, datasource }) => { datasources.replaceDatasource(id, datasource) }) diff --git a/packages/frontend-core/src/components/grid/layout/Grid.svelte b/packages/frontend-core/src/components/grid/layout/Grid.svelte index 08325857d6..72c32cbbee 100644 --- a/packages/frontend-core/src/components/grid/layout/Grid.svelte +++ b/packages/frontend-core/src/components/grid/layout/Grid.svelte @@ -62,6 +62,7 @@ stripeRows, }) + // Build up context let context = { API: API || createAPIClient(), diff --git a/packages/frontend-core/src/components/grid/lib/websocket.js b/packages/frontend-core/src/components/grid/lib/websocket.js index 3f1c473ea0..3f4ee32933 100644 --- a/packages/frontend-core/src/components/grid/lib/websocket.js +++ b/packages/frontend-core/src/components/grid/lib/websocket.js @@ -1,5 +1,6 @@ import { get } from "svelte/store" import { createWebsocket } from "../../../utils" +import { SocketEvents, GridSocketEvents } from "@budibase/shared-core" export const createGridWebsocket = context => { const { rows, tableId, users, focusedCellId, table } = context @@ -10,13 +11,13 @@ export const createGridWebsocket = context => { return } // Identify which table we are editing - socket.emit("select-table", tableId, response => { + socket.emit(GridSocketEvents.SelectTable, tableId, response => { // handle initial connection info users.set(response.users) }) } - // Connection events + // Built-in events socket.on("connect", () => { connectToTable(get(tableId)) }) @@ -25,25 +26,25 @@ export const createGridWebsocket = context => { }) // User events - socket.on("user-update", user => { + socket.on(SocketEvents.UserUpdate, user => { users.actions.updateUser(user) }) - socket.on("user-disconnect", user => { + socket.on(SocketEvents.UserDisconnect, user => { users.actions.removeUser(user) }) // Row events - socket.on("row-change", async data => { + socket.on(GridSocketEvents.RowChange, async data => { if (data.id) { rows.actions.replaceRow(data.id, data.row) } else if (data.row.id) { - // Handle users table edge case + // Handle users table edge cased await rows.actions.refreshRow(data.row.id) } }) // Table events - socket.on("table-change", data => { + socket.on(GridSocketEvents.TableChange, data => { // Only update table if one exists. If the table was deleted then we don't // want to know - let the builder navigate away if (data.table) { @@ -56,7 +57,7 @@ export const createGridWebsocket = context => { // Notify selected cell changes focusedCellId.subscribe($focusedCellId => { - socket.emit("select-cell", $focusedCellId) + socket.emit(GridSocketEvents.SelectCell, $focusedCellId) }) return () => socket?.disconnect() diff --git a/packages/frontend-core/src/utils/websocket.js b/packages/frontend-core/src/utils/websocket.js index 839fa6d73d..6de9690253 100644 --- a/packages/frontend-core/src/utils/websocket.js +++ b/packages/frontend-core/src/utils/websocket.js @@ -21,6 +21,6 @@ export const createWebsocket = path => { timeout: 4000, // Disable polling and rely on websocket only, as HTTP transport // will only work with sticky sessions which we don't have - transports: ["websocket"], + // transports: ["websocket"], }) } diff --git a/packages/server/src/utilities/redis.ts b/packages/server/src/utilities/redis.ts index ff1c863bf7..2ae2216fc4 100644 --- a/packages/server/src/utilities/redis.ts +++ b/packages/server/src/utilities/redis.ts @@ -1,6 +1,6 @@ import { redis } from "@budibase/backend-core" import { getGlobalIDFromUserMetadataID } from "../db/utils" -import { ContextUser } from "@budibase/types" +import { ContextUser, SocketUser } from "@budibase/types" const APP_DEV_LOCK_SECONDS = 600 const AUTOMATION_TEST_FLAG_SECONDS = 60 @@ -26,6 +26,7 @@ export async function init() { } export async function shutdown() { + console.log("REDIS SHUTDOWN") if (devAppClient) await devAppClient.finish() if (debounceClient) await debounceClient.finish() if (flagClient) await flagClient.finish() @@ -97,6 +98,14 @@ export async function clearTestFlag(id: string) { await devAppClient.delete(id) } +export async function getSocketUsers(path: string, room: string) { + return await socketClient.get(`${path}:${room}`) +} + +export async function setSocketUsers(path: string, room: string, users: SocketUser[]) { + await socketClient.store(`${path}:${room}`, users) +} + export function getSocketPubSubClients() { return { pub: socketClient.getClient(), diff --git a/packages/server/src/websockets/builder.ts b/packages/server/src/websockets/builder.ts index de8709129d..6fe02be0ba 100644 --- a/packages/server/src/websockets/builder.ts +++ b/packages/server/src/websockets/builder.ts @@ -1,69 +1,70 @@ import authorized from "../middleware/authorized" -import Socket from "./websocket" +import { BaseSocket } from "./websocket" import { permissions } from "@budibase/backend-core" import http from "http" import Koa from "koa" -import { Datasource, Table } from "@budibase/types" +import { Datasource, Table, SocketUser, ContextUser } from "@budibase/types" import { gridSocket } from "./index" import { clearLock } from "../utilities/redis" +import { Socket } from "socket.io" +import { BuilderSocketEvents } from "@budibase/shared-core" -export default class BuilderSocket extends Socket { +export default class BuilderSocket extends BaseSocket { constructor(app: Koa, server: http.Server) { super(app, server, "/socket/builder", [authorized(permissions.BUILDER)]) + } - this.io.on("connection", socket => { - // Join a room for this app - const user = socket.data.user - const appId = socket.data.appId - socket.join(appId) - socket.to(appId).emit("user-update", user) + async onConnect(socket: Socket) { + // Join a room for this app + await this.joinRoom(socket, socket.data.appId) + } - // Initial identification of connected spreadsheet - socket.on("get-users", async (payload, callback) => { - const sockets = await this.io.in(appId).fetchSockets() - callback({ - users: sockets.map(socket => socket.data.user), - }) + async onDisconnect(socket: Socket) { + // Remove app lock from this user if they have no other connections + try { + // @ts-ignore + const user: SocketUser = socket.data + const { _id, sessionId, appId } = user + const users = await this.getSocketUsers(user.room) + const hasOtherConnection = users.some(otherUser => { + return _id === otherUser._id && sessionId !== otherUser.sessionId }) - - // Disconnection cleanup - socket.on("disconnect", async () => { - socket.to(appId).emit("user-disconnect", user) - - // Remove app lock from this user if they have no other connections - try { - const sockets = await this.io.in(appId).fetchSockets() - const hasOtherConnection = sockets.some(socket => { - const { _id, sessionId } = socket.data.user - return _id === user._id && sessionId !== user.sessionId - }) - if (!hasOtherConnection) { - await clearLock(appId, user) - } - } catch (e) { - // This is fine, just means this user didn't hold the lock - } - }) - }) + if (!hasOtherConnection) { + // @ts-ignore + const user: ContextUser = { _id: socket.data._id } + await clearLock(appId, user) + } + } catch (e) { + // This is fine, just means this user didn't hold the lock + } } emitTableUpdate(ctx: any, table: Table) { - this.io.in(ctx.appId).emit("table-change", { id: table._id, table }) + this.io + .in(ctx.appId) + .emit(BuilderSocketEvents.TableChange, { id: table._id, table }) gridSocket.emitTableUpdate(table) } emitTableDeletion(ctx: any, id: string) { - this.io.in(ctx.appId).emit("table-change", { id, table: null }) + this.io + .in(ctx.appId) + .emit(BuilderSocketEvents.TableChange, { id, table: null }) gridSocket.emitTableDeletion(id) } emitDatasourceUpdate(ctx: any, datasource: Datasource) { this.io .in(ctx.appId) - .emit("datasource-change", { id: datasource._id, datasource }) + .emit(BuilderSocketEvents.DatasourceChange, { + id: datasource._id, + datasource, + }) } emitDatasourceDeletion(ctx: any, id: string) { - this.io.in(ctx.appId).emit("datasource-change", { id, datasource: null }) + this.io + .in(ctx.appId) + .emit(BuilderSocketEvents.DatasourceChange, { id, datasource: null }) } } diff --git a/packages/server/src/websockets/client.ts b/packages/server/src/websockets/client.ts index d59325f66e..165febb7a3 100644 --- a/packages/server/src/websockets/client.ts +++ b/packages/server/src/websockets/client.ts @@ -1,10 +1,10 @@ -import Socket from "./websocket" +import { BaseSocket } from "./websocket" import authorized from "../middleware/authorized" import http from "http" import Koa from "koa" import { permissions } from "@budibase/backend-core" -export default class ClientAppWebsocket extends Socket { +export default class ClientAppWebsocket extends BaseSocket { constructor(app: Koa, server: http.Server) { super(app, server, "/socket/client", [authorized(permissions.BUILDER)]) } diff --git a/packages/server/src/websockets/grid.ts b/packages/server/src/websockets/grid.ts index a0272be33c..c3c460b19d 100644 --- a/packages/server/src/websockets/grid.ts +++ b/packages/server/src/websockets/grid.ts @@ -1,73 +1,51 @@ import authorized from "../middleware/authorized" -import Socket from "./websocket" +import { BaseSocket } from "./websocket" import { permissions } from "@budibase/backend-core" import http from "http" import Koa from "koa" import { getTableId } from "../api/controllers/row/utils" import { Row, Table } from "@budibase/types" +import { Socket } from "socket.io" +import { GridSocketEvents } from "@budibase/shared-core" -export default class GridSocket extends Socket { +export default class GridSocket extends BaseSocket { constructor(app: Koa, server: http.Server) { super(app, server, "/socket/grid", [authorized(permissions.BUILDER)]) + } - this.io.on("connection", socket => { - const user = socket.data.user + async onConnect(socket: Socket) { + // Initial identification of connected spreadsheet + socket.on(GridSocketEvents.SelectTable, async (tableId, callback) => { + await this.joinRoom(socket, tableId) - // Socket state - let currentRoom: string + // Reply with all users in current roome + const users = await this.getSocketUsers(tableId) + callback({ users }) + }) - // Initial identification of connected spreadsheet - socket.on("select-table", async (tableId, callback) => { - // Leave current room - if (currentRoom) { - socket.to(currentRoom).emit("user-disconnect", user) - socket.leave(currentRoom) - } - - // Join new room - currentRoom = tableId - socket.join(currentRoom) - socket.to(currentRoom).emit("user-update", user) - - // Reply with all users in current room - const sockets = await this.io.in(currentRoom).fetchSockets() - callback({ - users: sockets.map(socket => socket.data.user), - }) - }) - - // Handle users selecting a new cell - socket.on("select-cell", cellId => { - socket.data.user.focusedCellId = cellId - if (currentRoom) { - socket.to(currentRoom).emit("user-update", user) - } - }) - - // Disconnection cleanup - socket.on("disconnect", () => { - if (currentRoom) { - socket.to(currentRoom).emit("user-disconnect", user) - } - }) + // Handle users selecting a new cell + socket.on(GridSocketEvents.SelectCell, cellId => { + this.updateUser(socket, { focusedCellId: cellId }) }) } emitRowUpdate(ctx: any, row: Row) { const tableId = getTableId(ctx) - this.io.in(tableId).emit("row-change", { id: row._id, row }) + this.io.in(tableId).emit(GridSocketEvents.RowChange, { id: row._id, row }) } emitRowDeletion(ctx: any, id: string) { const tableId = getTableId(ctx) - this.io.in(tableId).emit("row-change", { id, row: null }) + this.io.in(tableId).emit(GridSocketEvents.RowChange, { id, row: null }) } emitTableUpdate(table: Table) { - this.io.in(table._id!).emit("table-change", { id: table._id, table }) + this.io + .in(table._id!) + .emit(GridSocketEvents.TableChange, { id: table._id, table }) } emitTableDeletion(id: string) { - this.io.in(id).emit("table-change", { id, table: null }) + this.io.in(id).emit(GridSocketEvents.TableChange, { id, table: null }) } } diff --git a/packages/server/src/websockets/websocket.ts b/packages/server/src/websockets/websocket.ts index 5cac959099..40ab42ab22 100644 --- a/packages/server/src/websockets/websocket.ts +++ b/packages/server/src/websockets/websocket.ts @@ -6,11 +6,18 @@ import { userAgent } from "koa-useragent" import { auth } from "@budibase/backend-core" import currentApp from "../middleware/currentapp" import { createAdapter } from "@socket.io/redis-adapter" -import { getSocketPubSubClients } from "../utilities/redis" -import uuid from "uuid" +import { Socket } from "socket.io" +import { + getSocketPubSubClients, + getSocketUsers, + setSocketUsers, +} from "../utilities/redis" +import { SocketEvents } from "@budibase/shared-core" +import { SocketUser } from "@budibase/types" -export default class Socket { +export class BaseSocket { io: Server + path: string constructor( app: Koa, @@ -18,6 +25,7 @@ export default class Socket { path: string = "/", additionalMiddlewares?: any[] ) { + this.path = path this.io = new Server(server, { path, }) @@ -65,18 +73,15 @@ export default class Socket { // Middlewares are finished // Extract some data from our enriched koa context to persist // as metadata for the socket - // Add user info, including a deterministic color and label const { _id, email, firstName, lastName } = ctx.user - socket.data.user = { + socket.data = { _id, email, firstName, lastName, - sessionId: uuid.v4(), + appId: ctx.appId, + sessionId: socket.id, } - - // Add app ID to help split sockets into rooms - socket.data.appId = ctx.appId next() } }) @@ -90,6 +95,115 @@ export default class Socket { const { pub, sub } = getSocketPubSubClients() const opts = { key: `socket.io-${path}` } this.io.adapter(createAdapter(pub, sub, opts)) + + // Handle user connections and disconnections + this.io.on("connection", async socket => { + // Add built in handler to allow fetching all other users in this room + socket.on(SocketEvents.GetUsers, async (payload, callback) => { + let users + if (socket.data.room) { + users = await this.getSocketUsers(socket.data.room) + } + callback({ users }) + }) + + // Add handlers for this socket + await this.onConnect(socket) + + // Add early disconnection handler to clean up and leave room + socket.on("disconnect", async () => { + // Leave the current room when the user disconnects if we're in one + if (socket.data.room) { + await this.leaveRoom(socket) + } + + // Run any other disconnection logic + await this.onDisconnect(socket) + }) + }) + } + + // Gets a list of all users inside a certain room + async getSocketUsers(room?: string): Promise { + if (room) { + const users = await getSocketUsers(this.path, room) + return users || [] + } else { + return [] + } + } + + // Adds a user to a certain room + async joinRoom(socket: Socket, room: string) { + // Check if we're already in a room, as we'll need to leave if we are before we + // can join a different room + const oldRoom = socket.data.room + if (oldRoom && oldRoom !== room) { + await this.leaveRoom(socket) + } + + // Join new room + if (!oldRoom || oldRoom !== room) { + socket.join(room) + socket.data.room = room + } + + // @ts-ignore + let user: SocketUser = socket.data + let users = await this.getSocketUsers(room) + + // Store this socket in redis + if (!users?.length) { + users = [] + } + const index = users.findIndex(x => x.sessionId === socket.data.sessionId) + if (index === -1) { + users.push(user) + } else { + users[index] = user + } + await setSocketUsers(this.path, room, users) + socket.to(room).emit(SocketEvents.UserUpdate, user) + } + + // Disconnects a socket from its current room + async leaveRoom(socket: Socket) { + // @ts-ignore + let user: SocketUser = socket.data + const { room, sessionId } = user + if (!room) { + return + } + socket.leave(room) + socket.data.room = undefined + + let users = await this.getSocketUsers(room) + + // Remove this socket from redis + users = users.filter(user => user.sessionId !== sessionId) + await setSocketUsers(this.path, room, users) + socket.to(room).emit(SocketEvents.UserDisconnect, user) + } + + // Updates a connected user's metadata, assuming a room change is not required. + async updateUser(socket: Socket, patch: Object) { + socket.data = { + ...socket.data, + ...patch, + } + + // If we're in a room, notify others of this change and update redis + if (socket.data.room) { + await this.joinRoom(socket, socket.data.room) + } + } + + async onConnect(socket: Socket) { + // Override + } + + async onDisconnect(socket: Socket) { + // Override } // Emit an event to all sockets diff --git a/packages/shared-core/src/constants.ts b/packages/shared-core/src/constants.ts index 35b02d1e15..62603ee941 100644 --- a/packages/shared-core/src/constants.ts +++ b/packages/shared-core/src/constants.ts @@ -67,3 +67,21 @@ export const SqlNumberTypeRangeMap = { min: -8388608, }, } + +export const SocketEvents = { + UserUpdate: "UserUpdate", + UserDisconnect: "UserDisconnect", + GetUsers: "GetUsers" +} + +export const GridSocketEvents = { + RowChange: "RowChange", + TableChange: "TableChange", + SelectTable: "SelectTable", + SelectCell: "SelectCell" +} + +export const BuilderSocketEvents = { + TableChange: "TableChange", + DatasourceChange: "DatasourceChange" +} \ No newline at end of file diff --git a/packages/types/src/sdk/index.ts b/packages/types/src/sdk/index.ts index ed44c13667..49d0387a82 100644 --- a/packages/types/src/sdk/index.ts +++ b/packages/types/src/sdk/index.ts @@ -17,3 +17,4 @@ export * from "./auditLogs" export * from "./sso" export * from "./user" export * from "./cli" +export * from "./websocket" diff --git a/packages/types/src/sdk/websocket.ts b/packages/types/src/sdk/websocket.ts new file mode 100644 index 0000000000..822c4294e0 --- /dev/null +++ b/packages/types/src/sdk/websocket.ts @@ -0,0 +1,9 @@ +export interface SocketUser { + _id: string, + email: string, + firstName?: string, + lastName?: string, + appId: string, + sessionId: string, + room?: string +} \ No newline at end of file From d9266502328827a5cb0d1b40f2745ec750668f4a Mon Sep 17 00:00:00 2001 From: Andrew Kingston Date: Wed, 31 May 2023 10:52:39 +0100 Subject: [PATCH 2/9] Add back in functionality to select different redis DB's per client --- packages/backend-core/src/redis/init.ts | 5 +---- packages/backend-core/src/redis/redis.ts | 3 +++ packages/backend-core/src/redis/utils.ts | 2 +- packages/server/src/utilities/redis.ts | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/backend-core/src/redis/init.ts b/packages/backend-core/src/redis/init.ts index 485268edad..bbece02060 100644 --- a/packages/backend-core/src/redis/init.ts +++ b/packages/backend-core/src/redis/init.ts @@ -14,10 +14,7 @@ async function init() { appClient = await new Client(utils.Databases.APP_METADATA).init() cacheClient = await new Client(utils.Databases.GENERIC_CACHE).init() lockClient = await new Client(utils.Databases.LOCKS).init() - writethroughClient = await new Client( - utils.Databases.WRITE_THROUGH, - utils.SelectableDatabase.WRITE_THROUGH - ).init() + writethroughClient = await new Client(utils.Databases.WRITE_THROUGH).init() } export async function shutdown() { diff --git a/packages/backend-core/src/redis/redis.ts b/packages/backend-core/src/redis/redis.ts index 2d54b51a9f..de947f5edc 100644 --- a/packages/backend-core/src/redis/redis.ts +++ b/packages/backend-core/src/redis/redis.ts @@ -183,6 +183,9 @@ class RedisWrapper { CLOSED = false init(this._select) await waitForConnection(this._select) + if (this._select) { + this.getClient().select(this._select) + } return this } diff --git a/packages/backend-core/src/redis/utils.ts b/packages/backend-core/src/redis/utils.ts index f8b815824c..b57dd58d57 100644 --- a/packages/backend-core/src/redis/utils.ts +++ b/packages/backend-core/src/redis/utils.ts @@ -41,7 +41,7 @@ export enum Databases { */ export enum SelectableDatabase { DEFAULT = 0, - WRITE_THROUGH = 1, + SOCKET_IO = 1, UNUSED_1 = 2, UNUSED_2 = 3, UNUSED_3 = 4, diff --git a/packages/server/src/utilities/redis.ts b/packages/server/src/utilities/redis.ts index 2ae2216fc4..f2625b9cfa 100644 --- a/packages/server/src/utilities/redis.ts +++ b/packages/server/src/utilities/redis.ts @@ -15,7 +15,7 @@ export async function init() { devAppClient = new redis.Client(redis.utils.Databases.DEV_LOCKS) debounceClient = new redis.Client(redis.utils.Databases.DEBOUNCE) flagClient = new redis.Client(redis.utils.Databases.FLAGS) - socketClient = new redis.Client(redis.utils.Databases.SOCKET_IO) + socketClient = new redis.Client(redis.utils.Databases.SOCKET_IO, redis.utils.SelectableDatabase.SOCKET_IO) await devAppClient.init() await debounceClient.init() await flagClient.init() From 05e34076f78ce16f957f32237df0e2a3d426813d Mon Sep 17 00:00:00 2001 From: Andrew Kingston Date: Wed, 31 May 2023 15:13:22 +0100 Subject: [PATCH 3/9] Fully rewrite websocket redis integration to use individual keys per session, enable TTLs on sesisons, prune sessions when users connect and add a heartbeat to sockets --- packages/backend-core/src/redis/init.ts | 12 +- packages/backend-core/src/redis/redis.ts | 5 + .../src/builderStore/store/frontend.js | 2 +- .../builder/src/builderStore/websocket.js | 17 +- packages/frontend-core/src/utils/websocket.js | 23 ++- packages/server/src/utilities/redis.ts | 17 +- packages/server/src/websockets/builder.ts | 26 ++- packages/server/src/websockets/grid.ts | 6 +- packages/server/src/websockets/websocket.ts | 171 ++++++++++++------ packages/shared-core/src/constants.ts | 8 +- packages/types/src/sdk/websocket.ts | 3 +- 11 files changed, 193 insertions(+), 97 deletions(-) diff --git a/packages/backend-core/src/redis/init.ts b/packages/backend-core/src/redis/init.ts index bbece02060..056620c085 100644 --- a/packages/backend-core/src/redis/init.ts +++ b/packages/backend-core/src/redis/init.ts @@ -6,7 +6,8 @@ let userClient: Client, appClient: Client, cacheClient: Client, writethroughClient: Client, - lockClient: Client + lockClient: Client, + socketClient: Client async function init() { userClient = await new Client(utils.Databases.USER_CACHE).init() @@ -15,6 +16,7 @@ async function init() { cacheClient = await new Client(utils.Databases.GENERIC_CACHE).init() lockClient = await new Client(utils.Databases.LOCKS).init() writethroughClient = await new Client(utils.Databases.WRITE_THROUGH).init() + socketClient = await new Client(utils.Databases.SOCKET_IO, utils.SelectableDatabase.SOCKET_IO).init() } export async function shutdown() { @@ -24,6 +26,7 @@ export async function shutdown() { if (cacheClient) await cacheClient.finish() if (writethroughClient) await writethroughClient.finish() if (lockClient) await lockClient.finish() + if (socketClient) await socketClient.finish() } process.on("exit", async () => { @@ -71,3 +74,10 @@ export async function getLockClient() { } return lockClient } + +export async function getSocketClient() { + if (!socketClient) { + await init() + } + return socketClient +} \ No newline at end of file diff --git a/packages/backend-core/src/redis/redis.ts b/packages/backend-core/src/redis/redis.ts index de947f5edc..ea1f1944a1 100644 --- a/packages/backend-core/src/redis/redis.ts +++ b/packages/backend-core/src/redis/redis.ts @@ -212,6 +212,11 @@ class RedisWrapper { return this.getClient().keys(addDbPrefix(db, pattern)) } + async exists(key: string) { + const db = this._db + return await this.getClient().exists(addDbPrefix(db, key)) + } + async get(key: string) { const db = this._db let response = await this.getClient().get(addDbPrefix(db, key)) diff --git a/packages/builder/src/builderStore/store/frontend.js b/packages/builder/src/builderStore/store/frontend.js index c921ddef3b..4f2a1bef6f 100644 --- a/packages/builder/src/builderStore/store/frontend.js +++ b/packages/builder/src/builderStore/store/frontend.js @@ -118,7 +118,7 @@ export const getFrontendStore = () => { }, initialise: async pkg => { const { layouts, screens, application, clientLibPath, hasLock } = pkg - websocket = createBuilderWebsocket() + websocket = createBuilderWebsocket(application.appId) await store.actions.components.refreshDefinitions(application.appId) // Reset store state diff --git a/packages/builder/src/builderStore/websocket.js b/packages/builder/src/builderStore/websocket.js index 79138b0767..ffaca7a025 100644 --- a/packages/builder/src/builderStore/websocket.js +++ b/packages/builder/src/builderStore/websocket.js @@ -3,12 +3,12 @@ import { userStore } from "builderStore" import { datasources, tables } from "stores/backend" import { SocketEvents, BuilderSocketEvents } from "@budibase/shared-core" -export const createBuilderWebsocket = () => { +export const createBuilderWebsocket = appId => { const socket = createWebsocket("/socket/builder") // Built-in events socket.on("connect", () => { - socket.emit(SocketEvents.GetUsers, null, response => { + socket.emit(BuilderSocketEvents.SelectApp, appId, response => { userStore.actions.init(response.users) }) }) @@ -30,11 +30,10 @@ export const createBuilderWebsocket = () => { datasources.replaceDatasource(id, datasource) }) - return { - ...socket, - disconnect: () => { - socket?.disconnect() - userStore.actions.reset() - }, - } + // Clean up user store on disconnect + socket.on("disconnect", () => { + userStore.actions.reset() + }) + + return socket } diff --git a/packages/frontend-core/src/utils/websocket.js b/packages/frontend-core/src/utils/websocket.js index 6de9690253..6f907f5270 100644 --- a/packages/frontend-core/src/utils/websocket.js +++ b/packages/frontend-core/src/utils/websocket.js @@ -1,6 +1,7 @@ import { io } from "socket.io-client" +import { SocketEvents, SocketSessionTTL } from "@budibase/shared-core" -export const createWebsocket = path => { +export const createWebsocket = (path, heartbeat = true) => { if (!path) { throw "A websocket path must be provided" } @@ -10,7 +11,7 @@ export const createWebsocket = path => { const proto = tls ? "wss:" : "ws:" const host = location.hostname const port = location.port || (tls ? 443 : 80) - return io(`${proto}//${host}:${port}`, { + const socket = io(`${proto}//${host}:${port}`, { path, // Cap reconnection attempts to 3 (total of 15 seconds before giving up) reconnectionAttempts: 3, @@ -21,6 +22,22 @@ export const createWebsocket = path => { timeout: 4000, // Disable polling and rely on websocket only, as HTTP transport // will only work with sticky sessions which we don't have - // transports: ["websocket"], + transports: ["websocket"], }) + + // Set up a heartbeat that's half of the session TTL + let interval + if (heartbeat) { + interval = setInterval(() => { + console.log("Sending heartbeat") + socket.emit(SocketEvents.Heartbeat) + }, SocketSessionTTL * 500) + } + + socket.on("disconnect", () => { + console.log("clear interval") + clearInterval(interval) + }) + + return socket } diff --git a/packages/server/src/utilities/redis.ts b/packages/server/src/utilities/redis.ts index f2625b9cfa..5785d79d2a 100644 --- a/packages/server/src/utilities/redis.ts +++ b/packages/server/src/utilities/redis.ts @@ -1,12 +1,13 @@ import { redis } from "@budibase/backend-core" import { getGlobalIDFromUserMetadataID } from "../db/utils" -import { ContextUser, SocketUser } from "@budibase/types" +import { ContextUser } from "@budibase/types" const APP_DEV_LOCK_SECONDS = 600 const AUTOMATION_TEST_FLAG_SECONDS = 60 -let devAppClient: any, debounceClient: any, flagClient: any, socketClient: any +let devAppClient: any, debounceClient: any, flagClient: any // We need to maintain a duplicate client for socket.io pub/sub +let socketClient: any let socketSubClient: any // We init this as we want to keep the connection open all the time @@ -15,13 +16,12 @@ export async function init() { devAppClient = new redis.Client(redis.utils.Databases.DEV_LOCKS) debounceClient = new redis.Client(redis.utils.Databases.DEBOUNCE) flagClient = new redis.Client(redis.utils.Databases.FLAGS) - socketClient = new redis.Client(redis.utils.Databases.SOCKET_IO, redis.utils.SelectableDatabase.SOCKET_IO) await devAppClient.init() await debounceClient.init() await flagClient.init() - await socketClient.init() // Duplicate the socket client for pub/sub + socketClient = await redis.clients.getSocketClient() socketSubClient = socketClient.getClient().duplicate() } @@ -30,7 +30,6 @@ export async function shutdown() { if (devAppClient) await devAppClient.finish() if (debounceClient) await debounceClient.finish() if (flagClient) await flagClient.finish() - if (socketClient) await socketClient.finish() if (socketSubClient) socketSubClient.disconnect() // shutdown core clients await redis.clients.shutdown() @@ -98,14 +97,6 @@ export async function clearTestFlag(id: string) { await devAppClient.delete(id) } -export async function getSocketUsers(path: string, room: string) { - return await socketClient.get(`${path}:${room}`) -} - -export async function setSocketUsers(path: string, room: string, users: SocketUser[]) { - await socketClient.store(`${path}:${room}`, users) -} - export function getSocketPubSubClients() { return { pub: socketClient.getClient(), diff --git a/packages/server/src/websockets/builder.ts b/packages/server/src/websockets/builder.ts index 6fe02be0ba..9fbedbe9e2 100644 --- a/packages/server/src/websockets/builder.ts +++ b/packages/server/src/websockets/builder.ts @@ -3,7 +3,7 @@ import { BaseSocket } from "./websocket" import { permissions } from "@budibase/backend-core" import http from "http" import Koa from "koa" -import { Datasource, Table, SocketUser, ContextUser } from "@budibase/types" +import { Datasource, Table, SocketSession, ContextUser } from "@budibase/types" import { gridSocket } from "./index" import { clearLock } from "../utilities/redis" import { Socket } from "socket.io" @@ -15,24 +15,30 @@ export default class BuilderSocket extends BaseSocket { } async onConnect(socket: Socket) { - // Join a room for this app - await this.joinRoom(socket, socket.data.appId) + // Initial identification of selected app + socket.on(BuilderSocketEvents.SelectApp, async (appId, callback) => { + await this.joinRoom(socket, appId) + + // Reply with all users in current room + const sessions = await this.getRoomSessions(appId) + callback({ users: sessions }) + }) } async onDisconnect(socket: Socket) { // Remove app lock from this user if they have no other connections try { // @ts-ignore - const user: SocketUser = socket.data - const { _id, sessionId, appId } = user - const users = await this.getSocketUsers(user.room) - const hasOtherConnection = users.some(otherUser => { - return _id === otherUser._id && sessionId !== otherUser.sessionId + const session: SocketSession = socket.data + const { _id, sessionId, room } = session + const sessions = await this.getRoomSessions(room) + const hasOtherSession = sessions.some(otherSession => { + return _id === otherSession._id && sessionId !== otherSession.sessionId }) - if (!hasOtherConnection) { + if (!hasOtherSession && room) { // @ts-ignore const user: ContextUser = { _id: socket.data._id } - await clearLock(appId, user) + await clearLock(room, user) } } catch (e) { // This is fine, just means this user didn't hold the lock diff --git a/packages/server/src/websockets/grid.ts b/packages/server/src/websockets/grid.ts index c3c460b19d..833c7b3348 100644 --- a/packages/server/src/websockets/grid.ts +++ b/packages/server/src/websockets/grid.ts @@ -18,9 +18,9 @@ export default class GridSocket extends BaseSocket { socket.on(GridSocketEvents.SelectTable, async (tableId, callback) => { await this.joinRoom(socket, tableId) - // Reply with all users in current roome - const users = await this.getSocketUsers(tableId) - callback({ users }) + // Reply with all users in current room + const sessions = await this.getRoomSessions(tableId) + callback({ users: sessions }) }) // Handle users selecting a new cell diff --git a/packages/server/src/websockets/websocket.ts b/packages/server/src/websockets/websocket.ts index 40ab42ab22..2ecff7a62c 100644 --- a/packages/server/src/websockets/websocket.ts +++ b/packages/server/src/websockets/websocket.ts @@ -3,21 +3,18 @@ import http from "http" import Koa from "koa" import Cookies from "cookies" import { userAgent } from "koa-useragent" -import { auth } from "@budibase/backend-core" +import { auth, redis } from "@budibase/backend-core" import currentApp from "../middleware/currentapp" import { createAdapter } from "@socket.io/redis-adapter" import { Socket } from "socket.io" -import { - getSocketPubSubClients, - getSocketUsers, - setSocketUsers, -} from "../utilities/redis" -import { SocketEvents } from "@budibase/shared-core" -import { SocketUser } from "@budibase/types" +import { getSocketPubSubClients } from "../utilities/redis" +import { SocketEvents, SocketSessionTTL } from "@budibase/shared-core" +import { SocketSession } from "@budibase/types" export class BaseSocket { io: Server path: string + redisClient?: redis.Client constructor( app: Koa, @@ -79,7 +76,6 @@ export class BaseSocket { email, firstName, lastName, - appId: ctx.appId, sessionId: socket.id, } next() @@ -91,50 +87,116 @@ export class BaseSocket { } }) - // Instantiate redis adapter - const { pub, sub } = getSocketPubSubClients() - const opts = { key: `socket.io-${path}` } - this.io.adapter(createAdapter(pub, sub, opts)) + // Initialise redis before handling connections + this.initialise().then(() => { + this.io.on("connection", async socket => { + // Add built in handler to allow fetching all other users in this room + socket.on(SocketEvents.GetUsers, async (payload, callback) => { + const sessions = await this.getRoomSessions(socket.data.room) + callback({ users: sessions }) + }) - // Handle user connections and disconnections - this.io.on("connection", async socket => { - // Add built in handler to allow fetching all other users in this room - socket.on(SocketEvents.GetUsers, async (payload, callback) => { - let users - if (socket.data.room) { - users = await this.getSocketUsers(socket.data.room) - } - callback({ users }) - }) + // Add built in handler for heartbeats + socket.on(SocketEvents.Heartbeat, async () => { + console.log(socket.data.email, "heartbeat received") + await this.extendSessionTTL(socket.data.sessionId) + }) - // Add handlers for this socket - await this.onConnect(socket) + // Add early disconnection handler to clean up and leave room + socket.on("disconnect", async () => { + // Run any custom disconnection logic before we leave the room, + // so that we have access to their room etc before disconnection + await this.onDisconnect(socket) - // Add early disconnection handler to clean up and leave room - socket.on("disconnect", async () => { - // Leave the current room when the user disconnects if we're in one - if (socket.data.room) { + // Leave the current room when the user disconnects if we're in one await this.leaveRoom(socket) - } + }) - // Run any other disconnection logic - await this.onDisconnect(socket) + // Add handlers for this socket + await this.onConnect(socket) }) }) } + async initialise() { + // Instantiate redis adapter. + // We use a fully qualified key name here as this bypasses the normal + // redis client#s key prefixing. + const { pub, sub } = getSocketPubSubClients() + const opts = { + key: `${redis.utils.Databases.SOCKET_IO}-${this.path}-pubsub`, + } + this.io.adapter(createAdapter(pub, sub, opts)) + + // Fetch redis client + this.redisClient = await redis.clients.getSocketClient() + } + + // Gets the redis key for a certain session ID + getSessionKey(sessionId: string) { + return `${this.path}-session:${sessionId}` + } + + // Gets the redis key for certain room name + getRoomKey(room: string) { + return `${this.path}-room:${room}` + } + + async extendSessionTTL(sessionId: string) { + const key = this.getSessionKey(sessionId) + await this.redisClient?.setExpiry(key, SocketSessionTTL) + } + + // Gets an array of all redis keys of users inside a certain room + async getRoomSessionKeys(room: string): Promise { + const keys = await this.redisClient?.get(this.getRoomKey(room)) + return keys || [] + } + + // Sets the list of redis keys for users inside a certain room. + // There is no TTL on the actual room key map itself. + async setRoomSessionKeys(room: string, keys: string[]) { + await this.redisClient?.store(this.getRoomKey(room), keys) + } + // Gets a list of all users inside a certain room - async getSocketUsers(room?: string): Promise { + async getRoomSessions(room?: string): Promise { if (room) { - const users = await getSocketUsers(this.path, room) - return users || [] + const keys = await this.getRoomSessionKeys(room) + const sessions = await this.redisClient?.bulkGet(keys) + return Object.values(sessions || {}) } else { return [] } } + // Detects keys which have been pruned from redis due to TTL expiry in a certain + // room and broadcasts disconnection messages to ensure clients are aware + async pruneRoom(room: string) { + const keys = await this.getRoomSessionKeys(room) + const keysExist = await Promise.all( + keys.map(key => this.redisClient?.exists(key)) + ) + const prunedKeys = keys.filter((key, idx) => { + if (!keysExist[idx]) { + console.log("pruning key", keys[idx]) + return false + } + return true + }) + + // Store new pruned keys + await this.setRoomSessionKeys(room, prunedKeys) + } + // Adds a user to a certain room async joinRoom(socket: Socket, room: string) { + if (!room) { + return + } + // Prune room before joining + await this.pruneRoom(room) + // Check if we're already in a room, as we'll need to leave if we are before we // can join a different room const oldRoom = socket.data.room @@ -148,40 +210,43 @@ export class BaseSocket { socket.data.room = room } + // Store in redis // @ts-ignore - let user: SocketUser = socket.data - let users = await this.getSocketUsers(room) + let user: SocketSession = socket.data + const key = this.getSessionKey(user.sessionId) + await this.redisClient?.store(key, user, SocketSessionTTL) + const roomKeys = await this.getRoomSessionKeys(room) + if (!roomKeys.includes(key)) { + await this.setRoomSessionKeys(room, [...roomKeys, key]) + } - // Store this socket in redis - if (!users?.length) { - users = [] - } - const index = users.findIndex(x => x.sessionId === socket.data.sessionId) - if (index === -1) { - users.push(user) - } else { - users[index] = user - } - await setSocketUsers(this.path, room, users) + // Notify other users socket.to(room).emit(SocketEvents.UserUpdate, user) } // Disconnects a socket from its current room async leaveRoom(socket: Socket) { // @ts-ignore - let user: SocketUser = socket.data + let user: SocketSession = socket.data const { room, sessionId } = user if (!room) { return } + + // Leave room socket.leave(room) socket.data.room = undefined - let users = await this.getSocketUsers(room) + // Delete from redis + const key = this.getSessionKey(sessionId) + await this.redisClient?.delete(key) + const roomKeys = await this.getRoomSessionKeys(room) + await this.setRoomSessionKeys( + room, + roomKeys.filter(k => k !== key) + ) - // Remove this socket from redis - users = users.filter(user => user.sessionId !== sessionId) - await setSocketUsers(this.path, room, users) + // Notify other users socket.to(room).emit(SocketEvents.UserDisconnect, user) } diff --git a/packages/shared-core/src/constants.ts b/packages/shared-core/src/constants.ts index 62603ee941..9c92755730 100644 --- a/packages/shared-core/src/constants.ts +++ b/packages/shared-core/src/constants.ts @@ -71,7 +71,8 @@ export const SqlNumberTypeRangeMap = { export const SocketEvents = { UserUpdate: "UserUpdate", UserDisconnect: "UserDisconnect", - GetUsers: "GetUsers" + GetUsers: "GetUsers", + Heartbeat: "Heartbeat" } export const GridSocketEvents = { @@ -82,6 +83,9 @@ export const GridSocketEvents = { } export const BuilderSocketEvents = { + SelectApp: "SelectApp", TableChange: "TableChange", DatasourceChange: "DatasourceChange" -} \ No newline at end of file +} + +export const SocketSessionTTL = 60 \ No newline at end of file diff --git a/packages/types/src/sdk/websocket.ts b/packages/types/src/sdk/websocket.ts index 822c4294e0..0238478ed8 100644 --- a/packages/types/src/sdk/websocket.ts +++ b/packages/types/src/sdk/websocket.ts @@ -1,9 +1,8 @@ -export interface SocketUser { +export interface SocketSession { _id: string, email: string, firstName?: string, lastName?: string, - appId: string, sessionId: string, room?: string } \ No newline at end of file From 8d83a94d61cce2640502651864b9968a5ce92c97 Mon Sep 17 00:00:00 2001 From: Andrew Kingston Date: Wed, 31 May 2023 15:13:52 +0100 Subject: [PATCH 4/9] Disable heartbead on client app socket --- packages/client/src/websocket.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/client/src/websocket.js b/packages/client/src/websocket.js index a3356b7eaa..1e8e908d2b 100644 --- a/packages/client/src/websocket.js +++ b/packages/client/src/websocket.js @@ -18,7 +18,7 @@ export const initWebsocket = () => { } // Initialise connection - socket = createWebsocket("/socket/client") + socket = createWebsocket("/socket/client", false) // Event handlers socket.on("plugin-update", data => { From 7a307e3de88eb1ac51dc3443bcf49f26a63f0926 Mon Sep 17 00:00:00 2001 From: Andrew Kingston Date: Wed, 31 May 2023 15:14:33 +0100 Subject: [PATCH 5/9] Remove logs --- packages/frontend-core/src/utils/websocket.js | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/frontend-core/src/utils/websocket.js b/packages/frontend-core/src/utils/websocket.js index 6f907f5270..3863132664 100644 --- a/packages/frontend-core/src/utils/websocket.js +++ b/packages/frontend-core/src/utils/websocket.js @@ -29,13 +29,11 @@ export const createWebsocket = (path, heartbeat = true) => { let interval if (heartbeat) { interval = setInterval(() => { - console.log("Sending heartbeat") socket.emit(SocketEvents.Heartbeat) }, SocketSessionTTL * 500) } socket.on("disconnect", () => { - console.log("clear interval") clearInterval(interval) }) From a7c6298d1fff78f045eae0d1c33b923c6ade1cc2 Mon Sep 17 00:00:00 2001 From: Andrew Kingston Date: Wed, 31 May 2023 15:37:39 +0100 Subject: [PATCH 6/9] Ensure socket pruning due to redis TTL expiry works as expected --- .../builder/src/builderStore/store/users.js | 4 +- .../src/components/grid/stores/users.js | 4 +- packages/server/src/websockets/websocket.ts | 40 ++++++++++--------- 3 files changed, 25 insertions(+), 23 deletions(-) diff --git a/packages/builder/src/builderStore/store/users.js b/packages/builder/src/builderStore/store/users.js index 18f343c884..6e10b081d6 100644 --- a/packages/builder/src/builderStore/store/users.js +++ b/packages/builder/src/builderStore/store/users.js @@ -20,9 +20,9 @@ export const getUserStore = () => { } } - const removeUser = user => { + const removeUser = sessionId => { store.update(state => { - return state.filter(x => x.sessionId !== user.sessionId) + return state.filter(x => x.sessionId !== sessionId) }) } diff --git a/packages/frontend-core/src/components/grid/stores/users.js b/packages/frontend-core/src/components/grid/stores/users.js index 5368c414ce..b6e74ef276 100644 --- a/packages/frontend-core/src/components/grid/stores/users.js +++ b/packages/frontend-core/src/components/grid/stores/users.js @@ -51,9 +51,9 @@ export const deriveStores = context => { } } - const removeUser = user => { + const removeUser = sessionId => { users.update(state => { - return state.filter(x => x.sessionId !== user.sessionId) + return state.filter(x => x.sessionId !== sessionId) }) } diff --git a/packages/server/src/websockets/websocket.ts b/packages/server/src/websockets/websocket.ts index 2ecff7a62c..62d3c38641 100644 --- a/packages/server/src/websockets/websocket.ts +++ b/packages/server/src/websockets/websocket.ts @@ -148,21 +148,22 @@ export class BaseSocket { } // Gets an array of all redis keys of users inside a certain room - async getRoomSessionKeys(room: string): Promise { + async getRoomSessionIds(room: string): Promise { const keys = await this.redisClient?.get(this.getRoomKey(room)) return keys || [] } // Sets the list of redis keys for users inside a certain room. // There is no TTL on the actual room key map itself. - async setRoomSessionKeys(room: string, keys: string[]) { - await this.redisClient?.store(this.getRoomKey(room), keys) + async setRoomSessionIds(room: string, ids: string[]) { + await this.redisClient?.store(this.getRoomKey(room), ids) } // Gets a list of all users inside a certain room async getRoomSessions(room?: string): Promise { if (room) { - const keys = await this.getRoomSessionKeys(room) + const sessionIds = await this.getRoomSessionIds(room) + const keys = sessionIds.map(this.getSessionKey.bind(this)) const sessions = await this.redisClient?.bulkGet(keys) return Object.values(sessions || {}) } else { @@ -173,20 +174,20 @@ export class BaseSocket { // Detects keys which have been pruned from redis due to TTL expiry in a certain // room and broadcasts disconnection messages to ensure clients are aware async pruneRoom(room: string) { - const keys = await this.getRoomSessionKeys(room) - const keysExist = await Promise.all( - keys.map(key => this.redisClient?.exists(key)) + const sessionIds = await this.getRoomSessionIds(room) + const sessionsExist = await Promise.all( + sessionIds.map(id => this.redisClient?.exists(this.getSessionKey(id))) ) - const prunedKeys = keys.filter((key, idx) => { - if (!keysExist[idx]) { - console.log("pruning key", keys[idx]) + const prunedSessionIds = sessionIds.filter((id, idx) => { + if (!sessionsExist[idx]) { + this.io.to(room).emit(SocketEvents.UserDisconnect, sessionIds[idx]) return false } return true }) // Store new pruned keys - await this.setRoomSessionKeys(room, prunedKeys) + await this.setRoomSessionIds(room, prunedSessionIds) } // Adds a user to a certain room @@ -213,11 +214,12 @@ export class BaseSocket { // Store in redis // @ts-ignore let user: SocketSession = socket.data - const key = this.getSessionKey(user.sessionId) + const { sessionId } = user + const key = this.getSessionKey(sessionId) await this.redisClient?.store(key, user, SocketSessionTTL) - const roomKeys = await this.getRoomSessionKeys(room) - if (!roomKeys.includes(key)) { - await this.setRoomSessionKeys(room, [...roomKeys, key]) + const sessionIds = await this.getRoomSessionIds(room) + if (!sessionIds.includes(sessionId)) { + await this.setRoomSessionIds(room, [...sessionIds, sessionId]) } // Notify other users @@ -240,14 +242,14 @@ export class BaseSocket { // Delete from redis const key = this.getSessionKey(sessionId) await this.redisClient?.delete(key) - const roomKeys = await this.getRoomSessionKeys(room) - await this.setRoomSessionKeys( + const sessionIds = await this.getRoomSessionIds(room) + await this.setRoomSessionIds( room, - roomKeys.filter(k => k !== key) + sessionIds.filter(id => id !== sessionId) ) // Notify other users - socket.to(room).emit(SocketEvents.UserDisconnect, user) + socket.to(room).emit(SocketEvents.UserDisconnect, sessionId) } // Updates a connected user's metadata, assuming a room change is not required. From ffbbf04e868c98130f69dd6b89b68a494e163307 Mon Sep 17 00:00:00 2001 From: Andrew Kingston Date: Wed, 31 May 2023 16:08:35 +0100 Subject: [PATCH 7/9] Lint --- packages/backend-core/src/redis/init.ts | 7 +++++-- .../src/components/grid/layout/Grid.svelte | 1 - packages/server/src/websockets/builder.ts | 10 ++++------ packages/shared-core/src/constants.ts | 8 ++++---- packages/types/src/sdk/websocket.ts | 14 +++++++------- 5 files changed, 20 insertions(+), 20 deletions(-) diff --git a/packages/backend-core/src/redis/init.ts b/packages/backend-core/src/redis/init.ts index 056620c085..55ffe3dd12 100644 --- a/packages/backend-core/src/redis/init.ts +++ b/packages/backend-core/src/redis/init.ts @@ -16,7 +16,10 @@ async function init() { cacheClient = await new Client(utils.Databases.GENERIC_CACHE).init() lockClient = await new Client(utils.Databases.LOCKS).init() writethroughClient = await new Client(utils.Databases.WRITE_THROUGH).init() - socketClient = await new Client(utils.Databases.SOCKET_IO, utils.SelectableDatabase.SOCKET_IO).init() + socketClient = await new Client( + utils.Databases.SOCKET_IO, + utils.SelectableDatabase.SOCKET_IO + ).init() } export async function shutdown() { @@ -80,4 +83,4 @@ export async function getSocketClient() { await init() } return socketClient -} \ No newline at end of file +} diff --git a/packages/frontend-core/src/components/grid/layout/Grid.svelte b/packages/frontend-core/src/components/grid/layout/Grid.svelte index 72c32cbbee..08325857d6 100644 --- a/packages/frontend-core/src/components/grid/layout/Grid.svelte +++ b/packages/frontend-core/src/components/grid/layout/Grid.svelte @@ -62,7 +62,6 @@ stripeRows, }) - // Build up context let context = { API: API || createAPIClient(), diff --git a/packages/server/src/websockets/builder.ts b/packages/server/src/websockets/builder.ts index 9fbedbe9e2..128b22322d 100644 --- a/packages/server/src/websockets/builder.ts +++ b/packages/server/src/websockets/builder.ts @@ -60,12 +60,10 @@ export default class BuilderSocket extends BaseSocket { } emitDatasourceUpdate(ctx: any, datasource: Datasource) { - this.io - .in(ctx.appId) - .emit(BuilderSocketEvents.DatasourceChange, { - id: datasource._id, - datasource, - }) + this.io.in(ctx.appId).emit(BuilderSocketEvents.DatasourceChange, { + id: datasource._id, + datasource, + }) } emitDatasourceDeletion(ctx: any, id: string) { diff --git a/packages/shared-core/src/constants.ts b/packages/shared-core/src/constants.ts index 9c92755730..118ac876a4 100644 --- a/packages/shared-core/src/constants.ts +++ b/packages/shared-core/src/constants.ts @@ -72,20 +72,20 @@ export const SocketEvents = { UserUpdate: "UserUpdate", UserDisconnect: "UserDisconnect", GetUsers: "GetUsers", - Heartbeat: "Heartbeat" + Heartbeat: "Heartbeat", } export const GridSocketEvents = { RowChange: "RowChange", TableChange: "TableChange", SelectTable: "SelectTable", - SelectCell: "SelectCell" + SelectCell: "SelectCell", } export const BuilderSocketEvents = { SelectApp: "SelectApp", TableChange: "TableChange", - DatasourceChange: "DatasourceChange" + DatasourceChange: "DatasourceChange", } -export const SocketSessionTTL = 60 \ No newline at end of file +export const SocketSessionTTL = 60 diff --git a/packages/types/src/sdk/websocket.ts b/packages/types/src/sdk/websocket.ts index 0238478ed8..4fa7e155d6 100644 --- a/packages/types/src/sdk/websocket.ts +++ b/packages/types/src/sdk/websocket.ts @@ -1,8 +1,8 @@ export interface SocketSession { - _id: string, - email: string, - firstName?: string, - lastName?: string, - sessionId: string, - room?: string -} \ No newline at end of file + _id: string + email: string + firstName?: string + lastName?: string + sessionId: string + room?: string +} From e3b5d711df4b1e664be64321fe1106ab3fcb2fa3 Mon Sep 17 00:00:00 2001 From: Andrew Kingston Date: Wed, 31 May 2023 16:27:49 +0100 Subject: [PATCH 8/9] Disable redis select command in tests --- packages/backend-core/src/redis/redis.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/backend-core/src/redis/redis.ts b/packages/backend-core/src/redis/redis.ts index ea1f1944a1..f25de9be2b 100644 --- a/packages/backend-core/src/redis/redis.ts +++ b/packages/backend-core/src/redis/redis.ts @@ -183,7 +183,7 @@ class RedisWrapper { CLOSED = false init(this._select) await waitForConnection(this._select) - if (this._select) { + if (this._select && !env.isTest()) { this.getClient().select(this._select) } return this From 680e609b52198e5b59563f54eaf3716e637e4355 Mon Sep 17 00:00:00 2001 From: Andrew Kingston Date: Wed, 31 May 2023 17:03:14 +0100 Subject: [PATCH 9/9] Remove plural forms of enums and use TS enum rather than JS const --- .../builder/src/builderStore/websocket.js | 12 ++++----- .../src/components/grid/lib/websocket.js | 14 +++++----- packages/frontend-core/src/utils/websocket.js | 4 +-- packages/server/src/websockets/builder.ts | 12 ++++----- packages/server/src/websockets/grid.ts | 14 +++++----- packages/server/src/websockets/websocket.ts | 16 ++++------- packages/shared-core/src/constants.ts | 27 +++++++++---------- 7 files changed, 46 insertions(+), 53 deletions(-) diff --git a/packages/builder/src/builderStore/websocket.js b/packages/builder/src/builderStore/websocket.js index ffaca7a025..6a3ce76ae6 100644 --- a/packages/builder/src/builderStore/websocket.js +++ b/packages/builder/src/builderStore/websocket.js @@ -1,14 +1,14 @@ import { createWebsocket } from "@budibase/frontend-core" import { userStore } from "builderStore" import { datasources, tables } from "stores/backend" -import { SocketEvents, BuilderSocketEvents } from "@budibase/shared-core" +import { SocketEvent, BuilderSocketEvent } from "@budibase/shared-core" export const createBuilderWebsocket = appId => { const socket = createWebsocket("/socket/builder") // Built-in events socket.on("connect", () => { - socket.emit(BuilderSocketEvents.SelectApp, appId, response => { + socket.emit(BuilderSocketEvent.SelectApp, appId, response => { userStore.actions.init(response.users) }) }) @@ -17,16 +17,16 @@ export const createBuilderWebsocket = appId => { }) // User events - socket.on(SocketEvents.UserUpdate, userStore.actions.updateUser) - socket.on(SocketEvents.UserDisconnect, userStore.actions.removeUser) + socket.on(SocketEvent.UserUpdate, userStore.actions.updateUser) + socket.on(SocketEvent.UserDisconnect, userStore.actions.removeUser) // Table events - socket.on(BuilderSocketEvents.TableChange, ({ id, table }) => { + socket.on(BuilderSocketEvent.TableChange, ({ id, table }) => { tables.replaceTable(id, table) }) // Datasource events - socket.on(BuilderSocketEvents.DatasourceChange, ({ id, datasource }) => { + socket.on(BuilderSocketEvent.DatasourceChange, ({ id, datasource }) => { datasources.replaceDatasource(id, datasource) }) diff --git a/packages/frontend-core/src/components/grid/lib/websocket.js b/packages/frontend-core/src/components/grid/lib/websocket.js index 3f4ee32933..bb1d6991b0 100644 --- a/packages/frontend-core/src/components/grid/lib/websocket.js +++ b/packages/frontend-core/src/components/grid/lib/websocket.js @@ -1,6 +1,6 @@ import { get } from "svelte/store" import { createWebsocket } from "../../../utils" -import { SocketEvents, GridSocketEvents } from "@budibase/shared-core" +import { SocketEvent, GridSocketEvent } from "@budibase/shared-core" export const createGridWebsocket = context => { const { rows, tableId, users, focusedCellId, table } = context @@ -11,7 +11,7 @@ export const createGridWebsocket = context => { return } // Identify which table we are editing - socket.emit(GridSocketEvents.SelectTable, tableId, response => { + socket.emit(GridSocketEvent.SelectTable, tableId, response => { // handle initial connection info users.set(response.users) }) @@ -26,15 +26,15 @@ export const createGridWebsocket = context => { }) // User events - socket.on(SocketEvents.UserUpdate, user => { + socket.on(SocketEvent.UserUpdate, user => { users.actions.updateUser(user) }) - socket.on(SocketEvents.UserDisconnect, user => { + socket.on(SocketEvent.UserDisconnect, user => { users.actions.removeUser(user) }) // Row events - socket.on(GridSocketEvents.RowChange, async data => { + socket.on(GridSocketEvent.RowChange, async data => { if (data.id) { rows.actions.replaceRow(data.id, data.row) } else if (data.row.id) { @@ -44,7 +44,7 @@ export const createGridWebsocket = context => { }) // Table events - socket.on(GridSocketEvents.TableChange, data => { + socket.on(GridSocketEvent.TableChange, data => { // Only update table if one exists. If the table was deleted then we don't // want to know - let the builder navigate away if (data.table) { @@ -57,7 +57,7 @@ export const createGridWebsocket = context => { // Notify selected cell changes focusedCellId.subscribe($focusedCellId => { - socket.emit(GridSocketEvents.SelectCell, $focusedCellId) + socket.emit(GridSocketEvent.SelectCell, $focusedCellId) }) return () => socket?.disconnect() diff --git a/packages/frontend-core/src/utils/websocket.js b/packages/frontend-core/src/utils/websocket.js index 3863132664..561a020e13 100644 --- a/packages/frontend-core/src/utils/websocket.js +++ b/packages/frontend-core/src/utils/websocket.js @@ -1,5 +1,5 @@ import { io } from "socket.io-client" -import { SocketEvents, SocketSessionTTL } from "@budibase/shared-core" +import { SocketEvent, SocketSessionTTL } from "@budibase/shared-core" export const createWebsocket = (path, heartbeat = true) => { if (!path) { @@ -29,7 +29,7 @@ export const createWebsocket = (path, heartbeat = true) => { let interval if (heartbeat) { interval = setInterval(() => { - socket.emit(SocketEvents.Heartbeat) + socket.emit(SocketEvent.Heartbeat) }, SocketSessionTTL * 500) } diff --git a/packages/server/src/websockets/builder.ts b/packages/server/src/websockets/builder.ts index 128b22322d..5e0e094b9e 100644 --- a/packages/server/src/websockets/builder.ts +++ b/packages/server/src/websockets/builder.ts @@ -7,7 +7,7 @@ import { Datasource, Table, SocketSession, ContextUser } from "@budibase/types" import { gridSocket } from "./index" import { clearLock } from "../utilities/redis" import { Socket } from "socket.io" -import { BuilderSocketEvents } from "@budibase/shared-core" +import { BuilderSocketEvent } from "@budibase/shared-core" export default class BuilderSocket extends BaseSocket { constructor(app: Koa, server: http.Server) { @@ -16,7 +16,7 @@ export default class BuilderSocket extends BaseSocket { async onConnect(socket: Socket) { // Initial identification of selected app - socket.on(BuilderSocketEvents.SelectApp, async (appId, callback) => { + socket.on(BuilderSocketEvent.SelectApp, async (appId, callback) => { await this.joinRoom(socket, appId) // Reply with all users in current room @@ -48,19 +48,19 @@ export default class BuilderSocket extends BaseSocket { emitTableUpdate(ctx: any, table: Table) { this.io .in(ctx.appId) - .emit(BuilderSocketEvents.TableChange, { id: table._id, table }) + .emit(BuilderSocketEvent.TableChange, { id: table._id, table }) gridSocket.emitTableUpdate(table) } emitTableDeletion(ctx: any, id: string) { this.io .in(ctx.appId) - .emit(BuilderSocketEvents.TableChange, { id, table: null }) + .emit(BuilderSocketEvent.TableChange, { id, table: null }) gridSocket.emitTableDeletion(id) } emitDatasourceUpdate(ctx: any, datasource: Datasource) { - this.io.in(ctx.appId).emit(BuilderSocketEvents.DatasourceChange, { + this.io.in(ctx.appId).emit(BuilderSocketEvent.DatasourceChange, { id: datasource._id, datasource, }) @@ -69,6 +69,6 @@ export default class BuilderSocket extends BaseSocket { emitDatasourceDeletion(ctx: any, id: string) { this.io .in(ctx.appId) - .emit(BuilderSocketEvents.DatasourceChange, { id, datasource: null }) + .emit(BuilderSocketEvent.DatasourceChange, { id, datasource: null }) } } diff --git a/packages/server/src/websockets/grid.ts b/packages/server/src/websockets/grid.ts index 833c7b3348..440b4f78a5 100644 --- a/packages/server/src/websockets/grid.ts +++ b/packages/server/src/websockets/grid.ts @@ -6,7 +6,7 @@ import Koa from "koa" import { getTableId } from "../api/controllers/row/utils" import { Row, Table } from "@budibase/types" import { Socket } from "socket.io" -import { GridSocketEvents } from "@budibase/shared-core" +import { GridSocketEvent } from "@budibase/shared-core" export default class GridSocket extends BaseSocket { constructor(app: Koa, server: http.Server) { @@ -15,7 +15,7 @@ export default class GridSocket extends BaseSocket { async onConnect(socket: Socket) { // Initial identification of connected spreadsheet - socket.on(GridSocketEvents.SelectTable, async (tableId, callback) => { + socket.on(GridSocketEvent.SelectTable, async (tableId, callback) => { await this.joinRoom(socket, tableId) // Reply with all users in current room @@ -24,28 +24,28 @@ export default class GridSocket extends BaseSocket { }) // Handle users selecting a new cell - socket.on(GridSocketEvents.SelectCell, cellId => { + socket.on(GridSocketEvent.SelectCell, cellId => { this.updateUser(socket, { focusedCellId: cellId }) }) } emitRowUpdate(ctx: any, row: Row) { const tableId = getTableId(ctx) - this.io.in(tableId).emit(GridSocketEvents.RowChange, { id: row._id, row }) + this.io.in(tableId).emit(GridSocketEvent.RowChange, { id: row._id, row }) } emitRowDeletion(ctx: any, id: string) { const tableId = getTableId(ctx) - this.io.in(tableId).emit(GridSocketEvents.RowChange, { id, row: null }) + this.io.in(tableId).emit(GridSocketEvent.RowChange, { id, row: null }) } emitTableUpdate(table: Table) { this.io .in(table._id!) - .emit(GridSocketEvents.TableChange, { id: table._id, table }) + .emit(GridSocketEvent.TableChange, { id: table._id, table }) } emitTableDeletion(id: string) { - this.io.in(id).emit(GridSocketEvents.TableChange, { id, table: null }) + this.io.in(id).emit(GridSocketEvent.TableChange, { id, table: null }) } } diff --git a/packages/server/src/websockets/websocket.ts b/packages/server/src/websockets/websocket.ts index 62d3c38641..a2d2474c77 100644 --- a/packages/server/src/websockets/websocket.ts +++ b/packages/server/src/websockets/websocket.ts @@ -8,7 +8,7 @@ import currentApp from "../middleware/currentapp" import { createAdapter } from "@socket.io/redis-adapter" import { Socket } from "socket.io" import { getSocketPubSubClients } from "../utilities/redis" -import { SocketEvents, SocketSessionTTL } from "@budibase/shared-core" +import { SocketEvent, SocketSessionTTL } from "@budibase/shared-core" import { SocketSession } from "@budibase/types" export class BaseSocket { @@ -90,14 +90,8 @@ export class BaseSocket { // Initialise redis before handling connections this.initialise().then(() => { this.io.on("connection", async socket => { - // Add built in handler to allow fetching all other users in this room - socket.on(SocketEvents.GetUsers, async (payload, callback) => { - const sessions = await this.getRoomSessions(socket.data.room) - callback({ users: sessions }) - }) - // Add built in handler for heartbeats - socket.on(SocketEvents.Heartbeat, async () => { + socket.on(SocketEvent.Heartbeat, async () => { console.log(socket.data.email, "heartbeat received") await this.extendSessionTTL(socket.data.sessionId) }) @@ -180,7 +174,7 @@ export class BaseSocket { ) const prunedSessionIds = sessionIds.filter((id, idx) => { if (!sessionsExist[idx]) { - this.io.to(room).emit(SocketEvents.UserDisconnect, sessionIds[idx]) + this.io.to(room).emit(SocketEvent.UserDisconnect, sessionIds[idx]) return false } return true @@ -223,7 +217,7 @@ export class BaseSocket { } // Notify other users - socket.to(room).emit(SocketEvents.UserUpdate, user) + socket.to(room).emit(SocketEvent.UserUpdate, user) } // Disconnects a socket from its current room @@ -249,7 +243,7 @@ export class BaseSocket { ) // Notify other users - socket.to(room).emit(SocketEvents.UserDisconnect, sessionId) + socket.to(room).emit(SocketEvent.UserDisconnect, sessionId) } // Updates a connected user's metadata, assuming a room change is not required. diff --git a/packages/shared-core/src/constants.ts b/packages/shared-core/src/constants.ts index 118ac876a4..5802a6d88a 100644 --- a/packages/shared-core/src/constants.ts +++ b/packages/shared-core/src/constants.ts @@ -68,24 +68,23 @@ export const SqlNumberTypeRangeMap = { }, } -export const SocketEvents = { - UserUpdate: "UserUpdate", - UserDisconnect: "UserDisconnect", - GetUsers: "GetUsers", - Heartbeat: "Heartbeat", +export enum SocketEvent { + UserUpdate = "UserUpdate", + UserDisconnect = "UserDisconnect", + Heartbeat = "Heartbeat", } -export const GridSocketEvents = { - RowChange: "RowChange", - TableChange: "TableChange", - SelectTable: "SelectTable", - SelectCell: "SelectCell", +export enum GridSocketEvent { + RowChange = "RowChange", + TableChange = "TableChange", + SelectTable = "SelectTable", + SelectCell = "SelectCell", } -export const BuilderSocketEvents = { - SelectApp: "SelectApp", - TableChange: "TableChange", - DatasourceChange: "DatasourceChange", +export enum BuilderSocketEvent { + SelectApp = "SelectApp", + TableChange = "TableChange", + DatasourceChange = "DatasourceChange", } export const SocketSessionTTL = 60