diff --git a/Sources/MCP/Client/Client.swift b/Sources/MCP/Client/Client.swift index 696ffd14..daa5e4c0 100644 --- a/Sources/MCP/Client/Client.swift +++ b/Sources/MCP/Client/Client.swift @@ -4,6 +4,7 @@ import struct Foundation.Data import struct Foundation.Date import class Foundation.JSONDecoder import class Foundation.JSONEncoder +import func Foundation.NSLog /// Model Context Protocol client public actor Client { @@ -154,7 +155,11 @@ public actor Client { /// A dictionary of type-erased pending requests, keyed by request ID private var pendingRequests: [ID: AnyPendingRequest] = [:] // Add reusable JSON encoder/decoder - private let encoder = JSONEncoder() + private let encoder: JSONEncoder = { + let encoder = JSONEncoder() + encoder.outputFormatting = .withoutEscapingSlashes + return encoder + }() private let decoder = JSONDecoder() public init( @@ -170,56 +175,83 @@ public actor Client { /// Connect to the server using the given transport @discardableResult public func connect(transport: any Transport) async throws -> Initialize.Result { + NSLog("🔵 CLIENT.connect() ENTRY - Setting connection") self.connection = transport + NSLog("🔵 CLIENT.connect() - Calling transport.connect()") try await self.connection?.connect() + NSLog("🔵 CLIENT.connect() - transport.connect() RETURNED!") - await logger?.debug( - "Client connected", metadata: ["name": "\(name)", "version": "\(version)"]) + // REMOVED: await logger?.debug() - causes actor boundary crossing hang + // await logger?.debug( + // "Client connected", metadata: ["name": "\(name)", "version": "\(version)"]) + NSLog("🔵 CLIENT: About to create message handling Task") + // REMOVED: await logger?.debug() - causes actor boundary crossing hang + // await logger?.debug("CLIENT: About to create message handling Task") // Start message handling loop task = Task { - guard let connection = self.connection else { return } - repeat { - // Check for cancellation before starting the iteration - if Task.isCancelled { break } - - do { - let stream = await connection.receive() - for try await data in stream { - if Task.isCancelled { break } // Check inside loop too - - // Attempt to decode data - // Try decoding as a batch response first - if let batchResponse = try? decoder.decode([AnyResponse].self, from: data) { - await handleBatchResponse(batchResponse) - } else if let response = try? decoder.decode(AnyResponse.self, from: data) { - await handleResponse(response) - } else if let message = try? decoder.decode(AnyMessage.self, from: data) { - await handleMessage(message) + NSLog("🔵 CLIENT: Inside Task - starting") + await self.logger?.debug("CLIENT: Inside Task - starting") + guard let connection = self.connection else { + NSLog("❌ CLIENT: No connection available in Task!") + return + } + + // Get stream once - don't call receive() repeatedly + NSLog("🔵 CLIENT: Getting stream from connection.receive()") + // REMOVED: await logger?.debug() - causes actor boundary delay + let stream = await connection.receive() + NSLog("🔵 CLIENT: Got stream, starting for-await loop") + // REMOVED: await logger?.debug() - causes actor boundary delay + + do { + for try await data in stream { + NSLog("🔵 CLIENT: Received data in loop - \(data.count) bytes") + // REMOVED: await logger?.debug() - causes delay in message processing loop + // await logger?.debug("CLIENT: Received data in loop", metadata: ["size": "\(data.count)"]) + if Task.isCancelled { break } + + // Attempt to decode data + // Try decoding as a batch response first + if let batchResponse = try? decoder.decode([AnyResponse].self, from: data) { + await handleBatchResponse(batchResponse) + } else if let response = try? decoder.decode(AnyResponse.self, from: data) { + await handleResponse(response) + } else if let message = try? decoder.decode(AnyMessage.self, from: data) { + await handleMessage(message) + } else { + // REMOVED: await logger?.warning() - blocks message processing loop + // await logger?.warning( + // "Unexpected message received by client (not single/batch response or notification)", + // metadata: metadata + // ) + if let string = String(data: data, encoding: .utf8) { + NSLog("⚠️ CLIENT: Unexpected message: \(string)") } else { - var metadata: Logger.Metadata = [:] - if let string = String(data: data, encoding: .utf8) { - metadata["message"] = .string(string) - } - await logger?.warning( - "Unexpected message received by client (not single/batch response or notification)", - metadata: metadata - ) + NSLog("⚠️ CLIENT: Unexpected message (non-UTF8)") } } - } catch let error where MCPError.isResourceTemporarilyUnavailable(error) { - try? await Task.sleep(for: .milliseconds(10)) - continue - } catch { - await logger?.error( - "Error in message handling loop", metadata: ["error": "\(error)"]) - break } - } while true + } catch { + // REMOVED: await logger?.error() - blocks message processing loop + // await logger?.error( + // "Error in message handling loop", metadata: ["error": "\(error)"]) + NSLog("❌ CLIENT: Error in message handling loop: \(error)") + } + await self.logger?.debug("Client message handling loop task is terminating.") } + // CRITICAL: Give the Task time to actually start before we call initialize + // Without this, initialize() blocks waiting for a response that the Task hasn't started consuming yet + // Task.yield() alone isn't enough - we need actual time for the Task to begin execution + try? await Task.sleep(nanoseconds: 100_000_000) // 0.1 seconds + NSLog("🔵 CLIENT: Waited 0.1s for Task to start, proceeding with initialize") + // REMOVED: await logger?.debug() - this causes 10s hang due to actor boundary crossing + // await logger?.debug("CLIENT: Waited 0.1s for Task to start, proceeding with initialize") + // Automatically initialize after connecting + NSLog("🔵 CLIENT: About to call _initialize()") return try await _initialize() } @@ -291,38 +323,68 @@ public actor Client { /// Send a request and receive its response public func send(_ request: Request) async throws -> M.Result { + NSLog("🔵 CLIENT.send() ENTRY - method: \(M.name), id: \(request.id)") guard let connection = connection else { throw MCPError.internalError("Client connection not initialized") } let requestData = try encoder.encode(request) - - // Store the pending request first - return try await withCheckedThrowingContinuation { continuation in - Task { - // Add the pending request before attempting to send - self.addPendingRequest( - id: request.id, - continuation: continuation, - type: M.Result.self - ) - - // Send the request data - do { - // Use the existing connection send - try await connection.send(requestData) - } catch { - // If send fails, try to remove the pending request. - // Resume with the send error only if we successfully removed the request, - // indicating the response handler hasn't processed it yet. - if self.removePendingRequest(id: request.id) != nil { - continuation.resume(throwing: error) + NSLog("🔵 CLIENT.send() - Encoded request, adding timeout wrapper") + + // Add timeout wrapper (30 seconds) to prevent infinite hangs + return try await withThrowingTaskGroup(of: M.Result.self) { group in + // Task 1: Actual send operation + group.addTask { + return try await withCheckedThrowingContinuation { continuation in + NSLog("🔵 CLIENT.send() - Inside continuation, creating Task") + Task { + NSLog("🔵 CLIENT.send() - Task started, adding pending request") + // Add the pending request before attempting to send + await self.addPendingRequest( + id: request.id, + continuation: continuation, + type: M.Result.self + ) + NSLog("🔵 CLIENT.send() - Pending request added, calling connection.send()") + + // Send the request data + do { + // Use the existing connection send + try await connection.send(requestData) + NSLog("🔵 CLIENT.send() - connection.send() succeeded, waiting for response...") + } catch { + NSLog("🔵 CLIENT.send() - connection.send() failed: \(error)") + // If send fails, try to remove the pending request. + // Resume with the send error only if we successfully removed the request, + // indicating the response handler hasn't processed it yet. + if await self.tryRemovePendingRequest(id: request.id) { + continuation.resume(throwing: error) + } + // Otherwise, the request was already removed by the response handler + // or by disconnect, so the continuation was already resumed. + // Do nothing here. + } } - // Otherwise, the request was already removed by the response handler - // or by disconnect, so the continuation was already resumed. - // Do nothing here. } } + + // Task 2: Timeout task (30 seconds) + group.addTask { + try await Task.sleep(nanoseconds: 30_000_000_000) + NSLog("⏰ CLIENT.send() - TIMEOUT after 30 seconds for method: \(M.name)") + throw MCPError.internalError("Request timed out after 30 seconds") + } + + // Wait for first task to complete (either success or timeout) + do { + let result = try await group.next()! + group.cancelAll() // Cancel the other task + return result + } catch { + // Clean up pending request on timeout + await self.tryRemovePendingRequest(id: request.id) + throw error + } } } @@ -338,6 +400,17 @@ public actor Client { return pendingRequests.removeValue(forKey: id) } + private func hasPendingRequest(id: ID) -> Bool { + return pendingRequests[id] != nil + } + + @discardableResult + private func tryRemovePendingRequest(id: ID) -> Bool { + let existed = pendingRequests[id] != nil + pendingRequests.removeValue(forKey: id) + return existed + } + // MARK: - Batching /// A batch of requests. @@ -501,6 +574,7 @@ public actor Client { /// Internal initialization implementation private func _initialize() async throws -> Initialize.Result { + NSLog("🔵 CLIENT._initialize() - ENTRY") let request = Initialize.request( .init( protocolVersion: Version.latest, @@ -508,7 +582,9 @@ public actor Client { clientInfo: clientInfo )) + NSLog("🔵 CLIENT._initialize() - Calling send(request)") let result = try await send(request) + NSLog("🔵 CLIENT._initialize() - send() RETURNED with result!") self.serverCapabilities = result.capabilities self.serverVersion = result.protocolVersion @@ -611,9 +687,13 @@ public actor Client { public func callTool(name: String, arguments: [String: Value]? = nil) async throws -> ( content: [Tool.Content], isError: Bool? ) { + NSLog("🔵 CLIENT.callTool() ENTRY - tool: \(name)") try validateServerCapability(\.tools, "Tools") + NSLog("🔵 CLIENT.callTool() - Creating request") let request = CallTool.request(.init(name: name, arguments: arguments)) + NSLog("🔵 CLIENT.callTool() - Calling send()") let result = try await send(request) + NSLog("🔵 CLIENT.callTool() - send() RETURNED!") return (content: result.content, isError: result.isError) } @@ -659,9 +739,11 @@ public actor Client { // MARK: - private func handleResponse(_ response: Response) async { - await logger?.trace( - "Processing response", - metadata: ["id": "\(response.id)"]) + // REMOVED: await logger?.trace() - causes actor boundary crossing delay + // await logger?.trace( + // "Processing response", + // metadata: ["id": "\(response.id)"]) + NSLog("🔵 CLIENT.handleResponse() for id: \(response.id)") // Attempt to remove the pending request using the response ID. // Resume with the response only if it hadn't yet been removed. @@ -684,9 +766,11 @@ public actor Client { } private func handleMessage(_ message: Message) async { - await logger?.trace( - "Processing notification", - metadata: ["method": "\(message.method)"]) + // REMOVED: await logger?.trace() - causes actor boundary crossing delay + // await logger?.trace( + // "Processing notification", + // metadata: ["method": "\(message.method)"]) + NSLog("🔵 CLIENT.handleMessage() method: \(message.method)") // Find notification handlers for this method guard let handlers = notificationHandlers[message.method] else { return } @@ -728,7 +812,9 @@ public actor Client { // Add handler for batch responses private func handleBatchResponse(_ responses: [AnyResponse]) async { - await logger?.trace("Processing batch response", metadata: ["count": "\(responses.count)"]) + // REMOVED: await logger?.trace() - causes actor boundary crossing delay + // await logger?.trace("Processing batch response", metadata: ["count": "\(responses.count)"]) + NSLog("🔵 CLIENT.handleBatchResponse() count: \(responses.count)") for response in responses { // Attempt to remove the pending request. // If successful, pendingRequest contains the request.