diff --git a/packages/backend-core/src/redis/init.ts b/packages/backend-core/src/redis/init.ts index 485268edad..55ffe3dd12 100644 --- a/packages/backend-core/src/redis/init.ts +++ b/packages/backend-core/src/redis/init.ts @@ -6,7 +6,8 @@ let userClient: Client, appClient: Client, cacheClient: Client, writethroughClient: Client, - lockClient: Client + lockClient: Client, + socketClient: Client async function init() { userClient = await new Client(utils.Databases.USER_CACHE).init() @@ -14,9 +15,10 @@ async function init() { appClient = await new Client(utils.Databases.APP_METADATA).init() cacheClient = await new Client(utils.Databases.GENERIC_CACHE).init() lockClient = await new Client(utils.Databases.LOCKS).init() - writethroughClient = await new Client( - utils.Databases.WRITE_THROUGH, - utils.SelectableDatabase.WRITE_THROUGH + writethroughClient = await new Client(utils.Databases.WRITE_THROUGH).init() + socketClient = await new Client( + utils.Databases.SOCKET_IO, + utils.SelectableDatabase.SOCKET_IO ).init() } @@ -27,6 +29,7 @@ export async function shutdown() { if (cacheClient) await cacheClient.finish() if (writethroughClient) await writethroughClient.finish() if (lockClient) await lockClient.finish() + if (socketClient) await socketClient.finish() } process.on("exit", async () => { @@ -74,3 +77,10 @@ export async function getLockClient() { } return lockClient } + +export async function getSocketClient() { + if (!socketClient) { + await init() + } + return socketClient +} diff --git a/packages/backend-core/src/redis/redis.ts b/packages/backend-core/src/redis/redis.ts index 2d54b51a9f..f25de9be2b 100644 --- a/packages/backend-core/src/redis/redis.ts +++ b/packages/backend-core/src/redis/redis.ts @@ -183,6 +183,9 @@ class RedisWrapper { CLOSED = false init(this._select) await waitForConnection(this._select) + if (this._select && !env.isTest()) { + this.getClient().select(this._select) + } return this } @@ -209,6 +212,11 @@ class RedisWrapper { return this.getClient().keys(addDbPrefix(db, pattern)) } + async exists(key: string) { + const db = this._db + return await this.getClient().exists(addDbPrefix(db, key)) + } + async get(key: string) { const db = this._db let response = await this.getClient().get(addDbPrefix(db, key)) diff --git a/packages/backend-core/src/redis/utils.ts b/packages/backend-core/src/redis/utils.ts index f8b815824c..b57dd58d57 100644 --- a/packages/backend-core/src/redis/utils.ts +++ b/packages/backend-core/src/redis/utils.ts @@ -41,7 +41,7 @@ export enum Databases { */ export enum SelectableDatabase { DEFAULT = 0, - WRITE_THROUGH = 1, + SOCKET_IO = 1, UNUSED_1 = 2, UNUSED_2 = 3, UNUSED_3 = 4, diff --git a/packages/builder/src/builderStore/store/frontend.js b/packages/builder/src/builderStore/store/frontend.js index c921ddef3b..4f2a1bef6f 100644 --- a/packages/builder/src/builderStore/store/frontend.js +++ b/packages/builder/src/builderStore/store/frontend.js @@ -118,7 +118,7 @@ export const getFrontendStore = () => { }, initialise: async pkg => { const { layouts, screens, application, clientLibPath, hasLock } = pkg - websocket = createBuilderWebsocket() + websocket = createBuilderWebsocket(application.appId) await store.actions.components.refreshDefinitions(application.appId) // Reset store state diff --git a/packages/builder/src/builderStore/store/users.js b/packages/builder/src/builderStore/store/users.js index 18f343c884..6e10b081d6 100644 --- a/packages/builder/src/builderStore/store/users.js +++ b/packages/builder/src/builderStore/store/users.js @@ -20,9 +20,9 @@ export const getUserStore = () => { } } - const removeUser = user => { + const removeUser = sessionId => { store.update(state => { - return state.filter(x => x.sessionId !== user.sessionId) + return state.filter(x => x.sessionId !== sessionId) }) } diff --git a/packages/builder/src/builderStore/websocket.js b/packages/builder/src/builderStore/websocket.js index d58eb24bbf..6a3ce76ae6 100644 --- a/packages/builder/src/builderStore/websocket.js +++ b/packages/builder/src/builderStore/websocket.js @@ -1,13 +1,14 @@ import { createWebsocket } from "@budibase/frontend-core" import { userStore } from "builderStore" import { datasources, tables } from "stores/backend" +import { SocketEvent, BuilderSocketEvent } from "@budibase/shared-core" -export const createBuilderWebsocket = () => { +export const createBuilderWebsocket = appId => { const socket = createWebsocket("/socket/builder") - // Connection events + // Built-in events socket.on("connect", () => { - socket.emit("get-users", null, response => { + socket.emit(BuilderSocketEvent.SelectApp, appId, response => { userStore.actions.init(response.users) }) }) @@ -16,24 +17,23 @@ export const createBuilderWebsocket = () => { }) // User events - socket.on("user-update", userStore.actions.updateUser) - socket.on("user-disconnect", userStore.actions.removeUser) + socket.on(SocketEvent.UserUpdate, userStore.actions.updateUser) + socket.on(SocketEvent.UserDisconnect, userStore.actions.removeUser) // Table events - socket.on("table-change", ({ id, table }) => { + socket.on(BuilderSocketEvent.TableChange, ({ id, table }) => { tables.replaceTable(id, table) }) // Datasource events - socket.on("datasource-change", ({ id, datasource }) => { + socket.on(BuilderSocketEvent.DatasourceChange, ({ id, datasource }) => { datasources.replaceDatasource(id, datasource) }) - return { - ...socket, - disconnect: () => { - socket?.disconnect() - userStore.actions.reset() - }, - } + // Clean up user store on disconnect + socket.on("disconnect", () => { + userStore.actions.reset() + }) + + return socket } diff --git a/packages/client/src/websocket.js b/packages/client/src/websocket.js index a3356b7eaa..1e8e908d2b 100644 --- a/packages/client/src/websocket.js +++ b/packages/client/src/websocket.js @@ -18,7 +18,7 @@ export const initWebsocket = () => { } // Initialise connection - socket = createWebsocket("/socket/client") + socket = createWebsocket("/socket/client", false) // Event handlers socket.on("plugin-update", data => { diff --git a/packages/frontend-core/src/components/grid/lib/websocket.js b/packages/frontend-core/src/components/grid/lib/websocket.js index 3f1c473ea0..bb1d6991b0 100644 --- a/packages/frontend-core/src/components/grid/lib/websocket.js +++ b/packages/frontend-core/src/components/grid/lib/websocket.js @@ -1,5 +1,6 @@ import { get } from "svelte/store" import { createWebsocket } from "../../../utils" +import { SocketEvent, GridSocketEvent } from "@budibase/shared-core" export const createGridWebsocket = context => { const { rows, tableId, users, focusedCellId, table } = context @@ -10,13 +11,13 @@ export const createGridWebsocket = context => { return } // Identify which table we are editing - socket.emit("select-table", tableId, response => { + socket.emit(GridSocketEvent.SelectTable, tableId, response => { // handle initial connection info users.set(response.users) }) } - // Connection events + // Built-in events socket.on("connect", () => { connectToTable(get(tableId)) }) @@ -25,25 +26,25 @@ export const createGridWebsocket = context => { }) // User events - socket.on("user-update", user => { + socket.on(SocketEvent.UserUpdate, user => { users.actions.updateUser(user) }) - socket.on("user-disconnect", user => { + socket.on(SocketEvent.UserDisconnect, user => { users.actions.removeUser(user) }) // Row events - socket.on("row-change", async data => { + socket.on(GridSocketEvent.RowChange, async data => { if (data.id) { rows.actions.replaceRow(data.id, data.row) } else if (data.row.id) { - // Handle users table edge case + // Handle users table edge cased await rows.actions.refreshRow(data.row.id) } }) // Table events - socket.on("table-change", data => { + socket.on(GridSocketEvent.TableChange, data => { // Only update table if one exists. If the table was deleted then we don't // want to know - let the builder navigate away if (data.table) { @@ -56,7 +57,7 @@ export const createGridWebsocket = context => { // Notify selected cell changes focusedCellId.subscribe($focusedCellId => { - socket.emit("select-cell", $focusedCellId) + socket.emit(GridSocketEvent.SelectCell, $focusedCellId) }) return () => socket?.disconnect() diff --git a/packages/frontend-core/src/components/grid/stores/users.js b/packages/frontend-core/src/components/grid/stores/users.js index 5368c414ce..b6e74ef276 100644 --- a/packages/frontend-core/src/components/grid/stores/users.js +++ b/packages/frontend-core/src/components/grid/stores/users.js @@ -51,9 +51,9 @@ export const deriveStores = context => { } } - const removeUser = user => { + const removeUser = sessionId => { users.update(state => { - return state.filter(x => x.sessionId !== user.sessionId) + return state.filter(x => x.sessionId !== sessionId) }) } diff --git a/packages/frontend-core/src/utils/websocket.js b/packages/frontend-core/src/utils/websocket.js index 839fa6d73d..561a020e13 100644 --- a/packages/frontend-core/src/utils/websocket.js +++ b/packages/frontend-core/src/utils/websocket.js @@ -1,6 +1,7 @@ import { io } from "socket.io-client" +import { SocketEvent, SocketSessionTTL } from "@budibase/shared-core" -export const createWebsocket = path => { +export const createWebsocket = (path, heartbeat = true) => { if (!path) { throw "A websocket path must be provided" } @@ -10,7 +11,7 @@ export const createWebsocket = path => { const proto = tls ? "wss:" : "ws:" const host = location.hostname const port = location.port || (tls ? 443 : 80) - return io(`${proto}//${host}:${port}`, { + const socket = io(`${proto}//${host}:${port}`, { path, // Cap reconnection attempts to 3 (total of 15 seconds before giving up) reconnectionAttempts: 3, @@ -23,4 +24,18 @@ export const createWebsocket = path => { // will only work with sticky sessions which we don't have transports: ["websocket"], }) + + // Set up a heartbeat that's half of the session TTL + let interval + if (heartbeat) { + interval = setInterval(() => { + socket.emit(SocketEvent.Heartbeat) + }, SocketSessionTTL * 500) + } + + socket.on("disconnect", () => { + clearInterval(interval) + }) + + return socket } diff --git a/packages/server/src/utilities/redis.ts b/packages/server/src/utilities/redis.ts index ff1c863bf7..5785d79d2a 100644 --- a/packages/server/src/utilities/redis.ts +++ b/packages/server/src/utilities/redis.ts @@ -4,9 +4,10 @@ import { ContextUser } from "@budibase/types" const APP_DEV_LOCK_SECONDS = 600 const AUTOMATION_TEST_FLAG_SECONDS = 60 -let devAppClient: any, debounceClient: any, flagClient: any, socketClient: any +let devAppClient: any, debounceClient: any, flagClient: any // We need to maintain a duplicate client for socket.io pub/sub +let socketClient: any let socketSubClient: any // We init this as we want to keep the connection open all the time @@ -15,21 +16,20 @@ export async function init() { devAppClient = new redis.Client(redis.utils.Databases.DEV_LOCKS) debounceClient = new redis.Client(redis.utils.Databases.DEBOUNCE) flagClient = new redis.Client(redis.utils.Databases.FLAGS) - socketClient = new redis.Client(redis.utils.Databases.SOCKET_IO) await devAppClient.init() await debounceClient.init() await flagClient.init() - await socketClient.init() // Duplicate the socket client for pub/sub + socketClient = await redis.clients.getSocketClient() socketSubClient = socketClient.getClient().duplicate() } export async function shutdown() { + console.log("REDIS SHUTDOWN") if (devAppClient) await devAppClient.finish() if (debounceClient) await debounceClient.finish() if (flagClient) await flagClient.finish() - if (socketClient) await socketClient.finish() if (socketSubClient) socketSubClient.disconnect() // shutdown core clients await redis.clients.shutdown() diff --git a/packages/server/src/websockets/builder.ts b/packages/server/src/websockets/builder.ts index de8709129d..5e0e094b9e 100644 --- a/packages/server/src/websockets/builder.ts +++ b/packages/server/src/websockets/builder.ts @@ -1,69 +1,74 @@ import authorized from "../middleware/authorized" -import Socket from "./websocket" +import { BaseSocket } from "./websocket" import { permissions } from "@budibase/backend-core" import http from "http" import Koa from "koa" -import { Datasource, Table } from "@budibase/types" +import { Datasource, Table, SocketSession, ContextUser } from "@budibase/types" import { gridSocket } from "./index" import { clearLock } from "../utilities/redis" +import { Socket } from "socket.io" +import { BuilderSocketEvent } from "@budibase/shared-core" -export default class BuilderSocket extends Socket { +export default class BuilderSocket extends BaseSocket { constructor(app: Koa, server: http.Server) { super(app, server, "/socket/builder", [authorized(permissions.BUILDER)]) + } - this.io.on("connection", socket => { - // Join a room for this app - const user = socket.data.user - const appId = socket.data.appId - socket.join(appId) - socket.to(appId).emit("user-update", user) + async onConnect(socket: Socket) { + // Initial identification of selected app + socket.on(BuilderSocketEvent.SelectApp, async (appId, callback) => { + await this.joinRoom(socket, appId) - // Initial identification of connected spreadsheet - socket.on("get-users", async (payload, callback) => { - const sockets = await this.io.in(appId).fetchSockets() - callback({ - users: sockets.map(socket => socket.data.user), - }) - }) - - // Disconnection cleanup - socket.on("disconnect", async () => { - socket.to(appId).emit("user-disconnect", user) - - // Remove app lock from this user if they have no other connections - try { - const sockets = await this.io.in(appId).fetchSockets() - const hasOtherConnection = sockets.some(socket => { - const { _id, sessionId } = socket.data.user - return _id === user._id && sessionId !== user.sessionId - }) - if (!hasOtherConnection) { - await clearLock(appId, user) - } - } catch (e) { - // This is fine, just means this user didn't hold the lock - } - }) + // Reply with all users in current room + const sessions = await this.getRoomSessions(appId) + callback({ users: sessions }) }) } + async onDisconnect(socket: Socket) { + // Remove app lock from this user if they have no other connections + try { + // @ts-ignore + const session: SocketSession = socket.data + const { _id, sessionId, room } = session + const sessions = await this.getRoomSessions(room) + const hasOtherSession = sessions.some(otherSession => { + return _id === otherSession._id && sessionId !== otherSession.sessionId + }) + if (!hasOtherSession && room) { + // @ts-ignore + const user: ContextUser = { _id: socket.data._id } + await clearLock(room, user) + } + } catch (e) { + // This is fine, just means this user didn't hold the lock + } + } + emitTableUpdate(ctx: any, table: Table) { - this.io.in(ctx.appId).emit("table-change", { id: table._id, table }) + this.io + .in(ctx.appId) + .emit(BuilderSocketEvent.TableChange, { id: table._id, table }) gridSocket.emitTableUpdate(table) } emitTableDeletion(ctx: any, id: string) { - this.io.in(ctx.appId).emit("table-change", { id, table: null }) + this.io + .in(ctx.appId) + .emit(BuilderSocketEvent.TableChange, { id, table: null }) gridSocket.emitTableDeletion(id) } emitDatasourceUpdate(ctx: any, datasource: Datasource) { - this.io - .in(ctx.appId) - .emit("datasource-change", { id: datasource._id, datasource }) + this.io.in(ctx.appId).emit(BuilderSocketEvent.DatasourceChange, { + id: datasource._id, + datasource, + }) } emitDatasourceDeletion(ctx: any, id: string) { - this.io.in(ctx.appId).emit("datasource-change", { id, datasource: null }) + this.io + .in(ctx.appId) + .emit(BuilderSocketEvent.DatasourceChange, { id, datasource: null }) } } diff --git a/packages/server/src/websockets/client.ts b/packages/server/src/websockets/client.ts index d59325f66e..165febb7a3 100644 --- a/packages/server/src/websockets/client.ts +++ b/packages/server/src/websockets/client.ts @@ -1,10 +1,10 @@ -import Socket from "./websocket" +import { BaseSocket } from "./websocket" import authorized from "../middleware/authorized" import http from "http" import Koa from "koa" import { permissions } from "@budibase/backend-core" -export default class ClientAppWebsocket extends Socket { +export default class ClientAppWebsocket extends BaseSocket { constructor(app: Koa, server: http.Server) { super(app, server, "/socket/client", [authorized(permissions.BUILDER)]) } diff --git a/packages/server/src/websockets/grid.ts b/packages/server/src/websockets/grid.ts index a0272be33c..440b4f78a5 100644 --- a/packages/server/src/websockets/grid.ts +++ b/packages/server/src/websockets/grid.ts @@ -1,73 +1,51 @@ import authorized from "../middleware/authorized" -import Socket from "./websocket" +import { BaseSocket } from "./websocket" import { permissions } from "@budibase/backend-core" import http from "http" import Koa from "koa" import { getTableId } from "../api/controllers/row/utils" import { Row, Table } from "@budibase/types" +import { Socket } from "socket.io" +import { GridSocketEvent } from "@budibase/shared-core" -export default class GridSocket extends Socket { +export default class GridSocket extends BaseSocket { constructor(app: Koa, server: http.Server) { super(app, server, "/socket/grid", [authorized(permissions.BUILDER)]) + } - this.io.on("connection", socket => { - const user = socket.data.user + async onConnect(socket: Socket) { + // Initial identification of connected spreadsheet + socket.on(GridSocketEvent.SelectTable, async (tableId, callback) => { + await this.joinRoom(socket, tableId) - // Socket state - let currentRoom: string + // Reply with all users in current room + const sessions = await this.getRoomSessions(tableId) + callback({ users: sessions }) + }) - // Initial identification of connected spreadsheet - socket.on("select-table", async (tableId, callback) => { - // Leave current room - if (currentRoom) { - socket.to(currentRoom).emit("user-disconnect", user) - socket.leave(currentRoom) - } - - // Join new room - currentRoom = tableId - socket.join(currentRoom) - socket.to(currentRoom).emit("user-update", user) - - // Reply with all users in current room - const sockets = await this.io.in(currentRoom).fetchSockets() - callback({ - users: sockets.map(socket => socket.data.user), - }) - }) - - // Handle users selecting a new cell - socket.on("select-cell", cellId => { - socket.data.user.focusedCellId = cellId - if (currentRoom) { - socket.to(currentRoom).emit("user-update", user) - } - }) - - // Disconnection cleanup - socket.on("disconnect", () => { - if (currentRoom) { - socket.to(currentRoom).emit("user-disconnect", user) - } - }) + // Handle users selecting a new cell + socket.on(GridSocketEvent.SelectCell, cellId => { + this.updateUser(socket, { focusedCellId: cellId }) }) } emitRowUpdate(ctx: any, row: Row) { const tableId = getTableId(ctx) - this.io.in(tableId).emit("row-change", { id: row._id, row }) + this.io.in(tableId).emit(GridSocketEvent.RowChange, { id: row._id, row }) } emitRowDeletion(ctx: any, id: string) { const tableId = getTableId(ctx) - this.io.in(tableId).emit("row-change", { id, row: null }) + this.io.in(tableId).emit(GridSocketEvent.RowChange, { id, row: null }) } emitTableUpdate(table: Table) { - this.io.in(table._id!).emit("table-change", { id: table._id, table }) + this.io + .in(table._id!) + .emit(GridSocketEvent.TableChange, { id: table._id, table }) } emitTableDeletion(id: string) { - this.io.in(id).emit("table-change", { id, table: null }) + this.io.in(id).emit(GridSocketEvent.TableChange, { id, table: null }) } } diff --git a/packages/server/src/websockets/websocket.ts b/packages/server/src/websockets/websocket.ts index 5cac959099..a2d2474c77 100644 --- a/packages/server/src/websockets/websocket.ts +++ b/packages/server/src/websockets/websocket.ts @@ -3,14 +3,18 @@ import http from "http" import Koa from "koa" import Cookies from "cookies" import { userAgent } from "koa-useragent" -import { auth } from "@budibase/backend-core" +import { auth, redis } from "@budibase/backend-core" import currentApp from "../middleware/currentapp" import { createAdapter } from "@socket.io/redis-adapter" +import { Socket } from "socket.io" import { getSocketPubSubClients } from "../utilities/redis" -import uuid from "uuid" +import { SocketEvent, SocketSessionTTL } from "@budibase/shared-core" +import { SocketSession } from "@budibase/types" -export default class Socket { +export class BaseSocket { io: Server + path: string + redisClient?: redis.Client constructor( app: Koa, @@ -18,6 +22,7 @@ export default class Socket { path: string = "/", additionalMiddlewares?: any[] ) { + this.path = path this.io = new Server(server, { path, }) @@ -65,18 +70,14 @@ export default class Socket { // Middlewares are finished // Extract some data from our enriched koa context to persist // as metadata for the socket - // Add user info, including a deterministic color and label const { _id, email, firstName, lastName } = ctx.user - socket.data.user = { + socket.data = { _id, email, firstName, lastName, - sessionId: uuid.v4(), + sessionId: socket.id, } - - // Add app ID to help split sockets into rooms - socket.data.appId = ctx.appId next() } }) @@ -86,10 +87,184 @@ export default class Socket { } }) - // Instantiate redis adapter + // Initialise redis before handling connections + this.initialise().then(() => { + this.io.on("connection", async socket => { + // Add built in handler for heartbeats + socket.on(SocketEvent.Heartbeat, async () => { + console.log(socket.data.email, "heartbeat received") + await this.extendSessionTTL(socket.data.sessionId) + }) + + // Add early disconnection handler to clean up and leave room + socket.on("disconnect", async () => { + // Run any custom disconnection logic before we leave the room, + // so that we have access to their room etc before disconnection + await this.onDisconnect(socket) + + // Leave the current room when the user disconnects if we're in one + await this.leaveRoom(socket) + }) + + // Add handlers for this socket + await this.onConnect(socket) + }) + }) + } + + async initialise() { + // Instantiate redis adapter. + // We use a fully qualified key name here as this bypasses the normal + // redis client#s key prefixing. const { pub, sub } = getSocketPubSubClients() - const opts = { key: `socket.io-${path}` } + const opts = { + key: `${redis.utils.Databases.SOCKET_IO}-${this.path}-pubsub`, + } this.io.adapter(createAdapter(pub, sub, opts)) + + // Fetch redis client + this.redisClient = await redis.clients.getSocketClient() + } + + // Gets the redis key for a certain session ID + getSessionKey(sessionId: string) { + return `${this.path}-session:${sessionId}` + } + + // Gets the redis key for certain room name + getRoomKey(room: string) { + return `${this.path}-room:${room}` + } + + async extendSessionTTL(sessionId: string) { + const key = this.getSessionKey(sessionId) + await this.redisClient?.setExpiry(key, SocketSessionTTL) + } + + // Gets an array of all redis keys of users inside a certain room + async getRoomSessionIds(room: string): Promise { + const keys = await this.redisClient?.get(this.getRoomKey(room)) + return keys || [] + } + + // Sets the list of redis keys for users inside a certain room. + // There is no TTL on the actual room key map itself. + async setRoomSessionIds(room: string, ids: string[]) { + await this.redisClient?.store(this.getRoomKey(room), ids) + } + + // Gets a list of all users inside a certain room + async getRoomSessions(room?: string): Promise { + if (room) { + const sessionIds = await this.getRoomSessionIds(room) + const keys = sessionIds.map(this.getSessionKey.bind(this)) + const sessions = await this.redisClient?.bulkGet(keys) + return Object.values(sessions || {}) + } else { + return [] + } + } + + // Detects keys which have been pruned from redis due to TTL expiry in a certain + // room and broadcasts disconnection messages to ensure clients are aware + async pruneRoom(room: string) { + const sessionIds = await this.getRoomSessionIds(room) + const sessionsExist = await Promise.all( + sessionIds.map(id => this.redisClient?.exists(this.getSessionKey(id))) + ) + const prunedSessionIds = sessionIds.filter((id, idx) => { + if (!sessionsExist[idx]) { + this.io.to(room).emit(SocketEvent.UserDisconnect, sessionIds[idx]) + return false + } + return true + }) + + // Store new pruned keys + await this.setRoomSessionIds(room, prunedSessionIds) + } + + // Adds a user to a certain room + async joinRoom(socket: Socket, room: string) { + if (!room) { + return + } + // Prune room before joining + await this.pruneRoom(room) + + // Check if we're already in a room, as we'll need to leave if we are before we + // can join a different room + const oldRoom = socket.data.room + if (oldRoom && oldRoom !== room) { + await this.leaveRoom(socket) + } + + // Join new room + if (!oldRoom || oldRoom !== room) { + socket.join(room) + socket.data.room = room + } + + // Store in redis + // @ts-ignore + let user: SocketSession = socket.data + const { sessionId } = user + const key = this.getSessionKey(sessionId) + await this.redisClient?.store(key, user, SocketSessionTTL) + const sessionIds = await this.getRoomSessionIds(room) + if (!sessionIds.includes(sessionId)) { + await this.setRoomSessionIds(room, [...sessionIds, sessionId]) + } + + // Notify other users + socket.to(room).emit(SocketEvent.UserUpdate, user) + } + + // Disconnects a socket from its current room + async leaveRoom(socket: Socket) { + // @ts-ignore + let user: SocketSession = socket.data + const { room, sessionId } = user + if (!room) { + return + } + + // Leave room + socket.leave(room) + socket.data.room = undefined + + // Delete from redis + const key = this.getSessionKey(sessionId) + await this.redisClient?.delete(key) + const sessionIds = await this.getRoomSessionIds(room) + await this.setRoomSessionIds( + room, + sessionIds.filter(id => id !== sessionId) + ) + + // Notify other users + socket.to(room).emit(SocketEvent.UserDisconnect, sessionId) + } + + // Updates a connected user's metadata, assuming a room change is not required. + async updateUser(socket: Socket, patch: Object) { + socket.data = { + ...socket.data, + ...patch, + } + + // If we're in a room, notify others of this change and update redis + if (socket.data.room) { + await this.joinRoom(socket, socket.data.room) + } + } + + async onConnect(socket: Socket) { + // Override + } + + async onDisconnect(socket: Socket) { + // Override } // Emit an event to all sockets diff --git a/packages/shared-core/src/constants.ts b/packages/shared-core/src/constants.ts index 35b02d1e15..5802a6d88a 100644 --- a/packages/shared-core/src/constants.ts +++ b/packages/shared-core/src/constants.ts @@ -67,3 +67,24 @@ export const SqlNumberTypeRangeMap = { min: -8388608, }, } + +export enum SocketEvent { + UserUpdate = "UserUpdate", + UserDisconnect = "UserDisconnect", + Heartbeat = "Heartbeat", +} + +export enum GridSocketEvent { + RowChange = "RowChange", + TableChange = "TableChange", + SelectTable = "SelectTable", + SelectCell = "SelectCell", +} + +export enum BuilderSocketEvent { + SelectApp = "SelectApp", + TableChange = "TableChange", + DatasourceChange = "DatasourceChange", +} + +export const SocketSessionTTL = 60 diff --git a/packages/types/src/sdk/index.ts b/packages/types/src/sdk/index.ts index ed44c13667..49d0387a82 100644 --- a/packages/types/src/sdk/index.ts +++ b/packages/types/src/sdk/index.ts @@ -17,3 +17,4 @@ export * from "./auditLogs" export * from "./sso" export * from "./user" export * from "./cli" +export * from "./websocket" diff --git a/packages/types/src/sdk/websocket.ts b/packages/types/src/sdk/websocket.ts new file mode 100644 index 0000000000..4fa7e155d6 --- /dev/null +++ b/packages/types/src/sdk/websocket.ts @@ -0,0 +1,8 @@ +export interface SocketSession { + _id: string + email: string + firstName?: string + lastName?: string + sessionId: string + room?: string +}