2021-05-05 18:49:34 +02:00
|
|
|
const env = require("../environment")
|
|
|
|
// ioredis mock is all in memory
|
|
|
|
const Redis = env.isTest() ? require("ioredis-mock") : require("ioredis")
|
2021-05-04 19:13:44 +02:00
|
|
|
const { addDbPrefix, removeDbPrefix, getRedisOptions } = require("./utils")
|
2021-04-27 18:29:05 +02:00
|
|
|
|
|
|
|
// Selects which kind of client init() builds: a Redis.Cluster client when true,
// a single-node Redis client when false. scan() also branches on this flag.
// Hard-coded to false in this module.
const CLUSTERED = false
|
|
|
|
|
2021-05-05 18:49:34 +02:00
|
|
|
// for testing just generate the client once
|
2021-05-21 14:07:10 +02:00
|
|
|
// Connection flag for the shared client: init() sets it to true on the client's
// "connect" event and back to false on "error" or "end".
let CONNECTED = false
|
2021-05-05 18:49:34 +02:00
|
|
|
// Shared module-level client. In test mode it is constructed once, here, at module
// load; otherwise it starts as null and is created by init().
// NOTE(review): init() passes only the `opts` field of getRedisOptions() to the
// constructor, whereas this passes the whole return value - confirm that is intended.
let CLIENT = env.isTest() ? new Redis(getRedisOptions()) : null
|
2021-04-27 18:29:05 +02:00
|
|
|
|
|
|
|
/**
 * Inits the system and hands back the shared ioredis client.
 *
 * - In test mode the client created at module load is returned as-is.
 * - If a client already exists and is currently connected it is reused.
 * - Otherwise a new client (cluster or single node, depending on CLUSTERED) is
 *   created; the promise resolves on its "connect" event and rejects if "error"
 *   or "end" fires first.
 *
 * @return {Promise<object>} The ioredis client, ready to use.
 */
function init() {
  return new Promise((resolve, reject) => {
    // testing uses a single in memory client
    if (env.isTest()) {
      return resolve(CLIENT)
    }
    // a live connection already exists - reuse it. This must go through resolve():
    // a value returned from a promise executor is ignored and the promise would
    // otherwise never settle.
    if (CLIENT && CONNECTED) {
      return resolve(CLIENT)
    }
    const { opts, host, port } = getRedisOptions(CLUSTERED)
    const client = CLUSTERED
      ? new Redis.Cluster([{ host, port }], opts)
      : new Redis(opts)
    CLIENT = client
    // once the promise has settled, further resolve/reject calls are no-ops, so
    // after the first event these handlers only keep CONNECTED up to date
    client.on("end", err => {
      reject(err)
      CONNECTED = false
    })
    client.on("error", err => {
      reject(err)
      CONNECTED = false
    })
    client.on("connect", () => {
      resolve(client)
      CONNECTED = true
    })
  })
}
|
|
|
|
|
|
|
|
/**
 * Utility function, takes a redis scan stream and converts it to a promisified response -
 * this can only be done with streams that emit "end".
 *
 * Keys emitted by the stream are de-duplicated, then each key's value is fetched through
 * the shared CLIENT. Values are JSON-parsed where possible; a value that is not valid
 * JSON (store() writes plain strings unchanged) is returned as-is, matching get().
 *
 * @param stream A redis stream which emits "data" (arrays of keys), "error" and "end".
 * @return {Promise<object[]>} One `{ key, value }` entry per unique key, with the db
 * prefix removed from the key.
 */
function promisifyStream(stream) {
  // mirrors the parsing done by RedisWrapper.get so one plain-string value does
  // not reject the whole scan
  const parseValue = raw => {
    try {
      return JSON.parse(raw)
    } catch (err) {
      return raw
    }
  }
  return new Promise((resolve, reject) => {
    const outputKeys = new Set()
    stream.on("data", keys => {
      for (const key of keys) {
        outputKeys.add(key)
      }
    })
    stream.on("error", err => {
      reject(err)
    })
    stream.on("end", async () => {
      try {
        const keysArray = Array.from(outputKeys)
        const rawValues = await Promise.all(
          keysArray.map(key => CLIENT.get(key))
        )
        resolve(
          keysArray.map((key, idx) => ({
            key: removeDbPrefix(key),
            value: parseValue(rawValues[idx]),
          }))
        )
      } catch (err) {
        reject(err)
      }
    })
  })
}
|
|
|
|
|
|
|
|
/**
 * Thin wrapper around the shared redis client which namespaces every key with a
 * db prefix. init() must be awaited before any other method is used, as that is
 * what assigns the underlying client.
 */
class RedisWrapper {
  /**
   * @param db The db name used to prefix every key handled by this wrapper.
   */
  constructor(db) {
    this._db = db
  }

  /**
   * Obtains the shared client from the module-level init().
   * @return {Promise<RedisWrapper>} This wrapper, so the call can be chained.
   */
  async init() {
    this._client = await init()
    return this
  }

  /**
   * Disconnects the underlying client.
   */
  async finish() {
    this._client.disconnect()
  }

  /**
   * Lists every entry stored under this wrapper's db prefix.
   * @return {Promise<object[]>} The output of promisifyStream for the scan stream.
   */
  async scan() {
    const db = this._db
    const client = this._client
    const scanOpts = { match: db + "-*", count: 100 }
    let stream
    if (CLUSTERED) {
      // NOTE(review): only the first node returned by nodes("master") is scanned -
      // confirm whether keys held by other masters need to be included.
      const masters = client.nodes("master")
      stream = masters[0].scanStream(scanOpts)
    } else {
      stream = client.scanStream(scanOpts)
    }
    return promisifyStream(stream)
  }

  /**
   * Reads a single key.
   * @param key The un-prefixed key.
   * @return {Promise<*>} The JSON-parsed value, or the raw response when it is not
   * valid JSON. A null response parses to null.
   */
  async get(key) {
    const response = await this._client.get(addDbPrefix(this._db, key))
    // if its not JSON just return the response
    try {
      return JSON.parse(response)
    } catch (err) {
      return response
    }
  }

  /**
   * Writes a value under the prefixed key.
   * @param key The un-prefixed key.
   * @param value The value to store, objects are serialised to JSON first.
   * @param expirySeconds Optional time to live in seconds, no expiry when falsy.
   */
  async store(key, value, expirySeconds = null) {
    if (typeof value === "object") {
      value = JSON.stringify(value)
    }
    const prefixedKey = addDbPrefix(this._db, key)
    if (expirySeconds) {
      // set the value and its expiry in one command, so a failure between two
      // separate calls can never leave the key behind without a TTL
      await this._client.set(prefixedKey, value, "EX", expirySeconds)
    } else {
      await this._client.set(prefixedKey, value)
    }
  }

  /**
   * Deletes a single key.
   * @param key The un-prefixed key.
   */
  async delete(key) {
    await this._client.del(addDbPrefix(this._db, key))
  }

  /**
   * Deletes every key that scan() finds for this db.
   */
  async clear() {
    const items = await this.scan()
    await Promise.all(items.map(obj => this.delete(obj.key)))
  }
}
|
|
|
|
|
|
|
|
// The wrapper class is the module's only export; init() and promisifyStream stay internal.
module.exports = RedisWrapper
|