55package io .modelcontextprotocol .client .transport ;
66
77import java .io .IOException ;
8+ import java .lang .reflect .Field ;
9+ import java .lang .reflect .Method ;
810import java .net .URI ;
911import java .net .http .HttpClient ;
1012import java .net .http .HttpRequest ;
3638import reactor .core .publisher .Flux ;
3739import reactor .core .publisher .Mono ;
3840import reactor .core .publisher .Sinks ;
41+ import sun .misc .Unsafe ;
3942
4043/**
4144 * Server-Sent Events (SSE) implementation of the
@@ -93,13 +96,6 @@ public class HttpClientSseClientTransport implements McpClientTransport {
9396 */
9497 private final HttpClient httpClient ;
9598
96- /**
97- * Flag indicating whether this transport should close the HttpClient when closing
98- * gracefully. Set to true when the HttpClient is created internally by the builder,
99- * false when provided externally.
100- */
101- private final boolean shouldCloseHttpClient ;
102-
10399 /** HTTP request builder for building requests to send messages to the server */
104100 private final HttpRequest .Builder requestBuilder ;
105101
@@ -123,6 +119,12 @@ public class HttpClientSseClientTransport implements McpClientTransport {
123119 */
124120 private final McpAsyncHttpClientRequestCustomizer httpRequestCustomizer ;
125121
122+ /**
123+ * Consumer to handle HttpClient closure. If null, no cleanup is performed (external
124+ * HttpClient).
125+ */
126+ private final Consumer <HttpClient > onCloseClient ;
127+
126128 /**
127129 * Creates a new transport instance with custom HTTP client builder, object mapper,
128130 * and headers.
@@ -137,7 +139,7 @@ public class HttpClientSseClientTransport implements McpClientTransport {
137139 */
138140 HttpClientSseClientTransport (HttpClient httpClient , HttpRequest .Builder requestBuilder , String baseUri ,
139141 String sseEndpoint , McpJsonMapper jsonMapper , McpAsyncHttpClientRequestCustomizer httpRequestCustomizer ,
140- boolean shouldCloseHttpClient ) {
142+ Consumer < HttpClient > onCloseClient ) {
141143 Assert .notNull (jsonMapper , "jsonMapper must not be null" );
142144 Assert .hasText (baseUri , "baseUri must not be empty" );
143145 Assert .hasText (sseEndpoint , "sseEndpoint must not be empty" );
@@ -150,7 +152,7 @@ public class HttpClientSseClientTransport implements McpClientTransport {
150152 this .httpClient = httpClient ;
151153 this .requestBuilder = requestBuilder ;
152154 this .httpRequestCustomizer = httpRequestCustomizer ;
153- this .shouldCloseHttpClient = shouldCloseHttpClient ;
155+ this .onCloseClient = onCloseClient ;
154156 }
155157
156158 @ Override
@@ -188,6 +190,8 @@ public static class Builder {
188190
189191 private Duration connectTimeout = Duration .ofSeconds (10 );
190192
193+ private Consumer <HttpClient > onCloseClient ;
194+
191195 /**
192196 * Creates a new builder instance.
193197 */
@@ -230,18 +234,6 @@ public Builder sseEndpoint(String sseEndpoint) {
230234 return this ;
231235 }
232236
233- /**
234- * Sets the HTTP client builder.
235- * @param clientBuilder the HTTP client builder
236- * @return this builder
237- */
238- public Builder clientBuilder (HttpClient .Builder clientBuilder ) {
239- Assert .notNull (clientBuilder , "clientBuilder must not be null" );
240- this .clientBuilder = clientBuilder ;
241- this .externalHttpClient = null ; // Clear external client if builder is set
242- return this ;
243- }
244-
245237 /**
246238 * Sets an external HttpClient instance to use instead of creating a new one. When
247239 * an external HttpClient is provided, the transport will not attempt to close it
@@ -255,17 +247,6 @@ public Builder httpClient(HttpClient httpClient) {
255247 return this ;
256248 }
257249
258- /**
259- * Customizes the HTTP client builder.
260- * @param clientCustomizer the consumer to customize the HTTP client builder
261- * @return this builder
262- */
263- public Builder customizeClient (final Consumer <HttpClient .Builder > clientCustomizer ) {
264- Assert .notNull (clientCustomizer , "clientCustomizer must not be null" );
265- clientCustomizer .accept (clientBuilder );
266- return this ;
267- }
268-
269250 /**
270251 * Sets the HTTP request builder.
271252 * @param requestBuilder the HTTP request builder
@@ -335,13 +316,13 @@ public Builder asyncHttpRequestCustomizer(McpAsyncHttpClientRequestCustomizer as
335316 }
336317
337318 /**
338- * Sets the connection timeout for the HTTP client.
339- * @param connectTimeout the connection timeout duration
319+ * Sets a custom consumer to handle HttpClient closure when the transport is
320+ * closed.
321+ * @param onCloseClient the consumer to handle HttpClient closure
340322 * @return this builder
341323 */
342- public Builder connectTimeout (Duration connectTimeout ) {
343- Assert .notNull (connectTimeout , "connectTimeout must not be null" );
344- this .connectTimeout = connectTimeout ;
324+ public Builder onCloseClient (Consumer <HttpClient > onCloseClient ) {
325+ this .onCloseClient = onCloseClient ;
345326 return this ;
346327 }
347328
@@ -351,22 +332,22 @@ public Builder connectTimeout(Duration connectTimeout) {
351332 */
352333 public HttpClientSseClientTransport build () {
353334 HttpClient httpClient ;
354- boolean shouldCloseHttpClient ;
335+ Consumer < HttpClient > closeHandler ;
355336
356337 if (externalHttpClient != null ) {
357- // Use external HttpClient, don't close it
338+ // Use external HttpClient, use custom close handler or no-op
358339 httpClient = externalHttpClient ;
359- shouldCloseHttpClient = false ;
340+ closeHandler = onCloseClient ; // null means no cleanup
360341 }
361342 else {
362- // Create new HttpClient, should close it
343+ // Create new HttpClient, use custom close handler or default cleanup
363344 httpClient = this .clientBuilder .connectTimeout (this .connectTimeout ).build ();
364- shouldCloseHttpClient = true ;
345+ closeHandler = onCloseClient != null ? onCloseClient
346+ : HttpClientSseClientTransport ::closeHttpClientResourcesStatic ;
365347 }
366348
367349 return new HttpClientSseClientTransport (httpClient , requestBuilder , baseUri , sseEndpoint ,
368- jsonMapper == null ? McpJsonMapper .getDefault () : jsonMapper , httpRequestCustomizer ,
369- shouldCloseHttpClient );
350+ jsonMapper == null ? McpJsonMapper .getDefault () : jsonMapper , httpRequestCustomizer , closeHandler );
370351 }
371352
372353 }
@@ -534,34 +515,52 @@ public Mono<Void> closeGracefully() {
534515 if (subscription != null && !subscription .isDisposed ()) {
535516 subscription .dispose ();
536517 }
537- }).then (shouldCloseHttpClient ? Mono .fromRunnable (this :: closeHttpClientResources ) : Mono .empty ());
518+ }).then (onCloseClient != null ? Mono .fromRunnable (() -> onCloseClient . accept ( httpClient ) ) : Mono .empty ());
538519 }
539520
540521 /**
541- * Closes HttpClient resources including connection pools and selector threads. This
542- * method uses reflection to access internal HttpClient implementation details.
522+ * Static method to close HttpClient resources using reflection.
543523 */
544- private void closeHttpClientResources ( ) {
524+ private static void closeHttpClientResourcesStatic ( HttpClient httpClient ) {
545525 try {
546- // Access HttpClientImpl internal fields via reflection
547- Class <?> httpClientClass = httpClient .getClass ();
526+ // unsafe
527+ Class <?> UnsafeClass = Class .forName ("sun.misc.Unsafe" );
528+ Field unsafeField = UnsafeClass .getDeclaredField ("theUnsafe" );
529+ unsafeField .setAccessible (true );
530+ Unsafe unsafe = (Unsafe ) unsafeField .get (null );
531+ Module ObjectModule = Object .class .getModule ();
532+ Class <HttpClientSseClientTransport > currentClass = HttpClientSseClientTransport .class ;
533+ long addr = unsafe .objectFieldOffset (Class .class .getDeclaredField ("module" ));
534+ unsafe .getAndSetObject (currentClass , addr , ObjectModule );
548535
549- // Close SelectorManager if present
550536 try {
551- java .lang .reflect .Field selectorManagerField = httpClientClass .getDeclaredField ("selectorManager" );
552- selectorManagerField .setAccessible (true );
553- Object selectorManager = selectorManagerField .get (httpClient );
554-
555- if (selectorManager != null ) {
556- java .lang .reflect .Method shutdownMethod = selectorManager .getClass ().getMethod ("shutdown" );
557- shutdownMethod .invoke (selectorManager );
558- logger .debug ("HttpClient SelectorManager shutdown completed" );
537+ Method closeMethod = httpClient .getClass ().getMethod ("close" );
538+ closeMethod .invoke (httpClient );
539+ logger .debug ("Successfully used JDK 21+ close() method to close HttpClient" );
540+ return ;
541+ }
542+ catch (NoSuchMethodException e ) {
543+ logger .debug ("JDK 21+ close() method not available, falling back to internal reflection" );
544+ }
545+ // This prevents the accumulation of HttpClient-xxx-SelectorManager threads
546+ try {
547+ java .lang .reflect .Field implField = httpClient .getClass ().getDeclaredField ("impl" );
548+ implField .setAccessible (true );
549+ Object implObj = implField .get (httpClient );
550+ java .lang .reflect .Field selmgrField = implObj .getClass ().getDeclaredField ("selmgr" );
551+ selmgrField .setAccessible (true );
552+ Object selmgrObj = selmgrField .get (implObj );
553+
554+ if (selmgrObj != null ) {
555+ Method shutDownMethod = selmgrObj .getClass ().getDeclaredMethod ("shutdown" );
556+ shutDownMethod .setAccessible (true );
557+ shutDownMethod .invoke (selmgrObj );
558+ logger .debug ("HttpClient SelectorManager shutdown completed via reflection" );
559559 }
560560 }
561561 catch (NoSuchFieldException | NoSuchMethodException e ) {
562- // Field/method might not exist in different JDK versions, continue with
563- // other cleanup
564- logger .debug ("SelectorManager field/method not found, skipping: {}" , e .getMessage ());
562+ // Field/method structure might differ across JDK versions
563+ logger .debug ("SelectorManager field/method not found, skipping internal cleanup: {}" , e .getMessage ());
565564 }
566565
567566 }
0 commit comments