# --- examples/memory_pipeline.py (new file) ---
"""Example: an agent pipeline that reads from and writes to a Mem0-backed memory store."""

import os

from haystack.components.agents import Agent
from haystack.components.builders.chat_prompt_builder import ChatPromptBuilder
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.core.pipeline import Pipeline
from haystack.dataclasses import ChatMessage
from haystack.tools import tool

from haystack_experimental.memory import Mem0MemoryRetriever, Mem0MemoryStore, MemoryWriter


# NOTE: the docstrings of the two @tool functions are left untouched on purpose:
# Haystack's `@tool` decorator uses the function docstring as the tool description
# that is sent to the LLM, so they are runtime text rather than documentation.
@tool
def save_user_preference(preference_type: str, preference_value: str) -> str:
    """Save user preferences that should be remembered"""
    return f"✅ Saved preference: {preference_type} = {preference_value}"


@tool
def get_recommendation(category: str) -> str:
    """Get personalized recommendations based on user preferences"""
    recommendations = {
        "food": "Based on your preferences, try the Mediterranean cuisine!",
        "music": "I recommend some jazz playlists for you!",
        "books": "You might enjoy science fiction novels!",
    }
    # Unknown categories fall back to a generic answer instead of raising.
    return recommendations.get(category, "I'll learn your preferences to give better recommendations!")


# Create memory store
memory_store = Mem0MemoryStore(api_key=os.getenv("MEM0_API_KEY"))

# Create memory-aware agent
memory_agent = Agent(
    chat_generator=OpenAIChatGenerator(model="gpt-4o-mini"),
    tools=[save_user_preference, get_recommendation],
    system_prompt="""
    You are a personal assistant with memory capabilities.
    Use the provided memories to personalize your responses and remember user context.
    When users share preferences, use the save_user_preference tool.
    When asked for recommendations, use the get_recommendation tool.
    Be conversational and reference previous interactions when relevant.
    """,
    exit_conditions=["text"],
    max_agent_steps=10,
    raise_on_tool_invocation_failure=False,
)

# Create the pipeline
agent_memory_pipeline = Pipeline()

# Add components
agent_memory_pipeline.add_component(
    "memory_retriever",
    Mem0MemoryRetriever(memory_store=memory_store, top_k=5),
)

agent_memory_pipeline.add_component(
    "prompt_builder",
    ChatPromptBuilder(
        template=[
            ChatMessage.from_system(
                "Previous conversation context:\n"
                "{% for memory in memories %}"
                # ChatMessage exposes its text via `.text`; `.content` is not an
                # attribute and would render as an empty string in the template.
                "{{ memory.text }}\n"
                "{% endfor %}"
                "{% if not memories %}No previous context available.{% endif %}"
            ),
            ChatMessage.from_user("{{ user_query }}"),
        ],
        required_variables=["user_query"],
    ),
)

agent_memory_pipeline.add_component("agent", memory_agent)
agent_memory_pipeline.add_component("memory_writer", MemoryWriter(memory_store=memory_store))

# Connect components: retrieved memories -> prompt -> agent -> memory writer
agent_memory_pipeline.connect("memory_retriever.memories", "prompt_builder.memories")
agent_memory_pipeline.connect("prompt_builder.prompt", "agent.messages")
agent_memory_pipeline.connect("agent.messages", "memory_writer.messages")

# Run the pipeline
user_id = "alice_123"
user_query = "Can you remember this and give me a food recommendation?"

# Get memories and run agent. The same user_id scopes both the memory lookup and
# the write-back of the resulting conversation.
agent_output = agent_memory_pipeline.run(
    {
        "memory_retriever": {"query": user_query, "user_id": user_id},
        "prompt_builder": {"user_query": user_query},
        "memory_writer": {"user_id": user_id},
    }
)

# --- haystack_experimental/memory/README.md (new file) ---
# # Memory Components
#
# This directory contains Haystack components for implementing agent memory capabilities.
#
# ## Components
#
# ### MemoryStore Protocol (`protocol.py`)
# Defines the interface for pluggable memory storage backends.
#
# ### Mem0MemoryStore (`mem0_store.py`)
# Implementation using Mem0 as the backend storage service.
#
# ### Mem0MemoryRetriever (`memory_retriever.py`)
# Component for retrieving relevant memories based on queries.
#
# ### MemoryWriter (`memory_writer.py`)
# Component for storing chat messages as memories.
#
# ## Usage
#
# ### Examples
#
# 1. **Basic Memory Pipeline** (`examples/memory_pipeline_example.py`)
#    - Simple memory storage and retrieval
#    - Different memory types demonstration
#
# 2. **Agent Memory Integration** (`examples/agent_memory_pipeline_example.py`)
#    - Complete agent with memory capabilities
#    - Memory-aware conversations
#    - Preference learning and recall
#    - Session persistence
#
# 3. **Simple Agent Memory** (`examples/simple_agent_memory_example.py`)
#    - Minimal agent memory integration
#    - Direct pipeline structure
#    - Easy to understand and modify
#
# ## Memory Types
#
# Memories are stored as ChatMessage objects with metadata:
# - `memory_type`: "semantic" (facts/preferences) or "episodic" (experiences)
# - `user_id`: User identifier for scoping
# - `memory_id`: Unique identifier (set by storage backend)
#
# ## Requirements
#
# - `pip install mem0ai` for Mem0MemoryStore
# - MEM0_API_KEY environment variable

# --- haystack_experimental/memory/__init__.py (new file) ---
# SPDX-FileCopyrightText: 2022-present deepset GmbH
# SPDX-License-Identifier: Apache-2.0

from .mem0_store import Mem0MemoryStore
from .memory_retriever import Mem0MemoryRetriever
from .memory_writer import MemoryWriter

__all__ = ["Mem0MemoryRetriever", "MemoryWriter", "Mem0MemoryStore"]

# --- haystack_experimental/memory/mem0_store.py (new file) ---
# SPDX-FileCopyrightText: 2022-present deepset GmbH
# SPDX-License-Identifier: Apache-2.0

import os
from datetime import datetime
from typing import Any, Optional

from haystack import default_from_dict, default_to_dict
from haystack.dataclasses.chat_message import ChatMessage
from haystack.lazy_imports import LazyImport

with LazyImport(message="Run 'pip install mem0ai'") as mem0_import:
    from mem0 import MemoryClient


class Mem0MemoryStore:
    """
    A memory store implementation using Mem0 as the backend.

    This store provides semantic and episodic memory capabilities for agents
    by integrating with the Mem0 memory service. Memories are stored as ChatMessage
    objects with memory-specific metadata.

    :param api_key: Mem0 API key (if not provided, uses MEM0_API_KEY environment variable)
    :param config: Optional client configuration. When given, the client is built with
        ``MemoryClient.from_config(config)`` and ``kwargs`` are not used.
    :param kwargs: Additional configuration parameters for Mem0 client
    :raises ValueError: If no API key is passed and MEM0_API_KEY is not set
    """

    def __init__(self, api_key: Optional[str] = None, config: Optional[dict[Any, Any]] = None, **kwargs):
        mem0_import.check()
        self.api_key = api_key or os.getenv("MEM0_API_KEY")
        if not self.api_key:
            raise ValueError("Mem0 API key must be provided either as parameter or MEM0_API_KEY environment variable")

        self.config = config
        self.client = (
            MemoryClient.from_config(self.config) if self.config else MemoryClient(api_key=self.api_key, **kwargs)
        )

    def to_dict(self) -> dict[str, Any]:
        """
        Serialize the store configuration to a dictionary.

        :returns: Dictionary with the ``api_key`` and ``config`` init parameters
        """
        # NOTE(review): this writes the API key in plain text into the serialized
        # pipeline, and extra ``kwargs`` passed to __init__ are not serialized.
        return default_to_dict(self, api_key=self.api_key, config=self.config)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Mem0MemoryStore":
        """
        Deserialize the store from a dictionary.

        :param data: Dictionary produced by :meth:`to_dict`
        :returns: A new store instance
        """
        return default_from_dict(cls, data)

    @staticmethod
    def _extract_memory_ids(result: Any) -> list[str]:
        """Collect memory IDs from an add-response that may be a dict, an envelope with "results", or a list."""
        if isinstance(result, dict) and isinstance(result.get("results"), list):
            entries = result["results"]
        elif isinstance(result, list):
            entries = result
        else:
            entries = [result]

        ids = []
        for entry in entries:
            if isinstance(entry, dict):
                memory_id = entry.get("id") or entry.get("memory_id")
                if memory_id:
                    ids.append(str(memory_id))
        # Same fallback as before: when no ID can be found, report the raw response.
        return ids or [str(result)]

    def add_memories(self, user_id: str, messages: list[ChatMessage]) -> list[str]:
        """
        Add ChatMessage memories to Mem0.

        Messages without text are skipped.

        :param user_id: User identifier associated with the memories
        :param messages: List of ChatMessage objects with memory metadata
        :returns: List of memory IDs for the added messages
        :raises RuntimeError: If the Mem0 client fails to add a message
        """
        added_ids: list[str] = []

        for message in messages:
            if not message.text:
                continue
            # Send the role as a plain string (as update_memories does), not the enum member.
            mem0_message = [{"role": message.role.value, "content": message.text}]

            try:
                # user_id is passed as the main identifier; everything else in the
                # message meta is forwarded as metadata.
                result = self.client.add(messages=mem0_message, user_id=user_id, metadata=message.meta, infer=False)
                added_ids.extend(self._extract_memory_ids(result))
            except Exception as e:
                raise RuntimeError(f"Failed to add memory message: {e}") from e

        return added_ids

    def search_memories(
        self,
        query: str,
        user_id: str,
        filters: Optional[dict[str, Any]] = None,
        top_k: int = 10,
    ) -> list[ChatMessage]:
        """
        Search for memories in Mem0.

        :param query: Text query to search for
        :param user_id: User identifier for scoping the search
        :param filters: Additional filters to apply on search. For more details on mem0 filters, see https://mem0.ai/docs/search/
        :param top_k: Maximum number of results to return
        :returns: List of ChatMessage memories matching the criteria (always created as assistant messages)
        :raises RuntimeError: If the search call or the parsing of its response fails
        """
        # Always scope by user; only add the caller's filters when there are any,
        # so that no empty clause is sent to Mem0.
        conditions: list[dict[str, Any]] = [{"user_id": user_id}]
        if filters:
            conditions.append(filters)
        mem0_filters = {"AND": conditions}

        try:
            response = self.client.search(query=query, limit=top_k, filters=mem0_filters, user_id=user_id)
            # The response is either a plain list of hits or an envelope with a "results" list.
            hits = response.get("results", []) if isinstance(response, dict) else response
            return [
                ChatMessage.from_assistant(
                    # Mem0 hits carry the text under "memory"; "content" is kept as a fallback.
                    text=hit.get("memory") or hit.get("content") or "",
                    meta=hit.get("metadata") or {},
                )
                for hit in hits
            ]
        except Exception as e:
            raise RuntimeError(f"Failed to search memories: {e}") from e

    def update_memories(self, messages: list[ChatMessage]) -> int:
        """
        Update ChatMessage memories in Mem0.

        :param messages: List of ChatMessage memories to update (must have memory_id in meta)
        :returns: Number of records actually updated
        :raises ValueError: If a message has no memory_id in its meta
        :raises RuntimeError: If the Mem0 client fails to update a memory
        """
        updated = 0

        for message in messages:
            memory_id = message.meta.get("memory_id")
            if not memory_id:
                raise ValueError("ChatMessage must have memory_id in meta to be updated")

            # Identifiers are not stored as metadata; keys from the message meta
            # override the generated "role" and "updated_at" entries.
            metadata = {
                "role": message.role.value,
                "updated_at": datetime.now().isoformat(),
                **{k: v for k, v in message.meta.items() if k not in ["memory_id", "user_id"]},
            }

            try:
                self.client.update(memory_id=memory_id, data=message.text or str(message), metadata=metadata)
            except Exception as e:
                raise RuntimeError(f"Failed to update memory {memory_id}: {e}") from e
            updated += 1

        return updated

    # mem0 doesn't allow passing filter to delete endpoint,
    # we can delete all memories for a user by passing the user_id
    def delete_all_memories(self, user_id: str):
        """
        Delete all memory records of a user from Mem0.

        :param user_id: User identifier for scoping the deletion
        :raises RuntimeError: If the Mem0 client fails to delete the memories
        """

        try:
            self.client.delete_all(user_id=user_id)
        except Exception as e:
            raise RuntimeError(f"Failed to delete memories for user {user_id}: {e}") from e


# --- haystack_experimental/memory/mem_super_component.py (new file) ---
from typing import Any, Optional

from haystack import Pipeline, SuperComponent, super_component
from haystack.components.agents import Agent
from haystack.components.builders.chat_prompt_builder import ChatPromptBuilder
from haystack.components.generators.chat.types import ChatGenerator
from haystack.dataclasses import ChatMessage, Document
from haystack.tools import Tool

from .mem0_store import Mem0MemoryStore
from .memory_retriever import Mem0MemoryRetriever
from .memory_writer import MemoryWriter


@super_component
class AgentMemory:
    """
    An agent wrapped with memory retrieval before, and memory writing after, each run.

    The internal pipeline is: memory_retriever -> prompt_builder -> agent -> memory_writer.

    :param system_prompt: System prompt for the agent
    :param tools: Tools the agent may call
    :param chat_generator: Chat generator used by the agent
    :param exit_conditions: Exit conditions passed to the agent
    :param max_agent_steps: Maximum number of agent steps
    :param raise_on_tool_invocation_failure: Whether tool failures raise
    :param memory_store: Store to read from and write to. Defaults to a new
        ``Mem0MemoryStore()``, which reads its key from MEM0_API_KEY.
    """

    def __init__(
        self,
        system_prompt: str,
        tools: list[Tool],
        chat_generator: ChatGenerator,
        exit_conditions: list[str],
        max_agent_steps: int,
        raise_on_tool_invocation_failure: bool,
        memory_store: Optional[Mem0MemoryStore] = None,
    ):
        if memory_store is None:
            memory_store = Mem0MemoryStore()
        memory_retriever = Mem0MemoryRetriever(memory_store=memory_store)
        memory_writer = MemoryWriter(memory_store=memory_store)
        agent = Agent(
            chat_generator=chat_generator,
            tools=tools,
            system_prompt=system_prompt,
            exit_conditions=exit_conditions,
            max_agent_steps=max_agent_steps,
            raise_on_tool_invocation_failure=raise_on_tool_invocation_failure,
        )
        pipeline = Pipeline()
        pipeline.add_component("memory_retriever", memory_retriever)

        pipeline.add_component(
            "prompt_builder",
            ChatPromptBuilder(
                template=[
                    ChatMessage.from_system(
                        "Previous conversation context:\n"
                        "{% for memory in memories %}"
                        # ChatMessage exposes its text via `.text`, not `.content`.
                        "{{ memory.text }}\n"
                        "{% endfor %}"
                        "{% if not memories %}No previous context available.{% endif %}"
                    ),
                    ChatMessage.from_user("{{ user_query }}"),
                ],
                required_variables=["user_query"],
            ),
        )

        pipeline.add_component("agent", agent)
        pipeline.add_component("memory_writer", memory_writer)

        # Connect components
        pipeline.connect("memory_retriever.memories", "prompt_builder.memories")
        pipeline.connect("prompt_builder.prompt", "agent.messages")
        pipeline.connect("agent.messages", "memory_writer.messages")

        # The decorator builds the SuperComponent from these three attributes.
        self.pipeline = pipeline
        # Keys are "<component name>.<output socket>" of the pipeline above.
        self.output_mapping = {
            "agent.messages": "messages",
            "memory_writer.memories_written": "memories_written",
        }
        # The query drives both the memory search and the user turn of the prompt;
        # the user_id scopes both the lookup and the write-back.
        self.input_mapping = {
            "query": ["memory_retriever.query", "prompt_builder.user_query"],
            "user_id": ["memory_retriever.user_id", "memory_writer.user_id"],
        }

    def run(self, *, query: str, user_id: str) -> dict[str, Any]:  # noqa: D102
        ...

    def warmup(self) -> None:  # noqa: D102
        ...
# --- haystack_experimental/memory/memory_retriever.py (new file) ---
# SPDX-FileCopyrightText: 2022-present deepset GmbH
# SPDX-License-Identifier: Apache-2.0

from typing import Any, Optional

from haystack import component, default_from_dict, default_to_dict
from haystack.dataclasses.chat_message import ChatMessage

from haystack_experimental.memory.mem0_store import Mem0MemoryStore


@component
class Mem0MemoryRetriever:
    """
    Retrieves relevant memories from a Mem0MemoryStore before agent interactions.

    This component searches for memories that are relevant to the current query
    and returns them as ChatMessage objects that can be used to provide context
    to agents for more personalized responses.

    Usage example:
    ```python
    from haystack_experimental.memory import Mem0MemoryRetriever, Mem0MemoryStore

    memory_store = Mem0MemoryStore(api_key="your-api-key")
    retriever = Mem0MemoryRetriever(memory_store=memory_store, top_k=5)

    result = retriever.run(
        query="What's my timezone preference?",
        user_id="user_123"
    )
    print(result["memories"])  # List of ChatMessage objects with memory metadata
    ```

    :param memory_store: The memory store to retrieve memories from
    :param top_k: Maximum number of memories to retrieve
    """

    def __init__(
        self,
        memory_store: Mem0MemoryStore,
        top_k: int = 10,
    ):
        self.memory_store = memory_store
        self.top_k = top_k

    def to_dict(self) -> dict[str, Any]:
        """
        Serialize the component to a dictionary.

        :returns: Dictionary with the serialized memory store and ``top_k``
        """
        return default_to_dict(
            self,
            memory_store=self.memory_store.to_dict(),
            top_k=self.top_k,
        )

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Mem0MemoryRetriever":
        """
        Deserialize the component from a dictionary.

        :param data: Dictionary produced by :meth:`to_dict`
        :returns: A new retriever with its memory store rebuilt
        """
        # to_dict() stores the memory store as a nested dict; rebuild the store
        # object here so that __init__ receives a store rather than a dict.
        # The caller's dictionaries are copied instead of being modified.
        init_params = dict(data.get("init_parameters", {}))
        store_data = init_params.get("memory_store")
        if isinstance(store_data, dict):
            init_params["memory_store"] = Mem0MemoryStore.from_dict(store_data)
        return default_from_dict(cls, {**data, "init_parameters": init_params})

    @component.output_types(memories=list[ChatMessage])
    def run(
        self,
        query: Optional[str] = None,
        user_id: Optional[str] = None,
        filters: Optional[dict[str, Any]] = None,
        top_k: Optional[int] = None,
    ) -> dict[str, Any]:
        """
        Retrieve relevant memories from the memory store.

        :param query: Text query to search for relevant memories. When missing or
            empty, no search is made and an empty list is returned.
        :param user_id: User identifier for scoping the search
        :param filters: Additional filters to apply to the search
        :param top_k: Maximum number of memories to retrieve (overrides default)

        :returns: Dictionary with "memories" key containing list of ChatMessage objects
        """
        if not query:
            # Nothing to search for.
            return {"memories": []}

        memories = self.memory_store.search_memories(
            query=query,
            user_id=user_id,
            filters=filters,
            # `is None` (not `or`) so that an explicit top_k=0 is not replaced by the default.
            top_k=self.top_k if top_k is None else top_k,
        )

        return {"memories": memories}


# --- haystack_experimental/memory/memory_writer.py (new file) ---
# SPDX-FileCopyrightText: 2022-present deepset GmbH
# SPDX-License-Identifier: Apache-2.0

from dataclasses import replace
from typing import Any, Optional

from haystack import component, default_from_dict, default_to_dict
from haystack.core.serialization import import_class_by_name
from haystack.dataclasses.chat_message import ChatMessage

from haystack_experimental.memory.protocol import MemoryStore


@component
class MemoryWriter:
    """
    Writes chat messages to a MemoryStore for long-term memory.

    This component processes chat messages and stores them as memories with
    appropriate metadata that can be retrieved later for personalized agent interactions.

    Usage example:
    ```python
    from haystack.dataclasses import ChatMessage
    from haystack_experimental.memory import Mem0MemoryStore, MemoryWriter

    memory_store = Mem0MemoryStore(api_key="your-api-key")
    writer = MemoryWriter(memory_store=memory_store)

    messages = [
        ChatMessage.from_user("I prefer dark mode", meta={"memory_type": "semantic"}),
        ChatMessage.from_assistant("I'll remember your preference", meta={"memory_type": "episodic"}),
    ]

    result = writer.run(
        messages=messages,
        user_id="user_123",
        metadata={"conversation_id": "conv_001"}  # Optional additional metadata
    )
    print(result["memories_written"])  # Number of memories written
    ```

    :param memory_store: The memory store to write memories to
    """

    def __init__(
        self,
        memory_store: MemoryStore,
    ):
        self.memory_store = memory_store

    def to_dict(self) -> dict[str, Any]:
        """
        Serialize the component to a dictionary.

        :returns: Dictionary with the serialized memory store
        """
        return default_to_dict(
            self,
            memory_store=self.memory_store.to_dict(),
        )

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "MemoryWriter":
        """
        Deserialize the component from a dictionary.

        :param data: Dictionary produced by :meth:`to_dict`
        :returns: A new writer with its memory store rebuilt
        """
        # The store is only known by protocol here, so its class is resolved from
        # the "type" entry that the store's own to_dict() wrote.
        init_params = dict(data.get("init_parameters", {}))
        store_data = init_params.get("memory_store")
        if isinstance(store_data, dict) and "type" in store_data:
            store_class = import_class_by_name(store_data["type"])
            init_params["memory_store"] = store_class.from_dict(store_data)
        return default_from_dict(cls, {**data, "init_parameters": init_params})

    @component.output_types(memories_written=int)
    def run(
        self,
        messages: list[ChatMessage],
        user_id: str,
        metadata: Optional[dict[str, Any]] = None,
    ) -> dict[str, Any]:
        """
        Write chat messages as memories to the memory store.

        :param messages: List of chat messages to store as memories
        :param user_id: User identifier for scoping the memories
        :param metadata: Additional metadata to attach to all memories. Its keys
            override keys already present in a message's meta.

        :returns: Dictionary with "memories_written" key containing the number of memories written
        :raises RuntimeError: If the memory store fails to write the memories
        """
        if metadata:
            # Copy each message with the merged meta. `_meta` is the dataclass field
            # behind `ChatMessage.meta`; the incoming messages are left unchanged.
            processed_messages = [replace(message, _meta={**message.meta, **metadata}) for message in messages]
        else:
            # No additional metadata: pass the original messages through.
            processed_messages = list(messages)

        # Write memories to store
        try:
            added_ids = self.memory_store.add_memories(user_id, processed_messages)
        except Exception as e:
            raise RuntimeError(f"Failed to write memories: {e}") from e
        return {"memories_written": len(added_ids)}


# --- haystack_experimental/memory/protocol.py (new file) ---
# SPDX-FileCopyrightText: 2022-present deepset GmbH
# SPDX-License-Identifier: Apache-2.0

from typing import Any, Optional, Protocol, runtime_checkable

from haystack.dataclasses.chat_message import ChatMessage


@runtime_checkable
class MemoryStore(Protocol):
    """
    Protocol for memory storage backends that can store and retrieve agent memories.

    This protocol defines the interface for pluggable memory stores that can be used
    with Mem0MemoryRetriever and MemoryWriter components. Memories are stored as ChatMessage
    objects with memory-specific metadata.
    """

    def to_dict(self) -> dict[str, Any]:
        """
        Serializes this store to a dictionary.

        :returns: Dictionary representation of the store configuration
        """
        ...

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "MemoryStore":
        """
        Deserializes the store from a dictionary.

        :param data: Dictionary containing store configuration
        :returns: Initialized MemoryStore instance
        """
        ...

    def add_memories(self, user_id: str, messages: list[ChatMessage]) -> list[str]:
        """
        Adds ChatMessage memories to the store.

        :param user_id: User identifier associated with the memories
        :param messages: List of ChatMessage objects with memory metadata
        :returns: List of memory IDs for the added messages
        """
        ...

    # NOTE(review): Mem0MemoryStore does not implement this method, so it does not
    # pass an isinstance() check against this runtime-checkable protocol.
    def filter_memories(self, user_id: str, filters: Optional[dict[Any, Any]]) -> list[str]:
        """
        Retrieve memories based on the filter.

        :param user_id: User identifier for scoping
        :param filters: Filters the returned memories must match
        :returns: The matching memories (annotated as a list of strings)
        """
        ...

    def search_memories(
        self,
        query: str,
        user_id: Optional[str] = None,
        filters: Optional[dict[str, Any]] = None,
        top_k: int = 10,
    ) -> list[ChatMessage]:
        """
        Searches for memories matching the criteria.

        :param query: Text query to search for
        :param user_id: User identifier for scoping
        :param filters: Additional filters to apply
        :param top_k: Maximum number of results to return
        :returns: List of ChatMessage memories matching the criteria
        """
        ...

    def update_memories(self, messages: list[ChatMessage]) -> int:
        """
        Updates existing memory messages in the store.

        :param messages: List of ChatMessage memories to update (must have memory_id in meta)
        :returns: Number of records actually updated
        """
        ...

    def delete_all_memories(self, user_id: str) -> None:
        """
        Deletes all memory records of a user.

        :param user_id: User identifier for scoping the deletion
        """
        ...