Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ See [cookbook/agno_agent.py](cookbook/agno_agent.py) for an example of tracing a

## Version changelog

### 3.9.11

- feat: Adds support for OpenAI realtime API tracing using Websockets
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

Use proper capitalization for "WebSockets".

-- feat: Adds support for OpenAI realtime API tracing using Websockets
+- feat: Adds support for OpenAI realtime API tracing using WebSockets
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
- feat: Adds support for OpenAI realtime API tracing using Websockets
- feat: Adds support for OpenAI realtime API tracing using WebSockets
🧰 Tools
🪛 LanguageTool

[grammar] ~67-~67: Use proper capitalization
Context: ...t for OpenAI realtime API tracing using Websockets ### 3.9.10 - fix: removes signal hand...

(QB_NEW_EN_OTHER_ERROR_IDS_6)

🤖 Prompt for AI Agents
In README.md at line 67, the term "Websockets" is not properly capitalized.
Change "Websockets" to "WebSockets" to use the correct capitalization.


### 3.9.10

- fix: removes signal handlers from the package.
Expand Down
2 changes: 2 additions & 0 deletions maxim/logger/livekit/agent_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ def handle_tool_call_executed(self, event: FunctionToolsExecutedEvent):
)
tool_output = ""
for output in event.function_call_outputs or []:
if output is None:
continue
if output.call_id == function_call.call_id:
tool_output = output.output
break
Expand Down
353 changes: 353 additions & 0 deletions maxim/logger/openai/realtime/websocket/wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,353 @@
import json
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Optional
from queue import Queue
from websockets.asyncio.client import ClientConnection
from ....logger import Logger
from .....scribe import scribe

Comment on lines +1 to +11
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

Remove unused imports.

The threading and Queue imports are not used in the code and should be removed.

 import json
-import threading
 import time
 import uuid
 from concurrent.futures import ThreadPoolExecutor
 from typing import Any, Callable, Dict, List, Optional
-from queue import Queue
 from websockets.asyncio.client import ClientConnection
 from ....logger import Logger
 from .....scribe import scribe
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
import json
import threading
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Optional
from queue import Queue
from websockets.asyncio.client import ClientConnection
from ....logger import Logger
from .....scribe import scribe
import json
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Optional
from websockets.asyncio.client import ClientConnection
from ....logger import Logger
from .....scribe import scribe
🧰 Tools
🪛 Ruff (0.12.2)

2-2: threading imported but unused

Remove unused import: threading

(F401)


6-6: typing.Dict is deprecated, use dict instead

(UP035)


6-6: typing.List is deprecated, use list instead

(UP035)


7-7: queue.Queue imported but unused

Remove unused import: queue.Queue

(F401)

🤖 Prompt for AI Agents
In maxim/logger/openai/realtime/websocket/wrapper.py at lines 1 to 11, the
imports for threading and Queue are not used anywhere in the code. Remove these
two import statements to clean up the code and avoid unnecessary imports.


class WebSocketEvent:
"""Represents a websocket event with metadata."""

def __init__(
self,
event_type: str,
data: Any,
direction: str,
timestamp: Optional[float] = None,
):
self.event_type = event_type
self.data = data
self.direction = direction # 'inbound' or 'outbound'
self.timestamp = timestamp or time.time()
self.event_id = str(uuid.uuid4())

def to_dict(self) -> Dict[str, Any]:
"""Convert event to dictionary for logging."""
return {
"event_id": self.event_id,
"event_type": self.event_type,
"data": self.data,
"direction": self.direction,
"timestamp": self.timestamp,
}


def connect_with_maxim_wrapper():
pass


class OpenAIRealtimeWebsocketWrapper(ClientConnection):
"""
A WebSocket ClientConnection that logs OpenAI Realtime API events while maintaining
full compatibility with the websockets.ClientConnection interface.
This wrapper extends websockets.ClientConnection and can be used as a drop-in replacement
while adding OpenAI-specific logging and event processing capabilities.
"""

def __init__(
self, logger: Logger, session_id: Optional[str] = None, *args, **kwargs
):
# OpenAI-specific setup
self.maxim_logger = logger
self.session_id = session_id or str(uuid.uuid4())

# Event processing
self.event_callbacks: List[Callable[[WebSocketEvent], None]] = []
self.executor = ThreadPoolExecutor(
max_workers=1, thread_name_prefix="ws-processor"
)

# Stats
self.events_processed = 0
self.connection_start_time = time.time()

# Call parent constructor
super().__init__(*args, **kwargs)

scribe().info(
f"[MaximSDK][OpenAIRealtimeWebsocketWrapper] Initialized with session_id: {self.session_id}"
)

async def send(self, message, **kwargs):
"""Override send to log outbound messages."""
try:
# Process the message for logging
await self._process_outbound_message(message)

# Call the parent send method
result = await super().send(message, **kwargs)

scribe().debug(
f"[MaximSDK][OpenAIRealtimeWebsocketWrapper] Sent message successfully"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

Remove unnecessary f-string prefixes.

Several log messages use f-string syntax without any placeholders. These should be regular strings.

For example:

-                f"[MaximSDK][OpenAIRealtimeWebsocketWrapper] Sent message successfully"
+                "[MaximSDK][OpenAIRealtimeWebsocketWrapper] Sent message successfully"

Apply similar changes to lines 108, 127, 162, and 178.

Also applies to: 108-108, 127-127, 162-162, 178-178

🧰 Tools
🪛 Ruff (0.12.2)

87-87: f-string without any placeholders

Remove extraneous f prefix

(F541)


87-87: Trailing comma missing

Add trailing comma

(COM812)

🤖 Prompt for AI Agents
In maxim/logger/openai/realtime/websocket/wrapper.py at lines 87, 108, 127, 162,
and 178, remove the unnecessary f-string prefixes from log messages that do not
contain any placeholders. Replace the f-strings with regular string literals to
simplify the code and improve readability.

)

return result

except Exception as e:
scribe().warning(
f"[MaximSDK][OpenAIRealtimeWebsocketWrapper] Failed to send message: {e}"
)
raise

async def recv(self, **kwargs):
"""Override recv to log inbound messages."""
try:
# Call the parent recv method
message = await super().recv(**kwargs)

# Process the message for logging
await self._process_inbound_message(message)

scribe().debug(
f"[MaximSDK][OpenAIRealtimeWebsocketWrapper] Received message successfully"
)

return message

except Exception as e:
scribe().warning(
f"[MaximSDK][OpenAIRealtimeWebsocketWrapper] Failed to receive message: {e}"
)
raise

async def recv_streaming(self, **kwargs):
"""Override recv_streaming to log inbound streaming messages."""
try:
# Call the parent recv_streaming method
stream = await super().recv_streaming(**kwargs)

# TODO: Consider how to handle streaming message logging
scribe().debug(
f"[MaximSDK][OpenAIRealtimeWebsocketWrapper] Received streaming message"
)

return stream

except Exception as e:
scribe().warning(
f"[MaximSDK][OpenAIRealtimeWebsocketWrapper] Failed to receive streaming message: {e}"
)
raise
Comment on lines +119 to +136
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

Address the TODO for streaming message logging.

The recv_streaming method has a TODO comment about handling streaming message logging. Currently, the streaming data is not processed or logged in detail like regular messages.

Would you like me to help implement proper streaming message processing or create an issue to track this TODO?

🧰 Tools
🪛 Ruff (0.12.2)

119-119: Missing return type annotation for public function recv_streaming

(ANN201)


119-119: Missing type annotation for **kwargs

(ANN003)


127-127: f-string without any placeholders

Remove extraneous f prefix

(F541)


127-127: Trailing comma missing

Add trailing comma

(COM812)


130-130: Consider moving this statement to an else block

(TRY300)


134-134: Trailing comma missing

Add trailing comma

(COM812)

🤖 Prompt for AI Agents
In maxim/logger/openai/realtime/websocket/wrapper.py around lines 119 to 136,
the recv_streaming method has a TODO about logging streaming messages but
currently only logs a generic message without processing the stream data. To fix
this, modify the method to asynchronously iterate over the streaming messages
received from the parent recv_streaming call, log each message's content or
relevant details for debugging, and yield or return the messages appropriately
to preserve the streaming behavior.


async def close(self, code=1000, reason="", **kwargs):
"""Override close to log connection closure and cleanup."""
try:
scribe().info(
f"[MaximSDK][OpenAIRealtimeWebsocketWrapper] Closing connection with code: {code}, reason: {reason}"
)

# Call the parent close method
result = await super().close(code, reason, **kwargs)

# Cleanup resources
self._cleanup()

return result

except Exception as e:
scribe().warning(
f"[MaximSDK][OpenAIRealtimeWebsocketWrapper] Error during close: {e}"
)
raise

async def ping(self, data=b"", **kwargs):
"""Override ping to log ping frames."""
try:
scribe().debug(f"[MaximSDK][OpenAIRealtimeWebsocketWrapper] Sending ping")

# Call the parent ping method
result = await super().ping(data, **kwargs)

return result

except Exception as e:
scribe().warning(
f"[MaximSDK][OpenAIRealtimeWebsocketWrapper] Failed to send ping: {e}"
)
raise

async def pong(self, data=b"", **kwargs):
"""Override pong to log pong frames."""
try:
scribe().debug(f"[MaximSDK][OpenAIRealtimeWebsocketWrapper] Sending pong")

# Call the parent pong method
result = await super().pong(data, **kwargs)

return result

except Exception as e:
scribe().warning(
f"[MaximSDK][OpenAIRealtimeWebsocketWrapper] Failed to send pong: {e}"
)
raise

def add_event_callback(self, callback: Callable[[WebSocketEvent], None]):
"""Add a callback function to be called for each processed event."""
self.event_callbacks.append(callback)
scribe().debug(
f"[MaximSDK][OpenAIRealtimeWebsocketWrapper] Added event callback. Total callbacks: {len(self.event_callbacks)}"
)

def remove_event_callback(self, callback: Callable[[WebSocketEvent], None]):
"""Remove an event callback."""
if callback in self.event_callbacks:
self.event_callbacks.remove(callback)
scribe().debug(
f"[MaximSDK][OpenAIRealtimeWebsocketWrapper] Removed event callback. Total callbacks: {len(self.event_callbacks)}"
)

async def _process_inbound_message(self, message: Any):
"""Process and log an inbound message."""
try:
# Parse the message
if isinstance(message, str):
try:
data = json.loads(message)
event_type = (
data.get("type", "unknown") if isinstance(data, dict) else "raw"
)
except json.JSONDecodeError:
data = message
event_type = "raw"
else:
data = message
event_type = "binary" if isinstance(message, bytes) else "raw"

# Create event for processing
event = WebSocketEvent(
event_type=event_type, data=data, direction="inbound"
)

# Process event asynchronously
self._process_event_async(event)
self.events_processed += 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

Consider thread-safe counter and proper executor shutdown.

  1. The events_processed counter is incremented from async methods that could potentially be called concurrently. Consider using threading.Lock or asyncio.Lock for thread-safe increments.

  2. The executor shutdown with wait=False might leave event processing tasks incomplete. Consider using wait=True or implementing a graceful shutdown mechanism.

+        self._event_counter_lock = asyncio.Lock()
         self.events_processed = 0

         # In _process_inbound_message and _process_outbound_message:
-        self.events_processed += 1
+        async with self._event_counter_lock:
+            self.events_processed += 1

         # In _cleanup:
-        self.executor.shutdown(wait=False)
+        self.executor.shutdown(wait=True, cancel_futures=True)

Also applies to: 261-261, 329-330

🤖 Prompt for AI Agents
In maxim/logger/openai/realtime/websocket/wrapper.py at lines 230, 261, and
329-330, the events_processed counter is incremented in async contexts without
thread safety, risking race conditions. Use an asyncio.Lock or threading.Lock to
protect increments of events_processed to ensure thread-safe updates.
Additionally, revise the executor shutdown calls to use wait=True or implement a
graceful shutdown to ensure all event processing tasks complete before shutdown.


except Exception as e:
scribe().warning(
f"[MaximSDK][OpenAIRealtimeWebsocketWrapper] Failed to process inbound message: {e}"
)

async def _process_outbound_message(self, message: Any):
"""Process and log an outbound message."""
try:
# Parse the message
if isinstance(message, str):
try:
data = json.loads(message)
event_type = (
data.get("type", "unknown") if isinstance(data, dict) else "raw"
)
except json.JSONDecodeError:
data = message
event_type = "raw"
else:
data = message
event_type = "binary" if isinstance(message, bytes) else "raw"

# Create event for processing
event = WebSocketEvent(
event_type=event_type, data=data, direction="outbound"
)

# Process event asynchronously
self._process_event_async(event)
self.events_processed += 1

except Exception as e:
scribe().warning(
f"[MaximSDK][OpenAIRealtimeWebsocketWrapper] Failed to process outbound message: {e}"
)

def _process_event_async(self, event: WebSocketEvent):
"""Process a websocket event asynchronously."""
try:
# Submit to thread pool for parallel processing
self.executor.submit(self._process_event_sync, event)
except Exception as e:
scribe().warning(
f"[MaximSDK][OpenAIRealtimeWebsocketWrapper] Failed to submit event for processing: {e}"
)

def _process_event_sync(self, event: WebSocketEvent):
"""Synchronously process and log the event."""
try:
# Call custom callbacks
for callback in self.event_callbacks:
try:
callback(event)
except Exception as e:
scribe().warning(
f"[MaximSDK][OpenAIRealtimeWebsocketWrapper] Event callback failed: {e}"
)

# Log to Maxim if logger is available
if self.maxim_logger:
try:
# TODO: Implement Maxim-specific logging here
pass
except Exception as e:
scribe().warning(
f"[MaximSDK][OpenAIRealtimeWebsocketWrapper] Maxim logging failed: {e}"
)

scribe().debug(
f"[MaximSDK][OpenAIRealtimeWebsocketWrapper] Processed {event.direction} event: {event.event_type}"
)

except Exception as e:
scribe().warning(
f"[MaximSDK][OpenAIRealtimeWebsocketWrapper] Failed to process event {event.event_id}: {e}"
)

def get_stats(self) -> Dict[str, Any]:
"""Get connection and processing statistics."""
duration = time.time() - self.connection_start_time
return {
"session_id": self.session_id,
"connection_duration": duration,
"events_processed": self.events_processed,
"active_callbacks": len(self.event_callbacks),
"state": str(self.state) if hasattr(self, "state") else "unknown",
"local_address": (
str(self.local_address) if hasattr(self, "local_address") else None
),
"remote_address": (
str(self.remote_address) if hasattr(self, "remote_address") else None
),
}

def _cleanup(self):
"""Cleanup resources during connection closure."""
try:
# Shutdown event processor
self.executor.shutdown(wait=False)

stats = self.get_stats()
scribe().info(
f"[MaximSDK][OpenAIRealtimeWebsocketWrapper] Connection closed. Final stats: {stats}"
)
except Exception as e:
scribe().warning(
f"[MaximSDK][OpenAIRealtimeWebsocketWrapper] Error during cleanup: {e}"
)

def __enter__(self):
"""Context manager entry."""
return self

def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
try:
# Note: In async context, close() should be awaited, but this is sync context
self._cleanup()
except Exception as e:
scribe().warning(
f"[MaximSDK][OpenAIRealtimeWebsocketWrapper] Error in context manager exit: {e}"
)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "maxim-py"
version = "3.9.10"
version = "3.9.11"
description = "A package that allows you to use the Maxim Python Library to interact with the Maxim Platform."
readme = "README.md"
requires-python = ">=3.9.20"
Expand Down