Tool Metrics #11

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open · wants to merge 1 commit into main
registry/main.py: 358 changes (357 additions & 1 deletion)
@@ -131,6 +131,75 @@
# --- WebSocket Connection Management ---
active_connections: Set[WebSocket] = set()

# --- Tool Metrics Tracking --- START
TOOL_METRICS_FILE = SERVERS_DIR / "tool_metrics.json"
tool_metrics_data = [] # List of metric entries: [{server_name, tool_name, execution_result, feedback, timestamp, ...}]
tool_metrics_lock = asyncio.Lock()

def load_tool_metrics_data():
"""Load tool metrics data from file."""
global tool_metrics_data
try:
if TOOL_METRICS_FILE.exists():
with open(TOOL_METRICS_FILE, "r") as f:
loaded_data = json.load(f)
if isinstance(loaded_data, list):
tool_metrics_data = loaded_data
logger.info(f"Loaded {len(tool_metrics_data)} tool metric entries from {TOOL_METRICS_FILE}")
else:
logger.warning(f"Invalid tool metrics format in {TOOL_METRICS_FILE}. Expected a list. Resetting.")
tool_metrics_data = []
else:
logger.info("No existing tool metrics file found. Starting fresh.")
tool_metrics_data = []
except json.JSONDecodeError as e:
logger.error(f"Could not parse JSON from {TOOL_METRICS_FILE}: {e}. Starting with empty metrics.")
tool_metrics_data = []
except Exception as e:
logger.error(f"Failed to load tool metrics from {TOOL_METRICS_FILE}: {e}. Starting with empty metrics.", exc_info=True)
tool_metrics_data = []

async def save_tool_metrics_data():
"""Save tool metrics data to file."""
global tool_metrics_data
async with tool_metrics_lock:
try:
SERVERS_DIR.mkdir(parents=True, exist_ok=True) # Ensure directory exists
with open(TOOL_METRICS_FILE, "w") as f:
json.dump(tool_metrics_data, f, indent=2)
logger.debug(f"Saved {len(tool_metrics_data)} tool metric entries to {TOOL_METRICS_FILE}")
except Exception as e:
logger.error(f"Failed to save tool metrics to {TOOL_METRICS_FILE}: {e}", exc_info=True)

async def add_tool_metric_entry(server_name: str, tool_name: str, execution_result: str,
                                execution_timestamp: str, feedback: str | None = None):
"""Add a new tool metric entry."""
global tool_metrics_data
    async with tool_metrics_lock:
        metric_entry = {
            "event_type": "usage",  # Mark as usage event
            "server_name": server_name,
            "tool_name": tool_name,
            "execution_result": execution_result,
            "execution_timestamp": execution_timestamp,
            "recorded_timestamp": datetime.now(timezone.utc).isoformat()
        }

        if feedback:
            metric_entry["feedback"] = feedback

        tool_metrics_data.append(metric_entry)
        logger.info(f"Added tool metric: {server_name}/{tool_name} -> {execution_result}")

        # Keep only the last 10000 entries to prevent unlimited growth
        if len(tool_metrics_data) > 10000:
            tool_metrics_data = tool_metrics_data[-10000:]
            logger.info("Trimmed tool metrics data to last 10000 entries")

    # Persist outside the lock: save_tool_metrics_data() acquires tool_metrics_lock
    # itself, and asyncio.Lock is not re-entrant, so awaiting it while still holding
    # the lock would deadlock.
    await save_tool_metrics_data()
# --- Tool Metrics Tracking --- END
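
For orientation, a minimal sketch of how these helpers fit together, assuming it runs inside an async function in this module; the server name, tool name, and feedback text are placeholders:

# Illustrative sketch, not part of this diff.
load_tool_metrics_data()  # populate tool_metrics_data from tool_metrics.json, or start empty
await add_tool_metric_entry(
    server_name="example-server",      # placeholder
    tool_name="example_tool",          # placeholder
    execution_result="success",        # the reporting endpoint only accepts 'success' or 'failure'
    execution_timestamp=datetime.now(timezone.utc).isoformat(),
    feedback="worked as expected",     # optional; omitted from the entry when falsy
)
# The entry is appended to tool_metrics_data and persisted via save_tool_metrics_data().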

# --- FAISS Helper Functions --- START

def _get_text_for_embedding(server_info: dict) -> str:
@@ -1010,6 +1079,10 @@ async def lifespan(app: FastAPI):
# 0. Load FAISS data and embedding model
load_faiss_data() # Loads model, empty index or existing index. Synchronous.

# 0.5 Load tool metrics data
load_tool_metrics_data()
logger.info("Tool metrics data loaded.")

# 1. Load server definitions and persisted enabled/disabled state
load_registered_servers_and_state() # This populates REGISTERED_SERVERS. Synchronous.

Expand Down Expand Up @@ -1640,6 +1713,201 @@ async def refresh_service(service_path: str, username: Annotated[str, Depends(ap
# --- Refresh Endpoint --- END


# --- Tool Metrics Reporting Endpoint --- START
@app.post("/report_tool_metrics")
async def report_tool_metrics(
server_name: Annotated[str, Form()],
tool_name: Annotated[str, Form()],
tool_execution_result: Annotated[str, Form()],
execution_timestamp: Annotated[str, Form()],
tool_execution_feedback: Annotated[str | None, Form()] = None,
username: Annotated[str, Depends(api_auth)] = None,
):
"""
Receives and stores tool usage metrics from MCP servers.

Args:
server_name: Name of the server that provided the tool
tool_name: Name of the tool that was executed
tool_execution_result: Result of execution ('success' or 'failure')
execution_timestamp: ISO timestamp of when the tool was executed
tool_execution_feedback: Optional feedback about the tool execution
username: Username for authentication (provided by dependency)

Returns:
JSON response confirming the metrics were recorded
"""
# Validate execution result
if tool_execution_result not in ["success", "failure"]:
raise HTTPException(
status_code=400,
detail="tool_execution_result must be either 'success' or 'failure'"
)

# Validate timestamp format (basic check)
try:
datetime.fromisoformat(execution_timestamp.replace('Z', '+00:00'))
except ValueError:
raise HTTPException(
status_code=400,
detail="execution_timestamp must be a valid ISO format timestamp"
)

logger.info(f"Received tool metrics report: {server_name}/{tool_name} -> {tool_execution_result} by user '{username}'")

try:
# Add the metric entry
await add_tool_metric_entry(
server_name=server_name,
tool_name=tool_name,
execution_result=tool_execution_result,
execution_timestamp=execution_timestamp,
feedback=tool_execution_feedback
)

return JSONResponse(
status_code=200,
content={
"message": "Tool metrics recorded successfully",
"server_name": server_name,
"tool_name": tool_name,
"execution_result": tool_execution_result,
"timestamp": execution_timestamp,
"has_feedback": bool(tool_execution_feedback)
}
)

except Exception as e:
logger.error(f"Failed to record tool metrics for {server_name}/{tool_name}: {e}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"Failed to record tool metrics: {str(e)}"
)
# --- Tool Metrics Reporting Endpoint --- END
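
As a rough illustration of what a client of this endpoint looks like, the sketch below posts one usage event with httpx; the base URL and the credentials expected by the api_auth dependency are assumptions, and the server/tool names are placeholders:

# Illustrative client call, not part of this diff.
import httpx
from datetime import datetime, timezone

resp = httpx.post(
    "http://localhost:7860/report_tool_metrics",        # assumed registry URL
    data={
        "server_name": "example-server",                 # placeholder
        "tool_name": "example_tool",                     # placeholder
        "tool_execution_result": "success",              # must be 'success' or 'failure'
        "execution_timestamp": datetime.now(timezone.utc).isoformat(),
        "tool_execution_feedback": "optional free-text feedback",
    },
    # authentication omitted; supply whatever the api_auth dependency expects
)
resp.raise_for_status()
print(resp.json()["message"])  # "Tool metrics recorded successfully"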


# --- Tool Metrics Analytics Endpoint --- START
@app.get("/api/tool_metrics")
async def get_tool_metrics(
limit: int = 1000,
server_name: str | None = None,
tool_name: str | None = None,
execution_result: str | None = None,
event_type: str | None = None,
username: Annotated[str, Depends(api_auth)] = None,
):
"""
Retrieves tool metrics data for analytics (both recommendations and usage).

Args:
limit: Maximum number of entries to return (default: 1000, max: 10000)
server_name: Filter by server name (optional)
tool_name: Filter by tool name (optional)
execution_result: Filter by execution result ('success' or 'failure', optional)
event_type: Filter by event type ('recommendation' or 'usage', optional)
username: Username for authentication (provided by dependency)

Returns:
JSON response with filtered tool metrics data and summary statistics
"""
# Validate limit
if limit < 1 or limit > 10000:
raise HTTPException(
status_code=400,
detail="limit must be between 1 and 10000"
)

# Validate execution_result if provided
if execution_result and execution_result not in ["success", "failure"]:
raise HTTPException(
status_code=400,
detail="execution_result must be either 'success' or 'failure'"
)

# Validate event_type if provided
if event_type and event_type not in ["recommendation", "usage"]:
raise HTTPException(
status_code=400,
detail="event_type must be either 'recommendation' or 'usage'"
)

try:
async with tool_metrics_lock:
# Make a copy to avoid locking during filtering
metrics_copy = list(tool_metrics_data)

# Apply filters
filtered_metrics = metrics_copy

if server_name:
filtered_metrics = [m for m in filtered_metrics if m.get("server_name") == server_name]

if tool_name:
filtered_metrics = [m for m in filtered_metrics if m.get("tool_name") == tool_name]

if execution_result:
filtered_metrics = [m for m in filtered_metrics if m.get("execution_result") == execution_result]

if event_type:
filtered_metrics = [m for m in filtered_metrics if m.get("event_type") == event_type]

# Sort by recorded timestamp (most recent first) and apply limit
filtered_metrics.sort(key=lambda x: x.get("recorded_timestamp", ""), reverse=True)
limited_metrics = filtered_metrics[:limit]

# Generate summary statistics
total_entries = len(filtered_metrics)

# Separate stats for recommendations and usage
recommendations = [m for m in filtered_metrics if m.get("event_type") == "recommendation"]
usage_events = [m for m in filtered_metrics if m.get("event_type") == "usage"]

success_count = len([m for m in usage_events if m.get("execution_result") == "success"])
failure_count = len([m for m in usage_events if m.get("execution_result") == "failure"])

# Get unique servers and tools
unique_servers = set(m.get("server_name") for m in filtered_metrics if m.get("server_name"))
unique_tools = set(m.get("tool_name") for m in filtered_metrics if m.get("tool_name"))

# Count entries with feedback (only usage events have feedback)
feedback_count = len([m for m in usage_events if m.get("feedback")])

logger.info(f"Tool metrics query by '{username}': {total_entries} entries found, returning {len(limited_metrics)}")

return {
"metrics": limited_metrics,
"summary": {
"total_entries": total_entries,
"returned_entries": len(limited_metrics),
"recommendation_count": len(recommendations),
"usage_count": len(usage_events),
"success_count": success_count,
"failure_count": failure_count,
"success_rate": round(success_count / len(usage_events) * 100, 2) if usage_events else 0,
"conversion_rate": round(len(usage_events) / len(recommendations) * 100, 2) if recommendations else 0,
"unique_servers": len(unique_servers),
"unique_tools": len(unique_tools),
"entries_with_feedback": feedback_count,
"feedback_rate": round(feedback_count / len(usage_events) * 100, 2) if usage_events else 0
},
"filters_applied": {
"server_name": server_name,
"tool_name": tool_name,
"execution_result": execution_result,
"event_type": event_type,
"limit": limit
}
}

except Exception as e:
logger.error(f"Failed to retrieve tool metrics: {e}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"Failed to retrieve tool metrics: {str(e)}"
)
# --- Tool Metrics Analytics Endpoint --- END
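
A hedged sketch of querying the analytics endpoint, with the same assumptions about base URL and whatever credentials api_auth expects; the filter values are arbitrary examples:

# Illustrative query, not part of this diff.
import httpx

resp = httpx.get(
    "http://localhost:7860/api/tool_metrics",            # assumed registry URL
    params={"limit": 100, "event_type": "usage", "execution_result": "failure"},
    # authentication omitted; supply whatever the api_auth dependency expects
)
resp.raise_for_status()
summary = resp.json()["summary"]
print(summary["failure_count"], summary["success_rate"], summary["conversion_rate"])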


# --- Add Edit Routes ---

@app.get("/edit/{service_path:path}", response_class=HTMLResponse)
@@ -1820,4 +2088,92 @@ async def websocket_endpoint(websocket: WebSocket):
# if __name__ == "__main__":
# import uvicorn
# # Running this way makes relative paths tricky, better to use uvicorn command from parent
# uvicorn.run(app, host="0.0.0.0", port=7860)
# uvicorn.run(app, host="0.0.0.0", port=7860)

# --- Tool Recommendation Tracking Endpoint --- START
@app.post("/report_tool_recommendation")
async def report_tool_recommendation(
server_name: Annotated[str, Form()],
tool_name: Annotated[str, Form()],
service_path: Annotated[str, Form()],
recommendation_timestamp: Annotated[str, Form()],
query: Annotated[str | None, Form()] = None,
similarity_score: Annotated[float | None, Form()] = None,
username: Annotated[str, Depends(api_auth)] = None,
):
"""
Receives tool recommendation events from MCPGW clients.

Args:
server_name: Name of the server that provided the tool
tool_name: Name of the tool that was recommended
service_path: Service path of the recommended tool
recommendation_timestamp: ISO timestamp of when the recommendation was made
query: The user's natural language query (optional)
similarity_score: The similarity score from FAISS search (optional)
username: Username for authentication (provided by dependency)

Returns:
JSON response confirming the recommendation was recorded
"""
# Validate timestamp format
try:
datetime.fromisoformat(recommendation_timestamp.replace('Z', '+00:00'))
except ValueError:
raise HTTPException(
status_code=400,
detail="recommendation_timestamp must be a valid ISO format timestamp"
)

logger.info(f"Received tool recommendation report: {server_name}/{tool_name} by user '{username}'")

try:
# Create recommendation entry
recommendation_entry = {
"event_type": "recommendation",
"server_name": server_name,
"tool_name": tool_name,
"service_path": service_path,
"recommendation_timestamp": recommendation_timestamp,
"recorded_timestamp": datetime.now(timezone.utc).isoformat()
}

if query:
recommendation_entry["query"] = query
if similarity_score is not None:
recommendation_entry["similarity_score"] = similarity_score

        # Add to the same metrics data structure with a different event type
        async with tool_metrics_lock:
            tool_metrics_data.append(recommendation_entry)
            logger.info(f"Added tool recommendation: {server_name}/{tool_name}")

            # Keep only the last 10000 entries to prevent unlimited growth
            if len(tool_metrics_data) > 10000:
                tool_metrics_data[:] = tool_metrics_data[-10000:]
                logger.info("Trimmed tool metrics data to last 10000 entries")

        # Persist outside the lock: save_tool_metrics_data() acquires tool_metrics_lock
        # itself, and asyncio.Lock is not re-entrant.
        await save_tool_metrics_data()

return JSONResponse(
status_code=200,
content={
"message": "Tool recommendation recorded successfully",
"server_name": server_name,
"tool_name": tool_name,
"service_path": service_path,
"timestamp": recommendation_timestamp
}
)

except Exception as e:
logger.error(f"Failed to record tool recommendation for {server_name}/{tool_name}: {e}", exc_info=True)
raise HTTPException(
status_code=500,
detail=f"Failed to record tool recommendation: {str(e)}"
)
# --- Tool Recommendation Tracking Endpoint --- END
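
And a matching sketch for the recommendation endpoint, again assuming the base URL and authentication; the service path, query, and similarity score are placeholders:

# Illustrative client call, not part of this diff.
import httpx
from datetime import datetime, timezone

resp = httpx.post(
    "http://localhost:7860/report_tool_recommendation",  # assumed registry URL
    data={
        "server_name": "example-server",                  # placeholder
        "tool_name": "example_tool",                      # placeholder
        "service_path": "/example",                       # placeholder
        "recommendation_timestamp": datetime.now(timezone.utc).isoformat(),
        "query": "find a tool that can fetch weather data",  # optional
        "similarity_score": 0.87,                             # optional
    },
    # authentication omitted; supply whatever the api_auth dependency expects
)
resp.raise_for_status()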


# --- Tool Metrics Analytics Endpoint --- START