Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,40 @@ Devr.AI is built on a **LangGraph agent-based architecture** with autonomous, re
- Context-aware response personalization
- Platform-specific formatting and delivery optimization

### 5. Human-in-the-Loop (HIL) Interactive Workflow

- **Interactive User Input at Critical Decision Points**
Devr.AI now pauses at key steps in the support or troubleshooting process to request specific user input (e.g., asking for the project or repository name before proceeding). This ensures the agent always works with the most relevant and accurate context.

- **Interrupt-Driven Context Gathering**
If crucial information is missing or a decision is ambiguous, the agent interrupts its automated reasoning and prompts the user for clarification via Discord message. The conversation halts and resumes only after receiving the required input.

- **Session State Persistence**
All interruptions and user responses are stored, allowing the agent to pick up exactly where it left off after human feedback is integrated.

- **Seamless Multi-Turn Conversation Flow**
User input is woven naturally into the ongoing discussion, keeping troubleshooting and workflows coordinated and responsive.

- **Improved Accuracy & Automation Safety**
By involving humans in gray-area scenarios, HIL minimizes automation errors, fosters trust, and improves outcome quality for technical support interactions.

- **Text-Based Interactions (Discord)**
The current implementation uses standard Discord message replies for user responses instead of buttons, ensuring stability and broad compatibility.

**Example Workflow:**

1. User asks a technical question in Discord.
2. Agent pauses to ask:
“Are you working in a specific project or repository? Please specify.”
3. User replies, providing the needed information.
4. Agent resumes troubleshooting with updated context, repeating pauses for further input as needed.

**Integration Points:**

- Agent workflow layer (ReAct Supervisor and AgentCoordinator)
- Discord bot message handling
- Session and conversation persistence (Weaviate, Supabase)

## Current Integrations

### Discord Integration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from langchain_google_genai import ChatGoogleGenerativeAI

from app.core.config import settings
from app.database.weaviate.operations import search_contributors
# from app.database.weaviate.operations import search_contributors
from app.services.github.issue_processor import GitHubIssueProcessor
from app.services.embedding_service.service import EmbeddingService
from ..prompts.contributor_recommendation.query_alignment import QUERY_ALIGNMENT_PROMPT
Expand Down Expand Up @@ -97,6 +97,8 @@ async def handle_contributor_recommendation(query: str) -> Dict[str, Any]:

logger.info("Performing hybrid search (semantic + keyword matching)")

from app.database.weaviate.operations import search_contributors

results = await search_contributors(
query_embedding=query_embedding,
keywords=alignment_result.get("keywords", []),
Expand Down
63 changes: 55 additions & 8 deletions backend/app/agents/devrel/nodes/react_supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,30 @@

logger = logging.getLogger(__name__)

HIL_INTERRUPT_ACTIONS = ["web_search", "faq_handler", "onboarding", "github_toolkit"]


async def react_supervisor_node(state: AgentState, llm) -> Dict[str, Any]:
"""ReAct Supervisor: Think -> Act -> Observe"""
logger.info(f"ReAct Supervisor thinking for session {state.session_id}")

repo = state.context.get("repository")
if not repo:
waiting_for_user_input = True
interrupt_details = {
"prompt": "Before we proceed, could you please specify the project or repository you are working on?"
}
logger.info(f"Human-in-the-Loop interrupt: asking for repository context in session {state.session_id}")

updated_context = {
**state.context,
"waiting_for_user_input": True,
"interrupt_details": interrupt_details
}
return {
"context": updated_context,
"current_task": "waiting_for_user_input_repo"
}
Comment on lines +17 to +33
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The HIL workflow is intended to be a separate handler as other nodes like faq_tool, web_search...
No need of HIL support in the decision router itself.

# Get current context
latest_message = _get_latest_message(state)
conversation_history = _get_conversation_history(state)
Expand All @@ -31,15 +51,42 @@ async def react_supervisor_node(state: AgentState, llm) -> Dict[str, Any]:

logger.info(f"ReAct Supervisor decision: {decision['action']}")

# Update state with supervisor's thinking
waiting_for_user_input = False
interrupt_details = {}

if decision["action"] in HIL_INTERRUPT_ACTIONS:
# Here you can add logic to decide if user input is needed
# For example, if decision thinking contains uncertainty or multiple options
# For demo, we just always pause at these actions to ask the user
waiting_for_user_input = True
interrupt_details = {
"prompt": f"The agent wants to execute the action: {decision['action']}. Please confirm or provide input."
}
logger.info(
f"Human-in-the-Loop interrupt triggered for action {decision['action']} in session {state.session_id}")

# Update state with supervisor's thinking and interrupt flag if needed
updated_context = {
**state.context,
"supervisor_thinking": response.content,
"supervisor_decision": decision,
"iteration_count": iteration_count + 1,
}

if waiting_for_user_input:
updated_context["waiting_for_user_input"] = True
updated_context["interrupt_details"] = interrupt_details

return {
"context": {
**state.context,
"supervisor_thinking": response.content,
"supervisor_decision": decision,
"iteration_count": iteration_count + 1
},
"current_task": f"supervisor_decided_{decision['action']}"
"context": updated_context,
"current_task": (
f"supervisor_decided_{decision['action']}"
if not waiting_for_user_input else "waiting_for_user_input"
),
**(
{"final_response": interrupt_details["prompt"], "requires_human_review": True}
if waiting_for_user_input else {}
)
}
Comment on lines +54 to 90
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The HIL workflow is intended to be a separate handler as other nodes like faq_tool, web_search...
No need of HIL support in the decision router itself.


def _parse_supervisor_decision(response: str) -> Dict[str, Any]:
Expand Down
81 changes: 55 additions & 26 deletions backend/app/core/orchestration/agent_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from app.core.orchestration.queue_manager import AsyncQueueManager
from app.agents.devrel.nodes.summarization import store_summary_to_database
from langsmith import traceable
from app.database.weaviate.operations import WeaviateAgentStateOperations


logger = logging.getLogger(__name__)

Expand All @@ -16,8 +18,21 @@ def __init__(self, queue_manager: AsyncQueueManager):
self.queue_manager = queue_manager
self.devrel_agent = DevRelAgent()
self.active_sessions: Dict[str, AgentState] = {}

self._register_handlers()
self.weaviate_agent_state_ops = WeaviateAgentStateOperations()

async def load_agent_state(self, session_id):
    """Return the AgentState for *session_id*, preferring the in-memory cache.

    Falls back to Weaviate persistence on a cache miss; a state loaded
    from Weaviate is cached in ``active_sessions`` for later lookups.
    Returns ``None`` (implicitly) when no state exists anywhere.
    """
    # Fast path: session already live in memory.
    state = self.active_sessions.get(session_id)
    if state:
        return state
    # Cache miss: consult the persisted state in Weaviate.
    state = await self.weaviate_agent_state_ops.load_agent_state(session_id)
    if state:
        self.active_sessions[session_id] = state
    return state

async def save_agent_state(self, agent_state):
    """Write-through save of *agent_state*.

    Updates the in-memory session map first, then upserts the state into
    the Weaviate-backed store keyed by ``agent_state.session_id``.
    """
    self.active_sessions[agent_state.session_id] = agent_state
    await self.weaviate_agent_state_ops.create_or_update_agent_state(agent_state)

def _register_handlers(self):
"""Register message handlers"""
Expand All @@ -26,34 +41,42 @@ def _register_handlers(self):

@traceable(name="devrel_request_coordination", run_type="chain")
async def _handle_devrel_request(self, message_data: Dict[str, Any]):
"""Handle DevRel agent requests"""
try:
# Extract memory thread ID (user_id for Discord)
memory_thread_id = message_data.get("memory_thread_id") or message_data.get("user_id", "")
session_id = str(uuid.uuid4())

initial_state = AgentState(
session_id=session_id,
user_id=message_data.get("user_id", ""),
platform=message_data.get("platform", "discord"),
thread_id=message_data.get("thread_id"),
channel_id=message_data.get("channel_id"),
context={
"original_message": message_data.get("content", ""),
"classification": message_data.get("classification", {}),
"author": message_data.get("author", {})
}
)

# Run agent
logger.info(f"Running DevRel agent for session {session_id} with memory thread {memory_thread_id}")
result_state = await self.devrel_agent.run(initial_state, memory_thread_id)

# Check if thread timeout was reached during processing
session_id = message_data.get("session_id") # Try to reuse if provided

# Attempt to load existing state for this session, else create new
agent_state = None
if session_id:
agent_state = await self.load_agent_state(session_id)

if not agent_state:
session_id = str(uuid.uuid4())
agent_state = AgentState(
session_id=session_id,
user_id=message_data.get("user_id", ""),
platform=message_data.get("platform", "discord"),
thread_id=message_data.get("thread_id"),
channel_id=message_data.get("channel_id"),
context={
"original_message": message_data.get("content", ""),
"classification": message_data.get("classification", {}),
"author": message_data.get("author", {})
}
)

Comment on lines +46 to +67
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Verify message ingestion on resumed sessions.

When reusing an existing AgentState, the new user message isn’t added here. If the graph (or DevRelAgent) reliably appends the incoming message to state.messages, you’re fine; otherwise replies may lack the latest user input.

Run this to confirm where new user messages are appended:


🏁 Script executed:

#!/bin/bash
# Look for code that appends the current inbound message to AgentState.messages
rg -nP -C3 '\b(messages|state\.messages)\b.*(append|add|extend)' --glob '!**/node_modules/**' --glob '!**/dist/**'
# Inspect DevRel agent graph nodes handling inbound messages
rg -nP -C3 '\boriginal_message\b|\bHumanMessage\b|\badd_message\b|\bmessages\[-1\]' backend/app

Length of output: 26745


Critical missing message ingestion on resumed sessions

Our search didn’t find any place where the incoming user message is appended to AgentState.messages, so when reusing an existing session the latest user input never makes it into the conversation history. Without this, downstream nodes will only ever see the original message stored in state.context, not the full message log.

Please add an explicit append of the inbound message immediately after loading or creating the agent_state. For example, in backend/app/core/orchestration/agent_coordinator.py (around lines 46–67):

             if session_id:
                 agent_state = await self.load_agent_state(session_id)

             if not agent_state:
                 session_id = str(uuid.uuid4())
                 agent_state = AgentState(
                     session_id=session_id,
                     user_id=message_data.get("user_id", ""),
                     platform=message_data.get("platform", "discord"),
                     thread_id=message_data.get("thread_id"),
                     channel_id=message_data.get("channel_id"),
                     context={
                         "original_message": message_data.get("content", ""),
                         "classification": message_data.get("classification", {}),
                         "author": message_data.get("author", {})
                     }
                 )
+            # Ingest the current user message into the conversation history
+            from datetime import datetime
+            agent_state.messages.append({
+                "role": "user",
+                "content": message_data.get("content", ""),
+                "timestamp": datetime.utcnow().isoformat()
+            })
  • Ensure you import datetime at the top of the file.
  • This guarantees every new or resumed session has its latest user input recorded in state.messages before the agent graph runs.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
session_id = message_data.get("session_id") # Try to reuse if provided
# Attempt to load existing state for this session, else create new
agent_state = None
if session_id:
agent_state = await self.load_agent_state(session_id)
if not agent_state:
session_id = str(uuid.uuid4())
agent_state = AgentState(
session_id=session_id,
user_id=message_data.get("user_id", ""),
platform=message_data.get("platform", "discord"),
thread_id=message_data.get("thread_id"),
channel_id=message_data.get("channel_id"),
context={
"original_message": message_data.get("content", ""),
"classification": message_data.get("classification", {}),
"author": message_data.get("author", {})
}
)
session_id = message_data.get("session_id") # Try to reuse if provided
# Attempt to load existing state for this session, else create new
agent_state = None
if session_id:
agent_state = await self.load_agent_state(session_id)
if not agent_state:
session_id = str(uuid.uuid4())
agent_state = AgentState(
session_id=session_id,
user_id=message_data.get("user_id", ""),
platform=message_data.get("platform", "discord"),
thread_id=message_data.get("thread_id"),
channel_id=message_data.get("channel_id"),
context={
"original_message": message_data.get("content", ""),
"classification": message_data.get("classification", {}),
"author": message_data.get("author", {})
}
)
# Ingest the current user message into the conversation history
from datetime import datetime
agent_state.messages.append({
"role": "user",
"content": message_data.get("content", ""),
"timestamp": datetime.utcnow().isoformat()
})
🤖 Prompt for AI Agents
In backend/app/core/orchestration/agent_coordinator.py around lines 46 to 67,
when loading or creating the AgentState the incoming user message is never
appended to agent_state.messages so resumed sessions miss the latest user input;
fix by importing datetime at the top of the file and immediately after loading
or creating agent_state, append a new message entry to agent_state.messages
containing the message role (e.g., "user"), the message text from
message_data.get("content", ""), and a timestamp (e.g.,
datetime.datetime.utcnow().isoformat()), then persist or continue with the
updated agent_state so the latest input is included in the conversation history
before running the agent graph.

Comment on lines +48 to +67
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Critical: latest inbound user message is never appended to AgentState.messages.

On resumed sessions, downstream nodes will keep seeing the stale context["original_message"] instead of the new user input. This breaks conversation continuity and HIL prompts.

Insert ingestion just after the new-or-loaded state is established (right before the log at Line 68):

             if not agent_state:
                 session_id = str(uuid.uuid4())
                 agent_state = AgentState(
                     session_id=session_id,
                     user_id=message_data.get("user_id", ""),
                     platform=message_data.get("platform", "discord"),
                     thread_id=message_data.get("thread_id"),
                     channel_id=message_data.get("channel_id"),
                     context={
                         "original_message": message_data.get("content", ""),
                         "classification": message_data.get("classification", {}),
                         "author": message_data.get("author", {})
                     }
                 )
 
+            # Ingest the current user message into the conversation history
+            agent_state.messages.append({
+                "role": "user",
+                "content": message_data.get("content", ""),
+                "timestamp": datetime.utcnow().isoformat()
+            })
+            agent_state.interaction_count += 1
+            agent_state.last_interaction_time = datetime.utcnow()

Outside this hunk, add the import:

from datetime import datetime

logger.info(
f"Running DevRel agent for session {agent_state.session_id} with memory thread {memory_thread_id}")
result_state = await self.devrel_agent.run(agent_state, memory_thread_id)

Comment on lines +68 to +71
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Verify alignment with DevRelAgent’s InMemorySaver memory.

Per your prior design (learned context: DevRelAgent uses an InMemorySaver with thread-based conversation management), ensure the ingested message also flows into the DevRelAgent’s thread memory keyed by memory_thread_id. If DevRelAgent.run(...) does not push this message into its memory, you may see divergence between persisted state and in-memory tool memory.


🏁 Script executed:

#!/bin/bash
# 1) Confirm where DevRelAgent persists/reads conversation memory for a thread
rg -nP -C3 '\bInMemorySaver\b|\bConversationBuffer\b|\bmemory\b|\bthread\b' backend/app

# 2) Confirm whether the inbound message is appended to DevRelAgent memory on each request
rg -nP -C3 '\b(messages|state\.messages)\b.*(append|add|extend)|add_message|HumanMessage' backend/app

# 3) Verify how memory_thread_id is used to read/write memory
rg -nP -C3 '\bmemory_thread_id\b' backend/app

Length of output: 45649


Ensure incoming messages are saved to DevRelAgent’s thread memory

Our grep of the codebase shows that while DevRelAgent instantiates an InMemorySaver (self.checkpointer = InMemorySaver()), there are no calls to push new messages into that memory before invoking the agent graph. Likewise, in agent_coordinator, we did not find evidence of appending the inbound message to the state passed into DevRelAgent.run(). Without explicitly saving each incoming human message into the InMemorySaver (or otherwise wiring it into the StateGraph), the in-memory thread history will diverge from what’s persisted.

• In backend/app/agents/devrel/agent.py, verify that _build_graph() registers self.checkpointer with the graph and that run() or its pre‐invoke logic calls into self.checkpointer (e.g., saving the updated message list).
• In backend/app/core/orchestration/agent_coordinator.py, confirm that you append the latest user message to agent_state.messages (or equivalent) before calling self.devrel_agent.run(...), so that the graph sees the new input.
• Add or update code to explicitly save or push the inbound message into the InMemorySaver–backed graph state (for example, via a checkpointer.save_state(thread_id, agent_state.model_dump()) or using graph‐provided memory nodes) before ainvoke.

Tagging as critical: until message ingestion into thread memory is implemented, the DevRelAgent will not maintain a consistent conversation history.

🤖 Prompt for AI Agents
In backend/app/core/orchestration/agent_coordinator.py around lines 68-71, the
inbound user message is not being appended to the agent_state or saved into the
DevRelAgent's InMemorySaver before invoking the agent; update the coordinator to
append the latest human message to agent_state.messages (or the equivalent
field) and explicitly persist it to the DevRelAgent's thread memory (e.g., call
the agent's checkpointer.save_state(thread_id, agent_state.model_dump()) or use
the graph's memory node API) immediately before calling await
self.devrel_agent.run(agent_state, memory_thread_id); also verify in
backend/app/agents/devrel/agent.py that _build_graph registers self.checkpointer
with the graph and that run() (or its pre-invoke hook) either saves the incoming
message list to the checkpointer or reads from the saved state so in-memory
thread history and persisted state remain consistent.

# Save updated state after agent run
await self.save_agent_state(result_state)

# Handle memory timeout if reached
if result_state.memory_timeout_reached:
await self._handle_memory_timeout(memory_thread_id, result_state)

# Send response back to platform
# Send response back to platform if present
if result_state.final_response:
await self._send_response_to_platform(message_data, result_state.final_response)

Expand Down Expand Up @@ -111,7 +134,7 @@ async def _send_response_to_platform(self, original_message: Dict[str, Any], res
"thread_id": original_message.get("thread_id"),
"channel_id": original_message.get("channel_id"),
"response": response,
"original_message_id": original_message.get("id")
"original_message_id": original_message.get("id"),
}

await self.queue_manager.enqueue(response_message)
Expand All @@ -121,4 +144,10 @@ async def _send_response_to_platform(self, original_message: Dict[str, Any], res

async def _send_error_response(self, original_message: Dict[str, Any], error_message: str):
"""Send error response to platform"""
await self._send_response_to_platform(original_message, error_message)
session_id = original_message.get("session_id") or str(uuid.uuid4())
await self._send_response_to_platform(
original_message,
error_message,
session_id=session_id,
context={}
)
64 changes: 64 additions & 0 deletions backend/app/database/weaviate/operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from datetime import datetime, timezone
from app.models.database.weaviate import WeaviateUserProfile
from app.database.weaviate.client import get_weaviate_client
from app.agents.state import AgentState
import weaviate.exceptions as weaviate_exceptions
import weaviate.classes as wvc
from weaviate.classes.query import Filter
Expand Down Expand Up @@ -377,3 +378,66 @@ async def search_contributors(
return await operations.hybrid_search_contributors(
query_embedding, keywords, limit, vector_weight, bm25_weight
)
class WeaviateAgentStateOperations:
    """CRUD helpers for persisting AgentState objects in a Weaviate collection.

    One object is stored per ``session_id``. The ``messages`` and ``context``
    fields are JSON-serialized into TEXT properties (matching the
    ``agent_states`` schema). Every method catches all exceptions, logs them,
    and returns a failure value (``None``/``False``) instead of raising.

    NOTE(review): these methods call ``json.loads``/``json.dumps``; a
    module-level ``import json`` is assumed but not visible in this view —
    confirm it exists at the top of the file.
    """

    def __init__(self, collection_name: str = "agent_states"):
        # Name of the Weaviate collection holding serialized agent states.
        self.collection_name = collection_name

    async def find_state_by_session_id(self, session_id: str) -> Optional[str]:
        """Return the Weaviate object UUID for *session_id*.

        Returns ``None`` when no matching object exists or on any
        Weaviate error (errors are logged, not raised).
        """
        try:
            async with get_weaviate_client() as client:
                collection = client.collections.get(self.collection_name)
                response = await collection.query.fetch_objects(
                    filters=Filter.by_property("session_id").equal(session_id),
                    limit=1
                )
                if response.objects:
                    return str(response.objects[0].uuid)
                return None
        except Exception as e:
            logger.error(f"Weaviate error finding agent state by session_id: {e}")
            return None

    async def load_agent_state(self, session_id: str) -> Optional[AgentState]:
        """Fetch and deserialize the persisted AgentState for *session_id*.

        Reverses the serialization done by :meth:`create_or_update_agent_state`:
        the ``messages`` and ``context`` TEXT properties are JSON-decoded
        before validating the properties into an ``AgentState``.
        Returns ``None`` when absent or on any error (logged, not raised).
        """
        try:
            async with get_weaviate_client() as client:
                collection = client.collections.get(self.collection_name)
                response = await collection.query.fetch_objects(
                    filters=Filter.by_property("session_id").equal(session_id),
                    limit=1
                )
                if response.objects:
                    prop = response.objects[0].properties
                    # Deserialize JSON fields stored as TEXT in Weaviate.
                    prop['messages'] = json.loads(prop.get('messages', '[]'))
                    prop['context'] = json.loads(prop.get('context', '{}'))
                    # NOTE(review): assumes the stored properties validate
                    # cleanly against AgentState — confirm extra/missing
                    # properties don't break model_validate.
                    return AgentState.model_validate(prop)
                return None
        except Exception as e:
            logger.error(f"Weaviate error loading agent state: {e}")
            return None

    async def create_or_update_agent_state(self, agent_state: AgentState) -> bool:
        """Upsert *agent_state*, keyed by its ``session_id``.

        ``messages`` and ``context`` are JSON-encoded so they fit the TEXT
        properties of the schema. Performs a lookup first: an existing
        object is updated in place, otherwise a new object is inserted.
        Returns ``True`` on success, ``False`` on any error (logged).
        """
        try:
            state_dict = agent_state.model_dump()
            # Serialize complex fields into JSON strings for TEXT properties.
            state_dict['messages'] = json.dumps(state_dict.get('messages', []))
            state_dict['context'] = json.dumps(state_dict.get('context', {}))

            existing_uuid = await self.find_state_by_session_id(agent_state.session_id)
            async with get_weaviate_client() as client:
                collection = client.collections.get(self.collection_name)
                if existing_uuid:
                    await collection.data.update(
                        uuid=existing_uuid,
                        properties=state_dict
                    )
                    logger.info(f"Updated agent state for session {agent_state.session_id}")
                else:
                    await collection.data.insert(
                        properties=state_dict
                    )
                    logger.info(f"Created new agent state for session {agent_state.session_id}")
            return True
        except Exception as e:
            logger.error(f"Weaviate error creating/updating agent state: {e}")
            return False
12 changes: 12 additions & 0 deletions backend/app/database/weaviate/scripts/create_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ async def create_user_profile_schema(client):
]
await create_schema(client, "weaviate_user_profile", properties)

async def create_agent_state_schema(client):
    """Create the Weaviate schema backing persisted AgentState objects.

    Properties mirror the fields serialized by
    ``WeaviateAgentStateOperations.create_or_update_agent_state``: scalar
    identifiers are stored as TEXT, while ``messages`` and ``context`` hold
    JSON-encoded strings.

    Args:
        client: Connected Weaviate client, passed through to ``create_schema``.
    """
    properties = [
        wc.Property(name="session_id", data_type=wc.DataType.TEXT),
        wc.Property(name="user_id", data_type=wc.DataType.TEXT),
        wc.Property(name="platform", data_type=wc.DataType.TEXT),
        # The coordinator sets thread_id/channel_id on AgentState and
        # current_task is updated by the agent nodes; all three are included
        # in model_dump(), so the schema must declare them or the persisted
        # round-trip would drop/reject them.
        wc.Property(name="thread_id", data_type=wc.DataType.TEXT),
        wc.Property(name="channel_id", data_type=wc.DataType.TEXT),
        wc.Property(name="current_task", data_type=wc.DataType.TEXT),
        # JSON-serialized conversation history and arbitrary context.
        wc.Property(name="messages", data_type=wc.DataType.TEXT),
        wc.Property(name="context", data_type=wc.DataType.TEXT),
        # NOTE(review): AgentState may carry further fields (e.g.
        # final_response); confirm against app.agents.state.AgentState and
        # extend this list as needed.
    ]
    await create_schema(client, "agent_states", properties)

async def create_all_schemas():
"""
    Create the user profile and agent state schemas as per the model structure.
Expand All @@ -42,6 +53,7 @@ async def create_all_schemas():
try:
await client.connect()
await create_user_profile_schema(client)
await create_agent_state_schema(client)
print("✅ User profile schema created successfully.")
except Exception as e:
print(f"❌ Error creating schema: {str(e)}")
Expand Down
2 changes: 1 addition & 1 deletion backend/app/services/github/issue_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

from app.core.config import settings
from app.services.embedding_service.service import EmbeddingService
from app.services.github.user.profiling import GitHubUserProfiler
from app.agents.devrel.github.prompts.contributor_recommendation.issue_summarization import ISSUE_SUMMARIZATION_PROMPT

logger = logging.getLogger(__name__)
Expand All @@ -31,6 +30,7 @@ async def fetch_issue_content(self) -> str:
"""
Fetches and consolidates all text content from a GitHub issue.
"""
from app.services.github.user.profiling import GitHubUserProfiler
logger.info(f"Fetching content for {self.owner}/{self.repo}#{self.issue_number}")
async with GitHubUserProfiler() as profiler:
issue_url = f"{profiler.base_url}/repos/{self.owner}/{self.repo}/issues/{self.issue_number}"
Expand Down
Loading