Parallelize MediaDive bulk download (~20x faster) #527
crocodile27 wants to merge 3 commits into master
Conversation
Replace sequential per-medium API loops with a 20-worker thread pool, reducing ~3,333 serial requests per phase (~30 min each) to ~1-2 min. Applies to both detailed recipe and medium-strain association downloads. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Pull request overview
This PR speeds up the MediaDive bulk download utility by parallelizing the per-medium REST calls so bulk datasets can be generated much faster and used by transforms without repeated API hits.
Changes:
- Parallelizes detailed-medium recipe downloads using `ThreadPoolExecutor`.
- Parallelizes medium→strain association downloads using `ThreadPoolExecutor`.
- Adds private helper functions to fetch per-medium payloads for executor dispatch.
```python
futures = {executor.submit(_fetch_medium_strains, m): m for m in media_list}
for future in tqdm(as_completed(futures), total=len(futures), desc="Downloading medium-strain associations"):
    medium_id, data = future.result()
```
Same as above: futures is a dict mapping Future -> medium, but the medium values are unused. Using a list/set of futures (or executor.map) would reduce memory and simplify the loop.
Suggested change:

```diff
-futures = {executor.submit(_fetch_medium_strains, m): m for m in media_list}
-for future in tqdm(as_completed(futures), total=len(futures), desc="Downloading medium-strain associations"):
-    medium_id, data = future.result()
+results_iter = executor.map(_fetch_medium_strains, media_list)
+for medium_id, data in tqdm(
+    results_iter,
+    total=len(media_list),
+    desc="Downloading medium-strain associations",
+):
```
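As an aside on this suggestion, a minimal self-contained sketch (with a stubbed `fetch` standing in for the real per-medium REST call) shows that `executor.map` yields results in input order and needs no `Future -> item` dict:

```python
from concurrent.futures import ThreadPoolExecutor
import time

def fetch(medium_id: str) -> tuple[str, dict]:
    # Stub standing in for the real per-medium API request.
    time.sleep(0.01)
    return medium_id, {"id": medium_id}

media_list = ["100", "101", "102"]
with ThreadPoolExecutor(max_workers=2) as executor:
    # map() keeps input order (unlike as_completed) and holds only the
    # result iterator, not a dict of futures plus the original items.
    results = list(executor.map(fetch, media_list))

print([medium_id for medium_id, _ in results])  # → ['100', '101', '102']
```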
```python
def _fetch_medium_detail(medium: Dict) -> tuple:
    """Fetch detailed recipe for a single medium. Returns (medium_id, data)."""
    medium_id = str(medium.get(ID_KEY))
    url = MEDIADIVE_REST_API_BASE_URL + MEDIUM_ENDPOINT + medium_id
    return medium_id, get_json_from_api(url)


def _fetch_medium_strains(medium: Dict) -> tuple:
    """Fetch strain associations for a single medium. Returns (medium_id, data)."""
    medium_id = str(medium.get(ID_KEY))
    url = MEDIADIVE_REST_API_BASE_URL + MEDIUM_STRAINS_ENDPOINT + medium_id
    return medium_id, get_json_from_api(url)
```
The helper functions are annotated to return a bare `tuple`, which loses the structure and type information the rest of the code relies on (`(medium_id, data)`). Please tighten these return types (and ideally the parameter type) so callers and type checkers know what's being returned (e.g., `tuple[str, dict]` / `tuple[str, dict[str, Any]]`).
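For illustration, a sketch of what the tightened signature could look like — `ID_KEY` and `get_json_from_api` are stubbed here so the example runs offline, and the URL is invented; the real module's versions differ:

```python
from typing import Any

ID_KEY = "id"  # assumption: mirrors the module's constant

def get_json_from_api(url: str) -> dict[str, Any]:
    # Stub for the real HTTP helper, so the example is runnable offline.
    return {"requested": url}

def _fetch_medium_detail(medium: dict[str, Any]) -> tuple[str, dict[str, Any]]:
    """Fetch detailed recipe for a single medium. Returns (medium_id, data)."""
    medium_id = str(medium.get(ID_KEY))
    return medium_id, get_json_from_api("https://example.org/medium/" + medium_id)

medium_id, data = _fetch_medium_detail({"id": 42})
print(medium_id)  # prints 42
```

With this annotation, a type checker can verify every `medium_id, data = ...` unpacking at the call sites.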
```python
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
    futures = {executor.submit(_fetch_medium_detail, m): m for m in media_list}
    for future in tqdm(as_completed(futures), total=len(futures), desc="Downloading medium details"):
```
futures is built as a dict mapping Future -> medium, but the medium value is never read. Consider using a simple list/set of futures (or executor.map) to avoid retaining the entire input list twice and to simplify the code.
Need one more commit here to standardize the `max_workers` and API retry parameters. Also, a quick test of the defaults, to show that they are dynamic, would finalize this.
Test Results: All 128 tests pass. ✅
Review: parallel downloader — good netizen considerations

Nice work on the parallelization!
Benchmark Results (live API, 50 real media IDs)

Timed sequential vs parallel fetching against the real MediaDive API (no cache), using the first 50 media IDs. Run on: macOS 14, Python 3.11.
Re: question about `max_workers`: "Does `max_workers` scale dynamically? e.g. what if someone doesn't have enough CPU for 20 workers?"

CPU is not the bottleneck here — these are pure network I/O requests. Threads block waiting for HTTP responses and release the GIL during that wait, so 20 workers runs identically on a 2-core laptop or a 64-core server. The Python docs explicitly recommend higher `max_workers` values for I/O-bound workloads. The real concern with a fixed `max_workers=20` is not local CPU but the load it puts on the remote API.
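A quick way to see this: the sketch below simulates each request with a 0.1 s sleep and finishes 20 "requests" in roughly the time of one, because the threads spend their lives blocked on I/O rather than computing:

```python
from concurrent.futures import ThreadPoolExecutor
import time

def simulated_request(_: int) -> None:
    # Stand-in for an HTTP call: the thread blocks here and releases the GIL.
    time.sleep(0.1)

start = time.monotonic()
with ThreadPoolExecutor(max_workers=20) as executor:
    list(executor.map(simulated_request, range(20)))
elapsed = time.monotonic() - start

# 20 overlapping 0.1 s waits complete in ~0.1 s total, regardless of core count.
print(f"{elapsed:.2f}s")
```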
A reasonable hardening would be to handle 429s explicitly and/or make `max_workers` configurable by the caller.
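One possible shape for that hardening — a sketch, not the project's code; the `fetch` callable and its `(status, headers, body)` return shape are invented for illustration:

```python
import time
from typing import Any, Callable, Mapping

def get_with_retry(
    fetch: Callable[[str], tuple[int, Mapping[str, str], Any]],
    url: str,
    retry_count: int = 3,
    retry_delay: float = 1.0,
) -> Any:
    """Call fetch(url), honouring a 429 response's Retry-After header."""
    for _ in range(retry_count):
        status, headers, body = fetch(url)
        if status != 429:
            return body
        # Prefer the server-requested wait; fall back to the fixed delay.
        time.sleep(float(headers.get("Retry-After", retry_delay)))
    raise RuntimeError(f"gave up after {retry_count} attempts: {url}")
```

The same wrapper works unchanged inside a thread pool, since each worker sleeps independently when throttled.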
Per Mark's comments:

- Lower `DEFAULT_MAX_WORKERS` from 20 → 5 (polite for small academic API at DSMZ)
- Add `USER_AGENT` constant identifying kg-microbe so the API operator can reach out
- Respect `Retry-After` header on 429 responses instead of fixed `retry_delay`
- Switch from futures-dict to `executor.map` in both download functions
- Tighten return type annotations on `_fetch_*` helpers to `tuple[str, dict]`

Per Marcin's comments:

- Make `max_workers`, `retry_count`, `retry_delay` parameters on `download_detailed_media`, `download_medium_strains`, and `download_mediadive_bulk` so callers can tune them
- Pass `retry_count` and `retry_delay` through `_fetch_*` helpers into `get_json_from_api` (previously helpers always used defaults, ignoring caller overrides)
- Add `tests/test_mediadive_bulk_download.py`: 8 tests covering default values, Retry-After behaviour, retry parameter propagation, and concurrency bounds

136 passed, 25 skipped.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Summary

- Parallelize per-medium downloads in `mediadive_bulk_download.py` with a `ThreadPoolExecutor(max_workers=DEFAULT_MAX_WORKERS)`
- Add private helpers (`_fetch_medium_detail`, `_fetch_medium_strains`) for the executor to dispatch

Addresses review comments from Mark and Marcin

- Lower `DEFAULT_MAX_WORKERS` from 20 → 5 (polite for small academic API at DSMZ); add descriptive `USER_AGENT`; respect `Retry-After` header on 429; switch futures-dict to `executor.map`; tighten return type annotations to `tuple[str, dict]`
- Make `max_workers`, `retry_count`, `retry_delay` parameters on all public download functions; pass them through `_fetch_*` helpers into `get_json_from_api` (previously helpers always used defaults); add tests covering defaults, retry parameter propagation, and concurrency bounds

Performance
Measured on 50 real MediaDive API calls (no cache), extrapolated to the full dataset. Measured speedup: 16.2x (599 ms/req → 37 ms/req, 20 workers, live API). Default is now 5 workers; still a large speedup, and callers can increase it for CI/trusted environments.
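The extrapolation behind those figures can be checked with quick arithmetic, using ~3,333 requests per phase and two phases, as stated in the PR description:

```python
requests_per_phase = 3333
phases = 2

# Per-request times measured against the live API (no cache).
sequential_min = requests_per_phase * phases * 0.599 / 60  # ≈ 66.5 min total
parallel_min = requests_per_phase * phases * 0.037 / 60    # ≈ 4.1 min total
speedup = 0.599 / 0.037                                    # ≈ 16.2x

print(f"{sequential_min:.1f} min → {parallel_min:.1f} min ({speedup:.1f}x)")
```

This matches the "~30 min each" sequential phases and the "~4 min instead of ~66 min" end-to-end result reported below.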
The `requests_cache` SQLite backend has built-in locking, so concurrent writes are thread-safe.

Test results
- Full suite passes (`poetry run pytest tests/ -q`)
- New `tests/test_mediadive_bulk_download.py`: default values, Retry-After behaviour, retry parameter propagation, concurrency bounds
- `poetry run kg download` completes MediaDive bulk download in ~4 min instead of ~66 min

🤖 Generated with Claude Code