Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 57 additions & 3 deletions openhands-agent-server/openhands/agent_server/event_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
ConversationState,
)
from openhands.sdk.event.conversation_state import ConversationStateUpdateEvent
from openhands.sdk.event.llm_completion_log import LLMCompletionLogEvent
from openhands.sdk.security.analyzer import SecurityAnalyzerBase
from openhands.sdk.security.confirmation_policy import ConfirmationPolicyBase
from openhands.sdk.utils.async_utils import AsyncCallbackWrapper
Expand Down Expand Up @@ -231,6 +232,55 @@ async def subscribe_to_events(self, subscriber: Subscriber[Event]) -> UUID:
async def unsubscribe_from_events(self, subscriber_id: UUID) -> bool:
return self._pub_sub.unsubscribe(subscriber_id)

def _emit_event_from_thread(self, event: Event) -> None:
    """Safely publish *event* from a non-async context (e.g. a callback thread).

    Emission is scheduled onto the stored main event loop so callers running
    on arbitrary worker threads never touch the pub/sub machinery directly.
    If the main loop is absent or no longer running, the event is silently
    dropped (best-effort semantics).
    """
    loop = self._main_loop
    if not loop or not loop.is_running():
        return
    asyncio.run_coroutine_threadsafe(self._pub_sub(event), loop)

def _setup_llm_log_streaming(self, agent: Agent) -> None:
    """Wire each logging-enabled LLM so completion logs stream out as events.

    For every LLM on the agent with ``log_completions`` turned on, install a
    telemetry log callback that wraps the raw log payload in an
    ``LLMCompletionLogEvent`` and publishes it through the main event loop.
    """

    def make_callback(uid: str, model: str):
        # Bind usage_id/model per LLM via an enclosing scope; a single shared
        # closure would capture only the last loop iteration's values.
        def emit_log(filename: str, log_data: str) -> None:
            self._emit_event_from_thread(
                LLMCompletionLogEvent(
                    filename=filename,
                    log_data=log_data,
                    model_name=model,
                    usage_id=uid,
                )
            )

        return emit_log

    for llm in agent.get_all_llms():
        if llm.log_completions:
            llm.telemetry.set_log_callback(make_callback(llm.usage_id, llm.model))

def _setup_stats_streaming(self, agent: Agent) -> None:
    """Register a telemetry hook that streams stats changes as state events."""

    def on_stats_updated() -> None:
        # Emit only the "stats" slice of state to avoid shipping the whole
        # conversation state on every metrics update.
        conversation = self._conversation
        if not conversation:
            return
        state = conversation._state
        with state:
            self._emit_event_from_thread(
                ConversationStateUpdateEvent(key="stats", value=state.stats)
            )

    for llm in agent.get_all_llms():
        llm.telemetry.set_stats_update_callback(on_stats_updated)

async def start(self):
# Store the main event loop for cross-thread communication
self._main_loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
Expand All @@ -241,6 +291,13 @@ async def start(self):
assert isinstance(workspace, LocalWorkspace)
Path(workspace.working_dir).mkdir(parents=True, exist_ok=True)
agent = Agent.model_validate(self.stored.agent.model_dump())

# Setup LLM log streaming for remote execution
self._setup_llm_log_streaming(agent)

# Setup stats streaming for remote execution
self._setup_stats_streaming(agent)

conversation = LocalConversation(
agent=agent,
workspace=workspace,
Expand Down Expand Up @@ -358,12 +415,9 @@ async def _publish_state_update(self):

state = self._conversation._state
with state:
# Create state update event with current state information
state_update_event = ConversationStateUpdateEvent.from_conversation_state(
state
)

# Publish the state update event
await self._pub_sub(state_update_event)

async def __aenter__(self):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import json
import os
import threading
import uuid
from collections.abc import Mapping
Expand All @@ -26,6 +27,7 @@
FULL_STATE_KEY,
ConversationStateUpdateEvent,
)
from openhands.sdk.event.llm_completion_log import LLMCompletionLogEvent
from openhands.sdk.llm import LLM, Message, TextContent
from openhands.sdk.logger import DEBUG, get_logger
from openhands.sdk.observability.laminar import observe
Expand Down Expand Up @@ -423,6 +425,7 @@ class RemoteConversation(BaseConversation):
max_iteration_per_run: int
workspace: RemoteWorkspace
_client: httpx.Client
_log_completion_folders: dict[str, str]

def __init__(
self,
Expand Down Expand Up @@ -461,6 +464,13 @@ def __init__(
self.workspace = workspace
self._client = workspace.client

# Build map of log directories for all LLMs in the agent
self._log_completion_folders = {}
for llm in agent.get_all_llms():
if llm.log_completions:
# Map usage_id to log folder
self._log_completion_folders[llm.usage_id] = llm.log_completions_folder
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we necessarily need this since we can always get this when needed?


if conversation_id is None:
payload = {
"agent": agent.model_dump(
Expand Down Expand Up @@ -502,6 +512,11 @@ def __init__(
state_update_callback = self._state.create_state_update_callback()
self._callbacks.append(state_update_callback)

# Add callback to handle LLM completion logs
if self._log_completion_folders:
llm_log_callback = self._create_llm_completion_log_callback()
self._callbacks.append(llm_log_callback)

# Handle visualization configuration
if isinstance(visualizer, ConversationVisualizerBase):
# Use custom visualizer instance
Expand Down Expand Up @@ -541,6 +556,32 @@ def __init__(

self._start_observability_span(str(self._id))

def _create_llm_completion_log_callback(self) -> ConversationCallbackType:
    """Create a callback that writes LLM completion logs to client filesystem.

    The returned callback ignores every event except ``LLMCompletionLogEvent``.
    For those, it resolves the client-side log directory registered for the
    event's ``usage_id`` and writes the JSON payload there. Write failures are
    logged as warnings rather than raised, so log delivery stays best-effort.

    Returns:
        A conversation callback suitable for appending to ``self._callbacks``.
    """

    def callback(event: Event) -> None:
        if not isinstance(event, LLMCompletionLogEvent):
            return

        # Get the log directory for this LLM's usage_id
        log_dir = self._log_completion_folders.get(event.usage_id)
        if not log_dir:
            logger.debug(
                f"No log directory configured for usage_id={event.usage_id}"
            )
            return

        try:
            os.makedirs(log_dir, exist_ok=True)
            # NOTE(review): event.filename is trusted to be a bare filename;
            # a name containing ".." or a separator would escape log_dir —
            # confirm the server-side producer guarantees this.
            log_path = os.path.join(log_dir, event.filename)
            # Explicit utf-8 keeps the written log byte-identical to what the
            # server-side telemetry writer would have produced, regardless of
            # the client's locale default encoding.
            with open(log_path, "w", encoding="utf-8") as f:
                f.write(event.log_data)
            logger.debug(f"Wrote LLM completion log to {log_path}")
        except Exception as e:
            logger.warning(f"Failed to write LLM completion log: {e}")

    return callback

@property
def id(self) -> ConversationID:
    """Unique identifier of this conversation (read-only)."""
    return self._id
Expand Down
2 changes: 2 additions & 0 deletions openhands-sdk/openhands/sdk/event/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
CondensationSummaryEvent,
)
from openhands.sdk.event.conversation_state import ConversationStateUpdateEvent
from openhands.sdk.event.llm_completion_log import LLMCompletionLogEvent
from openhands.sdk.event.llm_convertible import (
ActionEvent,
AgentErrorEvent,
Expand Down Expand Up @@ -35,6 +36,7 @@
"CondensationRequest",
"CondensationSummaryEvent",
"ConversationStateUpdateEvent",
"LLMCompletionLogEvent",
"EventID",
"ToolCallID",
]
39 changes: 39 additions & 0 deletions openhands-sdk/openhands/sdk/event/llm_completion_log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
"""Event for streaming LLM completion logs from remote agents to clients."""

from pydantic import Field

from openhands.sdk.event.base import Event
from openhands.sdk.event.types import SourceType


class LLMCompletionLogEvent(Event):
    """Event containing LLM completion log data.

    When an LLM is configured with log_completions=True in a remote conversation,
    this event streams the completion log data back to the client through WebSocket
    instead of writing it to a file inside the Docker container. A client-side
    callback is expected to write ``log_data`` to ``filename`` inside the log
    directory it has mapped for ``usage_id``.
    """

    # Logs originate from the server environment, not from agent/user actions.
    source: SourceType = "environment"
    filename: str = Field(
        ...,
        description="The intended filename for this log (relative to log directory)",
    )
    log_data: str = Field(
        ...,
        description="The JSON-encoded log data to be written to the file",
    )
    model_name: str = Field(
        default="unknown",
        description="The model name for context",
    )
    usage_id: str = Field(
        default="default",
        description="The LLM usage_id that produced this log",
    )

    def __str__(self) -> str:
        # Compact one-line summary; the (potentially large) log_data payload
        # is intentionally omitted.
        return (
            f"LLMCompletionLog(usage_id={self.usage_id}, model={self.model_name}, "
            f"file={self.filename})"
        )
15 changes: 15 additions & 0 deletions openhands-sdk/openhands/sdk/llm/llm.py
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,21 @@ def metrics(self) -> Metrics:
)
return self._metrics

@property
def telemetry(self) -> Telemetry:
    """Get telemetry handler for this LLM instance.

    Returns:
        Telemetry object for managing logging and metrics callbacks.

    Raises:
        RuntimeError: If telemetry has not been initialized (it is expected
            to exist after model validation).

    Example:
        >>> llm.telemetry.set_log_callback(my_callback)
    """
    # Explicit check instead of `assert`: assertions are stripped under
    # `python -O`, which would let this property silently return None.
    if self._telemetry is None:
        raise RuntimeError(
            "Telemetry should be initialized after model validation"
        )
    return self._telemetry

def restore_metrics(self, metrics: Metrics) -> None:
# Only used by ConversationStats to seed metrics
self._metrics = metrics
Expand Down
66 changes: 51 additions & 15 deletions openhands-sdk/openhands/sdk/llm/utils/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import time
import uuid
import warnings
from collections.abc import Callable
from typing import Any, ClassVar

from litellm.cost_calculator import completion_cost as litellm_completion_cost
Expand Down Expand Up @@ -42,12 +43,32 @@ class Telemetry(BaseModel):
_req_start: float = PrivateAttr(default=0.0)
_req_ctx: dict[str, Any] = PrivateAttr(default_factory=dict)
_last_latency: float = PrivateAttr(default=0.0)
_log_callback: Callable[[str, str], None] | None = PrivateAttr(default=None)
_stats_update_callback: Callable[[], None] | None = PrivateAttr(default=None)

model_config: ClassVar[ConfigDict] = ConfigDict(
extra="forbid", arbitrary_types_allowed=True
)

# ---------- Lifecycle ----------
def set_log_callback(self, callback: Callable[[str, str], None] | None) -> None:
"""Set a callback function for logging instead of writing to file.

Args:
callback: A function that takes (filename, log_data) and handles the log.
Used for streaming logs in remote execution contexts.
"""
self._log_callback = callback

def set_stats_update_callback(self, callback: Callable[[], None] | None) -> None:
"""Set a callback function to be notified when stats are updated.

Args:
callback: A function called whenever metrics are updated.
Used for streaming stats updates in remote execution contexts.
"""
self._stats_update_callback = callback

def on_request(self, log_ctx: dict | None) -> None:
self._req_start = time.time()
self._req_ctx = log_ctx or {}
Expand Down Expand Up @@ -86,6 +107,13 @@ def on_response(
if self.log_enabled:
self.log_llm_call(resp, cost, raw_resp=raw_resp)

# 5) notify about stats update
if self._stats_update_callback is not None:
try:
self._stats_update_callback()
except Exception:
logger.exception("Stats update callback failed", exc_info=True)

return self.metrics.deep_copy()

def on_error(self, _err: Exception) -> None:
Expand Down Expand Up @@ -218,22 +246,17 @@ def log_llm_call(
cost: float | None,
raw_resp: ModelResponse | ResponsesAPIResponse | None = None,
) -> None:
if not self.log_dir:
# Skip if neither file logging nor callback is configured
if not self.log_dir and not self._log_callback:
return
try:
# Create log directory if it doesn't exist
os.makedirs(self.log_dir, exist_ok=True)
if not os.access(self.log_dir, os.W_OK):
raise PermissionError(f"log_dir is not writable: {self.log_dir}")

fname = os.path.join(
self.log_dir,
(
f"{self.model_name.replace('/', '__')}-"
f"{time.time():.3f}-"
f"{uuid.uuid4().hex[:4]}.json"
),
# Prepare filename and log data
filename = (
f"{self.model_name.replace('/', '__')}-"
f"{time.time():.3f}-"
f"{uuid.uuid4().hex[:4]}.json"
)

data = self._req_ctx.copy()
data["response"] = (
resp # ModelResponse | ResponsesAPIResponse;
Expand Down Expand Up @@ -297,8 +320,21 @@ def log_llm_call(
and "tools" in data["kwargs"]
):
data["kwargs"].pop("tools")
with open(fname, "w", encoding="utf-8") as f:
f.write(json.dumps(data, default=_safe_json, ensure_ascii=False))

log_data = json.dumps(data, default=_safe_json, ensure_ascii=False)

# Use callback if set (for remote execution), otherwise write to file
if self._log_callback:
self._log_callback(filename, log_data)
elif self.log_dir:
# Create log directory if it doesn't exist
os.makedirs(self.log_dir, exist_ok=True)
if not os.access(self.log_dir, os.W_OK):
raise PermissionError(f"log_dir is not writable: {self.log_dir}")

fname = os.path.join(self.log_dir, filename)
with open(fname, "w", encoding="utf-8") as f:
f.write(log_data)
except Exception as e:
warnings.warn(f"Telemetry logging failed: {e}")

Expand Down
Loading
Loading