Skip to content

Commit 0ec3e23

Browse files
committed
fix: limit concurrent requests
1 parent cc2255d commit 0ec3e23

5 files changed

Lines changed: 228 additions & 89 deletions

File tree

app/composables/useNpmRegistry.ts

Lines changed: 33 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import type {
99
PackageVersionInfo,
1010
} from '#shared/types'
1111
import type { ReleaseType } from 'semver'
12+
import { mapWithConcurrency } from '#shared/utils/async'
1213
import { maxSatisfying, prerelease, major, minor, diff, gt, compare } from 'semver'
1314
import { isExactVersion } from '~/utils/versions'
1415
import { extractInstallScriptsInfo } from '~/utils/install-scripts'
@@ -546,34 +547,28 @@ export function useOrgPackages(orgName: MaybeRefOrGetter<string>) {
546547

547548
// Fetch packuments and downloads in parallel
548549
const [packuments, downloads] = await Promise.all([
549-
// Fetch packuments in parallel (with concurrency limit)
550+
// Fetch packuments with concurrency limit
550551
(async () => {
551-
const concurrency = 10
552-
const results: MinimalPackument[] = []
553-
for (let i = 0; i < packageNames.length; i += concurrency) {
554-
const batch = packageNames.slice(i, i + concurrency)
555-
const batchResults = await Promise.all(
556-
batch.map(async name => {
557-
try {
558-
const encoded = encodePackageName(name)
559-
const { data: pkg } = await cachedFetch<MinimalPackument>(
560-
`${NPM_REGISTRY}/${encoded}`,
561-
{ signal },
562-
)
563-
return pkg
564-
} catch {
565-
return null
566-
}
567-
}),
568-
)
569-
for (const pkg of batchResults) {
570-
// Filter out any unpublished packages (missing dist-tags)
571-
if (pkg && pkg['dist-tags']) {
572-
results.push(pkg)
552+
const results = await mapWithConcurrency(
553+
packageNames,
554+
async name => {
555+
try {
556+
const encoded = encodePackageName(name)
557+
const { data: pkg } = await cachedFetch<MinimalPackument>(
558+
`${NPM_REGISTRY}/${encoded}`,
559+
{ signal },
560+
)
561+
return pkg
562+
} catch {
563+
return null
573564
}
574-
}
575-
}
576-
return results
565+
},
566+
10,
567+
)
568+
// Filter out any unpublished packages (missing dist-tags)
569+
return results.filter(
570+
(pkg): pkg is MinimalPackument => pkg !== null && !!pkg['dist-tags'],
571+
)
577572
})(),
578573
// Fetch downloads in bulk
579574
fetchBulkDownloads(packageNames, { signal }),
@@ -772,23 +767,20 @@ export function useOutdatedDependencies(
772767
return
773768
}
774769

775-
const results: Record<string, OutdatedDependencyInfo> = {}
776770
const entries = Object.entries(deps)
777-
const batchSize = 5
778-
779-
for (let i = 0; i < entries.length; i += batchSize) {
780-
const batch = entries.slice(i, i + batchSize)
781-
const batchResults = await Promise.all(
782-
batch.map(async ([name, constraint]) => {
783-
const info = await checkDependencyOutdated(cachedFetch, name, constraint)
784-
return [name, info] as const
785-
}),
786-
)
771+
const batchResults = await mapWithConcurrency(
772+
entries,
773+
async ([name, constraint]) => {
774+
const info = await checkDependencyOutdated(cachedFetch, name, constraint)
775+
return [name, info] as const
776+
},
777+
5,
778+
)
787779

788-
for (const [name, info] of batchResults) {
789-
if (info) {
790-
results[name] = info
791-
}
780+
const results: Record<string, OutdatedDependencyInfo> = {}
781+
for (const [name, info] of batchResults) {
782+
if (info) {
783+
results[name] = info
792784
}
793785
}
794786

server/utils/dependency-analysis.ts

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,12 @@ import type {
99
VulnerabilityTreeResult,
1010
DeprecatedPackageInfo,
1111
} from '#shared/types/dependency-analysis'
12+
import { mapWithConcurrency } from '#shared/utils/async'
1213
import { resolveDependencyTree } from './dependency-resolver'
1314

15+
/** Maximum concurrent requests for fetching vulnerability details */
16+
const OSV_DETAIL_CONCURRENCY = 25
17+
1418
/** Package info needed for OSV queries */
1519
interface PackageQueryInfo {
1620
name: string
@@ -47,6 +51,15 @@ async function queryOsvBatch(
4751
if (result?.vulns && result.vulns.length > 0) {
4852
vulnerableIndices.push(i)
4953
}
54+
// Warn if pagination token present (>1000 vulns for single query or >3000 total)
55+
// This is extremely unlikely for npm packages but log for visibility
56+
if (result?.next_page_token) {
57+
// oxlint-disable-next-line no-console -- warn about paginated results
58+
console.warn(
59+
`[dep-analysis] OSV batch result has pagination token for package index ${i} ` +
60+
`(${packages[i]?.name}@${packages[i]?.version}) - some vulnerabilities may be missing`,
61+
)
62+
}
5063
}
5164

5265
return { vulnerableIndices, failed: false }
@@ -199,10 +212,10 @@ export const analyzeDependencyTree = defineCachedFunction(
199212
if (!batchFailed && vulnerableIndices.length > 0) {
200213
// Step 2: Fetch full vulnerability details only for packages with vulns
201214
// This is typically a small fraction of total packages
202-
const vulnerablePackageInfos = vulnerableIndices.map(i => packages[i]!)
203-
204-
const detailResults = await Promise.all(
205-
vulnerablePackageInfos.map(pkg => queryOsvDetails(pkg)),
215+
const detailResults = await mapWithConcurrency(
216+
vulnerableIndices,
217+
i => queryOsvDetails(packages[i]!),
218+
OSV_DETAIL_CONCURRENCY,
206219
)
207220

208221
for (const result of detailResults) {

server/utils/dependency-resolver.ts

Lines changed: 46 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import type { Packument, PackumentVersion, DependencyDepth } from '#shared/types'
2+
import { mapWithConcurrency } from '#shared/utils/async'
23
import { maxSatisfying } from 'semver'
34

5+
/** Concurrency limit for fetching packuments during dependency resolution */
6+
const PACKUMENT_FETCH_CONCURRENCY = 20
7+
48
/**
59
* Target platform for dependency resolution.
610
* We resolve for linux-x64 with glibc as a representative platform.
@@ -142,63 +146,61 @@ export async function resolveDependencyTree(
142146
seen.add(name)
143147
}
144148

145-
// Process current level in batches
149+
// Process current level with concurrency limit
146150
const entries = [...currentLevel.entries()]
147-
for (let i = 0; i < entries.length; i += 20) {
148-
const batch = entries.slice(i, i + 20)
149-
150-
await Promise.all(
151-
batch.map(async ([name, { range, optional, path }]) => {
152-
const packument = await fetchPackument(name)
153-
if (!packument) return
151+
await mapWithConcurrency(
152+
entries,
153+
async ([name, { range, optional, path }]) => {
154+
const packument = await fetchPackument(name)
155+
if (!packument) return
154156

155-
const versions = Object.keys(packument.versions)
156-
const version = resolveVersion(range, versions)
157-
if (!version) return
157+
const versions = Object.keys(packument.versions)
158+
const version = resolveVersion(range, versions)
159+
if (!version) return
158160

159-
const versionData = packument.versions[version]
160-
if (!versionData) return
161+
const versionData = packument.versions[version]
162+
if (!versionData) return
161163

162-
if (!matchesPlatform(versionData)) return
164+
if (!matchesPlatform(versionData)) return
163165

164-
const size = (versionData.dist as { unpackedSize?: number })?.unpackedSize ?? 0
165-
const key = `${name}@${version}`
166+
const size = (versionData.dist as { unpackedSize?: number })?.unpackedSize ?? 0
167+
const key = `${name}@${version}`
166168

167-
// Build path for this package (path to parent + this package with version)
168-
const currentPath = [...path, `${name}@${version}`]
169+
// Build path for this package (path to parent + this package with version)
170+
const currentPath = [...path, `${name}@${version}`]
169171

170-
if (!resolved.has(key)) {
171-
const pkg: ResolvedPackage = { name, version, size, optional }
172-
if (options.trackDepth) {
173-
pkg.depth = level === 0 ? 'root' : level === 1 ? 'direct' : 'transitive'
174-
pkg.path = currentPath
175-
}
176-
if (versionData.deprecated) {
177-
pkg.deprecated = versionData.deprecated
178-
}
179-
resolved.set(key, pkg)
172+
if (!resolved.has(key)) {
173+
const pkg: ResolvedPackage = { name, version, size, optional }
174+
if (options.trackDepth) {
175+
pkg.depth = level === 0 ? 'root' : level === 1 ? 'direct' : 'transitive'
176+
pkg.path = currentPath
180177
}
181-
182-
// Collect dependencies for next level
183-
if (versionData.dependencies) {
184-
for (const [depName, depRange] of Object.entries(versionData.dependencies)) {
185-
if (!seen.has(depName) && !nextLevel.has(depName)) {
186-
nextLevel.set(depName, { range: depRange, optional: false, path: currentPath })
187-
}
178+
if (versionData.deprecated) {
179+
pkg.deprecated = versionData.deprecated
180+
}
181+
resolved.set(key, pkg)
182+
}
183+
184+
// Collect dependencies for next level
185+
if (versionData.dependencies) {
186+
for (const [depName, depRange] of Object.entries(versionData.dependencies)) {
187+
if (!seen.has(depName) && !nextLevel.has(depName)) {
188+
nextLevel.set(depName, { range: depRange, optional: false, path: currentPath })
188189
}
189190
}
191+
}
190192

191-
// Collect optional dependencies
192-
if (versionData.optionalDependencies) {
193-
for (const [depName, depRange] of Object.entries(versionData.optionalDependencies)) {
194-
if (!seen.has(depName) && !nextLevel.has(depName)) {
195-
nextLevel.set(depName, { range: depRange, optional: true, path: currentPath })
196-
}
193+
// Collect optional dependencies
194+
if (versionData.optionalDependencies) {
195+
for (const [depName, depRange] of Object.entries(versionData.optionalDependencies)) {
196+
if (!seen.has(depName) && !nextLevel.has(depName)) {
197+
nextLevel.set(depName, { range: depRange, optional: true, path: currentPath })
197198
}
198199
}
199-
}),
200-
)
201-
}
200+
}
201+
},
202+
PACKUMENT_FETCH_CONCURRENCY,
203+
)
202204

203205
currentLevel = nextLevel
204206
level++

shared/utils/async.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/**
2+
* Async utilities for controlled concurrency and parallel execution.
3+
*/
4+
5+
/**
6+
* Map over an array with limited concurrency.
7+
* Similar to Promise.all but limits how many promises run simultaneously.
8+
*
9+
* @param items - Array of items to process
10+
* @param fn - Async function to apply to each item
11+
* @param concurrency - Maximum number of concurrent operations (default: 10)
12+
* @returns Array of results in the same order as input
13+
*
14+
* @example
15+
* const results = await mapWithConcurrency(urls, fetchUrl, 5)
16+
*/
17+
export async function mapWithConcurrency<T, R>(
18+
items: T[],
19+
fn: (item: T, index: number) => Promise<R>,
20+
concurrency = 10,
21+
): Promise<R[]> {
22+
const results: R[] = Array.from({ length: items.length }) as R[]
23+
let currentIndex = 0
24+
25+
async function worker(): Promise<void> {
26+
while (currentIndex < items.length) {
27+
const index = currentIndex++
28+
results[index] = await fn(items[index]!, index)
29+
}
30+
}
31+
32+
// Start workers up to concurrency limit
33+
const workers = Array.from({ length: Math.min(concurrency, items.length) }, () => worker())
34+
await Promise.all(workers)
35+
36+
return results
37+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import { describe, expect, it, vi } from 'vitest'
2+
import { mapWithConcurrency } from '../../../../shared/utils/async'
3+
4+
describe('mapWithConcurrency', () => {
5+
it('processes all items and returns results in order', async () => {
6+
const items = [1, 2, 3, 4, 5]
7+
const results = await mapWithConcurrency(items, async x => x * 2)
8+
9+
expect(results).toEqual([2, 4, 6, 8, 10])
10+
})
11+
12+
it('respects concurrency limit', async () => {
13+
let concurrent = 0
14+
let maxConcurrent = 0
15+
16+
const items = Array.from({ length: 10 }, (_, i) => i)
17+
18+
await mapWithConcurrency(
19+
items,
20+
async () => {
21+
concurrent++
22+
maxConcurrent = Math.max(maxConcurrent, concurrent)
23+
await new Promise(resolve => setTimeout(resolve, 10))
24+
concurrent--
25+
},
26+
3,
27+
)
28+
29+
expect(maxConcurrent).toBe(3)
30+
})
31+
32+
it('handles empty array', async () => {
33+
const results = await mapWithConcurrency([], async x => x)
34+
expect(results).toEqual([])
35+
})
36+
37+
it('handles single item', async () => {
38+
const results = await mapWithConcurrency([42], async x => x * 2)
39+
expect(results).toEqual([84])
40+
})
41+
42+
it('passes index to callback', async () => {
43+
const items = ['a', 'b', 'c']
44+
const results = await mapWithConcurrency(items, async (item, index) => `${item}${index}`)
45+
46+
expect(results).toEqual(['a0', 'b1', 'c2'])
47+
})
48+
49+
it('propagates errors', async () => {
50+
const items = [1, 2, 3]
51+
const fn = vi.fn(async (x: number) => {
52+
if (x === 2) throw new Error('test error')
53+
return x
54+
})
55+
56+
await expect(mapWithConcurrency(items, fn)).rejects.toThrow('test error')
57+
})
58+
59+
it('uses default concurrency of 10', async () => {
60+
let concurrent = 0
61+
let maxConcurrent = 0
62+
63+
const items = Array.from({ length: 20 }, (_, i) => i)
64+
65+
await mapWithConcurrency(items, async () => {
66+
concurrent++
67+
maxConcurrent = Math.max(maxConcurrent, concurrent)
68+
await new Promise(resolve => setTimeout(resolve, 5))
69+
concurrent--
70+
})
71+
72+
expect(maxConcurrent).toBe(10)
73+
})
74+
75+
it('handles fewer items than concurrency limit', async () => {
76+
let concurrent = 0
77+
let maxConcurrent = 0
78+
79+
const items = [1, 2, 3]
80+
81+
await mapWithConcurrency(
82+
items,
83+
async () => {
84+
concurrent++
85+
maxConcurrent = Math.max(maxConcurrent, concurrent)
86+
await new Promise(resolve => setTimeout(resolve, 10))
87+
concurrent--
88+
},
89+
10,
90+
)
91+
92+
// Should only have 3 concurrent since we only have 3 items
93+
expect(maxConcurrent).toBe(3)
94+
})
95+
})

0 commit comments

Comments
 (0)