52 changes: 52 additions & 0 deletions libs/langchain-mongodb/langchain_mongodb/index.py
@@ -242,3 +242,55 @@ def create_fulltext_search_index(
        timeout=wait_until_complete,
    )
    logger.info(result)


def create_autoembedded_vector_search_index(
    collection: Collection,
    index_name: str,
    path: str,
    model: str,
    filters: Optional[List[str]] = None,
    wait_until_complete: Optional[float] = None,
    **kwargs: Any,
) -> None:
    """Experimental utility to create a vector search index with automatic embedding.

    Args:
        collection (Collection): MongoDB Collection.
        index_name (str): Name of the index.
        path (str): Field containing the strings from which embedding vectors are produced.
        model (str): Name of the embedding model Atlas uses to embed the field.
        filters (Optional[List[str]]): Fields/paths to index to allow filtering in $vectorSearch.
        wait_until_complete (Optional[float]): If provided, number of seconds to wait
            until the search index is ready.
        kwargs: Keyword arguments supplying any additional options to SearchIndexModel.
    """
logger.info("Creating Search Index %s on %s", index_name, collection.name)

if collection.name not in collection.database.list_collection_names():
collection.database.create_collection(collection.name)

fields = [{
"type": "text",
"model": model,
"path": path,
}]
if filters:
for field in filters:
fields.append({"type": "filter", "path": field})
definition = {"fields": fields}

result = collection.create_search_index(
SearchIndexModel(
definition=definition,
name=index_name,
type="vectorSearch",
)
)

if wait_until_complete:
_wait_for_predicate(
predicate=lambda: _is_index_ready(collection, index_name),
err=f"{index_name=} did not complete in {wait_until_complete}!",
timeout=wait_until_complete,
)
logger.info(result)
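
For illustration, a minimal sketch of how this helper might be invoked; the URI, database, collection, and filter path are placeholders, and it assumes an Atlas cluster that supports auto-embedding indexes:

from pymongo import MongoClient

from langchain_mongodb.index import create_autoembedded_vector_search_index

client = MongoClient("<YOUR-CONNECTION-STRING>")
collection = client["<db_name>"]["<collection_name>"]

# Index the "text" field; Atlas generates the embedding vectors server-side
# with the named model, so no embedding client is needed here.
create_autoembedded_vector_search_index(
    collection=collection,
    index_name="vector_index",
    path="text",
    model="voyage-3-large",
    filters=["metadata.source"],  # optional paths made filterable in $vectorSearch
    wait_until_complete=60,
)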
290 changes: 289 additions & 1 deletion libs/langchain-mongodb/langchain_mongodb/vectorstores.py
@@ -24,10 +24,12 @@
from pymongo import MongoClient, ReplaceOne
from pymongo.collection import Collection
from pymongo.errors import CollectionInvalid
from langchain_voyageai import VoyageAIEmbeddings

from langchain_mongodb.index import (
    create_autoembedded_vector_search_index,
    create_vector_search_index,
    update_vector_search_index,
)
from langchain_mongodb.pipelines import vector_search_stage
from langchain_mongodb.utils import (
@@ -46,6 +48,10 @@
DEFAULT_INSERT_BATCH_SIZE = 100


# TODO: fold the auto-embedding indexes into MongoDBAtlasVectorSearch.
# Add a flag carrying the type: e.g. self.embedding_type in ["manual", "auto"], or autoembedding (bool)
# - OR add a model (str) param and infer the type: e.g. if embedding is not None: self.embedding_type = "manual"
# - assert not (embedding and model)
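# A hypothetical sketch of the inference described above (illustration only,
# not part of this change):
#
#     if embedding is not None and model is not None:
#         raise ValueError("Provide either 'embedding' or 'model', not both.")
#     self.embedding_type = "manual" if embedding is not None else "auto"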
class MongoDBAtlasVectorSearch(VectorStore):
"""MongoDB Atlas vector store integration.

@@ -202,7 +208,7 @@ class MongoDBAtlasVectorSearch(VectorStore):
    def __init__(
        self,
        collection: Collection[Dict[str, Any]],
        embedding: Optional[Embeddings] = None,
        index_name: str = "vector_index",
        text_key: Union[str, List[str]] = "text",
        embedding_key: str = "embedding",
@@ -865,3 +871,285 @@ def create_vector_search_index(
            wait_until_complete=wait_until_complete,
            **kwargs,
        )  # type: ignore [operator]


class AutoEmbeddingVectorStore(VectorStore):
    """Automated embedding in Atlas Vector Search."""

    def __init__(
        self,
        collection: Collection[Dict[str, Any]],
        index_name: str = "vector_index",
        text_key: Union[str, List[str]] = "text",
        model: str = "voyage-3-large",
        auto_create_index: Optional[bool] = None,
        auto_index_timeout: int = 15,
        **kwargs: Any,
    ):
        self._collection = collection
        self._index_name = index_name
        self._text_key = text_key if isinstance(text_key, str) else text_key[0]
        self._model = model

        # append_metadata was added in PyMongo 4.14.0, but is a valid database name on earlier versions
        _append_client_metadata(self._collection.database.client)

        if auto_create_index is False:
            return
        if not any(ix["name"] == index_name for ix in collection.list_search_indexes()):
            create_autoembedded_vector_search_index(
                collection=collection,
                index_name=index_name,
                path=self._text_key,
                model=model,
                wait_until_complete=auto_index_timeout,
                **kwargs,
            )

    @classmethod
    def from_connection_string(
        cls,
        connection_string: str,
        database_name: str,
        collection_name: str,
        **kwargs: Any,
    ) -> AutoEmbeddingVectorStore:
        """Construct an auto-embedding vector store from a MongoDB connection URI.

        Args:
            connection_string: A valid MongoDB connection URI.
            database_name: Will use this database or create a new one.
            collection_name: Will use this collection or create a new one.

        Returns:
            A new AutoEmbeddingVectorStore instance.
        """
        client: MongoClient = MongoClient(
            connection_string,
            driver=DRIVER_METADATA,
        )
        collection = client[database_name][collection_name]
        return cls(collection, **kwargs)

    @property
    def collection(self) -> Collection[Dict[str, Any]]:
        return self._collection

    def close(self) -> None:
        """Close the resources used by the AutoEmbeddingVectorStore."""
        self._collection.database.client.close()


    def bulk_embed_and_insert_texts(
        self,
        texts: Union[List[str], Iterable[str]],
        metadatas: Union[List[dict], Generator[dict, Any, Any]],
        ids: Optional[List[str]] = None,
    ) -> List[str]:
        """Bulk insert a single batch of texts, metadatas, and optionally ids.

        Embedding happens server-side in Atlas, so no vectors are computed here.
        See add_texts for additional details.
        """
        texts = list(texts)  # materialize once so it can be sized and iterated
        if not texts:
            return []

        if not ids:
            ids = [str(ObjectId()) for _ in range(len(texts))]
        docs = [
            {
                "_id": str_to_oid(i),
                self._text_key: t,
                **m,
            }
            for i, t, m in zip(ids, texts, metadatas)
        ]
        operations = [ReplaceOne({"_id": doc["_id"]}, doc, upsert=True) for doc in docs]
        # Insert the documents in MongoDB Atlas.
        result = self._collection.bulk_write(operations)
        assert result.upserted_ids is not None
        return [oid_to_str(_id) for _id in result.upserted_ids.values()]

    def add_documents(
        self,
        documents: List[Document],
        ids: Optional[List[str]] = None,
        batch_size: int = DEFAULT_INSERT_BATCH_SIZE,
        **kwargs: Any,
    ) -> List[str]:
        """Add documents to the vectorstore.

        Args:
            documents: Documents to add to the vectorstore.
            ids: Optional list of unique ids that will be used as index in VectorStore.
                See note on ids in add_texts.
            batch_size: Number of documents to insert at a time.
                Tuning this may help with performance and sidestep MongoDB limits.

        Returns:
            List of IDs of the added texts.
        """
        n_docs = len(documents)
        if ids:
            assert len(ids) == n_docs, "Number of ids must equal number of documents."
        else:
            ids = [doc.id or str(ObjectId()) for doc in documents]
        result_ids = []
        start = 0
        for end in range(batch_size, n_docs + batch_size, batch_size):
            texts, metadatas = zip(
                *[(doc.page_content, doc.metadata) for doc in documents[start:end]]
            )
            result_ids.extend(
                self.bulk_embed_and_insert_texts(
                    texts=texts, metadatas=metadatas, ids=ids[start:end]
                )
            )
            start = end
        return result_ids

    def similarity_search_with_score(
        self,
        query: str,
        k: int = 4,
        pre_filter: Optional[Dict[str, Any]] = None,
        post_filter_pipeline: Optional[List[Dict]] = None,
        oversampling_factor: int = 10,
        include_embeddings: bool = False,
        **kwargs: Any,
    ) -> List[Tuple[Document, float]]:
        """Return documents most similar to the query, along with their scores.

        Runs Atlas Vector Search against the auto-embedding index, so the
        query string is embedded server-side.
        """
        search_defn = {
            "query": query,
            "index": self._index_name,
            "path": self._text_key,
            "limit": k,
            "numCandidates": k * oversampling_factor,
        }
        if pre_filter:
            search_defn["filter"] = pre_filter

        pipeline = [
            {"$vectorSearch": search_defn},
            {"$set": {"score": {"$meta": "vectorSearchScore"}}},
        ]

        # Post-processing
        if post_filter_pipeline is not None:
            pipeline.extend(post_filter_pipeline)

        # Execution
        cursor = self._collection.aggregate(pipeline)  # type: ignore[arg-type]
        docs = []

        # Format
        for res in cursor:
            if self._text_key not in res:
                continue
            text = res.pop(self._text_key)
            score = res.pop("score")
            make_serializable(res)
            docs.append(
                (Document(page_content=text, metadata=res, id=res["_id"]), score)
            )
        return docs
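
    # For reference, the aggregation issued above looks roughly like this
    # (a sketch using the class defaults index_name="vector_index" and
    # text_key="text", with k=4 and oversampling_factor=10):
    #
    #     [
    #         {"$vectorSearch": {"query": "<query text>", "index": "vector_index",
    #                            "path": "text", "limit": 4, "numCandidates": 40}},
    #         {"$set": {"score": {"$meta": "vectorSearchScore"}}},
    #     ]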

    @classmethod
    def from_texts(
        cls,
        texts: List[str],
        embedding: Optional[Embeddings] = None,  # e.g. VoyageAIEmbeddings(model="voyage-3-large")
        metadatas: Optional[List[Dict]] = None,
        collection: Optional[Collection] = None,
        ids: Optional[List[str]] = None,
        **kwargs: Any,
    ) -> AutoEmbeddingVectorStore:
        """Construct an auto-embedding vector store from raw documents.

        This is a user-friendly interface that:
        1. Adds the documents to a provided MongoDB Atlas collection.
        2. Lets Atlas embed them server-side via the auto-embedding index.

        This is intended to be a quick way to get started.

        See `AutoEmbeddingVectorStore` for kwargs and further description.

        Example:
            .. code-block:: python
                from pymongo import MongoClient

                from langchain_mongodb.vectorstores import AutoEmbeddingVectorStore

                mongo_client = MongoClient("<YOUR-CONNECTION-STRING>")
                collection = mongo_client["<db_name>"]["<collection_name>"]
                vectorstore = AutoEmbeddingVectorStore.from_texts(
                    texts,
                    metadatas=metadatas,
                    collection=collection,
                )
        """
        if collection is None:
            raise ValueError("Must provide 'collection' named parameter.")
        if embedding is not None:
            kwargs["model"] = embedding.model
        vectorstore = cls(collection, **kwargs)
        vectorstore.add_texts(texts=texts, metadatas=metadatas, ids=ids, **kwargs)
        return vectorstore

    def similarity_search(
        self,
        query: str,
        k: int = 4,
        pre_filter: Optional[Dict[str, Any]] = None,
        post_filter_pipeline: Optional[List[Dict]] = None,
        oversampling_factor: int = 10,
        include_scores: bool = False,
        include_embeddings: bool = False,
        **kwargs: Any,
    ) -> List[Document]:  # noqa: E501
        """Return MongoDB documents most similar to the given query.

        Atlas Vector Search eliminates the need to run a separate
        search system alongside your database.

        Args:
            query: Input text of the semantic query.
            k: (Optional) Number of documents to return. Defaults to 4.
            pre_filter: (Optional) MQL match expression comparing indexed fields.
            post_filter_pipeline: (Optional) Pipeline of MongoDB aggregation stages
                to filter/process results after $vectorSearch.
            oversampling_factor: Multiple of k used to generate the number of
                candidates at each step in the HNSW vector search.
            include_scores: If True, the query score of each result
                will be included in metadata.
            include_embeddings: If True, the embedding vector of each result
                will be included in metadata.
            kwargs: Additional arguments passed to similarity_search_with_score.

        Returns:
            List of documents most similar to the query.
        """
        docs_and_scores = self.similarity_search_with_score(
            query,
            k=k,
            pre_filter=pre_filter,
            post_filter_pipeline=post_filter_pipeline,
            oversampling_factor=oversampling_factor,
            include_embeddings=include_embeddings,
            **kwargs,
        )

        if include_scores:
            for doc, score in docs_and_scores:
                doc.metadata["score"] = score
        return [doc for doc, _ in docs_and_scores]
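
Taken together, a rough end-to-end sketch of the new class; the URI and names are placeholders, and it assumes an Atlas cluster that supports auto-embedding indexes:

from langchain_core.documents import Document

from langchain_mongodb.vectorstores import AutoEmbeddingVectorStore

store = AutoEmbeddingVectorStore.from_connection_string(
    "<YOUR-CONNECTION-STRING>",
    database_name="<db_name>",
    collection_name="<collection_name>",
    model="voyage-3-large",
)

# Atlas embeds the page_content server-side; no Embeddings object is passed.
store.add_documents(
    [
        Document(page_content="MongoDB Atlas supports vector search."),
        Document(page_content="Auto-embedding indexes embed text on the server."),
    ]
)

for doc in store.similarity_search("who embeds the text?", k=2, include_scores=True):
    print(doc.metadata["score"], doc.page_content)

store.close()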
1 change: 1 addition & 0 deletions libs/langchain-mongodb/pyproject.toml
@@ -16,6 +16,7 @@ dependencies = [
"langchain-text-splitters>=0.3",
"numpy>=1.26",
"lark<2.0.0,>=1.1.9",
"langchain-voyageai>=0.1.7",
]

[dependency-groups]