Skip to content

Commit 7351534

Browse files
tzolovchemicL
andauthored
feat: Add Spring WebMVC streamable server transport provider (#425)
- Add WebMvcStreamableServerTransportProvider with SSE support for streamable sessions - Support GET, POST, DELETE endpoints for MCP protocol operations - Implement thread-safe SSE operations using ReentrantLock in WebMvcSseServerTransportProvider - Add test infrastructure with AbstractMcpClientServerIntegrationTests - Refactor WebMvcStreamableIntegrationTests to use parameterized tests - Support testing with both HttpClient and WebFlux transports - Add streamable transport tests for both async and sync server modes - Refactor existing WebMVC SSE integration tests to use shared test base - Add error handling improvements in McpStreamableServerSession - Update dependencies: add json-unit-assertj for enhanced JSON testing - Reorganize POM dependencies and add mcp-spring-webflux test dependency - Wrap handler invocations with Mono.defer for lazy evaluation - Wrap consumer, tool, resource, prompt, and completion handler calls with Mono.defer() - Ensures proper lazy evaluation and error handling in reactive streams Related to #72 Signed-off-by: Christian Tzolov <[email protected]> Signed-off-by: Dariusz Jędrzejczyk <[email protected]> Co-authored-by: Dariusz Jędrzejczyk <[email protected]>
1 parent a8f5a3f commit 7351534

File tree

11 files changed

+2410
-1159
lines changed

11 files changed

+2410
-1159
lines changed

mcp-spring/mcp-spring-webmvc/pom.xml

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,24 @@
2828
<version>0.11.0-SNAPSHOT</version>
2929
</dependency>
3030

31+
<dependency>
32+
<groupId>org.springframework</groupId>
33+
<artifactId>spring-webmvc</artifactId>
34+
<version>${springframework.version}</version>
35+
</dependency>
36+
3137
<dependency>
3238
<groupId>io.modelcontextprotocol.sdk</groupId>
3339
<artifactId>mcp-test</artifactId>
3440
<version>0.11.0-SNAPSHOT</version>
3541
<scope>test</scope>
3642
</dependency>
3743

38-
<dependency>
39-
<groupId>org.springframework</groupId>
40-
<artifactId>spring-webmvc</artifactId>
41-
<version>${springframework.version}</version>
44+
<dependency>
45+
<groupId>io.modelcontextprotocol.sdk</groupId>
46+
<artifactId>mcp-spring-webflux</artifactId>
47+
<version>0.11.0-SNAPSHOT</version>
48+
<scope>test</scope>
4249
</dependency>
4350

4451

mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcSseServerTransportProvider.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import java.time.Duration;
99
import java.util.UUID;
1010
import java.util.concurrent.ConcurrentHashMap;
11+
import java.util.concurrent.locks.ReentrantLock;
1112

1213
import com.fasterxml.jackson.core.type.TypeReference;
1314
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -339,6 +340,12 @@ private class WebMvcMcpSessionTransport implements McpServerTransport {
339340

340341
private final SseBuilder sseBuilder;
341342

343+
/**
344+
* Lock to ensure thread-safe access to the SSE builder when sending messages.
345+
* This prevents concurrent modifications that could lead to corrupted SSE events.
346+
*/
347+
private final ReentrantLock sseBuilderLock = new ReentrantLock();
348+
342349
/**
343350
* Creates a new session transport with the specified ID and SSE builder.
344351
* @param sessionId The unique identifier for this session
@@ -358,6 +365,7 @@ private class WebMvcMcpSessionTransport implements McpServerTransport {
358365
@Override
359366
public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
360367
return Mono.fromRunnable(() -> {
368+
sseBuilderLock.lock();
361369
try {
362370
String jsonText = objectMapper.writeValueAsString(message);
363371
sseBuilder.id(sessionId).event(MESSAGE_EVENT_TYPE).data(jsonText);
@@ -367,6 +375,9 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
367375
logger.error("Failed to send message to session {}: {}", sessionId, e.getMessage());
368376
sseBuilder.error(e);
369377
}
378+
finally {
379+
sseBuilderLock.unlock();
380+
}
370381
});
371382
}
372383

@@ -390,13 +401,17 @@ public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
390401
public Mono<Void> closeGracefully() {
391402
return Mono.fromRunnable(() -> {
392403
logger.debug("Closing session transport: {}", sessionId);
404+
sseBuilderLock.lock();
393405
try {
394406
sseBuilder.complete();
395407
logger.debug("Successfully completed SSE builder for session {}", sessionId);
396408
}
397409
catch (Exception e) {
398410
logger.warn("Failed to complete SSE builder for session {}: {}", sessionId, e.getMessage());
399411
}
412+
finally {
413+
sseBuilderLock.unlock();
414+
}
400415
});
401416
}
402417

@@ -405,13 +420,17 @@ public Mono<Void> closeGracefully() {
405420
*/
406421
@Override
407422
public void close() {
423+
sseBuilderLock.lock();
408424
try {
409425
sseBuilder.complete();
410426
logger.debug("Successfully completed SSE builder for session {}", sessionId);
411427
}
412428
catch (Exception e) {
413429
logger.warn("Failed to complete SSE builder for session {}: {}", sessionId, e.getMessage());
414430
}
431+
finally {
432+
sseBuilderLock.unlock();
433+
}
415434
}
416435

417436
}

0 commit comments

Comments
 (0)