Skip to content

Commit 15d9c28

Browse files
Modernize Milvus API from deprecated connections to MilvusClient
- Replace deprecated connections.connect() with MilvusClient(uri=endpoint)
- Implement URI endpoint composition pattern for MilvusClient initialization
- Update MilvusVectorStore class to use modern MilvusClient methods
- Simplify schema creation using MilvusClient's streamlined approach
- Fix vector search by adding anns_field parameter specification
- Add proper resource cleanup with client.close() method
- Remove deprecated utility imports and Collection-based operations
- Ensure compatibility with current Milvus Python SDK
1 parent 1f9f871 commit 15d9c28

File tree

1 file changed

+98
-85
lines changed

1 file changed

+98
-85
lines changed

bootcamp/RAG/RAG_Milvus_LangChain_Anthropic.ipynb

Lines changed: 98 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -142,8 +142,8 @@
142142
"from langchain.schema import Document\n",
143143
"from langchain.document_loaders import TextLoader, PyPDFLoader, DirectoryLoader\n",
144144
"\n",
145-
"# Milvus imports\n",
146-
"from pymilvus import connections, Collection, CollectionSchema, FieldSchema, DataType, utility\n",
145+
"# Milvus imports - using modern MilvusClient only\n",
146+
"from pymilvus import MilvusClient\n",
147147
"\n",
148148
"# Embedding and LLM imports\n",
149149
"from sentence_transformers import SentenceTransformer\n",
@@ -193,7 +193,7 @@
193193
" top_k: int = 5\n",
194194
" \n",
195195
" # Anthropic configuration\n",
196-
" ANTHROPIC_API_KEY: Optional[str] = None\n",
196+
" ANTHROPIC_API_KEY: Optional[str] = \"your_api_key\"\n",
197197
" model_name: str = \"claude-sonnet-4-20250514\"\n",
198198
" max_tokens: int = 1000\n",
199199
" \n",
@@ -387,7 +387,7 @@
387387
"outputs": [],
388388
"source": [
389389
"class MilvusVectorStore:\n",
390-
" \"\"\"Handles Milvus vector database operations.\"\"\"\n",
390+
" \"\"\"Handles Milvus vector database operations using modern MilvusClient.\"\"\"\n",
391391
" \n",
392392
" def __init__(self, host: str = \"localhost\", port: str = \"19530\", collection_name: str = \"rag_documents\"):\n",
393393
" \"\"\"\n",
@@ -401,100 +401,91 @@
401401
" self.host = host\n",
402402
" self.port = port\n",
403403
" self.collection_name = collection_name\n",
404-
" self.collection = None\n",
405404
" \n",
406-
" # Connect to Milvus\n",
405+
" # Connect to Milvus using the modern MilvusClient\n",
407406
" self._connect()\n",
408407
" \n",
409408
" def _connect(self) -> None:\n",
410409
" \"\"\"\n",
411-
" Establish connection to Milvus server.\n",
410+
" Establish connection to Milvus server using MilvusClient.\n",
412411
" \"\"\"\n",
413412
" try:\n",
414-
" connections.connect(\"default\", host=self.host, port=self.port)\n",
415-
" logger.info(f\"Connected to Milvus at {self.host}:{self.port}\")\n",
413+
" # Use the modern MilvusClient with uri endpoint\n",
414+
" uri = f\"http://{self.host}:{self.port}\"\n",
415+
" self.client = MilvusClient(uri=uri)\n",
416+
" logger.info(f\"Connected to Milvus at {uri}\")\n",
416417
" except Exception as e:\n",
417418
" logger.error(f\"Failed to connect to Milvus: {e}\")\n",
418419
" raise\n",
419420
" \n",
420421
" def create_collection(self, embedding_dim: int) -> None:\n",
421422
" \"\"\"\n",
422-
" Create a new collection with the specified schema.\n",
423+
" Create a new collection using MilvusClient's simplified approach.\n",
423424
" \n",
424425
" Args:\n",
425426
" embedding_dim: Dimension of the embedding vectors\n",
426427
" \"\"\"\n",
427-
" # Define collection schema\n",
428-
" fields = [\n",
429-
" FieldSchema(name=\"id\", dtype=DataType.INT64, is_primary=True, auto_id=True),\n",
430-
" FieldSchema(name=\"text\", dtype=DataType.VARCHAR, max_length=65535),\n",
431-
" FieldSchema(name=\"embedding\", dtype=DataType.FLOAT_VECTOR, dim=embedding_dim),\n",
432-
" FieldSchema(name=\"metadata\", dtype=DataType.VARCHAR, max_length=65535)\n",
433-
" ]\n",
434-
" \n",
435-
" schema = CollectionSchema(fields, \"RAG document collection\")\n",
436-
" \n",
437-
" # Drop existing collection if it exists\n",
438-
" if utility.has_collection(self.collection_name):\n",
439-
" utility.drop_collection(self.collection_name)\n",
440-
" logger.info(f\"Dropped existing collection: {self.collection_name}\")\n",
441-
" \n",
442-
" # Create collection\n",
443-
" self.collection = Collection(self.collection_name, schema)\n",
444-
" logger.info(f\"Created collection: {self.collection_name}\")\n",
445-
" \n",
446-
" # Create index for vector field\n",
447-
" index_params = {\n",
448-
" \"metric_type\": \"COSINE\",\n",
449-
" \"index_type\": \"IVF_FLAT\",\n",
450-
" \"params\": {\"nlist\": 1024}\n",
451-
" }\n",
452-
" \n",
453-
" self.collection.create_index(\"embedding\", index_params)\n",
454-
" logger.info(\"Created index for embedding field\")\n",
428+
" try:\n",
429+
" # Drop existing collection if it exists\n",
430+
" if self.client.has_collection(self.collection_name):\n",
431+
" self.client.drop_collection(self.collection_name)\n",
432+
" logger.info(f\"Dropped existing collection: {self.collection_name}\")\n",
433+
" \n",
434+
" # MilvusClient uses a simplified schema creation approach\n",
435+
" self.client.create_collection(\n",
436+
" collection_name=self.collection_name,\n",
437+
" dimension=embedding_dim,\n",
438+
" metric_type=\"COSINE\",\n",
439+
" index_type=\"IVF_FLAT\",\n",
440+
" index_params={\"nlist\": 1024}\n",
441+
" )\n",
442+
" \n",
443+
" logger.info(f\"Created collection: {self.collection_name} with dimension {embedding_dim}\")\n",
444+
" \n",
445+
" except Exception as e:\n",
446+
" logger.error(f\"Error creating collection: {e}\")\n",
447+
" raise\n",
455448
" \n",
456449
" def load_collection(self) -> None:\n",
457450
" \"\"\"\n",
458451
" Load existing collection.\n",
459452
" \"\"\"\n",
460-
" if utility.has_collection(self.collection_name):\n",
461-
" self.collection = Collection(self.collection_name)\n",
462-
" self.collection.load()\n",
463-
" logger.info(f\"Loaded collection: {self.collection_name}\")\n",
453+
" if self.client.has_collection(self.collection_name):\n",
454+
" logger.info(f\"Collection {self.collection_name} exists and is ready\")\n",
464455
" else:\n",
465456
" raise ValueError(f\"Collection {self.collection_name} does not exist\")\n",
466457
" \n",
467458
" def add_documents(self, texts: List[str], embeddings: List[np.ndarray], metadata: List[Dict[str, Any]]) -> None:\n",
468459
" \"\"\"\n",
469-
" Add documents to the collection.\n",
460+
" Add documents to the collection using MilvusClient.\n",
470461
" \n",
471462
" Args:\n",
472463
" texts: List of document texts\n",
473464
" embeddings: List of embedding vectors\n",
474465
" metadata: List of metadata dictionaries\n",
475466
" \"\"\"\n",
476-
" # Convert metadata to JSON strings\n",
477-
" metadata_strs = [json.dumps(meta) for meta in metadata]\n",
478-
" \n",
479-
" # Prepare data for insertion\n",
480-
" data = [\n",
481-
" texts,\n",
482-
" embeddings.tolist(),\n",
483-
" metadata_strs\n",
484-
" ]\n",
467+
" # Prepare data for MilvusClient insertion (include id field)\n",
468+
" data = []\n",
469+
" for i in range(len(texts)):\n",
470+
" data.append({\n",
471+
" \"id\": i, # Add required id field\n",
472+
" \"text\": texts[i],\n",
473+
" \"vector\": embeddings[i].tolist(),\n",
474+
" \"metadata\": json.dumps(metadata[i])\n",
475+
" })\n",
485476
" \n",
486-
" # Insert data\n",
487-
" mr = self.collection.insert(data)\n",
488-
" self.collection.flush()\n",
477+
" # Insert data using MilvusClient\n",
478+
" result = self.client.insert(\n",
479+
" collection_name=self.collection_name,\n",
480+
" data=data\n",
481+
" )\n",
489482
" \n",
490-
" # Load the collection after adding documents\n",
491-
" self.collection.load()\n",
492-
" logger.info(f\"Added {len(texts)} documents to collection and loaded it\")\n",
493-
" return mr\n",
483+
" logger.info(f\"Added {len(texts)} documents to collection\")\n",
484+
" return result\n",
494485
" \n",
495486
" def search(self, query_embedding: np.ndarray, top_k: int = 5) -> List[Dict[str, Any]]:\n",
496487
" \"\"\"\n",
497-
" Search for similar documents.\n",
488+
" Search for similar documents using MilvusClient.\n",
498489
" \n",
499490
" Args:\n",
500491
" query_embedding: Query embedding vector\n",
@@ -503,16 +494,16 @@
503494
" Returns:\n",
504495
" List of search results with text, metadata, and similarity scores\n",
505496
" \"\"\"\n",
506-
" # Ensure collection is loaded\n",
507-
" if self.collection is None:\n",
508-
" self.load_collection()\n",
509-
" \n",
510-
" search_params = {\"metric_type\": \"COSINE\", \"params\": {\"nprobe\": 10}}\n",
497+
" # Ensure collection exists\n",
498+
" if not self.client.has_collection(self.collection_name):\n",
499+
" raise ValueError(f\"Collection {self.collection_name} does not exist\")\n",
511500
" \n",
512-
" results = self.collection.search(\n",
501+
" # Use MilvusClient search method with proper vector field specification\n",
502+
" results = self.client.search(\n",
503+
" collection_name=self.collection_name,\n",
513504
" data=[query_embedding.tolist()],\n",
514-
" anns_field=\"embedding\",\n",
515-
" param=search_params,\n",
505+
" anns_field=\"vector\", # Specify the vector field name\n",
506+
" search_params={\"metric_type\": \"COSINE\", \"params\": {\"nprobe\": 10}},\n",
516507
" limit=top_k,\n",
517508
" output_fields=[\"text\", \"metadata\"]\n",
518509
" )\n",
@@ -521,13 +512,21 @@
521512
" formatted_results = []\n",
522513
" for hit in results[0]:\n",
523514
" formatted_results.append({\n",
524-
" \"text\": hit.entity.get(\"text\"),\n",
525-
" \"metadata\": json.loads(hit.entity.get(\"metadata\")),\n",
526-
" \"score\": hit.score,\n",
527-
" \"id\": hit.id\n",
515+
" \"text\": hit[\"text\"],\n",
516+
" \"metadata\": json.loads(hit[\"metadata\"]),\n",
517+
" \"score\": 1.0 - hit[\"distance\"], # Convert distance to similarity score for COSINE\n",
518+
" \"id\": hit[\"id\"]\n",
528519
" })\n",
529520
" \n",
530-
" return formatted_results"
521+
" return formatted_results\n",
522+
" \n",
523+
" def close(self) -> None:\n",
524+
" \"\"\"\n",
525+
" Close the MilvusClient connection.\n",
526+
" \"\"\"\n",
527+
" if hasattr(self, 'client'):\n",
528+
" self.client.close()\n",
529+
" logger.info(\"Closed MilvusClient connection\")"
531530
]
532531
},
533532
{
@@ -613,7 +612,7 @@
613612
" \n",
614613
" except Exception as e:\n",
615614
" logger.error(f\"Error generating response: {e}\")\n",
616-
" return f\"Error generating response: {str(e)}\""
615+
" raise e "
617616
]
618617
},
619618
{
@@ -656,7 +655,7 @@
656655
" )\n",
657656
" \n",
658657
" self.llm = ClaudeGenerator(\n",
659-
" api_key=config.anthropic_api_key,\n",
658+
" api_key=config.ANTHROPIC_API_KEY,\n",
660659
" model_name=config.model_name,\n",
661660
" max_tokens=config.max_tokens\n",
662661
" )\n",
@@ -716,9 +715,9 @@
716715
" logger.info(f\"Processing query: {question[:100]}...\")\n",
717716
" \n",
718717
" try:\n",
719-
" # Load collection if not already loaded\n",
720-
" if self.vector_store.collection is None:\n",
721-
" self.vector_store.load_collection()\n",
718+
" # Ensure collection exists\n",
719+
" if not self.vector_store.client.has_collection(self.vector_store.collection_name):\n",
720+
" raise ValueError(f\"Collection {self.vector_store.collection_name} does not exist. Please index documents first.\")\n",
722721
" \n",
723722
" # Generate query embedding\n",
724723
" query_embedding = self.embedding_generator.embed_text(question)\n",
@@ -758,20 +757,28 @@
758757
" \n",
759758
" def get_collection_stats(self) -> Dict[str, Any]:\n",
760759
" \"\"\"\n",
761-
" Get statistics about the current collection.\n",
760+
" Get statistics about the current collection using MilvusClient.\n",
762761
" \n",
763762
" Returns:\n",
764763
" Dictionary with collection statistics\n",
765764
" \"\"\"\n",
766765
" try:\n",
767-
" if self.vector_store.collection is None:\n",
768-
" self.vector_store.load_collection()\n",
766+
" # Check if collection exists\n",
767+
" if not self.vector_store.client.has_collection(self.vector_store.collection_name):\n",
768+
" return {\n",
769+
" \"error\": f\"Collection {self.vector_store.collection_name} does not exist. Please index documents first.\"\n",
770+
" }\n",
769771
" \n",
772+
" # Get collection statistics using MilvusClient\n",
773+
" collection_info = self.vector_store.client.describe_collection(self.vector_store.collection_name)\n",
774+
" \n",
775+
" # Get entity count - this might not be available in MilvusClient, so we'll provide what we can\n",
770776
" stats = {\n",
771777
" \"collection_name\": self.config.collection_name,\n",
772-
" \"num_entities\": self.vector_store.collection.num_entities,\n",
773778
" \"embedding_dim\": self.embedding_generator.embedding_dim,\n",
774-
" \"embedding_model\": self.config.embedding_model\n",
779+
" \"embedding_model\": self.config.embedding_model,\n",
780+
" \"collection_exists\": True,\n",
781+
" \"schema\": collection_info if collection_info else \"Schema information not available\"\n",
775782
" }\n",
776783
" \n",
777784
" return stats\n",
@@ -966,8 +973,14 @@
966973
"def cleanup_resources() -> None:\n",
967974
" \"\"\"Clean up resources and connections.\"\"\"\n",
968975
" try:\n",
969-
" connections.disconnect(\"default\")\n",
970-
" print(\"Disconnected from Milvus\")\n",
976+
" # Close MilvusClient connections if they exist\n",
977+
" if 'rag' in globals() and hasattr(rag.vector_store, 'client'):\n",
978+
" rag.vector_store.close()\n",
979+
" \n",
980+
" if 'pdf_rag' in globals() and hasattr(pdf_rag.vector_store, 'client'):\n",
981+
" pdf_rag.vector_store.close()\n",
982+
" \n",
983+
" print(\"Closed MilvusClient connections\")\n",
971984
" except Exception as e:\n",
972985
" print(f\"Error during cleanup: {e}\")\n",
973986
"\n",

0 commit comments

Comments
 (0)