diff --git a/registry/main.py b/registry/main.py
index 5075c9e..3007d84 100644
--- a/registry/main.py
+++ b/registry/main.py
@@ -131,6 +131,75 @@
 # --- WebSocket Connection Management ---
 active_connections: Set[WebSocket] = set()
 
+# --- Tool Metrics Tracking --- START
+TOOL_METRICS_FILE = SERVERS_DIR / "tool_metrics.json"
+tool_metrics_data = []  # List of metric entries: [{server_name, tool_name, execution_result, feedback, timestamp, ...}]
+tool_metrics_lock = asyncio.Lock()
+
+def load_tool_metrics_data():
+    """Load tool metrics data from file."""
+    global tool_metrics_data
+    try:
+        if TOOL_METRICS_FILE.exists():
+            with open(TOOL_METRICS_FILE, "r") as f:
+                loaded_data = json.load(f)
+            if isinstance(loaded_data, list):
+                tool_metrics_data = loaded_data
+                logger.info(f"Loaded {len(tool_metrics_data)} tool metric entries from {TOOL_METRICS_FILE}")
+            else:
+                logger.warning(f"Invalid tool metrics format in {TOOL_METRICS_FILE}. Expected a list. Resetting.")
+                tool_metrics_data = []
+        else:
+            logger.info("No existing tool metrics file found. Starting fresh.")
+            tool_metrics_data = []
+    except json.JSONDecodeError as e:
+        logger.error(f"Could not parse JSON from {TOOL_METRICS_FILE}: {e}. Starting with empty metrics.")
+        tool_metrics_data = []
+    except Exception as e:
+        logger.error(f"Failed to load tool metrics from {TOOL_METRICS_FILE}: {e}. Starting with empty metrics.", exc_info=True)
+        tool_metrics_data = []
+
+async def save_tool_metrics_data():
+    """Save tool metrics data to file."""
+    async with tool_metrics_lock:
+        try:
+            SERVERS_DIR.mkdir(parents=True, exist_ok=True)  # Ensure directory exists
+            with open(TOOL_METRICS_FILE, "w") as f:
+                json.dump(tool_metrics_data, f, indent=2)
+            logger.debug(f"Saved {len(tool_metrics_data)} tool metric entries to {TOOL_METRICS_FILE}")
+        except Exception as e:
+            logger.error(f"Failed to save tool metrics to {TOOL_METRICS_FILE}: {e}", exc_info=True)
+
+async def add_tool_metric_entry(server_name: str, tool_name: str, execution_result: str,
+                                execution_timestamp: str, feedback: str | None = None):
+    """Add a new tool metric entry."""
+    global tool_metrics_data
+    async with tool_metrics_lock:
+        metric_entry = {
+            "event_type": "usage",  # Mark as usage event
+            "server_name": server_name,
+            "tool_name": tool_name,
+            "execution_result": execution_result,
+            "execution_timestamp": execution_timestamp,
+            "recorded_timestamp": datetime.now(timezone.utc).isoformat()
+        }
+
+        if feedback:
+            metric_entry["feedback"] = feedback
+
+        tool_metrics_data.append(metric_entry)
+        logger.info(f"Added tool metric: {server_name}/{tool_name} -> {execution_result}")
+
+        # Keep only the last 10000 entries to prevent unlimited growth
+        if len(tool_metrics_data) > 10000:
+            tool_metrics_data = tool_metrics_data[-10000:]
+            logger.info("Trimmed tool metrics data to last 10000 entries")
+
+    # Save outside the lock: asyncio.Lock is not reentrant, and
+    # save_tool_metrics_data() acquires tool_metrics_lock itself.
+    await save_tool_metrics_data()
+# --- Tool Metrics Tracking --- END
+
 # --- FAISS Helper Functions --- START
 
 def _get_text_for_embedding(server_info: dict) -> str:
@@ -1010,6 +1079,10 @@ async def lifespan(app: FastAPI):
     # 0. Load FAISS data and embedding model
     load_faiss_data()  # Loads model, empty index or existing index. Synchronous.
 
+    # 0.5 Load tool metrics data
+    load_tool_metrics_data()
+    logger.info("Tool metrics data loaded.")
+
     # 1. Load server definitions and persisted enabled/disabled state
     load_registered_servers_and_state()  # This populates REGISTERED_SERVERS. Synchronous.
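For reference, a single usage entry in `tool_metrics.json` would look roughly like this. Field names are taken from `add_tool_metric_entry`; the server, tool, and timestamp values are illustrative:

```json
[
  {
    "event_type": "usage",
    "server_name": "currenttime",
    "tool_name": "current_time_by_timezone",
    "execution_result": "success",
    "execution_timestamp": "2024-05-01T12:00:00+00:00",
    "recorded_timestamp": "2024-05-01T12:00:01.234567+00:00",
    "feedback": "Tool returned the expected result."
  }
]
```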
@@ -1640,6 +1713,201 @@ async def refresh_service(service_path: str, username: Annotated[str, Depends(ap
 
 # --- Refresh Endpoint --- END
 
+# --- Tool Metrics Reporting Endpoint --- START
+@app.post("/report_tool_metrics")
+async def report_tool_metrics(
+    server_name: Annotated[str, Form()],
+    tool_name: Annotated[str, Form()],
+    tool_execution_result: Annotated[str, Form()],
+    execution_timestamp: Annotated[str, Form()],
+    tool_execution_feedback: Annotated[str | None, Form()] = None,
+    username: Annotated[str, Depends(api_auth)] = None,
+):
+    """
+    Receives and stores tool usage metrics from MCP servers.
+
+    Args:
+        server_name: Name of the server that provided the tool
+        tool_name: Name of the tool that was executed
+        tool_execution_result: Result of execution ('success' or 'failure')
+        execution_timestamp: ISO timestamp of when the tool was executed
+        tool_execution_feedback: Optional feedback about the tool execution
+        username: Username for authentication (provided by dependency)
+
+    Returns:
+        JSON response confirming the metrics were recorded
+    """
+    # Validate execution result
+    if tool_execution_result not in ["success", "failure"]:
+        raise HTTPException(
+            status_code=400,
+            detail="tool_execution_result must be either 'success' or 'failure'"
+        )
+
+    # Validate timestamp format (basic check)
+    try:
+        datetime.fromisoformat(execution_timestamp.replace('Z', '+00:00'))
+    except ValueError:
+        raise HTTPException(
+            status_code=400,
+            detail="execution_timestamp must be a valid ISO format timestamp"
+        )
+
+    logger.info(f"Received tool metrics report: {server_name}/{tool_name} -> {tool_execution_result} by user '{username}'")
+
+    try:
+        # Add the metric entry
+        await add_tool_metric_entry(
+            server_name=server_name,
+            tool_name=tool_name,
+            execution_result=tool_execution_result,
+            execution_timestamp=execution_timestamp,
+            feedback=tool_execution_feedback
+        )
+
+        return JSONResponse(
+            status_code=200,
+            content={
+                "message": "Tool metrics recorded successfully",
+                "server_name": server_name,
+                "tool_name": tool_name,
+                "execution_result": tool_execution_result,
+                "timestamp": execution_timestamp,
+                "has_feedback": bool(tool_execution_feedback)
+            }
+        )
+
+    except Exception as e:
+        logger.error(f"Failed to record tool metrics for {server_name}/{tool_name}: {e}", exc_info=True)
+        raise HTTPException(
+            status_code=500,
+            detail=f"Failed to record tool metrics: {str(e)}"
+        )
+# --- Tool Metrics Reporting Endpoint --- END
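A quick way to smoke-test the new endpoint is a plain form POST. This is a minimal sketch assuming the registry listens on `localhost:7860` and that `api_auth` accepts HTTP basic credentials — both are assumptions about your deployment, not part of this PR:

```python
import httpx

# Hypothetical smoke test for POST /report_tool_metrics.
# Host, port, and the basic-auth scheme are assumptions; adapt to your setup.
resp = httpx.post(
    "http://localhost:7860/report_tool_metrics",
    data={
        "server_name": "currenttime",             # illustrative values
        "tool_name": "current_time_by_timezone",
        "tool_execution_result": "success",
        "execution_timestamp": "2024-05-01T12:00:00Z",
        "tool_execution_feedback": "Looks correct.",
    },
    auth=("admin", "<password>"),
)
print(resp.status_code, resp.json())
```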
+
+# --- Tool Metrics Analytics Endpoint --- START
+@app.get("/api/tool_metrics")
+async def get_tool_metrics(
+    limit: int = 1000,
+    server_name: str | None = None,
+    tool_name: str | None = None,
+    execution_result: str | None = None,
+    event_type: str | None = None,
+    username: Annotated[str, Depends(api_auth)] = None,
+):
+    """
+    Retrieves tool metrics data for analytics (both recommendations and usage).
+
+    Args:
+        limit: Maximum number of entries to return (default: 1000, max: 10000)
+        server_name: Filter by server name (optional)
+        tool_name: Filter by tool name (optional)
+        execution_result: Filter by execution result ('success' or 'failure', optional)
+        event_type: Filter by event type ('recommendation' or 'usage', optional)
+        username: Username for authentication (provided by dependency)
+
+    Returns:
+        JSON response with filtered tool metrics data and summary statistics
+    """
+    # Validate limit
+    if limit < 1 or limit > 10000:
+        raise HTTPException(
+            status_code=400,
+            detail="limit must be between 1 and 10000"
+        )
+
+    # Validate execution_result if provided
+    if execution_result and execution_result not in ["success", "failure"]:
+        raise HTTPException(
+            status_code=400,
+            detail="execution_result must be either 'success' or 'failure'"
+        )
+
+    # Validate event_type if provided
+    if event_type and event_type not in ["recommendation", "usage"]:
+        raise HTTPException(
+            status_code=400,
+            detail="event_type must be either 'recommendation' or 'usage'"
+        )
+
+    try:
+        async with tool_metrics_lock:
+            # Copy under the lock so filtering below does not block writers
+            metrics_copy = list(tool_metrics_data)
+
+        # Apply filters
+        filtered_metrics = metrics_copy
+
+        if server_name:
+            filtered_metrics = [m for m in filtered_metrics if m.get("server_name") == server_name]
+
+        if tool_name:
+            filtered_metrics = [m for m in filtered_metrics if m.get("tool_name") == tool_name]
+
+        if execution_result:
+            filtered_metrics = [m for m in filtered_metrics if m.get("execution_result") == execution_result]
+
+        if event_type:
+            filtered_metrics = [m for m in filtered_metrics if m.get("event_type") == event_type]
+
+        # Sort by recorded timestamp (most recent first) and apply limit
+        filtered_metrics.sort(key=lambda x: x.get("recorded_timestamp", ""), reverse=True)
+        limited_metrics = filtered_metrics[:limit]
+
+        # Generate summary statistics
+        total_entries = len(filtered_metrics)
+
+        # Separate stats for recommendations and usage
+        recommendations = [m for m in filtered_metrics if m.get("event_type") == "recommendation"]
+        usage_events = [m for m in filtered_metrics if m.get("event_type") == "usage"]
+
+        success_count = len([m for m in usage_events if m.get("execution_result") == "success"])
+        failure_count = len([m for m in usage_events if m.get("execution_result") == "failure"])
+
+        # Get unique servers and tools
+        unique_servers = set(m.get("server_name") for m in filtered_metrics if m.get("server_name"))
+        unique_tools = set(m.get("tool_name") for m in filtered_metrics if m.get("tool_name"))
+
+        # Count entries with feedback (only usage events have feedback)
+        feedback_count = len([m for m in usage_events if m.get("feedback")])
+
+        logger.info(f"Tool metrics query by '{username}': {total_entries} entries found, returning {len(limited_metrics)}")
+
+        return {
+            "metrics": limited_metrics,
+            "summary": {
+                "total_entries": total_entries,
+                "returned_entries": len(limited_metrics),
+                "recommendation_count": len(recommendations),
+                "usage_count": len(usage_events),
+                "success_count": success_count,
+                "failure_count": failure_count,
+                "success_rate": round(success_count / len(usage_events) * 100, 2) if usage_events else 0,
+                "conversion_rate": round(len(usage_events) / len(recommendations) * 100, 2) if recommendations else 0,
+                "unique_servers": len(unique_servers),
+                "unique_tools": len(unique_tools),
+                "entries_with_feedback": feedback_count,
+                "feedback_rate": round(feedback_count / len(usage_events) * 100, 2) if usage_events else 0
+            },
+            "filters_applied": {
+                "server_name": server_name,
+                "tool_name": tool_name,
+                "execution_result": execution_result,
+                "event_type": event_type,
+                "limit": limit
+            }
+        }
+
+    except Exception as e:
+        logger.error(f"Failed to retrieve tool metrics: {e}", exc_info=True)
+        raise HTTPException(
+            status_code=500,
+            detail=f"Failed to retrieve tool metrics: {str(e)}"
+        )
+# --- Tool Metrics Analytics Endpoint --- END
+
+
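The `summary` block ties the two event types together: `success_rate` is computed over usage events only, while `conversion_rate` is usage events divided by recommendation events, i.e. how often a recommended tool was actually invoked. An illustrative response (all numbers made up, consistent with those formulas):

```json
{
  "summary": {
    "total_entries": 150,
    "returned_entries": 150,
    "recommendation_count": 100,
    "usage_count": 50,
    "success_count": 45,
    "failure_count": 5,
    "success_rate": 90.0,
    "conversion_rate": 50.0,
    "unique_servers": 4,
    "unique_tools": 12,
    "entries_with_feedback": 20,
    "feedback_rate": 40.0
  }
}
```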
 # --- Add Edit Routes ---
 
 @app.get("/edit/{service_path:path}", response_class=HTMLResponse)
@@ -1820,4 +2088,92 @@ async def websocket_endpoint(websocket: WebSocket):
 # if __name__ == "__main__":
 #     import uvicorn
 #     # Running this way makes relative paths tricky, better to use uvicorn command from parent
-#     uvicorn.run(app, host="0.0.0.0", port=7860)
\ No newline at end of file
+#     uvicorn.run(app, host="0.0.0.0", port=7860)
+
+# --- Tool Recommendation Tracking Endpoint --- START
+@app.post("/report_tool_recommendation")
+async def report_tool_recommendation(
+    server_name: Annotated[str, Form()],
+    tool_name: Annotated[str, Form()],
+    service_path: Annotated[str, Form()],
+    recommendation_timestamp: Annotated[str, Form()],
+    query: Annotated[str | None, Form()] = None,
+    similarity_score: Annotated[float | None, Form()] = None,
+    username: Annotated[str, Depends(api_auth)] = None,
+):
+    """
+    Receives tool recommendation events from MCPGW clients.
+
+    Args:
+        server_name: Name of the server that provided the tool
+        tool_name: Name of the tool that was recommended
+        service_path: Service path of the recommended tool
+        recommendation_timestamp: ISO timestamp of when the recommendation was made
+        query: The user's natural language query (optional)
+        similarity_score: The similarity score from FAISS search (optional)
+        username: Username for authentication (provided by dependency)
+
+    Returns:
+        JSON response confirming the recommendation was recorded
+    """
+    # Validate timestamp format
+    try:
+        datetime.fromisoformat(recommendation_timestamp.replace('Z', '+00:00'))
+    except ValueError:
+        raise HTTPException(
+            status_code=400,
+            detail="recommendation_timestamp must be a valid ISO format timestamp"
+        )
+
+    logger.info(f"Received tool recommendation report: {server_name}/{tool_name} by user '{username}'")
+
+    try:
+        # Create recommendation entry
+        recommendation_entry = {
+            "event_type": "recommendation",
+            "server_name": server_name,
+            "tool_name": tool_name,
+            "service_path": service_path,
+            "recommendation_timestamp": recommendation_timestamp,
+            "recorded_timestamp": datetime.now(timezone.utc).isoformat()
+        }
+
+        if query:
+            recommendation_entry["query"] = query
+        if similarity_score is not None:
+            recommendation_entry["similarity_score"] = similarity_score
+
+        # Add to the same metrics data structure with a different event type
+        async with tool_metrics_lock:
+            tool_metrics_data.append(recommendation_entry)
+            logger.info(f"Added tool recommendation: {server_name}/{tool_name}")
+
+            # Keep only the last 10000 entries to prevent unlimited growth
+            if len(tool_metrics_data) > 10000:
+                tool_metrics_data[:] = tool_metrics_data[-10000:]
+                logger.info("Trimmed tool metrics data to last 10000 entries")
+
+        # Save outside the lock: save_tool_metrics_data() acquires
+        # tool_metrics_lock itself and asyncio.Lock is not reentrant.
+        await save_tool_metrics_data()
+
+        return JSONResponse(
+            status_code=200,
+            content={
+                "message": "Tool recommendation recorded successfully",
+                "server_name": server_name,
+                "tool_name": tool_name,
+                "service_path": service_path,
+                "timestamp": recommendation_timestamp
+            }
+        )
+
+    except Exception as e:
+        logger.error(f"Failed to record tool recommendation for {server_name}/{tool_name}: {e}", exc_info=True)
+        raise HTTPException(
+            status_code=500,
+            detail=f"Failed to record tool recommendation: {str(e)}"
+        )
+# --- Tool Recommendation Tracking Endpoint --- END
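Recommendation events land in the same `tool_metrics.json` list as usage events, distinguished only by `event_type`. An illustrative entry (values made up, fields taken from the endpoint above):

```json
{
  "event_type": "recommendation",
  "server_name": "currenttime",
  "tool_name": "current_time_by_timezone",
  "service_path": "/currenttime",
  "query": "what time is it in Tokyo?",
  "similarity_score": 0.87,
  "recommendation_timestamp": "2024-05-01T11:59:58+00:00",
  "recorded_timestamp": "2024-05-01T11:59:59+00:00"
}
```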
diff --git a/servers/mcpgw/server.py b/servers/mcpgw/server.py
index 7d626d6..280c53d 100644
--- a/servers/mcpgw/server.py
+++ b/servers/mcpgw/server.py
@@ -19,6 +19,7 @@
 import numpy as np  # Added
 from sklearn.metrics.pairwise import cosine_similarity  # Added
 import faiss  # Added
+from datetime import datetime, timezone  # Added for tracking timestamps
 
 # Configure logging
 logging.basicConfig(
@@ -40,6 +41,36 @@
 _session_cookie: Optional[str] = None
 _auth_lock = asyncio.Lock()
 
+# --- Tool Recommendation Tracking --- START
+# Removed local tracking - now reports to registry instead
+
+async def report_recommendation_to_registry(service_path: str, tool_name: str, service_name: str,
+                                            query: Optional[str] = None, similarity_score: Optional[float] = None,
+                                            credentials: Optional[Credentials] = None):
+    """Report a tool recommendation to the registry for centralized tracking."""
+    try:
+        endpoint = "/report_tool_recommendation"
+
+        form_data = {
+            "server_name": service_name,
+            "tool_name": tool_name,
+            "service_path": service_path,
+            "recommendation_timestamp": datetime.now(timezone.utc).isoformat()
+        }
+
+        if query:
+            form_data["query"] = query
+        if similarity_score is not None:
+            form_data["similarity_score"] = similarity_score
+
+        await _call_registry_api("POST", endpoint, credentials=credentials, data=form_data)
+        logger.debug(f"MCPGW: Reported recommendation to registry: {service_path}/{tool_name}")
+
+    except Exception as e:
+        # Don't let tracking failures break the main functionality
+        logger.error(f"MCPGW: Failed to report recommendation to registry for {service_path}/{tool_name}: {e}")
+# --- Tool Recommendation Tracking --- END
+
 # --- FAISS and Sentence Transformer Integration for mcpgw --- START
 _faiss_data_lock = asyncio.Lock()
 _embedding_model_mcpgw: Optional[SentenceTransformer] = None
@@ -774,6 +805,15 @@ async def intelligent_tool_finder(
     final_results = ranked_tools[:top_n_tools]
     logger.info(f"MCPGW: Top {len(final_results)} tools found: {json.dumps(final_results, indent=2)}")
 
+    # --- Track recommendations for analytics --- START
+    for result in final_results:
+        try:
+            await report_recommendation_to_registry(result["service_path"], result["tool_name"], result["service_name"],
+                                                    natural_language_query, result["overall_similarity_score"], auth_credentials)
+        except Exception as e:
+            logger.error(f"MCPGW: Failed to track recommendation for {result['service_path']}/{result['tool_name']}: {e}")
+    # --- Track recommendations for analytics --- END
+
     # Remove the temporary 'text_for_embedding' field from results
     for res in final_results:
         del res["text_for_embedding"]
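The tracking loop above awaits each report before `intelligent_tool_finder` returns, so a slow registry adds latency to every search. A hypothetical alternative (not part of this PR) is to fire-and-forget each report; the helper name `schedule_recommendation_report` and the module-level task set are assumptions for illustration:

```python
import asyncio

# Keep strong references so the background tasks are not garbage-collected mid-flight.
_pending_reports: set[asyncio.Task] = set()

def schedule_recommendation_report(result: dict, query: str, credentials) -> None:
    """Schedule a recommendation report without blocking the caller.

    report_recommendation_to_registry already swallows its own exceptions,
    so an unawaited task here cannot raise into the event loop.
    """
    task = asyncio.create_task(
        report_recommendation_to_registry(
            result["service_path"], result["tool_name"], result["service_name"],
            query, result["overall_similarity_score"], credentials,
        )
    )
    _pending_reports.add(task)
    task.add_done_callback(_pending_reports.discard)
```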
description="Password for registry authentication") +) -> Dict[str, Any]: + """ + Reports tool usage metrics back to the registry for analytics and improvement. + + This function sends tool execution data to the main registry's /report_tool_metrics endpoint, + allowing the registry to track tool usage patterns, success rates, and collect feedback + for improving the MCP ecosystem. + + Args: + server_name: Name of the server that provided the tool. + tool_name: Name of the tool that was executed. + tool_execution_result: Result of tool execution ('success' or 'failure'). + tool_execution_feedback: Optional feedback about the tool execution. + username: Username for registry authentication. + password: Password for registry authentication. + + Returns: + Dict[str, Any]: Response from the registry API confirming the metrics were recorded. + + Raises: + Exception: If the API call fails or validation errors occur. + """ + # Validate tool_execution_result + if tool_execution_result not in ["success", "failure"]: + raise ValueError("tool_execution_result must be either 'success' or 'failure'") + + endpoint = "/report_tool_metrics" + credentials = Credentials(username=username, password=password) + + # Create form data for the metrics + form_data = { + "server_name": server_name, + "tool_name": tool_name, + "tool_execution_result": tool_execution_result, + "execution_timestamp": datetime.now(timezone.utc).isoformat() + } + + # Add feedback if provided + if tool_execution_feedback: + form_data["tool_execution_feedback"] = tool_execution_feedback + + logger.info(f"MCPGW: Reporting tool metrics for {server_name}/{tool_name} with result: {tool_execution_result}") + + try: + response = await _call_registry_api("POST", endpoint, credentials=credentials, data=form_data) + logger.info(f"MCPGW: Successfully reported tool metrics for {server_name}/{tool_name}") + return response + except Exception as e: + logger.error(f"MCPGW: Failed to report tool metrics for {server_name}/{tool_name}: {e}") + raise Exception(f"Failed to report tool metrics: {str(e)}") from e + + +@mcp.tool() +async def get_recommendation_stats( + username: str = Field(..., description="Username for registry authentication"), + password: str = Field(..., description="Password for registry authentication") +) -> Dict[str, Any]: + """ + Retrieves current tool recommendation statistics from the registry. + + This shows how many times each tool has been recommended by the intelligent_tool_finder, + along with timestamps and summary statistics. + + Args: + username: Username for registry authentication. + password: Password for registry authentication. 
+@mcp.tool()
+async def get_recommendation_stats(
+    username: str = Field(..., description="Username for registry authentication"),
+    password: str = Field(..., description="Password for registry authentication")
+) -> Dict[str, Any]:
+    """
+    Retrieves current tool recommendation statistics from the registry.
+
+    This shows how many times each tool has been recommended by the intelligent_tool_finder,
+    along with timestamps and summary statistics.
+
+    Args:
+        username: Username for registry authentication.
+        password: Password for registry authentication.
+
+    Returns:
+        Dict[str, Any]: Comprehensive recommendation statistics including:
+            - total_recommendations: Total number of tool recommendations made
+            - total_tools_recommended: Number of unique tools that have been recommended
+            - services_count: Number of services with recommended tools
+            - detailed_stats: Per-service and per-tool recommendation counts and timestamps
+    """
+    credentials = Credentials(username=username, password=password)
+
+    try:
+        # Fetch recommendation data from registry
+        endpoint = "/api/tool_metrics?event_type=recommendation&limit=10000"
+        response = await _call_registry_api("GET", endpoint, credentials=credentials)
+
+        recommendation_metrics = response.get("metrics", [])
+        summary = response.get("summary", {})
+
+        # Process the data to match the expected format
+        detailed_stats = {}
+        for metric in recommendation_metrics:
+            service_path = metric.get("service_path")
+            tool_name = metric.get("tool_name")
+
+            if service_path and tool_name:
+                if service_path not in detailed_stats:
+                    detailed_stats[service_path] = {}
+
+                if tool_name not in detailed_stats[service_path]:
+                    detailed_stats[service_path][tool_name] = {
+                        "count": 0,
+                        "last_recommended": None
+                    }
+
+                detailed_stats[service_path][tool_name]["count"] += 1
+
+                # Update last recommended timestamp if this one is more recent
+                recommendation_time = metric.get("recommendation_timestamp")
+                if recommendation_time:
+                    current_last = detailed_stats[service_path][tool_name]["last_recommended"]
+                    if not current_last or recommendation_time > current_last:
+                        detailed_stats[service_path][tool_name]["last_recommended"] = recommendation_time
+
+        result = {
+            "total_recommendations": summary.get("recommendation_count", 0),
+            "total_tools_recommended": sum(len(tools) for tools in detailed_stats.values()),
+            "services_count": len(detailed_stats),
+            "detailed_stats": detailed_stats
+        }
+
+        logger.info(f"MCPGW: Retrieved recommendation stats from registry: {result['total_recommendations']} total recommendations across {result['total_tools_recommended']} tools")
+
+        return result
+
+    except Exception as e:
+        logger.error(f"MCPGW: Failed to retrieve recommendation stats from registry: {e}")
+        # Return empty stats on error
+        return {
+            "total_recommendations": 0,
+            "total_tools_recommended": 0,
+            "services_count": 0,
+            "detailed_stats": {},
+            "error": f"Failed to retrieve stats: {str(e)}"
+        }
+
+
 # --- Main Execution ---
 def main():
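For completeness, the nested structure that `get_recommendation_stats` builds would look roughly like this; service paths, tool names, and counts are illustrative:

```json
{
  "total_recommendations": 7,
  "total_tools_recommended": 2,
  "services_count": 1,
  "detailed_stats": {
    "/currenttime": {
      "current_time_by_timezone": {
        "count": 5,
        "last_recommended": "2024-05-01T11:59:58+00:00"
      },
      "current_time_utc": {
        "count": 2,
        "last_recommended": "2024-04-30T09:12:03+00:00"
      }
    }
  }
}
```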