flatMap(body -> {
+ try {
+ McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, body);
+
+ if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest) {
+ return this.mcpHandler.handleRequest(transportContext, jsonrpcRequest)
+ .flatMap(jsonrpcResponse -> ServerResponse.ok()
+ .contentType(MediaType.APPLICATION_JSON)
+ .bodyValue(jsonrpcResponse));
+ }
+ else if (message instanceof McpSchema.JSONRPCNotification jsonrpcNotification) {
+ return this.mcpHandler.handleNotification(transportContext, jsonrpcNotification)
+ .then(ServerResponse.accepted().build());
+ }
+ else {
+ return ServerResponse.badRequest()
+ .bodyValue(new McpError("The server accepts either requests or notifications"));
+ }
+ }
+ catch (IllegalArgumentException | IOException e) {
+ logger.error("Failed to deserialize message: {}", e.getMessage());
+ return ServerResponse.badRequest().bodyValue(new McpError("Invalid message format"));
+ }
+ }).contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext));
+ }
+
+ /**
+ * Create a builder for the server.
+ * @return a fresh {@link Builder} instance.
+ */
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder for creating instances of {@link WebFluxStatelessServerTransport}.
+ *
+ * This builder provides a fluent API for configuring and creating instances of
+ * WebFluxSseServerTransportProvider with custom settings.
+ */
+ public static class Builder {
+
+ private ObjectMapper objectMapper;
+
+ private String mcpEndpoint = "/mcp";
+
+ private McpTransportContextExtractor contextExtractor = (serverRequest, context) -> context;
+
+ private Builder() {
+ // used by a static method
+ }
+
+ /**
+ * Sets the ObjectMapper to use for JSON serialization/deserialization of MCP
+ * messages.
+ * @param objectMapper The ObjectMapper instance. Must not be null.
+ * @return this builder instance
+ * @throws IllegalArgumentException if objectMapper is null
+ */
+ public Builder objectMapper(ObjectMapper objectMapper) {
+ Assert.notNull(objectMapper, "ObjectMapper must not be null");
+ this.objectMapper = objectMapper;
+ return this;
+ }
+
+ /**
+ * Sets the endpoint URI where clients should send their JSON-RPC messages.
+ * @param messageEndpoint The message endpoint URI. Must not be null.
+ * @return this builder instance
+ * @throws IllegalArgumentException if messageEndpoint is null
+ */
+ public Builder messageEndpoint(String messageEndpoint) {
+ Assert.notNull(messageEndpoint, "Message endpoint must not be null");
+ this.mcpEndpoint = messageEndpoint;
+ return this;
+ }
+
+ /**
+ * Sets the context extractor that allows providing the MCP feature
+ * implementations to inspect HTTP transport level metadata that was present at
+ * HTTP request processing time. This allows to extract custom headers and other
+ * useful data for use during execution later on in the process.
+ * @param contextExtractor The contextExtractor to fill in a
+ * {@link McpTransportContext}.
+ * @return this builder instance
+ * @throws IllegalArgumentException if contextExtractor is null
+ */
+ public Builder contextExtractor(McpTransportContextExtractor contextExtractor) {
+ Assert.notNull(contextExtractor, "Context extractor must not be null");
+ this.contextExtractor = contextExtractor;
+ return this;
+ }
+
+ /**
+ * Builds a new instance of {@link WebFluxStatelessServerTransport} with the
+ * configured settings.
+ * @return A new WebFluxSseServerTransportProvider instance
+ * @throws IllegalStateException if required parameters are not set
+ */
+ public WebFluxStatelessServerTransport build() {
+ Assert.notNull(objectMapper, "ObjectMapper must be set");
+ Assert.notNull(mcpEndpoint, "Message endpoint must be set");
+
+ return new WebFluxStatelessServerTransport(objectMapper, mcpEndpoint, contextExtractor);
+ }
+
+ }
+
+}
diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxStreamableServerTransportProvider.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxStreamableServerTransportProvider.java
new file mode 100644
index 000000000..f3f6c2c33
--- /dev/null
+++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxStreamableServerTransportProvider.java
@@ -0,0 +1,487 @@
+/*
+ * Copyright 2025-2025 the original author or authors.
+ */
+
+package io.modelcontextprotocol.server.transport;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.modelcontextprotocol.server.DefaultMcpTransportContext;
+import io.modelcontextprotocol.server.McpTransportContextExtractor;
+import io.modelcontextprotocol.spec.HttpHeaders;
+import io.modelcontextprotocol.spec.McpError;
+import io.modelcontextprotocol.spec.McpSchema;
+import io.modelcontextprotocol.spec.McpStreamableServerSession;
+import io.modelcontextprotocol.spec.McpStreamableServerTransport;
+import io.modelcontextprotocol.spec.McpStreamableServerTransportProvider;
+import io.modelcontextprotocol.spec.ProtocolVersions;
+import io.modelcontextprotocol.server.McpTransportContext;
+import io.modelcontextprotocol.util.Assert;
+import io.modelcontextprotocol.util.KeepAliveScheduler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.codec.ServerSentEvent;
+import org.springframework.web.reactive.function.server.RouterFunction;
+import org.springframework.web.reactive.function.server.RouterFunctions;
+import org.springframework.web.reactive.function.server.ServerRequest;
+import org.springframework.web.reactive.function.server.ServerResponse;
+import reactor.core.Disposable;
+import reactor.core.Exceptions;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Implementation of a WebFlux based {@link McpStreamableServerTransportProvider}.
+ *
+ * @author Dariusz Jędrzejczyk
+ */
+public class WebFluxStreamableServerTransportProvider implements McpStreamableServerTransportProvider {
+
+ private static final Logger logger = LoggerFactory.getLogger(WebFluxStreamableServerTransportProvider.class);
+
+ public static final String MESSAGE_EVENT_TYPE = "message";
+
+ private final ObjectMapper objectMapper;
+
+ private final String mcpEndpoint;
+
+ private final boolean disallowDelete;
+
+ private final RouterFunction> routerFunction;
+
+ private McpStreamableServerSession.Factory sessionFactory;
+
+ private final ConcurrentHashMap sessions = new ConcurrentHashMap<>();
+
+ private McpTransportContextExtractor contextExtractor;
+
+ private volatile boolean isClosing = false;
+
+ private KeepAliveScheduler keepAliveScheduler;
+
+ private WebFluxStreamableServerTransportProvider(ObjectMapper objectMapper, String mcpEndpoint,
+ McpTransportContextExtractor contextExtractor, boolean disallowDelete,
+ Duration keepAliveInterval) {
+ Assert.notNull(objectMapper, "ObjectMapper must not be null");
+ Assert.notNull(mcpEndpoint, "Message endpoint must not be null");
+ Assert.notNull(contextExtractor, "Context extractor must not be null");
+
+ this.objectMapper = objectMapper;
+ this.mcpEndpoint = mcpEndpoint;
+ this.contextExtractor = contextExtractor;
+ this.disallowDelete = disallowDelete;
+ this.routerFunction = RouterFunctions.route()
+ .GET(this.mcpEndpoint, this::handleGet)
+ .POST(this.mcpEndpoint, this::handlePost)
+ .DELETE(this.mcpEndpoint, this::handleDelete)
+ .build();
+
+ if (keepAliveInterval != null) {
+ this.keepAliveScheduler = KeepAliveScheduler
+ .builder(() -> (isClosing) ? Flux.empty() : Flux.fromIterable(this.sessions.values()))
+ .initialDelay(keepAliveInterval)
+ .interval(keepAliveInterval)
+ .build();
+
+ this.keepAliveScheduler.start();
+ }
+ }
+
+ @Override
+ public List protocolVersions() {
+ return List.of(ProtocolVersions.MCP_2024_11_05, ProtocolVersions.MCP_2025_03_26);
+ }
+
+ @Override
+ public void setSessionFactory(McpStreamableServerSession.Factory sessionFactory) {
+ this.sessionFactory = sessionFactory;
+ }
+
+ @Override
+ public Mono notifyClients(String method, Object params) {
+ if (sessions.isEmpty()) {
+ logger.debug("No active sessions to broadcast message to");
+ return Mono.empty();
+ }
+
+ logger.debug("Attempting to broadcast message to {} active sessions", sessions.size());
+
+ return Flux.fromIterable(sessions.values())
+ .flatMap(session -> session.sendNotification(method, params)
+ .doOnError(
+ e -> logger.error("Failed to send message to session {}: {}", session.getId(), e.getMessage()))
+ .onErrorComplete())
+ .then();
+ }
+
+ @Override
+ public Mono closeGracefully() {
+ return Mono.defer(() -> {
+ this.isClosing = true;
+ return Flux.fromIterable(sessions.values())
+ .doFirst(() -> logger.debug("Initiating graceful shutdown with {} active sessions", sessions.size()))
+ .flatMap(McpStreamableServerSession::closeGracefully)
+ .then();
+ }).then().doOnSuccess(v -> {
+ sessions.clear();
+ if (this.keepAliveScheduler != null) {
+ this.keepAliveScheduler.shutdown();
+ }
+ });
+ }
+
+ /**
+ * Returns the WebFlux router function that defines the transport's HTTP endpoints.
+ * This router function should be integrated into the application's web configuration.
+ *
+ *
+ * The router function defines one endpoint with three methods:
+ *
+ * - GET {messageEndpoint} - For the client listening SSE stream
+ * - POST {messageEndpoint} - For receiving client messages
+ * - DELETE {messageEndpoint} - For removing sessions
+ *
+ * @return The configured {@link RouterFunction} for handling HTTP requests
+ */
+ public RouterFunction> getRouterFunction() {
+ return this.routerFunction;
+ }
+
+ /**
+ * Opens the listening SSE streams for clients.
+ * @param request The incoming server request
+ * @return A Mono which emits a response with the SSE event stream
+ */
+ private Mono handleGet(ServerRequest request) {
+ if (isClosing) {
+ return ServerResponse.status(HttpStatus.SERVICE_UNAVAILABLE).bodyValue("Server is shutting down");
+ }
+
+ McpTransportContext transportContext = this.contextExtractor.extract(request, new DefaultMcpTransportContext());
+
+ return Mono.defer(() -> {
+ List acceptHeaders = request.headers().asHttpHeaders().getAccept();
+ if (!acceptHeaders.contains(MediaType.TEXT_EVENT_STREAM)) {
+ return ServerResponse.badRequest().build();
+ }
+
+ if (!request.headers().asHttpHeaders().containsKey(HttpHeaders.MCP_SESSION_ID)) {
+ return ServerResponse.badRequest().build(); // TODO: say we need a session
+ // id
+ }
+
+ String sessionId = request.headers().asHttpHeaders().getFirst(HttpHeaders.MCP_SESSION_ID);
+
+ McpStreamableServerSession session = this.sessions.get(sessionId);
+
+ if (session == null) {
+ return ServerResponse.notFound().build();
+ }
+
+ if (request.headers().asHttpHeaders().containsKey(HttpHeaders.LAST_EVENT_ID)) {
+ String lastId = request.headers().asHttpHeaders().getFirst(HttpHeaders.LAST_EVENT_ID);
+ return ServerResponse.ok()
+ .contentType(MediaType.TEXT_EVENT_STREAM)
+ .body(session.replay(lastId), ServerSentEvent.class);
+ }
+
+ return ServerResponse.ok()
+ .contentType(MediaType.TEXT_EVENT_STREAM)
+ .body(Flux.>create(sink -> {
+ WebFluxStreamableMcpSessionTransport sessionTransport = new WebFluxStreamableMcpSessionTransport(
+ sink);
+ McpStreamableServerSession.McpStreamableServerSessionStream listeningStream = session
+ .listeningStream(sessionTransport);
+ sink.onDispose(listeningStream::close);
+ }), ServerSentEvent.class);
+
+ }).contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext));
+ }
+
+ /**
+ * Handles incoming JSON-RPC messages from clients.
+ * @param request The incoming server request containing the JSON-RPC message
+ * @return A Mono with the response appropriate to a particular Streamable HTTP flow.
+ */
+ private Mono handlePost(ServerRequest request) {
+ if (isClosing) {
+ return ServerResponse.status(HttpStatus.SERVICE_UNAVAILABLE).bodyValue("Server is shutting down");
+ }
+
+ McpTransportContext transportContext = this.contextExtractor.extract(request, new DefaultMcpTransportContext());
+
+ List acceptHeaders = request.headers().asHttpHeaders().getAccept();
+ if (!(acceptHeaders.contains(MediaType.APPLICATION_JSON)
+ && acceptHeaders.contains(MediaType.TEXT_EVENT_STREAM))) {
+ return ServerResponse.badRequest().build();
+ }
+
+ return request.bodyToMono(String.class).flatMap(body -> {
+ try {
+ McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, body);
+ if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest
+ && jsonrpcRequest.method().equals(McpSchema.METHOD_INITIALIZE)) {
+ McpSchema.InitializeRequest initializeRequest = objectMapper.convertValue(jsonrpcRequest.params(),
+ new TypeReference() {
+ });
+ McpStreamableServerSession.McpStreamableServerSessionInit init = this.sessionFactory
+ .startSession(initializeRequest);
+ sessions.put(init.session().getId(), init.session());
+ return init.initResult().map(initializeResult -> {
+ McpSchema.JSONRPCResponse jsonrpcResponse = new McpSchema.JSONRPCResponse(
+ McpSchema.JSONRPC_VERSION, jsonrpcRequest.id(), initializeResult, null);
+ try {
+ return this.objectMapper.writeValueAsString(jsonrpcResponse);
+ }
+ catch (IOException e) {
+ logger.warn("Failed to serialize initResponse", e);
+ throw Exceptions.propagate(e);
+ }
+ })
+ .flatMap(initResult -> ServerResponse.ok()
+ .contentType(MediaType.APPLICATION_JSON)
+ .header(HttpHeaders.MCP_SESSION_ID, init.session().getId())
+ .bodyValue(initResult));
+ }
+
+ if (!request.headers().asHttpHeaders().containsKey(HttpHeaders.MCP_SESSION_ID)) {
+ return ServerResponse.badRequest().bodyValue(new McpError("Session ID missing"));
+ }
+
+ String sessionId = request.headers().asHttpHeaders().getFirst(HttpHeaders.MCP_SESSION_ID);
+ McpStreamableServerSession session = sessions.get(sessionId);
+
+ if (session == null) {
+ return ServerResponse.status(HttpStatus.NOT_FOUND)
+ .bodyValue(new McpError("Session not found: " + sessionId));
+ }
+
+ if (message instanceof McpSchema.JSONRPCResponse jsonrpcResponse) {
+ return session.accept(jsonrpcResponse).then(ServerResponse.accepted().build());
+ }
+ else if (message instanceof McpSchema.JSONRPCNotification jsonrpcNotification) {
+ return session.accept(jsonrpcNotification).then(ServerResponse.accepted().build());
+ }
+ else if (message instanceof McpSchema.JSONRPCRequest jsonrpcRequest) {
+ return ServerResponse.ok()
+ .contentType(MediaType.TEXT_EVENT_STREAM)
+ .body(Flux.>create(sink -> {
+ WebFluxStreamableMcpSessionTransport st = new WebFluxStreamableMcpSessionTransport(sink);
+ Mono stream = session.responseStream(jsonrpcRequest, st);
+ Disposable streamSubscription = stream.onErrorComplete(err -> {
+ sink.error(err);
+ return true;
+ }).contextWrite(sink.contextView()).subscribe();
+ sink.onCancel(streamSubscription);
+ }), ServerSentEvent.class);
+ }
+ else {
+ return ServerResponse.badRequest().bodyValue(new McpError("Unknown message type"));
+ }
+ }
+ catch (IllegalArgumentException | IOException e) {
+ logger.error("Failed to deserialize message: {}", e.getMessage());
+ return ServerResponse.badRequest().bodyValue(new McpError("Invalid message format"));
+ }
+ })
+ .switchIfEmpty(ServerResponse.badRequest().build())
+ .contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext));
+ }
+
+ private Mono handleDelete(ServerRequest request) {
+ if (isClosing) {
+ return ServerResponse.status(HttpStatus.SERVICE_UNAVAILABLE).bodyValue("Server is shutting down");
+ }
+
+ McpTransportContext transportContext = this.contextExtractor.extract(request, new DefaultMcpTransportContext());
+
+ return Mono.defer(() -> {
+ if (!request.headers().asHttpHeaders().containsKey(HttpHeaders.MCP_SESSION_ID)) {
+ return ServerResponse.badRequest().build(); // TODO: say we need a session
+ // id
+ }
+
+ if (this.disallowDelete) {
+ return ServerResponse.status(HttpStatus.METHOD_NOT_ALLOWED).build();
+ }
+
+ String sessionId = request.headers().asHttpHeaders().getFirst(HttpHeaders.MCP_SESSION_ID);
+
+ McpStreamableServerSession session = this.sessions.get(sessionId);
+
+ if (session == null) {
+ return ServerResponse.notFound().build();
+ }
+
+ return session.delete().then(ServerResponse.ok().build());
+ }).contextWrite(ctx -> ctx.put(McpTransportContext.KEY, transportContext));
+ }
+
+ private class WebFluxStreamableMcpSessionTransport implements McpStreamableServerTransport {
+
+ private final FluxSink> sink;
+
+ public WebFluxStreamableMcpSessionTransport(FluxSink> sink) {
+ this.sink = sink;
+ }
+
+ @Override
+ public Mono sendMessage(McpSchema.JSONRPCMessage message) {
+ return this.sendMessage(message, null);
+ }
+
+ @Override
+ public Mono sendMessage(McpSchema.JSONRPCMessage message, String messageId) {
+ return Mono.fromSupplier(() -> {
+ try {
+ return objectMapper.writeValueAsString(message);
+ }
+ catch (IOException e) {
+ throw Exceptions.propagate(e);
+ }
+ }).doOnNext(jsonText -> {
+ ServerSentEvent