Merge pull request #10753 from Budibase/websocket-redis-integration

Websocket redis rewrite for multi-user collaboration
This commit is contained in:
Andrew Kingston 2023-05-31 17:32:15 +01:00 committed by GitHub
commit 4fe3aeaf6d
18 changed files with 359 additions and 137 deletions

View File

@ -6,7 +6,8 @@ let userClient: Client,
appClient: Client, appClient: Client,
cacheClient: Client, cacheClient: Client,
writethroughClient: Client, writethroughClient: Client,
lockClient: Client lockClient: Client,
socketClient: Client
async function init() { async function init() {
userClient = await new Client(utils.Databases.USER_CACHE).init() userClient = await new Client(utils.Databases.USER_CACHE).init()
@ -14,9 +15,10 @@ async function init() {
appClient = await new Client(utils.Databases.APP_METADATA).init() appClient = await new Client(utils.Databases.APP_METADATA).init()
cacheClient = await new Client(utils.Databases.GENERIC_CACHE).init() cacheClient = await new Client(utils.Databases.GENERIC_CACHE).init()
lockClient = await new Client(utils.Databases.LOCKS).init() lockClient = await new Client(utils.Databases.LOCKS).init()
writethroughClient = await new Client( writethroughClient = await new Client(utils.Databases.WRITE_THROUGH).init()
utils.Databases.WRITE_THROUGH, socketClient = await new Client(
utils.SelectableDatabase.WRITE_THROUGH utils.Databases.SOCKET_IO,
utils.SelectableDatabase.SOCKET_IO
).init() ).init()
} }
@ -27,6 +29,7 @@ export async function shutdown() {
if (cacheClient) await cacheClient.finish() if (cacheClient) await cacheClient.finish()
if (writethroughClient) await writethroughClient.finish() if (writethroughClient) await writethroughClient.finish()
if (lockClient) await lockClient.finish() if (lockClient) await lockClient.finish()
if (socketClient) await socketClient.finish()
} }
process.on("exit", async () => { process.on("exit", async () => {
@ -74,3 +77,10 @@ export async function getLockClient() {
} }
return lockClient return lockClient
} }
export async function getSocketClient() {
if (!socketClient) {
await init()
}
return socketClient
}

View File

@ -183,6 +183,9 @@ class RedisWrapper {
CLOSED = false CLOSED = false
init(this._select) init(this._select)
await waitForConnection(this._select) await waitForConnection(this._select)
if (this._select && !env.isTest()) {
this.getClient().select(this._select)
}
return this return this
} }
@ -209,6 +212,11 @@ class RedisWrapper {
return this.getClient().keys(addDbPrefix(db, pattern)) return this.getClient().keys(addDbPrefix(db, pattern))
} }
async exists(key: string) {
const db = this._db
return await this.getClient().exists(addDbPrefix(db, key))
}
async get(key: string) { async get(key: string) {
const db = this._db const db = this._db
let response = await this.getClient().get(addDbPrefix(db, key)) let response = await this.getClient().get(addDbPrefix(db, key))

View File

@ -41,7 +41,7 @@ export enum Databases {
*/ */
export enum SelectableDatabase { export enum SelectableDatabase {
DEFAULT = 0, DEFAULT = 0,
WRITE_THROUGH = 1, SOCKET_IO = 1,
UNUSED_1 = 2, UNUSED_1 = 2,
UNUSED_2 = 3, UNUSED_2 = 3,
UNUSED_3 = 4, UNUSED_3 = 4,

View File

@ -118,7 +118,7 @@ export const getFrontendStore = () => {
}, },
initialise: async pkg => { initialise: async pkg => {
const { layouts, screens, application, clientLibPath, hasLock } = pkg const { layouts, screens, application, clientLibPath, hasLock } = pkg
websocket = createBuilderWebsocket() websocket = createBuilderWebsocket(application.appId)
await store.actions.components.refreshDefinitions(application.appId) await store.actions.components.refreshDefinitions(application.appId)
// Reset store state // Reset store state

View File

@ -20,9 +20,9 @@ export const getUserStore = () => {
} }
} }
const removeUser = user => { const removeUser = sessionId => {
store.update(state => { store.update(state => {
return state.filter(x => x.sessionId !== user.sessionId) return state.filter(x => x.sessionId !== sessionId)
}) })
} }

View File

@ -1,13 +1,14 @@
import { createWebsocket } from "@budibase/frontend-core" import { createWebsocket } from "@budibase/frontend-core"
import { userStore } from "builderStore" import { userStore } from "builderStore"
import { datasources, tables } from "stores/backend" import { datasources, tables } from "stores/backend"
import { SocketEvent, BuilderSocketEvent } from "@budibase/shared-core"
export const createBuilderWebsocket = () => { export const createBuilderWebsocket = appId => {
const socket = createWebsocket("/socket/builder") const socket = createWebsocket("/socket/builder")
// Connection events // Built-in events
socket.on("connect", () => { socket.on("connect", () => {
socket.emit("get-users", null, response => { socket.emit(BuilderSocketEvent.SelectApp, appId, response => {
userStore.actions.init(response.users) userStore.actions.init(response.users)
}) })
}) })
@ -16,24 +17,23 @@ export const createBuilderWebsocket = () => {
}) })
// User events // User events
socket.on("user-update", userStore.actions.updateUser) socket.on(SocketEvent.UserUpdate, userStore.actions.updateUser)
socket.on("user-disconnect", userStore.actions.removeUser) socket.on(SocketEvent.UserDisconnect, userStore.actions.removeUser)
// Table events // Table events
socket.on("table-change", ({ id, table }) => { socket.on(BuilderSocketEvent.TableChange, ({ id, table }) => {
tables.replaceTable(id, table) tables.replaceTable(id, table)
}) })
// Datasource events // Datasource events
socket.on("datasource-change", ({ id, datasource }) => { socket.on(BuilderSocketEvent.DatasourceChange, ({ id, datasource }) => {
datasources.replaceDatasource(id, datasource) datasources.replaceDatasource(id, datasource)
}) })
return { // Clean up user store on disconnect
...socket, socket.on("disconnect", () => {
disconnect: () => {
socket?.disconnect()
userStore.actions.reset() userStore.actions.reset()
}, })
}
return socket
} }

View File

@ -18,7 +18,7 @@ export const initWebsocket = () => {
} }
// Initialise connection // Initialise connection
socket = createWebsocket("/socket/client") socket = createWebsocket("/socket/client", false)
// Event handlers // Event handlers
socket.on("plugin-update", data => { socket.on("plugin-update", data => {

View File

@ -1,5 +1,6 @@
import { get } from "svelte/store" import { get } from "svelte/store"
import { createWebsocket } from "../../../utils" import { createWebsocket } from "../../../utils"
import { SocketEvent, GridSocketEvent } from "@budibase/shared-core"
export const createGridWebsocket = context => { export const createGridWebsocket = context => {
const { rows, tableId, users, focusedCellId, table } = context const { rows, tableId, users, focusedCellId, table } = context
@ -10,13 +11,13 @@ export const createGridWebsocket = context => {
return return
} }
// Identify which table we are editing // Identify which table we are editing
socket.emit("select-table", tableId, response => { socket.emit(GridSocketEvent.SelectTable, tableId, response => {
// handle initial connection info // handle initial connection info
users.set(response.users) users.set(response.users)
}) })
} }
// Connection events // Built-in events
socket.on("connect", () => { socket.on("connect", () => {
connectToTable(get(tableId)) connectToTable(get(tableId))
}) })
@ -25,25 +26,25 @@ export const createGridWebsocket = context => {
}) })
// User events // User events
socket.on("user-update", user => { socket.on(SocketEvent.UserUpdate, user => {
users.actions.updateUser(user) users.actions.updateUser(user)
}) })
socket.on("user-disconnect", user => { socket.on(SocketEvent.UserDisconnect, user => {
users.actions.removeUser(user) users.actions.removeUser(user)
}) })
// Row events // Row events
socket.on("row-change", async data => { socket.on(GridSocketEvent.RowChange, async data => {
if (data.id) { if (data.id) {
rows.actions.replaceRow(data.id, data.row) rows.actions.replaceRow(data.id, data.row)
} else if (data.row.id) { } else if (data.row.id) {
// Handle users table edge case // Handle users table edge cased
await rows.actions.refreshRow(data.row.id) await rows.actions.refreshRow(data.row.id)
} }
}) })
// Table events // Table events
socket.on("table-change", data => { socket.on(GridSocketEvent.TableChange, data => {
// Only update table if one exists. If the table was deleted then we don't // Only update table if one exists. If the table was deleted then we don't
// want to know - let the builder navigate away // want to know - let the builder navigate away
if (data.table) { if (data.table) {
@ -56,7 +57,7 @@ export const createGridWebsocket = context => {
// Notify selected cell changes // Notify selected cell changes
focusedCellId.subscribe($focusedCellId => { focusedCellId.subscribe($focusedCellId => {
socket.emit("select-cell", $focusedCellId) socket.emit(GridSocketEvent.SelectCell, $focusedCellId)
}) })
return () => socket?.disconnect() return () => socket?.disconnect()

View File

@ -51,9 +51,9 @@ export const deriveStores = context => {
} }
} }
const removeUser = user => { const removeUser = sessionId => {
users.update(state => { users.update(state => {
return state.filter(x => x.sessionId !== user.sessionId) return state.filter(x => x.sessionId !== sessionId)
}) })
} }

View File

@ -1,6 +1,7 @@
import { io } from "socket.io-client" import { io } from "socket.io-client"
import { SocketEvent, SocketSessionTTL } from "@budibase/shared-core"
export const createWebsocket = path => { export const createWebsocket = (path, heartbeat = true) => {
if (!path) { if (!path) {
throw "A websocket path must be provided" throw "A websocket path must be provided"
} }
@ -10,7 +11,7 @@ export const createWebsocket = path => {
const proto = tls ? "wss:" : "ws:" const proto = tls ? "wss:" : "ws:"
const host = location.hostname const host = location.hostname
const port = location.port || (tls ? 443 : 80) const port = location.port || (tls ? 443 : 80)
return io(`${proto}//${host}:${port}`, { const socket = io(`${proto}//${host}:${port}`, {
path, path,
// Cap reconnection attempts to 3 (total of 15 seconds before giving up) // Cap reconnection attempts to 3 (total of 15 seconds before giving up)
reconnectionAttempts: 3, reconnectionAttempts: 3,
@ -23,4 +24,18 @@ export const createWebsocket = path => {
// will only work with sticky sessions which we don't have // will only work with sticky sessions which we don't have
transports: ["websocket"], transports: ["websocket"],
}) })
// Set up a heartbeat that's half of the session TTL
let interval
if (heartbeat) {
interval = setInterval(() => {
socket.emit(SocketEvent.Heartbeat)
}, SocketSessionTTL * 500)
}
socket.on("disconnect", () => {
clearInterval(interval)
})
return socket
} }

View File

@ -4,9 +4,10 @@ import { ContextUser } from "@budibase/types"
const APP_DEV_LOCK_SECONDS = 600 const APP_DEV_LOCK_SECONDS = 600
const AUTOMATION_TEST_FLAG_SECONDS = 60 const AUTOMATION_TEST_FLAG_SECONDS = 60
let devAppClient: any, debounceClient: any, flagClient: any, socketClient: any let devAppClient: any, debounceClient: any, flagClient: any
// We need to maintain a duplicate client for socket.io pub/sub // We need to maintain a duplicate client for socket.io pub/sub
let socketClient: any
let socketSubClient: any let socketSubClient: any
// We init this as we want to keep the connection open all the time // We init this as we want to keep the connection open all the time
@ -15,21 +16,20 @@ export async function init() {
devAppClient = new redis.Client(redis.utils.Databases.DEV_LOCKS) devAppClient = new redis.Client(redis.utils.Databases.DEV_LOCKS)
debounceClient = new redis.Client(redis.utils.Databases.DEBOUNCE) debounceClient = new redis.Client(redis.utils.Databases.DEBOUNCE)
flagClient = new redis.Client(redis.utils.Databases.FLAGS) flagClient = new redis.Client(redis.utils.Databases.FLAGS)
socketClient = new redis.Client(redis.utils.Databases.SOCKET_IO)
await devAppClient.init() await devAppClient.init()
await debounceClient.init() await debounceClient.init()
await flagClient.init() await flagClient.init()
await socketClient.init()
// Duplicate the socket client for pub/sub // Duplicate the socket client for pub/sub
socketClient = await redis.clients.getSocketClient()
socketSubClient = socketClient.getClient().duplicate() socketSubClient = socketClient.getClient().duplicate()
} }
export async function shutdown() { export async function shutdown() {
console.log("REDIS SHUTDOWN")
if (devAppClient) await devAppClient.finish() if (devAppClient) await devAppClient.finish()
if (debounceClient) await debounceClient.finish() if (debounceClient) await debounceClient.finish()
if (flagClient) await flagClient.finish() if (flagClient) await flagClient.finish()
if (socketClient) await socketClient.finish()
if (socketSubClient) socketSubClient.disconnect() if (socketSubClient) socketSubClient.disconnect()
// shutdown core clients // shutdown core clients
await redis.clients.shutdown() await redis.clients.shutdown()

View File

@ -1,69 +1,74 @@
import authorized from "../middleware/authorized" import authorized from "../middleware/authorized"
import Socket from "./websocket" import { BaseSocket } from "./websocket"
import { permissions } from "@budibase/backend-core" import { permissions } from "@budibase/backend-core"
import http from "http" import http from "http"
import Koa from "koa" import Koa from "koa"
import { Datasource, Table } from "@budibase/types" import { Datasource, Table, SocketSession, ContextUser } from "@budibase/types"
import { gridSocket } from "./index" import { gridSocket } from "./index"
import { clearLock } from "../utilities/redis" import { clearLock } from "../utilities/redis"
import { Socket } from "socket.io"
import { BuilderSocketEvent } from "@budibase/shared-core"
export default class BuilderSocket extends Socket { export default class BuilderSocket extends BaseSocket {
constructor(app: Koa, server: http.Server) { constructor(app: Koa, server: http.Server) {
super(app, server, "/socket/builder", [authorized(permissions.BUILDER)]) super(app, server, "/socket/builder", [authorized(permissions.BUILDER)])
}
this.io.on("connection", socket => { async onConnect(socket: Socket) {
// Join a room for this app // Initial identification of selected app
const user = socket.data.user socket.on(BuilderSocketEvent.SelectApp, async (appId, callback) => {
const appId = socket.data.appId await this.joinRoom(socket, appId)
socket.join(appId)
socket.to(appId).emit("user-update", user)
// Initial identification of connected spreadsheet // Reply with all users in current room
socket.on("get-users", async (payload, callback) => { const sessions = await this.getRoomSessions(appId)
const sockets = await this.io.in(appId).fetchSockets() callback({ users: sessions })
callback({
users: sockets.map(socket => socket.data.user),
})
}) })
}
// Disconnection cleanup async onDisconnect(socket: Socket) {
socket.on("disconnect", async () => {
socket.to(appId).emit("user-disconnect", user)
// Remove app lock from this user if they have no other connections // Remove app lock from this user if they have no other connections
try { try {
const sockets = await this.io.in(appId).fetchSockets() // @ts-ignore
const hasOtherConnection = sockets.some(socket => { const session: SocketSession = socket.data
const { _id, sessionId } = socket.data.user const { _id, sessionId, room } = session
return _id === user._id && sessionId !== user.sessionId const sessions = await this.getRoomSessions(room)
const hasOtherSession = sessions.some(otherSession => {
return _id === otherSession._id && sessionId !== otherSession.sessionId
}) })
if (!hasOtherConnection) { if (!hasOtherSession && room) {
await clearLock(appId, user) // @ts-ignore
const user: ContextUser = { _id: socket.data._id }
await clearLock(room, user)
} }
} catch (e) { } catch (e) {
// This is fine, just means this user didn't hold the lock // This is fine, just means this user didn't hold the lock
} }
})
})
} }
emitTableUpdate(ctx: any, table: Table) { emitTableUpdate(ctx: any, table: Table) {
this.io.in(ctx.appId).emit("table-change", { id: table._id, table }) this.io
.in(ctx.appId)
.emit(BuilderSocketEvent.TableChange, { id: table._id, table })
gridSocket.emitTableUpdate(table) gridSocket.emitTableUpdate(table)
} }
emitTableDeletion(ctx: any, id: string) { emitTableDeletion(ctx: any, id: string) {
this.io.in(ctx.appId).emit("table-change", { id, table: null }) this.io
.in(ctx.appId)
.emit(BuilderSocketEvent.TableChange, { id, table: null })
gridSocket.emitTableDeletion(id) gridSocket.emitTableDeletion(id)
} }
emitDatasourceUpdate(ctx: any, datasource: Datasource) { emitDatasourceUpdate(ctx: any, datasource: Datasource) {
this.io this.io.in(ctx.appId).emit(BuilderSocketEvent.DatasourceChange, {
.in(ctx.appId) id: datasource._id,
.emit("datasource-change", { id: datasource._id, datasource }) datasource,
})
} }
emitDatasourceDeletion(ctx: any, id: string) { emitDatasourceDeletion(ctx: any, id: string) {
this.io.in(ctx.appId).emit("datasource-change", { id, datasource: null }) this.io
.in(ctx.appId)
.emit(BuilderSocketEvent.DatasourceChange, { id, datasource: null })
} }
} }

View File

@ -1,10 +1,10 @@
import Socket from "./websocket" import { BaseSocket } from "./websocket"
import authorized from "../middleware/authorized" import authorized from "../middleware/authorized"
import http from "http" import http from "http"
import Koa from "koa" import Koa from "koa"
import { permissions } from "@budibase/backend-core" import { permissions } from "@budibase/backend-core"
export default class ClientAppWebsocket extends Socket { export default class ClientAppWebsocket extends BaseSocket {
constructor(app: Koa, server: http.Server) { constructor(app: Koa, server: http.Server) {
super(app, server, "/socket/client", [authorized(permissions.BUILDER)]) super(app, server, "/socket/client", [authorized(permissions.BUILDER)])
} }

View File

@ -1,73 +1,51 @@
import authorized from "../middleware/authorized" import authorized from "../middleware/authorized"
import Socket from "./websocket" import { BaseSocket } from "./websocket"
import { permissions } from "@budibase/backend-core" import { permissions } from "@budibase/backend-core"
import http from "http" import http from "http"
import Koa from "koa" import Koa from "koa"
import { getTableId } from "../api/controllers/row/utils" import { getTableId } from "../api/controllers/row/utils"
import { Row, Table } from "@budibase/types" import { Row, Table } from "@budibase/types"
import { Socket } from "socket.io"
import { GridSocketEvent } from "@budibase/shared-core"
export default class GridSocket extends Socket { export default class GridSocket extends BaseSocket {
constructor(app: Koa, server: http.Server) { constructor(app: Koa, server: http.Server) {
super(app, server, "/socket/grid", [authorized(permissions.BUILDER)]) super(app, server, "/socket/grid", [authorized(permissions.BUILDER)])
this.io.on("connection", socket => {
const user = socket.data.user
// Socket state
let currentRoom: string
// Initial identification of connected spreadsheet
socket.on("select-table", async (tableId, callback) => {
// Leave current room
if (currentRoom) {
socket.to(currentRoom).emit("user-disconnect", user)
socket.leave(currentRoom)
} }
// Join new room async onConnect(socket: Socket) {
currentRoom = tableId // Initial identification of connected spreadsheet
socket.join(currentRoom) socket.on(GridSocketEvent.SelectTable, async (tableId, callback) => {
socket.to(currentRoom).emit("user-update", user) await this.joinRoom(socket, tableId)
// Reply with all users in current room // Reply with all users in current room
const sockets = await this.io.in(currentRoom).fetchSockets() const sessions = await this.getRoomSessions(tableId)
callback({ callback({ users: sessions })
users: sockets.map(socket => socket.data.user),
})
}) })
// Handle users selecting a new cell // Handle users selecting a new cell
socket.on("select-cell", cellId => { socket.on(GridSocketEvent.SelectCell, cellId => {
socket.data.user.focusedCellId = cellId this.updateUser(socket, { focusedCellId: cellId })
if (currentRoom) {
socket.to(currentRoom).emit("user-update", user)
}
})
// Disconnection cleanup
socket.on("disconnect", () => {
if (currentRoom) {
socket.to(currentRoom).emit("user-disconnect", user)
}
})
}) })
} }
emitRowUpdate(ctx: any, row: Row) { emitRowUpdate(ctx: any, row: Row) {
const tableId = getTableId(ctx) const tableId = getTableId(ctx)
this.io.in(tableId).emit("row-change", { id: row._id, row }) this.io.in(tableId).emit(GridSocketEvent.RowChange, { id: row._id, row })
} }
emitRowDeletion(ctx: any, id: string) { emitRowDeletion(ctx: any, id: string) {
const tableId = getTableId(ctx) const tableId = getTableId(ctx)
this.io.in(tableId).emit("row-change", { id, row: null }) this.io.in(tableId).emit(GridSocketEvent.RowChange, { id, row: null })
} }
emitTableUpdate(table: Table) { emitTableUpdate(table: Table) {
this.io.in(table._id!).emit("table-change", { id: table._id, table }) this.io
.in(table._id!)
.emit(GridSocketEvent.TableChange, { id: table._id, table })
} }
emitTableDeletion(id: string) { emitTableDeletion(id: string) {
this.io.in(id).emit("table-change", { id, table: null }) this.io.in(id).emit(GridSocketEvent.TableChange, { id, table: null })
} }
} }

View File

@ -3,14 +3,18 @@ import http from "http"
import Koa from "koa" import Koa from "koa"
import Cookies from "cookies" import Cookies from "cookies"
import { userAgent } from "koa-useragent" import { userAgent } from "koa-useragent"
import { auth } from "@budibase/backend-core" import { auth, redis } from "@budibase/backend-core"
import currentApp from "../middleware/currentapp" import currentApp from "../middleware/currentapp"
import { createAdapter } from "@socket.io/redis-adapter" import { createAdapter } from "@socket.io/redis-adapter"
import { Socket } from "socket.io"
import { getSocketPubSubClients } from "../utilities/redis" import { getSocketPubSubClients } from "../utilities/redis"
import uuid from "uuid" import { SocketEvent, SocketSessionTTL } from "@budibase/shared-core"
import { SocketSession } from "@budibase/types"
export default class Socket { export class BaseSocket {
io: Server io: Server
path: string
redisClient?: redis.Client
constructor( constructor(
app: Koa, app: Koa,
@ -18,6 +22,7 @@ export default class Socket {
path: string = "/", path: string = "/",
additionalMiddlewares?: any[] additionalMiddlewares?: any[]
) { ) {
this.path = path
this.io = new Server(server, { this.io = new Server(server, {
path, path,
}) })
@ -65,18 +70,14 @@ export default class Socket {
// Middlewares are finished // Middlewares are finished
// Extract some data from our enriched koa context to persist // Extract some data from our enriched koa context to persist
// as metadata for the socket // as metadata for the socket
// Add user info, including a deterministic color and label
const { _id, email, firstName, lastName } = ctx.user const { _id, email, firstName, lastName } = ctx.user
socket.data.user = { socket.data = {
_id, _id,
email, email,
firstName, firstName,
lastName, lastName,
sessionId: uuid.v4(), sessionId: socket.id,
} }
// Add app ID to help split sockets into rooms
socket.data.appId = ctx.appId
next() next()
} }
}) })
@ -86,10 +87,184 @@ export default class Socket {
} }
}) })
// Instantiate redis adapter // Initialise redis before handling connections
this.initialise().then(() => {
this.io.on("connection", async socket => {
// Add built in handler for heartbeats
socket.on(SocketEvent.Heartbeat, async () => {
console.log(socket.data.email, "heartbeat received")
await this.extendSessionTTL(socket.data.sessionId)
})
// Add early disconnection handler to clean up and leave room
socket.on("disconnect", async () => {
// Run any custom disconnection logic before we leave the room,
// so that we have access to their room etc before disconnection
await this.onDisconnect(socket)
// Leave the current room when the user disconnects if we're in one
await this.leaveRoom(socket)
})
// Add handlers for this socket
await this.onConnect(socket)
})
})
}
async initialise() {
// Instantiate redis adapter.
// We use a fully qualified key name here as this bypasses the normal
// redis client#s key prefixing.
const { pub, sub } = getSocketPubSubClients() const { pub, sub } = getSocketPubSubClients()
const opts = { key: `socket.io-${path}` } const opts = {
key: `${redis.utils.Databases.SOCKET_IO}-${this.path}-pubsub`,
}
this.io.adapter(createAdapter(pub, sub, opts)) this.io.adapter(createAdapter(pub, sub, opts))
// Fetch redis client
this.redisClient = await redis.clients.getSocketClient()
}
// Gets the redis key for a certain session ID
getSessionKey(sessionId: string) {
return `${this.path}-session:${sessionId}`
}
// Gets the redis key for certain room name
getRoomKey(room: string) {
return `${this.path}-room:${room}`
}
async extendSessionTTL(sessionId: string) {
const key = this.getSessionKey(sessionId)
await this.redisClient?.setExpiry(key, SocketSessionTTL)
}
// Gets an array of all redis keys of users inside a certain room
async getRoomSessionIds(room: string): Promise<string[]> {
const keys = await this.redisClient?.get(this.getRoomKey(room))
return keys || []
}
// Sets the list of redis keys for users inside a certain room.
// There is no TTL on the actual room key map itself.
async setRoomSessionIds(room: string, ids: string[]) {
await this.redisClient?.store(this.getRoomKey(room), ids)
}
// Gets a list of all users inside a certain room
async getRoomSessions(room?: string): Promise<SocketSession[]> {
if (room) {
const sessionIds = await this.getRoomSessionIds(room)
const keys = sessionIds.map(this.getSessionKey.bind(this))
const sessions = await this.redisClient?.bulkGet(keys)
return Object.values(sessions || {})
} else {
return []
}
}
// Detects keys which have been pruned from redis due to TTL expiry in a certain
// room and broadcasts disconnection messages to ensure clients are aware
async pruneRoom(room: string) {
const sessionIds = await this.getRoomSessionIds(room)
const sessionsExist = await Promise.all(
sessionIds.map(id => this.redisClient?.exists(this.getSessionKey(id)))
)
const prunedSessionIds = sessionIds.filter((id, idx) => {
if (!sessionsExist[idx]) {
this.io.to(room).emit(SocketEvent.UserDisconnect, sessionIds[idx])
return false
}
return true
})
// Store new pruned keys
await this.setRoomSessionIds(room, prunedSessionIds)
}
// Adds a user to a certain room
async joinRoom(socket: Socket, room: string) {
if (!room) {
return
}
// Prune room before joining
await this.pruneRoom(room)
// Check if we're already in a room, as we'll need to leave if we are before we
// can join a different room
const oldRoom = socket.data.room
if (oldRoom && oldRoom !== room) {
await this.leaveRoom(socket)
}
// Join new room
if (!oldRoom || oldRoom !== room) {
socket.join(room)
socket.data.room = room
}
// Store in redis
// @ts-ignore
let user: SocketSession = socket.data
const { sessionId } = user
const key = this.getSessionKey(sessionId)
await this.redisClient?.store(key, user, SocketSessionTTL)
const sessionIds = await this.getRoomSessionIds(room)
if (!sessionIds.includes(sessionId)) {
await this.setRoomSessionIds(room, [...sessionIds, sessionId])
}
// Notify other users
socket.to(room).emit(SocketEvent.UserUpdate, user)
}
// Disconnects a socket from its current room
async leaveRoom(socket: Socket) {
// @ts-ignore
let user: SocketSession = socket.data
const { room, sessionId } = user
if (!room) {
return
}
// Leave room
socket.leave(room)
socket.data.room = undefined
// Delete from redis
const key = this.getSessionKey(sessionId)
await this.redisClient?.delete(key)
const sessionIds = await this.getRoomSessionIds(room)
await this.setRoomSessionIds(
room,
sessionIds.filter(id => id !== sessionId)
)
// Notify other users
socket.to(room).emit(SocketEvent.UserDisconnect, sessionId)
}
// Updates a connected user's metadata, assuming a room change is not required.
async updateUser(socket: Socket, patch: Object) {
socket.data = {
...socket.data,
...patch,
}
// If we're in a room, notify others of this change and update redis
if (socket.data.room) {
await this.joinRoom(socket, socket.data.room)
}
}
async onConnect(socket: Socket) {
// Override
}
async onDisconnect(socket: Socket) {
// Override
} }
// Emit an event to all sockets // Emit an event to all sockets

View File

@ -67,3 +67,24 @@ export const SqlNumberTypeRangeMap = {
min: -8388608, min: -8388608,
}, },
} }
export enum SocketEvent {
UserUpdate = "UserUpdate",
UserDisconnect = "UserDisconnect",
Heartbeat = "Heartbeat",
}
export enum GridSocketEvent {
RowChange = "RowChange",
TableChange = "TableChange",
SelectTable = "SelectTable",
SelectCell = "SelectCell",
}
export enum BuilderSocketEvent {
SelectApp = "SelectApp",
TableChange = "TableChange",
DatasourceChange = "DatasourceChange",
}
export const SocketSessionTTL = 60

View File

@ -17,3 +17,4 @@ export * from "./auditLogs"
export * from "./sso" export * from "./sso"
export * from "./user" export * from "./user"
export * from "./cli" export * from "./cli"
export * from "./websocket"

View File

@ -0,0 +1,8 @@
export interface SocketSession {
_id: string
email: string
firstName?: string
lastName?: string
sessionId: string
room?: string
}