Skip to content

Commit 968b318

Browse files
committed
feat: add complete MVCC (Multi-Version Concurrency Control) implementation
Add comprehensive MVCC system with sequence-based versioning:
- Multi-namespace support with ViewNameSpace trait
- Read-write transactions via View with Read Committed isolation
- Read-only transactions via ViewReadonly with Snapshot isolation
- Staged changes with commit protocol
- Tombstone-based deletion system
- Stream-based range queries for memory efficiency
- Namespace-scoped views for ergonomic API usage

Key features:
- Type-safe generic design with ViewKey/ViewValue traits
- Table abstraction with insertion constraints and conflict detection
- Utility functions for key-value result sorting and merging
- Comprehensive isolation level documentation
- Test coverage for non-incrementing namespaces (secondary indices)
1 parent f9b7ee4 commit 968b318

23 files changed

+6005
-11
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
[package]
22
name = "map-api"
33
description = "Raft state machine"
4-
version = "0.2.7"
4+
version = "0.3.0"
55
authors = ["Databend Authors <[email protected]>"]
66
license = "Apache-2.0"
77
edition = "2021"
@@ -14,7 +14,7 @@ deepsize = { version = "0.2.0" }
1414
futures = "0.3.24"
1515
futures-util = "0.3.24"
1616
log = { version = "0.4.21", features = ["serde", "kv_unstable_std"] }
17-
seq-marked = { version = "0.3.1"}
17+
seq-marked = { version = "0.3.3"}
1818
serde = { version = "1.0.164", features = ["derive", "rc"] }
1919
stream-more = { version = "0.1.3" }
2020
thiserror = { version = "1" }

src/compact.rs

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,15 @@ use crate::MapApiRO;
2626
use crate::MapKey;
2727
use crate::SeqMarked;
2828

29+
/// Return `newer`, unless it is a `not-found` record, in which case return `older`.
30+
pub fn compact_seq_marked_pair<T>(newer: SeqMarked<T>, older: SeqMarked<T>) -> SeqMarked<T> {
31+
if newer.is_not_found() {
32+
older
33+
} else {
34+
newer
35+
}
36+
}
37+
2938
/// Get a key from multi levels data.
3039
///
3140
/// Returns the first non-tombstone entry.
@@ -116,13 +125,37 @@ mod tests {
116125

117126
use futures_util::TryStreamExt;
118127

128+
use super::*;
119129
use crate::compact::compacted_get;
120130
use crate::compact::compacted_range;
121131
use crate::impls::immutable::Immutable;
122132
use crate::impls::level::Level;
123133
use crate::MapApi;
124134
use crate::SeqMarked;
125135

136+
#[test]
137+
fn test_compact_seq_marked_pair() {
138+
assert_eq!(
139+
compact_seq_marked_pair(SeqMarked::new_normal(1, "a"), SeqMarked::new_normal(2, "b")),
140+
SeqMarked::new_normal(1, "a")
141+
);
142+
assert_eq!(
143+
compact_seq_marked_pair(SeqMarked::new_normal(1, "a"), SeqMarked::new_tombstone(2)),
144+
SeqMarked::new_normal(1, "a")
145+
);
146+
assert_eq!(
147+
compact_seq_marked_pair(SeqMarked::new_tombstone(1), SeqMarked::new_normal(2, "b")),
148+
SeqMarked::new_tombstone(1)
149+
);
150+
assert_eq!(
151+
compact_seq_marked_pair(
152+
SeqMarked::<()>::new_tombstone(0),
153+
SeqMarked::new_tombstone(2)
154+
),
155+
SeqMarked::new_tombstone(2)
156+
);
157+
}
158+
126159
#[tokio::test]
127160
async fn test_compacted_get() -> anyhow::Result<()> {
128161
let mut l0 = Level::default();
@@ -248,4 +281,173 @@ mod tests {
248281
fn b(x: impl ToString) -> Vec<u8> {
249282
x.to_string().as_bytes().to_vec()
250283
}
284+
285+
#[test]
286+
fn test_compact_seq_marked_pair_edge_cases() {
287+
// Not found newer should return older
288+
assert_eq!(
289+
compact_seq_marked_pair(
290+
SeqMarked::<String>::new_not_found(),
291+
SeqMarked::new_normal(5, "older".to_string())
292+
),
293+
SeqMarked::new_normal(5, "older".to_string())
294+
);
295+
296+
// Not found newer with tombstone older
297+
assert_eq!(
298+
compact_seq_marked_pair(
299+
SeqMarked::<String>::new_not_found(),
300+
SeqMarked::new_tombstone(3)
301+
),
302+
SeqMarked::new_tombstone(3)
303+
);
304+
305+
// Not found both
306+
assert_eq!(
307+
compact_seq_marked_pair(
308+
SeqMarked::<String>::new_not_found(),
309+
SeqMarked::<String>::new_not_found()
310+
),
311+
SeqMarked::<String>::new_not_found()
312+
);
313+
}
314+
315+
#[tokio::test]
316+
async fn test_compacted_get_empty_levels() -> anyhow::Result<()> {
317+
// No levels at all
318+
let got = compacted_get::<String, Level, Level>(&s("missing"), [], []).await?;
319+
assert!(got.is_not_found());
320+
Ok(())
321+
}
322+
323+
#[tokio::test]
324+
async fn test_compacted_get_key_not_found() -> anyhow::Result<()> {
325+
let mut l0 = Level::default();
326+
l0.set(s("a"), Some(b("a"))).await?;
327+
328+
let mut l1 = l0.new_level();
329+
l1.set(s("b"), Some(b("b"))).await?;
330+
331+
// Key not in any level
332+
let got = compacted_get::<String, _, Level>(&s("missing"), [&l0, &l1], []).await?;
333+
assert!(got.is_not_found());
334+
Ok(())
335+
}
336+
337+
#[tokio::test]
338+
async fn test_compacted_get_only_tombstones() -> anyhow::Result<()> {
339+
let mut l0 = Level::default();
340+
l0.set(s("a"), Some(b("a"))).await?;
341+
342+
let mut l1 = l0.new_level();
343+
l1.set(s("a"), None).await?; // Tombstone
344+
345+
let mut l2 = l1.new_level();
346+
l2.set(s("a"), None).await?; // Another tombstone
347+
348+
// Should find first tombstone
349+
let got = compacted_get::<String, _, Level>(&s("a"), [&l2, &l1], []).await?;
350+
assert_eq!(got, SeqMarked::new_tombstone(1));
351+
Ok(())
352+
}
353+
354+
#[tokio::test]
355+
async fn test_compacted_range_empty_levels() -> anyhow::Result<()> {
356+
// No levels at all
357+
let got = compacted_range::<String, _, Level, &Level, Level>(s("").., None, [], []).await?;
358+
let got = got.try_collect::<Vec<_>>().await?;
359+
assert!(got.is_empty());
360+
Ok(())
361+
}
362+
363+
#[tokio::test]
364+
async fn test_compacted_range_single_level() -> anyhow::Result<()> {
365+
let mut l0 = Level::default();
366+
l0.set(s("a"), Some(b("a"))).await?;
367+
l0.set(s("b"), Some(b("b"))).await?;
368+
369+
let got = compacted_range::<_, _, Level, _, Level>(s("").., None, [&l0], []).await?;
370+
let got = got.try_collect::<Vec<_>>().await?;
371+
assert_eq!(got, vec![
372+
(s("a"), SeqMarked::new_normal(1, b("a"))),
373+
(s("b"), SeqMarked::new_normal(2, b("b"))),
374+
]);
375+
Ok(())
376+
}
377+
378+
#[tokio::test]
379+
async fn test_compacted_range_with_persisted_only() -> anyhow::Result<()> {
380+
let mut l0 = Level::default();
381+
l0.set(s("a"), Some(b("a"))).await?;
382+
l0.set(s("b"), Some(b("b"))).await?;
383+
384+
// Only persisted levels, no in-memory levels
385+
let got =
386+
compacted_range::<String, _, Level, &Level, &Level>(s("").., None, [], [&l0]).await?;
387+
let got = got.try_collect::<Vec<_>>().await?;
388+
assert_eq!(got, vec![
389+
(s("a"), SeqMarked::new_normal(1, b("a"))),
390+
(s("b"), SeqMarked::new_normal(2, b("b"))),
391+
]);
392+
Ok(())
393+
}
394+
395+
#[tokio::test]
396+
async fn test_compacted_range_overlapping_keys() -> anyhow::Result<()> {
397+
// Test that newer versions override older ones
398+
let mut l0 = Level::default();
399+
l0.set(s("a"), Some(b("old_a"))).await?;
400+
l0.set(s("c"), Some(b("c"))).await?;
401+
402+
let mut l1 = l0.new_level();
403+
l1.set(s("a"), Some(b("new_a"))).await?; // Override
404+
l1.set(s("b"), Some(b("b"))).await?; // New key
405+
406+
let got = compacted_range::<_, _, Level, _, Level>(s("").., None, [&l1, &l0], []).await?;
407+
let got = got.try_collect::<Vec<_>>().await?;
408+
assert_eq!(got, vec![
409+
(s("a"), SeqMarked::new_normal(3, b("new_a"))), /* Newer version (sequence continues from l0) */
410+
(s("b"), SeqMarked::new_normal(4, b("b"))),
411+
(s("c"), SeqMarked::new_normal(2, b("c"))),
412+
]);
413+
Ok(())
414+
}
415+
416+
#[tokio::test]
417+
async fn test_compacted_range_bounded_range() -> anyhow::Result<()> {
418+
let mut l0 = Level::default();
419+
l0.set(s("a"), Some(b("a"))).await?;
420+
l0.set(s("b"), Some(b("b"))).await?;
421+
l0.set(s("c"), Some(b("c"))).await?;
422+
l0.set(s("d"), Some(b("d"))).await?;
423+
424+
// Test bounded range
425+
let got =
426+
compacted_range::<_, _, Level, _, Level>(s("b")..=s("c"), None, [&l0], []).await?;
427+
let got = got.try_collect::<Vec<_>>().await?;
428+
assert_eq!(got, vec![
429+
(s("b"), SeqMarked::new_normal(2, b("b"))),
430+
(s("c"), SeqMarked::new_normal(3, b("c"))),
431+
]);
432+
Ok(())
433+
}
434+
435+
#[tokio::test]
436+
async fn test_compacted_range_all_tombstones() -> anyhow::Result<()> {
437+
let mut l0 = Level::default();
438+
l0.set(s("a"), Some(b("a"))).await?;
439+
l0.set(s("b"), Some(b("b"))).await?;
440+
441+
let mut l1 = l0.new_level();
442+
l1.set(s("a"), None).await?; // Tombstone
443+
l1.set(s("b"), None).await?; // Tombstone
444+
445+
let got = compacted_range::<_, _, Level, _, Level>(s("").., None, [&l1], []).await?;
446+
let got = got.try_collect::<Vec<_>>().await?;
447+
assert_eq!(got, vec![
448+
(s("a"), SeqMarked::new_tombstone(2)), /* Tombstones do not advance the seq: both carry 2, the seq of the last insert into l0 */
449+
(s("b"), SeqMarked::new_tombstone(2)),
450+
]);
451+
Ok(())
452+
}
251453
}

src/impls/immutable.rs

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,3 +73,130 @@ impl MapApiRO<String> for Immutable<ValueOf<String>> {
7373
Ok(strm)
7474
}
7575
}
76+
77+
#[cfg(test)]
78+
mod tests {
79+
use futures_util::TryStreamExt;
80+
81+
use super::*;
82+
use crate::impls::level::Level;
83+
use crate::MapApi;
84+
use crate::SeqMarked;
85+
86+
#[test]
87+
fn test_immutable_new() {
88+
let level = Level::<Vec<u8>>::default();
89+
let immutable = Immutable::new(Arc::new(level));
90+
91+
// A freshly wrapped level is held by exactly one strong reference
92+
assert_eq!(Arc::strong_count(&immutable.level), 1);
93+
}
94+
95+
#[test]
96+
fn test_immutable_new_from_level() {
97+
let level = Level::<Vec<u8>>::default();
98+
let immutable = Immutable::new_from_level(level);
99+
100+
// Test that the immutable was created successfully
101+
assert_eq!(Arc::strong_count(&immutable.level), 1);
102+
}
103+
104+
#[test]
105+
fn test_immutable_as_ref() {
106+
let level = Level::<Vec<u8>>::default();
107+
let immutable = Immutable::new_from_level(level);
108+
109+
// Test AsRef trait
110+
let level_ref: &Level<Vec<u8>> = immutable.as_ref();
111+
let _: &Level<Vec<u8>> = level_ref; // Type assertion
112+
}
113+
114+
#[test]
115+
fn test_immutable_deref() {
116+
let level = Level::<Vec<u8>>::default();
117+
let immutable = Immutable::new_from_level(level);
118+
119+
// Test Deref trait - we should be able to access Level methods directly
120+
let _level_direct: &Level<Vec<u8>> = &immutable;
121+
}
122+
123+
#[tokio::test]
124+
async fn test_immutable_map_api_ro_get() {
125+
let mut level = Level::default();
126+
level
127+
.set(
128+
"test_key".to_string(),
129+
Some("test_value".as_bytes().to_vec()),
130+
)
131+
.await
132+
.unwrap();
133+
let immutable = Immutable::new_from_level(level);
134+
135+
// Test MapApiRO implementation
136+
let result = immutable.get(&"test_key".to_string()).await.unwrap();
137+
assert!(result.is_normal());
138+
assert_eq!(
139+
result,
140+
SeqMarked::new_normal(1, "test_value".as_bytes().to_vec())
141+
);
142+
}
143+
144+
#[tokio::test]
145+
async fn test_immutable_map_api_ro_get_nonexistent() {
146+
let level = Level::default();
147+
let immutable = Immutable::new_from_level(level);
148+
149+
let result = immutable.get(&"nonexistent".to_string()).await.unwrap();
150+
assert!(result.is_not_found());
151+
}
152+
153+
#[tokio::test]
154+
async fn test_immutable_map_api_ro_range() {
155+
let mut level = Level::default();
156+
level
157+
.set("key1".to_string(), Some("value1".as_bytes().to_vec()))
158+
.await
159+
.unwrap();
160+
level
161+
.set("key2".to_string(), Some("value2".as_bytes().to_vec()))
162+
.await
163+
.unwrap();
164+
let immutable = Immutable::new_from_level(level);
165+
166+
// Test range operation
167+
let range_result = immutable
168+
.range("key1".to_string().."key3".to_string())
169+
.await
170+
.unwrap();
171+
let items: Vec<_> = range_result.try_collect().await.unwrap();
172+
173+
assert_eq!(items.len(), 2);
174+
assert_eq!(items[0].0, "key1");
175+
assert_eq!(items[1].0, "key2");
176+
}
177+
178+
#[tokio::test]
179+
async fn test_immutable_map_api_ro_empty_range() {
180+
let level = Level::default();
181+
let immutable = Immutable::new_from_level(level);
182+
183+
let range_result = immutable
184+
.range("a".to_string().."z".to_string())
185+
.await
186+
.unwrap();
187+
let items: Vec<_> = range_result.try_collect().await.unwrap();
188+
189+
assert!(items.is_empty());
190+
}
191+
192+
#[test]
193+
fn test_immutable_clone() {
194+
let level = Level::<Vec<u8>>::default();
195+
let immutable1 = Immutable::new_from_level(level);
196+
let immutable2 = immutable1.clone();
197+
198+
// Test that both point to the same Arc
199+
assert!(Arc::ptr_eq(&immutable1.level, &immutable2.level));
200+
assert_eq!(Arc::strong_count(&immutable1.level), 2);
201+
}
202+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ pub mod map_api_ro;
7070
pub mod map_key;
7171
pub mod map_value;
7272
pub mod match_seq;
73+
pub mod mvcc;
7374
pub mod seq_value;
7475
pub mod util;
7576

0 commit comments

Comments
 (0)