From da8b750de25b51138c5c0a77baac3c6ddc8faad8 Mon Sep 17 00:00:00 2001 From: Christian Tzolov Date: Thu, 24 Jul 2025 15:48:01 +0200 Subject: [PATCH 1/5] fix: improve SSE event handling to gracefully ignore unrecognized events - Replace McpError exceptions with debug/warning logs for unrecognized SSE event types - Continue processing instead of failing when encountering unknown SSE events - Update transport implementations: - WebClientStreamableHttpTransport: return empty tuple instead of throwing - WebFluxSseClientTransport: complete stream instead of erroring - HttpClientSseClientTransport: call sink.success() instead of sink.error() - HttpClientStreamableHttpTransport: return empty Flux for unknown events This improves client resilience when servers send non-standard or future SSE event types. Resolves #272 , #223 , #93 Signed-off-by: Christian Tzolov --- .../WebClientStreamableHttpTransport.java | 3 +- .../transport/WebFluxSseClientTransport.java | 3 +- .../WebFluxSseClientTransportTests.java | 35 ++++++++++++++++--- .../HttpClientSseClientTransport.java | 6 ++-- .../HttpClientStreamableHttpTransport.java | 4 +++ 5 files changed, 41 insertions(+), 10 deletions(-) diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java index d5ac8e95c..abe6d33eb 100644 --- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java +++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java @@ -432,7 +432,8 @@ private Tuple2, Iterable> parse(Serve } } else { - throw new McpError("Received unrecognized SSE event type: " + event.event()); + logger.debug("Received SSE event with type: {}", event); + return Tuples.of(Optional.empty(), List.of()); } } diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java index 128cda4c3..1736c26ac 100644 --- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java +++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java @@ -216,7 +216,8 @@ else if (MESSAGE_EVENT_TYPE.equals(event.event())) { } } else { - s.error(new McpError("Received unrecognized SSE event type: " + event.event())); + logger.warn("Received unrecognized SSE event type: {}", event); + s.complete(); // Ignore unrecognized events } }).transform(handler)).subscribe(); diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java index 42b91d14e..dbf06c259 100644 --- a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java +++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java @@ -6,6 +6,7 @@ import java.time.Duration; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; @@ -77,6 +78,11 @@ public int getInboundMessageCount() { return inboundMessageCount.get(); } + public void simulateSseComment(String comment) { + events.tryEmitNext(ServerSentEvent.builder().comment(comment).build()); + inboundMessageCount.incrementAndGet(); + } + public void simulateEndpointEvent(String jsonMessage) { events.tryEmitNext(ServerSentEvent.builder().event("endpoint").data(jsonMessage).build()); inboundMessageCount.incrementAndGet(); @@ -158,6 +164,27 @@ void testBuilderPattern() { assertThatCode(() -> transport4.closeGracefully().block()).doesNotThrowAnyException(); } + @Test + void testCommentSseMessage() { + // If the line starts with a character (:) are comment lins and should be ingored + // https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation + + CopyOnWriteArrayList droppedErrors = new CopyOnWriteArrayList<>(); + reactor.core.publisher.Hooks.onErrorDropped(droppedErrors::add); + + try { + // Simulate receiving the SSE comment line + transport.simulateSseComment("sse comment"); + + StepVerifier.create(transport.closeGracefully()).verifyComplete(); + + assertThat(droppedErrors).hasSize(0); + } + finally { + reactor.core.publisher.Hooks.resetOnErrorDropped(); + } + } + @Test void testMessageProcessing() { // Create a test message @@ -167,10 +194,10 @@ void testMessageProcessing() { // Simulate receiving the message transport.simulateMessageEvent(""" { - "jsonrpc": "2.0", - "method": "test-method", - "id": "test-id", - "params": {"key": "value"} + "jsonrpc": "2.0", + "method": "test-method", + "id": "test-id", + "params": {"key": "value"} } """); diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java index 8598e3164..b610ad93a 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java @@ -366,10 +366,8 @@ else if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) { return Flux.just(message); } else { - logger.error("Received unrecognized SSE event type: {}", - responseEvent.sseEvent().event()); - sink.error(new McpError( - "Received unrecognized SSE event type: " + responseEvent.sseEvent().event())); + logger.debug("Received unrecognized SSE event type: {}", responseEvent.sseEvent()); + sink.success(); } } catch (IOException e) { diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java index 12baa1706..d8dd97f1e 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java @@ -264,6 +264,10 @@ private Mono reconnect(McpTransportStream stream) { "Error parsing JSON-RPC message: " + responseEvent.sseEvent().data())); } } + else { + logger.debug("Received SSE event with type: {}", responseEvent.sseEvent()); + return Flux.empty(); + } } else if (statusCode == METHOD_NOT_ALLOWED) { // NotAllowed logger.debug("The server does not support SSE streams, using request-response mode."); From 2a44477137001cc72d99ee248042f01ecd659858 Mon Sep 17 00:00:00 2001 From: Christian Tzolov Date: Thu, 24 Jul 2025 16:29:16 +0200 Subject: [PATCH 2/5] change log level to debug Signed-off-by: Christian Tzolov --- .../client/transport/WebFluxSseClientTransport.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java index 1736c26ac..59385b54a 100644 --- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java +++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransport.java @@ -216,8 +216,8 @@ else if (MESSAGE_EVENT_TYPE.equals(event.event())) { } } else { - logger.warn("Received unrecognized SSE event type: {}", event); - s.complete(); // Ignore unrecognized events + logger.debug("Received unrecognized SSE event type: {}", event); + s.complete(); } }).transform(handler)).subscribe(); From ee1b901b4f1614a335b2456f1f62c55b7e406802 Mon Sep 17 00:00:00 2001 From: Christian Tzolov Date: Thu, 24 Jul 2025 16:30:59 +0200 Subject: [PATCH 3/5] minor code formatting --- .../client/transport/WebFluxSseClientTransportTests.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java index dbf06c259..1f376ec01 100644 --- a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java +++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java @@ -194,11 +194,10 @@ void testMessageProcessing() { // Simulate receiving the message transport.simulateMessageEvent(""" { - "jsonrpc": "2.0", - "method": "test-method", - "id": "test-id", - "params": {"key": "value"} - } + "jsonrpc": "2.0", + "method": "test-method", + "id": "test-id", + "params": {"key": "value"} } """); // Subscribe to messages and verify From 44787d4b9085a24cf4288750f4281624cdbe2128 Mon Sep 17 00:00:00 2001 From: Christian Tzolov Date: Thu, 24 Jul 2025 16:31:32 +0200 Subject: [PATCH 4/5] minor code formatting --- .../client/transport/WebFluxSseClientTransportTests.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java index 1f376ec01..3805507c5 100644 --- a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java +++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java @@ -197,7 +197,8 @@ void testMessageProcessing() { "jsonrpc": "2.0", "method": "test-method", "id": "test-id", - "params": {"key": "value"} } + "params": {"key": "value"} + } """); // Subscribe to messages and verify From c0c56a19f7314084a480a9fa290bbd2678b36345 Mon Sep 17 00:00:00 2001 From: Christian Tzolov Date: Thu, 24 Jul 2025 16:31:56 +0200 Subject: [PATCH 5/5] minor code formatting --- .../client/transport/WebFluxSseClientTransportTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java index 3805507c5..1cf5dffe2 100644 --- a/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java +++ b/mcp-spring/mcp-spring-webflux/src/test/java/io/modelcontextprotocol/client/transport/WebFluxSseClientTransportTests.java @@ -197,7 +197,7 @@ void testMessageProcessing() { "jsonrpc": "2.0", "method": "test-method", "id": "test-id", - "params": {"key": "value"} + "params": {"key": "value"} } """);