Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 101 additions & 3 deletions src/cas/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl Stream for BlockStream {
// skip file entirely
self.processed += self.paths[self.fp].1;
self.fp += 1;
if self.fp > self.paths.len() {
if self.fp >= self.paths.len() {
return Poll::Ready(None);
}
continue;
Expand All @@ -145,7 +145,7 @@ impl Stream for BlockStream {
// skip file entirely
self.processed += self.paths[self.fp].1;
self.fp += 1;
if self.fp > self.paths.len() {
if self.fp >= self.paths.len() {
return Poll::Ready(None);
}
continue;
Expand All @@ -158,7 +158,7 @@ impl Stream for BlockStream {
}

// we don't have an open file, check if we have any more left
if self.fp > self.paths.len() {
if self.fp >= self.paths.len() {
return Poll::Ready(None);
}

Expand Down Expand Up @@ -199,3 +199,101 @@ impl Stream for BlockStream {
(self.size, Some(self.size))
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::fs::File;
use std::io::Write;
use std::sync::Once;
use tempfile::tempdir;

// Use a static metrics instance to avoid registration conflicts
static INIT: Once = Once::new();
static mut METRICS: Option<SharedMetrics> = None;

fn get_metrics() -> SharedMetrics {
unsafe {
INIT.call_once(|| {
METRICS = Some(SharedMetrics::new());
});
METRICS.clone().unwrap()
}
}

#[test]
fn test_block_stream_happy_path() {
let dir = tempdir().unwrap();
let file_path1 = dir.path().join("test_file1");
let file_path2 = dir.path().join("test_file2");

// Create test files with known content
let data1 = b"hello world";
let mut file1 = File::create(&file_path1).unwrap();
file1.write_all(data1).unwrap();

let data2 = b"second file content";
let mut file2 = File::create(&file_path2).unwrap();
file2.write_all(data2).unwrap();

// Create paths vector for BlockStream
let paths = vec![(file_path1, data1.len()), (file_path2, data2.len())];

let total_size = data1.len() + data2.len();
let metrics = get_metrics();

// Test with RangeRequest::All
let stream = BlockStream::new(
paths.clone(),
total_size,
RangeRequest::All,
metrics.clone(),
);

// Verify the structure is created correctly
assert_eq!(stream.size, total_size);
}

#[test]
fn test_block_stream_truncated_file() {
let dir = tempdir().unwrap();
let file_path1 = dir.path().join("test_file1");
let file_path2 = dir.path().join("test_file2");
let nonexistent_path = dir.path().join("nonexistent_file");

// Create first test file with known content
let data1 = b"hello world";
let mut file1 = File::create(&file_path1).unwrap();
file1.write_all(data1).unwrap();

// Create second test file with known content
let data2 = b"second file content";
let mut file2 = File::create(&file_path2).unwrap();
file2.write_all(data2).unwrap();

// Create paths vector for BlockStream including a nonexistent file
let paths = vec![
(file_path1, data1.len()),
(nonexistent_path, 10), // This file doesn't exist
(file_path2, data2.len()),
];

let total_size = data1.len() + 10 + data2.len();
let metrics = get_metrics();

// Test with RangeRequest::All
let stream = BlockStream::new(
paths.clone(),
total_size,
RangeRequest::All,
metrics.clone(),
);

// Verify the structure is created correctly
assert_eq!(stream.size, total_size);

// The test passes if it doesn't panic when we try to access a nonexistent file
// The fix ensures that when a file is missing, we properly check bounds before
// trying to access the next file in the paths array
}
}
Loading