Skip to content

Commit ee2c891

Browse files
committed
feat: implement MCP-compliant keep-alive functionality for server transports
- Add KeepAliveScheduler utility class for configurable periodic session pings - Integrate keep-alive support in WebFlux, WebMVC, and HttpServlet SSE transport providers - Add keepAliveInterval configuration option to all transport provider builders - Deprecate existing constructors in favor of builder pattern with enhanced configuration - Update graceful shutdown to properly clean up keep-alive schedulers - Add unit tests for KeepAliveScheduler functionality Implements MCP specification recommendations for connection health detection: - Configurable ping frequency to suit different network environments - Optional keep-alive (disabled by default) to avoid excessive network overhead - Proper resource cleanup to prevent connection leaks https://modelcontextprotocol.io/specification/2025-06-18/basic/utilities/ping#implementation-considerations Resolves: #414, #158 Replaces #353 Signed-off-by: Christian Tzolov <[email protected]>
1 parent ae2fc86 commit ee2c891

File tree

11 files changed

+954
-35
lines changed

11 files changed

+954
-35
lines changed

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxSseServerTransportProvider.java

Lines changed: 72 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.modelcontextprotocol.server.transport;
22

33
import java.io.IOException;
4+
import java.time.Duration;
45
import java.util.concurrent.ConcurrentHashMap;
56

67
import com.fasterxml.jackson.core.type.TypeReference;
@@ -11,6 +12,8 @@
1112
import io.modelcontextprotocol.spec.McpServerTransport;
1213
import io.modelcontextprotocol.spec.McpServerTransportProvider;
1314
import io.modelcontextprotocol.util.Assert;
15+
import io.modelcontextprotocol.util.KeepAliveScheduler;
16+
1417
import org.slf4j.Logger;
1518
import org.slf4j.LoggerFactory;
1619
import reactor.core.Exceptions;
@@ -109,6 +112,12 @@ public class WebFluxSseServerTransportProvider implements McpServerTransportProv
109112
*/
110113
private volatile boolean isClosing = false;
111114

115+
/**
116+
* Keep-alive scheduler for managing session pings. Activated if keepAliveInterval is
117+
* set. Disabled by default.
118+
*/
119+
private KeepAliveScheduler keepAliveScheduler;
120+
112121
/**
113122
* Constructs a new WebFlux SSE server transport provider instance with the default
114123
* SSE endpoint.
@@ -118,7 +127,10 @@ public class WebFluxSseServerTransportProvider implements McpServerTransportProv
118127
* messages. This endpoint will be communicated to clients during SSE connection
119128
* setup. Must not be null.
120129
* @throws IllegalArgumentException if either parameter is null
130+
* @deprecated Use the builder {@link #builder()} instead for better configuration
131+
* options.
121132
*/
133+
@Deprecated
122134
public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String messageEndpoint) {
123135
this(objectMapper, messageEndpoint, DEFAULT_SSE_ENDPOINT);
124136
}
@@ -131,7 +143,10 @@ public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String messa
131143
* messages. This endpoint will be communicated to clients during SSE connection
132144
* setup. Must not be null.
133145
* @throws IllegalArgumentException if either parameter is null
146+
* @deprecated Use the builder {@link #builder()} instead for better configuration
147+
* options.
134148
*/
149+
@Deprecated
135150
public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String messageEndpoint, String sseEndpoint) {
136151
this(objectMapper, DEFAULT_BASE_URL, messageEndpoint, sseEndpoint);
137152
}
@@ -145,9 +160,32 @@ public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String messa
145160
* messages. This endpoint will be communicated to clients during SSE connection
146161
* setup. Must not be null.
147162
* @throws IllegalArgumentException if either parameter is null
163+
* @deprecated Use the builder {@link #builder()} instead for better configuration
164+
* options.
148165
*/
166+
@Deprecated
149167
public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String baseUrl, String messageEndpoint,
150168
String sseEndpoint) {
169+
this(objectMapper, baseUrl, messageEndpoint, sseEndpoint, null);
170+
}
171+
172+
/**
173+
* Constructs a new WebFlux SSE server transport provider instance.
174+
* @param objectMapper The ObjectMapper to use for JSON serialization/deserialization
175+
* of MCP messages. Must not be null.
176+
* @param baseUrl webflux message base path
177+
* @param messageEndpoint The endpoint URI where clients should send their JSON-RPC
178+
* messages. This endpoint will be communicated to clients during SSE connection
179+
* setup. Must not be null.
180+
* @param sseEndpoint The SSE endpoint path. Must not be null.
181+
* @param keepAliveInterval The interval for sending keep-alive pings to clients.
182+
* @throws IllegalArgumentException if either parameter is null
183+
* @deprecated Use the builder {@link #builder()} instead for better configuration
184+
* options.
185+
*/
186+
@Deprecated
187+
public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String baseUrl, String messageEndpoint,
188+
String sseEndpoint, Duration keepAliveInterval) {
151189
Assert.notNull(objectMapper, "ObjectMapper must not be null");
152190
Assert.notNull(baseUrl, "Message base path must not be null");
153191
Assert.notNull(messageEndpoint, "Message endpoint must not be null");
@@ -161,6 +199,17 @@ public WebFluxSseServerTransportProvider(ObjectMapper objectMapper, String baseU
161199
.GET(this.sseEndpoint, this::handleSseConnection)
162200
.POST(this.messageEndpoint, this::handleMessage)
163201
.build();
202+
203+
if (keepAliveInterval != null) {
204+
205+
this.keepAliveScheduler = KeepAliveScheduler
206+
.builder(() -> (isClosing) ? Flux.empty() : Flux.fromIterable(sessions.values()))
207+
.initialDelay(keepAliveInterval)
208+
.interval(keepAliveInterval)
209+
.build();
210+
211+
this.keepAliveScheduler.start();
212+
}
164213
}
165214

166215
@Override
@@ -209,23 +258,21 @@ public Mono<Void> notifyClients(String method, Object params) {
209258
/**
210259
* Initiates a graceful shutdown of all the sessions. This method ensures all active
211260
* sessions are properly closed and cleaned up.
212-
*
213-
* <p>
214-
* The shutdown process:
215-
* <ul>
216-
* <li>Marks the transport as closing to prevent new connections</li>
217-
* <li>Closes each active session</li>
218-
* <li>Removes closed sessions from the sessions map</li>
219-
* <li>Times out after 5 seconds if shutdown takes too long</li>
220-
* </ul>
221261
* @return A Mono that completes when all sessions have been closed
222262
*/
223263
@Override
224264
public Mono<Void> closeGracefully() {
225265
return Flux.fromIterable(sessions.values())
226266
.doFirst(() -> logger.debug("Initiating graceful shutdown with {} active sessions", sessions.size()))
227267
.flatMap(McpServerSession::closeGracefully)
228-
.then();
268+
.then()
269+
.doOnSuccess(v -> {
270+
logger.debug("Graceful shutdown completed");
271+
sessions.clear();
272+
if (this.keepAliveScheduler != null) {
273+
this.keepAliveScheduler.shutdown();
274+
}
275+
});
229276
}
230277

231278
/**
@@ -396,6 +443,8 @@ public static class Builder {
396443

397444
private String sseEndpoint = DEFAULT_SSE_ENDPOINT;
398445

446+
private Duration keepAliveInterval;
447+
399448
/**
400449
* Sets the ObjectMapper to use for JSON serialization/deserialization of MCP
401450
* messages.
@@ -446,6 +495,17 @@ public Builder sseEndpoint(String sseEndpoint) {
446495
return this;
447496
}
448497

498+
/**
499+
* Sets the interval for sending keep-alive pings to clients.
500+
* @param keepAliveInterval The keep-alive interval duration. If null, keep-alive
501+
* is disabled.
502+
* @return this builder instance
503+
*/
504+
public Builder keepAliveInterval(Duration keepAliveInterval) {
505+
this.keepAliveInterval = keepAliveInterval;
506+
return this;
507+
}
508+
449509
/**
450510
* Builds a new instance of {@link WebFluxSseServerTransportProvider} with the
451511
* configured settings.
@@ -456,7 +516,8 @@ public WebFluxSseServerTransportProvider build() {
456516
Assert.notNull(objectMapper, "ObjectMapper must be set");
457517
Assert.notNull(messageEndpoint, "Message endpoint must be set");
458518

459-
return new WebFluxSseServerTransportProvider(objectMapper, baseUrl, messageEndpoint, sseEndpoint);
519+
return new WebFluxSseServerTransportProvider(objectMapper, baseUrl, messageEndpoint, sseEndpoint,
520+
keepAliveInterval);
460521
}
461522

462523
}

mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/server/transport/WebFluxStreamableServerTransportProvider.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
import io.modelcontextprotocol.spec.McpStreamableServerTransportProvider;
1313
import io.modelcontextprotocol.server.McpTransportContext;
1414
import io.modelcontextprotocol.util.Assert;
15+
import io.modelcontextprotocol.util.KeepAliveScheduler;
16+
1517
import org.slf4j.Logger;
1618
import org.slf4j.LoggerFactory;
1719
import org.springframework.http.HttpStatus;
@@ -28,6 +30,7 @@
2830
import reactor.core.publisher.Mono;
2931

3032
import java.io.IOException;
33+
import java.time.Duration;
3134
import java.util.List;
3235
import java.util.concurrent.ConcurrentHashMap;
3336

@@ -58,8 +61,11 @@ public class WebFluxStreamableServerTransportProvider implements McpStreamableSe
5861

5962
private volatile boolean isClosing = false;
6063

64+
private KeepAliveScheduler keepAliveScheduler;
65+
6166
private WebFluxStreamableServerTransportProvider(ObjectMapper objectMapper, String mcpEndpoint,
62-
McpTransportContextExtractor<ServerRequest> contextExtractor, boolean disallowDelete) {
67+
McpTransportContextExtractor<ServerRequest> contextExtractor, boolean disallowDelete,
68+
Duration keepAliveInterval) {
6369
Assert.notNull(objectMapper, "ObjectMapper must not be null");
6470
Assert.notNull(mcpEndpoint, "Message endpoint must not be null");
6571
Assert.notNull(contextExtractor, "Context extractor must not be null");
@@ -73,6 +79,20 @@ private WebFluxStreamableServerTransportProvider(ObjectMapper objectMapper, Stri
7379
.POST(this.mcpEndpoint, this::handlePost)
7480
.DELETE(this.mcpEndpoint, this::handleDelete)
7581
.build();
82+
83+
if (keepAliveInterval != null) {
84+
this.keepAliveScheduler = KeepAliveScheduler
85+
.builder(() -> (isClosing) ? Flux.empty() : Flux.fromIterable(this.sessions.values()))
86+
.initialDelay(keepAliveInterval)
87+
.interval(keepAliveInterval)
88+
.build();
89+
90+
this.keepAliveScheduler.start();
91+
}
92+
else {
93+
logger.warn("Keep-alive interval is not set or invalid. No keep-alive will be scheduled.");
94+
}
95+
7696
}
7797

7898
@Override
@@ -105,6 +125,11 @@ public Mono<Void> closeGracefully() {
105125
.doFirst(() -> logger.debug("Initiating graceful shutdown with {} active sessions", sessions.size()))
106126
.flatMap(McpStreamableServerSession::closeGracefully)
107127
.then();
128+
}).then().doOnSuccess(v -> {
129+
sessions.clear();
130+
if (this.keepAliveScheduler != null) {
131+
this.keepAliveScheduler.shutdown();
132+
}
108133
});
109134
}
110135

@@ -368,6 +393,8 @@ public static class Builder {
368393

369394
private boolean disallowDelete;
370395

396+
private Duration keepAliveInterval;
397+
371398
private Builder() {
372399
// used by a static method
373400
}
@@ -424,6 +451,17 @@ public Builder disallowDelete(boolean disallowDelete) {
424451
return this;
425452
}
426453

454+
/**
455+
* Sets the keep-alive interval for the server transport.
456+
* @param keepAliveInterval The interval for sending keep-alive messages. If null,
457+
* no keep-alive will be scheduled.
458+
* @return this builder instance
459+
*/
460+
public Builder keepAliveInterval(Duration keepAliveInterval) {
461+
this.keepAliveInterval = keepAliveInterval;
462+
return this;
463+
}
464+
427465
/**
428466
* Builds a new instance of {@link WebFluxStreamableServerTransportProvider} with
429467
* the configured settings.
@@ -435,7 +473,7 @@ public WebFluxStreamableServerTransportProvider build() {
435473
Assert.notNull(mcpEndpoint, "Message endpoint must be set");
436474

437475
return new WebFluxStreamableServerTransportProvider(objectMapper, mcpEndpoint, contextExtractor,
438-
disallowDelete);
476+
disallowDelete, keepAliveInterval);
439477
}
440478

441479
}

0 commit comments

Comments
 (0)