feat: simplify extraction pipeline and add batch entity summarization #1224

prasmussen15 merged 12 commits into main from
Conversation
Add TokenUsageTracker class to track input/output tokens by prompt type during LLM calls. This helps analyze token costs across different operations like extract_nodes, extract_edges, resolve_nodes, etc.

Changes:
- Add graphiti_core/llm_client/token_tracker.py with TokenUsageTracker
- Update LLMClient base class to include a token_tracker instance
- Update OpenAI base client to capture and record token usage
- Add token_tracker property on the Graphiti class for easy access
- Update podcast_runner.py to print a token usage summary after ingestion

Usage:

    client = Graphiti(...)
    # ... run ingestion ...
    client.token_tracker.print_summary(sort_by='prompt_name')

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
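The tracker's implementation is not shown in this conversation; a minimal sketch of what such a class might look like, using the `record` and `print_summary(sort_by=...)` names from the commit message (all internals here are assumptions):

```python
import threading
from collections import defaultdict


class TokenUsageTracker:
    """Sketch: accumulate input/output token counts per prompt name."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        # prompt_name -> [input_tokens, output_tokens]
        self._usage: dict[str, list[int]] = defaultdict(lambda: [0, 0])

    def record(self, prompt_name: str, input_tokens: int, output_tokens: int) -> None:
        with self._lock:
            self._usage[prompt_name][0] += input_tokens
            self._usage[prompt_name][1] += output_tokens

    def print_summary(self, sort_by: str = 'prompt_name') -> None:
        with self._lock:
            rows = sorted(
                self._usage.items(),
                key=(lambda kv: kv[0]) if sort_by == 'prompt_name' else (lambda kv: -sum(kv[1])),
            )
        for name, (inp, out) in rows:
            print(f'{name}: input={inp} output={out} total={inp + out}')
```

A `threading.Lock` (rather than `asyncio.Lock`) lets the tracker be called safely from both sync and async code paths, which comes up again in the review below.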
Disable the optimization that skips LLM calls when node summary + edge facts is under 2000 characters. This forces all summaries to be generated via LLM for token usage analysis. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This reverts the summary optimization changes. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Remove chunking code for entity-dense episodes (node_operations.py)
  - Delete _extract_nodes_chunked, _extract_from_chunk, _merge_extracted_entities
  - Always use a single LLM call for entity extraction
- Remove chunking code for edge extraction (edge_operations.py)
  - Remove MAX_NODES constant and generate_covering_chunks usage
  - Process all nodes in a single LLM call instead of covering subsets
- Add batch entity summarization (node_operations.py, extract_nodes.py)
  - New SummarizedEntity and SummarizedEntities Pydantic models
  - New extract_summaries_batch prompt for batch processing
  - New _extract_entity_summaries_batch function
  - Nodes with short summaries get edge facts appended directly (no LLM)
  - Only nodes needing LLM summarization are batched together
- Simplify edge attribute extraction (extract_edges.py, edge_operations.py)
  - Remove episode_content from context (attributes come from the fact only)
  - Keep reference_time for temporal resolution
  - Add existing_attributes to preserve/update existing values
- Improve edge deduplication prompt (dedupe_edges.py, edge_operations.py)
  - Use continuous indexing across duplicate and invalidation candidates
  - Deduplicate invalidation candidates against duplicate candidates
  - Allow EXISTING FACTS to be both duplicates AND contradicted
  - Consolidate to a single contradicted_facts field
- Remove obsolete chunking tests (test_entity_extraction.py)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
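The short-summary fast path described above (edge facts appended directly when the combined text stays small, LLM summarization only for the rest) could look roughly like this. The 2000-character threshold comes from an earlier commit in this PR; the function and field names are assumptions for illustration:

```python
def partition_for_summarization(nodes, edge_facts_by_node, max_chars=2000):
    """Sketch: append edge facts directly to nodes whose summary stays short;
    collect the remaining nodes for a batched LLM summarization call."""
    needs_llm = []
    for node in nodes:
        facts = edge_facts_by_node.get(node['name'], [])
        combined = (node.get('summary') or '') + ' '.join(facts)
        if len(combined) < max_chars:
            # Fast path: no LLM call, just fold the new facts into the summary.
            node['summary'] = combined
        else:
            needs_llm.append(node)
    return needs_llm
```

Only the `needs_llm` nodes would then be sent through the batch prompt, which is what keeps token usage down for entity-dense episodes.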
    total_output_tokens += output_tokens

    # Record token usage
    self.token_tracker.record(prompt_name, total_input_tokens, total_output_tokens)
Token usage is recorded even when an exception occurs during retry attempts. Because total_input_tokens and total_output_tokens are accumulated across retries, a retry that fails after a successful initial call will cause the tracker to record tokens from both the successful and failed attempts, double-counting usage.
Consider moving the token_tracker.record() call outside the retry loop, or recording only on the first successful response.
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Implement token tracking in AnthropicClient._generate_response() and generate_response() using result.usage.input_tokens/output_tokens
- Implement token tracking in GeminiClient._generate_response() and generate_response() using response.usage_metadata
- Add comprehensive unit tests for the TokenUsageTracker class
- Add tests for _extract_entity_summaries_batch covering:
  - No nodes needing summarization
  - Short summaries with edge facts
  - Long summaries requiring LLM
  - Node filter (should_summarize_node)
  - Batching multiple nodes
  - Unknown entity handling
  - Missing episode and summary

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Remove import of extract_attributes_from_node (function was removed)
- Add import of _extract_entity_summaries_batch
- Update tests to use the new batch summarization API

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add MAX_NODES = 30 constant
- Partition nodes needing summarization into flights of MAX_NODES
- Extract a _process_summary_flight helper for processing each flight
- Each flight makes a separate LLM call to avoid context overflow

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
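The flight partitioning described above can be sketched as follows; the MAX_NODES constant is from the commit, while the helper name is assumed:

```python
MAX_NODES = 30  # cap nodes per LLM call to avoid context overflow


def partition_into_flights(nodes: list, flight_size: int = MAX_NODES) -> list[list]:
    """Split nodes needing summarization into flights of at most flight_size."""
    return [nodes[i : i + flight_size] for i in range(0, len(nodes), flight_size)]
```

Each flight would then be handed to something like the PR's `_process_summary_flight` helper, and a later commit in this PR runs the flights in parallel under a semaphore.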
Update both DEFAULT_MODEL and DEFAULT_SMALL_MODEL to use gpt-5-mini. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
    class TokenUsageTracker:
        """Thread-safe tracker for LLM token usage by prompt type."""
Minor: The docstring says "Thread-safe" but this is an async codebase. While threading.Lock works for protecting shared state in async code (since asyncio is single-threaded), the comments and design suggest this was written with threading in mind.
For clarity and potential future multi-threaded scenarios (e.g., if using ThreadPoolExecutor for blocking operations), this is fine. However, if you want to be more explicit about async-safe design, you could use asyncio.Lock instead, which is specifically designed for async contexts.
That said, threading.Lock is actually safer here if the tracker might be accessed from both sync and async contexts (like from the print_summary method which is sync).
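The point about mixed sync/async access can be illustrated with a hypothetical example (not from the PR): a `threading.Lock` works from both a plain method and a coroutine, provided the critical section never awaits while holding the lock.

```python
import asyncio
import threading


class Counter:
    """threading.Lock is usable from both sync and async code paths, as long
    as critical sections are short and never await while the lock is held."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.total = 0

    def add(self, n: int) -> None:  # callable from sync code (like print_summary)
        with self._lock:
            self.total += n

    async def add_async(self, n: int) -> None:  # and from coroutines
        with self._lock:  # never await inside this block
            self.total += n
```

An `asyncio.Lock`, by contrast, can only be acquired inside a coroutine, which would force the sync `print_summary` path to go through the event loop.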
Remove explicit model configuration to use the default gpt-5-mini models from OpenAIClient. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Restore the original default models instead of gpt-5-mini. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Fix unreachable code in _handle_structured_response (check response.refusal)
- Process node summary flights in parallel using semaphore_gather
- Use case-insensitive name matching for LLM summary responses
- Handle duplicate node names by applying the summary to all matching nodes
- Fix an edge case when both edge lists are empty in contradiction processing
- Fix a potential AttributeError when episode is None in edge attributes
- Add tests for flight partitioning and case-insensitive name matching

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
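A sketch of the case-insensitive, duplicate-tolerant matching described in this commit (the data shapes are assumptions; the commit does not show the code):

```python
from collections import defaultdict


def apply_summaries(nodes, llm_summaries):
    """Apply each LLM-returned summary to every node whose name matches,
    ignoring case, so duplicate node names all receive the update."""
    by_name = defaultdict(list)
    for node in nodes:
        by_name[node['name'].lower()].append(node)
    for entry in llm_summaries:
        for node in by_name.get(entry['name'].lower(), []):
            node['summary'] = entry['summary']
```

Lower-casing on both sides guards against the LLM echoing entity names with different capitalization, and the one-to-many mapping handles duplicate node names.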
Summary
Changes
Node Operations
- Remove _extract_nodes_chunked, _extract_from_chunk, _merge_extracted_entities
- Add _extract_entity_summaries_batch for batch summarization

Edge Operations
- Remove MAX_NODES constant and generate_covering_chunks usage

Prompts
- Add extract_summaries_batch prompt with SummarizedEntity/SummarizedEntities models
- Simplify extract_attributes for edges (fact + reference_time + existing_attributes only)
- Improve resolve_edge with continuous indexing and a consolidated contradicted_facts field

Tests
- Remove obsolete chunking tests from test_entity_extraction.py

Test plan
🤖 Generated with Claude Code