Use socker.io redis adapter to broadcast events to all server instances

This commit is contained in:
Andrew Kingston 2023-05-25 08:48:56 +01:00
parent 01ffa34dea
commit 5e480a1527
7 changed files with 50 additions and 4 deletions

View File

@ -27,6 +27,7 @@ export enum Databases {
GENERIC_CACHE = "data_cache", GENERIC_CACHE = "data_cache",
WRITE_THROUGH = "writeThrough", WRITE_THROUGH = "writeThrough",
LOCKS = "locks", LOCKS = "locks",
SOCKET_IO = "socket_io",
} }
/** /**

View File

@ -58,6 +58,7 @@
"@koa/router": "8.0.8", "@koa/router": "8.0.8",
"@sendgrid/mail": "7.1.1", "@sendgrid/mail": "7.1.1",
"@sentry/node": "6.17.7", "@sentry/node": "6.17.7",
"@socket.io/redis-adapter": "^8.2.1",
"airtable": "0.10.1", "airtable": "0.10.1",
"arangojs": "7.2.0", "arangojs": "7.2.0",
"aws-sdk": "2.1030.0", "aws-sdk": "2.1030.0",

View File

@ -61,7 +61,6 @@ if (env.isProd()) {
const server = http.createServer(app.callback()) const server = http.createServer(app.callback())
destroyable(server) destroyable(server)
initialiseWebsockets(app, server)
let shuttingDown = false, let shuttingDown = false,
errCode = 0 errCode = 0

View File

@ -16,6 +16,7 @@ import * as bullboard from "./automations/bullboard"
import * as pro from "@budibase/pro" import * as pro from "@budibase/pro"
import * as api from "./api" import * as api from "./api"
import sdk from "./sdk" import sdk from "./sdk"
import { initialise as initialiseWebsockets } from "./websockets"
let STARTUP_RAN = false let STARTUP_RAN = false
@ -64,6 +65,7 @@ export async function startup(app?: any, server?: any) {
fileSystem.init() fileSystem.init()
await redis.init() await redis.init()
eventInit() eventInit()
initialiseWebsockets(app, server)
// run migrations on startup if not done via http // run migrations on startup if not done via http
// not recommended in a clustered environment // not recommended in a clustered environment

View File

@ -4,23 +4,33 @@ import { ContextUser } from "@budibase/types"
const APP_DEV_LOCK_SECONDS = 600 const APP_DEV_LOCK_SECONDS = 600
const AUTOMATION_TEST_FLAG_SECONDS = 60 const AUTOMATION_TEST_FLAG_SECONDS = 60
let devAppClient: any, debounceClient: any, flagClient: any let devAppClient: any, debounceClient: any, flagClient: any, socketClient: any
// we init this as we want to keep the connection open all the time // We need to maintain a duplicate client for socket.io pub/sub
let socketSubClient: any
// We init this as we want to keep the connection open all the time
// reduces the performance hit // reduces the performance hit
export async function init() { export async function init() {
devAppClient = new redis.Client(redis.utils.Databases.DEV_LOCKS) devAppClient = new redis.Client(redis.utils.Databases.DEV_LOCKS)
debounceClient = new redis.Client(redis.utils.Databases.DEBOUNCE) debounceClient = new redis.Client(redis.utils.Databases.DEBOUNCE)
flagClient = new redis.Client(redis.utils.Databases.FLAGS) flagClient = new redis.Client(redis.utils.Databases.FLAGS)
socketClient = new redis.Client(redis.utils.Databases.SOCKET_IO)
await devAppClient.init() await devAppClient.init()
await debounceClient.init() await debounceClient.init()
await flagClient.init() await flagClient.init()
await socketClient.init()
// Duplicate the socket client for pub/sub
socketSubClient = socketClient.getClient().duplicate()
} }
export async function shutdown() { export async function shutdown() {
if (devAppClient) await devAppClient.finish() if (devAppClient) await devAppClient.finish()
if (debounceClient) await debounceClient.finish() if (debounceClient) await debounceClient.finish()
if (flagClient) await flagClient.finish() if (flagClient) await flagClient.finish()
if (socketClient) await socketClient.finish()
if (socketSubClient) socketSubClient.disconnect()
// shutdown core clients // shutdown core clients
await redis.clients.shutdown() await redis.clients.shutdown()
console.log("Redis shutdown") console.log("Redis shutdown")
@ -86,3 +96,10 @@ export async function checkTestFlag(id: string) {
export async function clearTestFlag(id: string) { export async function clearTestFlag(id: string) {
await devAppClient.delete(id) await devAppClient.delete(id)
} }
export function getSocketPubSubClients() {
return {
pub: socketClient.getClient(),
sub: socketSubClient,
}
}

View File

@ -5,6 +5,8 @@ import Cookies from "cookies"
import { userAgent } from "koa-useragent" import { userAgent } from "koa-useragent"
import { auth } from "@budibase/backend-core" import { auth } from "@budibase/backend-core"
import currentApp from "../middleware/currentapp" import currentApp from "../middleware/currentapp"
import { createAdapter } from "@socket.io/redis-adapter"
import { getSocketPubSubClients } from "../utilities/redis"
export default class Socket { export default class Socket {
io: Server io: Server
@ -12,7 +14,7 @@ export default class Socket {
constructor( constructor(
app: Koa, app: Koa,
server: http.Server, server: http.Server,
path: string, path: string = "/",
additionalMiddlewares?: any[] additionalMiddlewares?: any[]
) { ) {
this.io = new Server(server, { this.io = new Server(server, {
@ -97,6 +99,11 @@ export default class Socket {
next(error) next(error)
} }
}) })
// Instantiate redis adapter
const { pub, sub } = getSocketPubSubClients()
const opts = { key: `socket.io-${path}` }
this.io.adapter(createAdapter(pub, sub, opts))
} }
// Emit an event to all sockets // Emit an event to all sockets

View File

@ -4623,6 +4623,15 @@
resolved "https://registry.yarnpkg.com/@socket.io/component-emitter/-/component-emitter-3.1.0.tgz#96116f2a912e0c02817345b3c10751069920d553" resolved "https://registry.yarnpkg.com/@socket.io/component-emitter/-/component-emitter-3.1.0.tgz#96116f2a912e0c02817345b3c10751069920d553"
integrity sha512-+9jVqKhRSpsc591z5vX+X5Yyw+he/HCB4iQ/RYxw35CEPaY1gnsNE43nf9n9AaYjAQrTiI/mOwKUKdUs9vf7Xg== integrity sha512-+9jVqKhRSpsc591z5vX+X5Yyw+he/HCB4iQ/RYxw35CEPaY1gnsNE43nf9n9AaYjAQrTiI/mOwKUKdUs9vf7Xg==
"@socket.io/redis-adapter@^8.2.1":
version "8.2.1"
resolved "https://registry.yarnpkg.com/@socket.io/redis-adapter/-/redis-adapter-8.2.1.tgz#36f75afc518d0e1fa4fa7c29e6d042f53ee7563b"
integrity sha512-6Dt7EZgGSBP0qvXeOKGx7NnSr2tPMbVDfDyL97zerZo+v69hMfL99skMCL3RKZlWVqLyRme2T0wcy3udHhtOsg==
dependencies:
debug "~4.3.1"
notepack.io "~3.0.1"
uid2 "1.0.0"
"@spectrum-css/accordion@3.0.24": "@spectrum-css/accordion@3.0.24":
version "3.0.24" version "3.0.24"
resolved "https://registry.yarnpkg.com/@spectrum-css/accordion/-/accordion-3.0.24.tgz#f89066c120c57b0cfc9aba66d60c39fc1cf69f74" resolved "https://registry.yarnpkg.com/@spectrum-css/accordion/-/accordion-3.0.24.tgz#f89066c120c57b0cfc9aba66d60c39fc1cf69f74"
@ -18641,6 +18650,11 @@ normalize-url@^6.0.1:
resolved "https://registry.yarnpkg.com/normalize-url/-/normalize-url-6.1.0.tgz#40d0885b535deffe3f3147bec877d05fe4c5668a" resolved "https://registry.yarnpkg.com/normalize-url/-/normalize-url-6.1.0.tgz#40d0885b535deffe3f3147bec877d05fe4c5668a"
integrity sha512-DlL+XwOy3NxAQ8xuC0okPgK46iuVNAK01YN7RueYBqqFeGsBjV9XmCAzAdgt+667bCl5kPh9EqKKDwnaPG1I7A== integrity sha512-DlL+XwOy3NxAQ8xuC0okPgK46iuVNAK01YN7RueYBqqFeGsBjV9XmCAzAdgt+667bCl5kPh9EqKKDwnaPG1I7A==
notepack.io@~3.0.1:
version "3.0.1"
resolved "https://registry.yarnpkg.com/notepack.io/-/notepack.io-3.0.1.tgz#2c2c9de1bd4e64a79d34e33c413081302a0d4019"
integrity sha512-TKC/8zH5pXIAMVQio2TvVDTtPRX+DJPHDqjRbxogtFiByHyzKmy96RA0JtCQJ+WouyyL4A10xomQzgbUT+1jCg==
npm-bundled@^1.1.2: npm-bundled@^1.1.2:
version "1.1.2" version "1.1.2"
resolved "https://registry.yarnpkg.com/npm-bundled/-/npm-bundled-1.1.2.tgz#944c78789bd739035b70baa2ca5cc32b8d860bc1" resolved "https://registry.yarnpkg.com/npm-bundled/-/npm-bundled-1.1.2.tgz#944c78789bd739035b70baa2ca5cc32b8d860bc1"
@ -24787,6 +24801,11 @@ uid2@0.0.x:
resolved "https://registry.yarnpkg.com/uid2/-/uid2-0.0.4.tgz#033f3b1d5d32505f5ce5f888b9f3b667123c0a44" resolved "https://registry.yarnpkg.com/uid2/-/uid2-0.0.4.tgz#033f3b1d5d32505f5ce5f888b9f3b667123c0a44"
integrity sha512-IevTus0SbGwQzYh3+fRsAMTVVPOoIVufzacXcHPmdlle1jUpq7BRL+mw3dgeLanvGZdwwbWhRV6XrcFNdBmjWA== integrity sha512-IevTus0SbGwQzYh3+fRsAMTVVPOoIVufzacXcHPmdlle1jUpq7BRL+mw3dgeLanvGZdwwbWhRV6XrcFNdBmjWA==
uid2@1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/uid2/-/uid2-1.0.0.tgz#ef8d95a128d7c5c44defa1a3d052eecc17a06bfb"
integrity sha512-+I6aJUv63YAcY9n4mQreLUt0d4lvwkkopDNmpomkAUz0fAkEMV9pRWxN0EjhW1YfRhcuyHg2v3mwddCDW1+LFQ==
unbox-primitive@^1.0.2: unbox-primitive@^1.0.2:
version "1.0.2" version "1.0.2"
resolved "https://registry.yarnpkg.com/unbox-primitive/-/unbox-primitive-1.0.2.tgz#29032021057d5e6cdbd08c5129c226dff8ed6f9e" resolved "https://registry.yarnpkg.com/unbox-primitive/-/unbox-primitive-1.0.2.tgz#29032021057d5e6cdbd08c5129c226dff8ed6f9e"