Skip to content

Feature/mcp transport context for http servlet sse server transport provider #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
1e93776
feat: enforce MCP request ID validation requirements
tzolov Jul 16, 2025
4937fc1
fix: improve compatibility with non-compliant MCP servers (#413)
tzolov Jul 18, 2025
c3a0b18
fix: improve SSE event handling to gracefully ignore unrecognized eve…
tzolov Jul 24, 2025
a8f5a3f
feat: add Streamable HTTP Server abstractions and implement WebFlux t…
chemicL Jul 30, 2025
7351534
feat: Add Spring WebMVC streamable server transport provider (#425)
tzolov Jul 30, 2025
bde1b6b
feat: HttpServlet Streamable HTTP server implementation (#290)
ZachGerman Jul 30, 2025
ae2fc86
Introduce HttpRequest.Builder customizer for HttpClient-based transport
Kehrlann Jul 11, 2025
ee2c891
feat: implement MCP-compliant keep-alive functionality for server tra…
tzolov Jul 30, 2025
67de84b
Expose the client initialization result
ilayaperumalg Jul 25, 2025
9d58621
feat: implement MCP protocol version per transport support (#404)
tzolov Jul 30, 2025
8f03a40
feat: add WebMVC and HttpServlet stateless server transports
tzolov Jul 30, 2025
34d2d84
Add missing POM descriptions
tzolov Jul 31, 2025
6384d79
Release version 0.11.0
tzolov Jul 31, 2025
bead532
Next development version
tzolov Jul 31, 2025
1664401
Next development version
tzolov Jul 31, 2025
b4887ad
Release version 0.11.0
tzolov Jul 31, 2025
8dcb217
Next development version
tzolov Jul 31, 2025
28edd36
revert
tzolov Jul 31, 2025
bf2c8c9
Next development version
tzolov Jul 31, 2025
3e4d5de
Update copyright headers to 2025 and clean up code
tzolov Aug 1, 2025
577952d
Fix typo
quaff Jun 12, 2025
5e035ea
fix: add backward compatibility for MCP servers returning older proto…
tzolov Aug 4, 2025
3a95f75
Remove duplicate header MCP_PROTOCOL_VERSION
quaff Aug 5, 2025
5aba8a0
fix: handle empty JSON responses in ResponseSubscribers
codezkk Aug 1, 2025
7f37ddc
Use `Last-Event-ID` instead of `last-event-id`
quaff Aug 5, 2025
110a8d1
refactor: downgrade unhandled notification logging from error to warn
tzolov Aug 6, 2025
0327165
Fix httpRequestCustomizer usage in HttpClientStreamableHttpTransport
Kehrlann Aug 7, 2025
4532b61
feat: handle SSE comment messages
tzolov Aug 7, 2025
a14ef42
If a handler throws McpError, use its values for the RPC error
Randgalt Aug 7, 2025
1edd1b6
feat: Add builder pattern for McpError and mutate method for capabili…
tzolov Aug 8, 2025
cbfdb14
refactor: extract common integration test logic into abstract base cl…
tzolov Aug 8, 2025
eb427ad
refactor: improve integration tests stability
tzolov Aug 9, 2025
789d875
test transportContext present for both HttpServletSseServerTransportP…
stantonk Aug 10, 2025
7d5fcb3
add McpTransportContext capability to HttpServletSseServerTransportPr…
stantonk Aug 9, 2025
04b562d
remove TODO/debug code
stantonk Aug 10, 2025
f2efd30
remove unused import
stantonk Aug 10, 2025
991aa4a
fix comment
stantonk Aug 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mcp-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>io.modelcontextprotocol.sdk</groupId>
<artifactId>mcp-parent</artifactId>
<version>0.11.0-SNAPSHOT</version>
<version>0.12.0-SNAPSHOT</version>
</parent>

<artifactId>mcp-bom</artifactId>
Expand Down
10 changes: 5 additions & 5 deletions mcp-spring/mcp-spring-webflux/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
<parent>
<groupId>io.modelcontextprotocol.sdk</groupId>
<artifactId>mcp-parent</artifactId>
<version>0.11.0-SNAPSHOT</version>
<version>0.12.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<artifactId>mcp-spring-webflux</artifactId>
<packaging>jar</packaging>
<name>WebFlux implementation of the Java MCP SSE transport</name>
<description></description>
<name>WebFlux transports</name>
<description>WebFlux implementation for the SSE and Streamable Http Client and Server transports</description>
<url>https://github.com/modelcontextprotocol/java-sdk</url>

<scm>
Expand All @@ -25,13 +25,13 @@
<dependency>
<groupId>io.modelcontextprotocol.sdk</groupId>
<artifactId>mcp</artifactId>
<version>0.11.0-SNAPSHOT</version>
<version>0.12.0-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>io.modelcontextprotocol.sdk</groupId>
<artifactId>mcp-test</artifactId>
<version>0.11.0-SNAPSHOT</version>
<version>0.12.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/*
* Copyright 2025-2025 the original author or authors.
*/

package io.modelcontextprotocol.client.transport;

import java.io.IOException;
Expand All @@ -23,13 +27,16 @@

import io.modelcontextprotocol.spec.DefaultMcpTransportSession;
import io.modelcontextprotocol.spec.DefaultMcpTransportStream;
import io.modelcontextprotocol.spec.HttpHeaders;
import io.modelcontextprotocol.spec.McpClientTransport;
import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpTransportSession;
import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException;
import io.modelcontextprotocol.spec.McpTransportStream;
import io.modelcontextprotocol.spec.ProtocolVersions;
import io.modelcontextprotocol.util.Assert;
import io.modelcontextprotocol.util.Utils;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -65,6 +72,8 @@ public class WebClientStreamableHttpTransport implements McpClientTransport {

private static final Logger logger = LoggerFactory.getLogger(WebClientStreamableHttpTransport.class);

private static final String MCP_PROTOCOL_VERSION = ProtocolVersions.MCP_2025_03_26;

private static final String DEFAULT_ENDPOINT = "/mcp";

/**
Expand Down Expand Up @@ -102,6 +111,11 @@ private WebClientStreamableHttpTransport(ObjectMapper objectMapper, WebClient.Bu
this.activeSession.set(createTransportSession());
}

@Override
public List<String> protocolVersions() {
return List.of(ProtocolVersions.MCP_2024_11_05, ProtocolVersions.MCP_2025_03_26);
}

/**
* Create a stateful builder for creating {@link WebClientStreamableHttpTransport}
* instances.
Expand All @@ -127,12 +141,17 @@ public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchem

private DefaultMcpTransportSession createTransportSession() {
Function<String, Publisher<Void>> onClose = sessionId -> sessionId == null ? Mono.empty()
: webClient.delete().uri(this.endpoint).headers(httpHeaders -> {
httpHeaders.add("mcp-session-id", sessionId);
}).retrieve().toBodilessEntity().onErrorComplete(e -> {
logger.warn("Got error when closing transport", e);
return true;
}).then();
: webClient.delete()
.uri(this.endpoint)
.header(HttpHeaders.MCP_SESSION_ID, sessionId)
.header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION)
.retrieve()
.toBodilessEntity()
.onErrorComplete(e -> {
logger.warn("Got error when closing transport", e);
return true;
})
.then();
return new DefaultMcpTransportSession(onClose);
}

Expand Down Expand Up @@ -185,10 +204,11 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
Disposable connection = webClient.get()
.uri(this.endpoint)
.accept(MediaType.TEXT_EVENT_STREAM)
.header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION)
.headers(httpHeaders -> {
transportSession.sessionId().ifPresent(id -> httpHeaders.add("mcp-session-id", id));
transportSession.sessionId().ifPresent(id -> httpHeaders.add(HttpHeaders.MCP_SESSION_ID, id));
if (stream != null) {
stream.lastId().ifPresent(id -> httpHeaders.add("last-event-id", id));
stream.lastId().ifPresent(id -> httpHeaders.add(HttpHeaders.LAST_EVENT_ID, id));
}
})
.exchangeToFlux(response -> {
Expand Down Expand Up @@ -244,14 +264,15 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {

Disposable connection = webClient.post()
.uri(this.endpoint)
.accept(MediaType.TEXT_EVENT_STREAM, MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM)
.header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION)
.headers(httpHeaders -> {
transportSession.sessionId().ifPresent(id -> httpHeaders.add("mcp-session-id", id));
transportSession.sessionId().ifPresent(id -> httpHeaders.add(HttpHeaders.MCP_SESSION_ID, id));
})
.bodyValue(message)
.exchangeToFlux(response -> {
if (transportSession
.markInitialized(response.headers().asHttpHeaders().getFirst("mcp-session-id"))) {
.markInitialized(response.headers().asHttpHeaders().getFirst(HttpHeaders.MCP_SESSION_ID))) {
// Once we have a session, we try to open an async stream for
// the server to send notifications and requests out-of-band.
reconnect(null).contextWrite(sink.contextView()).subscribe();
Expand Down Expand Up @@ -287,7 +308,7 @@ else if (mediaType.isCompatibleWith(MediaType.APPLICATION_JSON)) {
logger.trace("Received response to POST for session {}", sessionRepresentation);
// communicate to caller the message was delivered
sink.success();
return responseFlux(response);
return directResponseFlux(message, response);
}
else {
logger.warn("Unknown media type {} returned for POST in session {}", contentType,
Expand Down Expand Up @@ -340,7 +361,8 @@ private Flux<McpSchema.JSONRPCMessage> extractError(ClientResponse response, Str
McpSchema.JSONRPCResponse jsonRpcResponse = objectMapper.readValue(body,
McpSchema.JSONRPCResponse.class);
jsonRpcError = jsonRpcResponse.error();
toPropagate = new McpError(jsonRpcError);
toPropagate = jsonRpcError != null ? new McpError(jsonRpcError)
: new McpError("Can't parse the jsonResponse " + jsonRpcResponse);
}
catch (IOException ex) {
toPropagate = new RuntimeException("Sending request failed", e);
Expand Down Expand Up @@ -384,14 +406,22 @@ private static String sessionIdOrPlaceholder(McpTransportSession<?> transportSes
return transportSession.sessionId().orElse("[missing_session_id]");
}

private Flux<McpSchema.JSONRPCMessage> responseFlux(ClientResponse response) {
private Flux<McpSchema.JSONRPCMessage> directResponseFlux(McpSchema.JSONRPCMessage sentMessage,
ClientResponse response) {
return response.bodyToMono(String.class).<Iterable<McpSchema.JSONRPCMessage>>handle((responseMessage, s) -> {
try {
McpSchema.JSONRPCMessage jsonRpcResponse = McpSchema.deserializeJsonRpcMessage(objectMapper,
responseMessage);
s.next(List.of(jsonRpcResponse));
if (sentMessage instanceof McpSchema.JSONRPCNotification && Utils.hasText(responseMessage)) {
logger.warn("Notification: {} received non-compliant response: {}", sentMessage, responseMessage);
s.complete();
}
else {
McpSchema.JSONRPCMessage jsonRpcResponse = McpSchema.deserializeJsonRpcMessage(objectMapper,
responseMessage);
s.next(List.of(jsonRpcResponse));
}
}
catch (IOException e) {
// TODO: this should be a McpTransportError
s.error(e);
}
}).flatMapIterable(Function.identity());
Expand Down Expand Up @@ -423,7 +453,8 @@ private Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>> parse(Serve
}
}
else {
throw new McpError("Received unrecognized SSE event type: " + event.event());
logger.debug("Received SSE event with type: {}", event);
return Tuples.of(Optional.empty(), List.of());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
/*
* Copyright 2024 - 2024 the original author or authors.
*/

package io.modelcontextprotocol.client.transport;

import java.io.IOException;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.modelcontextprotocol.spec.HttpHeaders;
import io.modelcontextprotocol.spec.McpClientTransport;
import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpSchema.JSONRPCMessage;
import io.modelcontextprotocol.spec.ProtocolVersions;
import io.modelcontextprotocol.util.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -62,6 +67,8 @@ public class WebFluxSseClientTransport implements McpClientTransport {

private static final Logger logger = LoggerFactory.getLogger(WebFluxSseClientTransport.class);

private static final String MCP_PROTOCOL_VERSION = ProtocolVersions.MCP_2024_11_05;

/**
* Event type for JSON-RPC messages received through the SSE connection. The server
* sends messages with this event type to transmit JSON-RPC protocol data.
Expand Down Expand Up @@ -166,6 +173,11 @@ public WebFluxSseClientTransport(WebClient.Builder webClientBuilder, ObjectMappe
this.sseEndpoint = sseEndpoint;
}

@Override
public List<String> protocolVersions() {
return List.of(MCP_PROTOCOL_VERSION);
}

/**
* Establishes a connection to the MCP server using Server-Sent Events (SSE). This
* method initiates the SSE connection and sets up the message processing pipeline.
Expand Down Expand Up @@ -216,7 +228,8 @@ else if (MESSAGE_EVENT_TYPE.equals(event.event())) {
}
}
else {
s.error(new McpError("Received unrecognized SSE event type: " + event.event()));
logger.debug("Received unrecognized SSE event type: {}", event);
s.complete();
}
}).transform(handler)).subscribe();

Expand Down Expand Up @@ -249,6 +262,7 @@ public Mono<Void> sendMessage(JSONRPCMessage message) {
return webClient.post()
.uri(messageEndpointUri)
.contentType(MediaType.APPLICATION_JSON)
.header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION)
.bodyValue(jsonText)
.retrieve()
.toBodilessEntity()
Expand Down Expand Up @@ -281,6 +295,7 @@ protected Flux<ServerSentEvent<String>> eventStream() {// @formatter:off
.get()
.uri(this.sseEndpoint)
.accept(MediaType.TEXT_EVENT_STREAM)
.header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION)
.retrieve()
.bodyToFlux(SSE_TYPE)
.retryWhen(Retry.from(retrySignal -> retrySignal.handle(inboundRetryHandler)));
Expand Down
Loading