UN-3508 [FEAT] Plumb fairness through ExecutionDispatcher #2009
Draft
muhammad-ali-e wants to merge 1 commit into
Phase 5.2 of the PG Queue rollout (epic UN-3445). Adds fairness-header
support to the third dispatch path (sdk1's ExecutionDispatcher) so
``execute_extraction`` tasks emitted by file_processing carry the same
routing metadata as workflow-execution dispatches that go through
queue_backend.
What
* sdk1/execution/dispatcher.py: ``dispatch``, ``dispatch_async``,
``dispatch_with_callback`` all accept an optional ``headers`` kwarg.
When non-None, forwarded to Celery's send_task; when None, omitted
so the call shape stays identical to pre-Phase-5.2 for callers that
don't opt in (sdk1's existing tests remain green unchanged).
* queue_backend/fairness.py: new ``FairnessKey.as_header()`` method
returns the wire-ready ``{"x-fairness-key": ...}`` dict. Producers
no longer need to reference ``FAIRNESS_HEADER_NAME`` directly —
keeps the additive-only canary in test_fairness_key.py happy.
* file_processing/structure_tool_task.py: small ``_fairness_headers``
helper builds the header (defaulting workload_type to NON_API;
propagating the real type is Phase 6 work). All three
``dispatcher.dispatch(...)`` sites (lines 468, 507, 720) now pass
``headers=_fairness_headers(organization_id)``.
* tests/test_executor_dispatch.py: new file. Covers header forwarding
through all three dispatcher methods (including the "omit when
None" pre-existing shape preservation), the FairnessKey.as_header()
shape, and an AST inventory canary that forbids raw
``*.send_task("execute_extraction", ...)`` outside
ExecutionDispatcher.
Why
UN-3501 plumbed fairness on bare dispatch() call sites. The
``execute_extraction`` task is the most workflow-execution-y dispatch
in the codebase but bypasses queue_backend (uses ExecutionDispatcher
directly), so it had no fairness header. The canary in
test_fairness_key.py audits only bare-name dispatch() and missed it.
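To illustrate why a bare-name audit can miss this path, here is a small sketch using Python's ``ast`` module. It is not the canary from test_fairness_key.py; the sample source and the ``call_kinds`` helper are hypothetical. A call written as ``dispatch(...)`` parses to an ``ast.Name`` callee, while ``dispatcher.dispatch(...)`` parses to an ``ast.Attribute``, so a check that only looks at ``ast.Name`` nodes never sees the second form.

```python
import ast

# Hypothetical sample: one bare-name call and one method call.
SOURCE = '''
dispatch("workflow_execution", payload)
dispatcher.dispatch(context)
'''


def call_kinds(source):
    """Classify every call in `source` as a bare-name or attribute call."""
    kinds = []
    for node in ast.walk(ast.parse(source)):
        if not isinstance(node, ast.Call):
            continue
        if isinstance(node.func, ast.Name):
            kinds.append(("bare", node.func.id))
        elif isinstance(node.func, ast.Attribute):
            kinds.append(("attribute", node.func.attr))
    return kinds


# An audit that keeps only the "bare" entries would skip dispatcher.dispatch(...).
bare_only = [name for kind, name in call_kinds(SOURCE) if kind == "bare"]
print(call_kinds(SOURCE), bare_only)
```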
No regression risk
* Additive: ``headers`` is optional and defaults to None on all three
dispatcher methods; the existing 78 sdk1 tests pass unchanged.
* Producer-side only — no consumer reads ``x-fairness-key`` yet.
* No queue routing, task name, or args/kwargs change.
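The additive claim can be checked with a small mock-based sketch. ``SketchDispatcher`` below is a hypothetical stand-in, not sdk1's ``ExecutionDispatcher``, and the payload and header values are placeholders; the point is only that ``headers`` reaches ``send_task`` when supplied and is left out of the call entirely otherwise.

```python
from unittest.mock import Mock


class SketchDispatcher:
    """Illustrative stand-in for ExecutionDispatcher; not the sdk1 class."""

    def __init__(self, celery_app):
        self._app = celery_app

    def dispatch(self, payload, headers=None):
        send_kwargs = {"args": [payload]}
        if headers is not None:
            # Only opted-in callers change the send_task call shape.
            send_kwargs["headers"] = headers
        return self._app.send_task("execute_extraction", **send_kwargs)


app = Mock()
dispatcher = SketchDispatcher(app)

# Caller that does not opt in: no headers kwarg appears in the call at all.
dispatcher.dispatch({"file": "a.pdf"})
app.send_task.assert_called_with("execute_extraction", args=[{"file": "a.pdf"}])

# Caller that opts in: headers are forwarded untouched.
dispatcher.dispatch({"file": "a.pdf"}, headers={"x-fairness-key": "placeholder"})
app.send_task.assert_called_with(
    "execute_extraction",
    args=[{"file": "a.pdf"}],
    headers={"x-fairness-key": "placeholder"},
)
```

Passing ``headers=None`` straight through would also work at runtime, but it would change the recorded call and break strict ``assert_called_with`` checks in existing tests, which is why the kwarg is omitted instead.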
Test count: workers seam suite 53 -> 60 (new test_executor_dispatch.py
with 7 tests). sdk1 dispatcher suite 80/80 green.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>


What
* Adds an optional ``headers: dict | None`` kwarg to all three dispatch methods on ``unstract.sdk1.execution.dispatcher.ExecutionDispatcher`` (``dispatch``, ``dispatch_async``, ``dispatch_with_callback``). When non-None, it is forwarded as Celery message headers; when None, the kwarg is omitted from ``send_task`` so existing callers see the identical pre-change call shape.
* Adds a ``FairnessKey.as_header()`` helper in ``workers/queue_backend/fairness.py`` that returns the wire-ready ``{"x-fairness-key": ...}`` dict, so producers don't need to import the slot-name constant.
* Updates the three ``dispatcher.dispatch(...)`` sites in ``workers/file_processing/structure_tool_task.py`` (lines 468, 507, 720). Each passes ``headers=_fairness_headers(organization_id)``. A small in-file helper defaults ``workload_type`` to ``NON_API``; propagating the real type from upstream is Phase 6 work.
* Adds ``workers/tests/test_executor_dispatch.py`` (7 tests):
  * Header forwarding through all three dispatcher methods, including the omit-when-None shape.
  * The ``FairnessKey.as_header()`` shape.
  * An AST inventory canary that forbids raw ``*.send_task("execute_extraction", ...)`` outside ``ExecutionDispatcher``.
Why
UN-3501 (#2003) plumbed fairness on bare ``dispatch(...)`` call sites, but ``execute_extraction``, the most workflow-execution-y dispatch in the codebase, bypasses ``queue_backend.dispatch()`` entirely (it uses sdk1's ``ExecutionDispatcher``). The Phase 5.1 canary audits bare-name ``dispatch()`` and missed it. This PR brings the third dispatch path under the same fairness umbrella.
Under Celery today the header sits inertly: no consumer reads it. Under the future PG Queue substrate, ``execute_extraction`` tasks land in the staging queue and the fairness scheduler sorts on the same three fields. Producer-side plumbing now means the consumer can land without backfilling in-flight payloads.
How
* ``ExecutionDispatcher`` only attaches ``headers=`` to ``send_task`` when the caller provides them. That preserves the pre-change ``mock.assert_called_with(...)`` shape used by sdk1's existing 78 dispatcher tests, so they all stay green without modification.
* ``FairnessKey.as_header()`` shifts the wire-shape knowledge into the seam module, so producers compose a ``FairnessKey(...)`` and call ``.as_header()``. The constant ``FAIRNESS_HEADER_NAME`` stays internal to ``queue_backend.fairness``, which keeps the "no consumer reads it yet" canary green at the producer level.
* The ``_fairness_headers`` helper in ``structure_tool_task.py`` is intentionally one function: all three sites use the same workload_type today, and Phase 6 (chord lift) is the right place to thread the real workload type from the workflow's class.
Can this PR break any existing features. If yes, please list possible items. If no, please explain why. (PS: Admins do not merge the PR without this section filled)
No.
* ``headers`` is optional and defaults to None on all three dispatcher methods. Callers that don't opt in see the identical ``send_task`` call shape.
* No consumer reads ``x-fairness-key`` yet (canary in test_fairness_key.py still asserts this).
Database Migrations
Env Config
Relevant Docs
``queue_backend/fairness.py`` and the dispatcher cover the header semantics.
Related Issues or PRs
Dependencies Versions
Notes on Testing
```
cd workers
.venv/bin/python -m pytest \
  tests/test_executor_dispatch.py \
  tests/test_fairness_key.py \
  tests/test_queue_backend_seam.py \
  tests/test_dispatch_sites_characterisation.py
# 60 passed in ~6s
cd ../unstract/sdk1
uv run pytest tests/test_execution.py
# 80 passed
```
Verification:
```
# Every execute_extraction dispatch goes through ExecutionDispatcher:
grep -rn "send_task.*execute_extraction" workers/ --include="*.py" \
  | grep -v "ExecutionDispatcher\|tests/\|sdk1/"
# (empty: only sdk1's ExecutionDispatcher.dispatch* methods)
```
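The same check can be expressed structurally instead of with grep. The following is a rough sketch of an AST inventory canary, not the test shipped in ``workers/tests/test_executor_dispatch.py``: the function name, the allow-list constant, and the sample sources are hypothetical. It flags any ``*.send_task("execute_extraction", ...)`` call that is not lexically inside a class named ``ExecutionDispatcher``.

```python
import ast

ALLOWED_CLASS = "ExecutionDispatcher"
TASK_NAME = "execute_extraction"


def raw_send_task_lines(source):
    """Return line numbers of send_task(TASK_NAME, ...) calls outside ALLOWED_CLASS."""
    offenders = []

    def visit(node, inside_allowed):
        if isinstance(node, ast.ClassDef):
            inside_allowed = inside_allowed or node.name == ALLOWED_CLASS
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Attribute)
            and node.func.attr == "send_task"
            and node.args
            and isinstance(node.args[0], ast.Constant)
            and node.args[0].value == TASK_NAME
            and not inside_allowed
        ):
            offenders.append(node.lineno)
        for child in ast.iter_child_nodes(node):
            visit(child, inside_allowed)

    visit(ast.parse(source), False)
    return offenders


# Hypothetical samples: the first is the allowed path, the second a raw shortcut.
ALLOWED_SAMPLE = '''
class ExecutionDispatcher:
    def dispatch(self, ctx):
        return self._app.send_task("execute_extraction", args=[ctx])
'''
RAW_SAMPLE = '''
def shortcut(app, ctx):
    return app.send_task("execute_extraction", args=[ctx])
'''
print(raw_send_task_lines(ALLOWED_SAMPLE), raw_send_task_lines(RAW_SAMPLE))
```

Unlike the grep, this only matches a literal task name passed as the first positional argument, so a call that builds the name dynamically or passes it by keyword would need an extra rule.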
Screenshots
N/A (no UI surface).
Checklist
I have read and understood the Contribution Guidelines.