From 01e0726a8f4aa298ef0c13bcfcfbdbde2328c579 Mon Sep 17 00:00:00 2001 From: Aditi Kumari Date: Fri, 12 Jun 2026 16:32:22 +0530 Subject: [PATCH 1/3] feat(governance): delegation guard Co-Authored-By: Claude Opus 4.8 --- .../runtime/governance/delegation_guard.py | 248 ++++++++++++++ tests/test_delegation_guard.py | 320 ++++++++++++++++++ 2 files changed, 568 insertions(+) create mode 100644 src/uipath/runtime/governance/delegation_guard.py create mode 100644 tests/test_delegation_guard.py diff --git a/src/uipath/runtime/governance/delegation_guard.py b/src/uipath/runtime/governance/delegation_guard.py new file mode 100644 index 0000000..f872f62 --- /dev/null +++ b/src/uipath/runtime/governance/delegation_guard.py @@ -0,0 +1,248 @@ +"""Delegation depth guard. + +Patches an agent's ``invoke`` method to track recursion depth and raise +a ``GovernanceBlockException`` when the configured maximum is exceeded. +This prevents runaway sub-agent chains. +""" + +from __future__ import annotations + +import asyncio +import functools +import logging +import os +from contextvars import ContextVar, Token +from typing import Any + +from uipath.core.governance.exceptions import ( + GovernanceBlockException, + GovernanceViolation, +) + +logger = logging.getLogger(__name__) + +_DEFAULT_MAX_DELEGATION_DEPTH = 25 +_ENV_MAX_DELEGATION_DEPTH = "UIPATH_GOVERNANCE_MAX_DELEGATION_DEPTH" + +# Single module-level ContextVar holding per-agent delegation depths +# keyed by ``id(agent)``. Each install / uninstall pair shares this one +# ContextVar instead of allocating a new one per agent — the interpreter +# interns ContextVars and never GCs them, so per-agent allocation was an +# unbounded leak in long-running hosts (every `install_delegation_guard` +# call permanently grew the interpreter's ContextVar registry). +# +# Per-context isolation (asyncio task / thread) still works the standard +# ContextVar way: each context sees its own copy of the depths dict, and +# nested invokes use ``set`` / ``reset`` for LIFO depth tracking. The +# dict itself is copied on every increment (copy-on-write) so concurrent +# contexts don't share state through a mutable mapping. +_DELEGATION_DEPTHS: ContextVar[dict[int, int]] = ContextVar( + "_uipath_delegation_depths" +) + + +def _current_depth(agent_key: int) -> int: + """Return the current depth for ``agent_key`` in this context.""" + try: + return _DELEGATION_DEPTHS.get().get(agent_key, 0) + except LookupError: + return 0 + + +def _enter_depth_if_under( + agent_key: int, max_depth: int +) -> tuple[int, Token[dict[int, int]] | None]: + """Attempt to increment depth for ``agent_key``. + + Returns ``(new_depth, token)`` where ``token`` is ``None`` if the + new depth would exceed ``max_depth`` — caller raises and does not + need to clean up. On success, caller must reset via ``token``. + """ + try: + depths = _DELEGATION_DEPTHS.get() + except LookupError: + depths = {} + new_depth = depths.get(agent_key, 0) + 1 + if new_depth > max_depth: + return new_depth, None + new_depths = dict(depths) + new_depths[agent_key] = new_depth + token = _DELEGATION_DEPTHS.set(new_depths) + return new_depth, token + + +def _exit_depth(token: Token[dict[int, int]]) -> None: + """Undo a successful :func:`_enter_depth_if_under` call. + + Tolerates cross-context resets (token created in a different + context — happens when a child task awaits an agent invoke) by + accepting the leak rather than crashing the agent on dispose. + """ + try: + _DELEGATION_DEPTHS.reset(token) + except (ValueError, LookupError): + logger.debug("Delegation depth reset from foreign context") + + +def _resolve_max_depth() -> int: + """Read max-depth from env at call time, falling back to default on parse error.""" + raw = os.getenv(_ENV_MAX_DELEGATION_DEPTH) + if raw is None: + return _DEFAULT_MAX_DELEGATION_DEPTH + try: + return int(raw) + except ValueError: + logger.warning( + "Invalid %s=%r; using default %d", + _ENV_MAX_DELEGATION_DEPTH, + raw, + _DEFAULT_MAX_DELEGATION_DEPTH, + ) + return _DEFAULT_MAX_DELEGATION_DEPTH + + +def _build_violation(current: int, resolved_max: int) -> GovernanceBlockException: + """Build the depth-exceeded exception (shared by sync and async guards).""" + return GovernanceBlockException.from_violation( + GovernanceViolation( + rule_id="ASI-02", + rule_name="Excessive Agency", + detail=f"Delegation depth {current} exceeds max {resolved_max}", + ) + ) + + +def _wrap_invoke(original: Any, agent_key: int, resolved_max: int) -> Any: + """Return a depth-guarded wrapper matching the sync/async shape of ``original``. + + Coroutine functions get an ``async def`` wrapper so the returned object + is itself an awaitable — wrapping with a sync function would return an + un-awaited coroutine and silently bypass the guard entirely. + + Depth lives in the module-level :data:`_DELEGATION_DEPTHS` ContextVar + keyed by ``agent_key`` (``id(agent)``), so every guarded agent shares + the same ContextVar instance and the interpreter's ContextVar + registry doesn't grow with each install. + """ + if asyncio.iscoroutinefunction(original): + + @functools.wraps(original) + async def _guarded_async(input_data: Any, **kwargs: Any) -> Any: + current, token = _enter_depth_if_under(agent_key, resolved_max) + if token is None: + raise _build_violation(current, resolved_max) + try: + return await original(input_data, **kwargs) + finally: + _exit_depth(token) + + return _guarded_async + + @functools.wraps(original) + def _guarded_sync(input_data: Any, **kwargs: Any) -> Any: + current, token = _enter_depth_if_under(agent_key, resolved_max) + if token is None: + raise _build_violation(current, resolved_max) + try: + return original(input_data, **kwargs) + finally: + _exit_depth(token) + + return _guarded_sync + + +# Method names we guard on the agent. ``ainvoke`` is required because +# LangChain / LangGraph / LlamaIndex agents expose it as the primary +# async entrypoint; wrapping only ``invoke`` would let async callers +# bypass the depth check entirely. A single ContextVar is shared across +# both so an async call that internally falls through to sync ``invoke`` +# still increments the same counter. +_GUARDED_METHODS = ("invoke", "ainvoke") + + +def install_delegation_guard(agent: Any, max_depth: int | None = None) -> None: + """Patch the agent's invoke methods to enforce a maximum delegation depth. + + Patches both ``invoke`` and ``ainvoke`` when present; each wrapper + matches the sync/async shape of the original so awaitables stay + awaitable. No-op when neither attribute exists or the agent has + already been guarded. + + Per-call-chain depth is tracked in a single :class:`contextvars.ContextVar` + shared across both methods so an ``ainvoke`` that internally calls + ``invoke`` still increments the same counter. Concurrent invokes on + the same agent (across threads or asyncio tasks) keep separate + counters because ContextVar values are per-context. + + Originals are stashed on the agent under + ``_uipath_original_`` so :func:`uninstall_delegation_guard` + can restore them on dispose. + """ + if max_depth is None: + max_depth = _resolve_max_depth() + if getattr(agent, "_delegation_wrapped", False): + return + + originals = { + name: getattr(agent, name, None) + for name in _GUARDED_METHODS + if callable(getattr(agent, name, None)) + } + if not originals: + return + + agent_key = id(agent) + resolved_max = max_depth + + for name, original in originals.items(): + try: + setattr(agent, name, _wrap_invoke(original, agent_key, resolved_max)) + setattr(agent, f"_uipath_original_{name}", original) + except (AttributeError, TypeError) as exc: + # Some agent objects expose `invoke` via __getattr__ or via a + # slot/descriptor that can't be re-assigned. Skip those — + # better to guard partial coverage than to crash the runtime. + logger.debug("Could not patch %s on agent: %s", name, exc) + agent._delegation_wrapped = True + logger.debug( + "Delegation guard installed (max=%d, methods=%s)", + resolved_max, + list(originals), + ) + + +def uninstall_delegation_guard(agent: Any) -> None: + """Restore the agent's invoke methods if a delegation guard was installed. + + Safe to call on agents that were never guarded. Also clears the + agent's entry from the current context's depth map — ``id(agent)`` + is reused by Python after GC, so a stale entry could mis-attribute + a future agent's count to this one. + """ + if not getattr(agent, "_delegation_wrapped", False): + return + for name in _GUARDED_METHODS: + attr = f"_uipath_original_{name}" + original = getattr(agent, attr, None) + if original is not None: + try: + setattr(agent, name, original) + except Exception as exc: # noqa: BLE001 - dispose path; never raise + logger.debug("Could not restore original %s: %s", name, exc) + try: + delattr(agent, attr) + except AttributeError: + pass + agent._delegation_wrapped = False + # Drop the agent's depth entry in the current context. Best-effort + # — if dispose runs from a different context than where the depth + # was set, the foreign context still owns its own copy and will + # discard it when it ends. + agent_key = id(agent) + try: + depths = _DELEGATION_DEPTHS.get() + except LookupError: + return + if agent_key in depths: + new_depths = {k: v for k, v in depths.items() if k != agent_key} + _DELEGATION_DEPTHS.set(new_depths) diff --git a/tests/test_delegation_guard.py b/tests/test_delegation_guard.py new file mode 100644 index 0000000..a1ba432 --- /dev/null +++ b/tests/test_delegation_guard.py @@ -0,0 +1,320 @@ +"""Tests for the async-aware delegation depth guard. + +The guard wraps an agent's ``invoke`` and ``ainvoke`` so a single +ContextVar tracks delegation depth across both sync and async call +chains. The async wrapper must itself be a coroutine — wrapping with a +sync function would return an un-awaited coroutine and silently bypass +the depth check. +""" + +from __future__ import annotations + +import asyncio +import os +from types import SimpleNamespace + +import pytest +from uipath.core.governance.exceptions import GovernanceBlockException + +from uipath.runtime.governance.delegation_guard import ( + install_delegation_guard, + uninstall_delegation_guard, +) + +# --------------------------------------------------------------------------- +# Helpers — minimal agent shapes the guard might encounter in the wild. +# --------------------------------------------------------------------------- + + +def _make_sync_agent() -> SimpleNamespace: + agent = SimpleNamespace() + agent.invoke = lambda payload, **_: {"sync": payload} + return agent + + +def _make_async_agent() -> SimpleNamespace: + agent = SimpleNamespace() + + async def _ainvoke(payload, **_): + return {"async": payload} + + agent.ainvoke = _ainvoke + return agent + + +def _make_dual_agent() -> SimpleNamespace: + """Agent with both sync invoke and async ainvoke (LangGraph React shape).""" + agent = _make_sync_agent() + + async def _ainvoke(payload, **_): + return {"async": payload} + + agent.ainvoke = _ainvoke + return agent + + +# --------------------------------------------------------------------------- +# Sync path — preserves the original behaviour the guard always had. +# --------------------------------------------------------------------------- + + +def test_sync_invoke_passes_through_under_limit() -> None: + agent = _make_sync_agent() + install_delegation_guard(agent, max_depth=3) + assert agent.invoke({"x": 1}) == {"sync": {"x": 1}} + + +def test_sync_invoke_raises_when_depth_exceeded() -> None: + """Recursive sync invokes blow the limit.""" + agent = SimpleNamespace() + calls = {"n": 0} + + def _invoke(_payload, **_): + calls["n"] += 1 + # Recurse into ourselves through the guarded attribute. + return agent.invoke({}) + + agent.invoke = _invoke + install_delegation_guard(agent, max_depth=3) + + with pytest.raises(GovernanceBlockException): + agent.invoke({}) + # Depth check fires inside the wrapper before the original runs, so + # we got exactly max_depth=3 successful entries plus one rejection. + assert calls["n"] == 3 + + +# --------------------------------------------------------------------------- +# Async path — the new shape this change unlocks. +# --------------------------------------------------------------------------- + + +def test_async_wrapper_is_a_coroutine_function() -> None: + """The wrapped ainvoke must itself be awaitable. + + Regression test for the original bug: a sync wrapper around an async + method returned an un-awaited coroutine and silently bypassed the + depth check entirely. + """ + agent = _make_async_agent() + install_delegation_guard(agent, max_depth=3) + assert asyncio.iscoroutinefunction(agent.ainvoke) + + +def test_async_invoke_passes_through_under_limit() -> None: + agent = _make_async_agent() + install_delegation_guard(agent, max_depth=3) + result = asyncio.run(agent.ainvoke({"x": 1})) + assert result == {"async": {"x": 1}} + + +def test_async_invoke_raises_when_depth_exceeded() -> None: + agent = SimpleNamespace() + calls = {"n": 0} + + async def _ainvoke(_payload, **_): + calls["n"] += 1 + return await agent.ainvoke({}) + + agent.ainvoke = _ainvoke + install_delegation_guard(agent, max_depth=3) + + with pytest.raises(GovernanceBlockException): + asyncio.run(agent.ainvoke({})) + assert calls["n"] == 3 + + +def test_sync_and_async_share_one_depth_counter() -> None: + """A coroutine that falls through to sync ``invoke`` increments the same counter.""" + agent = _make_dual_agent() + calls = {"n": 0} + + def _invoke(_payload, **_): + calls["n"] += 1 + # Sync self-recursion through the same guarded attribute. + return agent.invoke({}) + + async def _ainvoke(_payload, **_): + calls["n"] += 1 + # Cross-mode: async entry falls through to the sync path. + return agent.invoke({}) + + agent.invoke = _invoke + agent.ainvoke = _ainvoke + install_delegation_guard(agent, max_depth=2) + + with pytest.raises(GovernanceBlockException): + asyncio.run(agent.ainvoke({})) + # ainvoke (depth=1) → invoke (depth=2) → invoke (depth=3, blocked). + # The guard rejects the third call before _invoke runs, so calls=2. + assert calls["n"] == 2 + + +# --------------------------------------------------------------------------- +# Lifecycle — install / uninstall semantics. +# --------------------------------------------------------------------------- + + +def test_install_is_idempotent() -> None: + agent = _make_sync_agent() + install_delegation_guard(agent, max_depth=5) + wrapped_once = agent.invoke + install_delegation_guard(agent, max_depth=5) + assert agent.invoke is wrapped_once, "second install must not re-wrap" + + +def test_uninstall_restores_originals_for_both_methods() -> None: + agent = _make_dual_agent() + original_invoke = agent.invoke + original_ainvoke = agent.ainvoke + install_delegation_guard(agent, max_depth=5) + assert agent.invoke is not original_invoke + assert agent.ainvoke is not original_ainvoke + + uninstall_delegation_guard(agent) + assert agent.invoke is original_invoke + assert agent.ainvoke is original_ainvoke + assert not getattr(agent, "_delegation_wrapped", False) + + +def test_uninstall_safe_on_unguarded_agent() -> None: + agent = _make_sync_agent() + # Should not raise; should leave agent unchanged. + uninstall_delegation_guard(agent) + assert callable(agent.invoke) + + +# --------------------------------------------------------------------------- +# Edge cases. +# --------------------------------------------------------------------------- + + +def test_agent_without_invoke_methods_is_noop() -> None: + """Agents without any invokable method must not crash the install.""" + agent = SimpleNamespace(unrelated="value") + install_delegation_guard(agent, max_depth=5) + assert not getattr(agent, "_delegation_wrapped", False) + + +def test_env_var_max_depth_override(monkeypatch: pytest.MonkeyPatch) -> None: + """``UIPATH_GOVERNANCE_MAX_DELEGATION_DEPTH`` overrides the default.""" + monkeypatch.setenv("UIPATH_GOVERNANCE_MAX_DELEGATION_DEPTH", "1") + agent = SimpleNamespace() + calls = {"n": 0} + + def _invoke(_payload, **_): + calls["n"] += 1 + return agent.invoke({}) + + agent.invoke = _invoke + install_delegation_guard(agent) # picks up env + + with pytest.raises(GovernanceBlockException): + agent.invoke({}) + assert calls["n"] == 1, "max_depth=1 should allow exactly one call" + + +def test_invalid_env_var_falls_back_to_default( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setenv("UIPATH_GOVERNANCE_MAX_DELEGATION_DEPTH", "not-a-number") + agent = _make_sync_agent() + # Should not raise on install — falls back silently to the default. + install_delegation_guard(agent) + assert os.environ.get("UIPATH_GOVERNANCE_MAX_DELEGATION_DEPTH") == "not-a-number" + assert callable(agent.invoke) + + +# --------------------------------------------------------------------------- +# Leak / scaling — pins the shared-ContextVar design. +# --------------------------------------------------------------------------- + + +def test_install_does_not_allocate_per_agent_contextvars() -> None: + """N installs must not grow the module's ContextVar registry by N. + + The old implementation allocated a ``ContextVar`` per agent. Since + ContextVar instances are interned by the interpreter and never GC'd, + that was an unbounded leak. The current design holds a single + module-level ContextVar of ``dict[id(agent), int]``. + """ + from uipath.runtime.governance import delegation_guard as dg + + # Snapshot the single shared ContextVar. + shared_var = dg._DELEGATION_DEPTHS + + for _ in range(100): + agent = _make_sync_agent() + install_delegation_guard(agent, max_depth=3) + uninstall_delegation_guard(agent) + + # The module-level ContextVar is unchanged — same instance, no new + # ContextVars were allocated. + assert dg._DELEGATION_DEPTHS is shared_var + + +def test_two_agents_have_independent_depth_counters() -> None: + """Exhausting one agent's depth limit doesn't leak into another agent. + + Both agents share the single module-level ContextVar but the dict + inside isolates them via ``id(agent)``. + """ + from uipath.runtime.governance import delegation_guard as dg + + agent_a = SimpleNamespace() + calls_a = {"n": 0} + + def _invoke_a(_payload, **_): + calls_a["n"] += 1 + return agent_a.invoke({}) # self-recursion until limit + + agent_a.invoke = _invoke_a + + agent_b = _make_sync_agent() + + install_delegation_guard(agent_a, max_depth=2) + install_delegation_guard(agent_b, max_depth=2) + + # Drive agent_a to its limit. + with pytest.raises(GovernanceBlockException): + agent_a.invoke({}) + assert calls_a["n"] == 2 + + # agent_b is a fresh chain in the same context. Its depth counter + # is keyed by id(agent_b), so agent_a's exhausted state doesn't + # affect it. Without the per-agent keying, agent_b would inherit + # whatever depth was last set in this context. + assert agent_b.invoke({"x": 1}) == {"sync": {"x": 1}} + + # After both calls, the ContextVar should be back to its initial + # state — either unset (LookupError) or holding an empty dict. The + # set/reset pairs each guarded call cleaned up after itself. + try: + depths = dg._DELEGATION_DEPTHS.get() + except LookupError: + depths = {} + assert depths.get(id(agent_a), 0) == 0 + assert depths.get(id(agent_b), 0) == 0 + + +def test_uninstall_clears_agent_depth_entry() -> None: + """After uninstall, the agent's id is no longer in the depths dict. + + Prevents ``id(agent)`` reuse — Python recycles ids after GC — from + mis-attributing a future agent's count to this one. + """ + from uipath.runtime.governance import delegation_guard as dg + + agent = _make_sync_agent() + install_delegation_guard(agent, max_depth=5) + # Enter the guard once so the agent gets a depth entry. + agent.invoke({}) + # invoke completed -> token reset -> entry should be back to 0 or + # absent. We re-enter manually to plant a non-zero entry. + agent_key = id(agent) + dg._DELEGATION_DEPTHS.set({agent_key: 3}) + assert dg._DELEGATION_DEPTHS.get().get(agent_key) == 3 + + uninstall_delegation_guard(agent) + # Uninstall pops the entry from the current context. + assert agent_key not in dg._DELEGATION_DEPTHS.get() From 600c18af9491f963bda18c83805b322ee921dc2b Mon Sep 17 00:00:00 2001 From: Aditi Kumari Date: Tue, 16 Jun 2026 15:36:16 +0530 Subject: [PATCH 2/3] =?UTF-8?q?fix(governance):=20address=20PR=20review=20?= =?UTF-8?q?=E2=80=94=20only=20mark=20agent=20guarded=20when=20a=20method?= =?UTF-8?q?=20is=20actually=20patched;=20correct=20=5Fresolve=5Fmax=5Fdept?= =?UTF-8?q?h=20docstring=20(install-time)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Claude Opus 4.8 --- .../runtime/governance/delegation_guard.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/src/uipath/runtime/governance/delegation_guard.py b/src/uipath/runtime/governance/delegation_guard.py index f872f62..18a4aa5 100644 --- a/src/uipath/runtime/governance/delegation_guard.py +++ b/src/uipath/runtime/governance/delegation_guard.py @@ -85,7 +85,12 @@ def _exit_depth(token: Token[dict[int, int]]) -> None: def _resolve_max_depth() -> int: - """Read max-depth from env at call time, falling back to default on parse error.""" + """Read max-depth from env at install time, falling back to default on parse error. + + Called once from :func:`install_delegation_guard`; the resolved value is + captured per agent (``resolved_max``), so changing the env var after the + guard is installed has no effect on already-wrapped agents. + """ raw = os.getenv(_ENV_MAX_DELEGATION_DEPTH) if raw is None: return _DEFAULT_MAX_DELEGATION_DEPTH @@ -194,20 +199,30 @@ def install_delegation_guard(agent: Any, max_depth: int | None = None) -> None: agent_key = id(agent) resolved_max = max_depth + patched: list[str] = [] for name, original in originals.items(): try: setattr(agent, name, _wrap_invoke(original, agent_key, resolved_max)) setattr(agent, f"_uipath_original_{name}", original) + patched.append(name) except (AttributeError, TypeError) as exc: # Some agent objects expose `invoke` via __getattr__ or via a # slot/descriptor that can't be re-assigned. Skip those — # better to guard partial coverage than to crash the runtime. logger.debug("Could not patch %s on agent: %s", name, exc) + + if not patched: + # Nothing was actually wrapped — don't mark the agent as guarded, + # or a later retry / uninstall would wrongly assume methods were + # patched. + logger.debug("Delegation guard patched no methods; leaving agent unguarded") + return + agent._delegation_wrapped = True logger.debug( "Delegation guard installed (max=%d, methods=%s)", resolved_max, - list(originals), + patched, ) From 95cbcb29055d397ea3dab8710e660428be1bbf00 Mon Sep 17 00:00:00 2001 From: Viswanath Lekshmanan Date: Thu, 25 Jun 2026 12:31:00 +0530 Subject: [PATCH 3/3] refactor(governance): drop runtime-side delegation guard (monkeypatch) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes architecture-review §2.3 — the delegation guard monkey-patched agent ``invoke``/``ainvoke`` methods in place via ``setattr``, naming no framework but mutating framework-owned objects through their private shapes. Fragile, depends on agent internals, and the runtime layer shouldn't be reaching into objects it didn't construct. Correct seam is the framework callback handler, which already receives ``parent_run_id`` on every callback and can derive delegation depth from the run tree without touching the agent. That work lives on the LangChain side (uipath-langchain-python PR #899, which is done) — so the runtime-side module is dead weight. Deletions - src/uipath/runtime/governance/delegation_guard.py (265 LOC) — ``install_delegation_guard`` / ``uninstall_delegation_guard``, the per-agent ContextVar depth tracking, the setattr-based wrap. - tests/test_delegation_guard.py (320 LOC) — the entire test suite for the deleted module. Verification - Monorepo grep for ``delegation_guard``, ``install_delegation_guard``, ``uninstall_delegation_guard``, ``ASI-02``, ``Excessive Agency``, and ``UIPATH_GOVERNANCE_MAX_DELEGATION_DEPTH``: zero hits outside the deleted files. The module was self-contained. - ruff clean, mypy clean (11 source files), 357 passed + 1 skipped (pre-existing wrapper skip). Net diff: −585 LOC. After this PR's rebase onto #124, the branch contains only deletions on top of the evaluator slice. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../runtime/governance/delegation_guard.py | 263 -------------- tests/test_delegation_guard.py | 320 ------------------ 2 files changed, 583 deletions(-) delete mode 100644 src/uipath/runtime/governance/delegation_guard.py delete mode 100644 tests/test_delegation_guard.py diff --git a/src/uipath/runtime/governance/delegation_guard.py b/src/uipath/runtime/governance/delegation_guard.py deleted file mode 100644 index 18a4aa5..0000000 --- a/src/uipath/runtime/governance/delegation_guard.py +++ /dev/null @@ -1,263 +0,0 @@ -"""Delegation depth guard. - -Patches an agent's ``invoke`` method to track recursion depth and raise -a ``GovernanceBlockException`` when the configured maximum is exceeded. -This prevents runaway sub-agent chains. -""" - -from __future__ import annotations - -import asyncio -import functools -import logging -import os -from contextvars import ContextVar, Token -from typing import Any - -from uipath.core.governance.exceptions import ( - GovernanceBlockException, - GovernanceViolation, -) - -logger = logging.getLogger(__name__) - -_DEFAULT_MAX_DELEGATION_DEPTH = 25 -_ENV_MAX_DELEGATION_DEPTH = "UIPATH_GOVERNANCE_MAX_DELEGATION_DEPTH" - -# Single module-level ContextVar holding per-agent delegation depths -# keyed by ``id(agent)``. Each install / uninstall pair shares this one -# ContextVar instead of allocating a new one per agent — the interpreter -# interns ContextVars and never GCs them, so per-agent allocation was an -# unbounded leak in long-running hosts (every `install_delegation_guard` -# call permanently grew the interpreter's ContextVar registry). -# -# Per-context isolation (asyncio task / thread) still works the standard -# ContextVar way: each context sees its own copy of the depths dict, and -# nested invokes use ``set`` / ``reset`` for LIFO depth tracking. The -# dict itself is copied on every increment (copy-on-write) so concurrent -# contexts don't share state through a mutable mapping. -_DELEGATION_DEPTHS: ContextVar[dict[int, int]] = ContextVar( - "_uipath_delegation_depths" -) - - -def _current_depth(agent_key: int) -> int: - """Return the current depth for ``agent_key`` in this context.""" - try: - return _DELEGATION_DEPTHS.get().get(agent_key, 0) - except LookupError: - return 0 - - -def _enter_depth_if_under( - agent_key: int, max_depth: int -) -> tuple[int, Token[dict[int, int]] | None]: - """Attempt to increment depth for ``agent_key``. - - Returns ``(new_depth, token)`` where ``token`` is ``None`` if the - new depth would exceed ``max_depth`` — caller raises and does not - need to clean up. On success, caller must reset via ``token``. - """ - try: - depths = _DELEGATION_DEPTHS.get() - except LookupError: - depths = {} - new_depth = depths.get(agent_key, 0) + 1 - if new_depth > max_depth: - return new_depth, None - new_depths = dict(depths) - new_depths[agent_key] = new_depth - token = _DELEGATION_DEPTHS.set(new_depths) - return new_depth, token - - -def _exit_depth(token: Token[dict[int, int]]) -> None: - """Undo a successful :func:`_enter_depth_if_under` call. - - Tolerates cross-context resets (token created in a different - context — happens when a child task awaits an agent invoke) by - accepting the leak rather than crashing the agent on dispose. - """ - try: - _DELEGATION_DEPTHS.reset(token) - except (ValueError, LookupError): - logger.debug("Delegation depth reset from foreign context") - - -def _resolve_max_depth() -> int: - """Read max-depth from env at install time, falling back to default on parse error. - - Called once from :func:`install_delegation_guard`; the resolved value is - captured per agent (``resolved_max``), so changing the env var after the - guard is installed has no effect on already-wrapped agents. - """ - raw = os.getenv(_ENV_MAX_DELEGATION_DEPTH) - if raw is None: - return _DEFAULT_MAX_DELEGATION_DEPTH - try: - return int(raw) - except ValueError: - logger.warning( - "Invalid %s=%r; using default %d", - _ENV_MAX_DELEGATION_DEPTH, - raw, - _DEFAULT_MAX_DELEGATION_DEPTH, - ) - return _DEFAULT_MAX_DELEGATION_DEPTH - - -def _build_violation(current: int, resolved_max: int) -> GovernanceBlockException: - """Build the depth-exceeded exception (shared by sync and async guards).""" - return GovernanceBlockException.from_violation( - GovernanceViolation( - rule_id="ASI-02", - rule_name="Excessive Agency", - detail=f"Delegation depth {current} exceeds max {resolved_max}", - ) - ) - - -def _wrap_invoke(original: Any, agent_key: int, resolved_max: int) -> Any: - """Return a depth-guarded wrapper matching the sync/async shape of ``original``. - - Coroutine functions get an ``async def`` wrapper so the returned object - is itself an awaitable — wrapping with a sync function would return an - un-awaited coroutine and silently bypass the guard entirely. - - Depth lives in the module-level :data:`_DELEGATION_DEPTHS` ContextVar - keyed by ``agent_key`` (``id(agent)``), so every guarded agent shares - the same ContextVar instance and the interpreter's ContextVar - registry doesn't grow with each install. - """ - if asyncio.iscoroutinefunction(original): - - @functools.wraps(original) - async def _guarded_async(input_data: Any, **kwargs: Any) -> Any: - current, token = _enter_depth_if_under(agent_key, resolved_max) - if token is None: - raise _build_violation(current, resolved_max) - try: - return await original(input_data, **kwargs) - finally: - _exit_depth(token) - - return _guarded_async - - @functools.wraps(original) - def _guarded_sync(input_data: Any, **kwargs: Any) -> Any: - current, token = _enter_depth_if_under(agent_key, resolved_max) - if token is None: - raise _build_violation(current, resolved_max) - try: - return original(input_data, **kwargs) - finally: - _exit_depth(token) - - return _guarded_sync - - -# Method names we guard on the agent. ``ainvoke`` is required because -# LangChain / LangGraph / LlamaIndex agents expose it as the primary -# async entrypoint; wrapping only ``invoke`` would let async callers -# bypass the depth check entirely. A single ContextVar is shared across -# both so an async call that internally falls through to sync ``invoke`` -# still increments the same counter. -_GUARDED_METHODS = ("invoke", "ainvoke") - - -def install_delegation_guard(agent: Any, max_depth: int | None = None) -> None: - """Patch the agent's invoke methods to enforce a maximum delegation depth. - - Patches both ``invoke`` and ``ainvoke`` when present; each wrapper - matches the sync/async shape of the original so awaitables stay - awaitable. No-op when neither attribute exists or the agent has - already been guarded. - - Per-call-chain depth is tracked in a single :class:`contextvars.ContextVar` - shared across both methods so an ``ainvoke`` that internally calls - ``invoke`` still increments the same counter. Concurrent invokes on - the same agent (across threads or asyncio tasks) keep separate - counters because ContextVar values are per-context. - - Originals are stashed on the agent under - ``_uipath_original_`` so :func:`uninstall_delegation_guard` - can restore them on dispose. - """ - if max_depth is None: - max_depth = _resolve_max_depth() - if getattr(agent, "_delegation_wrapped", False): - return - - originals = { - name: getattr(agent, name, None) - for name in _GUARDED_METHODS - if callable(getattr(agent, name, None)) - } - if not originals: - return - - agent_key = id(agent) - resolved_max = max_depth - - patched: list[str] = [] - for name, original in originals.items(): - try: - setattr(agent, name, _wrap_invoke(original, agent_key, resolved_max)) - setattr(agent, f"_uipath_original_{name}", original) - patched.append(name) - except (AttributeError, TypeError) as exc: - # Some agent objects expose `invoke` via __getattr__ or via a - # slot/descriptor that can't be re-assigned. Skip those — - # better to guard partial coverage than to crash the runtime. - logger.debug("Could not patch %s on agent: %s", name, exc) - - if not patched: - # Nothing was actually wrapped — don't mark the agent as guarded, - # or a later retry / uninstall would wrongly assume methods were - # patched. - logger.debug("Delegation guard patched no methods; leaving agent unguarded") - return - - agent._delegation_wrapped = True - logger.debug( - "Delegation guard installed (max=%d, methods=%s)", - resolved_max, - patched, - ) - - -def uninstall_delegation_guard(agent: Any) -> None: - """Restore the agent's invoke methods if a delegation guard was installed. - - Safe to call on agents that were never guarded. Also clears the - agent's entry from the current context's depth map — ``id(agent)`` - is reused by Python after GC, so a stale entry could mis-attribute - a future agent's count to this one. - """ - if not getattr(agent, "_delegation_wrapped", False): - return - for name in _GUARDED_METHODS: - attr = f"_uipath_original_{name}" - original = getattr(agent, attr, None) - if original is not None: - try: - setattr(agent, name, original) - except Exception as exc: # noqa: BLE001 - dispose path; never raise - logger.debug("Could not restore original %s: %s", name, exc) - try: - delattr(agent, attr) - except AttributeError: - pass - agent._delegation_wrapped = False - # Drop the agent's depth entry in the current context. Best-effort - # — if dispose runs from a different context than where the depth - # was set, the foreign context still owns its own copy and will - # discard it when it ends. - agent_key = id(agent) - try: - depths = _DELEGATION_DEPTHS.get() - except LookupError: - return - if agent_key in depths: - new_depths = {k: v for k, v in depths.items() if k != agent_key} - _DELEGATION_DEPTHS.set(new_depths) diff --git a/tests/test_delegation_guard.py b/tests/test_delegation_guard.py deleted file mode 100644 index a1ba432..0000000 --- a/tests/test_delegation_guard.py +++ /dev/null @@ -1,320 +0,0 @@ -"""Tests for the async-aware delegation depth guard. - -The guard wraps an agent's ``invoke`` and ``ainvoke`` so a single -ContextVar tracks delegation depth across both sync and async call -chains. The async wrapper must itself be a coroutine — wrapping with a -sync function would return an un-awaited coroutine and silently bypass -the depth check. -""" - -from __future__ import annotations - -import asyncio -import os -from types import SimpleNamespace - -import pytest -from uipath.core.governance.exceptions import GovernanceBlockException - -from uipath.runtime.governance.delegation_guard import ( - install_delegation_guard, - uninstall_delegation_guard, -) - -# --------------------------------------------------------------------------- -# Helpers — minimal agent shapes the guard might encounter in the wild. -# --------------------------------------------------------------------------- - - -def _make_sync_agent() -> SimpleNamespace: - agent = SimpleNamespace() - agent.invoke = lambda payload, **_: {"sync": payload} - return agent - - -def _make_async_agent() -> SimpleNamespace: - agent = SimpleNamespace() - - async def _ainvoke(payload, **_): - return {"async": payload} - - agent.ainvoke = _ainvoke - return agent - - -def _make_dual_agent() -> SimpleNamespace: - """Agent with both sync invoke and async ainvoke (LangGraph React shape).""" - agent = _make_sync_agent() - - async def _ainvoke(payload, **_): - return {"async": payload} - - agent.ainvoke = _ainvoke - return agent - - -# --------------------------------------------------------------------------- -# Sync path — preserves the original behaviour the guard always had. -# --------------------------------------------------------------------------- - - -def test_sync_invoke_passes_through_under_limit() -> None: - agent = _make_sync_agent() - install_delegation_guard(agent, max_depth=3) - assert agent.invoke({"x": 1}) == {"sync": {"x": 1}} - - -def test_sync_invoke_raises_when_depth_exceeded() -> None: - """Recursive sync invokes blow the limit.""" - agent = SimpleNamespace() - calls = {"n": 0} - - def _invoke(_payload, **_): - calls["n"] += 1 - # Recurse into ourselves through the guarded attribute. - return agent.invoke({}) - - agent.invoke = _invoke - install_delegation_guard(agent, max_depth=3) - - with pytest.raises(GovernanceBlockException): - agent.invoke({}) - # Depth check fires inside the wrapper before the original runs, so - # we got exactly max_depth=3 successful entries plus one rejection. - assert calls["n"] == 3 - - -# --------------------------------------------------------------------------- -# Async path — the new shape this change unlocks. -# --------------------------------------------------------------------------- - - -def test_async_wrapper_is_a_coroutine_function() -> None: - """The wrapped ainvoke must itself be awaitable. - - Regression test for the original bug: a sync wrapper around an async - method returned an un-awaited coroutine and silently bypassed the - depth check entirely. - """ - agent = _make_async_agent() - install_delegation_guard(agent, max_depth=3) - assert asyncio.iscoroutinefunction(agent.ainvoke) - - -def test_async_invoke_passes_through_under_limit() -> None: - agent = _make_async_agent() - install_delegation_guard(agent, max_depth=3) - result = asyncio.run(agent.ainvoke({"x": 1})) - assert result == {"async": {"x": 1}} - - -def test_async_invoke_raises_when_depth_exceeded() -> None: - agent = SimpleNamespace() - calls = {"n": 0} - - async def _ainvoke(_payload, **_): - calls["n"] += 1 - return await agent.ainvoke({}) - - agent.ainvoke = _ainvoke - install_delegation_guard(agent, max_depth=3) - - with pytest.raises(GovernanceBlockException): - asyncio.run(agent.ainvoke({})) - assert calls["n"] == 3 - - -def test_sync_and_async_share_one_depth_counter() -> None: - """A coroutine that falls through to sync ``invoke`` increments the same counter.""" - agent = _make_dual_agent() - calls = {"n": 0} - - def _invoke(_payload, **_): - calls["n"] += 1 - # Sync self-recursion through the same guarded attribute. - return agent.invoke({}) - - async def _ainvoke(_payload, **_): - calls["n"] += 1 - # Cross-mode: async entry falls through to the sync path. - return agent.invoke({}) - - agent.invoke = _invoke - agent.ainvoke = _ainvoke - install_delegation_guard(agent, max_depth=2) - - with pytest.raises(GovernanceBlockException): - asyncio.run(agent.ainvoke({})) - # ainvoke (depth=1) → invoke (depth=2) → invoke (depth=3, blocked). - # The guard rejects the third call before _invoke runs, so calls=2. - assert calls["n"] == 2 - - -# --------------------------------------------------------------------------- -# Lifecycle — install / uninstall semantics. -# --------------------------------------------------------------------------- - - -def test_install_is_idempotent() -> None: - agent = _make_sync_agent() - install_delegation_guard(agent, max_depth=5) - wrapped_once = agent.invoke - install_delegation_guard(agent, max_depth=5) - assert agent.invoke is wrapped_once, "second install must not re-wrap" - - -def test_uninstall_restores_originals_for_both_methods() -> None: - agent = _make_dual_agent() - original_invoke = agent.invoke - original_ainvoke = agent.ainvoke - install_delegation_guard(agent, max_depth=5) - assert agent.invoke is not original_invoke - assert agent.ainvoke is not original_ainvoke - - uninstall_delegation_guard(agent) - assert agent.invoke is original_invoke - assert agent.ainvoke is original_ainvoke - assert not getattr(agent, "_delegation_wrapped", False) - - -def test_uninstall_safe_on_unguarded_agent() -> None: - agent = _make_sync_agent() - # Should not raise; should leave agent unchanged. - uninstall_delegation_guard(agent) - assert callable(agent.invoke) - - -# --------------------------------------------------------------------------- -# Edge cases. -# --------------------------------------------------------------------------- - - -def test_agent_without_invoke_methods_is_noop() -> None: - """Agents without any invokable method must not crash the install.""" - agent = SimpleNamespace(unrelated="value") - install_delegation_guard(agent, max_depth=5) - assert not getattr(agent, "_delegation_wrapped", False) - - -def test_env_var_max_depth_override(monkeypatch: pytest.MonkeyPatch) -> None: - """``UIPATH_GOVERNANCE_MAX_DELEGATION_DEPTH`` overrides the default.""" - monkeypatch.setenv("UIPATH_GOVERNANCE_MAX_DELEGATION_DEPTH", "1") - agent = SimpleNamespace() - calls = {"n": 0} - - def _invoke(_payload, **_): - calls["n"] += 1 - return agent.invoke({}) - - agent.invoke = _invoke - install_delegation_guard(agent) # picks up env - - with pytest.raises(GovernanceBlockException): - agent.invoke({}) - assert calls["n"] == 1, "max_depth=1 should allow exactly one call" - - -def test_invalid_env_var_falls_back_to_default( - monkeypatch: pytest.MonkeyPatch, -) -> None: - monkeypatch.setenv("UIPATH_GOVERNANCE_MAX_DELEGATION_DEPTH", "not-a-number") - agent = _make_sync_agent() - # Should not raise on install — falls back silently to the default. - install_delegation_guard(agent) - assert os.environ.get("UIPATH_GOVERNANCE_MAX_DELEGATION_DEPTH") == "not-a-number" - assert callable(agent.invoke) - - -# --------------------------------------------------------------------------- -# Leak / scaling — pins the shared-ContextVar design. -# --------------------------------------------------------------------------- - - -def test_install_does_not_allocate_per_agent_contextvars() -> None: - """N installs must not grow the module's ContextVar registry by N. - - The old implementation allocated a ``ContextVar`` per agent. Since - ContextVar instances are interned by the interpreter and never GC'd, - that was an unbounded leak. The current design holds a single - module-level ContextVar of ``dict[id(agent), int]``. - """ - from uipath.runtime.governance import delegation_guard as dg - - # Snapshot the single shared ContextVar. - shared_var = dg._DELEGATION_DEPTHS - - for _ in range(100): - agent = _make_sync_agent() - install_delegation_guard(agent, max_depth=3) - uninstall_delegation_guard(agent) - - # The module-level ContextVar is unchanged — same instance, no new - # ContextVars were allocated. - assert dg._DELEGATION_DEPTHS is shared_var - - -def test_two_agents_have_independent_depth_counters() -> None: - """Exhausting one agent's depth limit doesn't leak into another agent. - - Both agents share the single module-level ContextVar but the dict - inside isolates them via ``id(agent)``. - """ - from uipath.runtime.governance import delegation_guard as dg - - agent_a = SimpleNamespace() - calls_a = {"n": 0} - - def _invoke_a(_payload, **_): - calls_a["n"] += 1 - return agent_a.invoke({}) # self-recursion until limit - - agent_a.invoke = _invoke_a - - agent_b = _make_sync_agent() - - install_delegation_guard(agent_a, max_depth=2) - install_delegation_guard(agent_b, max_depth=2) - - # Drive agent_a to its limit. - with pytest.raises(GovernanceBlockException): - agent_a.invoke({}) - assert calls_a["n"] == 2 - - # agent_b is a fresh chain in the same context. Its depth counter - # is keyed by id(agent_b), so agent_a's exhausted state doesn't - # affect it. Without the per-agent keying, agent_b would inherit - # whatever depth was last set in this context. - assert agent_b.invoke({"x": 1}) == {"sync": {"x": 1}} - - # After both calls, the ContextVar should be back to its initial - # state — either unset (LookupError) or holding an empty dict. The - # set/reset pairs each guarded call cleaned up after itself. - try: - depths = dg._DELEGATION_DEPTHS.get() - except LookupError: - depths = {} - assert depths.get(id(agent_a), 0) == 0 - assert depths.get(id(agent_b), 0) == 0 - - -def test_uninstall_clears_agent_depth_entry() -> None: - """After uninstall, the agent's id is no longer in the depths dict. - - Prevents ``id(agent)`` reuse — Python recycles ids after GC — from - mis-attributing a future agent's count to this one. - """ - from uipath.runtime.governance import delegation_guard as dg - - agent = _make_sync_agent() - install_delegation_guard(agent, max_depth=5) - # Enter the guard once so the agent gets a depth entry. - agent.invoke({}) - # invoke completed -> token reset -> entry should be back to 0 or - # absent. We re-enter manually to plant a non-zero entry. - agent_key = id(agent) - dg._DELEGATION_DEPTHS.set({agent_key: 3}) - assert dg._DELEGATION_DEPTHS.get().get(agent_key) == 3 - - uninstall_delegation_guard(agent) - # Uninstall pops the entry from the current context. - assert agent_key not in dg._DELEGATION_DEPTHS.get()