Skip to content

Commit b6d132b

Browse files
committed
feat: implement MCP-compliant keep-alive functionality for server transports
- Add KeepAliveScheduler utility class for configurable periodic session pings - Integrate keep-alive support in WebFlux, WebMVC, and HttpServlet SSE transport providers - Add keepAliveInterval configuration option to all transport provider builders - Deprecate existing constructors in favor of builder pattern with enhanced configuration - Update graceful shutdown to properly clean up keep-alive schedulers - Add unit tests for KeepAliveScheduler functionality Implements MCP specification recommendations for connection health detection: - Configurable ping frequency to suit different network environments - Optional keep-alive (disabled by default) to avoid excessive network overhead - Proper resource cleanup to prevent connection leaks https://modelcontextprotocol.io/specification/2025-06-18/basic/utilities/ping#implementation-considerations Signed-off-by: Christian Tzolov <[email protected]>
1 parent c3a0b18 commit b6d132b

File tree

5 files changed

+809
-17
lines changed

5 files changed

+809
-17
lines changed

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxSseServerTransportProvider.java

Lines changed: 72 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.modelcontextprotocol.server.transport;
22

33
import java.io.IOException;
4+
import java.time.Duration;
45
import java.util.concurrent.ConcurrentHashMap;
56

67
import com.fasterxml.jackson.core.type.TypeReference;
@@ -11,6 +12,8 @@
1112
import io.modelcontextprotocol.spec.McpServerTransport;
1213
import io.modelcontextprotocol.spec.McpServerTransportProvider;
1314
import io.modelcontextprotocol.util.Assert;
15+
import io.modelcontextprotocol.util.KeepAliveScheduler;
16+
1417
import org.slf4j.Logger;
1518
import org.slf4j.LoggerFactory;
1619
import reactor.core.Exceptions;
@@ -109,6 +112,12 @@ public class WebFluxSseServerTransportProvider implements McpServerTransportProv
109112
*/
110113
private volatile boolean isClosing = false;
111114

115+
/**
116+
* Keep-alive scheduler for managing session pings. Activated if keepAliveInterval is
117+
* set. Disabled by default.
118+
*/
119+
private KeepAliveScheduler keepAliveScheduler;
120+
112121
/**
113122
* Constructs a new WebFlux SSE server transport provider instance with the default
114123
* SSE endpoint.
@@ -118,7 +127,10 @@ public class WebFluxSseServerTransportProvider implements McpServerTransportProv
118127
* messages. This endpoint will be communicated to clients during SSE connection
119128
* setup. Must not be null.
120129
* @throws IllegalArgumentException if either parameter is null
130+
* @deprecated Use the builder {@link #builder()} instead for better configuration
131+
* options.
121132
*/
133+
@Deprecated
122134
public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String messageEndpoint) {
123135
this(objectMapper, messageEndpoint, DEFAULT_SSE_ENDPOINT);
124136
}
@@ -131,7 +143,10 @@ public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String messa
131143
* messages. This endpoint will be communicated to clients during SSE connection
132144
* setup. Must not be null.
133145
* @throws IllegalArgumentException if either parameter is null
146+
* @deprecated Use the builder {@link #builder()} instead for better configuration
147+
* options.
134148
*/
149+
@Deprecated
135150
public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String messageEndpoint, String sseEndpoint) {
136151
this(objectMapper, DEFAULT_BASE_URL, messageEndpoint, sseEndpoint);
137152
}
@@ -145,9 +160,32 @@ public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String messa
145160
* messages. This endpoint will be communicated to clients during SSE connection
146161
* setup. Must not be null.
147162
* @throws IllegalArgumentException if either parameter is null
163+
* @deprecated Use the builder {@link #builder()} instead for better configuration
164+
* options.
148165
*/
166+
@Deprecated
149167
public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String baseUrl, String messageEndpoint,
150168
String sseEndpoint) {
169+
this(objectMapper, baseUrl, messageEndpoint, sseEndpoint, null);
170+
}
171+
172+
/**
173+
* Constructs a new WebFlux SSE server transport provider instance.
174+
* @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
175+
* of MCP messages. Must not be null.
176+
* @param baseUrl webflux message base path
177+
* @param messageEndpoint The endpoint URI where clients should send their JSON-RPC
178+
* messages. This endpoint will be communicated to clients during SSE connection
179+
* setup. Must not be null.
180+
* @param sseEndpoint The SSE endpoint path. Must not be null.
181+
* @param keepAliveInterval The interval for sending keep-alive pings to clients.
182+
* @throws IllegalArgumentException if either parameter is null
183+
* @deprecated Use the builder {@link #builder()} instead for better configuration
184+
* options.
185+
*/
186+
@Deprecated
187+
public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String baseUrl, String messageEndpoint,
188+
String sseEndpoint, Duration keepAliveInterval) {
151189
Assert.notNull(objectMapper, "ObjectMapper must not be null");
152190
Assert.notNull(baseUrl, "Message base path must not be null");
153191
Assert.notNull(messageEndpoint, "Message endpoint must not be null");
@@ -161,6 +199,17 @@ public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String baseU
161199
.GET(this.sseEndpoint, this::handleSseConnection)
162200
.POST(this.messageEndpoint, this::handleMessage)
163201
.build();
202+
203+
if (keepAliveInterval != null) {
204+
205+
this.keepAliveScheduler = KeepAliveScheduler
206+
.builder(() -> (isClosing) ? Flux.empty() : Flux.fromIterable(sessions.values()))
207+
.initialDelay(keepAliveInterval)
208+
.interval(keepAliveInterval)
209+
.build();
210+
211+
this.keepAliveScheduler.start();
212+
}
164213
}
165214

166215
@Override
@@ -209,23 +258,21 @@ public Mono<Void> notifyClients(String method, Object params) {
209258
/**
210259
* Initiates a graceful shutdown of all the sessions. This method ensures all active
211260
* sessions are properly closed and cleaned up.
212-
*
213-
* <p>
214-
* The shutdown process:
215-
* <ul>
216-
* <li>Marks the transport as closing to prevent new connections</li>
217-
* <li>Closes each active session</li>
218-
* <li>Removes closed sessions from the sessions map</li>
219-
* <li>Times out after 5 seconds if shutdown takes too long</li>
220-
* </ul>
221261
* @return A Mono that completes when all sessions have been closed
222262
*/
223263
@Override
224264
public Mono<Void> closeGracefully() {
225265
return Flux.fromIterable(sessions.values())
226266
.doFirst(() -> logger.debug("Initiating graceful shutdown with {} active sessions", sessions.size()))
227267
.flatMap(McpServerSession::closeGracefully)
228-
.then();
268+
.then()
269+
.doOnSuccess(v -> {
270+
logger.debug("Graceful shutdown completed");
271+
sessions.clear();
272+
if (this.keepAliveScheduler != null) {
273+
this.keepAliveScheduler.shutdown();
274+
}
275+
});
229276
}
230277

231278
/**
@@ -396,6 +443,8 @@ public static class Builder {
396443

397444
private String sseEndpoint = DEFAULT_SSE_ENDPOINT;
398445

446+
private Duration keepAliveInterval;
447+
399448
/**
400449
* Sets the ObjectMapper to use for JSON serialization/deserialization of MCP
401450
* messages.
@@ -446,6 +495,17 @@ public Builder sseEndpoint(String sseEndpoint) {
446495
return this;
447496
}
448497

498+
/**
499+
* Sets the interval for sending keep-alive pings to clients.
500+
* @param keepAliveInterval The keep-alive interval duration. If null, keep-alive
501+
* is disabled.
502+
* @return this builder instance
503+
*/
504+
public Builder keepAliveInterval(Duration keepAliveInterval) {
505+
this.keepAliveInterval = keepAliveInterval;
506+
return this;
507+
}
508+
449509
/**
450510
* Builds a new instance of {@link WebFluxSseServerTransportProvider} with the
451511
* configured settings.
@@ -456,7 +516,8 @@ public WebFluxSseServerTransportProvider build() {
456516
Assert.notNull(objectMapper, "ObjectMapper must be set");
457517
Assert.notNull(messageEndpoint, "Message endpoint must be set");
458518

459-
return new WebFluxSseServerTransportProvider(objectMapper, baseUrl, messageEndpoint, sseEndpoint);
519+
return new WebFluxSseServerTransportProvider(objectMapper, baseUrl, messageEndpoint, sseEndpoint,
520+
keepAliveInterval);
460521
}
461522

462523
}

mcp-spring/mcp-spring-webmvc/src/main/java/io/modelcontextprotocol/server/transport/WebMvcSseServerTransportProvider.java

Lines changed: 154 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import io.modelcontextprotocol.spec.McpServerTransportProvider;
1818
import io.modelcontextprotocol.spec.McpServerSession;
1919
import io.modelcontextprotocol.util.Assert;
20+
import io.modelcontextprotocol.util.KeepAliveScheduler;
21+
2022
import org.slf4j.Logger;
2123
import org.slf4j.LoggerFactory;
2224
import reactor.core.publisher.Flux;
@@ -106,6 +108,8 @@ public class WebMvcSseServerTransportProvider implements McpServerTransportProvi
106108
*/
107109
private volatile boolean isClosing = false;
108110

111+
private KeepAliveScheduler keepAliveScheduler;
112+
109113
/**
110114
* Constructs a new WebMvcSseServerTransportProvider instance with the default SSE
111115
* endpoint.
@@ -115,7 +119,10 @@ public class WebMvcSseServerTransportProvider implements McpServerTransportProvi
115119
* messages via HTTP POST. This endpoint will be communicated to clients through the
116120
* SSE connection's initial endpoint event.
117121
* @throws IllegalArgumentException if either objectMapper or messageEndpoint is null
122+
* @deprecated Use the builder {@link #builder()} instead for better configuration
123+
* options.
118124
*/
125+
@Deprecated
119126
public WebMvcSseServerTransportProvider(ObjectMapper objectMapper, String messageEndpoint) {
120127
this(objectMapper, messageEndpoint, DEFAULT_SSE_ENDPOINT);
121128
}
@@ -129,7 +136,10 @@ public WebMvcSseServerTransportProvider(ObjectMapper objectMapper, String messag
129136
* SSE connection's initial endpoint event.
130137
* @param sseEndpoint The endpoint URI where clients establish their SSE connections.
131138
* @throws IllegalArgumentException if any parameter is null
139+
* @deprecated Use the builder {@link #builder()} instead for better configuration
140+
* options.
132141
*/
142+
@Deprecated
133143
public WebMvcSseServerTransportProvider(ObjectMapper objectMapper, String messageEndpoint, String sseEndpoint) {
134144
this(objectMapper, "", messageEndpoint, sseEndpoint);
135145
}
@@ -145,9 +155,33 @@ public WebMvcSseServerTransportProvider(ObjectMapper objectMapper, String messag
145155
* SSE connection's initial endpoint event.
146156
* @param sseEndpoint The endpoint URI where clients establish their SSE connections.
147157
* @throws IllegalArgumentException if any parameter is null
158+
* @deprecated Use the builder {@link #builder()} instead for better configuration
159+
* options.
148160
*/
161+
@Deprecated
149162
public WebMvcSseServerTransportProvider(ObjectMapper objectMapper, String baseUrl, String messageEndpoint,
150163
String sseEndpoint) {
164+
this(objectMapper, baseUrl, messageEndpoint, sseEndpoint, null);
165+
}
166+
167+
/**
168+
* Constructs a new WebMvcSseServerTransportProvider instance.
169+
* @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
170+
* of messages.
171+
* @param baseUrl The base URL for the message endpoint, used to construct the full
172+
* endpoint URL for clients.
173+
* @param messageEndpoint The endpoint URI where clients should send their JSON-RPC
174+
* messages via HTTP POST. This endpoint will be communicated to clients through the
175+
* SSE connection's initial endpoint event.
176+
* @param sseEndpoint The endpoint URI where clients establish their SSE connections.
177+
* * @param keepAliveInterval The interval for sending keep-alive messages to
178+
* @throws IllegalArgumentException if any parameter is null
179+
* @deprecated Use the builder {@link #builder()} instead for better configuration
180+
* options.
181+
*/
182+
@Deprecated
183+
public WebMvcSseServerTransportProvider(ObjectMapper objectMapper, String baseUrl, String messageEndpoint,
184+
String sseEndpoint, Duration keepAliveInterval) {
151185
Assert.notNull(objectMapper, "ObjectMapper must not be null");
152186
Assert.notNull(baseUrl, "Message base URL must not be null");
153187
Assert.notNull(messageEndpoint, "Message endpoint must not be null");
@@ -161,6 +195,17 @@ public WebMvcSseServerTransportProvider(ObjectMapper objectMapper, String baseUr
161195
.GET(this.sseEndpoint, this::handleSseConnection)
162196
.POST(this.messageEndpoint, this::handleMessage)
163197
.build();
198+
199+
if (keepAliveInterval != null) {
200+
201+
this.keepAliveScheduler = KeepAliveScheduler
202+
.builder(() -> (isClosing) ? Flux.empty() : Flux.fromIterable(sessions.values()))
203+
.initialDelay(keepAliveInterval)
204+
.interval(keepAliveInterval)
205+
.build();
206+
207+
this.keepAliveScheduler.start();
208+
}
164209
}
165210

166211
@Override
@@ -208,10 +253,13 @@ public Mono<Void> closeGracefully() {
208253
return Flux.fromIterable(sessions.values()).doFirst(() -> {
209254
this.isClosing = true;
210255
logger.debug("Initiating graceful shutdown with {} active sessions", sessions.size());
211-
})
212-
.flatMap(McpServerSession::closeGracefully)
213-
.then()
214-
.doOnSuccess(v -> logger.debug("Graceful shutdown completed"));
256+
}).flatMap(McpServerSession::closeGracefully).then().doOnSuccess(v -> {
257+
logger.debug("Graceful shutdown completed");
258+
sessions.clear();
259+
if (this.keepAliveScheduler != null) {
260+
this.keepAliveScheduler.shutdown();
261+
}
262+
});
215263
}
216264

217265
/**
@@ -416,4 +464,106 @@ public void close() {
416464

417465
}
418466

467+
/**
468+
* Creates a new Builder instance for configuring and creating instances of
469+
* WebMvcSseServerTransportProvider.
470+
* @return A new Builder instance
471+
*/
472+
public static Builder builder() {
473+
return new Builder();
474+
}
475+
476+
/**
477+
* Builder for creating instances of WebMvcSseServerTransportProvider.
478+
* <p>
479+
* This builder provides a fluent API for configuring and creating instances of
480+
* WebMvcSseServerTransportProvider with custom settings.
481+
*/
482+
public static class Builder {
483+
484+
private ObjectMapper objectMapper = new ObjectMapper();
485+
486+
private String baseUrl = "";
487+
488+
private String messageEndpoint;
489+
490+
private String sseEndpoint = DEFAULT_SSE_ENDPOINT;
491+
492+
private Duration keepAliveInterval;
493+
494+
/**
495+
* Sets the JSON object mapper to use for message serialization/deserialization.
496+
* @param objectMapper The object mapper to use
497+
* @return This builder instance for method chaining
498+
*/
499+
public Builder objectMapper(ObjectMapper objectMapper) {
500+
Assert.notNull(objectMapper, "ObjectMapper must not be null");
501+
this.objectMapper = objectMapper;
502+
return this;
503+
}
504+
505+
/**
506+
* Sets the base URL for the server transport.
507+
* @param baseUrl The base URL to use
508+
* @return This builder instance for method chaining
509+
*/
510+
public Builder baseUrl(String baseUrl) {
511+
Assert.notNull(baseUrl, "Base URL must not be null");
512+
this.baseUrl = baseUrl;
513+
return this;
514+
}
515+
516+
/**
517+
* Sets the endpoint path where clients will send their messages.
518+
* @param messageEndpoint The message endpoint path
519+
* @return This builder instance for method chaining
520+
*/
521+
public Builder messageEndpoint(String messageEndpoint) {
522+
Assert.hasText(messageEndpoint, "Message endpoint must not be empty");
523+
this.messageEndpoint = messageEndpoint;
524+
return this;
525+
}
526+
527+
/**
528+
* Sets the endpoint path where clients will establish SSE connections.
529+
* <p>
530+
* If not specified, the default value of {@link #DEFAULT_SSE_ENDPOINT} will be
531+
* used.
532+
* @param sseEndpoint The SSE endpoint path
533+
* @return This builder instance for method chaining
534+
*/
535+
public Builder sseEndpoint(String sseEndpoint) {
536+
Assert.hasText(sseEndpoint, "SSE endpoint must not be empty");
537+
this.sseEndpoint = sseEndpoint;
538+
return this;
539+
}
540+
541+
/**
542+
* Sets the interval for keep-alive pings.
543+
* <p>
544+
* If not specified, keep-alive pings will be disabled.
545+
* @param keepAliveInterval The interval duration for keep-alive pings
546+
* @return This builder instance for method chaining
547+
*/
548+
public Builder keepAliveInterval(Duration keepAliveInterval) {
549+
this.keepAliveInterval = keepAliveInterval;
550+
return this;
551+
}
552+
553+
/**
554+
* Builds a new instance of WebMvcSseServerTransportProvider with the configured
555+
* settings.
556+
* @return A new WebMvcSseServerTransportProvider instance
557+
* @throws IllegalStateException if objectMapper or messageEndpoint is not set
558+
*/
559+
public WebMvcSseServerTransportProvider build() {
560+
if (messageEndpoint == null) {
561+
throw new IllegalStateException("MessageEndpoint must be set");
562+
}
563+
return new WebMvcSseServerTransportProvider(objectMapper, baseUrl, messageEndpoint, sseEndpoint,
564+
keepAliveInterval);
565+
}
566+
567+
}
568+
419569
}

0 commit comments

Comments
 (0)