Ensure socket pruning due to redis TTL expiry works as expected
This commit is contained in:
parent
7a307e3de8
commit
a7c6298d1f
|
@ -20,9 +20,9 @@ export const getUserStore = () => {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const removeUser = user => {
|
const removeUser = sessionId => {
|
||||||
store.update(state => {
|
store.update(state => {
|
||||||
return state.filter(x => x.sessionId !== user.sessionId)
|
return state.filter(x => x.sessionId !== sessionId)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -51,9 +51,9 @@ export const deriveStores = context => {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const removeUser = user => {
|
const removeUser = sessionId => {
|
||||||
users.update(state => {
|
users.update(state => {
|
||||||
return state.filter(x => x.sessionId !== user.sessionId)
|
return state.filter(x => x.sessionId !== sessionId)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -148,21 +148,22 @@ export class BaseSocket {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Gets an array of all redis keys of users inside a certain room
|
// Gets an array of all redis keys of users inside a certain room
|
||||||
async getRoomSessionKeys(room: string): Promise<string[]> {
|
async getRoomSessionIds(room: string): Promise<string[]> {
|
||||||
const keys = await this.redisClient?.get(this.getRoomKey(room))
|
const keys = await this.redisClient?.get(this.getRoomKey(room))
|
||||||
return keys || []
|
return keys || []
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sets the list of redis keys for users inside a certain room.
|
// Sets the list of redis keys for users inside a certain room.
|
||||||
// There is no TTL on the actual room key map itself.
|
// There is no TTL on the actual room key map itself.
|
||||||
async setRoomSessionKeys(room: string, keys: string[]) {
|
async setRoomSessionIds(room: string, ids: string[]) {
|
||||||
await this.redisClient?.store(this.getRoomKey(room), keys)
|
await this.redisClient?.store(this.getRoomKey(room), ids)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Gets a list of all users inside a certain room
|
// Gets a list of all users inside a certain room
|
||||||
async getRoomSessions(room?: string): Promise<SocketSession[]> {
|
async getRoomSessions(room?: string): Promise<SocketSession[]> {
|
||||||
if (room) {
|
if (room) {
|
||||||
const keys = await this.getRoomSessionKeys(room)
|
const sessionIds = await this.getRoomSessionIds(room)
|
||||||
|
const keys = sessionIds.map(this.getSessionKey.bind(this))
|
||||||
const sessions = await this.redisClient?.bulkGet(keys)
|
const sessions = await this.redisClient?.bulkGet(keys)
|
||||||
return Object.values(sessions || {})
|
return Object.values(sessions || {})
|
||||||
} else {
|
} else {
|
||||||
|
@ -173,20 +174,20 @@ export class BaseSocket {
|
||||||
// Detects keys which have been pruned from redis due to TTL expiry in a certain
|
// Detects keys which have been pruned from redis due to TTL expiry in a certain
|
||||||
// room and broadcasts disconnection messages to ensure clients are aware
|
// room and broadcasts disconnection messages to ensure clients are aware
|
||||||
async pruneRoom(room: string) {
|
async pruneRoom(room: string) {
|
||||||
const keys = await this.getRoomSessionKeys(room)
|
const sessionIds = await this.getRoomSessionIds(room)
|
||||||
const keysExist = await Promise.all(
|
const sessionsExist = await Promise.all(
|
||||||
keys.map(key => this.redisClient?.exists(key))
|
sessionIds.map(id => this.redisClient?.exists(this.getSessionKey(id)))
|
||||||
)
|
)
|
||||||
const prunedKeys = keys.filter((key, idx) => {
|
const prunedSessionIds = sessionIds.filter((id, idx) => {
|
||||||
if (!keysExist[idx]) {
|
if (!sessionsExist[idx]) {
|
||||||
console.log("pruning key", keys[idx])
|
this.io.to(room).emit(SocketEvents.UserDisconnect, sessionIds[idx])
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
|
||||||
// Store new pruned keys
|
// Store new pruned keys
|
||||||
await this.setRoomSessionKeys(room, prunedKeys)
|
await this.setRoomSessionIds(room, prunedSessionIds)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Adds a user to a certain room
|
// Adds a user to a certain room
|
||||||
|
@ -213,11 +214,12 @@ export class BaseSocket {
|
||||||
// Store in redis
|
// Store in redis
|
||||||
// @ts-ignore
|
// @ts-ignore
|
||||||
let user: SocketSession = socket.data
|
let user: SocketSession = socket.data
|
||||||
const key = this.getSessionKey(user.sessionId)
|
const { sessionId } = user
|
||||||
|
const key = this.getSessionKey(sessionId)
|
||||||
await this.redisClient?.store(key, user, SocketSessionTTL)
|
await this.redisClient?.store(key, user, SocketSessionTTL)
|
||||||
const roomKeys = await this.getRoomSessionKeys(room)
|
const sessionIds = await this.getRoomSessionIds(room)
|
||||||
if (!roomKeys.includes(key)) {
|
if (!sessionIds.includes(sessionId)) {
|
||||||
await this.setRoomSessionKeys(room, [...roomKeys, key])
|
await this.setRoomSessionIds(room, [...sessionIds, sessionId])
|
||||||
}
|
}
|
||||||
|
|
||||||
// Notify other users
|
// Notify other users
|
||||||
|
@ -240,14 +242,14 @@ export class BaseSocket {
|
||||||
// Delete from redis
|
// Delete from redis
|
||||||
const key = this.getSessionKey(sessionId)
|
const key = this.getSessionKey(sessionId)
|
||||||
await this.redisClient?.delete(key)
|
await this.redisClient?.delete(key)
|
||||||
const roomKeys = await this.getRoomSessionKeys(room)
|
const sessionIds = await this.getRoomSessionIds(room)
|
||||||
await this.setRoomSessionKeys(
|
await this.setRoomSessionIds(
|
||||||
room,
|
room,
|
||||||
roomKeys.filter(k => k !== key)
|
sessionIds.filter(id => id !== sessionId)
|
||||||
)
|
)
|
||||||
|
|
||||||
// Notify other users
|
// Notify other users
|
||||||
socket.to(room).emit(SocketEvents.UserDisconnect, user)
|
socket.to(room).emit(SocketEvents.UserDisconnect, sessionId)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Updates a connected user's metadata, assuming a room change is not required.
|
// Updates a connected user's metadata, assuming a room change is not required.
|
||||||
|
|
Loading…
Reference in New Issue