Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ requires-python = ">=3.11"
dependencies = [
"uipath>=2.10.79, <2.11.0",
"uipath-core>=0.5.17, <0.6.0",
"uipath-platform>=0.1.61, <0.2.0",
"uipath-platform>=0.1.65, <0.2.0",
"uipath-runtime>=0.11.0, <0.12.0",
Comment on lines 8 to 11
"langgraph>=1.1.8, <2.0.0",
"langchain-core>=1.2.11, <2.0.0",
Expand All @@ -19,7 +19,7 @@ dependencies = [
"openinference-instrumentation-langchain>=0.1.56",
"jsonschema-pydantic-converter>=0.4.0",
"jsonpath-ng>=1.7.0",
"mcp==1.26.0",
"mcp==1.27.2",
"langchain-mcp-adapters==0.2.1",
"pillow>=12.1.1",
"a2a-sdk>=0.2.0,<1.0.0",
Expand Down
2 changes: 2 additions & 0 deletions src/uipath_langchain/agent/tools/mcp/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""MCP (Model Context Protocol) tools."""

from .job_executor import LangGraphJobExecutor
from .mcp_client import McpClient, SessionInfoFactory
from .mcp_tool import (
create_mcp_tools,
Expand All @@ -9,6 +10,7 @@
from .streamable_http import SessionInfo

__all__ = [
"LangGraphJobExecutor",
"McpClient",
"SessionInfo",
"SessionInfoFactory",
Expand Down
53 changes: 53 additions & 0 deletions src/uipath_langchain/agent/tools/mcp/claude.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ src/uipath_langchain/agent/tools/mcp/
├── __init__.py # Public exports
├── mcp_client.py # SessionInfoFactory, McpClient
├── mcp_tool.py # Tool factory functions
├── job_executor.py # LangGraphJobExecutor (uipath.com/job suspend/resume)
└── streamable_http.py # SessionInfo, StreamableHTTPTransport (copied from MCP SDK)
```

### Public Exports (`__init__.py`)

```python
from .job_executor import LangGraphJobExecutor
from .mcp_client import McpClient, SessionInfoFactory
from .mcp_tool import (
create_mcp_tools_and_clients,
Expand Down Expand Up @@ -502,6 +504,57 @@ endpoint (`GET/PUT agenthub_/design/debugstate/{agentId}/{key}`). It lives
in `uipath-agents` because it depends on execution-type logic that belongs
in the agent layer, not in the langchain tools layer.

## `uipath.com/job` — long-running job support

When a UiPath MCP server backs a tool with an Orchestrator **job**, the agent
should *suspend* while the job runs and *resume* with its result instead of
blocking. This is negotiated entirely through MCP `_meta` (no `agent.json`
change), so a deployed agent gains the behavior on package upgrade.

**Flow (single key `uipath.com/job`, all in `_meta`):**

1. **Advertise** — on `initialize`, a job-capable server returns
`InitializeResult._meta["uipath.com/job"] = {"version": 1}`.
`McpClient._apply_job_advertisement()` reads it and sets `is_job_aware` /
`job_version`. Re-read on every fresh `initialize`, so the flag is stable
across a production suspend/resume.
2. **START** — for a job-aware session, `mcp_tool._invoke_job_aware` sends
`params._meta["uipath.com/job"] = {"version": N}` (no `key`) on `tools/call`
via `McpClient.call_tool(..., meta=...)`. A job-backed tool returns
`result._meta["uipath.com/job"] = {"key", "folderKey"}` immediately.
3. **Suspend** — the `McpJobExecutor` (default `LangGraphJobExecutor`) interrupts
with `WaitJobRaw(Job(id=0, key, folder_key), process_folder_key=folder_key)`
inside `@durable_interrupt` (same mechanism as `process_tool`). The runtime
persists a `JOB` resume trigger (`JOB_RAW` name → no output extraction);
Orchestrator resumes the agent when the child job is terminal.
4. **FETCH** — on resume the body is skipped; `interrupt(None)` returns the
terminal `Job`. The executor re-derives `{key, folderKey}` from it and calls
the neutral `fetch`, which re-issues `tools/call` (no args) with
`params._meta["uipath.com/job"] = {"key", "folderKey"}`. The server returns
the formatted result — that `CallToolResult` is the tool's output.

**Where the pieces live:**

| Piece | Location | Notes |
|-------|----------|-------|
| `_meta` build/parse, `UiPathJobHandle`, `JobStart`, `McpJobExecutor`, `BlockingJobExecutor` | `uipath.platform.mcp_jobs` (uipath-python) | Framework- and MCP-SDK-neutral (plain dicts) |
| `is_job_aware`, `job_version`, `job_executor`, `call_tool(meta=)`, `_apply_job_advertisement` | `mcp_client.py` | Reads the advertisement; threads `_meta` |
| `_invoke_job_aware`, `_normalize_tool_result`, `create_mcp_tools_and_clients(job_executor=)` | `mcp_tool.py` | Builds the neutral `start`/`fetch`; default executor = `LangGraphJobExecutor` |
| `LangGraphJobExecutor` (the only place `interrupt` is called) | `job_executor.py` | Suspend → resume → fetch |

**Key invariants:**

- `interrupt` is confined to `LangGraphJobExecutor`. The neutral core never
imports langgraph; the executor never imports `mcp`.
- The job handle survives the suspend **only** via the `WaitJobRaw` payload
(re-derived from the resumed `Job`), never via closures — the
`@durable_interrupt` body does not re-run on resume.
- Non-job tools on a job-aware server: the server returns a normal result (no
handle); `LangGraphJobExecutor` resolves it via `SkipInterruptValue`
(`_NonJobStartValue`) so the durable index stays aligned with no real suspend.
- Old/non-UiPath servers never advertise → `is_job_aware` stays `False` → today's
plain blocking path (back-compat, zero behavior change).

## Tests

Tests are in `tests/agent/tools/test_mcp/`.
Expand Down
115 changes: 115 additions & 0 deletions src/uipath_langchain/agent/tools/mcp/job_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
"""LangGraph executor for MCP-backed UiPath jobs.

When an MCP ``tools/call`` starts a UiPath job (the server returns a
``uipath.com/job`` handle), :class:`LangGraphJobExecutor` suspends the LangGraph
agent on that job and resumes when it finishes — the same durable suspend/resume
mechanism :mod:`uipath_langchain.agent.tools.process_tool` uses.

It mirrors ``process_tool`` exactly:

* the START ``tools/call`` runs **inside** ``@durable_interrupt`` so it executes
exactly once (a resume re-runs the node but skips the body);
* it interrupts with ``WaitJobRaw`` so the runtime persists a ``JOB`` resume
trigger (with the ``JOB_RAW`` name → resume without output extraction) and
Orchestrator resumes the agent when the child job reaches a terminal state;
* on resume the body is skipped and ``interrupt(None)`` returns the terminal
``Job``; we re-derive the ``{key, folderKey}`` handle from it and FETCH the
result with a follow-up ``tools/call`` (the server formats the output).

The neutral wire work (building the START/FETCH ``_meta``, parsing the handle)
lives in :mod:`uipath.platform.mcp_jobs`; this class only owns the *suspend*
policy, so ``interrupt`` is confined here.
"""

from __future__ import annotations

from typing import Any

from uipath.platform.common import WaitJobRaw
from uipath.platform.mcp_jobs import (
FetchFn,
JobStart,
StartFn,
UiPathJobHandle,
)
from uipath.platform.orchestrator import Job, JobState

from uipath_langchain._utils.durable_interrupt import (
SkipInterruptValue,
durable_interrupt,
)


class _NonJobStartValue(SkipInterruptValue):
"""Carries a non-job :class:`JobStart` back through ``@durable_interrupt``.

A job-aware client sends the START ``_meta`` on every call, but the server
only returns a handle for job-backed tools. For a normal (non-job) result we
must NOT suspend — yet the ``@durable_interrupt`` body still has to run on
every pass to keep the durable index aligned. Returning this value injects the
result into the scratchpad and resumes immediately, without a real suspend.
"""

def __init__(self, outcome: JobStart) -> None:
self._outcome = outcome

@property
def resume_value(self) -> Any:
"""The :class:`JobStart` to return to the executor without suspending."""
return self._outcome


class LangGraphJobExecutor:
"""``McpJobExecutor`` that suspends the LangGraph agent on the started job."""

async def run(self, *, start: StartFn, fetch: FetchFn, tool_name: str) -> Any:
"""Start the job, suspend until it finishes, then FETCH its result.

Args:
start: Issues the START ``tools/call`` once; returns a :class:`JobStart`.
fetch: Re-calls the tool with the FETCH ``_meta`` for a handle.
tool_name: The MCP tool name (for diagnostics).

Returns:
The FETCH result for a job-backed call, or the normal tool result when
the call did not start a job.
"""

@durable_interrupt
async def _suspend_on_job() -> Any:
outcome = await start()
if outcome.handle is None:
# Non-job tool / no opt-in: do not suspend; carry the result.
return _NonJobStartValue(outcome)
# The whole handle round-trips the suspend via this WaitJobRaw payload.
return WaitJobRaw(
job=Job(
id=0,
key=outcome.handle.job_key,
folder_key=outcome.handle.folder_key,
),
process_folder_key=outcome.handle.folder_key,
)

resumed = await _suspend_on_job()

if isinstance(resumed, JobStart):
# Non-job path: the START result is the tool's output.
return resumed.result

# Resume path: `resumed` is the runtime-materialized terminal raw Job.
# WaitJobRaw skips state validation, so re-check for failure (as process_tool does).
job = resumed
if (job.state or "").lower() == JobState.FAULTED:
return str(
getattr(job, "info", None) or f"Job for tool '{tool_name}' faulted"
)
if not job.key or not job.folder_key:
return str(
getattr(job, "info", None)
or f"Job for tool '{tool_name}' returned no key to fetch its result"
)

# Re-derive the handle from the resumed Job and let the server format the result.
handle = UiPathJobHandle(job_key=job.key, folder_key=job.folder_key)
return await fetch(handle)
52 changes: 49 additions & 3 deletions src/uipath_langchain/agent/tools/mcp/mcp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
from mcp import ClientSession
from mcp.shared.exceptions import McpError
from mcp.shared.message import SessionMessage
from mcp.types import CallToolResult, ListToolsResult
from mcp.types import CallToolResult, InitializeResult, ListToolsResult
from uipath._utils._ssl_context import get_httpx_client_kwargs
from uipath.platform.mcp_jobs import read_job_version
from uipath.runtime.base import UiPathDisposableProtocol

from uipath_langchain._utils import get_execution_folder_path
Expand All @@ -25,6 +26,7 @@

if TYPE_CHECKING:
from uipath.agent.models.agent import AgentMcpResourceConfig
from uipath.platform.mcp_jobs import McpJobExecutor
from uipath.platform.orchestrator.mcp import McpServer

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -78,6 +80,7 @@ def __init__(
max_retries: int = 1,
session_info_factory: SessionInfoFactory | None = None,
terminate_on_close: bool = True,
job_executor: "McpJobExecutor | None" = None,
) -> None:
"""Initialize the MCP tool session.

Expand All @@ -90,12 +93,21 @@ def __init__(
max_retries: Maximum number of retries on session disconnect errors.
session_info_factory: Factory for creating SessionInfo instances.
Defaults to ``SessionInfoFactory`` which returns a plain SessionInfo.
terminate_on_close: Whether to terminate the MCP session on close.
job_executor: Executor that awaits a UiPath job the server starts behind
a ``tools/call`` (see ``uipath.com/job``). Only used when the server
advertised support on ``initialize``. ``None`` disables job handling.
"""
self._config = config
self._timeout = timeout or httpx.Timeout(600)
self._max_retries = max_retries
self._session_info_factory = session_info_factory or SessionInfoFactory()
self._terminate_on_close = terminate_on_close
self._job_executor = job_executor

# uipath.com/job negotiation state (set from the initialize advertisement).
self._job_aware: bool = False
self._job_version: int | None = None

# URL and headers are resolved lazily from SDK
self._url: str | None = None
Expand Down Expand Up @@ -229,7 +241,8 @@ async def _initialize_session(self) -> None:
)

if existing_session_id is None:
await self._session.initialize()
init_result = await self._session.initialize()
self._apply_job_advertisement(init_result)

# The transport calls set_session_id during initialize,
# so we just read the current value here.
Expand All @@ -240,6 +253,35 @@ async def _initialize_session(self) -> None:
)
logger.info(f"MCP session initialized with session ID: {new_session_id}")

def _apply_job_advertisement(self, init_result: InitializeResult) -> None:
"""Read the ``uipath.com/job`` advertisement from the initialize result.

When the server advertises support (it has job-backed tools), mark the
session job-aware so the client opts in (sends the START ``_meta``) on
every ``tools/call``. Re-evaluated on every fresh ``initialize`` so the
flag is stable across a suspend/resume in production.
"""
version = read_job_version(init_result.meta)
if version is not None:
self._job_aware = True
self._job_version = version
logger.info(f"MCP server advertised uipath.com/job v{version}")
Comment on lines +264 to +268

@property
def is_job_aware(self) -> bool:
"""Whether the server advertised ``uipath.com/job`` support on initialize."""
return self._job_aware

@property
def job_version(self) -> int | None:
"""The advertised ``uipath.com/job`` contract version, if any."""
return self._job_version

@property
def job_executor(self) -> "McpJobExecutor | None":
"""The executor used to await job-backed tool calls, if configured."""
return self._job_executor

async def _ensure_session(self) -> ClientSession:
"""Ensure client and session are initialized, return the session.

Expand Down Expand Up @@ -351,18 +393,22 @@ async def call_tool(
self,
name: str,
arguments: dict[str, Any] | None = None,
*,
meta: dict[str, Any] | None = None,
) -> CallToolResult:
"""Call an MCP tool by name.

Args:
name: The name of the tool to call.
arguments: Optional arguments to pass to the tool.
meta: Optional request ``_meta`` (e.g. the ``uipath.com/job`` START or
FETCH marker). Mapped onto ``CallToolRequest.params._meta``.

Returns:
The tool call result.
"""
return await self._execute_with_retry(
lambda session: session.call_tool(name, arguments=arguments),
lambda session: session.call_tool(name, arguments=arguments, meta=meta),
f"call_tool({name})",
)

Expand Down
Loading