Skip to content

Commit 92ca81f

Browse files
committed
WIP: Handle logging notifications, start with integration tests and async server tests
Signed-off-by: Dariusz Jędrzejczyk <[email protected]>
1 parent 8e9ab52 commit 92ca81f

File tree

15 files changed

+1799
-174
lines changed

15 files changed

+1799
-174
lines changed

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,8 @@ private Flux<McpSchema.JSONRPCMessage> extractError(ClientResponse response, Str
340340
McpSchema.JSONRPCResponse jsonRpcResponse = objectMapper.readValue(body,
341341
McpSchema.JSONRPCResponse.class);
342342
jsonRpcError = jsonRpcResponse.error();
343-
toPropagate = new McpError(jsonRpcError);
343+
toPropagate = jsonRpcError != null ? new McpError(jsonRpcError)
344+
: new McpError("Can't parse the jsonResponse " + jsonRpcResponse);
344345
}
345346
catch (IOException ex) {
346347
toPropagate = new RuntimeException("Sending request failed", e);

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxStreamableServerTransportProvider.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@ public class WebFluxStreamableServerTransportProvider implements McpStreamableSe
3535

3636
public static final String MESSAGE_EVENT_TYPE = "message";
3737

38-
public static final String ENDPOINT_EVENT_TYPE = "endpoint";
39-
4038
public static final String DEFAULT_BASE_URL = "";
4139

4240
private final ObjectMapper objectMapper;
@@ -263,17 +261,28 @@ private Mono<ServerResponse> handlePost(ServerRequest request) {
263261
McpStreamableServerSession.McpStreamableServerSessionInit init = this.sessionFactory
264262
.startSession(initializeRequest);
265263
sessions.put(init.session().getId(), init.session());
266-
return init.initResult()
264+
return init.initResult().map(initializeResult -> {
265+
McpSchema.JSONRPCResponse jsonrpcResponse = new McpSchema.JSONRPCResponse(
266+
McpSchema.JSONRPC_VERSION, jsonrpcRequest.id(), initializeResult, null);
267+
try {
268+
return this.objectMapper.writeValueAsString(jsonrpcResponse);
269+
}
270+
catch (IOException e) {
271+
logger.warn("Failed to serialize initResponse", e);
272+
throw Exceptions.propagate(e);
273+
}
274+
})
267275
.flatMap(initResult -> ServerResponse.ok()
276+
.contentType(MediaType.APPLICATION_JSON)
268277
.header("mcp-session-id", init.session().getId())
269278
.bodyValue(initResult));
270279
}
271280

272-
if (!request.headers().asHttpHeaders().containsKey("sessionId")) {
281+
if (!request.headers().asHttpHeaders().containsKey("mcp-session-id")) {
273282
return ServerResponse.badRequest().bodyValue(new McpError("Session ID missing"));
274283
}
275284

276-
String sessionId = request.headers().asHttpHeaders().getFirst("sessionId");
285+
String sessionId = request.headers().asHttpHeaders().getFirst("mcp-session-id");
277286
McpStreamableServerSession session = sessions.get(sessionId);
278287

279288
if (session == null) {
@@ -308,7 +317,9 @@ else if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest) {
308317
logger.error("Failed to deserialize message: {}", e.getMessage());
309318
return ServerResponse.badRequest().bodyValue(new McpError("Invalid message format"));
310319
}
311-
}).contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext));
320+
})
321+
.switchIfEmpty(ServerResponse.badRequest().build())
322+
.contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext));
312323
}
313324

314325
private Mono<ServerResponse> handleDelete(ServerRequest request) {

0 commit comments

Comments
 (0)