diff --git a/lerna.json b/lerna.json
index a77a16a24e..a50794e91e 100644
--- a/lerna.json
+++ b/lerna.json
@@ -1,5 +1,5 @@
 {
-  "version": "2.21.3",
+  "version": "2.21.4",
   "npmClient": "yarn",
   "packages": [
     "packages/*",
diff --git a/packages/backend-core/src/cache/base/index.ts b/packages/backend-core/src/cache/base/index.ts
index 264984c6a5..74da4fe0d2 100644
--- a/packages/backend-core/src/cache/base/index.ts
+++ b/packages/backend-core/src/cache/base/index.ts
@@ -23,6 +23,18 @@ export default class BaseCache {
     return client.keys(pattern)
   }
 
+  async exists(key: string, opts = { useTenancy: true }) {
+    key = opts.useTenancy ? generateTenantKey(key) : key
+    const client = await this.getClient()
+    return client.exists(key)
+  }
+
+  async scan(key: string, opts = { useTenancy: true }) {
+    key = opts.useTenancy ? generateTenantKey(key) : key
+    const client = await this.getClient()
+    return client.scan(key)
+  }
+
   /**
    * Read only from the cache.
    */
@@ -32,6 +44,15 @@ export default class BaseCache {
     return client.get(key)
   }
 
+  /**
+   * Read only from the cache.
+   */
+  async bulkGet<T>(keys: string[], opts = { useTenancy: true }) {
+    keys = opts.useTenancy ? keys.map(key => generateTenantKey(key)) : keys
+    const client = await this.getClient()
+    return client.bulkGet<T>(keys)
+  }
+
   /**
    * Write to the cache.
    */
@@ -46,6 +67,25 @@ export default class BaseCache {
     await client.store(key, value, ttl)
   }
 
+  /**
+   * Bulk write to the cache.
+   */
+  async bulkStore(
+    data: Record<string, any>,
+    ttl: number | null = null,
+    opts = { useTenancy: true }
+  ) {
+    if (opts.useTenancy) {
+      data = Object.entries(data).reduce((acc, [key, value]) => {
+        acc[generateTenantKey(key)] = value
+        return acc
+      }, {} as Record<string, any>)
+    }
+
+    const client = await this.getClient()
+    await client.bulkStore(data, ttl)
+  }
+
   /**
    * Remove from cache.
    */
@@ -55,15 +95,24 @@ export default class BaseCache {
     return client.delete(key)
   }
 
+  /**
+   * Remove from cache.
+   */
+  async bulkDelete(keys: string[], opts = { useTenancy: true }) {
+    keys = opts.useTenancy ? keys.map(key => generateTenantKey(key)) : keys
+    const client = await this.getClient()
+    return client.bulkDelete(keys)
+  }
+
   /**
    * Read from the cache. Write to the cache if not exists.
    */
-  async withCache(
+  async withCache<T>(
     key: string,
-    ttl: number,
-    fetchFn: any,
+    ttl: number | null = null,
+    fetchFn: () => Promise<T> | T,
     opts = { useTenancy: true }
-  ) {
+  ): Promise<T> {
     const cachedValue = await this.get(key, opts)
     if (cachedValue) {
       return cachedValue
@@ -89,4 +138,13 @@ export default class BaseCache {
       throw err
     }
   }
+
+  /**
+   * Delete the entry if the provided value matches the stored one.
+   */
+  async deleteIfValue(key: string, value: any, opts = { useTenancy: true }) {
+    key = opts.useTenancy ? generateTenantKey(key) : key
+    const client = await this.getClient()
+    await client.deleteIfValue(key, value)
+  }
 }
diff --git a/packages/backend-core/src/cache/docWritethrough.ts b/packages/backend-core/src/cache/docWritethrough.ts
new file mode 100644
index 0000000000..51018b2317
--- /dev/null
+++ b/packages/backend-core/src/cache/docWritethrough.ts
@@ -0,0 +1,86 @@
+import { AnyDocument, Database } from "@budibase/types"
+
+import { JobQueue, createQueue } from "../queue"
+import * as dbUtils from "../db"
+
+interface ProcessDocMessage {
+  dbName: string
+  docId: string
+  data: Record<string, any>
+}
+
+const PERSIST_MAX_ATTEMPTS = 100
+
+export const docWritethroughProcessorQueue = createQueue<ProcessDocMessage>(
+  JobQueue.DOC_WRITETHROUGH_QUEUE,
+  {
+    jobOptions: {
+      attempts: PERSIST_MAX_ATTEMPTS,
+    },
+  }
+)
+
+class DocWritethroughProcessor {
+  init() {
+    docWritethroughProcessorQueue.process(async message => {
+      try {
+        await this.persistToDb(message.data)
+      } catch (err: any) {
+        if (err.status === 409) {
+          // If we get a 409, it means that another job updated it meanwhile. We want to retry it to persist it again.
+          throw new Error(
+            `Conflict persisting message ${message.id}. Attempt ${message.attemptsMade}`
+          )
+        }
+
+        throw err
+      }
+    })
+    return this
+  }
+
+  private async persistToDb({
+    dbName,
+    docId,
+    data,
+  }: {
+    dbName: string
+    docId: string
+    data: Record<string, any>
+  }) {
+    const db = dbUtils.getDB(dbName)
+    let doc: AnyDocument | undefined
+    try {
+      doc = await db.get(docId)
+    } catch {
+      doc = { _id: docId }
+    }
+
+    doc = { ...doc, ...data }
+    await db.put(doc)
+  }
+}
+
+export const processor = new DocWritethroughProcessor().init()
+
+export class DocWritethrough {
+  private db: Database
+  private _docId: string
+
+  constructor(db: Database, docId: string) {
+    this.db = db
+    this._docId = docId
+  }
+
+  get docId() {
+    return this._docId
+  }
+
+  async patch(data: Record<string, any>) {
+    await docWritethroughProcessorQueue.add({
+      dbName: this.db.name,
+      docId: this.docId,
+      data,
+    })
+  }
+}
diff --git a/packages/backend-core/src/cache/generic.ts b/packages/backend-core/src/cache/generic.ts
index 3ac323a8d4..2d6d8b9472 100644
--- a/packages/backend-core/src/cache/generic.ts
+++ b/packages/backend-core/src/cache/generic.ts
@@ -26,7 +26,8 @@ export const store = (...args: Parameters<typeof GENERIC.store>) =>
   GENERIC.store(...args)
 export const destroy = (...args: Parameters<typeof GENERIC.delete>) =>
   GENERIC.delete(...args)
-export const withCache = (...args: Parameters<typeof GENERIC.withCache>) =>
-  GENERIC.withCache(...args)
+export const withCache = <T>(
+  ...args: Parameters<typeof GENERIC.withCache<T>>
+) => GENERIC.withCache(...args)
 export const bustCache = (...args: Parameters<typeof GENERIC.bustCache>) =>
   GENERIC.bustCache(...args)
diff --git a/packages/backend-core/src/cache/index.ts b/packages/backend-core/src/cache/index.ts
index 4fa986e4e2..3b25108634 100644
--- a/packages/backend-core/src/cache/index.ts
+++ b/packages/backend-core/src/cache/index.ts
@@ -5,3 +5,4 @@ export * as writethrough from "./writethrough"
 export * as invite from "./invite"
 export * as passwordReset from "./passwordReset"
 export * from "./generic"
+export * as docWritethrough from "./docWritethrough"
diff --git a/packages/backend-core/src/cache/tests/docWritethrough.spec.ts b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts
new file mode 100644
index 0000000000..d90c83afd3
--- /dev/null
+++ b/packages/backend-core/src/cache/tests/docWritethrough.spec.ts
@@ -0,0 +1,288 @@
+import tk from "timekeeper"
+
+import _ from "lodash"
+import { DBTestConfiguration, generator, structures } from "../../../tests"
+import { getDB } from "../../db"
+
+import {
+  DocWritethrough,
+  docWritethroughProcessorQueue,
+} from "../docWritethrough"
+
+import InMemoryQueue from "../../queue/inMemoryQueue"
+
+const initialTime = Date.now()
+
+async function waitForQueueCompletion() {
+  const queue: InMemoryQueue = docWritethroughProcessorQueue as never
+  await queue.waitForCompletion()
+}
+
+describe("docWritethrough", () => {
+  const config = new DBTestConfiguration()
+
+  const db = getDB(structures.db.id())
+  let documentId: string
+  let docWritethrough: DocWritethrough
+
+  describe("patch", () => {
+    function generatePatchObject(fieldCount: number) {
+      const keys = generator.unique(() => generator.word(), fieldCount)
+      return keys.reduce((acc, c) => {
+        acc[c] = generator.word()
+        return acc
+      }, {} as Record<string, any>)
+    }
+
+    beforeEach(async () => {
+      jest.clearAllMocks()
+      documentId = structures.uuid()
+      docWritethrough = new DocWritethrough(db, documentId)
+    })
+
+    it("patching will not persist until the messages are persisted", async () => {
+      await config.doInTenant(async () => {
+        await docWritethrough.patch(generatePatchObject(2))
+        await docWritethrough.patch(generatePatchObject(2))
+
+        expect(await db.exists(documentId)).toBe(false)
+      })
+    })
+
+    it("patching will persist when the messages are persisted", async () => {
+      await config.doInTenant(async () => {
+        const patch1 = generatePatchObject(2)
+        const patch2 = generatePatchObject(2)
+        await docWritethrough.patch(patch1)
+        await docWritethrough.patch(patch2)
+
+        await waitForQueueCompletion()
+
+        // This will not be persisted
+        const patch3 = generatePatchObject(3)
+        await docWritethrough.patch(patch3)
+
+        expect(await db.get(documentId)).toEqual({
+          _id: documentId,
+          ...patch1,
+          ...patch2,
+          _rev: expect.stringMatching(/2-.+/),
+          createdAt: new Date(initialTime).toISOString(),
+          updatedAt: new Date(initialTime).toISOString(),
+        })
+      })
+    })
+
+    it("patching will persist keeping the previous data", async () => {
+      await config.doInTenant(async () => {
+        const patch1 = generatePatchObject(2)
+        const patch2 = generatePatchObject(2)
+        await docWritethrough.patch(patch1)
+        await docWritethrough.patch(patch2)
+
+        await waitForQueueCompletion()
+
+        const patch3 = generatePatchObject(3)
+        await docWritethrough.patch(patch3)
+
+        await waitForQueueCompletion()
+
+        expect(await db.get(documentId)).toEqual(
+          expect.objectContaining({
+            _id: documentId,
+            ...patch1,
+            ...patch2,
+            ...patch3,
+          })
+        )
+      })
+    })
+
+    it("date audit fields are set correctly when persisting", async () => {
+      await config.doInTenant(async () => {
+        const patch1 = generatePatchObject(2)
+        const patch2 = generatePatchObject(2)
+        await docWritethrough.patch(patch1)
+        const date1 = new Date()
+        await waitForQueueCompletion()
+        await docWritethrough.patch(patch2)
+
+        tk.travel(Date.now() + 100)
+        const date2 = new Date()
+        await waitForQueueCompletion()
+
+        expect(date1).not.toEqual(date2)
+        expect(await db.get(documentId)).toEqual(
+          expect.objectContaining({
+            createdAt: date1.toISOString(),
+            updatedAt: date2.toISOString(),
+          })
+        )
+      })
+    })
+
+    it("concurrent patches will override keys", async () => {
+      await config.doInTenant(async () => {
+        const patch1 = generatePatchObject(2)
+        await docWritethrough.patch(patch1)
+        await waitForQueueCompletion()
+        const patch2 = generatePatchObject(1)
+        await docWritethrough.patch(patch2)
+
+        const keyToOverride = _.sample(Object.keys(patch1))!
+        expect(await db.get(documentId)).toEqual(
+          expect.objectContaining({
+            [keyToOverride]: patch1[keyToOverride],
+          })
+        )
+
+        await waitForQueueCompletion()
+
+        const patch3 = {
+          ...generatePatchObject(3),
+          [keyToOverride]: generator.word(),
+        }
+        await docWritethrough.patch(patch3)
+        await waitForQueueCompletion()
+
+        expect(await db.get(documentId)).toEqual(
+          expect.objectContaining({
+            ...patch1,
+            ...patch2,
+            ...patch3,
+          })
+        )
+      })
+    })
+
+    it("concurrent patches to different docWritethrough will not pollute each other", async () => {
+      await config.doInTenant(async () => {
+        const secondDocWritethrough = new DocWritethrough(
+          db,
+          structures.db.id()
+        )
+
+        const doc1Patch = generatePatchObject(2)
+        await docWritethrough.patch(doc1Patch)
+        const doc2Patch = generatePatchObject(1)
+        await secondDocWritethrough.patch(doc2Patch)
+
+        await waitForQueueCompletion()
+
+        const doc1Patch2 = generatePatchObject(3)
+        await docWritethrough.patch(doc1Patch2)
+        const doc2Patch2 = generatePatchObject(3)
+        await secondDocWritethrough.patch(doc2Patch2)
+        await waitForQueueCompletion()
+
+        expect(await db.get(docWritethrough.docId)).toEqual(
+          expect.objectContaining({
+            ...doc1Patch,
+            ...doc1Patch2,
+          })
+        )
+
+        expect(await db.get(secondDocWritethrough.docId)).toEqual(
+          expect.objectContaining({
+            ...doc2Patch,
+            ...doc2Patch2,
+          })
+        )
+      })
+    })
+
+    it("cached values are persisted only once", async () => {
+      await config.doInTenant(async () => {
+        const initialPatch = generatePatchObject(5)
+
+        await docWritethrough.patch(initialPatch)
+        await waitForQueueCompletion()
+
+        expect(await db.get(documentId)).toEqual(
+          expect.objectContaining(initialPatch)
+        )
+
+        await db.remove(await db.get(documentId))
+
+        await waitForQueueCompletion()
+        const extraPatch = generatePatchObject(5)
+        await docWritethrough.patch(extraPatch)
+        await waitForQueueCompletion()
+
+        expect(await db.get(documentId)).toEqual(
+          expect.objectContaining(extraPatch)
+        )
+        expect(await db.get(documentId)).not.toEqual(
+          expect.objectContaining(initialPatch)
+        )
+      })
+    })
+
+    it("concurrent calls will not cause conflicts", async () => {
+      async function parallelPatch(count: number) {
+        const patches = Array.from({ length: count }).map(() =>
+          generatePatchObject(1)
+        )
+        await Promise.all(patches.map(p => docWritethrough.patch(p)))
+
+        return patches.reduce((acc, c) => {
+          acc = { ...acc, ...c }
+          return acc
+        }, {})
+      }
+      const queueMessageSpy = jest.spyOn(docWritethroughProcessorQueue, "add")
+
+      await config.doInTenant(async () => {
+        let patches = await parallelPatch(5)
+        expect(queueMessageSpy).toBeCalledTimes(5)
+
+        await waitForQueueCompletion()
+        expect(await db.get(documentId)).toEqual(
+          expect.objectContaining(patches)
+        )
+
+        patches = { ...patches, ...(await parallelPatch(40)) }
+        expect(queueMessageSpy).toBeCalledTimes(45)
+
+        await waitForQueueCompletion()
+        expect(await db.get(documentId)).toEqual(
+          expect.objectContaining(patches)
+        )
+
+        patches = { ...patches, ...(await parallelPatch(10)) }
+        expect(queueMessageSpy).toBeCalledTimes(55)
+
+        await waitForQueueCompletion()
+        expect(await db.get(documentId)).toEqual(
+          expect.objectContaining(patches)
+        )
+      })
+    })
+
+    // This is not yet supported
+    it.skip("patches will execute in order", async () => {
+      let incrementalValue = 0
+      const keyToOverride = generator.word()
+      async function incrementalPatches(count: number) {
+        for (let i = 0; i < count; i++) {
+          await docWritethrough.patch({ [keyToOverride]: incrementalValue++ })
+        }
+      }
+
+      await config.doInTenant(async () => {
+        await incrementalPatches(5)
+
+        await waitForQueueCompletion()
+        expect(await db.get(documentId)).toEqual(
+          expect.objectContaining({ [keyToOverride]: 5 })
+        )
+
+        await incrementalPatches(40)
+        await waitForQueueCompletion()
+        expect(await db.get(documentId)).toEqual(
+          expect.objectContaining({ [keyToOverride]: 45 })
+        )
+      })
+    })
+  })
+})
diff --git a/packages/backend-core/src/constants/db.ts b/packages/backend-core/src/constants/db.ts
index ac00483021..f4caac502e 100644
--- a/packages/backend-core/src/constants/db.ts
+++ b/packages/backend-core/src/constants/db.ts
@@ -57,6 +57,9 @@ export const StaticDatabases = {
   AUDIT_LOGS: {
     name: "audit-logs",
   },
+  SCIM_LOGS: {
+    name: "scim-logs",
+  },
 }
 
 export const APP_PREFIX = prefixed(DocumentType.APP)
diff --git a/packages/backend-core/src/context/mainContext.ts b/packages/backend-core/src/context/mainContext.ts
index 36fd5dcb48..ae86695168 100644
--- a/packages/backend-core/src/context/mainContext.ts
+++ b/packages/backend-core/src/context/mainContext.ts
@@ -35,6 +35,17 @@ export function getAuditLogDBName(tenantId?: string) {
   }
 }
 
+export function getScimDBName(tenantId?: string) {
+  if (!tenantId) {
+    tenantId = getTenantId()
+  }
+  if (tenantId === DEFAULT_TENANT_ID) {
+    return StaticDatabases.SCIM_LOGS.name
+  } else {
+    return `${tenantId}${SEPARATOR}${StaticDatabases.SCIM_LOGS.name}`
+  }
+}
+
 export function baseGlobalDBName(tenantId: string | undefined | null) {
   if (!tenantId || tenantId === DEFAULT_TENANT_ID) {
     return StaticDatabases.GLOBAL.name
diff --git a/packages/backend-core/src/db/couch/DatabaseImpl.ts b/packages/backend-core/src/db/couch/DatabaseImpl.ts
index 7e7c997cbe..416313f520 100644
--- a/packages/backend-core/src/db/couch/DatabaseImpl.ts
+++ b/packages/backend-core/src/db/couch/DatabaseImpl.ts
@@ -70,7 +70,15 @@ export class DatabaseImpl implements Database {
     DatabaseImpl.nano = buildNano(couchInfo)
   }
 
-  async exists() {
+  exists(docId?: string) {
+    if (docId === undefined) {
+      return this.dbExists()
+    }
+
+    return this.docExists(docId)
+  }
+
+  private async dbExists() {
     const response = await directCouchUrlCall({
       url: `${this.couchInfo.url}/${this.name}`,
       method: "HEAD",
@@ -79,6 +87,15 @@ export class DatabaseImpl implements Database {
     return response.status === 200
   }
 
+  private async docExists(id: string): Promise<boolean> {
+    try {
+      await this.performCall(db => () => db.head(id))
+      return true
+    } catch {
+      return false
+    }
+  }
+
   private nano() {
     return this.instanceNano || DatabaseImpl.nano
   }
diff --git a/packages/backend-core/src/db/instrumentation.ts b/packages/backend-core/src/db/instrumentation.ts
index 03010d4c92..795f30d7cd 100644
--- a/packages/backend-core/src/db/instrumentation.ts
+++ b/packages/backend-core/src/db/instrumentation.ts
@@ -24,9 +24,12 @@ export class DDInstrumentedDatabase implements Database {
     return this.db.name
   }
 
-  exists(): Promise<boolean> {
+  exists(docId?: string): Promise<boolean> {
     return tracer.trace("db.exists", span => {
-      span?.addTags({ db_name: this.name })
+      span?.addTags({ db_name: this.name, doc_id: docId })
+      if (docId) {
+        return this.db.exists(docId)
+      }
       return this.db.exists()
     })
   }
diff --git a/packages/backend-core/src/db/tests/DatabaseImpl.spec.ts b/packages/backend-core/src/db/tests/DatabaseImpl.spec.ts
new file mode 100644
index 0000000000..586f13f417
--- /dev/null
+++ b/packages/backend-core/src/db/tests/DatabaseImpl.spec.ts
@@ -0,0 +1,55 @@
+import _ from "lodash"
+import { AnyDocument } from "@budibase/types"
+import { generator } from "../../../tests"
+import { DatabaseImpl } from "../couch"
+import { newid } from "../../utils"
+
+describe("DatabaseImpl", () => {
+  const database = new DatabaseImpl(generator.word())
+  const documents: AnyDocument[] = []
+
+  beforeAll(async () => {
+    const docsToCreate = Array.from({ length: 10 }).map(() => ({
+      _id: newid(),
+    }))
+    const createdDocs = await database.bulkDocs(docsToCreate)
+
+    documents.push(...createdDocs.map((x: any) => ({ _id: x.id, _rev: x.rev })))
+  })
+
+  describe("document exists", () => {
+    it("can check existing docs by id", async () => {
+      const existingDoc = _.sample(documents)
+      const result = await database.exists(existingDoc!._id!)
+
+      expect(result).toBe(true)
+    })
+
+    it("can check non existing docs by id", async () => {
+      const result = await database.exists(newid())
+
+      expect(result).toBe(false)
+    })
+
+    it("can check an existing doc by id multiple times", async () => {
+      const existingDoc = _.sample(documents)
+      const id = existingDoc!._id!
+
+      const results = []
+      results.push(await database.exists(id))
+      results.push(await database.exists(id))
+      results.push(await database.exists(id))
+
+      expect(results).toEqual([true, true, true])
+    })
+
+    it("returns false after the doc is deleted", async () => {
+      const existingDoc = _.sample(documents)
+      const id = existingDoc!._id!
+      expect(await database.exists(id)).toBe(true)
+
+      await database.remove(existingDoc!)
+      expect(await database.exists(id)).toBe(false)
+    })
+  })
+})
diff --git a/packages/backend-core/src/environment.ts b/packages/backend-core/src/environment.ts
index b3179cbeea..2da2a77d67 100644
--- a/packages/backend-core/src/environment.ts
+++ b/packages/backend-core/src/environment.ts
@@ -186,6 +186,7 @@ const environment = {
     environment[key] = value
   },
   ROLLING_LOG_MAX_SIZE: process.env.ROLLING_LOG_MAX_SIZE || "10M",
+  DISABLE_SCIM_CALLS: process.env.DISABLE_SCIM_CALLS,
 }
 
 // clean up any environment variable edge cases
diff --git a/packages/backend-core/src/queue/constants.ts b/packages/backend-core/src/queue/constants.ts
index eb4f21aced..a095c6c769 100644
--- a/packages/backend-core/src/queue/constants.ts
+++ b/packages/backend-core/src/queue/constants.ts
@@ -4,4 +4,5 @@ export enum JobQueue {
   AUDIT_LOG = "auditLogQueue",
   SYSTEM_EVENT_QUEUE = "systemEventQueue",
   APP_MIGRATION = "appMigration",
+  DOC_WRITETHROUGH_QUEUE = "docWritethroughQueue",
 }
diff --git a/packages/backend-core/src/queue/inMemoryQueue.ts b/packages/backend-core/src/queue/inMemoryQueue.ts
index c05bbffbe9..afb5592562 100644
--- a/packages/backend-core/src/queue/inMemoryQueue.ts
+++ b/packages/backend-core/src/queue/inMemoryQueue.ts
@@ -1,5 +1,14 @@
 import events from "events"
-import { timeout } from "../utils"
+import { newid, timeout } from "../utils"
+import { Queue, QueueOptions, JobOptions } from "./queue"
+
+interface JobMessage {
+  id: string
+  timestamp: number
+  queue: string
+  data: any
+  opts?: JobOptions
+}
 
 /**
  * Bull works with a Job wrapper around all messages that contains a lot more information about
@@ -10,12 +19,13 @@ import { timeout } from "../utils"
  * @returns A new job which can now be put onto the queue, this is mostly an
  * internal structure so that an in memory queue can be easily swapped for a Bull queue.
  */
-function newJob(queue: string, message: any) {
+function newJob(queue: string, message: any, opts?: JobOptions): JobMessage {
   return {
+    id: newid(),
     timestamp: Date.now(),
     queue: queue,
     data: message,
-    opts: {},
+    opts,
   }
 }
 
@@ -24,26 +34,29 @@ function newJob(queue: string, message: any) {
  * It is relatively simple, using an event emitter internally to register when messages are available
  * to the consumers - in can support many inputs and many consumers.
  */
-class InMemoryQueue {
+class InMemoryQueue implements Partial<Queue> {
   _name: string
-  _opts?: any
-  _messages: any[]
+  _opts?: QueueOptions
+  _messages: JobMessage[]
+  _queuedJobIds: Set<string>
   _emitter: EventEmitter
   _runCount: number
   _addCount: number
+
   /**
    * The constructor the queue, exactly the same as that of Bulls.
    * @param name The name of the queue which is being configured.
    * @param opts This is not used by the in memory queue as there is no real use
    * case when in memory, but is the same API as Bull
    */
-  constructor(name: string, opts?: any) {
+  constructor(name: string, opts?: QueueOptions) {
     this._name = name
     this._opts = opts
     this._messages = []
     this._emitter = new events.EventEmitter()
     this._runCount = 0
     this._addCount = 0
+    this._queuedJobIds = new Set()
   }
 
   /**
@@ -55,22 +68,42 @@ class InMemoryQueue {
    * note this is incredibly limited compared to Bull as in reality the Job would contain
    * a lot more information about the queue and current status of Bull cluster.
    */
-  process(func: any) {
+  async process(func: any) {
     this._emitter.on("message", async () => {
       if (this._messages.length <= 0) {
         return
       }
       let msg = this._messages.shift()
+
       let resp = func(msg)
+
+      async function retryFunc(fnc: any) {
+        try {
+          await fnc
+        } catch (e: any) {
+          await new Promise<void>(r => setTimeout(() => r(), 50))
+
+          await retryFunc(func(msg))
+        }
+      }
+
       if (resp.then != null) {
-        await resp
+        try {
+          await retryFunc(resp)
+        } catch (e: any) {
+          console.error(e)
+        }
       }
       this._runCount++
+      const jobId = msg?.opts?.jobId?.toString()
+      if (jobId && msg?.opts?.removeOnComplete) {
+        this._queuedJobIds.delete(jobId)
+      }
     })
   }
 
   async isReady() {
-    return true
+    return this as any
   }
 
   // simply puts a message to the queue and emits to the queue for processing
@@ -83,27 +116,45 @@ class InMemoryQueue {
   * @param repeat serves no purpose for the import queue.
   */
  // eslint-disable-next-line no-unused-vars
-  add(msg: any, repeat: boolean) {
-    if (typeof msg !== "object") {
+  async add(data: any, opts?: JobOptions) {
+    const jobId = opts?.jobId?.toString()
+    if (jobId && this._queuedJobIds.has(jobId)) {
+      console.log(`Ignoring already queued job ${jobId}`)
+      return
+    }
+
+    if (typeof data !== "object") {
       throw "Queue only supports carrying JSON."
     }
-    this._messages.push(newJob(this._name, msg))
-    this._addCount++
-    this._emitter.emit("message")
+    if (jobId) {
+      this._queuedJobIds.add(jobId)
+    }
+
+    const pushMessage = () => {
+      this._messages.push(newJob(this._name, data, opts))
+      this._addCount++
+      this._emitter.emit("message")
+    }
+
+    const delay = opts?.delay
+    if (delay) {
+      setTimeout(pushMessage, delay)
+    } else {
+      pushMessage()
+    }
+    return {} as any
   }
 
   /**
    * replicating the close function from bull, which waits for jobs to finish.
    */
-  async close() {
-    return []
-  }
+  async close() {}
 
   /**
    * This removes a cron which has been implemented, this is part of Bull API.
   * @param cronJobId The cron which is to be removed.
   */
-  removeRepeatableByKey(cronJobId: string) {
+  async removeRepeatableByKey(cronJobId: string) {
     // TODO: implement for testing
     console.log(cronJobId)
   }
@@ -111,12 +162,12 @@ class InMemoryQueue {
   /**
    * Implemented for tests
    */
-  getRepeatableJobs() {
+  async getRepeatableJobs() {
     return []
   }
 
   // eslint-disable-next-line no-unused-vars
-  removeJobs(pattern: string) {
+  async removeJobs(pattern: string) {
     // no-op
   }
 
@@ -128,18 +179,22 @@ class InMemoryQueue {
   }
 
   async getJob() {
-    return {}
+    return null
   }
 
   on() {
     // do nothing
-    return this
+    return this as any
   }
 
   async waitForCompletion() {
     do {
       await timeout(50)
-    } while (this._addCount < this._runCount)
+    } while (this.hasRunningJobs())
+  }
+
+  hasRunningJobs() {
+    return this._addCount > this._runCount
   }
 }
 
diff --git a/packages/backend-core/src/queue/listeners.ts b/packages/backend-core/src/queue/listeners.ts
index 063a01bd2f..14dce5fe8d 100644
--- a/packages/backend-core/src/queue/listeners.ts
+++ b/packages/backend-core/src/queue/listeners.ts
@@ -88,6 +88,7 @@ enum QueueEventType {
   AUDIT_LOG_EVENT = "audit-log-event",
   SYSTEM_EVENT = "system-event",
   APP_MIGRATION = "app-migration",
+  DOC_WRITETHROUGH = "doc-writethrough",
 }
 
 const EventTypeMap: { [key in JobQueue]: QueueEventType } = {
@@ -96,6 +97,7 @@ const EventTypeMap: { [key in JobQueue]: QueueEventType } = {
   [JobQueue.AUDIT_LOG]: QueueEventType.AUDIT_LOG_EVENT,
   [JobQueue.SYSTEM_EVENT_QUEUE]: QueueEventType.SYSTEM_EVENT,
   [JobQueue.APP_MIGRATION]: QueueEventType.APP_MIGRATION,
+  [JobQueue.DOC_WRITETHROUGH_QUEUE]: QueueEventType.DOC_WRITETHROUGH,
 }
 
 function logging(queue: Queue, jobQueue: JobQueue) {
diff --git a/packages/backend-core/src/queue/queue.ts b/packages/backend-core/src/queue/queue.ts
index 0bcb25a35f..1838eed92f 100644
--- a/packages/backend-core/src/queue/queue.ts
+++ b/packages/backend-core/src/queue/queue.ts
@@ -7,6 +7,8 @@ import { addListeners, StalledFn } from "./listeners"
 import { Duration } from "../utils"
 import * as timers from "../timers"
 
+export { QueueOptions, Queue, JobOptions } from "bull"
+
 // the queue lock is held for 5 minutes
 const QUEUE_LOCK_MS = Duration.fromMinutes(5).toMs()
 // queue lock is refreshed every 30 seconds
diff --git a/packages/backend-core/src/redis/init.ts b/packages/backend-core/src/redis/init.ts
index f3bcee3209..7920dfed2d 100644
--- a/packages/backend-core/src/redis/init.ts
+++ b/packages/backend-core/src/redis/init.ts
@@ -9,7 +9,8 @@ let userClient: Client,
   lockClient: Client,
   socketClient: Client,
   inviteClient: Client,
-  passwordResetClient: Client
+  passwordResetClient: Client,
+  docWritethroughClient: Client
 
 export async function init() {
   userClient = await new Client(utils.Databases.USER_CACHE).init()
@@ -24,6 +25,9 @@ export async function init() {
     utils.Databases.SOCKET_IO,
     utils.SelectableDatabase.SOCKET_IO
   ).init()
+  docWritethroughClient = await new Client(
+    utils.Databases.DOC_WRITE_THROUGH
+  ).init()
 }
 
 export async function shutdown() {
@@ -104,3 +108,10 @@ export async function getPasswordResetClient() {
   }
   return passwordResetClient
 }
+
+export async function getDocWritethroughClient() {
+  if (!docWritethroughClient) {
+    await init()
+  }
+  return docWritethroughClient
+}
diff --git a/packages/backend-core/src/redis/redis.ts b/packages/backend-core/src/redis/redis.ts
index beb59ee3fa..79f75421d3 100644
--- a/packages/backend-core/src/redis/redis.ts
+++ b/packages/backend-core/src/redis/redis.ts
@@ -320,6 +320,11 @@ class RedisWrapper {
     await this.getClient().del(addDbPrefix(db, key))
   }
 
+  async bulkDelete(keys: string[]) {
+    const db = this._db
+    await this.getClient().del(keys.map(key => addDbPrefix(db, key)))
+  }
+
   async clear() {
     let items = await this.scan()
     await Promise.all(items.map((obj: any) => this.delete(obj.key)))
diff --git a/packages/backend-core/src/redis/utils.ts b/packages/backend-core/src/redis/utils.ts
index 7b93458b52..7f84f11467 100644
--- a/packages/backend-core/src/redis/utils.ts
+++ b/packages/backend-core/src/redis/utils.ts
@@ -30,6 +30,7 @@ export enum Databases {
   LOCKS = "locks",
   SOCKET_IO = "socket_io",
   BPM_EVENTS = "bpmEvents",
+  DOC_WRITE_THROUGH = "docWriteThrough",
 }
 
 /**
diff --git a/packages/pro b/packages/pro
index 6504ee7451..268a16f216 160000
--- a/packages/pro
+++ b/packages/pro
@@ -1 +1 @@
-Subproject commit 6504ee7451b0a4b21e7e061a0f49add143bb483e
+Subproject commit 268a16f216f4b196c7b413e316bbfd20cc5ce6cc
diff --git a/packages/types/src/documents/document.ts b/packages/types/src/documents/document.ts
index 18feb9b518..0de4337f4b 100644
--- a/packages/types/src/documents/document.ts
+++ b/packages/types/src/documents/document.ts
@@ -38,6 +38,7 @@ export enum DocumentType {
   AUTOMATION_METADATA = "meta_au",
   AUDIT_LOG = "al",
   APP_MIGRATION_METADATA = "_design/migrations",
+  SCIM_LOG = "scimlog",
 }
 
 // these are the core documents that make up the data, design
diff --git a/packages/types/src/sdk/db.ts b/packages/types/src/sdk/db.ts
index c4e4a4f02f..4d103d5be6 100644
--- a/packages/types/src/sdk/db.ts
+++ b/packages/types/src/sdk/db.ts
@@ -128,6 +128,7 @@ export interface Database {
 
   exists(): Promise<boolean>
   get<T extends Document>(id?: string): Promise<T>
+  exists(docId: string): Promise<boolean>
   getMultiple<T extends Document>(
     ids: string[],
     opts?: { allowMissing?: boolean }
diff --git a/packages/worker/src/initPro.ts b/packages/worker/src/initPro.ts
index ddc8d2562a..b34d514992 100644
--- a/packages/worker/src/initPro.ts
+++ b/packages/worker/src/initPro.ts
@@ -1,5 +1,4 @@
 import { sdk as proSdk } from "@budibase/pro"
-import * as userSdk from "./sdk/users"
 
 export const initPro = async () => {
   await proSdk.init({})
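
For context, a minimal usage sketch of the DocWritethrough API introduced in packages/backend-core/src/cache/docWritethrough.ts. This is illustrative only and not part of the patch: the database name, the "quota_usage" document ID and the patched field are hypothetical, and the only APIs assumed are ones added or already exported by @budibase/backend-core (cache.docWritethrough and db.getDB).

import { cache, db as dbCore } from "@budibase/backend-core"

// Hypothetical example: accumulate usage data on a per-tenant document.
// Each patch() call only enqueues a message on docWritethroughProcessorQueue;
// the DocWritethroughProcessor merges the data into the stored document
// asynchronously, retrying 409 conflicts up to PERSIST_MAX_ATTEMPTS times.
async function recordApiCall(dbName: string) {
  const db = dbCore.getDB(dbName)
  const writethrough = new cache.docWritethrough.DocWritethrough(
    db,
    "quota_usage" // hypothetical document ID
  )

  await writethrough.patch({ lastApiCallAt: new Date().toISOString() })
}

Because writes are deferred to the queue, callers should not expect the document to be updated synchronously; the docWritethrough.spec.ts tests above wait on waitForQueueCompletion() for exactly this reason.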