Merge branch 'master' into feat/BUDI-8046

This commit is contained in:
Adria Navarro 2024-03-07 14:34:14 +01:00
commit d3b9739396
23 changed files with 639 additions and 37 deletions

View File

@ -1,5 +1,5 @@
{ {
"version": "2.21.3", "version": "2.21.4",
"npmClient": "yarn", "npmClient": "yarn",
"packages": [ "packages": [
"packages/*", "packages/*",

View File

@ -23,6 +23,18 @@ export default class BaseCache {
return client.keys(pattern) return client.keys(pattern)
} }
async exists(key: string, opts = { useTenancy: true }) {
key = opts.useTenancy ? generateTenantKey(key) : key
const client = await this.getClient()
return client.exists(key)
}
async scan(key: string, opts = { useTenancy: true }) {
key = opts.useTenancy ? generateTenantKey(key) : key
const client = await this.getClient()
return client.scan(key)
}
/** /**
* Read only from the cache. * Read only from the cache.
*/ */
@ -32,6 +44,15 @@ export default class BaseCache {
return client.get(key) return client.get(key)
} }
/**
* Read only from the cache.
*/
async bulkGet<T>(keys: string[], opts = { useTenancy: true }) {
keys = opts.useTenancy ? keys.map(key => generateTenantKey(key)) : keys
const client = await this.getClient()
return client.bulkGet<T>(keys)
}
/** /**
* Write to the cache. * Write to the cache.
*/ */
@ -46,6 +67,25 @@ export default class BaseCache {
await client.store(key, value, ttl) await client.store(key, value, ttl)
} }
/**
* Bulk write to the cache.
*/
async bulkStore(
data: Record<string, any>,
ttl: number | null = null,
opts = { useTenancy: true }
) {
if (opts.useTenancy) {
data = Object.entries(data).reduce((acc, [key, value]) => {
acc[generateTenantKey(key)] = value
return acc
}, {} as Record<string, any>)
}
const client = await this.getClient()
await client.bulkStore(data, ttl)
}
/** /**
* Remove from cache. * Remove from cache.
*/ */
@ -55,15 +95,24 @@ export default class BaseCache {
return client.delete(key) return client.delete(key)
} }
/**
* Remove from cache.
*/
async bulkDelete(keys: string[], opts = { useTenancy: true }) {
keys = opts.useTenancy ? keys.map(key => generateTenantKey(key)) : keys
const client = await this.getClient()
return client.bulkDelete(keys)
}
/** /**
* Read from the cache. Write to the cache if not exists. * Read from the cache. Write to the cache if not exists.
*/ */
async withCache( async withCache<T>(
key: string, key: string,
ttl: number, ttl: number | null = null,
fetchFn: any, fetchFn: () => Promise<T> | T,
opts = { useTenancy: true } opts = { useTenancy: true }
) { ): Promise<T> {
const cachedValue = await this.get(key, opts) const cachedValue = await this.get(key, opts)
if (cachedValue) { if (cachedValue) {
return cachedValue return cachedValue
@ -89,4 +138,13 @@ export default class BaseCache {
throw err throw err
} }
} }
/**
* Delete the entry if the provided value matches the stored one.
*/
async deleteIfValue(key: string, value: any, opts = { useTenancy: true }) {
key = opts.useTenancy ? generateTenantKey(key) : key
const client = await this.getClient()
await client.deleteIfValue(key, value)
}
} }

View File

@ -0,0 +1,86 @@
import { AnyDocument, Database } from "@budibase/types"
import { JobQueue, createQueue } from "../queue"
import * as dbUtils from "../db"
interface ProcessDocMessage {
dbName: string
docId: string
data: Record<string, any>
}
const PERSIST_MAX_ATTEMPTS = 100
export const docWritethroughProcessorQueue = createQueue<ProcessDocMessage>(
JobQueue.DOC_WRITETHROUGH_QUEUE,
{
jobOptions: {
attempts: PERSIST_MAX_ATTEMPTS,
},
}
)
class DocWritethroughProcessor {
init() {
docWritethroughProcessorQueue.process(async message => {
try {
await this.persistToDb(message.data)
} catch (err: any) {
if (err.status === 409) {
// If we get a 409, it means that another job updated it meanwhile. We want to retry it to persist it again.
throw new Error(
`Conflict persisting message ${message.id}. Attempt ${message.attemptsMade}`
)
}
throw err
}
})
return this
}
private async persistToDb({
dbName,
docId,
data,
}: {
dbName: string
docId: string
data: Record<string, any>
}) {
const db = dbUtils.getDB(dbName)
let doc: AnyDocument | undefined
try {
doc = await db.get(docId)
} catch {
doc = { _id: docId }
}
doc = { ...doc, ...data }
await db.put(doc)
}
}
export const processor = new DocWritethroughProcessor().init()
export class DocWritethrough {
private db: Database
private _docId: string
constructor(db: Database, docId: string) {
this.db = db
this._docId = docId
}
get docId() {
return this._docId
}
async patch(data: Record<string, any>) {
await docWritethroughProcessorQueue.add({
dbName: this.db.name,
docId: this.docId,
data,
})
}
}

View File

@ -26,7 +26,8 @@ export const store = (...args: Parameters<typeof GENERIC.store>) =>
GENERIC.store(...args) GENERIC.store(...args)
export const destroy = (...args: Parameters<typeof GENERIC.delete>) => export const destroy = (...args: Parameters<typeof GENERIC.delete>) =>
GENERIC.delete(...args) GENERIC.delete(...args)
export const withCache = (...args: Parameters<typeof GENERIC.withCache>) => export const withCache = <T>(
GENERIC.withCache(...args) ...args: Parameters<typeof GENERIC.withCache<T>>
) => GENERIC.withCache(...args)
export const bustCache = (...args: Parameters<typeof GENERIC.bustCache>) => export const bustCache = (...args: Parameters<typeof GENERIC.bustCache>) =>
GENERIC.bustCache(...args) GENERIC.bustCache(...args)

View File

@ -5,3 +5,4 @@ export * as writethrough from "./writethrough"
export * as invite from "./invite" export * as invite from "./invite"
export * as passwordReset from "./passwordReset" export * as passwordReset from "./passwordReset"
export * from "./generic" export * from "./generic"
export * as docWritethrough from "./docWritethrough"

View File

@ -0,0 +1,288 @@
import tk from "timekeeper"
import _ from "lodash"
import { DBTestConfiguration, generator, structures } from "../../../tests"
import { getDB } from "../../db"
import {
DocWritethrough,
docWritethroughProcessorQueue,
} from "../docWritethrough"
import InMemoryQueue from "../../queue/inMemoryQueue"
const initialTime = Date.now()
async function waitForQueueCompletion() {
const queue: InMemoryQueue = docWritethroughProcessorQueue as never
await queue.waitForCompletion()
}
describe("docWritethrough", () => {
const config = new DBTestConfiguration()
const db = getDB(structures.db.id())
let documentId: string
let docWritethrough: DocWritethrough
describe("patch", () => {
function generatePatchObject(fieldCount: number) {
const keys = generator.unique(() => generator.word(), fieldCount)
return keys.reduce((acc, c) => {
acc[c] = generator.word()
return acc
}, {} as Record<string, any>)
}
beforeEach(async () => {
jest.clearAllMocks()
documentId = structures.uuid()
docWritethrough = new DocWritethrough(db, documentId)
})
it("patching will not persist until the messages are persisted", async () => {
await config.doInTenant(async () => {
await docWritethrough.patch(generatePatchObject(2))
await docWritethrough.patch(generatePatchObject(2))
expect(await db.exists(documentId)).toBe(false)
})
})
it("patching will persist when the messages are persisted", async () => {
await config.doInTenant(async () => {
const patch1 = generatePatchObject(2)
const patch2 = generatePatchObject(2)
await docWritethrough.patch(patch1)
await docWritethrough.patch(patch2)
await waitForQueueCompletion()
// This will not be persisted
const patch3 = generatePatchObject(3)
await docWritethrough.patch(patch3)
expect(await db.get(documentId)).toEqual({
_id: documentId,
...patch1,
...patch2,
_rev: expect.stringMatching(/2-.+/),
createdAt: new Date(initialTime).toISOString(),
updatedAt: new Date(initialTime).toISOString(),
})
})
})
it("patching will persist keeping the previous data", async () => {
await config.doInTenant(async () => {
const patch1 = generatePatchObject(2)
const patch2 = generatePatchObject(2)
await docWritethrough.patch(patch1)
await docWritethrough.patch(patch2)
await waitForQueueCompletion()
const patch3 = generatePatchObject(3)
await docWritethrough.patch(patch3)
await waitForQueueCompletion()
expect(await db.get(documentId)).toEqual(
expect.objectContaining({
_id: documentId,
...patch1,
...patch2,
...patch3,
})
)
})
})
it("date audit fields are set correctly when persisting", async () => {
await config.doInTenant(async () => {
const patch1 = generatePatchObject(2)
const patch2 = generatePatchObject(2)
await docWritethrough.patch(patch1)
const date1 = new Date()
await waitForQueueCompletion()
await docWritethrough.patch(patch2)
tk.travel(Date.now() + 100)
const date2 = new Date()
await waitForQueueCompletion()
expect(date1).not.toEqual(date2)
expect(await db.get(documentId)).toEqual(
expect.objectContaining({
createdAt: date1.toISOString(),
updatedAt: date2.toISOString(),
})
)
})
})
it("concurrent patches will override keys", async () => {
await config.doInTenant(async () => {
const patch1 = generatePatchObject(2)
await docWritethrough.patch(patch1)
await waitForQueueCompletion()
const patch2 = generatePatchObject(1)
await docWritethrough.patch(patch2)
const keyToOverride = _.sample(Object.keys(patch1))!
expect(await db.get(documentId)).toEqual(
expect.objectContaining({
[keyToOverride]: patch1[keyToOverride],
})
)
await waitForQueueCompletion()
const patch3 = {
...generatePatchObject(3),
[keyToOverride]: generator.word(),
}
await docWritethrough.patch(patch3)
await waitForQueueCompletion()
expect(await db.get(documentId)).toEqual(
expect.objectContaining({
...patch1,
...patch2,
...patch3,
})
)
})
})
it("concurrent patches to different docWritethrough will not pollute each other", async () => {
await config.doInTenant(async () => {
const secondDocWritethrough = new DocWritethrough(
db,
structures.db.id()
)
const doc1Patch = generatePatchObject(2)
await docWritethrough.patch(doc1Patch)
const doc2Patch = generatePatchObject(1)
await secondDocWritethrough.patch(doc2Patch)
await waitForQueueCompletion()
const doc1Patch2 = generatePatchObject(3)
await docWritethrough.patch(doc1Patch2)
const doc2Patch2 = generatePatchObject(3)
await secondDocWritethrough.patch(doc2Patch2)
await waitForQueueCompletion()
expect(await db.get(docWritethrough.docId)).toEqual(
expect.objectContaining({
...doc1Patch,
...doc1Patch2,
})
)
expect(await db.get(secondDocWritethrough.docId)).toEqual(
expect.objectContaining({
...doc2Patch,
...doc2Patch2,
})
)
})
})
it("cached values are persisted only once", async () => {
await config.doInTenant(async () => {
const initialPatch = generatePatchObject(5)
await docWritethrough.patch(initialPatch)
await waitForQueueCompletion()
expect(await db.get(documentId)).toEqual(
expect.objectContaining(initialPatch)
)
await db.remove(await db.get(documentId))
await waitForQueueCompletion()
const extraPatch = generatePatchObject(5)
await docWritethrough.patch(extraPatch)
await waitForQueueCompletion()
expect(await db.get(documentId)).toEqual(
expect.objectContaining(extraPatch)
)
expect(await db.get(documentId)).not.toEqual(
expect.objectContaining(initialPatch)
)
})
})
it("concurrent calls will not cause conflicts", async () => {
async function parallelPatch(count: number) {
const patches = Array.from({ length: count }).map(() =>
generatePatchObject(1)
)
await Promise.all(patches.map(p => docWritethrough.patch(p)))
return patches.reduce((acc, c) => {
acc = { ...acc, ...c }
return acc
}, {})
}
const queueMessageSpy = jest.spyOn(docWritethroughProcessorQueue, "add")
await config.doInTenant(async () => {
let patches = await parallelPatch(5)
expect(queueMessageSpy).toBeCalledTimes(5)
await waitForQueueCompletion()
expect(await db.get(documentId)).toEqual(
expect.objectContaining(patches)
)
patches = { ...patches, ...(await parallelPatch(40)) }
expect(queueMessageSpy).toBeCalledTimes(45)
await waitForQueueCompletion()
expect(await db.get(documentId)).toEqual(
expect.objectContaining(patches)
)
patches = { ...patches, ...(await parallelPatch(10)) }
expect(queueMessageSpy).toBeCalledTimes(55)
await waitForQueueCompletion()
expect(await db.get(documentId)).toEqual(
expect.objectContaining(patches)
)
})
})
// This is not yet supported
it.skip("patches will execute in order", async () => {
let incrementalValue = 0
const keyToOverride = generator.word()
async function incrementalPatches(count: number) {
for (let i = 0; i < count; i++) {
await docWritethrough.patch({ [keyToOverride]: incrementalValue++ })
}
}
await config.doInTenant(async () => {
await incrementalPatches(5)
await waitForQueueCompletion()
expect(await db.get(documentId)).toEqual(
expect.objectContaining({ [keyToOverride]: 5 })
)
await incrementalPatches(40)
await waitForQueueCompletion()
expect(await db.get(documentId)).toEqual(
expect.objectContaining({ [keyToOverride]: 45 })
)
})
})
})
})

View File

@ -57,6 +57,9 @@ export const StaticDatabases = {
AUDIT_LOGS: { AUDIT_LOGS: {
name: "audit-logs", name: "audit-logs",
}, },
SCIM_LOGS: {
name: "scim-logs",
},
} }
export const APP_PREFIX = prefixed(DocumentType.APP) export const APP_PREFIX = prefixed(DocumentType.APP)

View File

@ -35,6 +35,17 @@ export function getAuditLogDBName(tenantId?: string) {
} }
} }
export function getScimDBName(tenantId?: string) {
if (!tenantId) {
tenantId = getTenantId()
}
if (tenantId === DEFAULT_TENANT_ID) {
return StaticDatabases.SCIM_LOGS.name
} else {
return `${tenantId}${SEPARATOR}${StaticDatabases.SCIM_LOGS.name}`
}
}
export function baseGlobalDBName(tenantId: string | undefined | null) { export function baseGlobalDBName(tenantId: string | undefined | null) {
if (!tenantId || tenantId === DEFAULT_TENANT_ID) { if (!tenantId || tenantId === DEFAULT_TENANT_ID) {
return StaticDatabases.GLOBAL.name return StaticDatabases.GLOBAL.name

View File

@ -70,7 +70,15 @@ export class DatabaseImpl implements Database {
DatabaseImpl.nano = buildNano(couchInfo) DatabaseImpl.nano = buildNano(couchInfo)
} }
async exists() { exists(docId?: string) {
if (docId === undefined) {
return this.dbExists()
}
return this.docExists(docId)
}
private async dbExists() {
const response = await directCouchUrlCall({ const response = await directCouchUrlCall({
url: `${this.couchInfo.url}/${this.name}`, url: `${this.couchInfo.url}/${this.name}`,
method: "HEAD", method: "HEAD",
@ -79,6 +87,15 @@ export class DatabaseImpl implements Database {
return response.status === 200 return response.status === 200
} }
private async docExists(id: string): Promise<boolean> {
try {
await this.performCall(db => () => db.head(id))
return true
} catch {
return false
}
}
private nano() { private nano() {
return this.instanceNano || DatabaseImpl.nano return this.instanceNano || DatabaseImpl.nano
} }

View File

@ -24,9 +24,12 @@ export class DDInstrumentedDatabase implements Database {
return this.db.name return this.db.name
} }
exists(): Promise<boolean> { exists(docId?: string): Promise<boolean> {
return tracer.trace("db.exists", span => { return tracer.trace("db.exists", span => {
span?.addTags({ db_name: this.name }) span?.addTags({ db_name: this.name, doc_id: docId })
if (docId) {
return this.db.exists(docId)
}
return this.db.exists() return this.db.exists()
}) })
} }

View File

@ -0,0 +1,55 @@
import _ from "lodash"
import { AnyDocument } from "@budibase/types"
import { generator } from "../../../tests"
import { DatabaseImpl } from "../couch"
import { newid } from "../../utils"
describe("DatabaseImpl", () => {
const database = new DatabaseImpl(generator.word())
const documents: AnyDocument[] = []
beforeAll(async () => {
const docsToCreate = Array.from({ length: 10 }).map(() => ({
_id: newid(),
}))
const createdDocs = await database.bulkDocs(docsToCreate)
documents.push(...createdDocs.map((x: any) => ({ _id: x.id, _rev: x.rev })))
})
describe("document exists", () => {
it("can check existing docs by id", async () => {
const existingDoc = _.sample(documents)
const result = await database.exists(existingDoc!._id!)
expect(result).toBe(true)
})
it("can check non existing docs by id", async () => {
const result = await database.exists(newid())
expect(result).toBe(false)
})
it("can check an existing doc by id multiple times", async () => {
const existingDoc = _.sample(documents)
const id = existingDoc!._id!
const results = []
results.push(await database.exists(id))
results.push(await database.exists(id))
results.push(await database.exists(id))
expect(results).toEqual([true, true, true])
})
it("returns false after the doc is deleted", async () => {
const existingDoc = _.sample(documents)
const id = existingDoc!._id!
expect(await database.exists(id)).toBe(true)
await database.remove(existingDoc!)
expect(await database.exists(id)).toBe(false)
})
})
})

View File

@ -186,6 +186,7 @@ const environment = {
environment[key] = value environment[key] = value
}, },
ROLLING_LOG_MAX_SIZE: process.env.ROLLING_LOG_MAX_SIZE || "10M", ROLLING_LOG_MAX_SIZE: process.env.ROLLING_LOG_MAX_SIZE || "10M",
DISABLE_SCIM_CALLS: process.env.DISABLE_SCIM_CALLS,
} }
// clean up any environment variable edge cases // clean up any environment variable edge cases

View File

@ -4,4 +4,5 @@ export enum JobQueue {
AUDIT_LOG = "auditLogQueue", AUDIT_LOG = "auditLogQueue",
SYSTEM_EVENT_QUEUE = "systemEventQueue", SYSTEM_EVENT_QUEUE = "systemEventQueue",
APP_MIGRATION = "appMigration", APP_MIGRATION = "appMigration",
DOC_WRITETHROUGH_QUEUE = "docWritethroughQueue",
} }

View File

@ -1,5 +1,14 @@
import events from "events" import events from "events"
import { timeout } from "../utils" import { newid, timeout } from "../utils"
import { Queue, QueueOptions, JobOptions } from "./queue"
interface JobMessage {
id: string
timestamp: number
queue: string
data: any
opts?: JobOptions
}
/** /**
* Bull works with a Job wrapper around all messages that contains a lot more information about * Bull works with a Job wrapper around all messages that contains a lot more information about
@ -10,12 +19,13 @@ import { timeout } from "../utils"
* @returns A new job which can now be put onto the queue, this is mostly an * @returns A new job which can now be put onto the queue, this is mostly an
* internal structure so that an in memory queue can be easily swapped for a Bull queue. * internal structure so that an in memory queue can be easily swapped for a Bull queue.
*/ */
function newJob(queue: string, message: any) { function newJob(queue: string, message: any, opts?: JobOptions): JobMessage {
return { return {
id: newid(),
timestamp: Date.now(), timestamp: Date.now(),
queue: queue, queue: queue,
data: message, data: message,
opts: {}, opts,
} }
} }
@ -24,26 +34,29 @@ function newJob(queue: string, message: any) {
* It is relatively simple, using an event emitter internally to register when messages are available * It is relatively simple, using an event emitter internally to register when messages are available
* to the consumers - in can support many inputs and many consumers. * to the consumers - in can support many inputs and many consumers.
*/ */
class InMemoryQueue { class InMemoryQueue implements Partial<Queue> {
_name: string _name: string
_opts?: any _opts?: QueueOptions
_messages: any[] _messages: JobMessage[]
_queuedJobIds: Set<string>
_emitter: EventEmitter _emitter: EventEmitter
_runCount: number _runCount: number
_addCount: number _addCount: number
/** /**
* The constructor the queue, exactly the same as that of Bulls. * The constructor the queue, exactly the same as that of Bulls.
* @param name The name of the queue which is being configured. * @param name The name of the queue which is being configured.
* @param opts This is not used by the in memory queue as there is no real use * @param opts This is not used by the in memory queue as there is no real use
* case when in memory, but is the same API as Bull * case when in memory, but is the same API as Bull
*/ */
constructor(name: string, opts?: any) { constructor(name: string, opts?: QueueOptions) {
this._name = name this._name = name
this._opts = opts this._opts = opts
this._messages = [] this._messages = []
this._emitter = new events.EventEmitter() this._emitter = new events.EventEmitter()
this._runCount = 0 this._runCount = 0
this._addCount = 0 this._addCount = 0
this._queuedJobIds = new Set<string>()
} }
/** /**
@ -55,22 +68,42 @@ class InMemoryQueue {
* note this is incredibly limited compared to Bull as in reality the Job would contain * note this is incredibly limited compared to Bull as in reality the Job would contain
* a lot more information about the queue and current status of Bull cluster. * a lot more information about the queue and current status of Bull cluster.
*/ */
process(func: any) { async process(func: any) {
this._emitter.on("message", async () => { this._emitter.on("message", async () => {
if (this._messages.length <= 0) { if (this._messages.length <= 0) {
return return
} }
let msg = this._messages.shift() let msg = this._messages.shift()
let resp = func(msg) let resp = func(msg)
async function retryFunc(fnc: any) {
try {
await fnc
} catch (e: any) {
await new Promise<void>(r => setTimeout(() => r(), 50))
await retryFunc(func(msg))
}
}
if (resp.then != null) { if (resp.then != null) {
await resp try {
await retryFunc(resp)
} catch (e: any) {
console.error(e)
}
} }
this._runCount++ this._runCount++
const jobId = msg?.opts?.jobId?.toString()
if (jobId && msg?.opts?.removeOnComplete) {
this._queuedJobIds.delete(jobId)
}
}) })
} }
async isReady() { async isReady() {
return true return this as any
} }
// simply puts a message to the queue and emits to the queue for processing // simply puts a message to the queue and emits to the queue for processing
@ -83,27 +116,45 @@ class InMemoryQueue {
* @param repeat serves no purpose for the import queue. * @param repeat serves no purpose for the import queue.
*/ */
// eslint-disable-next-line no-unused-vars // eslint-disable-next-line no-unused-vars
add(msg: any, repeat: boolean) { async add(data: any, opts?: JobOptions) {
if (typeof msg !== "object") { const jobId = opts?.jobId?.toString()
if (jobId && this._queuedJobIds.has(jobId)) {
console.log(`Ignoring already queued job ${jobId}`)
return
}
if (typeof data !== "object") {
throw "Queue only supports carrying JSON." throw "Queue only supports carrying JSON."
} }
this._messages.push(newJob(this._name, msg)) if (jobId) {
this._addCount++ this._queuedJobIds.add(jobId)
this._emitter.emit("message") }
const pushMessage = () => {
this._messages.push(newJob(this._name, data, opts))
this._addCount++
this._emitter.emit("message")
}
const delay = opts?.delay
if (delay) {
setTimeout(pushMessage, delay)
} else {
pushMessage()
}
return {} as any
} }
/** /**
* replicating the close function from bull, which waits for jobs to finish. * replicating the close function from bull, which waits for jobs to finish.
*/ */
async close() { async close() {}
return []
}
/** /**
* This removes a cron which has been implemented, this is part of Bull API. * This removes a cron which has been implemented, this is part of Bull API.
* @param cronJobId The cron which is to be removed. * @param cronJobId The cron which is to be removed.
*/ */
removeRepeatableByKey(cronJobId: string) { async removeRepeatableByKey(cronJobId: string) {
// TODO: implement for testing // TODO: implement for testing
console.log(cronJobId) console.log(cronJobId)
} }
@ -111,12 +162,12 @@ class InMemoryQueue {
/** /**
* Implemented for tests * Implemented for tests
*/ */
getRepeatableJobs() { async getRepeatableJobs() {
return [] return []
} }
// eslint-disable-next-line no-unused-vars // eslint-disable-next-line no-unused-vars
removeJobs(pattern: string) { async removeJobs(pattern: string) {
// no-op // no-op
} }
@ -128,18 +179,22 @@ class InMemoryQueue {
} }
async getJob() { async getJob() {
return {} return null
} }
on() { on() {
// do nothing // do nothing
return this return this as any
} }
async waitForCompletion() { async waitForCompletion() {
do { do {
await timeout(50) await timeout(50)
} while (this._addCount < this._runCount) } while (this.hasRunningJobs())
}
hasRunningJobs() {
return this._addCount > this._runCount
} }
} }

View File

@ -88,6 +88,7 @@ enum QueueEventType {
AUDIT_LOG_EVENT = "audit-log-event", AUDIT_LOG_EVENT = "audit-log-event",
SYSTEM_EVENT = "system-event", SYSTEM_EVENT = "system-event",
APP_MIGRATION = "app-migration", APP_MIGRATION = "app-migration",
DOC_WRITETHROUGH = "doc-writethrough",
} }
const EventTypeMap: { [key in JobQueue]: QueueEventType } = { const EventTypeMap: { [key in JobQueue]: QueueEventType } = {
@ -96,6 +97,7 @@ const EventTypeMap: { [key in JobQueue]: QueueEventType } = {
[JobQueue.AUDIT_LOG]: QueueEventType.AUDIT_LOG_EVENT, [JobQueue.AUDIT_LOG]: QueueEventType.AUDIT_LOG_EVENT,
[JobQueue.SYSTEM_EVENT_QUEUE]: QueueEventType.SYSTEM_EVENT, [JobQueue.SYSTEM_EVENT_QUEUE]: QueueEventType.SYSTEM_EVENT,
[JobQueue.APP_MIGRATION]: QueueEventType.APP_MIGRATION, [JobQueue.APP_MIGRATION]: QueueEventType.APP_MIGRATION,
[JobQueue.DOC_WRITETHROUGH_QUEUE]: QueueEventType.DOC_WRITETHROUGH,
} }
function logging(queue: Queue, jobQueue: JobQueue) { function logging(queue: Queue, jobQueue: JobQueue) {

View File

@ -7,6 +7,8 @@ import { addListeners, StalledFn } from "./listeners"
import { Duration } from "../utils" import { Duration } from "../utils"
import * as timers from "../timers" import * as timers from "../timers"
export { QueueOptions, Queue, JobOptions } from "bull"
// the queue lock is held for 5 minutes // the queue lock is held for 5 minutes
const QUEUE_LOCK_MS = Duration.fromMinutes(5).toMs() const QUEUE_LOCK_MS = Duration.fromMinutes(5).toMs()
// queue lock is refreshed every 30 seconds // queue lock is refreshed every 30 seconds

View File

@ -9,7 +9,8 @@ let userClient: Client,
lockClient: Client, lockClient: Client,
socketClient: Client, socketClient: Client,
inviteClient: Client, inviteClient: Client,
passwordResetClient: Client passwordResetClient: Client,
docWritethroughClient: Client
export async function init() { export async function init() {
userClient = await new Client(utils.Databases.USER_CACHE).init() userClient = await new Client(utils.Databases.USER_CACHE).init()
@ -24,6 +25,9 @@ export async function init() {
utils.Databases.SOCKET_IO, utils.Databases.SOCKET_IO,
utils.SelectableDatabase.SOCKET_IO utils.SelectableDatabase.SOCKET_IO
).init() ).init()
docWritethroughClient = await new Client(
utils.Databases.DOC_WRITE_THROUGH
).init()
} }
export async function shutdown() { export async function shutdown() {
@ -104,3 +108,10 @@ export async function getPasswordResetClient() {
} }
return passwordResetClient return passwordResetClient
} }
export async function getDocWritethroughClient() {
if (!writethroughClient) {
await init()
}
return writethroughClient
}

View File

@ -320,6 +320,11 @@ class RedisWrapper {
await this.getClient().del(addDbPrefix(db, key)) await this.getClient().del(addDbPrefix(db, key))
} }
async bulkDelete(keys: string[]) {
const db = this._db
await this.getClient().del(keys.map(key => addDbPrefix(db, key)))
}
async clear() { async clear() {
let items = await this.scan() let items = await this.scan()
await Promise.all(items.map((obj: any) => this.delete(obj.key))) await Promise.all(items.map((obj: any) => this.delete(obj.key)))

View File

@ -30,6 +30,7 @@ export enum Databases {
LOCKS = "locks", LOCKS = "locks",
SOCKET_IO = "socket_io", SOCKET_IO = "socket_io",
BPM_EVENTS = "bpmEvents", BPM_EVENTS = "bpmEvents",
DOC_WRITE_THROUGH = "docWriteThrough",
} }
/** /**

@ -1 +1 @@
Subproject commit 6504ee7451b0a4b21e7e061a0f49add143bb483e Subproject commit 268a16f216f4b196c7b413e316bbfd20cc5ce6cc

View File

@ -38,6 +38,7 @@ export enum DocumentType {
AUTOMATION_METADATA = "meta_au", AUTOMATION_METADATA = "meta_au",
AUDIT_LOG = "al", AUDIT_LOG = "al",
APP_MIGRATION_METADATA = "_design/migrations", APP_MIGRATION_METADATA = "_design/migrations",
SCIM_LOG = "scimlog",
} }
// these are the core documents that make up the data, design // these are the core documents that make up the data, design

View File

@ -128,6 +128,7 @@ export interface Database {
exists(): Promise<boolean> exists(): Promise<boolean>
get<T extends Document>(id?: string): Promise<T> get<T extends Document>(id?: string): Promise<T>
exists(docId: string): Promise<boolean>
getMultiple<T extends Document>( getMultiple<T extends Document>(
ids: string[], ids: string[],
opts?: { allowMissing?: boolean } opts?: { allowMissing?: boolean }

View File

@ -1,5 +1,4 @@
import { sdk as proSdk } from "@budibase/pro" import { sdk as proSdk } from "@budibase/pro"
import * as userSdk from "./sdk/users"
export const initPro = async () => { export const initPro = async () => {
await proSdk.init({}) await proSdk.init({})