Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions examples/python/a2a_server.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# /// script
# dependencies = [
# "a2a-sdk[http-server]",
# "httpx",
# "openai",
# "uvicorn",
# ]
Expand All @@ -19,6 +20,7 @@
from typing import Dict, List, Any, Optional
from fastapi import FastAPI, Request, HTTPException, APIRouter
from fastapi.responses import JSONResponse
import httpx
from openai import OpenAI
from a2a.server.agent_execution.agent_executor import AgentExecutor
from a2a.server.agent_execution.context import RequestContext
Expand All @@ -32,6 +34,18 @@
# Initialize OpenAI client
openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Realtime API defaults (optional override via env or request body)
OPENAI_REALTIME_SESSIONS_URL = os.getenv(
"OPENAI_REALTIME_SESSIONS_URL",
"https://api.openai.com/v1/realtime/sessions",
)
DEFAULT_REALTIME_MODEL = os.getenv(
"OPENAI_REALTIME_MODEL",
"gpt-4o-realtime-preview-2024-12-17",
)
DEFAULT_REALTIME_VOICE = os.getenv("OPENAI_REALTIME_VOICE")
DEFAULT_REALTIME_MODALITIES = os.getenv("OPENAI_REALTIME_MODALITIES")

# Agent IDs
PERSONAL_ASSISTANT_ID = "00000000-0000-0000-0000-000000000000"
WEATHER_ASSISTANT_ID = "FFFFFFFF-FFFF-FFFF-FFFF-FFFFFFFFFFFF"
Expand Down Expand Up @@ -142,6 +156,64 @@ def build_system_message(agent_id: str, tools: List[Dict[str, Any]]) -> str:
return "\n".join(system_parts)


def parse_modalities(value: Optional[str]) -> Optional[List[str]]:
"""Parse comma-separated modalities from env var."""
if not value:
return None
return [item.strip() for item in value.split(",") if item.strip()]


def build_realtime_instructions(agent_id: str, extra_instructions: Optional[str]) -> str:
"""Build realtime instructions using the same agent system message."""
base_instructions = build_system_message(agent_id, AGENT_TOOLS.get(agent_id, []))
if extra_instructions:
return f"{base_instructions}\n\nAdditional instructions:\n{extra_instructions}"
return base_instructions


def build_realtime_session_payload(
agent_id: str,
body: Optional[Dict[str, Any]],
) -> Dict[str, Any]:
"""Build OpenAI Realtime session payload with safe defaults."""
body = body or {}
tools = AGENT_TOOLS.get(agent_id, [])

payload: Dict[str, Any] = {
"model": body.get("model") or DEFAULT_REALTIME_MODEL,
"instructions": build_realtime_instructions(agent_id, body.get("instructions")),
}

modalities = body.get("modalities")
if modalities is None:
modalities = parse_modalities(DEFAULT_REALTIME_MODALITIES)
if modalities is not None:
payload["modalities"] = modalities

voice = body.get("voice")
if voice is None:
voice = DEFAULT_REALTIME_VOICE
if voice is not None:
payload["voice"] = voice

include_tools = body.get("include_tools", True)
if include_tools and tools:
payload["tools"] = tools
payload["tool_choice"] = body.get("tool_choice", "auto")

for key in (
"input_audio_format",
"output_audio_format",
"turn_detection",
"temperature",
"max_output_tokens",
):
if key in body:
payload[key] = body[key]

return payload


# Simple in-memory task storage (messages per task, keyed by agent_id:task_id)
task_messages: Dict[str, List[Dict[str, Any]]] = {}

Expand Down Expand Up @@ -491,6 +563,48 @@ async def get_agent_card(agent_id: str):

return JSONResponse(card.model_dump(mode="json"))

# Realtime session endpoint (WebRTC clients can use returned client_secret)
@app.post("/agents/{agent_id}/realtime/session")
async def create_realtime_session(request: Request, agent_id: str):
"""Create an OpenAI Realtime session scoped to the selected agent."""
if agent_id not in agent_a2a_apps:
raise HTTPException(status_code=404, detail=f"Agent {agent_id} not found")

api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
raise HTTPException(status_code=500, detail="OPENAI_API_KEY is not set")

try:
body = await request.json()
except Exception:
body = {}

payload = build_realtime_session_payload(agent_id, body)

try:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.post(
OPENAI_REALTIME_SESSIONS_URL,
json=payload,
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
},
)
response.raise_for_status()
except httpx.HTTPStatusError as exc:
raise HTTPException(
status_code=exc.response.status_code,
detail=exc.response.text,
) from exc
except httpx.HTTPError as exc:
raise HTTPException(
status_code=502,
detail=f"Realtime session request failed: {exc}",
) from exc

return JSONResponse(response.json())

# Create agent-specific A2A apps and mount their routes
agent_a2a_apps: Dict[str, A2ARESTFastAPIApplication] = {
PERSONAL_ASSISTANT_ID: personal_a2a_app,
Expand Down