diff --git a/packages/backend-core/src/redis/init.ts b/packages/backend-core/src/redis/init.ts index bbece02060..056620c085 100644 --- a/packages/backend-core/src/redis/init.ts +++ b/packages/backend-core/src/redis/init.ts @@ -6,7 +6,8 @@ let userClient: Client, appClient: Client, cacheClient: Client, writethroughClient: Client, - lockClient: Client + lockClient: Client, + socketClient: Client async function init() { userClient = await new Client(utils.Databases.USER_CACHE).init() @@ -15,6 +16,7 @@ async function init() { cacheClient = await new Client(utils.Databases.GENERIC_CACHE).init() lockClient = await new Client(utils.Databases.LOCKS).init() writethroughClient = await new Client(utils.Databases.WRITE_THROUGH).init() + socketClient = await new Client(utils.Databases.SOCKET_IO, utils.SelectableDatabase.SOCKET_IO).init() } export async function shutdown() { @@ -24,6 +26,7 @@ export async function shutdown() { if (cacheClient) await cacheClient.finish() if (writethroughClient) await writethroughClient.finish() if (lockClient) await lockClient.finish() + if (socketClient) await socketClient.finish() } process.on("exit", async () => { @@ -71,3 +74,10 @@ export async function getLockClient() { } return lockClient } + +export async function getSocketClient() { + if (!socketClient) { + await init() + } + return socketClient +} \ No newline at end of file diff --git a/packages/backend-core/src/redis/redis.ts b/packages/backend-core/src/redis/redis.ts index de947f5edc..ea1f1944a1 100644 --- a/packages/backend-core/src/redis/redis.ts +++ b/packages/backend-core/src/redis/redis.ts @@ -212,6 +212,11 @@ class RedisWrapper { return this.getClient().keys(addDbPrefix(db, pattern)) } + async exists(key: string) { + const db = this._db + return await this.getClient().exists(addDbPrefix(db, key)) + } + async get(key: string) { const db = this._db let response = await this.getClient().get(addDbPrefix(db, key)) diff --git a/packages/builder/src/builderStore/store/frontend.js b/packages/builder/src/builderStore/store/frontend.js index c921ddef3b..4f2a1bef6f 100644 --- a/packages/builder/src/builderStore/store/frontend.js +++ b/packages/builder/src/builderStore/store/frontend.js @@ -118,7 +118,7 @@ export const getFrontendStore = () => { }, initialise: async pkg => { const { layouts, screens, application, clientLibPath, hasLock } = pkg - websocket = createBuilderWebsocket() + websocket = createBuilderWebsocket(application.appId) await store.actions.components.refreshDefinitions(application.appId) // Reset store state diff --git a/packages/builder/src/builderStore/websocket.js b/packages/builder/src/builderStore/websocket.js index 79138b0767..ffaca7a025 100644 --- a/packages/builder/src/builderStore/websocket.js +++ b/packages/builder/src/builderStore/websocket.js @@ -3,12 +3,12 @@ import { userStore } from "builderStore" import { datasources, tables } from "stores/backend" import { SocketEvents, BuilderSocketEvents } from "@budibase/shared-core" -export const createBuilderWebsocket = () => { +export const createBuilderWebsocket = appId => { const socket = createWebsocket("/socket/builder") // Built-in events socket.on("connect", () => { - socket.emit(SocketEvents.GetUsers, null, response => { + socket.emit(BuilderSocketEvents.SelectApp, appId, response => { userStore.actions.init(response.users) }) }) @@ -30,11 +30,10 @@ export const createBuilderWebsocket = () => { datasources.replaceDatasource(id, datasource) }) - return { - ...socket, - disconnect: () => { - socket?.disconnect() - userStore.actions.reset() - }, - } + // Clean up user store on disconnect + socket.on("disconnect", () => { + userStore.actions.reset() + }) + + return socket } diff --git a/packages/frontend-core/src/utils/websocket.js b/packages/frontend-core/src/utils/websocket.js index 6de9690253..6f907f5270 100644 --- a/packages/frontend-core/src/utils/websocket.js +++ b/packages/frontend-core/src/utils/websocket.js @@ -1,6 +1,7 @@ import { io } from "socket.io-client" +import { SocketEvents, SocketSessionTTL } from "@budibase/shared-core" -export const createWebsocket = path => { +export const createWebsocket = (path, heartbeat = true) => { if (!path) { throw "A websocket path must be provided" } @@ -10,7 +11,7 @@ export const createWebsocket = path => { const proto = tls ? "wss:" : "ws:" const host = location.hostname const port = location.port || (tls ? 443 : 80) - return io(`${proto}//${host}:${port}`, { + const socket = io(`${proto}//${host}:${port}`, { path, // Cap reconnection attempts to 3 (total of 15 seconds before giving up) reconnectionAttempts: 3, @@ -21,6 +22,22 @@ export const createWebsocket = path => { timeout: 4000, // Disable polling and rely on websocket only, as HTTP transport // will only work with sticky sessions which we don't have - // transports: ["websocket"], + transports: ["websocket"], }) + + // Set up a heartbeat that's half of the session TTL + let interval + if (heartbeat) { + interval = setInterval(() => { + console.log("Sending heartbeat") + socket.emit(SocketEvents.Heartbeat) + }, SocketSessionTTL * 500) + } + + socket.on("disconnect", () => { + console.log("clear interval") + clearInterval(interval) + }) + + return socket } diff --git a/packages/server/src/utilities/redis.ts b/packages/server/src/utilities/redis.ts index f2625b9cfa..5785d79d2a 100644 --- a/packages/server/src/utilities/redis.ts +++ b/packages/server/src/utilities/redis.ts @@ -1,12 +1,13 @@ import { redis } from "@budibase/backend-core" import { getGlobalIDFromUserMetadataID } from "../db/utils" -import { ContextUser, SocketUser } from "@budibase/types" +import { ContextUser } from "@budibase/types" const APP_DEV_LOCK_SECONDS = 600 const AUTOMATION_TEST_FLAG_SECONDS = 60 -let devAppClient: any, debounceClient: any, flagClient: any, socketClient: any +let devAppClient: any, debounceClient: any, flagClient: any // We need to maintain a duplicate client for socket.io pub/sub +let socketClient: any let socketSubClient: any // We init this as we want to keep the connection open all the time @@ -15,13 +16,12 @@ export async function init() { devAppClient = new redis.Client(redis.utils.Databases.DEV_LOCKS) debounceClient = new redis.Client(redis.utils.Databases.DEBOUNCE) flagClient = new redis.Client(redis.utils.Databases.FLAGS) - socketClient = new redis.Client(redis.utils.Databases.SOCKET_IO, redis.utils.SelectableDatabase.SOCKET_IO) await devAppClient.init() await debounceClient.init() await flagClient.init() - await socketClient.init() // Duplicate the socket client for pub/sub + socketClient = await redis.clients.getSocketClient() socketSubClient = socketClient.getClient().duplicate() } @@ -30,7 +30,6 @@ export async function shutdown() { if (devAppClient) await devAppClient.finish() if (debounceClient) await debounceClient.finish() if (flagClient) await flagClient.finish() - if (socketClient) await socketClient.finish() if (socketSubClient) socketSubClient.disconnect() // shutdown core clients await redis.clients.shutdown() @@ -98,14 +97,6 @@ export async function clearTestFlag(id: string) { await devAppClient.delete(id) } -export async function getSocketUsers(path: string, room: string) { - return await socketClient.get(`${path}:${room}`) -} - -export async function setSocketUsers(path: string, room: string, users: SocketUser[]) { - await socketClient.store(`${path}:${room}`, users) -} - export function getSocketPubSubClients() { return { pub: socketClient.getClient(), diff --git a/packages/server/src/websockets/builder.ts b/packages/server/src/websockets/builder.ts index 6fe02be0ba..9fbedbe9e2 100644 --- a/packages/server/src/websockets/builder.ts +++ b/packages/server/src/websockets/builder.ts @@ -3,7 +3,7 @@ import { BaseSocket } from "./websocket" import { permissions } from "@budibase/backend-core" import http from "http" import Koa from "koa" -import { Datasource, Table, SocketUser, ContextUser } from "@budibase/types" +import { Datasource, Table, SocketSession, ContextUser } from "@budibase/types" import { gridSocket } from "./index" import { clearLock } from "../utilities/redis" import { Socket } from "socket.io" @@ -15,24 +15,30 @@ export default class BuilderSocket extends BaseSocket { } async onConnect(socket: Socket) { - // Join a room for this app - await this.joinRoom(socket, socket.data.appId) + // Initial identification of selected app + socket.on(BuilderSocketEvents.SelectApp, async (appId, callback) => { + await this.joinRoom(socket, appId) + + // Reply with all users in current room + const sessions = await this.getRoomSessions(appId) + callback({ users: sessions }) + }) } async onDisconnect(socket: Socket) { // Remove app lock from this user if they have no other connections try { // @ts-ignore - const user: SocketUser = socket.data - const { _id, sessionId, appId } = user - const users = await this.getSocketUsers(user.room) - const hasOtherConnection = users.some(otherUser => { - return _id === otherUser._id && sessionId !== otherUser.sessionId + const session: SocketSession = socket.data + const { _id, sessionId, room } = session + const sessions = await this.getRoomSessions(room) + const hasOtherSession = sessions.some(otherSession => { + return _id === otherSession._id && sessionId !== otherSession.sessionId }) - if (!hasOtherConnection) { + if (!hasOtherSession && room) { // @ts-ignore const user: ContextUser = { _id: socket.data._id } - await clearLock(appId, user) + await clearLock(room, user) } } catch (e) { // This is fine, just means this user didn't hold the lock diff --git a/packages/server/src/websockets/grid.ts b/packages/server/src/websockets/grid.ts index c3c460b19d..833c7b3348 100644 --- a/packages/server/src/websockets/grid.ts +++ b/packages/server/src/websockets/grid.ts @@ -18,9 +18,9 @@ export default class GridSocket extends BaseSocket { socket.on(GridSocketEvents.SelectTable, async (tableId, callback) => { await this.joinRoom(socket, tableId) - // Reply with all users in current roome - const users = await this.getSocketUsers(tableId) - callback({ users }) + // Reply with all users in current room + const sessions = await this.getRoomSessions(tableId) + callback({ users: sessions }) }) // Handle users selecting a new cell diff --git a/packages/server/src/websockets/websocket.ts b/packages/server/src/websockets/websocket.ts index 40ab42ab22..2ecff7a62c 100644 --- a/packages/server/src/websockets/websocket.ts +++ b/packages/server/src/websockets/websocket.ts @@ -3,21 +3,18 @@ import http from "http" import Koa from "koa" import Cookies from "cookies" import { userAgent } from "koa-useragent" -import { auth } from "@budibase/backend-core" +import { auth, redis } from "@budibase/backend-core" import currentApp from "../middleware/currentapp" import { createAdapter } from "@socket.io/redis-adapter" import { Socket } from "socket.io" -import { - getSocketPubSubClients, - getSocketUsers, - setSocketUsers, -} from "../utilities/redis" -import { SocketEvents } from "@budibase/shared-core" -import { SocketUser } from "@budibase/types" +import { getSocketPubSubClients } from "../utilities/redis" +import { SocketEvents, SocketSessionTTL } from "@budibase/shared-core" +import { SocketSession } from "@budibase/types" export class BaseSocket { io: Server path: string + redisClient?: redis.Client constructor( app: Koa, @@ -79,7 +76,6 @@ export class BaseSocket { email, firstName, lastName, - appId: ctx.appId, sessionId: socket.id, } next() @@ -91,50 +87,116 @@ export class BaseSocket { } }) - // Instantiate redis adapter - const { pub, sub } = getSocketPubSubClients() - const opts = { key: `socket.io-${path}` } - this.io.adapter(createAdapter(pub, sub, opts)) + // Initialise redis before handling connections + this.initialise().then(() => { + this.io.on("connection", async socket => { + // Add built in handler to allow fetching all other users in this room + socket.on(SocketEvents.GetUsers, async (payload, callback) => { + const sessions = await this.getRoomSessions(socket.data.room) + callback({ users: sessions }) + }) - // Handle user connections and disconnections - this.io.on("connection", async socket => { - // Add built in handler to allow fetching all other users in this room - socket.on(SocketEvents.GetUsers, async (payload, callback) => { - let users - if (socket.data.room) { - users = await this.getSocketUsers(socket.data.room) - } - callback({ users }) - }) + // Add built in handler for heartbeats + socket.on(SocketEvents.Heartbeat, async () => { + console.log(socket.data.email, "heartbeat received") + await this.extendSessionTTL(socket.data.sessionId) + }) - // Add handlers for this socket - await this.onConnect(socket) + // Add early disconnection handler to clean up and leave room + socket.on("disconnect", async () => { + // Run any custom disconnection logic before we leave the room, + // so that we have access to their room etc before disconnection + await this.onDisconnect(socket) - // Add early disconnection handler to clean up and leave room - socket.on("disconnect", async () => { - // Leave the current room when the user disconnects if we're in one - if (socket.data.room) { + // Leave the current room when the user disconnects if we're in one await this.leaveRoom(socket) - } + }) - // Run any other disconnection logic - await this.onDisconnect(socket) + // Add handlers for this socket + await this.onConnect(socket) }) }) } + async initialise() { + // Instantiate redis adapter. + // We use a fully qualified key name here as this bypasses the normal + // redis client#s key prefixing. + const { pub, sub } = getSocketPubSubClients() + const opts = { + key: `${redis.utils.Databases.SOCKET_IO}-${this.path}-pubsub`, + } + this.io.adapter(createAdapter(pub, sub, opts)) + + // Fetch redis client + this.redisClient = await redis.clients.getSocketClient() + } + + // Gets the redis key for a certain session ID + getSessionKey(sessionId: string) { + return `${this.path}-session:${sessionId}` + } + + // Gets the redis key for certain room name + getRoomKey(room: string) { + return `${this.path}-room:${room}` + } + + async extendSessionTTL(sessionId: string) { + const key = this.getSessionKey(sessionId) + await this.redisClient?.setExpiry(key, SocketSessionTTL) + } + + // Gets an array of all redis keys of users inside a certain room + async getRoomSessionKeys(room: string): Promise { + const keys = await this.redisClient?.get(this.getRoomKey(room)) + return keys || [] + } + + // Sets the list of redis keys for users inside a certain room. + // There is no TTL on the actual room key map itself. + async setRoomSessionKeys(room: string, keys: string[]) { + await this.redisClient?.store(this.getRoomKey(room), keys) + } + // Gets a list of all users inside a certain room - async getSocketUsers(room?: string): Promise { + async getRoomSessions(room?: string): Promise { if (room) { - const users = await getSocketUsers(this.path, room) - return users || [] + const keys = await this.getRoomSessionKeys(room) + const sessions = await this.redisClient?.bulkGet(keys) + return Object.values(sessions || {}) } else { return [] } } + // Detects keys which have been pruned from redis due to TTL expiry in a certain + // room and broadcasts disconnection messages to ensure clients are aware + async pruneRoom(room: string) { + const keys = await this.getRoomSessionKeys(room) + const keysExist = await Promise.all( + keys.map(key => this.redisClient?.exists(key)) + ) + const prunedKeys = keys.filter((key, idx) => { + if (!keysExist[idx]) { + console.log("pruning key", keys[idx]) + return false + } + return true + }) + + // Store new pruned keys + await this.setRoomSessionKeys(room, prunedKeys) + } + // Adds a user to a certain room async joinRoom(socket: Socket, room: string) { + if (!room) { + return + } + // Prune room before joining + await this.pruneRoom(room) + // Check if we're already in a room, as we'll need to leave if we are before we // can join a different room const oldRoom = socket.data.room @@ -148,40 +210,43 @@ export class BaseSocket { socket.data.room = room } + // Store in redis // @ts-ignore - let user: SocketUser = socket.data - let users = await this.getSocketUsers(room) + let user: SocketSession = socket.data + const key = this.getSessionKey(user.sessionId) + await this.redisClient?.store(key, user, SocketSessionTTL) + const roomKeys = await this.getRoomSessionKeys(room) + if (!roomKeys.includes(key)) { + await this.setRoomSessionKeys(room, [...roomKeys, key]) + } - // Store this socket in redis - if (!users?.length) { - users = [] - } - const index = users.findIndex(x => x.sessionId === socket.data.sessionId) - if (index === -1) { - users.push(user) - } else { - users[index] = user - } - await setSocketUsers(this.path, room, users) + // Notify other users socket.to(room).emit(SocketEvents.UserUpdate, user) } // Disconnects a socket from its current room async leaveRoom(socket: Socket) { // @ts-ignore - let user: SocketUser = socket.data + let user: SocketSession = socket.data const { room, sessionId } = user if (!room) { return } + + // Leave room socket.leave(room) socket.data.room = undefined - let users = await this.getSocketUsers(room) + // Delete from redis + const key = this.getSessionKey(sessionId) + await this.redisClient?.delete(key) + const roomKeys = await this.getRoomSessionKeys(room) + await this.setRoomSessionKeys( + room, + roomKeys.filter(k => k !== key) + ) - // Remove this socket from redis - users = users.filter(user => user.sessionId !== sessionId) - await setSocketUsers(this.path, room, users) + // Notify other users socket.to(room).emit(SocketEvents.UserDisconnect, user) } diff --git a/packages/shared-core/src/constants.ts b/packages/shared-core/src/constants.ts index 62603ee941..9c92755730 100644 --- a/packages/shared-core/src/constants.ts +++ b/packages/shared-core/src/constants.ts @@ -71,7 +71,8 @@ export const SqlNumberTypeRangeMap = { export const SocketEvents = { UserUpdate: "UserUpdate", UserDisconnect: "UserDisconnect", - GetUsers: "GetUsers" + GetUsers: "GetUsers", + Heartbeat: "Heartbeat" } export const GridSocketEvents = { @@ -82,6 +83,9 @@ export const GridSocketEvents = { } export const BuilderSocketEvents = { + SelectApp: "SelectApp", TableChange: "TableChange", DatasourceChange: "DatasourceChange" -} \ No newline at end of file +} + +export const SocketSessionTTL = 60 \ No newline at end of file diff --git a/packages/types/src/sdk/websocket.ts b/packages/types/src/sdk/websocket.ts index 822c4294e0..0238478ed8 100644 --- a/packages/types/src/sdk/websocket.ts +++ b/packages/types/src/sdk/websocket.ts @@ -1,9 +1,8 @@ -export interface SocketUser { +export interface SocketSession { _id: string, email: string, firstName?: string, lastName?: string, - appId: string, sessionId: string, room?: string } \ No newline at end of file