Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion agentex/src/utils/authorization_shortcuts.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

from fastapi import Depends, Path, Query, Request

from src.adapters.authorization.exceptions import AuthorizationError
from src.adapters.crud_store.exceptions import ItemDoesNotExist
from src.api.schemas.authorization_types import (
AgentexResource,
AgentexResourceType,
Expand All @@ -15,7 +17,10 @@
from src.domain.repositories.task_message_repository import DTaskMessageRepository
from src.domain.repositories.task_repository import DTaskRepository
from src.domain.repositories.task_state_repository import DTaskStateRepository
from src.domain.services.authorization_service import DAuthorizationService
from src.domain.services.authorization_service import (
AuthorizationService,
DAuthorizationService,
)
from src.utils.agent_api_key_authorization import _check_api_key_or_collapse_to_404
from src.utils.task_authorization import check_task_or_collapse_to_404

Expand All @@ -39,6 +44,29 @@ async def _get_parent_task_id(
return resource.task_id


async def _check_agent_or_collapse_to_404(
authorization: AuthorizationService,
agent_id: str,
operation: AuthorizedOperationType,
) -> None:
"""Check an agent resource; collapse any denial to 404.

Collapsing a denied check into 404 stops callers from distinguishing 403
(exists, no access) from 404 (absent) and thereby probing whether an agent
exists in another tenant. Mirrors the task/api_key collapse helpers.

TODO: fold this into a single generic collapse helper, and restore the
403/404 split once agents carry tenant scope at the data layer.
"""
try:
await authorization.check(
resource=AgentexResource.agent(agent_id),
operation=operation,
)
except AuthorizationError:
raise ItemDoesNotExist(f"Item with id '{agent_id}' does not exist.") from None


def DAuthorizedId(
resource_type: AgentexResourceType | TaskChildResourceType,
operation: AuthorizedOperationType = AuthorizedOperationType.read,
Expand Down Expand Up @@ -74,6 +102,9 @@ async def _ensure_authorized_id(
await _check_api_key_or_collapse_to_404(
authorization, resource_id, operation
)
elif resource_type == AgentexResourceType.agent:
# Collapse agent denials to 404 for the same discoverability reason.
await _check_agent_or_collapse_to_404(authorization, resource_id, operation)
else:
await authorization.check(
resource=AgentexResource(type=resource_type, selector=resource_id),
Expand Down Expand Up @@ -200,6 +231,9 @@ async def _ensure_authorized_name(
# "present in another tenant" from "absent" (tasks.name is globally
# unique — any 403 leak here probes the whole system, not a tenant).
await check_task_or_collapse_to_404(authorization, resource.id, operation)
elif resource_type == AgentexResourceType.agent:
# Agents: same collapse, on the id resolved from the name above.
await _check_agent_or_collapse_to_404(authorization, resource.id, operation)
else:
await authorization.check(
resource=AgentexResource(type=resource_type, selector=resource.id),
Expand Down
224 changes: 224 additions & 0 deletions agentex/tests/unit/api/test_agents_authz.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
"""Agent direct-route authorization: denied reads/deletes collapse to 404.

Asserts the route layer issues the right ``check`` calls and collapses a denial
to 404 (so callers can't probe cross-tenant existence), and that list filtering
forwards the authorized-id set to the use case.
"""

from __future__ import annotations

from unittest.mock import AsyncMock, MagicMock

import pytest
from src.adapters.authorization.exceptions import AuthorizationError
from src.adapters.crud_store.exceptions import ItemDoesNotExist
from src.api.schemas.authorization_types import (
AgentexResource,
AgentexResourceType,
AuthorizedOperationType,
)
from src.utils.authorization_shortcuts import (
DAuthorizedId,
DAuthorizedName,
_check_agent_or_collapse_to_404,
)


def _dep_callable(annotation):
"""Pull the inner FastAPI dependency function out of an ``Annotated[str, Depends(...)]``."""
return annotation.__metadata__[0].dependency


@pytest.mark.unit
@pytest.mark.asyncio
class TestCheckAgentOrCollapseTo404:
"""Helper collapses every denial to 404 (no cross-tenant existence leak)."""

async def test_allowed_check_returns_normally(self):
authorization = MagicMock()
authorization.check = AsyncMock(return_value=True)

await _check_agent_or_collapse_to_404(
authorization,
"agent-1",
AuthorizedOperationType.read,
)

authorization.check.assert_awaited_once()
called_kwargs = authorization.check.await_args.kwargs
assert called_kwargs["resource"] == AgentexResource.agent("agent-1")
assert called_kwargs["operation"] == AuthorizedOperationType.read

async def test_denied_collapses_to_not_found(self):
authorization = MagicMock()
authorization.check = AsyncMock(side_effect=AuthorizationError("denied"))

with pytest.raises(ItemDoesNotExist):
await _check_agent_or_collapse_to_404(
authorization,
"agent-1",
AuthorizedOperationType.delete,
)

async def test_operation_forwarded_verbatim(self):
authorization = MagicMock()
authorization.check = AsyncMock(return_value=True)

await _check_agent_or_collapse_to_404(
authorization,
"agent-2",
AuthorizedOperationType.delete,
)

called_kwargs = authorization.check.await_args.kwargs
assert called_kwargs["operation"] == AuthorizedOperationType.delete


@pytest.mark.unit
@pytest.mark.asyncio
class TestDAuthorizedIdAgentWrap:
"""``DAuthorizedId(agent, ...)`` routes through the collapse wrap → 404 on denial."""

async def test_agent_id_returns_resource_id_when_allowed(self):
annotation = DAuthorizedId(
AgentexResourceType.agent, AuthorizedOperationType.read
)
dep = _dep_callable(annotation)

authorization = MagicMock()
authorization.check = AsyncMock(return_value=True)

result = await dep(
authorization, MagicMock(), MagicMock(), MagicMock(), "agent-9"
)

assert result == "agent-9"
called_kwargs = authorization.check.await_args.kwargs
assert called_kwargs["resource"] == AgentexResource.agent("agent-9")
assert called_kwargs["operation"] == AuthorizedOperationType.read

async def test_agent_id_routes_through_wrap_on_denial(self):
annotation = DAuthorizedId(
AgentexResourceType.agent, AuthorizedOperationType.read
)
dep = _dep_callable(annotation)

authorization = MagicMock()
authorization.check = AsyncMock(side_effect=AuthorizationError("denied"))

with pytest.raises(ItemDoesNotExist):
await dep(authorization, MagicMock(), MagicMock(), MagicMock(), "agent-7")

async def test_agent_delete_op_propagated_to_check(self):
annotation = DAuthorizedId(
AgentexResourceType.agent, AuthorizedOperationType.delete
)
dep = _dep_callable(annotation)

authorization = MagicMock()
authorization.check = AsyncMock(return_value=True)

await dep(authorization, MagicMock(), MagicMock(), MagicMock(), "agent-del")

called_kwargs = authorization.check.await_args.kwargs
assert called_kwargs["operation"] == AuthorizedOperationType.delete


@pytest.mark.unit
@pytest.mark.asyncio
class TestDAuthorizedNameAgentWrap:
"""``DAuthorizedName(agent, ...)``: lookup-then-collapse on the resolved id."""

async def test_present_but_denied_collapses_to_404(self):
annotation = DAuthorizedName(
AgentexResourceType.agent, AuthorizedOperationType.read
)
dep = _dep_callable(annotation)

agent_repository = MagicMock()
agent_repository.get = AsyncMock(return_value=MagicMock(id="agent-resolved"))
task_repository = MagicMock()
authorization = MagicMock()
authorization.check = AsyncMock(side_effect=AuthorizationError("denied"))

with pytest.raises(ItemDoesNotExist):
await dep(authorization, agent_repository, task_repository, "prod-agent")

# Name was resolved to an id BEFORE the authz check intercepted the denial.
agent_repository.get.assert_awaited_once_with(name="prod-agent")
called_kwargs = authorization.check.await_args.kwargs
assert called_kwargs["resource"] == AgentexResource.agent("agent-resolved")

async def test_absent_name_surfaces_native_404_without_checking(self):
annotation = DAuthorizedName(
AgentexResourceType.agent, AuthorizedOperationType.read
)
dep = _dep_callable(annotation)

agent_repository = MagicMock()
agent_repository.get = AsyncMock(side_effect=ItemDoesNotExist("absent"))
task_repository = MagicMock()
authorization = MagicMock()
authorization.check = AsyncMock(return_value=True)

with pytest.raises(ItemDoesNotExist):
await dep(authorization, agent_repository, task_repository, "missing-agent")

# Truly-absent name → repo's native 404, and no authz check is attempted.
authorization.check.assert_not_awaited()


@pytest.mark.unit
@pytest.mark.asyncio
class TestListFiltering:
"""List forwards the authorized-id set to the use case (SQL-layer filtering)."""

async def test_authorized_ids_pushed_into_use_case(self):
from src.api.routes.agents import list_agents

agents_use_case = MagicMock()
agents_use_case.list = AsyncMock(return_value=[])

await list_agents(
agents_use_case=agents_use_case,
_authorized_ids=["agent-a", "agent-c"],
task_id=None,
limit=50,
page_number=1,
order_by=None,
order_direction="desc",
)

agents_use_case.list.assert_awaited_once_with(
task_id=None,
limit=50,
page_number=1,
order_by=None,
order_direction="desc",
id=["agent-a", "agent-c"],
)

async def test_none_authorized_ids_passes_through_unfiltered(self):
from src.api.routes.agents import list_agents

agents_use_case = MagicMock()
agents_use_case.list = AsyncMock(return_value=[])

await list_agents(
agents_use_case=agents_use_case,
_authorized_ids=None,
task_id=None,
limit=50,
page_number=1,
order_by=None,
order_direction="desc",
)

# ``None`` (authz bypass / disabled) means no id filter is applied.
agents_use_case.list.assert_awaited_once_with(
task_id=None,
limit=50,
page_number=1,
order_by=None,
order_direction="desc",
)
10 changes: 6 additions & 4 deletions agentex/tests/unit/api/test_tasks_authz.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,9 +428,11 @@ async def test_task_name_returns_resource_name_when_allowed(self):
# The selector is the resolved row id, not the user-provided name.
assert called_kwargs["resource"] == AgentexResource.task("task-allow")

async def test_non_task_name_skips_wrap(self):
"""Agent FGAC is out of scope for AGX1-264 — the wrap must NOT extend
to agent name-routes, which keep the existing 403-on-deny behavior."""
async def test_agent_name_collapses_to_404(self):
"""Agent name-routes now collapse a denial to 404 (their own wrap), same
as tasks. Full agent coverage lives in test_agents_authz.py; this guards
that the shared name dependency routes agents through a collapse, not the
generic 403 path."""
annotation = DAuthorizedName(
AgentexResourceType.agent, AuthorizedOperationType.read
)
Expand All @@ -442,5 +444,5 @@ async def test_non_task_name_skips_wrap(self):
agent_repository.get = AsyncMock(return_value=MagicMock(id="agent-1"))
task_repository = MagicMock()

with pytest.raises(AuthorizationError):
with pytest.raises(ItemDoesNotExist):
await dep(authorization, agent_repository, task_repository, "agent-name")
Loading