diff --git a/packages/uipath-google-adk/pyproject.toml b/packages/uipath-google-adk/pyproject.toml index 73a696d6..48f2f6ea 100644 --- a/packages/uipath-google-adk/pyproject.toml +++ b/packages/uipath-google-adk/pyproject.toml @@ -8,6 +8,7 @@ dependencies = [ "google-adk>=1.25.1", "openinference-instrumentation-google-adk>=0.1.9", "uipath>=2.10.0, <2.11.0", + "uipath-core>=0.5.18, <0.7.0", "uipath-runtime>=0.11.0, <0.12.0", ] classifiers = [ @@ -30,6 +31,9 @@ register = "uipath_google_adk.middlewares:register_middleware" [project.entry-points."uipath.runtime.factories"] google-adk = "uipath_google_adk.runtime:register_runtime_factory" +[project.entry-points."uipath.governance.adapters"] +google-adk = "uipath_google_adk.governance:register_governance_adapter" + [project.urls] Homepage = "https://uipath.com" Repository = "https://github.com/UiPath/uipath-integrations-python" diff --git a/packages/uipath-google-adk/src/uipath_google_adk/governance/__init__.py b/packages/uipath-google-adk/src/uipath_google_adk/governance/__init__.py new file mode 100644 index 00000000..03e22dee --- /dev/null +++ b/packages/uipath-google-adk/src/uipath_google_adk/governance/__init__.py @@ -0,0 +1,51 @@ +"""Governance integration for ``uipath-google-adk``. + +Registers :class:`GoogleADKAdapter` with the adapter registry in +``uipath.core.adapters`` so the governance host can attach the ADK-specific +inner hooks (BEFORE_MODEL, AFTER_MODEL, TOOL_CALL, AFTER_TOOL) when it sees a +Google ADK agent. + +Registration is **idempotent**: calling :func:`register_governance_adapter` +twice is a no-op on the second call. + +Wiring: the package exposes :func:`register_governance_adapter` as an entry +point under ``uipath.governance.adapters``. The governance adapter discovery +path calls it to register the adapter. Importing this module does not, by +itself, mutate the global registry. +""" + +from __future__ import annotations + +import logging + +from uipath.core.adapters import get_adapter_registry + +from .adapter import GoogleADKAdapter, GovernanceCallbacks + +logger = logging.getLogger(__name__) + +_registered: bool = False + + +def register_governance_adapter() -> None: + """Register :class:`GoogleADKAdapter` with the global registry. + + Idempotent — safe to call multiple times. + """ + global _registered + if _registered: + return + registry = get_adapter_registry() + if any(a.name == "GoogleADK" for a in registry.get_all()): + _registered = True + return + registry.register(GoogleADKAdapter()) + _registered = True + logger.debug("Registered uipath-google-adk governance adapter") + + +__all__ = [ + "GoogleADKAdapter", + "GovernanceCallbacks", + "register_governance_adapter", +] diff --git a/packages/uipath-google-adk/src/uipath_google_adk/governance/adapter.py b/packages/uipath-google-adk/src/uipath_google_adk/governance/adapter.py new file mode 100644 index 00000000..7828ef1f --- /dev/null +++ b/packages/uipath-google-adk/src/uipath_google_adk/governance/adapter.py @@ -0,0 +1,422 @@ +"""Google ADK adapter for UiPath governance. + +Provides governance for Google ADK agents (``google.adk.agents.LlmAgent`` +and any ``BaseAgent`` tree containing them). Unlike the LangChain adapter +— which wraps a ``Runnable`` and intercepts ``invoke`` / ``ainvoke`` — ADK +agents are executed by a ``Runner`` that holds its **own** reference to +the agent object. Replacing ``runtime.agent`` with a proxy would never +reach the ``Runner``. So this adapter installs governance directly onto +each ``LlmAgent``'s native callback attributes, mutating them in place: + +- ``before_model_callback`` → BEFORE_MODEL +- ``after_model_callback`` → AFTER_MODEL +- ``before_tool_callback`` → TOOL_CALL +- ``after_tool_callback`` → AFTER_TOOL + +Because the mutation is in place, :meth:`GoogleADKAdapter.attach` returns +the **original agent** (hooks installed) rather than a wrapping proxy. +Returning a proxy here would also break ADK's own ``isinstance(agent, +LlmAgent)`` checks in output-schema / graph resolution, since ``LlmAgent`` +is a Pydantic model. + +Chain-level boundaries (BEFORE_AGENT / AFTER_AGENT) are intentionally +*not* fired from here — they are owned by the governance host. Firing them +here too would duplicate every boundary evaluation. + +Contracts and the evaluator protocol come from ``uipath-core``; this +package contributes only the ADK-specific implementation and registers it +with the adapter registry via the ``uipath.governance.adapters`` entry +point. + +Audit emission and enforcement (raising :class:`GovernanceBlockException` +on DENY) are owned by the evaluator itself. Each callback only extracts +the relevant payload and calls the matching ``evaluate_*`` method; +:class:`GovernanceBlockException` is allowed to propagate (it aborts the +``Runner`` run), anything else is logged and swallowed so a governance +bug never breaks an agent run. +""" + +from __future__ import annotations + +import json +import logging +from typing import Any, Dict, List +from uuid import uuid4 + +from uipath.core.adapters import BaseAdapter, EvaluatorProtocol +from uipath.core.governance.exceptions import GovernanceBlockException + +logger = logging.getLogger(__name__) + +# Cap on the text blob passed to BEFORE_MODEL / AFTER_MODEL governance +# evaluation. Sized to match the governance host and the other adapters so +# scan-time budgets are consistent across hooks. A long conversation +# history is governed at the LLM layer by scanning only the latest +# request content, not the full prompt — see +# :meth:`GovernanceCallbacks._latest_request_text`. +_BEFORE_MODEL_TEXT_CAP = 64000 + +# Native LlmAgent callback attribute names this adapter manages. +_MODEL_BEFORE = "before_model_callback" +_MODEL_AFTER = "after_model_callback" +_TOOL_BEFORE = "before_tool_callback" +_TOOL_AFTER = "after_tool_callback" + + +def _is_governance_callable(fn: Any) -> bool: + """True if ``fn`` is a bound method of a :class:`GovernanceCallbacks`.""" + return isinstance(getattr(fn, "__self__", None), GovernanceCallbacks) + + +def _install_callback(agent: Any, attr: str, fn: Any) -> None: + """Prepend ``fn`` to an ADK callback slot, preserving existing handlers. + + ADK accepts a single callable or a ``list`` of callables for each + ``*_callback`` field and runs them in order, stopping early if one + returns a value (a short-circuit). Governance is prepended (runs + first) so it always evaluates — and can BLOCK — before any + user-supplied callback gets a chance to short-circuit the model / + tool call. + + Idempotent: if a governance callback is already present in the slot, + this is a no-op (so a double ``attach`` does not stack duplicates). + """ + existing = getattr(agent, attr, None) + if existing is None: + handlers: List[Any] = [] + elif isinstance(existing, list): + handlers = list(existing) + else: + handlers = [existing] + if any(_is_governance_callable(h) for h in handlers): + return + setattr(agent, attr, [fn, *handlers]) + + +def _remove_callbacks(agent: Any) -> None: + """Strip this adapter's governance callbacks from every managed slot.""" + for attr in (_MODEL_BEFORE, _MODEL_AFTER, _TOOL_BEFORE, _TOOL_AFTER): + existing = getattr(agent, attr, None) + if existing is None: + continue + if isinstance(existing, list): + kept = [h for h in existing if not _is_governance_callable(h)] + setattr(agent, attr, kept or None) + elif _is_governance_callable(existing): + setattr(agent, attr, None) + + +def _iter_llm_agents(root: Any) -> List[Any]: + """Return every ``LlmAgent``-shaped node in the ``sub_agents`` tree. + + A node qualifies if it exposes the model-callback surface (duck-typed + via :data:`_MODEL_BEFORE` so we don't hard-require ``LlmAgent`` to be + importable). Container agents (``Sequential`` / ``Parallel`` / ``Loop``) + have no model callbacks themselves but their ``sub_agents`` are walked + so a multi-agent app is governed end to end. Cycles and pathological + depth are bounded by an id-visited set and a hard cap. + """ + found: List[Any] = [] + seen: set[int] = set() + stack: List[Any] = [root] + while stack and len(seen) < 1000: + node = stack.pop() + if node is None or id(node) in seen: + continue + seen.add(id(node)) + if hasattr(node, _MODEL_BEFORE): + found.append(node) + sub_agents = getattr(node, "sub_agents", None) + if isinstance(sub_agents, (list, tuple)): + stack.extend(sub_agents) + return found + + +class GoogleADKAdapter(BaseAdapter): + """Adapter for the Google ADK framework. + + Detects ``google.adk`` agents and installs governance callbacks on + every ``LlmAgent`` reachable through the ``sub_agents`` tree. + """ + + @property + def name(self) -> str: + return "GoogleADK" + + def can_handle(self, agent: Any) -> bool: + """Return True only for a Google ADK ``BaseAgent`` (incl. LlmAgent trees).""" + try: + from google.adk.agents import BaseAgent + except ImportError: + return False + return isinstance(agent, BaseAgent) + + def attach( + self, + agent: Any, + agent_id: str, + session_id: str, + evaluator: EvaluatorProtocol, + ) -> Any: + """Install governance callbacks on the agent (mutated in place). + + Returns the original ``agent`` — the ``Runner`` already holds this + reference, so in-place mutation is what actually wires governance + into execution. A wrapping proxy would not reach the ``Runner`` + and would break ADK's ``isinstance(agent, LlmAgent)`` checks. + """ + callbacks = GovernanceCallbacks( + evaluator=evaluator, + agent_name=agent_id, + session_id=session_id, + ) + llm_agents = _iter_llm_agents(agent) + for node in llm_agents: + _install_callback(node, _MODEL_BEFORE, callbacks.before_model) + _install_callback(node, _MODEL_AFTER, callbacks.after_model) + _install_callback(node, _TOOL_BEFORE, callbacks.before_tool) + _install_callback(node, _TOOL_AFTER, callbacks.after_tool) + if not llm_agents: + logger.warning( + "GoogleADKAdapter found no LlmAgent in %s — deep hooks will not fire", + type(agent).__name__, + ) + else: + logger.debug( + "Installed governance callbacks on %d ADK LlmAgent(s)", + len(llm_agents), + ) + return agent + + def detach(self, governed: Any) -> Any: + """Remove governance callbacks from the agent tree and return it.""" + for node in _iter_llm_agents(governed): + _remove_callbacks(node) + return governed + + +class GovernanceCallbacks: + """Holds the four ADK callbacks bound to one governance evaluator. + + The evaluator owns audit emission and DENY-raising. Each callback + extracts the relevant payload, calls the matching ``evaluate_*`` + method, and returns ``None`` (never short-circuiting the model / tool + on its own). :class:`GovernanceBlockException` is allowed to + propagate — it aborts the ``Runner`` run — anything else is logged + and swallowed. + """ + + def __init__( + self, + evaluator: EvaluatorProtocol, + agent_name: str, + session_id: str, + ) -> None: + self._evaluator = evaluator + self._agent_name = agent_name + self._session_id = session_id + self._trace_id = str(uuid4()) + self._session_state: Dict[str, Any] = {"tool_calls": 0, "llm_calls": 0} + + # ----- Model callbacks ------------------------------------------------- + + def before_model(self, callback_context: Any, llm_request: Any) -> None: + """Evaluate BEFORE_MODEL rules at model start. + + Scans only the **latest request content** — not the full history. + The model still receives the entire history (this callback does + not mutate ``llm_request``); the evaluator focuses on the new + content the agent is about to respond to. Without this scoping, a + violation in an earlier turn would re-fire on every subsequent + model call because that text stays in the prompt for context. + + Returns ``None`` so ADK proceeds with the model call. + """ + try: + self._session_state["llm_calls"] = ( + self._session_state.get("llm_calls", 0) + 1 + ) + model_input = self._latest_request_text(llm_request) + self._evaluator.evaluate_before_model( + model_input=model_input, + agent_name=self._agent_name, + runtime_id=self._session_id, + trace_id=self._trace_id, + ) + except GovernanceBlockException: + raise + except Exception as e: + logger.warning("before_model governance check failed (continuing): %s", e) + return None + + def after_model(self, callback_context: Any, llm_response: Any) -> None: + """Evaluate AFTER_MODEL rules at model end. + + Partial (streamed) responses are skipped — ADK fires + ``after_model_callback`` for each chunk with ``partial=True`` and + once more for the aggregated final response. Governing only the + final response avoids re-scanning the same text token-by-token. + + Returns ``None`` so ADK keeps the model's response unchanged. + """ + try: + if getattr(llm_response, "partial", False): + return None + content = getattr(llm_response, "content", None) + model_output = self._content_text(content) + self._evaluator.evaluate_after_model( + model_output=model_output, + agent_name=self._agent_name, + runtime_id=self._session_id, + trace_id=self._trace_id, + ) + except GovernanceBlockException: + raise + except Exception as e: + logger.warning("after_model governance check failed (continuing): %s", e) + return None + + # ----- Tool callbacks -------------------------------------------------- + + def before_tool(self, tool: Any, args: Dict[str, Any], tool_context: Any) -> None: + """Evaluate TOOL_CALL rules at tool start. + + Returns ``None`` so ADK proceeds with the tool call (a non-None + return would short-circuit it with a substitute result). + """ + try: + self._session_state["tool_calls"] = ( + self._session_state.get("tool_calls", 0) + 1 + ) + tool_name = getattr(tool, "name", None) or "unknown" + self._evaluator.evaluate_tool_call( + tool_name=tool_name, + tool_args=args or {}, + agent_name=self._agent_name, + runtime_id=self._session_id, + trace_id=self._trace_id, + session_state=self._session_state, + ) + except GovernanceBlockException: + raise + except Exception as e: + logger.warning("before_tool governance check failed (continuing): %s", e) + return None + + def after_tool( + self, + tool: Any, + args: Dict[str, Any], + tool_context: Any, + tool_response: Any, + ) -> None: + """Evaluate AFTER_TOOL rules at tool end. + + Returns ``None`` so ADK keeps the tool's result unchanged. + """ + try: + tool_name = getattr(tool, "name", None) or "unknown" + tool_result = ( + "" if tool_response is None else self._stringify(tool_response) + ) + self._evaluator.evaluate_after_tool( + tool_name=tool_name, + tool_result=tool_result, + agent_name=self._agent_name, + runtime_id=self._session_id, + trace_id=self._trace_id, + ) + except GovernanceBlockException: + raise + except Exception as e: + logger.warning("after_tool governance check failed (continuing): %s", e) + return None + + # ----- Text extraction ------------------------------------------------- + + def _latest_request_text(self, llm_request: Any) -> str: + """Extract text from the most-recent content in an ``LlmRequest``. + + ``llm_request.contents`` is the full ``list[Content]`` sent to the + model. We take the last entry — the new user message, or the tool + ``function_response`` being fed back — and pull its text cleanly + via :meth:`_content_text`. Returns ``""`` when there is nothing + extractable. + """ + contents = getattr(llm_request, "contents", None) + if not contents: + return "" + return self._content_text(contents[-1]) + + @classmethod + def _content_text(cls, content: Any) -> str: + """Return governance-relevant text from a ``Content`` (or part list). + + Walks ``content.parts`` and pulls, per part: + + - ``part.text`` — plain text. + - ``part.function_call`` — the tool name plus JSON-encoded + ``args``; ADK / Gemini routinely carry the user-visible reply in + a function call (e.g. a "submit final answer" tool). + - ``part.function_response`` — the tool result fed back to the + model; relevant when it is the latest content for BEFORE_MODEL. + + Capped at :data:`_BEFORE_MODEL_TEXT_CAP` so a runaway response or + large tool payload can't blow scan budgets. + """ + if content is None: + return "" + parts = getattr(content, "parts", None) + if parts is None: + # Some shapes hand us a bare string or a list of parts. + if isinstance(content, str): + return content[:_BEFORE_MODEL_TEXT_CAP] + if isinstance(content, (list, tuple)): + parts = content + else: + return "" + collected: List[str] = [] + remaining = _BEFORE_MODEL_TEXT_CAP + for part in parts: + if remaining <= 0: + break + piece = cls._part_text(part) + if piece: + collected.append(piece) + remaining -= len(piece) + 1 + return "\n".join(collected)[:_BEFORE_MODEL_TEXT_CAP] + + @classmethod + def _part_text(cls, part: Any) -> str: + """Return text / function-call args / function-response from one part.""" + pieces: List[str] = [] + text = getattr(part, "text", None) + if isinstance(text, str) and text: + pieces.append(text) + + function_call = getattr(part, "function_call", None) + if function_call is not None: + name = getattr(function_call, "name", "") or "" + fc_args = getattr(function_call, "args", None) + if name: + pieces.append(name) + if fc_args: + pieces.append(cls._stringify(fc_args)) + + function_response = getattr(part, "function_response", None) + if function_response is not None: + response = getattr(function_response, "response", None) + if response: + pieces.append(cls._stringify(response)) + + return "\n".join(p for p in pieces if p) + + @staticmethod + def _stringify(value: Any) -> str: + """Render a dict / object payload as compact, scannable text.""" + if isinstance(value, str): + return value + try: + return json.dumps(value, default=str, ensure_ascii=False) + except (TypeError, ValueError): + return str(value) diff --git a/packages/uipath-google-adk/tests/governance/__init__.py b/packages/uipath-google-adk/tests/governance/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/packages/uipath-google-adk/tests/governance/test_adapter.py b/packages/uipath-google-adk/tests/governance/test_adapter.py new file mode 100644 index 00000000..4feec4e5 --- /dev/null +++ b/packages/uipath-google-adk/tests/governance/test_adapter.py @@ -0,0 +1,357 @@ +"""Unit tests for the Google ADK governance adapter. + +``can_handle`` is tested against a real ``google.adk`` ``LlmAgent`` (the +adapter detects agents with ``isinstance(..., BaseAgent)``). The remaining +tests duck-type the ADK payloads — lightweight fakes for ``Part`` / +``Content`` / ``LlmRequest`` / ``LlmResponse`` / tool / agent — so the +callback code paths are exercised without driving the heavy ADK runtime. +""" + +from __future__ import annotations + +import logging +from types import SimpleNamespace +from typing import Any, List + +import pytest +from uipath.core.governance.exceptions import GovernanceBlockException + +from uipath_google_adk.governance.adapter import ( + _BEFORE_MODEL_TEXT_CAP, + GoogleADKAdapter, + GovernanceCallbacks, +) + +# -------------------------------------------------------------------------- +# Fakes +# -------------------------------------------------------------------------- + + +class FakeEvaluator: + """Records evaluate_* calls; optionally BLOCKs on a named hook.""" + + def __init__(self, block_on: str | None = None) -> None: + self.block_on = block_on + self.calls: List[tuple[str, dict]] = [] + + def _record(self, hook: str, **kwargs: Any) -> None: + self.calls.append((hook, kwargs)) + if self.block_on == hook: + raise GovernanceBlockException("blocked") # type: ignore[call-arg] + + def evaluate_before_agent(self, **kwargs: Any) -> None: + self._record("before_agent", **kwargs) + + def evaluate_after_agent(self, **kwargs: Any) -> None: + self._record("after_agent", **kwargs) + + def evaluate_before_model(self, **kwargs: Any) -> None: + self._record("before_model", **kwargs) + + def evaluate_after_model(self, **kwargs: Any) -> None: + self._record("after_model", **kwargs) + + def evaluate_tool_call(self, **kwargs: Any) -> None: + self._record("tool_call", **kwargs) + + def evaluate_after_tool(self, **kwargs: Any) -> None: + self._record("after_tool", **kwargs) + + +class FakeLlmAgent: + """Minimal stand-in for ``google.adk.agents.LlmAgent``.""" + + def __init__(self, name: str = "agent", sub_agents: List[Any] | None = None): + self.name = name + self.before_model_callback: Any = None + self.after_model_callback: Any = None + self.before_tool_callback: Any = None + self.after_tool_callback: Any = None + self.sub_agents = sub_agents or [] + + +class FakeContainerAgent: + """Container agent (Sequential/Parallel) with no model callbacks.""" + + def __init__(self, name: str, sub_agents: List[Any]): + self.name = name + self.sub_agents = sub_agents + + +class FakeTool: + def __init__(self, name: str): + self.name = name + + +def _part( + text: str | None = None, + function_call: Any = None, + function_response: Any = None, +) -> SimpleNamespace: + return SimpleNamespace( + text=text, + function_call=function_call, + function_response=function_response, + ) + + +def _content(parts: List[Any], role: str = "user") -> SimpleNamespace: + return SimpleNamespace(role=role, parts=parts) + + +def _make_callbacks(evaluator: FakeEvaluator) -> GovernanceCallbacks: + return GovernanceCallbacks( + evaluator=evaluator, agent_name="agent-1", session_id="sess-1" + ) + + +# -------------------------------------------------------------------------- +# can_handle +# -------------------------------------------------------------------------- + + +def test_can_handle_real_agent(): + from google.adk.agents import LlmAgent + + assert GoogleADKAdapter().can_handle(LlmAgent(name="t")) is True + + +def test_can_handle_rejects_non_adk_agent(): + # Duck-typed look-alikes (name + model-callback / sub_agents) must NOT be + # claimed — only a real google.adk BaseAgent is. + assert GoogleADKAdapter().can_handle(FakeLlmAgent()) is False + assert GoogleADKAdapter().can_handle(FakeContainerAgent("root", [FakeLlmAgent()])) is False + assert GoogleADKAdapter().can_handle(object()) is False + + +# -------------------------------------------------------------------------- +# attach / detach +# -------------------------------------------------------------------------- + + +def test_attach_installs_on_all_llm_agents_in_tree(): + leaf_a = FakeLlmAgent("a") + leaf_b = FakeLlmAgent("b") + root = FakeContainerAgent("root", [leaf_a, leaf_b]) + + returned = GoogleADKAdapter().attach( + root, agent_id="x", session_id="s", evaluator=FakeEvaluator() + ) + + assert returned is root # original returned, not a proxy + for leaf in (leaf_a, leaf_b): + assert isinstance(leaf.before_model_callback, list) + assert len(leaf.before_model_callback) == 1 + assert leaf.after_model_callback and leaf.before_tool_callback + assert leaf.after_tool_callback + + +def test_attach_is_idempotent(): + agent = FakeLlmAgent() + adapter = GoogleADKAdapter() + ev = FakeEvaluator() + adapter.attach(agent, agent_id="x", session_id="s", evaluator=ev) + adapter.attach(agent, agent_id="x", session_id="s", evaluator=ev) + assert len(agent.before_model_callback) == 1 + + +def test_attach_preserves_existing_callback_and_runs_governance_first(): + def user_cb(*_a, **_k): + return None + + agent = FakeLlmAgent() + agent.before_model_callback = user_cb + GoogleADKAdapter().attach( + agent, agent_id="x", session_id="s", evaluator=FakeEvaluator() + ) + cbs = agent.before_model_callback + assert isinstance(cbs, list) and len(cbs) == 2 + # governance prepended → runs first + assert getattr(cbs[0], "__self__", None).__class__ is GovernanceCallbacks + assert cbs[1] is user_cb + + +def test_detach_removes_governance_callbacks(): + def user_cb(*_a, **_k): + return None + + agent = FakeLlmAgent() + agent.after_tool_callback = user_cb + adapter = GoogleADKAdapter() + adapter.attach(agent, agent_id="x", session_id="s", evaluator=FakeEvaluator()) + adapter.detach(agent) + assert agent.before_model_callback is None + # unrelated user callback survives + assert agent.after_tool_callback == [user_cb] + + +def test_attach_warns_when_no_llm_agent(caplog): + container = FakeContainerAgent("root", []) + with caplog.at_level(logging.WARNING): + GoogleADKAdapter().attach( + container, agent_id="x", session_id="s", evaluator=FakeEvaluator() + ) + assert any("no LlmAgent" in r.message for r in caplog.records) + + +# -------------------------------------------------------------------------- +# before_model +# -------------------------------------------------------------------------- + + +def test_before_model_scopes_to_latest_content(): + ev = FakeEvaluator() + cb = _make_callbacks(ev) + req = SimpleNamespace( + contents=[ + _content([_part(text="OLD turn — secret leak here")]), + _content([_part(text="the new question")]), + ] + ) + cb.before_model(callback_context=None, llm_request=req) + hook, kwargs = ev.calls[-1] + assert hook == "before_model" + assert kwargs["model_input"] == "the new question" + assert "OLD turn" not in kwargs["model_input"] + + +def test_before_model_extracts_function_response_when_latest(): + ev = FakeEvaluator() + cb = _make_callbacks(ev) + fr = SimpleNamespace(name="lookup", response={"balance": "1000"}) + req = SimpleNamespace(contents=[_content([_part(function_response=fr)])]) + cb.before_model(callback_context=None, llm_request=req) + assert "1000" in ev.calls[-1][1]["model_input"] + + +def test_before_model_caps_text(): + ev = FakeEvaluator() + cb = _make_callbacks(ev) + huge = "x" * (_BEFORE_MODEL_TEXT_CAP + 5000) + req = SimpleNamespace(contents=[_content([_part(text=huge)])]) + cb.before_model(callback_context=None, llm_request=req) + assert len(ev.calls[-1][1]["model_input"]) <= _BEFORE_MODEL_TEXT_CAP + + +def test_before_model_empty_contents(): + ev = FakeEvaluator() + cb = _make_callbacks(ev) + cb.before_model(callback_context=None, llm_request=SimpleNamespace(contents=[])) + assert ev.calls[-1][1]["model_input"] == "" + + +# -------------------------------------------------------------------------- +# after_model +# -------------------------------------------------------------------------- + + +def test_after_model_skips_partial(): + ev = FakeEvaluator() + cb = _make_callbacks(ev) + resp = SimpleNamespace(partial=True, content=_content([_part(text="chunk")])) + cb.after_model(callback_context=None, llm_response=resp) + assert ev.calls == [] + + +def test_after_model_extracts_text_and_function_call(): + ev = FakeEvaluator() + cb = _make_callbacks(ev) + fc = SimpleNamespace(name="submit_answer", args={"content": "final reply"}) + resp = SimpleNamespace( + partial=False, + content=_content( + [_part(text="thinking"), _part(function_call=fc)], role="model" + ), + ) + cb.after_model(callback_context=None, llm_response=resp) + out = ev.calls[-1][1]["model_output"] + assert "thinking" in out and "submit_answer" in out and "final reply" in out + + +# -------------------------------------------------------------------------- +# tools +# -------------------------------------------------------------------------- + + +def test_before_tool_passes_args_and_session_state(): + ev = FakeEvaluator() + cb = _make_callbacks(ev) + cb.before_tool(FakeTool("transfer"), {"amount": 50}, tool_context=None) + hook, kwargs = ev.calls[-1] + assert hook == "tool_call" + assert kwargs["tool_name"] == "transfer" + assert kwargs["tool_args"] == {"amount": 50} + assert kwargs["session_state"]["tool_calls"] == 1 + + +def test_after_tool_stringifies_dict_response(): + ev = FakeEvaluator() + cb = _make_callbacks(ev) + cb.after_tool(FakeTool("lookup"), {}, tool_context=None, tool_response={"x": 1}) + out = ev.calls[-1][1]["tool_result"] + assert "x" in out and "1" in out + + +def test_after_tool_none_response(): + ev = FakeEvaluator() + cb = _make_callbacks(ev) + cb.after_tool(FakeTool("noop"), {}, tool_context=None, tool_response=None) + assert ev.calls[-1][1]["tool_result"] == "" + + +# -------------------------------------------------------------------------- +# enforcement semantics +# -------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "hook,invoke", + [ + ( + "before_model", + lambda cb: cb.before_model( + None, SimpleNamespace(contents=[_content([_part(text="hi")])]) + ), + ), + ( + "after_model", + lambda cb: cb.after_model( + None, + SimpleNamespace(partial=False, content=_content([_part(text="o")])), + ), + ), + ("tool_call", lambda cb: cb.before_tool(FakeTool("t"), {}, None)), + ( + "after_tool", + lambda cb: cb.after_tool(FakeTool("t"), {}, None, {"r": 1}), + ), + ], +) +def test_block_exception_propagates(hook, invoke): + cb = _make_callbacks(FakeEvaluator(block_on=hook)) + with pytest.raises(GovernanceBlockException): + invoke(cb) + + +def test_non_block_exception_is_swallowed(caplog): + class Boom: + def evaluate_before_model(self, **_): + raise RuntimeError("evaluator bug") + + cb = GovernanceCallbacks( + evaluator=Boom(), + agent_name="a", + session_id="s", # type: ignore[arg-type] + ) + with caplog.at_level(logging.WARNING): + # must NOT raise — a governance bug can't break the agent run + cb.before_model(None, SimpleNamespace(contents=[_content([_part(text="x")])])) + assert any("governance check failed" in r.message for r in caplog.records) + + +def test_callbacks_return_none(): + cb = _make_callbacks(FakeEvaluator()) + assert cb.before_model(None, SimpleNamespace(contents=[])) is None + assert cb.after_model(None, SimpleNamespace(partial=False, content=None)) is None + assert cb.before_tool(FakeTool("t"), {}, None) is None + assert cb.after_tool(FakeTool("t"), {}, None, {}) is None