Add full page context expansion with neighboring page support for pag…#338
Add full page context expansion with neighboring page support for pag…#338nv-pranjald wants to merge 2 commits intodevelopfrom
Conversation
da27ea4 to
c18624c
Compare
|
@CodeRabbit review |
✅ Actions performedReview triggered.
|
|
Warning
|
| Cohort / File(s) | Summary |
|---|---|
Configuration & Environment deploy/compose/docker-compose-rag-server.yaml, src/nvidia_rag/utils/configuration.py |
Added APP_FETCH_FULL_PAGE_CONTEXT and APP_FETCH_NEIGHBORING_PAGES environment variables with field validators enforcing that neighboring-pages expansion requires full-page context. |
RAG Pipeline Core src/nvidia_rag/rag_server/main.py |
Major additions to support page-context expansion: new parameters propagated through generate() and _rag_chain(), helper methods for page extraction/expansion/formatting, VDB re-fetching of missing chunks, page-aware context formatting, and extensive logging for retrieved pages and context structure. |
API & Server Layer src/nvidia_rag/rag_server/server.py |
Added Prompt model fields fetch_full_page_context and fetch_neighboring_pages with CONFIG defaults, threaded through /generate endpoint to backend RAG.generate() call. |
VLM Integration src/nvidia_rag/rag_server/vlm.py |
Added organize_by_page mode to extract_and_process_messages and stream_with_messages to interleave text/images per page; new helpers for building page-organized content_parts, extracting images, and logging content structure. |
Vector Database Layer src/nvidia_rag/utils/vdb/elasticsearch/es_queries.py, src/nvidia_rag/utils/vdb/elasticsearch/elastic_vdb.py, src/nvidia_rag/utils/vdb/milvus/milvus_vdb.py, src/nvidia_rag/utils/vdb/vdb_base.py |
Added abstract retrieve_chunks_by_filter() method and implementations in Elasticsearch and Milvus backends to fetch chunks by source name and page numbers; added Elasticsearch query builder for filtering by source and pages. |
Unit Tests tests/unit/test_rag_server/test_page_context_organization.py, tests/unit/test_utils/test_configuration.py |
Comprehensive test coverage for page extraction, expansion, formatting helpers; validation tests for configuration cross-field rules. |
Sequence Diagram
sequenceDiagram
actor Client
participant Server
participant RAG as RAG Pipeline
participant VDB as Vector Database
participant VLM as Vision Language Model
Client->>Server: POST /generate (fetch_full_page_context, fetch_neighboring_pages)
Server->>RAG: generate(prompt, fetch_full_page_context, fetch_neighboring_pages)
RAG->>VDB: semantic_search (initial retrieval)
VDB-->>RAG: initial_documents
RAG->>RAG: _extract_page_set_from_docs()
RAG->>RAG: _expand_page_set_with_neighbors()
RAG->>VDB: retrieve_chunks_by_filter(source, expanded_pages)
VDB-->>RAG: full_page_chunks
RAG->>RAG: _expand_and_organize_context()
RAG->>RAG: _format_context_by_page() or existing formatter
RAG->>VLM: stream_with_messages(docs, organize_by_page)
VLM->>VLM: _build_content_parts_by_page()
VLM->>VLM: _extract_images_from_docs()
VLM-->>Client: streaming response
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~65 minutes
Possibly related PRs
- Integrate nemotron-nano-12b-v2-vl VLM with RAG #73: Modifies VLM message-streaming integration (extract_and_process_messages, stream_with_messages signatures) used by the RAG generate/_rag_chain paths, creating a code-level dependency with this PR's VLM changes.
Suggested reviewers
- smasurekar
- nv-nikkulkarni
- shubhadeepd
Poem
🐰 A rabbit's ode to expanding pages:
Hops through chapters, page by page,
Neighboring neighbors join the stage,
Full context gathered, no chunk amiss,
VLM vision arranged like this—
Long-eared logic, wisely sage! 📖✨
🚥 Pre-merge checks | ✅ 3 | ❌ 1
❌ Failed checks (1 warning)
| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Merge Conflict Detection | ❌ Merge conflicts detected (34 files): ⚔️ .github/workflows/ci-pipeline.yml (content)⚔️ README.md (content)⚔️ deploy/compose/docker-compose-rag-server.yaml (content)⚔️ docs/api-ingestor.md (content)⚔️ docs/api-rag.md (content)⚔️ docs/assets/arch_diagram.png (content)⚔️ docs/change-model.md (content)⚔️ docs/deploy-docker-self-hosted.md (content)⚔️ docs/deploy-helm.md (content)⚔️ docs/mig-deployment.md (content)⚔️ docs/multi-collection-retrieval.md (content)⚔️ docs/observability.md (content)⚔️ docs/python-client.md (content)⚔️ docs/release-notes.md (content)⚔️ docs/support-matrix.md (content)⚔️ docs/text_only_ingest.md (content)⚔️ docs/troubleshooting.md (content)⚔️ docs/user-interface.md (content)⚔️ examples/nvidia_rag_mcp/mcp_server.py (content)⚔️ examples/rag_react_agent/pyproject.toml (content)⚔️ examples/rag_react_agent/uv.lock (content)⚔️ notebooks/launchable.ipynb (content)⚔️ notebooks/nat_mcp_integration.ipynb (content)⚔️ src/nvidia_rag/rag_server/main.py (content)⚔️ src/nvidia_rag/rag_server/server.py (content)⚔️ src/nvidia_rag/rag_server/vlm.py (content)⚔️ src/nvidia_rag/utils/configuration.py (content)⚔️ src/nvidia_rag/utils/vdb/elasticsearch/elastic_vdb.py (content)⚔️ src/nvidia_rag/utils/vdb/elasticsearch/es_queries.py (content)⚔️ src/nvidia_rag/utils/vdb/milvus/milvus_vdb.py (content)⚔️ src/nvidia_rag/utils/vdb/vdb_base.py (content)⚔️ tests/integration/README.md (content)⚔️ tests/unit/test_utils/test_configuration.py (content)⚔️ uv.lock (content)These conflicts must be resolved before merging into develop. |
Resolve conflicts locally and push changes to this branch. |
✅ Passed checks (3 passed)
| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title clearly and specifically describes the main enhancement: adding full page context expansion with neighboring page support for page-aware retrieval. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 86.27% which is sufficient. The required threshold is 80.00%. |
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
✨ Finishing touches
- 📝 Generate docstrings
🧪 Generate unit tests (beta)
- Create PR with unit tests
- Post copyable unit tests in a comment
- Commit unit tests in branch
dev/pranjald/page-context-expansion
⚔️ Resolve merge conflicts (beta)
- Auto-commit resolved conflicts to branch
dev/pranjald/page-context-expansion - Create stacked PR with resolved conflicts
- Post resolved changes as copyable diffs in a comment
Tip
Issue Planner is now in beta. Read the docs and try it out! Share your feedback on Discord.
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Comment @coderabbitai help to get the list of available commands and usage tips.
There was a problem hiding this comment.
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/nvidia_rag/rag_server/vlm.py (1)
706-717:⚠️ Potential issue | 🟠 Major
analyze_with_messageslacks theorganize_by_pageparameter present instream_with_messages.
stream_with_messages(line 753) accepts and passesorganize_by_pagetoextract_and_process_messages, butanalyze_with_messages(line 657) does not have this parameter at all. Sinceextract_and_process_messagessupportsorganize_by_page, the non-streaming VLM path cannot utilize per-page organization regardless of caller intent.Should
analyze_with_messagesalso accept and forward theorganize_by_pageparameter for API consistency?src/nvidia_rag/rag_server/server.py (1)
1355-1393:⚠️ Potential issue | 🟡 MinorNew fields missing from
request_datalogging dict.All other
Promptfields are logged inrequest_data, butfetch_full_page_contextandfetch_neighboring_pagesare omitted. This makes debugging page-context issues harder.Proposed fix
"filter_expr": prompt.filter_expr, "confidence_threshold": prompt.confidence_threshold, + "fetch_full_page_context": prompt.fetch_full_page_context, + "fetch_neighboring_pages": prompt.fetch_neighboring_pages, }
🤖 Fix all issues with AI agents
In `@src/nvidia_rag/rag_server/main.py`:
- Around line 3468-3492: The loop computing grouped keys contains an unused
`filename` variable; replace that variable with `_` (or remove its assignment)
so only the intended loop variables `(filename, source_path, page_num, doc)` are
used for grouping, and remove the redundant filename recomputation earlier to
avoid confusion. In the `keys_sorted` iteration use the computed `filename` only
once when creating `marker = f"=== Page {page_num} ({filename}) ===\n"` (keep
the existing os.path.splitext/os.path.basename expression there) and delete the
earlier unused `filename` assignment. For the `no_page` branch, append a single
combined string (e.g., `"=== Additional context ===\n" +
"\n\n".join(format_fn(d) for d in no_page)`) instead of two separate parts so
formatting matches the `has_page` path and prevents an extra "\n\n" from
appearing when joining `parts`. Ensure references to `grouped`, `keys_sorted`,
`format_fn`, `marker`, and `no_page` are updated accordingly.
- Around line 3365-3440: The _expand_and_organize_context method currently
performs synchronous VDB network calls (vdb_op.retrieve_chunks_by_filter) inside
the async RAG path which will block the event loop; update this by removing the
unused collection_names parameter from the signature, and change the fetch loop
to run retrieve_chunks_by_filter on a worker thread (use asyncio.to_thread or
submit to ThreadPoolExecutor) and schedule the per-(coll,source) fetches in
parallel (gather the tasks and then merge results), and replace the misleading
hasattr(vdb_op, "retrieve_chunks_by_filter") guard with a direct call wrapped in
try/except that explicitly catches NotImplementedError and logs warnings; keep
the existing dedup logic (doc_key, seen, merged) but perform deduping after
collected parallel fetch results before returning merged.
In `@src/nvidia_rag/rag_server/server.py`:
- Around line 583-593: Add a cross-field `@model_validator` to the Prompt pydantic
model that enforces the same rule as validate_page_context_options: if
fetch_neighboring_pages > 0 then fetch_full_page_context must be True; otherwise
raise a ValidationError (or ValueError) with a clear message. Place the
validator inside the Prompt class near the existing field definitions for
fetch_full_page_context and fetch_neighboring_pages and name/reference it
similarly (e.g., validate_page_context_options) so API inputs like
fetch_neighboring_pages=5, fetch_full_page_context=False are rejected.
In `@src/nvidia_rag/rag_server/vlm.py`:
- Around line 400-414: The method _build_content_parts_by_page currently ignores
the textual_context parameter and assigns q but never uses it; change the
human_template.format call (and any later question_text usage inside
_build_content_parts_by_page) to use the provided textual_context and the
trimmed q variable (q = (question_text or "").strip()) instead of empty strings
or raw question_text, so the preformatted context and normalized question are
respected; if you decide not to use textual_context here, remove the unused
parameter and q, otherwise update the caller to supply the textual_context
argument so the method can include it when building the intro and any question
formatting.
In `@src/nvidia_rag/utils/vdb/milvus/milvus_vdb.py`:
- Around line 1206-1221: The Milvus query builds filter_expr with substring
matching (like "%{source_name}%") and interpolates source_name raw, causing
inconsistent behavior with the Elasticsearch implementation (which uses exact
term match) and risk of syntax-breaking characters; update the code that
constructs filter_expr in milvus_vdb.py (the filter_expr variable used with
MilvusClient.query) to perform an exact equality check on source["source_name"]
(to mirror es_queries.py's term on metadata.source.source_name.keyword) and
properly escape/quote source_name before interpolation (e.g., ensure internal
quotes/backslashes are escaped and the value is wrapped in quotes) so the filter
string is safe and semantics match across backends.
In `@tests/unit/test_rag_server/test_page_context_organization.py`:
- Around line 198-227: The test test_fetch_full_page_context_calls_vdb should
use call_args.kwargs for clarity and strengthen the final assertion: replace
accessing mock_vdb.retrieve_chunks_by_filter.call_args[1] with
mock_vdb.retrieve_chunks_by_filter.call_args.kwargs to explicitly read keyword
arguments, and change the weak assertion assert len(result) >= 1 to assert
len(result) == 1 because the mock returns an empty list and deduplication in
rag._expand_and_organize_context should leave only the original document.
🧹 Nitpick comments (9)
src/nvidia_rag/utils/configuration.py (1)
773-801: Consider an upper bound forfetch_neighboring_pages.The validator ensures non-negative values, but there's no upper bound. A very large value (e.g., 1000) would cause the system to attempt fetching thousands of pages per retrieved chunk, which could degrade performance or overwhelm the VDB. A reasonable cap (e.g., 10 or 20) would provide a safety net.
💡 Optional: Add an upper bound
`@field_validator`("fetch_neighboring_pages") `@classmethod` def validate_fetch_neighboring_pages(cls, v: int) -> int: if not isinstance(v, int) or isinstance(v, bool): raise TypeError( f"fetch_neighboring_pages must be an integer, got {type(v).__name__}" ) if v < 0: raise ValueError( f"fetch_neighboring_pages must be >= 0, got {v}" ) + if v > 20: + raise ValueError( + f"fetch_neighboring_pages must be <= 20, got {v}" + ) return vsrc/nvidia_rag/utils/vdb/milvus/milvus_vdb.py (1)
1226-1235: Entity-to-Document conversion is duplicated withretrieval_image_langchain.Lines 1226-1233 are nearly identical to the conversion loop in
retrieval_image_langchain(lines 1180-1187). Consider extracting a shared helper method.♻️ Optional: Extract helper
+ `@staticmethod` + def _entities_to_documents(entities: list[dict]) -> list[Document]: + """Convert Milvus entities to LangChain Document objects.""" + docs: list[Document] = [] + for item in entities: + page_content = item.get("text") or item.get("chunk") or "" + metadata = { + "source": item.get("source"), + "content_metadata": item.get("content_metadata", {}), + } + docs.append(Document(page_content=page_content, metadata=metadata)) + return docsThen use it in both methods:
docs = self._entities_to_documents(entities) return self._add_collection_name_to_retreived_docs(docs, collection_name)src/nvidia_rag/rag_server/vlm.py (2)
376-397: Silent exception swallowing hinders debugging.The bare
except Exception: continueat line 396 silently discards all errors during image extraction (e.g., MinIO connection failures, malformed payloads). Adding a debug-level log would help diagnose issues in production without cluttering normal output.💡 Suggested improvement
- except Exception: + except Exception as e: + logger.debug("Skipping image extraction for doc: %s", e) continue
365-370: Nested ternary expression is hard to read.The
source_idextraction has a confusing doubleisinstancecheck:source_id = ( source_meta.get("source_id", "") or (source_meta.get("source_name", "") if isinstance(source_meta, dict) else "") if isinstance(source_meta, dict) else "" )The inner
isinstance(source_meta, dict)check is redundant since the outer one already guards the entire expression.♻️ Simplified version
- source_id = ( - source_meta.get("source_id", "") - or (source_meta.get("source_name", "") if isinstance(source_meta, dict) else "") - if isinstance(source_meta, dict) - else "" - ) + source_id = ( + (source_meta.get("source_id", "") or source_meta.get("source_name", "")) + if isinstance(source_meta, dict) + else "" + )tests/unit/test_utils/test_configuration.py (1)
847-862: Good validation coverage. Consider adding a positive test case.The negative validation paths are well tested. Consider adding a test for the valid configuration to ensure it doesn't raise:
💡 Suggested additional test
def test_fetch_full_page_context_with_neighboring_pages_valid(self): """Test that fetch_neighboring_pages > 0 with fetch_full_page_context=True is valid.""" config = RetrieverConfig( fetch_full_page_context=True, fetch_neighboring_pages=2, ) assert config.fetch_full_page_context is True assert config.fetch_neighboring_pages == 2src/nvidia_rag/utils/vdb/elasticsearch/elastic_vdb.py (1)
952-996: LGTM — solid implementation matching the base class contract.The method correctly handles empty
page_numbers, caps the result size, and gracefully returns an empty list on errors. The document construction mirrors existing patterns in the class.Two observations worth noting:
Source matching inconsistency across backends: This ES implementation uses exact keyword term matching (
term: {"metadata.source.source_name.keyword": source_name}), while the Milvus implementation uses substring matching (like "%{source_name}%"). These have different semantics—ES will only match exact source names, while Milvus will match source names containing the string. If source naming conventions differ across deployments, this could yield different retrieval results. Consider documenting whether this behavioral difference is intentional.Static analysis:
except Exception(line 974) is broad; consider catching specific Elasticsearch exceptions if feasible. Also,logger.erroron line 975 could belogger.exceptionto include the traceback automatically.src/nvidia_rag/rag_server/main.py (3)
3389-3401: Dedup key truncates content at 300 chars — potential for false merges.If two distinct chunks on the same page share the first 300 characters (e.g., repeated headers, table rows, or structured data), one will be silently dropped. Consider using a hash of the full content instead:
♻️ Safer dedup key
+ import hashlib + def doc_key(d: Document) -> tuple[str, str, int, str]: meta = getattr(d, "metadata", {}) or {} content_md = meta.get("content_metadata", {}) or {} source = meta.get("source", {}) source_path = ( source.get("source_name", "") if isinstance(source, dict) else source ) coll = meta.get("collection_name", "") page_num = content_md.get("page_number", 0) - content_preview = (getattr(d, "page_content", "") or "")[:300] - return (str(coll), str(source_path), int(page_num), content_preview) + content_hash = hashlib.md5( + (getattr(d, "page_content", "") or "").encode() + ).hexdigest() + return (str(coll), str(source_path), int(page_num), content_hash)
3293-3319: Unusedmax_charsparameter.
max_chars(line 3297) is declared but never referenced in the method body. Remove it to keep the signature clean.♻️ Proposed fix
def _log_context_structure( self, context_str: str, prefix: str = "Context structure", - max_chars: int = 60, ) -> None:
3217-3291: Consider extracting shared metadata extraction logic.Both
_log_retrieved_pagesand_log_expanded_context_layout(as well as_extract_page_set_from_docs,_format_context_by_page, anddoc_keyinside_expand_and_organize_context) repeat the same metadata extraction pattern — pullingcontent_metadata.page_numberandsource.source_namefrom nesteddoc.metadata. A small shared helper (e.g.,_extract_doc_page_info(doc) -> tuple[str, str | None, int | None]) would reduce duplication across these five call sites.
| def _expand_and_organize_context( | ||
| self, | ||
| docs: list[Document], | ||
| vdb_op: VDBRag, | ||
| collection_names: list[str], | ||
| fetch_full_page_context: bool, | ||
| fetch_neighboring_pages: int, | ||
| ) -> list[Document]: | ||
| """Expand context with full page chunks and/or neighboring pages, then return merged docs. | ||
|
|
||
| When no docs have page_number (e.g., text files), returns reranker top-k unchanged. | ||
| """ | ||
| page_set = self._extract_page_set_from_docs(docs) | ||
| if not page_set: | ||
| return docs # Text files, no page metadata: return reranker top-k as-is | ||
|
|
||
| if fetch_neighboring_pages > 0: | ||
| page_set = self._expand_page_set_with_neighbors( | ||
| page_set, fetch_neighboring_pages | ||
| ) | ||
|
|
||
| if not fetch_full_page_context: | ||
| return docs | ||
|
|
||
| seen: set[tuple[str, str, int, str]] = set() | ||
|
|
||
| def doc_key(d: Document) -> tuple[str, str, int, str]: | ||
| meta = getattr(d, "metadata", {}) or {} | ||
| content_md = meta.get("content_metadata", {}) or {} | ||
| source = meta.get("source", {}) | ||
| source_path = ( | ||
| source.get("source_name", "") if isinstance(source, dict) else source | ||
| ) | ||
| coll = meta.get("collection_name", "") | ||
| page_num = content_md.get("page_number", 0) | ||
| content_preview = (getattr(d, "page_content", "") or "")[:300] | ||
| return (str(coll), str(source_path), int(page_num), content_preview) | ||
|
|
||
| merged: list[Document] = [] | ||
| for d in docs: | ||
| k = doc_key(d) | ||
| if k not in seen: | ||
| seen.add(k) | ||
| merged.append(d) | ||
|
|
||
| pages_by_coll_source: dict[tuple[str, str], set[int]] = {} | ||
| for coll, source, page in page_set: | ||
| key = (coll, source) | ||
| if key not in pages_by_coll_source: | ||
| pages_by_coll_source[key] = set() | ||
| pages_by_coll_source[key].add(page) | ||
|
|
||
| for (coll, source), pages in pages_by_coll_source.items(): | ||
| if not hasattr(vdb_op, "retrieve_chunks_by_filter"): | ||
| continue | ||
| try: | ||
| fetched = vdb_op.retrieve_chunks_by_filter( | ||
| collection_name=coll, | ||
| source_name=source, | ||
| page_numbers=sorted(pages), | ||
| limit=1000, | ||
| ) | ||
| for d in fetched: | ||
| k = doc_key(d) | ||
| if k not in seen: | ||
| seen.add(k) | ||
| merged.append(d) | ||
| except Exception as e: | ||
| logger.warning( | ||
| "Failed to fetch chunks for source=%s pages=%s: %s", | ||
| source, | ||
| list(pages), | ||
| e, | ||
| ) | ||
|
|
||
| return merged |
There was a problem hiding this comment.
Synchronous VDB calls will block the async event loop.
retrieve_chunks_by_filter performs synchronous network I/O (HTTP calls to Elasticsearch/Milvus). This method is called directly from the async _rag_chain, blocking the event loop for the duration of each fetch. The rest of the codebase wraps similar VDB calls in ThreadPoolExecutor (e.g., lines 2550–2568). With multiple (coll, source) groups, latency compounds serially.
Consider wrapping the loop in a ThreadPoolExecutor (or asyncio.to_thread) to avoid blocking the event loop:
♻️ Suggested parallelization
+ from concurrent.futures import ThreadPoolExecutor
+
for (coll, source), pages in pages_by_coll_source.items():
if not hasattr(vdb_op, "retrieve_chunks_by_filter"):
continue
try:
- fetched = vdb_op.retrieve_chunks_by_filter(
- collection_name=coll,
- source_name=source,
- page_numbers=sorted(pages),
- limit=1000,
- )
- for d in fetched:
- k = doc_key(d)
- if k not in seen:
- seen.add(k)
- merged.append(d)
+ with ThreadPoolExecutor() as executor:
+ futures = {
+ executor.submit(
+ vdb_op.retrieve_chunks_by_filter,
+ collection_name=coll_src[0],
+ source_name=coll_src[1],
+ page_numbers=sorted(pg_set),
+ limit=1000,
+ ): coll_src
+ for coll_src, pg_set in pages_by_coll_source.items()
+ }
+ for future in futures:
+ try:
+ fetched = future.result()
+ for d in fetched:
+ k = doc_key(d)
+ if k not in seen:
+ seen.add(k)
+ merged.append(d)
+ except Exception as e:
+ coll_src = futures[future]
+ logger.warning(
+ "Failed to fetch chunks for source=%s pages=%s: %s",
+ coll_src[1], list(pages_by_coll_source[coll_src]), e,
+ )Additionally:
collection_namesparameter is unused — remove it from the signature to avoid confusion (confirmed by static analysis).hasattr(vdb_op, "retrieve_chunks_by_filter")is always True forVDBRagsubclasses since the base class defines it (raisingNotImplementedError). Theexcept ExceptioncatchesNotImplementedErroras a fallback, but thehasattrguard is misleading. Consider catchingNotImplementedErrorexplicitly instead.
🧰 Tools
🪛 Ruff (0.15.0)
[warning] 3369-3369: Unused method argument: collection_names
(ARG002)
[warning] 3432-3432: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
In `@src/nvidia_rag/rag_server/main.py` around lines 3365 - 3440, The
_expand_and_organize_context method currently performs synchronous VDB network
calls (vdb_op.retrieve_chunks_by_filter) inside the async RAG path which will
block the event loop; update this by removing the unused collection_names
parameter from the signature, and change the fetch loop to run
retrieve_chunks_by_filter on a worker thread (use asyncio.to_thread or submit to
ThreadPoolExecutor) and schedule the per-(coll,source) fetches in parallel
(gather the tasks and then merge results), and replace the misleading
hasattr(vdb_op, "retrieve_chunks_by_filter") guard with a direct call wrapped in
try/except that explicitly catches NotImplementedError and logs warnings; keep
the existing dedup logic (doc_key, seen, merged) but perform deduping after
collected parallel fetch results before returning merged.
| parts: list[str] = [] | ||
| grouped: dict[tuple[str, int], list[Document]] = {} | ||
| for filename, source_path, page_num, doc in has_page: | ||
| key = (source_path, page_num) | ||
| if key not in grouped: | ||
| grouped[key] = [] | ||
| grouped[key].append(doc) | ||
|
|
||
| keys_sorted = sorted(grouped.keys(), key=lambda k: (k[0], k[1])) | ||
| for (source_path, page_num) in keys_sorted: | ||
| doc_list = grouped[(source_path, page_num)] | ||
| filename = ( | ||
| os.path.splitext(os.path.basename(source_path))[0] | ||
| if source_path | ||
| else "unknown" | ||
| ) | ||
| marker = f"=== Page {page_num} ({filename}) ===\n" | ||
| content = "\n\n".join(format_fn(d) for d in doc_list) | ||
| parts.append(marker + content) | ||
|
|
||
| if no_page: | ||
| parts.append("=== Additional context ===\n") | ||
| parts.append("\n\n".join(format_fn(d) for d in no_page)) | ||
|
|
||
| return "\n\n".join(parts) |
There was a problem hiding this comment.
Unused loop variable and inconsistent no_page formatting.
-
Line 3470:
filenameis computed but never used in the loop body (the actual filename is recomputed at line 3479). Use_to signal intent. -
Lines 3488–3490: The
no_pagebranch appends the header and content as separate list elements, which introduces an extra"\n\n"between them (from"\n\n".join(parts)). Thehas_pagepath (line 3484–3486) concatenates them into one string with only"\n". This formatting inconsistency may produce unexpected output.
♻️ Proposed fix
- for filename, source_path, page_num, doc in has_page:
+ for _, source_path, page_num, doc in has_page:
key = (source_path, page_num)
if key not in grouped:
grouped[key] = []
grouped[key].append(doc) if no_page:
- parts.append("=== Additional context ===\n")
- parts.append("\n\n".join(format_fn(d) for d in no_page))
+ no_page_content = "\n\n".join(format_fn(d) for d in no_page)
+ parts.append("=== Additional context ===\n" + no_page_content)🧰 Tools
🪛 Ruff (0.15.0)
[warning] 3470-3470: Loop control variable filename not used within loop body
(B007)
🤖 Prompt for AI Agents
In `@src/nvidia_rag/rag_server/main.py` around lines 3468 - 3492, The loop
computing grouped keys contains an unused `filename` variable; replace that
variable with `_` (or remove its assignment) so only the intended loop variables
`(filename, source_path, page_num, doc)` are used for grouping, and remove the
redundant filename recomputation earlier to avoid confusion. In the
`keys_sorted` iteration use the computed `filename` only once when creating
`marker = f"=== Page {page_num} ({filename}) ===\n"` (keep the existing
os.path.splitext/os.path.basename expression there) and delete the earlier
unused `filename` assignment. For the `no_page` branch, append a single combined
string (e.g., `"=== Additional context ===\n" + "\n\n".join(format_fn(d) for d
in no_page)`) instead of two separate parts so formatting matches the `has_page`
path and prevents an extra "\n\n" from appearing when joining `parts`. Ensure
references to `grouped`, `keys_sorted`, `format_fn`, `marker`, and `no_page` are
updated accordingly.
| fetch_full_page_context: bool = Field( | ||
| default=CONFIG.retriever.fetch_full_page_context, | ||
| description="Fetch ALL chunks for retrieved pages and organize context by page. " | ||
| "When True, enables page-based grouping for LLM/VLM.", | ||
| ) | ||
| fetch_neighboring_pages: int = Field( | ||
| default=CONFIG.retriever.fetch_neighboring_pages, | ||
| description="N pages before/after each retrieved page (0=disabled, 1=+/-1 page).", | ||
| ge=0, | ||
| le=10, | ||
| ) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# Search for cross-field validation in configuration.py
rg -n 'fetch_neighboring_pages' --type=py -C 10 -g '*/configuration.py'Repository: NVIDIA-AI-Blueprints/rag
Length of output: 216
🏁 Script executed:
# Check server.py for Prompt model and its validators
rg -n 'class Prompt' --type=py -A 50 -g '*/server.py' | head -100Repository: NVIDIA-AI-Blueprints/rag
Length of output: 216
🏁 Script executed:
# Search for request_data dict construction in server.py
rg -n 'request_data' --type=py -C 5 -g '*/server.py' | head -100Repository: NVIDIA-AI-Blueprints/rag
Length of output: 216
🏁 Script executed:
# Find configuration.py and server.py files
git ls-files | grep -E '(configuration|server)\.py'Repository: NVIDIA-AI-Blueprints/rag
Length of output: 405
🏁 Script executed:
# Get general file structure
fd -t f -e py | head -30Repository: NVIDIA-AI-Blueprints/rag
Length of output: 1249
🏁 Script executed:
# Search for fetch_neighboring_pages validation in configuration.py
rg 'fetch_neighboring_pages' src/nvidia_rag/utils/configuration.py -C 10Repository: NVIDIA-AI-Blueprints/rag
Length of output: 2750
🏁 Script executed:
# Search for Prompt class in server.py
rg 'class Prompt' src/nvidia_rag/rag_server/server.py -A 80Repository: NVIDIA-AI-Blueprints/rag
Length of output: 3384
🏁 Script executed:
# Search for fetch_neighboring_pages in server.py
rg 'fetch_neighboring_pages' src/nvidia_rag/rag_server/server.py -C 5Repository: NVIDIA-AI-Blueprints/rag
Length of output: 1064
🏁 Script executed:
# Search for validators in the Prompt class definition
rg 'class Prompt' src/nvidia_rag/rag_server/server.py -A 300 | grep -E '@(model_validator|field_validator)|def validate'Repository: NVIDIA-AI-Blueprints/rag
Length of output: 239
🏁 Script executed:
# Get full Prompt class definition to line count
wc -l src/nvidia_rag/rag_server/server.pyRepository: NVIDIA-AI-Blueprints/rag
Length of output: 108
🏁 Script executed:
# Show the section after fetch_neighboring_pages in server.py to see if validators exist
sed -n '583,650p' src/nvidia_rag/rag_server/server.pyRepository: NVIDIA-AI-Blueprints/rag
Length of output: 2761
Add cross-field validation to Prompt model for fetch_neighboring_pages.
The configuration layer validates that fetch_neighboring_pages > 0 requires fetch_full_page_context=True (via validate_page_context_options), but the Prompt model lacks this validation. API callers can submit fetch_neighboring_pages=5, fetch_full_page_context=False and bypass this constraint. Add a @model_validator matching the config-level validation.
Proposed fix
fetch_neighboring_pages: int = Field(
default=CONFIG.retriever.fetch_neighboring_pages,
description="N pages before/after each retrieved page (0=disabled, 1=+/-1 page).",
ge=0,
le=10,
)
+ `@model_validator`(mode="after")
+ def validate_page_context_options(self) -> "Prompt":
+ if self.fetch_neighboring_pages > 0 and not self.fetch_full_page_context:
+ raise ValueError(
+ "fetch_full_page_context must be True when fetch_neighboring_pages > 0."
+ )
+ return self
+
`@model_validator`(mode="after")
def validate_confidence_threshold(cls, values):🤖 Prompt for AI Agents
In `@src/nvidia_rag/rag_server/server.py` around lines 583 - 593, Add a
cross-field `@model_validator` to the Prompt pydantic model that enforces the same
rule as validate_page_context_options: if fetch_neighboring_pages > 0 then
fetch_full_page_context must be True; otherwise raise a ValidationError (or
ValueError) with a clear message. Place the validator inside the Prompt class
near the existing field definitions for fetch_full_page_context and
fetch_neighboring_pages and name/reference it similarly (e.g.,
validate_page_context_options) so API inputs like fetch_neighboring_pages=5,
fetch_full_page_context=False are rejected.
| def _build_content_parts_by_page( | ||
| self, | ||
| vlm_template: dict[str, Any], | ||
| textual_context: str, | ||
| question_text: str | None, | ||
| docs: list[Any], | ||
| remaining_image_budget: int | None, | ||
| ) -> list[dict[str, Any]]: | ||
| """Build content_parts with text and images interleaved per page.""" | ||
| human_template = vlm_template.get("human") or "{context}\n\n{question}" | ||
| q = (question_text or "").strip() | ||
| intro = human_template.format(context="", question="").rstrip() | ||
| if intro.endswith("Context:"): | ||
| intro = intro + "\n" | ||
| content_parts: list[dict[str, Any]] = [{"type": "text", "text": intro}] |
There was a problem hiding this comment.
Unused parameters: textual_context is never used and q is assigned but never read.
textual_context (line 403) is accepted as a parameter but never referenced in the method body — the method builds context directly from docs instead. This means any pre-formatted context text from the caller is silently discarded when organize_by_page=True.
Additionally, q (line 410) is assigned but never used since question_text is referenced directly at line 471.
🔧 Proposed fix
def _build_content_parts_by_page(
self,
vlm_template: dict[str, Any],
- textual_context: str,
question_text: str | None,
docs: list[Any],
remaining_image_budget: int | None,
) -> list[dict[str, Any]]:
"""Build content_parts with text and images interleaved per page."""
human_template = vlm_template.get("human") or "{context}\n\n{question}"
- q = (question_text or "").strip()
intro = human_template.format(context="", question="").rstrip()And update the call site at line 295:
content_parts = self._build_content_parts_by_page(
vlm_template,
- textual_context,
question_text,
docs,
remaining_image_budget,
)🧰 Tools
🪛 Ruff (0.15.0)
[warning] 403-403: Unused method argument: textual_context
(ARG002)
[error] 410-410: Local variable q is assigned to but never used
Remove assignment to unused variable q
(F841)
🤖 Prompt for AI Agents
In `@src/nvidia_rag/rag_server/vlm.py` around lines 400 - 414, The method
_build_content_parts_by_page currently ignores the textual_context parameter and
assigns q but never uses it; change the human_template.format call (and any
later question_text usage inside _build_content_parts_by_page) to use the
provided textual_context and the trimmed q variable (q = (question_text or
"").strip()) instead of empty strings or raw question_text, so the preformatted
context and normalized question are respected; if you decide not to use
textual_context here, remove the unused parameter and q, otherwise update the
caller to supply the textual_context argument so the method can include it when
building the intro and any question formatting.
| try: | ||
| # Build filter: page_number in [x,y,z] and source matches | ||
| page_list_str = ", ".join(str(p) for p in page_numbers) | ||
| filter_expr = ( | ||
| f'content_metadata["page_number"] in [{page_list_str}] and ' | ||
| f'source["source_name"] like "%{source_name}%"' | ||
| ) | ||
| milvus_client = MilvusClient( | ||
| self.vdb_endpoint, | ||
| token=self._get_milvus_token(), | ||
| ) | ||
| entities = milvus_client.query( | ||
| collection_name=collection_name, | ||
| filter=filter_expr, | ||
| limit=limit, | ||
| ) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# Check the like pattern usage with source_name
rg -n 'like.*source_name' --type=py -C 2Repository: NVIDIA-AI-Blueprints/rag
Length of output: 989
🏁 Script executed:
# Also check the context around the file mentioned
rg -n 'retrieve_chunks_by_filter|retrieval_image_langchain' src/nvidia_rag/utils/vdb/milvus/milvus_vdb.pyRepository: NVIDIA-AI-Blueprints/rag
Length of output: 222
🏁 Script executed:
# Check the Elasticsearch implementation for comparison
fd -e py | xargs grep -l 'elastic_vdb\|ElasticVDB' | head -5Repository: NVIDIA-AI-Blueprints/rag
Length of output: 248
🏁 Script executed:
# Check the Elasticsearch implementation for source filtering
cat src/nvidia_rag/utils/vdb/elasticsearch/elastic_vdb.py | head -100Repository: NVIDIA-AI-Blueprints/rag
Length of output: 4351
🏁 Script executed:
# Search for source filtering in elastic_vdb.py specifically
rg -n 'source_name|source\.source_name|term.*source' src/nvidia_rag/utils/vdb/elasticsearch/elastic_vdb.py -A 3 -B 3Repository: NVIDIA-AI-Blueprints/rag
Length of output: 1488
🏁 Script executed:
# Find the method that retrieves chunks by source and pages
rg -n 'get_chunks_by_source_and_pages|retrieve_chunks_by_filter' src/nvidia_rag/utils/vdb/elasticsearch/elastic_vdb.py -A 20Repository: NVIDIA-AI-Blueprints/rag
Length of output: 2582
🏁 Script executed:
# Find the es_queries file and check the get_chunks_by_source_and_pages_query function
fd 'es_queries' --type fRepository: NVIDIA-AI-Blueprints/rag
Length of output: 189
🏁 Script executed:
# Check the implementation of get_chunks_by_source_and_pages_query
rg -n 'def get_chunks_by_source_and_pages_query' -A 30Repository: NVIDIA-AI-Blueprints/rag
Length of output: 2865
🏁 Script executed:
# Check if there's any escaping or validation of source_name in Milvus filter expressions
rg -n 'source_name.*escape|escape.*source_name|quote|sanitize' src/nvidia_rag/utils/vdb/milvus/ --type=pyRepository: NVIDIA-AI-Blueprints/rag
Length of output: 50
🏁 Script executed:
# Check if Milvus documentation or usage elsewhere shows escaping patterns
rg -n 'filter_expr.*source|filter.*like' src/nvidia_rag/utils/vdb/milvus/milvus_vdb.py -B 2 -A 2Repository: NVIDIA-AI-Blueprints/rag
Length of output: 50
Milvus uses substring match (like) while Elasticsearch uses exact match (term) for source filtering, creating behavioral inconsistency.
Line 1211 uses like "%{source_name}%" for substring matching. The Elasticsearch implementation in es_queries.py (line 100) uses term on metadata.source.source_name.keyword, which is exact matching. This means the same query could return different results depending on the backend.
Additionally, source_name is interpolated directly into the filter string without escaping. If source_name contains special characters (e.g., quotes), it could break the filter expression. This pattern is consistent with existing code in retrieval_image_langchain (line 1161), but it remains a concern.
🤖 Prompt for AI Agents
In `@src/nvidia_rag/utils/vdb/milvus/milvus_vdb.py` around lines 1206 - 1221, The
Milvus query builds filter_expr with substring matching (like "%{source_name}%")
and interpolates source_name raw, causing inconsistent behavior with the
Elasticsearch implementation (which uses exact term match) and risk of
syntax-breaking characters; update the code that constructs filter_expr in
milvus_vdb.py (the filter_expr variable used with MilvusClient.query) to perform
an exact equality check on source["source_name"] (to mirror es_queries.py's term
on metadata.source.source_name.keyword) and properly escape/quote source_name
before interpolation (e.g., ensure internal quotes/backslashes are escaped and
the value is wrapped in quotes) so the filter string is safe and semantics match
across backends.
| def test_fetch_full_page_context_calls_vdb(self): | ||
| """Test fetch_full_page_context triggers VDB filter query.""" | ||
| rag = NvidiaRAG() | ||
| docs = [ | ||
| Document( | ||
| page_content="x", | ||
| metadata={ | ||
| "source": {"source_name": "/path/to/a.pdf"}, | ||
| "content_metadata": {"page_number": 3}, | ||
| "collection_name": "col1", | ||
| }, | ||
| ), | ||
| ] | ||
| mock_vdb = MagicMock() | ||
| mock_vdb.retrieve_chunks_by_filter.return_value = [] | ||
|
|
||
| result = rag._expand_and_organize_context( | ||
| docs=docs, | ||
| vdb_op=mock_vdb, | ||
| collection_names=["col1"], | ||
| fetch_full_page_context=True, | ||
| fetch_neighboring_pages=0, | ||
| ) | ||
|
|
||
| mock_vdb.retrieve_chunks_by_filter.assert_called_once() | ||
| call_kw = mock_vdb.retrieve_chunks_by_filter.call_args[1] | ||
| assert call_kw["collection_name"] == "col1" | ||
| assert call_kw["source_name"] == "/path/to/a.pdf" | ||
| assert 3 in call_kw["page_numbers"] | ||
| assert len(result) >= 1 |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
# First, find the NvidiaRAG class and its _expand_and_organize_context method
fd -t f "rag" --type f | grep -E "\.(py)$" | head -20Repository: NVIDIA-AI-Blueprints/rag
Length of output: 664
🏁 Script executed:
# Search for _expand_and_organize_context implementation
rg "_expand_and_organize_context" -A 50Repository: NVIDIA-AI-Blueprints/rag
Length of output: 15426
🏁 Script executed:
# Search for retrieve_chunks_by_filter calls
rg "retrieve_chunks_by_filter" -B 2 -A 2Repository: NVIDIA-AI-Blueprints/rag
Length of output: 5202
🏁 Script executed:
# Get the full retrieve_chunks_by_filter call in _expand_and_organize_context
rg "retrieve_chunks_by_filter" -A 5 src/nvidia_rag/rag_server/main.pyRepository: NVIDIA-AI-Blueprints/rag
Length of output: 414
🏁 Script executed:
# Also check the signature of retrieve_chunks_by_filter to understand the parameters
rg "def retrieve_chunks_by_filter" -A 10Repository: NVIDIA-AI-Blueprints/rag
Length of output: 2741
🏁 Script executed:
# Check Python version requirement
cat README.md | grep -i python || cat pyproject.toml | grep -i python || cat setup.py | grep -i python || find . -name "*.toml" -o -name "*.cfg" | xargs grep -i "python" 2>/dev/null | head -20Repository: NVIDIA-AI-Blueprints/rag
Length of output: 112
🏁 Script executed:
# Let's check what happens in the result after fetching empty chunks
# Examine the rest of _expand_and_organize_context to see how result is constructed
rg "_expand_and_organize_context" -A 100 src/nvidia_rag/rag_server/main.py | head -120Repository: NVIDIA-AI-Blueprints/rag
Length of output: 6292
🏁 Script executed:
# Get more of the _expand_and_organize_context method implementation
rg "def _expand_and_organize_context" -A 150 src/nvidia_rag/rag_server/main.py | tail -100Repository: NVIDIA-AI-Blueprints/rag
Length of output: 3949
Use call_args.kwargs for clarity and strengthen the result length assertion.
Line 223 uses call_args[1] to access keyword arguments. While this works correctly since the implementation calls retrieve_chunks_by_filter with keyword-only arguments, call_args.kwargs (Python 3.8+) is more explicit and self-documenting.
Also, assert len(result) >= 1 (line 227) is weak. Since the mock returns an empty list and deduplication preserves only the original document, use assert len(result) == 1 for a stronger assertion.
Suggested improvements
- call_kw = mock_vdb.retrieve_chunks_by_filter.call_args[1]
+ call_kw = mock_vdb.retrieve_chunks_by_filter.call_args.kwargs
assert call_kw["collection_name"] == "col1"
assert call_kw["source_name"] == "/path/to/a.pdf"
assert 3 in call_kw["page_numbers"]
- assert len(result) >= 1
+ assert len(result) == 1🤖 Prompt for AI Agents
In `@tests/unit/test_rag_server/test_page_context_organization.py` around lines
198 - 227, The test test_fetch_full_page_context_calls_vdb should use
call_args.kwargs for clarity and strengthen the final assertion: replace
accessing mock_vdb.retrieve_chunks_by_filter.call_args[1] with
mock_vdb.retrieve_chunks_by_filter.call_args.kwargs to explicitly read keyword
arguments, and change the weak assertion assert len(result) >= 1 to assert
len(result) == 1 because the mock returns an empty list and deduplication in
rag._expand_and_organize_context should leave only the original document.
c18624c to
37e356a
Compare
…e-aware retrieval
Description
Checklist
git commit -s) and GPG signed (git commit -S).Summary by CodeRabbit