Skip to content

Commit 8e9ab52

Browse files
committed
WIP: add SSE id generation and DELETE handling
Signed-off-by: Dariusz Jędrzejczyk <[email protected]>
1 parent 6423fcd commit 8e9ab52

File tree

3 files changed

+87
-11
lines changed

3 files changed

+87
-11
lines changed

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxStreamableServerTransportProvider.java

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55
import io.modelcontextprotocol.spec.DefaultMcpTransportContext;
66
import io.modelcontextprotocol.spec.McpError;
77
import io.modelcontextprotocol.spec.McpSchema;
8-
import io.modelcontextprotocol.spec.McpServerTransport;
98
import io.modelcontextprotocol.spec.McpStreamableServerSession;
9+
import io.modelcontextprotocol.spec.McpStreamableServerTransport;
1010
import io.modelcontextprotocol.spec.McpStreamableServerTransportProvider;
1111
import io.modelcontextprotocol.spec.McpTransportContext;
1212
import io.modelcontextprotocol.util.Assert;
@@ -45,6 +45,8 @@ public class WebFluxStreamableServerTransportProvider implements McpStreamableSe
4545

4646
private final String mcpEndpoint;
4747

48+
private final boolean disallowDelete;
49+
4850
private final RouterFunction<?> routerFunction;
4951

5052
private McpStreamableServerSession.Factory sessionFactory;
@@ -70,7 +72,7 @@ public class WebFluxStreamableServerTransportProvider implements McpStreamableSe
7072
* @throws IllegalArgumentException if either parameter is null
7173
*/
7274
public WebFluxStreamableServerTransportProvider(ObjectMapper objectMapper, String mcpEndpoint) {
73-
this(objectMapper, DEFAULT_BASE_URL, mcpEndpoint);
75+
this(objectMapper, DEFAULT_BASE_URL, mcpEndpoint, false);
7476
}
7577

7678
/**
@@ -83,17 +85,20 @@ public WebFluxStreamableServerTransportProvider(ObjectMapper objectMapper, Strin
8385
* setup. Must not be null.
8486
* @throws IllegalArgumentException if either parameter is null
8587
*/
86-
public WebFluxStreamableServerTransportProvider(ObjectMapper objectMapper, String baseUrl, String mcpEndpoint) {
88+
public WebFluxStreamableServerTransportProvider(ObjectMapper objectMapper, String baseUrl, String mcpEndpoint,
89+
boolean disallowDelete) {
8790
Assert.notNull(objectMapper, "ObjectMapper must not be null");
8891
Assert.notNull(baseUrl, "Message base path must not be null");
8992
Assert.notNull(mcpEndpoint, "Message endpoint must not be null");
9093

9194
this.objectMapper = objectMapper;
9295
this.baseUrl = baseUrl;
9396
this.mcpEndpoint = mcpEndpoint;
97+
this.disallowDelete = disallowDelete;
9498
this.routerFunction = RouterFunctions.route()
9599
.GET(this.mcpEndpoint, this::handleGet)
96100
.POST(this.mcpEndpoint, this::handlePost)
101+
.DELETE(this.mcpEndpoint, this::handleDelete)
97102
.build();
98103
}
99104

@@ -306,7 +311,37 @@ else if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest) {
306311
}).contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext));
307312
}
308313

309-
private class WebFluxStreamableMcpSessionTransport implements McpServerTransport {
314+
private Mono<ServerResponse> handleDelete(ServerRequest request) {
315+
if (isClosing) {
316+
return ServerResponse.status(HttpStatus.SERVICE_UNAVAILABLE).bodyValue("Server is shutting down");
317+
}
318+
319+
McpTransportContext transportContext = this.contextExtractor.apply(request);
320+
321+
return Mono.defer(() -> {
322+
if (!request.headers().asHttpHeaders().containsKey("mcp-session-id")) {
323+
return ServerResponse.badRequest().build(); // TODO: say we need a session
324+
// id
325+
}
326+
327+
// TODO: The user can configure whether deletions are permitted
328+
if (this.disallowDelete) {
329+
return ServerResponse.status(HttpStatus.METHOD_NOT_ALLOWED).build();
330+
}
331+
332+
String sessionId = request.headers().asHttpHeaders().getFirst("mcp-session-id");
333+
334+
McpStreamableServerSession session = this.sessions.get(sessionId);
335+
336+
if (session == null) {
337+
return ServerResponse.notFound().build();
338+
}
339+
340+
return session.delete().then(ServerResponse.ok().build());
341+
}).contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext));
342+
}
343+
344+
private class WebFluxStreamableMcpSessionTransport implements McpStreamableServerTransport {
310345

311346
private final FluxSink<ServerSentEvent<?>> sink;
312347

@@ -316,6 +351,11 @@ public WebFluxStreamableMcpSessionTransport(FluxSink<ServerSentEvent<?>> sink) {
316351

317352
@Override
318353
public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
354+
return this.sendMessage(message, null);
355+
}
356+
357+
@Override
358+
public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message, String messageId) {
319359
return Mono.fromSupplier(() -> {
320360
try {
321361
return objectMapper.writeValueAsString(message);
@@ -325,6 +365,7 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
325365
}
326366
}).doOnNext(jsonText -> {
327367
ServerSentEvent<Object> event = ServerSentEvent.builder()
368+
.id(messageId)
328369
.event(MESSAGE_EVENT_TYPE)
329370
.data(jsonText)
330371
.build();
@@ -419,7 +460,7 @@ public WebFluxStreamableServerTransportProvider build() {
419460
Assert.notNull(objectMapper, "ObjectMapper must be set");
420461
Assert.notNull(mcpEndpoint, "Message endpoint must be set");
421462

422-
return new WebFluxStreamableServerTransportProvider(objectMapper, baseUrl, mcpEndpoint);
463+
return new WebFluxStreamableServerTransportProvider(objectMapper, baseUrl, mcpEndpoint, false);
423464
}
424465

425466
}

mcp/src/main/java/io/modelcontextprotocol/spec/McpStreamableServerSession.java

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,12 @@
1212

1313
import java.time.Duration;
1414
import java.util.Map;
15+
import java.util.UUID;
1516
import java.util.concurrent.ConcurrentHashMap;
1617
import java.util.concurrent.atomic.AtomicInteger;
1718
import java.util.concurrent.atomic.AtomicLong;
1819
import java.util.concurrent.atomic.AtomicReference;
20+
import java.util.function.Supplier;
1921

2022
public class McpStreamableServerSession implements McpSession {
2123

@@ -88,7 +90,13 @@ public Mono<Void> sendNotification(String method, Object params) {
8890
});
8991
}
9092

91-
public McpStreamableServerSessionStream listeningStream(McpServerTransport transport) {
93+
public Mono<Void> delete() {
94+
return this.closeGracefully().then(Mono.fromRunnable(() -> {
95+
// delete history, etc.
96+
}));
97+
}
98+
99+
public McpStreamableServerSessionStream listeningStream(McpStreamableServerTransport transport) {
92100
McpStreamableServerSessionStream listeningStream = new McpStreamableServerSessionStream(transport);
93101
this.listeningStreamRef.set(listeningStream);
94102
return listeningStream;
@@ -100,7 +108,7 @@ public Flux<McpSchema.JSONRPCMessage> replay(Object lastEventId) {
100108
return Flux.empty();
101109
}
102110

103-
public Mono<Void> responseStream(McpSchema.JSONRPCRequest jsonrpcRequest, McpServerTransport transport) {
111+
public Mono<Void> responseStream(McpSchema.JSONRPCRequest jsonrpcRequest, McpStreamableServerTransport transport) {
104112
return Mono.deferContextual(ctx -> {
105113
McpTransportContext transportContext = ctx.getOrDefault(McpTransportContext.KEY, McpTransportContext.EMPTY);
106114

@@ -235,10 +243,18 @@ public final class McpStreamableServerSessionStream implements McpSession {
235243

236244
private final ConcurrentHashMap<Object, MonoSink<McpSchema.JSONRPCResponse>> pendingResponses = new ConcurrentHashMap<>();
237245

238-
private final McpServerTransport transport;
246+
private final McpStreamableServerTransport transport;
247+
248+
private final String transportId;
249+
250+
private final Supplier<String> uuidGenerator;
239251

240-
public McpStreamableServerSessionStream(McpServerTransport transport) {
252+
public McpStreamableServerSessionStream(McpStreamableServerTransport transport) {
241253
this.transport = transport;
254+
this.transportId = UUID.randomUUID().toString();
255+
// This ID design allows for a constant-time extraction of the history by
256+
// precisely identifying the SSE stream using the first component
257+
this.uuidGenerator = () -> this.transportId + "_" + UUID.randomUUID();
242258
}
243259

244260
@Override
@@ -251,7 +267,9 @@ public <T> Mono<T> sendRequest(String method, Object requestParams, TypeReferenc
251267
this.pendingResponses.put(requestId, sink);
252268
McpSchema.JSONRPCRequest jsonrpcRequest = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION,
253269
method, requestId, requestParams);
254-
this.transport.sendMessage(jsonrpcRequest).subscribe(v -> {
270+
String messageId = this.uuidGenerator.get();
271+
// TODO: store message in history
272+
this.transport.sendMessage(jsonrpcRequest, messageId).subscribe(v -> {
255273
}, sink::error);
256274
}).timeout(requestTimeout).doOnError(e -> {
257275
this.pendingResponses.remove(requestId);
@@ -275,7 +293,9 @@ public <T> Mono<T> sendRequest(String method, Object requestParams, TypeReferenc
275293
public Mono<Void> sendNotification(String method, Object params) {
276294
McpSchema.JSONRPCNotification jsonrpcNotification = new McpSchema.JSONRPCNotification(
277295
McpSchema.JSONRPC_VERSION, method, params);
278-
return this.transport.sendMessage(jsonrpcNotification);
296+
String messageId = this.uuidGenerator.get();
297+
// TODO: store message in history
298+
return this.transport.sendMessage(jsonrpcNotification, messageId);
279299
}
280300

281301
@Override
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package io.modelcontextprotocol.spec;
2+
3+
import reactor.core.publisher.Mono;
4+
5+
/**
6+
* Marker interface for the server-side MCP transport.
7+
*
8+
* @author Christian Tzolov
9+
* @author Dariusz Jędrzejczyk
10+
*/
11+
public interface McpStreamableServerTransport extends McpServerTransport {
12+
13+
Mono<Void> sendMessage(McpSchema.JSONRPCMessage message, String messageId);
14+
15+
}

0 commit comments

Comments
 (0)