feat: driver operations architecture redesign #1232
… concrete implementations

Introduces a clean operations-based architecture for graph driver operations, replacing inline query logic with abstract interfaces (ABCs) and concrete implementations for both Neo4j and FalkorDB backends.

Key changes:
- Add QueryExecutor and Transaction ABCs for database-agnostic query execution
- Add 11 operations ABCs covering all node, edge, search, and graph maintenance operations
- Implement all 11 operations for Neo4j with real transaction commit/rollback
- Implement all 11 operations for FalkorDB with RedisSearch fulltext and vecf32 embeddings
- Add NodeNamespace and EdgeNamespace convenience wrappers on Graphiti class
- Wire operations into Neo4jDriver and FalkorDriver with property accessors
- Fix circular import by moving STOPWORDS to graphiti_core.driver.falkordb package
- Include design spec documenting architecture decisions and migration plan

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@@ -61,6 +102,64 @@ def __init__(
Scheduling build_indices_and_constraints as a fire-and-forget task in __init__ is problematic:
- Silent failures: If the task fails, exceptions are silently swallowed unless you've configured an exception handler for the event loop
- Race condition: Code that immediately uses the driver may execute before indices are ready
- Testing difficulty: Makes it hard to control when indices are built in tests
Consider either:
- Removing the auto-scheduling and documenting that users should call build_indices_and_constraints() explicitly
- Making initialization async with a factory method like Neo4jDriver.create()
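A minimal sketch of the factory-method option, using a hypothetical `SketchDriver` stand-in rather than the real Neo4jDriver (whose constructor arguments are not shown in this thread). The URI and the `indices_ready` flag are illustrative assumptions. The point is only that `create()` awaits index creation, so failures propagate to the caller and the driver is ready when it is returned.

```python
import asyncio


class SketchDriver:
    """Hypothetical stand-in for a graph driver; not the real Neo4jDriver."""

    def __init__(self, uri: str) -> None:
        # The constructor only stores configuration; it schedules no tasks.
        self.uri = uri
        self.indices_ready = False

    async def build_indices_and_constraints(self) -> None:
        # Placeholder for the real index/constraint queries.
        await asyncio.sleep(0)
        self.indices_ready = True

    @classmethod
    async def create(cls, uri: str) -> "SketchDriver":
        driver = cls(uri)
        # Awaited, so any exception surfaces here instead of being lost
        # in a background task.
        await driver.build_indices_and_constraints()
        return driver


async def main() -> SketchDriver:
    return await SketchDriver.create("bolt://localhost:7687")


driver = asyncio.run(main())
```

Tests can then construct `SketchDriver(...)` directly and decide for themselves whether and when to build indices.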
    def _entity_node_from_record(record: Any) -> EntityNode:
        attributes = record['attributes']
        attributes.pop('uuid', None)
        attributes.pop('name', None)
        attributes.pop('group_id', None)
        attributes.pop('name_embedding', None)
        attributes.pop('summary', None)
        attributes.pop('created_at', None)
        attributes.pop('labels', None)

        labels = record.get('labels', [])
        group_id = record.get('group_id')
        dynamic_label = 'Entity_' + group_id.replace('-', '')
        if dynamic_label in labels:
            labels.remove(dynamic_label)

        return EntityNode(
            uuid=record['uuid'],
            name=record['name'],
            name_embedding=record.get('name_embedding'),
            group_id=group_id,
            labels=labels,
            created_at=parse_db_date(record['created_at']),  # type: ignore[arg-type]
            summary=record['summary'],
            attributes=attributes,
        )
This _entity_node_from_record function is duplicated verbatim across 4 files:
- neo4j/operations/search_ops.py
- neo4j/operations/entity_node_ops.py
- neo4j/operations/graph_ops.py
- And their FalkorDB equivalents
Similarly, _entity_edge_from_record and _community_node_from_record are duplicated across multiple files.
Consider extracting these into a shared graphiti_core/driver/record_converters.py module to eliminate the duplication.
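One way the shared module could look, as a sketch only. `EntityNodeSketch` is a hypothetical, trimmed-down stand-in for EntityNode (the `name_embedding` and `created_at` fields are omitted to keep the example self-contained), and `_RESERVED_KEYS` is an illustrative name. Unlike the quoted function, this version copies the attributes and labels instead of mutating the incoming record, and it drops every occurrence of the dynamic label rather than only the first; both are design choices of the sketch, not something the PR states.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class EntityNodeSketch:
    """Hypothetical, trimmed-down stand-in for EntityNode."""

    uuid: str
    name: str
    group_id: str
    labels: list[str] = field(default_factory=list)
    summary: str = ""
    attributes: dict[str, Any] = field(default_factory=dict)


# Keys promoted to dedicated fields, and therefore dropped from attributes.
_RESERVED_KEYS = {
    "uuid", "name", "group_id", "name_embedding",
    "summary", "created_at", "labels",
}


def entity_node_from_record(record: dict[str, Any]) -> EntityNodeSketch:
    # Build a filtered copy so the caller's record is left untouched.
    attributes = {
        key: value
        for key, value in record["attributes"].items()
        if key not in _RESERVED_KEYS
    }
    group_id = record["group_id"]
    # Strip the per-group dynamic label, mirroring the quoted function.
    dynamic_label = "Entity_" + group_id.replace("-", "")
    labels = [label for label in record.get("labels", []) if label != dynamic_label]
    return EntityNodeSketch(
        uuid=record["uuid"],
        name=record["name"],
        group_id=group_id,
        labels=labels,
        summary=record["summary"],
        attributes=attributes,
    )
```

Each backend's operations module would then import this one function instead of carrying its own copy.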
    @property
    def entity_node_ops(self) -> EntityNodeOperations | None:
        return None

    @property
    def episode_node_ops(self) -> EpisodeNodeOperations | None:
        return None

    @property
    def community_node_ops(self) -> CommunityNodeOperations | None:
        return None

    @property
    def saga_node_ops(self) -> SagaNodeOperations | None:
        return None

    @property
    def entity_edge_ops(self) -> EntityEdgeOperations | None:
        return None

    @property
    def episodic_edge_ops(self) -> EpisodicEdgeOperations | None:
        return None

    @property
    def community_edge_ops(self) -> CommunityEdgeOperations | None:
        return None

    @property
    def has_episode_edge_ops(self) -> HasEpisodeEdgeOperations | None:
        return None

    @property
    def next_episode_edge_ops(self) -> NextEpisodeEdgeOperations | None:
        return None

    @property
    def search_ops(self) -> SearchOperations | None:
        return None

    @property
    def graph_ops(self) -> GraphMaintenanceOperations | None:
        return None
All of these operations properties are annotated as returning `| None`, but the concrete drivers (Neo4jDriver, FalkorDriver) always return non-None implementations. This forces callers to add None checks even though the value is never actually None for real drivers.
Consider either:
- Raising NotImplementedError instead of returning None (like other abstract methods in this class)
- Making these abstract properties that concrete drivers must implement
The current approach leads to unnecessary null checks throughout the codebase and makes the type system less helpful.
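A sketch of the abstract-property option, with hypothetical class names (`GraphDriverSketch`, `InMemoryDriver`, `IncompleteDriver`) and an empty placeholder for the operations ABC. It shows that the return type no longer needs `| None`, and that a driver missing the property cannot be instantiated.

```python
from abc import ABC, abstractmethod


class EntityNodeOperations(ABC):
    """Hypothetical placeholder for the operations ABC."""


class GraphDriverSketch(ABC):
    @property
    @abstractmethod
    def entity_node_ops(self) -> EntityNodeOperations:
        """Concrete drivers must return a real implementation."""


class InMemoryEntityNodeOps(EntityNodeOperations):
    pass


class InMemoryDriver(GraphDriverSketch):
    def __init__(self) -> None:
        self._entity_node_ops = InMemoryEntityNodeOps()

    @property
    def entity_node_ops(self) -> EntityNodeOperations:
        # Non-optional return type: callers need no None check.
        return self._entity_node_ops


class IncompleteDriver(GraphDriverSketch):
    """Omits the property, so instantiating it raises TypeError."""
```

The trade-off is that every driver must implement every property before it can be constructed, which matters for any driver that does not yet support the new interfaces.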
    logger = logging.getLogger(__name__)

    MAX_QUERY_LENGTH = 128
The constant MAX_QUERY_LENGTH = 128 is defined here, but build_fulltext_query on line 782 uses a default of max_query_length: int = 8000. This inconsistency between the module-level constant and the method default could cause confusion. Either use the constant consistently or document why different defaults are appropriate.
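A small sketch of tying the default to the constant. `build_fulltext_query_sketch` is a hypothetical function, not the PR's build_fulltext_query; in particular, counting space-separated terms and returning an empty string for over-long queries are assumptions made only so the example has something to do.

```python
MAX_QUERY_LENGTH = 128


def build_fulltext_query_sketch(
    query: str,
    max_query_length: int = MAX_QUERY_LENGTH,  # single source of truth
) -> str:
    # Assumption: queries with too many terms are rejected outright.
    if len(query.split(" ")) >= max_query_length:
        return ""
    return query
```

With the default bound to the constant, changing the limit in one place changes it everywhere, and callers that need a different limit can still pass one explicitly.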
    def __init__(self, driver: GraphDriver, embedder: EmbedderClient):
        entity_node_ops = driver.entity_node_ops
        episode_node_ops = driver.episode_node_ops
        community_node_ops = driver.community_node_ops
        saga_node_ops = driver.saga_node_ops

        if entity_node_ops is not None:
            self.entity = EntityNodeNamespace(driver, entity_node_ops, embedder)
        if episode_node_ops is not None:
            self.episode = EpisodeNodeNamespace(driver, episode_node_ops)
        if community_node_ops is not None:
            self.community = CommunityNodeNamespace(driver, community_node_ops, embedder)
        if saga_node_ops is not None:
            self.saga = SagaNodeNamespace(driver, saga_node_ops)
The namespace attributes (entity, episode, community, saga) are only set if the corresponding ops is not None, which means accessing graphiti.nodes.entity will raise AttributeError if the driver doesn't implement entity operations.
Consider either:
- Always setting all attributes but raising NotImplementedError when methods are called
- Documenting this behavior and providing a has_entity_ops() method or similar
This would improve the developer experience when working with drivers that may not implement all operations.
    @abstractmethod
    async def load_embeddings_bulk(
        self,
        executor: QueryExecutor,
        nodes: list[EntityNode],
        batch_size: int = 100,
    ) -> None: ...
The batch_size parameter is declared in the abstract method but is not enforced in either the Neo4j or FalkorDB implementations for load_embeddings_bulk. The FalkorDB implementation explicitly ignores it with # noqa: ARG002. Consider either:
- Implementing actual batching in the concrete implementations
- Removing the parameter from the interface if batching isn't needed
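If batching is kept, the first option could be as small as the sketch below. `batched` and `load_embeddings_bulk_sketch` are hypothetical helpers, and the `asyncio.sleep(0)` stands in for whatever per-batch query a real implementation would issue; plain UUID strings are used in place of EntityNode objects.

```python
import asyncio
from collections.abc import Iterator
from typing import TypeVar

T = TypeVar("T")


def batched(items: list[T], batch_size: int = 100) -> Iterator[list[T]]:
    """Yield consecutive slices of at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(items), batch_size):
        yield items[start : start + batch_size]


async def load_embeddings_bulk_sketch(
    uuids: list[str], batch_size: int = 100
) -> list[int]:
    """Return the size of each batch that would be sent as one query."""
    sizes: list[int] = []
    for batch in batched(uuids, batch_size):
        await asyncio.sleep(0)  # placeholder for one query per batch
        sizes.append(len(batch))
    return sizes


batch_sizes = asyncio.run(
    load_embeddings_bulk_sketch([f"u{i}" for i in range(250)])
)
```

Here 250 nodes with the default batch size produce three batches of 100, 100, and 50.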
Code Review Summary
This PR introduces a significant architectural redesign with 22 new operation classes. While the overall design is sound and follows good separation of concerns, there are several areas that need attention:

Major Concerns
1. No Unit Tests for New Operations
2. Significant Code Duplication
3. Fire-and-Forget Task in Constructor

Additional Issues

Spec Document
The spec file …
- Fix ruff UP037: remove quoted type annotations in driver.py (redundant with `from __future__ import annotations`)
- Extract duplicate record parsers into shared record_parsers.py module, eliminating identical _entity_node_from_record, _entity_edge_from_record, _episodic_node_from_record, and _community_node_from_record across 10 files in both Neo4j and FalkorDB operations
- Fix MAX_QUERY_LENGTH inconsistency in FalkorDB search_ops build_fulltext_query (was 8000, now uses module constant 128)
- Make namespace attributes unconditional with NotImplementedError for drivers that don't implement required operations

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
KuzuDriver doesn't implement the new operations interfaces, so the NotImplementedError on init broke Kuzu tests. Now attributes are only set when the driver provides them, and __getattr__ gives a clear error on access. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
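The conditional-attribute approach with a `__getattr__` fallback could be sketched as follows. `NodeNamespaceSketch`, its constructor parameters, the `_KNOWN` tuple, and the choice of AttributeError with a descriptive message are all assumptions; the commit message does not say which exception type the real code raises.

```python
from typing import Any


class NodeNamespaceSketch:
    """Hypothetical stand-in for NodeNamespace."""

    _KNOWN = ("entity", "episode", "community", "saga")

    def __init__(self, entity_ops: Any = None, episode_ops: Any = None) -> None:
        # Attributes are set only when the driver provides the operations.
        if entity_ops is not None:
            self.entity = entity_ops
        if episode_ops is not None:
            self.episode = episode_ops

    def __getattr__(self, name: str) -> Any:
        # Python calls this only after normal attribute lookup has failed.
        if name in self._KNOWN:
            raise AttributeError(
                f"'{name}' operations are not implemented by this driver"
            )
        raise AttributeError(name)
```

Raising AttributeError (rather than NotImplementedError) keeps `hasattr(namespace, "saga")` usable as a capability check, since `hasattr` only swallows AttributeError.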
Summary
- QueryExecutor and Transaction ABCs for database-agnostic query execution, plus 11 operations ABCs covering entity/episode/community nodes, entity/episodic/community/has_episode/next_episode edges, search, and graph maintenance
- … vecf32() embeddings for FalkorDB; Lucene fulltext, native vector index for Neo4j), and NodeNamespace/EdgeNamespace convenience wrappers

Architecture
Each driver now exposes operations via typed properties (e.g., driver.entity_node_ops, driver.search_ops), all accepting the driver itself as a QueryExecutor and an optional Transaction for write operations.

Test plan
- ruff check passes on all new and modified files
- pyright reports 0 new errors across the entire graphiti_core/ directory
- … __init__.py)

🤖 Generated with Claude Code
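To make the Architecture paragraph above concrete, here is a rough sketch of an operation that takes an executor plus an optional transaction. The method names `execute_query` and `run`, the recording classes, the `save_node_sketch` function, and the Cypher string are all illustrative assumptions; the PR description does not spell out the ABCs' actual signatures.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Optional


class Transaction(ABC):
    @abstractmethod
    async def run(self, query: str, **params: Any) -> Any: ...


class QueryExecutor(ABC):
    @abstractmethod
    async def execute_query(self, query: str, **params: Any) -> Any: ...


class RecordingExecutor(QueryExecutor):
    """Test double that records queries instead of hitting a database."""

    def __init__(self) -> None:
        self.queries: list[str] = []

    async def execute_query(self, query: str, **params: Any) -> Any:
        self.queries.append(query)
        return []


class RecordingTransaction(Transaction):
    def __init__(self) -> None:
        self.queries: list[str] = []

    async def run(self, query: str, **params: Any) -> Any:
        self.queries.append(query)
        return []


async def save_node_sketch(
    executor: QueryExecutor, uuid: str, tx: Optional[Transaction] = None
) -> None:
    query = "MERGE (n:Entity {uuid: $uuid})"
    if tx is not None:
        # Write joins the caller's transaction; commit/rollback is theirs.
        await tx.run(query, uuid=uuid)
    else:
        await executor.execute_query(query, uuid=uuid)


executor = RecordingExecutor()
tx = RecordingTransaction()
asyncio.run(save_node_sketch(executor, "u1"))
asyncio.run(save_node_sketch(executor, "u2", tx=tx))
```

Because operations depend only on the two ABCs, they can be exercised with in-memory doubles like these, without a running Neo4j or FalkorDB instance.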