Bug 2044654 - Update the Github ETL process to run repository exports in parallel rather one one after another#18
Conversation
… in parallel rather one one after another
There was a problem hiding this comment.
Pull request overview
This PR updates the GitHub ETL entrypoint to process multiple configured repositories concurrently (rather than sequentially), with a bounded worker pool and added tests to validate worker-count resolution and concurrency behavior.
Changes:
- Add a thread pool (
ThreadPoolExecutor) to process repositories in parallel with a configurable worker cap. - Extract per-repository ETL logic into a dedicated
process_repo()function with a per-threadrequests.Session. - Add unit tests for
_resolve_max_workers()and a concurrency test to ensure repositories overlap in execution.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| main.py | Introduces bounded parallel per-repo processing via ThreadPoolExecutor, plus _resolve_max_workers() and process_repo() to isolate per-repo ETL work. |
| tests/test_main.py | Adds tests for worker resolution logic and verifies repositories are processed concurrently. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
| # Serializes installation-token creation across repo worker threads. Without it, | ||
| # all workers miss the cache on startup and each POST /access_tokens for the same | ||
| # installation, producing redundant token creations and a burst against GitHub's | ||
| # per-installation rate limit. The lock is only held on the slow (cache-miss) path. | ||
| _token_lock = threading.Lock() |
| # Slow path: serialize creation so concurrent workers sharing an installation | ||
| # don't each POST /access_tokens. Re-check the cache once the lock is held in | ||
| # case another thread populated it while we waited. | ||
| with _token_lock: |
| # Each repo is independent (its own session, token, and BigQuery rows keyed by | ||
| # target_repository), so they are processed concurrently. The work is I/O-bound | ||
| # (GitHub API + BigQuery), so threads — not processes — are the right fit. | ||
| max_workers = _resolve_max_workers(len(github_repos)) |
| # RuntimeError, or a bare Exception from load_data) is recorded as a | ||
| # failed repo rather than propagating out of the executor and | ||
| # discarding the results of other in-flight repos. | ||
| logger.error(f"Failed to process repo {repo}: {exc}") |
| f"Failed to get installation access token: {resp.status_code}: {resp.text}" | ||
| ) | ||
| if ( | ||
| resp.status_code == 403 |
| refresh_auth: Callable[[], None] | None = None | ||
| if github_app_id and github_private_key: | ||
|
|
||
| def _refresh() -> None: |
There was a problem hiding this comment.
Why not call this refresh_auth directly, so we don't need to declare it nor reassign it?
| # Each thread gets its own session; requests.Session is not safe to share | ||
| # across threads and we rewrite the Authorization header per repo. |
There was a problem hiding this comment.
nit: it may make the code clearer to read if the session-setup logic were split out to a separate function.
| _token_locks: dict[int, threading.Lock] = {} | ||
| _token_locks_guard = threading.Lock() | ||
|
|
||
|
|
||
| def _lock_for_installation(installation_id: int) -> threading.Lock: | ||
| """Return the (lazily created) token-creation lock for an installation.""" |
There was a problem hiding this comment.
Could this be encapsulated into a class, say, a TokenStore? I think this could include _cached_token too.
No description provided.