diff --git a/.gitignore b/.gitignore index 00e9338..7e50e63 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,6 @@ *.DS_Store sandbox/ dist/ +example/credentials.json +example/gmail_token.json +example/token.json diff --git a/example/README.md b/example/README.md new file mode 100644 index 0000000..da77836 --- /dev/null +++ b/example/README.md @@ -0,0 +1,143 @@ +# Elkar A2A Google Services Integration + +This project integrates Google Calendar and Gmail services with Elkar A2A, providing powerful email and calendar management capabilities. + +## Prerequisites + +- Python 3.8 or higher +- Google Cloud Platform account +- Claude Desktop application + +## Installation + +1. Clone the repository: +```bash +git clone https://github.com/yourusername/elkar-a2a.git +cd elkar-a2a +``` + +2. Install the required dependencies: +```bash +pip install -r requirements.txt +``` + +This will install all necessary packages including: +- Core dependencies (crewai, langchain, elkar) +- Google API dependencies +- Web server dependencies (uvicorn, fastapi) +- Database dependencies +- Utility dependencies +- MCP dependencies + +## Google Services Setup + +### 1. Enable Google APIs + +1. Go to the [Google Cloud Console](https://console.cloud.google.com/) +2. Create a new project or select an existing one +3. Enable the following APIs: + - Gmail API + - Google Calendar API +4. Configure the OAuth consent screen: + - Set up the OAuth consent screen with necessary scopes + - Add test users if in testing mode + +### 2. Download Credentials + +1. In the Google Cloud Console, go to "APIs & Services" > "Credentials" +2. Click "Create Credentials" > "OAuth client ID" +3. Choose "Desktop application" as the application type +4. Download the credentials file and save it as `credentials.json` in the `example` directory + +### 3. First-time Authentication + +When you run the application for the first time: +- The system will automatically create `token.json` for Google Calendar +- The system will automatically create `gmail_token.json` for Gmail +- A browser window will open for OAuth authentication +- Grant the requested permissions + +## Claude Desktop Configuration + +### 1. Configure Claude Desktop + +1. Locate your Claude Desktop configuration file: + - Mac: `/Users/mm/Library/Application Support/Claude/claude_desktop_config.json` + - Windows: `%APPDATA%\Claude\claude_desktop_config.json` + - Linux: `~/.config/Claude/claude_desktop_config.json` + +2. Add the MCP server configuration to the file: +```json +{ + "mcpServers": { + "a2a_elkar": { + "command": "/opt/anaconda3/bin/python", + "args": ["path to your server_mcp.py"], + "env": { + "ANTHROPIC_API_KEY": "sk-xxx", + "OPENAI_API_KEY": "sk-yyy", + "AGENT_URLS": "http://localhost:5001,http://localhost:5002" + } + } + } +} +``` + +### 2. Add Metaprompt + +1. Open the Claude Desktop configuration file +2. Add the metaprompt from `claudePrompt.txt` to the appropriate section +3. Save the file + +#### Example Claude Desktop Configuration Screenshots + +![Claude Desktop Profile Settings](claude_config1.png) + +![Claude Desktop General Settings](claude_config2.png) + +## Running the Servers + +1. Start the Gmail server: +```bash +python example/server.py +``` + +2. Start the Calendar server: +```bash +python example/server_cal.py +``` + +The servers will be available at: +- Gmail Assistant: http://localhost:5001 +- Calendar Assistant: http://localhost:5002 + +## Troubleshooting + +### Authentication Issues + +If you encounter authentication issues: +1. Delete the existing token files (`token.json` and `gmail_token.json`) +2. Run the server with the `force_reauth` parameter +3. Complete the OAuth flow again + +### Claude Desktop Issues + +If Claude Desktop doesn't recognize the MCP servers: +1. Verify the configuration file path +2. Check the JSON syntax +3. Restart Claude Desktop +4. Ensure the servers are running before starting Claude Desktop + +## Security Notes + +- Keep your `credentials.json` file secure and never commit it to version control +- Regularly rotate your OAuth credentials +- Monitor the OAuth consent screen for any unauthorized access +- Use environment variables for sensitive configuration + +## Support + +For issues or questions: +1. Check the troubleshooting section +2. Review the Google Cloud Console logs +3. Check the server logs for detailed error messages diff --git a/example/claudePrompt.txt b/example/claudePrompt.txt new file mode 100644 index 0000000..77b9702 --- /dev/null +++ b/example/claudePrompt.txt @@ -0,0 +1,25 @@ +At your disposal you have A2A agents, which are specialized tools that provide extended functionalities. +You can invoke these agents to perform tasks you may not be able to do directly. +If you're unsure what capabilities are available, first call the discover_a2a_agents tool, which will list all available agents and their functions. +For example, if you are asked to send a calendar invite or an email invitation, follow this workflow: +Workflow: Sending an Invite (Calendar or Email) +Discover Capabilities: +Call discover_a2a_agents and look for agents with keywords like calendar, email, or invite. +Choose a Tool Based on Goal: +If the goal is to create a calendar event (e.g., Google Calendar) and invite specific participants by email, use the create_calendar_event tool. +If the goal is to just send an email invitation, use the appropriate send_email agent/tool. +Create Calendar Event with Invitees: +When using the calendar agent: +Set the title, start_time, end_time, and description. +Add attendees as a list of email addresses. +Optionally set a location or video_conference_link. +Send an Email Invitation Instead: +When using an email agent: +Set the recipient_email, subject, and body. +Optionally include event details in the email body. +Confirmation: +After calling the tool, check the response for confirmation or errors. +If needed, retry or adjust based on the feedback from the agent/tool. +Example Use Cases: +"Create a Google Calendar event titled Team Sync for tomorrow at 10 AM with alice@example.com and bob@example.com invited." +"Send an email to charlie@example.com with subject Meeting Invite and body Let's meet at 2 PM to discuss the roadmap." \ No newline at end of file diff --git a/example/claude_config1.png b/example/claude_config1.png new file mode 100644 index 0000000..fd948da Binary files /dev/null and b/example/claude_config1.png differ diff --git a/example/claude_config2.png b/example/claude_config2.png new file mode 100644 index 0000000..81da3a4 Binary files /dev/null and b/example/claude_config2.png differ diff --git a/example/claude_desktop_config.json b/example/claude_desktop_config.json new file mode 100644 index 0000000..e4c0253 --- /dev/null +++ b/example/claude_desktop_config.json @@ -0,0 +1,7 @@ +"a2a_elkar": { + "command": "/opt/anaconda3/bin/python", + "args": ["path to your server_mcp.py"], + "env": { + "AGENT_URLS": "http://localhost:5001,http://localhost:5002" + } + }, diff --git a/example/server.py b/example/server.py new file mode 100644 index 0000000..f6864ea --- /dev/null +++ b/example/server.py @@ -0,0 +1,763 @@ +#!/usr/bin/env python +""" +Elkar A2A server that wraps a CrewAI agent for Gmail and animal color tasks. +""" +import asyncio +import os +import json +import base64 +from typing import List, Optional, Dict, Any, Tuple +import uuid +from datetime import datetime +import pickle +# You'll need to install these packages: +# pip install crewai langchain-openai langchain-core elkar dotenv +# pip install google-api-python-client google-auth-httplib2 google-auth-oauthlib +from crewai import Agent, Task as CrewTask, Crew, Process +from langchain_openai import ChatOpenAI +from crewai.tools import tool +from dotenv import load_dotenv +import uvicorn +import httplib2 +from pydantic import SecretStr + +# Google API imports +from googleapiclient.discovery import build +from google_auth_oauthlib.flow import InstalledAppFlow +from google.auth.transport.requests import Request +from googleapiclient.errors import HttpError + +# Elkar imports +from elkar.a2a_types import ( + AgentCard, + AgentCapabilities, + TaskStatus, + TaskState, + Message, + TextPart, + Artifact, + AgentSkill, +) +from elkar.server.server import A2AServer +from elkar.task_manager.task_manager_base import RequestContext +from elkar.task_manager.task_manager_with_task_modifier import TaskManagerWithModifier +from elkar.task_modifier.base import TaskModifierBase +from elkar.task_modifier.task_modifier import TaskModifier +from elkar.a2a_types import * +from elkar.store.elkar_client_store import ElkarClientStore +from elkar.server.server import A2AServer +from elkar.task_manager.task_manager_base import RequestContext +from elkar.task_manager.task_manager_with_task_modifier import TaskManagerWithModifier +from elkar.task_modifier.base import TaskModifierBase + +# Load environment variables +load_dotenv() + +# Define the Gmail API scopes - use multiple scopes to ensure we have proper permissions +GMAIL_SCOPES = [ + # 'https://www.googleapis.com/auth/gmail.readonly', # Read-only access to Gmail + # 'https://www.googleapis.com/auth/gmail.modify', # Access to modify emails (not delete) + # 'https://www.googleapis.com/auth/gmail.labels', # Access to modify labels + 'https://mail.google.com/' # Full access to Gmail (includes send) +] +GMAIL_TOKEN_FILE = 'example/gmail_token.json' +GMAIL_CREDENTIALS_FILE = 'example/credentials.json' + + + +# Gmail API tools +def get_gmail_service(force_reauth: bool = False): + """ + Gets an authorized Gmail API service instance. + + Args: + force_reauth: If True, forces re-authentication by deleting existing token + + Returns: + A Gmail API service object. + """ + creds = None + + # If force_reauth is True, delete the token file if it exists + if force_reauth and os.path.exists(GMAIL_TOKEN_FILE): + os.remove(GMAIL_TOKEN_FILE) + print(f"Deleted existing token file to force re-authentication") + + # The file token.json stores the user's access and refresh tokens + if os.path.exists(GMAIL_TOKEN_FILE): + with open(GMAIL_TOKEN_FILE, 'rb') as token: + creds = pickle.load(token) + print(f"Loaded credentials from {GMAIL_TOKEN_FILE}") + + # If credentials don't exist or are invalid, prompt the user to log in + if not creds or not creds.valid: + if creds and creds.expired and creds.refresh_token: + print(f"Refreshing expired credentials") + creds.refresh(Request()) + else: + # Check if credentials file exists + if not os.path.exists(GMAIL_CREDENTIALS_FILE): + raise FileNotFoundError(f"Credentials file not found: {GMAIL_CREDENTIALS_FILE}. Please set up OAuth credentials.") + + print(f"Starting new OAuth flow with scopes: {GMAIL_SCOPES}") + flow = InstalledAppFlow.from_client_secrets_file( + GMAIL_CREDENTIALS_FILE, GMAIL_SCOPES) + creds = flow.run_local_server(port=0) + print(f"New authentication completed") + + # Save the credentials for the next run + with open(GMAIL_TOKEN_FILE, 'wb') as token: + pickle.dump(creds, token) + print(f"Saved credentials to {GMAIL_TOKEN_FILE}") + + service = build('gmail', 'v1', credentials=creds) + return service + + +@tool +def read_emails(max_results: int = 10, force_reauth: bool = False) -> str: + """ + Retrieves a list of recent emails from the user's primary Gmail inbox. + + This function connects to the Gmail API, fetches the most recent emails + (up to a specified maximum number), and extracts key information such as + sender, subject, date, and a snippet of the email body. + + Args: + max_results: The maximum number of emails to retrieve. Defaults to 10. + This helps control the amount of data returned and API usage. + force_reauth: If True, forces the Gmail API to re-authenticate by deleting + any existing token. This is useful if there are persistent + authentication issues. Defaults to False. + + Returns: + A string containing the formatted information for each retrieved email, + separated by '---'. If no messages are found, it returns "No messages found.". + In case of an API error or other exception, it returns an error message. + """ + try: + service = get_gmail_service(force_reauth=force_reauth) + results = service.users().messages().list(userId='me', maxResults=max_results).execute() + messages = results.get('messages', []) + + if not messages: + return "No messages found." + + email_info = [] + for message in messages: + msg = service.users().messages().get(userId='me', id=message['id']).execute() + + # Extract headers + headers = msg['payload']['headers'] + subject = next((header['value'] for header in headers if header['name'].lower() == 'subject'), 'No Subject') + sender = next((header['value'] for header in headers if header['name'].lower() == 'from'), 'Unknown Sender') + date = next((header['value'] for header in headers if header['name'].lower() == 'date'), 'Unknown Date') + + # Get snippet + snippet = msg.get('snippet', 'No preview available') + + email_info.append(f"From: {sender}\nSubject: {subject}\nDate: {date}\nSnippet: {snippet}\n---") + + return "\n".join(email_info) + + except HttpError as error: + return f"An error occurred: {error}" + except Exception as e: + return f"An unexpected error occurred: {str(e)}" + + +@tool +def search_emails(query: str, max_results: int = 10, force_reauth: bool = False) -> str: + """ + Searches the user's Gmail inbox for emails matching a specific query. + + This function utilizes the Gmail API's search capabilities to find emails + that match the provided search query string (e.g., 'from:boss@example.com', + 'subject:project update'). It then retrieves and formats details for + each matching email, including sender, subject, date, and a snippet. + + Args: + query: The Gmail search query string. This uses standard Gmail search + operators (e.g., 'from:', 'to:', 'subject:', 'is:unread'). + max_results: The maximum number of matching emails to retrieve. + Defaults to 10. + force_reauth: If True, forces the Gmail API to re-authenticate. + Defaults to False. + + Returns: + A string containing the formatted information for each matching email, + separated by '---'. If no messages match the query, it returns + "No messages found matching query: ". In case of an API error + or other exception, it returns an error message. + """ + try: + service = get_gmail_service(force_reauth=force_reauth) + results = service.users().messages().list(userId='me', q=query, maxResults=max_results).execute() + messages = results.get('messages', []) + + if not messages: + return f"No messages found matching query: {query}" + + email_info = [] + for message in messages: + msg = service.users().messages().get(userId='me', id=message['id']).execute() + + # Extract headers + headers = msg['payload']['headers'] + subject = next((header['value'] for header in headers if header['name'].lower() == 'subject'), 'No Subject') + sender = next((header['value'] for header in headers if header['name'].lower() == 'from'), 'Unknown Sender') + date = next((header['value'] for header in headers if header['name'].lower() == 'date'), 'Unknown Date') + + # Get snippet + snippet = msg.get('snippet', 'No preview available') + + email_info.append(f"From: {sender}\nSubject: {subject}\nDate: {date}\nSnippet: {snippet}\n---") + + return "\n".join(email_info) + + except HttpError as error: + return f"An error occurred: {error}" + except Exception as e: + return f"An unexpected error occurred: {str(e)}" + + +@tool +def send_email(to: str, subject: str, body: str, force_reauth: bool = False) -> str: + """ + Composes and sends an email using the user's Gmail account. + + This function takes the recipient's email address, the subject line, and + the plain text body of the email. It then constructs a MIME message and + uses the Gmail API to send the email. Line breaks in the body are preserved. + + Args: + to: The email address of the recipient (e.g., 'recipient@example.com'). + subject: The subject line of the email. + body: The plain text content of the email. Line breaks (e.g., '\n') + will be rendered as new lines in the sent email. + force_reauth: If True, forces the Gmail API to re-authenticate. + Defaults to False. + + Returns: + A string confirming that the email was sent successfully, including the + Message ID, (e.g., "Email sent successfully to . Message ID: "). + If an error occurs during the process (e.g., API error, invalid email + format), it returns an error message detailing the issue. + """ + try: + import email.mime.text + import email.mime.multipart + from email.mime.text import MIMEText + from email.mime.multipart import MIMEMultipart + + service = get_gmail_service(force_reauth=force_reauth) + + # Create proper MIME message + msg = MIMEMultipart() + msg['to'] = to + msg['subject'] = subject + + # Attach the body as plain text, preserving line breaks + msg.attach(MIMEText(body, 'plain')) + + # Convert the message to a string and then encode it + raw_message = base64.urlsafe_b64encode(msg.as_bytes()).decode('ascii') + + send_message_request = { + 'raw': raw_message + } + + # For debugging + print(f"Email content (first 200 chars): {body[:200]}...") + print(f"Contains line breaks: {'\\n' in body}") + + sent_message = service.users().messages().send(userId='me', body=send_message_request).execute() + return f"Email sent successfully to {to}. Message ID: {sent_message.get('id')}" + except HttpError as error: + return f"An error occurred: {error}" + except Exception as e: + return f"An unexpected error occurred: {str(e)}" + + +class CrewAIWrapper: + """Wrapper for CrewAI agents that handles Gmail and animal color tasks.""" + + def __init__(self, verbose: bool = True, model_name: str = "gpt-3.5-turbo-0125"): + """Initialize the CrewAI wrapper with a Gmail and animal color agent.""" + # Setup LLM + api_key_str = os.getenv("OPENAI_API_KEY") + if not api_key_str: + print("Warning: OPENAI_API_KEY not found in environment variables.") + + print(api_key_str) + llm = ChatOpenAI( + model=model_name, + temperature=0.7, + api_key=SecretStr(api_key_str) if api_key_str else None + ) + + # Create the agent + self.agent = Agent( + role="Email Assistant", + goal="Help users read, search, and send emails. Coordinate with the Calendar Assistant for tasks requiring calendar management.", + backstory=( + "You are an expert email assistant. You can read, search, and send emails using your available tools (`read_emails`, `search_emails`, `send_email`). " + "You are also aware of a Google Calendar Assistant and should suggest using it if a user's request involves creating, modifying, or querying calendar events " + "(e.g., 'add this meeting to my calendar', 'what is my schedule for next Monday?')." + ), + verbose=verbose, + allow_delegation=False, + tools=[read_emails, search_emails, send_email], # Added send_email tool + llm=llm + ) + + # Define the CrewAI structure + self.crew = Crew( + agents=[self.agent], + tasks=[], + verbose=verbose, + process=Process.sequential, + ) + + + async def process_email_query(self, prompt: str) -> str: + """ + Run the CrewAI agent to process email queries. + Args: + prompt: The user's email-related prompt + Returns: + The email information requested or a confirmation/error from sending an email. + """ + # Metaprompt for collaboration, input validation and tool usage + metaprompt = ( + "You are the Gmail Assistant. Your available tools are `read_emails`, `search_emails`, and `send_email`. " + "To send an email, you MUST use the `send_email` tool. This tool requires `to` (recipient's email), `subject`, and `body`. " + "If the user asks to send an email but does not provide all three (recipient, subject, body), you MUST ask for the missing information. " + "CRITICALLY: Before sending an email, examine the content carefully for placeholders or incomplete information:\n" + "1. If an email address IS provided by the user (e.g., for the 'to' parameter) but it looks like a generic placeholder (e.g., `name@example.com`, `user@example.org`, `info@domain.com`), " + "you MUST ask the user to confirm if this is the actual email address they want to use. For instance, say: 'The email provided for the recipient is matthieu@example.com. Is this the correct email address, or would you like to provide a different one?' Only proceed if they confirm it or provide a new one.\n" + "2. If the email body contains placeholder text like '[Your name]', '[Signature]', '[Insert X]', or similar bracketed placeholders, you MUST ask the user to provide the actual text to replace these placeholders.\n" + "3. Be especially vigilant for signature placeholders like '[Please include my standard email signature]', '[Insert signature]', or instructions like 'please add my signature here'. NEVER send an email with these placeholders - you MUST ask the user to provide their actual signature text first.\n" + "4. If any text in the proposed email seems like a template or contains placeholders (indicated by ALL CAPS, multiple underscores like ___, or similar patterns), ask the user to provide the actual information first.\n" + "NEVER SEND emails containing placeholders, incomplete information, or fields clearly marked as needing replacement. ALWAYS ask for complete information first.\n" + "Do NOT invent email addresses or placeholders in the email content. If you are unsure, ask the user.\n" + "If the user's request seems to require calendar operations (e.g., 'find the flight details and add it to my calendar'), " + "first state that you will handle the email-specific parts if any, and then clearly suggest that the user " + "should ask the Google Calendar Assistant to perform calendar operations. Do not attempt to directly modify or query the calendar yourself. " + "If the request is purely about email functions (reading, searching, or sending with all details provided and confirmed), proceed as usual." + ) + + # Check if this is a request to force re-authentication + force_reauth = "force reauth" in prompt.lower() or "re-authenticate" in prompt.lower() + + # If permission error is mentioned, suggest re-authentication + if "permission" in prompt.lower() or "403" in prompt or "insufficient authentication" in prompt: + update_prompt = f"{prompt}\n\nIt seems there might be authentication or permission issues. Try using force_reauth=True to re-authenticate with Gmail." + else: + update_prompt = prompt + + # Create a dynamic task based on user input + print(f"Email Prompt: {update_prompt}") + task = CrewTask( + description=f"{metaprompt}\\n\\nUser request: {update_prompt}", + expected_output="Retrieved email information, confirmation of email sent, or guidance to consult the Calendar assistant or provide more details.", + agent=self.agent + ) + + # Update crew tasks and execute + self.crew.tasks = [task] + + # Run the crew in an executor to avoid blocking + loop = asyncio.get_event_loop() + result = await loop.run_in_executor(None, self.crew.kickoff) + # Extract string from CrewOutput object + if hasattr(result, 'raw'): + return str(result.raw) + return str(result) + + + +# Set up Elkar A2A server with CrewAI integration +async def setup_server(): + """Main function to set up the Elkar A2A server with CrewAI integration.""" + + # Create the CrewAI wrapper + crew_ai_wrapper = CrewAIWrapper() + + # Create the agent card - using proper AgentSkill objects + agent_card = AgentCard( + name="Gmail Assistant", + description="Manages Gmail. Can read and search emails. Collaborates with the Google Calendar Assistant for tasks requiring calendar operations (e.g., creating events from email details).", + url="http://localhost:5001", + version="1.0.0", + skills=[ + AgentSkill( + id="email-assistant", + name="email-assistant", + description="Core capability for managing Gmail tasks through natural language. Understands requests to read, search, draft, and send emails. Interacts with users to clarify details and confirm actions before execution, especially for sending emails. Defers calendar-related tasks to the Google Calendar Assistant.", + tags=["email", "gmail", "assistant", "natural language processing", "communication", "inbox management", "email composition"], + examples=[ + "Read my latest unread emails.", + "Search for emails from 'marketing@example.com' about 'new campaign'.", + "Draft an email to 'john.doe@example.com' with subject 'Project Update' and body 'The project is on track.'", + "Can you send the draft I just made?", + "What did Sarah say about the meeting notes?" + ], + inputModes=["text"], + outputModes=["text"] + ), + AgentSkill( + id="gmail-integration", + name="gmail-integration", + description="Provides the technical interface to the Gmail API. Manages authentication, executes commands to search mailboxes, retrieve email content (threads and individual messages), and send emails. Ensures secure and reliable communication with Google's Gmail services.", + tags=["gmail", "api", "google api", "email service", "integration", "authentication", "email search", "email retrieval", "email sending"], + examples=[ + "Execute API call to list emails matching 'subject:Urgent'.", + "Retrieve full content of email thread ID 'xyz123'.", + "Use Gmail API to send an email to 'recipient@example.com'.", + "Handle Gmail API authentication flow.", + "Force re-authentication with Gmail." + ], + inputModes=["text"], # Represents instructions that translate to API calls + outputModes=["text", "json"] # API responses, status messages, or structured email data + ) + ], + capabilities=AgentCapabilities( + streaming=True, + pushNotifications=True, + stateTransitionHistory=True, + ), + ) + + # Add helper functions to detect placeholders and questions + def has_placeholder_in_prompt(prompt: str) -> Tuple[bool, str]: + """ + Check if the user's prompt contains any common placeholder text or emails. + Returns (has_placeholder, message) + """ + # Check for placeholder emails + placeholder_domains = ["example.com", "example.org", "test.com"] + for domain in placeholder_domains: + if domain in prompt: + return True, f"I notice you're using an email with '{domain}' which appears to be a placeholder. Please provide a valid email address instead." + + # Check for placeholder text patterns in brackets + placeholder_patterns = ["[Your name]", "[Name]", "[Full name]", "[Insert name]", + "[Your email]", "[Email]", "[Phone]", "[Your phone]", + "[Address]", "[Your address]", + # Add signature-specific placeholders + "[Signature]", "[Your signature]", "[My signature]", + "[Please include my standard email signature]", + "[Insert signature]", "[Include signature]", + "[Standard signature]", "[Email signature]", + "[Add signature]", "[Add my signature]"] + + for pattern in placeholder_patterns: + if pattern.lower() in prompt.lower(): + return True, f"I notice you're using a placeholder '{pattern}' in your request. Please provide the actual information to replace this placeholder." + + return False, "" + + def contains_placeholder_text(text: str) -> Tuple[bool, str]: + """ + Checks if any common placeholder patterns are in the text. + + Args: + text: Text to check for placeholders + + Returns: + Tuple of (has_placeholder, placeholder_message) + """ + # Check for placeholder brackets like [Your name], [Name], etc. + placeholder_patterns = ["[Your name]", "[Name]", "[Full name]", "[Insert name]", + "[Your email]", "[Email]", "[Phone]", "[Your phone]", + "[Address]", "[Your address]", + # Add signature-specific placeholders + "[Signature]", "[Your signature]", "[My signature]", + "[Please include my standard email signature]", + "[Insert signature]", "[Include signature]", + "[Standard signature]", "[Email signature]", + "[Add signature]", "[Add my signature]", + "[Company signature]", "[Business signature]"] + + for pattern in placeholder_patterns: + if pattern.lower() in text.lower(): + replacement_text = "signature" if "signature" in pattern.lower() else "information" + return True, f"I notice there's a placeholder '{pattern}' in the email content. Please provide your actual {replacement_text} to replace this placeholder." + + # Enhanced detection for signature placeholders without brackets + signature_indicators = [ + "please include my standard email signature", + "please include my signature", + "add my signature", + "insert my signature", + "include my signature", + "attach my signature", + "standard signature here", + "your signature here", + "signature: _____" + ] + + for indicator in signature_indicators: + if indicator.lower() in text.lower(): + return True, f"I notice there's a request to include your signature with text like '{indicator}'. Please provide your actual signature text to include in the email." + + # Check for other common placeholder indicators + placeholder_indicators = [" None: + """Handle tasks by using the CrewAI wrapper to get animal colors or process email requests.""" + + try: + # Access the task message using the pattern from crewai_a2a_server.py + task_obj = task._task + user_prompt = "" + print(f"Task: {task_obj}") + + # Extract text from message parts if they exist + if hasattr(task_obj, 'status') and hasattr(task_obj.status, 'message') and task_obj.status.message and hasattr(task_obj.status.message, 'parts'): + for part in task_obj.status.message.parts: + if hasattr(part, "text") and isinstance(part, TextPart): + user_prompt += part.text + # If no message in status, try to look in history + elif hasattr(task_obj, 'history') and task_obj.history and len(task_obj.history) > 0: + for msg in task_obj.history: + if hasattr(msg, 'parts'): + for part in msg.parts: + if hasattr(part, "text") and isinstance(part, TextPart): + user_prompt += part.text + + print(f"User prompt: {user_prompt}") + if not user_prompt: + await task.set_status( + TaskStatus( + state=TaskState.FAILED, + message=Message( + role="agent", + parts=[TextPart(text="No text prompt was provided in the request.")], + ), + ), + is_final=True, + ) + return + + # Check for placeholders in the input prompt + has_placeholder, placeholder_message = has_placeholder_in_prompt(user_prompt) + if has_placeholder: + await task.set_status( + TaskStatus( + state=TaskState.INPUT_REQUIRED, + message=Message( + role="agent", + parts=[TextPart(text=placeholder_message)], + ), + ), + is_final=False, # Mark as not final, expecting more input from user + ) + return + + # Check if this is a request to re-authenticate + if "force reauth" in user_prompt.lower() or "re-authenticate" in user_prompt.lower(): + await task.set_status( + TaskStatus( + state=TaskState.WORKING, + message=Message( + role="agent", + parts=[TextPart(text="I'll try to re-authenticate with Gmail...")], + ), + ) + ) + + # Try forcing re-authentication first + try: + get_gmail_service(force_reauth=True) + await task.set_status( + TaskStatus( + state=TaskState.WORKING, + message=Message( + role="agent", + parts=[TextPart(text="Successfully re-authenticated with Gmail. Now processing your request...")], + ), + ) + ) + except Exception as e: + await task.set_status( + TaskStatus( + state=TaskState.FAILED, + message=Message( + role="agent", + parts=[TextPart(text=f"Failed to re-authenticate: {str(e)}")], + ), + ), + is_final=True, + ) + return + else: + # Send initial status update + await task.set_status( + TaskStatus( + state=TaskState.WORKING, + message=Message( + role="agent", + parts=[TextPart(text="I'm processing your request...")], + ), + ) + ) + + # Determine if this is an email-related request + is_email_request = any(keyword in user_prompt.lower() for keyword in + ["email", "gmail", "inbox", "message", "mail"]) + + # Determine if this is an animal color request - REMOVED + # is_animal_request = any(keyword in user_prompt.lower() for keyword in + # ["cat", "dog", "animal", "color"]) + + if is_email_request: + # Call CrewAI to process email query + result = await crew_ai_wrapper.process_email_query(user_prompt) + else: + # Default to helping determine what the request is about + result = "I'm not sure what you're asking for. I can help with email-related tasks. Please specify a Gmail request." + + # Direct check for the signature placeholder that the user specifically mentioned + if "[please include my standard email signature]" in result.lower() or "please include my standard email signature" in result.lower(): + await task.set_status( + TaskStatus( + state=TaskState.INPUT_REQUIRED, + message=Message( + role="agent", + parts=[TextPart(text="I notice there's a placeholder for your standard email signature. Please provide your actual signature text to include in the email.")], + ), + ), + is_final=False, # Mark as not final, expecting more input from user + ) + return + + # Check if result contains placeholder text that needs client input + has_placeholder_text, placeholder_text_message = contains_placeholder_text(result) + if has_placeholder_text: + await task.set_status( + TaskStatus( + state=TaskState.INPUT_REQUIRED, + message=Message( + role="agent", + parts=[TextPart(text=placeholder_text_message)], + ), + ), + is_final=False, # Mark as not final, expecting more input from user + ) + return + + # Check for placeholder emails in the result + if "example.com" in result or "example.org" in result or "test.com" in result: + await task.set_status( + TaskStatus( + state=TaskState.INPUT_REQUIRED, + message=Message( + role="agent", + parts=[TextPart(text="I notice there's a placeholder email address like example.com in the request. Please provide a valid email address instead of a placeholder.")], + ), + ), + is_final=False, # Mark as not final, expecting more input from user + ) + return + + # Check if the agent is asking for more information + # Simple heuristic: check for question marks or common question phrases + is_question = result.strip().endswith("?") or \ + any(phrase in result.lower() for phrase in + ["what ", "which ", "when ", "who ", "how ", + "could you specify", "please provide", "do you mean", "clarify"]) + + if is_question: + # Agent is asking for more information, set state to INPUT_REQUIRED + await task.set_status( + TaskStatus( + state=TaskState.INPUT_REQUIRED, + message=Message( + role="agent", + parts=[TextPart(text=result)], # Pass the agent's question back + ), + ), + is_final=False, # Mark as not final, expecting more input from user + ) + return + + # Add the generated text as an artifact + await task.upsert_artifacts( + [ + Artifact( + parts=[TextPart(text=result)], + index=0, + ) + ] + ) + + # Mark the task as completed + await task.set_status( + TaskStatus( + state=TaskState.COMPLETED, + message=Message( + role="agent", + parts=[TextPart(text="Request completed successfully!")], + ), + ), + is_final=True, + ) + + except Exception as e: + # Handle any errors + error_message = f"Error processing request: {str(e)}" + await task.set_status( + TaskStatus( + state=TaskState.FAILED, + message=Message( + role="agent", + parts=[TextPart(text=error_message)], + ), + ), + is_final=True, + ) + + # Create the task manager with the handler + task_manager = TaskManagerWithModifier( + agent_card, send_task_handler=task_handler + ) + + + + + # Configure the ElkarClientStore + api_key = os.environ.get("ELKAR_API_KEY", "") # Replace with your actual Elkar API key + store = ElkarClientStore(base_url="https://api.elkar.co/api", api_key=api_key) + + task_manager: TaskManagerWithModifier = TaskManagerWithModifier( + agent_card, + send_task_handler=task_handler, + store=store # Pass the configured store to the task manager + ) + + server = A2AServer(task_manager, host="0.0.0.0", port=5001, endpoint="/") + return server + + +if __name__ == "__main__": + # Set up and run the server using uvicorn directly, avoiding asyncio conflict + # load_dotenv() + # print(os.getenv("OPENAI_API_KEY")) + server = asyncio.run(setup_server()) + + print("Starting Elkar A2A server on http://localhost:5001") + print("Press Ctrl+C to stop the server") + + # Run with uvicorn directly instead of server.start() + uvicorn.run(server.app, host="0.0.0.0", port=5001) diff --git a/example/server_cal.py b/example/server_cal.py new file mode 100644 index 0000000..0f832b6 --- /dev/null +++ b/example/server_cal.py @@ -0,0 +1,1139 @@ +#!/usr/bin/env python +""" +Elkar A2A server that wraps a CrewAI agent for Google Calendar tasks. +""" +import asyncio +import os +import json +import base64 +from typing import List, Optional, Dict, Any, Tuple, cast +import uuid +from datetime import datetime, timedelta, timezone +import pickle +# You'll need to install these packages: +# pip install crewai langchain-openai langchain-core elkar dotenv +# pip install google-api-python-client google-auth-httplib2 google-auth-oauthlib +from crewai import Agent, Task as CrewTask, Crew, Process +from langchain_openai import ChatOpenAI +from crewai.tools import tool +from dotenv import load_dotenv +import uvicorn +import httplib2 +from elkar.a2a_types import * +from elkar.store.elkar_client_store import ElkarClientStore +from elkar.server.server import A2AServer +from elkar.task_manager.task_manager_base import RequestContext +from elkar.task_manager.task_manager_with_task_modifier import TaskManagerWithModifier +from elkar.task_modifier.base import TaskModifierBase +from elkar.task_modifier.task_modifier import TaskModifier +from pydantic import SecretStr +import logging + +# Load environment variables +load_dotenv() + +# Define the Google Calendar API scopes - use multiple scopes to ensure we have proper permissions +CALENDAR_SCOPES = [ + 'https://www.googleapis.com/auth/calendar', # Full access to all calendars + # 'https://www.googleapis.com/auth/calendar.events', # Full access to events on all calendars + # 'https://www.googleapis.com/auth/calendar.readonly', # Read-only access to calendars + # 'https://www.googleapis.com/auth/calendar.events.readonly' # Read-only access to events +] +CALENDAR_TOKEN_FILE = os.environ.get('CALENDAR_TOKEN_FILE', 'example/token.json') +CALENDAR_CREDENTIALS_FILE = os.environ.get('CALENDAR_CREDENTIALS_FILE', 'example/credentials.json') + +# Google API imports +from googleapiclient.discovery import build +from google_auth_oauthlib.flow import InstalledAppFlow +from google.auth.transport.requests import Request +from googleapiclient.errors import HttpError + +# Google Calendar API tools +def get_calendar_service(force_reauth: bool = False): + """ + Gets an authorized Google Calendar API service instance. + + Args: + force_reauth: If True, forces re-authentication by deleting existing token + + Returns: + A Google Calendar API service object. + """ + creds = None + + logging.info(f"Using credentials file: {CALENDAR_CREDENTIALS_FILE}") + logging.info(f"Using token file: {CALENDAR_TOKEN_FILE}") + + # If force_reauth is True, delete the token file if it exists + if force_reauth and os.path.exists(CALENDAR_TOKEN_FILE): + os.remove(CALENDAR_TOKEN_FILE) + logging.info(f"Deleted existing token file to force re-authentication") + + # The file token.json stores the user's access and refresh tokens + if os.path.exists(CALENDAR_TOKEN_FILE): + try: + with open(CALENDAR_TOKEN_FILE, 'rb') as token: + creds = pickle.load(token) + logging.info(f"Loaded credentials from {CALENDAR_TOKEN_FILE}") + logging.info(f"Credentials valid: {creds.valid}") + if creds.expired: + logging.info("Credentials are expired") + if creds.refresh_token: + logging.info("Refresh token is available") + except Exception as e: + logging.info(f"Error loading token file: {str(e)}") + + # If credentials don't exist or are invalid, prompt the user to log in + if not creds or not creds.valid: + if creds and creds.expired and creds.refresh_token: + logging.info(f"Refreshing expired credentials") + try: + creds.refresh(Request()) + logging.info("Successfully refreshed credentials") + except Exception as e: + logging.info(f"Error refreshing credentials: {str(e)}") + else: + # Check if credentials file exists + if not os.path.exists(CALENDAR_CREDENTIALS_FILE): + raise FileNotFoundError(f"Credentials file not found: {CALENDAR_CREDENTIALS_FILE}. Please set up OAuth credentials.") + + logging.info(f"Starting new OAuth flow with scopes: {CALENDAR_SCOPES}") + try: + flow = InstalledAppFlow.from_client_secrets_file( + CALENDAR_CREDENTIALS_FILE, CALENDAR_SCOPES) + creds = flow.run_local_server(port=0) + logging.info(f"New authentication completed") + except Exception as e: + logging.info(f"Error during OAuth flow: {str(e)}") + raise + + # Save the credentials for the next run + try: + with open(CALENDAR_TOKEN_FILE, 'wb') as token: + pickle.dump(creds, token) + logging.info(f"Saved credentials to {CALENDAR_TOKEN_FILE}") + except Exception as e: + logging.info(f"Error saving token file: {str(e)}") + + try: + service = build('calendar', 'v3', credentials=creds) + logging.info("Successfully built calendar service") + return service + except Exception as e: + logging.info(f"Error building calendar service: {str(e)}") + raise + + +@tool +def list_calendar_events(max_results: int = 10, time_min_days: int = -7, time_max_days: int = 7, force_reauth: bool = False) -> str: + """ + Retrieves a list of events from the user's primary Google Calendar within a specified time range. + + This function queries the Google Calendar API for events within a time window specified by + time_min_days and time_max_days relative to the current time. For example: + - time_min_days=-7, time_max_days=7 will show events from 7 days ago to 7 days in the future + - time_min_days=-1, time_max_days=0 will show events from yesterday to now + - time_min_days=0, time_max_days=0 will show events for today only + - time_min_days=0, time_max_days=7 will show events from now to 7 days in the future + + Args: + max_results: The maximum number of events to retrieve. Defaults to 10. + time_min_days: The number of days before current time to start listing events. + Negative values look into the past. Defaults to -7 (one week ago). + time_max_days: The number of days after current time to end listing events. + Defaults to 7 (one week ahead). + force_reauth: If True, forces the Google Calendar API to re-authenticate. + Defaults to False. + + Returns: + A string containing formatted information for each event, + separated by '---'. If no events are found, it returns + "No events found in the specified time range.". In case of an API error or other + exception, it returns an error message. + """ + try: + service = get_calendar_service(force_reauth=force_reauth) + + # Calculate time range + now = datetime.now(timezone.utc) + + # For today's events (time_min_days=0, time_max_days=0), set the time range to cover the entire day + if time_min_days == 0 and time_max_days == 0: + # Set time_min to start of today + time_min = now.replace(hour=0, minute=0, second=0, microsecond=0).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z' + # Set time_max to end of today + time_max = now.replace(hour=23, minute=59, second=59, microsecond=999999).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z' + else: + # For other time ranges, use the original calculation + time_min = (now + timedelta(days=time_min_days)).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z' + time_max = (now + timedelta(days=time_max_days)).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z' + + # Call the Calendar API + events_result = service.events().list( + calendarId='primary', + timeMin=time_min, + timeMax=time_max, + maxResults=max_results, + singleEvents=True, + orderBy='startTime' + ).execute() + + events = events_result.get('items', []) + + if not events: + if time_min_days == 0 and time_max_days == 0: + return "No events found for today." + return f"No events found in the specified time range ({time_min_days} to {time_max_days} days from now)." + + event_info = [] + for event in events: + start = event['start'].get('dateTime', event['start'].get('date')) + + # Format the start time for display + try: + if 'T' in start: # This is a dateTime format + start_dt = datetime.fromisoformat(start.replace('Z', '+00:00')) + start_formatted = start_dt.strftime('%Y-%m-%d %H:%M:%S') + else: # This is a date format + start_formatted = start + except: + start_formatted = start + + # Get event details + summary = event.get('summary', 'No Title') + location = event.get('location', 'No Location') + description = event.get('description', 'No Description') + event_id = event.get('id', 'No ID') + + # Truncate description if too long + if description and len(description) > 100: + description = description[:97] + "..." + + event_info.append(f"Event: {summary}\nID: {event_id}\nWhen: {start_formatted}\nLocation: {location}\nDescription: {description}\n---") + + return "\n".join(event_info) + + except HttpError as error: + return f"An error occurred: {error}" + except Exception as e: + return f"An unexpected error occurred: {str(e)}" + + +@tool +def search_calendar_events(query: str, max_results: int = 10, force_reauth: bool = False) -> str: + """ + Searches for events in the user's primary Google Calendar that match a given query. + + This function uses the Google Calendar API to find events based on a textual query. + It looks for matches in event titles, descriptions, attendees, etc. For each + matching event, it retrieves and formats details like summary, start time, + location, description, and event ID. Start times are formatted for clarity, + and long descriptions are truncated. + + Args: + query: The search term or query string to use for finding events. + For example, "meeting with John" or "dentist appointment". + max_results: The maximum number of matching events to retrieve. + Defaults to 10. + force_reauth: If True, forces the Google Calendar API to re-authenticate. + Defaults to False. + + Returns: + A string containing formatted information for each matching event, + separated by '---'. If no events match the query, it returns + "No events found matching query: ". In case of an API error + or other exception, it returns an error message. + """ + try: + service = get_calendar_service(force_reauth=force_reauth) + + # Calculate time_min as current time + time_min = datetime.utcnow().isoformat() + 'Z' # 'Z' indicates UTC time + + # Call the Calendar API with a query + events_result = service.events().list( + calendarId='primary', + timeMin=time_min, + maxResults=max_results, + singleEvents=True, + orderBy='startTime', + q=query + ).execute() + + events = events_result.get('items', []) + + if not events: + return f"No events found matching query: {query}" + + event_info = [] + for event in events: + start = event['start'].get('dateTime', event['start'].get('date')) + + # Format the start time for display + try: + if 'T' in start: # This is a dateTime format + start_dt = datetime.fromisoformat(start.replace('Z', '+00:00')) + start_formatted = start_dt.strftime('%Y-%m-%d %H:%M:%S') + else: # This is a date format + start_formatted = start + except: + start_formatted = start + + # Get event details + summary = event.get('summary', 'No Title') + location = event.get('location', 'No Location') + description = event.get('description', 'No Description') + event_id = event.get('id', 'No ID') + + # Truncate description if too long + if description and len(description) > 100: + description = description[:97] + "..." + + event_info.append(f"Event: {summary}\nID: {event_id}\nWhen: {start_formatted}\nLocation: {location}\nDescription: {description}\n---") + + return "\n".join(event_info) + + except HttpError as error: + return f"An error occurred: {error}" + except Exception as e: + return f"An unexpected error occurred: {str(e)}" + + +@tool +def get_calendar_details(days: int = 7, force_reauth: bool = False) -> str: + """ + Retrieves general details about the user's primary Google Calendar and a summary of upcoming events. + + This function fetches metadata about the primary calendar, including its name, + ID, time zone, and description. It also calculates and returns the number + of events scheduled within a specified number of upcoming days. + + Args: + days: The number of days into the future to look for counting events. + For example, a value of 7 will count events in the next 7 days. + Defaults to 7. + force_reauth: If True, forces the Google Calendar API to re-authenticate. + Defaults to False. + + Returns: + A string containing the calendar's name, ID, time zone, description, + and the count of events in the specified upcoming period. Each piece + of information is on a new line. In case of an API error or other + exception, it returns an error message. + """ + try: + service = get_calendar_service(force_reauth=force_reauth) + + # Get calendar details + calendar = service.calendars().get(calendarId='primary').execute() + calendar_info = [ + f"Calendar Name: {calendar.get('summary', 'Unknown')}", + f"Calendar ID: {calendar.get('id', 'Unknown')}", + f"Time Zone: {calendar.get('timeZone', 'Unknown')}", + f"Description: {calendar.get('description', 'None')}" + ] + + # Calculate time range for events + now = datetime.utcnow() + time_min = now.isoformat() + 'Z' + time_max = (now + timedelta(days=days)).isoformat() + 'Z' + + # Get event count + events_result = service.events().list( + calendarId='primary', + timeMin=time_min, + timeMax=time_max, + singleEvents=True + ).execute() + + events = events_result.get('items', []) + calendar_info.append(f"Events in the next {days} days: {len(events)}") + + return "\n".join(calendar_info) + + except HttpError as error: + return f"An error occurred: {error}" + except Exception as e: + return f"An unexpected error occurred: {str(e)}" + + +@tool +def delete_calendar_event(event_id: str, force_reauth: bool = False) -> str: + """ + Deletes a specific event from the user's primary Google Calendar. + + This function uses the Google Calendar API to remove an event identified by its unique ID. + + Args: + event_id: The unique identifier of the calendar event to be deleted. + This ID can typically be obtained from functions like + `list_upcoming_events` or `search_calendar_events`. + force_reauth: If True, forces the Google Calendar API to re-authenticate. + Defaults to False. + + Returns: + A string confirming the successful deletion of the event, e.g., + "Event with ID deleted successfully!". If an error occurs + (e.g., event not found, permission issues), it returns an error message. + """ + try: + service = get_calendar_service(force_reauth=force_reauth) + + # Delete the event + service.events().delete(calendarId='primary', eventId=event_id).execute() + + return f"Event with ID {event_id} deleted successfully!" + + except HttpError as error: + return f"An error occurred: {error}" + except Exception as e: + return f"An unexpected error occurred: {str(e)}" + + +# Utility functions for the server (not exposed as LLM tools) +def get_calendar_timezone(force_reauth: bool = False) -> str: + """ + Gets the timezone setting of the user's primary calendar. + Args: + force_reauth: Whether to force re-authentication + Returns: + A string with the calendar's timezone (e.g., 'Europe/Paris') + """ + try: + service = get_calendar_service(force_reauth=force_reauth) + + # Get calendar details + calendar = service.calendars().get(calendarId='primary').execute() + timezone = calendar.get('timeZone', 'UTC') + + return timezone + + except HttpError as error: + return "UTC" # Default to UTC in case of error + except Exception as e: + return "UTC" # Default to UTC in case of error + + +def contains_placeholder_email(email_list: List[str]) -> Tuple[bool, List[str]]: + """ + Checks if any email in the list appears to be a placeholder. + + Args: + email_list: List of email addresses to check + + Returns: + Tuple of (has_placeholder, list_of_placeholders) + """ + placeholder_domains = [ + "example.com", "example.org", "example.net", "test.com", "test.net", + "domain.com", "yourdomain.com", "anycompany.com", "email.com", "user.com" + ] + + placeholder_prefixes = [ + "user", "test", "info", "admin", "contact", "hello", "name", "email", + "username", "sample", "demo", "fake" + ] + + placeholders_found = [] + + for email in email_list: + if not email or '@' not in email: + continue + + domain = email.split('@')[1].lower() + prefix = email.split('@')[0].lower() + + # Check for placeholder domains + if any(pd in domain for pd in placeholder_domains): + placeholders_found.append(email) + continue + + # Check for common placeholder patterns + if any(pp == prefix for pp in placeholder_prefixes): + placeholders_found.append(email) + continue + + return len(placeholders_found) > 0, placeholders_found + + +@tool +def create_calendar_event(summary: str, start_time: str, end_time: str, + description: str = "", location: str = "", time_zone: str = "", + attendees: Optional[List[str]] = None, + create_conference: bool = True, + force_reauth: bool = False) -> str: + """ + Creates a new event in the user's primary Google Calendar with specified details. + + This function allows for the creation of a new calendar event, including its title (summary), + start and end times, description, location, and attendees. It can also automatically + generate a Google Meet conference link for the event. + + Time Handling: + - If `time_zone` is provided (e.g., 'Europe/Paris'), `start_time` and `end_time` should be + in 'YYYY-MM-DDTHH:MM:SS' format and are considered local to that timezone. + - If `time_zone` is NOT provided, `start_time` and `end_time` MUST be in full ISO 8601 + format with a UTC offset (e.g., '2023-10-26T10:00:00-07:00') or 'Z' for UTC + (e.g., '2023-10-26T17:00:00Z'). + - For all-day events, `start_time` and `end_time` should be in 'YYYY-MM-DD' format, + and `time_zone` is not applicable in the same way (the event spans the whole day + regardless of timezone for display, but Google Calendar handles the underlying UTC). + + Args: + summary: The title or summary of the event (e.g., "Team Meeting"). + start_time: The start date/time of the event. Format depends on whether `time_zone` + is provided and if it's an all-day event. See Time Handling notes. + end_time: The end date/time of the event. Format follows the same rules as `start_time`. + description: An optional detailed description for the event. + location: An optional location for the event (e.g., "Conference Room 4" or an address). + time_zone: Optional. The IANA time zone name for `start_time` and `end_time` + (e.g., 'America/New_York', 'Europe/London'). If not provided, times must + include offset information. + attendees: An optional list of email addresses of people to invite to the event. + (e.g., ['user1@example.com', 'user2@example.com']). + create_conference: If True (default), attempts to create a Google Meet conference + link for the event. Set to False to not create one. + force_reauth: If True, forces the Google Calendar API to re-authenticate. + Defaults to False. + + Returns: + A string confirming the successful creation of the event, including its ID, a web link + to the event, and a conference link if generated (e.g., "Event created successfully and verified!\nID: \nLink: \nConference Link: "). + If `create_conference` was True but no link was generated, a note is added. + Returns an error message if the event creation fails due to API errors, permission + issues, invalid parameters, or event conflicts. + """ + try: + service = get_calendar_service(force_reauth=force_reauth) + + is_all_day = 'T' not in start_time + + event_body: Dict[str, Any] = { + 'summary': summary, + 'location': location, + 'description': description, + } + + if is_all_day: + event_body['start'] = {'date': start_time} + event_body['end'] = {'date': end_time} + else: + if time_zone: + event_body['start'] = {'dateTime': start_time, 'timeZone': time_zone} + event_body['end'] = {'dateTime': end_time, 'timeZone': time_zone} + else: + if not start_time.endswith('Z') and '+' not in start_time and '-' not in start_time[10:]: + return "Error: start_time must be in ISO format with Z or offset if time_zone is not provided." + if not end_time.endswith('Z') and '+' not in end_time and '-' not in end_time[10:]: + return "Error: end_time must be in ISO format with Z or offset if time_zone is not provided." + event_body['start'] = {'dateTime': start_time} + event_body['end'] = {'dateTime': end_time} + + if attendees: + event_body['attendees'] = [{'email': email} for email in attendees] + + if create_conference: + event_body['conferenceData'] = { + 'createRequest': { + 'requestId': f'{uuid.uuid4().hex}', + 'conferenceSolutionKey': {'type': 'hangoutsMeet'} + } + } + + print(f"Creating event with params: {event_body}") + + # Validate if the event can be created before actually creating it + try: + # First, attempt to verify if the event is valid by fetching calendar metadata + service.calendars().get(calendarId='primary').execute() + except HttpError as calendar_error: + return f"Calendar access error: {str(calendar_error)}. Please check your calendar permissions and authentication." + except Exception as e: + return f"Failed to verify calendar: {str(e)}" + + # Attempt to create the event + try: + created_event = service.events().insert( + calendarId='primary', + body=event_body, + sendUpdates="all", + conferenceDataVersion=1 if create_conference else 0 # Required if conferenceData is set + ).execute() + except HttpError as http_error: + if http_error.resp.status == 403: + return f"Permission error: {str(http_error)}. Make sure the Google account has proper permissions to create events." + elif http_error.resp.status == 400: + return f"Invalid event parameters: {str(http_error)}. Please check the event details provided." + elif http_error.resp.status == 409: + return f"Event conflict: {str(http_error)}. The event may conflict with another event." + else: + return f"API error creating event: {str(http_error)}" + except Exception as e: + return f"Unexpected error creating event: {str(e)}" + + # Verify the event was actually created by trying to fetch it + try: + event_id = created_event.get('id') + verification = service.events().get(calendarId='primary', eventId=event_id).execute() + if not verification or verification.get('id') != event_id: + return f"Event appears to have been created (ID: {event_id}) but verification failed. Please check your calendar." + except Exception as ve: + return f"Event was inserted but verification failed: {str(ve)}. The event might not have been properly created." + + html_link = created_event.get('htmlLink') + event_id_str = created_event.get('id') + + conference_info = "" + conference_link_created = False + if created_event.get('conferenceData') and created_event['conferenceData'].get('entryPoints'): + for entry_point in created_event['conferenceData']['entryPoints']: + if entry_point.get('entryPointType') == 'video': + conference_info = f"\nConference Link: {entry_point.get('uri')}" + conference_link_created = True + break + + main_message = f"Event created successfully and verified!\nID: {event_id_str}\nLink: {html_link}" + final_message = main_message + conference_info + + if create_conference and not conference_link_created: + final_message += "\nNote: A video conference was requested, but a link was not present in the API response." + + # Print the successful event creation details for debugging + print(f"Event created: {event_id_str}, Summary: {summary}, Start: {start_time}, Time zone: {time_zone}") + + return final_message + + except HttpError as error: + error_details = f"HTTP error {error.resp.status}: {error._get_reason()}" + print(f"Calendar API HTTP error: {error_details}") + return f"An error occurred with the Calendar API: {error_details}" + except Exception as e: + print(f"Unexpected exception creating calendar event: {str(e)}") + return f"An unexpected error occurred: {str(e)}" + + +@tool +def update_calendar_event(event_id: str, summary: str = "", start_time: str = "", + end_time: str = "", description: str = "", location: str = "", time_zone: str = "", + attendees_to_add: Optional[List[str]] = None, + attendees_to_remove: Optional[List[str]] = None, + force_reauth: bool = False) -> str: + """ + Updates an existing event in the user's primary Google Calendar. + + This function allows modification of various properties of an existing calendar event, + identified by its `event_id`. Fields that are not provided or are empty strings (for textual + fields) will not be updated. Attendee lists can be modified by providing lists of emails + to add or remove. + + Time Handling for Updates: + - Similar to `create_calendar_event`, if `start_time` or `end_time` are being updated AND + `time_zone` is provided, the times are local to that zone. + - If `time_zone` is not provided when updating times, the new `start_time` and `end_time` + must include UTC offset or 'Z'. + - To convert an event to all-day or change an all-day event's date, provide `start_time` + and `end_time` in 'YYYY-MM-DD' format. + + Args: + event_id: The unique ID of the event to be updated. + summary: Optional. The new title for the event. If empty, not updated. + start_time: Optional. The new start date/time. See Time Handling notes. If empty, not updated. + end_time: Optional. The new end date/time. See Time Handling notes. If empty, not updated. + description: Optional. The new description for the event. If empty, not updated. + location: Optional. The new location for the event. If empty, not updated. + time_zone: Optional. The IANA time zone for updated `start_time` and `end_time`. + attendees_to_add: Optional. A list of email addresses to add as attendees. + attendees_to_remove: Optional. A list of email addresses to remove from attendees. + force_reauth: If True, forces the Google Calendar API to re-authenticate. + Defaults to False. + + Returns: + A string confirming the successful update of the event, including its ID and web link + (e.g., "Event updated successfully!\nID: \nLink: "). + Returns an error message if the update fails (e.g., event not found, API error, + permission issues). + """ + try: + service = get_calendar_service(force_reauth=force_reauth) + + # Get the existing event + event: Dict[str, Any] = service.events().get(calendarId='primary', eventId=event_id).execute() + + # Update standard properties if provided + if summary: + event['summary'] = summary + if location: + event['location'] = location + if description: + event['description'] = description + + # Update start time if provided + if start_time: + is_all_day_start = 'T' not in start_time + if is_all_day_start: + event['start'] = {'date': start_time} + else: + if time_zone: + event['start'] = {'dateTime': start_time, 'timeZone': time_zone} + else: + if not start_time.endswith('Z') and '+' not in start_time and '-' not in start_time[10:]: + return "Error: start_time must be in ISO format with Z or offset if time_zone is not provided for update." + event['start'] = {'dateTime': start_time} + + # Update end time if provided + if end_time: + is_all_day_end = 'T' not in end_time + if is_all_day_end: + event['end'] = {'date': end_time} + else: + if time_zone: + event['end'] = {'dateTime': end_time, 'timeZone': time_zone} + else: + if not end_time.endswith('Z') and '+' not in end_time and '-' not in end_time[10:]: + return "Error: end_time must be in ISO format with Z or offset if time_zone is not provided for update." + event['end'] = {'dateTime': end_time} + + # Manage attendees + current_attendees = event.get('attendees', []) + updated_attendees = [att for att in current_attendees] # Make a mutable copy + + if attendees_to_remove: + emails_to_remove = set(attendees_to_remove) + updated_attendees = [att for att in updated_attendees if att.get('email') not in emails_to_remove] + + if attendees_to_add: + existing_emails = {att.get('email') for att in updated_attendees} + for email_to_add in attendees_to_add: + if email_to_add not in existing_emails: + updated_attendees.append({'email': email_to_add}) + + event['attendees'] = updated_attendees + + # Update the event + updated_event = service.events().update( + calendarId='primary', + eventId=event_id, + body=event, + sendUpdates="all" # Ensure notifications are sent + ).execute() + + return f"Event updated successfully!\nID: {updated_event.get('id')}\nLink: {updated_event.get('htmlLink')}" + + except HttpError as error: + return f"An error occurred: {error}" + except Exception as e: + return f"An unexpected error occurred: {str(e)}" + + +class CrewAIWrapper: + """Wrapper for CrewAI agents that handles calendar tasks.""" + + def __init__(self, verbose: bool = True, model_name: str = "gpt-3.5-turbo-0125"):#gpt-4o-mini"): + """Initialize the CrewAI wrapper with a calendar agent.""" + # Setup LLM + openai_api_key_str = os.environ.get("OPENAI_API_KEY", "") + if not openai_api_key_str: + print("Warning: OPENAI_API_KEY not found in environment variables.") + + llm = ChatOpenAI( + model=model_name, + temperature=0.7, + api_key=SecretStr(openai_api_key_str) if openai_api_key_str else None + ) + + # Create an agent with calendar tools + self.agent = Agent( + role="Calendar Assistant", + goal="Help users access, create, update, and manage their Google Calendar. Coordinate with the Gmail Assistant for tasks requiring email access.", + backstory="You are an expert calendar assistant. You can manage schedules, appointments, and events. You are also aware of a Gmail Assistant and can suggest using it if a user's request involves reading emails (e.g., to find event details sent via email) or sending email confirmations.", + verbose=verbose, + allow_delegation=False, + tools=[ + list_calendar_events, + search_calendar_events, + get_calendar_details, + create_calendar_event, + update_calendar_event, + delete_calendar_event + ], + llm=llm + ) + + # Define the CrewAI structure + self.crew = Crew( + agents=[self.agent], + tasks=[], + verbose=verbose, + process=Process.sequential, + ) + + async def process_calendar_query(self, prompt: str) -> str: + """ + Run the CrewAI agent to process calendar queries. + + Args: + prompt: The user's calendar-related prompt + + Returns: + The calendar information requested + """ + current_utc_date = datetime.now(timezone.utc).strftime('%Y-%m-%d') + # Metaprompt for collaboration, date awareness, input validation, preventing hallucinations, and conference creation + metaprompt = ( + f"You are the Google Calendar Assistant. Today's UTC date is {current_utc_date}. " + "Your goal is to fulfill the user's request using your tools. You have a tool `create_calendar_event`, which automatically adds a video link (like Google Meet) if `create_conference` is true (default). " + "NEVER invent information, especially personal details like emails. Only use what's given or derived from tool output. " + "Before using any tool, ensure all critical info is present. If anything essential is missing, ASK the user for it—do NOT proceed without it. " + "To create events: you MUST have a summary, start time, and end time. " + "- Attendee emails:\n" + " - If a person is mentioned (e.g., 'Meet with Olga'), assume they should be invited as an attendee.\n" + " - Try to find their email using prior event history or from related emails.\n" + " - If not found, ask the user to provide it using the Task input.required prompt (e.g., 'What is Olga's email address so I can send the invite?').\n" + " - Do NOT invent or use placeholders like 'name@example.com'.\n" + " - If an email looks generic (e.g., olga@example.com), ASK for confirmation before proceeding.\n" + "- Timezones:\n" + " - Do NOT convert times unless told to. Use any provided IANA timezone (e.g., Europe/Paris) as-is.\n" + " - If the time includes a named zone (e.g., '10 AM EST'), map it to IANA (e.g., America/New_York) and use it in the `time_zone` parameter.\n" + " - If time is ISO 8601 with 'Z' or offset, pass it as-is without `time_zone`.\n" + " - If the user gives time without timezone info (e.g., '2 PM'), you MUST ask for an IANA timezone. Never assume UTC.\n" + "- After using `create_calendar_event` or `update_calendar_event`, verify success. Only say it's successful if tool confirms with 'Event created successfully and verified'. Report errors clearly.\n" + "- For updates/deletes: you MUST have `event_id`. If missing, ask the user or suggest listing/searching events.\n" + "- Adding attendees: if only names are given, ask for email addresses. Do NOT proceed with placeholders.\n" + "- If times/dates are vague (e.g., 'evening', 'next Tuesday'), ask for exact info.\n" + "If any of the following is missing: summary, start/end time, valid/confirmed attendee emails, timezone for local time, or event_id (for updates), DO NOT USE ANY TOOL. Ask the user instead. " + "If the request needs info from emails, refer the user to the Gmail Assistant. Only act on calendar tasks if all data is present." + ) + + + + # Check if this is a request to force re-authentication + force_reauth = "force reauth" in prompt.lower() or "re-authenticate" in prompt.lower() + + # If permission error is mentioned, suggest re-authentication + if "permission" in prompt.lower() or "403" in prompt or "insufficient authentication" in prompt: + update_prompt = f"{prompt}\n\nIt seems there might be authentication or permission issues. Try using force_reauth=True to re-authenticate with Google Calendar." + else: + update_prompt = prompt + + # Create a dynamic task based on user input + print(f"Calendar Prompt: {update_prompt}") + task = CrewTask( + description=f"{metaprompt}\\n\\nUser request: {update_prompt}", + expected_output="Retrieved or modified calendar information based on the user's request, or guidance to consult the Gmail assistant.", + agent=self.agent + ) + + # Update crew tasks and execute + self.crew.tasks = [task] + + # Run the crew in an executor to avoid blocking + loop = asyncio.get_event_loop() + result = await loop.run_in_executor(None, self.crew.kickoff) + # Extract string from CrewOutput object + if hasattr(result, 'raw'): + return str(result.raw) + return str(result) + + + +# Set up Elkar A2A server with CrewAI integration +async def setup_server(): + """Main function to set up the Elkar A2A server with CrewAI integration.""" + + # Create the CrewAI wrapper + crew_ai_wrapper = CrewAIWrapper() + + # Create the agent card - using proper AgentSkill objects + agent_card = AgentCard( + name="Google Calendar Assistant", + description="Manages Google Calendar. Can create, update, delete, and list events. Collaborates with the Gmail Assistant for tasks requiring email access (e.g., finding event details in emails).", + url="http://localhost:5002", + version="1.0.0", + skills=[ + AgentSkill( + id="calendar-assistant", + name="calendar-assistant", + description="Overall capability to manage and interact with Google Calendar. This includes understanding natural language requests related to calendar operations.", + tags=["calendar", "assistant", "google calendar", "natural language processing"], + examples=[ + "What's on my calendar for tomorrow?", + "Can you schedule a meeting for me?", + "Remind me about my doctor's appointment." + ], + inputModes=["text"], + outputModes=["text"] + ), + AgentSkill( + id="google-calendar", + name="google-calendar", + description="Direct interaction with the Google Calendar API. This skill covers the technical aspects of connecting to and using Google Calendar services.", + tags=["google calendar", "api", "integration", "events"], + examples=[ + "List events from my primary calendar.", + "Authenticate with Google Calendar.", + "Check permissions for calendar access." + ], + inputModes=["text"], # Can also be API calls internally + outputModes=["text", "json"] # API responses + ), + AgentSkill( + id="event-management", + name="event-management", + description="Specific skills for creating, reading, updating, and deleting (CRUD) calendar events. Handles event details like summaries, times, attendees, and locations.", + tags=["event", "create", "update", "delete", "list", "manage", "crud"], + examples=[ + "Create an event titled 'Team Meeting' for next Monday at 10 AM.", + "Update my 'Project Deadline' event to be on Friday.", + "Delete the 'Lunch with Alex' event.", + "Show me all events for next week." + ], + inputModes=["text"], + outputModes=["text"] + ), + AgentSkill( + id="scheduling", + name="scheduling", + description="Assists with scheduling appointments, meetings, and managing time slots. This can involve finding free time, coordinating with attendees (if supported), and setting reminders.", + tags=["schedule", "appointment", "meeting", "time management", "coordination"], + examples=[ + "Schedule a 30-minute call with Sarah next Tuesday afternoon.", + "Find a free slot for a 1-hour meeting next week.", + "Add a recurring weekly reminder for project updates." + ], + inputModes=["text"], + outputModes=["text"] + ) + ], + capabilities=AgentCapabilities( + streaming=True, + pushNotifications=True, + stateTransitionHistory=True, + ), + ) + + # Create a simple function to check for placeholder emails in the user's input + def has_placeholder_in_prompt(prompt: str) -> Tuple[bool, str]: + """ + Check if the user's prompt contains any common placeholder emails. + Returns (has_placeholder, message) + """ + for placeholder_domain in ["example.com", "example.org", "test.com"]: + if placeholder_domain in prompt: + return True, f"I notice you're using an email with '{placeholder_domain}' which appears to be a placeholder. Please provide a valid email address instead." + + return False, "" + + # Define the task handler to process requests via CrewAI + async def task_handler( + task: TaskModifier, request_context: RequestContext | None + ) -> None: + """Handle tasks by using the CrewAI wrapper to process calendar requests.""" + + try: + # Access the task message using the pattern from crewai_a2a_server.py + task_obj = task._task + user_prompt = "" + print(f"Task: {task_obj}") + + # Extract text from message parts if they exist + if hasattr(task_obj, 'status') and hasattr(task_obj.status, 'message') and task_obj.status.message and hasattr(task_obj.status.message, 'parts'): + for part in task_obj.status.message.parts: + if hasattr(part, "text") and isinstance(part, TextPart): + user_prompt += part.text + # If no message in status, try to look in history + elif hasattr(task_obj, 'history') and task_obj.history and len(task_obj.history) > 0: + for msg in task_obj.history: + if hasattr(msg, 'parts'): + for part in msg.parts: + if hasattr(part, "text") and isinstance(part, TextPart): + user_prompt += part.text + + print(f"User prompt: {user_prompt}") + if not user_prompt: + await task.set_status( + TaskStatus( + state=TaskState.FAILED, + message=Message( + role="agent", + parts=[TextPart(text="No text prompt was provided in the request.")], + ), + ), + is_final=True, + ) + return + + # Simple check for placeholder emails in the user's request + has_placeholder, placeholder_message = has_placeholder_in_prompt(user_prompt) + if has_placeholder: + await task.set_status( + TaskStatus( + state=TaskState.INPUT_REQUIRED, + message=Message( + role="agent", + parts=[TextPart(text=placeholder_message)], + ), + ), + is_final=False, # Mark as not final, expecting more input from user + ) + return + + # Check if this is a request to re-authenticate + if "force reauth" in user_prompt.lower() or "re-authenticate" in user_prompt.lower(): + await task.set_status( + TaskStatus( + state=TaskState.WORKING, + message=Message( + role="agent", + parts=[TextPart(text="I'll try to re-authenticate with Google Calendar...")], + ), + ) + ) + + # Try forcing re-authentication first + try: + get_calendar_service(force_reauth=True) + await task.set_status( + TaskStatus( + state=TaskState.WORKING, + message=Message( + role="agent", + parts=[TextPart(text="Successfully re-authenticated with Google Calendar. Now processing your request...")], + ), + ) + ) + except Exception as e: + await task.set_status( + TaskStatus( + state=TaskState.FAILED, + message=Message( + role="agent", + parts=[TextPart(text=f"Failed to re-authenticate: {str(e)}")], + ), + ), + is_final=True, + ) + return + else: + # Send initial status update + await task.set_status( + TaskStatus( + state=TaskState.WORKING, + message=Message( + role="agent", + parts=[TextPart(text="I'm processing your calendar request...")], + ), + ) + ) + + # Add a simple warning about placeholder emails + user_prompt += "\n\nNever use placeholder email domains like example.com." + + # Call CrewAI to process calendar query + result = await crew_ai_wrapper.process_calendar_query(user_prompt) + + # Check for error indicators in the response + if any(indicator in result.lower() for indicator in ["http error", "api error", "permission error"]): + await task.set_status( + TaskStatus( + state=TaskState.FAILED, + message=Message( + role="agent", + parts=[TextPart(text=f"There was an issue with the calendar request: {result}")], + ), + ), + is_final=True, + ) + return + + # Check for placeholder domains in the result + if any(domain in result for domain in ["example.com", "example.org", "test.com"]): + await task.set_status( + TaskStatus( + state=TaskState.INPUT_REQUIRED, + message=Message( + role="agent", + parts=[TextPart(text="I notice there's a placeholder email address in the request. Please provide a valid email address.")], + ), + ), + is_final=False, + ) + return + + # Check if the agent is asking for more information + if result.strip().endswith("?") or any(phrase in result.lower() for phrase in ["please provide", "do you mean", "clarify"]): + await task.set_status( + TaskStatus( + state=TaskState.INPUT_REQUIRED, + message=Message( + role="agent", + parts=[TextPart(text=result)], # Pass the agent's question back + ), + ), + is_final=False, # Mark as not final, expecting more input from user + ) + else: + # Agent provided a final response or took an action + await task.upsert_artifacts( + [ + Artifact( + parts=[TextPart(text=result)], + index=0, + ) + ] + ) + + # Mark the task as completed + await task.set_status( + TaskStatus( + state=TaskState.COMPLETED, + message=Message( + role="agent", + parts=[TextPart(text="Calendar request processed.")], # Generic message, artifact has details + ), + ), + is_final=True, + ) + + except Exception as e: + # Handle any errors + error_message = f"Error processing calendar request: {str(e)}" + await task.set_status( + TaskStatus( + state=TaskState.FAILED, + message=Message( + role="agent", + parts=[TextPart(text=f"Error processing request: {str(e)}")], + ), + ), + is_final=True, + ) + + # Create the task manager with the handler + task_manager = TaskManagerWithModifier( + agent_card, send_task_handler=task_handler + ) + + # Configure the ElkarClientStore + api_key = os.environ.get("ELKAR_API_KEY", "") # Get API key from environment variables + if not api_key: + print("Warning: ELKAR_API_KEY not found in environment variables.") + store = ElkarClientStore(base_url="https://api.elkar.co/api", api_key=api_key) + + task_manager: TaskManagerWithModifier = TaskManagerWithModifier( + agent_card, + send_task_handler=task_handler, + store=store # Pass the configured store to the task manager + ) + + server = A2AServer(task_manager, host="0.0.0.0", port=5002, endpoint="/") + return server + + +if __name__ == "__main__": + # Set up and run the server using uvicorn directly, avoiding asyncio conflict + server = asyncio.run(setup_server()) + print("Starting Elkar A2A server on http://localhost:5002") + print("Press Ctrl+C to stop the server") + + # Run with uvicorn directly instead of server.start() + uvicorn.run(server.app, host="0.0.0.0", port=5002) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..7a83444 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,29 @@ +# Core dependencies +crewai>=0.11.0 +langchain-openai>=0.0.5 +langchain-core>=0.1.15 +elkar>=0.1.0 +python-dotenv>=1.0.0 + +# Google API dependencies +google-api-python-client>=2.108.0 +google-auth-httplib2>=0.1.1 +google-auth-oauthlib>=1.1.0 + +# Web server dependencies +uvicorn>=0.24.0 +fastapi>=0.104.1 + +# Database dependencies +asyncpg>=0.29.0 + +# Utility dependencies +pydantic>=2.5.2 +httplib2>=0.22.0 +uuid>=1.30 + +# MCP dependencies +mcp>=0.1.0 + +# Type hints +typing-extensions>=4.8.0 \ No newline at end of file diff --git a/src/elkar/cli.py b/src/elkar/cli.py new file mode 100644 index 0000000..9e25c69 --- /dev/null +++ b/src/elkar/cli.py @@ -0,0 +1,74 @@ +import typer +from typing_extensions import Annotated +import logging +import uvicorn +from fastapi import FastAPI + +# Assuming your FastMCP instance is in server_mcp.py and named 'mcp' +from .server_mcp import mcp + +# Attempt to import the SSE route utility +try: + from mcp.server.fastapi import add_sse_route +except ImportError: + add_sse_route = None + logging.warning( + "Could not import 'add_sse_route' from 'mcp.server.fastapi'. " + "SSE mode might not work correctly if this utility is required. " + "Please ensure your FastMCP library is installed correctly and provides this utility, or adjust the import path." + ) + +app = typer.Typer( + name="elkar-mcp-cli", + help="CLI for running the Elkar MCP server with different transport protocols.", + add_completion=False, +) + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +@app.command() +def stdio(): + """ + Starts the MCP server with STDIO transport. + Reads from stdin and writes to stdout. + """ + logger.info("Starting MCP server with STDIO transport...") + mcp.run(transport="stdio") + +@app.command() +def sse( + port: Annotated[int, typer.Option(help="Port to run the SSE server on.")] = 8000, + host: Annotated[str, typer.Option(help="Host to bind the SSE server to.")] = "0.0.0.0", +): + """ + Starts the MCP server with SSE (Server-Sent Events) transport. + Runs an HTTP server for SSE communication. + """ + logger.info(f"Starting MCP server with SSE transport on {host}:{port}...") + + if add_sse_route is None: + logger.error("Cannot start SSE server because 'add_sse_route' utility could not be imported.") + logger.error("Please check your FastMCP installation and the import path in cli.py.") + return + + try: + # Create a FastAPI app + fastapi_app = FastAPI( + title=f"{mcp.name} - SSE Server", + description=mcp.instructions, + # lifespan=mcp.lifespan # If your mcp has a lifespan, pass it here + ) + + # Add the MCP SSE route to the FastAPI app + add_sse_route(fastapi_app, mcp) + + # Run the FastAPI app with Uvicorn + uvicorn.run(fastapi_app, host=host, port=port) + + except Exception as e: + logger.error(f"An unexpected error occurred while trying to start the SSE server: {e}") + logger.error("Ensure FastAPI is installed and mcp object is correctly initialized.") + +if __name__ == "__main__": + app() \ No newline at end of file diff --git a/src/elkar/client/a2a_client.py b/src/elkar/client/a2a_client.py index d3155ab..3480c09 100644 --- a/src/elkar/client/a2a_client.py +++ b/src/elkar/client/a2a_client.py @@ -73,7 +73,7 @@ async def _make_request( url = f"{self.config.base_url.rstrip('/')}" if endpoint: url = f"{url}/{endpoint.lstrip('/')}" - serialized_data = data.model_dump() if data else None + serialized_data = data.model_dump(exclude_none=True) if data else None async with self._session.request(method, url, json=serialized_data) as response: response.raise_for_status() return await response.json() @@ -110,7 +110,7 @@ async def send_task_streaming(self, task_params: TaskSendParams) -> AsyncIterabl if not self._session: raise RuntimeError("Client session not initialized. Use 'async with' context manager.") - async with self._session.post(self.config.base_url, json=request) as response: + async with self._session.post(self.config.base_url, json=request.model_dump(exclude_none=True)) as response: response.raise_for_status() return self._stream_response(response) @@ -141,7 +141,7 @@ async def resubscribe_to_task(self, task_params: TaskIdParams) -> AsyncIterable[ if not self._session: raise RuntimeError("Client session not initialized. Use 'async with' context manager.") - async with self._session.post(self.config.base_url, json=request) as response: + async with self._session.post(self.config.base_url, json=request.model_dump(exclude_none=True)) as response: response.raise_for_status() return self._stream_response(response) diff --git a/src/elkar/server_mcp.py b/src/elkar/server_mcp.py new file mode 100644 index 0000000..607db24 --- /dev/null +++ b/src/elkar/server_mcp.py @@ -0,0 +1,380 @@ +from mcp.server import Server +from mcp.server.models import InitializationOptions +import mcp.types as types +from typing import List, Dict, Any, Optional, Union +from dataclasses import dataclass +from mcp.server.fastmcp import FastMCP, Context +import os +import json +import datetime +from dotenv import load_dotenv +from pydantic import AnyUrl +import mcp.server.stdio +import logging +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager +import asyncpg +import urllib.parse +import uuid +# Add Elkar A2A client imports +from elkar.a2a_types import Message, TextPart, TaskSendParams, Task, TaskState +from elkar.client.a2a_client import A2AClient, A2AClientConfig + + +load_dotenv() + +settings = dict( + debug=True, + log_level="DEBUG", + warn_on_duplicate_resources=True, + warn_on_duplicate_tools=True, + warn_on_duplicate_prompts=True, +) + + +mcp = FastMCP( + "A2A com using elkar", + instructions="""You are a helpful assistant.""", + # lifespan=breezy_lifespan +) + +logging.basicConfig(level=logging.INFO) + +logger = logging.getLogger(__name__) + + +@mcp.tool() +async def send_task_to_a2a_agent( + ctx: Context, + agent_url: str, + task_message: str, + task_id: Optional[str] = None, + headers: Optional[Dict[str, str]] = None, + timeout: int = 300, + metadata: Optional[Dict[str, Any]] = None +) -> Dict[str, Any]: + """Sends a task to a specified Elkar A2A (Agent-to-Agent) compliant agent and awaits its response. + + As an MCP tool, this function communicates with a running A2A agent (server) to request task execution. + It leverages the Elkar A2A protocol; this framework is utilized for debugging, logging of task interactions, + and maintaining the status of the submitted task. + The function constructs a task with the given message and parameters, sends it to the target agent's URL, + and then processes the agent's response. The interaction is stateless, meaning each call is independent, + and the remote agent does not retain context from previous calls unless explicitly managed within the task itself. + + The function first attempts to retrieve the agent's 'card' (a summary of its capabilities and identity) + to ensure connectivity and gather basic information. If successful, it proceeds to send the actual task. + The response from the agent, which includes the task's final status, any generated artifacts (results), + and potential error messages, is then parsed and returned. + + Args: + ctx: The MCP (Multi-Capability Platform) context object, providing access to platform features. + agent_url: The complete base URL of the target Elkar A2A agent (e.g., 'http://localhost:5001'). + task_message: The primary content or instruction for the task to be performed by the remote agent. + task_id: An optional custom identifier for the task. If not provided, a unique UUID will be automatically generated. + headers: Optional dictionary of HTTP headers to include in the request, typically for authentication (e.g., API keys) or custom routing. + timeout: The maximum time in seconds to wait for a response from the A2A agent. Defaults to 300 seconds (5 minutes). + metadata: Optional dictionary of arbitrary key-value pairs to be sent along with the task, which the remote agent might use. + + Returns: + A dictionary containing the outcome of the task interaction. This includes: + - 'task_id': The ID of the task. + - 'status': The final state of the task (e.g., 'completed', 'failed', 'input_required'). + - 'agent_info': Basic information about the agent (name, description, version). + - 'message': (Optional) A message from the agent related to the task's status. + - 'artifacts': (Optional) A list of strings, where each string is the content of an artifact produced by the agent. + - 'error': (Optional) An error message if any part of the process failed (e.g., connection error, task error, client initialization error). + """ + try: + # Configure the A2A client + config = A2AClientConfig( + base_url=agent_url, + headers=headers, + timeout=timeout + ) + + # Create message from input text + message = Message( + role="user", + parts=[TextPart(text=task_message)] + ) + + # Prepare task parameters - use model_dump to serialize properly + task_params = TaskSendParams( + id=task_id or str(uuid.uuid4()), # Will generate a UUID if None + message=message, + metadata=metadata or {}, + sessionId="1" + ) + + logger.debug(f"Sending task parameters: {json.dumps(task_params.model_dump())}") + + # Use the client as a context manager for proper resource management + async with A2AClient(config) as client: + # Get the agent card to verify connection + try: + agent_card = await client.get_agent_card() + agent_info = { + "name": agent_card.name, + "description": agent_card.description, + "version": agent_card.version + } + except Exception as e: + return { + "error": f"Failed to connect to A2A agent: {str(e)}", + "status": "connection_error" + } + + # Send the task + try: + # Modified task_params before sending: + if hasattr(task_params, "model_dump"): + serialized_params = task_params.model_dump(mode='json') + else: + # Fall back to dict conversion + serialized_params = task_params.__dict__ + response = await client.send_task(task_params) # Use the original task_params object, not the serialized dict + logger.debug(f"Response: {response}") + # Handle error response + if response.error: + + return { + "error": response.error.message, + "status": "task_error", + "agent_info": agent_info + } + + # Handle successful response + task = response.result + if task is None: + return { + "error": "No task received from A2A agent", + "status": "task_error", + "agent_info": agent_info + } + logger.debug(f"Task: {task}") + result = { + "task_id": task.id, + "status": task.status.state.value if task.status and task.status.state else "unknown", + "agent_info": agent_info, + } + + # Include the message from the status if available + if task.status and task.status.message: + result["message"] = " ".join([ + part.text for part in task.status.message.parts + if hasattr(part, "text") + ]) + + # Include artifacts if available + if task.artifacts and len(task.artifacts) > 0: + artifacts_content = [] + for artifact in task.artifacts: + artifact_text = " ".join([ + part.text for part in artifact.parts + if hasattr(part, "text") + ]) + artifacts_content.append(artifact_text) + + result["artifacts"] = artifacts_content + + return result + + except Exception as e: + return { + "error": f"Error while sending task: {str(e)}", + "status": "request_error", + "agent_info": agent_info + } + + except Exception as e: + return { + "error": f"Failed to initialize A2A client: {str(e)}", + "status": "client_error" + } + +@mcp.tool() +async def get_a2a_agent_card( + # ctx: Context, + agent_url: str, + # timeout: int = 10, + # headers: Optional[Dict[str, str]] = None +) -> Dict[str, Any]: + """Retrieves and returns the 'Agent Card' from a specified Elkar A2A agent. + + The Agent Card provides metadata about an A2A agent, including its name, description, version, + capabilities (like streaming support, push notifications, state history), and the skills it offers. + This function is useful for discovering an agent's functionalities before sending it tasks. + + Args: + ctx: The MCP context object. + agent_url: The base URL of the Elkar A2A agent from which to fetch the card (e.g., 'http://localhost:5001'). + timeout: The maximum time in seconds to wait for a response from the agent. Defaults to 10 seconds. + headers: Optional dictionary of HTTP headers to include in the request, often used for authentication. + + Returns: + A dictionary containing the agent card information upon success, or error details upon failure. + Successful response structure: + - 'status': 'success' + - 'url': The agent_url queried. + - 'name': Name of the agent. + - 'description': Description of the agent. + - 'version': Version of the agent. + - 'capabilities': (Optional) Dictionary of agent capabilities (streaming, pushNotifications, stateTransitionHistory). + - 'skills': (Optional) List of skills, each with 'id' and 'name'. + Error response structure: + - 'status': 'error' + - 'url': The agent_url queried. + - 'error': A message detailing the error (e.g., connection failure, error retrieving card). + """ + try: + # Configure the A2A client + config = A2AClientConfig( + base_url=agent_url, + headers=headers, + timeout=timeout + ) + + # Use the client as a context manager for proper resource management + async with A2AClient(config) as client: + try: + # Retrieve the agent card + agent_card = await client.get_agent_card() + + # Extract and return comprehensive information + result = { + "status": "success", + "url": agent_url, + "name": agent_card.name, + "description": agent_card.description, + "version": agent_card.version, + } + + # Include capabilities + if hasattr(agent_card, "capabilities"): + result["capabilities"] = {} + capabilities = agent_card.capabilities + + if hasattr(capabilities, "streaming"): + result["capabilities"]["streaming"] = capabilities.streaming + + if hasattr(capabilities, "pushNotifications"): + result["capabilities"]["pushNotifications"] = capabilities.pushNotifications + + if hasattr(capabilities, "stateTransitionHistory"): + result["capabilities"]["stateTransitionHistory"] = capabilities.stateTransitionHistory + + # Include skills + if hasattr(agent_card, "skills") and agent_card.skills: + result["skills"] = [ + {"id": skill.id, "name": skill.name} + for skill in agent_card.skills + ] + + return result + + except Exception as e: + return { + "status": "error", + "url": agent_url, + "error": f"Error retrieving agent card: {str(e)}" + } + + except Exception as e: + return { + "status": "error", + "url": agent_url, + "error": f"Failed to connect to A2A agent: {str(e)}" + } + +@mcp.tool() +async def discover_a2a_agents( + ctx: Context, + agent_urls: Optional[List[str]] = None, + timeout: int = 10, + headers: Optional[Dict[str, str]] = None +) -> Dict[str, Any]: + """Discovers available Elkar A2A agents by querying a list of URLs and summarizing their capabilities, potentially enabling new features by identifying agents with specific skills. + + This function iterates through a provided list of agent URLs (or a default list from environment variables) + and attempts to retrieve the Agent Card from each. It compiles a report of successfully contacted agents + along with their key information (name, description, version, capabilities, skills) and a list of any + URLs that resulted in errors (e.g., connection failed, agent card not retrieved). + + This is useful for dynamically finding and understanding the A2A agents available in a network. + + Returns: + A dictionary with two keys: + - 'available_agents': A list of dictionaries, where each dictionary represents a successfully + contacted agent and contains its 'url', 'name', 'description', 'version', + 'capabilities' (streaming, pushNotifications, stateTransitionHistory), and 'skills'. + - 'errors': A list of dictionaries, where each entry details an error encountered while trying + to contact an agent, including the 'url' and an 'error' message. + """ + # If agent_urls not provided, use environment variable + if agent_urls is None: + # Get from environment and split by comma + env_urls = os.environ.get('AGENT_URLS', 'http://localhost:5001') + agent_urls = [url.strip() for url in env_urls.split(',')] + + results = { + "available_agents": [], + "errors": [] + } + + for url in agent_urls: + try: + # Configure the A2A client for this URL + config = A2AClientConfig( + base_url=url, + headers=headers, + timeout=timeout + ) + + # Use the client as a context manager for proper resource management + async with A2AClient(config) as client: + try: + # Retrieve the agent card + agent_card = await client.get_agent_card() + + # Extract key information from the agent card + agent_info = { + "url": url, + "name": agent_card.name, + "description": agent_card.description, + "version": agent_card.version, + "capabilities": { + "streaming": agent_card.capabilities.streaming if hasattr(agent_card.capabilities, "streaming") else False, + "pushNotifications": agent_card.capabilities.pushNotifications if hasattr(agent_card.capabilities, "pushNotifications") else False, + "stateTransitionHistory": agent_card.capabilities.stateTransitionHistory if hasattr(agent_card.capabilities, "stateTransitionHistory") else False, + } + } + + # Include skills information if available + if hasattr(agent_card, "skills") and agent_card.skills: + agent_info["skills"] = [ + {"id": skill.id, "name": skill.name} + for skill in agent_card.skills + ] + + results["available_agents"].append(agent_info) + + except Exception as e: + # Add to errors if we can connect but can't get the agent card + results["errors"].append({ + "url": url, + "error": f"Error retrieving agent card: {str(e)}" + }) + + except Exception as e: + # Add to errors if we can't even connect to the URL + results["errors"].append({ + "url": url, + "error": f"Failed to connect to A2A agent: {str(e)}" + }) + + return results + +if __name__ == "__main__": + load_dotenv() + mcp.run("stdio") \ No newline at end of file