Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ dependencies = [
"openinference-instrumentation-langchain>=0.1.56",
"jsonschema-pydantic-converter>=0.4.0",
"jsonpath-ng>=1.7.0",
"mcp==1.26.0",
"mcp==1.27.2",
"langchain-mcp-adapters==0.2.1",
"pillow>=12.1.1",
"a2a-sdk>=0.2.0,<1.0.0",
Expand Down
12 changes: 12 additions & 0 deletions src/uipath_langchain/agent/tools/mcp/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
"""MCP (Model Context Protocol) tools."""

from .job_executor import LangGraphJobExecutor
from .jobs import (
BlockingJobExecutor,
JobStart,
McpJobExecutor,
UiPathJobHandle,
)
from .mcp_client import McpClient, SessionInfoFactory
from .mcp_tool import (
create_mcp_tools,
Expand All @@ -9,9 +16,14 @@
from .streamable_http import SessionInfo

__all__ = [
"BlockingJobExecutor",
"JobStart",
"LangGraphJobExecutor",
"McpClient",
"McpJobExecutor",
"SessionInfo",
"SessionInfoFactory",
"UiPathJobHandle",
"create_mcp_tools_and_clients",
"open_mcp_tools",
"create_mcp_tools",
Expand Down
54 changes: 54 additions & 0 deletions src/uipath_langchain/agent/tools/mcp/claude.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ src/uipath_langchain/agent/tools/mcp/
├── __init__.py # Public exports
├── mcp_client.py # SessionInfoFactory, McpClient
├── mcp_tool.py # Tool factory functions
├── jobs.py # uipath.com/job _meta contract + executors (neutral)
├── job_executor.py # LangGraphJobExecutor (uipath.com/job suspend/resume)
└── streamable_http.py # SessionInfo, StreamableHTTPTransport (copied from MCP SDK)
```

### Public Exports (`__init__.py`)

```python
from .job_executor import LangGraphJobExecutor
from .mcp_client import McpClient, SessionInfoFactory
from .mcp_tool import (
create_mcp_tools_and_clients,
Expand Down Expand Up @@ -502,6 +505,57 @@ endpoint (`GET/PUT agenthub_/design/debugstate/{agentId}/{key}`). It lives
in `uipath-agents` because it depends on execution-type logic that belongs
in the agent layer, not in the langchain tools layer.

## `uipath.com/job` — long-running job support

When a UiPath MCP server backs a tool with an Orchestrator **job**, the agent
should *suspend* while the job runs and *resume* with its result instead of
blocking. This is negotiated entirely through MCP `_meta` (no `agent.json`
change), so a deployed agent gains the behavior on package upgrade.

**Flow (single key `uipath.com/job`, all in `_meta`):**

1. **Advertise** — on `initialize`, a job-capable server returns
`InitializeResult._meta["uipath.com/job"] = {"version": 1}`.
`McpClient._apply_job_advertisement()` reads it and sets `is_job_aware` /
`job_version`. Re-read on every fresh `initialize`, so the flag is stable
across a production suspend/resume.
2. **START** — for a job-aware session, `mcp_tool._invoke_job_aware` sends
`params._meta["uipath.com/job"] = {"version": N}` (no `key`) on `tools/call`
via `McpClient.call_tool(..., meta=...)`. A job-backed tool returns
`result._meta["uipath.com/job"] = {"key", "folderKey"}` immediately.
3. **Suspend** — the `McpJobExecutor` (default `LangGraphJobExecutor`) interrupts
with `WaitJobRaw(Job(id=0, key, folder_key), process_folder_key=folder_key)`
inside `@durable_interrupt` (same mechanism as `process_tool`). The runtime
persists a `JOB` resume trigger (`JOB_RAW` name → no output extraction);
Orchestrator resumes the agent when the child job is terminal.
4. **FETCH** — on resume the body is skipped; `interrupt(None)` returns the
terminal `Job`. The executor re-derives `{key, folderKey}` from it and calls
the neutral `fetch`, which re-issues `tools/call` (no args) with
`params._meta["uipath.com/job"] = {"key", "folderKey"}`. The server returns
the formatted result — that `CallToolResult` is the tool's output.

**Where the pieces live:**

| Piece | Location | Notes |
|-------|----------|-------|
| `_meta` build/parse, `UiPathJobHandle`, `JobStart`, `McpJobExecutor`, `BlockingJobExecutor` | `jobs.py` (this package, self-contained) | Framework- and MCP-SDK-neutral (plain dicts); no base-SDK change |
| `is_job_aware`, `job_version`, `job_executor`, `call_tool(meta=)`, `_apply_job_advertisement` | `mcp_client.py` | Reads the advertisement; threads `_meta` |
| `_invoke_job_aware`, `_normalize_tool_result`, `create_mcp_tools_and_clients(job_executor=)` | `mcp_tool.py` | Builds the neutral `start`/`fetch`; default executor = `LangGraphJobExecutor` |
| `LangGraphJobExecutor` (the only place `interrupt` is called) | `job_executor.py` | Suspend → resume → fetch |

**Key invariants:**

- `interrupt` is confined to `LangGraphJobExecutor`. The neutral core never
imports langgraph; the executor never imports `mcp`.
- The job handle survives the suspend **only** via the `WaitJobRaw` payload
(re-derived from the resumed `Job`), never via closures — the
`@durable_interrupt` body does not re-run on resume.
- Non-job tools on a job-aware server: the server returns a normal result (no
handle); `LangGraphJobExecutor` resolves it via `SkipInterruptValue`
(`_NonJobStartValue`) so the durable index stays aligned with no real suspend.
- Old/non-UiPath servers never advertise → `is_job_aware` stays `False` → today's
plain blocking path (back-compat, zero behavior change).

## Tests

Tests are in `tests/agent/tools/test_mcp/`.
Expand Down
116 changes: 116 additions & 0 deletions src/uipath_langchain/agent/tools/mcp/job_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""LangGraph executor for MCP-backed UiPath jobs.

When an MCP ``tools/call`` starts a UiPath job (the server returns a
``uipath.com/job`` handle), :class:`LangGraphJobExecutor` suspends the LangGraph
agent on that job and resumes when it finishes — the same durable suspend/resume
mechanism :mod:`uipath_langchain.agent.tools.process_tool` uses.

It mirrors ``process_tool`` exactly:

* the START ``tools/call`` runs **inside** ``@durable_interrupt`` so it executes
exactly once (a resume re-runs the node but skips the body);
* it interrupts with ``WaitJobRaw`` so the runtime persists a ``JOB`` resume
trigger (with the ``JOB_RAW`` name → resume without output extraction) and
Orchestrator resumes the agent when the child job reaches a terminal state;
* on resume the body is skipped and ``interrupt(None)`` returns the terminal
``Job``; we re-derive the ``{key, folderKey}`` handle from it and FETCH the
result with a follow-up ``tools/call`` (the server formats the output).

The neutral wire work (building the START/FETCH ``_meta``, parsing the handle)
lives in the sibling :mod:`~uipath_langchain.agent.tools.mcp.jobs` module; this class only owns the *suspend*
policy, so ``interrupt`` is confined here.
"""

from __future__ import annotations

from typing import Any

from uipath.platform.common import WaitJobRaw
from uipath.platform.orchestrator import Job, JobState

from uipath_langchain._utils.durable_interrupt import (
SkipInterruptValue,
durable_interrupt,
)

from .jobs import (
FetchFn,
JobStart,
StartFn,
UiPathJobHandle,
)


class _NonJobStartValue(SkipInterruptValue):
"""Carries a non-job :class:`JobStart` back through ``@durable_interrupt``.

A job-aware client sends the START ``_meta`` on every call, but the server
only returns a handle for job-backed tools. For a normal (non-job) result we
must NOT suspend — yet the ``@durable_interrupt`` body still has to run on
every pass to keep the durable index aligned. Returning this value injects the
result into the scratchpad and resumes immediately, without a real suspend.
"""

def __init__(self, outcome: JobStart) -> None:
self._outcome = outcome

@property
def resume_value(self) -> Any:
"""The :class:`JobStart` to return to the executor without suspending."""
return self._outcome


class LangGraphJobExecutor:
"""``McpJobExecutor`` that suspends the LangGraph agent on the started job."""

async def run(self, *, start: StartFn, fetch: FetchFn, tool_name: str) -> Any:
"""Start the job, suspend until it finishes, then FETCH its result.

Args:
start: Issues the START ``tools/call`` once; returns a :class:`JobStart`.
fetch: Re-calls the tool with the FETCH ``_meta`` for a handle.
tool_name: The MCP tool name (for diagnostics).

Returns:
The FETCH result for a job-backed call, or the normal tool result when
the call did not start a job.
"""

@durable_interrupt
async def _suspend_on_job() -> Any:
outcome = await start()
if outcome.handle is None:
# Non-job tool / no opt-in: do not suspend; carry the result.
return _NonJobStartValue(outcome)
# The whole handle round-trips the suspend via this WaitJobRaw payload.
return WaitJobRaw(
job=Job(
id=0,
key=outcome.handle.job_key,
folder_key=outcome.handle.folder_key,
),
process_folder_key=outcome.handle.folder_key,
)

resumed = await _suspend_on_job()

if isinstance(resumed, JobStart):
# Non-job path: the START result is the tool's output.
return resumed.result

# Resume path: `resumed` is the runtime-materialized terminal raw Job.
# WaitJobRaw skips state validation, so re-check for failure (as process_tool does).
job = resumed
if (job.state or "").lower() == JobState.FAULTED:
return str(
getattr(job, "info", None) or f"Job for tool '{tool_name}' faulted"
)
if not job.key or not job.folder_key:
return str(
getattr(job, "info", None)
or f"Job for tool '{tool_name}' returned no key to fetch its result"
)

# Re-derive the handle from the resumed Job and let the server format the result.
handle = UiPathJobHandle(job_key=job.key, folder_key=job.folder_key)
return await fetch(handle)
Loading