diff --git a/pyproject.toml b/pyproject.toml index 277cf622a..3cdc90bd5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,7 @@ requires-python = ">=3.11" dependencies = [ "uipath>=2.10.79, <2.11.0", "uipath-core>=0.5.17, <0.6.0", - "uipath-platform>=0.1.61, <0.2.0", + "uipath-platform>=0.1.65, <0.2.0", "uipath-runtime>=0.11.0, <0.12.0", "langgraph>=1.1.8, <2.0.0", "langchain-core>=1.2.11, <2.0.0", @@ -19,7 +19,7 @@ dependencies = [ "openinference-instrumentation-langchain>=0.1.56", "jsonschema-pydantic-converter>=0.4.0", "jsonpath-ng>=1.7.0", - "mcp==1.26.0", + "mcp==1.27.2", "langchain-mcp-adapters==0.2.1", "pillow>=12.1.1", "a2a-sdk>=0.2.0,<1.0.0", diff --git a/src/uipath_langchain/agent/tools/mcp/__init__.py b/src/uipath_langchain/agent/tools/mcp/__init__.py index 7b2c33f35..9ce200a91 100644 --- a/src/uipath_langchain/agent/tools/mcp/__init__.py +++ b/src/uipath_langchain/agent/tools/mcp/__init__.py @@ -1,5 +1,6 @@ """MCP (Model Context Protocol) tools.""" +from .job_executor import LangGraphJobExecutor from .mcp_client import McpClient, SessionInfoFactory from .mcp_tool import ( create_mcp_tools, @@ -9,6 +10,7 @@ from .streamable_http import SessionInfo __all__ = [ + "LangGraphJobExecutor", "McpClient", "SessionInfo", "SessionInfoFactory", diff --git a/src/uipath_langchain/agent/tools/mcp/claude.md b/src/uipath_langchain/agent/tools/mcp/claude.md index ecddb3b94..5c0174a4f 100644 --- a/src/uipath_langchain/agent/tools/mcp/claude.md +++ b/src/uipath_langchain/agent/tools/mcp/claude.md @@ -24,12 +24,14 @@ src/uipath_langchain/agent/tools/mcp/ ├── __init__.py # Public exports ├── mcp_client.py # SessionInfoFactory, McpClient ├── mcp_tool.py # Tool factory functions +├── job_executor.py # LangGraphJobExecutor (uipath.com/job suspend/resume) └── streamable_http.py # SessionInfo, StreamableHTTPTransport (copied from MCP SDK) ``` ### Public Exports (`__init__.py`) ```python +from .job_executor import LangGraphJobExecutor from .mcp_client import McpClient, SessionInfoFactory from .mcp_tool import ( create_mcp_tools_and_clients, @@ -502,6 +504,57 @@ endpoint (`GET/PUT agenthub_/design/debugstate/{agentId}/{key}`). It lives in `uipath-agents` because it depends on execution-type logic that belongs in the agent layer, not in the langchain tools layer. +## `uipath.com/job` — long-running job support + +When a UiPath MCP server backs a tool with an Orchestrator **job**, the agent +should *suspend* while the job runs and *resume* with its result instead of +blocking. This is negotiated entirely through MCP `_meta` (no `agent.json` +change), so a deployed agent gains the behavior on package upgrade. + +**Flow (single key `uipath.com/job`, all in `_meta`):** + +1. **Advertise** — on `initialize`, a job-capable server returns + `InitializeResult._meta["uipath.com/job"] = {"version": 1}`. + `McpClient._apply_job_advertisement()` reads it and sets `is_job_aware` / + `job_version`. Re-read on every fresh `initialize`, so the flag is stable + across a production suspend/resume. +2. **START** — for a job-aware session, `mcp_tool._invoke_job_aware` sends + `params._meta["uipath.com/job"] = {"version": N}` (no `key`) on `tools/call` + via `McpClient.call_tool(..., meta=...)`. A job-backed tool returns + `result._meta["uipath.com/job"] = {"key", "folderKey"}` immediately. +3. **Suspend** — the `McpJobExecutor` (default `LangGraphJobExecutor`) interrupts + with `WaitJobRaw(Job(id=0, key, folder_key), process_folder_key=folder_key)` + inside `@durable_interrupt` (same mechanism as `process_tool`). The runtime + persists a `JOB` resume trigger (`JOB_RAW` name → no output extraction); + Orchestrator resumes the agent when the child job is terminal. +4. **FETCH** — on resume the body is skipped; `interrupt(None)` returns the + terminal `Job`. The executor re-derives `{key, folderKey}` from it and calls + the neutral `fetch`, which re-issues `tools/call` (no args) with + `params._meta["uipath.com/job"] = {"key", "folderKey"}`. The server returns + the formatted result — that `CallToolResult` is the tool's output. + +**Where the pieces live:** + +| Piece | Location | Notes | +|-------|----------|-------| +| `_meta` build/parse, `UiPathJobHandle`, `JobStart`, `McpJobExecutor`, `BlockingJobExecutor` | `uipath.platform.mcp_jobs` (uipath-python) | Framework- and MCP-SDK-neutral (plain dicts) | +| `is_job_aware`, `job_version`, `job_executor`, `call_tool(meta=)`, `_apply_job_advertisement` | `mcp_client.py` | Reads the advertisement; threads `_meta` | +| `_invoke_job_aware`, `_normalize_tool_result`, `create_mcp_tools_and_clients(job_executor=)` | `mcp_tool.py` | Builds the neutral `start`/`fetch`; default executor = `LangGraphJobExecutor` | +| `LangGraphJobExecutor` (the only place `interrupt` is called) | `job_executor.py` | Suspend → resume → fetch | + +**Key invariants:** + +- `interrupt` is confined to `LangGraphJobExecutor`. The neutral core never + imports langgraph; the executor never imports `mcp`. +- The job handle survives the suspend **only** via the `WaitJobRaw` payload + (re-derived from the resumed `Job`), never via closures — the + `@durable_interrupt` body does not re-run on resume. +- Non-job tools on a job-aware server: the server returns a normal result (no + handle); `LangGraphJobExecutor` resolves it via `SkipInterruptValue` + (`_NonJobStartValue`) so the durable index stays aligned with no real suspend. +- Old/non-UiPath servers never advertise → `is_job_aware` stays `False` → today's + plain blocking path (back-compat, zero behavior change). + ## Tests Tests are in `tests/agent/tools/test_mcp/`. diff --git a/src/uipath_langchain/agent/tools/mcp/job_executor.py b/src/uipath_langchain/agent/tools/mcp/job_executor.py new file mode 100644 index 000000000..5b5538a53 --- /dev/null +++ b/src/uipath_langchain/agent/tools/mcp/job_executor.py @@ -0,0 +1,115 @@ +"""LangGraph executor for MCP-backed UiPath jobs. + +When an MCP ``tools/call`` starts a UiPath job (the server returns a +``uipath.com/job`` handle), :class:`LangGraphJobExecutor` suspends the LangGraph +agent on that job and resumes when it finishes — the same durable suspend/resume +mechanism :mod:`uipath_langchain.agent.tools.process_tool` uses. + +It mirrors ``process_tool`` exactly: + +* the START ``tools/call`` runs **inside** ``@durable_interrupt`` so it executes + exactly once (a resume re-runs the node but skips the body); +* it interrupts with ``WaitJobRaw`` so the runtime persists a ``JOB`` resume + trigger (with the ``JOB_RAW`` name → resume without output extraction) and + Orchestrator resumes the agent when the child job reaches a terminal state; +* on resume the body is skipped and ``interrupt(None)`` returns the terminal + ``Job``; we re-derive the ``{key, folderKey}`` handle from it and FETCH the + result with a follow-up ``tools/call`` (the server formats the output). + +The neutral wire work (building the START/FETCH ``_meta``, parsing the handle) +lives in :mod:`uipath.platform.mcp_jobs`; this class only owns the *suspend* +policy, so ``interrupt`` is confined here. +""" + +from __future__ import annotations + +from typing import Any + +from uipath.platform.common import WaitJobRaw +from uipath.platform.mcp_jobs import ( + FetchFn, + JobStart, + StartFn, + UiPathJobHandle, +) +from uipath.platform.orchestrator import Job, JobState + +from uipath_langchain._utils.durable_interrupt import ( + SkipInterruptValue, + durable_interrupt, +) + + +class _NonJobStartValue(SkipInterruptValue): + """Carries a non-job :class:`JobStart` back through ``@durable_interrupt``. + + A job-aware client sends the START ``_meta`` on every call, but the server + only returns a handle for job-backed tools. For a normal (non-job) result we + must NOT suspend — yet the ``@durable_interrupt`` body still has to run on + every pass to keep the durable index aligned. Returning this value injects the + result into the scratchpad and resumes immediately, without a real suspend. + """ + + def __init__(self, outcome: JobStart) -> None: + self._outcome = outcome + + @property + def resume_value(self) -> Any: + """The :class:`JobStart` to return to the executor without suspending.""" + return self._outcome + + +class LangGraphJobExecutor: + """``McpJobExecutor`` that suspends the LangGraph agent on the started job.""" + + async def run(self, *, start: StartFn, fetch: FetchFn, tool_name: str) -> Any: + """Start the job, suspend until it finishes, then FETCH its result. + + Args: + start: Issues the START ``tools/call`` once; returns a :class:`JobStart`. + fetch: Re-calls the tool with the FETCH ``_meta`` for a handle. + tool_name: The MCP tool name (for diagnostics). + + Returns: + The FETCH result for a job-backed call, or the normal tool result when + the call did not start a job. + """ + + @durable_interrupt + async def _suspend_on_job() -> Any: + outcome = await start() + if outcome.handle is None: + # Non-job tool / no opt-in: do not suspend; carry the result. + return _NonJobStartValue(outcome) + # The whole handle round-trips the suspend via this WaitJobRaw payload. + return WaitJobRaw( + job=Job( + id=0, + key=outcome.handle.job_key, + folder_key=outcome.handle.folder_key, + ), + process_folder_key=outcome.handle.folder_key, + ) + + resumed = await _suspend_on_job() + + if isinstance(resumed, JobStart): + # Non-job path: the START result is the tool's output. + return resumed.result + + # Resume path: `resumed` is the runtime-materialized terminal raw Job. + # WaitJobRaw skips state validation, so re-check for failure (as process_tool does). + job = resumed + if (job.state or "").lower() == JobState.FAULTED: + return str( + getattr(job, "info", None) or f"Job for tool '{tool_name}' faulted" + ) + if not job.key or not job.folder_key: + return str( + getattr(job, "info", None) + or f"Job for tool '{tool_name}' returned no key to fetch its result" + ) + + # Re-derive the handle from the resumed Job and let the server format the result. + handle = UiPathJobHandle(job_key=job.key, folder_key=job.folder_key) + return await fetch(handle) diff --git a/src/uipath_langchain/agent/tools/mcp/mcp_client.py b/src/uipath_langchain/agent/tools/mcp/mcp_client.py index c201a2dcb..88a1aab84 100644 --- a/src/uipath_langchain/agent/tools/mcp/mcp_client.py +++ b/src/uipath_langchain/agent/tools/mcp/mcp_client.py @@ -15,8 +15,9 @@ from mcp import ClientSession from mcp.shared.exceptions import McpError from mcp.shared.message import SessionMessage -from mcp.types import CallToolResult, ListToolsResult +from mcp.types import CallToolResult, InitializeResult, ListToolsResult from uipath._utils._ssl_context import get_httpx_client_kwargs +from uipath.platform.mcp_jobs import read_job_version from uipath.runtime.base import UiPathDisposableProtocol from uipath_langchain._utils import get_execution_folder_path @@ -25,6 +26,7 @@ if TYPE_CHECKING: from uipath.agent.models.agent import AgentMcpResourceConfig + from uipath.platform.mcp_jobs import McpJobExecutor from uipath.platform.orchestrator.mcp import McpServer logger = logging.getLogger(__name__) @@ -78,6 +80,7 @@ def __init__( max_retries: int = 1, session_info_factory: SessionInfoFactory | None = None, terminate_on_close: bool = True, + job_executor: "McpJobExecutor | None" = None, ) -> None: """Initialize the MCP tool session. @@ -90,12 +93,21 @@ def __init__( max_retries: Maximum number of retries on session disconnect errors. session_info_factory: Factory for creating SessionInfo instances. Defaults to ``SessionInfoFactory`` which returns a plain SessionInfo. + terminate_on_close: Whether to terminate the MCP session on close. + job_executor: Executor that awaits a UiPath job the server starts behind + a ``tools/call`` (see ``uipath.com/job``). Only used when the server + advertised support on ``initialize``. ``None`` disables job handling. """ self._config = config self._timeout = timeout or httpx.Timeout(600) self._max_retries = max_retries self._session_info_factory = session_info_factory or SessionInfoFactory() self._terminate_on_close = terminate_on_close + self._job_executor = job_executor + + # uipath.com/job negotiation state (set from the initialize advertisement). + self._job_aware: bool = False + self._job_version: int | None = None # URL and headers are resolved lazily from SDK self._url: str | None = None @@ -229,7 +241,8 @@ async def _initialize_session(self) -> None: ) if existing_session_id is None: - await self._session.initialize() + init_result = await self._session.initialize() + self._apply_job_advertisement(init_result) # The transport calls set_session_id during initialize, # so we just read the current value here. @@ -240,6 +253,35 @@ async def _initialize_session(self) -> None: ) logger.info(f"MCP session initialized with session ID: {new_session_id}") + def _apply_job_advertisement(self, init_result: InitializeResult) -> None: + """Read the ``uipath.com/job`` advertisement from the initialize result. + + When the server advertises support (it has job-backed tools), mark the + session job-aware so the client opts in (sends the START ``_meta``) on + every ``tools/call``. Re-evaluated on every fresh ``initialize`` so the + flag is stable across a suspend/resume in production. + """ + version = read_job_version(init_result.meta) + if version is not None: + self._job_aware = True + self._job_version = version + logger.info(f"MCP server advertised uipath.com/job v{version}") + + @property + def is_job_aware(self) -> bool: + """Whether the server advertised ``uipath.com/job`` support on initialize.""" + return self._job_aware + + @property + def job_version(self) -> int | None: + """The advertised ``uipath.com/job`` contract version, if any.""" + return self._job_version + + @property + def job_executor(self) -> "McpJobExecutor | None": + """The executor used to await job-backed tool calls, if configured.""" + return self._job_executor + async def _ensure_session(self) -> ClientSession: """Ensure client and session are initialized, return the session. @@ -351,18 +393,22 @@ async def call_tool( self, name: str, arguments: dict[str, Any] | None = None, + *, + meta: dict[str, Any] | None = None, ) -> CallToolResult: """Call an MCP tool by name. Args: name: The name of the tool to call. arguments: Optional arguments to pass to the tool. + meta: Optional request ``_meta`` (e.g. the ``uipath.com/job`` START or + FETCH marker). Mapped onto ``CallToolRequest.params._meta``. Returns: The tool call result. """ return await self._execute_with_retry( - lambda session: session.call_tool(name, arguments=arguments), + lambda session: session.call_tool(name, arguments=arguments, meta=meta), f"call_tool({name})", ) diff --git a/src/uipath_langchain/agent/tools/mcp/mcp_tool.py b/src/uipath_langchain/agent/tools/mcp/mcp_tool.py index 6963f1f70..c1750eab6 100644 --- a/src/uipath_langchain/agent/tools/mcp/mcp_tool.py +++ b/src/uipath_langchain/agent/tools/mcp/mcp_tool.py @@ -1,6 +1,6 @@ import logging from contextlib import AsyncExitStack, asynccontextmanager -from typing import Any, AsyncGenerator +from typing import TYPE_CHECKING, Any, AsyncGenerator from langchain_core.tools import BaseTool from uipath.agent.models.agent import ( @@ -10,6 +10,13 @@ DynamicToolsConfig, ) from uipath.eval.mocks import mockable +from uipath.platform.mcp_jobs import ( + JOB_PROTOCOL_VERSION, + JobStart, + build_fetch_meta, + build_start_meta, + read_job_handle, +) from uipath_langchain.agent.tools.structured_tool_with_argument_properties import ( StructuredToolWithArgumentProperties, @@ -18,9 +25,63 @@ from ..utils import sanitize_tool_name from .mcp_client import McpClient, SessionInfoFactory +if TYPE_CHECKING: + from uipath.platform.mcp_jobs import McpJobExecutor, UiPathJobHandle + logger: logging.Logger = logging.getLogger(__name__) +def _normalize_tool_result(result: Any) -> Any: + """Reduce a ``CallToolResult`` to the plain value the tool should return. + + Extracts ``.content`` and ``model_dump``s any structured blocks, matching the + shape the LLM expects from an MCP tool call. + """ + content = result.content if hasattr(result, "content") else result + if isinstance(content, list): + return [ + item.model_dump(exclude_none=True) if hasattr(item, "model_dump") else item + for item in content + ] + if hasattr(content, "model_dump"): + return content.model_dump(exclude_none=True) + return content + + +async def _invoke_job_aware( + mcp_tool: AgentMcpTool, + mcpClient: McpClient, + kwargs: dict[str, Any], +) -> Any: + """Invoke a tool on a job-aware server through the injected ``McpJobExecutor``. + + Sends the START ``uipath.com/job`` ``_meta`` on the call; if the server starts a + job it returns a handle and the executor suspends/awaits it, then FETCHes the + result with a follow-up ``tools/call``. Non-job tools just return their result. + """ + executor = mcpClient.job_executor + assert executor is not None # guarded by the caller + version = min(JOB_PROTOCOL_VERSION, mcpClient.job_version or JOB_PROTOCOL_VERSION) + + async def start() -> JobStart: + result = await mcpClient.call_tool( + mcp_tool.name, arguments=kwargs, meta=build_start_meta(version) + ) + handle = read_job_handle(result.meta) + if handle is not None: + logger.info(f"Tool '{mcp_tool.name}' started UiPath job {handle.job_key}") + return JobStart(handle=handle) + return JobStart(handle=None, result=_normalize_tool_result(result)) + + async def fetch(handle: "UiPathJobHandle") -> Any: + result = await mcpClient.call_tool( + mcp_tool.name, arguments=None, meta=build_fetch_meta(handle) + ) + return _normalize_tool_result(result) + + return await executor.run(start=start, fetch=fetch, tool_name=mcp_tool.name) + + @asynccontextmanager async def open_mcp_tools( config: list[AgentMcpResourceConfig], @@ -166,20 +227,17 @@ async def tool_fn(**kwargs: Any) -> Any: If a session disconnect error occurs (e.g., 404 or session terminated), the tool will retry once by re-initializing the session. + + When the server advertised ``uipath.com/job`` support and an executor is + configured, the call is routed through the executor so a long-running + UiPath job suspends/resumes the agent instead of blocking. """ + if mcpClient.is_job_aware and mcpClient.job_executor is not None: + return await _invoke_job_aware(mcp_tool, mcpClient, kwargs) + result = await mcpClient.call_tool(mcp_tool.name, arguments=kwargs) logger.info(f"Tool call successful for {mcp_tool.name}") - content = result.content if hasattr(result, "content") else result - if isinstance(content, list): - return [ - item.model_dump(exclude_none=True) - if hasattr(item, "model_dump") - else item - for item in content - ] - if hasattr(content, "model_dump"): - return content.model_dump(exclude_none=True) - return content + return _normalize_tool_result(result) return tool_fn @@ -188,6 +246,7 @@ async def create_mcp_tools_and_clients( resources: list[AgentMcpResourceConfig], session_info_factory: SessionInfoFactory | None = None, terminate_on_close: bool = True, + job_executor: "McpJobExecutor | None" = None, ) -> tuple[list[BaseTool], list[McpClient]]: """Create MCP tools from a list of MCP resource configurations. @@ -203,6 +262,12 @@ async def create_mcp_tools_and_clients( Defaults to the base ``SessionInfoFactory``. Pass ``SessionInfoDebugStateFactory()`` for playground mode. terminate_on_close: Whether to terminate the MCP session on close. + job_executor: Executor that awaits long-running UiPath jobs started behind + a ``tools/call`` on a job-aware server (``uipath.com/job``). Defaults to + :class:`LangGraphJobExecutor`, which suspends/resumes the agent — so a + deployed LangGraph agent gains the behavior on package upgrade, with no + ``agent.json`` change. Pass ``BlockingJobExecutor`` for non-LangGraph + hosts. Returns: A tuple of (tools, mcp_clients) where: @@ -213,6 +278,12 @@ async def create_mcp_tools_and_clients( The caller is responsible for closing the McpClient instances when done. Each McpClient manages its own session lifecycle with automatic 404 recovery. """ + if job_executor is None: + # Lazy import: this package's default is LangGraph suspend/resume. + from .job_executor import LangGraphJobExecutor + + job_executor = LangGraphJobExecutor() + tools: list[BaseTool] = [] clients: list[McpClient] = [] @@ -227,6 +298,7 @@ async def create_mcp_tools_and_clients( config=resource, session_info_factory=session_info_factory, terminate_on_close=terminate_on_close, + job_executor=job_executor, ) clients.append(mcpClient) diff --git a/tests/agent/tools/test_mcp/claude.md b/tests/agent/tools/test_mcp/claude.md index cb6ef7f26..cacf399d5 100644 --- a/tests/agent/tools/test_mcp/claude.md +++ b/tests/agent/tools/test_mcp/claude.md @@ -607,7 +607,9 @@ def _build_response(self): | File | Purpose | |------|---------| | `test_mcp_client.py` | McpClient session tests (7 tests) | -| `test_mcp_tool.py` | Tool factory tests (17 tests) | +| `test_mcp_tool.py` | Tool factory tests (20 tests) | +| `test_mcp_jobs.py` | `uipath.com/job` advertisement, START/FETCH `_meta`, `LangGraphJobExecutor` (9 tests) | | `src/.../mcp/mcp_client.py` | McpClient implementation | | `src/.../mcp/mcp_tool.py` | Tool factory implementation | +| `src/.../mcp/job_executor.py` | LangGraphJobExecutor implementation | | `src/.../mcp/claude.md` | Implementation documentation | diff --git a/tests/agent/tools/test_mcp/test_mcp_jobs.py b/tests/agent/tools/test_mcp/test_mcp_jobs.py new file mode 100644 index 000000000..4882a53b0 --- /dev/null +++ b/tests/agent/tools/test_mcp/test_mcp_jobs.py @@ -0,0 +1,232 @@ +"""Tests for the uipath.com/job MCP feature (PR-C1 client + PR-C2 LangGraph executor). + +Covers: the initialize advertisement → ``is_job_aware``; the job-aware tool_fn +routing and the START/FETCH ``_meta`` the wrapper sends; and the +``LangGraphJobExecutor`` non-job / suspend-resume / faulted paths. +""" + +from typing import Any +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest +from mcp.types import InitializeResult +from uipath.platform.common import WaitJobRaw +from uipath.platform.mcp_jobs import JobStart, UiPathJobHandle +from uipath.platform.orchestrator import Job + +from uipath_langchain.agent.tools.mcp import LangGraphJobExecutor, McpClient +from uipath_langchain.agent.tools.mcp.mcp_tool import build_mcp_tool + +_INTERRUPT = "uipath_langchain._utils.durable_interrupt.decorator.interrupt" + + +def _init_result(meta: dict[str, Any] | None) -> InitializeResult: + payload: dict[str, Any] = { + "protocolVersion": "2025-11-25", + "capabilities": {"tools": {}}, + "serverInfo": {"name": "test-server", "version": "1.0.0"}, + } + if meta is not None: + payload["_meta"] = meta + return InitializeResult.model_validate(payload) + + +def _agent_mcp_tool() -> Any: + from uipath.agent.models.agent import AgentMcpTool + + return AgentMcpTool( + name="run_process", + description="Runs a process", + input_schema={"type": "object", "properties": {}}, + ) + + +def _tool_result(*, meta: dict[str, Any] | None, content: Any) -> MagicMock: + result = MagicMock() + result.meta = meta + result.content = content + return result + + +# --- advertisement parsing (PR-C1) ----------------------------------------- + + +def test_advertisement_marks_session_job_aware() -> None: + client = McpClient(config=MagicMock()) + assert client.is_job_aware is False + + client._apply_job_advertisement(_init_result({"uipath.com/job": {"version": 1}})) + + assert client.is_job_aware is True + assert client.job_version == 1 + + +def test_no_advertisement_leaves_session_plain() -> None: + client = McpClient(config=MagicMock()) + + client._apply_job_advertisement(_init_result(None)) + + assert client.is_job_aware is False + assert client.job_version is None + + +# --- job-aware routing + START/FETCH _meta (PR-C1) ------------------------- + + +class _CaptureExecutor: + """Captures the start/fetch closures the wrapper hands the executor.""" + + def __init__(self) -> None: + self.start: Any = None + self.fetch: Any = None + self.tool_name: str | None = None + + async def run(self, *, start: Any, fetch: Any, tool_name: str) -> Any: + self.start = start + self.fetch = fetch + self.tool_name = tool_name + return "executor-ran" + + +def _job_aware_client(call_tool: AsyncMock) -> MagicMock: + client = MagicMock(spec=McpClient) + client.is_job_aware = True + client.job_version = 1 + client.call_tool = call_tool + return client + + +@pytest.mark.asyncio +async def test_job_aware_tool_delegates_to_executor() -> None: + executor = _CaptureExecutor() + client = _job_aware_client(AsyncMock()) + client.job_executor = executor + + tool_fn = build_mcp_tool(_agent_mcp_tool(), client) + out = await tool_fn(city="here") + + assert out == "executor-ran" + assert executor.tool_name == "run_process" + + +@pytest.mark.asyncio +async def test_start_sends_start_meta_and_parses_handle() -> None: + call_tool = AsyncMock( + return_value=_tool_result( + meta={"uipath.com/job": {"key": "job-1", "folderKey": "folder-1"}}, + content="ignored", + ) + ) + executor = _CaptureExecutor() + client = _job_aware_client(call_tool) + client.job_executor = executor + + tool_fn = build_mcp_tool(_agent_mcp_tool(), client) + await tool_fn(city="here") + start_outcome = await executor.start() + + assert start_outcome == JobStart( + handle=UiPathJobHandle(job_key="job-1", folder_key="folder-1") + ) + call_tool.assert_awaited_once_with( + "run_process", + arguments={"city": "here"}, + meta={"uipath.com/job": {"version": 1}}, + ) + + +@pytest.mark.asyncio +async def test_start_without_handle_returns_normal_result() -> None: + call_tool = AsyncMock(return_value=_tool_result(meta=None, content="plain output")) + executor = _CaptureExecutor() + client = _job_aware_client(call_tool) + client.job_executor = executor + + tool_fn = build_mcp_tool(_agent_mcp_tool(), client) + await tool_fn() + start_outcome = await executor.start() + + assert start_outcome == JobStart(handle=None, result="plain output") + + +@pytest.mark.asyncio +async def test_fetch_sends_fetch_meta_with_no_arguments() -> None: + call_tool = AsyncMock(return_value=_tool_result(meta=None, content="job result")) + executor = _CaptureExecutor() + client = _job_aware_client(call_tool) + client.job_executor = executor + + tool_fn = build_mcp_tool(_agent_mcp_tool(), client) + await tool_fn() + fetched = await executor.fetch( + UiPathJobHandle(job_key="job-1", folder_key="folder-1") + ) + + assert fetched == "job result" + call_tool.assert_awaited_once_with( + "run_process", + arguments=None, + meta={"uipath.com/job": {"key": "job-1", "folderKey": "folder-1"}}, + ) + + +# --- LangGraphJobExecutor (PR-C2) ------------------------------------------ + + +@pytest.mark.asyncio +async def test_executor_non_job_returns_result_without_interrupt() -> None: + executor = LangGraphJobExecutor() + + async def start() -> JobStart: + return JobStart(handle=None, result="plain") + + async def fetch(handle: UiPathJobHandle) -> Any: + raise AssertionError("fetch must not run for a non-job result") + + with patch(_INTERRUPT) as mock_interrupt: + out = await executor.run(start=start, fetch=fetch, tool_name="t") + + assert out == "plain" + mock_interrupt.assert_not_called() + + +@pytest.mark.asyncio +async def test_executor_suspends_with_waitjobraw_then_fetches_on_resume() -> None: + executor = LangGraphJobExecutor() + resumed_job = Job(id=0, key="job-1", state="successful", folder_key="folder-1") + + async def start() -> JobStart: + return JobStart(handle=UiPathJobHandle(job_key="job-1", folder_key="folder-1")) + + async def fetch(handle: UiPathJobHandle) -> Any: + return {"fetched": handle.job_key, "folder": handle.folder_key} + + # Patch interrupt to simulate suspend→resume: it returns the terminal Job. + with patch(_INTERRUPT, return_value=resumed_job) as mock_interrupt: + out = await executor.run(start=start, fetch=fetch, tool_name="t") + + assert out == {"fetched": "job-1", "folder": "folder-1"} + assert mock_interrupt.call_count == 1 + wait_value = mock_interrupt.call_args.args[0] + assert isinstance(wait_value, WaitJobRaw) + assert wait_value.job.key == "job-1" + assert wait_value.process_folder_key == "folder-1" + + +@pytest.mark.asyncio +async def test_executor_surfaces_faulted_job_without_fetching() -> None: + executor = LangGraphJobExecutor() + faulted_job = Job( + id=0, key="job-1", state="faulted", folder_key="folder-1", info="it broke" + ) + + async def start() -> JobStart: + return JobStart(handle=UiPathJobHandle(job_key="job-1", folder_key="folder-1")) + + async def fetch(handle: UiPathJobHandle) -> Any: + raise AssertionError("fetch must not run for a faulted job") + + with patch(_INTERRUPT, return_value=faulted_job): + out = await executor.run(start=start, fetch=fetch, tool_name="t") + + assert out == "it broke" diff --git a/tests/agent/tools/test_mcp/test_mcp_tool.py b/tests/agent/tools/test_mcp/test_mcp_tool.py index c1307efb1..54bcc8f70 100644 --- a/tests/agent/tools/test_mcp/test_mcp_tool.py +++ b/tests/agent/tools/test_mcp/test_mcp_tool.py @@ -622,6 +622,7 @@ async def test_single_object_with_model_dump(self, mcp_tool): mock_result.content = mock_content mock_client = MagicMock(spec=McpClient) + mock_client.is_job_aware = False mock_client.call_tool = AsyncMock(return_value=mock_result) tool_fn = build_mcp_tool(mcp_tool, mock_client) @@ -642,6 +643,7 @@ async def test_list_of_objects_with_model_dump(self, mcp_tool): mock_result.content = [mock_item] mock_client = MagicMock(spec=McpClient) + mock_client.is_job_aware = False mock_client.call_tool = AsyncMock(return_value=mock_result) tool_fn = build_mcp_tool(mcp_tool, mock_client) @@ -658,6 +660,7 @@ async def test_plain_value_returned_as_is(self, mcp_tool): mock_result.content = "plain string" mock_client = MagicMock(spec=McpClient) + mock_client.is_job_aware = False mock_client.call_tool = AsyncMock(return_value=mock_result) tool_fn = build_mcp_tool(mcp_tool, mock_client) diff --git a/uv.lock b/uv.lock index 3bacdc8ae..5f995eb2e 100644 --- a/uv.lock +++ b/uv.lock @@ -9,7 +9,7 @@ resolution-markers = [ ] [options] -exclude-newer = "0001-01-01T00:00:00Z" # This has no effect and is included for backwards compatibility when using relative exclude-newer values. +exclude-newer = "2026-06-12T07:24:09.025916Z" exclude-newer-span = "P2D" [options.exclude-newer-package] @@ -2200,7 +2200,7 @@ wheels = [ [[package]] name = "mcp" -version = "1.26.0" +version = "1.27.2" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "anyio" }, @@ -2218,9 +2218,9 @@ dependencies = [ { name = "typing-inspection" }, { name = "uvicorn", marker = "sys_platform != 'emscripten'" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/fc/6d/62e76bbb8144d6ed86e202b5edd8a4cb631e7c8130f3f4893c3f90262b10/mcp-1.26.0.tar.gz", hash = "sha256:db6e2ef491eecc1a0d93711a76f28dec2e05999f93afd48795da1c1137142c66", size = 608005, upload-time = "2026-01-24T19:40:32.468Z" } +sdist = { url = "https://files.pythonhosted.org/packages/27/3c/347cf965d313f5d41764e7d46bea6ffe7d9ef13b983cc429b0340962a082/mcp-1.27.2.tar.gz", hash = "sha256:8e02db104096d1c25b28e64bde29a5c32b31bc241710213e12fd4d84985bdfef", size = 621116, upload-time = "2026-05-29T17:16:04.039Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/fd/d9/eaa1f80170d2b7c5ba23f3b59f766f3a0bb41155fbc32a69adfa1adaaef9/mcp-1.26.0-py3-none-any.whl", hash = "sha256:904a21c33c25aa98ddbeb47273033c435e595bbacfdb177f4bd87f6dceebe1ca", size = 233615, upload-time = "2026-01-24T19:40:30.652Z" }, + { url = "https://files.pythonhosted.org/packages/c9/11/252c6f971dc4f16af1d98a1c469d8ba523aab00d1bb76b4d3bc1ff32eacc/mcp-1.27.2-py3-none-any.whl", hash = "sha256:d6ff5160c6ca65d93013626efb3fc249de683c30b2d8570755ceddd490344de5", size = 220498, upload-time = "2026-05-29T17:16:02.442Z" }, ] [[package]] @@ -4458,7 +4458,7 @@ requires-dist = [ { name = "langchain-mcp-adapters", specifier = "==0.2.1" }, { name = "langgraph", specifier = ">=1.1.8,<2.0.0" }, { name = "langgraph-checkpoint-sqlite", specifier = ">=3.0.3,<4.0.0" }, - { name = "mcp", specifier = "==1.26.0" }, + { name = "mcp", specifier = "==1.27.2" }, { name = "openinference-instrumentation-langchain", specifier = ">=0.1.56" }, { name = "pillow", specifier = ">=12.1.1" }, { name = "pydantic-settings", specifier = ">=2.6.0" },