[DataFlow runtime · online] O1.1 — shared cross-process control plane#624
Code review
No high-confidence issues found. Checked for bugs in the shared durable control-plane path, restart skip behavior, and online disaggregated launcher wiring.
Wire a shared, durable metadata store into the online disaggregated
producer/consumer so commit/dedup/ack are cross-process, and make the
streaming consumer restart-safe.
- build_disagg_online_{producer,consumer} accept metadata_store /
metadata_db_path; both processes share one SQLiteMetadataStore instead of
a private InMemoryMetadataStore each. New _resolve_metadata_store helper.
- consumer gains resume=: reconcile_on_restart derives the already-trained
set and hands it to StreamingRefQueue as skip_ids, so a restarted consumer
drops durably-trained refs on the append-only channel re-read (no duplicate
train); the committed-but-unacked tail re-streams and re-trains.
- StreamingRefQueue gains skip_ids: drops matching refs on read and counts
them consumed so the producer's in_flight_remote backpressure stays exact.
CPU tests (test_disagg_online_shared_plane.py): cross-process commit/dedup,
durable ack visible across processes, reconcile releases-acked/requeues-
unacked (and requeues-all when the optimizer step wasn't durable), restart
skip. Implements stage O1.1 of the online roadmap (#618).
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
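The shared-store idea in the commit message above can be sketched with the standard-library sqlite3 module. This is only an illustration under stated assumptions: SketchSQLiteStore, its table layout, and its method names are hypothetical and are not the PR's SQLiteMetadataStore API, and two connections inside one process stand in for the separate producer and consumer processes.

```python
import os
import sqlite3
import tempfile
from typing import Dict, Iterable, Set


class SketchSQLiteStore:
    """Hypothetical stand-in for a shared durable store (not the PR's class)."""

    def __init__(self, db_path: str) -> None:
        # isolation_level=None selects autocommit mode, so the explicit
        # BEGIN IMMEDIATE in ack() fully controls the transaction.
        self._conn = sqlite3.connect(db_path, timeout=5.0, isolation_level=None)
        # WAL lets readers proceed while a single writer holds the write lock.
        self._conn.execute("PRAGMA journal_mode=WAL")
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS samples ("
            "sample_id TEXT PRIMARY KEY, acked INTEGER NOT NULL DEFAULT 0)"
        )

    def commit(self, sample_id: str) -> bool:
        """Record a sample; return False if it was already committed (dedup)."""
        cur = self._conn.execute(
            "INSERT OR IGNORE INTO samples (sample_id) VALUES (?)", (sample_id,)
        )
        return cur.rowcount == 1

    def ack(self, sample_ids: Iterable[str]) -> None:
        """Mark samples as durably trained in one write transaction."""
        self._conn.execute("BEGIN IMMEDIATE")
        try:
            self._conn.executemany(
                "UPDATE samples SET acked = 1 WHERE sample_id = ?",
                [(s,) for s in sample_ids],
            )
            self._conn.execute("COMMIT")
        except Exception:
            self._conn.execute("ROLLBACK")
            raise

    def ids(self, acked: bool) -> Set[str]:
        """Return the committed ids that are acked (or not yet acked)."""
        rows = self._conn.execute(
            "SELECT sample_id FROM samples WHERE acked = ?", (int(acked),)
        )
        return {row[0] for row in rows}

    def close(self) -> None:
        self._conn.close()


def demo_shared_store() -> Dict[str, object]:
    """Open the same database file twice, as a producer and a consumer would."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "control_plane.db")
        producer = SketchSQLiteStore(path)
        consumer = SketchSQLiteStore(path)
        result = {
            "first_commit": producer.commit("s1"),
            "duplicate_commit": producer.commit("s1"),
        }
        producer.commit("s2")
        consumer.ack(["s1"])
        # The producer's connection observes the consumer's ack.
        result["acked_seen_by_producer"] = producer.ids(acked=True)
        result["unacked_seen_by_producer"] = producer.ids(acked=False)
        producer.close()
        consumer.close()
        return result
```

Because both handles point at one file, the duplicate commit is rejected by the primary key and the ack is visible to the other handle, which a pair of private in-memory stores cannot provide.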
5f79d08 to d6a6fd3
Addressed review feedback (self-review pass).
Deferred (out of scope here): folding Validated: full |
Stage O1.1 of the online disaggregated training roadmap (#618): a shared, durable, cross-process control plane for the online producer/consumer.
Problem
The online producer and consumer run in separate processes, but each built its own in-process InMemoryMetadataStore, so commit/dedup/ack were never shared and a restart couldn't reconcile: the producer held the committed set and the consumer held the ack marker, in two separate process-local stores. On restart the consumer also re-read the append-only channel from offset 0, silently re-training already-trained samples.
Change
- build_disagg_online_{producer,consumer} accept metadata_store / metadata_db_path; both processes share one SQLiteMetadataStore (single-host O1: SQLite WAL serializes the producer's commits against the consumer's ack transaction). New _resolve_metadata_store helper. Omitting both keeps the pre-O1.1 private in-process store.
- resume=: reconcile_on_restart derives the already-durably-trained set and hands it to StreamingRefQueue as skip_ids, so a restarted consumer drops those refs on the channel re-read (no duplicate train); the committed-but-unacked tail re-streams and re-trains.
- StreamingRefQueue gains skip_ids: drops matching refs on read and counts them consumed so the producer's in_flight_remote backpressure stays exact across the restart.
Tests
tests/test_runtime/test_disagg_online_shared_plane.py (CPU): cross-process commit/dedup, durable ack visible across processes, reconcile releases-acked / requeues-unacked (and requeues-all when the optimizer step wasn't durable), and restart-skip. Existing online/streaming tests unchanged.
Scope / non-goals
SQLite is the single-host O1 tier; a RedisMetadataStore for multi-node O2 is a later subclass behind the same MetadataStore ABC. The weight-version / staleness machinery is out of scope per the roadmap's train-with-decode decision (#618 §1/§8) and is deliberately not in this stack.
Stacked on #622 (dataflow-up-17-online-disagg); O1.2 (async loop) stacks on top.
🤖 Generated with Claude Code
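To illustrate the restart-skip behavior described in this PR, here is a minimal sketch of a reader that drops already-trained refs while still counting them as consumed. SketchRefQueue, in_flight, and the list-based channel are hypothetical names chosen for the example; the real StreamingRefQueue and the in_flight_remote accounting may be structured quite differently.

```python
from typing import Iterable, Iterator, List, Optional, Set


class SketchRefQueue:
    """Hypothetical reader over an append-only channel (not the PR's class)."""

    def __init__(
        self, channel: Iterable[str], skip_ids: Optional[Iterable[str]] = None
    ) -> None:
        self._channel = channel
        self._skip_ids: Set[str] = set(skip_ids or ())
        self.consumed = 0  # every ref read off the channel, skipped or not
        self.skipped = 0   # refs dropped because they were already trained

    def __iter__(self) -> Iterator[str]:
        for ref_id in self._channel:
            # Count the ref before deciding to skip it, so the producer's
            # view of outstanding work does not drift after a restart.
            self.consumed += 1
            if ref_id in self._skip_ids:
                self.skipped += 1
                continue
            yield ref_id


def in_flight(produced: int, queue: SketchRefQueue) -> int:
    """Assumed backpressure measure: refs produced but not yet consumed."""
    return produced - queue.consumed


def demo_restart_skip() -> List[str]:
    """Re-read the channel from offset 0 after a restart, skipping acked refs."""
    channel = ["a", "b", "c", "d"]  # append-only log, replayed from the start
    already_trained = {"a", "b"}    # what a reconcile step might report
    queue = SketchRefQueue(channel, skip_ids=already_trained)
    retrained = list(queue)
    assert in_flight(len(channel), queue) == 0
    return retrained
```

If skipped refs were not counted as consumed, the assumed in_flight measure would stay at 2 forever in this example, and a producer throttling on it would believe the consumer was permanently behind.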