Skip to content

Commit c3a0b18

Browse files
committed
fix: improve SSE event handling to gracefully ignore unrecognized events (#423)
- Replace McpError exceptions with debug/warning logs for unrecognized SSE event types - Continue processing instead of failing when encountering unknown SSE events - Update transport implementations: - WebClientStreamableHttpTransport: return empty tuple instead of throwing - WebFluxSseClientTransport: complete stream instead of erroring - HttpClientSseClientTransport: call sink.success() instead of sink.error() - HttpClientStreamableHttpTransport: return empty Flux for unknown events This improves client resilience when servers send non-standard or future SSE event types. Resolves #272 , #223 , #93, #421 Signed-off-by: Christian Tzolov <[email protected]>
1 parent 4937fc1 commit c3a0b18

File tree

5 files changed

+37
-6
lines changed

5 files changed

+37
-6
lines changed

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -432,7 +432,8 @@ private Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>> parse(Serve
432432
}
433433
}
434434
else {
435-
throw new McpError("Received unrecognized SSE event type: " + event.event());
435+
logger.debug("Received SSE event with type: {}", event);
436+
return Tuples.of(Optional.empty(), List.of());
436437
}
437438
}
438439

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,8 @@ else if (MESSAGE_EVENT_TYPE.equals(event.event())) {
216216
}
217217
}
218218
else {
219-
s.error(new McpError("Received unrecognized SSE event type: " + event.event()));
219+
logger.debug("Received unrecognized SSE event type: {}", event);
220+
s.complete();
220221
}
221222
}).transform(handler)).subscribe();
222223

mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import java.time.Duration;
88
import java.util.Map;
9+
import java.util.concurrent.CopyOnWriteArrayList;
910
import java.util.concurrent.atomic.AtomicInteger;
1011
import java.util.function.Function;
1112

@@ -77,6 +78,11 @@ public int getInboundMessageCount() {
7778
return inboundMessageCount.get();
7879
}
7980

81+
public void simulateSseComment(String comment) {
82+
events.tryEmitNext(ServerSentEvent.<String>builder().comment(comment).build());
83+
inboundMessageCount.incrementAndGet();
84+
}
85+
8086
public void simulateEndpointEvent(String jsonMessage) {
8187
events.tryEmitNext(ServerSentEvent.<String>builder().event("endpoint").data(jsonMessage).build());
8288
inboundMessageCount.incrementAndGet();
@@ -158,6 +164,27 @@ void testBuilderPattern() {
158164
assertThatCode(() -> transport4.closeGracefully().block()).doesNotThrowAnyException();
159165
}
160166

167+
@Test
168+
void testCommentSseMessage() {
169+
// If the line starts with a character (:) are comment lins and should be ingored
170+
// https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation
171+
172+
CopyOnWriteArrayList<Throwable> droppedErrors = new CopyOnWriteArrayList<>();
173+
reactor.core.publisher.Hooks.onErrorDropped(droppedErrors::add);
174+
175+
try {
176+
// Simulate receiving the SSE comment line
177+
transport.simulateSseComment("sse comment");
178+
179+
StepVerifier.create(transport.closeGracefully()).verifyComplete();
180+
181+
assertThat(droppedErrors).hasSize(0);
182+
}
183+
finally {
184+
reactor.core.publisher.Hooks.resetOnErrorDropped();
185+
}
186+
}
187+
161188
@Test
162189
void testMessageProcessing() {
163190
// Create a test message

mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -366,10 +366,8 @@ else if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
366366
return Flux.just(message);
367367
}
368368
else {
369-
logger.error("Received unrecognized SSE event type: {}",
370-
responseEvent.sseEvent().event());
371-
sink.error(new McpError(
372-
"Received unrecognized SSE event type: " + responseEvent.sseEvent().event()));
369+
logger.debug("Received unrecognized SSE event type: {}", responseEvent.sseEvent());
370+
sink.success();
373371
}
374372
}
375373
catch (IOException e) {

mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,10 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
264264
"Error parsing JSON-RPC message: " + responseEvent.sseEvent().data()));
265265
}
266266
}
267+
else {
268+
logger.debug("Received SSE event with type: {}", responseEvent.sseEvent());
269+
return Flux.empty();
270+
}
267271
}
268272
else if (statusCode == METHOD_NOT_ALLOWED) { // NotAllowed
269273
logger.debug("The server does not support SSE streams, using request-response mode.");

0 commit comments

Comments
 (0)