Conversation

@arsenkylyshbek
Collaborator

Overview

This PR fixes the backend to support image-based chat in a multi-instance deployment. The main issues were that images were stored on the local filesystem (which breaks when requests are spread across multiple servers) and that the graph routing didn't handle image messages correctly.

What Changed

Image Storage

Changed image storage from local filesystem to Google Cloud Storage.

Files:

  • backend/utils/other/chat_file.py - Now uploads images to GCS
  • backend/utils/other/storage.py - Added upload_chat_image() function

Why: When you save an image to one server's filesystem, other servers can't access it. GCS solves this by providing centralized storage.

For local dev: Falls back to local storage if BUCKET_CHAT_FILES isn't set, so you don't need GCS to test locally.
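
For reference, a minimal sketch of the upload path, assuming the google-cloud-storage client; the exact signature of upload_chat_image() and the blob naming in storage.py may differ:

import os
import uuid
from google.cloud import storage

BUCKET_CHAT_FILES = os.getenv('BUCKET_CHAT_FILES')

def upload_chat_image(uid: str, local_path: str) -> str:
    """Upload a chat image to GCS and return a URL any instance can fetch.

    Falls back to the local path when BUCKET_CHAT_FILES is unset, so local
    development works without GCS credentials.
    """
    if not BUCKET_CHAT_FILES:
        return local_path  # local dev fallback (files live under _chat_files/chat_images/)
    client = storage.Client()
    bucket = client.bucket(BUCKET_CHAT_FILES)
    # Blob layout below is illustrative only
    blob = bucket.blob(f"chat_images/{uid}/{uuid.uuid4()}{os.path.splitext(local_path)[1]}")
    blob.upload_from_filename(local_path)
    return blob.public_url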

Graph Routing

Added a dedicated vision_question node in the LangGraph workflow for handling images.

Files:

  • backend/utils/retrieval/graph.py - Added new node and routing logic

How it works:

  • When a message has images, it goes directly to the vision_question node
  • This node calls the GPT-4o vision API directly and returns the response
  • Messages without images use the existing RAG pipeline as before

Why: Image queries need different handling than text queries. They don't need the RAG pipeline, just direct Vision API access.
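
Conceptually, the routing comes down to a conditional edge like the following. This is a simplified sketch assuming a recent langgraph version: the real determine_conversation_type also distinguishes other conversation types, the fallback node name is hypothetical, and the actual wiring lives in graph.py.

from langgraph.graph import StateGraph, START, END

def determine_conversation_type(state: GraphState) -> str:
    # Route messages whose last entry carries image files to the vision node;
    # everything else continues into the existing RAG path.
    messages = state.get("messages", [])
    last = messages[-1] if messages else None
    files = getattr(last, "files", None) or []
    if any(f and f.is_image() for f in files):
        return "vision_question"
    return "context_dependent_conversation"  # hypothetical name for the existing RAG entry node

workflow = StateGraph(GraphState)
workflow.add_node("vision_question", vision_question)
workflow.add_conditional_edges(START, determine_conversation_type)
workflow.add_edge("vision_question", END)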

Backward Compatibility

Made sure existing features still work and added fallbacks for local development.

Files:

  • backend/routers/chat.py - Handles file uploads when no chat session exists yet and falls back gracefully when Redis isn't available
  • backend/utils/llm/chat.py - Fetches images from either GCS URLs or local paths (see the sketch below)
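
The GCS-or-local fetch is just a URL check. A hypothetical helper to illustrate the pattern; the real code in backend/utils/llm/chat.py inlines this logic, as the review comments below show:

import requests

def load_image_bytes(file_location: str) -> bytes:
    # GCS uploads are referenced by https URLs; local-dev files by filesystem paths.
    if file_location.startswith(('http://', 'https://')):
        resp = requests.get(file_location, timeout=10)
        resp.raise_for_status()
        return resp.content
    with open(file_location, 'rb') as f:
        return f.read()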

Routing Flow

Message comes in
  → Has images? 
      → Yes: vision_question node → GPT-4o Vision → Response
      → No: Existing RAG pipeline → Response

Environment Variables

New (optional):

BUCKET_CHAT_FILES=your-gcs-bucket-name

If not set, uses local storage at _chat_files/chat_images/

Deployment Notes

  • Production needs BUCKET_CHAT_FILES set
  • No breaking changes
  • Existing chat functionality unchanged
  • Can test locally without GCS or Redis setup

Future Work

  • Add encryption for images before uploading to GCS
  • Add image cleanup/expiration policy
  • Add metrics for vision queries

Addresses feedback from PR #3198

Contributor

@gemini-code-assist bot left a comment

Code Review

This pull request introduces significant backend changes to support image-based chat by integrating Google Cloud Storage for image hosting and adding a dedicated vision processing pipeline in the LangGraph workflow. The changes are well-structured, with fallbacks for local development. My review focuses on improving code quality and fixing a few critical issues. I've identified some local imports that should be moved, a critical issue with asyncio.run being used in an async context, incorrect graph wiring, and some duplicated/dead code. Addressing these points will enhance the robustness and maintainability of the new features.

Comment on lines +253 to +296
def agentic_context_dependent_conversation(state: GraphState):
    """Handle context-dependent conversations using the agentic system"""
    print("agentic_context_dependent_conversation node")

    uid = state.get("uid")
    messages = state.get("messages", [])
    app = state.get("plugin_selected")

    # streaming
    streaming = state.get("streaming")
    if streaming:
        callback_data = {}

        async def run_agentic_stream():
            async for chunk in execute_agentic_chat_stream(
                uid,
                messages,
                app,
                callback_data=callback_data,
                chat_session=state.get("chat_session"),
            ):
                if chunk:
                    # Forward streaming chunks through callback
                    if chunk.startswith("data: "):
                        state.get('callback').put_data_nowait(chunk.replace("data: ", ""))
                    elif chunk.startswith("think: "):
                        state.get('callback').put_thought_nowait(chunk.replace("think: ", ""))

        # Run the async streaming
        asyncio.run(run_agentic_stream())

        # Signal completion to the callback
        state.get('callback').end_nowait()

        # Extract results from callback_data
        answer = callback_data.get('answer', '')
        memories_found = callback_data.get('memories_found', [])
        ask_for_nps = callback_data.get('ask_for_nps', False)

        return {"answer": answer, "memories_found": memories_found, "ask_for_nps": ask_for_nps}

    # no streaming - not yet implemented
    return {"answer": "Streaming required for agentic mode", "ask_for_nps": False}

Contributor

critical

The use of asyncio.run() inside agentic_context_dependent_conversation is problematic. Since the graph is invoked with ainvoke, it's already running within an asyncio event loop. Calling asyncio.run() from a running loop will raise a RuntimeError. The function should be defined as async def and use await to handle the async stream, similar to how qa_handler is implemented.

async def agentic_context_dependent_conversation(state: GraphState):
    """Handle context-dependent conversations using the agentic system"""
    print("agentic_context_dependent_conversation node")

    uid = state.get("uid")
    messages = state.get("messages", [])
    app = state.get("plugin_selected")

    # streaming
    streaming = state.get("streaming")
    if streaming:
        callback_data = {}

        async for chunk in execute_agentic_chat_stream(
            uid,
            messages,
            app,
            callback_data=callback_data,
            chat_session=state.get("chat_session"),
        ):
            if chunk:
                # Forward streaming chunks through callback
                if chunk.startswith("data: "):
                    state.get('callback').put_data_nowait(chunk.replace("data: ", ""))
                elif chunk.startswith("think: "):
                    state.get('callback').put_thought_nowait(chunk.replace("think: ", ""))

        # Signal completion to the callback
        state.get('callback').end_nowait()

        # Extract results from callback_data
        answer = callback_data.get('answer', '')
        memories_found = callback_data.get('memories_found', [])
        ask_for_nps = callback_data.get('ask_for_nps', False)

        return {"answer": answer, "memories_found": memories_found, "ask_for_nps": ask_for_nps}

    # no streaming - not yet implemented
    return {"answer": "Streaming required for agentic mode", "ask_for_nps": False}

Comment on lines +612 to +613
workflow.add_edge("agentic_context_dependent_conversation", "retrieve_topics_filters")
workflow.add_edge("agentic_context_dependent_conversation", "retrieve_date_filters")
Contributor

critical

The agentic_context_dependent_conversation node appears to be a self-contained pipeline that produces a final answer. However, it's currently wired to retrieve_topics_filters and retrieve_date_filters, which are part of a different RAG pipeline. This will cause redundant processing. The agentic_context_dependent_conversation node should likely be connected directly to the END node.

workflow.add_edge("agentic_context_dependent_conversation", END)

Comment on lines +616 to +721
    # Check if the last message has images
    has_images = False
    image_files = []
    if messages and len(messages) > 0:
        last_message = messages[-1]
        print(f"qa_rag_stream: last message ID={last_message.id}, sender={last_message.sender}")
        print(f"qa_rag_stream: last message has 'files' attr={hasattr(last_message, 'files')}")
        print(f"qa_rag_stream: last message has 'files_id' attr={hasattr(last_message, 'files_id')}")

        if hasattr(last_message, 'files_id'):
            print(f"qa_rag_stream: files_id = {last_message.files_id}")

        if hasattr(last_message, 'files'):
            print(f"qa_rag_stream: files = {last_message.files}")
            print(f"qa_rag_stream: last message has {len(last_message.files)} files")
            if last_message.files:
                for idx, f in enumerate(last_message.files):
                    print(f"qa_rag_stream: file {idx}: name={f.name if f else 'None'}, is_image={f.is_image() if f else 'N/A'}, openai_file_id={f.openai_file_id if f else 'None'}")
                image_files = [f for f in last_message.files if f and f.is_image()]
                has_images = len(image_files) > 0
                print(f"qa_rag_stream: found {len(image_files)} image files")
        else:
            print("qa_rag_stream: NO 'files' attribute on last message")

    if has_images:
        print(f"qa_rag_stream: Found {len(image_files)} images in message")
        # Use vision API with images - use a simple, direct prompt for vision
        content = []

        # Simple vision prompt that explicitly asks AI to look at the image
        vision_prompt = f"""Look at the image(s) provided and answer this question: {question}
Be specific and detailed about what you see in the image."""

        content.append({"type": "text", "text": vision_prompt})

        # Read images from GCS or local storage and add to content
        for img_file in image_files[:3]:  # Limit to 3 images
            try:
                file_location = img_file.openai_file_id

                # Check if it's a local path or GCS URL
                if file_location.startswith('http://') or file_location.startswith('https://'):
                    # GCS URL - fetch via HTTP
                    print(f"qa_rag_stream: Fetching image from GCS: {file_location}")

                    # TODO: Add decryption support when encryption is implemented
                    # encrypted_data = fetch_from_gcs(file_location)
                    # image_data = decrypt_file(encrypted_data, key)

                    import requests
                    response = requests.get(file_location, timeout=10)
                    response.raise_for_status()
                    image_data = response.content
                    print(f"qa_rag_stream: Downloaded {len(image_data)} bytes from GCS")
                else:
                    # Local file path - read directly
                    print(f"qa_rag_stream: Reading image from local storage: {file_location}")
                    with open(file_location, 'rb') as f:
                        image_data = f.read()
                    print(f"qa_rag_stream: Read {len(image_data)} bytes from local file")

                base64_image = base64.b64encode(image_data).decode('utf-8')
                print(f"qa_rag_stream: Base64 length: {len(base64_image)}, first 100 chars: {base64_image[:100]}")
                image_url = f"data:{img_file.mime_type};base64,{base64_image}"
                content.append({
                    "type": "image_url",
                    "image_url": {"url": image_url}
                })
                print(f"qa_rag_stream: Added image to content, URL length: {len(image_url)}")
            except Exception as e:
                print(f"qa_rag_stream: Failed to fetch image from {img_file.openai_file_id}: {e}")
                import traceback
                traceback.print_exc()

        # Use chat format with images
        from langchain_core.messages import HumanMessage
        message = HumanMessage(content=content)
        print(f"qa_rag_stream: Calling OpenAI with {len(content)} content items")
        response = await llm_medium_stream.ainvoke([message], {'callbacks': callbacks})
        print(f"qa_rag_stream: OpenAI responded with vision, response length: {len(response.content)}")
        print(f"qa_rag_stream: Response preview: {response.content[:200]}")
        return response.content
    else:
        # No images, use regular text prompt
        prompt = _get_qa_rag_prompt(uid, question, context, plugin, cited, messages, tz)
        print(f"qa_rag_stream: calling OpenAI with prompt length={len(prompt)}")
        print("=" * 100)
        print("FULL PROMPT:")
        print(prompt)
        print("=" * 100)
        print(f"CONTEXT LENGTH: {len(context)}")
        print(f"QUESTION: {question}")
        print(f"UID: {uid}")
        print("=" * 100)
        try:
            response = await llm_medium_stream.ainvoke(prompt, {'callbacks': callbacks})
            print("qa_rag_stream: OpenAI responded")
            print(f"RESPONSE: {response.content}")
            print("=" * 100)
            return response.content
        except Exception as e:
            print(f"qa_rag_stream ERROR: {e}")
            import traceback
            traceback.print_exc()
            raise
Contributor

high

This block for handling images appears to be dead code. The routing logic in backend/utils/retrieval/graph.py's determine_conversation_type function now directs all messages with images to the vision_question node. This means qa_rag_stream should no longer receive messages with images. This duplicated logic should be removed to improve maintainability and avoid confusion. The logic is already correctly implemented in the vision_question node.

    # This function is not expected to handle images due to graph routing.
    # The `vision_question` node handles image-based queries.
    prompt = _get_qa_rag_prompt(uid, question, context, plugin, cited, messages, tz)
    print(f"qa_rag_stream: calling OpenAI with prompt length={len(prompt)}")
    print("=" * 100)
    print("FULL PROMPT:")
    print(prompt)
    print("=" * 100)
    print(f"CONTEXT LENGTH: {len(context)}")
    print(f"QUESTION: {question}")
    print(f"UID: {uid}")
    print("=" * 100)
    try:
        response = await llm_medium_stream.ainvoke(prompt, {'callbacks': callbacks})
        print("qa_rag_stream: OpenAI responded")
        print(f"RESPONSE: {response.content}")
        print("=" * 100)
        return response.content
    except Exception as e:
        print(f"qa_rag_stream ERROR: {e}")
        import traceback
        traceback.print_exc()
        raise

Comment on lines +666 to +719
                    import requests
                    response = requests.get(file_location, timeout=10)
                    response.raise_for_status()
                    image_data = response.content
                    print(f"qa_rag_stream: Downloaded {len(image_data)} bytes from GCS")
                else:
                    # Local file path - read directly
                    print(f"qa_rag_stream: Reading image from local storage: {file_location}")
                    with open(file_location, 'rb') as f:
                        image_data = f.read()
                    print(f"qa_rag_stream: Read {len(image_data)} bytes from local file")

                base64_image = base64.b64encode(image_data).decode('utf-8')
                print(f"qa_rag_stream: Base64 length: {len(base64_image)}, first 100 chars: {base64_image[:100]}")
                image_url = f"data:{img_file.mime_type};base64,{base64_image}"
                content.append({
                    "type": "image_url",
                    "image_url": {"url": image_url}
                })
                print(f"qa_rag_stream: Added image to content, URL length: {len(image_url)}")
            except Exception as e:
                print(f"qa_rag_stream: Failed to fetch image from {img_file.openai_file_id}: {e}")
                import traceback
                traceback.print_exc()

        # Use chat format with images
        from langchain_core.messages import HumanMessage
        message = HumanMessage(content=content)
        print(f"qa_rag_stream: Calling OpenAI with {len(content)} content items")
        response = await llm_medium_stream.ainvoke([message], {'callbacks': callbacks})
        print(f"qa_rag_stream: OpenAI responded with vision, response length: {len(response.content)}")
        print(f"qa_rag_stream: Response preview: {response.content[:200]}")
        return response.content
    else:
        # No images, use regular text prompt
        prompt = _get_qa_rag_prompt(uid, question, context, plugin, cited, messages, tz)
        print(f"qa_rag_stream: calling OpenAI with prompt length={len(prompt)}")
        print("=" * 100)
        print("FULL PROMPT:")
        print(prompt)
        print("=" * 100)
        print(f"CONTEXT LENGTH: {len(context)}")
        print(f"QUESTION: {question}")
        print(f"UID: {uid}")
        print("=" * 100)
        try:
            response = await llm_medium_stream.ainvoke(prompt, {'callbacks': callbacks})
            print("qa_rag_stream: OpenAI responded")
            print(f"RESPONSE: {response.content}")
            print("=" * 100)
            return response.content
        except Exception as e:
            print(f"qa_rag_stream ERROR: {e}")
            import traceback
Contributor

high

There are several local imports within the qa_rag_stream function (requests, traceback, HumanMessage). Imports should be at the top of the file to improve readability, avoid repeated import overhead, and prevent potential circular dependency issues. Please move all imports to the top of the module.
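
For reference, the consolidated module-level imports being requested would look roughly like this (a sketch only; exact placement and grouping are up to the author, and base64 is included per the vision_question comment below):

# at the top of backend/utils/llm/chat.py instead of inside the functions
import base64
import traceback

import requests
from langchain_core.messages import HumanMessage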


    @staticmethod
    def upload(file_path) -> dict:
        import shutil
Contributor

high

import shutil is inside the upload method. Imports should be at the top of the file for better code organization and to avoid re-importing. Please move it to the top of the module.

Comment on lines +481 to +482
    import shutil
    from pathlib import Path
Contributor

high

Imports (shutil, pathlib.Path) should be at the top of the file, not inside a function. This improves code readability and performance. Please move these imports to the top of the module.

Comment on lines +519 to +586
    import base64
    content = []

    # Add the user's question
    vision_prompt = f"""Look at the image(s) provided and answer this question: {question}
Be specific and detailed about what you see in the image."""

    content.append({"type": "text", "text": vision_prompt})

    # Fetch and encode images (from GCS or local storage)
    for img_file in image_files[:3]:  # Limit to 3 images
        try:
            file_location = img_file.openai_file_id

            # Check if it's a local path or GCS URL
            if file_location.startswith('http://') or file_location.startswith('https://'):
                # GCS URL - fetch via HTTP
                print(f"vision_question: Fetching image from GCS: {file_location}")

                # TODO: Add decryption support when encryption is implemented
                # encrypted_data = fetch_from_gcs(file_location)
                # image_data = decrypt_file(encrypted_data, key)

                import requests
                response = requests.get(file_location, timeout=10)
                response.raise_for_status()
                image_data = response.content
                print(f"vision_question: Downloaded {len(image_data)} bytes from GCS")
            else:
                # Local file path - read directly
                print(f"vision_question: Reading image from local storage: {file_location}")
                with open(file_location, 'rb') as f:
                    image_data = f.read()
                print(f"vision_question: Read {len(image_data)} bytes from local file")

            base64_image = base64.b64encode(image_data).decode('utf-8')
            image_url = f"data:{img_file.mime_type};base64,{base64_image}"
            content.append({
                "type": "image_url",
                "image_url": {"url": image_url}
            })
            print(f"vision_question: Added image to content")
        except Exception as e:
            print(f"vision_question: Failed to fetch image {img_file.openai_file_id}: {e}")
            import traceback
            traceback.print_exc()

    # Call Vision API
    streaming = state.get("streaming")
    try:
        from langchain_core.messages import HumanMessage
        message = HumanMessage(content=content)

        if streaming:
            print("vision_question: Calling Vision API with streaming")
            response = await llm_medium_stream.ainvoke([message], {'callbacks': [state.get('callback')]})
        else:
            print("vision_question: Calling Vision API without streaming")
            response = llm_medium.invoke([message])

        answer = response.content
        print(f"vision_question: Got response, length: {len(answer)}")
        return {"answer": answer, "ask_for_nps": True}

    except Exception as e:
        print(f"vision_question ERROR: {e}")
        import traceback
Contributor

high

There are several local imports within the vision_question function (base64, requests, traceback, HumanMessage). Imports should be at the top of the file to improve readability, avoid repeated import overhead, and prevent potential circular dependency issues. Please move all imports to the top of the module.

@beastoin
Collaborator

hey man, please rebase with main. we have currently migrated 80% of our chat to agentic (tool calls), so pls try to make chat with files more agentic as well.

code: backend/utils/retrieval/agentic.py, backend/retrieval/tools

@beastoin closed this Nov 26, 2025