Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 29 additions & 86 deletions openhands-sdk/openhands/sdk/agent/agent.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import json

from pydantic import ValidationError, model_validator
from pydantic import Field, ValidationError, model_validator

import openhands.sdk.security.analyzer as analyzer
import openhands.sdk.security.risk as risk
from openhands.sdk.agent.base import AgentBase
from openhands.sdk.agent.utils import fix_malformed_tool_arguments
Expand Down Expand Up @@ -42,15 +41,15 @@
should_enable_observability,
)
from openhands.sdk.observability.utils import extract_action_name
from openhands.sdk.security.llm_analyzer import LLMSecurityAnalyzer
from openhands.sdk.security.security_service import (
DefaultSecurityService,
)
from openhands.sdk.tool import (
Action,
Observation,
)
from openhands.sdk.tool.builtins import (
FinishAction,
FinishTool,
ThinkAction,
)


Expand All @@ -72,6 +71,15 @@ class Agent(AgentBase):
>>> agent = Agent(llm=llm, tools=tools)
"""

security_service: DefaultSecurityService | None = Field(
default=None,
description="Based on the Security Analyzer and Confirmation Policy "
" from conversationState, we conduct a security analysis of the "
"relevant actions.",
examples=[{"kind": "DefaultSecurityService"}],
exclude=True,
)

@model_validator(mode="before")
@classmethod
def _add_security_prompt_as_default(cls, data):
Expand All @@ -93,6 +101,16 @@ def init_state(
on_event: ConversationCallbackType,
) -> None:
super().init_state(state, on_event=on_event)
# Build the security service based on the security analyzer and
# confirmation policy from conversationState.
try:
object.__setattr__(
self,
"security_service",
DefaultSecurityService(state),
)
except Exception as e:
raise ValueError(f"Could not set Agent.security_service:{e}")
# TODO(openhands): we should add test to test this init_state will actually
# modify state in-place

Expand Down Expand Up @@ -232,7 +250,6 @@ def step(
tool_call,
llm_response_id=llm_response.id,
on_event=on_event,
security_analyzer=state.security_analyzer,
thought=thought_content
if i == 0
else [], # Only first gets thought
Expand All @@ -248,8 +265,11 @@ def step(
continue
action_events.append(action_event)

# Handle confirmation mode - exit early if actions need confirmation
if self._requires_user_confirmation(state, action_events):
# Handle confirmation mode - exit early if actions need access_confirm
if self.security_service.access_confirm(action_events).access_confirm: # type: ignore
state.execution_status = (
ConversationExecutionStatus.WAITING_FOR_CONFIRMATION
)
return

if action_events:
Expand Down Expand Up @@ -279,87 +299,11 @@ def step(
)
on_event(token_event)

def _requires_user_confirmation(
self, state: ConversationState, action_events: list[ActionEvent]
) -> bool:
"""
Decide whether user confirmation is needed to proceed.

Rules:
1. Confirmation mode is enabled
2. Every action requires confirmation
3. A single `FinishAction` never requires confirmation
4. A single `ThinkAction` never requires confirmation
"""
# A single `FinishAction` or `ThinkAction` never requires confirmation
if len(action_events) == 1 and isinstance(
action_events[0].action, (FinishAction, ThinkAction)
):
return False

# If there are no actions there is nothing to confirm
if len(action_events) == 0:
return False

# If a security analyzer is registered, use it to grab the risks of the actions
# involved. If not, we'll set the risks to UNKNOWN.
if state.security_analyzer is not None:
risks = [
risk
for _, risk in state.security_analyzer.analyze_pending_actions(
action_events
)
]
else:
risks = [risk.SecurityRisk.UNKNOWN] * len(action_events)

# Grab the confirmation policy from the state and pass in the risks.
if any(state.confirmation_policy.should_confirm(risk) for risk in risks):
state.execution_status = (
ConversationExecutionStatus.WAITING_FOR_CONFIRMATION
)
return True

return False

def _extract_security_risk(
self,
arguments: dict,
tool_name: str,
read_only_tool: bool,
security_analyzer: analyzer.SecurityAnalyzerBase | None = None,
) -> risk.SecurityRisk:
requires_sr = isinstance(security_analyzer, LLMSecurityAnalyzer)
raw = arguments.pop("security_risk", None)

# Default risk value for action event
# Tool is marked as read-only so security risk can be ignored
if read_only_tool:
return risk.SecurityRisk.UNKNOWN

# Raises exception if failed to pass risk field when expected
# Exception will be sent back to agent as error event
# Strong models like GPT-5 can correct itself by retrying
if requires_sr and raw is None:
raise ValueError(
f"Failed to provide security_risk field in tool '{tool_name}'"
)

# When using weaker models without security analyzer
# safely ignore missing security risk fields
if not requires_sr and raw is None:
return risk.SecurityRisk.UNKNOWN

# Raises exception if invalid risk enum passed by LLM
security_risk = risk.SecurityRisk(raw)
return security_risk

def _get_action_event(
self,
tool_call: MessageToolCall,
llm_response_id: str,
on_event: ConversationCallbackType,
security_analyzer: analyzer.SecurityAnalyzerBase | None = None,
thought: list[TextContent] | None = None,
reasoning_content: str | None = None,
thinking_blocks: list[ThinkingBlock | RedactedThinkingBlock] | None = None,
Expand Down Expand Up @@ -405,11 +349,10 @@ def _get_action_event(

# Fix malformed arguments (e.g., JSON strings for list/dict fields)
arguments = fix_malformed_tool_arguments(arguments, tool.action_type)
security_risk = self._extract_security_risk(
security_risk = self.security_service.extract_security_risk( # type: ignore
arguments,
tool.name,
tool.annotations.readOnlyHint if tool.annotations else False,
security_analyzer,
)
assert "security_risk" not in arguments, (
"Unexpected 'security_risk' key found in tool arguments"
Expand Down
136 changes: 136 additions & 0 deletions openhands-sdk/openhands/sdk/security/security_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
from abc import ABC, abstractmethod
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SecurityService (interface)
├── SecurityServiceBase (base implementation)
    └── DefaultSecurityService (concrete implementation)

We seem to have multiple layers for the interface, could we reduce them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, adjustments done!

from typing import TYPE_CHECKING

# BUGFIX: BaseModel was imported from the unrelated `groq` SDK (accidental
# IDE auto-import). It belongs to pydantic, which the SDK already depends on.
from pydantic import BaseModel

from openhands.sdk.event.llm_convertible.action import ActionEvent
from openhands.sdk.security import risk
from openhands.sdk.security.llm_analyzer import LLMSecurityAnalyzer
from openhands.sdk.tool.builtins.finish import FinishAction
from openhands.sdk.tool.builtins.think import ThinkAction
from openhands.sdk.utils.models import DiscriminatedUnionMixin


if TYPE_CHECKING:
    # Imported only for annotations to avoid a circular import at runtime.
    from openhands.sdk.conversation.state import ConversationState


class AccessConfirmRes(BaseModel):
    """Result of a confirmation check over a batch of pending actions."""

    # True when the user must confirm before the actions may run.
    access_confirm: bool
    # Highest risk level among the analyzed actions; None when no risk
    # information is available (e.g. empty batch).
    security_level: risk.SecurityRisk | None = None


class SecurityServiceBase(DiscriminatedUnionMixin, ABC):
    """
    Security service interface defining core security-related methods.
    """

    @abstractmethod
    def access_confirm(self, action_events: list[ActionEvent]) -> AccessConfirmRes:
        """
        Determine whether user confirmation is required to proceed with actions.

        :param action_events: List of pending action events
        :return: an AccessConfirmRes (never None) where:
            - access_confirm: True when user confirmation is needed,
              False when the actions may proceed without it
            - security_level: risk level of the batch — implementations are
              expected to report the highest risk among all actions; may be
              None when no risk information exists
        """
        pass


class DefaultSecurityService(SecurityServiceBase):
    """Default security service backed by the conversation state's
    security analyzer and confirmation policy."""

    def __init__(
        self,
        state: "ConversationState",
    ):
        # Keep a handle to the live conversation state; the analyzer and
        # the confirmation policy are read from it at call time.
        self._state = state

    def access_confirm(
        self,
        action_events: list[ActionEvent],
    ) -> AccessConfirmRes:
        """
        Decide whether user confirmation is needed to proceed.

        Rules:
        1. Confirmation mode is enabled
        2. Every action requires confirmation
        3. A single `FinishAction` never requires confirmation
        4. A single `ThinkAction` never requires confirmation
        """
        # An empty batch has nothing to confirm.
        if not action_events:
            return AccessConfirmRes(access_confirm=False)

        # A batch made up purely of finish/think actions is always allowed.
        if all(
            isinstance(event.action, (FinishAction, ThinkAction))
            for event in action_events
        ):
            return AccessConfirmRes(access_confirm=False)

        # Per-action risks: ask the registered analyzer when one exists;
        # without one, every action is treated as UNKNOWN.
        security_analyzer = self._state.security_analyzer
        if security_analyzer is None:
            risks = [risk.SecurityRisk.UNKNOWN for _ in action_events]
        else:
            risks = [
                r
                for _, r in security_analyzer.analyze_pending_actions(action_events)
            ]

        # The confirmation policy sees every risk, UNKNOWN included.
        needs_confirmation = any(
            self._state.confirmation_policy.should_confirm(r) for r in risks
        )

        # Report the most severe known risk; UNKNOWN when nothing is known.
        severity = {
            risk.SecurityRisk.LOW: 1,
            risk.SecurityRisk.MEDIUM: 2,
            risk.SecurityRisk.HIGH: 3,
        }
        known_risks = [r for r in risks if r != risk.SecurityRisk.UNKNOWN]
        if known_risks:
            highest = max(known_risks, key=severity.__getitem__)
        else:
            highest = risk.SecurityRisk.UNKNOWN

        return AccessConfirmRes(
            access_confirm=needs_confirmation, security_level=highest
        )

    def extract_security_risk(
        self,
        arguments: dict,
        tool_name: str,
        read_only_tool: bool,
    ) -> risk.SecurityRisk:
        """Pop and validate the LLM-supplied ``security_risk`` tool argument.

        Returns UNKNOWN for read-only tools, and when no LLM analyzer is
        active and the field is absent. Raises ValueError when an
        LLMSecurityAnalyzer expected the field but the model omitted it, or
        when the supplied value is not a valid SecurityRisk member.
        """
        # Always strip the field so it never leaks into the tool call.
        provided = arguments.pop("security_risk", None)

        # Read-only tools are considered safe regardless of what was sent.
        if read_only_tool:
            return risk.SecurityRisk.UNKNOWN

        expects_field = isinstance(
            self._state.security_analyzer, LLMSecurityAnalyzer
        )
        if provided is None:
            if expects_field:
                # Surfaced back to the agent as an error event; strong
                # models (e.g. GPT-5) can retry with the field filled in.
                raise ValueError(
                    f"Failed to provide security_risk field in tool '{tool_name}'"
                )
            # Weaker models without an analyzer may omit it — tolerate that.
            return risk.SecurityRisk.UNKNOWN

        # Invalid values raise ValueError from the enum constructor.
        return risk.SecurityRisk(provided)
Loading
Loading