-
Notifications
You must be signed in to change notification settings - Fork 478
Description
When a stdio MCP client sends multiple JSON-RPC messages then closes stdin, only the initialize response is flushed. Subsequent responses (e.g., tools/call) are processed by the handler but never reach stdout. This breaks non-interactive MCP clients that batch requests.
Root Cause
In service.rs, serve_inner's main event loop breaks immediately on stdin EOF:
m = transport.receive() => {
if let Some(m) = m {
Event::PeerMessage(m)
} else {
tracing::info!("input stream terminated");
break QuitReason::Closed // <-- exits immediately
}
}

Request handlers are spawned via `tokio::spawn` and send responses back through `sink_proxy_tx`. When stdin closes, the loop breaks before those spawned tasks complete. The `sink_proxy_rx` reader and the transport are then dropped, so the responses are lost.
The sequence:
- Client sends `initialize`, `notifications/initialized`, `tools/call`, then closes stdin
- `initialize` is handled synchronously during the init phase — its response is flushed
- `tools/call` arrives in `serve_inner`, which spawns a handler task
- `transport.receive()` returns `None` (stdin EOF) — the loop breaks
- The handler task completes and sends to `sink_proxy_tx`, but nobody is reading `sink_proxy_rx` anymore
Suggested Fix
After the loop breaks with QuitReason::Closed, drain pending responses from sink_proxy_rx and write them to the transport before calling transport.close(). Something like:
// After the main loop breaks:
// Drain any pending responses from in-flight handler tasks
while let Ok(msg) = sink_proxy_rx.try_recv() {
let _ = transport.send(msg).await;
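}

Or, more robustly, wait with a timeout for spawned handler tasks to complete before closing, since `try_recv` only returns responses that are already queued at the moment it is called.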
Minimal Reproduction
Cargo.toml:
[package]
name = "rmcp-stdin-eof-repro"
version = "0.1.0"
edition = "2021"
[dependencies]
rmcp = { version = "1.2", features = ["server", "transport-io"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros", "process", "io-util", "time"] }
serde_json = "1"

src/main.rs:
use rmcp::handler::server::ServerHandler;
use rmcp::model::*;
use rmcp::service::{RequestContext, RoleServer};
use rmcp::ServiceExt;
struct EchoServer;
impl ServerHandler for EchoServer {
async fn initialize(
&self,
_request: InitializeRequestParams,
_context: RequestContext<RoleServer>,
) -> Result<InitializeResult, ErrorData> {
Ok(InitializeResult::new(
ServerCapabilities::builder().enable_tools().build(),
))
}
async fn list_tools(
&self,
_request: Option<PaginatedRequestParams>,
_context: RequestContext<RoleServer>,
) -> Result<ListToolsResult, ErrorData> {
Ok(ListToolsResult { ..Default::default() })
}
async fn call_tool(
&self,
request: CallToolRequestParams,
_context: RequestContext<RoleServer>,
) -> Result<CallToolResult, ErrorData> {
let msg = request
.arguments
.as_ref()
.and_then(|a| a.get("msg"))
.and_then(|v| v.as_str())
.unwrap_or("(empty)");
Ok(CallToolResult::success(vec![Content::text(msg)]))
}
}
#[tokio::main]
async fn main() {
let transport = rmcp::transport::io::stdio();
let service = EchoServer.serve(transport).await.expect("serve failed");
service.waiting().await.expect("waiting failed");
}

Test (add `tokio = { ..., features = ["process"] }` to dev-dependencies):
#[cfg(test)]
mod tests {
use std::process::Stdio;
use tokio::io::AsyncWriteExt;
use tokio::process::Command;
/// Spawns the repro binary, sends `initialize`, `notifications/initialized`
/// and `tools/call` over stdin, closes stdin, and expects at least two
/// response lines on stdout.
#[tokio::test]
async fn tool_response_survives_stdin_close() {
    // Build the repro binary and fail fast if the build breaks. Without this
    // check a failed build would let the test run against a stale or missing
    // binary and fail with a misleading error further down.
    let build_status = Command::new("cargo")
        .args(["build", "--quiet"])
        .current_dir(env!("CARGO_MANIFEST_DIR"))
        .status()
        .await
        .expect("failed to run `cargo build`");
    assert!(build_status.success(), "`cargo build` failed: {build_status}");

    let mut child = Command::new("./target/debug/rmcp-stdin-eof-repro")
        .current_dir(env!("CARGO_MANIFEST_DIR"))
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::null())
        .spawn()
        .unwrap();
    let mut stdin = child.stdin.take().unwrap();

    // initialize
    stdin.write_all(br#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2024-11-05","capabilities":{},"clientInfo":{"name":"test","version":"0.1"}}}"#).await.unwrap();
    stdin.write_all(b"\n").await.unwrap();
    // notifications/initialized
    stdin.write_all(br#"{"jsonrpc":"2.0","method":"notifications/initialized"}"#).await.unwrap();
    stdin.write_all(b"\n").await.unwrap();
    // tools/call
    stdin.write_all(br#"{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"echo","arguments":{"msg":"hello"}}}"#).await.unwrap();
    stdin.write_all(b"\n").await.unwrap();
    stdin.flush().await.unwrap();

    // Let the server read all messages, then close stdin
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    drop(stdin);

    let output = child.wait_with_output().await.unwrap();
    let stdout_str = String::from_utf8_lossy(&output.stdout);
    let responses: Vec<&str> = stdout_str.lines().collect();
    // BUG: only 1 response (initialize). tools/call response is dropped.
    assert!(
        responses.len() >= 2,
        "Expected 2 responses, got {}. stdout:\n{}",
        responses.len(),
        stdout_str
    );
}
}

Observed Output
=== stdout (1 lines) ===
[0] {"jsonrpc":"2.0","id":1,"result":{"protocolVersion":"2024-11-05","capabilities":{"tools":{}},"serverInfo":{"name":"rmcp","version":"1.2.0"}}}
Only the initialize response appears. The tools/call response (id:2) is lost.
Expected Output
Both responses should appear on stdout before the server exits.
Version
- rmcp 1.2.0
- tokio 1.x
- macOS / Linux