Version: 0.1.0 Last Updated: 2025-10-08
RAG-Templates provides a unified, standardized API across all 6 production RAG pipelines. All pipelines implement the same core methods with consistent signatures and response formats, ensuring 100% compatibility with LangChain and RAGAS evaluation frameworks.
from iris_vector_rag import create_pipeline
# Create any pipeline with validation
pipeline = create_pipeline("basic", validate_requirements=True)
# Load documents
result = pipeline.load_documents(documents=[...])
# Query
result = pipeline.query("What is machine learning?", top_k=5)

create_pipeline()

Factory function to create RAG pipeline instances with automatic validation.
Signature:
def create_pipeline(
    pipeline_type: str,
    config_path: Optional[str] = None,
    llm_func: Optional[Callable[[str], str]] = None,
    embedding_func: Optional[Callable[[List[str]], List[List[float]]]] = None,
    external_connection=None,
    validate_requirements: bool = True,
    auto_setup: bool = False,
    **kwargs
) -> RAGPipeline
Parameters:
- pipeline_type (str): Pipeline type to create:
  - "basic" - BasicRAG with vector similarity
  - "basic_rerank" - BasicRAG + cross-encoder reranking
  - "crag" - Corrective RAG with self-evaluation
  - "graphrag" - HybridGraphRAG (vector + text + graph)
  - "pylate_colbert" - ColBERT late interaction
- config_path (str, optional): Path to configuration file
- llm_func (callable, optional): LLM function for answer generation
- embedding_func (callable, optional): Embedding function for vectors
- external_connection (optional): Existing database connection
- validate_requirements (bool): Validate pipeline requirements (default: True)
- auto_setup (bool): Auto-fix validation issues (default: False)
- **kwargs: Additional pipeline-specific parameters
Returns: RAGPipeline instance
Raises:
- ValueError: If pipeline_type is unknown
- PipelineValidationError: If validation fails and auto_setup is False
Example:
from iris_vector_rag import create_pipeline
# With validation
pipeline = create_pipeline("basic", validate_requirements=True)
# Without validation (faster, for testing)
pipeline = create_pipeline("basic", validate_requirements=False)
# With auto-setup (fixes missing tables, etc.)
pipeline = create_pipeline("crag", validate_requirements=True, auto_setup=True)

validate_pipeline()

Validate pipeline requirements without creating an instance.
Signature:
def validate_pipeline(
    pipeline_type: str,
    config_path: Optional[str] = None,
    external_connection=None
) -> Dict[str, Any]

Returns: Validation results dictionary with detailed status
Example:
from iris_vector_rag import validate_pipeline
status = validate_pipeline("graphrag")
print(f"Valid: {status['is_valid']}")
print(f"Issues: {status['issues']}")

setup_pipeline()

Set up all requirements for a pipeline type.
Signature:
def setup_pipeline(
    pipeline_type: str,
    config_path: Optional[str] = None,
    external_connection=None
) -> Dict[str, Any]

Returns: Setup results dictionary
Example:
from iris_vector_rag import setup_pipeline
result = setup_pipeline("basic")
print(f"Setup complete: {result['success']}")

All pipeline classes implement these core methods with identical signatures.

load_documents()
Load documents into the pipeline for retrieval.
Signature:
def load_documents(
    self,
    documents: Optional[List[Document]] = None,
    documents_path: Optional[str] = None,
    **kwargs
) -> Dict[str, Any]

Parameters:
- documents (List[Document], optional): List of Document objects to load
- documents_path (str, optional): Path to documents file (JSON)
- **kwargs: Pipeline-specific parameters
Returns:
{
    "documents_loaded": int,       # Number successfully loaded
    "embeddings_generated": int,   # Number of embeddings created
    "documents_failed": int        # Number that failed to load
}

Validation:
- Requires either documents or documents_path (not both None)
- Rejects empty document lists with an actionable error message
- Validates Document objects have required fields
Example:
from iris_rag.core.models import Document
# Option 1: From Document objects
docs = [
    Document(
        page_content="Python is a programming language...",
        metadata={"source": "intro.txt", "author": "John"}
    ),
    Document(
        page_content="Machine learning uses algorithms...",
        metadata={"source": "ml.txt", "topic": "AI"}
    )
]
result = pipeline.load_documents(documents=docs)
print(f"Loaded {result['documents_loaded']} documents")
# Option 2: From file
result = pipeline.load_documents(documents_path="data/docs.json")

query()

Execute a RAG query with document retrieval and optional answer generation.
Signature:
def query(
    self,
    query: str,
    top_k: int = 5,
    generate_answer: bool = True,
    include_sources: bool = True,
    **kwargs
) -> Dict[str, Any]

Parameters:
- query (str): The query text (required, cannot be empty)
- top_k (int): Number of documents to return, range [1-100] (default: 5)
- generate_answer (bool): Generate an LLM answer (default: True)
- include_sources (bool): Include source metadata (default: True)
- **kwargs: Pipeline-specific parameters
Returns:
{
    "query": str,                           # Original query
    "answer": str | None,                   # LLM-generated answer
    "retrieved_documents": List[Document],  # LangChain Document objects
    "contexts": List[str],                  # RAGAS-compatible contexts
    "sources": List[Dict],                  # Source references
    "execution_time": float,                # Query execution time
    "metadata": {
        "num_retrieved": int,               # Documents retrieved
        "processing_time": float,
        "pipeline_type": str,               # Pipeline identifier
        "retrieval_method": str,            # Retrieval strategy used
        "context_count": int,               # Number of contexts
        "sources": List[Dict],              # Also in metadata
        # Pipeline-specific fields...
    }
}

Validation:
- Query cannot be empty or whitespace-only
- top_k must be between 1 and 100 (inclusive)
- Raises ValueError with a 5-part error message on validation failure
Error Message Format:
Error: <what went wrong>
Context: <where it happened>
Expected: <what was expected>
Actual: <what was received>
Fix: <how to fix it>
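The validation rules and the 5-part message format above can be sketched as a standalone helper. This is an illustrative stand-in with hypothetical names (`make_error`, `validate_query_args`), not the library's actual implementation:

```python
def make_error(error: str, context: str, expected: str, actual: str, fix: str) -> str:
    # Assemble the documented 5-part error message.
    return (
        f"Error: {error}\n"
        f"Context: {context}\n"
        f"Expected: {expected}\n"
        f"Actual: {actual}\n"
        f"Fix: {fix}"
    )

def validate_query_args(query: str, top_k: int) -> None:
    # Rule 1: query cannot be empty or whitespace-only.
    if not query or not query.strip():
        raise ValueError(make_error(
            "Query parameter is required and cannot be empty",
            "pipeline query operation",
            "Non-empty query string",
            "Empty or whitespace-only string",
            "Provide a valid query string, e.g., query='What is diabetes?'",
        ))
    # Rule 2: top_k must be between 1 and 100 (inclusive).
    if not 1 <= top_k <= 100:
        raise ValueError(make_error(
            "top_k is out of range",
            "pipeline query operation",
            "Integer between 1 and 100 (inclusive)",
            f"top_k={top_k}",
            "Pass a top_k value in the range [1, 100]",
        ))
```

Raising a single multi-line string keeps the full diagnostic visible in logs and tracebacks without requiring a custom exception class.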
Example:
# Basic query
result = pipeline.query("What is diabetes?", top_k=5)
print(result["answer"])
# Without answer generation (retrieval only)
result = pipeline.query(
    "diabetes symptoms",
    top_k=10,
    generate_answer=False
)
# Access retrieved documents (LangChain compatible)
for doc in result["retrieved_documents"]:
    print(f"Source: {doc.metadata.get('source')}")
    print(f"Content: {doc.page_content[:100]}...")
# Access contexts (RAGAS compatible)
# Evaluate with RAGAS (evaluate expects a datasets.Dataset with
# question/answer/contexts columns, not a plain dict)
from datasets import Dataset
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy

dataset = Dataset.from_dict({
    "question": ["What is diabetes?"],
    "answer": [result["answer"]],
    "contexts": [result["contexts"]],
})
evaluation = evaluate(dataset=dataset, metrics=[faithfulness, answer_relevancy])

get_pipeline_info()

Get information about the pipeline's configuration.
Signature:
def get_pipeline_info(self) -> Dict[str, Any]

Returns: Dictionary with pipeline configuration details
Example:
info = pipeline.get_pipeline_info()
print(f"Type: {info['pipeline_type']}")
print(f"Config: {info}")

basic_rerank - Additional metadata fields:
result["metadata"]["reranked"] # bool: Whether reranking was applied
result["metadata"]["initial_candidates"] # int: Initial retrieval count
result["metadata"]["rerank_factor"]        # int: Reranking multiplier

Configuration:
pipeline = create_pipeline("basic_rerank")
# Retrieves rerank_factor * top_k documents, reranks, returns top_k
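The retrieve-then-rerank flow described by that comment can be illustrated standalone. A toy scoring function stands in for the cross-encoder, and all names here are hypothetical (this is not the pipeline's internal code):

```python
def rerank_query(all_docs, score_fn, top_k=5, rerank_factor=2):
    # Step 1: over-retrieve rerank_factor * top_k candidates
    # (here simulated by taking the head of a pre-ranked list).
    candidates = all_docs[: rerank_factor * top_k]
    # Step 2: re-score only those candidates with the reranker.
    rescored = sorted(candidates, key=score_fn, reverse=True)
    # Step 3: return the top_k best after reranking.
    return rescored[:top_k]

# With rerank_factor=2 and top_k=3, only the first 6 documents
# are ever rescored, regardless of how many exist.
top = rerank_query(list(range(20)), score_fn=lambda d: d, top_k=3, rerank_factor=2)
```

The key trade-off the factor controls: a larger pool gives the reranker more chances to promote a relevant document, at the cost of more cross-encoder invocations.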
crag - Additional metadata fields:

result["metadata"]["evaluation_score"]   # float: Relevance evaluation
result["metadata"]["corrected"]          # bool: Whether correction was applied

graphrag - Pipeline-specific query parameters:
result = pipeline.query(
    query="cancer targets",
    method="rrf",        # "rrf", "hybrid", "vector", "text", "graph"
    vector_k=30,         # Documents from vector search
    text_k=30,           # Documents from text search
    graph_k=20,          # Documents from graph traversal
    top_k=15             # Final result count after fusion
)
Additional metadata fields:

result["metadata"]["fusion_method"]     # str: Fusion strategy used
result["metadata"]["vector_score"]      # float: Vector search contribution
result["metadata"]["text_score"]        # float: Text search contribution
result["metadata"]["graph_score"]       # float: Graph traversal contribution

pylate_colbert - Additional metadata fields:
result["metadata"]["native_reranking"] # bool: PyLate reranking used
result["metadata"]["model_name"]        # str: ColBERT model identifier

Standard document object used across all pipelines.
from iris_rag.core.models import Document
doc = Document(
    page_content="The text content of the document...",
    metadata={
        "source": "filename.txt",
        "author": "John Doe",
        "date": "2024-01-01",
        # Any custom fields...
    }
)

Fields:
- page_content (str): The main text content
- metadata (dict): Dictionary of metadata fields
LangChain Compatibility: This is the standard LangChain Document class, ensuring 100% compatibility.
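For tests that should not import the library, the two-field contract can be mimicked with a plain dataclass. `DocumentStub` and `contexts_from_docs` are hypothetical stand-ins, not the real classes; they only illustrate the shape of the data:

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List

@dataclass
class DocumentStub:
    # Mirrors the Document contract: text plus free-form metadata.
    page_content: str
    metadata: Dict[str, Any] = field(default_factory=dict)

def contexts_from_docs(docs: List[DocumentStub]) -> List[str]:
    # RAGAS-style contexts are the raw text of each retrieved document.
    return [d.page_content for d in docs]
```

Anything that exposes `page_content` and `metadata` can flow through code written against the real Document class, which is what makes the LangChain compatibility claim cheap to rely on.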
All pipelines use consistent validation with actionable error messages:
try:
    result = pipeline.query("", top_k=5)
except ValueError as e:
    print(str(e))
    # Output:
    # Error: Query parameter is required and cannot be empty
    # Context: BasicRAG pipeline query operation
    # Expected: Non-empty query string
    # Actual: Empty or whitespace-only string
    # Fix: Provide a valid query string, e.g., query='What is diabetes?'

Empty Query:
pipeline.query("")  # Raises ValueError

Invalid top_k:
pipeline.query("test", top_k=0) # Raises ValueError (< 1)
pipeline.query("test", top_k=101)  # Raises ValueError (> 100)

Empty Document List:
pipeline.load_documents(documents=[])  # Raises ValueError

Missing Required Parameter:
pipeline.load_documents()  # Raises ValueError (need documents or documents_path)

Located at iris_rag/config/default_config.yaml:
database:
  db_host: localhost
  db_port: 1972
  db_namespace: USER

embedding_model:
  name: sentence-transformers/all-MiniLM-L6-v2
  dimension: 384

pipelines:
  basic:
    chunk_size: 1000
    chunk_overlap: 200
  basic_reranking:
    rerank_factor: 2
    reranker_model: cross-encoder/ms-marco-MiniLM-L-6-v2
  crag:
    relevance_threshold: 0.5

Override configuration with environment variables:
export IRIS_HOST=localhost
export IRIS_PORT=1972
export IRIS_USERNAME=SuperUser
export IRIS_PASSWORD=SYS
export OPENAI_API_KEY=your-key-here
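The override behavior means an exported variable takes precedence over the value in the config file. A sketch of that precedence rule, using the variable names from the export list above (the helper itself is hypothetical):

```python
import os

# Values that would come from default_config.yaml.
FILE_DEFAULTS = {"db_host": "localhost", "db_port": 1972}

def resolve_db_host(defaults=FILE_DEFAULTS):
    # An exported IRIS_HOST overrides the file's db_host;
    # otherwise the file default is used.
    return os.environ.get("IRIS_HOST", defaults["db_host"])
```

This is the standard env-over-file layering: deployment environments set variables, while the YAML file remains the documented baseline.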
Validate API contracts without a database:

pytest tests/contract/ -v

Full end-to-end with a live database:

pytest tests/e2e/ -v

from iris_vector_rag import create_pipeline
from iris_rag.core.models import Document
def test_basic_workflow():
    # Create pipeline
    pipeline = create_pipeline("basic", validate_requirements=False)
    # Load documents
    docs = [Document(page_content="Test content", metadata={"source": "test"})]
    result = pipeline.load_documents(documents=docs)
    assert result["documents_loaded"] == 1
    # Query
    result = pipeline.query("test", top_k=1)
    assert "answer" in result
    assert len(result["retrieved_documents"]) <= 1

# OLD - Inconsistent signatures
pipeline.query(query_text="test", k=5)     # CRAG used query_text
pipeline.load_documents("path/to/docs")    # Only file paths
result["num_retrieved"]                    # Inconsistent metadata

# NEW - Consistent across all pipelines
pipeline.query(query="test", top_k=5) # Unified signature
pipeline.load_documents(documents=[...]) # Supports both lists and paths
result["metadata"]["num_retrieved"]        # Standardized structure

- Always use validation in production:

  pipeline = create_pipeline("basic", validate_requirements=True)

- Handle errors with specific exceptions:

  try:
      result = pipeline.query(user_input, top_k=5)
  except ValueError as e:
      logger.error(f"Query validation failed: {e}")

- Use Document objects for metadata preservation:

  docs = [Document(page_content=text, metadata={"source": file})]
  pipeline.load_documents(documents=docs)

- Access results with framework-specific interfaces:

  # For LangChain
  documents = result["retrieved_documents"]
  # For RAGAS
  contexts = result["contexts"]
  answer = result["answer"]
- Documentation: docs/
- Examples: scripts/
- Issues: GitHub Issues
- Testing: TEST_VALIDATION_SUMMARY.md