Commit 75fb8f9

rebuild BM25 from nodes and harden index invariants (#883)
<!-- greptile_comment -->

<h3>Greptile Summary</h3>

This PR hardens the BM25 full-text search subsystem by introducing a **reverse index** (`doc_id → list of (term, tf)`) alongside the existing inverted index, enabling correct and efficient delete/update operations without re-scanning the inverted index. It also adds **schema versioning** so that on startup, if the stored index was built without the reverse index, the migration layer automatically clears and rebuilds the entire BM25 index from `nodes_db`. The `add_n`, `update`, and `upsert` traversal operators are updated to maintain BM25 invariants as nodes are created or changed.

Key changes:

- **`bm25.rs`**: Adds `ReversePostingEntry`, `DocPresence`/`DocState` enums, `reverse_index_db` (DUP_SORT), schema version read/write, and refactored `insert_doc` / `delete_doc` / `update_doc` that enforce strict consistency invariants (they return errors on index corruption).
- **`storage_migration.rs`**: `migrate_bm25` clears and rebuilds the BM25 index in 1,024-node batches when the stored schema version doesn't match `BM25_SCHEMA_VERSION = 2`.
- **`add_n.rs`**: `bm25.insert_doc` is called after writing the node, but without a guard that checks whether the `nodes_db` write succeeded, allowing BM25 errors to mask prior errors.
- **`update.rs` / `upsert.rs`**: BM25 is correctly updated before (update) or after (upsert new node) the `nodes_db` write, with proper error propagation.
- Comprehensive new tests cover the reverse index, schema migration, update-to-searchable, and invariant enforcement.

<details><summary><h3>Important Files Changed</h3></summary>

| Filename | Overview |
|----------|----------|
| helix-db/src/helix_engine/bm25/bm25.rs | Core BM25 rewrite adding a reverse index (doc_id → terms) and schema versioning. Logic is sound overall; minor duplication between reverse_entries and reverse_entries_rw. |
| helix-db/src/helix_engine/traversal_core/ops/source/add_n.rs | BM25 insert_doc is called unconditionally after nodes_db write, even on prior failure; can mask the original error if BM25 also fails. |
| helix-db/src/helix_engine/storage_core/storage_migration.rs | Adds migrate_bm25 that clears and rebuilds the BM25 index from nodes_db. Holding read_txn alive across batch write transactions can cause LMDB freelist growth; nodes without properties are silently skipped (consistent with add_n but deserves a comment). |
| helix-db/src/helix_engine/traversal_core/ops/util/update.rs | Correctly calls bm25.update_doc before nodes_db write; BM25 error short-circuits the node save, and both are in the same transaction so rollback is safe. |
| helix-db/src/helix_engine/traversal_core/ops/util/upsert.rs | BM25 insert_doc (new node) and update_doc (existing node) are correctly integrated into upsert_n; error propagation follows the existing ? pattern. |

</details>

<details><summary><h3>Sequence Diagram</h3></summary>

```mermaid
sequenceDiagram
    participant Caller
    participant migrate_bm25
    participant BM25Index
    participant nodes_db
    participant add_n / update / upsert

    Note over Caller, BM25Index: Startup Migration Path
    Caller->>migrate_bm25: migrate(storage)
    migrate_bm25->>BM25Index: schema_version(read_txn)
    alt version matches BM25_SCHEMA_VERSION
        BM25Index-->>migrate_bm25: Some(2) - skip
    else outdated or missing
        migrate_bm25->>BM25Index: clear_all(write_txn)
        migrate_bm25->>nodes_db: iter(read_txn) - batch by 1024
        loop Each batch
            migrate_bm25->>BM25Index: insert_doc per node with properties (write_txn)
        end
        migrate_bm25->>BM25Index: write_schema_version(2, write_txn)
    end

    Note over Caller, BM25Index: Normal Write Path
    Caller->>add_n / update / upsert: write op (RwTxn)
    add_n / update / upsert->>nodes_db: put node
    add_n / update / upsert->>BM25Index: insert_doc / update_doc / delete_doc
    BM25Index->>BM25Index: update inverted_index_db (term→postings)
    BM25Index->>BM25Index: update reverse_index_db (doc_id→terms)
    BM25Index->>BM25Index: update doc_lengths_db
    BM25Index->>BM25Index: update term_frequencies_db
    BM25Index->>BM25Index: update metadata (total_docs, avgdl)
    add_n / update / upsert-->>Caller: Result<TraversalValue>
```

</details>

<!-- greptile_failed_comments -->
<details><summary><h3>Comments Outside Diff (1)</h3></summary>

1. `helix-db/src/helix_engine/traversal_core/ops/source/add_n.rs`, line 141-148 ([link](https://github.com/helixdb/helix-db/blob/8d28406c1ec70c56ba3100205080a9bb7a9ea17c/helix-db/src/helix_engine/traversal_core/ops/source/add_n.rs#L141-L148))

   **BM25 insert runs unconditionally, can mask prior errors**

   The BM25 `insert_doc` call (lines 141-148) runs even when a previous operation failed (secondary-index insertion or `nodes_db.put_with_flags`). If `bm25.insert_doc` then also fails, it silently overwrites the original error stored in `result`, making the caller receive a BM25 error instead of the real `nodes_db` or secondary-index error. Adding an early-exit guard fixes both the error masking and the wasted BM25 work.

</details>
<!-- /greptile_failed_comments -->

<sub>Last reviewed commit: 8d28406</sub>

<!-- /greptile_comment -->
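
The early-exit guard mentioned in the review comment is not shown on this page. A minimal sketch of the idea follows, assuming the write path keeps its outcome in a `result` variable as the comment describes. The names `StepError` and `add_node_guarded` are hypothetical stand-ins for illustration, not the engine's `GraphError` or its actual `add_n` code.

```rust
/// Hypothetical error type standing in for the engine's real error enum.
#[derive(Debug, PartialEq)]
enum StepError {
    NodeWrite(&'static str),
    Bm25(&'static str),
}

/// Runs the node write, then the BM25 insert. The BM25 step is skipped when
/// an earlier step has already failed, so the first error is what the caller
/// sees and no BM25 work is wasted.
fn add_node_guarded(
    write_node: impl FnOnce() -> Result<(), StepError>,
    insert_bm25: impl FnOnce() -> Result<(), StepError>,
) -> Result<(), StepError> {
    let mut result = write_node();

    // Early-exit guard: only touch BM25 if everything so far succeeded.
    if result.is_ok() {
        result = insert_bm25();
    }

    result
}
```

With this shape, a failing node write is returned unchanged even if the BM25 step would also have failed, because the BM25 closure is never invoked.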
2 parents 12e530b + 8d28406 commit 75fb8f9

9 files changed

+1193
-179
lines changed

helix-db/src/helix_engine/bm25/bm25.rs

Lines changed: 443 additions & 138 deletions
Large diffs are not rendered by default.
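
Since the `bm25.rs` diff is not rendered here, the following is only a rough in-memory sketch of the reverse-index idea from the summary: keeping a `doc_id → (term, tf)` map next to the inverted index so a delete touches only the terms that document contains. `TinyIndex` and its methods are hypothetical and use `HashMap`s. They are not the LMDB-backed `reverse_index_db`, and the duplicate-insert check is an assumption of this sketch.

```rust
use std::collections::HashMap;

/// Toy index holding both directions of the term/document mapping.
#[derive(Default)]
struct TinyIndex {
    /// term -> (doc_id -> term frequency)
    inverted: HashMap<String, HashMap<u128, u32>>,
    /// doc_id -> list of (term, term frequency)
    reverse: HashMap<u128, Vec<(String, u32)>>,
}

impl TinyIndex {
    /// Indexes whitespace-separated, lowercased terms for `doc_id`.
    fn insert_doc(&mut self, doc_id: u128, text: &str) -> Result<(), String> {
        if self.reverse.contains_key(&doc_id) {
            return Err(format!("doc {doc_id} is already indexed"));
        }
        let mut counts: HashMap<String, u32> = HashMap::new();
        for term in text.split_whitespace() {
            *counts.entry(term.to_lowercase()).or_insert(0) += 1;
        }
        let mut entries = Vec::with_capacity(counts.len());
        for (term, tf) in counts {
            self.inverted.entry(term.clone()).or_default().insert(doc_id, tf);
            entries.push((term, tf));
        }
        self.reverse.insert(doc_id, entries);
        Ok(())
    }

    /// Deletes a document by walking its reverse entries only. Returns an
    /// error if the two maps disagree, which would indicate corruption.
    fn delete_doc(&mut self, doc_id: u128) -> Result<(), String> {
        let entries = self
            .reverse
            .remove(&doc_id)
            .ok_or_else(|| format!("doc {doc_id} is not indexed"))?;
        for (term, _tf) in entries {
            let postings = self
                .inverted
                .get_mut(&term)
                .ok_or_else(|| format!("missing postings for term {term}"))?;
            if postings.remove(&doc_id).is_none() {
                return Err(format!("term {term} has no posting for doc {doc_id}"));
            }
            let now_empty = postings.is_empty();
            if now_empty {
                self.inverted.remove(&term);
            }
        }
        Ok(())
    }

    /// Returns the sorted ids of documents containing `term`.
    fn search(&self, term: &str) -> Vec<u128> {
        let mut ids: Vec<u128> = self
            .inverted
            .get(&term.to_lowercase())
            .map(|postings| postings.keys().copied().collect())
            .unwrap_or_default();
        ids.sort_unstable();
        ids
    }
}
```

Without the reverse map, `delete_doc` would have to scan every posting list to find the ones mentioning the document, which is the re-scan the summary says the PR avoids.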

helix-db/src/helix_engine/bm25/bm25_tests.rs

Lines changed: 389 additions & 1 deletion
Large diffs are not rendered by default.

helix-db/src/helix_engine/storage_core/storage_migration.rs

Lines changed: 71 additions & 1 deletion
@@ -1,11 +1,12 @@
 use crate::{
     helix_engine::{
+        bm25::bm25::{BM25, BM25_SCHEMA_VERSION, build_bm25_payload},
         storage_core::HelixGraphStorage,
         types::GraphError,
         vector_core::{vector::HVector, vector_core},
     },
     protocol::value::Value,
-    utils::properties::ImmutablePropertiesMap,
+    utils::{items::Node, properties::ImmutablePropertiesMap},
 };
 use bincode::Options;
 use itertools::Itertools;
@@ -38,10 +39,79 @@ pub fn migrate(storage: &mut HelixGraphStorage) -> Result<(), GraphError> {
 
     verify_vectors_and_repair(storage)?;
     remove_orphaned_vector_edges(storage)?;
+    migrate_bm25(storage)?;
 
     Ok(())
 }
 
+fn migrate_bm25(storage: &mut HelixGraphStorage) -> Result<(), GraphError> {
+    const BATCH_SIZE: usize = 1024;
+
+    let Some(bm25) = storage.bm25.as_ref() else {
+        return Ok(());
+    };
+
+    let current_schema_version = {
+        let txn = storage.graph_env.read_txn()?;
+        bm25.schema_version(&txn)?
+    };
+
+    if current_schema_version == Some(BM25_SCHEMA_VERSION) {
+        return Ok(());
+    }
+
+    {
+        let mut txn = storage.graph_env.write_txn()?;
+        bm25.clear_all(&mut txn)?;
+        txn.commit()?;
+    }
+
+    let read_txn = storage.graph_env.read_txn()?;
+    let mut batch = Vec::with_capacity(BATCH_SIZE);
+
+    for kv in storage.nodes_db.iter(&read_txn)? {
+        let (id, value) = kv?;
+        batch.push((id, value.to_vec()));
+
+        if batch.len() == BATCH_SIZE {
+            rebuild_bm25_batch(storage, bm25, &batch)?;
+            batch.clear();
+        }
+    }
+
+    drop(read_txn);
+
+    if !batch.is_empty() {
+        rebuild_bm25_batch(storage, bm25, &batch)?;
+    }
+
+    let mut txn = storage.graph_env.write_txn()?;
+    bm25.write_schema_version(&mut txn, BM25_SCHEMA_VERSION)?;
+    txn.commit()?;
+
+    Ok(())
+}
+
+fn rebuild_bm25_batch(
+    storage: &HelixGraphStorage,
+    bm25: &crate::helix_engine::bm25::bm25::HBM25Config,
+    batch: &[(u128, Vec<u8>)],
+) -> Result<(), GraphError> {
+    let arena = bumpalo::Bump::new();
+    let mut txn = storage.graph_env.write_txn()?;
+
+    for (id, value) in batch {
+        let node = Node::from_bincode_bytes(*id, value, &arena)?;
+        if let Some(properties) = node.properties.as_ref() {
+            let data = build_bm25_payload(properties, node.label);
+            bm25.insert_doc(&mut txn, *id, &data)?;
+        }
+    }
+
+    txn.commit()?;
+    Ok(())
+}
+
 pub(crate) fn migrate_pre_metadata_to_native_vector_endianness(
     storage: &mut HelixGraphStorage,
 ) -> Result<StorageMetadata, GraphError> {
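
The control flow of `migrate_bm25` above (version gate, clear, batched rebuild, version stamp) can be illustrated without LMDB. The sketch below is a simplified in-memory analogue under stated assumptions: `ToyIndex` and `migrate_index` are hypothetical names, a `HashMap` stands in for the BM25 databases, and each chunk stands in for one write transaction.

```rust
use std::collections::HashMap;

const SCHEMA_VERSION: u64 = 2;
const BATCH_SIZE: usize = 1024;

/// Toy stand-in for the BM25 index plus its stored schema version.
#[derive(Default)]
struct ToyIndex {
    schema_version: Option<u64>,
    docs: HashMap<u128, String>,
}

/// Rebuilds `index` from `nodes` when its schema version is missing or stale.
/// Returns the number of batches written; 0 means the call was a no-op.
fn migrate_index(index: &mut ToyIndex, nodes: &[(u128, Option<String>)]) -> usize {
    // Version gate: an up-to-date index is left untouched.
    if index.schema_version == Some(SCHEMA_VERSION) {
        return 0;
    }

    // Drop everything, including entries that no longer map to a node.
    index.docs.clear();

    let mut batches = 0;
    for batch in nodes.chunks(BATCH_SIZE) {
        for (id, text) in batch {
            // Nodes without indexable text are skipped, mirroring the
            // `if let Some(properties)` check in the diff.
            if let Some(text) = text {
                index.docs.insert(*id, text.clone());
            }
        }
        batches += 1;
    }

    // Stamp the version last, so an interrupted rebuild runs again next time.
    index.schema_version = Some(SCHEMA_VERSION);
    batches
}
```

Writing the version only after every batch has been committed is what makes a second `migrate` call a no-op in the success case while still retrying after a partial rebuild.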

helix-db/src/helix_engine/storage_core/storage_migration_tests.rs

Lines changed: 186 additions & 3 deletions
@@ -9,19 +9,29 @@
 //! - Performance tests for large datasets
 
 use super::{
-    metadata::{StorageMetadata, VectorEndianness, NATIVE_VECTOR_ENDIANNESS},
+    HelixGraphStorage,
+    metadata::{NATIVE_VECTOR_ENDIANNESS, StorageMetadata, VectorEndianness},
     storage_migration::{
         convert_all_vector_properties, convert_old_vector_properties_to_new_format,
         convert_vector_endianness, migrate,
     },
-    HelixGraphStorage,
 };
 use crate::{
     helix_engine::{
-        storage_core::version_info::VersionInfo, traversal_core::config::Config, types::GraphError,
+        bm25::bm25::{
+            BM25, BM25_SCHEMA_VERSION, BM25_SCHEMA_VERSION_KEY, BM25Metadata, METADATA_KEY,
+        },
+        storage_core::version_info::VersionInfo,
+        traversal_core::{
+            config::Config,
+            ops::{g::G, source::add_n::AddNAdapter},
+        },
+        types::GraphError,
     },
     protocol::value::Value,
+    utils::{items::Node, properties::ImmutablePropertiesMap},
 };
+use bumpalo::Bump;
 use std::collections::HashMap;
 use tempfile::TempDir;
 
@@ -169,6 +179,45 @@ fn clear_metadata(storage: &mut HelixGraphStorage) -> Result<(), GraphError> {
     Ok(())
 }
 
+fn add_test_node(
+    storage: &HelixGraphStorage,
+    label: &'static str,
+    properties: &[(&'static str, Value)],
+) -> u128 {
+    let arena = Bump::new();
+    let properties = if properties.is_empty() {
+        None
+    } else {
+        Some(ImmutablePropertiesMap::new(
+            properties.len(),
+            properties.iter().map(|(key, value)| (*key, value.clone())),
+            &arena,
+        ))
+    };
+    let mut txn = storage.graph_env.write_txn().unwrap();
+    let node = G::new_mut(storage, &arena, &mut txn)
+        .add_n(label, properties, None)
+        .collect_to_obj()
+        .unwrap();
+    let node_id = node.id();
+    txn.commit().unwrap();
+    node_id
+}
+
+fn bm25_search_ids(storage: &HelixGraphStorage, query: &str) -> Vec<u128> {
+    let arena = Bump::new();
+    let txn = storage.graph_env.read_txn().unwrap();
+    storage
+        .bm25
+        .as_ref()
+        .unwrap()
+        .search(&txn, query, 10, &arena)
+        .unwrap()
+        .into_iter()
+        .map(|(id, _)| id)
+        .collect()
+}
+
 // ============================================================================
 // Unit Tests: Endianness Conversion
 // ============================================================================
@@ -960,6 +1009,140 @@ fn test_error_handling_graceful_failure() {
     assert_eq!(count, 11); // 10 valid + 1 invalid
 }
 
+#[test]
+fn test_bm25_migration_rerun_is_noop_once_schema_written() {
+    let (mut storage, _temp_dir) = setup_test_storage();
+    let node_id = add_test_node(&storage, "person", &[("name", Value::from("stable_term"))]);
+
+    let before_metadata = {
+        let txn = storage.graph_env.read_txn().unwrap();
+        let bm25 = storage.bm25.as_ref().unwrap();
+        assert_eq!(
+            bm25.schema_version(&txn).unwrap(),
+            Some(BM25_SCHEMA_VERSION)
+        );
+        bm25.metadata_db
+            .get(&txn, METADATA_KEY)
+            .unwrap()
+            .map(|bytes| bytes.to_vec())
+    };
+    let before_results = bm25_search_ids(&storage, "stable_term");
+    assert_eq!(before_results, vec![node_id]);
+
+    migrate(&mut storage).unwrap();
+
+    let after_metadata = {
+        let txn = storage.graph_env.read_txn().unwrap();
+        let bm25 = storage.bm25.as_ref().unwrap();
+        assert_eq!(
+            bm25.schema_version(&txn).unwrap(),
+            Some(BM25_SCHEMA_VERSION)
+        );
+        bm25.metadata_db
+            .get(&txn, METADATA_KEY)
+            .unwrap()
+            .map(|bytes| bytes.to_vec())
+    };
+    let after_results = bm25_search_ids(&storage, "stable_term");
+
+    assert_eq!(after_results, vec![node_id]);
+    assert_eq!(before_results, after_results);
+    assert_eq!(before_metadata, after_metadata);
+}
+
+#[test]
+fn test_bm25_migration_repairs_stale_node_index() {
+    let (mut storage, _temp_dir) = setup_test_storage();
+    let node_id = add_test_node(&storage, "person", &[("name", Value::from("legacyalpha"))]);
+
+    assert_eq!(bm25_search_ids(&storage, "legacyalpha"), vec![node_id]);
+    assert!(bm25_search_ids(&storage, "freshomega").is_empty());
+
+    {
+        let arena = Bump::new();
+        let mut txn = storage.graph_env.write_txn().unwrap();
+        let node_bytes = storage
+            .nodes_db
+            .get(&txn, &node_id)
+            .unwrap()
+            .unwrap()
+            .to_vec();
+        let mut node = Node::from_bincode_bytes(node_id, &node_bytes, &arena).unwrap();
+        node.properties = Some(ImmutablePropertiesMap::new(
+            1,
+            std::iter::once(("name", Value::from("freshomega"))),
+            &arena,
+        ));
+
+        let updated_bytes = node.to_bincode_bytes().unwrap();
+        storage
+            .nodes_db
+            .put(&mut txn, &node_id, &updated_bytes)
+            .unwrap();
+        storage
+            .bm25
+            .as_ref()
+            .unwrap()
+            .metadata_db
+            .put(&mut txn, BM25_SCHEMA_VERSION_KEY, &0u64.to_le_bytes())
+            .unwrap();
+        txn.commit().unwrap();
+    }
+
+    assert_eq!(bm25_search_ids(&storage, "legacyalpha"), vec![node_id]);
+    assert!(bm25_search_ids(&storage, "freshomega").is_empty());
+
+    migrate(&mut storage).unwrap();
+
+    assert!(bm25_search_ids(&storage, "legacyalpha").is_empty());
+    assert_eq!(bm25_search_ids(&storage, "freshomega"), vec![node_id]);
+
+    let txn = storage.graph_env.read_txn().unwrap();
+    assert_eq!(
+        storage.bm25.as_ref().unwrap().schema_version(&txn).unwrap(),
+        Some(BM25_SCHEMA_VERSION)
+    );
+}
+
+#[test]
+fn test_bm25_migration_drops_legacy_direct_docs() {
+    let (mut storage, _temp_dir) = setup_test_storage();
+    let node_id = add_test_node(&storage, "person", &[("name", Value::from("nodeonlyterm"))]);
+
+    {
+        let mut txn = storage.graph_env.write_txn().unwrap();
+        let bm25 = storage.bm25.as_ref().unwrap();
+        bm25.insert_doc(&mut txn, 999u128, "legacyvectorterm")
+            .unwrap();
+        bm25.metadata_db
+            .put(&mut txn, BM25_SCHEMA_VERSION_KEY, &0u64.to_le_bytes())
+            .unwrap();
+        txn.commit().unwrap();
+    }
+
+    assert_eq!(bm25_search_ids(&storage, "legacyvectorterm"), vec![999u128]);
+    assert_eq!(bm25_search_ids(&storage, "nodeonlyterm"), vec![node_id]);
+
+    migrate(&mut storage).unwrap();
+
+    assert!(bm25_search_ids(&storage, "legacyvectorterm").is_empty());
+    assert_eq!(bm25_search_ids(&storage, "nodeonlyterm"), vec![node_id]);
+
+    let txn = storage.graph_env.read_txn().unwrap();
+    let metadata: BM25Metadata = bincode::deserialize(
+        storage
+            .bm25
+            .as_ref()
+            .unwrap()
+            .metadata_db
+            .get(&txn, METADATA_KEY)
+            .unwrap()
+            .unwrap(),
+    )
+    .unwrap();
+    assert_eq!(metadata.total_docs, 1);
+}
+
 // ============================================================================
 // Performance Tests
 // ============================================================================

helix-db/src/helix_engine/tests/traversal_tests/update_tests.rs

Lines changed: 41 additions & 0 deletions
@@ -9,6 +9,7 @@ use crate::{
         storage_core::HelixGraphStorage,
         traversal_core::{
             ops::{
+                bm25::search_bm25::SearchBM25Adapter,
                 g::G,
                 source::{add_n::AddNAdapter, n_from_id::NFromIdAdapter},
                 util::update::UpdateAdapter,
@@ -90,3 +91,43 @@ fn test_update_node() {
         other => panic!("unexpected traversal value: {other:?}"),
     }
 }
+
+#[test]
+fn test_update_node_without_prior_bm25_doc_becomes_searchable() {
+    let (_temp_dir, storage) = setup_test_db();
+    let arena = Bump::new();
+    let mut txn = storage.graph_env.write_txn().unwrap();
+
+    let node = G::new_mut(&storage, &arena, &mut txn)
+        .add_n("person", None, None)
+        .collect_to_obj()
+        .unwrap();
+    txn.commit().unwrap();
+
+    let arena = Bump::new();
+    let txn = storage.graph_env.read_txn().unwrap();
+    let traversal = G::new(&storage, &txn, &arena)
+        .n_from_id(&node.id())
+        .collect::<Result<Vec<_>, _>>()
+        .unwrap();
+    drop(txn);
+
+    let arena = Bump::new();
+    let mut txn = storage.graph_env.write_txn().unwrap();
+    G::new_mut_from_iter(&storage, &mut txn, traversal.into_iter(), &arena)
+        .update(&[("name", Value::from("bm25_searchable"))])
+        .collect::<Result<Vec<_>, _>>()
+        .unwrap();
+    txn.commit().unwrap();
+
+    let arena = Bump::new();
+    let txn = storage.graph_env.read_txn().unwrap();
+    let results = G::new(&storage, &txn, &arena)
+        .search_bm25("person", "bm25_searchable", 10)
+        .unwrap()
+        .collect::<Result<Vec<_>, _>>()
+        .unwrap();
+
+    assert_eq!(results.len(), 1);
+    assert_eq!(results[0].id(), node.id());
+}

0 commit comments
