Skip to content

Validate the memory consumption in SPM created by multi-level merge #17029

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion datafusion/execution/src/memory_pool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ pub struct GreedyMemoryPool {
impl GreedyMemoryPool {
/// Create a new pool that can allocate up to `pool_size` bytes
pub fn new(pool_size: usize) -> Self {
debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
Self {
pool_size,
used: AtomicUsize::new(0),
Expand Down
14 changes: 9 additions & 5 deletions datafusion/physical-plan/benches/spill_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,9 @@ fn bench_spill_io(c: &mut Criterion) {
// - Wait for the consumer to finish processing
|spill_file| {
rt.block_on(async {
let stream =
spill_manager.read_spill_as_stream(spill_file).unwrap();
let stream = spill_manager
.read_spill_as_stream(spill_file, None)
.unwrap();
let _ = collect(stream).await.unwrap();
})
},
Expand Down Expand Up @@ -519,8 +520,9 @@ fn benchmark_spill_batches_for_all_codec(
)
.unwrap()
.unwrap();
let stream =
spill_manager.read_spill_as_stream(spill_file).unwrap();
let stream = spill_manager
.read_spill_as_stream(spill_file, None)
.unwrap();
let _ = collect(stream).await.unwrap();
})
},
Expand Down Expand Up @@ -553,7 +555,9 @@ fn benchmark_spill_batches_for_all_codec(
let rt = Runtime::new().unwrap();
let start = Instant::now();
rt.block_on(async {
let stream = spill_manager.read_spill_as_stream(spill_file).unwrap();
let stream = spill_manager
.read_spill_as_stream(spill_file, None)
.unwrap();
let _ = collect(stream).await.unwrap();
});
let read_time = start.elapsed();
Expand Down
9 changes: 6 additions & 3 deletions datafusion/physical-plan/src/sorts/multi_level_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,8 @@ impl MultiLevelMergeBuilder {
let spill_file = self.sorted_spill_files.remove(0);

// Not reserving any memory for this spill file, as we are not holding it in memory
self.spill_manager.read_spill_as_stream(spill_file.file)
self.spill_manager
.read_spill_as_stream(spill_file.file, None)
}

// Only in memory streams, so merge them all in a single pass
Expand Down Expand Up @@ -274,10 +275,12 @@ impl MultiLevelMergeBuilder {
.spill_manager
.clone()
.with_batch_read_buffer_capacity(buffer_size)
.read_spill_as_stream(spill.file)?;
.read_spill_as_stream(
spill.file,
Some(spill.max_record_batch_memory),
)?;
sorted_streams.push(stream);
}

let merge_sort_stream = self.create_new_merge_sort(
sorted_streams,
// If we have no sorted spill files left, this is the last run
Expand Down
47 changes: 40 additions & 7 deletions datafusion/physical-plan/src/spill/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,13 @@ use futures::{FutureExt as _, Stream};
struct SpillReaderStream {
schema: SchemaRef,
state: SpillReaderStreamState,
/// How much memory the largest record batch in the spill file takes
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend also explaining that this is used for validation, and linking to the multi-level merge's documentation for background.

pub max_record_batch_memory: Option<usize>,
}

// Small margin allowed to accommodate slight memory accounting variation
const MEMORY_MARGIN: usize = 4096;

/// When we poll for the next batch, we will get back both the batch and the reader,
/// so we can call `next` again.
type NextRecordBatchResult = Result<(StreamReader<BufReader<File>>, Option<RecordBatch>)>;
Expand All @@ -76,10 +81,15 @@ enum SpillReaderStreamState {
}

impl SpillReaderStream {
fn new(schema: SchemaRef, spill_file: RefCountedTempFile) -> Self {
fn new(
schema: SchemaRef,
spill_file: RefCountedTempFile,
max_record_batch_memory: Option<usize>,
) -> Self {
Self {
schema,
state: SpillReaderStreamState::Uninitialized(spill_file),
max_record_batch_memory,
}
}

Expand Down Expand Up @@ -125,6 +135,27 @@ impl SpillReaderStream {
Ok((reader, batch)) => {
match batch {
Some(batch) => {
if let Some(max_record_batch_memory) =
self.max_record_batch_memory
{
let actual_size =
get_record_batch_memory_size(&batch);
if actual_size
> max_record_batch_memory + MEMORY_MARGIN
{
return Poll::Ready(Some(Err(
DataFusionError::ResourcesExhausted(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use the resources_err!(...) macro here

format!(
"Record batch memory usage ({actual_size} bytes) exceeds the expected limit ({max_record_batch_memory} bytes)\n
by more than the allowed tolerance ({MEMORY_MARGIN} bytes).\n
This likely indicates a bug in memory accounting during spilling.\n
Please report this issue",
)
.to_owned(),
),
)));
}
}
self.state = SpillReaderStreamState::Waiting(reader);

Poll::Ready(Some(Ok(batch)))
Expand Down Expand Up @@ -389,7 +420,7 @@ mod tests {
let spilled_rows = spill_manager.metrics.spilled_rows.value();
assert_eq!(spilled_rows, num_rows);

let stream = spill_manager.read_spill_as_stream(spill_file)?;
let stream = spill_manager.read_spill_as_stream(spill_file, None)?;
assert_eq!(stream.schema(), schema);

let batches = collect(stream).await?;
Expand Down Expand Up @@ -453,7 +484,7 @@ mod tests {
let spilled_rows = spill_manager.metrics.spilled_rows.value();
assert_eq!(spilled_rows, num_rows);

let stream = spill_manager.read_spill_as_stream(spill_file)?;
let stream = spill_manager.read_spill_as_stream(spill_file, None)?;
assert_eq!(stream.schema(), dict_schema);
let batches = collect(stream).await?;
assert_eq!(batches.len(), 2);
Expand All @@ -479,7 +510,7 @@ mod tests {
.unwrap();
assert!(spill_file.path().exists());

let stream = spill_manager.read_spill_as_stream(spill_file)?;
let stream = spill_manager.read_spill_as_stream(spill_file, None)?;
assert_eq!(stream.schema(), schema);

let batches = collect(stream).await?;
Expand Down Expand Up @@ -514,7 +545,7 @@ mod tests {
let spilled_rows = spill_manager.metrics.spilled_rows.value();
assert_eq!(spilled_rows, num_rows);

let stream = spill_manager.read_spill_as_stream(spill_file)?;
let stream = spill_manager.read_spill_as_stream(spill_file, None)?;
assert_eq!(stream.schema(), schema);

let batches = collect(stream).await?;
Expand Down Expand Up @@ -894,8 +925,10 @@ mod tests {
.spill_record_batch_and_finish(&batches, "Test2")?
.unwrap();

let mut stream_1 = spill_manager.read_spill_as_stream(spill_file_1)?;
let mut stream_2 = spill_manager.read_spill_as_stream(spill_file_2)?;
let mut stream_1 =
spill_manager.read_spill_as_stream(spill_file_1, None)?;
let mut stream_2 =
spill_manager.read_spill_as_stream(spill_file_2, None)?;
stream_1.next().await;
stream_2.next().await;

Expand Down
2 changes: 2 additions & 0 deletions datafusion/physical-plan/src/spill/spill_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,12 @@ impl SpillManager {
pub fn read_spill_as_stream(
&self,
spill_file_path: RefCountedTempFile,
max_record_batch_memory: Option<usize>,
) -> Result<SendableRecordBatchStream> {
let stream = Box::pin(cooperative(SpillReaderStream::new(
Arc::clone(&self.schema),
spill_file_path,
max_record_batch_memory,
)));

Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
Expand Down
Loading