feat: prefetching dataloader (buffered + double-buffered modes) #196

Merged: d-laub merged 40 commits into main from feat/prefetching-dataloader on May 29, 2026

@d-laub (Collaborator) commented May 29, 2026

Summary

Adds two new modes on Dataset.to_dataloader() that coarsen gvl fetches into large chunks and slice them into mini-batches on the consumer side. gvl's Dataset[r_idx, s_idx] throughput scales with fetch size (internal multithreading amortizes overhead), so one big call + slice outperforms many small per-batch calls.

  • mode="buffered" — main process fetches one chunk per call (sized to buffer_bytes, default 2 GB), slices via slice_chunk.
  • mode="double_buffered" — subprocess producer fills one of two POSIX shm slots while consumer drains the other; ping-pong via multiprocessing.Event. Refill latency is hidden behind the drain.
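The chunk-then-slice idea behind mode="buffered" can be sketched in a few lines. This is a minimal illustration, not the loader's code: `fetch` is a hypothetical stand-in for `Dataset[r_idx, s_idx]`, and `iter_buffered` and `batches_per_chunk` are names invented here (the real loader sizes chunks from buffer_bytes rather than a fixed batch count).

```python
import numpy as np

def fetch(r_idx, s_idx):
    """Hypothetical stand-in for dataset[r_idx, s_idx]: 4 values per instance."""
    return (r_idx[:, None] * 1000 + s_idx[:, None]) + np.arange(4)

def iter_buffered(r_idx, s_idx, batch_size, batches_per_chunk):
    """Fetch several mini-batches in one call, then slice them apart."""
    chunk_len = batch_size * batches_per_chunk
    for start in range(0, len(r_idx), chunk_len):
        stop = start + chunk_len
        chunk = fetch(r_idx[start:stop], s_idx[start:stop])  # one large fetch
        for lo in range(0, len(chunk), batch_size):
            yield chunk[lo:lo + batch_size]                  # cheap slicing

r = np.repeat(np.arange(5), 2)   # region index per instance
s = np.tile(np.arange(2), 5)     # sample index per instance
batches = list(iter_buffered(r, s, batch_size=3, batches_per_chunk=2))

# Parity check: the same mini-batches as fetching each batch directly.
direct = [fetch(r[i:i + 3], s[i:i + 3]) for i in range(0, len(r), 3)]
```

Mini-batch boundaries are unchanged by chunking, which is the property the parity tests in the test plan check against direct indexing.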

Footprint is computed exactly per (region, sample) via a new Dataset._output_bytes_per_instance method that walks haplotype_lengths, n_variants, and allele offset tables — no Zipf-style worst-case bounds, so slot capacity is tight.

API

loader = ds.to_dataloader(
    batch_size=32,
    mode="double_buffered",     # or "buffered", or None
    buffer_bytes=2 * 1024**3,
    copy=True,                  # default; pass copy=False to opt into zero-copy
    heartbeat_seconds=60.0,     # double_buffered: max wait per chunk before liveness check
)

Preconditions (raise ValueError at construction):

  • with_seqs in {"haplotypes", "annotated"} requires deterministic=True.
  • Spliced datasets are rejected.
  • num_workers > 0 is rejected.
  • A single mini-batch whose exact footprint exceeds slot capacity raises an error that names the remediation knobs (decrease batch_size or increase buffer_bytes).
  • mode="double_buffered" rejects datasets with non-default insertion-fill strategies (can't be serialized to the subprocess).
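As a rough sketch of how exact per-batch footprints could drive chunk planning and the oversized-batch check, consider a greedy packer. `plan_chunks` is a hypothetical helper written for this note; it is not the PR's ChunkPlanner, whose interface is not shown here.

```python
def plan_chunks(batch_bytes, slot_bytes):
    """Greedily group consecutive mini-batches into chunks that fit one slot.

    batch_bytes: exact footprint of each mini-batch, in iteration order.
    Returns (first_batch, last_batch_exclusive) ranges, one per chunk.
    """
    chunks, start, used = [], 0, 0
    for i, nbytes in enumerate(batch_bytes):
        if nbytes > slot_bytes:
            # A single batch that cannot fit is a configuration error.
            raise ValueError(
                f"mini-batch {i} needs {nbytes} bytes but a slot holds "
                f"{slot_bytes}; lower batch_size or raise buffer_bytes"
            )
        if used + nbytes > slot_bytes:
            chunks.append((start, i))  # close the chunk on a batch boundary
            start, used = i, 0
        used += nbytes
    if start < len(batch_bytes):
        chunks.append((start, len(batch_bytes)))
    return chunks

plan = plan_chunks([40, 30, 50, 20, 60], slot_bytes=100)
# plan == [(0, 2), (2, 4), (4, 5)]
```

Because chunks always close on a mini-batch boundary, slicing a chunk never splits a batch across two fetches.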

New files

  • python/genvarloader/_chunked.py: ChunkPlanner, slice_chunk
  • python/genvarloader/_buffered_loader.py: mode="buffered" factory
  • python/genvarloader/_shm_layout.py: hand-rolled header pack/unpack for dense, Ragged, and RaggedVariants
  • python/genvarloader/_producer.py: subprocess entrypoint with broad schema replay
  • python/genvarloader/_double_buffered_loader.py: mode="double_buffered" factory
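To make "hand-rolled header pack/unpack" concrete, here is a minimal sketch for the dense case only. The header layout (`HEADER`), `write_dense`, and `read_dense` are assumptions made for illustration and are not the format in _shm_layout.py; a `bytearray` stands in for a shared-memory slot so the example runs anywhere.

```python
import struct
import numpy as np

# Assumed layout: dtype.str padded to 8 bytes, ndim, payload byte count.
HEADER = struct.Struct("<8sQQ")

def write_dense(buf, arr):
    """Write header, shape, and payload into buf; return bytes written."""
    arr = np.ascontiguousarray(arr)
    HEADER.pack_into(buf, 0, arr.dtype.str.encode(), arr.ndim, arr.nbytes)
    struct.pack_into(f"<{arr.ndim}q", buf, HEADER.size, *arr.shape)
    data_off = HEADER.size + 8 * arr.ndim
    buf[data_off:data_off + arr.nbytes] = arr.tobytes()
    return data_off + arr.nbytes

def read_dense(buf, copy=True):
    """Rebuild the array; copy=False returns a view onto buf."""
    dtype_raw, ndim, nbytes = HEADER.unpack_from(buf, 0)
    dtype = np.dtype(dtype_raw.rstrip(b"\x00").decode())
    shape = struct.unpack_from(f"<{ndim}q", buf, HEADER.size)
    data_off = HEADER.size + 8 * ndim
    flat = np.frombuffer(buf, dtype=dtype, count=nbytes // dtype.itemsize,
                         offset=data_off)
    out = flat.reshape(shape)
    return out.copy() if copy else out

slot = bytearray(1024)  # stand-in for one shared-memory slot
original = np.arange(12, dtype=np.float32).reshape(3, 4)
written = write_dense(slot, original)
restored = read_dense(slot)
```

Ragged outputs would additionally need their offset arrays serialized, which is why slot sizing has to account for more than payload bytes.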

Modified

  • _torch.py:get_dataloader dispatches on mode, buffer_bytes, copy, and heartbeat_seconds
  • _dataset/_impl.py — new _output_bytes_per_instance method (all four with_seqs modes + tracks + var_fields)
  • _dataset/_haps.py — new _allele_bytes_sum helper (O(|V|) via RaggedAlleles.offsets diff)
  • skills/genvarloader/SKILL.md — documents the new args

Test plan

  • Unit tests for _allele_bytes_sum (cross-checked via independent _get_variants materialization).
  • Unit tests for _output_bytes_per_instance across all (with_seqs × with_tracks × var_fields) combinations supported by the dummy fixture.
  • Pure-logic tests for ChunkPlanner (slot respect, mini-batch boundary preservation, oversized-batch rejection, peak chunk size).
  • Parity tests: slice_chunk output matches dataset[r, s] for every output mode.
  • Shm layout round-trip: dense, Ragged, multi-Ragged (AnnotatedHaps-style), RaggedVariants (real dataset output), cross-process.
  • Producer subprocess: signals correctly, surfaces exceptions via queue.
  • mode="buffered": parity with direct indexing for all output modes, rejects num_workers, oversized batch, non-deterministic haplotypes.
  • mode="double_buffered": parity with mode="buffered", parity under rc_neg=False, producer-crash heartbeat path, shm cleanup on close (Linux-only).
  • Existing test suite: 248 passed, no regressions.

Spec / plan

  • Spec: docs/superpowers/specs/2026-05-28-prefetching-dataloader-design.md
  • Plan: docs/superpowers/plans/2026-05-28-prefetching-dataloader-implementation.md

Out of scope (intentional)

  • DDP / multi-GPU (rank-aware sampler + per-rank producer).
  • Non-deterministic / random-variant-selection datasets for haplotypes/annotated.
  • Spliced datasets.
  • Auto mode selection — user picks; the loader does not guess.

🤖 Generated with Claude Code

d-laub and others added 30 commits May 28, 2026 13:59
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Adds Haps._allele_bytes_sum, an O(|selected variants|) helper that computes
the total allele byte length per (instance, ploid) using RaggedAlleles.offsets
differences, without touching allele payload bytes.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
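The offsets-difference trick described in the commit above can be illustrated with plain NumPy. This is a sketch under assumed inputs, not `Haps._allele_bytes_sum` itself: `allele_bytes_sum`, `var_idx`, and `group_offsets` are hypothetical names, with each group standing for one (instance, ploid) pair.

```python
import numpy as np

def allele_bytes_sum(allele_offsets, var_idx, group_offsets):
    """Total allele bytes per group, computed from offsets alone.

    allele_offsets: (n_variants + 1,) byte offsets into the allele buffer.
    var_idx:        selected variant indices, concatenated over groups.
    group_offsets:  (n_groups + 1,) boundaries of each group in var_idx.
    """
    allele_len = np.diff(allele_offsets)                # bytes per variant
    running = np.concatenate(([0], np.cumsum(allele_len[var_idx])))
    # Differences of the running sum give per-group totals; empty groups
    # come out as 0 without special handling.
    return running[group_offsets[1:]] - running[group_offsets[:-1]]

# Alleles "A", "ACGT", "G", "TT" stored back to back: lengths 1, 4, 1, 2.
allele_offsets = np.array([0, 1, 5, 6, 8])
var_idx = np.array([0, 1, 3, 2])
group_offsets = np.array([0, 2, 2, 4])  # groups: [0, 1], [], [3, 2]
totals = allele_bytes_sum(allele_offsets, var_idx, group_offsets)
# totals -> array([5, 0, 3])
```

The allele payload bytes are never read, so the cost depends only on the number of selected variants.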
Adds _producer.py with producer_main() that opens a GVL Dataset,
reapplies a serializable schema dict (with_seqs/with_tracks/with_settings),
loops on an index_queue consuming (slot_idx, r_idx, s_idx, n_batches) items,
writes each fetched chunk into the designated shared-memory slot via
write_chunk, and signals the ready event. Any exception is pushed to exc_q
for clean subprocess exit. Tests verify the exception path (bad path pushes
to exc_q); the happy-path test skips when dummy dataset is not file-backed.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Implements _double_buffered_loader.py with _DoubleBufferedIterable:
- Spawns a producer subprocess (spawn context for macOS compat) that writes
  chunks into alternating shared-memory slots via Event ping-pong.
- Persistent producer across epochs; cleaned up via close()/atexit/weakref.
- Heartbeat timeout checks producer liveness; raises ProducerError/ProducerDied.
- Handles ragged shape restoration (ploidy axis) after shm round-trip.
- Schema includes reference_path + reference_in_memory so the producer opens
  the FASTA with the same in_memory setting as the main process.

Tests cover reference and haplotypes parity vs buffered, producer crash
surface (GVL_TEST_PRODUCER_RAISE env var), and macOS-skipped shm cleanup.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
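The two-slot ping-pong and heartbeat check can be sketched as follows. To keep the example short and runnable it uses a thread and `threading.Event` in place of the subprocess, POSIX shm slots, and `multiprocessing.Event` described above; `consume`, `fail_at`, and the local `ProducerDied` class are stand-ins written for this note.

```python
import queue
import threading

class ProducerDied(RuntimeError):
    """Local stand-in for the loader's producer-failure errors."""

def consume(chunks, heartbeat_seconds=1.0, fail_at=None):
    slots = [None, None]
    ready = [threading.Event(), threading.Event()]
    free = [threading.Event(), threading.Event()]
    for ev in free:
        ev.set()                      # both slots start out writable
    exc_q = queue.Queue()

    def producer():
        try:
            for i, chunk in enumerate(chunks):
                if i == fail_at:
                    raise RuntimeError("simulated producer failure")
                slot = i % 2          # alternate between the two slots
                free[slot].wait()
                free[slot].clear()
                slots[slot] = list(chunk)
                ready[slot].set()
        except Exception as exc:      # surface failures to the consumer
            exc_q.put(exc)

    worker = threading.Thread(target=producer, daemon=True)
    worker.start()
    for i in range(len(chunks)):
        slot = i % 2
        while not ready[slot].wait(timeout=heartbeat_seconds):
            alive = worker.is_alive()
            if ready[slot].is_set():  # signalled just after the timeout
                break
            try:
                cause = exc_q.get_nowait()
            except queue.Empty:
                if not alive:
                    raise ProducerDied("producer exited without signalling")
            else:
                raise ProducerDied("producer raised") from cause
        ready[slot].clear()
        yield from slots[slot]        # drain while the other slot refills
        free[slot].set()

items = list(consume([[1, 2], [3, 4], [5]], heartbeat_seconds=0.05))
```

While the consumer drains one slot the producer is free to fill the other, so the refill cost overlaps with consumption instead of adding to it.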
All T13 tests were written together with T12 (single feature):
- test_producer_exception_reraised: injects GVL_TEST_PRODUCER_RAISE=1 via
  monkeypatch; verifies the consumer surfaces ProducerError/ProducerDied.
- test_shm_cleanup_after_close: Linux-only check that no gvl-* entries
  linger in /dev/shm after explicit close() + GC.

Both pass in py310; shm_cleanup is pytest.skip on macOS (no /dev/shm).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Capture jitter, rc_neg, min_af, max_af, var_filter, and var_fields in
the schema dict built by _spawn_producer, then replay each via
with_settings in _apply_schema. Reject mode='double_buffered' at spawn
time for spliced datasets and non-default insertion_fill strategies,
which cannot be serialized, rather than silently producing divergent
output. Add test verifying buffered/double_buffered parity with rc_neg
and deterministic settings applied.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Delete _deep_copy_batch (dead: read_chunk(copy=True) already returns
owned data, making the per-mini-batch copy branch a no-op; collapse the
if/else to a plain yield-from). Delete _DTYPE_FROM_NUM from _shm_layout
(unused since dtype.str replaced it). Remove WHAT-comments and
task-number references across _chunked.py, _shm_layout.py,
_double_buffered_loader.py, and _producer.py. Add a one-line WHY comment
explaining the i % 2 slot rotation invariant.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Also fix Dataset.open to default seqs_kind to "variants" (not
"haplotypes") when no reference genome is provided, so open() succeeds
and callers can use .with_seqs("variants") without error.
…pen.py change

haplotypes/annotated reconstruct against a reference genome, so the bench
opens with the symlinked hg38 fixture. Reverts the _initial_seqs_kind patch
so the bench stays a pure consumer of the public API.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Implements CSV_COLUMNS, _build_dataset, _build_loader, _drain, and
measure_cell with warmup epoch, elapsed/epoch stop conditions, hard-cap
guard, and peak-RSS delta tracking.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
d-laub and others added 9 commits May 29, 2026 11:33
double_buffered slots were sized from _output_bytes_per_instance (payload
bytes only), but write_chunk also serializes int64 offset/lengths arrays:
outer offsets (~8*ploidy per instance per ragged array) and, for variants
alt/ref, inner allele offsets (~8 bytes per variant, dwarfing 1-byte SNV
alleles). On realistic 1KG data these overflowed the fixed slot slack and
the producer raised ProducerError. Unit tests passed because toy data's
offsets fit within the 4 KB slack.

Add an opt-in include_offsets to _output_bytes_per_instance that adds the
per-instance offset overhead, and use it to size the double_buffered planner
+ slots. buffered keeps payload-only sizing (no fixed slot). Regression test
exercises variants mode on the 1KG fixture.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
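A back-of-the-envelope calculation shows why offset arrays can outweigh the payload for SNV-heavy variant output. The formula below only mirrors the approximations quoted in the commit message (about 8 × ploidy bytes of outer offsets per ragged array, and about 8 bytes of inner offsets per variant for each of alt and ref); `instance_bytes` and its parameters are hypothetical, not the signature of `_output_bytes_per_instance`.

```python
INT64 = 8  # bytes per serialized offset

def instance_bytes(n_variants, payload_bytes, ploidy, n_ragged,
                   n_allele_arrays, include_offsets):
    """Approximate per-instance slot usage for variants output."""
    total = payload_bytes
    if include_offsets:
        total += INT64 * ploidy * n_ragged             # outer offsets
        total += INT64 * n_variants * n_allele_arrays  # inner allele offsets
    return total

# 500 SNVs: 1-byte alt + 1-byte ref alleles -> 1000 payload bytes.
payload_only = instance_bytes(500, 1000, ploidy=2, n_ragged=2,
                              n_allele_arrays=2, include_offsets=False)
with_offsets = instance_bytes(500, 1000, ploidy=2, n_ragged=2,
                              n_allele_arrays=2, include_offsets=True)
overflows_slack = (with_offsets - payload_only) > 4 * 1024
# payload_only == 1000, with_offsets == 9032, overflows_slack is True
```

In this example the offsets are roughly eight times the payload, which is consistent with a 4 KB slack being enough for toy fixtures but not for realistic data.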
write_chunk had no case for RaggedAnnotatedHaps (annotated = 3 ragged arrays:
haps/var_idxs/ref_coords), so double_buffered+annotated raised TypeError in
the producer. Add a kind=3 shm descriptor that stores the three components and
reconstructs them on read; extend _reshape_ragged_for_chunk to re-introduce
the ploidy axis on each component. Regression test compares annotated batches
against buffered.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
double_buffered allocates two slots totaling buffer_bytes plus producer+
consumer chunk copies; the old 2-4 GiB values used too much RAM. Reduce
BUFFER_FACT/FAN/MID (ceiling 512 MiB). Cell counts unchanged.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
… exit

The loader registered atexit.register(self.close), whose bound method kept a
strong reference to every loader for the whole process lifetime. weakref.finalize
already cleans up on GC and at interpreter exit, so the atexit registration only
leaked: per-loader producer subprocesses + shm slots accumulated (one loader per
bench cell) until exit, exhausting RAM (observed ~60GB swap). Drop the atexit
line; rely on the finalizer. Bench also closes each loader's iterable explicitly
between cells. Regression test asserts the producer is reaped when a loader is
dropped.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
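The leak described above can be reproduced with two toy classes. This is an illustrative sketch, not the loader's code: `LeakyLoader`, `FinalizedLoader`, and the `released` list are invented here, and appending to a list stands in for reaping the producer and unlinking the shm slots.

```python
import atexit
import gc
import weakref

released = []

class LeakyLoader:
    def __init__(self):
        # The bound method holds a strong reference to self until exit.
        atexit.register(self.close)

    def close(self):
        released.append("leaky")

class FinalizedLoader:
    def __init__(self):
        # The callback must not reference self, or collection never happens.
        self._finalizer = weakref.finalize(self, released.append, "finalized")

    def close(self):
        self._finalizer()  # runs the cleanup at most once

leaky = LeakyLoader()
leaky_ref = weakref.ref(leaky)
del leaky
gc.collect()
still_alive = leaky_ref() is not None  # atexit keeps the loader alive
atexit.unregister(leaky_ref().close)   # tidy up the demo registration

good = FinalizedLoader()
good_ref = weakref.ref(good)
del good
gc.collect()
collected = good_ref() is None         # finalizer ran when it was dropped
```

`weakref.finalize` also runs any pending callbacks at interpreter exit by default, so dropping the `atexit` registration does not lose exit-time cleanup.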
Reference.from_path defaults to in_memory=True (~3 GB hg38 resident per cell
and per double_buffered producer). Coerce a path reference to a memmap so it
is file-backed/reclaimable; the producer mirrors it.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Full run: 195/195 cells, 0 failures, 0 timeouts, all 3 modes x 3 outputs.
Fix plot baseline (mode=None) series: read_csv null_values=[''] loads the
empty mode field as null, so filter via is_null(); move legend to a panel
carrying all three series. Lint: hoist pytest import, drop unused seqpro.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Refactor plot_results into a per-metric _render and emit both
results_plot.png (instances/s) and results_plot_bandwidth.png (MiB/s).
Subset the double_buffered variants regression to 32 regions (still
exercises the alt/ref offset path, writes less data).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@d-laub d-laub marked this pull request as ready for review May 29, 2026 21:07
Add experiments/_sysinfo.py (py-cpuinfo primary; sysctl and /proc fallbacks):
captures cpu brand, microarch, logical/physical cores, RAM, platform, python,
and key package versions. The dataloader bench writes system_info.json next to
results.csv, embeds cpu_brand/cpu_microarch/logical_cpus in every CSV row, and
the table-overlap benches print + write the same sidecar. Refresh dataloader
results (195/195, 0 failures) + plots with the new columns.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@d-laub d-laub merged commit 1239e7f into main May 29, 2026
7 checks passed