diff --git a/lerna.json b/lerna.json
index 4bc22e9c1d..4361afa2b7 100644
--- a/lerna.json
+++ b/lerna.json
@@ -1,5 +1,5 @@
{
- "version": "2.6.19-alpha.36",
+ "version": "2.6.19-alpha.37",
"npmClient": "yarn",
"packages": [
"packages/backend-core",
diff --git a/packages/backend-core/src/redis/init.ts b/packages/backend-core/src/redis/init.ts
index 485268edad..55ffe3dd12 100644
--- a/packages/backend-core/src/redis/init.ts
+++ b/packages/backend-core/src/redis/init.ts
@@ -6,7 +6,8 @@ let userClient: Client,
appClient: Client,
cacheClient: Client,
writethroughClient: Client,
- lockClient: Client
+ lockClient: Client,
+ socketClient: Client
async function init() {
userClient = await new Client(utils.Databases.USER_CACHE).init()
@@ -14,9 +15,10 @@ async function init() {
appClient = await new Client(utils.Databases.APP_METADATA).init()
cacheClient = await new Client(utils.Databases.GENERIC_CACHE).init()
lockClient = await new Client(utils.Databases.LOCKS).init()
- writethroughClient = await new Client(
- utils.Databases.WRITE_THROUGH,
- utils.SelectableDatabase.WRITE_THROUGH
+ writethroughClient = await new Client(utils.Databases.WRITE_THROUGH).init()
+ socketClient = await new Client(
+ utils.Databases.SOCKET_IO,
+ utils.SelectableDatabase.SOCKET_IO
).init()
}
@@ -27,6 +29,7 @@ export async function shutdown() {
if (cacheClient) await cacheClient.finish()
if (writethroughClient) await writethroughClient.finish()
if (lockClient) await lockClient.finish()
+ if (socketClient) await socketClient.finish()
}
process.on("exit", async () => {
@@ -74,3 +77,10 @@ export async function getLockClient() {
}
return lockClient
}
+
+export async function getSocketClient() {
+ if (!socketClient) {
+ await init()
+ }
+ return socketClient
+}
diff --git a/packages/backend-core/src/redis/redis.ts b/packages/backend-core/src/redis/redis.ts
index 60d3966e27..48057dec0b 100644
--- a/packages/backend-core/src/redis/redis.ts
+++ b/packages/backend-core/src/redis/redis.ts
@@ -174,6 +174,9 @@ class RedisWrapper {
CLOSED = false
init(this._select)
await waitForConnection(this._select)
+ if (this._select && !env.isTest()) {
+ this.getClient().select(this._select)
+ }
return this
}
@@ -200,6 +203,11 @@ class RedisWrapper {
return this.getClient().keys(addDbPrefix(db, pattern))
}
+ async exists(key: string) {
+ const db = this._db
+ return await this.getClient().exists(addDbPrefix(db, key))
+ }
+
async get(key: string) {
const db = this._db
let response = await this.getClient().get(addDbPrefix(db, key))
diff --git a/packages/backend-core/src/redis/utils.ts b/packages/backend-core/src/redis/utils.ts
index ab39acf215..34b7275a2b 100644
--- a/packages/backend-core/src/redis/utils.ts
+++ b/packages/backend-core/src/redis/utils.ts
@@ -41,7 +41,7 @@ export enum Databases {
*/
export enum SelectableDatabase {
DEFAULT = 0,
- WRITE_THROUGH = 1,
+ SOCKET_IO = 1,
UNUSED_1 = 2,
UNUSED_2 = 3,
UNUSED_3 = 4,
diff --git a/packages/builder/src/builderStore/store/frontend.js b/packages/builder/src/builderStore/store/frontend.js
index c921ddef3b..4f2a1bef6f 100644
--- a/packages/builder/src/builderStore/store/frontend.js
+++ b/packages/builder/src/builderStore/store/frontend.js
@@ -118,7 +118,7 @@ export const getFrontendStore = () => {
},
initialise: async pkg => {
const { layouts, screens, application, clientLibPath, hasLock } = pkg
- websocket = createBuilderWebsocket()
+ websocket = createBuilderWebsocket(application.appId)
await store.actions.components.refreshDefinitions(application.appId)
// Reset store state
diff --git a/packages/builder/src/builderStore/store/users.js b/packages/builder/src/builderStore/store/users.js
index 18f343c884..6e10b081d6 100644
--- a/packages/builder/src/builderStore/store/users.js
+++ b/packages/builder/src/builderStore/store/users.js
@@ -20,9 +20,9 @@ export const getUserStore = () => {
}
}
- const removeUser = user => {
+ const removeUser = sessionId => {
store.update(state => {
- return state.filter(x => x.sessionId !== user.sessionId)
+ return state.filter(x => x.sessionId !== sessionId)
})
}
diff --git a/packages/builder/src/builderStore/websocket.js b/packages/builder/src/builderStore/websocket.js
index d58eb24bbf..6a3ce76ae6 100644
--- a/packages/builder/src/builderStore/websocket.js
+++ b/packages/builder/src/builderStore/websocket.js
@@ -1,13 +1,14 @@
import { createWebsocket } from "@budibase/frontend-core"
import { userStore } from "builderStore"
import { datasources, tables } from "stores/backend"
+import { SocketEvent, BuilderSocketEvent } from "@budibase/shared-core"
-export const createBuilderWebsocket = () => {
+export const createBuilderWebsocket = appId => {
const socket = createWebsocket("/socket/builder")
- // Connection events
+ // Built-in events
socket.on("connect", () => {
- socket.emit("get-users", null, response => {
+ socket.emit(BuilderSocketEvent.SelectApp, appId, response => {
userStore.actions.init(response.users)
})
})
@@ -16,24 +17,23 @@ export const createBuilderWebsocket = () => {
})
// User events
- socket.on("user-update", userStore.actions.updateUser)
- socket.on("user-disconnect", userStore.actions.removeUser)
+ socket.on(SocketEvent.UserUpdate, userStore.actions.updateUser)
+ socket.on(SocketEvent.UserDisconnect, userStore.actions.removeUser)
// Table events
- socket.on("table-change", ({ id, table }) => {
+ socket.on(BuilderSocketEvent.TableChange, ({ id, table }) => {
tables.replaceTable(id, table)
})
// Datasource events
- socket.on("datasource-change", ({ id, datasource }) => {
+ socket.on(BuilderSocketEvent.DatasourceChange, ({ id, datasource }) => {
datasources.replaceDatasource(id, datasource)
})
- return {
- ...socket,
- disconnect: () => {
- socket?.disconnect()
- userStore.actions.reset()
- },
- }
+ // Clean up user store on disconnect
+ socket.on("disconnect", () => {
+ userStore.actions.reset()
+ })
+
+ return socket
}
diff --git a/packages/builder/src/pages/builder/portal/overview/[appId]/overview.svelte b/packages/builder/src/pages/builder/portal/overview/[appId]/overview.svelte
index 4e84b78f67..0932d2d79a 100644
--- a/packages/builder/src/pages/builder/portal/overview/[appId]/overview.svelte
+++ b/packages/builder/src/pages/builder/portal/overview/[appId]/overview.svelte
@@ -51,6 +51,26 @@
return groups.actions.getGroupAppIds(group).includes(prodAppId)
})
+ const updateDeploymentString = () => {
+ return deployments?.length
+ ? processStringSync(
+ "Last published {{ duration time 'millisecond' }} ago",
+ {
+ time:
+ new Date().getTime() -
+ new Date(deployments[0].updatedAt).getTime(),
+ }
+ )
+ : ""
+ }
+ // App is updating in the layout asynchronously
+ $: if ($store.appId?.length) {
+ fetchDeployments().then(resp => {
+ deployments = resp
+ })
+ }
+ $: deploymentString = updateDeploymentString(deployments)
+
async function fetchAppEditor(editorId) {
appEditor = await users.get(editorId)
}
@@ -107,19 +127,11 @@
- {#if deployments?.length}
- {processStringSync(
- "Last published {{ duration time 'millisecond' }} ago",
- {
- time:
- new Date().getTime() -
- new Date(deployments[0].updatedAt).getTime(),
- }
- )}
- {#if isPublished}
- - Unpublish
- {/if}
+ {#if isPublished}
+ {deploymentString}
+ - Unpublish
{/if}
+
{#if !deployments?.length}
-
{/if}
diff --git a/packages/client/src/websocket.js b/packages/client/src/websocket.js
index a3356b7eaa..1e8e908d2b 100644
--- a/packages/client/src/websocket.js
+++ b/packages/client/src/websocket.js
@@ -18,7 +18,7 @@ export const initWebsocket = () => {
}
// Initialise connection
- socket = createWebsocket("/socket/client")
+ socket = createWebsocket("/socket/client", false)
// Event handlers
socket.on("plugin-update", data => {
diff --git a/packages/frontend-core/src/components/grid/lib/websocket.js b/packages/frontend-core/src/components/grid/lib/websocket.js
index 3f1c473ea0..bb1d6991b0 100644
--- a/packages/frontend-core/src/components/grid/lib/websocket.js
+++ b/packages/frontend-core/src/components/grid/lib/websocket.js
@@ -1,5 +1,6 @@
import { get } from "svelte/store"
import { createWebsocket } from "../../../utils"
+import { SocketEvent, GridSocketEvent } from "@budibase/shared-core"
export const createGridWebsocket = context => {
const { rows, tableId, users, focusedCellId, table } = context
@@ -10,13 +11,13 @@ export const createGridWebsocket = context => {
return
}
// Identify which table we are editing
- socket.emit("select-table", tableId, response => {
+ socket.emit(GridSocketEvent.SelectTable, tableId, response => {
// handle initial connection info
users.set(response.users)
})
}
- // Connection events
+ // Built-in events
socket.on("connect", () => {
connectToTable(get(tableId))
})
@@ -25,25 +26,25 @@ export const createGridWebsocket = context => {
})
// User events
- socket.on("user-update", user => {
+ socket.on(SocketEvent.UserUpdate, user => {
users.actions.updateUser(user)
})
- socket.on("user-disconnect", user => {
+ socket.on(SocketEvent.UserDisconnect, user => {
users.actions.removeUser(user)
})
// Row events
- socket.on("row-change", async data => {
+ socket.on(GridSocketEvent.RowChange, async data => {
if (data.id) {
rows.actions.replaceRow(data.id, data.row)
} else if (data.row.id) {
- // Handle users table edge case
+ // Handle users table edge cased
await rows.actions.refreshRow(data.row.id)
}
})
// Table events
- socket.on("table-change", data => {
+ socket.on(GridSocketEvent.TableChange, data => {
// Only update table if one exists. If the table was deleted then we don't
// want to know - let the builder navigate away
if (data.table) {
@@ -56,7 +57,7 @@ export const createGridWebsocket = context => {
// Notify selected cell changes
focusedCellId.subscribe($focusedCellId => {
- socket.emit("select-cell", $focusedCellId)
+ socket.emit(GridSocketEvent.SelectCell, $focusedCellId)
})
return () => socket?.disconnect()
diff --git a/packages/frontend-core/src/components/grid/stores/users.js b/packages/frontend-core/src/components/grid/stores/users.js
index 5368c414ce..b6e74ef276 100644
--- a/packages/frontend-core/src/components/grid/stores/users.js
+++ b/packages/frontend-core/src/components/grid/stores/users.js
@@ -51,9 +51,9 @@ export const deriveStores = context => {
}
}
- const removeUser = user => {
+ const removeUser = sessionId => {
users.update(state => {
- return state.filter(x => x.sessionId !== user.sessionId)
+ return state.filter(x => x.sessionId !== sessionId)
})
}
diff --git a/packages/frontend-core/src/utils/websocket.js b/packages/frontend-core/src/utils/websocket.js
index 839fa6d73d..561a020e13 100644
--- a/packages/frontend-core/src/utils/websocket.js
+++ b/packages/frontend-core/src/utils/websocket.js
@@ -1,6 +1,7 @@
import { io } from "socket.io-client"
+import { SocketEvent, SocketSessionTTL } from "@budibase/shared-core"
-export const createWebsocket = path => {
+export const createWebsocket = (path, heartbeat = true) => {
if (!path) {
throw "A websocket path must be provided"
}
@@ -10,7 +11,7 @@ export const createWebsocket = path => {
const proto = tls ? "wss:" : "ws:"
const host = location.hostname
const port = location.port || (tls ? 443 : 80)
- return io(`${proto}//${host}:${port}`, {
+ const socket = io(`${proto}//${host}:${port}`, {
path,
// Cap reconnection attempts to 3 (total of 15 seconds before giving up)
reconnectionAttempts: 3,
@@ -23,4 +24,18 @@ export const createWebsocket = path => {
// will only work with sticky sessions which we don't have
transports: ["websocket"],
})
+
+ // Set up a heartbeat that's half of the session TTL
+ let interval
+ if (heartbeat) {
+ interval = setInterval(() => {
+ socket.emit(SocketEvent.Heartbeat)
+ }, SocketSessionTTL * 500)
+ }
+
+ socket.on("disconnect", () => {
+ clearInterval(interval)
+ })
+
+ return socket
}
diff --git a/packages/server/src/api/controllers/deploy/index.ts b/packages/server/src/api/controllers/deploy/index.ts
index 60239d2c4c..6a31998c14 100644
--- a/packages/server/src/api/controllers/deploy/index.ts
+++ b/packages/server/src/api/controllers/deploy/index.ts
@@ -104,6 +104,7 @@ export async function fetchDeployments(ctx: any) {
}
ctx.body = Object.values(deployments.history).reverse()
} catch (err) {
+ console.error(err)
ctx.body = []
}
}
diff --git a/packages/server/src/utilities/redis.ts b/packages/server/src/utilities/redis.ts
index d7e9b16290..d8605a0e29 100644
--- a/packages/server/src/utilities/redis.ts
+++ b/packages/server/src/utilities/redis.ts
@@ -8,9 +8,9 @@ const AUTOMATION_TEST_FLAG_SECONDS = 60
let devAppClient: RedisClient,
debounceClient: RedisClient,
flagClient: RedisClient,
- socketClient: RedisClient
// We need to maintain a duplicate client for socket.io pub/sub
+let socketClient: RedisClient
let socketSubClient: any
// We init this as we want to keep the connection open all the time
@@ -19,21 +19,20 @@ export async function init() {
devAppClient = new redis.Client(redis.utils.Databases.DEV_LOCKS)
debounceClient = new redis.Client(redis.utils.Databases.DEBOUNCE)
flagClient = new redis.Client(redis.utils.Databases.FLAGS)
- socketClient = new redis.Client(redis.utils.Databases.SOCKET_IO)
await devAppClient.init()
await debounceClient.init()
await flagClient.init()
- await socketClient.init()
// Duplicate the socket client for pub/sub
+ socketClient = await redis.clients.getSocketClient()
socketSubClient = socketClient.getClient().duplicate()
}
export async function shutdown() {
+ console.log("REDIS SHUTDOWN")
if (devAppClient) await devAppClient.finish()
if (debounceClient) await debounceClient.finish()
if (flagClient) await flagClient.finish()
- if (socketClient) await socketClient.finish()
if (socketSubClient) socketSubClient.disconnect()
// shutdown core clients
await redis.clients.shutdown()
diff --git a/packages/server/src/websockets/builder.ts b/packages/server/src/websockets/builder.ts
index d856dd42ae..6a0a401925 100644
--- a/packages/server/src/websockets/builder.ts
+++ b/packages/server/src/websockets/builder.ts
@@ -1,69 +1,74 @@
import authorized from "../middleware/authorized"
-import Socket from "./websocket"
+import { BaseSocket } from "./websocket"
import { permissions } from "@budibase/backend-core"
import http from "http"
import Koa from "koa"
-import { Datasource, Table } from "@budibase/types"
+import { Datasource, Table, SocketSession, ContextUser } from "@budibase/types"
import { gridSocket } from "./index"
import { clearLock } from "../utilities/redis"
+import { Socket } from "socket.io"
+import { BuilderSocketEvent } from "@budibase/shared-core"
-export default class BuilderSocket extends Socket {
+export default class BuilderSocket extends BaseSocket {
constructor(app: Koa, server: http.Server) {
super(app, server, "/socket/builder", [authorized(permissions.BUILDER)])
+ }
- this.io.on("connection", socket => {
- // Join a room for this app
- const user = socket.data.user
- const appId = socket.data.appId
- socket.join(appId)
- socket.to(appId).emit("user-update", user)
+ async onConnect(socket: ?Socket) {
+ // Initial identification of selected app
+ socket.on(BuilderSocketEvent.SelectApp, async (appId, callback) => {
+ await this.joinRoom(socket, appId)
- // Initial identification of connected spreadsheet
- socket.on("get-users", async (payload, callback) => {
- const sockets = await this.io.in(appId).fetchSockets()
- callback({
- users: sockets.map(socket => socket.data.user),
- })
- })
-
- // Disconnection cleanup
- socket.on("disconnect", async () => {
- socket.to(appId).emit("user-disconnect", user)
-
- // Remove app lock from this user if they have no other connections
- try {
- const sockets = await this.io.in(appId).fetchSockets()
- const hasOtherConnection = sockets.some(socket => {
- const { _id, sessionId } = socket.data.user
- return _id === user._id && sessionId !== user.sessionId
- })
- if (!hasOtherConnection) {
- await clearLock(appId, user)
- }
- } catch (e) {
- // This is fine, just means this user didn't hold the lock
- }
- })
+ // Reply with all users in current room
+ const sessions = await this.getRoomSessions(appId)
+ callback({ users: sessions })
})
}
+ async onDisconnect(socket: Socket) {
+ // Remove app lock from this user if they have no other connections
+ try {
+ // @ts-ignore
+ const session: SocketSession = socket.data
+ const { _id, sessionId, room } = session
+ const sessions = await this.getRoomSessions(room)
+ const hasOtherSession = sessions.some(otherSession => {
+ return _id === otherSession._id && sessionId !== otherSession.sessionId
+ })
+ if (!hasOtherSession && room) {
+ // @ts-ignore
+ const user: ContextUser = { _id: socket.data._id }
+ await clearLock(room, user)
+ }
+ } catch (e) {
+ // This is fine, just means this user didn't hold the lock
+ }
+ }
+
emitTableUpdate(ctx: any, table: Table) {
- this.io.in(ctx.appId).emit("table-change", { id: table._id, table })
+ this.io
+ .in(ctx.appId)
+ .emit(BuilderSocketEvent.TableChange, { id: table._id, table })
gridSocket?.emitTableUpdate(table)
}
emitTableDeletion(ctx: any, id: string) {
- this.io.in(ctx.appId).emit("table-change", { id, table: null })
+ this.io
+ .in(ctx.appId)
+ .emit(BuilderSocketEvent.TableChange, { id, table: null })
gridSocket?.emitTableDeletion(id)
}
emitDatasourceUpdate(ctx: any, datasource: Datasource) {
- this.io
- .in(ctx.appId)
- .emit("datasource-change", { id: datasource._id, datasource })
+ this.io.in(ctx.appId).emit(BuilderSocketEvent.DatasourceChange, {
+ id: datasource._id,
+ datasource,
+ })
}
emitDatasourceDeletion(ctx: any, id: string) {
- this.io.in(ctx.appId).emit("datasource-change", { id, datasource: null })
+ this.io
+ .in(ctx.appId)
+ .emit(BuilderSocketEvent.DatasourceChange, { id, datasource: null })
}
}
diff --git a/packages/server/src/websockets/client.ts b/packages/server/src/websockets/client.ts
index d59325f66e..165febb7a3 100644
--- a/packages/server/src/websockets/client.ts
+++ b/packages/server/src/websockets/client.ts
@@ -1,10 +1,10 @@
-import Socket from "./websocket"
+import { BaseSocket } from "./websocket"
import authorized from "../middleware/authorized"
import http from "http"
import Koa from "koa"
import { permissions } from "@budibase/backend-core"
-export default class ClientAppWebsocket extends Socket {
+export default class ClientAppWebsocket extends BaseSocket {
constructor(app: Koa, server: http.Server) {
super(app, server, "/socket/client", [authorized(permissions.BUILDER)])
}
diff --git a/packages/server/src/websockets/grid.ts b/packages/server/src/websockets/grid.ts
index a0272be33c..440b4f78a5 100644
--- a/packages/server/src/websockets/grid.ts
+++ b/packages/server/src/websockets/grid.ts
@@ -1,73 +1,51 @@
import authorized from "../middleware/authorized"
-import Socket from "./websocket"
+import { BaseSocket } from "./websocket"
import { permissions } from "@budibase/backend-core"
import http from "http"
import Koa from "koa"
import { getTableId } from "../api/controllers/row/utils"
import { Row, Table } from "@budibase/types"
+import { Socket } from "socket.io"
+import { GridSocketEvent } from "@budibase/shared-core"
-export default class GridSocket extends Socket {
+export default class GridSocket extends BaseSocket {
constructor(app: Koa, server: http.Server) {
super(app, server, "/socket/grid", [authorized(permissions.BUILDER)])
+ }
- this.io.on("connection", socket => {
- const user = socket.data.user
+ async onConnect(socket: Socket) {
+ // Initial identification of connected spreadsheet
+ socket.on(GridSocketEvent.SelectTable, async (tableId, callback) => {
+ await this.joinRoom(socket, tableId)
- // Socket state
- let currentRoom: string
+ // Reply with all users in current room
+ const sessions = await this.getRoomSessions(tableId)
+ callback({ users: sessions })
+ })
- // Initial identification of connected spreadsheet
- socket.on("select-table", async (tableId, callback) => {
- // Leave current room
- if (currentRoom) {
- socket.to(currentRoom).emit("user-disconnect", user)
- socket.leave(currentRoom)
- }
-
- // Join new room
- currentRoom = tableId
- socket.join(currentRoom)
- socket.to(currentRoom).emit("user-update", user)
-
- // Reply with all users in current room
- const sockets = await this.io.in(currentRoom).fetchSockets()
- callback({
- users: sockets.map(socket => socket.data.user),
- })
- })
-
- // Handle users selecting a new cell
- socket.on("select-cell", cellId => {
- socket.data.user.focusedCellId = cellId
- if (currentRoom) {
- socket.to(currentRoom).emit("user-update", user)
- }
- })
-
- // Disconnection cleanup
- socket.on("disconnect", () => {
- if (currentRoom) {
- socket.to(currentRoom).emit("user-disconnect", user)
- }
- })
+ // Handle users selecting a new cell
+ socket.on(GridSocketEvent.SelectCell, cellId => {
+ this.updateUser(socket, { focusedCellId: cellId })
})
}
emitRowUpdate(ctx: any, row: Row) {
const tableId = getTableId(ctx)
- this.io.in(tableId).emit("row-change", { id: row._id, row })
+ this.io.in(tableId).emit(GridSocketEvent.RowChange, { id: row._id, row })
}
emitRowDeletion(ctx: any, id: string) {
const tableId = getTableId(ctx)
- this.io.in(tableId).emit("row-change", { id, row: null })
+ this.io.in(tableId).emit(GridSocketEvent.RowChange, { id, row: null })
}
emitTableUpdate(table: Table) {
- this.io.in(table._id!).emit("table-change", { id: table._id, table })
+ this.io
+ .in(table._id!)
+ .emit(GridSocketEvent.TableChange, { id: table._id, table })
}
emitTableDeletion(id: string) {
- this.io.in(id).emit("table-change", { id, table: null })
+ this.io.in(id).emit(GridSocketEvent.TableChange, { id, table: null })
}
}
diff --git a/packages/server/src/websockets/websocket.ts b/packages/server/src/websockets/websocket.ts
index 5cac959099..a2d2474c77 100644
--- a/packages/server/src/websockets/websocket.ts
+++ b/packages/server/src/websockets/websocket.ts
@@ -3,14 +3,18 @@ import http from "http"
import Koa from "koa"
import Cookies from "cookies"
import { userAgent } from "koa-useragent"
-import { auth } from "@budibase/backend-core"
+import { auth, redis } from "@budibase/backend-core"
import currentApp from "../middleware/currentapp"
import { createAdapter } from "@socket.io/redis-adapter"
+import { Socket } from "socket.io"
import { getSocketPubSubClients } from "../utilities/redis"
-import uuid from "uuid"
+import { SocketEvent, SocketSessionTTL } from "@budibase/shared-core"
+import { SocketSession } from "@budibase/types"
-export default class Socket {
+export class BaseSocket {
io: Server
+ path: string
+ redisClient?: redis.Client
constructor(
app: Koa,
@@ -18,6 +22,7 @@ export default class Socket {
path: string = "/",
additionalMiddlewares?: any[]
) {
+ this.path = path
this.io = new Server(server, {
path,
})
@@ -65,18 +70,14 @@ export default class Socket {
// Middlewares are finished
// Extract some data from our enriched koa context to persist
// as metadata for the socket
- // Add user info, including a deterministic color and label
const { _id, email, firstName, lastName } = ctx.user
- socket.data.user = {
+ socket.data = {
_id,
email,
firstName,
lastName,
- sessionId: uuid.v4(),
+ sessionId: socket.id,
}
-
- // Add app ID to help split sockets into rooms
- socket.data.appId = ctx.appId
next()
}
})
@@ -86,10 +87,184 @@ export default class Socket {
}
})
- // Instantiate redis adapter
+ // Initialise redis before handling connections
+ this.initialise().then(() => {
+ this.io.on("connection", async socket => {
+ // Add built in handler for heartbeats
+ socket.on(SocketEvent.Heartbeat, async () => {
+ console.log(socket.data.email, "heartbeat received")
+ await this.extendSessionTTL(socket.data.sessionId)
+ })
+
+ // Add early disconnection handler to clean up and leave room
+ socket.on("disconnect", async () => {
+ // Run any custom disconnection logic before we leave the room,
+ // so that we have access to their room etc before disconnection
+ await this.onDisconnect(socket)
+
+ // Leave the current room when the user disconnects if we're in one
+ await this.leaveRoom(socket)
+ })
+
+ // Add handlers for this socket
+ await this.onConnect(socket)
+ })
+ })
+ }
+
+ async initialise() {
+ // Instantiate redis adapter.
+ // We use a fully qualified key name here as this bypasses the normal
+ // redis client#s key prefixing.
const { pub, sub } = getSocketPubSubClients()
- const opts = { key: `socket.io-${path}` }
+ const opts = {
+ key: `${redis.utils.Databases.SOCKET_IO}-${this.path}-pubsub`,
+ }
this.io.adapter(createAdapter(pub, sub, opts))
+
+ // Fetch redis client
+ this.redisClient = await redis.clients.getSocketClient()
+ }
+
+ // Gets the redis key for a certain session ID
+ getSessionKey(sessionId: string) {
+ return `${this.path}-session:${sessionId}`
+ }
+
+ // Gets the redis key for certain room name
+ getRoomKey(room: string) {
+ return `${this.path}-room:${room}`
+ }
+
+ async extendSessionTTL(sessionId: string) {
+ const key = this.getSessionKey(sessionId)
+ await this.redisClient?.setExpiry(key, SocketSessionTTL)
+ }
+
+ // Gets an array of all redis keys of users inside a certain room
+ async getRoomSessionIds(room: string): Promise {
+ const keys = await this.redisClient?.get(this.getRoomKey(room))
+ return keys || []
+ }
+
+ // Sets the list of redis keys for users inside a certain room.
+ // There is no TTL on the actual room key map itself.
+ async setRoomSessionIds(room: string, ids: string[]) {
+ await this.redisClient?.store(this.getRoomKey(room), ids)
+ }
+
+ // Gets a list of all users inside a certain room
+ async getRoomSessions(room?: string): Promise {
+ if (room) {
+ const sessionIds = await this.getRoomSessionIds(room)
+ const keys = sessionIds.map(this.getSessionKey.bind(this))
+ const sessions = await this.redisClient?.bulkGet(keys)
+ return Object.values(sessions || {})
+ } else {
+ return []
+ }
+ }
+
+ // Detects keys which have been pruned from redis due to TTL expiry in a certain
+ // room and broadcasts disconnection messages to ensure clients are aware
+ async pruneRoom(room: string) {
+ const sessionIds = await this.getRoomSessionIds(room)
+ const sessionsExist = await Promise.all(
+ sessionIds.map(id => this.redisClient?.exists(this.getSessionKey(id)))
+ )
+ const prunedSessionIds = sessionIds.filter((id, idx) => {
+ if (!sessionsExist[idx]) {
+ this.io.to(room).emit(SocketEvent.UserDisconnect, sessionIds[idx])
+ return false
+ }
+ return true
+ })
+
+ // Store new pruned keys
+ await this.setRoomSessionIds(room, prunedSessionIds)
+ }
+
+ // Adds a user to a certain room
+ async joinRoom(socket: Socket, room: string) {
+ if (!room) {
+ return
+ }
+ // Prune room before joining
+ await this.pruneRoom(room)
+
+ // Check if we're already in a room, as we'll need to leave if we are before we
+ // can join a different room
+ const oldRoom = socket.data.room
+ if (oldRoom && oldRoom !== room) {
+ await this.leaveRoom(socket)
+ }
+
+ // Join new room
+ if (!oldRoom || oldRoom !== room) {
+ socket.join(room)
+ socket.data.room = room
+ }
+
+ // Store in redis
+ // @ts-ignore
+ let user: SocketSession = socket.data
+ const { sessionId } = user
+ const key = this.getSessionKey(sessionId)
+ await this.redisClient?.store(key, user, SocketSessionTTL)
+ const sessionIds = await this.getRoomSessionIds(room)
+ if (!sessionIds.includes(sessionId)) {
+ await this.setRoomSessionIds(room, [...sessionIds, sessionId])
+ }
+
+ // Notify other users
+ socket.to(room).emit(SocketEvent.UserUpdate, user)
+ }
+
+ // Disconnects a socket from its current room
+ async leaveRoom(socket: Socket) {
+ // @ts-ignore
+ let user: SocketSession = socket.data
+ const { room, sessionId } = user
+ if (!room) {
+ return
+ }
+
+ // Leave room
+ socket.leave(room)
+ socket.data.room = undefined
+
+ // Delete from redis
+ const key = this.getSessionKey(sessionId)
+ await this.redisClient?.delete(key)
+ const sessionIds = await this.getRoomSessionIds(room)
+ await this.setRoomSessionIds(
+ room,
+ sessionIds.filter(id => id !== sessionId)
+ )
+
+ // Notify other users
+ socket.to(room).emit(SocketEvent.UserDisconnect, sessionId)
+ }
+
+ // Updates a connected user's metadata, assuming a room change is not required.
+ async updateUser(socket: Socket, patch: Object) {
+ socket.data = {
+ ...socket.data,
+ ...patch,
+ }
+
+ // If we're in a room, notify others of this change and update redis
+ if (socket.data.room) {
+ await this.joinRoom(socket, socket.data.room)
+ }
+ }
+
+ async onConnect(socket: Socket) {
+ // Override
+ }
+
+ async onDisconnect(socket: Socket) {
+ // Override
}
// Emit an event to all sockets
diff --git a/packages/shared-core/src/constants.ts b/packages/shared-core/src/constants.ts
index 35b02d1e15..5802a6d88a 100644
--- a/packages/shared-core/src/constants.ts
+++ b/packages/shared-core/src/constants.ts
@@ -67,3 +67,24 @@ export const SqlNumberTypeRangeMap = {
min: -8388608,
},
}
+
+export enum SocketEvent {
+ UserUpdate = "UserUpdate",
+ UserDisconnect = "UserDisconnect",
+ Heartbeat = "Heartbeat",
+}
+
+export enum GridSocketEvent {
+ RowChange = "RowChange",
+ TableChange = "TableChange",
+ SelectTable = "SelectTable",
+ SelectCell = "SelectCell",
+}
+
+export enum BuilderSocketEvent {
+ SelectApp = "SelectApp",
+ TableChange = "TableChange",
+ DatasourceChange = "DatasourceChange",
+}
+
+export const SocketSessionTTL = 60
diff --git a/packages/types/src/sdk/index.ts b/packages/types/src/sdk/index.ts
index ed44c13667..49d0387a82 100644
--- a/packages/types/src/sdk/index.ts
+++ b/packages/types/src/sdk/index.ts
@@ -17,3 +17,4 @@ export * from "./auditLogs"
export * from "./sso"
export * from "./user"
export * from "./cli"
+export * from "./websocket"
diff --git a/packages/types/src/sdk/websocket.ts b/packages/types/src/sdk/websocket.ts
new file mode 100644
index 0000000000..4fa7e155d6
--- /dev/null
+++ b/packages/types/src/sdk/websocket.ts
@@ -0,0 +1,8 @@
+export interface SocketSession {
+ _id: string
+ email: string
+ firstName?: string
+ lastName?: string
+ sessionId: string
+ room?: string
+}
diff --git a/qa-core/src/internal-api/fixtures/datasources.ts b/qa-core/src/internal-api/fixtures/datasources.ts
index 4797f86631..c7f969a98c 100644
--- a/qa-core/src/internal-api/fixtures/datasources.ts
+++ b/qa-core/src/internal-api/fixtures/datasources.ts
@@ -1,4 +1,7 @@
+import { Datasource } from "@budibase/types"
import { DatasourceRequest } from "../../types"
+import { generator } from "../../shared"
+
// Add information about the data source to the fixtures file from 1password
export const mongoDB = (): DatasourceRequest => {
return {
@@ -70,3 +73,50 @@ export const restAPI = (): DatasourceRequest => {
fetchSchema: false,
}
}
+
+export const generateRelationshipForMySQL = (
+ updatedDataSourceJson: any
+): Datasource => {
+ const entities = updatedDataSourceJson!.datasource!.entities!
+ const datasourceId = updatedDataSourceJson!.datasource!._id!
+ const relationShipBody = {
+ ...updatedDataSourceJson.datasource,
+ entities: {
+ ...updatedDataSourceJson.datasource.entities,
+ employees: {
+ ...entities.employees,
+ schema: {
+ ...entities.employees.schema,
+ salaries: {
+ tableId: `${datasourceId}__salaries`,
+ name: "salaries",
+ relationshipType: "many-to-one",
+ fieldName: "salary",
+ type: "link",
+ main: true,
+ _id: generator.string(),
+ foreignKey: "emp_no",
+ },
+ },
+ },
+ titles: {
+ ...entities.titles,
+ schema: {
+ ...entities.titles.schema,
+ employees: {
+ tableId: `${datasourceId}__employees`,
+ name: "employees",
+ relationshipType: "one-to-many",
+ fieldName: "emp_no",
+ type: "link",
+ main: true,
+ _id: generator.string(),
+ foreignKey: "emp_no",
+ },
+ },
+ },
+ },
+ }
+
+ return relationShipBody
+}
diff --git a/qa-core/src/internal-api/tests/dataSources/mariaDB.integration.spec.ts b/qa-core/src/internal-api/tests/dataSources/mariaDB.integration.spec.ts
index 3f479136f6..24c9cea94b 100644
--- a/qa-core/src/internal-api/tests/dataSources/mariaDB.integration.spec.ts
+++ b/qa-core/src/internal-api/tests/dataSources/mariaDB.integration.spec.ts
@@ -66,4 +66,41 @@ describe("Internal API - Data Sources: MariaDB", () => {
updatedDataSourceJson.datasource._rev!
)
})
+
+ it("Create a relationship", async () => {
+ // Create app
+ await config.createApp()
+
+ // Get all integrations
+ await config.api.integrations.getAll()
+
+ // Add data source
+ const [dataSourceResponse, dataSourceJson] =
+ await config.api.datasources.add(fixtures.datasources.mariaDB())
+
+ // Update data source
+ const newDataSourceInfo = {
+ ...dataSourceJson.datasource,
+ name: "MariaDB2",
+ }
+ const [updatedDataSourceResponse, updatedDataSourceJson] =
+ await config.api.datasources.update(newDataSourceInfo)
+
+ // Query data source
+ const [queryResponse, queryJson] = await config.api.queries.preview(
+ fixtures.queries.mariaDB(updatedDataSourceJson.datasource._id!)
+ )
+
+ expect(queryJson.rows.length).toBeGreaterThan(9)
+ expect(queryJson.schemaFields).toEqual(
+ fixtures.queries.expectedSchemaFields.mariaDB
+ )
+
+ // Add relationship
+ const relationShipBody = fixtures.datasources.generateRelationshipForMySQL(
+ updatedDataSourceJson
+ )
+ const [relationshipResponse, relationshipJson] =
+ await config.api.datasources.update(relationShipBody)
+ })
})
diff --git a/qa-core/src/internal-api/tests/dataSources/postgresSQL.integration.spec.ts b/qa-core/src/internal-api/tests/dataSources/postgresSQL.integration.spec.ts
index ccfcce6f55..2aabd608bc 100644
--- a/qa-core/src/internal-api/tests/dataSources/postgresSQL.integration.spec.ts
+++ b/qa-core/src/internal-api/tests/dataSources/postgresSQL.integration.spec.ts
@@ -37,7 +37,7 @@ describe("Internal API - Data Sources: PostgresSQL", () => {
fixtures.queries.postgres(updatedDataSourceJson.datasource._id!)
)
- expect(queryJson.rows.length).toEqual(91)
+ expect(queryJson.rows.length).toBeGreaterThan(10)
expect(queryJson.schemaFields).toEqual(
fixtures.queries.expectedSchemaFields.postgres
)