UN-3508 [FEAT] Plumb fairness through ExecutionDispatcher #2009

Draft: muhammad-ali-e wants to merge 1 commit into main from UN-3508-executor-fairness


@muhammad-ali-e
Contributor

What

  • Add optional headers: dict | None kwarg to all three dispatch methods on unstract.sdk1.execution.dispatcher.ExecutionDispatcher (dispatch, dispatch_async, dispatch_with_callback). When non-None, forwarded as Celery message headers; when None, the kwarg is omitted from send_task so existing callers see the identical pre-change call shape.
  • New FairnessKey.as_header() helper in workers/queue_backend/fairness.py returns the wire-ready {"x-fairness-key": ...} dict, so producers don't need to import the slot-name constant.
  • Wire fairness from the three dispatcher.dispatch(...) sites in workers/file_processing/structure_tool_task.py (lines 468, 507, 720). Each passes headers=_fairness_headers(organization_id). A small in-file helper defaults workload_type to NON_API; propagating the real type from upstream is Phase 6 work.
  • New workers/tests/test_executor_dispatch.py (7 tests):
    • Header forwarding through all three dispatcher methods.
    • "Omit when None" — call shape preservation for callers that don't opt in (sdk1 round-trip tests still green unchanged).
    • FairnessKey.as_header() shape.
    • AST inventory canary: no production code calls *.send_task("execute_extraction", ...) outside ExecutionDispatcher.

Why

UN-3501 (#2003) plumbed fairness on bare dispatch(...) call sites — but execute_extraction, the most workflow-execution-y dispatch in the codebase, bypasses queue_backend.dispatch() entirely (it uses sdk1's ExecutionDispatcher). The Phase 5.1 canary audits bare-name dispatch() and missed it. This PR brings the third dispatch path under the same fairness umbrella.

Under Celery today the header sits inertly — no consumer reads it. Under the future PG Queue substrate, execute_extraction tasks land in the staging queue and the fairness scheduler sorts on the same three fields. Producer-side plumbing now means the consumer can land without backfilling in-flight payloads.

How

  • ExecutionDispatcher only attaches headers= to send_task when the caller provides them. That preserves the pre-change mock.assert_called_with(...) shape used by sdk1's existing 78 dispatcher tests — they all stay green without modification.
  • FairnessKey.as_header() shifts the wire-shape knowledge into the seam module, so producers compose a FairnessKey(...) and call .as_header(). The constant FAIRNESS_HEADER_NAME stays internal to queue_backend.fairness — keeps the "no consumer reads it yet" canary green at the producer level.
  • The _fairness_headers helper in structure_tool_task.py is intentionally one function: same workload_type for all three sites today; Phase 6 (chord lift) is the right place to thread the real workload type from the workflow's class.
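The "attach `headers=` only when provided" behaviour in the first bullet can be sketched as follows. `SketchDispatcher`, its constructor, the `"executor"` queue name, and the `context` argument are hypothetical stand-ins, not the sdk1 `ExecutionDispatcher`. The only point is that a caller who does not opt in produces a `send_task` call with no `headers` keyword at all, so a strict `assert_called_with(...)` written before the change still matches.

```python
from __future__ import annotations

from unittest.mock import MagicMock


class SketchDispatcher:
    """Hypothetical stand-in for ExecutionDispatcher; not the sdk1 code."""

    def __init__(self, celery_app, queue: str = "executor"):
        self._app = celery_app
        self._queue = queue  # assumed queue name

    def dispatch_async(self, context: dict, headers: dict | None = None):
        send_kwargs = {"args": [context], "queue": self._queue}
        # Only add the keyword when the caller opted in, so the
        # pre-change call shape is preserved for everyone else.
        if headers is not None:
            send_kwargs["headers"] = headers
        return self._app.send_task("execute_extraction", **send_kwargs)


app = MagicMock()  # stands in for a Celery app
dispatcher = SketchDispatcher(app)

# Caller that does not opt in: identical to the pre-change shape.
dispatcher.dispatch_async({"run_id": "r1"})
app.send_task.assert_called_with(
    "execute_extraction", args=[{"run_id": "r1"}], queue="executor"
)

# Caller that opts in: the headers dict is forwarded untouched.
dispatcher.dispatch_async({"run_id": "r2"}, headers={"x-fairness-key": "placeholder"})
assert app.send_task.call_args.kwargs["headers"] == {"x-fairness-key": "placeholder"}
```

Passing `headers=None` through unconditionally would be simpler, but it would change the recorded call and break any existing test that asserts the exact keyword set.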

Can this PR break any existing features. If yes, please list possible items. If no, please explain why. (PS: Admins do not merge the PR without this section filled)

No.

  • headers is optional and defaults to None on all three dispatcher methods. Callers that don't opt in see the identical send_task call shape.
  • All 78 existing sdk1 dispatcher tests pass unchanged.
  • Worker seam suite extended (53 → 60 tests, all green).
  • Producer-side only: no consumer in workers/ reads x-fairness-key yet (canary in test_fairness_key.py still asserts this).
  • No queue routing, task name, or args/kwargs change.

Database Migrations

  • None.

Env Config

  • None.

Relevant Docs

  • N/A. Module-level docstrings in queue_backend/fairness.py and the dispatcher cover the header semantics.

Related Issues or PRs

Dependencies Versions

  • None.

Notes on Testing

```
cd workers
.venv/bin/python -m pytest \
  tests/test_executor_dispatch.py \
  tests/test_fairness_key.py \
  tests/test_queue_backend_seam.py \
  tests/test_dispatch_sites_characterisation.py
# 60 passed in ~6s

cd ../unstract/sdk1
uv run pytest tests/test_execution.py
# 80 passed
```

Verification:

```
# Every execute_extraction dispatch goes through ExecutionDispatcher:
grep -rn "send_task.*execute_extraction" workers/ --include="*.py" \
  | grep -v "ExecutionDispatcher\|tests/\|sdk1/"
# (empty: only sdk1's ExecutionDispatcher.dispatch* methods)
```
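The same check can be expressed on the syntax tree instead of on text, which is roughly what the AST inventory canary is described as doing. The sketch below is an assumption about its general shape, not the test in `workers/tests/test_executor_dispatch.py`: `find_raw_send_task` is a hypothetical helper that inspects one source string, whereas the real canary would have to walk the production files and exempt `ExecutionDispatcher` itself.

```python
from __future__ import annotations

import ast


def find_raw_send_task(source: str, task_name: str = "execute_extraction") -> list[int]:
    """Return line numbers of `<anything>.send_task("<task_name>", ...)` calls."""
    hits = []
    for node in ast.walk(ast.parse(source)):
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Attribute)  # attribute call: x.send_task(...)
            and node.func.attr == "send_task"
            and node.args
            and isinstance(node.args[0], ast.Constant)  # literal first argument
            and node.args[0].value == task_name
        ):
            hits.append(node.lineno)
    return sorted(hits)


sample = '''
app.send_task("execute_extraction", args=[ctx])
app.send_task("other_task")
dispatcher.dispatch(ctx, headers=h)
'''

print(find_raw_send_task(sample))  # → [2]
```

Unlike the grep, this ignores comments and strings that merely mention the task name. It still misses calls where the task name is passed through a variable, so it complements code review and does not replace it.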

Screenshots

N/A (no UI surface).

Checklist

I have read and understood the Contribution Guidelines.

Phase 5.2 of the PG Queue rollout (epic UN-3445). Adds fairness-header
support to the third dispatch path (sdk1's ExecutionDispatcher) so
``execute_extraction`` tasks emitted by file_processing carry the same
routing metadata as workflow-execution dispatches that go through
queue_backend.

What

* sdk1/execution/dispatcher.py: ``dispatch``, ``dispatch_async``,
  ``dispatch_with_callback`` all accept an optional ``headers`` kwarg.
  When non-None, forwarded to Celery's send_task; when None, omitted
  so the call shape stays identical to pre-Phase-5.2 for callers that
  don't opt in (sdk1's existing tests remain green unchanged).
* queue_backend/fairness.py: new ``FairnessKey.as_header()`` method
  returns the wire-ready ``{"x-fairness-key": ...}`` dict. Producers
  no longer need to reference ``FAIRNESS_HEADER_NAME`` directly —
  keeps the additive-only canary in test_fairness_key.py happy.
* file_processing/structure_tool_task.py: small ``_fairness_headers``
  helper builds the header (defaulting workload_type to NON_API;
  propagating the real type is Phase 6 work). All three
  ``dispatcher.dispatch(...)`` sites (lines 468, 507, 720) now pass
  ``headers=_fairness_headers(organization_id)``.
* tests/test_executor_dispatch.py: new file. Covers header forwarding
  through all three dispatcher methods (including the "omit when
  None" pre-existing shape preservation), the FairnessKey.as_header()
  shape, and an AST inventory canary that forbids raw
  ``*.send_task("execute_extraction", ...)`` outside
  ExecutionDispatcher.

Why

UN-3501 plumbed fairness on bare dispatch() call sites. The
``execute_extraction`` task is the most workflow-execution-y dispatch
in the codebase but bypasses queue_backend (uses ExecutionDispatcher
directly), so it had no fairness header. The canary in
test_fairness_key.py audits only bare-name dispatch() and missed it.

No regression risk

* Additive: ``headers`` is optional and defaults to None on all three
  dispatcher methods; the existing 78 sdk1 tests pass unchanged.
* Producer-side only — no consumer reads ``x-fairness-key`` yet.
* No queue routing, task name, or args/kwargs change.

Test count: workers seam suite 53 -> 60 (new test_executor_dispatch.py
with 7 tests). sdk1 dispatcher suite 80/80 green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@coderabbitai
Contributor

coderabbitai Bot commented Jun 2, 2026

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

@sonarqubecloud

sonarqubecloud Bot commented Jun 2, 2026

Quality Gate failed

Failed conditions
5.7% Duplication on New Code (required ≤ 3%)
