Skip to content

Fix streamable http sampling #531

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 26 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
881e095
feat: implement sampling support for Streamable HTTP transport
andig Jul 27, 2025
d239784
feat: implement server-side sampling support for HTTP transport
andig Jul 27, 2025
dd877e0
fix: replace time.Sleep with synchronization primitives in tests
andig Jul 27, 2025
a4ec0b3
fix: improve request detection logic and add nil pointer checks
andig Jul 27, 2025
1cae3a9
fix: correct misleading comment about response delivery
andig Jul 27, 2025
204b273
fix: implement EnableSampling() to properly declare sampling capability
andig Jul 27, 2025
5d4fb64
fix: prevent panic from unsafe type assertion in example server
andig Jul 27, 2025
4e41f25
fix: add missing EnableSampling() call in interface test
andig Jul 27, 2025
178e234
fix: expand error test coverage and avoid t.Fatalf
andig Jul 27, 2025
27322ca
fix: eliminate recursive response handling and improve routing
andig Jul 27, 2025
d025975
fix: improve sampling response delivery robustness
andig Jul 27, 2025
a9b20be
fix: add graceful shutdown handling to sampling client
andig Jul 28, 2025
b7afbb9
fix: improve context handling in streamable HTTP transport
andig Jul 28, 2025
a664289
fix: improve error message for notification channel queue full condition
andig Jul 28, 2025
bac5dad
refactor: rename struct variable for clarity in message parsing
andig Jul 28, 2025
e69716d
test: add concurrent sampling requests test with response association
andig Jul 28, 2025
e28a859
fix: improve context handling in async goroutine
andig Jul 28, 2025
4fa5295
refactor: replace interface{} with any throughout codebase
andig Jul 28, 2025
3852e2d
fix: improve context handling in async goroutine for StreamableHTTP
andig Jul 28, 2025
83883ed
refactor: remove unused samplingResponseChan field from session struct
andig Jul 28, 2025
9ea4a10
feat: add graceful shutdown handling to sampling HTTP client example
andig Jul 28, 2025
11f8d0e
refactor: remove unused mu field from streamableHttpSession
andig Jul 28, 2025
c1f30be
Merge branch 'main' into http-sampling-improvements
andig Aug 6, 2025
1bda735
Add e2e test
andig Aug 6, 2025
490bcbe
wip
andig Aug 6, 2025
e65ac96
wip
andig Aug 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,22 @@ func (c *Client) handleSamplingRequestTransport(ctx context.Context, request tra
return nil, fmt.Errorf("failed to unmarshal params: %w", err)
}
}

// Fix content parsing - HTTP transport unmarshals TextContent as map[string]any
for i := range params.Messages {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually

func getTextFromContent(content any) string 

from the examples directory. Might be a good idea to make that a global helper function?

if contentMap, ok := params.Messages[i].Content.(map[string]any); ok {
if textType, exists := contentMap["type"]; exists && textType == "text" {
if text, exists := contentMap["text"]; exists {
if textStr, ok := text.(string); ok {
params.Messages[i].Content = mcp.TextContent{
Type: "text",
Text: textStr,
}
}
}
}
}
}

// Create the MCP request
mcpRequest := mcp.CreateMessageRequest{
Expand Down
6 changes: 2 additions & 4 deletions client/transport/streamable_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,10 +601,8 @@ func (c *StreamableHTTP) IsOAuthEnabled() bool {
func (c *StreamableHTTP) listenForever(ctx context.Context) {
c.logger.Infof("listening to server forever")
for {
// Add timeout for individual connection attempts
connectCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
err := c.createGETConnectionToServer(connectCtx)
cancel()
// Use the original context for continuous listening - no timeout
err := c.createGETConnectionToServer(ctx)

if errors.Is(err, ErrGetMethodNotAllowed) {
// server does not support listening
Expand Down
Loading