From 7d1ce5257afabc13be3629c4307c927d0dbc0ab7 Mon Sep 17 00:00:00 2001 From: Christian Tzolov Date: Fri, 18 Jul 2025 14:43:35 +0200 Subject: [PATCH 1/5] fix: improve compatibility with non-compliant MCP servers (#413) Addresses issues with servers like Shopify that violate MCP/HTTP specs: - Prioritize application/json in Accept headers to fix content-type issues - Handle empty notification bodies ({}) that should be bodiless per spec - Add status code validation and null safety improvements Resolves #406 Signed-off-by: Christian Tzolov --- .../WebClientStreamableHttpTransport.java | 14 +++-- .../HttpClientStreamableHttpTransport.java | 18 ++++-- .../client/transport/ResponseSubscribers.java | 58 +++++++++++-------- .../io/modelcontextprotocol/util/Utils.java | 3 + 4 files changed, 60 insertions(+), 33 deletions(-) diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java index 53b59cb30..a37dde81a 100644 --- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java +++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java @@ -30,6 +30,7 @@ import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException; import io.modelcontextprotocol.spec.McpTransportStream; import io.modelcontextprotocol.util.Assert; +import io.modelcontextprotocol.util.Utils; import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -244,7 +245,7 @@ public Mono sendMessage(McpSchema.JSONRPCMessage message) { Disposable connection = webClient.post() .uri(this.endpoint) - .accept(MediaType.TEXT_EVENT_STREAM, MediaType.APPLICATION_JSON) + .accept(MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM) .headers(httpHeaders -> { transportSession.sessionId().ifPresent(id -> httpHeaders.add("mcp-session-id", id)); }) @@ -387,9 +388,14 @@ private static String sessionIdOrPlaceholder(McpTransportSession transportSes private Flux responseFlux(ClientResponse response) { return response.bodyToMono(String.class).>handle((responseMessage, s) -> { try { - McpSchema.JSONRPCMessage jsonRpcResponse = McpSchema.deserializeJsonRpcMessage(objectMapper, - responseMessage); - s.next(List.of(jsonRpcResponse)); + if (Utils.hasText(responseMessage) && !responseMessage.trim().equals("{}")) { + McpSchema.JSONRPCMessage jsonRpcResponse = McpSchema.deserializeJsonRpcMessage(objectMapper, + responseMessage); + s.next(List.of(jsonRpcResponse)); + } + else { + logger.warn("Received empty response message: {}", responseMessage); + } } catch (IOException e) { s.error(e); diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java index 4cf1690ff..b9a51f72d 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java @@ -358,7 +358,7 @@ public Mono sendMessage(McpSchema.JSONRPCMessage sendMessage) { String jsonBody = this.toString(sendMessage); HttpRequest request = requestBuilder.uri(Utils.resolveUri(this.baseUri, this.endpoint)) - .header("Accept", TEXT_EVENT_STREAM + ", " + APPLICATION_JSON) + .header("Accept", APPLICATION_JSON + ", " + TEXT_EVENT_STREAM) .header("Content-Type", APPLICATION_JSON) .header("Cache-Control", "no-cache") .POST(HttpRequest.BodyPublishers.ofString(jsonBody)) @@ -436,11 +436,19 @@ else if (contentType.contains(TEXT_EVENT_STREAM)) { else if (contentType.contains(APPLICATION_JSON)) { messageSink.success(); String data = ((ResponseSubscribers.AggregateResponseEvent) responseEvent).data(); - try { - return Mono.just(McpSchema.deserializeJsonRpcMessage(objectMapper, data)); + if (Utils.hasText(data) && !data.trim().equals("{}")) { + + try { + return Mono.just(McpSchema.deserializeJsonRpcMessage(objectMapper, data)); + } + catch (IOException e) { + return Mono.error(e); + } } - catch (IOException e) { - return Mono.error(e); + else { + // No content type means no response body + logger.debug("No content type returned for POST in session {}", sessionRepresentation); + return Mono.empty(); } } logger.warn("Unknown media type {} returned for POST in session {}", contentType, diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java b/mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java index 26b0d13bd..662572163 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java @@ -135,36 +135,46 @@ protected void hookOnSubscribe(Subscription subscription) { @Override protected void hookOnNext(String line) { - if (line.isEmpty()) { - // Empty line means end of event - if (this.eventBuilder.length() > 0) { - String eventData = this.eventBuilder.toString(); - SseEvent sseEvent = new SseEvent(currentEventId.get(), currentEventType.get(), eventData.trim()); - - this.sink.next(new SseResponseEvent(responseInfo, sseEvent)); - this.eventBuilder.setLength(0); - } - } - else { - if (line.startsWith("data:")) { - var matcher = EVENT_DATA_PATTERN.matcher(line); - if (matcher.find()) { - this.eventBuilder.append(matcher.group(1).trim()).append("\n"); + if (this.responseInfo.statusCode() >= 200 && this.responseInfo.statusCode() < 300) { + + if (line.isEmpty()) { + // Empty line means end of event + if (this.eventBuilder.length() > 0) { + String eventData = this.eventBuilder.toString(); + SseEvent sseEvent = new SseEvent(currentEventId.get(), currentEventType.get(), + eventData.trim()); + + this.sink.next(new SseResponseEvent(responseInfo, sseEvent)); + this.eventBuilder.setLength(0); } } - else if (line.startsWith("id:")) { - var matcher = EVENT_ID_PATTERN.matcher(line); - if (matcher.find()) { - this.currentEventId.set(matcher.group(1).trim()); + else { + if (line.startsWith("data:")) { + var matcher = EVENT_DATA_PATTERN.matcher(line); + if (matcher.find()) { + this.eventBuilder.append(matcher.group(1).trim()).append("\n"); + } } - } - else if (line.startsWith("event:")) { - var matcher = EVENT_TYPE_PATTERN.matcher(line); - if (matcher.find()) { - this.currentEventType.set(matcher.group(1).trim()); + else if (line.startsWith("id:")) { + var matcher = EVENT_ID_PATTERN.matcher(line); + if (matcher.find()) { + this.currentEventId.set(matcher.group(1).trim()); + } + } + else if (line.startsWith("event:")) { + var matcher = EVENT_TYPE_PATTERN.matcher(line); + if (matcher.find()) { + this.currentEventType.set(matcher.group(1).trim()); + } } } } + else { + // If the response is not successful, emit an error + System.out.println("Received non-successful response: " + this.responseInfo.statusCode()); + SseEvent sseEvent = new SseEvent(null, null, null); + this.sink.next(new SseResponseEvent(responseInfo, sseEvent)); + } } @Override diff --git a/mcp/src/main/java/io/modelcontextprotocol/util/Utils.java b/mcp/src/main/java/io/modelcontextprotocol/util/Utils.java index 8e654e596..039b0d68e 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/util/Utils.java +++ b/mcp/src/main/java/io/modelcontextprotocol/util/Utils.java @@ -69,6 +69,9 @@ public static boolean isEmpty(@Nullable Map map) { * base URL or URI is malformed */ public static URI resolveUri(URI baseUrl, String endpointUrl) { + if (!Utils.hasText(endpointUrl)) { + return baseUrl; + } URI endpointUri = URI.create(endpointUrl); if (endpointUri.isAbsolute() && !isUnderBaseUri(baseUrl, endpointUri)) { throw new IllegalArgumentException("Absolute endpoint URL does not match the base URL."); From 2d053085c83a34bfc7325405a4abebcc9e8ab6f0 Mon Sep 17 00:00:00 2001 From: Christian Tzolov Date: Fri, 18 Jul 2025 22:08:44 +0200 Subject: [PATCH 2/5] addres review comments Signed-off-by: Christian Tzolov --- .../client/transport/ResponseSubscribers.java | 62 +++++++++---------- 1 file changed, 30 insertions(+), 32 deletions(-) diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java b/mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java index 662572163..489f7efd2 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java @@ -12,6 +12,7 @@ import org.reactivestreams.FlowAdapters; import org.reactivestreams.Subscription; +import io.modelcontextprotocol.spec.McpError; import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.FluxSink; @@ -135,45 +136,42 @@ protected void hookOnSubscribe(Subscription subscription) { @Override protected void hookOnNext(String line) { - if (this.responseInfo.statusCode() >= 200 && this.responseInfo.statusCode() < 300) { - if (line.isEmpty()) { - // Empty line means end of event - if (this.eventBuilder.length() > 0) { - String eventData = this.eventBuilder.toString(); - SseEvent sseEvent = new SseEvent(currentEventId.get(), currentEventType.get(), - eventData.trim()); + if (line.isEmpty()) { + // Empty line means end of event + if (this.eventBuilder.length() > 0) { + String eventData = this.eventBuilder.toString(); + SseEvent sseEvent = new SseEvent(currentEventId.get(), currentEventType.get(), eventData.trim()); - this.sink.next(new SseResponseEvent(responseInfo, sseEvent)); - this.eventBuilder.setLength(0); - } + this.sink.next(new SseResponseEvent(responseInfo, sseEvent)); + this.eventBuilder.setLength(0); } - else { - if (line.startsWith("data:")) { - var matcher = EVENT_DATA_PATTERN.matcher(line); - if (matcher.find()) { - this.eventBuilder.append(matcher.group(1).trim()).append("\n"); - } + } + else { + if (line.startsWith("data:")) { + var matcher = EVENT_DATA_PATTERN.matcher(line); + if (matcher.find()) { + this.eventBuilder.append(matcher.group(1).trim()).append("\n"); } - else if (line.startsWith("id:")) { - var matcher = EVENT_ID_PATTERN.matcher(line); - if (matcher.find()) { - this.currentEventId.set(matcher.group(1).trim()); - } + } + else if (line.startsWith("id:")) { + var matcher = EVENT_ID_PATTERN.matcher(line); + if (matcher.find()) { + this.currentEventId.set(matcher.group(1).trim()); } - else if (line.startsWith("event:")) { - var matcher = EVENT_TYPE_PATTERN.matcher(line); - if (matcher.find()) { - this.currentEventType.set(matcher.group(1).trim()); - } + } + else if (line.startsWith("event:")) { + var matcher = EVENT_TYPE_PATTERN.matcher(line); + if (matcher.find()) { + this.currentEventType.set(matcher.group(1).trim()); } } - } - else { - // If the response is not successful, emit an error - System.out.println("Received non-successful response: " + this.responseInfo.statusCode()); - SseEvent sseEvent = new SseEvent(null, null, null); - this.sink.next(new SseResponseEvent(responseInfo, sseEvent)); + else { + // If the response is not successful, emit an error + this.sink.error(new McpError( + "Invalid SSE response. Status code: " + this.responseInfo.statusCode() + " Line: " + line)); + + } } } From 521c2cc6e72b828868285a570738bccd4f21dbd4 Mon Sep 17 00:00:00 2001 From: Christian Tzolov Date: Tue, 22 Jul 2025 10:22:06 +0200 Subject: [PATCH 3/5] Address review comments Signed-off-by: Christian Tzolov --- .../WebClientStreamableHttpTransport.java | 13 +++++++------ .../HttpClientStreamableHttpTransport.java | 19 ++++++++----------- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java index a37dde81a..0f797fd71 100644 --- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java +++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java @@ -288,7 +288,7 @@ else if (mediaType.isCompatibleWith(MediaType.APPLICATION_JSON)) { logger.trace("Received response to POST for session {}", sessionRepresentation); // communicate to caller the message was delivered sink.success(); - return responseFlux(response); + return statelessResponseFlux(message, response); } else { logger.warn("Unknown media type {} returned for POST in session {}", contentType, @@ -385,17 +385,18 @@ private static String sessionIdOrPlaceholder(McpTransportSession transportSes return transportSession.sessionId().orElse("[missing_session_id]"); } - private Flux responseFlux(ClientResponse response) { + private Flux statelessResponseFlux(McpSchema.JSONRPCMessage sendMessage, + ClientResponse response) { return response.bodyToMono(String.class).>handle((responseMessage, s) -> { try { - if (Utils.hasText(responseMessage) && !responseMessage.trim().equals("{}")) { + if (sendMessage instanceof McpSchema.JSONRPCNotification && Utils.hasText(responseMessage)) { + logger.warn("Notificaiton: {} received non-compliant response: {}", sendMessage, responseMessage); + } + else { McpSchema.JSONRPCMessage jsonRpcResponse = McpSchema.deserializeJsonRpcMessage(objectMapper, responseMessage); s.next(List.of(jsonRpcResponse)); } - else { - logger.warn("Received empty response message: {}", responseMessage); - } } catch (IOException e) { s.error(e); diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java index b9a51f72d..104d4898a 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java @@ -436,19 +436,16 @@ else if (contentType.contains(TEXT_EVENT_STREAM)) { else if (contentType.contains(APPLICATION_JSON)) { messageSink.success(); String data = ((ResponseSubscribers.AggregateResponseEvent) responseEvent).data(); - if (Utils.hasText(data) && !data.trim().equals("{}")) { + if (sendMessage instanceof McpSchema.JSONRPCNotification && Utils.hasText(data)) { + logger.warn("Notificaiton: {} received non-compliant response: {}", sendMessage, data); + return Mono.empty(); + } - try { - return Mono.just(McpSchema.deserializeJsonRpcMessage(objectMapper, data)); - } - catch (IOException e) { - return Mono.error(e); - } + try { + return Mono.just(McpSchema.deserializeJsonRpcMessage(objectMapper, data)); } - else { - // No content type means no response body - logger.debug("No content type returned for POST in session {}", sessionRepresentation); - return Mono.empty(); + catch (IOException e) { + return Mono.error(e); } } logger.warn("Unknown media type {} returned for POST in session {}", contentType, From 18e10f0eebc7c9186cfb194ec48f80c3c8d5b633 Mon Sep 17 00:00:00 2001 From: Christian Tzolov Date: Tue, 22 Jul 2025 10:42:10 +0200 Subject: [PATCH 4/5] address review Signed-off-by: Christian Tzolov --- .../transport/WebClientStreamableHttpTransport.java | 8 ++++---- .../transport/HttpClientStreamableHttpTransport.java | 10 +++++----- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java index 0f797fd71..99b0416ca 100644 --- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java +++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java @@ -288,7 +288,7 @@ else if (mediaType.isCompatibleWith(MediaType.APPLICATION_JSON)) { logger.trace("Received response to POST for session {}", sessionRepresentation); // communicate to caller the message was delivered sink.success(); - return statelessResponseFlux(message, response); + return directResponseFlux(message, response); } else { logger.warn("Unknown media type {} returned for POST in session {}", contentType, @@ -385,12 +385,12 @@ private static String sessionIdOrPlaceholder(McpTransportSession transportSes return transportSession.sessionId().orElse("[missing_session_id]"); } - private Flux statelessResponseFlux(McpSchema.JSONRPCMessage sendMessage, + private Flux directResponseFlux(McpSchema.JSONRPCMessage sentMessage, ClientResponse response) { return response.bodyToMono(String.class).>handle((responseMessage, s) -> { try { - if (sendMessage instanceof McpSchema.JSONRPCNotification && Utils.hasText(responseMessage)) { - logger.warn("Notificaiton: {} received non-compliant response: {}", sendMessage, responseMessage); + if (sentMessage instanceof McpSchema.JSONRPCNotification && Utils.hasText(responseMessage)) { + logger.warn("Notificaiton: {} received non-compliant response: {}", sentMessage, responseMessage); } else { McpSchema.JSONRPCMessage jsonRpcResponse = McpSchema.deserializeJsonRpcMessage(objectMapper, diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java index 104d4898a..a6dae039e 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java @@ -342,9 +342,9 @@ public String toString(McpSchema.JSONRPCMessage message) { } } - public Mono sendMessage(McpSchema.JSONRPCMessage sendMessage) { + public Mono sendMessage(McpSchema.JSONRPCMessage sentMessage) { return Mono.create(messageSink -> { - logger.debug("Sending message {}", sendMessage); + logger.debug("Sending message {}", sentMessage); final AtomicReference disposableRef = new AtomicReference<>(); final McpTransportSession transportSession = this.activeSession.get(); @@ -355,7 +355,7 @@ public Mono sendMessage(McpSchema.JSONRPCMessage sendMessage) { requestBuilder = requestBuilder.header("mcp-session-id", transportSession.sessionId().get()); } - String jsonBody = this.toString(sendMessage); + String jsonBody = this.toString(sentMessage); HttpRequest request = requestBuilder.uri(Utils.resolveUri(this.baseUri, this.endpoint)) .header("Accept", APPLICATION_JSON + ", " + TEXT_EVENT_STREAM) @@ -436,8 +436,8 @@ else if (contentType.contains(TEXT_EVENT_STREAM)) { else if (contentType.contains(APPLICATION_JSON)) { messageSink.success(); String data = ((ResponseSubscribers.AggregateResponseEvent) responseEvent).data(); - if (sendMessage instanceof McpSchema.JSONRPCNotification && Utils.hasText(data)) { - logger.warn("Notificaiton: {} received non-compliant response: {}", sendMessage, data); + if (sentMessage instanceof McpSchema.JSONRPCNotification && Utils.hasText(data)) { + logger.warn("Notificaiton: {} received non-compliant response: {}", sentMessage, data); return Mono.empty(); } From babb566c7d69a922df571059dd933dd2a5d99330 Mon Sep 17 00:00:00 2001 From: Christian Tzolov Date: Tue, 22 Jul 2025 11:04:14 +0200 Subject: [PATCH 5/5] another round Signed-off-by: Christian Tzolov --- .../client/transport/WebClientStreamableHttpTransport.java | 4 +++- .../client/transport/HttpClientStreamableHttpTransport.java | 3 ++- .../client/transport/ResponseSubscribers.java | 1 + 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java index 99b0416ca..d5ac8e95c 100644 --- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java +++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java @@ -390,7 +390,8 @@ private Flux directResponseFlux(McpSchema.JSONRPCMessa return response.bodyToMono(String.class).>handle((responseMessage, s) -> { try { if (sentMessage instanceof McpSchema.JSONRPCNotification && Utils.hasText(responseMessage)) { - logger.warn("Notificaiton: {} received non-compliant response: {}", sentMessage, responseMessage); + logger.warn("Notification: {} received non-compliant response: {}", sentMessage, responseMessage); + s.complete(); } else { McpSchema.JSONRPCMessage jsonRpcResponse = McpSchema.deserializeJsonRpcMessage(objectMapper, @@ -399,6 +400,7 @@ private Flux directResponseFlux(McpSchema.JSONRPCMessa } } catch (IOException e) { + // TODO: this should be a McpTransportError s.error(e); } }).flatMapIterable(Function.identity()); diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java index a6dae039e..12baa1706 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java @@ -437,7 +437,7 @@ else if (contentType.contains(APPLICATION_JSON)) { messageSink.success(); String data = ((ResponseSubscribers.AggregateResponseEvent) responseEvent).data(); if (sentMessage instanceof McpSchema.JSONRPCNotification && Utils.hasText(data)) { - logger.warn("Notificaiton: {} received non-compliant response: {}", sentMessage, data); + logger.warn("Notification: {} received non-compliant response: {}", sentMessage, data); return Mono.empty(); } @@ -445,6 +445,7 @@ else if (contentType.contains(APPLICATION_JSON)) { return Mono.just(McpSchema.deserializeJsonRpcMessage(objectMapper, data)); } catch (IOException e) { + // TODO: this should be a McpTransportError return Mono.error(e); } } diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java b/mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java index 489f7efd2..eb9d3c65c 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/transport/ResponseSubscribers.java @@ -168,6 +168,7 @@ else if (line.startsWith("event:")) { } else { // If the response is not successful, emit an error + // TODO: This should be a McpTransportError this.sink.error(new McpError( "Invalid SSE response. Status code: " + this.responseInfo.statusCode() + " Line: " + line));