@@ -193,7 +193,8 @@ private Mono<ServerResponse> handleGet(ServerRequest request) {
193
193
194
194
return Mono .defer (() -> {
195
195
if (!request .headers ().asHttpHeaders ().containsKey ("mcp-session-id" )) {
196
- return ServerResponse .badRequest ().build (); // TODO: say we need a session id
196
+ return ServerResponse .badRequest ().build (); // TODO: say we need a session
197
+ // id
197
198
}
198
199
199
200
String sessionId = request .headers ().asHttpHeaders ().getFirst ("mcp-session-id" );
@@ -206,15 +207,20 @@ private Mono<ServerResponse> handleGet(ServerRequest request) {
206
207
207
208
if (request .headers ().asHttpHeaders ().containsKey ("mcp-last-id" )) {
208
209
String lastId = request .headers ().asHttpHeaders ().getFirst ("mcp-last-id" );
209
- return ServerResponse .ok ().contentType (MediaType .TEXT_EVENT_STREAM ).body (session .replay (lastId ), ServerSentEvent .class );
210
+ return ServerResponse .ok ()
211
+ .contentType (MediaType .TEXT_EVENT_STREAM )
212
+ .body (session .replay (lastId ), ServerSentEvent .class );
210
213
}
211
214
212
- return ServerResponse .ok ().contentType (MediaType .TEXT_EVENT_STREAM )
213
- .body (Flux .<ServerSentEvent <?>>create (sink -> {
214
- WebFluxStreamableMcpSessionTransport sessionTransport = new WebFluxStreamableMcpSessionTransport (sink );
215
- McpStreamableServerSession .McpStreamableServerSessionStream listeningStream = session .listeningStream (sessionTransport );
216
- sink .onDispose (listeningStream ::close );
217
- }), ServerSentEvent .class );
215
+ return ServerResponse .ok ()
216
+ .contentType (MediaType .TEXT_EVENT_STREAM )
217
+ .body (Flux .<ServerSentEvent <?>>create (sink -> {
218
+ WebFluxStreamableMcpSessionTransport sessionTransport = new WebFluxStreamableMcpSessionTransport (
219
+ sink );
220
+ McpStreamableServerSession .McpStreamableServerSessionStream listeningStream = session
221
+ .listeningStream (sessionTransport );
222
+ sink .onDispose (listeningStream ::close );
223
+ }), ServerSentEvent .class );
218
224
219
225
}).contextWrite (ctx -> ctx .put (McpTransportContext .KEY , transportContext ));
220
226
}
@@ -244,11 +250,18 @@ private Mono<ServerResponse> handlePost(ServerRequest request) {
244
250
return request .bodyToMono (String .class ).<ServerResponse >flatMap (body -> {
245
251
try {
246
252
McpSchema .JSONRPCMessage message = McpSchema .deserializeJsonRpcMessage (objectMapper , body );
247
- if (message instanceof McpSchema .JSONRPCRequest jsonrpcRequest && jsonrpcRequest .method ().equals (McpSchema .METHOD_INITIALIZE )) {
248
- McpSchema .InitializeRequest initializeRequest = objectMapper .convertValue (jsonrpcRequest .params (), new TypeReference <McpSchema .InitializeRequest >() {});
249
- McpStreamableServerSession .McpStreamableServerSessionInit init = this .sessionFactory .startSession (initializeRequest );
253
+ if (message instanceof McpSchema .JSONRPCRequest jsonrpcRequest
254
+ && jsonrpcRequest .method ().equals (McpSchema .METHOD_INITIALIZE )) {
255
+ McpSchema .InitializeRequest initializeRequest = objectMapper .convertValue (jsonrpcRequest .params (),
256
+ new TypeReference <McpSchema .InitializeRequest >() {
257
+ });
258
+ McpStreamableServerSession .McpStreamableServerSessionInit init = this .sessionFactory
259
+ .startSession (initializeRequest );
250
260
sessions .put (init .session ().getId (), init .session ());
251
- return init .initResult ().flatMap (initResult -> ServerResponse .ok ().header ("mcp-session-id" , init .session ().getId ()).bodyValue (initResult ));
261
+ return init .initResult ()
262
+ .flatMap (initResult -> ServerResponse .ok ()
263
+ .header ("mcp-session-id" , init .session ().getId ())
264
+ .bodyValue (initResult ));
252
265
}
253
266
254
267
if (!request .headers ().asHttpHeaders ().containsKey ("sessionId" )) {
@@ -260,26 +273,30 @@ private Mono<ServerResponse> handlePost(ServerRequest request) {
260
273
261
274
if (session == null ) {
262
275
return ServerResponse .status (HttpStatus .NOT_FOUND )
263
- .bodyValue (new McpError ("Session not found: " + sessionId ));
276
+ .bodyValue (new McpError ("Session not found: " + sessionId ));
264
277
}
265
278
266
279
if (message instanceof McpSchema .JSONRPCResponse jsonrpcResponse ) {
267
280
return session .accept (jsonrpcResponse ).then (ServerResponse .accepted ().build ());
268
- } else if (message instanceof McpSchema .JSONRPCNotification jsonrpcNotification ) {
281
+ }
282
+ else if (message instanceof McpSchema .JSONRPCNotification jsonrpcNotification ) {
269
283
return session .accept (jsonrpcNotification ).then (ServerResponse .accepted ().build ());
270
- } else if (message instanceof McpSchema .JSONRPCRequest jsonrpcRequest ) {
271
- return ServerResponse .ok ().contentType (MediaType .TEXT_EVENT_STREAM )
272
- .body (Flux .<ServerSentEvent <?>>create (sink -> {
273
- WebFluxStreamableMcpSessionTransport st = new WebFluxStreamableMcpSessionTransport (sink );
274
- Mono <Void > stream = session .responseStream (jsonrpcRequest , st );
275
- Disposable streamSubscription = stream
276
- .doOnError (err -> sink .error (err ))
277
- .contextWrite (sink .contextView ())
278
- .subscribe ();
279
- sink .onCancel (streamSubscription );
280
- }), ServerSentEvent .class );
281
- } else {
282
- return ServerResponse .status (HttpStatus .INTERNAL_SERVER_ERROR ).bodyValue (new McpError ("Unknown message type" ));
284
+ }
285
+ else if (message instanceof McpSchema .JSONRPCRequest jsonrpcRequest ) {
286
+ return ServerResponse .ok ()
287
+ .contentType (MediaType .TEXT_EVENT_STREAM )
288
+ .body (Flux .<ServerSentEvent <?>>create (sink -> {
289
+ WebFluxStreamableMcpSessionTransport st = new WebFluxStreamableMcpSessionTransport (sink );
290
+ Mono <Void > stream = session .responseStream (jsonrpcRequest , st );
291
+ Disposable streamSubscription = stream .doOnError (err -> sink .error (err ))
292
+ .contextWrite (sink .contextView ())
293
+ .subscribe ();
294
+ sink .onCancel (streamSubscription );
295
+ }), ServerSentEvent .class );
296
+ }
297
+ else {
298
+ return ServerResponse .status (HttpStatus .INTERNAL_SERVER_ERROR )
299
+ .bodyValue (new McpError ("Unknown message type" ));
283
300
}
284
301
}
285
302
catch (IllegalArgumentException | IOException e ) {
@@ -393,8 +410,8 @@ public Builder messageEndpoint(String messageEndpoint) {
393
410
}
394
411
395
412
/**
396
- * Builds a new instance of {@link WebFluxStreamableServerTransportProvider} with the
397
- * configured settings.
413
+ * Builds a new instance of {@link WebFluxStreamableServerTransportProvider} with
414
+ * the configured settings.
398
415
* @return A new WebFluxSseServerTransportProvider instance
399
416
* @throws IllegalStateException if required parameters are not set
400
417
*/
0 commit comments