Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions mcp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"iter"
"slices"
"sync"
"sync/atomic"
"time"

"github.com/google/jsonschema-go/jsonschema"
Expand Down Expand Up @@ -177,7 +178,11 @@ func (c *Client) Connect(ctx context.Context, t Transport, _ *ClientSessionOptio
// Call [ClientSession.Close] to close the connection, or await server
// termination with [ClientSession.Wait].
type ClientSession struct {
onClose func()
// Ensure that onClose is called at most once.
// We defensively use an atomic CompareAndSwap rather than a sync.Once, in case the
// onClose callback triggers a re-entrant call to Close.
calledOnClose atomic.Bool
onClose func()

conn *jsonrpc2.Connection
client *Client
Expand Down Expand Up @@ -205,6 +210,8 @@ func (cs *ClientSession) ID() string {
// Close performs a graceful close of the connection, preventing new requests
// from being handled, and waiting for ongoing requests to return. Close then
// terminates the connection.
//
// Close is idempotent and concurrency safe.
func (cs *ClientSession) Close() error {
// Note: keepaliveCancel access is safe without a mutex because:
// 1. keepaliveCancel is only written once during startKeepalive (happens-before all Close calls)
Expand All @@ -216,7 +223,7 @@ func (cs *ClientSession) Close() error {
}
err := cs.conn.Close()

if cs.onClose != nil {
if cs.onClose != nil && cs.calledOnClose.CompareAndSwap(false, true) {
cs.onClose()
}

Expand Down
13 changes: 10 additions & 3 deletions mcp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"reflect"
"slices"
"sync"
"sync/atomic"
"time"

"github.com/google/jsonschema-go/jsonschema"
Expand Down Expand Up @@ -825,7 +826,7 @@ func (s *Server) disconnect(cc *ServerSession) {
type ServerSessionOptions struct {
State *ServerSessionState

onClose func()
onClose func() // used to clean up associated resources
}

// Connect connects the MCP server over the given transport and starts handling
Expand Down Expand Up @@ -920,7 +921,11 @@ func newServerRequest[P Params](ss *ServerSession, params P) *ServerRequest[P] {
// Call [ServerSession.Close] to close the connection, or await client
// termination with [ServerSession.Wait].
type ServerSession struct {
onClose func()
// Ensure that onClose is called at most once.
// We defensively use an atomic CompareAndSwap rather than a sync.Once, in case the
// onClose callback triggers a re-entrant call to Close.
calledOnClose atomic.Bool
onClose func()

server *Server
conn *jsonrpc2.Connection
Expand Down Expand Up @@ -1185,6 +1190,8 @@ func (ss *ServerSession) setLevel(_ context.Context, params *SetLoggingLevelPara
// Close performs a graceful shutdown of the connection, preventing new
// requests from being handled, and waiting for ongoing requests to return.
// Close then terminates the connection.
//
// Close is idempotent and concurrency safe.
func (ss *ServerSession) Close() error {
if ss.keepaliveCancel != nil {
// Note: keepaliveCancel access is safe without a mutex because:
Expand All @@ -1196,7 +1203,7 @@ func (ss *ServerSession) Close() error {
}
err := ss.conn.Close()

if ss.onClose != nil {
if ss.onClose != nil && ss.calledOnClose.CompareAndSwap(false, true) {
ss.onClose()
}

Expand Down
Loading