Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion tourist_scheduling_system/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ dependencies = [
"opentelemetry-sdk>=1.20.0",
"opentelemetry-exporter-otlp-proto-http>=1.20.0",
"slimrpc==0.2.1",
"slima2a==0.2.1",
"slima2a==0.2.2",
]

[project.optional-dependencies]
Expand Down
43 changes: 35 additions & 8 deletions tourist_scheduling_system/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,29 @@ Options:
--no-demo Start servers only, no demo traffic
--real-agents Use real ADK guide/tourist agents instead of simulation

Environment variables (recommended for zsh - args may not work when sourcing):
export TRANSPORT=slim # Use SLIM transport
export TRACING=true # Enable OpenTelemetry tracing
export AZURE_OPENAI_API_KEY=...
source run.sh
Environment variables (can be used instead of flags):
TRANSPORT Transport mode (http/slim)
TRACING Enable tracing (true/false)
SCHED_PORT Scheduler port
UI_PORT Dashboard port
NUM_GUIDES Number of guides
NUM_TOURISTS Number of tourists
DURATION Duration in minutes
INTERVAL Request interval

# SLIM Configuration
SLIM_PORT SLIM node port (default: 46357)
SLIM_ENDPOINT SLIM endpoint URL
SLIM_SHARED_SECRET SLIM shared secret

# Tracing Configuration
JAEGER_OTLP_HTTP_PORT Jaeger OTLP HTTP port (default: 4318)
JAEGER_UI_PORT Jaeger UI port (default: 16686)

# Required for Agents
AZURE_OPENAI_API_KEY Azure OpenAI API Key
AZURE_OPENAI_ENDPOINT Azure OpenAI Endpoint
AZURE_OPENAI_DEPLOYMENT_NAME Azure OpenAI Deployment Name

Examples:
# Using environment variables (recommended for zsh)
Expand Down Expand Up @@ -345,6 +363,7 @@ _run_demo() {
# Guide categories and tourist preferences for variety
local GUIDE_CATEGORIES=("culture,history,food" "art,architecture" "nature,adventure" "food,nightlife" "history,museums")
local TOURIST_PREFS=("culture,history" "art,food" "nature,museums" "adventure,nightlife" "history,architecture")
local AGENT_PIDS=""

# Start guide agents
_log "Starting $_NUM_GUIDES guide agents..."
Expand All @@ -360,7 +379,9 @@ _run_demo() {

_log " Guide $GUIDE_ID: $CATS @ \$$RATE/hr"
"$PYTHON" -m agents.guide_agent "${GUIDE_ARGS[@]}" > "$GUIDE_LOG" 2>&1 &
_save_pid $!
local pid=$!
_save_pid $pid
AGENT_PIDS="$AGENT_PIDS $pid"

# Small delay between guides
sleep 2
Expand All @@ -384,14 +405,16 @@ _run_demo() {

_log " Tourist $TOURIST_ID: $PREFS @ \$$BUDGET budget"
"$PYTHON" -m agents.tourist_agent "${TOURIST_ARGS[@]}" > "$TOURIST_LOG" 2>&1 &
_save_pid $!
local pid=$!
_save_pid $pid
AGENT_PIDS="$AGENT_PIDS $pid"

# Small delay between tourists
sleep 2
done

_log "Waiting for agents to complete..."
wait
wait $AGENT_PIDS

_ok "Real agents demo complete!"
echo ""
Expand All @@ -402,6 +425,10 @@ _run_demo() {
for i in $(seq 1 "$_NUM_TOURISTS"); do
echo " tail -f ${_RUN_SCRIPT_DIR}/tourist_t${i}.log"
done

_log "Dashboard still running at http://localhost:$_UI_PORT"
_log "Press Ctrl+C to stop agents."
wait
else
_log "Running demo simulation..."

Expand Down
2 changes: 2 additions & 0 deletions tourist_scheduling_system/src/agents/guide_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import asyncio
import logging
import os
import sys
from datetime import datetime
from typing import Optional, TYPE_CHECKING

Expand Down Expand Up @@ -278,6 +279,7 @@ def main(
hourly_rate=hourly_rate,
max_group_size=max_group_size,
))
sys.exit(0)


if __name__ == "__main__":
Expand Down
36 changes: 9 additions & 27 deletions tourist_scheduling_system/src/agents/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ def register_tourist_request(
availability_end: str,
preferences: List[str],
budget: float,
tool_context: ToolContext,
) -> Dict[str, Any]:
"""
Register a tourist's request for scheduling.
Expand All @@ -167,7 +166,6 @@ def register_tourist_request(
availability_end: End time in ISO format (e.g., "2025-06-01T17:00:00")
preferences: List of preferred categories (e.g., ["culture", "history"])
budget: Maximum hourly budget in dollars
tool_context: ADK tool context for state access

Returns:
Confirmation with registration details
Expand Down Expand Up @@ -241,7 +239,6 @@ def register_guide_offer(
available_end: str,
hourly_rate: float,
max_group_size: int = 1,
tool_context: ToolContext = None,
) -> Dict[str, Any]:
"""
Register a guide's availability and capabilities.
Expand All @@ -256,7 +253,6 @@ def register_guide_offer(
available_end: End of availability in ISO format
hourly_rate: Guide's hourly rate in dollars
max_group_size: Maximum number of tourists the guide can handle (default: 1)
tool_context: ADK tool context for state access

Returns:
Confirmation with registration details
Expand Down Expand Up @@ -324,23 +320,15 @@ def register_guide_offer(


@traced("run_scheduling")
def run_scheduling(tool_context: ToolContext = None) -> Dict[str, Any]:
def run_scheduling() -> Dict[str, Any]:
"""
Execute the scheduling algorithm to match tourists with guides.

This tool runs a greedy scheduling algorithm that:
1. Sorts tourists by earliest availability
2. Matches each tourist to the best available guide based on:
- Budget constraints
- Time overlap
- Preference matching score
3. Creates assignments and schedule proposals

Args:
tool_context: ADK tool context for state access
This tool triggers the matching process based on current requests and offers.
It should be called after requests and offers have been registered.

Returns:
Summary of scheduling results including assignments made
Summary of the scheduling run
"""
try:
tourists = _scheduler_state.tourist_requests
Expand Down Expand Up @@ -371,7 +359,7 @@ def run_scheduling(tool_context: ToolContext = None) -> Dict[str, Any]:
assignments=[assignment],
status="proposed",
)
proposals.append(proposal.model_dump())
proposals.append(proposal)

# Notify UI agent for each assignment
send_to_ui_agent({
Expand Down Expand Up @@ -413,8 +401,8 @@ def run_scheduling(tool_context: ToolContext = None) -> Dict[str, Any]:
"status": "completed",
"message": f"Scheduled {len(assignments)} assignments from {len(tourists)} tourists and {len(guides)} guides",
"num_assignments": len(assignments),
"assignments": [a.model_dump() for a in assignments],
"proposals": proposals,
"assignments": [a.model_dump(mode='json') for a in assignments],
"proposals": [p.model_dump(mode='json') if hasattr(p, 'model_dump') else p for p in proposals],
}

except Exception as e:
Expand All @@ -425,7 +413,7 @@ def run_scheduling(tool_context: ToolContext = None) -> Dict[str, Any]:
}


def get_schedule_status(tool_context: ToolContext = None) -> Dict[str, Any]:
def get_schedule_status() -> Dict[str, Any]:
"""
Get the current status of the scheduler.

Expand All @@ -435,9 +423,6 @@ def get_schedule_status(tool_context: ToolContext = None) -> Dict[str, Any]:
- Number of completed assignments
- Utilization metrics

Args:
tool_context: ADK tool context for state access

Returns:
Current scheduler state summary
"""
Expand Down Expand Up @@ -551,13 +536,10 @@ def _build_schedule(
return assignments


def clear_scheduler_state(tool_context: ToolContext = None) -> Dict[str, Any]:
def clear_scheduler_state() -> Dict[str, Any]:
"""
Clear all scheduler state (for testing/reset).

Args:
tool_context: ADK tool context

Returns:
Confirmation message
"""
Expand Down
2 changes: 2 additions & 0 deletions tourist_scheduling_system/src/agents/tourist_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import asyncio
import logging
import os
import sys
from datetime import datetime
from typing import Optional

Expand Down Expand Up @@ -280,6 +281,7 @@ def main(
availability_end=availability_end,
budget=budget,
))
sys.exit(0)


if __name__ == "__main__":
Expand Down
48 changes: 45 additions & 3 deletions tourist_scheduling_system/src/agents/ui_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ def to_dict(self) -> dict:

# Global dashboard state
_dashboard_state = DashboardState()
_broadcaster = None


def get_dashboard_state() -> DashboardState:
Expand All @@ -172,6 +173,15 @@ def clear_dashboard_state():
_dashboard_state = DashboardState()


async def broadcast_update():
"""Broadcast state update to dashboard clients."""
if _broadcaster:
try:
await _broadcaster()
except Exception as e:
logger.warning(f"[Dashboard] Broadcast failed: {e}")


# ============================================================================
# ADK Tool Functions for Dashboard
# ============================================================================
Expand Down Expand Up @@ -223,6 +233,13 @@ def record_tourist_request(
state.metrics.total_messages += 1
state.update_metrics()

# Trigger broadcast
try:
loop = asyncio.get_running_loop()
loop.create_task(broadcast_update())
except RuntimeError:
pass

logger.info(f"[Dashboard] Recorded tourist request from {tourist_id}")
return f"Recorded tourist request from {tourist_id}"

Expand Down Expand Up @@ -277,6 +294,13 @@ def record_guide_offer(
state.metrics.total_messages += 1
state.update_metrics()

# Trigger broadcast
try:
loop = asyncio.get_running_loop()
loop.create_task(broadcast_update())
except RuntimeError:
pass

logger.info(f"[Dashboard] Recorded guide offer from {guide_id}")
return f"Recorded guide offer from {guide_id}"

Expand Down Expand Up @@ -328,6 +352,13 @@ def record_assignment(
state.metrics.total_messages += 1
state.update_metrics()

# Trigger broadcast
try:
loop = asyncio.get_running_loop()
loop.create_task(broadcast_update())
except RuntimeError:
pass

logger.info(f"[Dashboard] Recorded assignment: {tourist_id} -> {guide_id}")
return f"Recorded assignment: {tourist_id} assigned to {guide_id}"

Expand Down Expand Up @@ -581,11 +612,22 @@ def main(host: str, port: int, transport: str, slim_endpoint: str, slim_local_id
set_transport_mode,
broadcast_to_clients,
)
# Create dashboard state
dashboard_state = DashboardState()
set_dashboard_state(dashboard_state)
# Use global dashboard state
set_dashboard_state(_dashboard_state)
set_transport_mode(transport)
dashboard_app = create_dashboard_app()

# Setup broadcaster
async def broadcaster():
state = get_dashboard_state()
await broadcast_to_clients({
"type": "initial_state",
"data": state.to_dict()
})

global _broadcaster
_broadcaster = broadcaster

logger.info(f"[ADK UI] Web dashboard enabled at http://{host}:{port}")
except ImportError as e:
logger.warning(f"[ADK UI] Dashboard module not available: {e}")
Expand Down
11 changes: 8 additions & 3 deletions tourist_scheduling_system/src/core/slim_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,17 @@
from slima2a.client_transport import SRPCTransport

class CustomSRPCTransport(SRPCTransport):
"""Custom transport to handle 'extensions' argument introduced in google-adk 1.19+."""
"""Custom transport to handle 'request_metadata' argument from google-adk."""
async def send_message(self, message, **kwargs):
if "extensions" in kwargs:
kwargs.pop("extensions")
if "request_metadata" in kwargs:
kwargs.pop("request_metadata")
return await super().send_message(message, **kwargs)

async def send_message_streaming(self, message, **kwargs):
if "request_metadata" in kwargs:
kwargs.pop("request_metadata")
return super().send_message_streaming(message, **kwargs)

SLIM_AVAILABLE = True
except ImportError:
SLIM_AVAILABLE = False
Expand Down
Loading