Skip to content

V0.11.0 sproutsocial rc1 #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Jenkinsfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
sproutMultiModuleBuild {
moduleToBuild = 'mcp-parent'
nodeLabel = 'ephemeral'
tribes = ['global']
notifySlackGroupsOnFailure = ['@kevin']
jdk = 17
deployRCBranches = true
}
12 changes: 6 additions & 6 deletions mcp-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.modelcontextprotocol.sdk</groupId>
<groupId>com.sproutsocial.io.modelcontextprotocol.sdk</groupId>
<artifactId>mcp-parent</artifactId>
<version>0.11.0-SNAPSHOT</version>
<version>0.11.0-sproutsocial-rc1</version>
</parent>

<artifactId>mcp-bom</artifactId>
Expand All @@ -28,28 +28,28 @@
<dependencies>
<!-- Core MCP -->
<dependency>
<groupId>io.modelcontextprotocol.sdk</groupId>
<groupId>com.sproutsocial.io.modelcontextprotocol.sdk</groupId>
<artifactId>mcp</artifactId>
<version>${project.version}</version>
</dependency>

<!-- MCP Test -->
<dependency>
<groupId>io.modelcontextprotocol.sdk</groupId>
<groupId>com.sproutsocial.io.modelcontextprotocol.sdk</groupId>
<artifactId>mcp-test</artifactId>
<version>${project.version}</version>
</dependency>

<!-- MCP Transport - WebFlux SSE -->
<dependency>
<groupId>io.modelcontextprotocol.sdk</groupId>
<groupId>com.sproutsocial.io.modelcontextprotocol.sdk</groupId>
<artifactId>mcp-spring-webflux</artifactId>
<version>${project.version}</version>
</dependency>

<!-- MCP Transport - WebMVC SSE -->
<dependency>
<groupId>io.modelcontextprotocol.sdk</groupId>
<groupId>com.sproutsocial.io.modelcontextprotocol.sdk</groupId>
<artifactId>mcp-spring-webmvc</artifactId>
<version>${project.version}</version>
</dependency>
Expand Down
12 changes: 6 additions & 6 deletions mcp-spring/mcp-spring-webflux/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.modelcontextprotocol.sdk</groupId>
<groupId>com.sproutsocial.io.modelcontextprotocol.sdk</groupId>
<artifactId>mcp-parent</artifactId>
<version>0.11.0-SNAPSHOT</version>
<version>0.11.0-sproutsocial-rc1</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<artifactId>mcp-spring-webflux</artifactId>
Expand All @@ -23,15 +23,15 @@

<dependencies>
<dependency>
<groupId>io.modelcontextprotocol.sdk</groupId>
<groupId>com.sproutsocial.io.modelcontextprotocol.sdk</groupId>
<artifactId>mcp</artifactId>
<version>0.11.0-SNAPSHOT</version>
<version>0.11.0-sproutsocial-rc1</version>
</dependency>

<dependency>
<groupId>io.modelcontextprotocol.sdk</groupId>
<groupId>com.sproutsocial.io.modelcontextprotocol.sdk</groupId>
<artifactId>mcp-test</artifactId>
<version>0.11.0-SNAPSHOT</version>
<version>0.11.0-sproutsocial-rc1</version>
<scope>test</scope>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@

import io.modelcontextprotocol.spec.DefaultMcpTransportSession;
import io.modelcontextprotocol.spec.DefaultMcpTransportStream;
import io.modelcontextprotocol.spec.HttpHeaders;
import io.modelcontextprotocol.spec.McpClientTransport;
import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpTransportSession;
import io.modelcontextprotocol.spec.McpTransportSessionNotFoundException;
import io.modelcontextprotocol.spec.McpTransportStream;
import io.modelcontextprotocol.util.Assert;
import io.modelcontextprotocol.util.Utils;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -65,6 +67,8 @@ public class WebClientStreamableHttpTransport implements McpClientTransport {

private static final Logger logger = LoggerFactory.getLogger(WebClientStreamableHttpTransport.class);

private static final String MCP_PROTOCOL_VERSION = "2025-03-26";

private static final String DEFAULT_ENDPOINT = "/mcp";

/**
Expand Down Expand Up @@ -102,6 +106,11 @@ private WebClientStreamableHttpTransport(ObjectMapper objectMapper, WebClient.Bu
this.activeSession.set(createTransportSession());
}

@Override
public String protocolVersion() {
return MCP_PROTOCOL_VERSION;
}

/**
* Create a stateful builder for creating {@link WebClientStreamableHttpTransport}
* instances.
Expand All @@ -127,12 +136,20 @@ public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchem

private DefaultMcpTransportSession createTransportSession() {
Function<String, Publisher<Void>> onClose = sessionId -> sessionId == null ? Mono.empty()
: webClient.delete().uri(this.endpoint).headers(httpHeaders -> {
httpHeaders.add("mcp-session-id", sessionId);
}).retrieve().toBodilessEntity().onErrorComplete(e -> {
logger.warn("Got error when closing transport", e);
return true;
}).then();
: webClient.delete()
.uri(this.endpoint)
.header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION)
.headers(httpHeaders -> {
httpHeaders.add(HttpHeaders.MCP_SESSION_ID, sessionId);
httpHeaders.add(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION);
})
.retrieve()
.toBodilessEntity()
.onErrorComplete(e -> {
logger.warn("Got error when closing transport", e);
return true;
})
.then();
return new DefaultMcpTransportSession(onClose);
}

Expand Down Expand Up @@ -185,10 +202,11 @@ private Mono<Disposable> reconnect(McpTransportStream<Disposable> stream) {
Disposable connection = webClient.get()
.uri(this.endpoint)
.accept(MediaType.TEXT_EVENT_STREAM)
.header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION)
.headers(httpHeaders -> {
transportSession.sessionId().ifPresent(id -> httpHeaders.add("mcp-session-id", id));
transportSession.sessionId().ifPresent(id -> httpHeaders.add(HttpHeaders.MCP_SESSION_ID, id));
if (stream != null) {
stream.lastId().ifPresent(id -> httpHeaders.add("last-event-id", id));
stream.lastId().ifPresent(id -> httpHeaders.add(HttpHeaders.LAST_EVENT_ID, id));
}
})
.exchangeToFlux(response -> {
Expand Down Expand Up @@ -244,14 +262,15 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {

Disposable connection = webClient.post()
.uri(this.endpoint)
.accept(MediaType.TEXT_EVENT_STREAM, MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON, MediaType.TEXT_EVENT_STREAM)
.header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION)
.headers(httpHeaders -> {
transportSession.sessionId().ifPresent(id -> httpHeaders.add("mcp-session-id", id));
transportSession.sessionId().ifPresent(id -> httpHeaders.add(HttpHeaders.MCP_SESSION_ID, id));
})
.bodyValue(message)
.exchangeToFlux(response -> {
if (transportSession
.markInitialized(response.headers().asHttpHeaders().getFirst("mcp-session-id"))) {
.markInitialized(response.headers().asHttpHeaders().getFirst(HttpHeaders.MCP_SESSION_ID))) {
// Once we have a session, we try to open an async stream for
// the server to send notifications and requests out-of-band.
reconnect(null).contextWrite(sink.contextView()).subscribe();
Expand Down Expand Up @@ -287,7 +306,7 @@ else if (mediaType.isCompatibleWith(MediaType.APPLICATION_JSON)) {
logger.trace("Received response to POST for session {}", sessionRepresentation);
// communicate to caller the message was delivered
sink.success();
return responseFlux(response);
return directResponseFlux(message, response);
}
else {
logger.warn("Unknown media type {} returned for POST in session {}", contentType,
Expand Down Expand Up @@ -340,7 +359,8 @@ private Flux<McpSchema.JSONRPCMessage> extractError(ClientResponse response, Str
McpSchema.JSONRPCResponse jsonRpcResponse = objectMapper.readValue(body,
McpSchema.JSONRPCResponse.class);
jsonRpcError = jsonRpcResponse.error();
toPropagate = new McpError(jsonRpcError);
toPropagate = jsonRpcError != null ? new McpError(jsonRpcError)
: new McpError("Can't parse the jsonResponse " + jsonRpcResponse);
}
catch (IOException ex) {
toPropagate = new RuntimeException("Sending request failed", e);
Expand Down Expand Up @@ -384,14 +404,22 @@ private static String sessionIdOrPlaceholder(McpTransportSession<?> transportSes
return transportSession.sessionId().orElse("[missing_session_id]");
}

private Flux<McpSchema.JSONRPCMessage> responseFlux(ClientResponse response) {
private Flux<McpSchema.JSONRPCMessage> directResponseFlux(McpSchema.JSONRPCMessage sentMessage,
ClientResponse response) {
return response.bodyToMono(String.class).<Iterable<McpSchema.JSONRPCMessage>>handle((responseMessage, s) -> {
try {
McpSchema.JSONRPCMessage jsonRpcResponse = McpSchema.deserializeJsonRpcMessage(objectMapper,
responseMessage);
s.next(List.of(jsonRpcResponse));
if (sentMessage instanceof McpSchema.JSONRPCNotification && Utils.hasText(responseMessage)) {
logger.warn("Notification: {} received non-compliant response: {}", sentMessage, responseMessage);
s.complete();
}
else {
McpSchema.JSONRPCMessage jsonRpcResponse = McpSchema.deserializeJsonRpcMessage(objectMapper,
responseMessage);
s.next(List.of(jsonRpcResponse));
}
}
catch (IOException e) {
// TODO: this should be a McpTransportError
s.error(e);
}
}).flatMapIterable(Function.identity());
Expand Down Expand Up @@ -423,7 +451,8 @@ private Tuple2<Optional<String>, Iterable<McpSchema.JSONRPCMessage>> parse(Serve
}
}
else {
throw new McpError("Received unrecognized SSE event type: " + event.event());
logger.debug("Received SSE event with type: {}", event);
return Tuples.of(Optional.empty(), List.of());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.modelcontextprotocol.spec.HttpHeaders;
import io.modelcontextprotocol.spec.McpClientTransport;
import io.modelcontextprotocol.spec.McpError;
import io.modelcontextprotocol.spec.McpSchema;
Expand Down Expand Up @@ -62,6 +64,8 @@ public class WebFluxSseClientTransport implements McpClientTransport {

private static final Logger logger = LoggerFactory.getLogger(WebFluxSseClientTransport.class);

private static final String MCP_PROTOCOL_VERSION = "2024-11-05";

/**
* Event type for JSON-RPC messages received through the SSE connection. The server
* sends messages with this event type to transmit JSON-RPC protocol data.
Expand Down Expand Up @@ -166,6 +170,11 @@ public WebFluxSseClientTransport(WebClient.Builder webClientBuilder, ObjectMappe
this.sseEndpoint = sseEndpoint;
}

@Override
public String protocolVersion() {
return MCP_PROTOCOL_VERSION;
}

/**
* Establishes a connection to the MCP server using Server-Sent Events (SSE). This
* method initiates the SSE connection and sets up the message processing pipeline.
Expand Down Expand Up @@ -216,7 +225,8 @@ else if (MESSAGE_EVENT_TYPE.equals(event.event())) {
}
}
else {
s.error(new McpError("Received unrecognized SSE event type: " + event.event()));
logger.debug("Received unrecognized SSE event type: {}", event);
s.complete();
}
}).transform(handler)).subscribe();

Expand Down Expand Up @@ -249,6 +259,7 @@ public Mono<Void> sendMessage(JSONRPCMessage message) {
return webClient.post()
.uri(messageEndpointUri)
.contentType(MediaType.APPLICATION_JSON)
.header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION)
.bodyValue(jsonText)
.retrieve()
.toBodilessEntity()
Expand Down Expand Up @@ -281,6 +292,7 @@ protected Flux<ServerSentEvent<String>> eventStream() {// @formatter:off
.get()
.uri(this.sseEndpoint)
.accept(MediaType.TEXT_EVENT_STREAM)
.header(HttpHeaders.PROTOCOL_VERSION, MCP_PROTOCOL_VERSION)
.retrieve()
.bodyToFlux(SSE_TYPE)
.retryWhen(Retry.from(retrySignal -> retrySignal.handle(inboundRetryHandler)));
Expand Down
Loading