Skip to content

Commit d6d59ce

Browse files
committed
feat: optimize LanceDB compaction and FTS indexing
- Refactored LanceGraphDB to use native ds.optimize() for compaction and snapshot pruning.
- Automated FTS index creation in _init_schema.
- Added configurable cleanup_older_than_days property.
- Added pytest case to verify FTS index effectiveness and optimization.
1 parent 5abd660 commit d6d59ce

3 files changed

Lines changed: 220 additions & 1 deletion

File tree

src/memos/configs/graph_db.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,15 @@ class LanceGraphDBConfig(BaseConfig):
252252
description="Logical user or tenant ID for data isolation",
253253
)
254254
embedding_dimension: int = Field(default=768, description="Dimension of vector embedding")
255+
compaction_version_threshold: int = Field(
256+
default=500, description="Number of new versions to accumulate before triggering compaction"
257+
)
258+
compaction_interval_mins: int = Field(
259+
default=30, description="Fallback interval in minutes to check and run compaction"
260+
)
261+
cleanup_older_than_days: int = Field(
262+
default=7, description="Number of days to keep old versions before pruning"
263+
)
255264

256265

257266
class GraphDBConfigFactory(BaseModel):

src/memos/graph_dbs/lance.py

Lines changed: 143 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22

33
import json
44
import os
5+
import threading
6+
import time
57
import uuid
68

7-
from datetime import datetime
9+
from datetime import datetime, timedelta
810
from typing import TYPE_CHECKING, Any
911

1012

@@ -35,11 +37,123 @@ def __init__(self, config: LanceGraphDBConfig):
3537
self.user_name = config.user_name or "default"
3638
self.dim = config.embedding_dimension
3739

40+
# Compaction settings
41+
self.compaction_version_threshold = config.compaction_version_threshold
42+
self.compaction_interval_mins = config.compaction_interval_mins
43+
self.cleanup_older_than_days = config.cleanup_older_than_days
44+
3845
self.memories_uri = os.path.join(self.uri, "memories")
3946
self.edges_uri = os.path.join(self.uri, "edges")
4047

4148
self._init_schema()
4249

50+
# Start LanceDB background optimizer thread
51+
self._last_compact_versions = {
52+
"memories": self._get_memories_table().version,
53+
"edges": self._get_edges_table().version,
54+
}
55+
self._optimizer_thread = threading.Thread(
56+
target=self._db_optimizer_loop,
57+
daemon=True,
58+
name="lancedb-optimizer",
59+
)
60+
self._optimizer_thread.start()
61+
62+
def _db_optimizer_loop(self):
    """Background daemon loop that keeps the LanceDB tables optimized.

    Two triggers can start an optimization pass:
      1. Version threshold: a table has accumulated more than
         ``compaction_version_threshold`` new versions since its last
         compaction (checked on every iteration).
      2. Time fallback: ``compaction_interval_mins`` minutes have elapsed
         since the last forced optimization, even if no threshold was hit.

    Runs forever; intended to run on a daemon thread so it dies with the
    process. All errors are logged and swallowed to keep the loop alive.
    """
    interval_secs = self.compaction_interval_mins * 60
    # time.monotonic() is immune to wall-clock adjustments; using a simple
    # deadline removes the previous third-party `schedule` dependency.
    next_forced_run = time.monotonic() + interval_secs

    logger.info(
        f"Started LanceDB optimizer thread. Compaction interval: {self.compaction_interval_mins}m, "
        f"Version threshold: {self.compaction_version_threshold}"
    )

    while True:
        try:
            # 1. Check version threshold
            self._check_and_trigger_compaction()

            # 2. Run the time-based fallback compaction
            if time.monotonic() >= next_forced_run:
                self._force_optimize()
                next_forced_run = time.monotonic() + interval_secs
        except Exception as e:
            logger.error(f"Error in LanceDB optimizer loop: {e}", stack_info=True)

        time.sleep(5)  # Avoid busy waiting
85+
def _check_and_trigger_compaction(self):
86+
"""Trigger compaction if any table's version diff exceeds the threshold."""
87+
try:
88+
memories_ds = self._get_memories_table()
89+
if (
90+
memories_ds.version - self._last_compact_versions["memories"]
91+
> self.compaction_version_threshold
92+
):
93+
self._optimize_table("memories", memories_ds)
94+
95+
edges_ds = self._get_edges_table()
96+
if (
97+
edges_ds.version - self._last_compact_versions["edges"]
98+
> self.compaction_version_threshold
99+
):
100+
self._optimize_table("edges", edges_ds)
101+
except Exception as e:
102+
logger.error(f"Failed to check compaction versions: {e}")
103+
104+
def _optimize_table(self, table_name: str, ds):
105+
"""Helper method to optimize a specific LanceDB table."""
106+
try:
107+
current_version = ds.version
108+
last_version = self._last_compact_versions[table_name]
109+
110+
if current_version > last_version:
111+
logger.info(
112+
f"Triggering LanceDB optimization for '{table_name}'. "
113+
f"Current version: {current_version}, Last compacted: {last_version}"
114+
)
115+
116+
stats = ds.optimize(cleanup_older_than=timedelta(days=self.cleanup_older_than_days))
117+
118+
stats_msg = ""
119+
if stats:
120+
compaction = getattr(stats, "compaction", None)
121+
if compaction:
122+
stats_msg += (
123+
f" | Compaction: "
124+
f"-{getattr(compaction, 'fragments_removed', 0)}/"
125+
f"+{getattr(compaction, 'fragments_added', 0)} fragments, "
126+
f"-{getattr(compaction, 'files_removed', 0)}/"
127+
f"+{getattr(compaction, 'files_added', 0)} files"
128+
)
129+
130+
prune = getattr(stats, "prune", None)
131+
if prune:
132+
stats_msg += (
133+
f" | Prune: -{getattr(prune, 'bytes_removed', 0)} bytes, "
134+
f"-{getattr(prune, 'old_versions_removed', 0)} versions"
135+
)
136+
137+
# Reload the table to get the updated version after optimization
138+
if table_name == "memories":
139+
ds = self._get_memories_table()
140+
elif table_name == "edges":
141+
ds = self._get_edges_table()
142+
143+
self._last_compact_versions[table_name] = ds.version
144+
logger.info(
145+
f"LanceDB '{table_name}' optimization completed successfully. "
146+
f"New version: {self._last_compact_versions[table_name]}{stats_msg}"
147+
)
148+
except Exception as e:
149+
logger.error(f"LanceDB '{table_name}' optimization failed: {e}")
150+
151+
def _force_optimize(self):
152+
# Optimize Memories Table
153+
self._optimize_table("memories", self._get_memories_table())
154+
# Optimize Edges Table
155+
self._optimize_table("edges", self._get_edges_table())
156+
43157
def _init_schema(self):
44158
import lancedb
45159
import pyarrow as pa
@@ -74,6 +188,34 @@ def _init_schema(self):
74188
self.db.create_table("memories", data=empty_table)
75189
logger.info("Created LanceDB table for memories.")
76190

191+
try:
192+
ds = self.db.open_table("memories")
193+
194+
# Create vector index (aligned with memory-lancedb TS implementation)
195+
import math
196+
197+
row_count = ds.count_rows()
198+
if row_count > 256: # LanceDB requires at least 256 rows to train vector index
199+
num_partitions = max(1, math.floor(math.sqrt(row_count)))
200+
ds.create_index(
201+
metric="cosine",
202+
vector_column_name="embedding",
203+
num_partitions=num_partitions,
204+
)
205+
logger.info(
206+
f"Created IVF_FLAT index for memories.embedding with metric=cosine, partitions={num_partitions}"
207+
)
208+
else:
209+
logger.debug(
210+
f"Skipping vector index creation, not enough rows ({row_count} <= 256)"
211+
)
212+
213+
# Create full-text search index
214+
ds.create_fts_index("memory", replace=True)
215+
logger.info("Created FTS index for memories.memory")
216+
except Exception as e:
217+
logger.warning(f"Failed to create LanceDB indices: {e}")
218+
77219
if "edges" not in table_names:
78220
edge_schema = pa.schema(
79221
[

tests/graph_dbs/test_lance.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,5 +129,73 @@ def test_lance_graph_db():
129129
print("Test finished successfully in temporary directory!")
130130

131131

132+
def test_lance_compaction_and_fts_effectiveness():
    """
    Test the effectiveness of the LanceDB _optimize_table mechanism,
    including compaction of small files and FTS index functionality.
    """
    import tempfile

    with tempfile.TemporaryDirectory() as tmpdir:
        db_uri = os.path.join(tmpdir, "test_lancedb_compaction")
        # Use a low threshold to force triggering
        config = LanceGraphDBConfig(
            uri=db_uri, user_name="test_user", embedding_dimension=3, compaction_version_threshold=2
        )
        db = LanceGraphDB(config)

        # 1. Insert multiple single nodes to create small fragments
        print("\nInserting 5 separate fragments...")
        for i in range(5):
            node = {
                "id": f"node_c_{i}",
                "memory": f"Alice went to the magical forest number {i}",
                "metadata": {"memory_type": "LongTermMemory", "status": "activated"},
                "embedding": [0.1 * i, 0.2 * i, 0.3 * i],
            }
            db.add_nodes_batch([node])

        import lance

        ds = lance.dataset(os.path.join(db_uri, "memories.lance"))
        fragments_before = len(ds.get_fragments())
        print(f"Fragments BEFORE optimize: {fragments_before}")

        # 2. Test FTS before optimization (best-effort: the index may not
        # cover rows written after its creation, so failure is tolerated)
        try:
            res_fts_before = db.search_by_fulltext(["magical"], top_k=10)
            print(f"FTS hits BEFORE optimize: {len(res_fts_before)}")
        except Exception as e:
            print(f"FTS failed before optimize: {e}")

        # 3. Force the internal optimizer
        print("Forcing LanceDB optimizer...")
        db._force_optimize()

        # 4. Verify compaction actually reduced (or at least did not grow)
        # the fragment count — previously this was printed but not asserted
        ds = lance.dataset(os.path.join(db_uri, "memories.lance"))
        fragments_after = len(ds.get_fragments())
        print(f"Fragments AFTER optimize: {fragments_after}")
        assert fragments_after <= fragments_before, (
            f"Compaction should not increase fragments: "
            f"{fragments_before} -> {fragments_after}"
        )

        # 5. Verify FTS index effectiveness after optimization
        res_fts_after = db.search_by_fulltext(["magical"], top_k=10)
        assert len(res_fts_after) == 5, (
            f"FTS should recall all 5 nodes, but got {len(res_fts_after)}"
        )
        print(f"FTS hits AFTER optimize: {len(res_fts_after)}")

        # 6. Test prune/delete
        db.delete_node("node_c_0")
        db._force_optimize()

        res_fts_deleted = db.search_by_fulltext(["magical"], top_k=10)
        assert len(res_fts_deleted) == 4, (
            f"FTS should recall 4 nodes after deletion, got {len(res_fts_deleted)}"
        )
        print(f"FTS hits AFTER deletion and optimize: {len(res_fts_deleted)}")

        db.clear()
197+
198+
132199
# Allow running the tests directly as a script, without pytest.
if __name__ == "__main__":
    test_lance_graph_db()
    test_lance_compaction_and_fts_effectiveness()

0 commit comments

Comments
 (0)