Skip to content

Commit ae2fc86

Browse files
Kehrlanntzolov
authored andcommitted
Introduce HttpRequest.Builder customizer for HttpClient-based transport
- Minor improvement: speed up HttpClientSseClientTransportTests by reusing the MCP Server container across tests. - Minor improvement: rename "messageSink" to "deliveredSink" in HttpClientStreamableHttpTransport#sendMessage
1 parent bde1b6b commit ae2fc86

File tree

6 files changed

+539
-150
lines changed

6 files changed

+539
-150
lines changed
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Copyright 2024-2025 the original author or authors.
3+
*/
4+
5+
package io.modelcontextprotocol.client.transport;
6+
7+
import java.net.URI;
8+
import java.net.http.HttpRequest;
9+
import org.reactivestreams.Publisher;
10+
import reactor.core.publisher.Mono;
11+
import reactor.core.scheduler.Schedulers;
12+
import reactor.util.annotation.Nullable;
13+
14+
/**
15+
* Customize {@link HttpRequest.Builder} before executing the request, in either SSE or
16+
* Streamable HTTP transport.
17+
* <p>
18+
* When used in a non-blocking context, implementations MUST be non-blocking.
19+
*
20+
* @author Daniel Garnier-Moiroux
21+
*/
22+
public interface AsyncHttpRequestCustomizer {
23+
24+
Publisher<HttpRequest.Builder> customize(HttpRequest.Builder builder, String method, URI endpoint,
25+
@Nullable String body);
26+
27+
AsyncHttpRequestCustomizer NOOP = new Noop();
28+
29+
/**
30+
* Wrap a sync implementation in an async wrapper.
31+
* <p>
32+
* Do NOT wrap a blocking implementation for use in a non-blocking context. For a
33+
* blocking implementation, consider using {@link Schedulers#boundedElastic()}.
34+
*/
35+
static AsyncHttpRequestCustomizer fromSync(SyncHttpRequestCustomizer customizer) {
36+
return (builder, method, uri, body) -> Mono.fromSupplier(() -> {
37+
customizer.customize(builder, method, uri, body);
38+
return builder;
39+
});
40+
}
41+
42+
class Noop implements AsyncHttpRequestCustomizer {
43+
44+
@Override
45+
public Publisher<HttpRequest.Builder> customize(HttpRequest.Builder builder, String method, URI endpoint,
46+
String body) {
47+
return Mono.just(builder);
48+
}
49+
50+
}
51+
52+
}

mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientSseClientTransport.java

Lines changed: 80 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2024 - 2024 the original author or authors.
2+
* Copyright 2024 - 2025 the original author or authors.
33
*/
44
package io.modelcontextprotocol.client.transport;
55

@@ -102,6 +102,11 @@ public class HttpClientSseClientTransport implements McpClientTransport {
102102
*/
103103
protected final Sinks.One<String> messageEndpointSink = Sinks.one();
104104

105+
/**
106+
* Customizer to modify requests before they are executed.
107+
*/
108+
private final AsyncHttpRequestCustomizer httpRequestCustomizer;
109+
105110
/**
106111
* Creates a new transport instance with default HTTP client and object mapper.
107112
* @param baseUri the base URI of the MCP server
@@ -172,18 +177,38 @@ public HttpClientSseClientTransport(HttpClient.Builder clientBuilder, HttpReques
172177
* @param objectMapper the object mapper for JSON serialization/deserialization
173178
* @throws IllegalArgumentException if objectMapper, clientBuilder, or headers is null
174179
*/
180+
@Deprecated(forRemoval = true)
175181
HttpClientSseClientTransport(HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri,
176182
String sseEndpoint, ObjectMapper objectMapper) {
183+
this(httpClient, requestBuilder, baseUri, sseEndpoint, objectMapper, AsyncHttpRequestCustomizer.NOOP);
184+
}
185+
186+
/**
187+
* Creates a new transport instance with custom HTTP client builder, object mapper,
188+
* and headers.
189+
* @param httpClient the HTTP client to use
190+
* @param requestBuilder the HTTP request builder to use
191+
* @param baseUri the base URI of the MCP server
192+
* @param sseEndpoint the SSE endpoint path
193+
* @param objectMapper the object mapper for JSON serialization/deserialization
194+
* @param httpRequestCustomizer customizer for the requestBuilder before executing
195+
* requests
196+
* @throws IllegalArgumentException if objectMapper, clientBuilder, or headers is null
197+
*/
198+
HttpClientSseClientTransport(HttpClient httpClient, HttpRequest.Builder requestBuilder, String baseUri,
199+
String sseEndpoint, ObjectMapper objectMapper, AsyncHttpRequestCustomizer httpRequestCustomizer) {
177200
Assert.notNull(objectMapper, "ObjectMapper must not be null");
178201
Assert.hasText(baseUri, "baseUri must not be empty");
179202
Assert.hasText(sseEndpoint, "sseEndpoint must not be empty");
180203
Assert.notNull(httpClient, "httpClient must not be null");
181204
Assert.notNull(requestBuilder, "requestBuilder must not be null");
205+
Assert.notNull(httpRequestCustomizer, "httpRequestCustomizer must not be null");
182206
this.baseUri = URI.create(baseUri);
183207
this.sseEndpoint = sseEndpoint;
184208
this.objectMapper = objectMapper;
185209
this.httpClient = httpClient;
186210
this.requestBuilder = requestBuilder;
211+
this.httpRequestCustomizer = httpRequestCustomizer;
187212
}
188213

189214
/**
@@ -213,6 +238,8 @@ public static class Builder {
213238
private HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
214239
.header("Content-Type", "application/json");
215240

241+
private AsyncHttpRequestCustomizer httpRequestCustomizer = AsyncHttpRequestCustomizer.NOOP;
242+
216243
/**
217244
* Creates a new builder instance.
218245
*/
@@ -310,31 +337,66 @@ public Builder objectMapper(ObjectMapper objectMapper) {
310337
return this;
311338
}
312339

340+
/**
341+
* Sets the customizer for {@link HttpRequest.Builder}, to modify requests before
342+
* executing them.
343+
* <p>
344+
* This overrides the customizer from
345+
* {@link #asyncHttpRequestCustomizer(AsyncHttpRequestCustomizer)}.
346+
* <p>
347+
* Do NOT use a blocking {@link SyncHttpRequestCustomizer} in a non-blocking
348+
* context. Use {@link #asyncHttpRequestCustomizer(AsyncHttpRequestCustomizer)}
349+
* instead.
350+
* @param syncHttpRequestCustomizer the request customizer
351+
* @return this builder
352+
*/
353+
public Builder httpRequestCustomizer(SyncHttpRequestCustomizer syncHttpRequestCustomizer) {
354+
this.httpRequestCustomizer = AsyncHttpRequestCustomizer.fromSync(syncHttpRequestCustomizer);
355+
return this;
356+
}
357+
358+
/**
359+
* Sets the customizer for {@link HttpRequest.Builder}, to modify requests before
360+
* executing them.
361+
* <p>
362+
* This overrides the customizer from
363+
* {@link #httpRequestCustomizer(SyncHttpRequestCustomizer)}.
364+
* <p>
365+
* Do NOT use a blocking implementation in a non-blocking context.
366+
* @param asyncHttpRequestCustomizer the request customizer
367+
* @return this builder
368+
*/
369+
public Builder asyncHttpRequestCustomizer(AsyncHttpRequestCustomizer asyncHttpRequestCustomizer) {
370+
this.httpRequestCustomizer = asyncHttpRequestCustomizer;
371+
return this;
372+
}
373+
313374
/**
314375
* Builds a new {@link HttpClientSseClientTransport} instance.
315376
* @return a new transport instance
316377
*/
317378
public HttpClientSseClientTransport build() {
318379
return new HttpClientSseClientTransport(clientBuilder.build(), requestBuilder, baseUri, sseEndpoint,
319-
objectMapper);
380+
objectMapper, httpRequestCustomizer);
320381
}
321382

322383
}
323384

324385
@Override
325386
public Mono<Void> connect(Function<Mono<JSONRPCMessage>, Mono<JSONRPCMessage>> handler) {
387+
var uri = Utils.resolveUri(this.baseUri, this.sseEndpoint);
326388

327-
return Mono.create(sink -> {
328-
329-
HttpRequest request = requestBuilder.copy()
330-
.uri(Utils.resolveUri(this.baseUri, this.sseEndpoint))
389+
return Mono.defer(() -> {
390+
var builder = requestBuilder.copy()
391+
.uri(uri)
331392
.header("Accept", "text/event-stream")
332393
.header("Cache-Control", "no-cache")
333-
.GET()
334-
.build();
335-
394+
.GET();
395+
return Mono.from(this.httpRequestCustomizer.customize(builder, "GET", uri, null));
396+
}).flatMap(requestBuilder -> Mono.create(sink -> {
336397
Disposable connection = Flux.<ResponseEvent>create(sseSink -> this.httpClient
337-
.sendAsync(request, responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo, sseSink))
398+
.sendAsync(requestBuilder.build(),
399+
responseInfo -> ResponseSubscribers.sseToBodySubscriber(responseInfo, sseSink))
338400
.exceptionallyCompose(e -> {
339401
sseSink.error(e);
340402
return CompletableFuture.failedFuture(e);
@@ -397,7 +459,7 @@ else if (MESSAGE_EVENT_TYPE.equals(responseEvent.sseEvent().event())) {
397459
.subscribe();
398460

399461
this.sseSubscription.set(connection);
400-
});
462+
}));
401463
}
402464

403465
/**
@@ -453,13 +515,13 @@ private Mono<String> serializeMessage(final JSONRPCMessage message) {
453515

454516
private Mono<HttpResponse<String>> sendHttpPost(final String endpoint, final String body) {
455517
final URI requestUri = Utils.resolveUri(baseUri, endpoint);
456-
final HttpRequest request = this.requestBuilder.copy()
457-
.uri(requestUri)
458-
.POST(HttpRequest.BodyPublishers.ofString(body))
459-
.build();
460-
461-
// TODO: why discard the body?
462-
return Mono.fromFuture(httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()));
518+
return Mono.defer(() -> {
519+
var builder = this.requestBuilder.copy().uri(requestUri).POST(HttpRequest.BodyPublishers.ofString(body));
520+
return Mono.from(this.httpRequestCustomizer.customize(builder, "POST", requestUri, body));
521+
}).flatMap(customizedBuilder -> {
522+
var request = customizedBuilder.build();
523+
return Mono.fromFuture(httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()));
524+
});
463525
}
464526

465527
/**

0 commit comments

Comments
 (0)