Skip to content

Commit 993628c

Browse files
authored
Merge branch 'main' into feature/fix-httpclient-resource-management
2 parents c6bfd09 + 9bbfbe5 commit 993628c

File tree

16 files changed

+389
-38
lines changed

16 files changed

+389
-38
lines changed

mcp-core/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import io.modelcontextprotocol.client.transport.customizer.McpSyncHttpClientRequestCustomizer;
3232
import io.modelcontextprotocol.client.transport.ResponseSubscribers.ResponseEvent;
3333
import io.modelcontextprotocol.common.McpTransportContext;
34+
import io.modelcontextprotocol.spec.ClosedMcpTransportSession;
3435
import io.modelcontextprotocol.spec.DefaultMcpTransportSession;
3536
import io.modelcontextprotocol.spec.DefaultMcpTransportStream;
3637
import io.modelcontextprotocol.spec.HttpHeaders;
@@ -121,7 +122,7 @@ public class HttpClientStreamableHttpTransport implements McpClientTransport {
121122

122123
private final McpAsyncHttpClientRequestCustomizer httpRequestCustomizer;
123124

124-
private final AtomicReference<DefaultMcpTransportSession> activeSession = new AtomicReference<>();
125+
private final AtomicReference<McpTransportSession<Disposable>> activeSession = new AtomicReference<>();
125126

126127
private final AtomicReference<Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>>> handler = new AtomicReference<>();
127128

@@ -174,12 +175,20 @@ public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchem
174175
});
175176
}
176177

177-
private DefaultMcpTransportSession createTransportSession() {
178+
private McpTransportSession<Disposable> createTransportSession() {
178179
Function<String, Publisher<Void>> onClose = sessionId -> sessionId == null ? Mono.empty()
179180
: createDelete(sessionId);
180181
return new DefaultMcpTransportSession(onClose);
181182
}
182183

184+
private McpTransportSession<Disposable> createClosedSession(McpTransportSession<Disposable> existingSession) {
185+
var existingSessionId = Optional.ofNullable(existingSession)
186+
.filter(session -> !(session instanceof ClosedMcpTransportSession<Disposable>))
187+
.flatMap(McpTransportSession::sessionId)
188+
.orElse(null);
189+
return new ClosedMcpTransportSession<>(existingSessionId);
190+
}
191+
183192
private Publisher<Void> createDelete(String sessionId) {
184193

185194
var uri = Utils.resolveUri(this.baseUri, this.endpoint);
@@ -226,6 +235,9 @@ public Mono<Void> closeGracefully() {
226235

227236
if (onCloseClient != null) {
228237
return sessionClose.then(Mono.fromRunnable(() -> onCloseClient.accept(httpClient)));
238+
McpTransportSession<Disposable> currentSession = this.activeSession.getAndUpdate(this::createClosedSession);
239+
if (currentSession != null) {
240+
return Mono.from(currentSession.closeGracefully());
229241
}
230242
return sessionClose;
231243
});
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2025-2025 the original author or authors.
3+
*/
4+
package io.modelcontextprotocol.spec;
5+
6+
import java.util.Optional;
7+
8+
import org.reactivestreams.Publisher;
9+
import reactor.core.publisher.Mono;
10+
import reactor.util.annotation.Nullable;
11+
12+
/**
13+
* Represents a closed MCP session, which may not be reused. All calls will throw a
14+
* {@link McpTransportSessionClosedException}.
15+
*
16+
* @param <CONNECTION> the resource representing the connection that the transport
17+
* manages.
18+
* @author Daniel Garnier-Moiroux
19+
*/
20+
public class ClosedMcpTransportSession<CONNECTION> implements McpTransportSession<CONNECTION> {
21+
22+
private final String sessionId;
23+
24+
public ClosedMcpTransportSession(@Nullable String sessionId) {
25+
this.sessionId = sessionId;
26+
}
27+
28+
@Override
29+
public Optional<String> sessionId() {
30+
throw new McpTransportSessionClosedException(sessionId);
31+
}
32+
33+
@Override
34+
public boolean markInitialized(String sessionId) {
35+
throw new McpTransportSessionClosedException(sessionId);
36+
}
37+
38+
@Override
39+
public void addConnection(CONNECTION connection) {
40+
throw new McpTransportSessionClosedException(sessionId);
41+
}
42+
43+
@Override
44+
public void removeConnection(CONNECTION connection) {
45+
throw new McpTransportSessionClosedException(sessionId);
46+
}
47+
48+
@Override
49+
public void close() {
50+
51+
}
52+
53+
@Override
54+
public Publisher<Void> closeGracefully() {
55+
return Mono.empty();
56+
}
57+
58+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2025-2025 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.spec;
6+
7+
import reactor.util.annotation.Nullable;
8+
9+
/**
10+
* Exception thrown when trying to use an {@link McpTransportSession} that has been
11+
* closed.
12+
*
13+
* @see ClosedMcpTransportSession
14+
* @author Daniel Garnier-Moiroux
15+
*/
16+
public class McpTransportSessionClosedException extends RuntimeException {
17+
18+
public McpTransportSessionClosedException(@Nullable String sessionId) {
19+
super(sessionId != null ? "MCP session with ID %s has been closed".formatted(sessionId)
20+
: "MCP session has been closed");
21+
}
22+
23+
}

mcp-core/src/test/java/io/modelcontextprotocol/client/AbstractMcpAsyncClientResiliencyTests.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import io.modelcontextprotocol.spec.McpClientTransport;
1111
import io.modelcontextprotocol.spec.McpSchema;
1212
import io.modelcontextprotocol.spec.McpTransport;
13-
import org.junit.jupiter.api.Disabled;
13+
import io.modelcontextprotocol.spec.McpTransportSessionClosedException;
1414
import org.junit.jupiter.api.Test;
1515
import org.slf4j.Logger;
1616
import org.slf4j.LoggerFactory;
@@ -222,9 +222,10 @@ void testSessionClose() {
222222
// In case of Streamable HTTP this call should issue a HTTP DELETE request
223223
// invalidating the session
224224
StepVerifier.create(mcpAsyncClient.closeGracefully()).expectComplete().verify();
225-
// The next use should immediately re-initialize with no issue and send the
226-
// request without any broken connections.
227-
StepVerifier.create(mcpAsyncClient.ping()).expectNextCount(1).verifyComplete();
225+
// The next tries to use the closed session and fails
226+
StepVerifier.create(mcpAsyncClient.ping())
227+
.expectErrorMatches(err -> err.getCause() instanceof McpTransportSessionClosedException)
228+
.verify();
228229
});
229230
}
230231

mcp-core/src/test/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransportTest.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,4 +125,39 @@ void testAsyncRequestCustomizer() throws URISyntaxException {
125125
});
126126
}
127127

128+
@Test
129+
void testCloseUninitialized() {
130+
var transport = HttpClientStreamableHttpTransport.builder(host).build();
131+
132+
StepVerifier.create(transport.closeGracefully()).verifyComplete();
133+
134+
var initializeRequest = new McpSchema.InitializeRequest(McpSchema.LATEST_PROTOCOL_VERSION,
135+
McpSchema.ClientCapabilities.builder().roots(true).build(),
136+
new McpSchema.Implementation("Spring AI MCP Client", "0.3.1"));
137+
var testMessage = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_INITIALIZE,
138+
"test-id", initializeRequest);
139+
140+
StepVerifier.create(transport.sendMessage(testMessage))
141+
.expectErrorMessage("MCP session has been closed")
142+
.verify();
143+
}
144+
145+
@Test
146+
void testCloseInitialized() {
147+
var transport = HttpClientStreamableHttpTransport.builder(host).build();
148+
149+
var initializeRequest = new McpSchema.InitializeRequest(McpSchema.LATEST_PROTOCOL_VERSION,
150+
McpSchema.ClientCapabilities.builder().roots(true).build(),
151+
new McpSchema.Implementation("Spring AI MCP Client", "0.3.1"));
152+
var testMessage = new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_INITIALIZE,
153+
"test-id", initializeRequest);
154+
155+
StepVerifier.create(transport.sendMessage(testMessage)).verifyComplete();
156+
StepVerifier.create(transport.closeGracefully()).verifyComplete();
157+
158+
StepVerifier.create(transport.sendMessage(testMessage))
159+
.expectErrorMatches(err -> err.getMessage().matches("MCP session with ID [a-zA-Z0-9-]* has been closed"))
160+
.verify();
161+
}
162+
128163
}

mcp-core/src/test/java/io/modelcontextprotocol/spec/JSONRPCRequestMcpValidationTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55
package io.modelcontextprotocol.spec;
66

77
import org.junit.jupiter.api.Test;
8-
import static org.junit.jupiter.api.Assertions.*;
8+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
9+
import static org.junit.jupiter.api.Assertions.assertEquals;
10+
import static org.junit.jupiter.api.Assertions.assertThrows;
11+
import static org.junit.jupiter.api.Assertions.assertTrue;
912

1013
/**
1114
* Tests for MCP-specific validation of JSONRPCRequest ID requirements.

mcp-core/src/test/java/io/modelcontextprotocol/spec/McpErrorTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44

55
import java.util.Map;
66

7-
import static org.junit.jupiter.api.Assertions.*;
7+
import static org.junit.jupiter.api.Assertions.assertNotNull;
8+
import static org.junit.jupiter.api.Assertions.assertEquals;
89

910
class McpErrorTest {
1011

mcp-core/src/test/java/io/modelcontextprotocol/spec/PromptReferenceEqualsTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
import io.modelcontextprotocol.spec.McpSchema.PromptReference;
88
import org.junit.jupiter.api.Test;
99

10-
import static org.junit.jupiter.api.Assertions.*;
10+
import static org.junit.jupiter.api.Assertions.assertTrue;
11+
import static org.junit.jupiter.api.Assertions.assertFalse;
12+
import static org.junit.jupiter.api.Assertions.assertEquals;
1113

1214
/**
1315
* Test class to verify the equals method implementation for PromptReference.

mcp-core/src/test/java/io/modelcontextprotocol/spec/json/gson/GsonMcpJsonMapperTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@
88
import java.util.List;
99
import java.util.Map;
1010

11-
import static org.junit.jupiter.api.Assertions.*;
11+
import static org.junit.jupiter.api.Assertions.assertNotNull;
12+
import static org.junit.jupiter.api.Assertions.assertTrue;
13+
import static org.junit.jupiter.api.Assertions.assertEquals;
14+
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
1215

1316
class GsonMcpJsonMapperTests {
1417

mcp-core/src/test/java/io/modelcontextprotocol/util/AssertTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88

99
import java.util.List;
1010

11-
import static org.junit.jupiter.api.Assertions.*;
11+
import static org.junit.jupiter.api.Assertions.assertThrows;
12+
import static org.junit.jupiter.api.Assertions.assertEquals;
13+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
1214

1315
class AssertTests {
1416

0 commit comments

Comments
 (0)