From f7e134b5635c553fdcff304593626bb8d270cdf0 Mon Sep 17 00:00:00 2001 From: Asher Fink Date: Tue, 2 Jun 2026 22:54:37 -0400 Subject: [PATCH] feat(agents): collapse denied agent route access to 404 instead of 403 Direct agent-by-id and agent-by-name routes now return 404 instead of 403 when the caller isn't authorized for the agent, matching the existing behavior for tasks and api keys. A 403 on a specific id or name reveals that the agent exists, so collapsing denials to 404 stops callers from probing for agents in other tenants. The name routes resolve the name to an id first, so a genuinely missing name still surfaces the normal repository 404. Ticket: AGX1-242 --- agentex/src/utils/authorization_shortcuts.py | 36 ++- agentex/tests/unit/api/test_agents_authz.py | 224 +++++++++++++++++++ agentex/tests/unit/api/test_tasks_authz.py | 10 +- 3 files changed, 265 insertions(+), 5 deletions(-) create mode 100644 agentex/tests/unit/api/test_agents_authz.py diff --git a/agentex/src/utils/authorization_shortcuts.py b/agentex/src/utils/authorization_shortcuts.py index 395e6dc1..1229da72 100644 --- a/agentex/src/utils/authorization_shortcuts.py +++ b/agentex/src/utils/authorization_shortcuts.py @@ -2,6 +2,8 @@ from fastapi import Depends, Path, Query, Request +from src.adapters.authorization.exceptions import AuthorizationError +from src.adapters.crud_store.exceptions import ItemDoesNotExist from src.api.schemas.authorization_types import ( AgentexResource, AgentexResourceType, @@ -15,7 +17,10 @@ from src.domain.repositories.task_message_repository import DTaskMessageRepository from src.domain.repositories.task_repository import DTaskRepository from src.domain.repositories.task_state_repository import DTaskStateRepository -from src.domain.services.authorization_service import DAuthorizationService +from src.domain.services.authorization_service import ( + AuthorizationService, + DAuthorizationService, +) from src.utils.agent_api_key_authorization import _check_api_key_or_collapse_to_404 from src.utils.task_authorization import check_task_or_collapse_to_404 @@ -39,6 +44,29 @@ async def _get_parent_task_id( return resource.task_id +async def _check_agent_or_collapse_to_404( + authorization: AuthorizationService, + agent_id: str, + operation: AuthorizedOperationType, +) -> None: + """Check an agent resource; collapse any denial to 404. + + Collapsing a denied check into 404 stops callers from distinguishing 403 + (exists, no access) from 404 (absent) and thereby probing whether an agent + exists in another tenant. Mirrors the task/api_key collapse helpers. + + TODO: fold this into a single generic collapse helper, and restore the + 403/404 split once agents carry tenant scope at the data layer. + """ + try: + await authorization.check( + resource=AgentexResource.agent(agent_id), + operation=operation, + ) + except AuthorizationError: + raise ItemDoesNotExist(f"Item with id '{agent_id}' does not exist.") from None + + def DAuthorizedId( resource_type: AgentexResourceType | TaskChildResourceType, operation: AuthorizedOperationType = AuthorizedOperationType.read, @@ -74,6 +102,9 @@ async def _ensure_authorized_id( await _check_api_key_or_collapse_to_404( authorization, resource_id, operation ) + elif resource_type == AgentexResourceType.agent: + # Collapse agent denials to 404 for the same discoverability reason. + await _check_agent_or_collapse_to_404(authorization, resource_id, operation) else: await authorization.check( resource=AgentexResource(type=resource_type, selector=resource_id), @@ -200,6 +231,9 @@ async def _ensure_authorized_name( # "present in another tenant" from "absent" (tasks.name is globally # unique — any 403 leak here probes the whole system, not a tenant). await check_task_or_collapse_to_404(authorization, resource.id, operation) + elif resource_type == AgentexResourceType.agent: + # Agents: same collapse, on the id resolved from the name above. + await _check_agent_or_collapse_to_404(authorization, resource.id, operation) else: await authorization.check( resource=AgentexResource(type=resource_type, selector=resource.id), diff --git a/agentex/tests/unit/api/test_agents_authz.py b/agentex/tests/unit/api/test_agents_authz.py new file mode 100644 index 00000000..e65e9070 --- /dev/null +++ b/agentex/tests/unit/api/test_agents_authz.py @@ -0,0 +1,224 @@ +"""Agent direct-route authorization: denied reads/deletes collapse to 404. + +Asserts the route layer issues the right ``check`` calls and collapses a denial +to 404 (so callers can't probe cross-tenant existence), and that list filtering +forwards the authorized-id set to the use case. +""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock + +import pytest +from src.adapters.authorization.exceptions import AuthorizationError +from src.adapters.crud_store.exceptions import ItemDoesNotExist +from src.api.schemas.authorization_types import ( + AgentexResource, + AgentexResourceType, + AuthorizedOperationType, +) +from src.utils.authorization_shortcuts import ( + DAuthorizedId, + DAuthorizedName, + _check_agent_or_collapse_to_404, +) + + +def _dep_callable(annotation): + """Pull the inner FastAPI dependency function out of an ``Annotated[str, Depends(...)]``.""" + return annotation.__metadata__[0].dependency + + +@pytest.mark.unit +@pytest.mark.asyncio +class TestCheckAgentOrCollapseTo404: + """Helper collapses every denial to 404 (no cross-tenant existence leak).""" + + async def test_allowed_check_returns_normally(self): + authorization = MagicMock() + authorization.check = AsyncMock(return_value=True) + + await _check_agent_or_collapse_to_404( + authorization, + "agent-1", + AuthorizedOperationType.read, + ) + + authorization.check.assert_awaited_once() + called_kwargs = authorization.check.await_args.kwargs + assert called_kwargs["resource"] == AgentexResource.agent("agent-1") + assert called_kwargs["operation"] == AuthorizedOperationType.read + + async def test_denied_collapses_to_not_found(self): + authorization = MagicMock() + authorization.check = AsyncMock(side_effect=AuthorizationError("denied")) + + with pytest.raises(ItemDoesNotExist): + await _check_agent_or_collapse_to_404( + authorization, + "agent-1", + AuthorizedOperationType.delete, + ) + + async def test_operation_forwarded_verbatim(self): + authorization = MagicMock() + authorization.check = AsyncMock(return_value=True) + + await _check_agent_or_collapse_to_404( + authorization, + "agent-2", + AuthorizedOperationType.delete, + ) + + called_kwargs = authorization.check.await_args.kwargs + assert called_kwargs["operation"] == AuthorizedOperationType.delete + + +@pytest.mark.unit +@pytest.mark.asyncio +class TestDAuthorizedIdAgentWrap: + """``DAuthorizedId(agent, ...)`` routes through the collapse wrap → 404 on denial.""" + + async def test_agent_id_returns_resource_id_when_allowed(self): + annotation = DAuthorizedId( + AgentexResourceType.agent, AuthorizedOperationType.read + ) + dep = _dep_callable(annotation) + + authorization = MagicMock() + authorization.check = AsyncMock(return_value=True) + + result = await dep( + authorization, MagicMock(), MagicMock(), MagicMock(), "agent-9" + ) + + assert result == "agent-9" + called_kwargs = authorization.check.await_args.kwargs + assert called_kwargs["resource"] == AgentexResource.agent("agent-9") + assert called_kwargs["operation"] == AuthorizedOperationType.read + + async def test_agent_id_routes_through_wrap_on_denial(self): + annotation = DAuthorizedId( + AgentexResourceType.agent, AuthorizedOperationType.read + ) + dep = _dep_callable(annotation) + + authorization = MagicMock() + authorization.check = AsyncMock(side_effect=AuthorizationError("denied")) + + with pytest.raises(ItemDoesNotExist): + await dep(authorization, MagicMock(), MagicMock(), MagicMock(), "agent-7") + + async def test_agent_delete_op_propagated_to_check(self): + annotation = DAuthorizedId( + AgentexResourceType.agent, AuthorizedOperationType.delete + ) + dep = _dep_callable(annotation) + + authorization = MagicMock() + authorization.check = AsyncMock(return_value=True) + + await dep(authorization, MagicMock(), MagicMock(), MagicMock(), "agent-del") + + called_kwargs = authorization.check.await_args.kwargs + assert called_kwargs["operation"] == AuthorizedOperationType.delete + + +@pytest.mark.unit +@pytest.mark.asyncio +class TestDAuthorizedNameAgentWrap: + """``DAuthorizedName(agent, ...)``: lookup-then-collapse on the resolved id.""" + + async def test_present_but_denied_collapses_to_404(self): + annotation = DAuthorizedName( + AgentexResourceType.agent, AuthorizedOperationType.read + ) + dep = _dep_callable(annotation) + + agent_repository = MagicMock() + agent_repository.get = AsyncMock(return_value=MagicMock(id="agent-resolved")) + task_repository = MagicMock() + authorization = MagicMock() + authorization.check = AsyncMock(side_effect=AuthorizationError("denied")) + + with pytest.raises(ItemDoesNotExist): + await dep(authorization, agent_repository, task_repository, "prod-agent") + + # Name was resolved to an id BEFORE the authz check intercepted the denial. + agent_repository.get.assert_awaited_once_with(name="prod-agent") + called_kwargs = authorization.check.await_args.kwargs + assert called_kwargs["resource"] == AgentexResource.agent("agent-resolved") + + async def test_absent_name_surfaces_native_404_without_checking(self): + annotation = DAuthorizedName( + AgentexResourceType.agent, AuthorizedOperationType.read + ) + dep = _dep_callable(annotation) + + agent_repository = MagicMock() + agent_repository.get = AsyncMock(side_effect=ItemDoesNotExist("absent")) + task_repository = MagicMock() + authorization = MagicMock() + authorization.check = AsyncMock(return_value=True) + + with pytest.raises(ItemDoesNotExist): + await dep(authorization, agent_repository, task_repository, "missing-agent") + + # Truly-absent name → repo's native 404, and no authz check is attempted. + authorization.check.assert_not_awaited() + + +@pytest.mark.unit +@pytest.mark.asyncio +class TestListFiltering: + """List forwards the authorized-id set to the use case (SQL-layer filtering).""" + + async def test_authorized_ids_pushed_into_use_case(self): + from src.api.routes.agents import list_agents + + agents_use_case = MagicMock() + agents_use_case.list = AsyncMock(return_value=[]) + + await list_agents( + agents_use_case=agents_use_case, + _authorized_ids=["agent-a", "agent-c"], + task_id=None, + limit=50, + page_number=1, + order_by=None, + order_direction="desc", + ) + + agents_use_case.list.assert_awaited_once_with( + task_id=None, + limit=50, + page_number=1, + order_by=None, + order_direction="desc", + id=["agent-a", "agent-c"], + ) + + async def test_none_authorized_ids_passes_through_unfiltered(self): + from src.api.routes.agents import list_agents + + agents_use_case = MagicMock() + agents_use_case.list = AsyncMock(return_value=[]) + + await list_agents( + agents_use_case=agents_use_case, + _authorized_ids=None, + task_id=None, + limit=50, + page_number=1, + order_by=None, + order_direction="desc", + ) + + # ``None`` (authz bypass / disabled) means no id filter is applied. + agents_use_case.list.assert_awaited_once_with( + task_id=None, + limit=50, + page_number=1, + order_by=None, + order_direction="desc", + ) diff --git a/agentex/tests/unit/api/test_tasks_authz.py b/agentex/tests/unit/api/test_tasks_authz.py index 47926413..6595462d 100644 --- a/agentex/tests/unit/api/test_tasks_authz.py +++ b/agentex/tests/unit/api/test_tasks_authz.py @@ -428,9 +428,11 @@ async def test_task_name_returns_resource_name_when_allowed(self): # The selector is the resolved row id, not the user-provided name. assert called_kwargs["resource"] == AgentexResource.task("task-allow") - async def test_non_task_name_skips_wrap(self): - """Agent FGAC is out of scope for AGX1-264 — the wrap must NOT extend - to agent name-routes, which keep the existing 403-on-deny behavior.""" + async def test_agent_name_collapses_to_404(self): + """Agent name-routes now collapse a denial to 404 (their own wrap), same + as tasks. Full agent coverage lives in test_agents_authz.py; this guards + that the shared name dependency routes agents through a collapse, not the + generic 403 path.""" annotation = DAuthorizedName( AgentexResourceType.agent, AuthorizedOperationType.read ) @@ -442,5 +444,5 @@ async def test_non_task_name_skips_wrap(self): agent_repository.get = AsyncMock(return_value=MagicMock(id="agent-1")) task_repository = MagicMock() - with pytest.raises(AuthorizationError): + with pytest.raises(ItemDoesNotExist): await dep(authorization, agent_repository, task_repository, "agent-name")