Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ build-frontend:
# Start backend
start-backend:
@echo "$(YELLOW)Starting backend...$(RESET)"
@poetry run uvicorn openhands.server.listen:app --host $(BACKEND_HOST) --port $(BACKEND_PORT) --reload --reload-exclude "./workspace" --reload-exclude "./tests/e2e/*"
@poetry run uvicorn openhands.server.listen:app --host $(BACKEND_HOST) --port $(BACKEND_PORT) --reload --reload-exclude "./workspace/*"

# Start frontend
start-frontend:
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ services:
environment:
- DB_USER=$PGBOUNCER_DB_USER
- DB_PASSWORD=$PGBOUNCER_DB_PASSWORD
- DB_HOST=$PGBOUNCER_DB_HOST
- DB_HOST=postgres
- DB_NAME=$PGBOUNCER_DB_NAME
- POOL_MODE=transaction
- ADMIN_USERS=postgres,$PGBOUNCER_DB_USER
Expand Down
91 changes: 60 additions & 31 deletions openhands/a2a/A2AManager.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,27 @@
import asyncio
import json
import os
import uuid
from abc import ABC
from typing import AsyncGenerator, List

from openhands.a2a.client.card_resolver import A2ACardResolver
from openhands.a2a.client.client import A2AClient
from openhands.a2a.common.types import (
A2AClientHTTPError,
A2AClientJSONError,
import httpx
from a2a.client import A2ACardResolver, A2AClient
from a2a.client.errors import A2AClientHTTPError, A2AClientJSONError
from a2a.types import (
AgentCard,
Message,
SendTaskResponse,
SendTaskStreamingResponse,
TaskSendParams,
MessageSendParams,
SendMessageRequest,
SendMessageResponse,
SendStreamingMessageResponse,
TextPart,
)

from openhands.core.logger import openhands_logger as logger

A2A_REQUEST_DEFAULT_TIMEOUT = float(os.getenv('A2A_REQUEST_DEFAULT_TIMEOUT') or 120.0)


class A2AManager(ABC):
list_remote_agent_servers: List[str] = []
Expand All @@ -34,7 +39,8 @@ async def initialize_agent_cards(self):
return

async def fetch_card(server_url: str) -> AgentCard | None:
async with A2ACardResolver(server_url) as resolver:
async with httpx.AsyncClient() as httpx_client:
resolver = A2ACardResolver(httpx_client, server_url)
try:
return await resolver.get_agent_card()
except (A2AClientHTTPError, A2AClientJSONError) as e:
Expand All @@ -58,13 +64,26 @@ def list_remote_agents(self):
remote_agent_info = []
for card in self.list_remote_agent_cards.values():
remote_agent_info.append(
{'name': card.name, 'description': card.description}
{
'agent_name': card.name,
'agent_description': card.description,
'agent_skills': json.dumps(
[
{
'skill_name': skill.name,
'skill_description': skill.description,
'skill_examples': skill.examples,
}
for skill in card.skills
]
),
}
)
return remote_agent_info

async def send_task(
self, agent_name: str, message: str, sid: str
) -> AsyncGenerator[SendTaskStreamingResponse | SendTaskResponse, None]:
async def send_message(
self, agent_name: str, message: str, sid: str, role: str = 'user'
) -> AsyncGenerator[SendStreamingMessageResponse | SendMessageResponse, None]:
"""Send a task to a remote agent and yield task responses.

Args:
Expand All @@ -79,25 +98,35 @@ async def send_task(
raise ValueError(f'Agent {agent_name} not found')

card = self.list_remote_agent_cards[agent_name]
client = A2AClient(card)
request: TaskSendParams = TaskSendParams(
id=str(uuid.uuid4()),
sessionId=sid,
message=Message(
role='user',
parts=[TextPart(text=message)],
metadata={},
),
acceptedOutputModes=['text', 'text/plain', 'image/png'],
metadata={'conversation_id': sid},
)

if card.capabilities.streaming:
async for response in client.send_task_streaming(request):
async with httpx.AsyncClient(
timeout=A2A_REQUEST_DEFAULT_TIMEOUT
) as httpx_client:
client = A2AClient(httpx_client, card)
params: MessageSendParams = MessageSendParams(
message=Message(
role=role,
parts=[TextPart(text=message)],
message_id=uuid.uuid4().hex,
),
acceptedOutputModes=['text', 'text/plain', 'image/png'],
metadata={'conversation_id': sid, 'session_id': sid},
)
request: SendMessageRequest = SendMessageRequest(
id=str(uuid.uuid4()),
params=params,
)

logger.info(f'Sending task to {agent_name} with message: {message}')
logger.info(f'Card capabilities: {card.capabilities}')
if card.capabilities.streaming:
async for response in client.send_message_streaming(request=request):
yield response
else:
response = await client.send_message(request=request)
yield response
else:
response = await client.send_task(request)
yield response

# async def send_cancel_task(self, task_id: str, sid: str):
# pass

@classmethod
def from_toml_config(cls, config: dict) -> 'A2AManager':
Expand Down
29 changes: 16 additions & 13 deletions openhands/a2a/a2a.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
import asyncio
from uuid import uuid4

from openhands.a2a.client import A2ACardResolver, A2AClient
from openhands.a2a.common.types import TaskState, TaskStatusUpdateEvent
from a2a.client import A2ACardResolver, A2AClient
from a2a.types import AgentCard, JSONRPCErrorResponse, TaskState, TaskStatusUpdateEvent

from openhands.core.logger import openhands_logger as logger
from openhands.core.message import TextContent
from openhands.utils.async_utils import call_async_from_sync


class A2AAgent:
Expand All @@ -13,10 +16,7 @@ def __init__(self, a2a_server_url: str, session: str = None, history: bool = Fal
self.history = history

self.card_resolver = A2ACardResolver(a2a_server_url)
self.card = self.card_resolver.get_agent_card()

print('======= Agent Card ========')
print(self.card.model_dump_json(exclude_none=True))
self.card: AgentCard = call_async_from_sync(self.card_resolver.get_agent_card())

self.client = A2AClient(agent_card=self.card)
if session:
Expand Down Expand Up @@ -61,21 +61,24 @@ async def completeTask(self, streaming, taskId, sessionId, messages: list[str]):

taskResult = None
if streaming:
response_stream = self.client.send_task_streaming(payload)
response_stream = self.client.send_message_streaming(payload)
async for result in response_stream:
print(f'stream event => {result.model_dump_json(exclude_none=True)}')
if (
result.result
and isinstance(result.result, TaskStatusUpdateEvent)
and result.result.final
result.root
and isinstance(result.root.result, TaskStatusUpdateEvent)
and result.root.result.final
):
return False
else:
taskResult = await self.client.send_task(payload)
taskResult = await self.client.send_message(payload)
print(f'\ntask result => {taskResult.model_dump_json(exclude_none=True)}')
## if the result is that more input is required, loop again.
state = TaskState(taskResult.result.status.state)
if state.name == TaskState.INPUT_REQUIRED.name:
if isinstance(taskResult.root, JSONRPCErrorResponse):
logger.error(f'Error sending message to agent A2A: {taskResult.root}')
return False
state = TaskState(taskResult.root.result.status.state)
if state.name == TaskState.input_required.name:
return await self.completeTask(streaming, taskId, sessionId)
else:
## task is complete
Expand Down
4 changes: 0 additions & 4 deletions openhands/a2a/client/__init__.py

This file was deleted.

50 changes: 0 additions & 50 deletions openhands/a2a/client/card_resolver.py

This file was deleted.

83 changes: 0 additions & 83 deletions openhands/a2a/client/client.py

This file was deleted.

Loading
Loading