Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions oxcache/src/cache/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,27 @@ use std::sync::{Arc, atomic::{AtomicUsize, Ordering}};
use tokio::sync::Notify;
use lru_mem::HeapSize;

#[derive(Debug, Clone, Eq, PartialEq, Hash)]
#[derive(Debug, Clone)]
pub struct Chunk {
pub uuid: String,
pub offset: Byte,
pub size: Byte,
}

// // Only hash and compare based on uuid - offset/size are just read parameters
// impl PartialEq for Chunk {
// fn eq(&self, other: &Self) -> bool {
// self.uuid == other.uuid
// }
// }
//
// impl Eq for Chunk {}
//
// impl std::hash::Hash for Chunk {
// fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
// self.uuid.hash(state);
// }
// }
// Only hash and compare based on uuid - offset/size are just read parameters
impl PartialEq for Chunk {
fn eq(&self, other: &Self) -> bool {
self.uuid == other.uuid
}
}

impl Eq for Chunk {}

impl std::hash::Hash for Chunk {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.uuid.hash(state);
}
}

#[derive(Debug, PartialEq, Clone, Eq, Hash)]
pub struct ChunkLocation {
Expand Down
68 changes: 60 additions & 8 deletions oxcache/src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,17 +511,20 @@ mod mod_tests {
Err(err) => assert!(false, "Error occurred: {err}"),
};

// Insert multiple similar entries
let entry1 = Chunk::new(String::from("fake-uuid"), 120, 10);
let entry2 = Chunk::new(String::from("fake-uuid!"), 120, 10);
let entry3 = Chunk::new(String::from("fake-uuid"), 121, 10);
let entry4 = Chunk::new(String::from("fake-uuid"), 121, 11);

// Test UUID-only hashing: entries with same UUID should map to same cache entry
// even with different offset/size
let entry1 = Chunk::new(String::from("fake-uuid-1"), 120, 10);
let entry2 = Chunk::new(String::from("fake-uuid-2"), 120, 10);
let entry3 = Chunk::new(String::from("fake-uuid-3"), 121, 10);
let entry4 = Chunk::new(String::from("fake-uuid-4"), 121, 11);

// Different UUIDs, so different cache entries
let chunk_loc1 = ChunkLocation::new(0, 20);
let chunk_loc2 = ChunkLocation::new(1, 22);
let chunk_loc3 = ChunkLocation::new(9, 25);
let chunk_loc4 = ChunkLocation::new(7, 29);

// Insert all four entries (all have different UUIDs)
check_err(
cache
.get_or_insert_with(entry1.clone(), fail_path, {
Expand Down Expand Up @@ -558,6 +561,7 @@ mod mod_tests {
.await,
);

// Verify all entries hit cache with correct locations
check_err(
cache
.get_or_insert_with(
Expand Down Expand Up @@ -619,6 +623,54 @@ mod mod_tests {
);
}

#[tokio::test]
async fn test_uuid_only_hashing() {
let cache = Cache::new(10, 100);

let fail_write_path = async || -> tokio::io::Result<ChunkLocation> {
assert!(false, "Write shouldn't be called for cache hit");
Ok(ChunkLocation { zone: 0, index: 0 })
};

let check_err = |result| match result {
Ok(()) => (),
Err(err) => assert!(false, "Error occurred: {err}"),
};

// Insert entry with specific offset/size
let entry1 = Chunk::new(String::from("same-uuid"), 120, 10);
let chunk_loc1 = ChunkLocation::new(0, 20);

check_err(
cache
.get_or_insert_with(entry1.clone(), |_| async { Ok(()) }, {
let chunk_loc1 = chunk_loc1.clone();
|| async { Ok(chunk_loc1) }
})
.await,
);

// Try to insert with same UUID but different offset/size
// Should hit cache (reader path), not write
let entry2 = Chunk::new(String::from("same-uuid"), 0, 100);

check_err(
cache
.get_or_insert_with(
entry2,
{
|pin_guard| async move {
// Should hit cache and return same location as entry1
assert_eq!(pin_guard.location(), &chunk_loc1);
Ok(())
}
},
fail_write_path,
)
.await,
);
}

#[tokio::test]
async fn test_remove() {
let cache = Cache::new(10, 100);
Expand Down Expand Up @@ -701,10 +753,10 @@ mod mod_tests {
Err(err) => assert!(false, "Error occurred: {err}"),
};

let entry = Chunk::new(String::from("fake-uuid"), 120, 10);
let entry = Chunk::new(String::from("fake-uuid-1"), 120, 10);
let chunk_loc = ChunkLocation::new(0, 20);

let entry2 = Chunk::new(String::from("fake-uuid"), 121, 10);
let entry2 = Chunk::new(String::from("fake-uuid-2"), 121, 10);
let chunk_loc2 = ChunkLocation::new(0, 21);

// Insert
Expand Down
37 changes: 23 additions & 14 deletions oxcache/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,19 +432,26 @@ async fn handle_connection<T: RemoteBackend + Send + Sync + 'static>(
}

// println!("Received get request: {:?}", req);
let chunk: Chunk = req.into();
let chunk_clone = chunk.clone();

// Extract request parameters for subset reading
let request_offset = req.offset;
let request_size = req.size;
let request_uuid = req.key.clone();

// Create normalized cache key (always 0, chunk_size)
let cache_key = Chunk::new(request_uuid.clone(), 0, chunk_size);

tracing::debug!("REQ[{}] Calling cache.get_or_insert_with", request_id);
cache.get_or_insert_with(
chunk.clone(),
cache_key,
{
let writer = Arc::clone(&writer);
let reader_pool = Arc::clone(&reader_pool);
let start = Arc::clone(&start);
// let chunk = chunk.clone();
let request_offset = request_offset;
let request_size = request_size;
let request_uuid = request_uuid.clone();
move |pin_guard| async move {
let chunk = chunk.clone();
tracing::debug!("REQ[{}] CACHE HIT - entering read path", request_id);
let location = pin_guard.location().clone();

Expand All @@ -453,8 +460,8 @@ async fn handle_connection<T: RemoteBackend + Send + Sync + 'static>(
location,
responder: tx,
_pin_guard: pin_guard,
read_offset: chunk.offset,
read_size: chunk.size,
read_offset: request_offset,
read_size: request_size,
};
tracing::debug!("REQ[{}] Sending read request to reader pool", request_id);
reader_pool.send(read_req).await.map_err(|e| {
Expand Down Expand Up @@ -483,9 +490,9 @@ async fn handle_connection<T: RemoteBackend + Send + Sync + 'static>(

// Validate read response - pin_guard must stay alive until here!
// Note: We validate against the original chunk that was stored (0, chunk_size),
// not the subset request (chunk.offset, chunk.size)
// not the subset request (request_offset, request_size)
#[cfg(debug_assertions)]
validate_read_response(&header, &chunk.uuid, 0, chunk_size);
validate_read_response(&header, &request_uuid, 0, chunk_size);

// Return only the data portion (header is just for validation)
let chunked_resp = data;
Expand All @@ -506,21 +513,23 @@ async fn handle_connection<T: RemoteBackend + Send + Sync + 'static>(
METRICS.update_metric_histogram_latency("get_hit_latency_ms", start.elapsed(), MetricType::MsLatency);
METRICS.update_metric_counter("hit", 1);
METRICS.update_hitratio(HitType::Hit);
METRICS.update_metric_counter("read_bytes_total", chunk.size);
METRICS.update_metric_counter("bytes_total", chunk.size);
METRICS.update_metric_counter("read_bytes_total", request_size);
METRICS.update_metric_counter("bytes_total", request_size);
tracing::debug!("REQ[{}] CACHE HIT completed successfully", request_id);
Ok(())
}
},
{
let chunk = chunk_clone.clone();
let writer = Arc::clone(&writer);
let remote = Arc::clone(&remote);
let writer_pool = Arc::clone(&writer_pool);
let start = Arc::clone(&start);
let request_offset = request_offset;
let request_size = request_size;
let request_uuid = request_uuid.clone();
move || async move {
tracing::debug!("REQ[{}] CACHE MISS - entering remote fetch path", request_id);
let resp = match remote.get(chunk.uuid.as_str(), 0, chunk_size).await {
let resp = match remote.get(request_uuid.as_str(), 0, chunk_size).await {
Ok(resp) => {
tracing::debug!("REQ[{}] Remote fetch completed successfully", request_id);
resp
Expand All @@ -543,7 +552,7 @@ async fn handle_connection<T: RemoteBackend + Send + Sync + 'static>(
};

// Will implicitly fail if size larger than chunk
let chunked_resp = resp.slice(chunk.offset as usize..(chunk.offset + chunk.size) as usize);
let chunked_resp = resp.slice(request_offset as usize..(request_offset + request_size) as usize);
let encoded = bincode::serde::encode_to_vec(
request::GetResponse::Response(chunked_resp),
bincode::config::standard()
Expand Down