Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,7 @@
import io.modelcontextprotocol.json.McpJsonDefaults;
import io.modelcontextprotocol.json.McpJsonMapper;
import io.modelcontextprotocol.json.TypeRef;
import io.modelcontextprotocol.spec.ClosedMcpTransportSession;
import io.modelcontextprotocol.spec.DefaultMcpTransportSession;
import io.modelcontextprotocol.spec.DefaultMcpTransportStream;
import io.modelcontextprotocol.spec.HttpHeaders;
import io.modelcontextprotocol.spec.McpClientTransport;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpTransportException;
import io.modelcontextprotocol.spec.McpTransportSession;
import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException;
import io.modelcontextprotocol.spec.McpTransportStream;
import io.modelcontextprotocol.spec.ProtocolVersions;
import io.modelcontextprotocol.spec.*;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit - we typically don't do star imports.

import io.modelcontextprotocol.util.Assert;
import io.modelcontextprotocol.util.Utils;
import org.reactivestreams.Publisher;
Expand Down Expand Up @@ -189,14 +179,6 @@ private McpTransportSession<Disposable> createTransportSession() {
return new DefaultMcpTransportSession(onClose);
}

private McpTransportSession<Disposable> createClosedSession(McpTransportSession<Disposable> existingSession) {
var existingSessionId = Optional.ofNullable(existingSession)
.filter(session -> !(session instanceof ClosedMcpTransportSession<Disposable>))
.flatMap(McpTransportSession::sessionId)
.orElse(null);
return new ClosedMcpTransportSession<>(existingSessionId);
}

private Publisher<Void> createDelete(String sessionId) {

var uri = Utils.resolveUri(this.baseUri, this.endpoint);
Expand Down Expand Up @@ -240,7 +222,8 @@ private void handleException(Throwable t) {
public Mono<Void> closeGracefully() {
return Mono.defer(() -> {
logger.debug("Graceful close triggered");
McpTransportSession<Disposable> currentSession = this.activeSession.getAndUpdate(this::createClosedSession);
McpTransportSession<Disposable> currentSession = this.activeSession
.getAndSet(ClosedMcpTransportSession.INSTANCE);
if (currentSession != null) {
return Mono.from(currentSession.closeGracefully());
}
Expand All @@ -250,6 +233,19 @@ public Mono<Void> closeGracefully() {

private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
return Mono.deferContextual(ctx -> {
var rh = this.handler.get();
if (rh == null) {
logger.warn("Transport has no request handler registered. Remember to call connect!");
}

final Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> requestHandler = rh != null
? rh : msg -> Mono.error(new IllegalStateException("No request handler"));

final McpTransportSession<Disposable> transportSession = this.activeSession.get();

if (ClosedMcpTransportSession.INSTANCE.equals(transportSession)) {
throw new McpTransportSessionClosedException();
}

if (stream != null) {
logger.debug("Reconnecting stream {} with lastId {}", stream.streamId(), stream.lastId());
Expand All @@ -259,7 +255,7 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
}

final AtomicReference<Disposable> disposableRef = new AtomicReference<>();
final McpTransportSession<Disposable> transportSession = this.activeSession.get();

var uri = Utils.resolveUri(this.baseUri, this.endpoint);

Disposable connection = Mono.deferContextual(connectionCtx -> {
Expand Down Expand Up @@ -389,18 +385,18 @@ else if (statusCode == BAD_REQUEST) {
"Received unrecognized SSE event type: " + sseResponseEvent.sseEvent().event()));
})
.retryWhen(authorizationErrorRetrySpec())
.flatMap(jsonrpcMessage -> this.handler.get().apply(Mono.just(jsonrpcMessage)))
.flatMap(jsonrpcMessage -> requestHandler.apply(Mono.just(jsonrpcMessage)))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this leak?

.onErrorMap(CompletionException.class, t -> t.getCause())
.onErrorComplete(t -> {
this.handleException(t);
return true;
})
.doFinally(s -> {
Disposable ref = disposableRef.getAndSet(null);
if (ref != null) {
transportSession.removeConnection(ref);
}
}))
.onErrorComplete(t -> {
this.handleException(t);
return true;
})
.contextWrite(ctx)
.subscribe();

Expand Down Expand Up @@ -467,10 +463,23 @@ public String toString(McpSchema.JSONRPCMessage message) {

public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
return Mono.create(deliveredSink -> {
var rh = this.handler.get();
if (rh == null) {
logger.warn("Transport has no request handler registered. Remember to call connect!");
}

final Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> requestHandler = rh != null
? rh : msg -> Mono.error(new IllegalStateException("No request handler"));

var transportSession = this.activeSession.get();

if (ClosedMcpTransportSession.INSTANCE.equals(transportSession)) {
throw new McpTransportSessionClosedException();
}

logger.debug("Sending message {}", sentMessage);

final AtomicReference<Disposable> disposableRef = new AtomicReference<>();
final McpTransportSession<Disposable> transportSession = this.activeSession.get();

var uri = Utils.resolveUri(this.baseUri, this.endpoint);
String jsonBody = this.toString(sentMessage);
Expand Down Expand Up @@ -643,22 +652,26 @@ else if (statusCode == BAD_REQUEST) {
new RuntimeException("Failed to send message: " + responseEvent));
})
.retryWhen(authorizationErrorRetrySpec())
.flatMap(jsonRpcMessage -> this.handler.get().apply(Mono.just(jsonRpcMessage)))
.flatMap(jsonRpcMessage -> requestHandler.apply(Mono.just(jsonRpcMessage)))
.onErrorMap(CompletionException.class, t -> t.getCause())
.onErrorComplete(t -> {
// handle the error first
this.handleException(t);
// inform the caller of sendMessage
deliveredSink.error(t);
return true;
})
.doFinally(s -> {
logger.debug("SendMessage finally: {}", s);
Disposable ref = disposableRef.getAndSet(null);
if (ref != null) {
transportSession.removeConnection(ref);
}
})).contextWrite(deliveredSink.contextView()).subscribe();
})).onErrorComplete(t -> {
// handle the error first
try {
this.handleException(t);
}
catch (Exception e) {
logger.error("Error handling exception {}", t.getMessage(), e);
}
// inform the caller of sendMessage
deliveredSink.error(t);
return true;
}).contextWrite(deliveredSink.contextView()).subscribe();

disposableRef.set(connection);
transportSession.addConnection(connection);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,41 @@
import java.util.Optional;

import org.reactivestreams.Publisher;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;

/**
* Represents a closed MCP session, which may not be reused. All calls will throw a
* {@link McpTransportSessionClosedException}.
* Represents a closed MCP session, which may not be reused.
*
* @param <CONNECTION> the resource representing the connection that the transport
* manages.
* @author Daniel Garnier-Moiroux
* @author Dariusz Jędrzejczyk
*/
public class ClosedMcpTransportSession<CONNECTION> implements McpTransportSession<CONNECTION> {
public final class ClosedMcpTransportSession implements McpTransportSession<Disposable> {

private final String sessionId;
public static final ClosedMcpTransportSession INSTANCE = new ClosedMcpTransportSession();

public ClosedMcpTransportSession(@Nullable String sessionId) {
this.sessionId = sessionId;
private ClosedMcpTransportSession() {
}

@Override
public Optional<String> sessionId() {
throw new McpTransportSessionClosedException(sessionId);
return Optional.empty();
}

@Override
public boolean markInitialized(String sessionId) {
throw new McpTransportSessionClosedException(sessionId);
throw new IllegalStateException("MCP Session is already closed");
}

@Override
public void addConnection(CONNECTION connection) {
throw new McpTransportSessionClosedException(sessionId);
public void addConnection(Disposable connection) {
throw new IllegalStateException("MCP Session is already closed");
}

@Override
public void removeConnection(CONNECTION connection) {
throw new McpTransportSessionClosedException(sessionId);
public void removeConnection(Disposable connection) {
throw new IllegalStateException("MCP Session is already closed");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ public McpClientSession(Duration requestTimeout, McpClientTransport transport,
this.requestHandlers.putAll(requestHandlers);
this.notificationHandlers.putAll(notificationHandlers);

this.transport.connect(mono -> mono.doOnNext(this::handle)).transform(connectHook).subscribe();
this.transport.connect(mono -> mono.doOnNext(this::handle)).transform(connectHook).subscribe(ignored -> {
}, error -> logger.warn("Client failed during connect", error));
}

private void dismissPendingResponses() {
Expand Down Expand Up @@ -160,7 +161,15 @@ else if (message instanceof McpSchema.JSONRPCRequest request) {
var errorResponse = McpSchema.JSONRPCResponse.error(request.id(), jsonRpcError);
return Mono.just(errorResponse);
}).flatMap(this.transport::sendMessage).onErrorComplete(t -> {
logger.warn("Issue sending response to the client, ", t);
if (t instanceof McpTransportSessionClosedException) {
logger.debug("Can't send response to request {} when the transport is closed", request.id());
}
else if (McpTransport.isPeerClosed(t)) {
logger.debug("Can't send response to request {}: connection closed by peer", request.id(), t);
}
else {
logger.warn("Failed to send response to the server", t);
}
return true;
}).subscribe();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

import io.modelcontextprotocol.spec.McpSchema.JSONRPCMessage;
import io.modelcontextprotocol.json.TypeRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/**
Expand Down Expand Up @@ -39,6 +41,8 @@
*/
public interface McpTransport {

Logger logger = LoggerFactory.getLogger(McpTransport.class);

/**
* Closes the transport connection and releases any associated resources.
*
Expand All @@ -48,7 +52,24 @@ public interface McpTransport {
* </p>
*/
default void close() {
this.closeGracefully().subscribe();
this.closeGracefully().subscribe(ignored -> {
}, error -> {
if (isPeerClosed(error)) {
logger.debug("Error during asynchronous close", error);
}
else {
logger.warn("Error during asynchronous close", error);
}
});
}

static boolean isPeerClosed(Throwable t) {
for (Throwable c = t; c != null; c = c.getCause()) {
if (c instanceof java.io.EOFException) {
return true;
}
}
return false;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@
* @see ClosedMcpTransportSession
* @author Daniel Garnier-Moiroux
*/

public class McpTransportSessionClosedException extends RuntimeException {

public McpTransportSessionClosedException() {
super("Transport has already been closed.");
}

@Deprecated(forRemoval = true)
public McpTransportSessionClosedException(@Nullable String sessionId) {
super(sessionId != null ? "MCP session with ID %s has been closed".formatted(sessionId)
: "MCP session has been closed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ void testRootsListChanged() {
void testInitializeWithRootsListProviders() {
withClient(createMcpTransport(),
builder -> builder.roots(Root.builder("file:///test/path").name("test-root").build()), client -> {
StepVerifier.create(client.initialize().then(client.closeGracefully())).verifyComplete();
StepVerifier.create(client.initialize()).expectNextCount(1).verifyComplete();
});
}

Expand Down Expand Up @@ -725,8 +725,6 @@ void testLoggingConsumer() {
builder -> builder.loggingConsumer(notification -> Mono.fromRunnable(() -> logReceived.set(true))),
client -> {
StepVerifier.create(client.initialize()).expectNextMatches(Objects::nonNull).verifyComplete();
StepVerifier.create(client.closeGracefully()).verifyComplete();

});

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@
import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer;
import io.modelcontextprotocol.common.McpTransportContext;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpTransportSessionClosedException;
import io.modelcontextprotocol.spec.ProtocolVersions;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -139,13 +142,14 @@ void testCloseUninitialized() {
var testMessage = new McpSchema.JSONRPCRequest(McpSchema.METHOD_INITIALIZE, "test-id", initializeRequest);

StepVerifier.create(transport.sendMessage(testMessage))
.expectErrorMessage("MCP session has been closed")
.expectErrorMessage("Transport has already been closed.")
.verify();
}

@Test
void testCloseInitialized() {
var transport = HttpClientStreamableHttpTransport.builder(host).build();
// transport.connect(Function.identity()).block();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How come we don't need to call "connect" anymore?
If we are not "connect"-ed, shouldn't sendMessage error rather than complete?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, notice that this commented line is added. Previously there was no connect at all and it generated drops. Now you get a warning.

The thing is that for streamable http there is no "connect", it just sets up a json-rpc handler.
If you call sendMessage it is an asynchronous operation. We don't have any feedback on the sending whether the server responded. The historical reason is the separate channel on STDIO and on the legacy SSE transport. With Streamable HTTP it should actually be possible to change this paradigm but the intention at the time was not to break the common interfaces.

Since the test case used to work without the connect call, I didn't want to suddenly break it.

Apparently this test passes despite the transport ignoring all the responses.

WDYT should be the right behaviour? I can either remove the commented code (it slipped, but maybe it's good that it stayed here so we can have the discussion) or uncomment it and have the sendMessage fail if connect was not setup.


var initializeRequest = McpSchema.InitializeRequest
.builder(ProtocolVersions.MCP_2025_11_25, McpSchema.ClientCapabilities.builder().roots(true).build(),
Expand All @@ -157,7 +161,8 @@ void testCloseInitialized() {
StepVerifier.create(transport.closeGracefully()).verifyComplete();

StepVerifier.create(transport.sendMessage(testMessage))
.expectErrorMatches(err -> err.getMessage().matches("MCP session with ID [a-zA-Z0-9-]* has been closed"))
.expectErrorMatches(err -> err instanceof McpTransportSessionClosedException
&& err.getMessage().contains("Transport has already been closed"))
.verify();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ void setUp() {

@AfterEach
void tearDown() {
requestCustomizer.reset();
mcpClient.close();
}

Expand Down
Loading