From d3fdb52bf5b0cbe09bcee0568a4f7ad47ea81196 Mon Sep 17 00:00:00 2001 From: Andrew Kingston Date: Wed, 31 May 2023 10:21:50 +0100 Subject: [PATCH] Store all socket users in redis to allow all server instances to be aware of all users --- .../builder/src/builderStore/websocket.js | 13 +- .../src/components/grid/layout/Grid.svelte | 1 + .../src/components/grid/lib/websocket.js | 17 +-- packages/frontend-core/src/utils/websocket.js | 2 +- packages/server/src/utilities/redis.ts | 11 +- packages/server/src/websockets/builder.ts | 79 +++++------ packages/server/src/websockets/client.ts | 4 +- packages/server/src/websockets/grid.ts | 66 +++------ packages/server/src/websockets/websocket.ts | 132 ++++++++++++++++-- packages/shared-core/src/constants.ts | 18 +++ packages/types/src/sdk/index.ts | 1 + packages/types/src/sdk/websocket.ts | 9 ++ 12 files changed, 243 insertions(+), 110 deletions(-) create mode 100644 packages/types/src/sdk/websocket.ts diff --git a/packages/builder/src/builderStore/websocket.js b/packages/builder/src/builderStore/websocket.js index d58eb24bbf..79138b0767 100644 --- a/packages/builder/src/builderStore/websocket.js +++ b/packages/builder/src/builderStore/websocket.js @@ -1,13 +1,14 @@ import { createWebsocket } from "@budibase/frontend-core" import { userStore } from "builderStore" import { datasources, tables } from "stores/backend" +import { SocketEvents, BuilderSocketEvents } from "@budibase/shared-core" export const createBuilderWebsocket = () => { const socket = createWebsocket("/socket/builder") - // Connection events + // Built-in events socket.on("connect", () => { - socket.emit("get-users", null, response => { + socket.emit(SocketEvents.GetUsers, null, response => { userStore.actions.init(response.users) }) }) @@ -16,16 +17,16 @@ export const createBuilderWebsocket = () => { }) // User events - socket.on("user-update", userStore.actions.updateUser) - socket.on("user-disconnect", userStore.actions.removeUser) + socket.on(SocketEvents.UserUpdate, userStore.actions.updateUser) + socket.on(SocketEvents.UserDisconnect, userStore.actions.removeUser) // Table events - socket.on("table-change", ({ id, table }) => { + socket.on(BuilderSocketEvents.TableChange, ({ id, table }) => { tables.replaceTable(id, table) }) // Datasource events - socket.on("datasource-change", ({ id, datasource }) => { + socket.on(BuilderSocketEvents.DatasourceChange, ({ id, datasource }) => { datasources.replaceDatasource(id, datasource) }) diff --git a/packages/frontend-core/src/components/grid/layout/Grid.svelte b/packages/frontend-core/src/components/grid/layout/Grid.svelte index 08325857d6..72c32cbbee 100644 --- a/packages/frontend-core/src/components/grid/layout/Grid.svelte +++ b/packages/frontend-core/src/components/grid/layout/Grid.svelte @@ -62,6 +62,7 @@ stripeRows, }) + // Build up context let context = { API: API || createAPIClient(), diff --git a/packages/frontend-core/src/components/grid/lib/websocket.js b/packages/frontend-core/src/components/grid/lib/websocket.js index 3f1c473ea0..3f4ee32933 100644 --- a/packages/frontend-core/src/components/grid/lib/websocket.js +++ b/packages/frontend-core/src/components/grid/lib/websocket.js @@ -1,5 +1,6 @@ import { get } from "svelte/store" import { createWebsocket } from "../../../utils" +import { SocketEvents, GridSocketEvents } from "@budibase/shared-core" export const createGridWebsocket = context => { const { rows, tableId, users, focusedCellId, table } = context @@ -10,13 +11,13 @@ export const createGridWebsocket = context => { return } // Identify which table we are editing - socket.emit("select-table", tableId, response => { + socket.emit(GridSocketEvents.SelectTable, tableId, response => { // handle initial connection info users.set(response.users) }) } - // Connection events + // Built-in events socket.on("connect", () => { connectToTable(get(tableId)) }) @@ -25,25 +26,25 @@ export const createGridWebsocket = context => { }) // User events - socket.on("user-update", user => { + socket.on(SocketEvents.UserUpdate, user => { users.actions.updateUser(user) }) - socket.on("user-disconnect", user => { + socket.on(SocketEvents.UserDisconnect, user => { users.actions.removeUser(user) }) // Row events - socket.on("row-change", async data => { + socket.on(GridSocketEvents.RowChange, async data => { if (data.id) { rows.actions.replaceRow(data.id, data.row) } else if (data.row.id) { - // Handle users table edge case + // Handle users table edge cased await rows.actions.refreshRow(data.row.id) } }) // Table events - socket.on("table-change", data => { + socket.on(GridSocketEvents.TableChange, data => { // Only update table if one exists. If the table was deleted then we don't // want to know - let the builder navigate away if (data.table) { @@ -56,7 +57,7 @@ export const createGridWebsocket = context => { // Notify selected cell changes focusedCellId.subscribe($focusedCellId => { - socket.emit("select-cell", $focusedCellId) + socket.emit(GridSocketEvents.SelectCell, $focusedCellId) }) return () => socket?.disconnect() diff --git a/packages/frontend-core/src/utils/websocket.js b/packages/frontend-core/src/utils/websocket.js index 839fa6d73d..6de9690253 100644 --- a/packages/frontend-core/src/utils/websocket.js +++ b/packages/frontend-core/src/utils/websocket.js @@ -21,6 +21,6 @@ export const createWebsocket = path => { timeout: 4000, // Disable polling and rely on websocket only, as HTTP transport // will only work with sticky sessions which we don't have - transports: ["websocket"], + // transports: ["websocket"], }) } diff --git a/packages/server/src/utilities/redis.ts b/packages/server/src/utilities/redis.ts index ff1c863bf7..2ae2216fc4 100644 --- a/packages/server/src/utilities/redis.ts +++ b/packages/server/src/utilities/redis.ts @@ -1,6 +1,6 @@ import { redis } from "@budibase/backend-core" import { getGlobalIDFromUserMetadataID } from "../db/utils" -import { ContextUser } from "@budibase/types" +import { ContextUser, SocketUser } from "@budibase/types" const APP_DEV_LOCK_SECONDS = 600 const AUTOMATION_TEST_FLAG_SECONDS = 60 @@ -26,6 +26,7 @@ export async function init() { } export async function shutdown() { + console.log("REDIS SHUTDOWN") if (devAppClient) await devAppClient.finish() if (debounceClient) await debounceClient.finish() if (flagClient) await flagClient.finish() @@ -97,6 +98,14 @@ export async function clearTestFlag(id: string) { await devAppClient.delete(id) } +export async function getSocketUsers(path: string, room: string) { + return await socketClient.get(`${path}:${room}`) +} + +export async function setSocketUsers(path: string, room: string, users: SocketUser[]) { + await socketClient.store(`${path}:${room}`, users) +} + export function getSocketPubSubClients() { return { pub: socketClient.getClient(), diff --git a/packages/server/src/websockets/builder.ts b/packages/server/src/websockets/builder.ts index de8709129d..6fe02be0ba 100644 --- a/packages/server/src/websockets/builder.ts +++ b/packages/server/src/websockets/builder.ts @@ -1,69 +1,70 @@ import authorized from "../middleware/authorized" -import Socket from "./websocket" +import { BaseSocket } from "./websocket" import { permissions } from "@budibase/backend-core" import http from "http" import Koa from "koa" -import { Datasource, Table } from "@budibase/types" +import { Datasource, Table, SocketUser, ContextUser } from "@budibase/types" import { gridSocket } from "./index" import { clearLock } from "../utilities/redis" +import { Socket } from "socket.io" +import { BuilderSocketEvents } from "@budibase/shared-core" -export default class BuilderSocket extends Socket { +export default class BuilderSocket extends BaseSocket { constructor(app: Koa, server: http.Server) { super(app, server, "/socket/builder", [authorized(permissions.BUILDER)]) + } - this.io.on("connection", socket => { - // Join a room for this app - const user = socket.data.user - const appId = socket.data.appId - socket.join(appId) - socket.to(appId).emit("user-update", user) + async onConnect(socket: Socket) { + // Join a room for this app + await this.joinRoom(socket, socket.data.appId) + } - // Initial identification of connected spreadsheet - socket.on("get-users", async (payload, callback) => { - const sockets = await this.io.in(appId).fetchSockets() - callback({ - users: sockets.map(socket => socket.data.user), - }) + async onDisconnect(socket: Socket) { + // Remove app lock from this user if they have no other connections + try { + // @ts-ignore + const user: SocketUser = socket.data + const { _id, sessionId, appId } = user + const users = await this.getSocketUsers(user.room) + const hasOtherConnection = users.some(otherUser => { + return _id === otherUser._id && sessionId !== otherUser.sessionId }) - - // Disconnection cleanup - socket.on("disconnect", async () => { - socket.to(appId).emit("user-disconnect", user) - - // Remove app lock from this user if they have no other connections - try { - const sockets = await this.io.in(appId).fetchSockets() - const hasOtherConnection = sockets.some(socket => { - const { _id, sessionId } = socket.data.user - return _id === user._id && sessionId !== user.sessionId - }) - if (!hasOtherConnection) { - await clearLock(appId, user) - } - } catch (e) { - // This is fine, just means this user didn't hold the lock - } - }) - }) + if (!hasOtherConnection) { + // @ts-ignore + const user: ContextUser = { _id: socket.data._id } + await clearLock(appId, user) + } + } catch (e) { + // This is fine, just means this user didn't hold the lock + } } emitTableUpdate(ctx: any, table: Table) { - this.io.in(ctx.appId).emit("table-change", { id: table._id, table }) + this.io + .in(ctx.appId) + .emit(BuilderSocketEvents.TableChange, { id: table._id, table }) gridSocket.emitTableUpdate(table) } emitTableDeletion(ctx: any, id: string) { - this.io.in(ctx.appId).emit("table-change", { id, table: null }) + this.io + .in(ctx.appId) + .emit(BuilderSocketEvents.TableChange, { id, table: null }) gridSocket.emitTableDeletion(id) } emitDatasourceUpdate(ctx: any, datasource: Datasource) { this.io .in(ctx.appId) - .emit("datasource-change", { id: datasource._id, datasource }) + .emit(BuilderSocketEvents.DatasourceChange, { + id: datasource._id, + datasource, + }) } emitDatasourceDeletion(ctx: any, id: string) { - this.io.in(ctx.appId).emit("datasource-change", { id, datasource: null }) + this.io + .in(ctx.appId) + .emit(BuilderSocketEvents.DatasourceChange, { id, datasource: null }) } } diff --git a/packages/server/src/websockets/client.ts b/packages/server/src/websockets/client.ts index d59325f66e..165febb7a3 100644 --- a/packages/server/src/websockets/client.ts +++ b/packages/server/src/websockets/client.ts @@ -1,10 +1,10 @@ -import Socket from "./websocket" +import { BaseSocket } from "./websocket" import authorized from "../middleware/authorized" import http from "http" import Koa from "koa" import { permissions } from "@budibase/backend-core" -export default class ClientAppWebsocket extends Socket { +export default class ClientAppWebsocket extends BaseSocket { constructor(app: Koa, server: http.Server) { super(app, server, "/socket/client", [authorized(permissions.BUILDER)]) } diff --git a/packages/server/src/websockets/grid.ts b/packages/server/src/websockets/grid.ts index a0272be33c..c3c460b19d 100644 --- a/packages/server/src/websockets/grid.ts +++ b/packages/server/src/websockets/grid.ts @@ -1,73 +1,51 @@ import authorized from "../middleware/authorized" -import Socket from "./websocket" +import { BaseSocket } from "./websocket" import { permissions } from "@budibase/backend-core" import http from "http" import Koa from "koa" import { getTableId } from "../api/controllers/row/utils" import { Row, Table } from "@budibase/types" +import { Socket } from "socket.io" +import { GridSocketEvents } from "@budibase/shared-core" -export default class GridSocket extends Socket { +export default class GridSocket extends BaseSocket { constructor(app: Koa, server: http.Server) { super(app, server, "/socket/grid", [authorized(permissions.BUILDER)]) + } - this.io.on("connection", socket => { - const user = socket.data.user + async onConnect(socket: Socket) { + // Initial identification of connected spreadsheet + socket.on(GridSocketEvents.SelectTable, async (tableId, callback) => { + await this.joinRoom(socket, tableId) - // Socket state - let currentRoom: string + // Reply with all users in current roome + const users = await this.getSocketUsers(tableId) + callback({ users }) + }) - // Initial identification of connected spreadsheet - socket.on("select-table", async (tableId, callback) => { - // Leave current room - if (currentRoom) { - socket.to(currentRoom).emit("user-disconnect", user) - socket.leave(currentRoom) - } - - // Join new room - currentRoom = tableId - socket.join(currentRoom) - socket.to(currentRoom).emit("user-update", user) - - // Reply with all users in current room - const sockets = await this.io.in(currentRoom).fetchSockets() - callback({ - users: sockets.map(socket => socket.data.user), - }) - }) - - // Handle users selecting a new cell - socket.on("select-cell", cellId => { - socket.data.user.focusedCellId = cellId - if (currentRoom) { - socket.to(currentRoom).emit("user-update", user) - } - }) - - // Disconnection cleanup - socket.on("disconnect", () => { - if (currentRoom) { - socket.to(currentRoom).emit("user-disconnect", user) - } - }) + // Handle users selecting a new cell + socket.on(GridSocketEvents.SelectCell, cellId => { + this.updateUser(socket, { focusedCellId: cellId }) }) } emitRowUpdate(ctx: any, row: Row) { const tableId = getTableId(ctx) - this.io.in(tableId).emit("row-change", { id: row._id, row }) + this.io.in(tableId).emit(GridSocketEvents.RowChange, { id: row._id, row }) } emitRowDeletion(ctx: any, id: string) { const tableId = getTableId(ctx) - this.io.in(tableId).emit("row-change", { id, row: null }) + this.io.in(tableId).emit(GridSocketEvents.RowChange, { id, row: null }) } emitTableUpdate(table: Table) { - this.io.in(table._id!).emit("table-change", { id: table._id, table }) + this.io + .in(table._id!) + .emit(GridSocketEvents.TableChange, { id: table._id, table }) } emitTableDeletion(id: string) { - this.io.in(id).emit("table-change", { id, table: null }) + this.io.in(id).emit(GridSocketEvents.TableChange, { id, table: null }) } } diff --git a/packages/server/src/websockets/websocket.ts b/packages/server/src/websockets/websocket.ts index 5cac959099..40ab42ab22 100644 --- a/packages/server/src/websockets/websocket.ts +++ b/packages/server/src/websockets/websocket.ts @@ -6,11 +6,18 @@ import { userAgent } from "koa-useragent" import { auth } from "@budibase/backend-core" import currentApp from "../middleware/currentapp" import { createAdapter } from "@socket.io/redis-adapter" -import { getSocketPubSubClients } from "../utilities/redis" -import uuid from "uuid" +import { Socket } from "socket.io" +import { + getSocketPubSubClients, + getSocketUsers, + setSocketUsers, +} from "../utilities/redis" +import { SocketEvents } from "@budibase/shared-core" +import { SocketUser } from "@budibase/types" -export default class Socket { +export class BaseSocket { io: Server + path: string constructor( app: Koa, @@ -18,6 +25,7 @@ export default class Socket { path: string = "/", additionalMiddlewares?: any[] ) { + this.path = path this.io = new Server(server, { path, }) @@ -65,18 +73,15 @@ export default class Socket { // Middlewares are finished // Extract some data from our enriched koa context to persist // as metadata for the socket - // Add user info, including a deterministic color and label const { _id, email, firstName, lastName } = ctx.user - socket.data.user = { + socket.data = { _id, email, firstName, lastName, - sessionId: uuid.v4(), + appId: ctx.appId, + sessionId: socket.id, } - - // Add app ID to help split sockets into rooms - socket.data.appId = ctx.appId next() } }) @@ -90,6 +95,115 @@ export default class Socket { const { pub, sub } = getSocketPubSubClients() const opts = { key: `socket.io-${path}` } this.io.adapter(createAdapter(pub, sub, opts)) + + // Handle user connections and disconnections + this.io.on("connection", async socket => { + // Add built in handler to allow fetching all other users in this room + socket.on(SocketEvents.GetUsers, async (payload, callback) => { + let users + if (socket.data.room) { + users = await this.getSocketUsers(socket.data.room) + } + callback({ users }) + }) + + // Add handlers for this socket + await this.onConnect(socket) + + // Add early disconnection handler to clean up and leave room + socket.on("disconnect", async () => { + // Leave the current room when the user disconnects if we're in one + if (socket.data.room) { + await this.leaveRoom(socket) + } + + // Run any other disconnection logic + await this.onDisconnect(socket) + }) + }) + } + + // Gets a list of all users inside a certain room + async getSocketUsers(room?: string): Promise { + if (room) { + const users = await getSocketUsers(this.path, room) + return users || [] + } else { + return [] + } + } + + // Adds a user to a certain room + async joinRoom(socket: Socket, room: string) { + // Check if we're already in a room, as we'll need to leave if we are before we + // can join a different room + const oldRoom = socket.data.room + if (oldRoom && oldRoom !== room) { + await this.leaveRoom(socket) + } + + // Join new room + if (!oldRoom || oldRoom !== room) { + socket.join(room) + socket.data.room = room + } + + // @ts-ignore + let user: SocketUser = socket.data + let users = await this.getSocketUsers(room) + + // Store this socket in redis + if (!users?.length) { + users = [] + } + const index = users.findIndex(x => x.sessionId === socket.data.sessionId) + if (index === -1) { + users.push(user) + } else { + users[index] = user + } + await setSocketUsers(this.path, room, users) + socket.to(room).emit(SocketEvents.UserUpdate, user) + } + + // Disconnects a socket from its current room + async leaveRoom(socket: Socket) { + // @ts-ignore + let user: SocketUser = socket.data + const { room, sessionId } = user + if (!room) { + return + } + socket.leave(room) + socket.data.room = undefined + + let users = await this.getSocketUsers(room) + + // Remove this socket from redis + users = users.filter(user => user.sessionId !== sessionId) + await setSocketUsers(this.path, room, users) + socket.to(room).emit(SocketEvents.UserDisconnect, user) + } + + // Updates a connected user's metadata, assuming a room change is not required. + async updateUser(socket: Socket, patch: Object) { + socket.data = { + ...socket.data, + ...patch, + } + + // If we're in a room, notify others of this change and update redis + if (socket.data.room) { + await this.joinRoom(socket, socket.data.room) + } + } + + async onConnect(socket: Socket) { + // Override + } + + async onDisconnect(socket: Socket) { + // Override } // Emit an event to all sockets diff --git a/packages/shared-core/src/constants.ts b/packages/shared-core/src/constants.ts index 35b02d1e15..62603ee941 100644 --- a/packages/shared-core/src/constants.ts +++ b/packages/shared-core/src/constants.ts @@ -67,3 +67,21 @@ export const SqlNumberTypeRangeMap = { min: -8388608, }, } + +export const SocketEvents = { + UserUpdate: "UserUpdate", + UserDisconnect: "UserDisconnect", + GetUsers: "GetUsers" +} + +export const GridSocketEvents = { + RowChange: "RowChange", + TableChange: "TableChange", + SelectTable: "SelectTable", + SelectCell: "SelectCell" +} + +export const BuilderSocketEvents = { + TableChange: "TableChange", + DatasourceChange: "DatasourceChange" +} \ No newline at end of file diff --git a/packages/types/src/sdk/index.ts b/packages/types/src/sdk/index.ts index ed44c13667..49d0387a82 100644 --- a/packages/types/src/sdk/index.ts +++ b/packages/types/src/sdk/index.ts @@ -17,3 +17,4 @@ export * from "./auditLogs" export * from "./sso" export * from "./user" export * from "./cli" +export * from "./websocket" diff --git a/packages/types/src/sdk/websocket.ts b/packages/types/src/sdk/websocket.ts new file mode 100644 index 0000000000..822c4294e0 --- /dev/null +++ b/packages/types/src/sdk/websocket.ts @@ -0,0 +1,9 @@ +export interface SocketUser { + _id: string, + email: string, + firstName?: string, + lastName?: string, + appId: string, + sessionId: string, + room?: string +} \ No newline at end of file