Skip to content

Commit ea6162c

Browse files
authored
mcp: incorporate EventStore into StreamableServerTransport (#156)
Use the EventStore interface to implement resumption on the server side.
1 parent 4529904 commit ea6162c

File tree

4 files changed

+128
-133
lines changed

4 files changed

+128
-133
lines changed

mcp/event.go

Lines changed: 19 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -153,10 +153,9 @@ func scanEvents(r io.Reader) iter.Seq2[Event, error] {
153153
//
154154
// All of an EventStore's methods must be safe for use by multiple goroutines.
155155
type EventStore interface {
156-
// AppendEvent appends data for an outgoing event to given stream, which is part of the
157-
// given session. It returns the index of the event in the stream, suitable for constructing
158-
// an event ID to send to the client.
159-
AppendEvent(_ context.Context, sessionID string, _ StreamID, data []byte) (int, error)
156+
// Append appends data for an outgoing event to given stream, which is part of the
157+
// given session.
158+
Append(_ context.Context, sessionID string, _ StreamID, data []byte) error
160159

161160
// After returns an iterator over the data for the given session and stream, beginning
162161
// just after the given index.
@@ -165,16 +164,15 @@ type EventStore interface {
165164
// dropped; it must not return partial results.
166165
After(_ context.Context, sessionID string, _ StreamID, index int) iter.Seq2[[]byte, error]
167166

168-
// StreamClosed informs the store that the given stream is finished.
169-
// A store cannot rely on this method being called for cleanup. It should institute
170-
// additional mechanisms, such as timeouts, to reclaim storage.
171-
StreamClosed(_ context.Context, sessionID string, streamID StreamID) error
172-
173167
// SessionClosed informs the store that the given session is finished, along
174168
// with all of its streams.
175169
// A store cannot rely on this method being called for cleanup. It should institute
176170
// additional mechanisms, such as timeouts, to reclaim storage.
171+
//
177172
SessionClosed(_ context.Context, sessionID string) error
173+
174+
// There is no StreamClosed method. A server doesn't know when a stream is finished, because
175+
// the client can always send a GET with a Last-Event-ID referring to the stream.
178176
}
179177

180178
// A dataList is a list of []byte.
@@ -210,15 +208,6 @@ func (dl *dataList) removeFirst() int {
210208
return r
211209
}
212210

213-
// lastIndex returns the index of the last data item in dl.
214-
// It panics if there are none.
215-
func (dl *dataList) lastIndex() int {
216-
if len(dl.data) == 0 {
217-
panic("empty dataList")
218-
}
219-
return dl.first + len(dl.data) - 1
220-
}
221-
222211
// A MemoryEventStore is an [EventStore] backed by memory.
223212
type MemoryEventStore struct {
224213
mu sync.Mutex
@@ -267,9 +256,8 @@ func NewMemoryEventStore(opts *MemoryEventStoreOptions) *MemoryEventStore {
267256
}
268257
}
269258

270-
// AppendEvent implements [EventStore.AppendEvent] by recording data
271-
// in memory.
272-
func (s *MemoryEventStore) AppendEvent(_ context.Context, sessionID string, streamID StreamID, data []byte) (int, error) {
259+
// Append implements [EventStore.Append] by recording data in memory.
260+
func (s *MemoryEventStore) Append(_ context.Context, sessionID string, streamID StreamID, data []byte) error {
273261
s.mu.Lock()
274262
defer s.mu.Unlock()
275263

@@ -288,9 +276,13 @@ func (s *MemoryEventStore) AppendEvent(_ context.Context, sessionID string, stre
288276
s.purge()
289277
dl.appendData(data)
290278
s.nBytes += len(data)
291-
return dl.lastIndex(), nil
279+
return nil
292280
}
293281

282+
// ErrEventsPurged is the error that [EventStore.After] should return if the event just after the
283+
// index is no longer available.
284+
var ErrEventsPurged = errors.New("data purged")
285+
294286
// After implements [EventStore.After].
295287
func (s *MemoryEventStore) After(_ context.Context, sessionID string, streamID StreamID, index int) iter.Seq2[[]byte, error] {
296288
// Return the data items to yield.
@@ -306,10 +298,12 @@ func (s *MemoryEventStore) After(_ context.Context, sessionID string, streamID S
306298
if !ok {
307299
return nil, fmt.Errorf("MemoryEventStore.After: unknown stream ID %v in session %q", streamID, sessionID)
308300
}
309-
if dl.first > index {
310-
return nil, fmt.Errorf("MemoryEventStore.After: data purged at index %d, stream ID %v, session %q", index, streamID, sessionID)
301+
start := index + 1
302+
if dl.first > start {
303+
return nil, fmt.Errorf("MemoryEventStore.After: index %d, stream ID %v, session %q: %w",
304+
index, streamID, sessionID, ErrEventsPurged)
311305
}
312-
return slices.Clone(dl.data[index-dl.first:]), nil
306+
return slices.Clone(dl.data[start-dl.first:]), nil
313307
}
314308

315309
return func(yield func([]byte, error) bool) {
@@ -326,26 +320,6 @@ func (s *MemoryEventStore) After(_ context.Context, sessionID string, streamID S
326320
}
327321
}
328322

329-
// StreamClosed implements [EventStore.StreamClosed].
330-
func (s *MemoryEventStore) StreamClosed(_ context.Context, sessionID string, streamID StreamID) error {
331-
if sessionID == "" {
332-
panic("empty sessionID")
333-
}
334-
335-
s.mu.Lock()
336-
defer s.mu.Unlock()
337-
338-
sm := s.store[sessionID]
339-
dl := sm[streamID]
340-
s.nBytes -= dl.size
341-
delete(sm, streamID)
342-
if len(sm) == 0 {
343-
delete(s.store, sessionID)
344-
}
345-
s.validate()
346-
return nil
347-
}
348-
349323
// SessionClosed implements [EventStore.SessionClosed].
350324
func (s *MemoryEventStore) SessionClosed(_ context.Context, sessionID string) error {
351325
s.mu.Lock()

mcp/event_test.go

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func TestMemoryEventStoreState(t *testing.T) {
105105
ctx := context.Background()
106106

107107
appendEvent := func(s *MemoryEventStore, sess string, str StreamID, data string) {
108-
if _, err := s.AppendEvent(ctx, sess, str, []byte(data)); err != nil {
108+
if err := s.Append(ctx, sess, str, []byte(data)); err != nil {
109109
t.Fatal(err)
110110
}
111111
}
@@ -127,18 +127,6 @@ func TestMemoryEventStoreState(t *testing.T) {
127127
"S1 1 first=0 d1 d3; S1 2 first=0 d2; S2 8 first=0 d4",
128128
8,
129129
},
130-
{
131-
"stream close",
132-
func(s *MemoryEventStore) {
133-
appendEvent(s, "S1", 1, "d1")
134-
appendEvent(s, "S1", 2, "d2")
135-
appendEvent(s, "S1", 1, "d3")
136-
appendEvent(s, "S2", 8, "d4")
137-
s.StreamClosed(ctx, "S1", 1)
138-
},
139-
"S1 2 first=0 d2; S2 8 first=0 d4",
140-
4,
141-
},
142130
{
143131
"session close",
144132
func(s *MemoryEventStore) {
@@ -218,10 +206,10 @@ func TestMemoryEventStoreAfter(t *testing.T) {
218206
ctx := context.Background()
219207
s := NewMemoryEventStore(nil)
220208
s.SetMaxBytes(4)
221-
s.AppendEvent(ctx, "S1", 1, []byte("d1"))
222-
s.AppendEvent(ctx, "S1", 1, []byte("d2"))
223-
s.AppendEvent(ctx, "S1", 1, []byte("d3"))
224-
s.AppendEvent(ctx, "S1", 2, []byte("d4")) // will purge d1
209+
s.Append(ctx, "S1", 1, []byte("d1"))
210+
s.Append(ctx, "S1", 1, []byte("d2"))
211+
s.Append(ctx, "S1", 1, []byte("d3"))
212+
s.Append(ctx, "S1", 2, []byte("d4")) // will purge d1
225213
want := "S1 1 first=1 d2 d3; S1 2 first=0 d4"
226214
if got := s.debugString(); got != want {
227215
t.Fatalf("got state %q, want %q", got, want)
@@ -234,10 +222,10 @@ func TestMemoryEventStoreAfter(t *testing.T) {
234222
want []string
235223
wantErr string // if non-empty, error should contain this string
236224
}{
237-
{"S1", 1, 0, nil, "purge"},
238-
{"S1", 1, 1, []string{"d2", "d3"}, ""},
239-
{"S1", 1, 2, []string{"d3"}, ""},
240-
{"S1", 2, 0, []string{"d4"}, ""},
225+
{"S1", 1, 0, []string{"d2", "d3"}, ""},
226+
{"S1", 1, 1, []string{"d3"}, ""},
227+
{"S1", 1, 2, nil, ""},
228+
{"S1", 2, 0, nil, ""},
241229
{"S1", 3, 0, nil, "unknown stream ID"},
242230
{"S2", 0, 0, nil, "unknown session ID"},
243231
} {

mcp/server.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,11 @@ func NewServer(impl *Implementation, opts *ServerOptions) *Server {
8585
if opts.PageSize < 0 {
8686
panic(fmt.Errorf("invalid page size %d", opts.PageSize))
8787
}
88+
// TODO(jba): don't modify opts, modify Server.opts.
8889
if opts.PageSize == 0 {
8990
opts.PageSize = DefaultPageSize
9091
}
92+
9193
return &Server{
9294
impl: impl,
9395
opts: *opts,

0 commit comments

Comments
 (0)