74 changes: 70 additions & 4 deletions openhands-agent-server/openhands/agent_server/event_service.py
@@ -12,14 +12,15 @@
)
from openhands.agent_server.pub_sub import PubSub, Subscriber
from openhands.agent_server.utils import utc_now
from openhands.sdk import LLM, Agent, AgentBase, Event, Message, get_logger
from openhands.sdk.conversation.impl.local_conversation import LocalConversation
from openhands.sdk.conversation.secret_registry import SecretValue
from openhands.sdk.conversation.state import (
ConversationExecutionStatus,
ConversationState,
)
from openhands.sdk.event.conversation_state import ConversationStateUpdateEvent
from openhands.sdk.event.llm_completion_log import LLMCompletionLogEvent
from openhands.sdk.security.analyzer import SecurityAnalyzerBase
from openhands.sdk.security.confirmation_policy import ConfirmationPolicyBase
from openhands.sdk.utils.async_utils import AsyncCallbackWrapper
@@ -279,6 +280,67 @@ async def subscribe_to_events(self, subscriber: Subscriber[Event]) -> UUID:
    async def unsubscribe_from_events(self, subscriber_id: UUID) -> bool:
        return self._pub_sub.unsubscribe(subscriber_id)

    def _emit_event_from_thread(self, event: Event) -> None:
        """Helper to safely emit events from non-async contexts (e.g., callbacks).

        This schedules event emission in the main event loop, making it safe to call
        from callbacks that may run in different threads. Events are emitted through
        the conversation's normal event flow to ensure they are persisted.
        """
        if self._main_loop and self._main_loop.is_running() and self._conversation:
            # Capture conversation reference for closure
            conversation = self._conversation

            # Wrap _on_event with lock acquisition to ensure thread-safe access
            # to conversation state and event log during concurrent operations
            def locked_on_event():
                with conversation._state:
                    conversation._on_event(event)

            # Run the locked callback in an executor to ensure the event is
            # both persisted and sent to WebSocket subscribers
            self._main_loop.run_in_executor(None, locked_on_event)

    def _setup_llm_log_streaming(self, agent: AgentBase) -> None:
        """Configure LLM log callbacks to stream logs via events."""
        for llm in agent.get_all_llms():
            if not llm.log_completions:
                continue

            # Capture variables for closure
            usage_id = llm.usage_id
            model_name = llm.model

            def log_callback(
                filename: str, log_data: str, uid=usage_id, model=model_name
            ) -> None:
                """Callback to emit LLM completion logs as events."""
                event = LLMCompletionLogEvent(
                    filename=filename,
                    log_data=log_data,
                    model_name=model,
                    usage_id=uid,
                )
                self._emit_event_from_thread(event)

            llm.telemetry.set_log_completions_callback(log_callback)

    def _setup_stats_streaming(self, agent: AgentBase) -> None:
        """Configure stats update callbacks to stream stats changes via events."""

        def stats_callback() -> None:
            """Callback to emit stats updates."""
            # Publish only the stats field to avoid sending entire state
            if not self._conversation:
                return
            state = self._conversation._state
            with state:
                event = ConversationStateUpdateEvent(key="stats", value=state.stats)
            self._emit_event_from_thread(event)

        for llm in agent.get_all_llms():
            llm.telemetry.set_stats_update_callback(stats_callback)

    async def start(self):
        # Store the main event loop for cross-thread communication
        self._main_loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
@@ -289,6 +351,7 @@ async def start(self):
        assert isinstance(workspace, LocalWorkspace)
        Path(workspace.working_dir).mkdir(parents=True, exist_ok=True)
        agent = Agent.model_validate(self.stored.agent.model_dump())

        conversation = LocalConversation(
            agent=agent,
            workspace=workspace,
@@ -310,6 +373,12 @@
        # Register state change callback to automatically publish updates
        self._conversation._state.set_on_state_change(self._conversation._on_event)

        # Set up LLM log streaming for remote execution
        self._setup_llm_log_streaming(self._conversation.agent)

        # Set up stats streaming for remote execution
        self._setup_stats_streaming(self._conversation.agent)

        # Publish initial state update
        await self._publish_state_update()

@@ -406,12 +475,9 @@ async def _publish_state_update(self):

        state = self._conversation._state
        with state:
            state_update_event = ConversationStateUpdateEvent.from_conversation_state(
                state
            )
        await self._pub_sub(state_update_event)

    async def __aenter__(self):
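Note on the default-argument capture in log_callback above (uid=usage_id, model=model_name): it sidesteps Python's late-binding closures, which would otherwise make every callback see the last loop iteration's values. A minimal standalone sketch of the pitfall and the fix (names here are illustrative, not part of the PR):

def make_callbacks_buggy(models):
    callbacks = []
    for model in models:
        # BUG: `model` is looked up when the callback runs, so every
        # callback sees the last loop value.
        callbacks.append(lambda: print(model))
    return callbacks

def make_callbacks_fixed(models):
    callbacks = []
    for model in models:
        # FIX: bind the current value at definition time, as log_callback
        # does with uid=usage_id, model=model_name.
        callbacks.append(lambda bound=model: print(bound))
    return callbacks

for cb in make_callbacks_buggy(["gpt-4o", "claude"]):
    cb()  # prints "claude" twice
for cb in make_callbacks_fixed(["gpt-4o", "claude"]):
    cb()  # prints "gpt-4o", then "claude"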
openhands-sdk/openhands/sdk/conversation/impl/remote_conversation.py
@@ -1,5 +1,6 @@
import asyncio
import json
import os
import threading
import uuid
from collections.abc import Mapping
@@ -26,6 +27,7 @@
    FULL_STATE_KEY,
    ConversationStateUpdateEvent,
)
from openhands.sdk.event.llm_completion_log import LLMCompletionLogEvent
from openhands.sdk.llm import LLM, Message, TextContent
from openhands.sdk.logger import DEBUG, get_logger
from openhands.sdk.observability.laminar import observe
@@ -502,6 +504,12 @@ def __init__(
        state_update_callback = self._state.create_state_update_callback()
        self._callbacks.append(state_update_callback)

        # Register a callback to persist LLM completion logs if any LLM
        # has log_completions enabled
        if any(llm.log_completions for llm in agent.get_all_llms()):
            llm_log_callback = self._create_llm_completion_log_callback()
            self._callbacks.append(llm_log_callback)

        # Handle visualization configuration
        if isinstance(visualizer, ConversationVisualizerBase):
            # Use custom visualizer instance
@@ -541,6 +549,39 @@ def __init__(

        self._start_observability_span(str(self._id))

    def _create_llm_completion_log_callback(self) -> ConversationCallbackType:
        """Create a callback that writes LLM completion logs to client filesystem."""

        def callback(event: Event) -> None:
            if not isinstance(event, LLMCompletionLogEvent):
                return

            # Find the LLM with matching usage_id
            target_llm = None
            for llm in self.agent.get_all_llms():
                if llm.usage_id == event.usage_id:
                    target_llm = llm
                    break

            if not target_llm or not target_llm.log_completions:
                logger.debug(
                    f"No LLM with log_completions enabled found "
                    f"for usage_id={event.usage_id}"
                )
                return

            try:
                log_dir = target_llm.log_completions_folder
                os.makedirs(log_dir, exist_ok=True)
                log_path = os.path.join(log_dir, event.filename)
                with open(log_path, "w") as f:
                    f.write(event.log_data)
                logger.debug(f"Wrote LLM completion log to {log_path}")
            except Exception as e:
                logger.warning(f"Failed to write LLM completion log: {e}")

        return callback

    @property
    def id(self) -> ConversationID:
        return self._id
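A minimal standalone sketch of the callback fan-out that _create_llm_completion_log_callback plugs into: the conversation delivers every event to every registered callback, and each callback filters for the type it handles. The classes and names below are illustrative stand-ins, not SDK imports:

import os
from dataclasses import dataclass

@dataclass
class FakeLogEvent:
    filename: str
    log_data: str

def make_log_writer(log_dir: str):
    def callback(event) -> None:
        if not isinstance(event, FakeLogEvent):
            return  # ignore unrelated events, as the callback above does
        os.makedirs(log_dir, exist_ok=True)
        with open(os.path.join(log_dir, event.filename), "w") as f:
            f.write(event.log_data)
    return callback

callbacks = [make_log_writer("completions")]
for cb in callbacks:  # the conversation fans each event out like this
    cb(FakeLogEvent("sample.json", '{"ok": true}'))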
2 changes: 2 additions & 0 deletions openhands-sdk/openhands/sdk/event/__init__.py
@@ -5,6 +5,7 @@
    CondensationSummaryEvent,
)
from openhands.sdk.event.conversation_state import ConversationStateUpdateEvent
from openhands.sdk.event.llm_completion_log import LLMCompletionLogEvent
from openhands.sdk.event.llm_convertible import (
    ActionEvent,
    AgentErrorEvent,
@@ -35,6 +36,7 @@
"CondensationRequest",
"CondensationSummaryEvent",
"ConversationStateUpdateEvent",
"LLMCompletionLogEvent",
"EventID",
"ToolCallID",
]
39 changes: 39 additions & 0 deletions openhands-sdk/openhands/sdk/event/llm_completion_log.py
@@ -0,0 +1,39 @@
"""Event for streaming LLM completion logs from remote agents to clients."""

from pydantic import Field

from openhands.sdk.event.base import Event
from openhands.sdk.event.types import SourceType


class LLMCompletionLogEvent(Event):
    """Event containing LLM completion log data.

    When an LLM is configured with log_completions=True in a remote conversation,
    this event streams the completion log data back to the client through WebSocket
    instead of writing it to a file inside the Docker container.
    """

    source: SourceType = "environment"
    filename: str = Field(
        ...,
        description="The intended filename for this log (relative to log directory)",
    )
    log_data: str = Field(
        ...,
        description="The JSON-encoded log data to be written to the file",
    )
    model_name: str = Field(
        default="unknown",
        description="The model name for context",
    )
    usage_id: str = Field(
        default="default",
        description="The LLM usage_id that produced this log",
    )

    def __str__(self) -> str:
        return (
            f"LLMCompletionLog(usage_id={self.usage_id}, model={self.model_name}, "
            f"file={self.filename})"
        )
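A quick usage sketch for the new event, assuming the base Event fields (id, timestamp) carry defaults as the class definition implies; the filename value is a made-up example of the telemetry naming scheme:

from openhands.sdk.event import LLMCompletionLogEvent

event = LLMCompletionLogEvent(
    filename="litellm_proxy__gpt-4o-1730000000.000-ab12.json",
    log_data='{"response": "..."}',
    model_name="litellm_proxy/gpt-4o",
    usage_id="agent",
)
print(event)
# LLMCompletionLog(usage_id=agent, model=litellm_proxy/gpt-4o, file=litellm_proxy__gpt-4o-1730000000.000-ab12.json)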
15 changes: 15 additions & 0 deletions openhands-sdk/openhands/sdk/llm/llm.py
@@ -456,6 +456,21 @@ def metrics(self) -> Metrics:
        )
        return self._metrics

    @property
    def telemetry(self) -> Telemetry:
        """Get telemetry handler for this LLM instance.

        Returns:
            Telemetry object for managing logging and metrics callbacks.

        Example:
            >>> llm.telemetry.set_log_completions_callback(my_callback)
        """
        assert self._telemetry is not None, (
            "Telemetry should be initialized after model validation"
        )
        return self._telemetry

    def restore_metrics(self, metrics: Metrics) -> None:
        # Only used by ConversationStats to seed metrics
        self._metrics = metrics
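A hedged sketch of wiring both new callbacks through the telemetry property above; llm stands for any already-configured LLM instance, and accumulated_cost is an assumption about the SDK's Metrics model:

def on_log(filename: str, log_data: str) -> None:
    print(f"completion log ready: {filename} ({len(log_data)} bytes)")

def on_stats() -> None:
    # accumulated_cost is assumed from the Metrics model, not confirmed by this PR
    print("stats updated; cost so far:", llm.metrics.accumulated_cost)

llm.telemetry.set_log_completions_callback(on_log)
llm.telemetry.set_stats_update_callback(on_stats)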
70 changes: 55 additions & 15 deletions openhands-sdk/openhands/sdk/llm/utils/telemetry.py
@@ -3,6 +3,7 @@
import time
import uuid
import warnings
from collections.abc import Callable
from typing import Any, ClassVar

from litellm.cost_calculator import completion_cost as litellm_completion_cost
@@ -42,12 +43,36 @@ class Telemetry(BaseModel):
    _req_start: float = PrivateAttr(default=0.0)
    _req_ctx: dict[str, Any] = PrivateAttr(default_factory=dict)
    _last_latency: float = PrivateAttr(default=0.0)
    _log_completions_callback: Callable[[str, str], None] | None = PrivateAttr(
        default=None
    )
    _stats_update_callback: Callable[[], None] | None = PrivateAttr(default=None)

    model_config: ClassVar[ConfigDict] = ConfigDict(
        extra="forbid", arbitrary_types_allowed=True
    )

    # ---------- Lifecycle ----------
    def set_log_completions_callback(
        self, callback: Callable[[str, str], None] | None
    ) -> None:
        """Set a callback that handles completion logs instead of writing to file.

        Args:
            callback: A function that takes (filename, log_data) and handles the log.
                Used for streaming logs in remote execution contexts.
        """
        self._log_completions_callback = callback

    def set_stats_update_callback(self, callback: Callable[[], None] | None) -> None:
        """Set a callback to be notified when stats are updated.

        Args:
            callback: A function called whenever metrics are updated.
                Used for streaming stats updates in remote execution contexts.
        """
        self._stats_update_callback = callback

    def on_request(self, log_ctx: dict | None) -> None:
        self._req_start = time.time()
        self._req_ctx = log_ctx or {}
@@ -86,6 +111,13 @@ def on_response(
        if self.log_enabled:
            self.log_llm_call(resp, cost, raw_resp=raw_resp)

        # 5) notify about stats update
        if self._stats_update_callback is not None:
            try:
                self._stats_update_callback()
            except Exception:
                logger.exception("Stats update callback failed")

        return self.metrics.deep_copy()

    def on_error(self, _err: Exception) -> None:
@@ -218,22 +250,17 @@ def log_llm_call(
        cost: float | None,
        raw_resp: ModelResponse | ResponsesAPIResponse | None = None,
    ) -> None:
        # Skip if neither file logging nor callback is configured
        if not self.log_dir and not self._log_completions_callback:
            return
        try:
            # Prepare filename and log data
            filename = (
                f"{self.model_name.replace('/', '__')}-"
                f"{time.time():.3f}-"
                f"{uuid.uuid4().hex[:4]}.json"
            )
            data = self._req_ctx.copy()
            data["response"] = (
                resp  # ModelResponse | ResponsesAPIResponse;
@@ -297,8 +324,21 @@
and "tools" in data["kwargs"]
):
data["kwargs"].pop("tools")
with open(fname, "w", encoding="utf-8") as f:
f.write(json.dumps(data, default=_safe_json, ensure_ascii=False))

log_data = json.dumps(data, default=_safe_json, ensure_ascii=False)

# Use callback if set (for remote execution), otherwise write to file
if self._log_completions_callback:
self._log_completions_callback(filename, log_data)
elif self.log_dir:
# Create log directory if it doesn't exist
os.makedirs(self.log_dir, exist_ok=True)
if not os.access(self.log_dir, os.W_OK):
raise PermissionError(f"log_dir is not writable: {self.log_dir}")

fname = os.path.join(self.log_dir, filename)
with open(fname, "w", encoding="utf-8") as f:
f.write(log_data)
except Exception as e:
warnings.warn(f"Telemetry logging failed: {e}")

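A standalone sketch of the log filename scheme used in log_llm_call above (slashes in the model name escaped, wall-clock timestamp, short uuid suffix); the printed value is illustrative:

import time
import uuid

def completion_log_filename(model_name: str) -> str:
    # Mirrors the scheme in log_llm_call: escaped model name,
    # millisecond-precision timestamp, 4-hex-char uuid suffix.
    return (
        f"{model_name.replace('/', '__')}-"
        f"{time.time():.3f}-"
        f"{uuid.uuid4().hex[:4]}.json"
    )

print(completion_log_filename("litellm_proxy/gpt-4o"))
# e.g. litellm_proxy__gpt-4o-1730000000.123-ab12.json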