Skip to content

Commit 8a2e978

Browse files
committed
refactor: wrap handler invocations with Mono.defer for lazy evaluation
- Wrap consumer, tool, resource, prompt, and completion handler calls with Mono.defer() - Ensures proper lazy evaluation and error handling in reactive streams Signed-off-by: Christian Tzolov <[email protected]>
1 parent f4eb55b commit 8a2e978

File tree

2 files changed

+5
-9
lines changed

2 files changed

+5
-9
lines changed

mcp/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ private McpNotificationHandler asyncRootsListChangedNotificationHandler(
296296
List<BiFunction<McpAsyncServerExchange, List<McpSchema.Root>, Mono<Void>>> rootsChangeConsumers) {
297297
return (exchange, params) -> exchange.listRoots()
298298
.flatMap(listRootsResult -> Flux.fromIterable(rootsChangeConsumers)
299-
.flatMap(consumer -> consumer.apply(exchange, listRootsResult.roots()))
299+
.flatMap(consumer -> Mono.defer(() -> consumer.apply(exchange, listRootsResult.roots())))
300300
.onErrorResume(error -> {
301301
logger.error("Error handling roots list change notification", error);
302302
return Mono.empty();
@@ -506,7 +506,7 @@ private McpRequestHandler<CallToolResult> toolsCallRequestHandler() {
506506
return Mono.error(new McpError("Tool not found: " + callToolRequest.name()));
507507
}
508508

509-
return toolSpecification.map(tool -> tool.callHandler().apply(exchange, callToolRequest))
509+
return toolSpecification.map(tool -> Mono.defer(() -> tool.callHandler().apply(exchange, callToolRequest)))
510510
.orElse(Mono.error(new McpError("Tool not found: " + callToolRequest.name())));
511511
};
512512
}
@@ -634,7 +634,7 @@ private McpRequestHandler<McpSchema.ReadResourceResult> resourcesReadRequestHand
634634
.findFirst()
635635
.orElseThrow(() -> new McpError("Resource not found: " + resourceUri));
636636

637-
return specification.readHandler().apply(exchange, resourceRequest);
637+
return Mono.defer(() -> specification.readHandler().apply(exchange, resourceRequest));
638638
};
639639
}
640640

@@ -740,7 +740,7 @@ private McpRequestHandler<McpSchema.GetPromptResult> promptsGetRequestHandler()
740740
return Mono.error(new McpError("Prompt not found: " + promptRequest.name()));
741741
}
742742

743-
return specification.promptHandler().apply(exchange, promptRequest);
743+
return Mono.defer(() -> specification.promptHandler().apply(exchange, promptRequest));
744744
};
745745
}
746746

@@ -845,7 +845,7 @@ private McpRequestHandler<McpSchema.CompleteResult> completionCompleteRequestHan
845845
return Mono.error(new McpError("AsyncCompletionSpecification not found: " + request.ref()));
846846
}
847847

848-
return specification.completionHandler().apply(exchange, request);
848+
return Mono.defer(() -> specification.completionHandler().apply(exchange, request));
849849
};
850850
}
851851

mcp/src/main/java/io/modelcontextprotocol/spec/McpStreamableServerSession.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -179,10 +179,6 @@ public Mono<Void> responseStream(McpSchema.JSONRPCRequest jsonrpcRequest, McpStr
179179
})
180180
.flatMap(transport::sendMessage)
181181
.then(transport.closeGracefully());
182-
}).onErrorResume(error -> {
183-
return transport.sendMessage(new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, jsonrpcRequest.id(),
184-
null, new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INTERNAL_ERROR,
185-
error.getMessage(), null)));
186182
});
187183
}
188184

0 commit comments

Comments
 (0)