Skip to content

[DataFlow runtime · online] O1.1 — shared cross-process control plane#624

Merged
jiapingW merged 1 commit into
dataflow-up-16-zerocopyfrom
dataflow-up-19-online-shared-control-plane
Jul 2, 2026
Merged

[DataFlow runtime · online] O1.1 — shared cross-process control plane#624
jiapingW merged 1 commit into
dataflow-up-16-zerocopyfrom
dataflow-up-19-online-shared-control-plane

Conversation

@maocheng23

Copy link
Copy Markdown
Collaborator

Stage O1.1 of the online disaggregated training roadmap (#618): a shared, durable, cross-process control plane for the online producer/consumer.

Problem

The online producer and consumer run in separate processes, but each built its own in-process InMemoryMetadataStore — so commit/dedup/ack were never shared, and a restart couldn't reconcile (the producer held the committed set, the consumer held the ack marker, in two separate process-local stores). On restart the consumer also re-read the append-only channel from offset 0, silently re-training already-trained samples.

Change

  • build_disagg_online_{producer,consumer} accept metadata_store / metadata_db_path; both processes share one SQLiteMetadataStore (single-host O1 — SQLite WAL serializes the producer's commits against the consumer's ack transaction). New _resolve_metadata_store helper. Omitting both keeps the pre-O1.1 private in-process store.
  • consumer gains resume=: reconcile_on_restart derives the already-durably-trained set and hands it to StreamingRefQueue as skip_ids, so a restarted consumer drops those refs on the channel re-read (no duplicate train); the committed-but-unacked tail re-streams and re-trains.
  • StreamingRefQueue gains skip_ids: drops matching refs on read and counts them consumed so the producer's in_flight_remote backpressure stays exact across the restart.

Tests

tests/test_runtime/test_disagg_online_shared_plane.py (CPU): cross-process commit/dedup, durable ack visible across processes, reconcile releases-acked / requeues-unacked (and requeues-all when the optimizer step wasn't durable), and restart-skip. Existing online/streaming tests unchanged.

Scope / non-goals

SQLite is the single-host O1 tier; a RedisMetadataStore for multi-node O2 is a later subclass behind the same MetadataStore ABC. The weight-version / staleness machinery is out of scope per the roadmap's train-with-decode decision (#618 §1/§8) and is deliberately not in this stack.

Stacked on #622 (dataflow-up-17-online-disagg); O1.2 (async loop) stacks on top.

🤖 Generated with Claude Code

@gemini-code-assist

Copy link
Copy Markdown
Contributor

Warning

You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again!

Base automatically changed from dataflow-up-17-online-disagg to dataflow-up-16-zerocopy June 30, 2026 13:29
@maocheng23 maocheng23 marked this pull request as ready for review June 30, 2026 23:26
@maocheng23 maocheng23 requested a review from FrankLeeeee as a code owner June 30, 2026 23:26
@gemini-code-assist

Copy link
Copy Markdown
Contributor

Warning

You have reached your daily quota limit. Please wait up to 24 hours and I will start processing your requests again!

@maocheng23

Copy link
Copy Markdown
Collaborator Author

Code review

No high-confidence issues found. Checked for bugs in the shared durable control-plane path, restart skip behavior, and online disaggregated launcher wiring.

Wire a shared, durable metadata store into the online disaggregated
producer/consumer so commit/dedup/ack are cross-process, and make the
streaming consumer restart-safe.

- build_disagg_online_{producer,consumer} accept metadata_store /
  metadata_db_path; both processes share one SQLiteMetadataStore instead of
  a private InMemoryMetadataStore each. New _resolve_metadata_store helper.
- consumer gains resume=: reconcile_on_restart derives the already-trained
  set and hands it to StreamingRefQueue as skip_ids, so a restarted consumer
  drops durably-trained refs on the append-only channel re-read (no duplicate
  train); the committed-but-unacked tail re-streams and re-trains.
- StreamingRefQueue gains skip_ids: drops matching refs on read and counts
  them consumed so the producer's in_flight_remote backpressure stays exact.

CPU tests (test_disagg_online_shared_plane.py): cross-process commit/dedup,
durable ack visible across processes, reconcile releases-acked/requeues-
unacked (and requeues-all when the optimizer step wasn't durable), restart
skip. Implements stage O1.1 of the online roadmap (#618).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@maocheng23 maocheng23 force-pushed the dataflow-up-19-online-shared-control-plane branch from 5f79d08 to d6a6fd3 Compare July 1, 2026 00:29
@maocheng23

Copy link
Copy Markdown
Collaborator Author

Addressed review feedback (self-review pass).

  • streaming_ref_channel.py: StreamingRefQueue._poll now drains restart skip_ids from the set and drops to the zero-overhead fast path once the (already-trained front-prefix) skip set is empty, instead of hashing every ref for the rest of a long online run.

Deferred (out of scope here): folding StreamingRefQueue.get() into StreamingRefChannel.stream() (concurrency-loop dedup) and making reconcile_on_restart streaming-aware (it lives in control_plane/controller.py, outside this PR) — both noted for follow-up.

Validated: full tests/test_runtime = 200 OK (2 skipped, 1 xfail), zero failures, on a 2-node H200 pod. Lint clean (black 24.10.0 / isort 5.13.2 / autoflake).

@jiapingW jiapingW merged commit 7ed8527 into dataflow-up-16-zerocopy Jul 2, 2026
1 check passed
@jiapingW jiapingW deleted the dataflow-up-19-online-shared-control-plane branch July 2, 2026 04:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants