Skip to content

Commit 361c6cc

Browse files
author
zoey
committed
STDIO transport (#1)
The MCP SDK requires a reliable STDIO (Standard Input/Output) transport implementation for communicating with external processes. This transport needs to handle message passing, error conditions, environment variables, and proper process lifecycle management in a way that's compatible with the Model Context Protocol's JSON-RPC communication pattern. This PR implements `Hermes.Transport.STDIO`, a GenServer-based transport module that: 1. Spawns external processes using Erlang ports 2. Manages bidirectional message passing between client and port 3. Handles port lifecycle events (data, exit, close) 4. Forwards received data to client processes 5. Provides configurable environment variable support The implementation uses Erlang's built-in port system with proper error handling and process monitoring to ensure reliable operation. The GenServer-based design was chosen for several key reasons: 1. **Process Isolation**: Each transport runs in its own process, preventing failures from affecting other connections 2. **State Management**: GenServer handles the complex state transitions of port lifecycle cleanly 3. **Message Passing**: The GenServer model fits naturally with the asynchronous nature of port communication 4. **Error Handling**: The supervision model allows proper recovery from port failures 5. **Security**: The implementation carefully manages environment variable passing, with controlled inheritance of safe variables The transport follows the "delayed reply" pattern for synchronous-feeling APIs similar to the main client, maintaining a clean interface while handling the asynchronous nature of port communication under the hood. Tests use real commands rather than attempting to mock unmockable Erlang modules like Port and System, focusing on verifying functionality over implementation details.
1 parent 6b97987 commit 361c6cc

File tree

16 files changed

+1064
-18
lines changed

16 files changed

+1064
-18
lines changed

.dialyzerignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
lib/hermes/transport/stdio.ex:81:guard_fail

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,7 @@ hermes_mcp-*.tar
2424

2525
# Dialyzer
2626
/priv/plts/
27+
28+
# Python specific
29+
/priv/dev/**/__pycache__/
30+
/priv/dev/**/.venv/

CHANGELOG.md

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,35 @@
22

33
All notable changes to this project are documented in this file.
44

5-
## [0.1.0] - 2023-09-18
5+
## [0.1.0](https://github.com/cloudwalk/hermes-mcp) - 2025-02-26
66

77
### Added
8-
- Initial release with core features
8+
- Implemented STDIO transport (#1) for MCP communication
9+
- Support for bidirectional communication via standard I/O
10+
- Automatic process monitoring and recovery
11+
- Environment variable handling for cross-platform support
12+
- Integration test utilities in Mix tasks
13+
14+
- Created stateful client interface (#6)
15+
- Robust GenServer implementation for MCP client
16+
- Automatic initialization and protocol handshake
17+
- Synchronous-feeling API over asynchronous transport
18+
- Support for all MCP operations (ping, resources, prompts, tools)
19+
- Proper error handling and logging
20+
- Capability negotiation and management
21+
22+
- Developed JSON-RPC message parsing (#5)
23+
- Schema-based validation of MCP messages
24+
- Support for requests, responses, notifications, and errors
25+
- Comprehensive test suite for message handling
26+
- Encoding/decoding functions with proper validation
27+
28+
- Established core architecture and client API
29+
- MCP protocol implementation following specification
30+
- Client struct for maintaining connection state
31+
- Request/response correlation with unique IDs
32+
- Initial transport abstraction layer
33+
34+
### Documentation
35+
- Added detailed RFC document describing the library architecture
36+
- Enhanced README with project overview and installation instructions

flake.nix

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
in {
2828
default = mkShell {
2929
name = "hermes-ex";
30-
packages = [elixir_1_18];
30+
packages = with pkgs; [elixir_1_18 uv just];
3131
};
3232
});
3333
};

justfile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
start-echo-server:
2+
source priv/dev/echo/.venv/bin/activate && \
3+
mcp run priv/dev/echo/index.py

lib/hermes/client.ex

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ defmodule Hermes.Client do
4747

4848
defschema :parse_options, [
4949
{:name, {:atom, {:default, __MODULE__}}},
50-
{:transport, {:required, :atom}},
50+
{:transport, {:required, {:either, {:pid, :atom}}}},
5151
{:client_info, {:required, :map}},
5252
{:capabilities, {:map, {:default, %{"resources" => %{}, "tools" => %{}}}}},
5353
{:protocol_version, {:string, {:default, @default_protocol_version}}},
@@ -283,14 +283,14 @@ defmodule Hermes.Client do
283283
end
284284

285285
@impl true
286-
def handle_info({:response, response_data}, %{pending_requests: pending_requests} = state) do
286+
def handle_info({:response, response_data}, state) do
287287
case Message.decode(response_data) do
288288
{:ok, [error]} when Message.is_error(error) ->
289289
Logger.error("Received error response: #{inspect(error)}")
290-
pending = Map.get(pending_requests, error["id"])
291-
{:noreply, handle_error(error, pending, state)}
290+
{:noreply, handle_error(error, error["id"], state)}
292291

293292
{:ok, [response]} when Message.is_response(response) ->
293+
Logger.info("Received response: #{response["id"]}")
294294
{:noreply, handle_response(response, response["id"], state)}
295295

296296
{:ok, [notification]} when Message.is_notification(notification) ->
@@ -301,22 +301,27 @@ defmodule Hermes.Client do
301301
Logger.error("Failed to decode response: #{inspect(reason)}")
302302
{:noreply, state}
303303
end
304+
rescue
305+
e ->
306+
err = Exception.format(:error, e, __STACKTRACE__)
307+
Logger.error("Failed to handle response: #{err}")
308+
{:noreply, state}
304309
end
305310

306311
# Response handling
307312

308-
defp handle_error(response, nil, state) do
309-
Logger.warning("Received error response for unknown request ID: #{response["id"]}")
310-
state
311-
end
312-
313-
defp handle_error(%{"error" => error} = response, {from, _}, state) do
314-
%{pending_requests: pending} = state
313+
defp handle_error(%{"error" => error, "id" => id}, id, state) do
314+
{{from, _method}, pending} = Map.pop(state.pending_requests, id)
315315

316316
# unblocks original caller
317317
GenServer.reply(from, {:error, error})
318318

319-
%{state | pending_requests: Map.delete(pending, response["id"])}
319+
%{state | pending_requests: pending}
320+
end
321+
322+
defp handle_error(response, _, state) do
323+
Logger.warning("Received error response for unknown request ID: #{response["id"]}")
324+
state
320325
end
321326

322327
defp handle_response(%{"id" => id, "result" => %{"serverInfo" => _} = result}, id, state) do
@@ -339,7 +344,11 @@ defmodule Hermes.Client do
339344
{{from, method}, pending} = Map.pop(state.pending_requests, id)
340345

341346
# unblocks original caller
342-
GenServer.reply(from, if(method == "ping", do: :pong, else: {:ok, result}))
347+
cond do
348+
method == "ping" -> GenServer.reply(from, :pong)
349+
result["isError"] -> GenServer.reply(from, {:error, result})
350+
true -> GenServer.reply(from, {:ok, result})
351+
end
343352

344353
%{state | pending_requests: pending}
345354
end

lib/hermes/transport/stdio.ex

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
defmodule Hermes.Transport.STDIO do
2+
@moduledoc """
3+
A transport implementation that uses standard input/output.
4+
"""
5+
6+
use GenServer
7+
8+
import Peri
9+
10+
alias Hermes.Transport.Behaviour, as: Transport
11+
12+
require Logger
13+
14+
@behaviour Transport
15+
16+
@type params_t :: Enumerable.t(option)
17+
@type option ::
18+
{:command, Path.t()}
19+
| {:args, list(String.t()) | nil}
20+
| {:env, map() | nil}
21+
| {:cwd, Path.t() | nil}
22+
| Supervisor.init_option()
23+
24+
defschema :options_schema, %{
25+
name: {:atom, {:default, __MODULE__}},
26+
client: {:required, {:either, {:pid, :atom}}},
27+
command: {:required, :string},
28+
args: {{:list, :string}, {:default, nil}},
29+
env: {:map, {:default, nil}},
30+
cwd: {:string, {:default, nil}}
31+
}
32+
33+
@win32_default_env [
34+
"APPDATA",
35+
"HOMEDRIVE",
36+
"HOMEPATH",
37+
"LOCALAPPDATA",
38+
"PATH",
39+
"PROCESSOR_ARCHITECTURE",
40+
"SYSTEMDRIVE",
41+
"SYSTEMROOT",
42+
"TEMP",
43+
"USERNAME",
44+
"USERPROFILE"
45+
]
46+
@unix_default_env ["HOME", "LOGNAME", "PATH", "SHELL", "TERM", "USER"]
47+
48+
@impl Transport
49+
@spec start_link(params_t) :: Supervisor.on_start()
50+
def start_link(opts \\ []) do
51+
opts = options_schema!(opts)
52+
GenServer.start_link(__MODULE__, Map.new(opts), name: opts[:name])
53+
end
54+
55+
@impl Transport
56+
def send_message(pid \\ __MODULE__, message) when is_binary(message) do
57+
GenServer.call(pid, {:send, message})
58+
end
59+
60+
@impl GenServer
61+
def init(%{} = opts) do
62+
state = Map.merge(opts, %{port: nil, ref: nil})
63+
64+
{:ok, state, {:continue, :spawn}}
65+
end
66+
67+
@impl GenServer
68+
def handle_continue(:spawn, state) do
69+
if cmd = System.find_executable(state.command) do
70+
port = spawn_port(cmd, state)
71+
ref = Port.monitor(port)
72+
73+
{:noreply, %{state | port: port, ref: ref}}
74+
else
75+
{:stop, {:error, "Command not found: #{state.command}"}, state}
76+
end
77+
end
78+
79+
@impl GenServer
80+
def handle_call({:send, message}, _, %{port: port} = state) when is_port(port) do
81+
result = if Port.command(port, message), do: :ok, else: {:error, :port_not_connected}
82+
{:reply, result, state}
83+
end
84+
85+
def handle_call({:send, _message}, _, state) do
86+
{:reply, {:error, :port_not_connected}, state}
87+
end
88+
89+
@impl GenServer
90+
def handle_info({port, {:data, data}}, %{port: port} = state) do
91+
Logger.info("Received data: #{inspect(data)}")
92+
Process.send(state.client, {:response, data}, [:noconnect])
93+
{:noreply, state}
94+
end
95+
96+
def handle_info({port, :closed}, %{port: port} = state) do
97+
Logger.warning("Port closed, restarting")
98+
{:stop, :normal, state}
99+
end
100+
101+
def handle_info({port, {:exit_status, status}}, %{port: port} = state) do
102+
Logger.warning("Port exited with status: #{status}")
103+
{:stop, status, state}
104+
end
105+
106+
def handle_info({:DOWN, ref, :port, port, reason}, %{ref: ref, port: port} = state) do
107+
Logger.error("Port monitor DOWN: #{inspect(reason)}")
108+
{:stop, reason, state}
109+
end
110+
111+
def handle_info({:EXIT, port, reason}, %{port: port} = state) do
112+
Logger.error("Port exited: #{inspect(reason)}")
113+
{:stop, reason, state}
114+
end
115+
116+
@impl GenServer
117+
def handle_cast(:close_port, %{port: port} = state) do
118+
Port.close(port)
119+
{:stop, :normal, state}
120+
end
121+
122+
defp spawn_port(cmd, state) do
123+
default_env = get_default_env()
124+
env = if is_nil(state.env), do: default_env, else: Map.merge(default_env, state.env)
125+
env = normalize_env_for_erlang(env)
126+
127+
opts =
128+
[:binary]
129+
|> then(&if is_nil(state.args), do: &1, else: Enum.concat(&1, args: state.args))
130+
|> then(&if is_nil(state.env), do: &1, else: Enum.concat(&1, env: env))
131+
|> then(&if is_nil(state.cwd), do: &1, else: Enum.concat(&1, cd: state.cwd))
132+
133+
Port.open({:spawn_executable, cmd}, opts)
134+
end
135+
136+
defp get_default_env do
137+
default_env = if :os.type() == {:win32, :nt}, do: @win32_default_env, else: @unix_default_env
138+
139+
System.get_env()
140+
|> Enum.filter(fn {k, _} -> Enum.member?(default_env, k) end)
141+
# remove functions, for security risks
142+
|> Enum.reject(fn {_, v} -> String.starts_with?(v, "()") end)
143+
|> Enum.into(%{})
144+
end
145+
146+
defp normalize_env_for_erlang(%{} = env) do
147+
env
148+
|> Map.new(fn {k, v} -> {to_charlist(k), to_charlist(v)} end)
149+
|> Enum.into([])
150+
end
151+
end

0 commit comments

Comments
 (0)