Skip to content

Commit f4eb55b

Browse files
committed
Merge branch 'main' into streamable-server-webmvc
2 parents b204cd8 + a8f5a3f commit f4eb55b

File tree

9 files changed

+97
-17
lines changed

9 files changed

+97
-17
lines changed

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException;
3131
import io.modelcontextprotocol.spec.McpTransportStream;
3232
import io.modelcontextprotocol.util.Assert;
33+
import io.modelcontextprotocol.util.Utils;
3334
import reactor.core.Disposable;
3435
import reactor.core.publisher.Flux;
3536
import reactor.core.publisher.Mono;
@@ -244,7 +245,7 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
244245

245246
Disposable connection = webClient.post()
246247
.uri(this.endpoint)
247-
.accept(MediaType.TEXT_EVENT_STREAM, MediaType.APPLICATION_JSON)
248+
.accept(MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM)
248249
.headers(httpHeaders -> {
249250
transportSession.sessionId().ifPresent(id -> httpHeaders.add("mcp-session-id", id));
250251
})
@@ -287,7 +288,7 @@ else if (mediaType.isCompatibleWith(MediaType.APPLICATION_JSON)) {
287288
logger.trace("Received response to POST for session {}", sessionRepresentation);
288289
// communicate to caller the message was delivered
289290
sink.success();
290-
return responseFlux(response);
291+
return directResponseFlux(message, response);
291292
}
292293
else {
293294
logger.warn("Unknown media type {} returned for POST in session {}", contentType,
@@ -385,14 +386,22 @@ private static String sessionIdOrPlaceholder(McpTransportSession<?> transportSes
385386
return transportSession.sessionId().orElse("[missing_session_id]");
386387
}
387388

388-
private Flux<McpSchema.JSONRPCMessage> responseFlux(ClientResponse response) {
389+
private Flux<McpSchema.JSONRPCMessage> directResponseFlux(McpSchema.JSONRPCMessage sentMessage,
390+
ClientResponse response) {
389391
return response.bodyToMono(String.class).<Iterable<McpSchema.JSONRPCMessage>>handle((responseMessage, s) -> {
390392
try {
391-
McpSchema.JSONRPCMessage jsonRpcResponse = McpSchema.deserializeJsonRpcMessage(objectMapper,
392-
responseMessage);
393-
s.next(List.of(jsonRpcResponse));
393+
if (sentMessage instanceof McpSchema.JSONRPCNotification && Utils.hasText(responseMessage)) {
394+
logger.warn("Notification: {} received non-compliant response: {}", sentMessage, responseMessage);
395+
s.complete();
396+
}
397+
else {
398+
McpSchema.JSONRPCMessage jsonRpcResponse = McpSchema.deserializeJsonRpcMessage(objectMapper,
399+
responseMessage);
400+
s.next(List.of(jsonRpcResponse));
401+
}
394402
}
395403
catch (IOException e) {
404+
// TODO: this should be a McpTransportError
396405
s.error(e);
397406
}
398407
}).flatMapIterable(Function.identity());
@@ -424,7 +433,8 @@ private Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>> parse(Serve
424433
}
425434
}
426435
else {
427-
throw new McpError("Received unrecognized SSE event type: " + event.event());
436+
logger.debug("Received SSE event with type: {}", event);
437+
return Tuples.of(Optional.empty(), List.of());
428438
}
429439
}
430440

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,8 @@ else if (MESSAGE_EVENT_TYPE.equals(event.event())) {
216216
}
217217
}
218218
else {
219-
s.error(new McpError("Received unrecognized SSE event type: " + event.event()));
219+
logger.debug("Received unrecognized SSE event type: {}", event);
220+
s.complete();
220221
}
221222
}).transform(handler)).subscribe();
222223

mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import java.time.Duration;
88
import java.util.Map;
9+
import java.util.concurrent.CopyOnWriteArrayList;
910
import java.util.concurrent.atomic.AtomicInteger;
1011
import java.util.function.Function;
1112

@@ -77,6 +78,11 @@ public int getInboundMessageCount() {
7778
return inboundMessageCount.get();
7879
}
7980

81+
public void simulateSseComment(String comment) {
82+
events.tryEmitNext(ServerSentEvent.<String>builder().comment(comment).build());
83+
inboundMessageCount.incrementAndGet();
84+
}
85+
8086
public void simulateEndpointEvent(String jsonMessage) {
8187
events.tryEmitNext(ServerSentEvent.<String>builder().event("endpoint").data(jsonMessage).build());
8288
inboundMessageCount.incrementAndGet();
@@ -158,6 +164,27 @@ void testBuilderPattern() {
158164
assertThatCode(() -> transport4.closeGracefully().block()).doesNotThrowAnyException();
159165
}
160166

167+
@Test
168+
void testCommentSseMessage() {
169+
// If the line starts with a character (:) are comment lins and should be ingored
170+
// https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
171+
172+
CopyOnWriteArrayList<Throwable> droppedErrors = new CopyOnWriteArrayList<>();
173+
reactor.core.publisher.Hooks.onErrorDropped(droppedErrors::add);
174+
175+
try {
176+
// Simulate receiving the SSE comment line
177+
transport.simulateSseComment("sse comment");
178+
179+
StepVerifier.create(transport.closeGracefully()).verifyComplete();
180+
181+
assertThat(droppedErrors).hasSize(0);
182+
}
183+
finally {
184+
reactor.core.publisher.Hooks.resetOnErrorDropped();
185+
}
186+
}
187+
161188
@Test
162189
void testMessageProcessing() {
163190
// Create a test message

mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -366,10 +366,8 @@ else if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
366366
return Flux.just(message);
367367
}
368368
else {
369-
logger.error("Received unrecognized SSE event type: {}",
370-
responseEvent.sseEvent().event());
371-
sink.error(new McpError(
372-
"Received unrecognized SSE event type: " + responseEvent.sseEvent().event()));
369+
logger.debug("Received unrecognized SSE event type: {}", responseEvent.sseEvent());
370+
sink.success();
373371
}
374372
}
375373
catch (IOException e) {

mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,10 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
264264
"Error parsing JSON-RPC message: " + responseEvent.sseEvent().data()));
265265
}
266266
}
267+
else {
268+
logger.debug("Received SSE event with type: {}", responseEvent.sseEvent());
269+
return Flux.empty();
270+
}
267271
}
268272
else if (statusCode == METHOD_NOT_ALLOWED) { // NotAllowed
269273
logger.debug("The server does not support SSE streams, using request-response mode.");
@@ -342,9 +346,9 @@ public String toString(McpSchema.JSONRPCMessage message) {
342346
}
343347
}
344348

345-
public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sendMessage) {
349+
public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sentMessage) {
346350
return Mono.create(messageSink -> {
347-
logger.debug("Sending message {}", sendMessage);
351+
logger.debug("Sending message {}", sentMessage);
348352

349353
final AtomicReference<Disposable> disposableRef = new AtomicReference<>();
350354
final McpTransportSession<Disposable> transportSession = this.activeSession.get();
@@ -355,10 +359,10 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage sendMessage) {
355359
requestBuilder = requestBuilder.header("mcp-session-id", transportSession.sessionId().get());
356360
}
357361

358-
String jsonBody = this.toString(sendMessage);
362+
String jsonBody = this.toString(sentMessage);
359363

360364
HttpRequest request = requestBuilder.uri(Utils.resolveUri(this.baseUri, this.endpoint))
361-
.header("Accept", TEXT_EVENT_STREAM + ", " + APPLICATION_JSON)
365+
.header("Accept", APPLICATION_JSON + ", " + TEXT_EVENT_STREAM)
362366
.header("Content-Type", APPLICATION_JSON)
363367
.header("Cache-Control", "no-cache")
364368
.POST(HttpRequest.BodyPublishers.ofString(jsonBody))
@@ -436,10 +440,16 @@ else if (contentType.contains(TEXT_EVENT_STREAM)) {
436440
else if (contentType.contains(APPLICATION_JSON)) {
437441
messageSink.success();
438442
String data = ((ResponseSubscribers.AggregateResponseEvent) responseEvent).data();
443+
if (sentMessage instanceof McpSchema.JSONRPCNotification && Utils.hasText(data)) {
444+
logger.warn("Notification: {} received non-compliant response: {}", sentMessage, data);
445+
return Mono.empty();
446+
}
447+
439448
try {
440449
return Mono.just(McpSchema.deserializeJsonRpcMessage(objectMapper, data));
441450
}
442451
catch (IOException e) {
452+
// TODO: this should be a McpTransportError
443453
return Mono.error(e);
444454
}
445455
}

mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.reactivestreams.FlowAdapters;
1313
import org.reactivestreams.Subscription;
1414

15+
import io.modelcontextprotocol.spec.McpError;
1516
import reactor.core.publisher.BaseSubscriber;
1617
import reactor.core.publisher.FluxSink;
1718

@@ -135,6 +136,7 @@ protected void hookOnSubscribe(Subscription subscription) {
135136

136137
@Override
137138
protected void hookOnNext(String line) {
139+
138140
if (line.isEmpty()) {
139141
// Empty line means end of event
140142
if (this.eventBuilder.length() > 0) {
@@ -164,6 +166,13 @@ else if (line.startsWith("event:")) {
164166
this.currentEventType.set(matcher.group(1).trim());
165167
}
166168
}
169+
else {
170+
// If the response is not successful, emit an error
171+
// TODO: This should be a McpTransportError
172+
this.sink.error(new McpError(
173+
"Invalid SSE response. Status code: " + this.responseInfo.statusCode() + " Line: " + line));
174+
175+
}
167176
}
168177
}
169178

mcp/src/main/java/io/modelcontextprotocol/spec/McpStreamableServerTransportProvider.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,27 @@ public interface McpStreamableServerTransportProvider extends McpServerTransport
4141
*/
4242
void setSessionFactory(McpStreamableServerSession.Factory sessionFactory);
4343

44+
/**
45+
* Sends a notification to all connected clients.
46+
* @param method the name of the notification method to be called on the clients
47+
* @param params parameters to be sent with the notification
48+
* @return a Mono that completes when the notification has been broadcast
49+
*/
50+
Mono<Void> notifyClients(String method, Object params);
51+
52+
/**
53+
* Immediately closes all the transports with connected clients and releases any
54+
* associated resources.
55+
*/
56+
default void close() {
57+
this.closeGracefully().subscribe();
58+
}
59+
60+
/**
61+
* Gracefully closes all the transports with connected clients and releases any
62+
* associated resources asynchronously.
63+
* @return a {@link Mono} that completes when the connections have been closed.
64+
*/
65+
Mono<Void> closeGracefully();
66+
4467
}

mcp/src/main/java/io/modelcontextprotocol/util/Utils.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ public static boolean isEmpty(@Nullable Map<?, ?> map) {
6969
* base URL or URI is malformed
7070
*/
7171
public static URI resolveUri(URI baseUrl, String endpointUrl) {
72+
if (!Utils.hasText(endpointUrl)) {
73+
return baseUrl;
74+
}
7275
URI endpointUri = URI.create(endpointUrl);
7376
if (endpointUri.isAbsolute() && !isUnderBaseUri(baseUrl, endpointUri)) {
7477
throw new IllegalArgumentException("Absolute endpoint URL does not match the base URL.");

mcp/src/test/java/io/modelcontextprotocol/server/AbstractMcpAsyncServerTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ void tearDown() {
6464
// ---------------------------------------
6565
// Server Lifecycle Tests
6666
// ---------------------------------------
67-
6867
void testConstructorWithInvalidArguments() {
6968
assertThatThrownBy(() -> McpServer.async((McpServerTransportProvider) null))
7069
.isInstanceOf(IllegalArgumentException.class)

0 commit comments

Comments
 (0)