8
8
import java .time .Duration ;
9
9
import java .util .UUID ;
10
10
import java .util .concurrent .ConcurrentHashMap ;
11
+ import java .util .concurrent .locks .ReentrantLock ;
11
12
12
13
import com .fasterxml .jackson .core .type .TypeReference ;
13
14
import com .fasterxml .jackson .databind .ObjectMapper ;
@@ -339,6 +340,12 @@ private class WebMvcMcpSessionTransport implements McpServerTransport {
339
340
340
341
private final SseBuilder sseBuilder ;
341
342
343
+ /**
344
+ * Lock to ensure thread-safe access to the SSE builder when sending messages.
345
+ * This prevents concurrent modifications that could lead to corrupted SSE events.
346
+ */
347
+ private final ReentrantLock sseBuilderLock = new ReentrantLock ();
348
+
342
349
/**
343
350
* Creates a new session transport with the specified ID and SSE builder.
344
351
* @param sessionId The unique identifier for this session
@@ -358,6 +365,7 @@ private class WebMvcMcpSessionTransport implements McpServerTransport {
358
365
@ Override
359
366
public Mono <Void > sendMessage (McpSchema .JSONRPCMessage message ) {
360
367
return Mono .fromRunnable (() -> {
368
+ sseBuilderLock .lock ();
361
369
try {
362
370
String jsonText = objectMapper .writeValueAsString (message );
363
371
sseBuilder .id (sessionId ).event (MESSAGE_EVENT_TYPE ).data (jsonText );
@@ -367,6 +375,9 @@ public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
367
375
logger .error ("Failed to send message to session {}: {}" , sessionId , e .getMessage ());
368
376
sseBuilder .error (e );
369
377
}
378
+ finally {
379
+ sseBuilderLock .unlock ();
380
+ }
370
381
});
371
382
}
372
383
@@ -390,13 +401,17 @@ public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
390
401
public Mono <Void > closeGracefully () {
391
402
return Mono .fromRunnable (() -> {
392
403
logger .debug ("Closing session transport: {}" , sessionId );
404
+ sseBuilderLock .lock ();
393
405
try {
394
406
sseBuilder .complete ();
395
407
logger .debug ("Successfully completed SSE builder for session {}" , sessionId );
396
408
}
397
409
catch (Exception e ) {
398
410
logger .warn ("Failed to complete SSE builder for session {}: {}" , sessionId , e .getMessage ());
399
411
}
412
+ finally {
413
+ sseBuilderLock .unlock ();
414
+ }
400
415
});
401
416
}
402
417
@@ -405,13 +420,17 @@ public Mono<Void> closeGracefully() {
405
420
*/
406
421
@ Override
407
422
public void close () {
423
+ sseBuilderLock .lock ();
408
424
try {
409
425
sseBuilder .complete ();
410
426
logger .debug ("Successfully completed SSE builder for session {}" , sessionId );
411
427
}
412
428
catch (Exception e ) {
413
429
logger .warn ("Failed to complete SSE builder for session {}: {}" , sessionId , e .getMessage ());
414
430
}
431
+ finally {
432
+ sseBuilderLock .unlock ();
433
+ }
415
434
}
416
435
417
436
}
0 commit comments