Merge pull request #13197 from Budibase/BUDI-8046/redis-bulk-store
Create Redis bulk store utils
This commit is contained in:
commit
c19a852ee6
|
@ -67,7 +67,7 @@
|
||||||
"@types/lodash": "4.14.200",
|
"@types/lodash": "4.14.200",
|
||||||
"@types/node-fetch": "2.6.4",
|
"@types/node-fetch": "2.6.4",
|
||||||
"@types/pouchdb": "6.4.0",
|
"@types/pouchdb": "6.4.0",
|
||||||
"@types/redlock": "4.0.3",
|
"@types/redlock": "4.0.7",
|
||||||
"@types/semver": "7.3.7",
|
"@types/semver": "7.3.7",
|
||||||
"@types/tar-fs": "2.0.1",
|
"@types/tar-fs": "2.0.1",
|
||||||
"@types/uuid": "8.3.4",
|
"@types/uuid": "8.3.4",
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
import env from "../environment"
|
import env from "../environment"
|
||||||
import Redis from "ioredis"
|
import Redis, { Cluster } from "ioredis"
|
||||||
// mock-redis doesn't have any typing
|
// mock-redis doesn't have any typing
|
||||||
let MockRedis: any | undefined
|
let MockRedis: any | undefined
|
||||||
if (env.MOCK_REDIS) {
|
if (env.MOCK_REDIS) {
|
||||||
|
@ -28,7 +28,7 @@ const DEFAULT_SELECT_DB = SelectableDatabase.DEFAULT
|
||||||
|
|
||||||
// for testing just generate the client once
|
// for testing just generate the client once
|
||||||
let CLOSED = false
|
let CLOSED = false
|
||||||
let CLIENTS: { [key: number]: any } = {}
|
const CLIENTS: Record<number, Redis> = {}
|
||||||
let CONNECTED = false
|
let CONNECTED = false
|
||||||
|
|
||||||
// mock redis always connected
|
// mock redis always connected
|
||||||
|
@ -36,7 +36,7 @@ if (env.MOCK_REDIS) {
|
||||||
CONNECTED = true
|
CONNECTED = true
|
||||||
}
|
}
|
||||||
|
|
||||||
function pickClient(selectDb: number): any {
|
function pickClient(selectDb: number) {
|
||||||
return CLIENTS[selectDb]
|
return CLIENTS[selectDb]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -201,12 +201,15 @@ class RedisWrapper {
|
||||||
key = `${db}${SEPARATOR}${key}`
|
key = `${db}${SEPARATOR}${key}`
|
||||||
let stream
|
let stream
|
||||||
if (CLUSTERED) {
|
if (CLUSTERED) {
|
||||||
let node = this.getClient().nodes("master")
|
let node = (this.getClient() as never as Cluster).nodes("master")
|
||||||
stream = node[0].scanStream({ match: key + "*", count: 100 })
|
stream = node[0].scanStream({ match: key + "*", count: 100 })
|
||||||
} else {
|
} else {
|
||||||
stream = this.getClient().scanStream({ match: key + "*", count: 100 })
|
stream = (this.getClient() as Redis).scanStream({
|
||||||
|
match: key + "*",
|
||||||
|
count: 100,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
return promisifyStream(stream, this.getClient())
|
return promisifyStream(stream, this.getClient() as any)
|
||||||
}
|
}
|
||||||
|
|
||||||
async keys(pattern: string) {
|
async keys(pattern: string) {
|
||||||
|
@ -221,14 +224,16 @@ class RedisWrapper {
|
||||||
|
|
||||||
async get(key: string) {
|
async get(key: string) {
|
||||||
const db = this._db
|
const db = this._db
|
||||||
let response = await this.getClient().get(addDbPrefix(db, key))
|
const response = await this.getClient().get(addDbPrefix(db, key))
|
||||||
// overwrite the prefixed key
|
// overwrite the prefixed key
|
||||||
|
// @ts-ignore
|
||||||
if (response != null && response.key) {
|
if (response != null && response.key) {
|
||||||
|
// @ts-ignore
|
||||||
response.key = key
|
response.key = key
|
||||||
}
|
}
|
||||||
// if its not an object just return the response
|
// if its not an object just return the response
|
||||||
try {
|
try {
|
||||||
return JSON.parse(response)
|
return JSON.parse(response!)
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
return response
|
return response
|
||||||
}
|
}
|
||||||
|
@ -274,13 +279,37 @@ class RedisWrapper {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async bulkStore(
|
||||||
|
data: Record<string, any>,
|
||||||
|
expirySeconds: number | null = null
|
||||||
|
) {
|
||||||
|
const client = this.getClient()
|
||||||
|
|
||||||
|
const dataToStore = Object.entries(data).reduce((acc, [key, value]) => {
|
||||||
|
acc[addDbPrefix(this._db, key)] =
|
||||||
|
typeof value === "object" ? JSON.stringify(value) : value
|
||||||
|
return acc
|
||||||
|
}, {} as Record<string, any>)
|
||||||
|
|
||||||
|
const pipeline = client.pipeline()
|
||||||
|
pipeline.mset(dataToStore)
|
||||||
|
|
||||||
|
if (expirySeconds !== null) {
|
||||||
|
for (const key of Object.keys(dataToStore)) {
|
||||||
|
pipeline.expire(key, expirySeconds)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
await pipeline.exec()
|
||||||
|
}
|
||||||
|
|
||||||
async getTTL(key: string) {
|
async getTTL(key: string) {
|
||||||
const db = this._db
|
const db = this._db
|
||||||
const prefixedKey = addDbPrefix(db, key)
|
const prefixedKey = addDbPrefix(db, key)
|
||||||
return this.getClient().ttl(prefixedKey)
|
return this.getClient().ttl(prefixedKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
async setExpiry(key: string, expirySeconds: number | null) {
|
async setExpiry(key: string, expirySeconds: number) {
|
||||||
const db = this._db
|
const db = this._db
|
||||||
const prefixedKey = addDbPrefix(db, key)
|
const prefixedKey = addDbPrefix(db, key)
|
||||||
await this.getClient().expire(prefixedKey, expirySeconds)
|
await this.getClient().expire(prefixedKey, expirySeconds)
|
||||||
|
|
|
@ -72,7 +72,7 @@ const OPTIONS: Record<keyof typeof LockType, Redlock.Options> = {
|
||||||
export async function newRedlock(opts: Redlock.Options = {}) {
|
export async function newRedlock(opts: Redlock.Options = {}) {
|
||||||
const options = { ...OPTIONS.DEFAULT, ...opts }
|
const options = { ...OPTIONS.DEFAULT, ...opts }
|
||||||
const redisWrapper = await getLockClient()
|
const redisWrapper = await getLockClient()
|
||||||
const client = redisWrapper.getClient()
|
const client = redisWrapper.getClient() as any
|
||||||
return new Redlock([client], options)
|
return new Redlock([client], options)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,110 @@
|
||||||
|
import { generator, structures } from "../../../tests"
|
||||||
|
import RedisWrapper from "../redis"
|
||||||
|
|
||||||
|
describe("redis", () => {
|
||||||
|
let redis: RedisWrapper
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
redis = new RedisWrapper(structures.db.id())
|
||||||
|
await redis.init()
|
||||||
|
})
|
||||||
|
|
||||||
|
describe("store", () => {
|
||||||
|
it("a basic value can be persisted", async () => {
|
||||||
|
const key = structures.uuid()
|
||||||
|
const value = generator.word()
|
||||||
|
|
||||||
|
await redis.store(key, value)
|
||||||
|
|
||||||
|
expect(await redis.get(key)).toEqual(value)
|
||||||
|
})
|
||||||
|
|
||||||
|
it("objects can be persisted", async () => {
|
||||||
|
const key = structures.uuid()
|
||||||
|
const value = { [generator.word()]: generator.word() }
|
||||||
|
|
||||||
|
await redis.store(key, value)
|
||||||
|
|
||||||
|
expect(await redis.get(key)).toEqual(value)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
describe("bulkStore", () => {
|
||||||
|
function createRandomObject(
|
||||||
|
keyLength: number,
|
||||||
|
valueGenerator: () => any = () => generator.word()
|
||||||
|
) {
|
||||||
|
return generator
|
||||||
|
.unique(() => generator.word(), keyLength)
|
||||||
|
.reduce((acc, key) => {
|
||||||
|
acc[key] = valueGenerator()
|
||||||
|
return acc
|
||||||
|
}, {} as Record<string, string>)
|
||||||
|
}
|
||||||
|
|
||||||
|
it("a basic object can be persisted", async () => {
|
||||||
|
const data = createRandomObject(10)
|
||||||
|
|
||||||
|
await redis.bulkStore(data)
|
||||||
|
|
||||||
|
for (const [key, value] of Object.entries(data)) {
|
||||||
|
expect(await redis.get(key)).toEqual(value)
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(await redis.keys("*")).toHaveLength(10)
|
||||||
|
})
|
||||||
|
|
||||||
|
it("a complex object can be persisted", async () => {
|
||||||
|
const data = {
|
||||||
|
...createRandomObject(10, () => createRandomObject(5)),
|
||||||
|
...createRandomObject(5),
|
||||||
|
}
|
||||||
|
|
||||||
|
await redis.bulkStore(data)
|
||||||
|
|
||||||
|
for (const [key, value] of Object.entries(data)) {
|
||||||
|
expect(await redis.get(key)).toEqual(value)
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(await redis.keys("*")).toHaveLength(15)
|
||||||
|
})
|
||||||
|
|
||||||
|
it("no TTL is set by default", async () => {
|
||||||
|
const data = createRandomObject(10)
|
||||||
|
|
||||||
|
await redis.bulkStore(data)
|
||||||
|
|
||||||
|
for (const [key, value] of Object.entries(data)) {
|
||||||
|
expect(await redis.get(key)).toEqual(value)
|
||||||
|
expect(await redis.getTTL(key)).toEqual(-1)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
it("a bulk store can be persisted with TTL", async () => {
|
||||||
|
const ttl = 500
|
||||||
|
const data = createRandomObject(8)
|
||||||
|
|
||||||
|
await redis.bulkStore(data, ttl)
|
||||||
|
|
||||||
|
for (const [key, value] of Object.entries(data)) {
|
||||||
|
expect(await redis.get(key)).toEqual(value)
|
||||||
|
expect(await redis.getTTL(key)).toEqual(ttl)
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(await redis.keys("*")).toHaveLength(8)
|
||||||
|
})
|
||||||
|
|
||||||
|
it("setting a TTL of -1 will not persist the key", async () => {
|
||||||
|
const ttl = -1
|
||||||
|
const data = createRandomObject(5)
|
||||||
|
|
||||||
|
await redis.bulkStore(data, ttl)
|
||||||
|
|
||||||
|
for (const [key, value] of Object.entries(data)) {
|
||||||
|
expect(await redis.get(key)).toBe(null)
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(await redis.keys("*")).toHaveLength(0)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
|
@ -18,7 +18,7 @@
|
||||||
"@budibase/nano": "10.1.5",
|
"@budibase/nano": "10.1.5",
|
||||||
"@types/koa": "2.13.4",
|
"@types/koa": "2.13.4",
|
||||||
"@types/pouchdb": "6.4.0",
|
"@types/pouchdb": "6.4.0",
|
||||||
"@types/redlock": "4.0.3",
|
"@types/redlock": "4.0.7",
|
||||||
"rimraf": "3.0.2",
|
"rimraf": "3.0.2",
|
||||||
"typescript": "5.2.2"
|
"typescript": "5.2.2"
|
||||||
},
|
},
|
||||||
|
|
11
yarn.lock
11
yarn.lock
|
@ -5408,7 +5408,7 @@
|
||||||
resolved "https://registry.yarnpkg.com/@types/http-errors/-/http-errors-2.0.1.tgz#20172f9578b225f6c7da63446f56d4ce108d5a65"
|
resolved "https://registry.yarnpkg.com/@types/http-errors/-/http-errors-2.0.1.tgz#20172f9578b225f6c7da63446f56d4ce108d5a65"
|
||||||
integrity sha512-/K3ds8TRAfBvi5vfjuz8y6+GiAYBZ0x4tXv1Av6CWBWn0IlADc+ZX9pMq7oU0fNQPnBwIZl3rmeLp6SBApbxSQ==
|
integrity sha512-/K3ds8TRAfBvi5vfjuz8y6+GiAYBZ0x4tXv1Av6CWBWn0IlADc+ZX9pMq7oU0fNQPnBwIZl3rmeLp6SBApbxSQ==
|
||||||
|
|
||||||
"@types/ioredis@4.28.10":
|
"@types/ioredis@4.28.10", "@types/ioredis@^4.28.10":
|
||||||
version "4.28.10"
|
version "4.28.10"
|
||||||
resolved "https://registry.yarnpkg.com/@types/ioredis/-/ioredis-4.28.10.tgz#40ceb157a4141088d1394bb87c98ed09a75a06ff"
|
resolved "https://registry.yarnpkg.com/@types/ioredis/-/ioredis-4.28.10.tgz#40ceb157a4141088d1394bb87c98ed09a75a06ff"
|
||||||
integrity sha512-69LyhUgrXdgcNDv7ogs1qXZomnfOEnSmrmMFqKgt1XMJxmoOSG/u3wYy13yACIfKuMJ8IhKgHafDO3sx19zVQQ==
|
integrity sha512-69LyhUgrXdgcNDv7ogs1qXZomnfOEnSmrmMFqKgt1XMJxmoOSG/u3wYy13yACIfKuMJ8IhKgHafDO3sx19zVQQ==
|
||||||
|
@ -5896,12 +5896,13 @@
|
||||||
dependencies:
|
dependencies:
|
||||||
"@types/node" "*"
|
"@types/node" "*"
|
||||||
|
|
||||||
"@types/redlock@4.0.3":
|
"@types/redlock@4.0.7":
|
||||||
version "4.0.3"
|
version "4.0.7"
|
||||||
resolved "https://registry.yarnpkg.com/@types/redlock/-/redlock-4.0.3.tgz#aeab5fe5f0d433a125f6dcf9a884372ac0cddd4b"
|
resolved "https://registry.yarnpkg.com/@types/redlock/-/redlock-4.0.7.tgz#33ed56f22a38d6b2f2e6ae5ed1b3fc1875a08e6b"
|
||||||
integrity sha512-mcvvrquwREbAqyZALNBIlf49AL9Aa324BG+J/Dv4TAP8g+nxQMBI4/APNqqS99QEY7VTNT9XvsaczCVGK8uNnQ==
|
integrity sha512-5D6egBv0fCfdbmnCETjEynVuiwFMEFFc3YFjh9EwhaaVTAi0YmB6UI1swq1S1rjIu+n27ppmlTFDK3D3cadJqg==
|
||||||
dependencies:
|
dependencies:
|
||||||
"@types/bluebird" "*"
|
"@types/bluebird" "*"
|
||||||
|
"@types/ioredis" "^4.28.10"
|
||||||
"@types/redis" "^2.8.0"
|
"@types/redis" "^2.8.0"
|
||||||
|
|
||||||
"@types/request@^2.48.7":
|
"@types/request@^2.48.7":
|
||||||
|
|
Loading…
Reference in New Issue