22import asyncio
33from uuid import uuid4
44
5- from openhands .a2a .client import A2ACardResolver , A2AClient
6- from openhands .a2a .common .types import TaskState , TaskStatusUpdateEvent
5+ from a2a .client import A2ACardResolver , A2AClient
6+ from a2a .types import AgentCard , JSONRPCErrorResponse , TaskState , TaskStatusUpdateEvent
7+
8+ from containers .runtime .code .openhands .utils .async_utils import call_async_from_sync
9+ from openhands .core .logger import openhands_logger as logger
710from openhands .core .message import TextContent
811
912
@@ -13,10 +16,7 @@ def __init__(self, a2a_server_url: str, session: str = None, history: bool = Fal
1316 self .history = history
1417
1518 self .card_resolver = A2ACardResolver (a2a_server_url )
16- self .card = self .card_resolver .get_agent_card ()
17-
18- print ('======= Agent Card ========' )
19- print (self .card .model_dump_json (exclude_none = True ))
19+ self .card : AgentCard = call_async_from_sync (self .card_resolver .get_agent_card ())
2020
2121 self .client = A2AClient (agent_card = self .card )
2222 if session :
@@ -61,21 +61,24 @@ async def completeTask(self, streaming, taskId, sessionId, messages: list[str]):
6161
6262 taskResult = None
6363 if streaming :
64- response_stream = self .client .send_task_streaming (payload )
64+ response_stream = self .client .send_message_streaming (payload )
6565 async for result in response_stream :
6666 print (f'stream event => { result .model_dump_json (exclude_none = True )} ' )
6767 if (
68- result .result
69- and isinstance (result .result , TaskStatusUpdateEvent )
70- and result .result .final
68+ result .root
69+ and isinstance (result .root . result , TaskStatusUpdateEvent )
70+ and result .root . result .final
7171 ):
7272 return False
7373 else :
74- taskResult = await self .client .send_task (payload )
74+ taskResult = await self .client .send_message (payload )
7575 print (f'\n task result => { taskResult .model_dump_json (exclude_none = True )} ' )
7676 ## if the result is that more input is required, loop again.
77- state = TaskState (taskResult .result .status .state )
78- if state .name == TaskState .INPUT_REQUIRED .name :
77+ if isinstance (taskResult .root , JSONRPCErrorResponse ):
78+ logger .error (f'Error sending message to agent A2A: { taskResult .root } ' )
79+ return False
80+ state = TaskState (taskResult .root .result .status .state )
81+ if state .name == TaskState .input_required .name :
7982 return await self .completeTask (streaming , taskId , sessionId )
8083 else :
8184 ## task is complete
0 commit comments