Skip to content

Commit bcbb31f

Browse files
mcp: add resource subscriptions (#138)
This CL adds the ability for clients to subscribe and receive updates for resources as described in https://modelcontextprotocol.io/specification/2025-06-18/server/resources#subscriptions Fixes: #23
1 parent ea6162c commit bcbb31f

File tree

6 files changed

+214
-6
lines changed

6 files changed

+214
-6
lines changed

design/design.md

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -748,13 +748,26 @@ Server sessions also support the spec methods `ListResources` and `ListResourceT
748748
749749
#### Subscriptions
750750
751-
ClientSessions can manage change notifications on particular resources:
751+
##### Client-Side Usage
752+
753+
Use the Subscribe and Unsubscribe methods on a ClientSession to start or stop receiving updates for a specific resource.
752754
753755
```go
754756
func (*ClientSession) Subscribe(context.Context, *SubscribeParams) error
755757
func (*ClientSession) Unsubscribe(context.Context, *UnsubscribeParams) error
756758
```
757759
760+
To process incoming update notifications, you must provide a ResourceUpdatedHandler in your ClientOptions. The SDK calls this function automatically whenever the server sends a notification for a resource you're subscribed to.
761+
762+
```go
763+
type ClientOptions struct {
764+
...
765+
ResourceUpdatedHandler func(context.Context, *ClientSession, *ResourceUpdatedNotificationParams)
766+
}
767+
```
768+
769+
##### Server-Side Implementation
770+
758771
The server does not implement resource subscriptions. It passes along subscription requests to the user, and supplies a method to notify clients of changes. It tracks which sessions have subscribed to which resources so the user doesn't have to.
759772
760773
If a server author wants to support resource subscriptions, they must provide handlers to be called when clients subscribe and unsubscribe. It is an error to provide only one of these handlers.
@@ -772,7 +785,7 @@ type ServerOptions struct {
772785
User code should call `ResourceUpdated` when a subscribed resource changes.
773786
774787
```go
775-
func (*Server) ResourceUpdated(context.Context, *ResourceUpdatedNotification) error
788+
func (*Server) ResourceUpdated(context.Context, *ResourceUpdatedNotificationParams) error
776789
```
777790
778791
The server routes these notifications to the server sessions that subscribed to the resource.

mcp/client.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ type ClientOptions struct {
6060
ToolListChangedHandler func(context.Context, *ClientSession, *ToolListChangedParams)
6161
PromptListChangedHandler func(context.Context, *ClientSession, *PromptListChangedParams)
6262
ResourceListChangedHandler func(context.Context, *ClientSession, *ResourceListChangedParams)
63+
ResourceUpdatedHandler func(context.Context, *ClientSession, *ResourceUpdatedNotificationParams)
6364
LoggingMessageHandler func(context.Context, *ClientSession, *LoggingMessageParams)
6465
ProgressNotificationHandler func(context.Context, *ClientSession, *ProgressNotificationParams)
6566
// If non-zero, defines an interval for regular "ping" requests.
@@ -293,6 +294,7 @@ var clientMethodInfos = map[string]methodInfo{
293294
notificationToolListChanged: newMethodInfo(clientMethod((*Client).callToolChangedHandler)),
294295
notificationPromptListChanged: newMethodInfo(clientMethod((*Client).callPromptChangedHandler)),
295296
notificationResourceListChanged: newMethodInfo(clientMethod((*Client).callResourceChangedHandler)),
297+
notificationResourceUpdated: newMethodInfo(clientMethod((*Client).callResourceUpdatedHandler)),
296298
notificationLoggingMessage: newMethodInfo(clientMethod((*Client).callLoggingHandler)),
297299
notificationProgress: newMethodInfo(sessionMethod((*ClientSession).callProgressNotificationHandler)),
298300
}
@@ -386,6 +388,20 @@ func (cs *ClientSession) Complete(ctx context.Context, params *CompleteParams) (
386388
return handleSend[*CompleteResult](ctx, cs, methodComplete, orZero[Params](params))
387389
}
388390

391+
// Subscribe sends a "resources/subscribe" request to the server, asking for
392+
// notifications when the specified resource changes.
393+
func (cs *ClientSession) Subscribe(ctx context.Context, params *SubscribeParams) error {
394+
_, err := handleSend[*emptyResult](ctx, cs, methodSubscribe, orZero[Params](params))
395+
return err
396+
}
397+
398+
// Unsubscribe sends a "resources/unsubscribe" request to the server, cancelling
399+
// a previous subscription.
400+
func (cs *ClientSession) Unsubscribe(ctx context.Context, params *UnsubscribeParams) error {
401+
_, err := handleSend[*emptyResult](ctx, cs, methodUnsubscribe, orZero[Params](params))
402+
return err
403+
}
404+
389405
func (c *Client) callToolChangedHandler(ctx context.Context, s *ClientSession, params *ToolListChangedParams) (Result, error) {
390406
return callNotificationHandler(ctx, c.opts.ToolListChangedHandler, s, params)
391407
}
@@ -398,6 +414,10 @@ func (c *Client) callResourceChangedHandler(ctx context.Context, s *ClientSessio
398414
return callNotificationHandler(ctx, c.opts.ResourceListChangedHandler, s, params)
399415
}
400416

417+
func (c *Client) callResourceUpdatedHandler(ctx context.Context, s *ClientSession, params *ResourceUpdatedNotificationParams) (Result, error) {
418+
return callNotificationHandler(ctx, c.opts.ResourceUpdatedHandler, s, params)
419+
}
420+
401421
func (c *Client) callLoggingHandler(ctx context.Context, cs *ClientSession, params *LoggingMessageParams) (Result, error) {
402422
if h := c.opts.LoggingMessageHandler; h != nil {
403423
h(ctx, cs, params)

mcp/mcp_test.go

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func TestEndToEnd(t *testing.T) {
6060

6161
// Channels to check if notification callbacks happened.
6262
notificationChans := map[string]chan int{}
63-
for _, name := range []string{"initialized", "roots", "tools", "prompts", "resources", "progress_server", "progress_client"} {
63+
for _, name := range []string{"initialized", "roots", "tools", "prompts", "resources", "progress_server", "progress_client", "resource_updated", "subscribe", "unsubscribe"} {
6464
notificationChans[name] = make(chan int, 1)
6565
}
6666
waitForNotification := func(t *testing.T, name string) {
@@ -78,6 +78,14 @@ func TestEndToEnd(t *testing.T) {
7878
ProgressNotificationHandler: func(context.Context, *ServerSession, *ProgressNotificationParams) {
7979
notificationChans["progress_server"] <- 0
8080
},
81+
SubscribeHandler: func(context.Context, *SubscribeParams) error {
82+
notificationChans["subscribe"] <- 0
83+
return nil
84+
},
85+
UnsubscribeHandler: func(context.Context, *UnsubscribeParams) error {
86+
notificationChans["unsubscribe"] <- 0
87+
return nil
88+
},
8189
}
8290
s := NewServer(testImpl, sopts)
8391
AddTool(s, &Tool{
@@ -128,6 +136,9 @@ func TestEndToEnd(t *testing.T) {
128136
ProgressNotificationHandler: func(context.Context, *ClientSession, *ProgressNotificationParams) {
129137
notificationChans["progress_client"] <- 0
130138
},
139+
ResourceUpdatedHandler: func(context.Context, *ClientSession, *ResourceUpdatedNotificationParams) {
140+
notificationChans["resource_updated"] <- 0
141+
},
131142
}
132143
c := NewClient(testImpl, opts)
133144
rootAbs, err := filepath.Abs(filepath.FromSlash("testdata/files"))
@@ -421,6 +432,37 @@ func TestEndToEnd(t *testing.T) {
421432
waitForNotification(t, "progress_server")
422433
})
423434

435+
t.Run("resource_subscriptions", func(t *testing.T) {
436+
err := cs.Subscribe(ctx, &SubscribeParams{
437+
URI: "test",
438+
})
439+
if err != nil {
440+
t.Fatal(err)
441+
}
442+
waitForNotification(t, "subscribe")
443+
s.ResourceUpdated(ctx, &ResourceUpdatedNotificationParams{
444+
URI: "test",
445+
})
446+
waitForNotification(t, "resource_updated")
447+
err = cs.Unsubscribe(ctx, &UnsubscribeParams{
448+
URI: "test",
449+
})
450+
if err != nil {
451+
t.Fatal(err)
452+
}
453+
waitForNotification(t, "unsubscribe")
454+
455+
// Verify the client does not receive the update after unsubscribing.
456+
s.ResourceUpdated(ctx, &ResourceUpdatedNotificationParams{
457+
URI: "test",
458+
})
459+
select {
460+
case <-notificationChans["resource_updated"]:
461+
t.Fatalf("resource updated after unsubscription")
462+
case <-time.After(time.Second):
463+
}
464+
})
465+
424466
// Disconnect.
425467
cs.Close()
426468
clientWG.Wait()

mcp/protocol.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -859,6 +859,38 @@ type ToolListChangedParams struct {
859859
func (x *ToolListChangedParams) GetProgressToken() any { return getProgressToken(x) }
860860
func (x *ToolListChangedParams) SetProgressToken(t any) { setProgressToken(x, t) }
861861

862+
// Sent from the client to request resources/updated notifications from the
863+
// server whenever a particular resource changes.
864+
type SubscribeParams struct {
865+
// This property is reserved by the protocol to allow clients and servers to
866+
// attach additional metadata to their responses.
867+
Meta `json:"_meta,omitempty"`
868+
// The URI of the resource to subscribe to.
869+
URI string `json:"uri"`
870+
}
871+
872+
// Sent from the client to request cancellation of resources/updated
873+
// notifications from the server. This should follow a previous
874+
// resources/subscribe request.
875+
type UnsubscribeParams struct {
876+
// This property is reserved by the protocol to allow clients and servers to
877+
// attach additional metadata to their responses.
878+
Meta `json:"_meta,omitempty"`
879+
// The URI of the resource to unsubscribe from.
880+
URI string `json:"uri"`
881+
}
882+
883+
// A notification from the server to the client, informing it that a resource
884+
// has changed and may need to be read again. This should only be sent if the
885+
// client previously sent a resources/subscribe request.
886+
type ResourceUpdatedNotificationParams struct {
887+
// This property is reserved by the protocol to allow clients and servers to
888+
// attach additional metadata to their responses.
889+
Meta `json:"_meta,omitempty"`
890+
// The URI of the resource that has been updated. This might be a sub-resource of the one that the client actually subscribed to.
891+
URI string `json:"uri"`
892+
}
893+
862894
// TODO(jba): add CompleteRequest and related types.
863895

864896
// TODO(jba): add ElicitRequest and related types.

mcp/server.go

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"fmt"
1414
"iter"
1515
"log"
16+
"maps"
1617
"net/url"
1718
"path/filepath"
1819
"slices"
@@ -43,6 +44,7 @@ type Server struct {
4344
sessions []*ServerSession
4445
sendingMethodHandler_ MethodHandler[*ServerSession]
4546
receivingMethodHandler_ MethodHandler[*ServerSession]
47+
resourceSubscriptions map[string]map[*ServerSession]bool // uri -> session -> bool
4648
}
4749

4850
// ServerOptions is used to configure behavior of the server.
@@ -64,6 +66,10 @@ type ServerOptions struct {
6466
// If the peer fails to respond to pings originating from the keepalive check,
6567
// the session is automatically closed.
6668
KeepAlive time.Duration
69+
// Function called when a client session subscribes to a resource.
70+
SubscribeHandler func(context.Context, *SubscribeParams) error
71+
// Function called when a client session unsubscribes from a resource.
72+
UnsubscribeHandler func(context.Context, *UnsubscribeParams) error
6773
}
6874

6975
// NewServer creates a new MCP server. The resulting server has no features:
@@ -89,7 +95,12 @@ func NewServer(impl *Implementation, opts *ServerOptions) *Server {
8995
if opts.PageSize == 0 {
9096
opts.PageSize = DefaultPageSize
9197
}
92-
98+
if opts.SubscribeHandler != nil && opts.UnsubscribeHandler == nil {
99+
panic("SubscribeHandler requires UnsubscribeHandler")
100+
}
101+
if opts.UnsubscribeHandler != nil && opts.SubscribeHandler == nil {
102+
panic("UnsubscribeHandler requires SubscribeHandler")
103+
}
93104
return &Server{
94105
impl: impl,
95106
opts: *opts,
@@ -99,6 +110,7 @@ func NewServer(impl *Implementation, opts *ServerOptions) *Server {
99110
resourceTemplates: newFeatureSet(func(t *serverResourceTemplate) string { return t.resourceTemplate.URITemplate }),
100111
sendingMethodHandler_: defaultSendingMethodHandler[*ServerSession],
101112
receivingMethodHandler_: defaultReceivingMethodHandler[*ServerSession],
113+
resourceSubscriptions: make(map[string]map[*ServerSession]bool),
102114
}
103115
}
104116

@@ -225,6 +237,9 @@ func (s *Server) capabilities() *serverCapabilities {
225237
}
226238
if s.resources.len() > 0 || s.resourceTemplates.len() > 0 {
227239
caps.Resources = &resourceCapabilities{ListChanged: true}
240+
if s.opts.SubscribeHandler != nil {
241+
caps.Resources.Subscribe = true
242+
}
228243
}
229244
return caps
230245
}
@@ -428,6 +443,57 @@ func fileResourceHandler(dir string) ResourceHandler {
428443
}
429444
}
430445

446+
// ResourceUpdated sends a notification to all clients that have subscribed to the
447+
// resource specified in params. This method is the primary way for a
448+
// server author to signal that a resource has changed.
449+
func (s *Server) ResourceUpdated(ctx context.Context, params *ResourceUpdatedNotificationParams) error {
450+
s.mu.Lock()
451+
subscribedSessions := s.resourceSubscriptions[params.URI]
452+
sessions := slices.Collect(maps.Keys(subscribedSessions))
453+
s.mu.Unlock()
454+
notifySessions(sessions, notificationResourceUpdated, params)
455+
return nil
456+
}
457+
458+
func (s *Server) subscribe(ctx context.Context, ss *ServerSession, params *SubscribeParams) (*emptyResult, error) {
459+
if s.opts.SubscribeHandler == nil {
460+
return nil, fmt.Errorf("%w: server does not support resource subscriptions", jsonrpc2.ErrMethodNotFound)
461+
}
462+
if err := s.opts.SubscribeHandler(ctx, params); err != nil {
463+
return nil, err
464+
}
465+
466+
s.mu.Lock()
467+
defer s.mu.Unlock()
468+
if s.resourceSubscriptions[params.URI] == nil {
469+
s.resourceSubscriptions[params.URI] = make(map[*ServerSession]bool)
470+
}
471+
s.resourceSubscriptions[params.URI][ss] = true
472+
473+
return &emptyResult{}, nil
474+
}
475+
476+
func (s *Server) unsubscribe(ctx context.Context, ss *ServerSession, params *UnsubscribeParams) (*emptyResult, error) {
477+
if s.opts.UnsubscribeHandler == nil {
478+
return nil, jsonrpc2.ErrMethodNotFound
479+
}
480+
481+
if err := s.opts.UnsubscribeHandler(ctx, params); err != nil {
482+
return nil, err
483+
}
484+
485+
s.mu.Lock()
486+
defer s.mu.Unlock()
487+
if subscribedSessions, ok := s.resourceSubscriptions[params.URI]; ok {
488+
delete(subscribedSessions, ss)
489+
if len(subscribedSessions) == 0 {
490+
delete(s.resourceSubscriptions, params.URI)
491+
}
492+
}
493+
494+
return &emptyResult{}, nil
495+
}
496+
431497
// Run runs the server over the given transport, which must be persistent.
432498
//
433499
// Run blocks until the client terminates the connection or the provided
@@ -475,6 +541,10 @@ func (s *Server) disconnect(cc *ServerSession) {
475541
s.sessions = slices.DeleteFunc(s.sessions, func(cc2 *ServerSession) bool {
476542
return cc2 == cc
477543
})
544+
545+
for _, subscribedSessions := range s.resourceSubscriptions {
546+
delete(subscribedSessions, cc)
547+
}
478548
}
479549

480550
// Connect connects the MCP server over the given transport and starts handling
@@ -616,6 +686,8 @@ var serverMethodInfos = map[string]methodInfo{
616686
methodListResourceTemplates: newMethodInfo(serverMethod((*Server).listResourceTemplates)),
617687
methodReadResource: newMethodInfo(serverMethod((*Server).readResource)),
618688
methodSetLevel: newMethodInfo(sessionMethod((*ServerSession).setLevel)),
689+
methodSubscribe: newMethodInfo(serverMethod((*Server).subscribe)),
690+
methodUnsubscribe: newMethodInfo(serverMethod((*Server).unsubscribe)),
619691
notificationInitialized: newMethodInfo(serverMethod((*Server).callInitializedHandler)),
620692
notificationRootsListChanged: newMethodInfo(serverMethod((*Server).callRootsListChangedHandler)),
621693
notificationProgress: newMethodInfo(sessionMethod((*ServerSession).callProgressNotificationHandler)),

0 commit comments

Comments
 (0)