Merged (changes from 14 commits)
2 changes: 0 additions & 2 deletions .github/workflows/tests.yml
@@ -84,7 +84,5 @@ jobs:
enable-cache: true
- name: Install dependencies
run: make sync
- name: Install Python 3.9 dependencies
Review comment (Member Author): moved to makefile
run: UV_PROJECT_ENVIRONMENT=.venv_39 uv sync --all-extras --all-packages --group dev
- name: Run tests
run: make old_version_tests
3 changes: 2 additions & 1 deletion .gitignore
@@ -100,7 +100,8 @@ celerybeat.pid
*.sage.py

# Environments
.env
.python-version
.env*
Review comment (Member Author): for local python 3.9 tests
.venv
env/
venv/
3 changes: 2 additions & 1 deletion Makefile
@@ -39,7 +39,8 @@ snapshots-create:
uv run pytest --inline-snapshot=create

.PHONY: old_version_tests
old_version_tests:
UV_PROJECT_ENVIRONMENT=.venv_39 uv sync --python 3.9 --all-extras --all-packages --group dev
UV_PROJECT_ENVIRONMENT=.venv_39 uv run --python 3.9 -m pytest

.PHONY: build-docs
11 changes: 8 additions & 3 deletions examples/realtime/app/README.md
@@ -29,14 +29,19 @@ To use the same UI with your own agents, edit `agent.py` and ensure get_starting
1. Click **Connect** to establish a realtime session
2. Audio capture starts automatically - just speak naturally
3. Click the **Mic On/Off** button to mute/unmute your microphone
4. Watch the conversation unfold in the left pane
5. Monitor raw events in the right pane (click to expand/collapse)
6. Click **Disconnect** when done
4. To send an image, optionally enter a prompt, click **🖼️ Send Image**, and select a file
5. Watch the conversation unfold in the left pane (image thumbnails are shown)
6. Monitor raw events in the right pane (click to expand/collapse)
7. Click **Disconnect** when done

## Architecture

- **Backend**: FastAPI server with WebSocket connections for real-time communication
- **Session Management**: Each connection gets a unique session with the OpenAI Realtime API
- **Image Inputs**: The UI uploads images and the server forwards a
  `conversation.item.create` event with `input_image` (plus optional `input_text`),
  followed by `response.create` to start the model response. The messages pane
  renders image bubbles for `input_image` content (see the payload sketch after
  this list).
- **Audio Processing**: 24kHz mono audio capture and playback
- **Event Handling**: Full event stream processing with transcript generation
- **Frontend**: Vanilla JavaScript with clean, responsive CSS
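For concreteness, here is a minimal sketch of the structured user message the server builds for an image upload; it mirrors `user_msg` in `server.py`, with the placeholder `data_url` standing in for the base64 data URL produced by the browser:

```python
# Mirrors the RealtimeUserInputMessage built in server.py; the truncated
# data URL below is illustrative only.
data_url = "data:image/png;base64,iVBORw0KGgo..."

user_msg = {
    "type": "message",
    "role": "user",
    "content": [
        {"type": "input_image", "image_url": data_url, "detail": "high"},
        {"type": "input_text", "text": "Please describe this image."},
    ],
}
```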
163 changes: 161 additions & 2 deletions examples/realtime/app/server.py
@@ -12,6 +12,8 @@
from typing_extensions import assert_never

from agents.realtime import RealtimeRunner, RealtimeSession, RealtimeSessionEvent
from agents.realtime.config import RealtimeUserInputMessage
from agents.realtime.model_inputs import RealtimeModelSendRawMessage

# Import TwilioHandler class - handle both module and package use cases
if TYPE_CHECKING:
@@ -64,6 +66,34 @@ async def send_audio(self, session_id: str, audio_bytes: bytes):
if session_id in self.active_sessions:
await self.active_sessions[session_id].send_audio(audio_bytes)

async def send_client_event(self, session_id: str, event: dict[str, Any]):
"""Send a raw client event to the underlying realtime model."""
session = self.active_sessions.get(session_id)
if not session:
return
await session.model.send_event(
RealtimeModelSendRawMessage(
message={
"type": event["type"],
"other_data": {k: v for k, v in event.items() if k != "type"},
}
)
)

async def send_user_message(self, session_id: str, message: RealtimeUserInputMessage):
"""Send a structured user message via the higher-level API (supports input_image)."""
session = self.active_sessions.get(session_id)
if not session:
return
await session.send_message(message) # delegates to RealtimeModelSendUserInput path

async def interrupt(self, session_id: str) -> None:
"""Interrupt current model playback/response for a session."""
session = self.active_sessions.get(session_id)
if not session:
return
await session.interrupt()

async def _process_events(self, session_id: str):
try:
session = self.active_sessions[session_id]
@@ -101,7 +131,11 @@ async def _serialize_event(self, event: RealtimeSessionEvent) -> dict[str, Any]:
elif event.type == "history_updated":
base_event["history"] = [item.model_dump(mode="json") for item in event.history]
elif event.type == "history_added":
pass
# Provide the added item so the UI can render incrementally.
try:
base_event["item"] = event.item.model_dump(mode="json")
except Exception:
base_event["item"] = None
elif event.type == "guardrail_tripped":
base_event["guardrail_results"] = [
{"name": result.guardrail.name} for result in event.guardrail_results
@@ -134,6 +168,7 @@ async def lifespan(app: FastAPI):
@app.websocket("/ws/{session_id}")
async def websocket_endpoint(websocket: WebSocket, session_id: str):
await manager.connect(websocket, session_id)
image_buffers: dict[str, dict[str, Any]] = {}
try:
while True:
data = await websocket.receive_text()
@@ -144,6 +179,124 @@ async def websocket_endpoint(websocket: WebSocket, session_id: str):
int16_data = message["data"]
audio_bytes = struct.pack(f"{len(int16_data)}h", *int16_data)
await manager.send_audio(session_id, audio_bytes)
elif message["type"] == "image":
logger.info("Received image message from client (session %s).", session_id)
# Build a conversation.item.create with input_image (and optional input_text)
data_url = message.get("data_url")
prompt_text = message.get("text") or "Please describe this image."
if data_url:
logger.info(
"Forwarding image (structured message) to Realtime API (len=%d).",
len(data_url),
)
user_msg: RealtimeUserInputMessage = {
"type": "message",
"role": "user",
"content": (
[
{"type": "input_image", "image_url": data_url, "detail": "high"},
{"type": "input_text", "text": prompt_text},
]
if prompt_text
else [
{"type": "input_image", "image_url": data_url, "detail": "high"}
]
),
}
await manager.send_user_message(session_id, user_msg)
# Acknowledge to client UI
await websocket.send_text(
json.dumps(
{
"type": "client_info",
"info": "image_enqueued",
"size": len(data_url),
}
)
)
else:
await websocket.send_text(
json.dumps(
{
"type": "error",
"error": "No data_url for image message.",
}
)
)
elif message["type"] == "commit_audio":
# Force close the current input audio turn
await manager.send_client_event(session_id, {"type": "input_audio_buffer.commit"})
elif message["type"] == "image_start":
img_id = str(message.get("id"))
image_buffers[img_id] = {
"text": message.get("text") or "Please describe this image.",
"chunks": [],
}
await websocket.send_text(
json.dumps({"type": "client_info", "info": "image_start_ack", "id": img_id})
)
elif message["type"] == "image_chunk":
img_id = str(message.get("id"))
chunk = message.get("chunk", "")
if img_id in image_buffers:
image_buffers[img_id]["chunks"].append(chunk)
if len(image_buffers[img_id]["chunks"]) % 10 == 0:
await websocket.send_text(
json.dumps(
{
"type": "client_info",
"info": "image_chunk_ack",
"id": img_id,
"count": len(image_buffers[img_id]["chunks"]),
}
)
)
elif message["type"] == "image_end":
img_id = str(message.get("id"))
buf = image_buffers.pop(img_id, None)
if buf is None:
await websocket.send_text(
json.dumps({"type": "error", "error": "Unknown image id for image_end."})
)
else:
data_url = "".join(buf["chunks"]) if buf["chunks"] else None
prompt_text = buf["text"]
if data_url:
logger.info(
"Forwarding chunked image (structured message) to Realtime API (len=%d).",
len(data_url),
)
user_msg2: RealtimeUserInputMessage = {
"type": "message",
"role": "user",
"content": (
[
{"type": "input_image", "image_url": data_url, "detail": "high"},
{"type": "input_text", "text": prompt_text},
]
if prompt_text
else [
{"type": "input_image", "image_url": data_url, "detail": "high"}
]
),
}
await manager.send_user_message(session_id, user_msg2)
await websocket.send_text(
json.dumps(
{
"type": "client_info",
"info": "image_enqueued",
"id": img_id,
"size": len(data_url),
}
)
)
else:
await websocket.send_text(
json.dumps({"type": "error", "error": "Empty image."})
)
elif message["type"] == "interrupt":
await manager.interrupt(session_id)

except WebSocketDisconnect:
await manager.disconnect(session_id)
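As a usage illustration, here is a hypothetical client for the chunked upload path above. It assumes the third-party `websockets` package and a locally running server; the only contract it relies on is the `image_start` / `image_chunk` / `image_end` message shapes handled in this diff:

```python
import asyncio
import base64
import json

import websockets  # third-party package; an assumption for this sketch


async def send_image(path: str, prompt: str, uri: str = "ws://localhost:8000/ws/demo") -> None:
    """Upload one image via the chunked protocol, then print the first ack."""
    with open(path, "rb") as f:
        data_url = "data:image/png;base64," + base64.b64encode(f.read()).decode()

    async with websockets.connect(uri, max_size=16 * 1024 * 1024) as ws:
        img_id = "img-1"
        await ws.send(json.dumps({"type": "image_start", "id": img_id, "text": prompt}))
        # 32 KiB chunks are an arbitrary choice; the server simply joins them.
        for i in range(0, len(data_url), 32_768):
            await ws.send(
                json.dumps({"type": "image_chunk", "id": img_id, "chunk": data_url[i : i + 32_768]})
            )
        await ws.send(json.dumps({"type": "image_end", "id": img_id}))
        # Expect e.g. {"type": "client_info", "info": "image_start_ack", ...}
        print(json.loads(await ws.recv()))


asyncio.run(send_image("photo.png", "What is in this picture?"))
```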
@@ -160,4 +313,10 @@ async def read_index():
if __name__ == "__main__":
import uvicorn

uvicorn.run(app, host="0.0.0.0", port=8000)
uvicorn.run(
app,
host="0.0.0.0",
port=8000,
# Increased WebSocket frame size to comfortably handle image data URLs.
ws_max_size=16 * 1024 * 1024,
)
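As a rough sanity check on that frame limit (assumed figures, purely illustrative): base64 inflates raw bytes by about 4/3, so a 16 MiB frame holds the data URL for an image of roughly 11 MiB with headroom to spare.

```python
# Back-of-the-envelope check for ws_max_size; all figures are assumptions,
# and the small JSON envelope around the data URL is ignored.
raw_image_bytes = 11 * 1024 * 1024                      # ~11 MiB source image
prefix = len("data:image/png;base64,")
data_url_len = prefix + (raw_image_bytes * 4 + 2) // 3  # base64 ≈ 4/3 inflation
assert data_url_len < 16 * 1024 * 1024                  # fits in a single frame
```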