Revert unnecessary change.

This commit is contained in:
Sam Rose 2024-09-19 16:51:00 +01:00
parent aecd4f9e4d
commit ce105d8f4e
No known key found for this signature in database
2 changed files with 33 additions and 24 deletions

View File

@ -339,7 +339,7 @@ export class GoogleSheetsIntegration implements DatasourcePlus {
const tables: Record<string, Table> = {} const tables: Record<string, Table> = {}
let errors: Record<string, string> = {} let errors: Record<string, string> = {}
await utils.parallelForEach( await utils.parallelForeach(
sheets, sheets,
async sheet => { async sheet => {
try { try {
@ -367,7 +367,7 @@ export class GoogleSheetsIntegration implements DatasourcePlus {
throw err throw err
} }
}, },
{ maxConcurrency: 2 } 10
) )
for (const sheet of sheets) { for (const sheet of sheets) {

View File

@ -7,34 +7,43 @@ export function unreachable(
throw new Error(message) throw new Error(message)
} }
interface PromiseWithId<T> { export async function parallelForeach<T>(
promise: Promise<T>
id: number
}
export async function parallelForEach<T>(
items: T[], items: T[],
task: (item: T) => Promise<void>, task: (item: T) => Promise<void>,
opts?: { maxConcurrency?: number } maxConcurrency: number
): Promise<void> { ): Promise<void> {
const { maxConcurrency = 10 } = opts || {} const promises: Promise<void>[] = []
let next = 0 let index = 0
let inProgress: PromiseWithId<number>[] = []
while (next < items.length) {
if (inProgress.length === maxConcurrency) {
const finished = await Promise.race(inProgress.map(t => t.promise))
inProgress = inProgress.filter(task => task.id !== finished)
}
const promise = async (next: number) => { const processItem = async (item: T) => {
await task(items[next]) try {
return next await task(item)
} finally {
processNext()
} }
inProgress.push({ promise: promise(next), id: next })
next++
} }
await Promise.all(inProgress.map(t => t.promise))
const processNext = () => {
if (index >= items.length) {
// No more items to process
return
}
const item = items[index]
index++
const promise = processItem(item)
promises.push(promise)
if (promises.length >= maxConcurrency) {
Promise.race(promises).then(processNext)
} else {
processNext()
}
}
processNext()
await Promise.all(promises)
} }
export function filterValueToLabel() { export function filterValueToLabel() {