Skip to content

Commit 92aaf00

Browse files
committed
Fix desync between clean and evict
There's a problem right now with chunk eviction. In get_evict_targets we remove from the LRU and modify the priority queue accordingly. We then take a subset of what has been modified and clean it after retrieving the relevant zones in get_clean_targets. The problem is that the priority queue can be adjusted by reads that occur in the meantime, leaving the priorities in an inconsistent state, since we do not evict the chunks immediately. We need to track whether each chunk is valid, maintaining an array of per-zone chunk lists covering all zones. Chunks should be initialized to invalid. When a first write occurs, a chunk is set to valid. When it is removed from the LRU it is set to invalid and the priority queue is updated accordingly. If a read ever targets an invalid chunk, that means we need to update the priority queue. This also means we no longer need to remove from the LRU in get_clean_targets, saving a lot of time: we simply drop the retain call, and in get_evict_targets, when removing from the LRU, we check whether the entry is invalid and, if so, remove and skip it. So our LRU may end up holding invalid items, but that's fine.
1 parent 4e71dc4 commit 92aaf00

File tree

2 files changed

+105
-203
lines changed

2 files changed

+105
-203
lines changed

oxcache/src/eviction.rs

Lines changed: 85 additions & 196 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate::writerpool::WriterPool;
44
use crate::zone_state::zone_priority_queue::{ZoneIndex, ZonePriorityQueue};
55
use flume::{Receiver, Sender};
66
use lru_mem::{LruCache};
7+
use ndarray::Array2;
78
use nvme::types::{Chunk, Zone};
89
use std::io::ErrorKind;
910
use std::sync::{
@@ -231,6 +232,7 @@ pub struct ChunkEvictionPolicy {
231232
nr_chunks_per_zone: Chunk,
232233
lru: LruCache<ChunkLocation, ()>,
233234
pq: ZonePriorityQueue,
235+
validity: Array2<AtomicBool>,
234236
#[cfg(feature = "eviction-metrics")]
235237
pub metrics: Option<Arc<crate::eviction_metrics::EvictionMetrics>>,
236238
}
@@ -249,17 +251,49 @@ impl ChunkEvictionPolicy {
249251
high_water,
250252
nr_chunks_per_zone
251253
);
254+
255+
// Initialize all chunks as invalid (false) - become valid on first write
256+
let validity = Array2::from_shape_fn(
257+
(nr_zones as usize, nr_chunks_per_zone as usize),
258+
|_| AtomicBool::new(false)
259+
);
260+
252261
Self {
253262
high_water,
254263
low_water,
255264
nr_zones,
256265
nr_chunks_per_zone,
257266
lru: LruCache::new(usize::MAX), // Effectively unbounded
258267
pq: ZonePriorityQueue::new(nr_zones, clean_low_water),
268+
validity,
259269
#[cfg(feature = "eviction-metrics")]
260270
metrics: None,
261271
}
262272
}
273+
274+
/// Check if chunk is currently valid (tracked by eviction policy)
275+
fn is_valid(&self, loc: &ChunkLocation) -> bool {
276+
// Bounds check - zones/chunks outside our range
277+
assert!(!(loc.zone >= self.nr_zones || loc.index >= self.nr_chunks_per_zone));
278+
self.validity[[loc.zone as usize, loc.index as usize]]
279+
.load(Ordering::Relaxed)
280+
}
281+
282+
/// Mark chunk as valid (actively tracked)
283+
fn mark_valid(&self, loc: &ChunkLocation) {
284+
// Bounds check - zones/chunks outside our range
285+
assert!(!(loc.zone >= self.nr_zones || loc.index >= self.nr_chunks_per_zone));
286+
self.validity[[loc.zone as usize, loc.index as usize]]
287+
.store(true, Ordering::Relaxed);
288+
}
289+
290+
/// Mark chunk as invalid (evicted but not yet cleaned)
291+
fn mark_invalid(&self, loc: &ChunkLocation) {
292+
// Bounds check - ignore out of range chunks
293+
assert!(!(loc.zone >= self.nr_zones || loc.index >= self.nr_chunks_per_zone));
294+
self.validity[[loc.zone as usize, loc.index as usize]]
295+
.store(false, Ordering::Relaxed);
296+
}
263297
}
264298

265299
impl EvictionPolicy for ChunkEvictionPolicy {
@@ -271,6 +305,7 @@ impl EvictionPolicy for ChunkEvictionPolicy {
271305
metrics.record_write(&chunk);
272306
}
273307

308+
self.mark_valid(&chunk);
274309
self.lru.insert(chunk, ()).ok();
275310
}
276311

@@ -280,9 +315,28 @@ impl EvictionPolicy for ChunkEvictionPolicy {
280315
metrics.record_read(&chunk);
281316
}
282317

283-
if self.lru.contains(&chunk) {
318+
// Check validity first - chunk can be in LRU but marked invalid
319+
// (after get_clean_targets marks all chunks in cleaned zones as invalid)
320+
if !self.is_valid(&chunk) {
321+
// Chunk was marked for eviction/cleaning but is still being accessed
322+
// Re-validate it: add/promote in LRU and decrement PQ count
323+
let zone = chunk.zone;
324+
self.mark_valid(&chunk);
325+
self.lru.insert(chunk, ()).ok();
326+
327+
// Decrement priority queue since this chunk is no longer invalid
328+
self.pq.modify_priority(zone, -1);
329+
330+
#[cfg(feature = "eviction-metrics")]
331+
if let Some(ref metrics) = self.metrics {
332+
// Optional: Track re-validation events
333+
// metrics.record_chunk_revalidation(&chunk);
334+
}
335+
} else if self.lru.contains(&chunk) {
336+
// Already tracked and valid - just promote it
284337
self.lru.insert(chunk, ()).ok();
285338
}
339+
// If chunk is not in LRU and is valid, it was already cleaned - do nothing
286340
}
287341

288342
fn get_evict_targets(&mut self, always_evict: bool) -> Self::Target {
@@ -307,19 +361,36 @@ impl EvictionPolicy for ChunkEvictionPolicy {
307361
let mut zone_counts = std::collections::HashMap::new();
308362

309363
// Collect evicted items and count by zone (batch the counting)
310-
for _ in 0..cap {
364+
// Continue until we have 'cap' VALID chunks
365+
let mut collected = 0;
366+
while collected < cap {
311367
if let Some((targ, _)) = self.lru.remove_lru() {
368+
// Check if already invalid (lazily skip stale entries)
369+
if !self.is_valid(&targ) {
370+
// Already marked invalid in previous eviction round but not yet cleaned
371+
// Skip it and continue removing more to reach our target count
372+
continue;
373+
}
374+
312375
let target_zone = targ.zone;
313-
targets.push(targ);
376+
targets.push(targ.clone());
377+
378+
// Mark chunk as invalid
379+
self.mark_invalid(&targ);
314380

315381
// Batch count instead of individual priority queue updates
316382
*zone_counts.entry(target_zone).or_insert(0) += 1;
383+
384+
collected += 1;
385+
} else {
386+
// LRU is empty, can't collect more
387+
break;
317388
}
318389
}
319390

320391
// Batch update priority queue (far fewer operations)
321392
for (zone, count) in zone_counts {
322-
self.pq.modify_priority(zone, count);
393+
self.pq.modify_priority(zone, count as i64);
323394
}
324395

325396
#[cfg(feature = "eviction-metrics")]
@@ -338,13 +409,16 @@ impl EvictionPolicy for ChunkEvictionPolicy {
338409

339410
clean_targets.sort_unstable();
340411

341-
let zones_to_clean: std::collections::HashSet<nvme::types::Zone> =
342-
clean_targets.iter().copied().collect();
343-
344-
// Efficient selective removal - O(k) where k = items removed
345-
// instead of O(n) where n = total LRU size
346-
self.lru
347-
.retain(|chunk_loc, _| !zones_to_clean.contains(&chunk_loc.zone));
412+
// Mark ALL chunks in cleaned zones as invalid
413+
// This prevents chunks from being re-added to LRU during zone cleaning
414+
// and ensures consistency when zone is being relocated
415+
// This must be done because the actual locations can change
416+
for zone in &clean_targets {
417+
for chunk_idx in 0..self.nr_chunks_per_zone {
418+
let loc = ChunkLocation::new(*zone, chunk_idx);
419+
self.mark_invalid(&loc);
420+
}
421+
}
348422

349423
clean_targets
350424
}
@@ -662,189 +736,4 @@ mod tests {
662736
let got = policy.get_clean_targets().len();
663737
assert_eq!(2, got, "Expected 2, but got {}", got);
664738
}
665-
666-
#[test]
667-
fn performance_test_large_lru_get_clean_targets() {
668-
// Performance test with ~15.6M chunks in LRU
669-
// 904 zones, 17232 chunks per zone = 15,581,728 total chunks
670-
let nr_zones = 904;
671-
let nr_chunks_per_zone = 17232;
672-
let total_chunks = nr_zones * nr_chunks_per_zone;
673-
674-
// Set clean_low_water to trigger cleaning when zones have 1+ evicted chunks
675-
let clean_low_water = 1;
676-
677-
// High/low water marks - trigger eviction when LRU approaches capacity
678-
// Use more reasonable eviction ratios to avoid massive bulk evictions
679-
let high_water = total_chunks - (total_chunks / 100); // 99% capacity
680-
let low_water = total_chunks - (total_chunks / 50); // 98% capacity
681-
682-
let mut policy = ChunkEvictionPolicy::new(
683-
high_water,
684-
low_water,
685-
clean_low_water,
686-
nr_zones,
687-
nr_chunks_per_zone,
688-
);
689-
690-
println!(
691-
"Filling LRU with {} chunks across {} zones...",
692-
total_chunks, nr_zones
693-
);
694-
let start_fill = std::time::Instant::now();
695-
696-
// Fill the LRU with chunks from all zones
697-
for zone in 0..nr_zones {
698-
for chunk_idx in 0..nr_chunks_per_zone {
699-
policy.write_update(ChunkLocation::new(zone, chunk_idx));
700-
}
701-
}
702-
703-
let fill_duration = start_fill.elapsed();
704-
println!("LRU fill took: {:?}", fill_duration);
705-
println!("LRU size: {}", policy.lru.len());
706-
707-
// Trigger some evictions to populate the priority queue
708-
// This will evict 500 chunks and mark zones for potential cleaning
709-
println!("Triggering evictions to populate priority queue...");
710-
let evict_start = std::time::Instant::now();
711-
let evicted = policy.get_evict_targets(false);
712-
let evict_duration = evict_start.elapsed();
713-
println!("Evicted {} chunks in {:?}", evicted.len(), evict_duration);
714-
715-
// Now test get_clean_targets performance with different scenarios
716-
717-
// Scenario 1: Small cleanup (few zones)
718-
println!("\n=== Scenario 1: Small cleanup ===");
719-
let start_small = std::time::Instant::now();
720-
let clean_targets_small = policy.get_clean_targets();
721-
let small_duration = start_small.elapsed();
722-
println!(
723-
"Small cleanup: {} zones cleaned in {:?}",
724-
clean_targets_small.len(),
725-
small_duration
726-
);
727-
println!("LRU size after small cleanup: {}", policy.lru.len());
728-
729-
// Refill LRU with new chunks to simulate continued cache activity
730-
println!("Refilling LRU with new chunks for scenario 2...");
731-
let refill_start = std::time::Instant::now();
732-
let chunks_to_add = (total_chunks as usize) / 3; // Add 33% more chunks
733-
for i in 0..chunks_to_add {
734-
let zone = i % (nr_zones as usize);
735-
let chunk_idx = (i / (nr_zones as usize)) % (nr_chunks_per_zone as usize);
736-
// Use high zone/chunk indices to avoid conflicts with existing chunks
737-
policy.write_update(ChunkLocation::new(
738-
(zone + nr_zones as usize) as u64,
739-
chunk_idx as u64,
740-
));
741-
}
742-
let refill_duration = refill_start.elapsed();
743-
println!(
744-
"Refilled LRU with {} chunks in {:?}",
745-
chunks_to_add, refill_duration
746-
);
747-
println!("LRU size after refill: {}", policy.lru.len());
748-
749-
// Trigger evictions to populate priority queue for scenario 2
750-
let evict2_start = std::time::Instant::now();
751-
let evicted2 = policy.get_evict_targets(false);
752-
let evict2_duration = evict2_start.elapsed();
753-
println!(
754-
"Second eviction: {} chunks in {:?}",
755-
evicted2.len(),
756-
evict2_duration
757-
);
758-
759-
// Scenario 2: Medium cleanup
760-
println!("\n=== Scenario 2: Medium cleanup ===");
761-
let start_medium = std::time::Instant::now();
762-
let clean_targets_medium = policy.get_clean_targets();
763-
let medium_duration = start_medium.elapsed();
764-
println!(
765-
"Medium cleanup: {} zones cleaned in {:?}",
766-
clean_targets_medium.len(),
767-
medium_duration
768-
);
769-
println!("LRU size after medium cleanup: {}", policy.lru.len());
770-
771-
// Refill LRU again for scenario 3
772-
println!("Refilling LRU with new chunks for scenario 3...");
773-
let refill2_start = std::time::Instant::now();
774-
let chunks_to_add2 = (total_chunks as usize) / 2; // Add 50% more chunks
775-
for i in 0..chunks_to_add2 {
776-
let zone = i % (nr_zones as usize);
777-
let chunk_idx = (i / (nr_zones as usize)) % (nr_chunks_per_zone as usize);
778-
// Use even higher indices to avoid conflicts
779-
policy.write_update(ChunkLocation::new(
780-
(zone + 2 * nr_zones as usize) as u64,
781-
chunk_idx as u64,
782-
));
783-
}
784-
let refill2_duration = refill2_start.elapsed();
785-
println!(
786-
"Refilled LRU with {} chunks in {:?}",
787-
chunks_to_add2, refill2_duration
788-
);
789-
println!("LRU size after second refill: {}", policy.lru.len());
790-
791-
// Trigger evictions for scenario 3
792-
let evict3_start = std::time::Instant::now();
793-
let evicted3 = policy.get_evict_targets(false);
794-
let evict3_duration = evict3_start.elapsed();
795-
println!(
796-
"Third eviction: {} chunks in {:?}",
797-
evicted3.len(),
798-
evict3_duration
799-
);
800-
801-
// Scenario 3: Large cleanup
802-
println!("\n=== Scenario 3: Large cleanup ===");
803-
let start_large = std::time::Instant::now();
804-
let clean_targets_large = policy.get_clean_targets();
805-
let large_duration = start_large.elapsed();
806-
println!(
807-
"Large cleanup: {} zones cleaned in {:?}",
808-
clean_targets_large.len(),
809-
large_duration
810-
);
811-
println!("LRU size after large cleanup: {}", policy.lru.len());
812-
813-
// Performance analysis
814-
println!("\n=== Performance Analysis ===");
815-
println!("Initial LRU size: {}", total_chunks);
816-
println!(
817-
"Small cleanup time: {:?} ({} zones)",
818-
small_duration,
819-
clean_targets_small.len()
820-
);
821-
println!(
822-
"Medium cleanup time: {:?} ({} zones)",
823-
medium_duration,
824-
clean_targets_medium.len()
825-
);
826-
println!(
827-
"Large cleanup time: {:?} ({} zones)",
828-
large_duration,
829-
clean_targets_large.len()
830-
);
831-
832-
// Calculate time per LRU item processed
833-
if policy.lru.len() > 0 {
834-
let time_per_item_ns = large_duration.as_nanos() as f64 / total_chunks as f64;
835-
println!(
836-
"Approximate time per LRU item processed: {:.2} ns",
837-
time_per_item_ns
838-
);
839-
}
840-
841-
// Verify correctness - LRU should still function properly
842-
assert!(policy.lru.len() <= total_chunks as usize);
843-
844-
// Test that we can still perform normal operations
845-
policy.write_update(ChunkLocation::new(999, 14));
846-
policy.read_update(ChunkLocation::new(0, 0));
847-
848-
println!("Test completed successfully!");
849-
}
850739
}

oxcache/src/zone_state/zone_priority_queue.rs

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,27 @@ impl ZonePriorityQueue {
4848
zones
4949
}
5050

51-
pub fn modify_priority(&mut self, ind: ZoneIndex, priority_increase: ZonePriority) {
52-
// Only account for it if the entry exists
53-
if self
54-
.invalid_queue
55-
.change_priority_by(&ind, |p| *p += priority_increase)
56-
{
57-
self.invalid_count = self.invalid_count.saturating_add(priority_increase);
51+
pub fn modify_priority(&mut self, ind: ZoneIndex, priority_change: i64) {
52+
// Handle both increments and decrements
53+
if priority_change > 0 {
54+
let increase = priority_change as ZonePriority;
55+
if self
56+
.invalid_queue
57+
.change_priority_by(&ind, |p| *p += increase)
58+
{
59+
self.invalid_count = self.invalid_count.saturating_add(increase);
60+
}
61+
} else if priority_change < 0 {
62+
let decrease = (-priority_change) as ZonePriority;
63+
if self
64+
.invalid_queue
65+
.change_priority_by(&ind, |p| *p = p.saturating_sub(decrease))
66+
{
67+
self.invalid_count = self.invalid_count.saturating_sub(decrease);
68+
}
5869
}
70+
// priority_change == 0: no-op
71+
5972
#[cfg(debug_assertions)]
6073
self.assert_consistent();
6174
}

0 commit comments

Comments
 (0)