Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 175 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ npm install @sinclair/typebox
## Features

- **Complete MCP 2025-06-18 Support**: Implements the full Model Context Protocol specification with elicitation
- **Async Iterator Streaming**: Real-time streaming of tool results via Server-Sent Events
- **Per-Stream Event IDs**: MCP specification compliant event ID architecture for proper resumability
- **Elicitation Support**: Server-to-client information requests with schema validation
- **TypeBox Validation**: Type-safe schema validation with automatic TypeScript inference
- **Security Enhancements**: Input sanitization, rate limiting, and security assessment
Expand Down Expand Up @@ -122,6 +124,179 @@ app.mcpAddPrompt({
await app.listen({ port: 3000 })
```

## Async Iterator Streaming

The plugin supports **real-time streaming** of tool results via Server-Sent Events (SSE). Tools that return async iterators automatically stream their results as separate SSE events, enabling real-time progress updates and streaming data processing.

### Basic Streaming Tool

```typescript
import Fastify from 'fastify'
import mcpPlugin from '@platformatic/mcp'

const app = Fastify({ logger: true })

await app.register(mcpPlugin, {
enableSSE: true, // Required for streaming
serverInfo: { name: 'streaming-server', version: '1.0.0' },
capabilities: { tools: {} }
})

// Regular tool (returns JSON immediately)
app.mcpAddTool({
name: 'immediate_response',
description: 'Returns immediate result',
inputSchema: {
type: 'object',
properties: {
message: { type: 'string' }
}
}
}, async (params) => {
return {
content: [{ type: 'text', text: `Echo: ${params.message}` }]
}
})

// Streaming tool (returns SSE stream)
app.mcpAddTool({
name: 'streaming_counter',
description: 'Streams counting progress',
inputSchema: {
type: 'object',
properties: {
count: { type: 'number', minimum: 1, maximum: 10 }
},
required: ['count']
}
}, async function* (params) {
// Yield incremental progress updates
for (let i = 1; i <= params.count; i++) {
yield {
content: [{
type: 'text',
text: `Processing step ${i}/${params.count}...`
}]
}

// Simulate async work
await new Promise(resolve => setTimeout(resolve, 500))
}

// Final result
return {
content: [{
type: 'text',
text: `✅ Completed all ${params.count} steps!`
}]
}
})

await app.listen({ port: 3000 })
```

### Streaming Response Format

When a tool returns an async iterator, the plugin automatically:

1. **Detects the async iterator** using `Symbol.asyncIterator`
2. **Changes response type** to `Content-Type: text/event-stream`
3. **Streams each yielded value** as a separate SSE event
4. **Sends the final return value** as the last event
5. **Handles errors gracefully** during streaming

### Client Usage

```bash
# Regular tool (returns JSON)
curl -X POST http://localhost:3000/mcp \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"immediate_response","arguments":{"message":"Hello"}}}'

# Streaming tool (returns text/event-stream)
curl -X POST http://localhost:3000/mcp \
-H "Content-Type: application/json" \
-d '{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"streaming_counter","arguments":{"count":3}}}'
```

### Advanced Streaming Examples

#### File Processing Stream
```typescript
app.mcpAddTool({
name: 'process_files',
description: 'Process multiple files with progress updates'
}, async function* (params) {
for (const [index, filename] of params.files.entries()) {
yield {
content: [{
type: 'text',
text: `📁 Processing file ${index + 1}/${params.files.length}: ${filename}`
}]
}

// Simulate file processing
await processFile(filename)

yield {
content: [{
type: 'text',
text: `✅ Completed: ${filename}`
}]
}
}

return {
content: [{
type: 'text',
text: `🎉 All ${params.files.length} files processed!`
}]
}
})
```

#### Error Handling During Streaming
```typescript
app.mcpAddTool({
name: 'streaming_with_errors',
description: 'Demonstrates error handling in streams'
}, async function* (params) {
try {
for (let i = 1; i <= 5; i++) {
if (i === 3) {
throw new Error('Simulated processing error')
}

yield {
content: [{
type: 'text',
text: `Step ${i}: Working...`
}]
}

await new Promise(resolve => setTimeout(resolve, 300))
}

return {
content: [{ type: 'text', text: 'All steps completed' }]
}
} catch (error) {
// Errors during streaming are handled gracefully
// Client receives all yielded values before the error
throw error
}
})
```

### Key Features

- **🔄 Automatic Detection**: No configuration needed - just return an async generator
- **📡 Real-time Updates**: Each `yield` becomes an immediate SSE event
- **🛡️ Error Handling**: Partial results preserved, errors handled gracefully
- **🔙 Backward Compatible**: Existing tools continue to work unchanged
- **⚡ Performance**: Efficient streaming with proper event ID management
- **🌊 MCP Compliant**: Per-stream event IDs for proper resumability

## Elicitation Support (MCP 2025-06-18)

The plugin supports the elicitation capability, allowing servers to request structured information from clients. This enables dynamic data collection with schema validation.
Expand Down
187 changes: 187 additions & 0 deletions examples/streaming-demo.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
#!/usr/bin/env node

import Fastify from 'fastify'
import mcpPlugin from '../dist/index.js'

const app = Fastify({ logger: true })

// Register MCP plugin with SSE enabled for streaming support
await app.register(mcpPlugin, {
serverInfo: { name: 'streaming-demo', version: '1.0.0' },
enableSSE: true
})

// Regular tool that returns immediate results
app.mcpAddTool({
name: 'immediate_response',
description: 'Tool that returns an immediate response',
inputSchema: {
type: 'object',
properties: {
message: { type: 'string' }
},
required: ['message']
}
}, async (params) => {
return {
content: [{ type: 'text', text: `Immediate: ${params.message}` }]
}
})

// Streaming tool using async generator
app.mcpAddTool({
name: 'streaming_response',
description: 'Tool that streams responses using async generator',
inputSchema: {
type: 'object',
properties: {
count: { type: 'number', minimum: 1, maximum: 10 },
delay: { type: 'number', minimum: 100, maximum: 2000, default: 500 }
},
required: ['count']
}
}, async function * (params) {
const delay = params.delay ?? 500

// Yield incremental chunks
for (let i = 1; i <= params.count; i++) {
yield {
content: [{
type: 'text',
text: `Streaming chunk ${i}/${params.count}: Processing...`
}]
}

// Simulate async work
await new Promise(resolve => setTimeout(resolve, delay))
}

// Final result
return {
content: [{
type: 'text',
text: `✅ Completed all ${params.count} processing steps!`
}]
}
})

// Streaming tool that simulates file processing
app.mcpAddTool({
name: 'file_processor',
description: 'Simulates processing multiple files with streaming updates',
inputSchema: {
type: 'object',
properties: {
files: {
type: 'array',
items: { type: 'string' },
minItems: 1,
maxItems: 5
}
},
required: ['files']
}
}, async function * (params) {
for (const [index, filename] of params.files.entries()) {
// Simulate processing each file
yield {
content: [{
type: 'text',
text: `📁 Processing file ${index + 1}/${params.files.length}: ${filename}`
}]
}

// Simulate processing time
await new Promise(resolve => setTimeout(resolve, 800))

yield {
content: [{
type: 'text',
text: `✅ Completed processing: ${filename}`
}]
}
}

// Final summary
return {
content: [{
type: 'text',
text: `🎉 All ${params.files.length} files processed successfully!`
}]
}
})

// Error demonstration tool
app.mcpAddTool({
name: 'error_demo',
description: 'Demonstrates error handling in streaming',
inputSchema: {
type: 'object',
properties: {
errorAfter: { type: 'number', minimum: 1, maximum: 5, default: 3 }
}
}
}, async function * (params) {
const errorAfter = params.errorAfter ?? 3

for (let i = 1; i <= 5; i++) {
if (i === errorAfter) {
throw new Error(`Simulated error at step ${i}`)
}

yield {
content: [{
type: 'text',
text: `Step ${i}: Everything working fine...`
}]
}

await new Promise(resolve => setTimeout(resolve, 300))
}

return {
content: [{
type: 'text',
text: 'This should not be reached due to the error'
}]
}
})

// Start the server
const port = parseInt(process.env.PORT || '3000', 10)
const host = process.env.HOST || '127.0.0.1'

try {
await app.listen({ port, host })
console.log(`🚀 MCP Streaming Demo Server running on http://${host}:${port}`)
console.log('\n📖 Usage Examples:')
console.log(`
# Test immediate response (returns JSON)
curl -X POST http://${host}:${port}/mcp \\
-H "Content-Type: application/json" \\
-d '{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"immediate_response","arguments":{"message":"Hello World"}}}'

# Test streaming response (returns text/event-stream)
curl -X POST http://${host}:${port}/mcp \\
-H "Content-Type: application/json" \\
-d '{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"streaming_response","arguments":{"count":3,"delay":1000}}}'

# Test file processing simulation
curl -X POST http://${host}:${port}/mcp \\
-H "Content-Type: application/json" \\
-d '{"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"file_processor","arguments":{"files":["doc1.pdf","image.jpg","data.csv"]}}}'

# Test error handling
curl -X POST http://${host}:${port}/mcp \\
-H "Content-Type: application/json" \\
-d '{"jsonrpc":"2.0","id":4,"method":"tools/call","params":{"name":"error_demo","arguments":{"errorAfter":2}}}'

# List all available tools
curl -X POST http://${host}:${port}/mcp \\
-H "Content-Type: application/json" \\
-d '{"jsonrpc":"2.0","id":5,"method":"tools/list","params":{}}'
`)
} catch (err) {
app.log.error(err)
process.exit(1)
}
Loading