Skip to content

UN-3056 [FEAT] Failure-only notifications with batched dispatch#1936

Open
kirtimanmishrazipstack wants to merge 52 commits into
mainfrom
UN-3056-Notify-on-API-deployment-failures
Open

UN-3056 [FEAT] Failure-only notifications with batched dispatch#1936
kirtimanmishrazipstack wants to merge 52 commits into
mainfrom
UN-3056-Notify-on-API-deployment-failures

Conversation

@kirtimanmishrazipstack
Copy link
Copy Markdown
Contributor

@kirtimanmishrazipstack kirtimanmishrazipstack commented Apr 29, 2026

What

  • New "Notify on failures only" option — subscribers stop getting webhooks for successful runs.
  • Notifications are now grouped into one webhook per time window instead of one per run.
  • New Notifications panel in Platform Settings lets each org set the grouping window (1–120 minutes, default 5).

Why

  • Customers asked for failure-only alerts; previously every successful run also fired a webhook.
  • High-volume deployments drowned in per-run webhooks; grouping collapses them into one message per window.
  • Ops can tune alert cadence per org from the UI without code changes or redeploys.

How

  • Three dispatch sites (api_v2, pipeline_v2, worker callback) enqueue into NotificationBuffer; workers/log_consumer/scheduler.sh periodically calls the internal flush endpoint, which dispatches with SELECT … FOR UPDATE OF o SKIP LOCKED.
  • Rendering goes through unstract.core.notification_clubbed_renderer — canonical {summary, events} envelope for API webhooks, Slack mrkdwn for platform=SLACK.
  • Frontend: redesigned Platform Settings page, added notify_on_failures checkbox in the notification modal, removed pagination from Setup Notifications.

Can this PR break any existing features. If yes, please list possible items. If no, please explain why.

  • Webhook payload shape changed — receivers that read the old flat per-run body must now read events[0].
  • Notifications no longer fire instantly; they arrive once per grouping window (default 5 minutes).

Database Migrations

  • backend/notification_v2/migrations/0002_notification_notify_on_failures.py — adds failure-only flag.
  • backend/notification_v2/migrations/0003_add_notification_buffer.py — adds buffer table for grouped dispatch.
  • backend/workflow_manager/workflow_v2/migrations/0020_workflowexecution_file_counts.py — adds file-count fields used in payload.

Env Config

Relevant Docs

Related Issues or PRs

Dependencies Versions

  • None

Notes on Testing

  • Tested on local using slack webhook + api webhook
  • Confirm the grouped webhook arrives within the configured window (default 5 min).

Screenshots

  • UI screen changes
pr_1 pr_2 pr_3
  • Slack Webhook Notification
pr_4
  • API Webhook Notification
pr_5

Checklist

I have read and understood the Contribution Guidelines.

kirtimanmishrazipstack and others added 6 commits April 24, 2026 16:25
…down rendering (#1927)

* [FIX] Make tool-run logs visible in workflow execution UI

Two stacked gaps were keeping tool-level log lines (Processing prompt,
Running LLM completion, lookup calls, etc.) out of the workflow
execution logs UI and the execution_log DB table for API / workflow
runs:

1. Empty log_events_id.  structure_tool_task seeded LOG_EVENTS_ID in
   StateStore but never threaded it into pipeline_ctx / agentic_ctx.
   ExecutorToolShim.stream_log gated publishing on
   self.log_events_id, so every tool-level log was dropped before it
   ever reached the broker.

2. Wrong payload shape.  Even with the channel threaded,
   stream_log used LogPublisher.log_progress(...) whose payload omits
   execution_id / organization_id / file_execution_id.
   get_validated_log_data (log_utils.py) requires those IDs and
   LogType == LOG to persist to execution_log, so tool-level messages
   were silently filtered at the Redis->DB drain step — orchestration
   logs persisted, tool logs did not.

Fixes:
- ExecutionContext gains execution_id + file_execution_id, populated
  in structure_tool_task for both the legacy pipeline and agentic
  contexts.
- LegacyExecutor caches the three IDs on self during execute() and
  passes them into every ExecutorToolShim construction
  (~7 callsites).
- ExecutorToolShim.stream_log now dual-emits: PROGRESS (unchanged,
  drives the IDE prompt-card live progress pane) plus LOG carrying
  the workflow IDs (feeds the workflow execution logs UI and persists
  to execution_log via the existing drain). LOG emission is gated on
  execution_id + organization_id being present, so bare IDE test
  runs without a workflow still behave as before.

Rendering polish
- The LogModal and pipeline LogsModal now pipe log text through the
  existing CustomMarkdown renderer, so backticked identifiers render
  as inline-code pills and embedded newlines break lines. This lets
  multi-line structured events (e.g. the lookup pre-call trio)
  surface as a single row with readable inner formatting.
- Prompt-key mentions inside legacy_executor tool logs are wrapped
  in backticks for consistency with the rest of the log surface.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* [FIX] Wrap prompt_name in backticks in remaining stream_log calls

Completes the consistency pass on tool-run log formatting: the table-
and line-item-extraction success and error paths still emitted prompt
names without backticks, so the markdown-rendered logs UI showed them
as bare text instead of inline-code pills. Matches the pattern already
applied to the other 9 stream_log calls in this file.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* [FIX] Validate URL schemes in CustomMarkdown link renderer

Workflow logs rendered via CustomMarkdown can contain tool-generated or
user-derived content, so an untrusted \`[text](url)\` sequence could
inject a \`javascript:\` or \`data:\` scheme and get clickable through
antd \`Typography.Link\`. Allow-list the safe external schemes (http,
https, mailto, tel) before rendering as a link; everything else falls
back to plain text while still honouring the existing internal-path
branch used for in-app navigation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* [FIX] Thread workflow IDs into remaining shim/context callsites

Addresses CodeRabbit review gaps so the log-plumbing fix is consistent
across every pre-dispatch and plugin-dispatch path:

- `table_ctx` / `line_item_ctx` in `legacy_executor.py` now carry
  `log_events_id`, `execution_id`, `file_execution_id` from context so
  downstream table/line-item plugins that build their own
  `ExecutorToolShim` pass the `execution_id + organization_id` gate
  and emit workflow LOG payloads.
- `structure_tool_task.py` threads the same IDs into the bare
  pre-dispatch shim, so `X2Text.process()` calls during agentic
  extraction reach the workflow logs UI.
- `LogsModal.jsx` stores the raw log string in row data and lets the
  column renderer wrap it in `CustomMarkdown` — the previous map
  stored a `<CustomMarkdown />` element that was then passed back into
  `CustomMarkdown.text`, producing `[object Object]` for multi-row
  lookups.
- Dropped `getattr(context, ...)` on `execution_id` /
  `file_execution_id` now that they are dataclass fields — matches the
  direct access used for `organization_id`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* [REFACTOR] Trim overly specific comments in log-plumbing changes

Pass through the new comments added across this PR and either remove or
tighten the ones that restate what the code already shows. Keep only
the WHY lines that protect future readers from missing a non-obvious
constraint (XSS guard in CustomMarkdown, dual PROGRESS/LOG emission in
the shim, pre-dispatch shim needing workflow IDs so X2Text logs are
not silently dropped).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* [REFACTOR] Extract isSafeExternalUrl into shared helpers module

Moves the URL scheme allow-list check out of CustomMarkdown into
helpers/urlSafety.js so any future component that renders links from
user- or tool-derived content can reuse the same guard instead of
re-implementing it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* [FIX] Tighten URL guard, split publish try/excepts, and extract shim builder

Addresses the must-fix and worth-doing comments from the PR review:

Security
- CustomMarkdown: treat protocol-relative URLs (`//host/...`) as external,
  not internal, so they can no longer skip the scheme guard via the
  `startsWith("/")` branch.
- `isSafeExternalUrl`: drop the `window.location.origin` base so bare
  strings ("javascript", "../foo") fail to parse instead of silently
  resolving to `https://<origin>/...` and passing the scheme check.

Silent failure + comment accuracy
- ExecutorToolShim.stream_log: split the PROGRESS and LOG publish paths
  into separate try/except blocks so a LogDataDTO validation failure on
  the LOG payload is no longer mis-attributed to "progress publish
  failed". Corrected the inline comments — the DB drop is driven by
  LogPublisher's `payload.type == 'LOG'` check, and only
  `execution_id` + `organization_id` are strictly required.

Refactor
- New `LegacyExecutor._build_shim()` helper — all seven
  ExecutorToolShim callsites now share one construction path so the
  workflow-ID plumbing can't drift out of sync across sites again.
- Thread `execution_id` / `file_execution_id` into the seven
  self-dispatched sub-`ExecutionContext`s alongside `log_events_id`,
  matching the table/line-item sites and keeping the context
  consistent for any downstream consumer that reads the IDs from the
  context rather than from the executor instance.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* [FIX] Address remaining type-design and silent-failure comments

- ExecutionContext: drop the BE-coupled inline comment, document the
  new IDs in the Attributes block, and enforce the invariant that
  execution_id implies organization_id via __post_init__.
- ExecutorToolShim: typed the three new IDs as str | None instead of
  str = "" so the signature matches the Optional semantics already
  enforced by the runtime guards.
- LegacyExecutor: move per-request state to __init__ so _log_component
  is no longer a class-level mutable default shared across instances;
  stop silently coercing None IDs to ""; add a one-shot warning when a
  tool-sourced run lands without workflow IDs so the silent-no-persist
  case is visible in GKE logs.
- structure_tool_task: emit the same warning when LOG_EVENTS_ID is
  absent from StateStore.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* [FIX] Surface first publish failure per shim at WARN

Both PROGRESS and LOG publish paths previously swallowed every broker
failure at DEBUG, so a misconfigured or down Redis broker meant every
tool-level log silently vanished with no operator-visible signal.

Track a per-shim _progress_publish_failed / _log_publish_failed flag
and log the first failure at WARNING (with traceback), then downgrade
subsequent failures on the same shim back to DEBUG. Preserves the
non-fatal semantics of the publish path while making broker outages
visible in GKE logs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* [FIX] Auto-bump modified_at on QuerySet.update() and bulk_update()

Django's auto_now=True only fires on Model.save(); QuerySet.update() and
bulk_update() bypass save(), so BaseModel.modified_at silently stayed at
the creation time for every bulk-path write. Audit trail drifted.

Introduce BaseModelQuerySet that injects modified_at=timezone.now() into
both paths, and expose it via BaseModelManager. Migrate all custom
managers on BaseModel subclasses to compose BaseModelManager so their
querysets inherit the overrides. Drop the ad-hoc modified_at=now() kwarg
in FileHistoryHelper now that the queryset handles it.

* [FIX] Materialize objs in BaseModelQuerySet.bulk_update to support generators

Addresses PR review: if callers pass a non-rewindable iterable (generator,
queryset iterator), the modified_at stamping loop would exhaust it before
super().bulk_update() saw it, silently updating zero rows. list(objs) up
front keeps generator callers working.

Also drop the mock-based unit test — it needed django.setup() at module
import which isn't viable without pytest-django, and proper DB-backed
coverage is tracked separately.

* [FIX] Auto-inject modified_at into BaseModel.save(update_fields=...)

Django only runs auto_now for fields listed in update_fields, so every
save(update_fields=["foo"]) on a BaseModel subclass silently drops the
modified_at bump — same family of bug as QuerySet.update/bulk_update.

Override BaseModel.save() to add modified_at to update_fields whenever
the caller supplies a restricted list without it. Also drop two dead
manual-assignment lines (execution.modified_at = timezone.now() before
save()) that were redundant with auto_now on a full save().

* [FIX] Auto-bump modified_at on upsert bulk_create and drop workarounds

QuerySet.bulk_create(update_conflicts=True, update_fields=[...]) runs an
UPDATE on conflict with only the listed fields — same auto_now-bypass as
save(update_fields=...) and QuerySet.update(). Patch BaseModelQuerySet's
bulk_create to inject modified_at into update_fields on upsert.

With that in place, the explicit "modified_at" entries in dashboard_metrics
upsert callers are redundant. Drop them.

* [REFACTOR] Tighten BaseModel auto-bump helpers and edge cases

- Extract `_with_modified_at` helper; single source of truth for the "inject
  modified_at into a partial field list" rule across `bulk_update`,
  `bulk_create` and `BaseModel.save`.
- Preserve Django's documented `save(update_fields=[])` no-op (signals-only
  save, no column writes) instead of rewriting it to `["modified_at"]`.
  Apply the same guard to `bulk_create(update_conflicts=True, update_fields=[])`.
- Match Django's positional `save()` signature (`force_insert`, `force_update`,
  `using`, `update_fields`) so callers passing flags positionally still hit
  the auto-bump override.
- Skip the per-obj `modified_at` stamp + `objs` materialization in
  `bulk_update` when the caller already listed `modified_at` — lets the
  opt-in path stay O(1) before the `super()` delegation.
- Docstring corrections: "previous save() timestamp" (not just creation
  time); manager-level convention note; precise `auto_now` semantics
  (attribute still updates in-memory, just isn't persisted without
  `update_fields` inclusion).
…wered table extraction (#1914)

* Execution backend - revamp

* async flow

* Streaming progress to FE

* Removing multi hop in Prompt studio ide and structure tool

* UN-3234 [FIX] Add beta tag to agentic prompt studio navigation item

* Added executors for agentic prompt studio

* Added executors for agentic prompt studio

* Removed redundant envs

* Removed redundant envs

* Removed redundant envs

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Removed redundant envs

* Removed redundant envs

* Removed redundant envs

* Removed redundant envs

* Removed redundant envs

* Removed redundant envs

* Removed redundant envs

* Removed redundant envs

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Removed redundant envs

* adding worker for callbacks

* adding worker for callbacks

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* adding worker for callbacks

* adding worker for callbacks

* adding worker for callbacks

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Pluggable apps and plugins to fit the new async prompt execution architecture

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Pluggable apps and plugins to fit the new async prompt execution architecture

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Pluggable apps and plugins to fit the new async prompt execution architecture

* adding worker for callbacks

* adding worker for callbacks

* adding worker for callbacks

* adding worker for callbacks

* adding worker for callbacks

* adding worker for callbacks

* adding worker for callbacks

* adding worker for callbacks

* fix: write output files in agentic extraction pipeline

Agentic extraction returned early without writing INFILE (JSON) or
METADATA.json, causing destination connectors to read the original PDF
and fail with "Expected tool output type: TXT, got: application/pdf".

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* UN-3266 fix: replace hardcoded /tmp paths with secure temp dirs in tests (#1850)

* UN-3266 fix: replace hardcoded /tmp paths with secure temp dirs in tests

Replace hardcoded /tmp/ paths (SonarCloud S5443 security hotspots) with
pytest's tmp_path fixture or module-level tempfile.mkdtemp() constants
in all affected test files to avoid world-writable directory vulnerabilities.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update docs

* UN-3266 fix: remove dead code with undefined names in fetch_response

Remove unreachable code block after the async callback return in
fetch_response that still referenced output_count_before and response
from the old synchronous implementation, causing ruff F821 errors.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* Un 3266 fix security hotspot tmp paths (#1851)

* UN-3266 fix: replace hardcoded /tmp paths with secure temp dirs in tests

Replace hardcoded /tmp/ paths (SonarCloud S5443 security hotspots) with
pytest's tmp_path fixture or module-level tempfile.mkdtemp() constants
in all affected test files to avoid world-writable directory vulnerabilities.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* UN-3266 fix: resolve ruff linting failures across multiple files

- B026: pass url positionally in worker_celery.py to avoid star-arg after keyword
- N803: rename MockAsyncResult to mock_async_result in test_tasks.py
- E501/I001: fix long line and import sort in llm_whisperer helper
- ANN401: replace Any with object|None in dispatcher.py; add noqa in test helpers
- F841: remove unused workflow_id and result assignments

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>

* UN-3266 fix: resolve SonarCloud bugs S2259 and S1244 in PR #1849

- S2259: guard against None after _discover_plugins() in loader.py
  to satisfy static analysis on the dict[str,type]|None field type
- S1244: replace float equality checks with pytest.approx() in
  test_answer_prompt.py and test_phase2h.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* UN-3266 fix: resolve SonarCloud code smells in PR #1849

- S5799: Merge all implicit string concatenations in log messages
  (legacy_executor.py, tasks.py, dispatcher.py, orchestrator.py,
   registry.py, variable_replacement.py, structure_tool_task.py)
- S1192: Extract duplicate literal to _NO_CELERY_APP_MSG constant in
  dispatcher.py
- S1871: Merge identical elif/else branches in tasks.py and
  test_sanity_phase6j.py
- S1186: Add comment to empty stub method in test_sanity_phase6a.py
- S1481: Remove unused local variables in test_sanity_phase6d/e/f/g/h/j
  and test_phase5d.py
- S117: Rename PascalCase local variables to snake_case in
  test_sanity_phase3/5/6i.py
- S5655: Broaden tool type annotation to StreamMixin in
  IndexingUtils.generate_index_key and PlatformHelper.get_adapter_config
- docker:S7031: Merge consecutive RUN instructions in
  worker-unified.Dockerfile
- javascript:S1128: Remove unused pollForCompletion import in
  usePromptRun.js

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* UN-3266 fix: wrap long log message in dispatcher.py to fix E501

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* UN-3266 fix: resolve remaining SonarCloud S117 naming violations

Rename PascalCase local variables to snake_case to comply with S117:

- legacy_executor.py: rename tuple-unpacked _get_prompt_deps() results
  (AnswerPromptService→answer_prompt_svc, RetrievalService→retrieval_svc,
  VariableReplacementService→variable_replacement_svc, LLM→llm_cls,
  EmbeddingCompat→embedding_compat_cls, VectorDB→vector_db_cls) and
  update all downstream usages including _apply_type_conversion and
  _handle_summarize
- test_phase1_log_streaming.py: rename Mock* local variables to
  mock_* snake_case equivalents
- test_sanity_phase3.py: rename MockDispatcher→mock_dispatcher_cls
  and MockShim→mock_shim_cls across all 10 test methods
- test_sanity_phase5.py: rename MockShim→mock_shim, MockX2Text→mock_x2text
  in 6 test methods; MockDispatcher→mock_dispatcher_cls in dispatch test;
  fix LLM_cls→llm_cls, EmbeddingCompat→embedding_compat_cls,
  VectorDB→vector_db_cls in _mock_prompt_deps helper

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* UN-3266 fix: resolve remaining SonarCloud code smells in PR #1849

- test_sanity_phase2/4.py, test_answer_prompt.py: rename PascalCase
  local variables in _mock_prompt_deps/_mock_deps to snake_case
  (RetrievalService→retrieval_svc, VariableReplacementService→
  variable_replacement_svc, Index→index_cls, LLM_cls→llm_cls,
  EmbeddingCompat→embedding_compat_cls, VectorDB→vector_db_cls,
  AnswerPromptService→answer_prompt_svc_cls) — fixes S117
- test_sanity_phase3.py: remove unused local variable "result" — fixes S1481
- structure_tool_task.py: remove redundant json.JSONDecodeError from
  except clause (subclass of ValueError) — fixes S5713
- shared/workflow/execution/service.py: replace generic Exception with
  RuntimeError for structure tool failure — fixes S112
- run-worker-docker.sh: define EXECUTOR_WORKER_TYPE constant and
  replace 10 literal "executor" occurrences — fixes S1192

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* UN-3266 fix: resolve SonarCloud cognitive complexity and code smell violations

- Reduce cognitive complexity in answer_prompt.py:
  - Extract _build_grammar_notes, _run_webhook_postprocess helpers
  - _is_safe_public_url: extracted _resolve_host_addresses helper
  - handle_json: early-return pattern eliminates nesting
  - construct_prompt: delegates grammar loop to _build_grammar_notes
- Reduce cognitive complexity in legacy_executor.py:
  - Extract _execute_single_prompt, _run_table_extraction helpers
  - Extract _run_challenge_if_enabled, _run_evaluation_if_enabled
  - Extract _inject_table_settings, _finalize_pipeline_result
  - Extract _convert_number_answer, _convert_scalar_answer
  - Extract _sanitize_dict_values helper
  - _handle_answer_prompt CC reduced from 50 to ~7
- Reduce CC in structure_tool_task.py: guard-clause refactor
- Reduce CC in backend: dto.py, deployment_helper.py,
  api_deployment_views.py, prompt_studio_helper.py
- Fix S117: rename PascalCase local vars in test_answer_prompt.py
- Fix S1192: extract EXECUTOR_WORKER_TYPE constant in run-worker.sh
- Fix S1172: remove unused params from structure_tool_task.py
- Fix S5713: remove redundant JSONDecodeError in json_repair_helper.py
- Fix S112/S5727 in test_execution.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* UN-3266 fix: remove unused RetrievalStrategy import from _handle_answer_prompt

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* UN-3266 fix: rename UsageHelper params to lowercase (N803)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* UN-3266 fix: resolve remaining SonarCloud issues from check run 66691002192

- Add @staticmethod to _sanitize_null_values (fixes S2325 missing self)
- Reduce _execute_single_prompt params from 25 to 11 (S107)
  by grouping services as deps tuple and extracting exec params
  from context.executor_params
- Add NOSONAR suppression for raise exc in test helper (S112)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* UN-3266 fix: remove unused locals in _handle_answer_prompt (F841)

execution_id, file_hash, log_events_id, custom_data are now extracted
inside _execute_single_prompt from context.executor_params.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

* fix: resolve Biome linting errors in frontend source files

Auto-fixed 48 lint errors across 56 files: import ordering, block
statements, unused variable prefixing, and formatting issues.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: replace dynamic import of SharePermission with static import in Workflows

Resolves vite build warning about SharePermission.jsx being both
dynamically and statically imported across the codebase.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* fix: resolve SonarCloud warnings in frontend components

- Remove unnecessary try-catch around PostHog event calls
- Flip negated condition in PromptOutput.handleTable for clarity

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Address PR #1849 review comments: fix null guards, dead code, and test drift

- Remove redundant inline `import uuid as _uuid` in views.py (use module-level uuid)
- URL-encode DB_USER in worker_celery.py result backend connection string
- Remove misleading task_queues=[Queue("executor")] from dispatch-only Celery app
- Remove dead `if not tool:` guards after objects.get() (already raises DoesNotExist)
- Move profile_manager/default_profile null checks before first dereference
- Reorder ProfileManager.objects.get before mark_document_indexed in tasks.py
- Handle ProfileManager.DoesNotExist as warning, not hard failure
- Wrap PostHog analytics in try/catch so failures don't block prompt execution
- Handle pending-indexing 200 response in usePromptRun.js (clear RUNNING status)
- Reset formData when metadata is missing in ConfigureDs.jsx
- Fix test_should_skip_extraction tests: function now takes 1 arg (outputs only)
- Fix agentic routing tests: mock X2Text.process, remove stale platform_helper kwarg

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Fix missing llm_usage_reason for summarize LLM usage tracking

Add PSKeys.LLM_USAGE_REASON to usage_kwargs in _handle_summarize() so
summarization costs appear under summarize_llm in API response metadata.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* UN-3266 [FIX] Fix single-pass extraction routing in LegacyExecutor

- Route _handle_structure_pipeline to _handle_single_pass_extraction when
  is_single_pass=True (was always calling _handle_answer_prompt)
- Delegate _handle_single_pass_extraction to cloud plugin via ExecutorRegistry,
  falling back to _handle_answer_prompt if plugin not installed

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Fixing API depployment response mismatches

* Add complete_vision() method to SDK1 LLM for multimodal completions

Adds a new complete_vision() method alongside existing complete() that
accepts pre-built multimodal messages (text + image_url) in OpenAI-style
format. LiteLLM auto-translates for Anthropic/Bedrock/Vertex providers.
This enables the agentic table extractor plugin to send page images
alongside text prompts for VLM-based table detection and extraction.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* UN-3266 [FIX] Gate Run button by agentic table readiness checklist

- PromptCardItems loads AgenticTableChecklist plugin and owns the
  isAgenticTableReady state, rendering the checklist above the prompt
  text area and delegating the settings gear visibility to the plugin.
- Header and PromptOutput disable their Run buttons when
  isAgenticTableReady is false (default true for non-agentic types).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* [FIX] Use correct primary key field in prompt count subquery (#1905)

ToolStudioPrompt uses prompt_id as its primary key, not id.
Count("id") causes FieldError on the list endpoint (500).

Co-authored-by: Chandrasekharan M <chandrasekharan@zipstack.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* [FIX] Add agentic_table as valid enforce_type choice

The cloud build adds "agentic_table" to the prompt enforce_type
dropdown, but the OSS ToolStudioPrompt model rejected it as an
invalid choice. Add AGENTIC_TABLE to EnforceType and ship a
matching migration so the value can be persisted.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* UN-3266 [FIX] Wire agentic_table enforce_type to executor dispatch

The single-prompt run flow had no branch for prompts with
enforce_type=agentic_table, so clicking Run silently fell through to
the legacy prompt-service path and never invoked the agentic_table
executor. Adds an AGENTIC_TABLE constant to TSPKeys, includes it in
the OperationNotSupported guard, and dispatches to
PayloadModifier.execute_agentic_table when the plugin is available
so the result still flows through _handle_response.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* UN-3266 [FIX] Add agentic_table queue to executor worker defaults

The ExecutionDispatcher derives the queue name from the executor name
(celery_executor_{name}), so dispatches to the agentic_table executor
land on celery_executor_agentic_table. The local docker-compose default
only listed celery_executor_legacy and celery_executor_agentic, so no
worker consumed the new queue and dispatch hung for the full 1-hour
result timeout. Adds the missing queue to the docker-compose default.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* UN-3266 [FIX] Dispatch agentic_table prompts to executor on IDE Run

The IDE Run button was building a legacy answer_prompt payload for
agentic_table prompts, so the agentic table executor was never
invoked. Branch fetch_response on enforce_type so agentic_table
prompts are built via the cloud payload_modifier plugin and
dispatched directly to celery_executor_agentic_table. Add the
enforce_type to the OSS dropdown choices and the JSON-dump set in
OutputManagerHelper so the persisted output is parseable by the FE
table renderer.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* UN-3266 [FIX] Reshape agentic_table executor output in IDE callback

The agentic_table executor returns {"output": {"tables": [...],
"page_count": ..., "headers": [...], ...}}, but
OutputManagerHelper.handle_prompt_output_update reads
outputs[prompt.prompt_key] when persisting prompt output. Without a
reshape the table list never lands under the prompt key and the FE
sees an empty result.

When cb_kwargs carries is_agentic_table=True and prompt_key (set by
the cloud build_agentic_table_payload), reshape outputs to
{prompt_key: tables} before calling update_prompt_output. The
executor itself also shapes its envelope, so this is a defensive
double-keying that keeps the legacy answer_prompt path untouched.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

* Fixing timeout issues

* API deployment fixes for Agentic table extractor

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Fixing syntax issues

* Fix agentic_table executor reading INFILE after JSON overwrite

Read from SOURCE instead of INFILE when dispatching to the
agentic_table executor. INFILE gets overwritten with JSON output
by the regular pipeline, causing PDFium parse errors when the
agentic_table executor tries to process it as a PDF.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

---------

Signed-off-by: harini-venkataraman <115449948+harini-venkataraman@users.noreply.github.com>
Co-authored-by: Ghost Jake <89829542+Deepak-Kesavan@users.noreply.github.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-authored-by: Ritwik G <100672805+ritwik-g@users.noreply.github.com>
Co-authored-by: Chandrasekharan M <chandrasekharan@zipstack.com>
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented Apr 29, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

Walkthrough

Adds org-configurable batched webhook buffering with enqueue/process endpoints, clubbed envelope rendering, worker routing to enqueue events, file-count propagation for failure-only routing, DB buffer model retention/dispatch lifecycle, and frontend settings/UI for interval and failure-only toggles.

Changes

Buffered notification feature

Layer / File(s) Summary
Workflow execution file-counts
backend/workflow_manager/workflow_v2/models/execution.py
Adds successful_files and failed_files fields to WorkflowExecution and uses them in run-status/timeline logic.
Internal API: buffer endpoints & processing
backend/notification_v2/internal_api_views.py
Adds enqueue_notification_buffer and process_notification_buffer; implements enqueue validation (including notify_on_failures suppression), grouping/claiming, clubbed rendering, marking DISPATCHED, scheduling Celery sends, and GC of terminal/aged rows.
Worker routing & enqueue client
workers/shared/patterns/notification/helper.py
Routes WEBHOOK events to backend enqueue endpoint (v1/webhook/buffer/enqueue/), passes execution_id and file-counts when listing notifications, and adds _enqueue_to_buffer/_route_notification and ENQUEUE_BUFFER_ENDPOINT.
Frontend settings & UI
frontend/src/components/settings/platform/PlatformSettings.jsx, frontend/src/components/pipelines-or-deployments/notification-modal/CreateNotification.jsx, frontend/src/components/settings/platform/PlatformSettings.css, frontend/src/components/pipelines-or-deployments/notification-modal/DisplayNotifications.jsx
Adds org notification-interval UI (minutes ↔ seconds) with GET/PATCH, notify_on_failures checkbox in create modal, disables table pagination, and updates styling.
Supporting: clubbed renderer & payloads
unstract/core/src/unstract/core/notification_clubbed_renderer.py, backend/notification_v2/clubbed_renderer.py, backend/pipeline_v2/dto.py, unstract/core/src/unstract/core/data_models.py
Introduces canonical clubbed envelope and Slack renderer, extends pipeline payloads and notification payloads to include total/successful/failed file counts.
sequenceDiagram
  participant Producer as Notifier Source
  participant Backend as Backend API
  participant BufferAPI as Buffer Enqueue API
  participant DB as NotificationBuffer
  participant Scheduler as Scheduler/LogConsumer
  participant Celery as Celery Worker
  participant Webhook as External Webhook

  Producer->>Backend: trigger notification (includes execution_id)
  Backend->>Backend: load execution -> compute file counts
  Backend->>Backend: filter notifications (notify_on_failures)
  Backend->>BufferAPI: POST enqueue_notification_buffer (payload + counts)
  BufferAPI->>DB: persist NotificationBuffer (PENDING, flush_after)
  Scheduler->>DB: process_notification_buffer (groups due flushes)
  DB->>DB: claim rows FOR UPDATE SKIP LOCKED
  DB->>DB: build_envelope() / render_clubbed_message()
  DB->>DB: mark rows DISPATCHED
  DB->>Celery: enqueue single clubbed send task (buffer ids)
  Celery->>Webhook: POST clubbed payload
  Webhook-->>Celery: 200 OK / error
  Celery->>DB: on success keep DISPATCHED / on failure mark DEAD_LETTER or revert to PENDING
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 72.22% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly summarizes the main feature: failure-only notifications with batched dispatch instead of per-run webhooks.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
Description check ✅ Passed The PR description comprehensively covers all required template sections with detailed information about changes, rationale, implementation approach, and breaking changes.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch UN-3056-Notify-on-API-deployment-failures

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@kirtimanmishrazipstack kirtimanmishrazipstack changed the title Un 3056 notify on api deployment failures UN-3056 [FEAT] Filter notifications by run outcome Apr 29, 2026
kirtimanmishrazipstack and others added 13 commits May 5, 2026 20:49
* UN-3439 [FIX] Accept wildcard subdomain origins in SocketIO and Django CORS (#1938)

* UN-3439 [FIX] Accept wildcard subdomain origins in SocketIO and Django CORS

Production socket connections were failing for `*.env.us-central.unstract.com`
because python-socketio does exact-string comparison on `cors_allowed_origins`,
so a literal `*` pattern silently rejected every real subdomain.

- Add `CORS_ALLOWED_ORIGIN_REGEXES` derived from `WEB_APP_ORIGIN_URL_WITH_WILD_CARD`.
- Wire SocketIO via `_RegexOrigin` whose `__eq__` does the regex match — single
  list entry covers all wildcard subdomains, no library subclass needed.
- Normalize `WEB_APP_ORIGIN_URL` through `urlparse` so trailing slashes / paths
  in env are stripped (also fixes the `…com//oauth-status/` double-slash).
- Add startup guard for malformed env values.

Resolves item #1 of UN-3439. Items #2/#3 (decoupling indexing from Socket.io,
fallback) are owned separately.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* UN-3439 [FIX] Address PR review: canonical origin, fullmatch, unhashable RegexOrigin, tests

Addresses five review comments on #1938:

1. coderabbitai (Major) — RFC 6454 canonicalization. Browsers serialize
   `Origin` headers with a lowercase host and no explicit default ports;
   `parsed_url.netloc` preserved both, so `https://APP.EXAMPLE.COM:443`
   would silently fail to match the browser's `https://app.example.com`.
   Switch to `parsed_url.hostname` + drop default ports, and reject
   non-http(s) schemes at startup.

2. greptile (P2) — `re.fullmatch` instead of `re.match`. With `re.match`
   plus `$`, a candidate ending in `\n` matches because `$` is allowed
   before an optional trailing newline. `fullmatch` removes the ambiguity.

3. self — `_RegexOrigin.__hash__` violated `a == b ⇒ hash(a) == hash(b)`
   (one fixed pattern hash vs. many matching strings). Today this is
   masked because python-socketio uses linear `__eq__` on a list, but if
   the allow-list is ever wrapped in a set, every legitimate subdomain
   would silently be rejected — exactly the failure mode UN-3439 closes.
   Make instances unhashable so the contract can't be broken.

4. self — No regression tests. Add `backend/utils/tests/test_cors_origin.py`
   (33 cases) covering: regex match/no-match, lookalike spoofing, scheme
   mismatch, trailing-newline rejection, non-string equality protocol,
   unhashability, ReDoS bounds, URL normalization (case, default ports,
   trailing slash, paths, queries), startup-guard rejections (empty,
   no-scheme, non-browser-scheme, no-host), and end-to-end via the same
   `RegexOrigin` path SocketIO uses.

5. self — Over-clever wildcard-to-regex builder. The
   `split('*').join(re.escape, ...)` construction generalised to N
   wildcards but the input has exactly one; replace with a direct rf-string
   that's self-evident on review.

Refactor for testability: extract `RegexOrigin` and `normalize_web_app_origin`
into `backend/utils/cors_origin.py` (Django-free, importable from settings
and tests). Settings now delegates to one helper call; `log_events.py`
imports `RegexOrigin`. No behavioural change beyond what each comment fixes.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* UN-3439 [FIX] Address SonarCloud quality gate

The Sonar quality gate failed with C reliability + 5 security hotspots, all
on the new test file:

- S905 (Bug, Major) — `{ro}` flagged as no-side-effect statement (Sonar
  doesn't see the implicit `__hash__` call). Drove the C reliability rating.
  Fix: use `len({ro})` so the side effect is via an explicit function call;
  test still asserts the same `TypeError`.
- S5727 (Code Smell, Critical) — `assert ro != None` is tautological and
  doesn't exercise `__eq__`. Switch to `(ro == None) is False` which directly
  tests that `NotImplemented` falls back to identity-equality.
- S5332 × 5 (Hotspots) — `http://` and `ftp://` literals in test data.
  These are intentional inputs proving the rejection logic. Annotate with
  `# NOSONAR` and an explanatory comment so the hotspots can be marked
  reviewed.

No production code changed; tests still 33/33 passing.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* UN-3439 [FIX] Remove last S5727 code smell — test __eq__ via dunder

Sonar S5727 correctly inferred that ``ro == None`` is statically always
False (NotImplemented falls back to identity), making the assertion look
tautological. The intent is to lock the protocol contract: ``__eq__`` must
return the ``NotImplemented`` sentinel for non-strings. Test that directly
via ``ro.__eq__(None) is NotImplemented`` instead of going through ``==``.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* UN-3439 [FIX] Address remaining CodeRabbit nits — port validation, ReDoS bound

Two minor follow-ups from the second CodeRabbit pass:

- `parsed.port` is a property that raises ValueError on malformed/out-of-range
  inputs (e.g. `:abc`, `:99999`). That bypassed our normalized config-error
  message and surfaced as a stack trace. Wrap the access and re-raise with
  the same actionable text. Adds two test cases (`https://example.com:abc`,
  `https://example.com:99999`) to lock the new behaviour.

- The 50ms ReDoS timing bound is too tight for noisy CI runners. Loosen to
  500ms — still orders of magnitude below what catastrophic backtracking
  would produce.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* ReverseMerge: V0.161.4 hotfix (#1943)

* Change csp to report only

* [HOTFIX] Bool-parse ENABLE_HIGHLIGHT_API_DEPLOYMENT env var (v0.161.4) (#1939)

[HOTFIX] Bool-parse ENABLE_HIGHLIGHT_API_DEPLOYMENT env var (#1937)

[FIX] Bool-parse ENABLE_HIGHLIGHT_API_DEPLOYMENT env var

os.environ.get returns the raw string when the variable is set, so
ENABLE_HIGHLIGHT_API_DEPLOYMENT="False" was truthy in Python (any
non-empty string is truthy). Wrap in CommonUtils.str_to_bool so
"False" / "false" / "0" actually evaluate to False.

The setting is consumed by the cloud configuration plugin's spec
default (ConfigSpec.default in plugins/configuration/cloud_config.py)
on cloud and on-prem builds. With this fix, an admin who explicitly
sets the env var to a falsy string sees highlight data stripped as
expected.

Co-authored-by: vishnuszipstack <117254672+vishnuszipstack@users.noreply.github.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Deepak K <89829542+Deepak-Kesavan@users.noreply.github.com>
Co-authored-by: vishnuszipstack <117254672+vishnuszipstack@users.noreply.github.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* UN-3448 [FIX] Remove vestigial `uv pip install` line in uv-lock-automation workflow (#1941)

* UN-3448 [FIX] Add --system flag to uv pip install in uv-lock-automation workflow

Modern uv requires uv pip install to run inside a virtual environment OR
with the explicit --system flag. The workflow currently has neither, so
it errors out:

  error: No virtual environment found for Python 3.12.9; run `uv venv`
  to create an environment, or pass `--system` to install into a
  non-virtual environment

This breaks every PR that touches a pyproject.toml (the workflow's
paths filter triggers on those). Last successful run was 2026-04-01,
before a behaviour change in uv or astral-sh/setup-uv@v7.

The --system flag is exactly what the error message suggests and is
correct here — we install pip into the runner's system Python; the
downstream uv-lock.sh script creates its own venvs as needed.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* UN-3448 [FIX] Remove vestigial `uv pip install` line per review

Per @jaseemjaskp's review: the pre-step `uv pip install ... pip` does
nothing useful for this workflow. The downstream uv-lock.sh script
uses uv sync at line 74, which manages its own venvs internally and
never invokes pip directly:

  $ grep -rn 'pip' docker/scripts/uv-lock-gen/
  docker/scripts/uv-lock-gen/uv-lock.sh:2:set -o pipefail

Only match is pipefail (shell option), no real pip references.

Removing the line entirely is cleaner than papering over with --system.
The line was likely copy-pasted from a sibling workflow that legitimately
needed pip in the system Python.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* ReverseMerge: V0.163.2 hotfix (#1946)

* [HOTFIX] Use importlib.util.find_spec for pluggable worker discovery (#1918)

* [FIX] Use importlib.util.find_spec for pluggable worker discovery

_verify_pluggable_worker_exists() previously checked for the literal file
`pluggable_worker/<name>/worker.py` on disk, which breaks when the plugin
has been compiled to a .so (Nuitka, Cython, or any C extension) — the
module is perfectly importable but the pre-check rejects it because only
the .py extension is considered.

Replace the filesystem check with importlib.util.find_spec(), which is
Python's standard way to ask "is this module resolvable by the import
system?". It honors every registered finder — source .py, compiled .so,
bytecode .pyc, namespace packages, zipimports — so the function now
matches what its docstring claims: verifying the module can be loaded,
not that a specific file extension is present.

Behavior is preserved for existing deployments:
- Images with no `pluggable_worker/<name>/` subpackage → find_spec
  raises ModuleNotFoundError (ImportError subclass) → returns False.
- Images with source .py → find_spec resolves the .py → returns True.
- Images with compiled .so → find_spec resolves the .so → returns True.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* [FIX] Handle ValueError from find_spec in pluggable worker verification

Greptile-flagged edge case: importlib.util.find_spec() can raise
ValueError (not just ImportError) when sys.modules has a partially
initialised module entry with __spec__ = None from a prior failed import.
Broaden the except to catch both.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* [FIX] Resolve api-deployment worker directory from enum import path

worker.py:452 did worker_type.value.replace("-", "_") to derive the
on-disk dir name. All WorkerType enum values already use underscores,
so the replace was a no-op; for API_DEPLOYMENT whose dir is
"api-deployment" (hyphen), it resolved to "api_deployment" and the
os.path.exists() check failed. Boot then logged a spurious
"❌ Worker directory not found: /app/api_deployment" at ERROR level.

The task registration path (builder + celery autodiscover via
to_import_path) is unaffected, so this was purely log noise — but
noise at ERROR level that masks real failures in log scans.

Fix: derive the directory from the authoritative to_import_path()
which already handles the hyphen case (api_deployment -> api-deployment).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* [HOTFIX] Add IAM Role / Instance Profile auth mode to AWS Bedrock adapter (#1944)

* [FEAT] Allow Bedrock to fall through to boto3's default credential chain

Match the S3/MinIO connector pattern: when AWS access keys are left blank
on the Bedrock LLM and embedding adapter forms, drop them from the kwargs
dict so boto3's default credential chain handles authentication. This
unlocks IAM role / instance profile / IRSA / AWS Profile scenarios on
hosts that already have ambient AWS credentials (e.g. EKS workers with
IRSA, EC2 with an instance profile).

- llm1/static/bedrock.json: clarify access-key descriptions to mention
  IRSA and instance profile (already non-required at v0.163.2 base).
- embedding1/static/bedrock.json: drop aws_access_key_id and
  aws_secret_access_key from top-level required; same description fix;
  expose aws_profile_name for parity with the LLM form.
- base1.py: AWSBedrockLLMParameters and AWSBedrockEmbeddingParameters
  now strip empty access-key values from the validated kwargs before
  returning, so empty strings don't override boto3's default chain.
  AWSBedrockEmbeddingParameters fields gain explicit None defaults
  and an aws_profile_name field.

Backward-compatible: existing adapters with access keys filled in
continue to work unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* [FEAT] Add Authentication Type selector to Bedrock adapter form

Add an explicit `auth_type` selector with two options, making the auth
choice clear to users:

- "Access Keys" (default): existing flow, keys required
- "IAM Role / Instance Profile (on-prem AWS only)": no fields; relies on
  boto3's default credential chain (IRSA on EKS, task role on ECS,
  instance profile on EC2). Description on the selector explicitly notes
  this option is only for AWS-hosted Unstract deployments.

The form-only auth_type field is stripped before LiteLLM validation in
both AWSBedrockLLMParameters.validate() and AWSBedrockEmbeddingParameters.
validate(). Empty access keys continue to be stripped so boto3 falls
through to the default chain even when the access_keys arm is selected
without values (matches the S3/MinIO connector pattern).

Backward-compatible: legacy adapters without auth_type behave as
"Access Keys" mode (the default), and existing keys are forwarded
unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* [REVIEW] Address Bedrock auth_type review feedback

Fixes the P0/P1 issues raised by greptile-apps and jaseemjaskp on
PR #1944.

Behaviour fixes:
- Stale-key leak in IAM Role mode: switching an existing adapter from
  Access Keys to IAM Role would carry truthy stored access keys through
  the strip-empty-only loop, so boto3 silently authenticated with the
  old long-lived credentials instead of falling through to the host's
  IRSA / instance-profile identity. Both LLM and embedding paths were
  affected.
- Silent acceptance of unknown auth_type: a typo (e.g. "access_key") or
  a malformed payload from a non-UI client passed through the dict
  comprehension untouched, with no enum guard.
- Cross-field validation gap: explicit Access Keys mode with blank or
  whitespace-only values silently fell through to the default
  credential chain instead of surfacing the misconfiguration.

Implementation:
- Add a module-level _resolve_bedrock_aws_credentials helper used by
  both AWSBedrockLLMParameters.validate() and AWSBedrock
  EmbeddingParameters.validate(), so the auth-type contract is
  expressed once.
  - Validates auth_type against an allowlist (None | "access_keys" |
    "iam_role"); raises ValueError on anything else.
  - iam_role: unconditionally drops aws_access_key_id and
    aws_secret_access_key.
  - access_keys (explicit): requires non-blank values; raises ValueError
    if either is empty or whitespace-only.
  - Legacy (auth_type absent): retains the lenient strip behaviour so
    pre-PR adapter configurations continue to deserialise unchanged.
- Restore aws_region_name as required (no `= None` default) on
  AWSBedrockEmbeddingParameters; only credentials may legitimately be
  absent.
- Drop the orphan aws_profile_name field from
  embedding1/static/bedrock.json: it was added for parity with the LLM
  form but lives outside the auth_type oneOf and contradicts the
  selector's "no further input" semantics. The LLM form already had
  aws_profile_name pre-PR and is left alone for backwards compatibility.

Tests:
- New tests/test_bedrock_adapter.py covers 15 cases across LLM and
  embedding adapters: legacy-no-auth-type, explicit access_keys with
  valid/blank/whitespace keys, iam_role with stale/no keys, unknown
  auth_type rejection, cross-field validation, and preservation of
  unrelated params (model_id, aws_profile_name, region, thinking).

Skipped (P2 nice-to-have):
- Comment-scope clarification, MinIO reference rewording,
  validate-mutates-caller'\''s-dict, and the LLM form description nit
  about aws_profile_name visibility. These don'\''t change behaviour
  and can be addressed in a follow-up.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>

---------

Co-authored-by: Chandrasekharan M <117059509+chandrasekharan-zipstack@users.noreply.github.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Athul <89829560+athul-rs@users.noreply.github.com>

* batch notification

---------

Co-authored-by: ali <117142933+muhammad-ali-e@users.noreply.github.com>
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: Ritwik G <100672805+ritwik-g@users.noreply.github.com>
Co-authored-by: Deepak K <89829542+Deepak-Kesavan@users.noreply.github.com>
Co-authored-by: vishnuszipstack <117254672+vishnuszipstack@users.noreply.github.com>
Co-authored-by: Praveen Kumar <praveen@zipstack.com>
Co-authored-by: Chandrasekharan M <117059509+chandrasekharan-zipstack@users.noreply.github.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Athul <89829560+athul-rs@users.noreply.github.com>
…m:Zipstack/unstract into UN-3056-Notify-on-API-deployment-failures
* batch notification

* notification slack
@kirtimanmishrazipstack kirtimanmishrazipstack marked this pull request as ready for review May 13, 2026 07:55
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 12

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/workflow_manager/internal_serializers.py (1)

176-184: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Validate the file aggregates against total_files.

These fields are validated independently right now, so impossible payloads like total_files=1, successful_files=2, failed_files=2 will pass and then be persisted. That will skew the new outcome-based notification logic.

Suggested fix
 class WorkflowExecutionStatusUpdateSerializer(serializers.Serializer):
     """Serializer for updating workflow execution status."""
@@
     failed_files = serializers.IntegerField(required=False, min_value=0)
     attempts = serializers.IntegerField(required=False, min_value=0)
     execution_time = serializers.FloatField(required=False, min_value=0)
+
+    def validate(self, attrs):
+        total_files = attrs.get("total_files")
+        successful_files = attrs.get("successful_files")
+        failed_files = attrs.get("failed_files")
+
+        if (successful_files is not None or failed_files is not None) and total_files is None:
+            raise serializers.ValidationError(
+                {"total_files": "total_files is required when file aggregates are provided."}
+            )
+
+        if total_files is not None:
+            if successful_files is not None and successful_files > total_files:
+                raise serializers.ValidationError(
+                    {"successful_files": "successful_files cannot exceed total_files."}
+                )
+            if failed_files is not None and failed_files > total_files:
+                raise serializers.ValidationError(
+                    {"failed_files": "failed_files cannot exceed total_files."}
+                )
+            if (
+                successful_files is not None
+                and failed_files is not None
+                and successful_files + failed_files > total_files
+            ):
+                raise serializers.ValidationError(
+                    "successful_files + failed_files cannot exceed total_files."
+                )
+
+        return attrs
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/workflow_manager/internal_serializers.py` around lines 176 - 184, The
serializer currently validates total_files, successful_files and failed_files
independently; add a validate(self, data) method in the same serializer (where
status, error_message, total_files, successful_files, failed_files, attempts,
execution_time are defined) that, when total_files is provided, enforces that
successful_files and failed_files are each <= total_files (if present) and that
(successful_files + failed_files) <= total_files; also handle the case where
only one of successful_files/failed_files is present by ensuring it does not
exceed total_files, and raise serializers.ValidationError with a clear message
on violation so invalid aggregates like total_files=1, successful_files=2 are
rejected before persisting.
🧹 Nitpick comments (2)
backend/notification_v2/views.py (1)

56-68: ⚡ Quick win

Use tuple for permission_classes class attribute.

Class attributes that are collections should be immutable (tuples) rather than mutable (lists) to avoid potential issues and follow best practices.

♻️ Proposed fix
-    permission_classes = [IsAuthenticated, IsOrganizationAdmin]
+    permission_classes = (IsAuthenticated, IsOrganizationAdmin)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/notification_v2/views.py` around lines 56 - 68, The class attribute
permission_classes on NotificationSettingsView is currently a list; change it to
an immutable tuple to follow best practices by replacing the mutable list
[IsAuthenticated, IsOrganizationAdmin] with a tuple (IsAuthenticated,
IsOrganizationAdmin) so permission_classes is not modifiable at runtime and
matches other DRF class-attribute patterns.
backend/notification_v2/tasks.py (1)

46-50: ⚡ Quick win

Combine the implicitly concatenated strings.

The two string literals on line 47 are implicitly concatenated. While valid Python, this can be error-prone and less readable.

♻️ Proposed fix
     logger.warning(
-        "metric=notification_batch_dispatched_total result=dead_letter rows=%d " "exc=%r",
+        "metric=notification_batch_dispatched_total result=dead_letter rows=%d exc=%r",
         updated,
         exc,
     )
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/notification_v2/tasks.py` around lines 46 - 50, The logger.warning
call currently uses two implicitly concatenated string literals; replace them
with a single combined format string in the logger.warning invocation so the
message is explicit and readable (keep the format placeholders and the same
arguments: updated and exc), e.g., a single string like
"metric=notification_batch_dispatched_total result=dead_letter rows=%d exc=%r"
passed to logger.warning with updated and exc.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@backend/notification_v2/clubbed_renderer.py`:
- Line 3: Update the docstring in clubbed_renderer.py to replace the ambiguous
multiplication symbol "×" with a plain ASCII "x" so it satisfies Ruff rule
RUF002; locate the module-level or function/class docstring that contains the
sentence "The same envelope shape feeds every channel × mode cell so receivers
never" and change "×" to "x" (i.e., "channel x mode") to avoid the lint failure.

In `@backend/notification_v2/helper.py`:
- Around line 34-41: The current auth_sig is built by joining fields with "|"
which is ambiguous because authorization_key/header may contain "|" — change the
construction in helper.py where the hash is computed (the block that builds raw
from notification.authorization_type/authorization_key/authorization_header and
returns hashlib.sha256(...).hexdigest()) to encode the three parts as an
unambiguous structured value before hashing (e.g., create a fixed-order
list/tuple of the three parts with fallback to _AUTH_SIG_NONE and then
JSON-serialize it with stable separators, or use a length-prefixed
concatenation) and hash that encoded representation so different tuples can
never collide due to delimiter characters.

In `@backend/notification_v2/internal_api_views.py`:
- Around line 423-425: The batching flush currently groups rows by
(organization_id, webhook_url, auth_sig) but then uses rows[0].platform when
calling render_clubbed_message in _dispatch_group, causing mixed-platform
batches; update the flush query that persists/groups batches to include platform
in its grouping key (and add the corresponding DB index change) so grouping is
done by (organization_id, webhook_url, auth_sig, platform), and ensure
_dispatch_group (and any other consumer like the code around lines ~496-500)
reads platform from the grouped key rather than assuming rows[0].platform.

In `@backend/notification_v2/migrations/0002_notification_notify_on_failures.py`:
- Around line 10-21: The add-field migration currently creates a BooleanField
notify_on_failures which only distinguishes ALL vs FAILURES_ONLY; change this to
a tri-state field (e.g. models.CharField with choices or a small IntegerField)
on the Notification model in this migration so it can represent ALL,
FAILURES_ONLY, and SUCCESS_ONLY (use explicit choices like ("all","ALL"),
("failures","FAILURES_ONLY"), ("success","SUCCESS_ONLY")), set a sensible
default (e.g. "all"), and update the db_comment to document the three modes;
ensure the migration operation uses the new field type and name
notify_on_failures so downstream code can read the string/enum value rather than
a boolean.

In `@backend/notification_v2/migrations/0003_add_notification_buffer.py`:
- Around line 14-28: The migration adds a non-null CharField delivery_mode to
the notification model with default="BATCHED", which will change existing rows
to BATCHED on deploy; instead, modify the migration to preserve existing
behavior by performing a two-step change: 1) add delivery_mode as nullable (or
without a DB-level default) and include a RunPython data migration that sets
delivery_mode="IMMEDIATE" for existing Notification rows that should remain
immediate, and 2) then add a subsequent migration to set default="BATCHED" and
make the field non-nullable for future records; reference the migration
module/migration class in 0003_add_notification_buffer.py and the model name
"notification" and field name "delivery_mode" when implementing the nullable
field + RunPython backfill, or alternatively write a single migration that uses
RunPython before altering the field to set existing rows to "IMMEDIATE".

In `@backend/notification_v2/models.py`:
- Around line 58-65: The boolean field notify_on_failures on the model cannot
represent the three states required (ALL / FAILURES_ONLY / SUCCESS_ONLY); change
this to a tri-state enum field (e.g., a CharField or IntegerField with explicit
choices like NOTIFY_ALL, NOTIFY_FAILURES_ONLY, NOTIFY_SUCCESS_ONLY) and rename
the DB column/field to something clearer if helpful (e.g., notify_mode or
notify_condition) so intent is explicit; update the corresponding serializer(s)
and any filters/UI code that read/write notify_on_failures to accept and
validate the new enum values and migrate existing boolean data to the new enum
values in a migration.

In `@backend/notification_v2/provider/webhook/api_webhook.py`:
- Line 15: format_payload() is unconditionally wrapping self.payload which
causes double-enveloping on already-enveloped payloads or repeated send() calls;
change format_payload (and any callers like the constructor assignment
self.payload = self.format_payload() and the send() path at the 24-30 block) to
first detect whether the payload is already in the expected envelope shape
(e.g., check for the envelope root key/structure) and return it unchanged if so,
otherwise wrap it; ensure the envelope-detection logic is deterministic and
idempotent so multiple calls to format_payload/send() do not alter an
already-correctly-enveloped payload.

In `@backend/pipeline_v2/notification.py`:
- Around line 14-15: Remove ExecutionStatus.STOPPED from the set treated as
failures: update the _FAILURE_STATUSES definition to exclude
ExecutionStatus.STOPPED and similarly remove/adjust any other checks that
include ExecutionStatus.STOPPED (the second occurrence around the block handling
audience selection at lines 56-60) so that STOPPED executions are no longer
routed to failure-only subscriptions; ensure only true failure statuses (e.g.,
ExecutionStatus.ERROR) remain in _FAILURE_STATUSES and that any conditional
logic using that set (in notification audience selection) treats STOPPED as
non-failure/catch-all.

In `@backend/workflow_manager/workflow_v2/models/execution.py`:
- Around line 440-441: The current mapping sets successful = e.successful_files
or 0 and failed = e.failed_files or 0 which coerces NULL/None to 0 and can hide
unknown historical counts; change these assignments to preserve NULL/None (e.g.,
successful = e.successful_files if e.successful_files is not None else None, and
likewise for failed) and update any downstream status logic that treats 0 as "no
failures" to explicitly handle None as "unknown" (so PARTIAL_SUCCESS isn't
lost). Ensure references to successful and failed in the execution status
computation explicitly check for None versus integer values.

In
`@frontend/src/components/pipelines-or-deployments/notification-modal/CreateNotification.jsx`:
- Line 15: The component currently sends a boolean notify_on_failures which
can't represent SUCCESS_ONLY—replace this with a notify_on enum carrying one of
"ALL", "FAILURES_ONLY", or "SUCCESS_ONLY": update the component state/props
default (in CreateNotification.jsx) to hold notify_on instead of
notify_on_failures, map the form inputs (checkboxes/radio/select) to produce the
correct enum value, and change all payload constructions and update/create API
calls (the places around the existing notify_on_failures usage and the other
block referenced later in the file) to include notify_on with the proper enum
string; also adjust any validation/serialization logic that reads
notify_on_failures to use notify_on.

In `@frontend/src/components/settings/platform/PlatformSettings.jsx`:
- Around line 61-77: The effect in useEffect that fetches org-scoped batch
interval should guard on sessionDetails?.orgId and include proper deps and
cancellation: at the top of the effect return early if !sessionDetails?.orgId,
add sessionDetails?.orgId and axiosPrivate to the dependency array, and
implement request cancellation (e.g., AbortController or axios cancel token) so
in-flight responses don't call setBatchIntervalMinutes after unmount or when
orgId changes; update references to axiosPrivate and setBatchIntervalMinutes
accordingly.

In `@workers/shared/patterns/notification/helper.py`:
- Around line 77-84: The except block in _enqueue_to_buffer() is swallowing
enqueue failures (logging then returning False) which causes
_route_notification() to treat BATCHED delivery as successful; instead,
propagate the failure so a retrying caller can act (or implement local
retry/backoff). Replace the logger.error+return False with logger.exception(...)
to include stack context and then re-raise the exception (or raise a specific
EnqueueError) so _route_notification() sees the failure; make the same change
for the other similar block referenced (lines ~104-106) to avoid silent drops.

---

Outside diff comments:
In `@backend/workflow_manager/internal_serializers.py`:
- Around line 176-184: The serializer currently validates total_files,
successful_files and failed_files independently; add a validate(self, data)
method in the same serializer (where status, error_message, total_files,
successful_files, failed_files, attempts, execution_time are defined) that, when
total_files is provided, enforces that successful_files and failed_files are
each <= total_files (if present) and that (successful_files + failed_files) <=
total_files; also handle the case where only one of
successful_files/failed_files is present by ensuring it does not exceed
total_files, and raise serializers.ValidationError with a clear message on
violation so invalid aggregates like total_files=1, successful_files=2 are
rejected before persisting.

---

Nitpick comments:
In `@backend/notification_v2/tasks.py`:
- Around line 46-50: The logger.warning call currently uses two implicitly
concatenated string literals; replace them with a single combined format string
in the logger.warning invocation so the message is explicit and readable (keep
the format placeholders and the same arguments: updated and exc), e.g., a single
string like "metric=notification_batch_dispatched_total result=dead_letter
rows=%d exc=%r" passed to logger.warning with updated and exc.

In `@backend/notification_v2/views.py`:
- Around line 56-68: The class attribute permission_classes on
NotificationSettingsView is currently a list; change it to an immutable tuple to
follow best practices by replacing the mutable list [IsAuthenticated,
IsOrganizationAdmin] with a tuple (IsAuthenticated, IsOrganizationAdmin) so
permission_classes is not modifiable at runtime and matches other DRF
class-attribute patterns.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: d8dfdd8e-ab77-48bb-a35e-1d2d9ef6fd9a

📥 Commits

Reviewing files that changed from the base of the PR and between 9678d78 and 33d77e9.

📒 Files selected for processing (38)
  • backend/api_v2/notification.py
  • backend/backend/settings/base.py
  • backend/configuration/enums.py
  • backend/notification_v2/clubbed_renderer.py
  • backend/notification_v2/enums.py
  • backend/notification_v2/helper.py
  • backend/notification_v2/internal_api_views.py
  • backend/notification_v2/internal_serializers.py
  • backend/notification_v2/internal_urls.py
  • backend/notification_v2/migrations/0002_notification_notify_on_failures.py
  • backend/notification_v2/migrations/0003_add_notification_buffer.py
  • backend/notification_v2/models.py
  • backend/notification_v2/provider/webhook/api_webhook.py
  • backend/notification_v2/provider/webhook/slack_webhook.py
  • backend/notification_v2/provider/webhook/webhook.py
  • backend/notification_v2/serializers.py
  • backend/notification_v2/tasks.py
  • backend/notification_v2/urls.py
  • backend/notification_v2/views.py
  • backend/pipeline_v2/dto.py
  • backend/pipeline_v2/notification.py
  • backend/workflow_manager/internal_serializers.py
  • backend/workflow_manager/internal_views.py
  • backend/workflow_manager/workflow_v2/migrations/0020_workflowexecution_file_counts.py
  • backend/workflow_manager/workflow_v2/models/execution.py
  • frontend/src/components/pipelines-or-deployments/notification-modal/CreateNotification.jsx
  • frontend/src/components/settings/platform/PlatformSettings.jsx
  • unstract/core/src/unstract/core/data_models.py
  • workers/callback/tasks.py
  • workers/log_consumer/process_notification_buffer.py
  • workers/log_consumer/scheduler.sh
  • workers/notification/providers/_clubbed_format.py
  • workers/notification/providers/api_webhook.py
  • workers/notification/providers/slack_webhook.py
  • workers/scheduler/tasks.py
  • workers/shared/api/internal_client.py
  • workers/shared/clients/execution_client.py
  • workers/shared/patterns/notification/helper.py

Comment thread backend/notification_v2/clubbed_renderer.py Outdated
Comment thread backend/notification_v2/helper.py Outdated
Comment thread backend/notification_v2/internal_api_views.py Outdated
Comment thread backend/notification_v2/migrations/0002_notification_notify_on_failures.py Outdated
Comment thread backend/notification_v2/migrations/0002_notification_batching.py
Comment thread backend/pipeline_v2/notification.py Outdated
Comment thread backend/workflow_manager/workflow_v2/models/execution.py
Comment thread frontend/src/components/settings/platform/PlatformSettings.jsx Outdated
Comment thread workers/shared/patterns/notification/helper.py Outdated
@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented May 13, 2026

Greptile Summary

This PR introduces failure-only webhook subscriptions (notify_on_failures) and a batched dispatch pipeline: execution events are buffered in NotificationBuffer, grouped by (org, webhook_url, auth_sig, platform), and flushed in a configurable window (default 5 min) rather than fired per-run. A new Platform Settings panel lets org admins set the grouping window (1–120 min).

  • Backend: enqueue_notification_buffer writes buffer rows; process_notification_buffer drives a SELECT … FOR UPDATE SKIP LOCKED flush, SENDING-reaper, and two-sweep GC. Duplicate-dispatch is prevented by the on_commit publish order and the notification__is_active filter on the lock query.
  • Worker / shared core: NotificationPayload now carries per-run file counts (total_files, successful_files, failed_files); workers/notification providers render via the shared notification_clubbed_renderer; scheduler.sh runs the buffer-flush script on its own independent cadence.
  • Frontend: redesigned Platform Settings page with an "Internal API Keys" section and a new "Notifications" section; CreateNotification modal adds a notify_on_failures checkbox.

Confidence Score: 4/5

Safe to merge with awareness of one transient notification gap for in-flight executions at deploy time.

The flush engine, duplicate-dispatch prevention, is_active filter, and retry-budget refund all look correct. The one concern worth tracking: pre-migration WorkflowExecution rows have null failed_files, so is_failure_run silently returns False for COMPLETED runs with partial file failures until the worker has written updated counts. Runs that were in-flight at deploy time may not trigger failure-only notifications for that window.

backend/workflow_manager/workflow_v2/models/execution.py — the null coalescing of successful_files/failed_files affects is_failure_run for pre-migration rows.

Important Files Changed

Filename Overview
backend/notification_v2/internal_api_views.py Core flush engine: GROUP BY → SKIP LOCKED dispatch, SENDING reaper, GC. The is_active guard, max() retry budget, atomic PENDING→SENDING claim, and on_commit publish order all look correct.
backend/notification_v2/helper.py Buffer enqueue path: computes auth_sig, flush_after, stamps timestamp. Raises ValueError for missing org (correct). Broad except in dispatch_notifications logs and continues — acceptable for failure-alert path.
backend/notification_v2/models.py Adds notify_on_failures BooleanField to Notification and new NotificationBuffer model with composite partial index on PENDING rows. Cascade delete on notification FK is intentional and documented.
backend/notification_v2/migrations/0002_notification_batching.py Combined migration adds notify_on_failures (default False, safe for existing rows) and creates NotificationBuffer with partial covering index.
backend/notification_v2/tasks.py Celery callbacks mark_buffer_dispatched / mark_buffer_dead_letter only flip SENDING rows, preventing stale callbacks from clobbering reaper-reclaimed rows.
unstract/core/src/unstract/core/notification_clubbed_renderer.py Canonical envelope + Slack renderer. _humanize_timestamp uses dt.day instead of %-d (portability fix). LEGACY_FLAT_KEYS backward-compat shim for single-event batches is well-documented.
workers/shared/patterns/notification/helper.py Replaced direct Celery send_task with backend enqueue HTTP call. execution_id is now forwarded for failure-filter correctness.
workers/log_consumer/scheduler.sh Scheduler now runs two independent tasks at their own cadences. Dropped -e from set flags intentionally so one task failure doesn't abort the loop.
backend/workflow_manager/workflow_v2/models/execution.py Adds successful_files / failed_files nullable fields. list_executions now reads denormalized counts instead of N+1 queries. Pre-migration rows return 0/0 (null coalesced) — documented trade-off with notification filter implications.
backend/backend/settings/base.py Adds four new notification settings constants. NOTIFICATION_BUFFER_RETENTION_DAYS comment is stale after the inactive-PENDING GC sweep was added.
frontend/src/components/settings/platform/PlatformSettings.jsx Adds Notifications section with org-scoped batch interval control. Session guard on useEffect. InputNumber bounds (1–120 min) match serializer constraints.
workers/notification/tasks.py Adds raise_on_final_failure flag so the clubbed dispatch's link_error callback fires on retry exhaustion, enabling dead-letter rows.
backend/notification_v2/views.py New NotificationSettingsView GET/PATCH for org-scoped club_interval_seconds. Protected by IsOrganizationAdmin. Configuration.get_value_by_organization always returns typed int — int() cast is safe.

Sequence Diagram

sequenceDiagram
    participant W as Worker (callback/scheduler)
    participant B as Backend (enqueue endpoint)
    participant DB as NotificationBuffer (DB)
    participant S as scheduler.sh
    participant F as Flush endpoint
    participant C as Celery (send_webhook_notification)
    participant WH as Webhook receiver

    W->>B: POST /v1/webhook/buffer/enqueue/
    B->>DB: "INSERT NotificationBuffer status=PENDING"
    B-->>W: 201 buffer_row_id

    S->>F: POST /v1/webhook/buffer/process/
    F->>DB: "GROUP BY org+url+auth_sig+platform WHERE MIN(flush_after)<=now"
    loop For each due group
        F->>DB: "SELECT FOR UPDATE SKIP LOCKED is_active=True"
        F->>DB: "UPDATE status=SENDING dispatch_attempts+=1"
        F-->>C: "on_commit send_task link=mark_dispatched link_error=mark_dead_letter"
    end
    F->>DB: Reclaim stale SENDING to PENDING
    F->>DB: GC terminal rows + inactive PENDING past retention

    C->>WH: POST clubbed envelope summary+events
    WH-->>C: 200 OK
    C->>DB: mark_buffer_dispatched DISPATCHED

    alt Retry exhaustion
        C->>DB: mark_buffer_dead_letter DEAD_LETTER
    end
Loading

Fix All in Claude Code

Prompt To Fix All With AI
Fix the following 2 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 2
backend/backend/settings/base.py:228-230
The inline comment says "PENDING rows are never GC'd regardless of age," but `_gc_terminal_rows()` in `internal_api_views.py` now also deletes PENDING rows whose source notification is inactive and whose `flush_after` is past the retention cutoff. The comment should be updated to match the actual behaviour.

```suggestion
# Retention window for NotificationBuffer GC. Two sweeps run on each flush tick:
# 1. Terminal rows (DISPATCHED / DEAD_LETTER) older than this window.
# 2. PENDING rows whose source notification is inactive and whose flush_after
#    is past this window (see _gc_terminal_rows in internal_api_views.py).
#    PENDING rows for *active* notifications are never touched.
NOTIFICATION_BUFFER_RETENTION_DAYS = int(
```

### Issue 2 of 2
backend/workflow_manager/workflow_v2/models/execution.py:443-447
**Historical executions show 0/0 file counts — affects failure filter for pre-migration rows**

For any `WorkflowExecution` row created before this migration, `successful_files` and `failed_files` are `null`. `e.successful_files or 0` coalesces to `0`, so `is_failure_run(status, 0)` returns `False` for COMPLETED runs where files actually failed. This means failure-only subscribers won't receive notifications for partial failures on historical re-triggers until the worker has written the new column values at least once for that execution.

The list view cosmetic impact (showing 0/0 for old rows) is a known trade-off per the migration comment, but the silent miss on `notify_on_failures` filtering is worth calling out so operators know to expect a brief gap for runs that were already in-flight at deploy time.

Reviews (26): Last reviewed commit: "UN-3056 [FIX] Single-event webhook compa..." | Re-trigger Greptile

Comment thread backend/notification_v2/migrations/0002_notification_batching.py
Comment thread backend/notification_v2/internal_api_views.py Outdated
Comment thread backend/api_v2/notification.py Outdated
Comment thread frontend/src/components/settings/platform/PlatformSettings.jsx Outdated
@kirtimanmishrazipstack
Copy link
Copy Markdown
Contributor Author

kirtimanmishrazipstack commented May 13, 2026

Review triage summary

Thanks to @coderabbitai and @greptile-apps for the thorough pass. Status of each item:

Fixed in this branch

  • CR clubbed_renderer.py ×x (RUF002)
  • CR auth_sig JSON-encoded before SHA-256 (no delimiter-collision)
  • CR + Greptile P1: platform added to flush grouping and folded into the original idx_notif_buffer_pending partial index in migration 0003 (in lockstep with Meta.indexes)
  • CR + Greptile P2: PlatformSettings.jsx useEffect guards on orgId + adds it to deps
  • CR: _enqueue_to_buffer now logger.exception/raises; caller catches per-row so sibling notifications still iterate
  • Greptile P1 duplicate-delivery race: dispatch order in _dispatch_group reversed — UPDATE first inside transaction.atomic(), broker send_task runs via transaction.on_commit(...); broker failure post-commit reverts rows to PENDING for next-tick retry
  • Greptile P2: _FAILURE_STATUSES consolidated into a single notification_v2.enums.FAILURE_STATUSES
  • CR outside-diff: WorkflowExecutionStatusUpdateSerializer.validate(...) added for cross-field aggregate checks
  • CR nitpick (tasks.py): implicit string concat merged

Stale / wont fix (replies posted on each thread with reasoning)\n- Tri-state notify_on (CR migration 0002 / model / CreateNotification.jsx) — PR scope cut to boolean notify_on_failures; PR description updated to match\n- BATCHED-as-default migration backfill (CR + Greptile P1) — deliberate v2.7 design choice; operator note added to PR description\n- STOPPED as failure (CR) — db_comment in migration 0002 is the spec\n- APIWebhook double-wrap (CR) — fresh instance per dispatch in current dispatcher loop; not reachable today\n- Historical NULL-aggregate PARTIAL_SUCCESS in last_run_statuses tooltip (CR) — display-only path outside UN-3056 scope; deferred\n\n### Nitpick — not adopting\n- permission_classes as tuple (CR views.py:56-68): project convention is list across every DRF view (verified across 10+ files including pipeline_v2/execution_view.py, dashboard_metrics/views.py, platform_api/views.py). Keeping consistent with house style.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 13, 2026

Tip

For best results, initiate chat on the files or code changes.

@kirtimanmishrazipstack Thanks for the thorough triage — really appreciate the clear breakdown.

All the fixes look solid:

  • The transaction.on_commit(...) broker dispatch ordering is the right approach for the duplicate-delivery race.
  • _FAILURE_STATUSES consolidation into notification_v2.enums is a clean improvement.
  • The WorkflowExecutionStatusUpdateSerializer.validate(...) cross-field check is a good catch to have in place.

The reasoning on the won't-fix items is sound — the db_comment as spec for STOPPED, the deliberate BATCHED-as-default design decision, and the deferred display-only PARTIAL_SUCCESS path are all reasonable calls for this PR's scope.

Noted on the permission_classes convention — keeping it as a list is the right call for project consistency.


📝 Learnings were identified and not saved (knowledge base disabled). Enable

@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented May 13, 2026

Thanks for the thorough triage write-up. All of my P1 and P2 items are confirmed addressed:

  • Duplicate-delivery raceUPDATE inside transaction.atomic() + on_commit broker dispatch with PENDING revert on post-commit broker failure. Correct pattern.
  • platform flush grouping → migration 0004 rebuilds the partial index in lockstep.
  • FAILURE_STATUSES consolidation → single source of truth in notification_v2.enums.
  • _enqueue_to_buffer error handlinglogger.exception/raise with per-row catch in the caller so siblings still iterate.
  • useEffect orgId guard → dependency array fixed.

The "wont fix" reasoning all holds up:

  • Boolean notify_on_failures scope cut is reasonable and the PR description is updated to match.
  • BATCHED-as-default is a deliberate v2.7 design choice with operator notes documented.
  • STOPPED as failure is spec'd in the migration db_comment.
  • APIWebhook double-wrap is not reachable with the current dispatcher loop structure.
  • permission_classes as list — consistent with the project's house style across DRF views.

LGTM from my side. The implementation is solid and the edge cases I flagged are all properly handled.

Tip: You can customize Greptile's behavior for this repo with .greptile/rules.md and .greptile/config.json.

@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 13, 2026

Tip

For best results, initiate chat on the files or code changes.

@kirtimanmishrazipstack Thanks for the detailed triage — really well organized. Everything looks good:

Fixes confirmed:

  • transaction.on_commit(...) for broker dispatch is exactly the right pattern for the duplicate-delivery race.
  • _FAILURE_STATUSES consolidation into notification_v2.enums is a clean improvement for future consumers.
  • The WorkflowExecutionStatusUpdateSerializer.validate(...) cross-field check is a solid guard to have.
  • Per-row exception handling in _enqueue_to_buffer preserving sibling iteration is the right resilience model.

Won't-fix reasoning is sound:

  • Boolean notify_on_failures is a cleaner scope for this PR; the tri-state can come in a follow-up if needed.
  • BATCHED-as-default as a deliberate v2.7 design choice with an operator note in the PR description is the right way to handle that.
  • db_comment as the spec for STOPPED-as-failure is clear and self-documenting.
  • The PARTIAL_SUCCESS display path being deferred is a reasonable call — it's isolated and not on the critical notification path.

permission_classes convention noted — keeping list for consistency with the rest of the project is the right call.


📝 Learnings were identified and not saved (knowledge base disabled). Enable

Comment thread backend/backend/settings/base.py
Comment thread backend/notification_v2/migrations/0002_notification_notify_on_failures.py Outdated
Comment thread backend/notification_v2/enums.py Outdated
Comment thread backend/notification_v2/serializers.py Outdated
@johnyrahul
Copy link
Copy Markdown
Contributor

🔴 The dead-letter path never fires — failed webhook deliveries are silently lost

While tracing the dispatch/flush concurrency I hit a correctness gap in the dead-letter wiring that I don't think is covered by the existing review threads. Flagging it because it defeats the reliability guarantee the buffer design is built around.

Symptom: if a subscriber's webhook endpoint is down (or returns 4xx/5xx) for its whole retry budget, the run produces zero notifications, zero DEAD_LETTER rows, and zero error signal — and the buffer rows are eventually GC'd as if they'd been delivered.

Root cause — the link_error contract doesn't match the worker task's behavior:

  • _send_clubbed wires mark_buffer_dead_letter as a Celery link_error on send_webhook_notification:
    # backend/notification_v2/internal_api_views.py:433
    link_error=celery_app.signature(
        "notification_v2.mark_buffer_dead_letter",
        kwargs={"buffer_row_ids": buffer_ids},
    ),
  • But send_webhook_notification returns None on every terminal-failure branch instead of raising:
    # workers/notification/tasks.py:259 / :262 / :278 / :281
    return None  # Final failure - matches original behavior
    Celery only invokes link_error when a task ends in the FAILURE state (i.e. it raised). A task that swallows the error and returns None ends in SUCCESS, so mark_buffer_dead_letter is never called.

Consequence chain:

  1. _dispatch_group has already flipped the rows to DISPATCHED (internal_api_views.py:514-517) before the on_commit publish.
  2. Delivery fails through all retries → task returns Nonelink_error never fires → no row ever reaches DEAD_LETTER.
  3. The rows stay DISPATCHED forever (the flush query filters status=PENDING, so they're never revisited).
  4. _gc_terminal_rows (internal_api_views.py:393-396) deletes aged DISPATCHED rows — i.e. failed deliveries are cleaned up exactly as if they had succeeded.

For a failure-alerting feature, this means the failure alert itself can be lost with no trace.

Suggested fix: make the retry-exhaustion branches of send_webhook_notification re-raise so the task ends FAILURE and link_error runs:

else:
    logger.error("Failed to send webhook to %s after %s attempts: %s", url, max_retries, e)
    raise  # let link_error → mark_buffer_dead_letter run

If the legacy "return None" contract must be preserved for other callers, gate the re-raise behind a flag passed from _send_clubbed (e.g. dead_letter_on_failure=True). Worth adding a test that asserts rows reach DEAD_LETTER on delivery exhaustion.


Related, lower-severity hardening (same end-state, different trigger): transaction.on_commit(_send_clubbed) is not crash-durable. If the backend process dies in the window between the transaction committing (rows now DISPATCHED) and the on_commit lambda executing, the lambda is lost — no broker publish, no revert — and the rows are stranded in DISPATCHED with nothing to retry them. Same outcome as above: a row that says "dispatched" but was never delivered.

Both stem from the same root: DISPATCHED currently means "claimed for sending," but is treated as "delivered." Beyond the link_error fix, a periodic sweep that reclaims rows stuck in DISPATCHED with no terminal outcome past a timeout would close the crash-window case too.

Note: the broker-publish failure path inside _send_clubbed itself (RabbitMQ unreachable → except reverts rows to PENDING, retried next tick) is handled correctly — this issue is specifically about delivery failure after a successful publish.

@johnyrahul
Copy link
Copy Markdown
Contributor

🔴 The dead-letter path never fires — failed webhook deliveries are silently lost

While tracing the dispatch/flush concurrency I hit a correctness gap in the dead-letter wiring that I don't think is covered by the existing review threads. Flagging it because it defeats the reliability guarantee the buffer design is built around.

Symptom: if a subscriber's webhook endpoint is down (or returns 4xx/5xx) for its whole retry budget, the run produces zero notifications, zero DEAD_LETTER rows, and zero error signal — and the buffer rows are eventually GC'd as if they'd been delivered.

Root cause — the link_error contract doesn't match the worker task's behavior:

* `_send_clubbed` wires `mark_buffer_dead_letter` as a Celery `link_error` on `send_webhook_notification`:
  ```python
  # backend/notification_v2/internal_api_views.py:433
  link_error=celery_app.signature(
      "notification_v2.mark_buffer_dead_letter",
      kwargs={"buffer_row_ids": buffer_ids},
  ),
  ```

* But `send_webhook_notification` **returns `None`** on every terminal-failure branch instead of raising:
  ```python
  # workers/notification/tasks.py:259 / :262 / :278 / :281
  return None  # Final failure - matches original behavior
  ```
  
  
      
        
      
  
        
      
  
      
    
  Celery only invokes `link_error` when a task ends in the **FAILURE** state (i.e. it raised). A task that swallows the error and returns `None` ends in **SUCCESS**, so `mark_buffer_dead_letter` is never called.

Consequence chain:

1. `_dispatch_group` has already flipped the rows to `DISPATCHED` (`internal_api_views.py:514-517`) before the `on_commit` publish.

2. Delivery fails through all retries → task returns `None` → `link_error` never fires → no row ever reaches `DEAD_LETTER`.

3. The rows stay `DISPATCHED` forever (the flush query filters `status=PENDING`, so they're never revisited).

4. `_gc_terminal_rows` (`internal_api_views.py:393-396`) deletes aged `DISPATCHED` rows — i.e. failed deliveries are cleaned up exactly as if they had succeeded.

For a failure-alerting feature, this means the failure alert itself can be lost with no trace.

Suggested fix: make the retry-exhaustion branches of send_webhook_notification re-raise so the task ends FAILURE and link_error runs:

else:
    logger.error("Failed to send webhook to %s after %s attempts: %s", url, max_retries, e)
    raise  # let link_error → mark_buffer_dead_letter run

If the legacy "return None" contract must be preserved for other callers, gate the re-raise behind a flag passed from _send_clubbed (e.g. dead_letter_on_failure=True). Worth adding a test that asserts rows reach DEAD_LETTER on delivery exhaustion.

Related, lower-severity hardening (same end-state, different trigger): transaction.on_commit(_send_clubbed) is not crash-durable. If the backend process dies in the window between the transaction committing (rows now DISPATCHED) and the on_commit lambda executing, the lambda is lost — no broker publish, no revert — and the rows are stranded in DISPATCHED with nothing to retry them. Same outcome as above: a row that says "dispatched" but was never delivered.

Both stem from the same root: DISPATCHED currently means "claimed for sending," but is treated as "delivered." Beyond the link_error fix, a periodic sweep that reclaims rows stuck in DISPATCHED with no terminal outcome past a timeout would close the crash-window case too.

Note: the broker-publish failure path inside _send_clubbed itself (RabbitMQ unreachable → except reverts rows to PENDING, retried next tick) is handled correctly — this issue is specifically about delivery failure after a successful publish.

@kirtimanmishrazipstack Please check this scenrio , by configuring a webhook which is not working

kirtimanmishrazipstack and others added 2 commits May 30, 2026 12:36
Address human review feedback on the batched-notification PR:

- Fix dead-letter path (Johny): send_webhook_notification now re-raises on
  retry exhaustion via raise_on_final_failure so the Celery link_error fires
  and buffer rows reach DEAD_LETTER instead of being silently lost.
- Add crash-window reaper: new BufferStatus.SENDING claim state with success
  (mark_buffer_dispatched) / failure callbacks; _reclaim_stale_sending returns
  rows stuck past NOTIFICATION_DISPATCH_LEASE_SECONDS to PENDING.
- Move FAILURE_STATUSES onto ExecutionStatus (failure_statuses/is_failure);
  drop the duplicated frozenset and update call sites (Chandru).
- Remove dead delivery_mode column + DeliveryMode enum (product is
  batched-only); rename dispatch_with_delivery_mode -> dispatch_notifications.
- Squash notification_v2 migrations 0002+0003 into 0002_notification_batching.
- Scrub JIRA/mfbt references from docstrings; clarify NOTIFICATION_CLUB_INTERVAL
  is a per-org-overridable default.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@kirtimanmishrazipstack
Copy link
Copy Markdown
Contributor Author

@johnyrahul good catch on the dead-letter path — fixed in c2dd5ee03.

Root cause confirmed exactly as you described: send_webhook_notification returned None on retry exhaustion, so the task ended SUCCESS and link_error → mark_buffer_dead_letter never fired; rows stayed in the (then) terminal dispatched state and got GC'd as if delivered.

Fix:

  1. send_webhook_notification now takes raise_on_final_failure and re-raises on exhaustion; _send_clubbed passes it True, so the buffered path ends FAILURE → link_error fires → rows reach DEAD_LETTER.
  2. Also closed the on_commit crash-window you flagged: added a SENDING claim state. _dispatch_group marks rows SENDING (not "dispatched"), the success callback (mark_buffer_dispatched) moves them to DISPATCHED, the error callback to DEAD_LETTER, and a new _reclaim_stale_sending reaper returns rows stuck in SENDING past NOTIFICATION_DISPATCH_LEASE_SECONDS (default 900s) back to PENDING. So a backend crash between commit and publish no longer strands rows — they retry on a later flush. DISPATCHED now genuinely means "delivered".

Could you re-run your broken-webhook scenario (point a notification at a dead URL) and confirm the rows land in DEAD_LETTER after the retry budget? That's the one path I'd like a live confirmation on.

@kirtimanmishrazipstack
Copy link
Copy Markdown
Contributor Author

🔎 Comprehensive review — Failure-only notifications + batched dispatch

Reviewed the full feature surface (38 files: notification_v2, workflow_v2, pipeline_v2, api_v2, unstract/core, workers/, frontend Platform Settings) across five lenses: general correctness, error-handling/silent-failures, type design, comments, and test coverage. The design is genuinely strong — the on-commit dispatch ordering, SELECT … FOR UPDATE SKIP LOCKED group flush, SENDING-guarded terminal callbacks, the partial PENDING index, and the single shared renderer are all the right calls. Findings below are filtered for signal (two agent "criticals" were downgraded after verifying the code — see notes).

✅ Strengths worth keeping

  • _dispatch_group claims rows SENDING inside the txn and publishes via transaction.on_commit, so a rolled-back claim never orphans a broker message (no DB-vs-broker duplicate send).
  • mark_buffer_dispatched / mark_buffer_dead_letter both gate their UPDATE on status=SENDING, so a reaper-reclaimed row can't be clobbered by a stale callback.
  • Org is a mandatory grouping key everywhere; auth_sig JSON-encodes the tuple before hashing to avoid delimiter collisions across tenants.
  • WorkflowExecutionStatusUpdateSerializer.validate rejects impossible aggregates (successful + failed > total, etc.).
  • Thorough db_comments and the shared unstract.core.notification_clubbed_renderer give byte-identical receiver payloads on both dispatch sides.

🟠 Important

1. The failure-filter rule has drifted across its 3 sites.

  • backend/api_v2/notification.py:28-30is_failure(status) or failed_files > 0
  • backend/notification_v2/internal_api_views.py _apply_failure_filter → same two terms
  • backend/pipeline_v2/notification.py:53-56 → adds a third term or pipeline.last_run_status == FAILURE

The worker callback path for pipelines uses _apply_failure_filter (two terms), while the backend PipelineNotification.send path uses three. A pipeline run with non-failure status/failed_files==0 but last_run_status==FAILURE would notify failure-only subscribers on one path and not the other. The _apply_failure_filter docstring even claims parity with the pipeline site, which isn't true. Please confirm whether the third term is intentional and, either way, mirror the rule (or extract one shared helper) and fix the docstring. This is exactly the "notifications fire from worker callback and backend; mirror both paths" hazard.

2. The backend webhook-provider cluster is dead code — and this PR modified it.
backend/notification_v2/provider/webhook/{api,slack}_webhook.py gained envelope-wrapping + "IMMEDIATE" docstrings, but get_notification_provider, NotificationHelper, and provider/registry.py have zero callers in backend/ (verified). Every live backend dispatch now goes dispatch_notifications → enqueue → NotificationBuffer → flush → worker. Recommend deleting the provider/webhook/ + registry.py cluster (or confirming it's retained for a planned non-buffered path). Either way the "IMMEDIATE" comments are stale — the platform is BATCHED-only.

3. Buffer redelivery is unbounded if a terminal callback is ever lost.
(Downgraded from a reported "Critical": the link/link_error signatures have no explicit queue=, so they route to the default celery queue — which is a backend-consumed queue per celery_config._BACKEND_QUEUES. So the common path works.) The residual risk: if a terminal callback is genuinely lost (worker for celery down, broker hiccup), _reclaim_stale_sending recycles SENDING → PENDING with no attempt counter, so the batch is re-dispatched every lease window forever — duplicate webhook spam, and a legitimately dead-lettered batch never reaches DEAD_LETTER. Suggest a reclaim counter that transitions to DEAD_LETTER after N reclaims, and an alert on sustained nonzero notification_buffer_reclaimed_total. Pinning the callback signatures to an explicit backend queue would also remove the implicit-routing assumption.

4. Dropped notifications aren't observable enough for a failure-alerting feature.
dispatch_notifications (helper.py:121-129) and _route_notification (workers/shared/patterns/notification/helper.py) swallow per-row failures into logger.exception/logger.warning with no metric= counter, unlike the success path (metric=notification_buffer_enqueued_total). A silently dropped failure alert is precisely what this feature exists to prevent. Add metric=notification_*_failed_total counters at those sites, and thread org_id/webhook_url_hash/buffer-id sample into the mark_buffer_dead_letter log so a delivered-never event is actionable.

5. process_notification_buffer.py can raise despite its "never raises" contract.
result = response.json() (line ~70) sits outside the try/except. A 200 with a non-JSON body (proxy/error page) raises JSONDecodeError, bypassing the clean return False the docstring promises. Wrap response.json() + the .gets and return False on decode error.


🟡 Suggestions

6. No tests for highly-testable logic. The PR adds zero tests; the pure functions in unstract/core/notification_clubbed_renderer.py need no DB/Django. Highest value (follow the existing unstract/core/tests/test_pubsub_helper.py unittest pattern):

  • compute_auth_sig — equality + the classic collision pair (key="a",header="b,c" vs key="a,b",header="c") → distinct sigs (multi-tenant safety).
  • Cross-path failure-filter truth table (ties into fix: Prompt studio index and run issues #1).
  • _is_effective_success / build_envelope summary counts (COMPLETED + failed_files>0 ⇒ counts as failed).
  • WorkflowExecutionStatusUpdateSerializer.validate five branches.
  • render_slack_text overflow/pluralization (1, 25, 26 events), _humanize_timestamp fallback, build_envelope 501-payload cap.

7. Single-source the batch cap. _PROCESS_BUFFER_CAP = 500 (internal_api_views) and MAX_BATCH_SIZE = 500 (renderer) are independent literals; the comment claims "lock-step" but nothing enforces it — raising the cap above MAX_BATCH_SIZE would silently mark un-rendered rows DISPATCHED. Have internal_api_views import MAX_BATCH_SIZE.

8. NotificationBuffer.status db_comment omits SENDING (models.py:176 + the duplicate in migration 0002). It reads PENDING -> DISPATCHED / DEAD_LETTER, hiding the real PENDING → SENDING → … machine the enum docstring documents correctly. Fix both (a db_comment change needs a migration).

9. Type encapsulation (optional, no behavior change):

  • Promote the {summary, events} envelope from dict[str, Any] to a frozen dataclass in unstract/core — it crosses the backend↔worker JSON boundary, has a real arithmetic invariant (total == succeeded + failed), and is read in 5+ places; replaces the fragile "events" not in payload heuristic.
  • Add BufferStatus.terminal() (mirroring the ExecutionStatus.failure_statuses() this PR already adds) and centralize the filter(status=SENDING) transition guard into one helper.
  • Derive NotificationSettingsSerializer bounds from ConfigKey.NOTIFICATION_CLUB_INTERVAL's ConfigSpec instead of repeating 60/7200; hoist the frontend 1/120 to one constant.

10. Stale comments: worker providers/{api,slack}_webhook.py docstrings attribute the flat-payload path to "worker-callback payloads", but callbacks now go through the backend buffer pre-rendered — the flat-wrap branch is only hit by the generic internal webhook-send endpoints. workers/shared/patterns/notification/helper.py:35-37 and internal_api_views.py:354 describe a "Type: — / Additional Data" KV layout the single-line renderer no longer produces.


Review assisted by Claude Code (multi-agent: code-review, silent-failure, type-design, comment-accuracy, test-coverage). Findings were verified against the code; the two highest-severity agent flags were downgraded after confirming queue routing and the lock-step cap behavior.

… hardening

Addresses the in-scope items from the self-review on PR #1936:

- Single-source the failure-only rule via notification_v2.helper.is_failure_run,
  used by api_v2 / pipeline_v2 / internal_api_views (_apply_failure_filter); the
  pipeline path keeps a documented last_run_status backstop. Fixes the false
  "parity" docstring (#1).
- Emit metric= counters at the notification drop sites (backend
  dispatch_notifications, worker _route_notification) and a row-id sample on the
  dead-letter log so a delivered-never event is observable (#4).
- process_notification_buffer.py honors its "never raises" contract: wrap
  response.json() so a non-JSON 200 returns False instead of raising (#5).
- Bind the flush cap to the renderer's MAX_BATCH_SIZE so rows and rendered
  events stay in lock-step by construction (#7).
- status db_comment now documents the PENDING -> SENDING -> DISPATCHED/DEAD_LETTER
  lifecycle in both the model and migration 0002 (#8).
- Scrub stale IMMEDIATE / worker-callback comments from the provider docstrings
  (#2, #10).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@kirtimanmishrazipstack
Copy link
Copy Markdown
Contributor Author

✅ Self-review dispositions — addressed in 8b05977a9

Worked through all 10 findings. Fixes landed in 8b05977a9; three items are deliberately deferred (rationale below).

# Finding Disposition
1 Failure-filter rule drift Fixed — extracted notification_v2.helper.is_failure_run; api_v2, pipeline_v2 and _apply_failure_filter all use it. The pipeline path keeps its last_run_status==FAILURE backstop (now commented as intentional, for the no-loadable-execution case), and the false "parity" docstring is corrected.
2 Dead provider/ cluster Fixed (comments only) — scrubbed the stale IMMEDIATE/envelope docstrings this PR added. Kept the files: they pre-exist on main and may be referenced by the cloud repo, so deletion is out of scope here.
3 Unbounded redelivery on lost callback Deferred — real gap, not yet implemented. The SENDING lease bounds the common path; the unbounded case only triggers on a genuinely lost terminal callback. Proper fix needs a reclaim_count column → DEAD_LETTER after N reclaims + an alert on sustained notification_buffer_reclaimed_total. Tracking as a follow-up to avoid schema churn in this PR.
4 Dropped notifications not observable Fixed — added metric=notification_enqueue_failed_total / notification_dropped_total at the backend (dispatch_notifications) and worker (_route_notification) drop sites, plus a row-id sample= on the dead-letter log.
5 process_notification_buffer.py can raise Fixedresponse.json() is now inside the try/except; a non-JSON 200 returns False, honoring the "never raises" contract.
6 No tests Deferred — per the standing "manual test only" decision for this PR. Pure-function targets (compute_auth_sig, renderer, validate) noted for a follow-up.
7 Batch cap not single-sourced Fixed_PROCESS_BUFFER_CAP = MAX_BATCH_SIZE (imported via clubbed_renderer), so rows and rendered events stay lock-step by construction.
8 status db_comment omits SENDING Fixed — documents the PENDING → SENDING → DISPATCHED/DEAD_LETTER lifecycle in both the model and migration 0002.
9 Type-design polish Deferred — optional, no behavior change; follow-up.
10 Stale comments Fixed — corrected the worker-callback / KV-layout / IMMEDIATE comments in the worker and backend provider docstrings.

Note for testing: the worker image needs a rebuild (worker containers don't hot-reload) before the manual run; the migration 0002 db_comment change applies cleanly (model + migration agree, no drift).

- NotificationBuffer.dispatch_attempts + NOTIFICATION_MAX_DISPATCH_ATTEMPTS:
  _dispatch_group dead-letters rows past the cap and increments on each SENDING
  claim, bounding the reaper reclaim loop so a lost terminal callback can't
  redeliver forever (self-review #3).
- Delete the orphaned synchronous notification_v2/provider/ cluster — zero
  callers after the batched dispatch_notifications path replaced it (#2).
- Fold dispatch_attempts into 0002_notification_batching; refresh lifecycle
  db_comments + BufferStatus docstring.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@kirtimanmishrazipstack
Copy link
Copy Markdown
Contributor Author

✅ Self-review resolution

Addressing my review above. All 5 Important findings are resolved; 3 of 5 suggestions done, 2 consciously deferred (both non-behavioral). Fixes landed in 8b05977a9 (in-scope self-review items) and 170ec43d8 (dead-code removal + redelivery cap).

🟠 Important — all resolved

# Finding Resolution
1 Failure-filter rule drift + false parity docstring 8b05977a9 — single-sourced as notification_v2.helper.is_failure_run, used by api_v2 / pipeline_v2 / _apply_failure_filter. The pipeline's third last_run_status term is kept as a documented backstop (only reached when the WorkflowExecution can't be loaded — legacy caller / missing execution_id), and the misleading "parity" docstring is corrected.
2 Dead backend webhook-provider cluster 170ec43d8 — deleted the entire notification_v2/provider/ package. Verified zero callers on-branch (the batched dispatch_notifications path replaced it) and zero references in cloud. Sending now lives solely in the worker's own providers/ registry.
3 Unbounded redelivery if a terminal callback is lost 170ec43d8 — added NotificationBuffer.dispatch_attempts + NOTIFICATION_MAX_DISPATCH_ATTEMPTS; _dispatch_group increments on each SENDING claim and dead-letters rows past the cap, so a lost callback can't redeliver forever. metric=notification_buffer_reclaimed_total is emitted for alerting. (The optional explicit-queue pinning on the link/link_error callbacks was intentionally skipped — celery is a backend-consumed queue per _BACKEND_QUEUES, so the attempt cap addresses the actual risk.)
4 Dropped notifications not observable 8b05977a9metric=…_failed_total counters at both drop sites (dispatch_notifications, worker _route_notification); the dead-letter log now carries a row-id sample= so a delivered-never event is traceable.
5 process_notification_buffer.py can raise despite "never raises" 8b05977a9response.json() is wrapped; a non-JSON 200 returns False instead of raising.

🟡 Suggestions

# Suggestion Status
7 Single-source the batch cap 8b05977a9_PROCESS_BUFFER_CAP = MAX_BATCH_SIZE.
8 status db_comment omits SENDING 8b05977a9 + 170ec43d8 — model + migration db_comment now document PENDING → SENDING → DISPATCHED / DEAD_LETTER (incl. the attempt-cap path).
10 Stale worker / renderer comments 8b05977a9 — provider docstrings and the KV-layout references scrubbed.
6 Unit tests for the pure functions ⏳ Deferred — not in this PR. Highest-value targets remain compute_auth_sig (incl. the collision pair), the renderer (build_envelope / render_slack_text overflow), and the serializer branches.
9 Type encapsulation (envelope dataclass, BufferStatus.terminal(), serializer bounds dedup) ⏳ Deferred — optional, no behavior change; left out to keep this PR's diff focused.

Net: every correctness / observability / resilience finding is in; the two deferred items are non-behavioral (test coverage + ergonomics).

Comment thread unstract/core/src/unstract/core/notification_clubbed_renderer.py Outdated
_humanize_timestamp used the `%-d` strftime directive, a glibc/Linux
extension that raises ValueError on macOS/Windows. The call sat outside
the fromisoformat try/except, so the raise propagated through
build_envelope -> render_clubbed_message and was swallowed by
process_notification_buffer's outer except, silently skipping every due
group on non-Linux dev/CI machines. Interpolate the day from dt.day
(plain int, no leading zero) instead so the render is platform-portable;
output is byte-identical to the old format.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
- internal_serializers.py: validate() now has a single terminal return
  (S3516); the total_files branch is if/else instead of an early return.
- internal_views.py update_status: guard-clause on invalid serializer +
  extract _truncate_error_message / _update_file_aggregates helpers to
  drop cognitive complexity below 15 (S3776). Behavior unchanged.
- PlatformSettings.jsx: extract InactivePlatformKeyTag sibling component so
  the key-row map callback drops below cognitive complexity 15 (S3776);
  keyboard activation (Enter/Space) behavior preserved.
- process_notification_buffer.py: logger.exception() in the HTTPError
  branch to capture the traceback (S8572).
- scheduler.sh: explicit return statements (S7682) — run_task returns the
  task exit code; cleanup returns 0 with the exit moved into the trap.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Jun 1, 2026

Frontend Lint Report (Biome)

All checks passed! No linting or formatting issues found.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented Jun 1, 2026

Unstract test results

Per-group results

Status Group Tier Passed Failed Errors Skipped Duration (s)
unit-connectors unit 64 12 0 3 16.8
unit-core unit 0 0 2 0 1.2
unit-platform-service unit 9 0 1 0 1.3
unit-prompt-service unit 15 0 0 0 18.7
unit-rig unit 53 0 0 0 3.3
unit-runner unit 11 0 0 0 3.4
unit-sdk1 unit 354 0 0 0 20.0
unit-tool-registry unit 0 0 1 0 1.4
unit-workers unit 0 0 0 0 17.6
TOTAL 506 12 4 3 83.6

Critical paths

⚠️ Critical paths not yet covered

  • auth-login — User can log in and obtain a session cookie. (entry: POST /api/v1/auth/login; declared coverage: no groups declared)
  • adapter-register-llm — Register and validate an LLM adapter. (entry: POST /api/v1/adapter/; declared coverage: no groups declared)
  • workflow-create-execute — Create a workflow, configure source+destination, execute, poll, fetch result. (entry: POST /api/v1/workflow/{id}/execute/; declared coverage: e2e-workflow)
  • api-deployment-run — Deploy a workflow as an API, POST a document, receive structured JSON. (entry: POST /deployment/api/{org}/{name}/; declared coverage: e2e-api-deployment)
  • prompt-studio-fetch-response — Prompt Studio: create project, add prompt, run single-pass, get response. (entry: POST /api/v1/prompt-studio/prompt-studio-tool/{id}/fetch_response/; declared coverage: e2e-prompt-studio)
  • pipeline-etl-execute — Run an ETL pipeline from source connector to destination. (entry: POST /api/v1/pipeline/{id}/execute/; declared coverage: no groups declared)
  • usage-token-tracking — Per-execution token usage is recorded and retrievable. (entry: GET /api/v1/usage/get_token_usage/; declared coverage: no groups declared)
  • workflow-execution-fan-out — Multi-file workflow execution fans out to file-processing workers and rejoins. (entry: internal: backend → rabbitmq → workers/file_processing; declared coverage: no groups declared)
  • callback-result-delivery — Async results are posted back via the callback worker. (entry: internal: workers/callback → backend /internal endpoints; declared coverage: no groups declared)
✅ Covered critical paths
  • tool-sandbox-exec — covered by unit-runner

Comment thread backend/notification_v2/internal_api_views.py
_dispatch_group increments dispatch_attempts atomically with the
PENDING -> SENDING claim. When _send_clubbed fails to publish to the
broker, the revert reset status/dispatched_at but left the increment in
place, so a clean broker outage (no task queued, no webhook sent) still
burned redelivery budget — N consecutive outages would dead-letter a
never-delivered row.

Decrement dispatch_attempts in the broker-failure revert so a publish
that never reached the broker doesn't consume the cap. Crash / lost-
callback paths never hit this except block, so they keep the increment
and remain bounded by the reaper, which is the redelivery risk the cap
exists for.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Comment thread backend/notification_v2/views.py
Comment thread workers/log_consumer/scheduler.sh Outdated
set -uo pipefail
# Note: pipefail without -e — one task's failure must not abort the loop.

INTERVAL="${LOG_HISTORY_CONSUMER_INTERVAL:-5}"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flush poll cadence reuses LOG_HISTORY_CONSUMER_INTERVAL (fallback 5 here), but the Django setting of the same name defaults to 60 (backend/backend/settings/base.py). So the "5s tick is cheap" assumption in the comment block below only holds if nothing exports the var — in any deployment that sets it (e.g. 60 for the log-history consumer), the buffer flush silently polls every 60s instead, and one env knob now governs two unrelated tasks.

Suggest a dedicated var for the flush (e.g. NOTIFICATION_BUFFER_POLL_INTERVAL), decoupled from the log-history consumer, and reconcile the default with the comment. Functional impact is bounded (worst case a due group waits one extra tick), but the coupling + the now-misleading "5s" comment are a maintenance trap.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — fixed in 600eade02. Added a dedicated NOTIFICATION_BUFFER_POLL_INTERVAL (default 10s) for the flush, decoupled from LOG_HISTORY_CONSUMER_INTERVAL. scheduler.sh now wakes at the min of the two cadences and fires each task on its own elapsed interval, so setting the log-history knob no longer changes the flush cadence. Rewrote the misleading "5s" comment (real dispatch cadence is still gated by NOTIFICATION_CLUB_INTERVAL). Kept the single loop + single trap — no subshell/signal rewrite — since the functional impact you noted is bounded.

return isinstance(failed, int) and failed > 0


def _is_effective_success(status: str | None, counts: dict[str, Any]) -> bool:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This re-implements the failure rule that notification_v2.helper.is_failure_run already owns (status ∈ {ERROR, STOPPED} OR failed_files > 0). The docstring even flags it as "mirrors the failure-filter contract" — but two copies of one rule across two packages will drift, and a drift here means the rendered summary counts disagree with the very reason the alert fired.

Since unstract.core is the lower layer, the canonical predicate should live here and the backend is_failure_run import it (not the reverse mirror). One definition, no drift — same class of issue as the hardcoded-list drift flagged on the group-sharing PR.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed — fixed in 600eade02. Since the canonical ExecutionStatus already lives in unstract.core.data_models, I added the canonical is_failure_run(status, failed_files) right beside it and made both consumers use it:

  • the renderer now derives succeeded/failed and the per-event emoji from not is_failure_run(...) (dropped the duplicated _SUCCESS_STATUSES / _is_success / _is_effective_success);
  • notification_v2.helper.is_failure_run delegates to the core function (all existing imports unchanged).

One definition, lower layer, no reverse mirror — routing filter and rendered summary can't drift. Verified summary counts identical to before across single/multi, COMPLETED/ERROR/partial-failure payloads.

@chandrasekharan-zipstack
Copy link
Copy Markdown
Contributor

⚠️ Webhook payload is a hard break for existing receivers — needs a rollout/compat plan

The clubbed renderer always emits the new {summary, events} envelope (even for a single event), with no version field and no compat shape — by design ("one envelope shape so receivers parse a single schema, not two"). That means every existing external webhook receiver breaks: they parse the old flat per-run body, get the new envelope, and silently fail or drop alerts. The "Relevant Docs" section is also empty, so today this ships as an undocumented contract change to a customer-facing surface.

Flagging two decisions to settle before merge:

1. Payload shape (flat{summary, events}) — options, cheapest-safe first:

  • Single-event superset (suggested): when a group has exactly one event, also emit the old flat top-level fields alongside events. Existing receivers keep working, new ones read events; trivial payload bloat, removable after a deprecation window. Self-healing, no customer action required.
  • Versioned envelope ("version": 2): future-proofs but doesn't save receivers that don't yet branch on it.
  • Opt-in clubbing (per-notification format flag, old shape default): cleanest rollout, more config surface.
  • Hard break + comms: acceptable only if we know who consumes these webhooks and warn them — and the docs/changelog land with the PR.

2. Alerts are no longer instant. Window min is 60s, so there's no instant path even for failures — a critical failure alert is delayed up to window + one poll tick. Is a 1-min+ floor acceptable for failure/paging use-cases? Worth considering a low-latency path (send failures immediately, club successes) so the failures-only feature and the clubbing feature don't work against each other — and surfacing the expected latency in the notification UI so users aren't surprised.

Neither is a code bug — both are product/rollout calls that need an owner. Happy to help implement whichever direction we pick (the single-event superset is a small change to build_envelope).

…ple flush cadence

Addresses review feedback on PR #1936:

- Single-source the failure rule: add canonical is_failure_run to
  unstract.core (beside ExecutionStatus). The clubbed renderer derives its
  summary counts / emoji from it (drops the duplicate _SUCCESS_STATUSES /
  _is_success / _is_effective_success), and notification_v2.helper.is_failure_run
  delegates to it. Routing filter and rendered outcome can no longer drift.

- Webhook backward compat: build_envelope spreads the legacy flat fields
  (type, pipeline_id, pipeline_name, status, execution_id?, error_message?)
  onto a single-event envelope alongside summary/events, so existing API
  webhook receivers parsing the pre-clubbing flat body keep working. Multi-event
  stays envelope-only; Slack path untouched.

- Decouple the buffer-flush poll cadence from the log-history consumer:
  dedicated NOTIFICATION_BUFFER_POLL_INTERVAL (default 10s); scheduler.sh wakes
  at the min of the two intervals and fires each task on its own elapsed
  interval. Removes the misleading "5s" comment and the shared-knob coupling.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@kirtimanmishrazipstack
Copy link
Copy Markdown
Contributor Author

Thanks @chandrasekharan-zipstack — addressed both:

1. Payload break → fixed in 600eade02 (single-event superset). Went with your cheapest-safe option. build_envelope now, when a batch holds exactly one event, also spreads the legacy flat fields onto the envelope alongside summary/events, reproducing main's pre-clubbing shape: {type, pipeline_id, pipeline_name, status, execution_id?, error_message?}. Existing API webhook receivers parsing the flat body keep working; new ones read events. Multi-event batches stay envelope-only (there was never a flat shape for them), and Slack is untouched (it only ever gets {text}). No key collision — summary/events are new. Self-healing, removable after a deprecation window. Documented in the design notes.

2. Latency floor → keeping batched-only (by design), not adding an instant path. Batched-only is a deliberate post-v2.8 decision (no IMMEDIATE path), so a low-latency carve-out would reverse a settled architecture rather than tune it. The window is per-org configurable down to 60s, so an org that wants fast failure paging can set the floor; failures and clubbing don't actually fight here because the failures-only filter runs at enqueue, independent of the window. I'd treat "surface expected latency in the notification UI" as a small, worthwhile follow-up rather than part of this PR — agree it's worth setting the expectation for users. Happy to file it.

Relevant docs gap noted — I'll make sure the changelog/customer-facing note lands with this (the compat superset means it's no longer a hard break, but the new events shape should still be documented).

The single-event legacy superset in build_envelope only reproduced the
backend dispatch DTO (PipelineStatusPayload.to_dict). The worker callback
path's pre-clubbing body (NotificationPayload.to_webhook_payload) also
emitted top-level `timestamp` and `additional_data`, so receivers reading
those against the old flat shape broke even on single-event sends. Add both
keys to _LEGACY_FLAT_KEYS; purely additive (the existing not-None guard keeps
backend-origin events from gaining an empty additional_data).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
@sonarqubecloud
Copy link
Copy Markdown

sonarqubecloud Bot commented Jun 1, 2026

Copy link
Copy Markdown
Contributor

@jaseemjaskp jaseemjaskp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR Review Toolkit — consolidated findings

Ran Code Reviewer, Silent Failure Hunter, Type Design Analyzer, PR Test Analyzer, Comment Analyzer, and Code Simplifier over the diff (43 files, ~2.4k insertions). The architecture is sound — on-commit dispatch ordering, SKIP-LOCKED grouping, the dispatch-attempt cap with broker-failure refund, and the lease-based reaper are all correctly reasoned. Inline comments below carry the new, deduplicated findings (existing review threads were cross-checked and not reposted).

Two findings that can't be anchored to diff lines:

P0 — Existing test suite is broken by this PR. workers/tests/test_dispatch_sites_characterisation.py (lines 42-127, 267) imports/exercises send_notification_to_worker and get_webhook_headers in workers/shared/patterns/notification/helper.pyboth deleted by this PR. Those tests will fail at collection, and they are currently the only coverage of the worker dispatch path (now rewritten as _route_notification/_enqueue_to_buffer). They must be rewritten to characterise the new HTTP enqueue contract, or removed.

P1 — The feature ships with zero new tests. No test file is touched in the diff and there is no backend/notification_v2/tests/ directory. The highest-ROI gaps are pure functions that decide whether the right alert is sent with correct content and need no DB:

  • is_failure_run (unstract/core/.../data_models.py) — the single routing predicate: ERROR/STOPPED, COMPLETED + failed_files>0 (partial failure), failed_files=None, unknown-status.
  • clubbed renderer (notification_clubbed_renderer.py) — _humanize_timestamp portability (the %-d regression that was already fixed once), MAX_BATCH_SIZE=500 cap, Slack SLACK_MAX_DISPLAY_EVENTS=25 overflow footer, and the legacy single-event flat-key envelope shape (reworked twice in 577c881c/600eade0).
  • dispatch_attempts refund on broker-publish failure (internal_api_views._send_clubbed) — the exact invariant commit 16c2dbac had to fix; a regression silently dead-letters never-delivered failure alerts. Test with TestCase.captureOnCommitCallbacks(execute=True) + a mocked send_task that raises vs succeeds.

Priority order: address the broken test (P0) and the two correctness comments below (_load_execution fail-open, _update_file_aggregates invariant); the rest are accuracy/cleanup/simplification.

try:
return cast(WorkflowExecution, WorkflowExecution.objects.get(id=execution_id))
except WorkflowExecution.DoesNotExist:
logger.warning("WorkflowExecution %s not found", execution_id)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Correctness / P1] _load_execution returns None both when no execution_id was supplied and when an execution_id was supplied but the row isn't found. _apply_failure_filter then treats None as "no filter" and returns every active row. For a missing-but-requested execution (replication lag / race between the status write and this fetch), this fails open: notify_on_failures=True subscribers get notified on successful runs — the opposite of the feature's intent. Suggest distinguishing the two cases: when execution_id is present but the lookup misses, retry briefly or fail closed (skip notify_on_failures rows) and emit a metric, rather than silently sending success alerts. The same pattern exists in pipeline_v2/notification.py (partially backstopped there by last_run_status).

return error_msg

@staticmethod
def _update_file_aggregates(execution, validated_data) -> None:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Validation invariant / P2] This setattrs total/successful/failed_files individually and saves only the present fields. The cross-field invariant (successful + failed <= total) is checked in WorkflowExecutionStatusUpdateSerializer.validate() against the submitted payload only. A partial update sending just failed_files (no total_files) is validated with total_files=None (check skipped) and persisted against a previously-stored, possibly-smaller total_files — so the row can end up with failed_files > total_files. Since is_failure_run keys on failed_files, a bad aggregate mis-routes the failure alerts this feature exists to deliver. Suggest re-validating against the merged/persisted values here (or in WorkflowExecution.save()/clean()).

Currently hosts ``mark_buffer_dead_letter`` — a thin task attached as a
Celery ``link_error`` to the clubbed dispatch chain. When the underlying
``send_webhook_notification`` task exhausts retries, this task converts
the buffered rows from PENDING/DISPATCHED to terminal DEAD_LETTER so the
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Docstring accuracy / P2] The module docstring says this task converts rows from PENDING/DISPATCHED to terminal DEAD_LETTER, but both mark_buffer_dead_letter (line 47) and mark_buffer_dispatched (line 80) only transition rows still in SENDING (.filter(status=BufferStatus.SENDING.value)). The inline comment at line 43 already states this correctly. Suggest: ...converts the buffered rows from SENDING to terminal DEAD_LETTER....

)
flush_after = models.DateTimeField(
db_comment=(
"created_at + NOTIFICATION_CLUB_INTERVAL, precomputed at enqueue. "
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Comment accuracy / P2] The db_comment states created_at + NOTIFICATION_CLUB_INTERVAL, but enqueue computes flush_after = timezone.now() + timedelta(seconds=interval) using the per-org interval from get_org_club_interval_seconds (org override, with NOTIFICATION_CLUB_INTERVAL only as the fallback default) — not created_at, and not the bare setting. Suggest: enqueue-time now() + the org's effective club interval (NOTIFICATION_CLUB_INTERVAL default, per-org override), precomputed at enqueue. Apply the same fix to the matching db_comment in migration 0002_notification_batching.py.


Worker code is model-free: it forwards a notification_id + structured
payload here and lets the backend write the NotificationBuffer row.
Rejects rows whose source notification is not BATCHED so a worker
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Docstring accuracy / P2] The docstring says the endpoint Rejects rows whose source notification is not BATCHED, but there is no notification_type/BATCHED gate anywhere in the function body — every active notification is buffered. Either add the gate or drop this sentence so the docstring matches behavior (misleads maintainers into assuming a guard that isn't there).

# Task trigger command - can be overridden via environment variable
DEFAULT_TRIGGER_CMD="/app/.venv/bin/python /app/log_consumer/process_log_history.py"
TRIGGER_CMD="${TASK_TRIGGER_COMMAND:-$DEFAULT_TRIGGER_CMD}"
# Task 2: notification buffer flush (UNS-611 clubbed dispatch).
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Cleanup / P2] Leftover JIRA reference — reviewers previously asked to scrub JIRA keys from comments/docstrings, and this UNS-611 was missed. Suggest: # Task 2: notification buffer flush (clubbed dispatch).

if (!sessionDetails?.orgId) {
return;
}
// Load org-scoped batch interval (UNS-611 v2). Falls back silently to
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Cleanup / P2] Leftover UNS-611 v2 JIRA reference — please scrub per the earlier review request (other JIRA refs were removed in c2dd5ee03; this one was missed). Suggest: // Load org-scoped batch interval. Falls back silently to ...

org_id=org_id,
)
)
return len(rows), len(rows)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Simplification / P3] _dispatch_group returns len(rows), len(rows) — the two elements are always identical, the early returns are all 0, 0, and the sole caller discards the second (rows, _succeeded = _dispatch_group(...) at line 629). succeeded never diverges from rows. Collapse to a single int return, update the caller to rows = _dispatch_group(...), and fix the (rows, succeeded) wording in the docstring at line 503.


logger = logging.getLogger(__name__)

__all__ = ["MAX_BATCH_SIZE", "build_envelope", "render_clubbed_message"]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Dead code / P3] build_envelope is imported (line 19) and re-exported in __all__, but nothing imports it from this shim — internal_api_views.py and both worker providers import build_envelope directly from unstract.core.notification_clubbed_renderer. Only MAX_BATCH_SIZE and render_clubbed_message are consumed here. Drop build_envelope from the import and __all__ (or add a one-line comment if the re-export is a deliberate public surface).


def _render_for_slack(envelope: dict[str, Any]) -> dict[str, Any]:
"""Wrap the rendered Slack mrkdwn body in the dict shape Slack expects."""
return {"text": render_slack_text(envelope)}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Simplification / P3] _render_for_slack is a single-use one-liner (return {"text": render_slack_text(envelope)}) called once at line 44. Consider inlining it at the call site and deleting the helper — minor, keep if you value the named intent.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants