diff --git a/backend/.env.template b/backend/.env.template index 3e5e64a6e2..d3e3378242 100644 --- a/backend/.env.template +++ b/backend/.env.template @@ -55,3 +55,7 @@ APPLE_TEAM_ID= APPLE_KEY_ID= APPLE_PRIVATE_KEY= API_BASE_URL= + +PERPLEXITY_API_KEY= + +BUCKET_CHAT_FILES= \ No newline at end of file diff --git a/backend/routers/chat.py b/backend/routers/chat.py index 1d555467f1..b2eb1510ad 100644 --- a/backend/routers/chat.py +++ b/backend/routers/chat.py @@ -10,6 +10,7 @@ from multipart.multipart import shutil import database.chat as chat_db +import database.conversations as conversations_db from database.apps import record_app_usage from models.app import App, UsageHistoryType from models.chat import ( @@ -34,6 +35,7 @@ from utils.other import endpoints as auth, storage from utils.other.chat_file import FileChatTool from utils.retrieval.graph import execute_graph_chat, execute_graph_chat_stream, execute_persona_chat_stream +from utils.retrieval.agentic import execute_agentic_chat, execute_agentic_chat_stream router = APIRouter() @@ -82,10 +84,16 @@ def send_message( type='text', app_id=compat_app_id, ) - if data.file_ids is not None and chat_session: - new_file_ids = chat_session.retrieve_new_file(data.file_ids) - chat_session.add_file_ids(data.file_ids) - chat_db.add_files_to_chat_session(uid, chat_session.id, data.file_ids) + + # Handle file attachments + if data.file_ids is not None and len(data.file_ids) > 0: + if chat_session: + new_file_ids = chat_session.retrieve_new_file(data.file_ids) + chat_session.add_file_ids(data.file_ids) + chat_db.add_files_to_chat_session(uid, chat_session.id, data.file_ids) + else: + # No chat session, use all file_ids + new_file_ids = data.file_ids if len(new_file_ids) > 0: message.files_id = new_file_ids @@ -99,12 +107,18 @@ def send_message( chat_db.add_message(uid, message.dict()) - app = get_available_app_by_id(compat_app_id, uid) - app = App(**app) if app else None + # Try to get app, but don't fail if Redis is down (local dev) + try: + app = get_available_app_by_id(compat_app_id, uid) + app = App(**app) if app else None + except Exception as e: + print(f"⚠️ Could not get app from cache (Redis issue): {e}") + print("⚠️ Continuing without app (local dev mode)") + app = None app_id_from_app = app.id if app else None - messages = list(reversed([Message(**msg) for msg in chat_db.get_messages(uid, limit=10, app_id=compat_app_id)])) + messages = list(reversed([Message(**msg) for msg in chat_db.get_messages(uid, limit=10, app_id=compat_app_id, include_conversations=True)])) def process_message(response: str, callback_data: dict): memories = callback_data.get('memories_found', []) @@ -149,6 +163,7 @@ def process_message(response: str, callback_data: dict): async def generate_stream(): callback_data = {} + # Using the new agentic system via graph routing async for chunk in execute_graph_chat_stream( uid, messages, app, cited=True, callback_data=callback_data, chat_session=chat_session ): diff --git a/backend/utils/llm/chat.py b/backend/utils/llm/chat.py index f4ae91dc06..975a51ddf6 100644 --- a/backend/utils/llm/chat.py +++ b/backend/utils/llm/chat.py @@ -2,15 +2,20 @@ import json import re import os +import base64 from datetime import datetime, timezone from typing import List, Optional, Tuple +from zoneinfo import ZoneInfo from pydantic import BaseModel, Field, ValidationError +import openai import database.users as users_db +import database.notifications as notification_db from database.redis_db import add_filter_category_item +from database.auth 
import get_user_name from models.app import App -from models.chat import Message, MessageSender +from models.chat import Message, MessageSender, FileChat from models.conversation import CategoryEnum, Conversation, ActionItem, Event, ConversationPhoto from models.other import Person from models.transcript_segment import TranscriptSegment @@ -89,28 +94,55 @@ class IsAnOmiQuestion(BaseModel): def retrieve_is_an_omi_question(question: str) -> bool: prompt = f''' - Task: Analyze the question to identify if the user is inquiring about the functionalities or usage of the app, Omi or Friend. Focus on detecting questions related to the app's operations or capabilities. - - Examples of User Questions: - - - "How does it work?" - - "What can you do?" - - "How can I buy it?" - - "Where do I get it?" - - "How does the chat function?" - - Instructions: - - 1. Review the question carefully. - 2. Determine if the user is asking about: - - The operational aspects of the app. - - How to utilize the app effectively. - - Any specific features or purchasing options. - - Output: Clearly state if the user is asking a question related to the app's functionality or usage. If yes, specify the nature of the inquiry. + Task: Determine if the user is asking about the Omi/Friend app itself (product features, functionality, purchasing) + OR if they are asking about their personal data/memories stored in the app OR requesting an action/task. + + CRITICAL DISTINCTION: + - Questions ABOUT THE APP PRODUCT = True (e.g., "How does Omi work?", "What features does Omi have?") + - Questions ABOUT USER'S PERSONAL DATA = False (e.g., "What did I say?", "How many conversations do I have?") + - ACTION/TASK REQUESTS = False (e.g., "Remind me to...", "Create a task...", "Set an alarm...") + + **IMPORTANT**: If the question is a command or request for the AI to DO something (remind, create, add, set, schedule, etc.), + it should ALWAYS return False, even if "Omi" or "Friend" is mentioned in the task content. + + Examples of Omi/Friend App Questions (return True): + - "How does Omi work?" + - "What can Omi do?" + - "How can I buy the device?" + - "Where do I get Friend?" + - "What features does the app have?" + - "How do I set up Omi?" + - "Does Omi support multiple languages?" + - "What is the battery life?" + - "How do I connect my device?" + + Examples of Personal Data Questions (return False): + - "How many conversations did I have last month?" + - "What did I talk about yesterday?" + - "Show me my memories from last week" + - "Who did I meet with today?" + - "What topics have I discussed?" + - "Summarize my conversations" + - "What did I say about work?" + - "When did I last talk to John?" + + Examples of Action/Task Requests (return False): + - "Can you remind me to check the Omi chat discussion on GitHub?" + - "Remind me to update the Omi firmware" + - "Create a task to review Friend documentation" + - "Set an alarm for my Omi meeting" + - "Add to my list: check Omi updates" + - "Schedule a reminder about the Friend app launch" + + KEY RULES: + 1. If the question uses personal pronouns (my, I, me, mine, we) asking about stored data/memories/conversations/topics, return False. + 2. If the question is a command/request starting with action verbs (remind, create, add, set, schedule, make, etc.), return False. + 3. Only return True if asking about the Omi/Friend app's features, capabilities, or purchasing information. User's Question: {question} + + Is this asking about the Omi/Friend app product itself? 
'''.replace( ' ', '' ).strip() @@ -362,6 +394,198 @@ def _get_qa_rag_prompt( ) +def _get_agentic_qa_prompt(uid: str, app: Optional[App] = None) -> str: + """ + Build the system prompt for the agentic agent, preserving the structure and instructions + from _get_qa_rag_prompt while adding tool-calling capabilities. + + Args: + uid: User ID + app: Optional app/plugin for personalized behavior + + Returns: + System prompt string + """ + user_name = get_user_name(uid) + + # Get timezone and current datetime in user's timezone + tz = notification_db.get_user_time_zone(uid) + try: + user_tz = ZoneInfo(tz) + current_datetime_user = datetime.now(user_tz) + current_datetime_str = current_datetime_user.strftime('%Y-%m-%d %H:%M:%S') + current_datetime_iso = current_datetime_user.isoformat() + print(f"🌍 _get_agentic_qa_prompt - User timezone: {tz}, Current time: {current_datetime_str}") + except Exception: + # Fallback to UTC if timezone is invalid + current_datetime_user = datetime.now(timezone.utc) + current_datetime_str = current_datetime_user.strftime('%Y-%m-%d %H:%M:%S') + current_datetime_iso = current_datetime_user.isoformat() + print(f"🌍 _get_agentic_qa_prompt - User timezone: UTC (fallback), Current time: {current_datetime_str}") + + # Handle persona apps - they override the entire system prompt + if app and app.is_a_persona(): + return app.persona_prompt or app.chat_prompt + + # Citation instruction for referencing conversations from tools + cited_instruction = """""" + + # Plugin-specific instructions for regular apps + plugin_info = "" + plugin_section = "" + if app: + plugin_info = f"Your name is: {app.name}, and your personality/description is '{app.description}'.\nMake sure to reflect your personality in your response." + plugin_section = f""" +{plugin_info} + + +""" + + base_prompt = f""" +You are Omi, a helpful AI assistant for {user_name}. You are designed to provide accurate, detailed, and comprehensive responses in the most personalized way possible. + + + +Current date time in {user_name}'s timezone ({tz}): {current_datetime_str} +Current date time ISO format: {current_datetime_iso} + + + + * Avoid citing irrelevant conversations. + * Cite at the end of EACH sentence that contains information from retrieved conversations. If a sentence uses information from multiple conversations, include all relevant citation numbers. + * NO SPACE between the last word and the citation. + * Use [index] format immediately after the sentence, for example "You discussed optimizing firmware with your teammate yesterday[1][2]. You talked about the hot weather these days[3]." + + + +**DateTime Formatting Rules for Tool Calls:** + +When using tools with date/time parameters (start_date, end_date), you MUST follow these rules: + +**CRITICAL: All datetime calculations must be done in {user_name}'s timezone ({tz}), then formatted as ISO with timezone offset.** + +**When user asks about specific dates/times (e.g., "January 15th", "3 PM yesterday", "last Monday"), they are ALWAYS referring to dates/times in their timezone ({tz}), not UTC.** + +1. **Always use ISO format with timezone:** + - Format: YYYY-MM-DDTHH:MM:SS+HH:MM (e.g., "2024-01-19T15:00:00-08:00" for PST) + - NEVER use datetime without timezone (e.g., "2024-01-19T07:15:00" is WRONG) + - The timezone offset must match {user_name}'s timezone ({tz}) + - Current time reference: {current_datetime_iso} + +2. 
**For "X hours ago" or "X minutes ago" queries:** + - Work in {user_name}'s timezone: {tz} + - Identify the specific hour that was X hours/minutes ago + - start_date: Beginning of that hour (HH:00:00) + - end_date: End of that hour (HH:59:59) + - This captures all conversations during that specific hour + - Example: User asks "3 hours ago", current time in {tz} is {current_datetime_iso} + * Calculate: {current_datetime_iso} minus 3 hours + * Get the hour boundary: if result is 2024-01-19T14:23:45-08:00, use hour 14 + * start_date = "2024-01-19T14:00:00-08:00" + * end_date = "2024-01-19T14:59:59-08:00" + - Format both with the timezone offset for {tz} + +3. **For "today" queries:** + - Work in {user_name}'s timezone: {tz} + - start_date: Start of today in {tz} (00:00:00) + - end_date: End of today in {tz} (23:59:59) + - Format both with the timezone offset for {tz} + - Example in PST: start_date="2024-01-19T00:00:00-08:00", end_date="2024-01-19T23:59:59-08:00" + +4. **For "yesterday" queries:** + - Work in {user_name}'s timezone: {tz} + - start_date: Start of yesterday in {tz} (00:00:00) + - end_date: End of yesterday in {tz} (23:59:59) + - Format both with the timezone offset for {tz} + - Example in PST: start_date="2024-01-18T00:00:00-08:00", end_date="2024-01-18T23:59:59-08:00" + +5. **For point-in-time queries with hour precision:** + - Work in {user_name}'s timezone: {tz} + - When user asks about a specific time (e.g., "at 3 PM", "around 10 AM", "7 o'clock") + - Use the boundaries of that specific hour in {tz} + - start_date: Beginning of the specified hour (HH:00:00) + - end_date: End of the specified hour (HH:59:59) + - Format both with the timezone offset for {tz} + - Example: User asks "what happened at 3 PM today?" in PST + * 3 PM = hour 15 in 24-hour format + * start_date = "2024-01-19T15:00:00-08:00" + * end_date = "2024-01-19T15:59:59-08:00" + - This captures all conversations during that specific hour + +**Remember: ALL times must be in ISO format with the timezone offset for {tz}. Never use UTC unless {user_name}'s timezone is UTC.** + +**Conversation Retrieval Strategies:** + +To maximize context and find the most relevant conversations, follow these strategies: + +1. **Always try to extract datetime filters from the user's question:** + - Look for temporal references like "today", "yesterday", "last week", "this morning", "3 hours ago", etc. + - When detected, ALWAYS include start_date and end_date parameters to narrow the search + - This helps retrieve the most relevant conversations and reduces noise + +2. **Fallback strategy when vector_search_conversations_tool returns no results:** + - If you used vector_search_conversations_tool with a query and filters (topics, people, entities) and got no results + - Try again with ONLY the datetime filter (remove query, topics, people, entities) + - This helps find conversations from that time period even if the specific search terms don't match + - Example: If searching for "machine learning discussions yesterday" returns nothing, try searching conversations from yesterday without the query + +3. **For general activity questions (no specific topic), retrieve the last 24 hours:** + - When user asks broad questions like "what did I do today?", "summarize my day", "what have I been up to?" + - Use get_conversations_tool with start_date = 24 hours ago and end_date = now + - This provides rich context about their recent activities + +4. 
**Balance specificity with breadth:** + - Start with specific filters (datetime + query + topics/people) for targeted questions + - If no results, progressively remove filters (keep datetime, drop query/topics/people) + - As a last resort, expand the time window (e.g., from "today" to "last 3 days") + +5. **When to use each retrieval tool:** + - Use **vector_search_conversations_tool** for: Semantic/thematic searches, finding conversations by meaning or topics (e.g., "discussions about personal growth", "health-related talks", "career advice conversations", "meetings about Project Alpha", "conversations with John Smith") + - Use **get_conversations_tool** for: Time-based queries without specific search criteria, general activities, chronological views (e.g., "what did I do today?", "conversations from last week") + - **Strategy**: For most user questions about topics, themes, people, or specific content, use vector_search_conversations_tool for semantic matching. For general time-based queries without specific topics, use get_conversations_tool + - Always prefer narrower time windows first (hours > day > week > month) for better relevance + + + + +Before finalizing your response, perform these quality checks: +- Review your response for accuracy and completeness - ensure you've fully answered the user's question +- Verify all formatting is correct and consistent throughout your response +- Check that all citations are relevant and properly placed according to the citing rules +- Ensure the tone matches the instructions (casual, friendly, concise) +- Confirm you haven't used prohibited phrases like "Here's", "Based on", "According to", etc. +- Do NOT add a separate "Citations" or "References" section at the end - citations are inline only + + + + +Answer the user's questions accurately and personally, using the tools when needed to gather additional context from their conversation history and memories. + + + +- Answer casually, concisely, and straightforward - like texting a friend +- Get straight to the point - NEVER start with "Here's", "Here are", "Here is", "I found", "Based on", "According to", or similar phrases +- It is EXTREMELY IMPORTANT to directly answer the question with high-quality information +- NEVER say "based on the available memories" or "according to the tools". Jump right into the answer. +- **Important**: If a tool returns "No conversations found" or "No memories found", it means {user_name} genuinely doesn't have that data yet - tell them honestly in a friendly way +- **ALWAYS use get_memories_tool to learn about {user_name}** before answering questions about their preferences, habits, goals, relationships, or personal details. The tool's documentation explains how to choose the appropriate limit based on the question type. +- **CRITICAL**: When calling tools with date/time parameters, you MUST follow the DateTime Formatting Rules specified above +- When you use information from conversations retrieved by tools, you MUST cite them following the Citing Rules specified above.
+- Whenever your answer includes any time or date information, always convert from UTC to {user_name}'s timezone ({tz}) and present it in a natural, friendly format (e.g., "3:45 PM on Tuesday, October 16th" or "last Monday at 2:30 PM") +- If you don't know something, say so honestly +- If suggesting follow-up questions, ONLY suggest meaningful, context-specific questions based on the current conversation - NEVER suggest generic questions like "if you want transcripts or more details" or "let me know if you need more information" +{"- Regard the app instructions in the plugin section below" if plugin_info else ""} +- You MUST follow the Quality Control Rules specified above + + +{plugin_section} + +Remember: Use tools strategically to provide the best possible answers. Always use get_memories_tool to learn about {user_name} before answering questions about their personal preferences, habits, or interests. Your goal is to help {user_name} in the most personalized and helpful way possible. +""" + + return base_prompt.strip() + + def qa_rag( uid: str, question: str, @@ -376,7 +600,7 @@ def qa_rag( return llm_medium.invoke(prompt).content -def qa_rag_stream( +async def qa_rag_stream( uid: str, question: str, context: str, @@ -386,9 +610,115 @@ def qa_rag_stream( tz: Optional[str] = "UTC", callbacks=[], ) -> str: - prompt = _get_qa_rag_prompt(uid, question, context, plugin, cited, messages, tz) - # print('qa_rag prompt', prompt) - return llm_medium_stream.invoke(prompt, {'callbacks': callbacks}).content + print("qa_rag_stream START") + print(f"qa_rag_stream: received {len(messages)} messages") + + # Check if the last message has images + has_images = False + image_files = [] + if messages and len(messages) > 0: + last_message = messages[-1] + print(f"qa_rag_stream: last message ID={last_message.id}, sender={last_message.sender}") + print(f"qa_rag_stream: last message has 'files' attr={hasattr(last_message, 'files')}") + print(f"qa_rag_stream: last message has 'files_id' attr={hasattr(last_message, 'files_id')}") + + if hasattr(last_message, 'files_id'): + print(f"qa_rag_stream: files_id = {last_message.files_id}") + + if hasattr(last_message, 'files'): + print(f"qa_rag_stream: files = {last_message.files}") + print(f"qa_rag_stream: last message has {len(last_message.files)} files") + if last_message.files: + for idx, f in enumerate(last_message.files): + print(f"qa_rag_stream: file {idx}: name={f.name if f else 'None'}, is_image={f.is_image() if f else 'N/A'}, openai_file_id={f.openai_file_id if f else 'None'}") + image_files = [f for f in last_message.files if f and f.is_image()] + has_images = len(image_files) > 0 + print(f"qa_rag_stream: found {len(image_files)} image files") + else: + print("qa_rag_stream: NO 'files' attribute on last message") + + if has_images: + print(f"qa_rag_stream: Found {len(image_files)} images in message") + # Use vision API with images - use a simple, direct prompt for vision + content = [] + + # Simple vision prompt that explicitly asks AI to look at the image + vision_prompt = f"""Look at the image(s) provided and answer this question: {question} + +Be specific and detailed about what you see in the image.""" + + content.append({"type": "text", "text": vision_prompt}) + + # Read images from GCS or local storage and add to content + for img_file in image_files[:3]: # Limit to 3 images + try: + file_location = img_file.openai_file_id + + # Check if it's a local path or GCS URL + if file_location.startswith('http://') or file_location.startswith('https://'): + # GCS URL - fetch via HTTP + print(f"qa_rag_stream: 
Fetching image from GCS: {file_location}") + + # TODO: Add decryption support when encryption is implemented + # encrypted_data = fetch_from_gcs(file_location) + # image_data = decrypt_file(encrypted_data, key) + + import requests + response = requests.get(file_location, timeout=10) + response.raise_for_status() + image_data = response.content + print(f"qa_rag_stream: Downloaded {len(image_data)} bytes from GCS") + else: + # Local file path - read directly + print(f"qa_rag_stream: Reading image from local storage: {file_location}") + with open(file_location, 'rb') as f: + image_data = f.read() + print(f"qa_rag_stream: Read {len(image_data)} bytes from local file") + + base64_image = base64.b64encode(image_data).decode('utf-8') + print(f"qa_rag_stream: Base64 length: {len(base64_image)}, first 100 chars: {base64_image[:100]}") + image_url = f"data:{img_file.mime_type};base64,{base64_image}" + content.append({ + "type": "image_url", + "image_url": {"url": image_url} + }) + print(f"qa_rag_stream: Added image to content, URL length: {len(image_url)}") + except Exception as e: + print(f"qa_rag_stream: Failed to fetch image from {img_file.openai_file_id}: {e}") + import traceback + traceback.print_exc() + + # Use chat format with images + from langchain_core.messages import HumanMessage + message = HumanMessage(content=content) + print(f"qa_rag_stream: Calling OpenAI with {len(content)} content items") + response = await llm_medium_stream.ainvoke([message], {'callbacks': callbacks}) + print(f"qa_rag_stream: OpenAI responded with vision, response length: {len(response.content)}") + print(f"qa_rag_stream: Response preview: {response.content[:200]}") + return response.content + else: + # No images, use regular text prompt + prompt = _get_qa_rag_prompt(uid, question, context, plugin, cited, messages, tz) + print(f"qa_rag_stream: calling OpenAI with prompt length={len(prompt)}") + print("=" * 100) + print("FULL PROMPT:") + print(prompt) + print("=" * 100) + print(f"CONTEXT LENGTH: {len(context)}") + print(f"QUESTION: {question}") + print(f"UID: {uid}") + print("=" * 100) + try: + response = await llm_medium_stream.ainvoke(prompt, {'callbacks': callbacks}) + print("qa_rag_stream: OpenAI responded") + print(f"RESPONSE: {response.content}") + print("=" * 100) + return response.content + except Exception as e: + print(f"qa_rag_stream ERROR: {e}") + import traceback + traceback.print_exc() + raise # ************************************************** @@ -532,9 +862,16 @@ def extract_question_from_conversation(messages: List[Message]) -> str: If the contain a complete question, maintain the original version as accurately as possible. \ Avoid adding unnecessary words. + **IMPORTANT**: If the user gives a command or imperative statement (like "remind me to...", "add task to...", "create action item..."), \ + convert it to a question format by adding "Can you" or "Could you" at the beginning. \ + Examples: + - "remind me to buy milk tomorrow" -> "Can you remind me to buy milk tomorrow" + - "add task to finish report" -> "Can you add task to finish report" + - "create action item for meeting" -> "Can you create action item for meeting" + You MUST keep the original - Output a WH-question, that is, a question that starts with a WH-word, like "What", "When", "Where", "Who", "Why", "How". + Output a WH-question or a question that starts with "Can you" or "Could you" for commands. 
Example 1: diff --git a/backend/utils/other/chat_file.py b/backend/utils/other/chat_file.py index caa3598cfd..769361df32 100644 --- a/backend/utils/other/chat_file.py +++ b/backend/utils/other/chat_file.py @@ -1,6 +1,7 @@ import asyncio import mimetypes import re +import uuid from pathlib import Path from typing import List, Optional @@ -10,6 +11,7 @@ import database.chat as chat_db from models.chat import ChatSession, FileChat +from utils.other import storage class File: @@ -68,29 +70,65 @@ def __init__(self, uid: str, chat_session_id: str) -> None: @staticmethod def upload(file_path) -> dict: + import shutil result = {} file = File(file_path) file.get_mime_type() if file.is_image(): file.generate_thumbnail() - file.purpose = "vision" - - with open(file_path, 'rb') as f: - # upload file to OpenAI - response = openai.files.create(file=f, purpose=file.purpose) - if response: - file.file_id = response.id - file.file_name = response.filename - - result["file_name"] = response.filename - result["file_id"] = response.id - result["mime_type"] = file.mime_type - if file.is_image(): - result["thumbnail"] = file.thumbnail_path - result["thumbnail_name"] = file.thumbnail_name + # For images: upload to Google Cloud Storage for vision API + # TODO: Add encryption before uploading to GCS + # Example encryption placeholder: + # encrypted_file_path = encrypt_file(file_path) + # gcs_url = storage.upload_chat_image(encrypted_file_path, uid=None) + + gcs_url = FileChatTool._upload_image_to_gcs(file_path) + + result["file_name"] = Path(file_path).name + result["file_id"] = gcs_url # Store GCS URL as file_id + result["mime_type"] = file.mime_type + result["thumbnail"] = file.thumbnail_path + result["thumbnail_name"] = file.thumbnail_name + else: + # For non-images: upload to OpenAI Files API as before + with open(file_path, 'rb') as f: + response = openai.files.create(file=f, purpose=file.purpose) + if response: + file.file_id = response.id + file.file_name = response.filename + + result["file_name"] = response.filename + result["file_id"] = response.id + result["mime_type"] = file.mime_type return result + @staticmethod + def _upload_image_to_gcs(file_path: str) -> str: + """ + Upload image to Google Cloud Storage. 
+ + TODO: Add encryption support + - Encrypt file before upload: encrypted_path = encrypt_file(file_path) + - Store encryption key securely (per-user key management) + - Decrypt on download: decrypted_data = decrypt_file(gcs_data, key) + + Args: + file_path: Local path to the image file + + Returns: + str: GCS URL of the uploaded image + """ + # Generate unique filename to avoid collisions + unique_filename = f"{uuid.uuid4()}_{Path(file_path).name}" + + # Upload to GCS using existing storage utility + # Using uid='chat_images' as a namespace since we don't have uid in static context + # The upload_multi_chat_files expects a list, so we'll use a simpler approach + gcs_url = storage.upload_chat_image(file_path, unique_filename) + + return gcs_url + def process_chat_with_file(self, question, file_ids: List[str]): """Process chat with file attachments""" self._ensure_thread_and_assistant() diff --git a/backend/utils/other/storage.py b/backend/utils/other/storage.py index 93cb3d631f..a001353144 100644 --- a/backend/utils/other/storage.py +++ b/backend/utils/other/storage.py @@ -8,6 +8,8 @@ from google.cloud.storage import transfer_manager from database.redis_db import cache_signed_url, get_cached_signed_url +from utils import encryption +from database import users as users_db if os.environ.get('SERVICE_ACCOUNT_JSON'): service_account_info = json.loads(os.environ["SERVICE_ACCOUNT_JSON"]) @@ -19,6 +21,7 @@ speech_profiles_bucket = os.getenv('BUCKET_SPEECH_PROFILES') postprocessing_audio_bucket = os.getenv('BUCKET_POSTPROCESSING') memories_recordings_bucket = os.getenv('BUCKET_MEMORIES_RECORDINGS') +private_cloud_sync_bucket = os.getenv('BUCKET_PRIVATE_CLOUD_SYNC', 'omi-private-cloud-sync') syncing_local_bucket = os.getenv('BUCKET_TEMPORAL_SYNC_LOCAL') omi_apps_bucket = os.getenv('BUCKET_PLUGINS_LOGOS') app_thumbnails_bucket = os.getenv('BUCKET_APP_THUMBNAILS') @@ -231,6 +234,160 @@ def delete_syncing_temporal_file(file_path: str): blob.delete() +# ************************************************ +# *********** PRIVATE CLOUD SYNC ***************** +# ************************************************ + + +def upload_audio_chunk(chunk_data: bytes, uid: str, conversation_id: str, timestamp: float) -> str: + """ + Upload an audio chunk to Google Cloud Storage with optional encryption. 
+ + Args: + chunk_data: Raw audio bytes (PCM16) + uid: User ID + conversation_id: Conversation ID + timestamp: Unix timestamp when chunk was recorded + + Returns: + GCS path of the uploaded chunk + """ + bucket = storage_client.bucket(private_cloud_sync_bucket) + protection_level = users_db.get_data_protection_level(uid) + + # Format timestamp to 3 decimal places for cleaner filenames + formatted_timestamp = f'{timestamp:.3f}' + + if protection_level == 'enhanced': + # Encrypt as length-prefixed binary + encrypted_chunk = encryption.encrypt_audio_chunk(chunk_data, uid) + path = f'chunks/{uid}/{conversation_id}/{formatted_timestamp}.enc' + blob = bucket.blob(path) + blob.upload_from_string(encrypted_chunk, content_type='application/octet-stream') + else: + # Standard - no encryption + path = f'chunks/{uid}/{conversation_id}/{formatted_timestamp}.bin' + blob = bucket.blob(path) + blob.upload_from_string(chunk_data, content_type='application/octet-stream') + + return path + + +def delete_audio_chunks(uid: str, conversation_id: str, timestamps: List[float]) -> None: + """Delete audio chunks after they've been merged.""" + bucket = storage_client.bucket(private_cloud_sync_bucket) + for timestamp in timestamps: + # Format timestamp to match upload format (3 decimal places) + formatted_timestamp = f'{timestamp:.3f}' + # Try both encrypted and unencrypted paths + for extension in ['.enc', '.bin']: + chunk_path = f'chunks/{uid}/{conversation_id}/{formatted_timestamp}{extension}' + blob = bucket.blob(chunk_path) + if blob.exists(): + blob.delete() + + +def list_audio_chunks(uid: str, conversation_id: str) -> List[dict]: + """ + List all audio chunks for a conversation. + + Returns: + List of dicts with chunk info: {'timestamp': float, 'path': str, 'size': int} + """ + bucket = storage_client.bucket(private_cloud_sync_bucket) + prefix = f'chunks/{uid}/{conversation_id}/' + blobs = bucket.list_blobs(prefix=prefix) + + chunks = [] + for blob in blobs: + # Extract timestamp from filename (e.g., '1234567890.123.bin' or '1234567890.123.enc') + filename = blob.name.split('/')[-1] + if filename.endswith('.bin') or filename.endswith('.enc'): + try: + # Remove extension (.bin or .enc) + timestamp_str = filename.rsplit('.', 1)[0] + timestamp = float(timestamp_str) + chunks.append( + { + 'timestamp': timestamp, + 'path': blob.name, + 'size': blob.size, + } + ) + except ValueError: + continue + + return sorted(chunks, key=lambda x: x['timestamp']) + + +def delete_conversation_audio_files(uid: str, conversation_id: str) -> None: + """Delete all audio files (chunks and merged) for a conversation.""" + bucket = storage_client.bucket(private_cloud_sync_bucket) + + # Delete chunks + chunks_prefix = f'chunks/{uid}/{conversation_id}/' + for blob in bucket.list_blobs(prefix=chunks_prefix): + blob.delete() + + # Delete merged files + audio_prefix = f'audio/{uid}/{conversation_id}/' + for blob in bucket.list_blobs(prefix=audio_prefix): + blob.delete() + + +def download_audio_chunks_and_merge(uid: str, conversation_id: str, timestamps: List[float]) -> bytes: + """ + Download and merge audio chunks on-demand, handling mixed encryption states. + Normalizes all chunks to unencrypted PCM format for consistent merging. 
+ + Args: + uid: User ID + conversation_id: Conversation ID + timestamps: List of chunk timestamps to merge + + Returns: + Merged audio bytes (PCM16) + """ + bucket = storage_client.bucket(private_cloud_sync_bucket) + merged_data = bytearray() + + for timestamp in timestamps: + # Format timestamp to match upload format (3 decimal places) + formatted_timestamp = f'{timestamp:.3f}' + # Try encrypted path first + chunk_path_enc = f'chunks/{uid}/{conversation_id}/{formatted_timestamp}.enc' + chunk_path_bin = f'chunks/{uid}/{conversation_id}/{formatted_timestamp}.bin' + + chunk_blob_enc = bucket.blob(chunk_path_enc) + chunk_blob_bin = bucket.blob(chunk_path_bin) + + chunk_data = None + is_encrypted = False + + if chunk_blob_enc.exists(): + chunk_data = chunk_blob_enc.download_as_bytes() + is_encrypted = True + elif chunk_blob_bin.exists(): + chunk_data = chunk_blob_bin.download_as_bytes() + is_encrypted = False + else: + print(f"Warning: Chunk not found for timestamp {formatted_timestamp}") + continue + + # Normalize to PCM (decrypt if needed) + if is_encrypted: + pcm_data = encryption.decrypt_audio_file(chunk_data, uid) + else: + pcm_data = chunk_data + + merged_data.extend(pcm_data) + + if not merged_data: + raise FileNotFoundError(f"No chunks found for conversation {conversation_id}") + + return bytes(merged_data) + + # ********************************** # ************* UTILS ************** # ********************************** @@ -302,3 +459,45 @@ def upload_multi_chat_files(files_name: List[str], uid: str) -> dict: else: dictFiles[name] = f'https://storage.googleapis.com/{chat_files_bucket}/{uid}/{name}' return dictFiles + + +def upload_chat_image(file_path: str, unique_filename: str) -> str: + """ + Upload a single image file to Google Cloud Storage for vision API usage. + + Falls back to local storage if BUCKET_CHAT_FILES is not configured (for local dev). 
+ + TODO: Add encryption support before uploading + + Args: + file_path: Local path to the image file + unique_filename: Unique filename to use in GCS + + Returns: + str: Public URL of the uploaded image in GCS, or local file path for dev + """ + # LOCAL DEVELOPMENT FALLBACK: If no bucket configured, store locally + if not chat_files_bucket: + import shutil + from pathlib import Path + + print("⚠️ BUCKET_CHAT_FILES not set - using local storage (dev mode)") + + # Create local storage directory + local_storage_dir = Path("_chat_files/chat_images") + local_storage_dir.mkdir(parents=True, exist_ok=True) + + # Copy file to local storage + local_path = local_storage_dir / unique_filename + shutil.copy2(file_path, local_path) + + # Return absolute path (will be used to read the file later) + return str(local_path.absolute()) + + # PRODUCTION: Upload to GCS + bucket = storage_client.bucket(chat_files_bucket) + path = f'chat_images/{unique_filename}' + blob = bucket.blob(path) + blob.cache_control = 'private, max-age=0' # Don't cache as these are temporary + blob.upload_from_filename(file_path) + return f'https://storage.googleapis.com/{chat_files_bucket}/{path}' diff --git a/backend/utils/retrieval/graph.py b/backend/utils/retrieval/graph.py index 08d01d5881..c8bebb35d7 100644 --- a/backend/utils/retrieval/graph.py +++ b/backend/utils/retrieval/graph.py @@ -40,8 +40,10 @@ from utils.other.chat_file import FileChatTool from utils.other.endpoints import timeit from utils.app_integrations import get_github_docs_content +from utils.retrieval.agentic import execute_agentic_chat_stream model = ChatOpenAI(model="gpt-4o-mini") +llm_medium = ChatOpenAI(model='gpt-4o') llm_medium_stream = ChatOpenAI(model='gpt-4o', streaming=True) @@ -128,16 +130,32 @@ def determine_conversation_type( state: GraphState, ) -> Literal[ "no_context_conversation", - "context_dependent_conversation", - "omi_question", + "agentic_context_dependent_conversation", + # "omi_question", "file_chat_question", "persona_question", + "vision_question", ]: # chat with files by attachments on the last message print("determine_conversation_type") messages = state.get("messages", []) + has_images = False if len(messages) > 0 and len(messages[-1].files_id) > 0: - return "file_chat_question" + # Check if files are images (vision) or documents (assistants API) + last_message = messages[-1] + has_non_image_files = False + if hasattr(last_message, 'files') and last_message.files: + has_non_image_files = any(not f.is_image() for f in last_message.files) + has_images = any(f.is_image() for f in last_message.files) + + # Route images to dedicated vision node + if has_images: + print("determine_conversation_type: has_images=True, routing to vision_question") + return "vision_question" + + # Only use file_chat_question for non-image files (PDFs, docs, etc) + if has_non_image_files: + return "file_chat_question" # persona app: App = state.get("plugin_selected") @@ -161,13 +179,13 @@ def determine_conversation_type( if is_file_question: return "file_chat_question" - is_omi_question = retrieve_is_an_omi_question(question) - if is_omi_question: - return "omi_question" + # is_omi_question = retrieve_is_an_omi_question(question) + # if is_omi_question: + # return "omi_question" requires = requires_context(question) if requires: - return "context_dependent_conversation" + return "agentic_context_dependent_conversation" return "no_context_conversation" @@ -228,27 +246,74 @@ def persona_question(state: GraphState): return {'answer': "Oops", 'ask_for_nps': 
True} -def context_dependent_conversation_v1(state: GraphState): - question = extract_question_from_conversation(state.get("messages", [])) - print("context_dependent_conversation parsed question:", question) - return {"parsed_question": question} - - def context_dependent_conversation(state: GraphState): return state +def agentic_context_dependent_conversation(state: GraphState): + """Handle context-dependent conversations using the agentic system""" + print("agentic_context_dependent_conversation node") + + uid = state.get("uid") + messages = state.get("messages", []) + app = state.get("plugin_selected") + + # streaming + streaming = state.get("streaming") + if streaming: + callback_data = {} + + async def run_agentic_stream(): + async for chunk in execute_agentic_chat_stream( + uid, + messages, + app, + callback_data=callback_data, + chat_session=state.get("chat_session"), + ): + if chunk: + # Forward streaming chunks through callback + if chunk.startswith("data: "): + state.get('callback').put_data_nowait(chunk.replace("data: ", "")) + elif chunk.startswith("think: "): + state.get('callback').put_thought_nowait(chunk.replace("think: ", "")) + + # Run the async streaming + asyncio.run(run_agentic_stream()) + + # Signal completion to the callback + state.get('callback').end_nowait() + + # Extract results from callback_data + answer = callback_data.get('answer', '') + memories_found = callback_data.get('memories_found', []) + ask_for_nps = callback_data.get('ask_for_nps', False) + + return {"answer": answer, "memories_found": memories_found, "ask_for_nps": ask_for_nps} + + # no streaming - not yet implemented + return {"answer": "Streaming required for agentic mode", "ask_for_nps": False} + + # !! include a question extractor? node? def retrieve_topics_filters(state: GraphState): print("retrieve_topics_filters") - filters = { - "people": get_filter_category_items(state.get("uid"), "people", limit=1000), - "topics": get_filter_category_items(state.get("uid"), "topics", limit=1000), - "entities": get_filter_category_items(state.get("uid"), "entities", limit=1000), - # 'dates': get_filter_category_items(state.get('uid'), 'dates'), - } + + # Try to get filters from Redis, fallback to empty if Redis is down (local dev) + try: + filters = { + "people": get_filter_category_items(state.get("uid"), "people", limit=1000), + "topics": get_filter_category_items(state.get("uid"), "topics", limit=1000), + "entities": get_filter_category_items(state.get("uid"), "entities", limit=1000), + # 'dates': get_filter_category_items(state.get('uid'), 'dates'), + } + except Exception as e: + print(f"⚠️ Could not get filter items from Redis: {e}") + print("⚠️ Using empty filters (local dev mode)") + filters = {"people": [], "topics": [], "entities": []} + result = select_structured_filters(state.get("parsed_question", ""), filters) filters = { "topics": result.get("topics", []), @@ -318,7 +383,8 @@ def query_vectors(state: GraphState): return {"memories_found": memories} -def qa_handler(state: GraphState): +async def qa_handler(state: GraphState): + print("qa_handler START") uid = state.get("uid") memories = state.get("memories_found", []) @@ -336,18 +402,25 @@ def qa_handler(state: GraphState): # streaming streaming = state.get("streaming") if streaming: - # state['callback'].put_thought_nowait("Reasoning") - response: str = qa_rag_stream( - uid, - state.get("parsed_question"), - Conversation.conversations_to_string(memories, False, people=people), - state.get("plugin_selected"), - cited=state.get("cited"), - 
messages=state.get("messages"), - tz=state.get("tz"), - callbacks=[state.get('callback')], - ) - return {"answer": response, "ask_for_nps": True} + print("qa_handler: calling qa_rag_stream") + try: + response: str = await qa_rag_stream( + uid, + state.get("parsed_question"), + Conversation.conversations_to_string(memories, False, people=people), + state.get("plugin_selected"), + cited=state.get("cited"), + messages=state.get("messages"), + tz=state.get("tz"), + callbacks=[state.get('callback')], + ) + print("qa_handler: qa_rag_stream completed") + return {"answer": response, "ask_for_nps": True} + except Exception as e: + print(f"qa_handler ERROR: {e}") + import traceback + traceback.print_exc() + return {"answer": f"Error: {str(e)}", "ask_for_nps": False} # no streaming response: str = qa_rag( @@ -414,6 +487,107 @@ def file_chat_question(state: GraphState): raise +async def vision_question(state: GraphState): + """ + Dedicated node for handling image-based questions using Vision API. + Routes directly to Vision API without going through RAG pipeline. + """ + print("vision_question node") + + uid = state.get("uid", "") + question = state.get("parsed_question", "") + messages = state.get("messages", []) + + # Get the last message which should contain the images + if not messages or len(messages) == 0: + return {"answer": "No messages found", "ask_for_nps": False} + + last_message = messages[-1] + + # Extract image files + image_files = [] + if hasattr(last_message, 'files') and last_message.files: + image_files = [f for f in last_message.files if f and f.is_image()] + + if len(image_files) == 0: + print("vision_question: No image files found") + return {"answer": "No images found in your message", "ask_for_nps": False} + + print(f"vision_question: Processing {len(image_files)} images") + + # Build vision API request + import base64 + content = [] + + # Add the user's question + vision_prompt = f"""Look at the image(s) provided and answer this question: {question} + +Be specific and detailed about what you see in the image.""" + + content.append({"type": "text", "text": vision_prompt}) + + # Fetch and encode images (from GCS or local storage) + for img_file in image_files[:3]: # Limit to 3 images + try: + file_location = img_file.openai_file_id + + # Check if it's a local path or GCS URL + if file_location.startswith('http://') or file_location.startswith('https://'): + # GCS URL - fetch via HTTP + print(f"vision_question: Fetching image from GCS: {file_location}") + + # TODO: Add decryption support when encryption is implemented + # encrypted_data = fetch_from_gcs(file_location) + # image_data = decrypt_file(encrypted_data, key) + + import requests + response = requests.get(file_location, timeout=10) + response.raise_for_status() + image_data = response.content + print(f"vision_question: Downloaded {len(image_data)} bytes from GCS") + else: + # Local file path - read directly + print(f"vision_question: Reading image from local storage: {file_location}") + with open(file_location, 'rb') as f: + image_data = f.read() + print(f"vision_question: Read {len(image_data)} bytes from local file") + + base64_image = base64.b64encode(image_data).decode('utf-8') + image_url = f"data:{img_file.mime_type};base64,{base64_image}" + content.append({ + "type": "image_url", + "image_url": {"url": image_url} + }) + print(f"vision_question: Added image to content") + except Exception as e: + print(f"vision_question: Failed to fetch image {img_file.openai_file_id}: {e}") + import traceback + 
traceback.print_exc() + + # Call Vision API + streaming = state.get("streaming") + try: + from langchain_core.messages import HumanMessage + message = HumanMessage(content=content) + + if streaming: + print("vision_question: Calling Vision API with streaming") + response = await llm_medium_stream.ainvoke([message], {'callbacks': [state.get('callback')]}) + else: + print("vision_question: Calling Vision API without streaming") + response = llm_medium.invoke([message]) + + answer = response.content + print(f"vision_question: Got response, length: {len(answer)}") + return {"answer": answer, "ask_for_nps": True} + + except Exception as e: + print(f"vision_question ERROR: {e}") + import traceback + traceback.print_exc() + return {"answer": f"Error processing image: {str(e)}", "ask_for_nps": False} + + workflow = StateGraph(GraphState) workflow.add_edge(START, "determine_conversation") @@ -423,17 +597,20 @@ def file_chat_question(state: GraphState): workflow.add_conditional_edges("determine_conversation", determine_conversation_type) workflow.add_node("no_context_conversation", no_context_conversation) -workflow.add_node("omi_question", omi_question) -workflow.add_node("context_dependent_conversation", context_dependent_conversation) +# workflow.add_node("omi_question", omi_question) +# workflow.add_node("context_dependent_conversation", context_dependent_conversation) +workflow.add_node("agentic_context_dependent_conversation", agentic_context_dependent_conversation) workflow.add_node("file_chat_question", file_chat_question) workflow.add_node("persona_question", persona_question) +workflow.add_node("vision_question", vision_question) workflow.add_edge("no_context_conversation", END) -workflow.add_edge("omi_question", END) +# workflow.add_edge("omi_question", END) workflow.add_edge("persona_question", END) workflow.add_edge("file_chat_question", END) -workflow.add_edge("context_dependent_conversation", "retrieve_topics_filters") -workflow.add_edge("context_dependent_conversation", "retrieve_date_filters") +workflow.add_edge("vision_question", END) +workflow.add_edge("agentic_context_dependent_conversation", "retrieve_topics_filters") +workflow.add_edge("agentic_context_dependent_conversation", "retrieve_date_filters") workflow.add_node("retrieve_topics_filters", retrieve_topics_filters) workflow.add_node("retrieve_date_filters", retrieve_date_filters)
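Reviewer note: both `qa_rag_stream` (backend/utils/llm/chat.py) and the new `vision_question` node (backend/utils/retrieval/graph.py) build the same multimodal payload — one `HumanMessage` whose content list pairs a text part with base64 `data:` URLs for up to three images. The standalone sketch below isolates that pattern for testing outside the graph; it is a minimal sketch, not part of this diff. The `langchain_openai` import path, the helper names, and the sample image path are assumptions, and it assumes `OPENAI_API_KEY` is set in the environment.

```python
import asyncio
import base64
import mimetypes

from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI  # assumed import path for ChatOpenAI used in graph.py


def build_vision_message(question: str, image_path: str) -> HumanMessage:
    # Read the raw image bytes. In the PR these come either from the public GCS URL
    # or from the local _chat_files fallback, whichever upload_chat_image stored
    # in the file's openai_file_id field.
    with open(image_path, 'rb') as f:
        image_data = f.read()
    mime_type = mimetypes.guess_type(image_path)[0] or 'image/png'
    base64_image = base64.b64encode(image_data).decode('utf-8')
    return HumanMessage(
        content=[
            {"type": "text", "text": f"Look at the image(s) provided and answer this question: {question}"},
            {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{base64_image}"}},
        ]
    )


async def ask_about_image(question: str, image_path: str) -> str:
    # gpt-4o mirrors llm_medium in graph.py; streaming callbacks are omitted here,
    # whereas vision_question forwards the graph's callback when streaming.
    llm = ChatOpenAI(model='gpt-4o')
    response = await llm.ainvoke([build_vision_message(question, image_path)])
    return response.content


if __name__ == '__main__':
    # Hypothetical local test image; any small PNG/JPEG works.
    print(asyncio.run(ask_about_image("What is in this picture?", "sample.png")))
```

Embedding the bytes as a data URL matches what the PR does: it downloads the image itself (GCS URL or local path stored in `openai_file_id`) and inlines it in the request rather than passing a remote URL to the model.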