diff --git a/packages/uipath-llamaindex/pyproject.toml b/packages/uipath-llamaindex/pyproject.toml index edcaff6c..51f3b7bf 100644 --- a/packages/uipath-llamaindex/pyproject.toml +++ b/packages/uipath-llamaindex/pyproject.toml @@ -12,6 +12,7 @@ dependencies = [ "llama-index-llms-azure-openai>=0.4.2", "openinference-instrumentation-llama-index>=4.3.9", "uipath>=2.10.0, <2.11.0", + "uipath-core>=0.5.18, <0.7.0", "uipath-runtime>=0.11.0, <0.12.0", ] classifiers = [ @@ -44,6 +45,9 @@ register = "uipath_llamaindex.middlewares:register_middleware" [project.entry-points."uipath.runtime.factories"] llamaindex = "uipath_llamaindex.runtime:register_runtime_factory" +[project.entry-points."uipath.governance.adapters"] +llamaindex = "uipath_llamaindex.governance:register_governance_adapter" + [project.urls] Homepage = "https://uipath.com" Repository = "https://github.com/UiPath/uipath-integrations-python/" diff --git a/packages/uipath-llamaindex/src/uipath_llamaindex/governance/__init__.py b/packages/uipath-llamaindex/src/uipath_llamaindex/governance/__init__.py new file mode 100644 index 00000000..7dfcc4e7 --- /dev/null +++ b/packages/uipath-llamaindex/src/uipath_llamaindex/governance/__init__.py @@ -0,0 +1,52 @@ +"""Governance integration for ``uipath-llamaindex``. + +Registers :class:`LlamaIndexAdapter` with the adapter registry in +``uipath.core.adapters`` so the governance host can attach the +LlamaIndex-specific governance (BEFORE_MODEL, AFTER_MODEL, TOOL_CALL) when it +sees a LlamaIndex workflow/agent. + +Registration is **idempotent**: calling :func:`register_governance_adapter` +twice is a no-op on the second call. + +Wiring: the package exposes :func:`register_governance_adapter` as an entry +point under ``uipath.governance.adapters``. The governance adapter discovery +path calls it to register the adapter. Importing this module does not, by +itself, mutate the global registry. +""" + +from __future__ import annotations + +import logging + +from uipath.core.adapters import get_adapter_registry + +from .adapter import GovernanceEventHandler, LlamaIndexAdapter + +logger = logging.getLogger(__name__) + +_registered: bool = False + + +def register_governance_adapter() -> None: + """Register :class:`LlamaIndexAdapter` with the global registry. + + Idempotent — safe to call multiple times. + """ + global _registered + if _registered: + return + registry = get_adapter_registry() + if any(a.name == "LlamaIndex" for a in registry.get_all()): + _registered = True + return + registry.register(LlamaIndexAdapter()) + _registered = True + logger.debug("Registered uipath-llamaindex governance adapter") + + + +__all__ = [ + "GovernanceEventHandler", + "LlamaIndexAdapter", + "register_governance_adapter", +] \ No newline at end of file diff --git a/packages/uipath-llamaindex/src/uipath_llamaindex/governance/adapter.py b/packages/uipath-llamaindex/src/uipath_llamaindex/governance/adapter.py new file mode 100644 index 00000000..1e1e46fc --- /dev/null +++ b/packages/uipath-llamaindex/src/uipath_llamaindex/governance/adapter.py @@ -0,0 +1,283 @@ +"""LlamaIndex adapter for UiPath governance. + +Provides governance for LlamaIndex agents/workflows. Unlike the ADK / OpenAI / +Agent-Framework adapters — which install per-agent callbacks or middleware — +LlamaIndex routes everything (LLM calls, tool calls) through its global +**instrumentation dispatcher** (the same mechanism the package already uses for +OpenInference tracing). So this adapter governs by registering a +:class:`GovernanceEventHandler` on the **root dispatcher**, which receives every +event propagated from child dispatchers: + +- ``LLMChatStartEvent`` → BEFORE_MODEL (scans the latest input message) +- ``LLMChatEndEvent`` → AFTER_MODEL (scans the response) +- ``AgentToolCallEvent`` → TOOL_CALL (tool name + arguments) + +The dispatcher is process-global, so registration is process-wide — which fits +the coded-agent model (one workflow per process). :meth:`attach` therefore +returns the ``agent`` unchanged (nothing is mutated on it); the wiring lives on +the dispatcher. :meth:`detach` removes the handler. + +LlamaIndex does **not** emit a tool-*end* instrumentation event, so AFTER_TOOL +is not wired here; a tool's result is governed at the next ``LLMChatStartEvent`` +where it is fed back to the model as input (analogous to how the OpenAI adapter +handles its missing tool-args). + +Chain-level boundaries (BEFORE_AGENT / AFTER_AGENT) are owned by the +governance host and are intentionally not fired here. + +Contracts and the evaluator protocol come from ``uipath-core``; this package +contributes only the LlamaIndex-specific implementation and registers it with +the adapter registry via the ``uipath.governance.adapters`` entry point. + +Audit emission and enforcement (raising :class:`GovernanceBlockException` on +DENY) are owned by the evaluator. The handler only extracts payloads and calls +the matching ``evaluate_*`` method; :class:`GovernanceBlockException` propagates +(aborting the run), anything else is logged and swallowed. +""" + +from __future__ import annotations + +import json +import logging +from typing import Any, Dict, List +from uuid import uuid4 + +from llama_index.core.instrumentation import ( # type: ignore[attr-defined] + get_dispatcher, +) +from llama_index.core.instrumentation.event_handlers.base import ( # type: ignore[attr-defined] + BaseEventHandler, +) +from llama_index.core.instrumentation.events.agent import AgentToolCallEvent +from llama_index.core.instrumentation.events.llm import ( + LLMChatEndEvent, + LLMChatStartEvent, +) +from pydantic import PrivateAttr +from uipath.core.adapters import BaseAdapter, EvaluatorProtocol +from uipath.core.governance.exceptions import GovernanceBlockException + +logger = logging.getLogger(__name__) + +# Cap on the text blob passed to BEFORE_MODEL / AFTER_MODEL governance +# evaluation. Sized to match the runtime side and the other adapters. +_BEFORE_MODEL_TEXT_CAP = 64000 + + +class LlamaIndexAdapter(BaseAdapter): + """Adapter for the LlamaIndex framework. + + Detects LlamaIndex workflows/agents and governs them by registering a + :class:`GovernanceEventHandler` on the root instrumentation dispatcher. + """ + + @property + def name(self) -> str: + return "LlamaIndex" + + def can_handle(self, agent: Any) -> bool: + """Return True only for a LlamaIndex ``Workflow`` (incl. agent workflows).""" + try: + from workflows import Workflow + except ImportError: + return False + return isinstance(agent, Workflow) + + def attach( + self, + agent: Any, + agent_id: str, + session_id: str, + evaluator: EvaluatorProtocol, + ) -> Any: + """Register the governance event handler on the root dispatcher. + + Returns the ``agent`` unchanged — LlamaIndex governance is wired on the + process-global dispatcher, not on the agent object. Idempotent: a + second attach is a no-op while a handler is already registered. + """ + dispatcher = get_dispatcher() + if any(isinstance(h, GovernanceEventHandler) for h in dispatcher.event_handlers): + return agent # idempotent — already governed + callbacks = GovernanceCallbacks( + evaluator=evaluator, agent_name=agent_id, session_id=session_id + ) + dispatcher.add_event_handler(GovernanceEventHandler(callbacks=callbacks)) + logger.debug("Registered governance event handler on LlamaIndex dispatcher") + return agent + + def detach(self, governed: Any) -> Any: + """Remove the governance event handler from the root dispatcher.""" + dispatcher = get_dispatcher() + dispatcher.event_handlers = [ + h + for h in dispatcher.event_handlers + if not isinstance(h, GovernanceEventHandler) + ] + return governed + + +class GovernanceEventHandler(BaseEventHandler): + """Routes LlamaIndex instrumentation events to a governance evaluator. + + A pydantic model (``BaseEventHandler`` is one), so the evaluator + state + are held in a private attribute. ``handle`` is called synchronously by the + dispatcher for every event; we dispatch the three governance-relevant + types and ignore the rest. + """ + + _callbacks: "GovernanceCallbacks" = PrivateAttr() + + def __init__(self, callbacks: "GovernanceCallbacks", **data: Any) -> None: + super().__init__(**data) + self._callbacks = callbacks + + @classmethod + def class_name(cls) -> str: + return "GovernanceEventHandler" + + def handle(self, event: Any, **kwargs: Any) -> Any: + if isinstance(event, LLMChatStartEvent): + self._callbacks.before_model(event.messages) + elif isinstance(event, LLMChatEndEvent): + self._callbacks.after_model(event.response) + elif isinstance(event, AgentToolCallEvent): + self._callbacks.tool_call(event.tool, event.arguments) + return None + + +class GovernanceCallbacks: + """Holds the evaluator + per-attach state, called by the event handler. + + :class:`GovernanceBlockException` is re-raised (it aborts the run); + anything else is logged and swallowed so a governance bug never breaks an + agent run. + """ + + def __init__( + self, + evaluator: EvaluatorProtocol, + agent_name: str, + session_id: str, + ) -> None: + self._evaluator = evaluator + self._agent_name = agent_name + self._session_id = session_id + self._trace_id = str(uuid4()) + self._session_state: Dict[str, Any] = {"tool_calls": 0, "llm_calls": 0} + + def before_model(self, messages: Any) -> None: + """Evaluate BEFORE_MODEL on the latest input message (see ADK rationale).""" + try: + self._session_state["llm_calls"] = ( + self._session_state.get("llm_calls", 0) + 1 + ) + self._evaluator.evaluate_before_model( + model_input=_latest_message_text(messages), + agent_name=self._agent_name, + runtime_id=self._session_id, + trace_id=self._trace_id, + ) + except GovernanceBlockException: + raise + except Exception as e: # noqa: BLE001 - governance must not break the run + logger.warning("before_model governance check failed (continuing): %s", e) + + def after_model(self, response: Any) -> None: + """Evaluate AFTER_MODEL on the chat response text.""" + try: + self._evaluator.evaluate_after_model( + model_output=_response_text(response), + agent_name=self._agent_name, + runtime_id=self._session_id, + trace_id=self._trace_id, + ) + except GovernanceBlockException: + raise + except Exception as e: # noqa: BLE001 + logger.warning("after_model governance check failed (continuing): %s", e) + + def tool_call(self, tool: Any, arguments: Any) -> None: + """Evaluate TOOL_CALL with the tool name + arguments.""" + try: + self._session_state["tool_calls"] = ( + self._session_state.get("tool_calls", 0) + 1 + ) + self._evaluator.evaluate_tool_call( + tool_name=getattr(tool, "name", None) or "unknown", + tool_args=_coerce_args(arguments), + agent_name=self._agent_name, + runtime_id=self._session_id, + trace_id=self._trace_id, + session_state=self._session_state, + ) + except GovernanceBlockException: + raise + except Exception as e: # noqa: BLE001 + logger.warning("tool_call governance check failed (continuing): %s", e) + + +# -------------------------------------------------------------------------- +# Text / argument extraction +# -------------------------------------------------------------------------- + + +def _latest_message_text(messages: Any) -> str: + """Text of the most-recent message in a chat request.""" + if not messages: + return "" + if isinstance(messages, (list, tuple)): + return _message_text(messages[-1]) + return _message_text(messages) + + +def _message_text(message: Any) -> str: + """Pull text from a ``ChatMessage`` (``.content``) or a bare string.""" + if message is None: + return "" + if isinstance(message, str): + return message[:_BEFORE_MODEL_TEXT_CAP] + content = getattr(message, "content", None) + if isinstance(content, str) and content: + return content[:_BEFORE_MODEL_TEXT_CAP] + # Newer ChatMessage carries typed blocks; fall back to str(). + return str(message)[:_BEFORE_MODEL_TEXT_CAP] + + +def _response_text(response: Any) -> str: + """Pull assistant text from a ``ChatResponse`` (``.message.content``).""" + if response is None: + return "" + message = getattr(response, "message", None) + if message is not None: + return _message_text(message) + text = getattr(response, "text", None) + if isinstance(text, str): + return text[:_BEFORE_MODEL_TEXT_CAP] + return str(response)[:_BEFORE_MODEL_TEXT_CAP] + + +def _coerce_args(arguments: Any) -> Dict[str, Any]: + """Normalise tool arguments (JSON string / Mapping / None) to a dict. + + ``AgentToolCallEvent.arguments`` is a JSON-encoded string; other call + sites may hand a dict directly. + """ + if arguments is None: + return {} + if isinstance(arguments, dict): + return arguments + if isinstance(arguments, str): + try: + parsed = json.loads(arguments) + return parsed if isinstance(parsed, dict) else {"_": parsed} + except (TypeError, ValueError): + return {} + return {} + + +__all__: List[str] = [ + "GovernanceCallbacks", + "GovernanceEventHandler", + "LlamaIndexAdapter", +] \ No newline at end of file diff --git a/packages/uipath-llamaindex/tests/governance/__init__.py b/packages/uipath-llamaindex/tests/governance/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/packages/uipath-llamaindex/tests/governance/test_adapter.py b/packages/uipath-llamaindex/tests/governance/test_adapter.py new file mode 100644 index 00000000..b9022ed2 --- /dev/null +++ b/packages/uipath-llamaindex/tests/governance/test_adapter.py @@ -0,0 +1,261 @@ +"""Unit tests for the LlamaIndex governance adapter. + +The adapter governs via the LlamaIndex instrumentation dispatcher, so these +tests exercise the real event types (``LLMChatStartEvent`` etc.) routed +through :class:`GovernanceEventHandler`, plus the adapter's register/detach on +the dispatcher. The dispatcher is process-global, so each dispatcher test +cleans up after itself via ``detach``. +""" + +from __future__ import annotations + +import logging +from typing import Any, List + +import pytest +from llama_index.core.base.llms.types import ChatMessage, ChatResponse +from llama_index.core.instrumentation import get_dispatcher +from llama_index.core.instrumentation.events.agent import AgentToolCallEvent +from llama_index.core.instrumentation.events.llm import ( + LLMChatEndEvent, + LLMChatStartEvent, +) +from llama_index.core.tools.types import ToolMetadata +from uipath.core.governance.exceptions import GovernanceBlockException + +from uipath_llamaindex.governance.adapter import ( + _BEFORE_MODEL_TEXT_CAP, + GovernanceCallbacks, + GovernanceEventHandler, + LlamaIndexAdapter, + _coerce_args, +) + +# -------------------------------------------------------------------------- +# Fakes +# -------------------------------------------------------------------------- + + +class FakeEvaluator: + """Records evaluate_* calls; optionally BLOCKs on a named hook.""" + + def __init__(self, block_on: str | None = None) -> None: + self.block_on = block_on + self.calls: List[tuple[str, dict]] = [] + + def _record(self, hook: str, **kwargs: Any) -> None: + self.calls.append((hook, kwargs)) + if self.block_on == hook: + raise GovernanceBlockException("blocked") # type: ignore[call-arg] + + def evaluate_before_agent(self, **kwargs: Any) -> None: + self._record("before_agent", **kwargs) + + def evaluate_after_agent(self, **kwargs: Any) -> None: + self._record("after_agent", **kwargs) + + def evaluate_before_model(self, **kwargs: Any) -> None: + self._record("before_model", **kwargs) + + def evaluate_after_model(self, **kwargs: Any) -> None: + self._record("after_model", **kwargs) + + def evaluate_tool_call(self, **kwargs: Any) -> None: + self._record("tool_call", **kwargs) + + def evaluate_after_tool(self, **kwargs: Any) -> None: + self._record("after_tool", **kwargs) + + +class FakeWorkflow: + """Duck-typed LlamaIndex workflow stand-in.""" + + async def run(self, *_a: Any, **_k: Any) -> None: + return None + + +def _make_callbacks(ev: FakeEvaluator) -> GovernanceCallbacks: + return GovernanceCallbacks(evaluator=ev, agent_name="agent-1", session_id="sess-1") + + +def _handler(ev: FakeEvaluator) -> GovernanceEventHandler: + return GovernanceEventHandler(callbacks=_make_callbacks(ev)) + + +# -------------------------------------------------------------------------- +# can_handle +# -------------------------------------------------------------------------- + + +def test_can_handle_real_workflow(): + from workflows import Workflow, step + from workflows.events import StartEvent, StopEvent + + class _RealWorkflow(Workflow): + @step + async def go(self, ev: StartEvent) -> StopEvent: + return StopEvent() + + assert LlamaIndexAdapter().can_handle(_RealWorkflow()) is True + + +def test_can_handle_rejects_non_workflow(): + # A duck-typed look-alike (has run / Workflow-shaped name) must NOT be + # claimed — only a real workflows.Workflow is. + assert LlamaIndexAdapter().can_handle(FakeWorkflow()) is False + assert LlamaIndexAdapter().can_handle(object()) is False + + +# -------------------------------------------------------------------------- +# attach / detach (real dispatcher) +# -------------------------------------------------------------------------- + + +def _gov_handlers() -> list: + return [ + h + for h in get_dispatcher().event_handlers + if isinstance(h, GovernanceEventHandler) + ] + + +def test_attach_registers_handler_then_detach_removes(): + adapter = LlamaIndexAdapter() + agent = FakeWorkflow() + try: + returned = adapter.attach(agent, agent_id="x", session_id="s", evaluator=FakeEvaluator()) + assert returned is agent + assert len(_gov_handlers()) == 1 + finally: + adapter.detach(agent) + assert _gov_handlers() == [] + + +def test_attach_is_idempotent(): + adapter = LlamaIndexAdapter() + agent = FakeWorkflow() + ev = FakeEvaluator() + try: + adapter.attach(agent, agent_id="x", session_id="s", evaluator=ev) + adapter.attach(agent, agent_id="x", session_id="s", evaluator=ev) + assert len(_gov_handlers()) == 1 + finally: + adapter.detach(agent) + + +# -------------------------------------------------------------------------- +# event routing through the handler +# -------------------------------------------------------------------------- + + +def test_handler_routes_llm_chat_start_to_before_model(): + ev = FakeEvaluator() + h = _handler(ev) + event = LLMChatStartEvent( + messages=[ChatMessage(role="user", content="old"), + ChatMessage(role="user", content="the question")], + additional_kwargs={}, + model_dict={}, + ) + h.handle(event) + hook, kwargs = ev.calls[-1] + assert hook == "before_model" + assert kwargs["model_input"] == "the question" # latest only + + +def test_handler_routes_llm_chat_end_to_after_model(): + ev = FakeEvaluator() + h = _handler(ev) + event = LLMChatEndEvent( + messages=[ChatMessage(role="user", content="q")], + response=ChatResponse(message=ChatMessage(role="assistant", content="the answer")), + ) + h.handle(event) + hook, kwargs = ev.calls[-1] + assert hook == "after_model" + assert kwargs["model_output"] == "the answer" + + +def test_handler_routes_tool_call(): + ev = FakeEvaluator() + h = _handler(ev) + event = AgentToolCallEvent( + tool=ToolMetadata(description="d", name="transfer"), + arguments='{"amount": 50}', + ) + h.handle(event) + hook, kwargs = ev.calls[-1] + assert hook == "tool_call" + assert kwargs["tool_name"] == "transfer" + assert kwargs["tool_args"] == {"amount": 50} + assert kwargs["session_state"]["tool_calls"] == 1 + + +def test_handler_ignores_unrelated_events(): + ev = FakeEvaluator() + h = _handler(ev) + h.handle(object()) # not a governance-relevant event + assert ev.calls == [] + + +# -------------------------------------------------------------------------- +# text / arg extraction +# -------------------------------------------------------------------------- + + +def test_before_model_caps_text(): + ev = FakeEvaluator() + cb = _make_callbacks(ev) + huge = "x" * (_BEFORE_MODEL_TEXT_CAP + 5000) + cb.before_model([ChatMessage(role="user", content=huge)]) + assert len(ev.calls[-1][1]["model_input"]) <= _BEFORE_MODEL_TEXT_CAP + + +def test_before_model_empty(): + ev = FakeEvaluator() + cb = _make_callbacks(ev) + cb.before_model([]) + assert ev.calls[-1][1]["model_input"] == "" + + +def test_coerce_args_json_string(): + assert _coerce_args('{"a": 1}') == {"a": 1} + + +def test_coerce_args_dict_passthrough(): + assert _coerce_args({"a": 1}) == {"a": 1} + + +def test_coerce_args_none_and_bad(): + assert _coerce_args(None) == {} + assert _coerce_args("not json") == {} + + +# -------------------------------------------------------------------------- +# enforcement semantics +# -------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "hook,invoke", + [ + ("before_model", lambda cb: cb.before_model([ChatMessage(role="user", content="hi")])), + ("after_model", lambda cb: cb.after_model(ChatResponse(message=ChatMessage(role="assistant", content="o")))), + ("tool_call", lambda cb: cb.tool_call(ToolMetadata(description="d", name="t"), "{}")), + ], +) +def test_block_exception_propagates(hook, invoke): + cb = _make_callbacks(FakeEvaluator(block_on=hook)) + with pytest.raises(GovernanceBlockException): + invoke(cb) + + +def test_non_block_exception_is_swallowed(caplog): + class Boom: + def evaluate_before_model(self, **_: Any) -> None: + raise RuntimeError("evaluator bug") + + cb = GovernanceCallbacks(evaluator=Boom(), agent_name="a", session_id="s") # type: ignore[arg-type] + with caplog.at_level(logging.WARNING): + cb.before_model([ChatMessage(role="user", content="x")]) + assert any("governance check failed" in r.message for r in caplog.records) \ No newline at end of file diff --git a/packages/uipath-llamaindex/uv.lock b/packages/uipath-llamaindex/uv.lock index f879a865..fe5fd437 100644 --- a/packages/uipath-llamaindex/uv.lock +++ b/packages/uipath-llamaindex/uv.lock @@ -3516,6 +3516,7 @@ dependencies = [ { name = "llama-index-workflows" }, { name = "openinference-instrumentation-llama-index" }, { name = "uipath" }, + { name = "uipath-core" }, { name = "uipath-runtime" }, ] @@ -3560,6 +3561,7 @@ requires-dist = [ { name = "llama-index-workflows", specifier = ">=2.18.0,<3.0.0" }, { name = "openinference-instrumentation-llama-index", specifier = ">=4.3.9" }, { name = "uipath", specifier = ">=2.10.0,<2.11.0" }, + { name = "uipath-core", specifier = ">=0.5.18,<0.7.0" }, { name = "uipath-runtime", specifier = ">=0.11.0,<0.12.0" }, ] provides-extras = ["bedrock", "vertex"]