Store all socket users in redis to allow all server instances to be aware of all users

This commit is contained in:
Andrew Kingston 2023-05-31 10:21:50 +01:00
parent 3f88280867
commit d3fdb52bf5
12 changed files with 243 additions and 110 deletions

View File

@ -1,13 +1,14 @@
import { createWebsocket } from "@budibase/frontend-core"
import { userStore } from "builderStore"
import { datasources, tables } from "stores/backend"
import { SocketEvents, BuilderSocketEvents } from "@budibase/shared-core"
export const createBuilderWebsocket = () => {
const socket = createWebsocket("/socket/builder")
// Connection events
// Built-in events
socket.on("connect", () => {
socket.emit("get-users", null, response => {
socket.emit(SocketEvents.GetUsers, null, response => {
userStore.actions.init(response.users)
})
})
@ -16,16 +17,16 @@ export const createBuilderWebsocket = () => {
})
// User events
socket.on("user-update", userStore.actions.updateUser)
socket.on("user-disconnect", userStore.actions.removeUser)
socket.on(SocketEvents.UserUpdate, userStore.actions.updateUser)
socket.on(SocketEvents.UserDisconnect, userStore.actions.removeUser)
// Table events
socket.on("table-change", ({ id, table }) => {
socket.on(BuilderSocketEvents.TableChange, ({ id, table }) => {
tables.replaceTable(id, table)
})
// Datasource events
socket.on("datasource-change", ({ id, datasource }) => {
socket.on(BuilderSocketEvents.DatasourceChange, ({ id, datasource }) => {
datasources.replaceDatasource(id, datasource)
})

View File

@ -62,6 +62,7 @@
stripeRows,
})
// Build up context
let context = {
API: API || createAPIClient(),

View File

@ -1,5 +1,6 @@
import { get } from "svelte/store"
import { createWebsocket } from "../../../utils"
import { SocketEvents, GridSocketEvents } from "@budibase/shared-core"
export const createGridWebsocket = context => {
const { rows, tableId, users, focusedCellId, table } = context
@ -10,13 +11,13 @@ export const createGridWebsocket = context => {
return
}
// Identify which table we are editing
socket.emit("select-table", tableId, response => {
socket.emit(GridSocketEvents.SelectTable, tableId, response => {
// handle initial connection info
users.set(response.users)
})
}
// Connection events
// Built-in events
socket.on("connect", () => {
connectToTable(get(tableId))
})
@ -25,25 +26,25 @@ export const createGridWebsocket = context => {
})
// User events
socket.on("user-update", user => {
socket.on(SocketEvents.UserUpdate, user => {
users.actions.updateUser(user)
})
socket.on("user-disconnect", user => {
socket.on(SocketEvents.UserDisconnect, user => {
users.actions.removeUser(user)
})
// Row events
socket.on("row-change", async data => {
socket.on(GridSocketEvents.RowChange, async data => {
if (data.id) {
rows.actions.replaceRow(data.id, data.row)
} else if (data.row.id) {
// Handle users table edge case
// Handle users table edge cased
await rows.actions.refreshRow(data.row.id)
}
})
// Table events
socket.on("table-change", data => {
socket.on(GridSocketEvents.TableChange, data => {
// Only update table if one exists. If the table was deleted then we don't
// want to know - let the builder navigate away
if (data.table) {
@ -56,7 +57,7 @@ export const createGridWebsocket = context => {
// Notify selected cell changes
focusedCellId.subscribe($focusedCellId => {
socket.emit("select-cell", $focusedCellId)
socket.emit(GridSocketEvents.SelectCell, $focusedCellId)
})
return () => socket?.disconnect()

View File

@ -21,6 +21,6 @@ export const createWebsocket = path => {
timeout: 4000,
// Disable polling and rely on websocket only, as HTTP transport
// will only work with sticky sessions which we don't have
transports: ["websocket"],
// transports: ["websocket"],
})
}

View File

@ -1,6 +1,6 @@
import { redis } from "@budibase/backend-core"
import { getGlobalIDFromUserMetadataID } from "../db/utils"
import { ContextUser } from "@budibase/types"
import { ContextUser, SocketUser } from "@budibase/types"
const APP_DEV_LOCK_SECONDS = 600
const AUTOMATION_TEST_FLAG_SECONDS = 60
@ -26,6 +26,7 @@ export async function init() {
}
export async function shutdown() {
console.log("REDIS SHUTDOWN")
if (devAppClient) await devAppClient.finish()
if (debounceClient) await debounceClient.finish()
if (flagClient) await flagClient.finish()
@ -97,6 +98,14 @@ export async function clearTestFlag(id: string) {
await devAppClient.delete(id)
}
export async function getSocketUsers(path: string, room: string) {
return await socketClient.get(`${path}:${room}`)
}
export async function setSocketUsers(path: string, room: string, users: SocketUser[]) {
await socketClient.store(`${path}:${room}`, users)
}
export function getSocketPubSubClients() {
return {
pub: socketClient.getClient(),

View File

@ -1,69 +1,70 @@
import authorized from "../middleware/authorized"
import Socket from "./websocket"
import { BaseSocket } from "./websocket"
import { permissions } from "@budibase/backend-core"
import http from "http"
import Koa from "koa"
import { Datasource, Table } from "@budibase/types"
import { Datasource, Table, SocketUser, ContextUser } from "@budibase/types"
import { gridSocket } from "./index"
import { clearLock } from "../utilities/redis"
import { Socket } from "socket.io"
import { BuilderSocketEvents } from "@budibase/shared-core"
export default class BuilderSocket extends Socket {
export default class BuilderSocket extends BaseSocket {
constructor(app: Koa, server: http.Server) {
super(app, server, "/socket/builder", [authorized(permissions.BUILDER)])
}
this.io.on("connection", socket => {
async onConnect(socket: Socket) {
// Join a room for this app
const user = socket.data.user
const appId = socket.data.appId
socket.join(appId)
socket.to(appId).emit("user-update", user)
// Initial identification of connected spreadsheet
socket.on("get-users", async (payload, callback) => {
const sockets = await this.io.in(appId).fetchSockets()
callback({
users: sockets.map(socket => socket.data.user),
})
})
// Disconnection cleanup
socket.on("disconnect", async () => {
socket.to(appId).emit("user-disconnect", user)
await this.joinRoom(socket, socket.data.appId)
}
async onDisconnect(socket: Socket) {
// Remove app lock from this user if they have no other connections
try {
const sockets = await this.io.in(appId).fetchSockets()
const hasOtherConnection = sockets.some(socket => {
const { _id, sessionId } = socket.data.user
return _id === user._id && sessionId !== user.sessionId
// @ts-ignore
const user: SocketUser = socket.data
const { _id, sessionId, appId } = user
const users = await this.getSocketUsers(user.room)
const hasOtherConnection = users.some(otherUser => {
return _id === otherUser._id && sessionId !== otherUser.sessionId
})
if (!hasOtherConnection) {
// @ts-ignore
const user: ContextUser = { _id: socket.data._id }
await clearLock(appId, user)
}
} catch (e) {
// This is fine, just means this user didn't hold the lock
}
})
})
}
emitTableUpdate(ctx: any, table: Table) {
this.io.in(ctx.appId).emit("table-change", { id: table._id, table })
this.io
.in(ctx.appId)
.emit(BuilderSocketEvents.TableChange, { id: table._id, table })
gridSocket.emitTableUpdate(table)
}
emitTableDeletion(ctx: any, id: string) {
this.io.in(ctx.appId).emit("table-change", { id, table: null })
this.io
.in(ctx.appId)
.emit(BuilderSocketEvents.TableChange, { id, table: null })
gridSocket.emitTableDeletion(id)
}
emitDatasourceUpdate(ctx: any, datasource: Datasource) {
this.io
.in(ctx.appId)
.emit("datasource-change", { id: datasource._id, datasource })
.emit(BuilderSocketEvents.DatasourceChange, {
id: datasource._id,
datasource,
})
}
emitDatasourceDeletion(ctx: any, id: string) {
this.io.in(ctx.appId).emit("datasource-change", { id, datasource: null })
this.io
.in(ctx.appId)
.emit(BuilderSocketEvents.DatasourceChange, { id, datasource: null })
}
}

View File

@ -1,10 +1,10 @@
import Socket from "./websocket"
import { BaseSocket } from "./websocket"
import authorized from "../middleware/authorized"
import http from "http"
import Koa from "koa"
import { permissions } from "@budibase/backend-core"
export default class ClientAppWebsocket extends Socket {
export default class ClientAppWebsocket extends BaseSocket {
constructor(app: Koa, server: http.Server) {
super(app, server, "/socket/client", [authorized(permissions.BUILDER)])
}

View File

@ -1,73 +1,51 @@
import authorized from "../middleware/authorized"
import Socket from "./websocket"
import { BaseSocket } from "./websocket"
import { permissions } from "@budibase/backend-core"
import http from "http"
import Koa from "koa"
import { getTableId } from "../api/controllers/row/utils"
import { Row, Table } from "@budibase/types"
import { Socket } from "socket.io"
import { GridSocketEvents } from "@budibase/shared-core"
export default class GridSocket extends Socket {
export default class GridSocket extends BaseSocket {
constructor(app: Koa, server: http.Server) {
super(app, server, "/socket/grid", [authorized(permissions.BUILDER)])
this.io.on("connection", socket => {
const user = socket.data.user
// Socket state
let currentRoom: string
// Initial identification of connected spreadsheet
socket.on("select-table", async (tableId, callback) => {
// Leave current room
if (currentRoom) {
socket.to(currentRoom).emit("user-disconnect", user)
socket.leave(currentRoom)
}
// Join new room
currentRoom = tableId
socket.join(currentRoom)
socket.to(currentRoom).emit("user-update", user)
async onConnect(socket: Socket) {
// Initial identification of connected spreadsheet
socket.on(GridSocketEvents.SelectTable, async (tableId, callback) => {
await this.joinRoom(socket, tableId)
// Reply with all users in current room
const sockets = await this.io.in(currentRoom).fetchSockets()
callback({
users: sockets.map(socket => socket.data.user),
})
// Reply with all users in current roome
const users = await this.getSocketUsers(tableId)
callback({ users })
})
// Handle users selecting a new cell
socket.on("select-cell", cellId => {
socket.data.user.focusedCellId = cellId
if (currentRoom) {
socket.to(currentRoom).emit("user-update", user)
}
})
// Disconnection cleanup
socket.on("disconnect", () => {
if (currentRoom) {
socket.to(currentRoom).emit("user-disconnect", user)
}
})
socket.on(GridSocketEvents.SelectCell, cellId => {
this.updateUser(socket, { focusedCellId: cellId })
})
}
emitRowUpdate(ctx: any, row: Row) {
const tableId = getTableId(ctx)
this.io.in(tableId).emit("row-change", { id: row._id, row })
this.io.in(tableId).emit(GridSocketEvents.RowChange, { id: row._id, row })
}
emitRowDeletion(ctx: any, id: string) {
const tableId = getTableId(ctx)
this.io.in(tableId).emit("row-change", { id, row: null })
this.io.in(tableId).emit(GridSocketEvents.RowChange, { id, row: null })
}
emitTableUpdate(table: Table) {
this.io.in(table._id!).emit("table-change", { id: table._id, table })
this.io
.in(table._id!)
.emit(GridSocketEvents.TableChange, { id: table._id, table })
}
emitTableDeletion(id: string) {
this.io.in(id).emit("table-change", { id, table: null })
this.io.in(id).emit(GridSocketEvents.TableChange, { id, table: null })
}
}

View File

@ -6,11 +6,18 @@ import { userAgent } from "koa-useragent"
import { auth } from "@budibase/backend-core"
import currentApp from "../middleware/currentapp"
import { createAdapter } from "@socket.io/redis-adapter"
import { getSocketPubSubClients } from "../utilities/redis"
import uuid from "uuid"
import { Socket } from "socket.io"
import {
getSocketPubSubClients,
getSocketUsers,
setSocketUsers,
} from "../utilities/redis"
import { SocketEvents } from "@budibase/shared-core"
import { SocketUser } from "@budibase/types"
export default class Socket {
export class BaseSocket {
io: Server
path: string
constructor(
app: Koa,
@ -18,6 +25,7 @@ export default class Socket {
path: string = "/",
additionalMiddlewares?: any[]
) {
this.path = path
this.io = new Server(server, {
path,
})
@ -65,18 +73,15 @@ export default class Socket {
// Middlewares are finished
// Extract some data from our enriched koa context to persist
// as metadata for the socket
// Add user info, including a deterministic color and label
const { _id, email, firstName, lastName } = ctx.user
socket.data.user = {
socket.data = {
_id,
email,
firstName,
lastName,
sessionId: uuid.v4(),
appId: ctx.appId,
sessionId: socket.id,
}
// Add app ID to help split sockets into rooms
socket.data.appId = ctx.appId
next()
}
})
@ -90,6 +95,115 @@ export default class Socket {
const { pub, sub } = getSocketPubSubClients()
const opts = { key: `socket.io-${path}` }
this.io.adapter(createAdapter(pub, sub, opts))
// Handle user connections and disconnections
this.io.on("connection", async socket => {
// Add built in handler to allow fetching all other users in this room
socket.on(SocketEvents.GetUsers, async (payload, callback) => {
let users
if (socket.data.room) {
users = await this.getSocketUsers(socket.data.room)
}
callback({ users })
})
// Add handlers for this socket
await this.onConnect(socket)
// Add early disconnection handler to clean up and leave room
socket.on("disconnect", async () => {
// Leave the current room when the user disconnects if we're in one
if (socket.data.room) {
await this.leaveRoom(socket)
}
// Run any other disconnection logic
await this.onDisconnect(socket)
})
})
}
// Gets a list of all users inside a certain room
async getSocketUsers(room?: string): Promise<SocketUser[]> {
if (room) {
const users = await getSocketUsers(this.path, room)
return users || []
} else {
return []
}
}
// Adds a user to a certain room
async joinRoom(socket: Socket, room: string) {
// Check if we're already in a room, as we'll need to leave if we are before we
// can join a different room
const oldRoom = socket.data.room
if (oldRoom && oldRoom !== room) {
await this.leaveRoom(socket)
}
// Join new room
if (!oldRoom || oldRoom !== room) {
socket.join(room)
socket.data.room = room
}
// @ts-ignore
let user: SocketUser = socket.data
let users = await this.getSocketUsers(room)
// Store this socket in redis
if (!users?.length) {
users = []
}
const index = users.findIndex(x => x.sessionId === socket.data.sessionId)
if (index === -1) {
users.push(user)
} else {
users[index] = user
}
await setSocketUsers(this.path, room, users)
socket.to(room).emit(SocketEvents.UserUpdate, user)
}
// Disconnects a socket from its current room
async leaveRoom(socket: Socket) {
// @ts-ignore
let user: SocketUser = socket.data
const { room, sessionId } = user
if (!room) {
return
}
socket.leave(room)
socket.data.room = undefined
let users = await this.getSocketUsers(room)
// Remove this socket from redis
users = users.filter(user => user.sessionId !== sessionId)
await setSocketUsers(this.path, room, users)
socket.to(room).emit(SocketEvents.UserDisconnect, user)
}
// Updates a connected user's metadata, assuming a room change is not required.
async updateUser(socket: Socket, patch: Object) {
socket.data = {
...socket.data,
...patch,
}
// If we're in a room, notify others of this change and update redis
if (socket.data.room) {
await this.joinRoom(socket, socket.data.room)
}
}
async onConnect(socket: Socket) {
// Override
}
async onDisconnect(socket: Socket) {
// Override
}
// Emit an event to all sockets

View File

@ -67,3 +67,21 @@ export const SqlNumberTypeRangeMap = {
min: -8388608,
},
}
export const SocketEvents = {
UserUpdate: "UserUpdate",
UserDisconnect: "UserDisconnect",
GetUsers: "GetUsers"
}
export const GridSocketEvents = {
RowChange: "RowChange",
TableChange: "TableChange",
SelectTable: "SelectTable",
SelectCell: "SelectCell"
}
export const BuilderSocketEvents = {
TableChange: "TableChange",
DatasourceChange: "DatasourceChange"
}

View File

@ -17,3 +17,4 @@ export * from "./auditLogs"
export * from "./sso"
export * from "./user"
export * from "./cli"
export * from "./websocket"

View File

@ -0,0 +1,9 @@
export interface SocketUser {
_id: string,
email: string,
firstName?: string,
lastName?: string,
appId: string,
sessionId: string,
room?: string
}