From 133fadbe996f0014401d14ccd38eb808b6c6eab1 Mon Sep 17 00:00:00 2001 From: John Ramsden Date: Tue, 23 Dec 2025 16:36:52 -0800 Subject: [PATCH] Hash on UUID, not offset, size Hashing on offset and size leads to duplicate map entries when subset reads occur --- oxcache/src/cache/bucket.rs | 30 ++++++++-------- oxcache/src/cache/mod.rs | 68 ++++++++++++++++++++++++++++++++----- oxcache/src/server.rs | 37 ++++++++++++-------- 3 files changed, 98 insertions(+), 37 deletions(-) diff --git a/oxcache/src/cache/bucket.rs b/oxcache/src/cache/bucket.rs index 97e0867..fd05b2e 100644 --- a/oxcache/src/cache/bucket.rs +++ b/oxcache/src/cache/bucket.rs @@ -4,27 +4,27 @@ use std::sync::{Arc, atomic::{AtomicUsize, Ordering}}; use tokio::sync::Notify; use lru_mem::HeapSize; -#[derive(Debug, Clone, Eq, PartialEq, Hash)] +#[derive(Debug, Clone)] pub struct Chunk { pub uuid: String, pub offset: Byte, pub size: Byte, } -// // Only hash and compare based on uuid - offset/size are just read parameters -// impl PartialEq for Chunk { -// fn eq(&self, other: &Self) -> bool { -// self.uuid == other.uuid -// } -// } -// -// impl Eq for Chunk {} -// -// impl std::hash::Hash for Chunk { -// fn hash<H: std::hash::Hasher>(&self, state: &mut H) { -// self.uuid.hash(state); -// } -// } +// Only hash and compare based on uuid - offset/size are just read parameters +impl PartialEq for Chunk { + fn eq(&self, other: &Self) -> bool { + self.uuid == other.uuid + } +} + +impl Eq for Chunk {} + +impl std::hash::Hash for Chunk { + fn hash<H: std::hash::Hasher>(&self, state: &mut H) { + self.uuid.hash(state); + } +} #[derive(Debug, PartialEq, Clone, Eq, Hash)] pub struct ChunkLocation { diff --git a/oxcache/src/cache/mod.rs b/oxcache/src/cache/mod.rs index 716c543..5212f63 100644 --- a/oxcache/src/cache/mod.rs +++ b/oxcache/src/cache/mod.rs @@ -511,17 +511,20 @@ mod mod_tests { Err(err) => assert!(false, "Error occurred: {err}"), }; - // Insert multiple similar entries - let entry1 = Chunk::new(String::from("fake-uuid"), 120, 10); - let entry2 = 
Chunk::new(String::from("fake-uuid!"), 120, 10); - let entry3 = Chunk::new(String::from("fake-uuid"), 121, 10); - let entry4 = Chunk::new(String::from("fake-uuid"), 121, 11); - + // With UUID-only hashing, offset/size no longer distinguish entries, + // so each entry needs a distinct UUID to occupy its own cache slot + let entry1 = Chunk::new(String::from("fake-uuid-1"), 120, 10); + let entry2 = Chunk::new(String::from("fake-uuid-2"), 120, 10); + let entry3 = Chunk::new(String::from("fake-uuid-3"), 121, 10); + let entry4 = Chunk::new(String::from("fake-uuid-4"), 121, 11); + + // Different UUIDs, so different cache entries let chunk_loc1 = ChunkLocation::new(0, 20); let chunk_loc2 = ChunkLocation::new(1, 22); let chunk_loc3 = ChunkLocation::new(9, 25); let chunk_loc4 = ChunkLocation::new(7, 29); + // Insert all four entries (all have different UUIDs) check_err( cache .get_or_insert_with(entry1.clone(), fail_path, { @@ -558,6 +561,7 @@ mod mod_tests { .await, ); + // Verify all entries hit cache with correct locations check_err( cache .get_or_insert_with( @@ -619,6 +623,54 @@ mod mod_tests { ); } + #[tokio::test] + async fn test_uuid_only_hashing() { + let cache = Cache::new(10, 100); + + let fail_write_path = async || -> tokio::io::Result<ChunkLocation> { + assert!(false, "Write shouldn't be called for cache hit"); + Ok(ChunkLocation { zone: 0, index: 0 }) + }; + + let check_err = |result| match result { + Ok(()) => (), + Err(err) => assert!(false, "Error occurred: {err}"), + }; + + // Insert entry with specific offset/size + let entry1 = Chunk::new(String::from("same-uuid"), 120, 10); + let chunk_loc1 = ChunkLocation::new(0, 20); + + check_err( + cache + .get_or_insert_with(entry1.clone(), |_| async { Ok(()) }, { + let chunk_loc1 = chunk_loc1.clone(); + || async { Ok(chunk_loc1) } + }) + .await, + ); + + // Try to insert with same UUID but different offset/size + // Should hit cache (reader path), not write + let entry2 = Chunk::new(String::from("same-uuid"), 0, 100); + + check_err( + cache + 
.get_or_insert_with( + entry2, + { + |pin_guard| async move { + // Should hit cache and return same location as entry1 + assert_eq!(pin_guard.location(), &chunk_loc1); + Ok(()) + } + }, + fail_write_path, + ) + .await, + ); + } + #[tokio::test] async fn test_remove() { let cache = Cache::new(10, 100); @@ -701,10 +753,10 @@ mod mod_tests { Err(err) => assert!(false, "Error occurred: {err}"), }; - let entry = Chunk::new(String::from("fake-uuid"), 120, 10); + let entry = Chunk::new(String::from("fake-uuid-1"), 120, 10); let chunk_loc = ChunkLocation::new(0, 20); - let entry2 = Chunk::new(String::from("fake-uuid"), 121, 10); + let entry2 = Chunk::new(String::from("fake-uuid-2"), 121, 10); let chunk_loc2 = ChunkLocation::new(0, 21); // Insert diff --git a/oxcache/src/server.rs b/oxcache/src/server.rs index d51a91c..c66ee77 100644 --- a/oxcache/src/server.rs +++ b/oxcache/src/server.rs @@ -432,19 +432,26 @@ async fn handle_connection( } // println!("Received get request: {:?}", req); - let chunk: Chunk = req.into(); - let chunk_clone = chunk.clone(); + + // Extract request parameters for subset reading + let request_offset = req.offset; + let request_size = req.size; + let request_uuid = req.key.clone(); + + // Create normalized cache key (always 0, chunk_size) + let cache_key = Chunk::new(request_uuid.clone(), 0, chunk_size); tracing::debug!("REQ[{}] Calling cache.get_or_insert_with", request_id); cache.get_or_insert_with( - chunk.clone(), + cache_key, { let writer = Arc::clone(&writer); let reader_pool = Arc::clone(&reader_pool); let start = Arc::clone(&start); - // let chunk = chunk.clone(); + let request_offset = request_offset; + let request_size = request_size; + let request_uuid = request_uuid.clone(); move |pin_guard| async move { - let chunk = chunk.clone(); tracing::debug!("REQ[{}] CACHE HIT - entering read path", request_id); let location = pin_guard.location().clone(); @@ -453,8 +460,8 @@ async fn handle_connection( location, responder: tx, _pin_guard: 
pin_guard, - read_offset: chunk.offset, - read_size: chunk.size, + read_offset: request_offset, + read_size: request_size, }; tracing::debug!("REQ[{}] Sending read request to reader pool", request_id); reader_pool.send(read_req).await.map_err(|e| { @@ -483,9 +490,9 @@ async fn handle_connection( // Validate read response - pin_guard must stay alive until here! // Note: We validate against the original chunk that was stored (0, chunk_size), - // not the subset request (chunk.offset, chunk.size) + // not the subset request (request_offset, request_size) #[cfg(debug_assertions)] - validate_read_response(&header, &chunk.uuid, 0, chunk_size); + validate_read_response(&header, &request_uuid, 0, chunk_size); // Return only the data portion (header is just for validation) let chunked_resp = data; @@ -506,21 +513,23 @@ async fn handle_connection( METRICS.update_metric_histogram_latency("get_hit_latency_ms", start.elapsed(), MetricType::MsLatency); METRICS.update_metric_counter("hit", 1); METRICS.update_hitratio(HitType::Hit); - METRICS.update_metric_counter("read_bytes_total", chunk.size); - METRICS.update_metric_counter("bytes_total", chunk.size); + METRICS.update_metric_counter("read_bytes_total", request_size); + METRICS.update_metric_counter("bytes_total", request_size); tracing::debug!("REQ[{}] CACHE HIT completed successfully", request_id); Ok(()) } }, { - let chunk = chunk_clone.clone(); let writer = Arc::clone(&writer); let remote = Arc::clone(&remote); let writer_pool = Arc::clone(&writer_pool); let start = Arc::clone(&start); + let request_offset = request_offset; + let request_size = request_size; + let request_uuid = request_uuid.clone(); move || async move { tracing::debug!("REQ[{}] CACHE MISS - entering remote fetch path", request_id); - let resp = match remote.get(chunk.uuid.as_str(), 0, chunk_size).await { + let resp = match remote.get(request_uuid.as_str(), 0, chunk_size).await { Ok(resp) => { tracing::debug!("REQ[{}] Remote fetch completed successfully", 
request_id); resp @@ -543,7 +552,7 @@ async fn handle_connection( }; // Will implicitly fail if size larger than chunk - let chunked_resp = resp.slice(chunk.offset as usize..(chunk.offset + chunk.size) as usize); + let chunked_resp = resp.slice(request_offset as usize..(request_offset + request_size) as usize); let encoded = bincode::serde::encode_to_vec( request::GetResponse::Response(chunked_resp), bincode::config::standard()