Fully rewrite websocket redis integration to use individual keys per session, enable TTLs on sesisons, prune sessions when users connect and add a heartbeat to sockets
This commit is contained in:
parent
d926650232
commit
05e34076f7
|
@ -6,7 +6,8 @@ let userClient: Client,
|
|||
appClient: Client,
|
||||
cacheClient: Client,
|
||||
writethroughClient: Client,
|
||||
lockClient: Client
|
||||
lockClient: Client,
|
||||
socketClient: Client
|
||||
|
||||
async function init() {
|
||||
userClient = await new Client(utils.Databases.USER_CACHE).init()
|
||||
|
@ -15,6 +16,7 @@ async function init() {
|
|||
cacheClient = await new Client(utils.Databases.GENERIC_CACHE).init()
|
||||
lockClient = await new Client(utils.Databases.LOCKS).init()
|
||||
writethroughClient = await new Client(utils.Databases.WRITE_THROUGH).init()
|
||||
socketClient = await new Client(utils.Databases.SOCKET_IO, utils.SelectableDatabase.SOCKET_IO).init()
|
||||
}
|
||||
|
||||
export async function shutdown() {
|
||||
|
@ -24,6 +26,7 @@ export async function shutdown() {
|
|||
if (cacheClient) await cacheClient.finish()
|
||||
if (writethroughClient) await writethroughClient.finish()
|
||||
if (lockClient) await lockClient.finish()
|
||||
if (socketClient) await socketClient.finish()
|
||||
}
|
||||
|
||||
process.on("exit", async () => {
|
||||
|
@ -71,3 +74,10 @@ export async function getLockClient() {
|
|||
}
|
||||
return lockClient
|
||||
}
|
||||
|
||||
export async function getSocketClient() {
|
||||
if (!socketClient) {
|
||||
await init()
|
||||
}
|
||||
return socketClient
|
||||
}
|
|
@ -212,6 +212,11 @@ class RedisWrapper {
|
|||
return this.getClient().keys(addDbPrefix(db, pattern))
|
||||
}
|
||||
|
||||
async exists(key: string) {
|
||||
const db = this._db
|
||||
return await this.getClient().exists(addDbPrefix(db, key))
|
||||
}
|
||||
|
||||
async get(key: string) {
|
||||
const db = this._db
|
||||
let response = await this.getClient().get(addDbPrefix(db, key))
|
||||
|
|
|
@ -118,7 +118,7 @@ export const getFrontendStore = () => {
|
|||
},
|
||||
initialise: async pkg => {
|
||||
const { layouts, screens, application, clientLibPath, hasLock } = pkg
|
||||
websocket = createBuilderWebsocket()
|
||||
websocket = createBuilderWebsocket(application.appId)
|
||||
await store.actions.components.refreshDefinitions(application.appId)
|
||||
|
||||
// Reset store state
|
||||
|
|
|
@ -3,12 +3,12 @@ import { userStore } from "builderStore"
|
|||
import { datasources, tables } from "stores/backend"
|
||||
import { SocketEvents, BuilderSocketEvents } from "@budibase/shared-core"
|
||||
|
||||
export const createBuilderWebsocket = () => {
|
||||
export const createBuilderWebsocket = appId => {
|
||||
const socket = createWebsocket("/socket/builder")
|
||||
|
||||
// Built-in events
|
||||
socket.on("connect", () => {
|
||||
socket.emit(SocketEvents.GetUsers, null, response => {
|
||||
socket.emit(BuilderSocketEvents.SelectApp, appId, response => {
|
||||
userStore.actions.init(response.users)
|
||||
})
|
||||
})
|
||||
|
@ -30,11 +30,10 @@ export const createBuilderWebsocket = () => {
|
|||
datasources.replaceDatasource(id, datasource)
|
||||
})
|
||||
|
||||
return {
|
||||
...socket,
|
||||
disconnect: () => {
|
||||
socket?.disconnect()
|
||||
// Clean up user store on disconnect
|
||||
socket.on("disconnect", () => {
|
||||
userStore.actions.reset()
|
||||
},
|
||||
}
|
||||
})
|
||||
|
||||
return socket
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import { io } from "socket.io-client"
|
||||
import { SocketEvents, SocketSessionTTL } from "@budibase/shared-core"
|
||||
|
||||
export const createWebsocket = path => {
|
||||
export const createWebsocket = (path, heartbeat = true) => {
|
||||
if (!path) {
|
||||
throw "A websocket path must be provided"
|
||||
}
|
||||
|
@ -10,7 +11,7 @@ export const createWebsocket = path => {
|
|||
const proto = tls ? "wss:" : "ws:"
|
||||
const host = location.hostname
|
||||
const port = location.port || (tls ? 443 : 80)
|
||||
return io(`${proto}//${host}:${port}`, {
|
||||
const socket = io(`${proto}//${host}:${port}`, {
|
||||
path,
|
||||
// Cap reconnection attempts to 3 (total of 15 seconds before giving up)
|
||||
reconnectionAttempts: 3,
|
||||
|
@ -21,6 +22,22 @@ export const createWebsocket = path => {
|
|||
timeout: 4000,
|
||||
// Disable polling and rely on websocket only, as HTTP transport
|
||||
// will only work with sticky sessions which we don't have
|
||||
// transports: ["websocket"],
|
||||
transports: ["websocket"],
|
||||
})
|
||||
|
||||
// Set up a heartbeat that's half of the session TTL
|
||||
let interval
|
||||
if (heartbeat) {
|
||||
interval = setInterval(() => {
|
||||
console.log("Sending heartbeat")
|
||||
socket.emit(SocketEvents.Heartbeat)
|
||||
}, SocketSessionTTL * 500)
|
||||
}
|
||||
|
||||
socket.on("disconnect", () => {
|
||||
console.log("clear interval")
|
||||
clearInterval(interval)
|
||||
})
|
||||
|
||||
return socket
|
||||
}
|
||||
|
|
|
@ -1,12 +1,13 @@
|
|||
import { redis } from "@budibase/backend-core"
|
||||
import { getGlobalIDFromUserMetadataID } from "../db/utils"
|
||||
import { ContextUser, SocketUser } from "@budibase/types"
|
||||
import { ContextUser } from "@budibase/types"
|
||||
|
||||
const APP_DEV_LOCK_SECONDS = 600
|
||||
const AUTOMATION_TEST_FLAG_SECONDS = 60
|
||||
let devAppClient: any, debounceClient: any, flagClient: any, socketClient: any
|
||||
let devAppClient: any, debounceClient: any, flagClient: any
|
||||
|
||||
// We need to maintain a duplicate client for socket.io pub/sub
|
||||
let socketClient: any
|
||||
let socketSubClient: any
|
||||
|
||||
// We init this as we want to keep the connection open all the time
|
||||
|
@ -15,13 +16,12 @@ export async function init() {
|
|||
devAppClient = new redis.Client(redis.utils.Databases.DEV_LOCKS)
|
||||
debounceClient = new redis.Client(redis.utils.Databases.DEBOUNCE)
|
||||
flagClient = new redis.Client(redis.utils.Databases.FLAGS)
|
||||
socketClient = new redis.Client(redis.utils.Databases.SOCKET_IO, redis.utils.SelectableDatabase.SOCKET_IO)
|
||||
await devAppClient.init()
|
||||
await debounceClient.init()
|
||||
await flagClient.init()
|
||||
await socketClient.init()
|
||||
|
||||
// Duplicate the socket client for pub/sub
|
||||
socketClient = await redis.clients.getSocketClient()
|
||||
socketSubClient = socketClient.getClient().duplicate()
|
||||
}
|
||||
|
||||
|
@ -30,7 +30,6 @@ export async function shutdown() {
|
|||
if (devAppClient) await devAppClient.finish()
|
||||
if (debounceClient) await debounceClient.finish()
|
||||
if (flagClient) await flagClient.finish()
|
||||
if (socketClient) await socketClient.finish()
|
||||
if (socketSubClient) socketSubClient.disconnect()
|
||||
// shutdown core clients
|
||||
await redis.clients.shutdown()
|
||||
|
@ -98,14 +97,6 @@ export async function clearTestFlag(id: string) {
|
|||
await devAppClient.delete(id)
|
||||
}
|
||||
|
||||
export async function getSocketUsers(path: string, room: string) {
|
||||
return await socketClient.get(`${path}:${room}`)
|
||||
}
|
||||
|
||||
export async function setSocketUsers(path: string, room: string, users: SocketUser[]) {
|
||||
await socketClient.store(`${path}:${room}`, users)
|
||||
}
|
||||
|
||||
export function getSocketPubSubClients() {
|
||||
return {
|
||||
pub: socketClient.getClient(),
|
||||
|
|
|
@ -3,7 +3,7 @@ import { BaseSocket } from "./websocket"
|
|||
import { permissions } from "@budibase/backend-core"
|
||||
import http from "http"
|
||||
import Koa from "koa"
|
||||
import { Datasource, Table, SocketUser, ContextUser } from "@budibase/types"
|
||||
import { Datasource, Table, SocketSession, ContextUser } from "@budibase/types"
|
||||
import { gridSocket } from "./index"
|
||||
import { clearLock } from "../utilities/redis"
|
||||
import { Socket } from "socket.io"
|
||||
|
@ -15,24 +15,30 @@ export default class BuilderSocket extends BaseSocket {
|
|||
}
|
||||
|
||||
async onConnect(socket: Socket) {
|
||||
// Join a room for this app
|
||||
await this.joinRoom(socket, socket.data.appId)
|
||||
// Initial identification of selected app
|
||||
socket.on(BuilderSocketEvents.SelectApp, async (appId, callback) => {
|
||||
await this.joinRoom(socket, appId)
|
||||
|
||||
// Reply with all users in current room
|
||||
const sessions = await this.getRoomSessions(appId)
|
||||
callback({ users: sessions })
|
||||
})
|
||||
}
|
||||
|
||||
async onDisconnect(socket: Socket) {
|
||||
// Remove app lock from this user if they have no other connections
|
||||
try {
|
||||
// @ts-ignore
|
||||
const user: SocketUser = socket.data
|
||||
const { _id, sessionId, appId } = user
|
||||
const users = await this.getSocketUsers(user.room)
|
||||
const hasOtherConnection = users.some(otherUser => {
|
||||
return _id === otherUser._id && sessionId !== otherUser.sessionId
|
||||
const session: SocketSession = socket.data
|
||||
const { _id, sessionId, room } = session
|
||||
const sessions = await this.getRoomSessions(room)
|
||||
const hasOtherSession = sessions.some(otherSession => {
|
||||
return _id === otherSession._id && sessionId !== otherSession.sessionId
|
||||
})
|
||||
if (!hasOtherConnection) {
|
||||
if (!hasOtherSession && room) {
|
||||
// @ts-ignore
|
||||
const user: ContextUser = { _id: socket.data._id }
|
||||
await clearLock(appId, user)
|
||||
await clearLock(room, user)
|
||||
}
|
||||
} catch (e) {
|
||||
// This is fine, just means this user didn't hold the lock
|
||||
|
|
|
@ -18,9 +18,9 @@ export default class GridSocket extends BaseSocket {
|
|||
socket.on(GridSocketEvents.SelectTable, async (tableId, callback) => {
|
||||
await this.joinRoom(socket, tableId)
|
||||
|
||||
// Reply with all users in current roome
|
||||
const users = await this.getSocketUsers(tableId)
|
||||
callback({ users })
|
||||
// Reply with all users in current room
|
||||
const sessions = await this.getRoomSessions(tableId)
|
||||
callback({ users: sessions })
|
||||
})
|
||||
|
||||
// Handle users selecting a new cell
|
||||
|
|
|
@ -3,21 +3,18 @@ import http from "http"
|
|||
import Koa from "koa"
|
||||
import Cookies from "cookies"
|
||||
import { userAgent } from "koa-useragent"
|
||||
import { auth } from "@budibase/backend-core"
|
||||
import { auth, redis } from "@budibase/backend-core"
|
||||
import currentApp from "../middleware/currentapp"
|
||||
import { createAdapter } from "@socket.io/redis-adapter"
|
||||
import { Socket } from "socket.io"
|
||||
import {
|
||||
getSocketPubSubClients,
|
||||
getSocketUsers,
|
||||
setSocketUsers,
|
||||
} from "../utilities/redis"
|
||||
import { SocketEvents } from "@budibase/shared-core"
|
||||
import { SocketUser } from "@budibase/types"
|
||||
import { getSocketPubSubClients } from "../utilities/redis"
|
||||
import { SocketEvents, SocketSessionTTL } from "@budibase/shared-core"
|
||||
import { SocketSession } from "@budibase/types"
|
||||
|
||||
export class BaseSocket {
|
||||
io: Server
|
||||
path: string
|
||||
redisClient?: redis.Client
|
||||
|
||||
constructor(
|
||||
app: Koa,
|
||||
|
@ -79,7 +76,6 @@ export class BaseSocket {
|
|||
email,
|
||||
firstName,
|
||||
lastName,
|
||||
appId: ctx.appId,
|
||||
sessionId: socket.id,
|
||||
}
|
||||
next()
|
||||
|
@ -91,50 +87,116 @@ export class BaseSocket {
|
|||
}
|
||||
})
|
||||
|
||||
// Instantiate redis adapter
|
||||
const { pub, sub } = getSocketPubSubClients()
|
||||
const opts = { key: `socket.io-${path}` }
|
||||
this.io.adapter(createAdapter(pub, sub, opts))
|
||||
|
||||
// Handle user connections and disconnections
|
||||
// Initialise redis before handling connections
|
||||
this.initialise().then(() => {
|
||||
this.io.on("connection", async socket => {
|
||||
// Add built in handler to allow fetching all other users in this room
|
||||
socket.on(SocketEvents.GetUsers, async (payload, callback) => {
|
||||
let users
|
||||
if (socket.data.room) {
|
||||
users = await this.getSocketUsers(socket.data.room)
|
||||
}
|
||||
callback({ users })
|
||||
const sessions = await this.getRoomSessions(socket.data.room)
|
||||
callback({ users: sessions })
|
||||
})
|
||||
|
||||
// Add built in handler for heartbeats
|
||||
socket.on(SocketEvents.Heartbeat, async () => {
|
||||
console.log(socket.data.email, "heartbeat received")
|
||||
await this.extendSessionTTL(socket.data.sessionId)
|
||||
})
|
||||
|
||||
// Add early disconnection handler to clean up and leave room
|
||||
socket.on("disconnect", async () => {
|
||||
// Run any custom disconnection logic before we leave the room,
|
||||
// so that we have access to their room etc before disconnection
|
||||
await this.onDisconnect(socket)
|
||||
|
||||
// Leave the current room when the user disconnects if we're in one
|
||||
await this.leaveRoom(socket)
|
||||
})
|
||||
|
||||
// Add handlers for this socket
|
||||
await this.onConnect(socket)
|
||||
|
||||
// Add early disconnection handler to clean up and leave room
|
||||
socket.on("disconnect", async () => {
|
||||
// Leave the current room when the user disconnects if we're in one
|
||||
if (socket.data.room) {
|
||||
await this.leaveRoom(socket)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// Run any other disconnection logic
|
||||
await this.onDisconnect(socket)
|
||||
})
|
||||
})
|
||||
async initialise() {
|
||||
// Instantiate redis adapter.
|
||||
// We use a fully qualified key name here as this bypasses the normal
|
||||
// redis client#s key prefixing.
|
||||
const { pub, sub } = getSocketPubSubClients()
|
||||
const opts = {
|
||||
key: `${redis.utils.Databases.SOCKET_IO}-${this.path}-pubsub`,
|
||||
}
|
||||
this.io.adapter(createAdapter(pub, sub, opts))
|
||||
|
||||
// Fetch redis client
|
||||
this.redisClient = await redis.clients.getSocketClient()
|
||||
}
|
||||
|
||||
// Gets the redis key for a certain session ID
|
||||
getSessionKey(sessionId: string) {
|
||||
return `${this.path}-session:${sessionId}`
|
||||
}
|
||||
|
||||
// Gets the redis key for certain room name
|
||||
getRoomKey(room: string) {
|
||||
return `${this.path}-room:${room}`
|
||||
}
|
||||
|
||||
async extendSessionTTL(sessionId: string) {
|
||||
const key = this.getSessionKey(sessionId)
|
||||
await this.redisClient?.setExpiry(key, SocketSessionTTL)
|
||||
}
|
||||
|
||||
// Gets an array of all redis keys of users inside a certain room
|
||||
async getRoomSessionKeys(room: string): Promise<string[]> {
|
||||
const keys = await this.redisClient?.get(this.getRoomKey(room))
|
||||
return keys || []
|
||||
}
|
||||
|
||||
// Sets the list of redis keys for users inside a certain room.
|
||||
// There is no TTL on the actual room key map itself.
|
||||
async setRoomSessionKeys(room: string, keys: string[]) {
|
||||
await this.redisClient?.store(this.getRoomKey(room), keys)
|
||||
}
|
||||
|
||||
// Gets a list of all users inside a certain room
|
||||
async getSocketUsers(room?: string): Promise<SocketUser[]> {
|
||||
async getRoomSessions(room?: string): Promise<SocketSession[]> {
|
||||
if (room) {
|
||||
const users = await getSocketUsers(this.path, room)
|
||||
return users || []
|
||||
const keys = await this.getRoomSessionKeys(room)
|
||||
const sessions = await this.redisClient?.bulkGet(keys)
|
||||
return Object.values(sessions || {})
|
||||
} else {
|
||||
return []
|
||||
}
|
||||
}
|
||||
|
||||
// Detects keys which have been pruned from redis due to TTL expiry in a certain
|
||||
// room and broadcasts disconnection messages to ensure clients are aware
|
||||
async pruneRoom(room: string) {
|
||||
const keys = await this.getRoomSessionKeys(room)
|
||||
const keysExist = await Promise.all(
|
||||
keys.map(key => this.redisClient?.exists(key))
|
||||
)
|
||||
const prunedKeys = keys.filter((key, idx) => {
|
||||
if (!keysExist[idx]) {
|
||||
console.log("pruning key", keys[idx])
|
||||
return false
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
// Store new pruned keys
|
||||
await this.setRoomSessionKeys(room, prunedKeys)
|
||||
}
|
||||
|
||||
// Adds a user to a certain room
|
||||
async joinRoom(socket: Socket, room: string) {
|
||||
if (!room) {
|
||||
return
|
||||
}
|
||||
// Prune room before joining
|
||||
await this.pruneRoom(room)
|
||||
|
||||
// Check if we're already in a room, as we'll need to leave if we are before we
|
||||
// can join a different room
|
||||
const oldRoom = socket.data.room
|
||||
|
@ -148,40 +210,43 @@ export class BaseSocket {
|
|||
socket.data.room = room
|
||||
}
|
||||
|
||||
// Store in redis
|
||||
// @ts-ignore
|
||||
let user: SocketUser = socket.data
|
||||
let users = await this.getSocketUsers(room)
|
||||
let user: SocketSession = socket.data
|
||||
const key = this.getSessionKey(user.sessionId)
|
||||
await this.redisClient?.store(key, user, SocketSessionTTL)
|
||||
const roomKeys = await this.getRoomSessionKeys(room)
|
||||
if (!roomKeys.includes(key)) {
|
||||
await this.setRoomSessionKeys(room, [...roomKeys, key])
|
||||
}
|
||||
|
||||
// Store this socket in redis
|
||||
if (!users?.length) {
|
||||
users = []
|
||||
}
|
||||
const index = users.findIndex(x => x.sessionId === socket.data.sessionId)
|
||||
if (index === -1) {
|
||||
users.push(user)
|
||||
} else {
|
||||
users[index] = user
|
||||
}
|
||||
await setSocketUsers(this.path, room, users)
|
||||
// Notify other users
|
||||
socket.to(room).emit(SocketEvents.UserUpdate, user)
|
||||
}
|
||||
|
||||
// Disconnects a socket from its current room
|
||||
async leaveRoom(socket: Socket) {
|
||||
// @ts-ignore
|
||||
let user: SocketUser = socket.data
|
||||
let user: SocketSession = socket.data
|
||||
const { room, sessionId } = user
|
||||
if (!room) {
|
||||
return
|
||||
}
|
||||
|
||||
// Leave room
|
||||
socket.leave(room)
|
||||
socket.data.room = undefined
|
||||
|
||||
let users = await this.getSocketUsers(room)
|
||||
// Delete from redis
|
||||
const key = this.getSessionKey(sessionId)
|
||||
await this.redisClient?.delete(key)
|
||||
const roomKeys = await this.getRoomSessionKeys(room)
|
||||
await this.setRoomSessionKeys(
|
||||
room,
|
||||
roomKeys.filter(k => k !== key)
|
||||
)
|
||||
|
||||
// Remove this socket from redis
|
||||
users = users.filter(user => user.sessionId !== sessionId)
|
||||
await setSocketUsers(this.path, room, users)
|
||||
// Notify other users
|
||||
socket.to(room).emit(SocketEvents.UserDisconnect, user)
|
||||
}
|
||||
|
||||
|
|
|
@ -71,7 +71,8 @@ export const SqlNumberTypeRangeMap = {
|
|||
export const SocketEvents = {
|
||||
UserUpdate: "UserUpdate",
|
||||
UserDisconnect: "UserDisconnect",
|
||||
GetUsers: "GetUsers"
|
||||
GetUsers: "GetUsers",
|
||||
Heartbeat: "Heartbeat"
|
||||
}
|
||||
|
||||
export const GridSocketEvents = {
|
||||
|
@ -82,6 +83,9 @@ export const GridSocketEvents = {
|
|||
}
|
||||
|
||||
export const BuilderSocketEvents = {
|
||||
SelectApp: "SelectApp",
|
||||
TableChange: "TableChange",
|
||||
DatasourceChange: "DatasourceChange"
|
||||
}
|
||||
|
||||
export const SocketSessionTTL = 60
|
|
@ -1,9 +1,8 @@
|
|||
export interface SocketUser {
|
||||
export interface SocketSession {
|
||||
_id: string,
|
||||
email: string,
|
||||
firstName?: string,
|
||||
lastName?: string,
|
||||
appId: string,
|
||||
sessionId: string,
|
||||
room?: string
|
||||
}
|
Loading…
Reference in New Issue