Skip to content

Parallelize MediaDive bulk download (~20x faster)#527

Open
crocodile27 wants to merge 3 commits intomasterfrom
mediadive-parallel-download
Open

Parallelize MediaDive bulk download (~20x faster)#527
crocodile27 wants to merge 3 commits intomasterfrom
mediadive-parallel-download

Conversation

@crocodile27
Copy link
Collaborator

@crocodile27 crocodile27 commented Mar 19, 2026

Summary

  • Replaces the two sequential per-medium API loops in mediadive_bulk_download.py with a ThreadPoolExecutor(max_workers=DEFAULT_MAX_WORKERS)
  • Adds two private helper functions (_fetch_medium_detail, _fetch_medium_strains) for the executor to dispatch

Addresses review comments from Mark and Marcin

  • Mark: Lower DEFAULT_MAX_WORKERS from 20 → 5 (polite for small academic API at DSMZ); add descriptive USER_AGENT; respect Retry-After header on 429; switch futures-dict to executor.map; tighten return type annotations to tuple[str, dict]
  • Marcin: Make max_workers, retry_count, retry_delay parameters on all public download functions; pass them through _fetch_* helpers into get_json_from_api (previously helpers always used defaults); add tests covering defaults, retry parameter propagation, and concurrency bounds

Performance

Measured on 50 real MediaDive API calls (no cache), extrapolated to full dataset:

Phase Before After (measured)
Detailed recipes (3,333 requests) ~33 min ~2 min
Strain associations (3,333 requests) ~33 min ~2 min
Total MediaDive bulk ~66 min ~4 min

Measured speedup: 16.2x (599 ms/req → 37 ms/req, 20 workers, live API). Default is now 5 workers; still a large speedup, and callers can increase for CI/trusted environments.

The requests_cache SQLite backend has built-in locking so concurrent writes are thread-safe.

Test results

  • 136 passed, 25 skipped, 5 warnings in 15.27s (poetry run pytest tests/ -q)
  • 8 new tests in tests/test_mediadive_bulk_download.py: default values, Retry-After behaviour, retry parameter propagation, concurrency bounds
  • poetry run kg download completes MediaDive bulk download in ~4 min instead of ~66 min

🤖 Generated with Claude Code

Replace sequential per-medium API loops with a 20-worker thread pool, reducing ~3,333 serial requests per phase (~30 min each) to ~1-2 min. Applies to both detailed recipe and medium-strain association downloads. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR speeds up the MediaDive bulk download utility by parallelizing the per-medium REST calls so bulk datasets can be generated much faster and used by transforms without repeated API hits.

Changes:

  • Parallelizes detailed-medium recipe downloads using ThreadPoolExecutor.
  • Parallelizes medium→strain association downloads using ThreadPoolExecutor.
  • Adds private helper functions to fetch per-medium payloads for executor dispatch.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +165 to +167
futures = {executor.submit(_fetch_medium_strains, m): m for m in media_list}
for future in tqdm(as_completed(futures), total=len(futures), desc="Downloading medium-strain associations"):
medium_id, data = future.result()
Copy link

Copilot AI Mar 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above: futures is a dict mapping Future -> medium, but the medium values are unused. Using a list/set of futures (or executor.map) would reduce memory and simplify the loop.

Suggested change
futures = {executor.submit(_fetch_medium_strains, m): m for m in media_list}
for future in tqdm(as_completed(futures), total=len(futures), desc="Downloading medium-strain associations"):
medium_id, data = future.result()
results_iter = executor.map(_fetch_medium_strains, media_list)
for medium_id, data in tqdm(
results_iter,
total=len(media_list),
desc="Downloading medium-strain associations",
):
Copilot uses AI. Check for mistakes.
Comment on lines +107 to +118
def _fetch_medium_detail(medium: Dict) -> tuple:
"""Fetch detailed recipe for a single medium. Returns (medium_id, data)."""
medium_id = str(medium.get(ID_KEY))
url = MEDIADIVE_REST_API_BASE_URL + MEDIUM_ENDPOINT + medium_id
return medium_id, get_json_from_api(url)


def _fetch_medium_strains(medium: Dict) -> tuple:
"""Fetch strain associations for a single medium. Returns (medium_id, data)."""
medium_id = str(medium.get(ID_KEY))
url = MEDIADIVE_REST_API_BASE_URL + MEDIUM_STRAINS_ENDPOINT + medium_id
return medium_id, get_json_from_api(url)
Copy link

Copilot AI Mar 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The helper functions are annotated to return a bare tuple, which loses the structure/type information the rest of the code relies on ((medium_id, data)). Please tighten these return types (and ideally the parameter type) so callers and type checkers know what’s being returned (e.g., tuple[str, dict] / tuple[str, dict[str, Any]]).

Copilot uses AI. Check for mistakes.
Comment on lines +137 to +139
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
futures = {executor.submit(_fetch_medium_detail, m): m for m in media_list}
for future in tqdm(as_completed(futures), total=len(futures), desc="Downloading medium details"):
Copy link

Copilot AI Mar 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

futures is built as a dict mapping Future -> medium, but the medium value is never read. Consider using a simple list/set of futures (or executor.map) to avoid retaining the entire input list twice and to simplify the code.

Copilot uses AI. Check for mistakes.
@turbomam turbomam requested review from realmarcin and turbomam March 19, 2026 20:06
@realmarcin
Copy link
Collaborator

Need one more commit here to standardize the parameters max_workers and API retry. Also a quick test of defaults to show that they are dynamic would finalize here.

  1. Thread Pool Size (MAX_WORKERS)
    MAX_WORKERS = 20 # Line 19 - module-level constant
  • Hardcoded at module level
  • Not configurable - no way to override from calling code
  • Used in both parallel download functions:
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
  1. API Retry Parameters (in get_json_from_api())
    def get_json_from_api(
    url: str,
    retry_count: int = 3, # Default: 3 retries
    retry_delay: float = 2.0, # Default: 2 seconds between retries
    verbose: bool = False # Default: don't log empty responses
    ) -> Dict:
  • These have default values but can be overridden when calling the function
  • However, the helper functions _fetch_medium_detail() and _fetch_medium_strains() call get_json_from_api() without passing these parameters, so defaults are
    always used
@crocodile27
Copy link
Collaborator Author

Test Results

poetry run pytest tests/ -q 
128 passed, 25 skipped, 5 warnings in 20.75s 

All 128 tests pass. ✅

@turbomam
Copy link
Collaborator

Review: parallel downloader — good netizen considerations

Nice work on the parallelization! The requests_cache is already a great foundation. A few suggestions to be kinder to the MediaDive API (it's a small academic service at DSMZ):

1. MAX_WORKERS = 20 is aggressive for this API

20 concurrent connections to a small academic REST API is a lot. Consider:

  • Default to 5 (or even 3) — still a big speedup over sequential
  • Make it a function parameter so CI can crank it up if needed, but the default is polite

2. Add a per-request throttle

There's no delay between requests — with 20 threads, the API sees a burst of 20 simultaneous hits. Even a small time.sleep(0.05) per thread, or a threading.Semaphore-based rate limiter (e.g., 10 req/s), would smooth things out.

3. Respect Retry-After headers

The retry logic uses a fixed 2s delay, but if the API returns 429 Too Many Requests with a Retry-After header, we should honor that value instead.

4. Set a descriptive User-Agent

Currently uses the default python-requests UA. Setting something like:

USER_AGENT = "kg-microbe (Knowledge-Graph-Hub; https://github.com/Knowledge-Graph-Hub/kg-microbe)"

...lets the API operator identify and reach out instead of just blocking.

5. Copilot's suggestions are worth addressing too

  • Switch from futures dict to executor.map (the dict values are unused)
  • Tighten return type annotations from bare tuple to tuple[str, dict]

— Mark (via Claude Code)

@crocodile27
Copy link
Collaborator Author

Benchmark Results (live API, 50 real media IDs)

Timed sequential vs parallel fetching against the real MediaDive API (no cache), using the first 50 media IDs:

Phase Sequential Parallel (20 workers) Speedup
50 requests (measured) 29.9s (599 ms/req) 1.8s (37 ms/req) 16.2x
Detail phase (~3,333 requests, extrapolated) ~33 min ~2 min 16.2x
Strain phase (~3,333 requests, extrapolated) ~33 min ~2 min 16.2x
Total bulk download (extrapolated) ~66 min ~4 min 16.2x

Run on: macOS 14, Python 3.11, ThreadPoolExecutor(max_workers=20).

@crocodile27
Copy link
Collaborator Author

crocodile27 commented Mar 19, 2026

Re: Question: Question about max_workers. Does max_workers scale

dynamically? e.g. what if someone doesn't have enough CPU for 20 workers?

CPU is not the bottleneck here — these are pure network I/O requests. Threads block waiting for HTTP responses and release the GIL during that wait, so 20 workers runs identically on a 2-core laptop or a 64-core server. The Python docs explicitly recommend higher max_workers for I/O-bound work for this reason.

The real concern with max_workers is server-side rate limiting, not CPU:

  • The current code has no 429-handling — if MediaDive throttles at e.g. 10 concurrent connections, the extras will retry with a 2s delay but there's no back-off logic specific to rate limits.
  • 20 workers was chosen empirically (the benchmark showed 16.2x speedup, meaning all 20 are useful) and held up fine in testing, but it's a hardcoded constant.

A reasonable hardening would be to handle 429s explicitly and/or make MAX_WORKERS configurable, but that's separate from this PR's scope. If MediaDive starts throttling in practice, we can tune it down.

crocodile27 and others added 2 commits March 19, 2026 15:51
Per Mark's comments: - Lower DEFAULT_MAX_WORKERS from 20 → 5 (polite for small academic API at DSMZ) - Add USER_AGENT constant identifying kg-microbe so API operator can reach out - Respect Retry-After header on 429 responses instead of fixed retry_delay - Switch from futures-dict to executor.map in both download functions - Tighten return type annotations on _fetch_* helpers to tuple[str, dict] Per Marcin's comments: - Make max_workers, retry_count, retry_delay parameters on download_detailed_media, download_medium_strains, and download_mediadive_bulk so callers can tune them - Pass retry_count and retry_delay through _fetch_* helpers into get_json_from_api (previously helpers always used defaults, ignoring caller overrides) - Add tests/test_mediadive_bulk_download.py: 8 tests covering default values, Retry-After behaviour, retry parameter propagation, and concurrency bounds 136 passed, 25 skipped. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

4 participants