From 8403ec913a144fb9df8738c06ee7ece041343de4 Mon Sep 17 00:00:00 2001 From: Sash Zats Date: Sat, 3 May 2025 22:47:43 -0400 Subject: [PATCH] Handle endpoint event I noticed that endpoint event is not being handled meaning when server responds with tokenized endpoint and client should switch over to it, we were ignoring it previously resulting in client never connecting --- .../Base/Transports/HTTPClientTransport.swift | 359 ++++++++++-------- 1 file changed, 194 insertions(+), 165 deletions(-) diff --git a/Sources/MCP/Base/Transports/HTTPClientTransport.swift b/Sources/MCP/Base/Transports/HTTPClientTransport.swift index 9cf93a0c..adf3c936 100644 --- a/Sources/MCP/Base/Transports/HTTPClientTransport.swift +++ b/Sources/MCP/Base/Transports/HTTPClientTransport.swift @@ -2,22 +2,22 @@ import Foundation import Logging #if canImport(FoundationNetworking) - import FoundationNetworking +import FoundationNetworking #endif public actor HTTPClientTransport: Actor, Transport { - public let endpoint: URL + private(set) public var endpoint: URL private let session: URLSession public private(set) var sessionID: String? private let streaming: Bool private var streamingTask: Task? private var lastEventID: String? public nonisolated let logger: Logger - + private var isConnected = false private let messageStream: AsyncThrowingStream private let messageContinuation: AsyncThrowingStream.Continuation - + public init( endpoint: URL, configuration: URLSessionConfiguration = .default, @@ -31,7 +31,7 @@ public actor HTTPClientTransport: Actor, Transport { logger: logger ) } - + internal init( endpoint: URL, session: URLSession, @@ -41,83 +41,98 @@ public actor HTTPClientTransport: Actor, Transport { self.endpoint = endpoint self.session = session self.streaming = streaming - + // Create message stream var continuation: AsyncThrowingStream.Continuation! self.messageStream = AsyncThrowingStream { continuation = $0 } self.messageContinuation = continuation - + self.logger = - logger - ?? Logger( - label: "mcp.transport.http.client", - factory: { _ in SwiftLogNoOpLogHandler() } - ) + logger + ?? Logger( + label: "mcp.transport.http.client", + factory: { _ in SwiftLogNoOpLogHandler() } + ) } - + /// Establishes connection with the transport public func connect() async throws { - guard !isConnected else { return } - isConnected = true - - if streaming { - // Start listening to server events - streamingTask = Task { await startListeningForServerEvents() } + try await withUnsafeThrowingContinuation { (cont: UnsafeContinuation) in + guard !isConnected else { + cont.resume(throwing: MCPError.internalError("Transport already connected")) + return + } + + isConnected = true + + addListener("endpoint") { endpointPath in + guard let resolvedURL = URL(string: endpointPath, relativeTo: self.endpoint)?.absoluteURL else { + cont.resume(throwing: MCPError.internalError("Invalid endpoint URL")) + return + } + self.endpoint = resolvedURL + cont.resume() + } + + if streaming { + // Start listening to server events + streamingTask = Task { await startListeningForServerEvents() } + } + + logger.info("HTTP transport connected") } - - logger.info("HTTP transport connected") } - + /// Disconnects from the transport public func disconnect() async { guard isConnected else { return } isConnected = false - + // Cancel streaming task if active streamingTask?.cancel() streamingTask = nil - + // Cancel any in-progress requests session.invalidateAndCancel() - + // Clean up message stream messageContinuation.finish() - + logger.info("HTTP clienttransport disconnected") } - + /// Sends data through an HTTP POST request public func send(_ data: Data) async throws { guard isConnected else { throw MCPError.internalError("Transport not connected") } - + var request = URLRequest(url: endpoint) request.httpMethod = "POST" request.addValue("application/json, text/event-stream", forHTTPHeaderField: "Accept") request.addValue("application/json", forHTTPHeaderField: "Content-Type") request.httpBody = data - + // Add session ID if available if let sessionID = sessionID { request.addValue(sessionID, forHTTPHeaderField: "Mcp-Session-Id") } - + let (responseData, response) = try await session.data(for: request) - + guard let httpResponse = response as? HTTPURLResponse else { throw MCPError.internalError("Invalid HTTP response") } - + // Process the response based on content type and status code let contentType = httpResponse.value(forHTTPHeaderField: "Content-Type") ?? "" - + // Extract session ID if present if let newSessionID = httpResponse.value(forHTTPHeaderField: "Mcp-Session-Id") { self.sessionID = newSessionID logger.debug("Session ID received", metadata: ["sessionID": "\(newSessionID)"]) } - + // Handle different response types switch httpResponse.statusCode { case 200, 201, 202: @@ -127,7 +142,7 @@ public actor HTTPClientTransport: Actor, Transport { // The streaming is handled by the SSE task if active return } - + // For JSON responses, deliver the data directly if contentType.contains("application/json") && !responseData.isEmpty { logger.debug("Received JSON response", metadata: ["size": "\(responseData.count)"]) @@ -145,18 +160,18 @@ public actor HTTPClientTransport: Actor, Transport { throw MCPError.internalError("HTTP error: \(httpResponse.statusCode)") } } - + /// Receives data in an async sequence public func receive() -> AsyncThrowingStream { return messageStream } - + // MARK: - SSE - + /// Starts listening for server events using SSE private func startListeningForServerEvents() async { guard isConnected else { return } - + // Retry loop for connection drops while isConnected && !Task.isCancelled { do { @@ -170,143 +185,157 @@ public actor HTTPClientTransport: Actor, Transport { } } } - - #if canImport(FoundationNetworking) - private func connectToEventStream() async throws { - logger.warning("SSE is not supported on this platform") + +#if canImport(FoundationNetworking) + private func connectToEventStream() async throws { + logger.warning("SSE is not supported on this platform") + } +#else + /// Establishes an SSE connection to the server + private func connectToEventStream() async throws { + guard isConnected else { return } + + var request = URLRequest(url: endpoint) + request.httpMethod = "GET" + request.addValue("text/event-stream", forHTTPHeaderField: "Accept") + + // Add session ID if available + if let sessionID = sessionID { + request.addValue(sessionID, forHTTPHeaderField: "Mcp-Session-Id") } - #else - /// Establishes an SSE connection to the server - private func connectToEventStream() async throws { - guard isConnected else { return } - - var request = URLRequest(url: endpoint) - request.httpMethod = "GET" - request.addValue("text/event-stream", forHTTPHeaderField: "Accept") - - // Add session ID if available - if let sessionID = sessionID { - request.addValue(sessionID, forHTTPHeaderField: "Mcp-Session-Id") - } - - // Add Last-Event-ID header for resumability if available - if let lastEventID = lastEventID { - request.addValue(lastEventID, forHTTPHeaderField: "Last-Event-ID") - } - - logger.debug("Starting SSE connection") - - // Create URLSession task for SSE - let (stream, response) = try await session.bytes(for: request) - - guard let httpResponse = response as? HTTPURLResponse else { - throw MCPError.internalError("Invalid HTTP response") + + // Add Last-Event-ID header for resumability if available + if let lastEventID = lastEventID { + request.addValue(lastEventID, forHTTPHeaderField: "Last-Event-ID") + } + + logger.debug("Starting SSE connection") + + // Create URLSession task for SSE + let (stream, response) = try await session.bytes(for: request) + + guard let httpResponse = response as? HTTPURLResponse else { + throw MCPError.internalError("Invalid HTTP response") + } + + // Check response status + guard httpResponse.statusCode == 200 else { + // If the server returns 405 Method Not Allowed, + // it indicates that the server doesn't support SSE streaming. + // We should cancel the task instead of retrying the connection. + if httpResponse.statusCode == 405 { + self.streamingTask?.cancel() } - - // Check response status - guard httpResponse.statusCode == 200 else { - // If the server returns 405 Method Not Allowed, - // it indicates that the server doesn't support SSE streaming. - // We should cancel the task instead of retrying the connection. - if httpResponse.statusCode == 405 { - self.streamingTask?.cancel() + throw MCPError.internalError("HTTP error: \(httpResponse.statusCode)") + } + + // Extract session ID if present + if let newSessionID = httpResponse.value(forHTTPHeaderField: "Mcp-Session-Id") { + self.sessionID = newSessionID + } + + // Process the SSE stream + var buffer = "" + var eventType = "" + var eventID: String? + var eventData = "" + + for try await byte in stream { + if Task.isCancelled { break } + + guard let char = String(bytes: [byte], encoding: .utf8) else { continue } + buffer.append(char) + + // Process complete lines + while let newlineIndex = buffer.utf8.firstIndex(where: { $0 == 10 }) { + var line = buffer[.. Void]() + + private func addListener(_ named: EventName, handler: @escaping @isolated(any) (EventData) -> Void) { + listeners[named] = handler + } + }