diff --git a/backend/app/core/settings.py b/backend/app/core/settings.py index 4aa04afb..19621127 100644 --- a/backend/app/core/settings.py +++ b/backend/app/core/settings.py @@ -299,6 +299,18 @@ def _validate_judgments_resume_sweep_minutes(cls, value: int) -> int: default="512m", description="ES_JAVA_OPTS heap sizing for the elasticsearch+opensearch containers", ) + relyloop_worker_api_base_url: str = Field( + default="http://api:8000", + description=( + "Base URL the demo-reseed Arq worker uses for its API self-calls " + "(POST /api/v1/clusters etc.). Defaults to the Compose service alias " + "`http://api:8000` (the worker runs in its own container, so `localhost` " + "would resolve to the worker, not the API). Overridden to " + "`http://127.0.0.1:8000` by the demo-reseed integration harness, which " + "boots an in-process uvicorn on the test host. " + "chore_demo_seeding_integration_tests_rewrite D-3." + ), + ) relyloop_allow_private_clusters: bool = Field( default=True, description="Permit cluster registration against private-range / loopback IPs. " diff --git a/backend/tests/integration/test_demo_seeding.py b/backend/tests/integration/test_demo_seeding.py index 755388f1..8f07d286 100644 --- a/backend/tests/integration/test_demo_seeding.py +++ b/backend/tests/integration/test_demo_seeding.py @@ -2,34 +2,32 @@ # # SPDX-License-Identifier: Apache-2.0 -"""Integration tests for the demo-reseed flow. - -PAUSED PER bug_demo_reseed_fake_metric_regression — the sync-flow tests -in this file were written against the previous handler that ran the -entire reseed inline and returned a ReseedSummary synchronously. The -handler now enqueues an Arq job and returns 202 immediately; status -flows through a Redis-backed polling endpoint. Rewriting the test -suite for the new async flow is tracked in the bug folder's "Follow-up -work" section. 
- -Until the rewrite lands: - -* Unit coverage of the new flow lives at - ``backend/tests/unit/services/test_demo_seeding_status.py`` (14 cases - covering the Pydantic shape, search_space builder, and Redis - status_get/status_set round-trip). -* The contract test at - ``backend/tests/contract/test_openapi_surface.py`` enforces the new - 202 + GET /status surface. -* End-to-end coverage on the real stack is provided by - ``backend/tests/smoke/test_demo_reseed_real_studies.py`` once it - lands (per the bug folder's regression-test commitment). +"""Integration tests for the async demo-reseed flow. + +``POST /api/v1/_test/demo/reseed`` enqueues an Arq job and returns **202** +with an initial ``ReseedStatusResponse{status:"running"}``; the worker +(:func:`backend.workers.demo_reseed.run_demo_reseed`) does the real +wipe + reseed and writes per-phase status to Redis; the frontend polls +``GET /api/v1/_test/demo/reseed/status`` for terminal ``complete`` / ``failed``. + +These tests exercise that contract end-to-end against real Postgres + ES/OS ++ Redis (heavy CI lane). The worker is invoked **inline** in the test process +(``await run_demo_reseed(ctx)``) so the advisory lock is held on a connection +this process can observe via ``pg_locks`` (AC-16) and so the cleanup gate +(AC-12) can be driven from the same event loop. The queued Arq job is never +consumed, so the singleton dedup keys are cleared before each test AND between +consecutive POSTs in one test. + +Skips cleanly when Postgres / ES / OS / Redis are unbound. Unit coverage of +the Redis status helpers lives at +``backend/tests/unit/services/test_demo_seeding_status.py``. 
""" from __future__ import annotations import asyncio import logging +import os import socket import threading from collections.abc import AsyncIterator @@ -39,30 +37,35 @@ import httpx import pytest import pytest_asyncio +from redis.asyncio import Redis from sqlalchemy import text from sqlalchemy.ext.asyncio import create_async_engine -from backend.app.services.demo_seeding import DEMO_RESEED_LOCK_KEY +from backend.app.services.demo_seeding import ( + DEMO_RESEED_LOCK_KEY, + DEMO_RESEED_STATUS_KEY, + ReseedStatusResponse, + _now_iso, +) +from backend.app.services.demo_seeding import status_set as _status_set from backend.tests.conftest import postgres_reachable from backend.tests.integration._demo_reseed_uvicorn import running_uvicorn -# bug_demo_reseed_fake_metric_regression — the sync-flow tests in this -# file are pre-async-flow. Pause all tests here until the file is -# rewritten for 202 + Redis-poll. Unit coverage lives at -# backend/tests/unit/services/test_demo_seeding_status.py. -pytestmark = [ - pytest.mark.integration, - pytest.mark.skip( - reason=( - "Sync-flow tests paused — bug_demo_reseed_fake_metric_regression " - "converted the reseed handler to async enqueue + poll." - ) - ), -] +# Arq 0.28.0 singleton dedup keys for _job_id="demo_reseed:singleton". The +# inline harness never consumes the queued job, so these persist (~24h TTL) +# and would silently drop the next POST's enqueue — clear them before each +# test and between consecutive POSTs. +_SINGLETON_DEDUP_KEYS: tuple[str, ...] = ( + "arq:job:demo_reseed:singleton", + "arq:result:demo_reseed:singleton", + "arq:in-progress:demo_reseed:singleton", +) + +pytestmark = [pytest.mark.integration] # --------------------------------------------------------------------------- -# Reachability helpers — skip the test gracefully if ES/OS aren't bound. +# Reachability helpers — skip the test gracefully if ES/OS/Redis aren't bound. 
# --------------------------------------------------------------------------- @@ -76,8 +79,6 @@ def _tcp_open(host: str, port: int, timeout: float = 0.5) -> bool: def _engine_reachable() -> bool: """Check ES (9200) + OS (9201) are bound on localhost (or compose dns).""" - # Try both localhost and the Compose DNS name; the test process may be - # the host (port-bound) or a CI service-container peer (DNS). for host in ("127.0.0.1", "localhost", "elasticsearch", "opensearch"): if _tcp_open(host, 9200, 0.3) and _tcp_open("127.0.0.1", 9201, 0.3): return True @@ -88,43 +89,52 @@ def _engine_reachable() -> bool: return False -# Skip the whole module if neither Postgres nor the engines are reachable. -# We can't drive the reseed without both. -if not postgres_reachable() or not _engine_reachable(): +def _redis_reachable() -> bool: + """Best-effort TCP probe of the configured Redis (host:port from redis_url).""" + from urllib.parse import urlparse + + from backend.app.core.settings import get_settings + + try: + parsed = urlparse(get_settings().redis_url) + except Exception: # noqa: BLE001 + return False + host = parsed.hostname or "127.0.0.1" + port = parsed.port or 6379 + if _tcp_open(host, port, 0.3): + return True + # The configured host may be a Compose DNS name (`redis`) unresolvable + # from the test host — fall back to loopback. + return _tcp_open("127.0.0.1", port, 0.3) + + +if not postgres_reachable() or not _engine_reachable() or not _redis_reachable(): pytest.skip( - "demo reseed integration tests require Postgres + ES + OS service " - "containers. Run via `make test-integration` against the dev stack " - "or in CI where the service containers are provisioned.", + "demo reseed integration tests require Postgres + ES + OS + Redis " + "service containers. 
Run via `make test-integration` against the dev " + "stack or in CI where the service containers are provisioned.", allow_module_level=True, ) # --------------------------------------------------------------------------- -# Module-scoped uvicorn fixture — one server for all 9 tests in this file. -# AC-4's timeout test lives in test_demo_seeding_timeout.py with its own -# function-scoped fixture to avoid ReadTimeout residual contamination. +# Carried-forward autouse fixtures (credentials + engine-host patching). # --------------------------------------------------------------------------- @pytest.fixture(scope="module", autouse=True) def _stub_cluster_credentials(tmp_path_factory: pytest.TempPathFactory) -> Any: - """Provide ``Settings.cluster_credentials_yaml`` for the test. - - The backend job in ``.github/workflows/pr.yml`` writes - ``./secrets/cluster_credentials.yaml`` but doesn't set - ``CLUSTER_CREDENTIALS_FILE`` for the pytest step — so - ``get_settings().cluster_credentials_yaml`` returns ``None``, - which makes the in-orchestrator ``POST /api/v1/clusters`` probe - fail with ``CredentialsMissing`` → 503 ``CLUSTER_UNREACHABLE``. - - Mount a tmp file with the ``local-es`` + ``local-opensearch`` - credentials the demo scenarios reference, set - ``CLUSTER_CREDENTIALS_FILE`` so the env-var read survives the - autouse ``_clear_settings_caches`` fixture (which runs before - every test and dumps the Settings lru_cache). + """Provide ``Settings.cluster_credentials_yaml`` + redirect the worker's + API self-call base URL to the in-process uvicorn. + + The credentials file backs the in-orchestrator ``POST /api/v1/clusters`` + probe (without it the probe 503s ``CLUSTER_UNREACHABLE``). The + ``RELYLOOP_WORKER_API_BASE_URL`` override (Story 0.1 / D-3) points the + worker's inline self-calls at ``127.0.0.1:8000`` — the harness uvicorn — + instead of the ``api:8000`` Compose alias the worker container would use + in production. 
Both env vars survive the autouse ``_clear_settings_caches`` + fixture (which dumps the Settings lru_cache before every test). """ - import os - tmp = tmp_path_factory.mktemp("demo_reseed_credentials") creds_file = tmp / "cluster_credentials.yaml" creds_file.write_text( @@ -135,43 +145,31 @@ def _stub_cluster_credentials(tmp_path_factory: pytest.TempPathFactory) -> Any: " username: admin\n" " password: admin\n" ) - original = os.environ.get("CLUSTER_CREDENTIALS_FILE") + prev_creds = os.environ.get("CLUSTER_CREDENTIALS_FILE") + prev_base = os.environ.get("RELYLOOP_WORKER_API_BASE_URL") os.environ["CLUSTER_CREDENTIALS_FILE"] = str(creds_file) + os.environ["RELYLOOP_WORKER_API_BASE_URL"] = "http://127.0.0.1:8000" try: yield finally: - if original is None: - os.environ.pop("CLUSTER_CREDENTIALS_FILE", None) - else: - os.environ["CLUSTER_CREDENTIALS_FILE"] = original + for key, prev in ( + ("CLUSTER_CREDENTIALS_FILE", prev_creds), + ("RELYLOOP_WORKER_API_BASE_URL", prev_base), + ): + if prev is None: + os.environ.pop(key, None) + else: + os.environ[key] = prev @pytest.fixture(scope="module", autouse=True) def _patch_engine_for_test_host() -> Any: - """Three patches to make the in-process uvicorn reach the CI services. - - The production resolver maps localhost:9200/9201 → Compose-DNS - elasticsearch:9200/opensearch:9201 — correct for the API container at - runtime. In the test environment the in-process uvicorn runs on the - GHA runner HOST (not in a container), where the Compose-DNS names - don't resolve (services are reachable only via forwarded ports). - - 1. The service-module's ``_resolve_engine_base_url`` reference so - engine self-calls (PUT/POST/DELETE against ES/OS) go to - ``127.0.0.1:9200`` / ``127.0.0.1:9201``. - 2. The route-handler's re-import of the same symbol so cleanup - index DELETEs land on the same loopback ports. - 3. 
The service-module's ``SCENARIOS`` list — each scenario's - ``base_url`` ships as ``http://elasticsearch:9200`` / - ``http://opensearch:9200`` (the value the api container stores - on the ``clusters`` row). When the test process calls - ``POST /api/v1/clusters``, the cluster-create handler probes - that ``base_url`` — and the test process can't resolve those - Compose DNS names. Rewrite to ``127.0.0.1`` for the test. + """Map localhost:9200/9201 → 127.0.0.1 loopback for the in-process uvicorn, + and rewrite each scenario's ``base_url`` to loopback so the cluster-create + probe resolves from the test host (which can't resolve Compose DNS names). """ import copy - import backend.app.api.v1._test as test_mod import backend.app.services.demo_seeding as svc_mod def passthrough(host_base_url: str) -> str: @@ -191,10 +189,10 @@ def passthrough(host_base_url: str) -> str: scenario["base_url"] = "http://127.0.0.1:9201" svc_mod.SCENARIOS = patched_scenarios try: - with ( - patch.object(svc_mod, "_resolve_engine_base_url", passthrough), - patch.object(test_mod, "_resolve_engine_base_url", passthrough), - ): + # Only demo_seeding owns _resolve_engine_base_url now (the async + # refactor moved cleanup out of the _test route handler), so patch it + # there only — _test.py no longer has the symbol. + with patch.object(svc_mod, "_resolve_engine_base_url", passthrough): yield finally: svc_mod.SCENARIOS = original_scenarios @@ -210,13 +208,34 @@ async def demo_reseed_base_url() -> AsyncIterator[str]: async def demo_reseed_client( demo_reseed_base_url: str, ) -> AsyncIterator[httpx.AsyncClient]: - async with httpx.AsyncClient(base_url=demo_reseed_base_url, timeout=180.0) as client: + async with httpx.AsyncClient(base_url=demo_reseed_base_url, timeout=300.0) as client: yield client +@pytest_asyncio.fixture +async def arq_ctx() -> AsyncIterator[dict[str, Any]]: + """Yield ``{"redis": }`` suitable for ``run_demo_reseed(ctx)``. 
+ + The worker reads ``ctx["redis"]`` for status writes; everything else + (``get_engine``/``get_session_factory``/``get_settings``) resolves to the + process-wide instances the test queries. + """ + from backend.app.core.settings import get_settings + + redis: Redis = Redis.from_url(get_settings().redis_url, decode_responses=False) + try: + await redis.ping() + except Exception as exc: # noqa: BLE001 + await redis.aclose() + pytest.skip(f"Redis not reachable for arq_ctx: {exc}") + try: + yield {"redis": redis} + finally: + await redis.aclose() + + # --------------------------------------------------------------------------- -# Test-only DB engine for assertions / observers (separate from the -# request-scope ``AsyncSession`` the handler uses). +# Test-only DB engine for assertions / observers. # --------------------------------------------------------------------------- @@ -233,16 +252,22 @@ async def _table_count(engine: Any, table: str) -> int: async def _truncate_all_demo_tables(engine: Any) -> None: - """Reset DB between tests in this module — the autouse cleanup in - ``conftest.py`` only DELETEs Phase 2 tables; we also need to reset - sequences / RESTART IDENTITY so judgments cascade cleanly. - """ from backend.app.services.demo_seeding import _TRUNCATE_DEMO_TABLES_SQL async with engine.begin() as conn: await conn.execute(text(_TRUNCATE_DEMO_TABLES_SQL)) +async def _clear_singleton_dedup_keys(redis: Redis) -> None: + """DELETE the three Arq singleton dedup keys. 
Idempotent.""" + await redis.delete(*_SINGLETON_DEDUP_KEYS) + + +async def _clear_status_key(redis: Redis) -> None: + """DELETE the Redis status key so a stale running/complete never leaks a 409.""" + await redis.delete(DEMO_RESEED_STATUS_KEY) + + @pytest.fixture async def db_engine() -> Any: engine = _make_test_engine() @@ -254,12 +279,20 @@ async def db_engine() -> Any: @pytest.fixture(autouse=True) async def _clean_demo_state_before_each(db_engine: Any) -> Any: - """Wipe demo tables + ES/OS indices before EACH test in this module. - - We need an empty starting state for AC-1; previous tests may have - populated. Cheap (TRUNCATE on small tables). + """Wipe demo tables + ES/OS indices + the Redis status/dedup keys before + EACH test so every case starts from a clean, 409-free state. """ await _truncate_all_demo_tables(db_engine) + + from backend.app.core.settings import get_settings + + redis: Redis = Redis.from_url(get_settings().redis_url, decode_responses=False) + try: + await _clear_status_key(redis) + await _clear_singleton_dedup_keys(redis) + finally: + await redis.aclose() + # Best-effort wipe ES + OS indices in case a prior test left them. async with httpx.AsyncClient(timeout=10.0) as wipe_client: for idx in ("products", "docs-articles", "job-listings"): @@ -279,108 +312,230 @@ async def _clean_demo_state_before_each(db_engine: Any) -> Any: yield +# --------------------------------------------------------------------------- +# POST-then-poll helpers (Story 1.2). 
+# --------------------------------------------------------------------------- + + +async def _get_status(client: httpx.AsyncClient) -> dict[str, Any]: + r = await client.get("/api/v1/_test/demo/reseed/status") + assert r.status_code == 200, r.text + return r.json() + + +def _scenarios_total() -> int: + """len(SCENARIOS) + 1 — the rich ESCI scenario the POST handler adds.""" + from backend.app.services.demo_seeding import SCENARIOS + + return len(SCENARIOS) + 1 + + +async def post_and_run_to_terminal( + client: httpx.AsyncClient, ctx: dict[str, Any] +) -> dict[str, Any]: + """POST → assert 202 + initial running → drive the worker inline → return + the terminal ``GET /status`` payload. Clears the singleton dedup keys after + the inline run so a subsequent POST in the same test enqueues cleanly. + """ + from backend.workers.demo_reseed import run_demo_reseed + + resp = await client.post("/api/v1/_test/demo/reseed", json={}) + assert resp.status_code == 202, resp.text + initial = resp.json() + assert initial["status"] == "running", initial + assert initial["scenarios_completed"] == 0, initial + assert initial["scenarios_total"] == _scenarios_total(), initial + assert initial["current_step"], initial + assert initial["summary"] is None, initial + + await run_demo_reseed(ctx) + await _clear_singleton_dedup_keys(ctx["redis"]) + return await _get_status(client) + + # --------------------------------------------------------------------------- # AC-1: happy path on a clean DB. 
# --------------------------------------------------------------------------- async def test_reseed_happy_path_on_clean_db( - demo_reseed_client: httpx.AsyncClient, db_engine: Any + demo_reseed_client: httpx.AsyncClient, + db_engine: Any, + arq_ctx: dict[str, Any], + caplog: pytest.LogCaptureFixture, ) -> None: - response = await demo_reseed_client.post("/api/v1/_test/demo/reseed", json={}, timeout=180.0) - assert response.status_code == 200, response.text - body = response.json() - assert body["clusters_created"] == 4 - assert body["query_sets_created"] == 4 - assert body["studies_completed"] == 4 - assert body["proposals_created"] == 4 - assert body["duration_ms"] > 0 - # Spec §9 invariants - assert await _table_count(db_engine, "clusters") == 4 - assert await _table_count(db_engine, "query_sets") == 4 - assert await _table_count(db_engine, "query_templates") == 4 - assert await _table_count(db_engine, "judgment_lists") == 4 - assert await _table_count(db_engine, "studies") == 4 - assert await _table_count(db_engine, "digests") == 4 - assert await _table_count(db_engine, "proposals") == 4 - # trials: 2 per study (winner + comparison) = 8 - assert await _table_count(db_engine, "trials") == 8 + # Diagnostic: capture the in-process uvicorn's unhandled-exception log so a + # 500 from an api self-call surfaces the real traceback (the generic 500 + # body only says "has been notified"). 
+ caplog.set_level(logging.ERROR, logger="backend.app.api.errors") + terminal = await post_and_run_to_terminal(demo_reseed_client, arq_ctx) + if terminal["status"] != "complete": + errs = [ + f"{type(r.exc_info[1]).__name__ if r.exc_info else '?'}: " + f"{r.exc_info[1] if r.exc_info else r.getMessage()}" + for r in caplog.records + if r.name == "backend.app.api.errors" + ] + raise AssertionError(f"reseed not complete: {terminal}\nserver exceptions: {errs[-5:]}") + assert terminal["status"] == "complete", terminal + assert terminal["summary"] is not None, terminal + assert terminal["scenarios_completed"] == terminal["scenarios_total"] + + # Counts read runtime summary, never a hardcoded 4/5 (rich scenario may + # run when an OpenAI key is present → N==5, else N==4). + summary = terminal["summary"] + n = summary["clusters_created"] + # Summary symmetry (demo_seeding.py:1618-1626): clusters + query_sets track + # len(SCENARIOS)+rich_count exactly; studies == proposals == len(results)+rich. + assert summary["query_sets_created"] == n + assert summary["studies_completed"] == summary["proposals_created"] + + # DB counts vs runtime N. `==` for clusters/proposals/query_sets (the rich + # path registers its own query set → still == N); `>=` for tables the + # rich/UBI re-entry may augment. + assert await _table_count(db_engine, "clusters") == n + assert await _table_count(db_engine, "proposals") == n + assert await _table_count(db_engine, "query_sets") == n + assert await _table_count(db_engine, "query_templates") >= n + assert await _table_count(db_engine, "judgment_lists") >= n + assert await _table_count(db_engine, "studies") >= n + assert await _table_count(db_engine, "digests") >= n # --------------------------------------------------------------------------- -# AC-2: replaces pre-populated demo state. +# AC-2: replaces pre-populated demo state (disjoint cluster ids). 
# --------------------------------------------------------------------------- async def test_reseed_replaces_populated_demo_state( - demo_reseed_client: httpx.AsyncClient, db_engine: Any + demo_reseed_client: httpx.AsyncClient, db_engine: Any, arq_ctx: dict[str, Any] ) -> None: - # First reseed: populate. - first = await demo_reseed_client.post("/api/v1/_test/demo/reseed", json={}, timeout=180.0) - assert first.status_code == 200, first.text + first = await post_and_run_to_terminal(demo_reseed_client, arq_ctx) + assert first["status"] == "complete", first async with db_engine.connect() as conn: - rows = await conn.execute(text("SELECT id FROM clusters ORDER BY name")) - first_cluster_ids = {row[0] for row in rows} - assert len(first_cluster_ids) == 4 + first_ids = {row[0] for row in await conn.execute(text("SELECT id FROM clusters"))} + assert first_ids - # Second reseed: replace. - second = await demo_reseed_client.post("/api/v1/_test/demo/reseed", json={}, timeout=180.0) - assert second.status_code == 200, second.text + # The helper already cleared the dedup keys after the first run, so the + # second POST enqueues cleanly. + second = await post_and_run_to_terminal(demo_reseed_client, arq_ctx) + assert second["status"] == "complete", second async with db_engine.connect() as conn: - rows = await conn.execute(text("SELECT id FROM clusters ORDER BY name")) - second_cluster_ids = {row[0] for row in rows} - assert len(second_cluster_ids) == 4 - # New UUIDs — the TRUNCATE wiped the originals. - assert first_cluster_ids.isdisjoint(second_cluster_ids) + second_ids = {row[0] for row in await conn.execute(text("SELECT id FROM clusters"))} + assert second_ids + # UUIDv7 PKs (backend/app/db/models/cluster.py) are fresh after + # TRUNCATE ... RESTART IDENTITY CASCADE. + assert first_ids.isdisjoint(second_ids) # --------------------------------------------------------------------------- -# AC-3: concurrent reseed returns 409. 
+# AC-3: concurrent reseed returns 409; + ARQ_POOL_UNAVAILABLE 503 micro-case. # --------------------------------------------------------------------------- async def test_concurrent_reseed_returns_409( + demo_reseed_client: httpx.AsyncClient, arq_ctx: dict[str, Any] +) -> None: + # Seed a fresh `running` status directly so the POST's stale-check passes + # (not stale) and it 409s. Do NOT drive the worker. + await _status_set( + arq_ctx["redis"], + ReseedStatusResponse( + status="running", + started_at=_now_iso(), + scenarios_total=_scenarios_total(), + scenarios_completed=0, + ), + ) + resp = await demo_reseed_client.post("/api/v1/_test/demo/reseed", json={}) + assert resp.status_code == 409, resp.text + detail = resp.json()["detail"] + assert detail["error_code"] == "SEED_IN_PROGRESS" + assert isinstance(detail["message"], str) and detail["message"] + assert detail["retryable"] is True + # Clear so the next test doesn't 409 unexpectedly. + await _clear_status_key(arq_ctx["redis"]) + + +async def test_reseed_arq_pool_unavailable_returns_503( demo_reseed_client: httpx.AsyncClient, ) -> None: - async with ( - httpx.AsyncClient(base_url=str(demo_reseed_client.base_url), timeout=180.0) as client_a, - httpx.AsyncClient(base_url=str(demo_reseed_client.base_url), timeout=180.0) as client_b, - ): - results = await asyncio.gather( - client_a.post("/api/v1/_test/demo/reseed", json={}), - client_b.post("/api/v1/_test/demo/reseed", json={}), - return_exceptions=False, - ) - statuses = sorted(r.status_code for r in results) - # One 200, one 409. Either request can win the lock race. 
- assert statuses == [200, 409], [r.status_code for r in results] - bodies = {r.status_code: r.json() for r in results} - assert bodies[409]["detail"]["error_code"] == "SEED_IN_PROGRESS" - assert bodies[409]["detail"]["retryable"] is True + from backend.app.main import app + + prev = getattr(app.state, "arq_pool", None) + app.state.arq_pool = None + try: + resp = await demo_reseed_client.post("/api/v1/_test/demo/reseed", json={}) + finally: + app.state.arq_pool = prev + assert resp.status_code == 503, resp.text + detail = resp.json()["detail"] + assert detail["error_code"] == "ARQ_POOL_UNAVAILABLE" + assert detail["retryable"] is True + + +# --------------------------------------------------------------------------- +# AC-5: mid-loop engine failure → terminal `failed` + cleanup log. +# --------------------------------------------------------------------------- + + +async def test_reseed_mid_flight_engine_failure_drives_failed_and_cleanup( + demo_reseed_client: httpx.AsyncClient, + arq_ctx: dict[str, Any], + caplog: pytest.LogCaptureFixture, +) -> None: + from backend.workers.demo_reseed import run_demo_reseed + + caplog.set_level(logging.INFO, logger="backend.app.services.demo_seeding") + + original_put = httpx.AsyncClient.put + call_count = {"engine_put": 0} + fail_threshold = 6 # scenario-1's mapping + docs PUTs succeed; fail on scenario-2. 
+ + async def counting_put(self: httpx.AsyncClient, url: Any, *args: Any, **kwargs: Any) -> Any: + url_str = str(url) + if ":9200" in url_str or ":9201" in url_str: + call_count["engine_put"] += 1 + if call_count["engine_put"] > fail_threshold: + raise httpx.ConnectError( + "simulated ES unreachable", request=httpx.Request("PUT", url) + ) + return await original_put(self, url, *args, **kwargs) + + with patch.object(httpx.AsyncClient, "put", counting_put): + resp = await demo_reseed_client.post("/api/v1/_test/demo/reseed", json={}) + assert resp.status_code == 202, resp.text + await run_demo_reseed(arq_ctx) # inner handler catches → cleanup → failed → returns + await _clear_singleton_dedup_keys(arq_ctx["redis"]) + + terminal = await _get_status(demo_reseed_client) + assert terminal["status"] == "failed", terminal + assert terminal["failed_reason"], terminal + assert any(r.message == "demo_reseed_cleanup_truncated" for r in caplog.records), ( + f"no cleanup log; saw {[r.message for r in caplog.records][-15:]}" + ) # --------------------------------------------------------------------------- -# AC-13: TRUNCATE commits before any self-call (log ordering). +# AC-13: TRUNCATE commits before any api self-call (log ordering). # --------------------------------------------------------------------------- async def test_truncate_commits_before_first_self_call( demo_reseed_client: httpx.AsyncClient, + arq_ctx: dict[str, Any], caplog: pytest.LogCaptureFixture, ) -> None: caplog.set_level(logging.INFO, logger="backend.app.services.demo_seeding") - caplog.set_level(logging.INFO, logger="backend.app.api.v1._test") - response = await demo_reseed_client.post("/api/v1/_test/demo/reseed", json={}, timeout=180.0) - assert response.status_code == 200, response.text - # caplog captures logs in the test process — uvicorn runs in a - # background thread of THIS same process, so the records land. 
+ terminal = await post_and_run_to_terminal(demo_reseed_client, arq_ctx) + assert terminal["status"] == "complete", terminal + messages = [r.message for r in caplog.records] truncate_idx = next( (i for i, m in enumerate(messages) if m == "demo_reseed_truncate_committed"), None, ) assert truncate_idx is not None, f"missing truncate log; saw {messages[:10]}" - # Find the first api-client api/v1/clusters POST start log. first_cluster_call_idx = next( ( i @@ -395,125 +550,59 @@ async def test_truncate_commits_before_first_self_call( "no demo_reseed_api_call_started log for POST /api/v1/clusters" ) assert truncate_idx < first_cluster_call_idx, ( - f"TRUNCATE commit log (idx={truncate_idx}) MUST precede first " - f"api-client self-call log (idx={first_cluster_call_idx})" + f"TRUNCATE commit (idx={truncate_idx}) MUST precede first api self-call " + f"(idx={first_cluster_call_idx})" ) # --------------------------------------------------------------------------- -# AC-14: natural failure (api_client side) cleans up deterministically. +# AC-14: natural failure cleanup is deterministic (wiped tables are 0). # --------------------------------------------------------------------------- -async def test_natural_failure_cleanup_after_python_control_returns( - demo_reseed_client: httpx.AsyncClient, db_engine: Any +async def test_natural_failure_cleanup_is_deterministic( + demo_reseed_client: httpx.AsyncClient, db_engine: Any, arq_ctx: dict[str, Any] ) -> None: - """Monkeypatch the test_seeding service so the FIRST scenario's - ``seed-completed`` self-call fails with a RuntimeError. The - orchestrator unwinds → cleanup TRUNCATEs partial state → 503. 
- """ - from backend.app.api.v1 import _test as test_mod - from backend.app.services import test_seeding - - original = test_seeding.seed_study_completed_with_digest - - async def _raise(*args: Any, **kwargs: Any) -> None: - raise RuntimeError("simulated api self-call failure") - - # _test.py imports the function into its own namespace at module load — - # patching only the service module's attribute leaves the handler's - # local reference pointing at the unpatched original. Patch BOTH. - with ( - patch.object(test_seeding, "seed_study_completed_with_digest", _raise), - patch.object(test_mod, "seed_study_completed_with_digest", _raise), - ): - response = await demo_reseed_client.post( - "/api/v1/_test/demo/reseed", json={}, timeout=180.0 - ) - assert response.status_code == 503, response.text - body = response.json() - assert body["detail"]["error_code"] == "SEED_FAILED" - # Cleanup ran — demo tables are deterministically empty. - assert await _table_count(db_engine, "clusters") == 0 - assert await _table_count(db_engine, "studies") == 0 - assert await _table_count(db_engine, "query_sets") == 0 - # Sanity: the original symbol is restored after the with-block. - assert test_seeding.seed_study_completed_with_digest is original + from backend.workers.demo_reseed import run_demo_reseed - -# --------------------------------------------------------------------------- -# AC-5: mid-loop ES failure — partial state cleans up, 503 returns. -# --------------------------------------------------------------------------- - - -async def test_reseed_mid_flight_engine_failure_returns_503_and_cleans_up( - demo_reseed_client: httpx.AsyncClient, db_engine: Any -) -> None: - """Monkeypatch ``httpx.AsyncClient.put`` to fail on the SECOND - scenario's first engine PUT. The first scenario completes (real ES - PUT succeeds), then the second scenario's PUT raises ConnectError. - Orchestrator unwinds → cleanup → 503. 
- """ original_put = httpx.AsyncClient.put call_count = {"engine_put": 0} - # The first scenario does: 1 PUT (mapping) + 5 PUT (docs) = 6 engine PUTs - # before we want to fail. Fail on the 7th (start of scenario 2's PUTs). fail_threshold = 6 async def counting_put(self: httpx.AsyncClient, url: Any, *args: Any, **kwargs: Any) -> Any: - # Only count engine-targeted PUTs (i.e., not the api self-call client). - # Match by port — engine clients hit :9200 / :9201; api self-calls hit :8000. url_str = str(url) if ":9200" in url_str or ":9201" in url_str: call_count["engine_put"] += 1 if call_count["engine_put"] > fail_threshold: - raise httpx.ConnectError("simulated ES unreachable", request=None) + raise httpx.ConnectError( + "simulated ES unreachable", request=httpx.Request("PUT", url) + ) return await original_put(self, url, *args, **kwargs) with patch.object(httpx.AsyncClient, "put", counting_put): - response = await demo_reseed_client.post( - "/api/v1/_test/demo/reseed", json={}, timeout=180.0 - ) - assert response.status_code == 503, response.text - body = response.json() - assert body["detail"]["error_code"] == "SEED_FAILED" - # Cleanup ran — partial scenario-1 state is gone. + resp = await demo_reseed_client.post("/api/v1/_test/demo/reseed", json={}) + assert resp.status_code == 202, resp.text + await run_demo_reseed(arq_ctx) + await _clear_singleton_dedup_keys(arq_ctx["redis"]) + + terminal = await _get_status(demo_reseed_client) + assert terminal["status"] == "failed", terminal + # run_demo_reseed_cleanup TRUNCATEs the demo tables under the held lock. assert await _table_count(db_engine, "clusters") == 0 - # The `products` index (scenario 1) was deleted by cleanup. 
- async with httpx.AsyncClient(timeout=10.0) as check: - for host in ("http://elasticsearch:9200", "http://127.0.0.1:9200"): - try: - head = await check.head(f"{host}/products", auth=("elastic", "changeme")) - assert head.status_code == 404, ( - f"products index still exists after cleanup: {head.status_code}" - ) - break - except (httpx.ConnectError, httpx.RemoteProtocolError): - continue + assert await _table_count(db_engine, "studies") == 0 + assert await _table_count(db_engine, "query_sets") == 0 # --------------------------------------------------------------------------- -# AC-15: dual-client contract — no role mixing, correct basic auth. +# AC-15: dual-client contract — no role mixing. # --------------------------------------------------------------------------- async def test_dual_client_contract_no_role_mixing( - demo_reseed_client: httpx.AsyncClient, + demo_reseed_client: httpx.AsyncClient, arq_ctx: dict[str, Any] ) -> None: - """Record every httpx request and assert: - - * Every ``/api/v1/*`` request hits ``localhost:8000`` (api self-call). - * Every ``:9200`` request lands on the engine client (ES port). - * Every ``:9201`` request lands on the engine client (OS port). - * No api request bleeds into the engine port range and vice versa. - - Authorization-header correctness is NOT asserted at this layer — - httpx applies ``auth=...`` arguments inside ``AsyncClient.send`` AFTER - the interceptor sees the request, so the Authorization header isn't - visible to the recorder. AC-1's happy-path already proves the auth is - correct: a wrong auth would 401 on the ES PUT and bubble up as - 503 ``SEED_FAILED``. 
- """ + from backend.workers.demo_reseed import run_demo_reseed + recorded: list[dict[str, Any]] = [] original_send = httpx.AsyncClient.send @@ -524,42 +613,34 @@ async def recording_send( return await original_send(self, request, *args, **kwargs) with patch.object(httpx.AsyncClient, "send", recording_send): - response = await demo_reseed_client.post( - "/api/v1/_test/demo/reseed", json={}, timeout=180.0 - ) - assert response.status_code == 200, response.text + resp = await demo_reseed_client.post("/api/v1/_test/demo/reseed", json={}) + assert resp.status_code == 202, resp.text + await run_demo_reseed(arq_ctx) + await _clear_singleton_dedup_keys(arq_ctx["redis"]) + terminal = await _get_status(demo_reseed_client) + assert terminal["status"] == "complete", terminal # Partition by port: api self-calls hit :8000; engine calls hit :9200/:9201. - # The resolver-patch test fixture makes these all loopback addresses; the - # production resolver uses ``elasticsearch`` / ``opensearch`` Compose DNS - # names — partition by port so the assertion is robust. api_requests = [r for r in recorded if ":8000" in r["url"]] es_requests = [r for r in recorded if ":9200" in r["url"]] os_requests = [r for r in recorded if ":9201" in r["url"]] - assert len(api_requests) > 0, "no api-client self-calls recorded" - assert len(es_requests) > 0, "no engine ES requests recorded" - assert len(os_requests) > 0, "no engine OS requests recorded" - # No role mixing: no api-client request hit an engine port, no - # engine-client request hit the api port. 
+ assert api_requests, "no api-client self-calls recorded" + assert es_requests, "no engine ES requests recorded" + assert os_requests, "no engine OS requests recorded" for r in api_requests: assert ":9200" not in r["url"] and ":9201" not in r["url"], ( - f"api request unexpectedly hit engine port: {r}" + f"api request hit an engine port: {r}" ) for r in es_requests + os_requests: - assert ":8000" not in r["url"], f"engine request unexpectedly hit api port: {r}" + assert ":8000" not in r["url"], f"engine request hit the api port: {r}" # --------------------------------------------------------------------------- -# AC-16: advisory lock pinned to one Postgres connection. +# AC-16: advisory lock pinned to one Postgres connection (inline worker). # --------------------------------------------------------------------------- def _pg_locks_key_parts(key: int) -> tuple[int, int]: - """Split a signed int64 key into Postgres ``pg_locks`` (classid, objid). - - Postgres stores a single-bigint advisory lock as two 32-bit ints in - ``pg_locks``. Per cycle-5 plan-review finding B3. - """ key_u64 = key & ((1 << 64) - 1) classid = (key_u64 >> 32) & 0xFFFFFFFF objid = key_u64 & 0xFFFFFFFF @@ -567,15 +648,15 @@ def _pg_locks_key_parts(key: int) -> tuple[int, int]: async def test_advisory_lock_pinned_to_one_connection( - demo_reseed_client: httpx.AsyncClient, db_engine: Any + demo_reseed_client: httpx.AsyncClient, db_engine: Any, arq_ctx: dict[str, Any] ) -> None: - classid, objid = _pg_locks_key_parts(DEMO_RESEED_LOCK_KEY) + from backend.workers.demo_reseed import run_demo_reseed + classid, objid = _pg_locks_key_parts(DEMO_RESEED_LOCK_KEY) observed_pids: list[int] = [] observer_done = asyncio.Event() async def _observer() -> None: - # Poll pg_locks every 200ms while the reseed is running. 
while not observer_done.is_set(): async with db_engine.connect() as conn: rows = ( @@ -594,28 +675,23 @@ async def _observer() -> None: except TimeoutError: continue + resp = await demo_reseed_client.post("/api/v1/_test/demo/reseed", json={}) + assert resp.status_code == 202, resp.text observer_task = asyncio.create_task(_observer()) try: - response = await demo_reseed_client.post( - "/api/v1/_test/demo/reseed", json={}, timeout=180.0 - ) - assert response.status_code == 200, response.text + await run_demo_reseed(arq_ctx) # holds the lock on get_engine().connect() in-process finally: observer_done.set() await observer_task + await _clear_singleton_dedup_keys(arq_ctx["redis"]) - # During the reseed, the lock was held — at least one observation. assert observed_pids, ( - "observer never saw the advisory lock — possibly the reseed " - "finished too fast for the 200ms poll. Increase the workload " - "if this test starts flaking." + "observer never saw the advisory lock — possibly the reseed finished " + "too fast for the 200ms poll. Increase the workload if this flakes." ) - # The pid never changed — same backend held the lock throughout. assert len(set(observed_pids)) == 1, ( f"advisory lock changed pids mid-flight: {sorted(set(observed_pids))}" ) - - # After the handler returned, the lock is gone. async with db_engine.connect() as conn: post_rows = ( await conn.execute( @@ -626,35 +702,29 @@ async def _observer() -> None: {"c": classid, "o": objid}, ) ).all() - assert not post_rows, f"lock still held after handler return: {post_rows}" + assert not post_rows, f"lock still held after the inline worker returned: {post_rows}" # --------------------------------------------------------------------------- -# AC-12: cleanup-while-locked blocks a concurrent reseed. +# AC-12: cleanup-while-locked blocks a concurrent reseed (inline-task). 
# --------------------------------------------------------------------------- async def test_cleanup_while_locked_blocks_concurrent_reseed( - demo_reseed_client: httpx.AsyncClient, db_engine: Any + demo_reseed_client: httpx.AsyncClient, arq_ctx: dict[str, Any] ) -> None: - """Inject a ``threading.Event`` test gate into the cleanup pass. The - test: - - 1. Triggers request A with a forced-failure monkeypatch. - 2. Waits for A's handler to enter the cleanup pass (gated by event). - 3. Fires request B and asserts 409 SEED_IN_PROGRESS (A still holds - the lock). - 4. Signals the gate — A's cleanup completes, lock releases. - 5. Fires request C and asserts 200. - """ - from backend.app.api.v1 import _test as _test_module + import backend.app.services.demo_seeding as svc_mod from backend.app.services import test_seeding + from backend.workers.demo_reseed import run_demo_reseed gate = threading.Event() cleanup_entered = threading.Event() - # Wrap the cleanup so the test learns the moment it's about to wait. - original_cleanup = _test_module._run_demo_reseed_cleanup + # The worker calls demo_seeding.run_demo_reseed_cleanup, which itself reads + # the demo_seeding module gate at :492 — patch the gate on demo_seeding, + # NOT _test. Wrap cleanup to learn when it's entered (before it blocks on + # the gate's to_thread offload). 
+ original_cleanup = svc_mod.run_demo_reseed_cleanup async def gated_cleanup(*args: Any, **kwargs: Any) -> None: cleanup_entered.set() @@ -665,35 +735,119 @@ async def _fail_first_seed(*args: Any, **kwargs: Any) -> None: with ( patch.object(test_seeding, "seed_study_completed_with_digest", _fail_first_seed), - patch.object(_test_module, "seed_study_completed_with_digest", _fail_first_seed), - patch.object(_test_module, "_demo_reseed_cleanup_test_gate", gate), - patch.object(_test_module, "_run_demo_reseed_cleanup", gated_cleanup), + patch.object(svc_mod, "_demo_reseed_cleanup_test_gate", gate), + patch.object(svc_mod, "run_demo_reseed_cleanup", gated_cleanup), ): - # Step 1: kick off request A. - async with httpx.AsyncClient( - base_url=str(demo_reseed_client.base_url), timeout=180.0 - ) as client_a: - task_a = asyncio.create_task(client_a.post("/api/v1/_test/demo/reseed", json={})) - # Step 2: wait for A's handler to enter cleanup. - for _ in range(200): # up to 20s + # Also patch the worker's binding of run_demo_reseed_cleanup (imported + # into the worker module namespace at load). + with patch("backend.workers.demo_reseed.run_demo_reseed_cleanup", gated_cleanup): + # POST A seeds `running`; launch the worker as a background task so + # it holds the lock + blocks in cleanup on the gate. + resp_a = await demo_reseed_client.post("/api/v1/_test/demo/reseed", json={}) + assert resp_a.status_code == 202, resp_a.text + task_a = asyncio.create_task(run_demo_reseed(arq_ctx)) + + for _ in range(200): # up to 20s; gate's to_thread offload frees the loop if cleanup_entered.is_set(): break await asyncio.sleep(0.1) - assert cleanup_entered.is_set(), "request A never entered cleanup within 20s" + assert cleanup_entered.is_set(), "worker A never entered cleanup within 20s" - # Step 3: fire B; assert 409 (A still holds the lock). 
- response_b = await demo_reseed_client.post( - "/api/v1/_test/demo/reseed", json={}, timeout=180.0 - ) - assert response_b.status_code == 409, response_b.text - assert response_b.json()["detail"]["error_code"] == "SEED_IN_PROGRESS" + # POST B while A holds the lock + a `running` status → 409. + resp_b = await demo_reseed_client.post("/api/v1/_test/demo/reseed", json={}) + assert resp_b.status_code == 409, resp_b.text + assert resp_b.json()["detail"]["error_code"] == "SEED_IN_PROGRESS" - # Step 4: release A's cleanup. gate.set() - response_a = await task_a - assert response_a.status_code == 503, response_a.text + await task_a + + terminal_a = await _get_status(demo_reseed_client) + assert terminal_a["status"] == "failed", terminal_a + + # Third reseed (outside the patches) reaches complete. + await _clear_singleton_dedup_keys(arq_ctx["redis"]) + await _clear_status_key(arq_ctx["redis"]) + terminal_c = await post_and_run_to_terminal(demo_reseed_client, arq_ctx) + assert terminal_c["status"] == "complete", terminal_c + + +# --------------------------------------------------------------------------- +# AC-Async: polling transition running → complete, monotonic scenarios_completed. +# --------------------------------------------------------------------------- + + +async def test_polling_transition_running_to_complete_monotonic( + demo_reseed_client: httpx.AsyncClient, arq_ctx: dict[str, Any] +) -> None: + import backend.workers.demo_reseed as worker_mod + from backend.app.services.demo_seeding import status_set as real_status_set + + recorded: list[dict[str, Any]] = [] - # Step 5: fire C now that the lock is gone. (Outside the patches so - # the third reseed actually succeeds.) 
- response_c = await demo_reseed_client.post("/api/v1/_test/demo/reseed", json={}, timeout=180.0) - assert response_c.status_code == 200, response_c.text + async def spy_status_set(redis: Any, status: ReseedStatusResponse) -> None: + recorded.append( + {"status": status.status, "scenarios_completed": status.scenarios_completed} + ) + await real_status_set(redis, status) + + # The worker's _redis_status_cb calls the demo_reseed module binding of + # status_set — patch THAT to capture every per-phase write (the single + # Redis key is overwritten, so start/end reads cannot prove monotonicity). + with patch.object(worker_mod, "status_set", spy_status_set): + resp = await demo_reseed_client.post("/api/v1/_test/demo/reseed", json={}) + assert resp.status_code == 202, resp.text + await worker_mod.run_demo_reseed(arq_ctx) + await _clear_singleton_dedup_keys(arq_ctx["redis"]) + + assert recorded, "no status writes captured" + assert recorded[0]["status"] == "running" + assert recorded[0]["scenarios_completed"] == 0 + completes = [r["scenarios_completed"] for r in recorded] + assert all(b >= a for a, b in zip(completes, completes[1:], strict=False)), ( + f"scenarios_completed not monotonic: {completes}" + ) + assert recorded[-1]["status"] == "complete", recorded[-1] + terminal = await _get_status(demo_reseed_client) + assert terminal["status"] == "complete" + assert terminal["summary"] is not None + assert terminal["scenarios_completed"] == terminal["scenarios_total"] + + +# --------------------------------------------------------------------------- +# AC-Reg: worker registration + enqueue guard. 
+# --------------------------------------------------------------------------- + + +async def test_worker_registration_and_enqueue_guard( + demo_reseed_client: httpx.AsyncClient, +) -> None: + from backend.app.main import app + from backend.workers.all import WorkerSettings + + names = {getattr(f, "coroutine", f).__name__ for f in WorkerSettings.functions} + assert "run_demo_reseed" in names, f"run_demo_reseed not registered; saw {sorted(names)}" + + calls: list[dict[str, Any]] = [] + pool = app.state.arq_pool + real_enqueue = pool.enqueue_job + + async def spy_enqueue(*args: Any, **kwargs: Any) -> Any: + calls.append({"args": args, "kwargs": kwargs}) + return await real_enqueue(*args, **kwargs) + + with patch.object(pool, "enqueue_job", spy_enqueue): + resp = await demo_reseed_client.post("/api/v1/_test/demo/reseed", json={}) + assert resp.status_code == 202, resp.text + assert len(calls) == 1, calls + assert calls[0]["args"][0] == "run_demo_reseed", calls[0] + assert calls[0]["kwargs"].get("_job_id") == "demo_reseed:singleton", calls[0] + + # This test enqueued a real (unconsumed) job — clear the dedup + status keys. + from backend.app.core.settings import get_settings + + redis: Redis = Redis.from_url(get_settings().redis_url, decode_responses=False) + try: + await _clear_singleton_dedup_keys(redis) + await _clear_status_key(redis) + finally: + await redis.aclose() diff --git a/backend/tests/integration/test_demo_seeding_timeout.py b/backend/tests/integration/test_demo_seeding_timeout.py index 56752d46..3d052681 100644 --- a/backend/tests/integration/test_demo_seeding_timeout.py +++ b/backend/tests/integration/test_demo_seeding_timeout.py @@ -2,30 +2,23 @@ # # SPDX-License-Identifier: Apache-2.0 -"""AC-4 isolated test: per-call timeout returns 503 SEED_FAILED. 
- -Per plan §3.2 Task 4, AC-4 lives in its own file with a function-scoped -uvicorn fixture because the ``httpx.ReadTimeout`` residual may leave a -server-side handler completing AFTER the test's 503 has returned, and -shared uvicorn instances across other tests would risk contamination. - -This file: - -1. Boots a fresh uvicorn for THIS test only. -2. Forces ``seed_study_completed_with_digest`` to sleep 5s. -3. Bypasses the Pydantic validator's ``ge=30`` lower bound via - ``model_construct`` to set the per-call timeout to 1s (per the plan's - "DO NOT weaken the production validator" rule). -4. Asserts the response is 503 SEED_FAILED. -5. Asserts via caplog that the ``demo_reseed_cleanup_truncated`` log - line is emitted — proving cleanup was attempted even though - post-cleanup emptiness is NOT guaranteed (per the spec's - ReadTimeout residual). +"""AC-4 isolated test: a worker-side per-call HTTP timeout drives terminal +``failed`` + cleanup. + +After ``bug_demo_reseed_fake_metric_regression`` made the reseed async, the +per-call timeout no longer surfaces on the POST (which returns 202 before any +self-call runs) — it lives in the worker. When a single self-call inside +``run_demo_reseed`` exceeds ``demo_reseed_per_call_http_timeout_s``, the +``httpx.ReadTimeout`` propagates, the worker's inner handler runs +``run_demo_reseed_cleanup`` under the held advisory lock, writes terminal +``failed`` to Redis, and returns. ``GET /status`` then reports ``failed`` with +a timeout-flavored ``failed_reason``. + +Function-scoped uvicorn so the ReadTimeout residual can't pollute siblings. 
""" from __future__ import annotations -import asyncio import logging import socket from collections.abc import AsyncIterator @@ -35,21 +28,19 @@ import httpx import pytest import pytest_asyncio +from redis.asyncio import Redis +from backend.app.services.demo_seeding import DEMO_RESEED_STATUS_KEY from backend.tests.conftest import postgres_reachable from backend.tests.integration._demo_reseed_uvicorn import running_uvicorn -pytestmark = [ - pytest.mark.integration, - pytest.mark.skip( - reason=( - "Sync-flow timeout test paused — bug_demo_reseed_fake_metric_regression " - "converted the reseed handler to async enqueue + poll. The " - "per-call HTTP timeout assertion no longer applies to the POST " - "handler; the timeout now lives in the worker." - ) - ), -] +pytestmark = [pytest.mark.integration] + +_SINGLETON_DEDUP_KEYS: tuple[str, ...] = ( + "arq:job:demo_reseed:singleton", + "arq:result:demo_reseed:singleton", + "arq:in-progress:demo_reseed:singleton", +) def _tcp_open(host: str, port: int, timeout: float = 0.5) -> bool: @@ -69,21 +60,30 @@ def _engine_reachable() -> bool: return False -if not postgres_reachable() or not _engine_reachable(): +def _redis_reachable() -> bool: + from urllib.parse import urlparse + + from backend.app.core.settings import get_settings + + try: + parsed = urlparse(get_settings().redis_url) + except Exception: # noqa: BLE001 + return False + port = parsed.port or 6379 + return _tcp_open(parsed.hostname or "127.0.0.1", port, 0.3) or _tcp_open("127.0.0.1", port, 0.3) + + +if not postgres_reachable() or not _engine_reachable() or not _redis_reachable(): pytest.skip( - "demo reseed timeout test requires Postgres + ES + OS service containers.", + "demo reseed timeout test requires Postgres + ES + OS + Redis service containers.", allow_module_level=True, ) @pytest.fixture(autouse=True) def _stub_cluster_credentials(tmp_path: Any) -> Any: - """Mount cluster_credentials.yaml so the cluster-create probe inside - the orchestrator can resolve 
``local-es`` / ``local-opensearch``. - See the sibling fixture in ``test_demo_seeding.py`` for the - detailed rationale (uses CLUSTER_CREDENTIALS_FILE env var to - survive the autouse ``_clear_settings_caches`` reset). - """ + """Mount cluster_credentials.yaml + redirect the worker's API base URL to + the in-process uvicorn (mirrors test_demo_seeding.py).""" import os creds_file = tmp_path / "cluster_credentials.yaml" @@ -95,25 +95,27 @@ def _stub_cluster_credentials(tmp_path: Any) -> Any: " username: admin\n" " password: admin\n" ) - original = os.environ.get("CLUSTER_CREDENTIALS_FILE") + prev_creds = os.environ.get("CLUSTER_CREDENTIALS_FILE") + prev_base = os.environ.get("RELYLOOP_WORKER_API_BASE_URL") os.environ["CLUSTER_CREDENTIALS_FILE"] = str(creds_file) + os.environ["RELYLOOP_WORKER_API_BASE_URL"] = "http://127.0.0.1:8000" try: yield finally: - if original is None: - os.environ.pop("CLUSTER_CREDENTIALS_FILE", None) - else: - os.environ["CLUSTER_CREDENTIALS_FILE"] = original + for key, prev in ( + ("CLUSTER_CREDENTIALS_FILE", prev_creds), + ("RELYLOOP_WORKER_API_BASE_URL", prev_base), + ): + if prev is None: + os.environ.pop(key, None) + else: + os.environ[key] = prev @pytest.fixture(autouse=True) def _patch_engine_for_test_host() -> Any: - """Same patches as the sibling fixture in ``test_demo_seeding.py`` — - resolver + SCENARIOS base_url so the test process can reach ES/OS - + the cluster-create probe via 127.0.0.1 ports.""" import copy - import backend.app.api.v1._test as test_mod import backend.app.services.demo_seeding as svc_mod def passthrough(host_base_url: str) -> str: @@ -133,17 +135,13 @@ def passthrough(host_base_url: str) -> str: scenario["base_url"] = "http://127.0.0.1:9201" svc_mod.SCENARIOS = patched_scenarios try: - with ( - patch.object(svc_mod, "_resolve_engine_base_url", passthrough), - patch.object(test_mod, "_resolve_engine_base_url", passthrough), - ): + # _resolve_engine_base_url lives only in demo_seeding now. 
+ with patch.object(svc_mod, "_resolve_engine_base_url", passthrough): yield finally: svc_mod.SCENARIOS = original_scenarios -# Function-scoped uvicorn — one fresh server per test in this file so the -# ReadTimeout residual cannot pollute later tests in the module. @pytest_asyncio.fixture async def demo_reseed_client_function_scoped() -> AsyncIterator[httpx.AsyncClient]: with running_uvicorn() as base_url: @@ -151,26 +149,48 @@ async def demo_reseed_client_function_scoped() -> AsyncIterator[httpx.AsyncClien yield client -async def test_reseed_per_call_timeout_returns_503( +@pytest_asyncio.fixture +async def arq_ctx() -> AsyncIterator[dict[str, Any]]: + from backend.app.core.settings import get_settings + + redis: Redis = Redis.from_url(get_settings().redis_url, decode_responses=False) + try: + await redis.ping() + except Exception as exc: # noqa: BLE001 + await redis.aclose() + pytest.skip(f"Redis not reachable for arq_ctx: {exc}") + # Clear any leaked status/dedup keys so the POST doesn't 409 and the + # enqueue isn't deduped. + await redis.delete(DEMO_RESEED_STATUS_KEY, *_SINGLETON_DEDUP_KEYS) + try: + yield {"redis": redis} + finally: + await redis.delete(DEMO_RESEED_STATUS_KEY, *_SINGLETON_DEDUP_KEYS) + await redis.aclose() + + +async def test_worker_per_call_timeout_drives_failed_and_cleanup( demo_reseed_client_function_scoped: httpx.AsyncClient, + arq_ctx: dict[str, Any], caplog: pytest.LogCaptureFixture, ) -> None: - """AC-4 — single self-call exceeds the per-call timeout → 503 SEED_FAILED. - - Implementation: - - * Monkeypatch ``test_seeding.seed_study_completed_with_digest`` to - ``asyncio.sleep(5)`` — guarantees the api-client self-call - exceeds our 1s ceiling. - * Patch the lru_cache'd ``Settings`` instance's - ``demo_reseed_per_call_http_timeout_s`` to 1 — bypasses the - ``ge=30`` validator without weakening the production rule. 
+ """A self-call exceeding the per-call timeout → terminal ``failed`` + + cleanup, WITHOUT weakening the production ``ge=30`` validator. + + * ``seed_study_completed_with_digest`` sleeps 5s so the api-client + self-call exceeds the 1s ceiling. + * The per-call timeout is set to 1 via ``settings.__dict__`` on the + lru_cached instance (bypasses ``ge=30`` without changing the field), + restored in ``finally``. """ + import asyncio + from backend.app.api.v1 import _test as test_mod from backend.app.core.settings import get_settings from backend.app.services import test_seeding + from backend.workers.demo_reseed import run_demo_reseed - caplog.set_level(logging.INFO, logger="backend.app.api.v1._test") + caplog.set_level(logging.INFO, logger="backend.app.services.demo_seeding") async def _slow_seed(*args: Any, **kwargs: Any) -> None: await asyncio.sleep(5) @@ -183,24 +203,25 @@ async def _slow_seed(*args: Any, **kwargs: Any) -> None: patch.object(test_seeding, "seed_study_completed_with_digest", _slow_seed), patch.object(test_mod, "seed_study_completed_with_digest", _slow_seed), ): - response = await demo_reseed_client_function_scoped.post( - "/api/v1/_test/demo/reseed", json={}, timeout=60.0 + resp = await demo_reseed_client_function_scoped.post( + "/api/v1/_test/demo/reseed", json={} ) + assert resp.status_code == 202, resp.text + # Inner handler catches the ReadTimeout → cleanup → failed → returns. + await run_demo_reseed(arq_ctx) finally: if original_timeout is None: settings.__dict__.pop("demo_reseed_per_call_http_timeout_s", None) else: settings.__dict__["demo_reseed_per_call_http_timeout_s"] = original_timeout - assert response.status_code == 503, response.text - body = response.json() - assert body["detail"]["error_code"] == "SEED_FAILED" - assert body["detail"]["retryable"] is True - # Cycle-10 finding A1 — cleanup MUST have been attempted. 
- cleanup_messages = [ - r.message for r in caplog.records if r.message == "demo_reseed_cleanup_truncated" - ] - assert cleanup_messages, ( - "expected demo_reseed_cleanup_truncated in caplog — cleanup pass " - "was not entered after the timeout path" + status_resp = await demo_reseed_client_function_scoped.get("/api/v1/_test/demo/reseed/status") + assert status_resp.status_code == 200, status_resp.text + terminal = status_resp.json() + assert terminal["status"] == "failed", terminal + # failed_reason is f"{type(exc).__name__}: ..." — httpx timeout classes all + # contain "Timeout" (ReadTimeout / ConnectTimeout / PoolTimeout). + assert "Timeout" in (terminal["failed_reason"] or ""), terminal + assert any(r.message == "demo_reseed_cleanup_truncated" for r in caplog.records), ( + f"no cleanup log; saw {[r.message for r in caplog.records][-15:]}" ) diff --git a/backend/workers/demo_reseed.py b/backend/workers/demo_reseed.py index 234ef88a..8e7e0558 100644 --- a/backend/workers/demo_reseed.py +++ b/backend/workers/demo_reseed.py @@ -180,7 +180,7 @@ async def run_demo_reseed(ctx: dict[str, Any]) -> None: # because it ran INSIDE the API container — same loopback.) async with ( httpx.AsyncClient( - base_url="http://api:8000", + base_url=settings.relyloop_worker_api_base_url, timeout=timeout, ) as api_client, httpx.AsyncClient(timeout=timeout) as engine_client, diff --git a/docs/03_runbooks/local-dev.md b/docs/03_runbooks/local-dev.md index 0e8e584f..f6fb4b18 100644 --- a/docs/03_runbooks/local-dev.md +++ b/docs/03_runbooks/local-dev.md @@ -284,6 +284,11 @@ Common causes: - **Database state.** `make test-integration` runs against the live Compose Postgres. Reset with `make reset` (destructive — drops volumes) if migrations diverge. +- **Demo-reseed worker self-call URL.** The demo-reseed Arq worker reaches the + API via `Settings.relyloop_worker_api_base_url` (default `http://api:8000`, + the Compose alias). 
The demo-reseed integration harness + (`test_demo_seeding.py` and `test_demo_seeding_timeout.py`) overrides it to `http://127.0.0.1:8000` because it + boots an in-process uvicorn on the test host. - **Pre-commit hooks.** Run `make pre-commit` before pushing — CI runs the same ruff/format-check/mypy gates and will reject formatting drift.