Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions agentex/src/api/authentication_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from collections import OrderedDict
from typing import Any

from src.utils.cache_metrics import record_cache_access, record_cache_eviction
from src.utils.logging import make_logger

logger = make_logger(__name__)
Expand All @@ -27,14 +28,16 @@
class AsyncTTLCache:
"""Async-safe TTL cache implementation using OrderedDict with asyncio locks."""

def __init__(self, max_size: int = 1000, ttl_seconds: int = 300):
def __init__(self, name: str, max_size: int = 1000, ttl_seconds: int = 300):
"""
Initialize async-safe TTL cache.

Args:
name: Logical cache name, used as the ``cache`` tag on emitted metrics
max_size: Maximum number of entries in the cache
ttl_seconds: Time-to-live for cache entries in seconds
"""
self.name = name
self.cache: OrderedDict[str, tuple[Any, float]] = OrderedDict()
self.max_size = max_size
self.ttl_seconds = ttl_seconds
Expand All @@ -44,17 +47,26 @@ async def get(self, key: str) -> Any | None:
"""Get value from cache if it exists and hasn't expired."""
async with self._lock:
if key not in self.cache:
# MISS: the key was never cached (or was already evicted). In a
# load test this dominating means the key never repeats (unique
# creds/cookies per request) or the cache is cold per-worker.
record_cache_access(self.name, "miss_absent")
return None

value, timestamp = self.cache[key]

# Check if entry has expired
if time.time() - timestamp > self.ttl_seconds:
del self.cache[key]
# MISS: the key was present but past its TTL. This dominating
# means the TTL is too short for the request rate (churn).
record_cache_access(self.name, "miss_expired")
return None

# Move to end (most recently used)
self.cache.move_to_end(key)
# HIT: present and fresh.
record_cache_access(self.name, "hit")
return value
Comment thread
smoreinis marked this conversation as resolved.

async def set(self, key: str, value: Any) -> None:
Expand All @@ -63,6 +75,7 @@ async def set(self, key: str, value: Any) -> None:
# Remove oldest entry if cache is full
if len(self.cache) >= self.max_size and key not in self.cache:
self.cache.popitem(last=False)
record_cache_eviction(self.name)
Comment thread
smoreinis marked this conversation as resolved.

self.cache[key] = (value, time.time())
self.cache.move_to_end(key)
Expand Down Expand Up @@ -117,12 +130,20 @@ def __init__(
authorization_cache_ttl: TTL for authorization checks in seconds
max_cache_size: Maximum number of entries per cache
"""
# Separate async-safe caches for different authentication types
self.agent_identity_cache = AsyncTTLCache(max_cache_size, agent_cache_ttl)
self.agent_api_key_cache = AsyncTTLCache(max_cache_size, agent_cache_ttl)
self.auth_gateway_cache = AsyncTTLCache(max_cache_size, auth_gateway_cache_ttl)
# Separate async-safe caches for different authentication types.
# The name is used as the ``cache`` tag on emitted metrics so hit rate
# can be broken down per flow.
self.agent_identity_cache = AsyncTTLCache(
"agent_identity", max_cache_size, agent_cache_ttl
)
self.agent_api_key_cache = AsyncTTLCache(
"agent_api_key", max_cache_size, agent_cache_ttl
)
self.auth_gateway_cache = AsyncTTLCache(
"auth_gateway", max_cache_size, auth_gateway_cache_ttl
)
self.authorization_check_cache = AsyncTTLCache(
max_cache_size, authorization_cache_ttl
"authorization_check", max_cache_size, authorization_cache_ttl
)

logger.info(
Expand Down
114 changes: 114 additions & 0 deletions agentex/src/utils/cache_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
"""
Metrics instrumentation for the in-process authentication/authorization caches.

Mirrors the dual-emit pattern in ``src/utils/db_metrics.py``:

- When an OTLP endpoint is configured (``OTEL_EXPORTER_OTLP_ENDPOINT``), counters
are recorded through the OpenTelemetry SDK.
- When the Datadog Agent is reachable (``DD_AGENT_HOST``), the same events are
emitted as StatsD counters.
- When neither is configured, every function here is a cheap no-op.

The goal is to make cache effectiveness observable: hit rate per cache, the
reason for misses (expired vs never-seen), and capacity-driven evictions. These
are exactly the signals needed to tell whether a low hit rate is a TTL problem,
a key-cardinality problem, or a capacity problem.
"""

from __future__ import annotations

import os
from typing import TYPE_CHECKING, Literal

from datadog import statsd

from src.utils.logging import make_logger
from src.utils.otel_metrics import get_meter

if TYPE_CHECKING:
from opentelemetry.metrics import Counter

logger = make_logger(__name__)

# StatsD is only emitted if the Datadog Agent host is configured.
_STATSD_ENABLED = bool(os.environ.get("DD_AGENT_HOST"))

# Outcome of a single cache read. "hit" = present and fresh; "miss_expired" =
# present but past its TTL (TTL too short / churn); "miss_absent" = never cached
# or evicted (cold cache, key never repeats, or capacity eviction).
CacheResult = Literal["hit", "miss_expired", "miss_absent"]

# Lazily-created OTel instruments (created once, on first use).
_access_counter: Counter | None = None
_eviction_counter: Counter | None = None
_instruments_initialized = False


def _ensure_instruments() -> None:
"""Create OTel counters on first use. No-op if OTel is not configured."""
global _access_counter, _eviction_counter, _instruments_initialized

if _instruments_initialized:
return
_instruments_initialized = True

meter = get_meter("agentex.auth_cache")
if meter is None:
# OTel not configured; OTel path stays disabled. StatsD may still emit.
return

_access_counter = meter.create_counter(
name="auth_cache.access",
description="Authentication/authorization cache reads, tagged by cache and result",
unit="{access}",
)
_eviction_counter = meter.create_counter(
name="auth_cache.eviction",
description="LRU evictions from authentication/authorization caches",
unit="{eviction}",
)


def record_cache_access(cache_name: str, result: CacheResult) -> None:
"""
Record a single cache read.

Args:
cache_name: Logical cache name (e.g. "auth_gateway", "agent_api_key").
result: One of "hit", "miss_expired", "miss_absent".

Never raises: emission failures (e.g. a StatsD UDP socket error or an OTel
SDK fault) are swallowed so instrumentation can never disrupt a caller on
the critical auth path.
"""
try:
_ensure_instruments()

if _access_counter is not None:
_access_counter.add(1, {"cache": cache_name, "result": result})

if _STATSD_ENABLED:
statsd.increment(
"auth_cache.access",
tags=[f"cache:{cache_name}", f"result:{result}"],
)
except Exception:
logger.debug("Failed to emit auth_cache.access metric", exc_info=True)


def record_cache_eviction(cache_name: str) -> None:
"""
Record a single capacity-driven (LRU) eviction.

Never raises: see ``record_cache_access``.
"""
try:
_ensure_instruments()

if _eviction_counter is not None:
_eviction_counter.add(1, {"cache": cache_name})

if _STATSD_ENABLED:
statsd.increment("auth_cache.eviction", tags=[f"cache:{cache_name}"])
except Exception:
logger.debug("Failed to emit auth_cache.eviction metric", exc_info=True)
92 changes: 92 additions & 0 deletions agentex/tests/unit/api/test_authentication_cache_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""Tests for cache hit/miss/eviction instrumentation in ``authentication_cache``.

Asserts that ``AsyncTTLCache`` records the correct metric for each of the three
read outcomes (hit / miss_expired / miss_absent) and emits an eviction metric on
capacity-driven LRU eviction. The emission backend (OTel / StatsD) is covered
separately; here we patch the recorder functions at their import site in
``authentication_cache`` and assert the calls.
"""

from __future__ import annotations

from unittest.mock import patch

import pytest
from src.api.authentication_cache import AsyncTTLCache, AuthenticationCache


@pytest.mark.unit
@pytest.mark.asyncio
class TestAsyncTTLCacheMetrics:
async def test_hit_records_hit(self):
cache = AsyncTTLCache(name="agent_api_key", ttl_seconds=300)
await cache.set("k", "v")

with patch("src.api.authentication_cache.record_cache_access") as record_access:
result = await cache.get("k")

assert result == "v"
record_access.assert_called_once_with("agent_api_key", "hit")

async def test_absent_key_records_miss_absent(self):
cache = AsyncTTLCache(name="auth_gateway", ttl_seconds=300)

with patch("src.api.authentication_cache.record_cache_access") as record_access:
result = await cache.get("never-set")

assert result is None
record_access.assert_called_once_with("auth_gateway", "miss_absent")

async def test_expired_entry_records_miss_expired(self):
cache = AsyncTTLCache(name="auth_gateway", ttl_seconds=60)
await cache.set("k", "v")
# Force expiry deterministically by backdating the stored timestamp,
# avoiding any reliance on wall-clock timing in the test.
value, _ = cache.cache["k"]
cache.cache["k"] = (value, 0.0)

with patch("src.api.authentication_cache.record_cache_access") as record_access:
result = await cache.get("k")

assert result is None
record_access.assert_called_once_with("auth_gateway", "miss_expired")
# The expired entry should also have been purged from the cache.
assert "k" not in cache.cache

async def test_capacity_eviction_records_eviction(self):
cache = AsyncTTLCache(name="authorization_check", max_size=1, ttl_seconds=300)
await cache.set("first", "v1")

with patch(
"src.api.authentication_cache.record_cache_eviction"
) as record_eviction:
# Inserting a second distinct key evicts the oldest (LRU).
await cache.set("second", "v2")

record_eviction.assert_called_once_with("authorization_check")
assert "first" not in cache.cache
assert "second" in cache.cache

async def test_overwriting_existing_key_does_not_evict(self):
cache = AsyncTTLCache(name="authorization_check", max_size=1, ttl_seconds=300)
await cache.set("k", "v1")

with patch(
"src.api.authentication_cache.record_cache_eviction"
) as record_eviction:
# Re-setting an existing key is an update, not a capacity eviction.
await cache.set("k", "v2")

record_eviction.assert_not_called()
assert await cache.get("k") == "v2"


@pytest.mark.unit
def test_authentication_cache_assigns_distinct_names():
"""Each sub-cache carries the name used as the ``cache`` metric tag."""
cache = AuthenticationCache()

assert cache.agent_identity_cache.name == "agent_identity"
assert cache.agent_api_key_cache.name == "agent_api_key"
assert cache.auth_gateway_cache.name == "auth_gateway"
assert cache.authorization_check_cache.name == "authorization_check"
75 changes: 75 additions & 0 deletions agentex/tests/unit/utils/test_cache_metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""Tests for the auth-cache metrics emitter.

Covers the two paths that matter operationally: the no-op path (neither OTel nor
StatsD configured, which is the default in tests and local dev) must never
raise, and the StatsD path must emit a counter with the expected name and tags.
"""

from __future__ import annotations

from unittest.mock import patch

import pytest
from src.utils import cache_metrics


@pytest.mark.unit
def test_record_functions_are_noop_when_unconfigured():
# With no OTLP endpoint and no DD_AGENT_HOST, both calls must be harmless.
with (
patch.object(cache_metrics, "_STATSD_ENABLED", False),
patch.object(cache_metrics, "_access_counter", None),
patch.object(cache_metrics, "_eviction_counter", None),
patch.object(cache_metrics, "_instruments_initialized", True),
):
cache_metrics.record_cache_access("auth_gateway", "hit")
cache_metrics.record_cache_eviction("auth_gateway")


@pytest.mark.unit
def test_record_functions_swallow_emission_errors():
# A failing backend must never propagate to the caller (critical auth path).
with (
patch.object(cache_metrics, "_STATSD_ENABLED", True),
patch.object(cache_metrics, "_instruments_initialized", True),
patch.object(cache_metrics, "_access_counter", None),
patch.object(cache_metrics, "_eviction_counter", None),
patch.object(cache_metrics, "statsd") as mock_statsd,
):
mock_statsd.increment.side_effect = OSError("socket in a bad state")

# Neither call should raise despite the backend blowing up.
cache_metrics.record_cache_access("auth_gateway", "hit")
cache_metrics.record_cache_eviction("auth_gateway")


@pytest.mark.unit
def test_record_cache_access_emits_statsd_when_enabled():
with (
patch.object(cache_metrics, "_STATSD_ENABLED", True),
patch.object(cache_metrics, "_instruments_initialized", True),
patch.object(cache_metrics, "_access_counter", None),
patch.object(cache_metrics, "statsd") as mock_statsd,
):
cache_metrics.record_cache_access("auth_gateway", "miss_absent")

mock_statsd.increment.assert_called_once_with(
"auth_cache.access",
tags=["cache:auth_gateway", "result:miss_absent"],
)


@pytest.mark.unit
def test_record_cache_eviction_emits_statsd_when_enabled():
with (
patch.object(cache_metrics, "_STATSD_ENABLED", True),
patch.object(cache_metrics, "_instruments_initialized", True),
patch.object(cache_metrics, "_eviction_counter", None),
patch.object(cache_metrics, "statsd") as mock_statsd,
):
cache_metrics.record_cache_eviction("agent_api_key")

mock_statsd.increment.assert_called_once_with(
"auth_cache.eviction",
tags=["cache:agent_api_key"],
)
Loading