7
7
import java .io .IOException ;
8
8
import java .time .Duration ;
9
9
import java .util .concurrent .ConcurrentHashMap ;
10
+ import java .util .concurrent .locks .ReentrantLock ;
10
11
import java .util .function .Function ;
11
12
12
13
import com .fasterxml .jackson .core .type .TypeReference ;
@@ -230,9 +231,6 @@ private ServerResponse handleGet(ServerRequest request) {
230
231
231
232
try {
232
233
return ServerResponse .sse (sseBuilder -> {
233
- sseBuilder .onComplete (() -> {
234
- logger .debug ("SSE connection completed for session: {}" , sessionId );
235
- });
236
234
sseBuilder .onTimeout (() -> {
237
235
logger .debug ("SSE connection timed out for session: {}" , sessionId );
238
236
});
@@ -264,8 +262,11 @@ private ServerResponse handleGet(ServerRequest request) {
264
262
// Establish new listening stream
265
263
McpStreamableServerSession .McpStreamableServerSessionStream listeningStream = session
266
264
.listeningStream (sessionTransport );
267
- // Note: WebMVC SSE doesn't have onCancel, cleanup will happen in
268
- // onComplete/onTimeout
265
+
266
+ sseBuilder .onComplete (() -> {
267
+ logger .debug ("SSE connection completed for session: {}" , sessionId );
268
+ listeningStream .close ();
269
+ });
269
270
}
270
271
}, Duration .ZERO );
271
272
}
@@ -415,13 +416,22 @@ private ServerResponse handleDelete(ServerRequest request) {
415
416
/**
416
417
* Implementation of McpStreamableServerTransport for WebMVC SSE sessions. This class
417
418
* handles the transport-level communication for a specific client session.
419
+ *
420
+ * <p>
421
+ * This class is thread-safe and uses a ReentrantLock to synchronize access to the
422
+ * underlying SSE builder to prevent race conditions when multiple threads attempt to
423
+ * send messages concurrently.
418
424
*/
419
425
private class WebMvcStreamableMcpSessionTransport implements McpStreamableServerTransport {
420
426
421
427
private final String sessionId ;
422
428
423
429
private final SseBuilder sseBuilder ;
424
430
431
+ private final ReentrantLock lock = new ReentrantLock ();
432
+
433
+ private volatile boolean closed = false ;
434
+
425
435
/**
426
436
* Creates a new session transport with the specified ID and SSE builder.
427
437
* @param sessionId The unique identifier for this session
@@ -453,7 +463,18 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
453
463
@ Override
454
464
public Mono <Void > sendMessage (McpSchema .JSONRPCMessage message , String messageId ) {
455
465
return Mono .fromRunnable (() -> {
466
+ if (this .closed ) {
467
+ logger .debug ("Attempted to send message to closed session: {}" , this .sessionId );
468
+ return ;
469
+ }
470
+
471
+ this .lock .lock ();
456
472
try {
473
+ if (this .closed ) {
474
+ logger .debug ("Session {} was closed during message send attempt" , this .sessionId );
475
+ return ;
476
+ }
477
+
457
478
String jsonText = objectMapper .writeValueAsString (message );
458
479
this .sseBuilder .id (messageId != null ? messageId : this .sessionId )
459
480
.event (MESSAGE_EVENT_TYPE )
@@ -462,7 +483,16 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message, String messageId
462
483
}
463
484
catch (Exception e ) {
464
485
logger .error ("Failed to send message to session {}: {}" , this .sessionId , e .getMessage ());
465
- this .sseBuilder .error (e );
486
+ try {
487
+ this .sseBuilder .error (e );
488
+ }
489
+ catch (Exception errorException ) {
490
+ logger .error ("Failed to send error to SSE builder for session {}: {}" , this .sessionId ,
491
+ errorException .getMessage ());
492
+ }
493
+ }
494
+ finally {
495
+ this .lock .unlock ();
466
496
}
467
497
});
468
498
}
@@ -486,14 +516,7 @@ public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
486
516
@ Override
487
517
public Mono <Void > closeGracefully () {
488
518
return Mono .fromRunnable (() -> {
489
- logger .debug ("Closing streamable session transport: {}" , this .sessionId );
490
- try {
491
- this .sseBuilder .complete ();
492
- logger .debug ("Successfully completed SSE builder for session {}" , this .sessionId );
493
- }
494
- catch (Exception e ) {
495
- logger .warn ("Failed to complete SSE builder for session {}: {}" , this .sessionId , e .getMessage ());
496
- }
519
+ WebMvcStreamableMcpSessionTransport .this .close ();
497
520
});
498
521
}
499
522
@@ -502,13 +525,24 @@ public Mono<Void> closeGracefully() {
502
525
*/
503
526
@ Override
504
527
public void close () {
528
+ this .lock .lock ();
505
529
try {
530
+ if (this .closed ) {
531
+ logger .debug ("Session transport {} already closed" , this .sessionId );
532
+ return ;
533
+ }
534
+
535
+ this .closed = true ;
536
+
506
537
this .sseBuilder .complete ();
507
538
logger .debug ("Successfully completed SSE builder for session {}" , sessionId );
508
539
}
509
540
catch (Exception e ) {
510
541
logger .warn ("Failed to complete SSE builder for session {}: {}" , sessionId , e .getMessage ());
511
542
}
543
+ finally {
544
+ this .lock .unlock ();
545
+ }
512
546
}
513
547
514
548
}
0 commit comments