Skip to content

Commit 2cb7e47

Browse files
committed
成功mcp
1 parent 5f519e0 commit 2cb7e47

File tree

2 files changed

+203
-68
lines changed

2 files changed

+203
-68
lines changed

golang/http_shell/src/main.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@ func startMCPServer() {
1818
http.HandleFunc("/mcp", mcpSSEHandler)
1919
http.HandleFunc("/", handler)
2020

21-
log.Println("MCP SSE 服务启动在端口 :8080,端点: /mcp")
22-
log.Println("原有 http shell 服务仍在根目录 / 可用")
21+
log.Println("MCP SSE 服务启动在端口 :8080")
22+
log.Println(" - SSE 端点: GET /mcp (用于接收服务器推送)")
23+
log.Println(" - 命令端点: POST /mcp (用于发送 JSON-RPC 命令)")
24+
log.Println(" - 原有 http shell 服务仍在根目录 / 可用")
2325
log.Fatal(http.ListenAndServe(":8080", nil))
2426
}
2527

golang/http_shell/src/mcp.go

Lines changed: 199 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
package main
22

33
import (
4-
"bufio"
54
"encoding/json"
65
"fmt"
6+
"io"
77
"log"
88
"net/http"
99
"os/exec"
1010
"strings"
1111
"sync"
12+
"time"
1213
)
1314

1415
// ==================== MCP SSE Protocol Implementation ====================
@@ -54,17 +55,35 @@ type SSEConnection struct {
5455
writer http.ResponseWriter
5556
flusher http.Flusher
5657
mu sync.Mutex
58+
closed bool
5759
}
5860

59-
func (sse *SSEConnection) WriteEvent(eventType, data string) {
61+
func (sse *SSEConnection) WriteEvent(eventType, data string) error {
6062
sse.mu.Lock()
6163
defer sse.mu.Unlock()
6264

65+
if sse.closed {
66+
return fmt.Errorf("SSE connection closed")
67+
}
68+
6369
fmt.Fprintf(sse.writer, "event: %s\n", eventType)
6470
fmt.Fprintf(sse.writer, "data: %s\n\n", data)
6571
sse.flusher.Flush()
72+
return nil
73+
}
74+
75+
func (sse *SSEConnection) Close() {
76+
sse.mu.Lock()
77+
defer sse.mu.Unlock()
78+
sse.closed = true
6679
}
6780

81+
// Global SSE connection manager
82+
var (
83+
sseConnections = make(map[string]*SSEConnection)
84+
sseConnectionsLock sync.RWMutex
85+
)
86+
6887
// MCPHandler manages MCP protocol handling
6988
type MCPHandler struct {
7089
tools map[string]func(map[string]interface{}) MCPToolCallResult
@@ -166,13 +185,26 @@ func (h *MCPHandler) HandleToolCall(params map[string]interface{}) MCPToolCallRe
166185
}
167186

168187
func mcpSSEHandler(w http.ResponseWriter, r *http.Request) {
169-
clientIP := r.RemoteAddr
170-
log.Printf("[SSE] 新连接来自: %s, Method: %s", clientIP, r.Method)
188+
clientID := r.RemoteAddr + ":" + time.Now().Format("20060102150405")
189+
log.Printf("[SSE] 新连接 %s 来自: %s, Method: %s", clientID, r.RemoteAddr, r.Method)
190+
191+
// POST requests handle JSON-RPC commands
192+
if r.Method == "POST" {
193+
mcpRequestHandler(w, r)
194+
return
195+
}
196+
197+
// GET requests establish SSE connection
198+
if r.Method != "GET" {
199+
log.Printf("[SSE] 错误: 客户端 %s 使用了错误的 HTTP 方法: %s", clientID, r.Method)
200+
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
201+
return
202+
}
171203

172204
// Check for SSE support
173205
flusher, ok := w.(http.Flusher)
174206
if !ok {
175-
log.Printf("[SSE] 错误: 客户端 %s 不支持 SSE", clientIP)
207+
log.Printf("[SSE] 错误: 客户端 %s 不支持 SSE", clientID)
176208
http.Error(w, "SSE not supported", http.StatusInternalServerError)
177209
return
178210
}
@@ -183,14 +215,26 @@ func mcpSSEHandler(w http.ResponseWriter, r *http.Request) {
183215
w.Header().Set("Connection", "keep-alive")
184216
w.Header().Set("Access-Control-Allow-Origin", "*")
185217

186-
log.Printf("[SSE] 客户端 %s: SSE 响应头已设置", clientIP)
218+
log.Printf("[SSE] 客户端 %s: SSE 响应头已设置", clientID)
187219

188220
sse := &SSEConnection{
189221
writer: w,
190222
flusher: flusher,
191223
}
192224

193-
mcpHandler := NewMCPHandler()
225+
// Register connection
226+
sseConnectionsLock.Lock()
227+
sseConnections[clientID] = sse
228+
sseConnectionsLock.Unlock()
229+
230+
// Clean up on disconnect
231+
defer func() {
232+
sse.Close()
233+
sseConnectionsLock.Lock()
234+
delete(sseConnections, clientID)
235+
sseConnectionsLock.Unlock()
236+
log.Printf("[SSE] 客户端 %s: 已断开连接", clientID)
237+
}()
194238

195239
// Send endpoint notification
196240
endpointMsg, _ := json.Marshal(map[string]interface{}{
@@ -201,69 +245,158 @@ func mcpSSEHandler(w http.ResponseWriter, r *http.Request) {
201245
},
202246
})
203247
sse.WriteEvent("endpoint", string(endpointMsg))
204-
log.Printf("[SSE] 客户端 %s: 已发送 endpoint 通知", clientIP)
205-
206-
// Process incoming messages
207-
scanner := bufio.NewScanner(r.Body)
208-
messageCount := 0
209-
210-
for scanner.Scan() {
211-
messageCount++
212-
line := scanner.Text()
213-
log.Printf("[SSE] 客户端 %s: 收到消息 #%d: %s", clientIP, messageCount, line)
214-
215-
if strings.HasPrefix(line, "data: ") {
216-
jsonStr := strings.TrimPrefix(line, "data: ")
217-
var msg MCPMessage
218-
if err := json.Unmarshal([]byte(jsonStr), &msg); err != nil {
219-
log.Printf("[SSE] 客户端 %s: JSON解析错误 #%d: %v, 原始内容: %s", clientIP, messageCount, err, jsonStr)
220-
continue
221-
}
222-
223-
log.Printf("[SSE] 客户端 %s: 解析成功, Method: %s, ID: %v", clientIP, msg.Method, msg.ID)
224-
225-
var response interface{}
226-
switch msg.Method {
227-
case "tools/list":
228-
log.Printf("[SSE] 客户端 %s: 处理 tools/list 请求", clientIP)
229-
response = map[string]interface{}{
230-
"jsonrpc": "2.0",
231-
"id": msg.ID,
232-
"result": mcpHandler.HandleToolsList(),
233-
}
234-
case "tools/call":
235-
log.Printf("[SSE] 客户端 %s: 处理 tools/call 请求", clientIP)
236-
var params map[string]interface{}
237-
json.Unmarshal(msg.Params, &params)
238-
result := mcpHandler.HandleToolCall(params)
239-
response = map[string]interface{}{
240-
"jsonrpc": "2.0",
241-
"id": msg.ID,
242-
"result": result,
243-
}
244-
default:
245-
log.Printf("[SSE] 客户端 %s: 未知方法: %s", clientIP, msg.Method)
246-
response = map[string]interface{}{
247-
"jsonrpc": "2.0",
248-
"id": msg.ID,
249-
"error": map[string]interface{}{
250-
"code": -32601,
251-
"message": "Method not found",
252-
},
253-
}
254-
}
248+
log.Printf("[SSE] 客户端 %s: 已发送 endpoint 通知", clientID)
255249

256-
respBytes, _ := json.Marshal(response)
257-
sse.WriteEvent("message", string(respBytes))
258-
log.Printf("[SSE] 客户端 %s: 已发送响应", clientIP)
259-
}
250+
// Wait for disconnect (SSE is server-push only)
251+
notify := r.Context().Done()
252+
<-notify
253+
log.Printf("[SSE] 客户端 %s: 上下文已关闭", clientID)
254+
}
255+
256+
// mcpRequestHandler handles POST requests for MCP commands
257+
func mcpRequestHandler(w http.ResponseWriter, r *http.Request) {
258+
clientIP := r.RemoteAddr
259+
log.Printf("[MCP-POST] 收到请求来自: %s, Method: %s", clientIP, r.Method)
260+
261+
if r.Method != "POST" {
262+
log.Printf("[MCP-POST] 错误: 客户端 %s 使用了错误的 HTTP 方法: %s (期望 POST)", clientIP, r.Method)
263+
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
264+
return
260265
}
261266

262-
if err := scanner.Err(); err != nil {
263-
log.Printf("[SSE] 客户端 %s: 连接错误 (已处理消息: %d): %v", clientIP, messageCount, err)
267+
// Read request body
268+
body, err := io.ReadAll(r.Body)
269+
if err != nil {
270+
log.Printf("[MCP-POST] 客户端 %s: 读取请求体失败: %v", clientIP, err)
271+
http.Error(w, "Failed to read request body", http.StatusBadRequest)
272+
return
273+
}
274+
defer r.Body.Close()
275+
276+
log.Printf("[MCP-POST] 客户端 %s: 请求体: %s", clientIP, string(body))
277+
278+
// Check if request body is SSE format or direct JSON
279+
var msg MCPMessage
280+
var jsonStr string
281+
282+
if strings.HasPrefix(string(body), "data: ") {
283+
// SSE format
284+
jsonStr = strings.TrimPrefix(string(body), "data: ")
264285
} else {
265-
log.Printf("[SSE] 客户端 %s: 连接正常关闭 (已处理消息: %d)", clientIP, messageCount)
286+
// Direct JSON
287+
jsonStr = string(body)
266288
}
267289

268-
log.Printf("[SSE] 客户端 %s: 处理完成", clientIP)
290+
if err := json.Unmarshal([]byte(jsonStr), &msg); err != nil {
291+
log.Printf("[MCP-POST] 客户端 %s: JSON解析错误: %v, 原始内容: %s", clientIP, err, jsonStr)
292+
http.Error(w, fmt.Sprintf("JSON parse error: %v", err), http.StatusBadRequest)
293+
return
294+
}
295+
296+
log.Printf("[MCP-POST] 客户端 %s: 解析成功, Method: %s, ID: %v", clientIP, msg.Method, msg.ID)
297+
298+
mcpHandler := NewMCPHandler()
299+
300+
// Handle notifications (no id, no response needed)
301+
if msg.ID == nil && strings.HasPrefix(msg.Method, "notifications/") {
302+
log.Printf("[MCP-POST] 客户端 %s: 忽略通知 %s", clientIP, msg.Method)
303+
w.Header().Set("Content-Type", "application/json")
304+
w.Write([]byte("{}"))
305+
return
306+
}
307+
308+
// Handle "initialized" notification
309+
if msg.ID == nil && msg.Method == "initialized" {
310+
log.Printf("[MCP-POST] 客户端 %s: 收到 initialized 通知", clientIP)
311+
w.Header().Set("Content-Type", "application/json")
312+
w.Write([]byte("{}"))
313+
return
314+
}
315+
316+
var response interface{}
317+
switch msg.Method {
318+
case "initialize":
319+
log.Printf("[MCP-POST] 客户端 %s: 处理 initialize 请求", clientIP)
320+
response = map[string]interface{}{
321+
"jsonrpc": "2.0",
322+
"id": msg.ID,
323+
"result": map[string]interface{}{
324+
"protocolVersion": "2025-06-18",
325+
"capabilities": map[string]interface{}{
326+
"tools": map[string]interface{}{
327+
"listChanged": false,
328+
},
329+
"prompts": map[string]interface{}{
330+
"listChanged": false,
331+
},
332+
"resources": map[string]interface{}{
333+
"listChanged": false,
334+
},
335+
"logging": map[string]interface{}{},
336+
},
337+
"serverInfo": map[string]interface{}{
338+
"name": "http-shell-mcp",
339+
"version": "1.0.0",
340+
},
341+
},
342+
}
343+
case "tools/list":
344+
log.Printf("[MCP-POST] 客户端 %s: 处理 tools/list 请求", clientIP)
345+
response = map[string]interface{}{
346+
"jsonrpc": "2.0",
347+
"id": msg.ID,
348+
"result": mcpHandler.HandleToolsList(),
349+
}
350+
case "tools/call":
351+
log.Printf("[MCP-POST] 客户端 %s: 处理 tools/call 请求", clientIP)
352+
var params map[string]interface{}
353+
json.Unmarshal(msg.Params, &params)
354+
result := mcpHandler.HandleToolCall(params)
355+
response = map[string]interface{}{
356+
"jsonrpc": "2.0",
357+
"id": msg.ID,
358+
"result": result,
359+
}
360+
case "prompts/list":
361+
log.Printf("[MCP-POST] 客户端 %s: 处理 prompts/list 请求", clientIP)
362+
response = map[string]interface{}{
363+
"jsonrpc": "2.0",
364+
"id": msg.ID,
365+
"result": map[string]interface{}{
366+
"prompts": []interface{}{},
367+
},
368+
}
369+
case "resources/list":
370+
log.Printf("[MCP-POST] 客户端 %s: 处理 resources/list 请求", clientIP)
371+
response = map[string]interface{}{
372+
"jsonrpc": "2.0",
373+
"id": msg.ID,
374+
"result": map[string]interface{}{
375+
"resources": []interface{}{},
376+
},
377+
}
378+
case "ping":
379+
log.Printf("[MCP-POST] 客户端 %s: 处理 ping 请求", clientIP)
380+
response = map[string]interface{}{
381+
"jsonrpc": "2.0",
382+
"id": msg.ID,
383+
"result": map[string]interface{}{},
384+
}
385+
default:
386+
log.Printf("[MCP-POST] 客户端 %s: 未知方法: %s", clientIP, msg.Method)
387+
response = map[string]interface{}{
388+
"jsonrpc": "2.0",
389+
"id": msg.ID,
390+
"error": map[string]interface{}{
391+
"code": -32601,
392+
"message": "Method not found",
393+
},
394+
}
395+
}
396+
397+
respBytes, _ := json.Marshal(response)
398+
w.Header().Set("Content-Type", "application/json")
399+
w.Write(respBytes)
400+
401+
log.Printf("[MCP-POST] 客户端 %s: 已发送响应: %s", clientIP, string(respBytes))
269402
}

0 commit comments

Comments
 (0)