Skip to content

Commit a8f5a3f

Browse files
chemicLtzolov
andauthored
feat: add Streamable HTTP Server abstractions and implement WebFlux transport provider (#420)
- Add WebFluxStreamableServerTransportProvider for streamable HTTP sessions - Add WebFluxStatelessServerTransport for stateless HTTP MCP servers - Add McpStatelessServerFeatures, McpStatelessServerHandler, McpStatelessRequestHandler, McpStatelessNotificationHandler - Refactor server architecture to support multiple transport categories: streamable, single-session, stateless - Introduce McpTransportContext for transport-level metadata extraction - Add session management capabilities for streamable HTTP connections - Update MCP protocol version to 2025-03-26 - Add test coverage for new transport implementations - Implement integration tests for both stateless and streamable transports Signed-off-by: Dariusz Jędrzejczyk <[email protected]> Signed-off-by: Christian Tzolov <[email protected]> Co-authored-by: Christian Tzolov <[email protected]>
1 parent c3a0b18 commit a8f5a3f

File tree

53 files changed

+6578
-704
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+6578
-704
lines changed

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,8 @@ private Flux<McpSchema.JSONRPCMessage> extractError(ClientResponse response, Str
341341
McpSchema.JSONRPCResponse jsonRpcResponse = objectMapper.readValue(body,
342342
McpSchema.JSONRPCResponse.class);
343343
jsonRpcError = jsonRpcResponse.error();
344-
toPropagate = new McpError(jsonRpcError);
344+
toPropagate = jsonRpcError != null ? new McpError(jsonRpcError)
345+
: new McpError("Can't parse the jsonResponse " + jsonRpcResponse);
345346
}
346347
catch (IOException ex) {
347348
toPropagate = new RuntimeException("Sending request failed", e);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
package io.modelcontextprotocol.server.transport;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import io.modelcontextprotocol.server.McpStatelessServerHandler;
5+
import io.modelcontextprotocol.server.DefaultMcpTransportContext;
6+
import io.modelcontextprotocol.server.McpTransportContextExtractor;
7+
import io.modelcontextprotocol.spec.McpError;
8+
import io.modelcontextprotocol.spec.McpSchema;
9+
import io.modelcontextprotocol.spec.McpStatelessServerTransport;
10+
import io.modelcontextprotocol.server.McpTransportContext;
11+
import io.modelcontextprotocol.util.Assert;
12+
import org.slf4j.Logger;
13+
import org.slf4j.LoggerFactory;
14+
import org.springframework.http.HttpStatus;
15+
import org.springframework.http.MediaType;
16+
import org.springframework.web.reactive.function.server.RouterFunction;
17+
import org.springframework.web.reactive.function.server.RouterFunctions;
18+
import org.springframework.web.reactive.function.server.ServerRequest;
19+
import org.springframework.web.reactive.function.server.ServerResponse;
20+
import reactor.core.publisher.Mono;
21+
22+
import java.io.IOException;
23+
import java.util.List;
24+
import java.util.function.Function;
25+
26+
/**
27+
* Implementation of a WebFlux based {@link McpStatelessServerTransport}.
28+
*
29+
* @author Dariusz Jędrzejczyk
30+
*/
31+
public class WebFluxStatelessServerTransport implements McpStatelessServerTransport {
32+
33+
private static final Logger logger = LoggerFactory.getLogger(WebFluxStatelessServerTransport.class);
34+
35+
private final ObjectMapper objectMapper;
36+
37+
private final String mcpEndpoint;
38+
39+
private final RouterFunction<?> routerFunction;
40+
41+
private McpStatelessServerHandler mcpHandler;
42+
43+
private McpTransportContextExtractor<ServerRequest> contextExtractor;
44+
45+
private volatile boolean isClosing = false;
46+
47+
private WebFluxStatelessServerTransport(ObjectMapper objectMapper, String mcpEndpoint,
48+
McpTransportContextExtractor<ServerRequest> contextExtractor) {
49+
Assert.notNull(objectMapper, "objectMapper must not be null");
50+
Assert.notNull(mcpEndpoint, "mcpEndpoint must not be null");
51+
Assert.notNull(contextExtractor, "contextExtractor must not be null");
52+
53+
this.objectMapper = objectMapper;
54+
this.mcpEndpoint = mcpEndpoint;
55+
this.contextExtractor = contextExtractor;
56+
this.routerFunction = RouterFunctions.route()
57+
.GET(this.mcpEndpoint, this::handleGet)
58+
.POST(this.mcpEndpoint, this::handlePost)
59+
.build();
60+
}
61+
62+
@Override
63+
public void setMcpHandler(McpStatelessServerHandler mcpHandler) {
64+
this.mcpHandler = mcpHandler;
65+
}
66+
67+
@Override
68+
public Mono<Void> closeGracefully() {
69+
return Mono.fromRunnable(() -> this.isClosing = true);
70+
}
71+
72+
/**
73+
* Returns the WebFlux router function that defines the transport's HTTP endpoints.
74+
* This router function should be integrated into the application's web configuration.
75+
*
76+
* <p>
77+
* The router function defines one endpoint handling two HTTP methods:
78+
* <ul>
79+
* <li>GET {messageEndpoint} - Unsupported, returns 405 METHOD NOT ALLOWED</li>
80+
* <li>POST {messageEndpoint} - For handling client requests and notifications</li>
81+
* </ul>
82+
* @return The configured {@link RouterFunction} for handling HTTP requests
83+
*/
84+
public RouterFunction<?> getRouterFunction() {
85+
return this.routerFunction;
86+
}
87+
88+
private Mono<ServerResponse> handleGet(ServerRequest request) {
89+
return ServerResponse.status(HttpStatus.METHOD_NOT_ALLOWED).build();
90+
}
91+
92+
private Mono<ServerResponse> handlePost(ServerRequest request) {
93+
if (isClosing) {
94+
return ServerResponse.status(HttpStatus.SERVICE_UNAVAILABLE).bodyValue("Server is shutting down");
95+
}
96+
97+
McpTransportContext transportContext = this.contextExtractor.extract(request, new DefaultMcpTransportContext());
98+
99+
List<MediaType> acceptHeaders = request.headers().asHttpHeaders().getAccept();
100+
if (!(acceptHeaders.contains(MediaType.APPLICATION_JSON)
101+
&& acceptHeaders.contains(MediaType.TEXT_EVENT_STREAM))) {
102+
return ServerResponse.badRequest().build();
103+
}
104+
105+
return request.bodyToMono(String.class).<ServerResponse>flatMap(body -> {
106+
try {
107+
McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, body);
108+
109+
if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest) {
110+
return this.mcpHandler.handleRequest(transportContext, jsonrpcRequest)
111+
.flatMap(jsonrpcResponse -> ServerResponse.ok()
112+
.contentType(MediaType.APPLICATION_JSON)
113+
.bodyValue(jsonrpcResponse));
114+
}
115+
else if (message instanceof McpSchema.JSONRPCNotification jsonrpcNotification) {
116+
return this.mcpHandler.handleNotification(transportContext, jsonrpcNotification)
117+
.then(ServerResponse.accepted().build());
118+
}
119+
else {
120+
return ServerResponse.badRequest()
121+
.bodyValue(new McpError("The server accepts either requests or notifications"));
122+
}
123+
}
124+
catch (IllegalArgumentException | IOException e) {
125+
logger.error("Failed to deserialize message: {}", e.getMessage());
126+
return ServerResponse.badRequest().bodyValue(new McpError("Invalid message format"));
127+
}
128+
}).contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext));
129+
}
130+
131+
/**
132+
* Create a builder for the server.
133+
* @return a fresh {@link Builder} instance.
134+
*/
135+
public static Builder builder() {
136+
return new Builder();
137+
}
138+
139+
/**
140+
* Builder for creating instances of {@link WebFluxStatelessServerTransport}.
141+
* <p>
142+
* This builder provides a fluent API for configuring and creating instances of
143+
* WebFluxSseServerTransportProvider with custom settings.
144+
*/
145+
public static class Builder {
146+
147+
private ObjectMapper objectMapper;
148+
149+
private String mcpEndpoint = "/mcp";
150+
151+
private McpTransportContextExtractor<ServerRequest> contextExtractor = (serverRequest, context) -> context;
152+
153+
private Builder() {
154+
// used by a static method
155+
}
156+
157+
/**
158+
* Sets the ObjectMapper to use for JSON serialization/deserialization of MCP
159+
* messages.
160+
* @param objectMapper The ObjectMapper instance. Must not be null.
161+
* @return this builder instance
162+
* @throws IllegalArgumentException if objectMapper is null
163+
*/
164+
public Builder objectMapper(ObjectMapper objectMapper) {
165+
Assert.notNull(objectMapper, "ObjectMapper must not be null");
166+
this.objectMapper = objectMapper;
167+
return this;
168+
}
169+
170+
/**
171+
* Sets the endpoint URI where clients should send their JSON-RPC messages.
172+
* @param messageEndpoint The message endpoint URI. Must not be null.
173+
* @return this builder instance
174+
* @throws IllegalArgumentException if messageEndpoint is null
175+
*/
176+
public Builder messageEndpoint(String messageEndpoint) {
177+
Assert.notNull(messageEndpoint, "Message endpoint must not be null");
178+
this.mcpEndpoint = messageEndpoint;
179+
return this;
180+
}
181+
182+
/**
183+
* Sets the context extractor that allows providing the MCP feature
184+
* implementations to inspect HTTP transport level metadata that was present at
185+
* HTTP request processing time. This allows to extract custom headers and other
186+
* useful data for use during execution later on in the process.
187+
* @param contextExtractor The contextExtractor to fill in a
188+
* {@link McpTransportContext}.
189+
* @return this builder instance
190+
* @throws IllegalArgumentException if contextExtractor is null
191+
*/
192+
public Builder contextExtractor(McpTransportContextExtractor<ServerRequest> contextExtractor) {
193+
Assert.notNull(contextExtractor, "Context extractor must not be null");
194+
this.contextExtractor = contextExtractor;
195+
return this;
196+
}
197+
198+
/**
199+
* Builds a new instance of {@link WebFluxStatelessServerTransport} with the
200+
* configured settings.
201+
* @return A new WebFluxSseServerTransportProvider instance
202+
* @throws IllegalStateException if required parameters are not set
203+
*/
204+
public WebFluxStatelessServerTransport build() {
205+
Assert.notNull(objectMapper, "ObjectMapper must be set");
206+
Assert.notNull(mcpEndpoint, "Message endpoint must be set");
207+
208+
return new WebFluxStatelessServerTransport(objectMapper, mcpEndpoint, contextExtractor);
209+
}
210+
211+
}
212+
213+
}

0 commit comments

Comments
 (0)