Merge branch 'master' of github.com:Budibase/budibase into labday/sqs
commit 121ad109ec
@@ -44,7 +44,8 @@
       "no-undef": "off",
       "no-prototype-builtins": "off",
       "local-rules/no-budibase-imports": "error",
-      "local-rules/no-test-com": "error"
+      "local-rules/no-test-com": "error",
+      "local-rules/email-domain-example-com": "error"
     }
   },
   {
@@ -51,4 +51,41 @@ module.exports = {
       }
     },
   },
+  "email-domain-example-com": {
+    meta: {
+      type: "problem",
+      docs: {
+        description:
+          "enforce using the example.com domain for generator.email calls",
+        category: "Possible Errors",
+        recommended: false,
+      },
+      fixable: "code",
+      schema: [],
+    },
+    create: function (context) {
+      return {
+        CallExpression(node) {
+          if (
+            node.callee.type === "MemberExpression" &&
+            node.callee.object.name === "generator" &&
+            node.callee.property.name === "email" &&
+            node.arguments.length === 0
+          ) {
+            context.report({
+              node,
+              message:
+                "Prefer using generator.email with the domain \"{ domain: 'example.com' }\".",
+              fix: function (fixer) {
+                return fixer.replaceText(
+                  node,
+                  'generator.email({ domain: "example.com" })'
+                )
+              },
+            })
+          }
+        },
+      }
+    },
+  },
 }
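For illustration, a minimal sketch of the call shape the new email-domain-example-com rule reports and autofixes; the variable names here are hypothetical:

// Flagged: generator.email() called with no arguments
const before = generator.email()

// After the autofix the domain is pinned to example.com
const after = generator.email({ domain: "example.com" })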
@@ -1,5 +1,5 @@
 {
-  "version": "2.21.3",
+  "version": "2.21.4",
   "npmClient": "yarn",
   "packages": [
     "packages/*",
@@ -23,6 +23,18 @@ export default class BaseCache {
     return client.keys(pattern)
   }

+  async exists(key: string, opts = { useTenancy: true }) {
+    key = opts.useTenancy ? generateTenantKey(key) : key
+    const client = await this.getClient()
+    return client.exists(key)
+  }
+
+  async scan(key: string, opts = { useTenancy: true }) {
+    key = opts.useTenancy ? generateTenantKey(key) : key
+    const client = await this.getClient()
+    return client.scan(key)
+  }
+
   /**
    * Read only from the cache.
    */
@@ -32,6 +44,15 @@ export default class BaseCache {
     return client.get(key)
   }

+  /**
+   * Read only from the cache.
+   */
+  async bulkGet<T>(keys: string[], opts = { useTenancy: true }) {
+    keys = opts.useTenancy ? keys.map(key => generateTenantKey(key)) : keys
+    const client = await this.getClient()
+    return client.bulkGet<T>(keys)
+  }
+
   /**
    * Write to the cache.
    */
@@ -46,6 +67,25 @@ export default class BaseCache {
     await client.store(key, value, ttl)
   }

+  /**
+   * Bulk write to the cache.
+   */
+  async bulkStore(
+    data: Record<string, any>,
+    ttl: number | null = null,
+    opts = { useTenancy: true }
+  ) {
+    if (opts.useTenancy) {
+      data = Object.entries(data).reduce((acc, [key, value]) => {
+        acc[generateTenantKey(key)] = value
+        return acc
+      }, {} as Record<string, any>)
+    }
+
+    const client = await this.getClient()
+    await client.bulkStore(data, ttl)
+  }
+
   /**
    * Remove from cache.
    */
@@ -55,15 +95,24 @@ export default class BaseCache {
     return client.delete(key)
   }

+  /**
+   * Remove from cache.
+   */
+  async bulkDelete(keys: string[], opts = { useTenancy: true }) {
+    keys = opts.useTenancy ? keys.map(key => generateTenantKey(key)) : keys
+    const client = await this.getClient()
+    return client.bulkDelete(keys)
+  }
+
   /**
    * Read from the cache. Write to the cache if not exists.
    */
-  async withCache(
+  async withCache<T>(
     key: string,
-    ttl: number,
-    fetchFn: any,
+    ttl: number | null = null,
+    fetchFn: () => Promise<T> | T,
     opts = { useTenancy: true }
-  ) {
+  ): Promise<T> {
     const cachedValue = await this.get(key, opts)
     if (cachedValue) {
       return cachedValue
@@ -89,4 +138,13 @@ export default class BaseCache {
       throw err
     }
   }
+
+  /**
+   * Delete the entry if the provided value matches the stored one.
+   */
+  async deleteIfValue(key: string, value: any, opts = { useTenancy: true }) {
+    key = opts.useTenancy ? generateTenantKey(key) : key
+    const client = await this.getClient()
+    await client.deleteIfValue(key, value)
+  }
 }
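A sketch of how the generic withCache<T> signature might be consumed, assuming a BaseCache instance named cache and a hypothetical fetchUserCount lookup:

// The result is cached under "user-count" with a TTL of 60 (interpreted by the
// underlying client); on a miss, fetchFn runs and its result is stored first.
const count = await cache.withCache<number>(
  "user-count",
  60,
  async () => fetchUserCount(),
  { useTenancy: true }
)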
@@ -0,0 +1,86 @@
+import { AnyDocument, Database } from "@budibase/types"
+
+import { JobQueue, createQueue } from "../queue"
+import * as dbUtils from "../db"
+
+interface ProcessDocMessage {
+  dbName: string
+  docId: string
+  data: Record<string, any>
+}
+
+const PERSIST_MAX_ATTEMPTS = 100
+
+export const docWritethroughProcessorQueue = createQueue<ProcessDocMessage>(
+  JobQueue.DOC_WRITETHROUGH_QUEUE,
+  {
+    jobOptions: {
+      attempts: PERSIST_MAX_ATTEMPTS,
+    },
+  }
+)
+
+class DocWritethroughProcessor {
+  init() {
+    docWritethroughProcessorQueue.process(async message => {
+      try {
+        await this.persistToDb(message.data)
+      } catch (err: any) {
+        if (err.status === 409) {
+          // A 409 means another job updated the doc in the meantime; throw so the job retries and persists again.
+          throw new Error(
+            `Conflict persisting message ${message.id}. Attempt ${message.attemptsMade}`
+          )
+        }
+
+        throw err
+      }
+    })
+    return this
+  }
+
+  private async persistToDb({
+    dbName,
+    docId,
+    data,
+  }: {
+    dbName: string
+    docId: string
+    data: Record<string, any>
+  }) {
+    const db = dbUtils.getDB(dbName)
+    let doc: AnyDocument | undefined
+    try {
+      doc = await db.get(docId)
+    } catch {
+      doc = { _id: docId }
+    }
+
+    doc = { ...doc, ...data }
+    await db.put(doc)
+  }
+}
+
+export const processor = new DocWritethroughProcessor().init()
+
+export class DocWritethrough {
+  private db: Database
+  private _docId: string
+
+  constructor(db: Database, docId: string) {
+    this.db = db
+    this._docId = docId
+  }
+
+  get docId() {
+    return this._docId
+  }
+
+  async patch(data: Record<string, any>) {
+    await docWritethroughProcessorQueue.add({
+      dbName: this.db.name,
+      docId: this.docId,
+      data,
+    })
+  }
+}
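A sketch of the intended write-through flow; the database name and document id are hypothetical:

const db = dbUtils.getDB("app_dev_123")
const writethrough = new DocWritethrough(db, "doc_abc")

// patch() only enqueues a ProcessDocMessage; the processor later merges the
// data into the stored document, retrying on 409 conflicts (up to 100 attempts).
await writethrough.patch({ lastUsed: Date.now() })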
@@ -26,7 +26,8 @@ export const store = (...args: Parameters<typeof GENERIC.store>) =>
   GENERIC.store(...args)
 export const destroy = (...args: Parameters<typeof GENERIC.delete>) =>
   GENERIC.delete(...args)
-export const withCache = (...args: Parameters<typeof GENERIC.withCache>) =>
-  GENERIC.withCache(...args)
+export const withCache = <T>(
+  ...args: Parameters<typeof GENERIC.withCache<T>>
+) => GENERIC.withCache(...args)
 export const bustCache = (...args: Parameters<typeof GENERIC.bustCache>) =>
   GENERIC.bustCache(...args)
@@ -0,0 +1,288 @@
+import tk from "timekeeper"
+
+import _ from "lodash"
+import { DBTestConfiguration, generator, structures } from "../../../tests"
+import { getDB } from "../../db"
+
+import {
+  DocWritethrough,
+  docWritethroughProcessorQueue,
+} from "../docWritethrough"
+
+import InMemoryQueue from "../../queue/inMemoryQueue"
+
+const initialTime = Date.now()
+
+async function waitForQueueCompletion() {
+  const queue: InMemoryQueue = docWritethroughProcessorQueue as never
+  await queue.waitForCompletion()
+}
+
+describe("docWritethrough", () => {
+  const config = new DBTestConfiguration()
+
+  const db = getDB(structures.db.id())
+  let documentId: string
+  let docWritethrough: DocWritethrough
+
+  describe("patch", () => {
+    function generatePatchObject(fieldCount: number) {
+      const keys = generator.unique(() => generator.word(), fieldCount)
+      return keys.reduce((acc, c) => {
+        acc[c] = generator.word()
+        return acc
+      }, {} as Record<string, any>)
+    }
+
+    beforeEach(async () => {
+      jest.clearAllMocks()
+      documentId = structures.uuid()
+      docWritethrough = new DocWritethrough(db, documentId)
+    })
+
+    it("patching will not persist until the messages are persisted", async () => {
+      await config.doInTenant(async () => {
+        await docWritethrough.patch(generatePatchObject(2))
+        await docWritethrough.patch(generatePatchObject(2))
+
+        expect(await db.exists(documentId)).toBe(false)
+      })
+    })
+
+    it("patching will persist when the messages are persisted", async () => {
+      await config.doInTenant(async () => {
+        const patch1 = generatePatchObject(2)
+        const patch2 = generatePatchObject(2)
+        await docWritethrough.patch(patch1)
+        await docWritethrough.patch(patch2)
+
+        await waitForQueueCompletion()
+
+        // This will not be persisted
+        const patch3 = generatePatchObject(3)
+        await docWritethrough.patch(patch3)
+
+        expect(await db.get(documentId)).toEqual({
+          _id: documentId,
+          ...patch1,
+          ...patch2,
+          _rev: expect.stringMatching(/2-.+/),
+          createdAt: new Date(initialTime).toISOString(),
+          updatedAt: new Date(initialTime).toISOString(),
+        })
+      })
+    })
+
+    it("patching will persist keeping the previous data", async () => {
+      await config.doInTenant(async () => {
+        const patch1 = generatePatchObject(2)
+        const patch2 = generatePatchObject(2)
+        await docWritethrough.patch(patch1)
+        await docWritethrough.patch(patch2)
+
+        await waitForQueueCompletion()
+
+        const patch3 = generatePatchObject(3)
+        await docWritethrough.patch(patch3)
+
+        await waitForQueueCompletion()
+
+        expect(await db.get(documentId)).toEqual(
+          expect.objectContaining({
+            _id: documentId,
+            ...patch1,
+            ...patch2,
+            ...patch3,
+          })
+        )
+      })
+    })
+
+    it("date audit fields are set correctly when persisting", async () => {
+      await config.doInTenant(async () => {
+        const patch1 = generatePatchObject(2)
+        const patch2 = generatePatchObject(2)
+        await docWritethrough.patch(patch1)
+        const date1 = new Date()
+        await waitForQueueCompletion()
+        await docWritethrough.patch(patch2)
+
+        tk.travel(Date.now() + 100)
+        const date2 = new Date()
+        await waitForQueueCompletion()
+
+        expect(date1).not.toEqual(date2)
+        expect(await db.get(documentId)).toEqual(
+          expect.objectContaining({
+            createdAt: date1.toISOString(),
+            updatedAt: date2.toISOString(),
+          })
+        )
+      })
+    })
+
+    it("concurrent patches will override keys", async () => {
+      await config.doInTenant(async () => {
+        const patch1 = generatePatchObject(2)
+        await docWritethrough.patch(patch1)
+        await waitForQueueCompletion()
+        const patch2 = generatePatchObject(1)
+        await docWritethrough.patch(patch2)
+
+        const keyToOverride = _.sample(Object.keys(patch1))!
+        expect(await db.get(documentId)).toEqual(
+          expect.objectContaining({
+            [keyToOverride]: patch1[keyToOverride],
+          })
+        )
+
+        await waitForQueueCompletion()
+
+        const patch3 = {
+          ...generatePatchObject(3),
+          [keyToOverride]: generator.word(),
+        }
+        await docWritethrough.patch(patch3)
+        await waitForQueueCompletion()
+
+        expect(await db.get(documentId)).toEqual(
+          expect.objectContaining({
+            ...patch1,
+            ...patch2,
+            ...patch3,
+          })
+        )
+      })
+    })
+
+    it("concurrent patches to different docWritethrough will not pollute each other", async () => {
+      await config.doInTenant(async () => {
+        const secondDocWritethrough = new DocWritethrough(
+          db,
+          structures.db.id()
+        )
+
+        const doc1Patch = generatePatchObject(2)
+        await docWritethrough.patch(doc1Patch)
+        const doc2Patch = generatePatchObject(1)
+        await secondDocWritethrough.patch(doc2Patch)
+
+        await waitForQueueCompletion()
+
+        const doc1Patch2 = generatePatchObject(3)
+        await docWritethrough.patch(doc1Patch2)
+        const doc2Patch2 = generatePatchObject(3)
+        await secondDocWritethrough.patch(doc2Patch2)
+        await waitForQueueCompletion()
+
+        expect(await db.get(docWritethrough.docId)).toEqual(
+          expect.objectContaining({
+            ...doc1Patch,
+            ...doc1Patch2,
+          })
+        )
+
+        expect(await db.get(secondDocWritethrough.docId)).toEqual(
+          expect.objectContaining({
+            ...doc2Patch,
+            ...doc2Patch2,
+          })
+        )
+      })
+    })
+
+    it("cached values are persisted only once", async () => {
+      await config.doInTenant(async () => {
+        const initialPatch = generatePatchObject(5)
+
+        await docWritethrough.patch(initialPatch)
+        await waitForQueueCompletion()
+
+        expect(await db.get(documentId)).toEqual(
+          expect.objectContaining(initialPatch)
+        )
+
+        await db.remove(await db.get(documentId))
+
+        await waitForQueueCompletion()
+        const extraPatch = generatePatchObject(5)
+        await docWritethrough.patch(extraPatch)
+        await waitForQueueCompletion()
+
+        expect(await db.get(documentId)).toEqual(
+          expect.objectContaining(extraPatch)
+        )
+        expect(await db.get(documentId)).not.toEqual(
+          expect.objectContaining(initialPatch)
+        )
+      })
+    })
+
+    it("concurrent calls will not cause conflicts", async () => {
+      async function parallelPatch(count: number) {
+        const patches = Array.from({ length: count }).map(() =>
+          generatePatchObject(1)
+        )
+        await Promise.all(patches.map(p => docWritethrough.patch(p)))
+
+        return patches.reduce((acc, c) => {
+          acc = { ...acc, ...c }
+          return acc
+        }, {})
+      }
+      const queueMessageSpy = jest.spyOn(docWritethroughProcessorQueue, "add")
+
+      await config.doInTenant(async () => {
+        let patches = await parallelPatch(5)
+        expect(queueMessageSpy).toBeCalledTimes(5)
+
+        await waitForQueueCompletion()
+        expect(await db.get(documentId)).toEqual(
+          expect.objectContaining(patches)
+        )
+
+        patches = { ...patches, ...(await parallelPatch(40)) }
+        expect(queueMessageSpy).toBeCalledTimes(45)
+
+        await waitForQueueCompletion()
+        expect(await db.get(documentId)).toEqual(
+          expect.objectContaining(patches)
+        )
+
+        patches = { ...patches, ...(await parallelPatch(10)) }
+        expect(queueMessageSpy).toBeCalledTimes(55)
+
+        await waitForQueueCompletion()
+        expect(await db.get(documentId)).toEqual(
+          expect.objectContaining(patches)
+        )
+      })
+    })
+
+    // This is not yet supported
+    it.skip("patches will execute in order", async () => {
+      let incrementalValue = 0
+      const keyToOverride = generator.word()
+      async function incrementalPatches(count: number) {
+        for (let i = 0; i < count; i++) {
+          await docWritethrough.patch({ [keyToOverride]: incrementalValue++ })
+        }
+      }
+
+      await config.doInTenant(async () => {
+        await incrementalPatches(5)
+
+        await waitForQueueCompletion()
+        expect(await db.get(documentId)).toEqual(
+          expect.objectContaining({ [keyToOverride]: 5 })
+        )
+
+        await incrementalPatches(40)
+        await waitForQueueCompletion()
+        expect(await db.get(documentId)).toEqual(
+          expect.objectContaining({ [keyToOverride]: 45 })
+        )
+      })
+    })
+  })
+})
@@ -71,7 +71,15 @@ export class DatabaseImpl implements Database {
       DatabaseImpl.nano = buildNano(couchInfo)
   }

-  async exists() {
+  exists(docId?: string) {
+    if (docId === undefined) {
+      return this.dbExists()
+    }
+
+    return this.docExists(docId)
+  }
+
+  private async dbExists() {
     const response = await directCouchUrlCall({
       url: `${this.couchInfo.url}/${this.name}`,
       method: "HEAD",
@@ -80,6 +88,15 @@ export class DatabaseImpl implements Database {
     return response.status === 200
   }

+  private async docExists(id: string): Promise<boolean> {
+    try {
+      await this.performCall(db => () => db.head(id))
+      return true
+    } catch {
+      return false
+    }
+  }
+
   private nano() {
     return this.instanceNano || DatabaseImpl.nano
   }
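The overloaded exists() can then be called both ways; the names here are hypothetical:

const db = new DatabaseImpl("my-db")
const dbIsThere = await db.exists() // no argument: HEAD request against the database
const docIsThere = await db.exists("doc_1") // with argument: HEAD request against the document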
@@ -24,9 +24,12 @@ export class DDInstrumentedDatabase implements Database {
     return this.db.name
   }

-  exists(): Promise<boolean> {
+  exists(docId?: string): Promise<boolean> {
     return tracer.trace("db.exists", span => {
-      span?.addTags({ db_name: this.name })
+      span?.addTags({ db_name: this.name, doc_id: docId })
+      if (docId) {
+        return this.db.exists(docId)
+      }
       return this.db.exists()
     })
   }
@@ -0,0 +1,55 @@
+import _ from "lodash"
+import { AnyDocument } from "@budibase/types"
+import { generator } from "../../../tests"
+import { DatabaseImpl } from "../couch"
+import { newid } from "../../utils"
+
+describe("DatabaseImpl", () => {
+  const database = new DatabaseImpl(generator.word())
+  const documents: AnyDocument[] = []
+
+  beforeAll(async () => {
+    const docsToCreate = Array.from({ length: 10 }).map(() => ({
+      _id: newid(),
+    }))
+    const createdDocs = await database.bulkDocs(docsToCreate)
+
+    documents.push(...createdDocs.map((x: any) => ({ _id: x.id, _rev: x.rev })))
+  })
+
+  describe("document exists", () => {
+    it("can check existing docs by id", async () => {
+      const existingDoc = _.sample(documents)
+      const result = await database.exists(existingDoc!._id!)
+
+      expect(result).toBe(true)
+    })
+
+    it("can check non existing docs by id", async () => {
+      const result = await database.exists(newid())
+
+      expect(result).toBe(false)
+    })
+
+    it("can check an existing doc by id multiple times", async () => {
+      const existingDoc = _.sample(documents)
+      const id = existingDoc!._id!
+
+      const results = []
+      results.push(await database.exists(id))
+      results.push(await database.exists(id))
+      results.push(await database.exists(id))
+
+      expect(results).toEqual([true, true, true])
+    })
+
+    it("returns false after the doc is deleted", async () => {
+      const existingDoc = _.sample(documents)
+      const id = existingDoc!._id!
+      expect(await database.exists(id)).toBe(true)
+
+      await database.remove(existingDoc!)
+      expect(await database.exists(id)).toBe(false)
+    })
+  })
+})
@@ -4,4 +4,5 @@ export enum JobQueue {
   AUDIT_LOG = "auditLogQueue",
   SYSTEM_EVENT_QUEUE = "systemEventQueue",
   APP_MIGRATION = "appMigration",
+  DOC_WRITETHROUGH_QUEUE = "docWritethroughQueue",
 }
@@ -1,5 +1,14 @@
 import events from "events"
-import { timeout } from "../utils"
+import { newid, timeout } from "../utils"
+import { Queue, QueueOptions, JobOptions } from "./queue"
+
+interface JobMessage {
+  id: string
+  timestamp: number
+  queue: string
+  data: any
+  opts?: JobOptions
+}

 /**
  * Bull works with a Job wrapper around all messages that contains a lot more information about
@@ -10,12 +19,13 @@ import { timeout } from "../utils"
  * @returns A new job which can now be put onto the queue, this is mostly an
  * internal structure so that an in memory queue can be easily swapped for a Bull queue.
  */
-function newJob(queue: string, message: any) {
+function newJob(queue: string, message: any, opts?: JobOptions): JobMessage {
   return {
+    id: newid(),
     timestamp: Date.now(),
     queue: queue,
     data: message,
-    opts: {},
+    opts,
   }
 }

@@ -24,26 +34,29 @@ function newJob(queue: string, message: any) {
  * It is relatively simple, using an event emitter internally to register when messages are available
  * to the consumers - it can support many inputs and many consumers.
  */
-class InMemoryQueue {
+class InMemoryQueue implements Partial<Queue> {
   _name: string
-  _opts?: any
-  _messages: any[]
+  _opts?: QueueOptions
+  _messages: JobMessage[]
+  _queuedJobIds: Set<string>
   _emitter: EventEmitter
   _runCount: number
   _addCount: number

   /**
    * The constructor for the queue, exactly the same as that of Bull's.
    * @param name The name of the queue which is being configured.
    * @param opts This is not used by the in memory queue as there is no real use
    * case when in memory, but it is the same API as Bull.
    */
-  constructor(name: string, opts?: any) {
+  constructor(name: string, opts?: QueueOptions) {
     this._name = name
     this._opts = opts
     this._messages = []
     this._emitter = new events.EventEmitter()
     this._runCount = 0
     this._addCount = 0
+    this._queuedJobIds = new Set<string>()
   }

   /**
@@ -55,22 +68,42 @@ class InMemoryQueue {
    * note this is incredibly limited compared to Bull as in reality the Job would contain
    * a lot more information about the queue and the current status of the Bull cluster.
    */
-  process(func: any) {
+  async process(func: any) {
     this._emitter.on("message", async () => {
       if (this._messages.length <= 0) {
         return
       }
       let msg = this._messages.shift()

       let resp = func(msg)
+
+      async function retryFunc(fnc: any) {
+        try {
+          await fnc
+        } catch (e: any) {
+          await new Promise<void>(r => setTimeout(() => r(), 50))
+
+          await retryFunc(func(msg))
+        }
+      }
+
       if (resp.then != null) {
-        await resp
+        try {
+          await retryFunc(resp)
+        } catch (e: any) {
+          console.error(e)
+        }
       }
       this._runCount++
+      const jobId = msg?.opts?.jobId?.toString()
+      if (jobId && msg?.opts?.removeOnComplete) {
+        this._queuedJobIds.delete(jobId)
+      }
     })
   }

   async isReady() {
-    return true
+    return this as any
   }

   // simply puts a message on the queue and emits to the queue for processing
@@ -83,27 +116,45 @@ class InMemoryQueue {
    * @param repeat serves no purpose for the import queue.
    */
   // eslint-disable-next-line no-unused-vars
-  add(msg: any, repeat: boolean) {
-    if (typeof msg !== "object") {
+  async add(data: any, opts?: JobOptions) {
+    const jobId = opts?.jobId?.toString()
+    if (jobId && this._queuedJobIds.has(jobId)) {
+      console.log(`Ignoring already queued job ${jobId}`)
+      return
+    }
+
+    if (typeof data !== "object") {
       throw "Queue only supports carrying JSON."
     }
-    this._messages.push(newJob(this._name, msg))
-    this._addCount++
-    this._emitter.emit("message")
+    if (jobId) {
+      this._queuedJobIds.add(jobId)
+    }
+
+    const pushMessage = () => {
+      this._messages.push(newJob(this._name, data, opts))
+      this._addCount++
+      this._emitter.emit("message")
+    }
+
+    const delay = opts?.delay
+    if (delay) {
+      setTimeout(pushMessage, delay)
+    } else {
+      pushMessage()
+    }
+    return {} as any
   }

   /**
    * replicating the close function from bull, which waits for jobs to finish.
    */
-  async close() {
-    return []
-  }
+  async close() {}

   /**
    * This removes a cron which has been implemented, this is part of the Bull API.
    * @param cronJobId The cron which is to be removed.
    */
-  removeRepeatableByKey(cronJobId: string) {
+  async removeRepeatableByKey(cronJobId: string) {
+    // TODO: implement for testing
     console.log(cronJobId)
   }
@@ -111,12 +162,12 @@ class InMemoryQueue {
   /**
    * Implemented for tests
    */
-  getRepeatableJobs() {
+  async getRepeatableJobs() {
     return []
   }

   // eslint-disable-next-line no-unused-vars
-  removeJobs(pattern: string) {
+  async removeJobs(pattern: string) {
     // no-op
   }

@@ -128,18 +179,22 @@ class InMemoryQueue {
   }

   async getJob() {
-    return {}
+    return null
   }

   on() {
     // do nothing
-    return this
+    return this as any
   }

   async waitForCompletion() {
     do {
       await timeout(50)
-    } while (this._addCount < this._runCount)
+    } while (this.hasRunningJobs())
+  }
+
+  hasRunningJobs() {
+    return this._addCount > this._runCount
   }
 }
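A sketch of the two new in-memory queue behaviours, jobId deduplication and delayed delivery; the queue name and payloads are hypothetical:

const queue = new InMemoryQueue("testQueue")

// The second add is ignored while "job-1" is still queued;
// removeOnComplete frees the id again once the job has run.
await queue.add({ n: 1 }, { jobId: "job-1", removeOnComplete: true })
await queue.add({ n: 1 }, { jobId: "job-1", removeOnComplete: true })

// A delay defers the push (and the "message" emit) via setTimeout.
await queue.add({ n: 2 }, { delay: 100 })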
@@ -88,6 +88,7 @@ enum QueueEventType {
   AUDIT_LOG_EVENT = "audit-log-event",
   SYSTEM_EVENT = "system-event",
   APP_MIGRATION = "app-migration",
+  DOC_WRITETHROUGH = "doc-writethrough",
 }

 const EventTypeMap: { [key in JobQueue]: QueueEventType } = {
@@ -96,6 +97,7 @@ const EventTypeMap: { [key in JobQueue]: QueueEventType } = {
   [JobQueue.AUDIT_LOG]: QueueEventType.AUDIT_LOG_EVENT,
   [JobQueue.SYSTEM_EVENT_QUEUE]: QueueEventType.SYSTEM_EVENT,
   [JobQueue.APP_MIGRATION]: QueueEventType.APP_MIGRATION,
+  [JobQueue.DOC_WRITETHROUGH_QUEUE]: QueueEventType.DOC_WRITETHROUGH,
 }

 function logging(queue: Queue, jobQueue: JobQueue) {
@@ -7,6 +7,8 @@ import { addListeners, StalledFn } from "./listeners"
 import { Duration } from "../utils"
 import * as timers from "../timers"

+export { QueueOptions, Queue, JobOptions } from "bull"
+
 // the queue lock is held for 5 minutes
 const QUEUE_LOCK_MS = Duration.fromMinutes(5).toMs()
 // queue lock is refreshed every 30 seconds
@@ -9,7 +9,8 @@ let userClient: Client,
   lockClient: Client,
   socketClient: Client,
   inviteClient: Client,
-  passwordResetClient: Client
+  passwordResetClient: Client,
+  docWritethroughClient: Client

 export async function init() {
   userClient = await new Client(utils.Databases.USER_CACHE).init()
@@ -24,6 +25,9 @@ export async function init() {
     utils.Databases.SOCKET_IO,
     utils.SelectableDatabase.SOCKET_IO
   ).init()
+  docWritethroughClient = await new Client(
+    utils.Databases.DOC_WRITE_THROUGH
+  ).init()
 }

 export async function shutdown() {
@@ -104,3 +108,10 @@ export async function getPasswordResetClient() {
   }
   return passwordResetClient
 }
+
+export async function getDocWritethroughClient() {
+  if (!docWritethroughClient) {
+    await init()
+  }
+  return docWritethroughClient
+}
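Callers can then obtain the new client lazily, mirroring the other getters; init() runs on first use. A hypothetical usage, assuming the Client wrapper's store call:

const client = await getDocWritethroughClient()
await client.store("some-key", { value: 1 })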
@@ -320,6 +320,11 @@ class RedisWrapper {
     await this.getClient().del(addDbPrefix(db, key))
   }

+  async bulkDelete(keys: string[]) {
+    const db = this._db
+    await this.getClient().del(keys.map(key => addDbPrefix(db, key)))
+  }
+
   async clear() {
     let items = await this.scan()
     await Promise.all(items.map((obj: any) => this.delete(obj.key)))
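bulkDelete leans on Redis DEL accepting multiple keys, each key first namespaced by addDbPrefix; a hypothetical call against a wrapper instance:

// Removes both prefixed keys in a single round trip.
await redisWrapper.bulkDelete(["key1", "key2"])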
@@ -30,6 +30,7 @@ export enum Databases {
   LOCKS = "locks",
   SOCKET_IO = "socket_io",
   BPM_EVENTS = "bpmEvents",
+  DOC_WRITE_THROUGH = "docWriteThrough",
 }

 /**
@@ -18,7 +18,7 @@ export const account = (partial: Partial<Account> = {}): Account => {
   return {
     accountId: uuid(),
     tenantId: generator.word(),
-    email: generator.email(),
+    email: generator.email({ domain: "example.com" }),
     tenantName: generator.word(),
     hosting: Hosting.SELF,
     createdAt: Date.now(),
@@ -13,7 +13,7 @@ interface CreateUserRequestFields {
 export function createUserRequest(userData?: Partial<CreateUserRequestFields>) {
   const defaultValues = {
     externalId: uuid(),
-    email: generator.email(),
+    email: `${uuid()}@example.com`,
     firstName: generator.first(),
     lastName: generator.last(),
     username: generator.name(),
@@ -40,8 +40,15 @@
     part2: PrettyRelationshipDefinitions.MANY,
   },
 }
-let relationshipOpts1 = Object.values(PrettyRelationshipDefinitions)
-let relationshipOpts2 = Object.values(PrettyRelationshipDefinitions)
+$: relationshipOpts1 =
+  relationshipPart2 === PrettyRelationshipDefinitions.ONE
+    ? [PrettyRelationshipDefinitions.MANY]
+    : Object.values(PrettyRelationshipDefinitions)
+
+$: relationshipOpts2 =
+  relationshipPart1 === PrettyRelationshipDefinitions.ONE
+    ? [PrettyRelationshipDefinitions.MANY]
+    : Object.values(PrettyRelationshipDefinitions)
+
 let relationshipPart1 = PrettyRelationshipDefinitions.ONE
 let relationshipPart2 = PrettyRelationshipDefinitions.MANY
@@ -45,7 +45,10 @@
   <Checkbox text="Require confirmation" bind:value={parameters.confirm} />

   {#if parameters.confirm}
-    <Label small>Confirm text</Label>
+    <Label small>Title</Label>
+    <Input placeholder="Delete Row" bind:value={parameters.customTitleText} />
+
+    <Label small>Text</Label>
     <Input
       placeholder="Are you sure you want to delete?"
       bind:value={parameters.confirmText}
@@ -72,7 +72,13 @@
   <Checkbox text="Require confirmation" bind:value={parameters.confirm} />

   {#if parameters.confirm}
-    <Label small>Confirm text</Label>
+    <Label small>Title</Label>
+    <Input
+      placeholder="Duplicate Row"
+      bind:value={parameters.customTitleText}
+    />
+
+    <Label small>Text</Label>
     <Input
       placeholder="Are you sure you want to duplicate this row?"
       bind:value={parameters.confirmText}
@@ -64,7 +64,13 @@

   {#if parameters.confirm}
     <Input
-      label="Confirm text"
+      label="Title"
+      placeholder="Execute Query"
+      bind:value={parameters.customTitleText}
+    />
+
+    <Input
+      label="Text"
       placeholder="Are you sure you want to execute this query?"
       bind:value={parameters.confirmText}
     />
@@ -72,7 +72,10 @@
   <Checkbox text="Require confirmation" bind:value={parameters.confirm} />

   {#if parameters.confirm}
-    <Label small>Confirm text</Label>
+    <Label small>Title</Label>
+    <Input placeholder="Save Row" bind:value={parameters.customTitleText} />
+
+    <Label small>Text</Label>
     <Input
       placeholder="Are you sure you want to save this row?"
       bind:value={parameters.confirmText}
@@ -1 +1 @@
-Subproject commit 22a278da720d92991dabdcd4cb6c96e7abe29781
+Subproject commit 2b322d0f4b71ba96664d383b94c30445ead7ac5b
@@ -8,6 +8,7 @@ module FetchMock {
   let mockSearch = false

   const func = async (url: any, opts: any) => {
+    const { host, pathname } = new URL(url)
     function json(body: any, status = 200) {
       return {
         status,
@@ -34,7 +35,7 @@ module FetchMock {
       }
     }

-    if (url.includes("/api/global")) {
+    if (pathname.includes("/api/global")) {
       const user = {
         email: "test@example.com",
         _id: "us_test@example.com",
@@ -47,31 +48,31 @@ module FetchMock {
           global: false,
         },
       }
-      return url.endsWith("/users") && opts.method === "GET"
+      return pathname.endsWith("/users") && opts.method === "GET"
        ? json([user])
        : json(user)
     }
     // mocked data based on url
-    else if (url.includes("api/apps")) {
+    else if (pathname.includes("api/apps")) {
       return json({
         app1: {
           url: "/app1",
         },
       })
-    } else if (url.includes("example.com")) {
+    } else if (host.includes("example.com")) {
       return json({
         body: opts.body,
         url,
         method: opts.method,
       })
-    } else if (url.includes("invalid.com")) {
+    } else if (host.includes("invalid.com")) {
       return json(
         {
           invalid: true,
         },
         404
       )
-    } else if (mockSearch && url.includes("_search")) {
+    } else if (mockSearch && pathname.includes("_search")) {
       const body = opts.body
       const parts = body.split("tableId:")
       let tableId
@@ -90,7 +91,7 @@ module FetchMock {
         ],
         bookmark: "test",
       })
-    } else if (url.includes("google.com")) {
+    } else if (host.includes("google.com")) {
       return json({
         url,
         opts,
@@ -177,7 +178,7 @@ module FetchMock {
     } else if (url === "https://www.googleapis.com/oauth2/v4/token") {
       // any valid response
       return json({})
-    } else if (url.includes("failonce.com")) {
+    } else if (host.includes("failonce.com")) {
       failCount++
       if (failCount === 1) {
         return json({ message: "error" }, 500)
@@ -10,3 +10,4 @@ process.env.MOCK_REDIS = "1"
 process.env.PLATFORM_URL = "http://localhost:10000"
 process.env.REDIS_PASSWORD = "budibase"
 process.env.BUDIBASE_VERSION = "0.0.0+jest"
+process.env.WORKER_URL = "http://localhost:10000"
@@ -347,7 +347,7 @@ export default class TestConfiguration {
       lastName = generator.last(),
       builder = { global: true },
       admin = { global: false },
-      email = generator.email(),
+      email = generator.email({ domain: "example.com" }),
       tenantId = this.getTenantId(),
       roles = {},
     } = config
@@ -512,7 +512,7 @@ export default class TestConfiguration {

   async basicRoleHeaders() {
     return await this.roleHeaders({
-      email: generator.email(),
+      email: generator.email({ domain: "example.com" }),
       builder: false,
       prodApp: true,
       roleId: roles.BUILTIN_ROLE_IDS.BASIC,
@@ -520,7 +520,7 @@ export default class TestConfiguration {
   }

   async roleHeaders({
-    email = generator.email(),
+    email = generator.email({ domain: "example.com" }),
     roleId = roles.BUILTIN_ROLE_IDS.ADMIN,
     builder = false,
     prodApp = true,
@@ -24,7 +24,7 @@ export interface GroupUser {
 }

 export interface UserGroupRoles {
-  [key: string]: string
+  [key: string]: string | undefined
 }

 export interface SearchGroupRequest {}
@@ -128,6 +128,7 @@ export interface Database {

   exists(): Promise<boolean>
   get<T extends Document>(id?: string): Promise<T>
+  exists(docId: string): Promise<boolean>
   getMultiple<T extends Document>(
     ids: string[],
     opts?: { allowMissing?: boolean }
@@ -48,7 +48,7 @@ export interface GroupAddedOnboardingEvent extends BaseEvent {
 }

 export interface GroupPermissionsEditedEvent extends BaseEvent {
-  permissions: Record<string, string>
+  permissions: Record<string, string | undefined>
   groupId: string
   audited: {
     name: string
@@ -147,7 +147,7 @@ describe("/api/global/groups", () => {

       await Promise.all(
         Array.from({ length: 30 }).map(async (_, i) => {
-          const email = `user${i}@${generator.domain()}`
+          const email = `user${i}@example.com`
           const user = await config.api.users.saveUser({
             ...structures.users.user(),
             email,
@@ -1,5 +1,4 @@
 import { sdk as proSdk } from "@budibase/pro"
-import * as userSdk from "./sdk/users"

 export const initPro = async () => {
   await proSdk.init({})
@ -84,7 +84,7 @@ describe("Accounts", () => {
|
|||
})
|
||||
|
||||
it("searches by email", async () => {
|
||||
const email = generator.email()
|
||||
const email = generator.email({ domain: "example.com" })
|
||||
|
||||
// Empty result
|
||||
const [_, emptyBody] = await config.api.accounts.search(email, "email")
|
||||
|
|
|
@@ -4,7 +4,7 @@ import { generator } from "../../shared"
 export const generateUser = (
   overrides: Partial<User> = {}
 ): CreateUserParams => ({
-  email: generator.email(),
+  email: generator.email({ domain: "example.com" }),
   roles: {
     [generator.string({ length: 32, alpha: true, numeric: true })]:
       generator.word(),