Skip to content

Commit e8a8c42

Browse files
committed
Merge branch 'feat/streaming-llm' into staging
2 parents fc30180 + 5f4dfad commit e8a8c42

File tree

3 files changed

+57
-16
lines changed

3 files changed

+57
-16
lines changed

openhands/agenthub/codeact_agent/codeact_agent.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -302,9 +302,7 @@ async def _handle_streaming_response(self, streaming_response):
302302
{
303303
'message': {
304304
'role': 'assistant',
305-
'content': accumulated_content
306-
if accumulated_content
307-
else None,
305+
'content': None,
308306
'tool_calls': formatted_tool_calls,
309307
},
310308
'index': 0,

openhands/mcp/utils.py

Lines changed: 48 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
import json
23
import re
34
from typing import Any, Optional
@@ -55,32 +56,66 @@ async def create_mcp_clients(
5556
mnemonic: Optional[str] = None,
5657
) -> list[MCPClient]:
5758
mcp_clients: list[MCPClient] = []
58-
# Initialize SSE connections
59+
60+
# Create connection tasks for parallel execution
61+
connection_tasks = []
62+
clients = []
63+
5964
for name, mcp_config in dict_mcp_config.items():
6065
logger.info(
6166
f'Initializing MCP {name} agent for {mcp_config.url} with {mcp_config.mode} connection...'
6267
)
6368
# check if the name in a search engine config
6469
if f'search_engine_{name}' in dict_mcp_config:
6570
continue
71+
6672
client = MCPClient(name=name)
67-
try:
68-
await client.connect_sse(mcp_config.url, sid, mnemonic)
69-
# Only add the client to the list after a successful connection
70-
mcp_clients.append(client)
71-
logger.info(f'Connected to MCP server {mcp_config.url} via SSE')
72-
except Exception as e:
73-
logger.error(f'Failed to connect to {mcp_config.url}: {str(e)}')
73+
clients.append((client, name, mcp_config))
74+
75+
# Create connection task
76+
async def connect_client(client, mcp_config):
7477
try:
75-
await client.disconnect()
76-
except Exception as disconnect_error:
77-
logger.error(
78-
f'Error during disconnect after failed connection: {str(disconnect_error)}'
79-
)
78+
await client.connect_sse(mcp_config.url, sid, mnemonic)
79+
return client, None # Success: return client and no error
80+
except Exception as e:
81+
return None, e # Failure: return no client and the error
82+
83+
connection_tasks.append(connect_client(client, mcp_config))
84+
85+
if connection_tasks:
86+
results = await asyncio.gather(*connection_tasks, return_exceptions=True)
87+
88+
for i, result in enumerate(results):
89+
client, name, mcp_config = clients[i]
90+
91+
if isinstance(result, Exception):
92+
logger.error(f'Failed to connect to {mcp_config.url}: {str(result)}')
93+
await _safe_disconnect(client)
94+
elif isinstance(result, tuple) and len(result) == 2:
95+
client_result, error = result
96+
if error is None: # Success case
97+
mcp_clients.append(client_result)
98+
logger.info(f'Connected to MCP server {mcp_config.url} via SSE')
99+
else: # Error case
100+
logger.error(f'Failed to connect to {mcp_config.url}: {str(error)}')
101+
await _safe_disconnect(client)
102+
else:
103+
logger.error(f'Unexpected result format for {mcp_config.url}: {result}')
104+
await _safe_disconnect(client)
80105

81106
return mcp_clients
82107

83108

109+
async def _safe_disconnect(client: MCPClient):
110+
"""Safely disconnect a client with error handling."""
111+
try:
112+
await client.disconnect()
113+
except Exception as disconnect_error:
114+
logger.error(
115+
f'Error during disconnect after failed connection: {str(disconnect_error)}'
116+
)
117+
118+
84119
async def fetch_mcp_tools_from_config(
85120
dict_mcp_config: dict[str, MCPConfig],
86121
sid: Optional[str] = None,

openhands/memory/conversation_memory.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
A2ASendTaskAction,
2525
)
2626
from openhands.events.action.mcp import McpAction
27+
from openhands.events.action.message import StreamingMessageAction
2728
from openhands.events.event import Event, RecallType
2829
from openhands.events.observation import (
2930
AgentCondensationObservation,
@@ -328,6 +329,13 @@ def _process_action(
328329
content=content,
329330
)
330331
]
332+
elif isinstance(action, StreamingMessageAction):
333+
return [
334+
Message(
335+
role='assistant',
336+
content=[TextContent(text=action.content)],
337+
)
338+
]
331339
elif isinstance(action, CmdRunAction) and action.source == 'user':
332340
content = [
333341
TextContent(text=f'User executed the command:\n{action.command}')

0 commit comments

Comments
 (0)