Skip to content

Commit 3ac4ca9

Browse files
authored
mcp: streamable.go: clarifications (#189)
Clarify or remove TODOs. Make some code clearer.
1 parent 64b5b91 commit 3ac4ca9

File tree

1 file changed

+34
-24
lines changed

1 file changed

+34
-24
lines changed

mcp/streamable.go

Lines changed: 34 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,8 @@ type StreamableHTTPHandler struct {
4242
// StreamableHTTPOptions is a placeholder options struct for future
4343
// configuration of the StreamableHTTP handler.
4444
type StreamableHTTPOptions struct {
45-
// TODO(rfindley): support configurable session ID generation and event
46-
// store, session retention, and event retention.
45+
// TODO: support configurable session ID generation (?)
46+
// TODO: support session retention (?)
4747
}
4848

4949
// NewStreamableHTTPHandler returns a new [StreamableHTTPHandler].
@@ -61,7 +61,7 @@ func NewStreamableHTTPHandler(getServer func(*http.Request) *Server, opts *Strea
6161
// closeAll closes all ongoing sessions.
6262
//
6363
// TODO(rfindley): investigate the best API for callers to configure their
64-
// session lifecycle.
64+
// session lifecycle. (?)
6565
//
6666
// Should we allow passing in a session store? That would allow the handler to
6767
// be stateless.
@@ -118,7 +118,7 @@ func (h *StreamableHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Reque
118118
return
119119
}
120120
h.sessionsMu.Lock()
121-
delete(h.sessions, session.id)
121+
delete(h.sessions, session.sessionID)
122122
h.sessionsMu.Unlock()
123123
session.Close()
124124
w.WriteHeader(http.StatusNoContent)
@@ -149,7 +149,7 @@ func (h *StreamableHTTPHandler) ServeHTTP(w http.ResponseWriter, req *http.Reque
149149
return
150150
}
151151
h.sessionsMu.Lock()
152-
h.sessions[s.id] = s
152+
h.sessions[s.sessionID] = s
153153
h.sessionsMu.Unlock()
154154
session = s
155155
}
@@ -176,7 +176,7 @@ func NewStreamableServerTransport(sessionID string, opts *StreamableServerTransp
176176
opts = &StreamableServerTransportOptions{}
177177
}
178178
t := &StreamableServerTransport{
179-
id: sessionID,
179+
sessionID: sessionID,
180180
incoming: make(chan jsonrpc.Message, 10),
181181
done: make(chan struct{}),
182182
streams: make(map[StreamID]*stream),
@@ -193,18 +193,18 @@ func NewStreamableServerTransport(sessionID string, opts *StreamableServerTransp
193193
}
194194

195195
func (t *StreamableServerTransport) SessionID() string {
196-
return t.id
196+
return t.sessionID
197197
}
198198

199199
// A StreamableServerTransport implements the [Transport] interface for a
200200
// single session.
201201
type StreamableServerTransport struct {
202202
nextStreamID atomic.Int64 // incrementing next stream ID
203203

204-
id string
205-
opts StreamableServerTransportOptions
206-
incoming chan jsonrpc.Message // messages from the client to the server
207-
done chan struct{}
204+
sessionID string
205+
opts StreamableServerTransportOptions
206+
incoming chan jsonrpc.Message // messages from the client to the server
207+
done chan struct{}
208208

209209
mu sync.Mutex
210210
// Sessions are closed exactly once.
@@ -217,17 +217,20 @@ type StreamableServerTransport struct {
217217
// Therefore, we use a logical connection ID to key the connection state, and
218218
// perform the accounting described below when incoming HTTP requests are
219219
// handled.
220-
//
221-
// TODO(rfindley): simplify.
222220

223221
// streams holds the logical streams for this session, keyed by their ID.
222+
// TODO: streams are never deleted, so the memory for a connection grows without
223+
// bound. If we deleted a stream when the response is sent, we would lose the ability
224+
// to replay if there was a cut just before the response was transmitted.
225+
// Perhaps we could have a TTL for streams that starts just after the response.
224226
streams map[StreamID]*stream
225227

226228
// requestStreams maps incoming requests to their logical stream ID.
227229
//
228230
// Lifecycle: requestStreams persists for the duration of the session.
229231
//
230-
// TODO(rfindley): clean up once requests are handled.
232+
// TODO(rfindley): clean up once requests are handled. See the TODO for streams
233+
// above.
231234
requestStreams map[jsonrpc.ID]StreamID
232235
}
233236

@@ -288,7 +291,7 @@ type StreamID int64
288291

289292
// Connect implements the [Transport] interface.
290293
//
291-
// TODO(rfindley): Connect should return a new object.
294+
// TODO(rfindley): Connect should return a new object. (Why?)
292295
func (s *StreamableServerTransport) Connect(context.Context) (Connection, error) {
293296
return s, nil
294297
}
@@ -411,6 +414,8 @@ func (t *StreamableServerTransport) servePOST(w http.ResponseWriter, req *http.R
411414
// TODO(rfindley): consider optimizing for a single incoming request, by
412415
// responding with application/json when there is only a single message in
413416
// the response.
417+
// (But how would we know there is only a single message? For example, couldn't
418+
// a progress notification be sent before a response on the same context?)
414419
return t.streamResponse(stream, w, req, -1)
415420
}
416421

@@ -437,7 +442,7 @@ func (t *StreamableServerTransport) streamResponse(stream *stream, w http.Respon
437442
return true
438443
}
439444

440-
w.Header().Set(sessionIDHeader, t.id)
445+
w.Header().Set(sessionIDHeader, t.sessionID)
441446
w.Header().Set("Content-Type", "text/event-stream") // Accept checked in [StreamableHTTPHandler]
442447
w.Header().Set("Cache-Control", "no-cache, no-transform")
443448
w.Header().Set("Connection", "keep-alive")
@@ -486,7 +491,9 @@ stream:
486491
// If all requests have been handled and replied to, we should terminate this connection.
487492
// "After the JSON-RPC response has been sent, the server SHOULD close the SSE stream."
488493
// §6.4, https://modelcontextprotocol.io/specification/2025-06-18/basic/transports#sending-messages-to-the-server
489-
// TODO(jba,findleyr): why not terminate regardless of http method?
494+
// We only want to terminate POSTs, and GETs that are replaying. The general-purpose GET
495+
// (stream ID 0) will never have requests, and should remain open indefinitely.
496+
// TODO: implement the GET case.
490497
if req.Method == http.MethodPost && nOutstanding == 0 {
491498
if writes == 0 {
492499
// Spec: If the server accepts the input, the server MUST return HTTP
@@ -563,11 +570,12 @@ func (t *StreamableServerTransport) Read(ctx context.Context) (jsonrpc.Message,
563570
// Write implements the [Connection] interface.
564571
func (t *StreamableServerTransport) Write(ctx context.Context, msg jsonrpc.Message) error {
565572
// Find the incoming request that this write relates to, if any.
566-
var forRequest, replyTo jsonrpc.ID
573+
var forRequest jsonrpc.ID
574+
isResponse := false
567575
if resp, ok := msg.(*jsonrpc.Response); ok {
568576
// If the message is a response, it relates to its request (of course).
569577
forRequest = resp.ID
570-
replyTo = resp.ID
578+
isResponse = true
571579
} else {
572580
// Otherwise, we check to see if it request was made in the context of an
573581
// ongoing request. This may not be the case if the request way made with
@@ -611,10 +619,12 @@ func (t *StreamableServerTransport) Write(ctx context.Context, msg jsonrpc.Messa
611619
stream = t.streams[0]
612620
}
613621

622+
// TODO: if there is nothing to send these messages to (as would happen, for example, if forConn == 0
623+
// and the client never did a GET), then memory will grow without bound. Consider a mitigation.
614624
stream.outgoing = append(stream.outgoing, data)
615-
if replyTo.IsValid() {
625+
if isResponse {
616626
// Once we've put the reply on the queue, it's no longer outstanding.
617-
delete(stream.requests, replyTo)
627+
delete(stream.requests, forRequest)
618628
}
619629

620630
// Signal streamResponse that new work is available.
@@ -635,16 +645,16 @@ func (t *StreamableServerTransport) Close() error {
635645
if !t.isDone {
636646
t.isDone = true
637647
close(t.done)
638-
return t.opts.EventStore.SessionClosed(context.TODO(), t.id)
648+
// TODO: find a way to plumb a context here, or an event store with a long-running
649+
// close operation can take arbitrary time. Alternative: impose a fixed timeout here.
650+
return t.opts.EventStore.SessionClosed(context.TODO(), t.sessionID)
639651
}
640652
return nil
641653
}
642654

643655
// A StreamableClientTransport is a [Transport] that can communicate with an MCP
644656
// endpoint serving the streamable HTTP transport defined by the 2025-03-26
645657
// version of the spec.
646-
//
647-
// TODO(rfindley): support retries and resumption tokens.
648658
type StreamableClientTransport struct {
649659
url string
650660
opts StreamableClientTransportOptions

0 commit comments

Comments
 (0)