
Commit 5ad0c04

Authored by: jkwatson, baasitsharief, ewilliams-cloudera, mliu-cloudera, actions-user
Streaming chat cleanup & in-process docling (#247)
* refactoring/cleanup
* type fixes and import cleanup
* name changes lastFile:ui/src/pages/RagChatTab/State/RagChatContext.tsx
* wip lastFile:llm-service/app/services/query/querier.py
* event testing lastFile:llm-service/app/services/query/agents/tool_calling_querier.py
* wip event queue lastFile:llm-service/app/services/query/agents/tool_calling_querier.py
* moving poison pill around lastFile:llm-service/app/services/query/agents/tool_calling_querier.py
* event wip lastFile:llm-service/app/services/query/agents/tool_calling_querier.py
* WIP event queue lastFile:llm-service/app/services/query/agents/tool_calling_querier.py
* WIP event queue lastFile:llm-service/app/services/query/chat_events.py
* WIP event queue lastFile:llm-service/app/services/query/agents/tool_calling_querier.py
* wip on chat events lastFile:llm-service/app/routers/index/sessions/__init__.py
* work in progress on chat events lastFile:llm-service/app/services/query/agents/tool_calling_querier.py
* WIP event queue lastFile:llm-service/app/services/query/agents/tool_calling_querier.py
* drop databases lastFile:llm-service/app/services/query/agents/tool_calling_querier.py
* wip on openai streaming events lastFile:llm-service/app/services/query/agents/tool_calling_querier.py
* send additional done after we're really done lastFile:llm-service/app/routers/index/sessions/__init__.py
* getting close to streaming events on non openai agents lastFile:llm-service/app/services/query/agents/tool_calling_querier.py
* gracefully shutdown handler and close loop
* python cleanup
* error handling in the non-openai streaming
* cleanup
* render contents of a tags and remove chat event queue
* input for date tool
* default input
* fix duplicated timestamp issue
* mypy
* remove openaiagent
* update lock file
* Docling enhancements to parsing support and sampling summary nodes (#248)
  * work on docling native parsing
  * native parsing to json work, no formatting
  * docling + markdown + page numbers
  * small cleanup
  * use docling for parsing docs
  * only use docling readers for pdf and html
  * change chunk API to return a list of results
  * batch the csv results
  * conditionally condense questions
  * Revert "batch the csv results" (this reverts commit ff3936f)
  * Revert "change chunk API to return a list of results" (this reverts commit 4ea267f)
  * implement block-sampling for summarization
  * add test for block sampling
  * add some prints for debugging
  * update test and doc strings
  * handle case where no content in summary
  * better status code
  * fix mypy
  Co-authored-by: jwatson <[email protected]>
* Update release version to dev-testing
* filter out non-final agent response
* fixed typos
* fixed typos wip
* put limit on error retries on suggested questions and show error when opening suggested questions
* format agent stream output for non final response
* better error messages
* refactor and check for mistral agent
* fix issue with long responses to session name
* return empty string when no chat history instead of throwing exception
* throw error using bedrock model that does not support tool calling
* error handling for non tool calling models lastFile:llm-service/app/services/query/querier.py
* mypy fix and refactoring
* use chat when no tools with tool calling enabled
* remove print and modify error message
* remove print

Co-authored-by: Baasit Sharief <[email protected]>
Co-authored-by: Elijah Williams <[email protected]>
Co-authored-by: Michael Liu <[email protected]>
Co-authored-by: actions-user <[email protected]>
1 parent 70c9830 commit 5ad0c04

30 files changed: +888 / -328 lines

llm-service/app/ai/indexing/base.py

Lines changed: 22 additions & 10 deletions
@@ -1,3 +1,4 @@
+import logging
 import os
 from abc import abstractmethod
 from dataclasses import dataclass
@@ -6,13 +7,17 @@
 
 from .readers.base_reader import BaseReader, ReaderConfig
 from .readers.csv import CSVReader
+from .readers.docling_reader import DoclingReader
 from .readers.docx import DocxReader
 from .readers.images import ImagesReader
 from .readers.json import JSONReader
 from .readers.markdown import MdReader
 from .readers.pdf import PDFReader
 from .readers.pptx import PptxReader
 from .readers.simple_file import SimpleFileReader
+from ...config import settings
+
+logger = logging.getLogger(__name__)
 
 READERS: Dict[str, Type[BaseReader]] = {
     ".pdf": PDFReader,
@@ -29,6 +34,11 @@
     ".png": ImagesReader,
 }
 
+DOCLING_READERS: Dict[str, Type[BaseReader]] = {
+    ".pdf": DoclingReader,
+    ".html": DoclingReader,
+}
+
 
 @dataclass
 class NotSupportedFileExtensionError(Exception):
@@ -50,17 +60,19 @@ def index_file(self, file_path: Path, doc_id: str) -> None:
 
     def _get_reader_class(self, file_path: Path) -> Type[BaseReader]:
         file_extension = os.path.splitext(file_path)[1]
-        reader_cls = READERS.get(file_extension)
+        reader_cls: Optional[Type[BaseReader]] = None
+        if settings.advanced_pdf_parsing and DOCLING_READERS.get(file_extension):
+            try:
+                reader_cls = DoclingReader
+            except Exception as e:
+                logger.error(
+                    "Error initializing DoclingReader, falling back to default readers",
+                    e,
+                )
+                reader_cls = READERS.get(file_extension)
+        else:
+            reader_cls = READERS.get(file_extension)
         if not reader_cls:
             raise NotSupportedFileExtensionError(file_extension)
 
         return reader_cls
-
-
-def get_reader_class(file_path: Path) -> Type[BaseReader]:
-    file_extension = os.path.splitext(file_path)[1]
-    reader_cls = READERS.get(file_extension)
-    if not reader_cls:
-        raise NotSupportedFileExtensionError(file_extension)
-
-    return reader_cls
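
For context on how the new reader selection behaves, here is a minimal standalone sketch of the same logic (the pick_reader helper and its parameters are illustrative only; the real code is _get_reader_class above, which consults settings.advanced_pdf_parsing and the READERS / DOCLING_READERS maps):

import os
from pathlib import Path
from typing import Dict, Optional, Type


def pick_reader(
    file_path: Path,
    default_readers: Dict[str, Type],
    docling_readers: Dict[str, Type],
    advanced_pdf_parsing: bool,
) -> Type:
    """Prefer a Docling reader for supported extensions when the flag is on,
    otherwise fall back to the default reader map."""
    ext = os.path.splitext(file_path)[1]
    reader_cls: Optional[Type] = None
    if advanced_pdf_parsing and ext in docling_readers:
        reader_cls = docling_readers[ext]
    else:
        reader_cls = default_readers.get(ext)
    if reader_cls is None:
        raise ValueError(f"Unsupported file extension: {ext}")
    return reader_cls


# With advanced parsing enabled, ".pdf" and ".html" resolve to the Docling reader;
# everything else (and all files when the flag is off) uses the original READERS map.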
llm-service/app/ai/indexing/readers/docling_reader.py

Lines changed: 95 additions & 0 deletions
@@ -0,0 +1,95 @@
+#
+# CLOUDERA APPLIED MACHINE LEARNING PROTOTYPE (AMP)
+# (C) Cloudera, Inc. 2025
+# All rights reserved.
+#
+# Applicable Open Source License: Apache 2.0
+#
+# NOTE: Cloudera open source products are modular software products
+# made up of hundreds of individual components, each of which was
+# individually copyrighted. Each Cloudera open source product is a
+# collective work under U.S. Copyright Law. Your license to use the
+# collective work is as provided in your written agreement with
+# Cloudera. Used apart from the collective work, this file is
+# licensed for your use pursuant to the open source license
+# identified above.
+#
+# This code is provided to you pursuant a written agreement with
+# (i) Cloudera, Inc. or (ii) a third-party authorized to distribute
+# this code. If you do not have a written agreement with Cloudera nor
+# with an authorized and properly licensed third party, you do not
+# have any rights to access nor to use this code.
+#
+# Absent a written agreement with Cloudera, Inc. ("Cloudera") to the
+# contrary, A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY
+# KIND; (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED
+# WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT LIMITED TO
+# IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND
+# FITNESS FOR A PARTICULAR PURPOSE; (C) CLOUDERA IS NOT LIABLE TO YOU,
+# AND WILL NOT DEFEND, INDEMNIFY, NOR HOLD YOU HARMLESS FOR ANY CLAIMS
+# ARISING FROM OR RELATED TO THE CODE; AND (D)WITH RESPECT TO YOUR EXERCISE
+# OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY
+# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR
+# CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED TO, DAMAGES
+# RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF
+# BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
+# DATA.
+#
+
+import logging
+from pathlib import Path
+from typing import List, Any
+
+from docling.datamodel.document import ConversionResult
+from docling.document_converter import DocumentConverter
+from docling_core.transforms.chunker.hierarchical_chunker import HierarchicalChunker
+from docling_core.transforms.chunker.base import BaseChunk
+from docling_core.transforms.serializer.base import SerializationResult
+from docling_core.transforms.serializer.markdown import MarkdownDocSerializer
+from llama_index.core.schema import Document, TextNode, NodeRelationship
+
+from .base_reader import BaseReader
+from .base_reader import ChunksResult
+from .pdf import MarkdownSerializerProvider
+
+logger = logging.getLogger(__name__)
+
+
+class DoclingReader(BaseReader):
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
+        super().__init__(*args, **kwargs)
+
+    def load_chunks(self, file_path: Path) -> ChunksResult:
+        document = Document()
+        document.id_ = self.document_id
+        self._add_document_metadata(document, file_path)
+        parent = document.as_related_node_info()
+
+        converted_chunks: List[TextNode] = []
+        logger.debug(f"{file_path=}")
+        docling_doc: ConversionResult = DocumentConverter().convert(file_path)
+        chunky_chunks = HierarchicalChunker(serializer_provider=MarkdownSerializerProvider()).chunk(docling_doc.document)
+        chunky_chunk: BaseChunk
+        serializer = MarkdownDocSerializer(doc=docling_doc.document)
+        for i, chunky_chunk in enumerate(chunky_chunks):
+            text = ""
+            page_number: int = 0
+            if not hasattr(chunky_chunk.meta, "doc_items"):
+                logger.warning(f"Chunk {i} is empty, skipping")
+                continue
+            for item in chunky_chunk.meta.doc_items:
+                page_number = item.prov[0].page_no if item.prov else None
+                item_ser: SerializationResult = serializer.serialize(item=item)
+                text += item_ser.text
+            node = TextNode(text=text)
+            if page_number:
+                node.metadata["page_number"] = page_number
+            node.metadata["file_name"] = document.metadata["file_name"]
+            node.metadata["document_id"] = document.metadata["document_id"]
+            node.metadata["data_source_id"] = document.metadata["data_source_id"]
+            node.metadata["chunk_number"] = i
+            node.metadata["chunk_format"] = "markdown"
+            node.relationships.update(
+                {NodeRelationship.SOURCE: parent}
+            )
+            converted_chunks.append(node)
+        return ChunksResult(converted_chunks)
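
For reference, a minimal sketch of the docling calls this reader builds on, runnable outside the indexing pipeline (the input path "sample.pdf" is hypothetical, and the default chunk serializer is used here instead of the Markdown provider wired in above):

from docling.document_converter import DocumentConverter
from docling_core.transforms.chunker.hierarchical_chunker import HierarchicalChunker
from docling_core.transforms.serializer.markdown import MarkdownDocSerializer

# Convert the file, then chunk the resulting DoclingDocument hierarchically.
result = DocumentConverter().convert("sample.pdf")  # hypothetical input file
chunks = HierarchicalChunker().chunk(result.document)

# Re-serialize each chunk's items to Markdown and keep the first page number seen,
# mirroring what DoclingReader.load_chunks does per chunk.
serializer = MarkdownDocSerializer(doc=result.document)
for i, chunk in enumerate(chunks):
    if not hasattr(chunk.meta, "doc_items"):
        continue
    page = next((item.prov[0].page_no for item in chunk.meta.doc_items if item.prov), None)
    text = "".join(serializer.serialize(item=item).text for item in chunk.meta.doc_items)
    print(i, page, text[:80])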

llm-service/app/ai/indexing/readers/pdf.py

Lines changed: 13 additions & 17 deletions
@@ -35,17 +35,18 @@
 # BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
 # DATA.
 #
-import os
 import logging
 from pathlib import Path
 from typing import Any, List
 
+from docling_core.transforms.serializer.base import BaseSerializerProvider, BaseDocSerializer
+from docling_core.transforms.serializer.markdown import MarkdownDocSerializer
+from docling_core.types.doc.document import DoclingDocument
 from llama_index.core.schema import Document, TextNode
 from llama_index.readers.file import PDFReader as LlamaIndexPDFReader
+from typing_extensions import override
 
-from ....exceptions import DocumentParseError
 from .base_reader import BaseReader, ChunksResult
-from .docling import load_chunks
 from .markdown import MdReader
 
 logger = logging.getLogger(__name__)
@@ -88,27 +89,22 @@ def populate_chunk_page_numbers(self, chunks: List[TextNode]) -> None:
             chunk.metadata["page_number"] = chunk_label
 
 
+class MarkdownSerializerProvider(BaseSerializerProvider):
+    """Serializer provider used for chunking purposes."""
+
+    @override
+    def get_serializer(self, doc: DoclingDocument) -> BaseDocSerializer:
+        """Get the associated serializer."""
+        return MarkdownDocSerializer(doc=doc)
+
+
 class PDFReader(BaseReader):
     def __init__(self, *args: Any, **kwargs: Any) -> None:
         super().__init__(*args, **kwargs)
         self.inner = LlamaIndexPDFReader(return_full_document=False)
         self.markdown_reader = MdReader(*args, **kwargs)
 
     def load_chunks(self, file_path: Path) -> ChunksResult:
-        docling_enabled: bool = (
-            os.getenv("USE_ENHANCED_PDF_PROCESSING", "false").lower() == "true"
-        )
-        logger.info(f"{docling_enabled=}")
-        try:
-            if docling_enabled:
-                logger.debug(f"{file_path=}")
-                chunks: list[TextNode] = load_chunks(self.markdown_reader, file_path)
-                if chunks:
-                    # todo: handle pii & secrets
-                    return ChunksResult(chunks=chunks)
-        except DocumentParseError as e:
-            logger.warning(f"Failed to parse document with docling: {e}")
-
         ret = ChunksResult()
 
         pages: list[Document] = self.inner.load_data(file_path)
llm-service/app/ai/indexing/summary_indexer.py

Lines changed: 70 additions & 0 deletions
@@ -37,6 +37,7 @@
 #
 import logging
 import os
+import random
 import shutil
 from pathlib import Path
 from threading import Lock
@@ -277,6 +278,9 @@ def index_file(self, file_path: Path, document_id: str) -> None:
         chunks: ChunksResult = reader.load_chunks(file_path)
         nodes: List[TextNode] = chunks.chunks
 
+        nodes = self.sample_nodes(nodes, 1000, 20)
+        logger.debug(f"Using {len(nodes)} nodes from {len(chunks.chunks)} total nodes")
+
         if not nodes:
             logger.warning(f"No chunks found for file {file_path}")
             return
@@ -358,6 +362,72 @@ def __update_global_summary_store(
         global_summary_store.insert_nodes(new_nodes)
         global_summary_store.storage_context.persist(persist_dir=global_persist_dir)
 
+    def sample_nodes(
+        self,
+        nodes: List[TextNode],
+        max_number_to_sample: int = 1000,
+        sample_block_size: int = 20,
+    ) -> List[TextNode]:
+        """
+        Sample max_number_to_sample in contiguous blocks of sample_block_size if we have more than max_number_to_sample nodes.
+        This sampling helps reduce processing time for very large documents while maintaining context coherence.
+
+        Args:
+            nodes: List of TextNode objects to sample from
+            max_number_to_sample: max number of nodes to sample
+            sample_block_size: how big the contiguous blocks should be
+
+        Returns:
+            A list of sampled TextNode objects, or the original list if it has 1000 or fewer nodes
+        """
+        if len(nodes) <= max_number_to_sample:
+            return nodes
+
+        num_blocks = max_number_to_sample // sample_block_size
+        block_size = sample_block_size
+
+        # Calculate the maximum valid starting index for a block
+        max_block_start_index = len(nodes) - block_size
+
+        # Randomly select starting indices for blocks, ensuring they're at least block_size apart
+        # to avoid overlapping blocks
+        available_indices = list(range(max_block_start_index + 1))
+        block_start_indices: list[int] = []
+
+        # Try to get num_blocks non-overlapping blocks
+        while len(block_start_indices) < num_blocks and available_indices:
+            # Randomly select an index from available indices
+            if not available_indices:
+                break
+            idx = random.choice(available_indices)
+            block_start_indices.append(idx)
+
+            # Remove this index and all indices that would create overlapping blocks
+            # (i.e., all indices within block_size of the selected index)
+            for i in range(
+                max(0, idx - block_size + 1), min(len(nodes), idx + block_size)
+            ):
+                if i in available_indices:
+                    available_indices.remove(i)
+
+        # Sort the indices to maintain order
+        block_start_indices.sort()
+
+        # Extract blocks of block_size contiguous nodes
+        sampled_nodes = []
+        for start_idx in block_start_indices:
+            sampled_nodes.extend(nodes[start_idx : start_idx + block_size])
+
+        # If we couldn't get enough blocks (if document is not large enough)
+        # but still larger than 1000, take the first 1000
+        if (
+            len(sampled_nodes) < max_number_to_sample
+            and len(nodes) >= max_number_to_sample
+        ):
+            return nodes[:max_number_to_sample]
+        else:
+            return sampled_nodes
+
     def get_summary(self, document_id: str) -> Optional[str]:
         with _write_lock:
             persist_dir = self.__persist_dir()
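
The block-sampling idea is easiest to see on plain integers; a minimal standalone sketch of the same approach (the sample_blocks helper is illustrative, not part of the codebase):

import random
from typing import List


def sample_blocks(items: List[int], max_to_sample: int = 1000, block_size: int = 20) -> List[int]:
    """Pick non-overlapping contiguous blocks until roughly max_to_sample items are selected."""
    if len(items) <= max_to_sample:
        return items
    num_blocks = max_to_sample // block_size
    available = list(range(len(items) - block_size + 1))
    starts: List[int] = []
    while len(starts) < num_blocks and available:
        idx = random.choice(available)
        starts.append(idx)
        # Drop start indices that would overlap the block just chosen.
        available = [i for i in available if abs(i - idx) >= block_size]
    starts.sort()
    sampled = [item for start in starts for item in items[start : start + block_size]]
    # Fall back to a simple prefix if we could not assemble enough blocks.
    return sampled if len(sampled) >= max_to_sample else items[:max_to_sample]


# 5000 items -> 50 contiguous blocks of 20 items each, document order preserved.
print(len(sample_blocks(list(range(5000)))))  # 1000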

llm-service/app/config.py

Lines changed: 4 additions & 0 deletions
@@ -101,6 +101,10 @@ def qdrant_host(self) -> str:
     def qdrant_port(self) -> int:
         return int(os.environ.get("QDRANT_PORT", "6333"))
 
+    @property
+    def advanced_pdf_parsing(self) -> bool:
+        return os.environ.get("USE_ENHANCED_PDF_PROCESSING", "false").lower() == "true"
+
     @property
     def vector_db_provider(self) -> Optional[str]:
         return os.environ.get("VECTOR_DB_PROVIDER")
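
A quick sketch of how the flag is expected to be toggled (assuming the shared settings instance that base.py imports; the absolute import path app.config is an assumption here):

import os

from app.config import settings  # assumed absolute import path for the module above

# advanced_pdf_parsing reads the environment on every access, so toggling the
# variable at runtime is enough to switch reader selection behavior.
os.environ["USE_ENHANCED_PDF_PROCESSING"] = "true"
print(settings.advanced_pdf_parsing)  # True

os.environ["USE_ENHANCED_PDF_PROCESSING"] = "false"
print(settings.advanced_pdf_parsing)  # False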

llm-service/app/routers/index/data_source/__init__.py

Lines changed: 5 additions & 1 deletion
@@ -252,7 +252,11 @@ def summarize_document(
     try:
         indexer.index_file(file_path, doc_id)
         summary = indexer.get_summary(doc_id)
-        assert summary is not None
+        if summary is None:
+            raise HTTPException(
+                status_code=HTTPStatus.UNPROCESSABLE_ENTITY,
+                detail="No content to summarize.",
+            )
         return summary
     except NotSupportedFileExtensionError as e:
         raise HTTPException(
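
The guard replaces an assert (which would surface as a 500) with an explicit 422; in isolation the pattern looks roughly like this sketch (the route path and the None summary stand-in are hypothetical, not the actual endpoint):

from http import HTTPStatus

from fastapi import FastAPI, HTTPException

app = FastAPI()


@app.get("/documents/{doc_id}/summary")  # hypothetical route for illustration
def summarize(doc_id: str) -> str:
    summary = None  # stand-in for indexer.get_summary(doc_id)
    if summary is None:
        # Report "nothing to summarize" as a client-visible 422 instead of an assertion failure.
        raise HTTPException(
            status_code=HTTPStatus.UNPROCESSABLE_ENTITY,
            detail="No content to summarize.",
        )
    return summary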
