From 5e9fe834bf18f081bf65fb930094ed78db4c9682 Mon Sep 17 00:00:00 2001 From: SeoYoung Lee Date: Mon, 4 Aug 2025 06:11:09 +0000 Subject: [PATCH 1/3] use GreedyMemoryPool for sanity check --- datafusion/execution/src/memory_pool/pool.rs | 4 +++- datafusion/physical-plan/src/sorts/merge.rs | 4 +++- .../src/sorts/multi_level_merge.rs | 17 ++++++++++++++--- .../physical-plan/src/sorts/streaming_merge.rs | 9 ++++++--- 4 files changed, 26 insertions(+), 8 deletions(-) diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index 11467f69be1c..bfb3f4edac98 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -44,6 +44,7 @@ impl MemoryPool for UnboundedMemoryPool { fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> { self.grow(reservation, additional); + println!("[mem pool] {} used", self.reserved()); Ok(()) } @@ -70,7 +71,8 @@ pub struct GreedyMemoryPool { impl GreedyMemoryPool { /// Create a new pool that can allocate up to `pool_size` bytes pub fn new(pool_size: usize) -> Self { - debug!("Created new GreedyMemoryPool(pool_size={pool_size})"); + // debug!("Created new GreedyMemoryPool(pool_size={pool_size})"); + println!("Created new GreedyMemoryPool(pool_size={pool_size})"); Self { pool_size, used: AtomicUsize::new(0), diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs index ca2d5f2105f2..bf986d94fea1 100644 --- a/datafusion/physical-plan/src/sorts/merge.rs +++ b/datafusion/physical-plan/src/sorts/merge.rs @@ -194,7 +194,9 @@ impl SortPreservingMergeStream { match futures::ready!(self.streams.poll_next(cx, idx)) { None => Poll::Ready(Ok(())), - Some(Err(e)) => Poll::Ready(Err(e)), + Some(Err(e)) => { + Poll::Ready(Err(e)) + } Some(Ok((cursor, batch))) => { self.cursors[idx] = Some(Cursor::new(cursor)); Poll::Ready(self.in_progress.push_batch(idx, batch)) diff --git a/datafusion/physical-plan/src/sorts/multi_level_merge.rs b/datafusion/physical-plan/src/sorts/multi_level_merge.rs index bb6fc751b897..7821999f6dd2 100644 --- a/datafusion/physical-plan/src/sorts/multi_level_merge.rs +++ b/datafusion/physical-plan/src/sorts/multi_level_merge.rs @@ -248,11 +248,15 @@ impl MultiLevelMergeBuilder { // If we have no sorted spill files left, this is the last run true, true, + 0, // TODO(ding-young) ) } // Need to merge multiple streams - (_, _) => { + (spill, inmem) => { + println!( + "[merge_sorted_runs_within_mem_limit] spill{spill}, inmem{inmem}" + ); let mut memory_reservation = self.reservation.new_empty(); // Don't account for existing streams memory @@ -277,12 +281,17 @@ impl MultiLevelMergeBuilder { .read_spill_as_stream(spill.file)?; sorted_streams.push(stream); } - + println!( + "[merge_sorted_runs_within_mem_limit] memory_reservation:{}", + memory_reservation.size() + ); + // TODO(ding-young) pass memory limit but what about in mem size? let merge_sort_stream = self.create_new_merge_sort( sorted_streams, // If we have no sorted spill files left, this is the last run self.sorted_spill_files.is_empty(), is_only_merging_memory_streams, + memory_reservation.size(), )?; // If we're only merging memory streams, we don't need to attach the memory reservation @@ -308,6 +317,7 @@ impl MultiLevelMergeBuilder { streams: Vec, is_output: bool, all_in_memory: bool, + expected_max_memory_usage: usize, ) -> Result { let mut builder = StreamingMergeBuilder::new() .with_schema(Arc::clone(&self.schema)) @@ -328,7 +338,8 @@ impl MultiLevelMergeBuilder { // (reserving memory for the biggest batch in each stream) // TODO - avoid this hack as this can be broken easily when `SortPreservingMergeStream` // changes the implementation to use more/less memory - builder = builder.with_bypass_mempool(); + println!("create new merge sort with bypass mempool"); + builder = builder.with_bypass_mempool(expected_max_memory_usage); } else { // If we are only merging in-memory streams, we need to use the memory reservation // because we don't know the maximum size of the batches in the streams diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs b/datafusion/physical-plan/src/sorts/streaming_merge.rs index 191b13575341..246be1873277 100644 --- a/datafusion/physical-plan/src/sorts/streaming_merge.rs +++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs @@ -29,8 +29,9 @@ use arrow::array::*; use arrow::datatypes::{DataType, SchemaRef}; use datafusion_common::{internal_err, Result}; use datafusion_execution::disk_manager::RefCountedTempFile; +#[allow(unused_imports)] use datafusion_execution::memory_pool::{ - human_readable_size, MemoryConsumer, MemoryPool, MemoryReservation, + human_readable_size, GreedyMemoryPool, MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool, }; use datafusion_physical_expr_common::sort_expr::LexOrdering; @@ -161,8 +162,10 @@ impl<'a> StreamingMergeBuilder<'a> { /// Bypass the mempool and avoid using the memory reservation. /// /// This is not marked as `pub` because it is not recommended to use this method - pub(super) fn with_bypass_mempool(self) -> Self { - let mem_pool: Arc = Arc::new(UnboundedMemoryPool::default()); + pub(super) fn with_bypass_mempool(self, memory_limit: usize) -> Self { + // TODO(ding-young) Bypass main memory pool and use separate GreedyMemoryPool for sanity check + // let mem_pool: Arc = Arc::new(UnboundedMemoryPool::default()); + let mem_pool: Arc = Arc::new(GreedyMemoryPool::new(memory_limit)); self.with_reservation( MemoryConsumer::new("merge stream mock memory").register(&mem_pool), From 0c65f27059fd8bf25e300bbe6556e01a4cd39c9b Mon Sep 17 00:00:00 2001 From: SeoYoung Lee Date: Fri, 8 Aug 2025 04:16:26 +0000 Subject: [PATCH 2/3] validate whether batch read from spill exceeds max_record_batch_mem --- datafusion/core/tests/sql/runtime_config.rs | 2 +- datafusion/physical-plan/benches/spill_io.rs | 14 ++++-- datafusion/physical-plan/src/sorts/merge.rs | 4 +- .../src/sorts/multi_level_merge.rs | 24 ++++------ .../src/sorts/streaming_merge.rs | 6 +-- datafusion/physical-plan/src/spill/mod.rs | 47 ++++++++++++++++--- .../physical-plan/src/spill/spill_manager.rs | 2 + 7 files changed, 63 insertions(+), 36 deletions(-) diff --git a/datafusion/core/tests/sql/runtime_config.rs b/datafusion/core/tests/sql/runtime_config.rs index b05c36e335f3..d8207f0d48d2 100644 --- a/datafusion/core/tests/sql/runtime_config.rs +++ b/datafusion/core/tests/sql/runtime_config.rs @@ -193,7 +193,7 @@ async fn test_max_temp_directory_size_enforcement() { .unwrap(); let result = ctx.sql(query).await.unwrap().collect().await; - + println!("result is {result:?}"); assert!( result.is_ok(), "Should not fail due to max temp directory size limit" diff --git a/datafusion/physical-plan/benches/spill_io.rs b/datafusion/physical-plan/benches/spill_io.rs index 699f20f61f0d..2260fbe43381 100644 --- a/datafusion/physical-plan/benches/spill_io.rs +++ b/datafusion/physical-plan/benches/spill_io.rs @@ -115,8 +115,9 @@ fn bench_spill_io(c: &mut Criterion) { // - Wait for the consumer to finish processing |spill_file| { rt.block_on(async { - let stream = - spill_manager.read_spill_as_stream(spill_file).unwrap(); + let stream = spill_manager + .read_spill_as_stream(spill_file, None) + .unwrap(); let _ = collect(stream).await.unwrap(); }) }, @@ -519,8 +520,9 @@ fn benchmark_spill_batches_for_all_codec( ) .unwrap() .unwrap(); - let stream = - spill_manager.read_spill_as_stream(spill_file).unwrap(); + let stream = spill_manager + .read_spill_as_stream(spill_file, None) + .unwrap(); let _ = collect(stream).await.unwrap(); }) }, @@ -553,7 +555,9 @@ fn benchmark_spill_batches_for_all_codec( let rt = Runtime::new().unwrap(); let start = Instant::now(); rt.block_on(async { - let stream = spill_manager.read_spill_as_stream(spill_file).unwrap(); + let stream = spill_manager + .read_spill_as_stream(spill_file, None) + .unwrap(); let _ = collect(stream).await.unwrap(); }); let read_time = start.elapsed(); diff --git a/datafusion/physical-plan/src/sorts/merge.rs b/datafusion/physical-plan/src/sorts/merge.rs index bf986d94fea1..ca2d5f2105f2 100644 --- a/datafusion/physical-plan/src/sorts/merge.rs +++ b/datafusion/physical-plan/src/sorts/merge.rs @@ -194,9 +194,7 @@ impl SortPreservingMergeStream { match futures::ready!(self.streams.poll_next(cx, idx)) { None => Poll::Ready(Ok(())), - Some(Err(e)) => { - Poll::Ready(Err(e)) - } + Some(Err(e)) => Poll::Ready(Err(e)), Some(Ok((cursor, batch))) => { self.cursors[idx] = Some(Cursor::new(cursor)); Poll::Ready(self.in_progress.push_batch(idx, batch)) diff --git a/datafusion/physical-plan/src/sorts/multi_level_merge.rs b/datafusion/physical-plan/src/sorts/multi_level_merge.rs index 7821999f6dd2..58d046cc9091 100644 --- a/datafusion/physical-plan/src/sorts/multi_level_merge.rs +++ b/datafusion/physical-plan/src/sorts/multi_level_merge.rs @@ -237,7 +237,8 @@ impl MultiLevelMergeBuilder { let spill_file = self.sorted_spill_files.remove(0); // Not reserving any memory for this disk as we are not holding it in memory - self.spill_manager.read_spill_as_stream(spill_file.file) + self.spill_manager + .read_spill_as_stream(spill_file.file, None) } // Only in memory streams, so merge them all in a single pass @@ -248,15 +249,11 @@ impl MultiLevelMergeBuilder { // If we have no sorted spill files left, this is the last run true, true, - 0, // TODO(ding-young) ) } // Need to merge multiple streams - (spill, inmem) => { - println!( - "[merge_sorted_runs_within_mem_limit] spill{spill}, inmem{inmem}" - ); + (_, _) => { let mut memory_reservation = self.reservation.new_empty(); // Don't account for existing streams memory @@ -278,20 +275,17 @@ impl MultiLevelMergeBuilder { .spill_manager .clone() .with_batch_read_buffer_capacity(buffer_size) - .read_spill_as_stream(spill.file)?; + .read_spill_as_stream( + spill.file, + Some(spill.max_record_batch_memory), + )?; sorted_streams.push(stream); } - println!( - "[merge_sorted_runs_within_mem_limit] memory_reservation:{}", - memory_reservation.size() - ); - // TODO(ding-young) pass memory limit but what about in mem size? let merge_sort_stream = self.create_new_merge_sort( sorted_streams, // If we have no sorted spill files left, this is the last run self.sorted_spill_files.is_empty(), is_only_merging_memory_streams, - memory_reservation.size(), )?; // If we're only merging memory streams, we don't need to attach the memory reservation @@ -317,7 +311,6 @@ impl MultiLevelMergeBuilder { streams: Vec, is_output: bool, all_in_memory: bool, - expected_max_memory_usage: usize, ) -> Result { let mut builder = StreamingMergeBuilder::new() .with_schema(Arc::clone(&self.schema)) @@ -338,8 +331,7 @@ impl MultiLevelMergeBuilder { // (reserving memory for the biggest batch in each stream) // TODO - avoid this hack as this can be broken easily when `SortPreservingMergeStream` // changes the implementation to use more/less memory - println!("create new merge sort with bypass mempool"); - builder = builder.with_bypass_mempool(expected_max_memory_usage); + builder = builder.with_bypass_mempool(); } else { // If we are only merging in-memory streams, we need to use the memory reservation // because we don't know the maximum size of the batches in the streams diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs b/datafusion/physical-plan/src/sorts/streaming_merge.rs index 246be1873277..f980a0aa35ba 100644 --- a/datafusion/physical-plan/src/sorts/streaming_merge.rs +++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs @@ -162,10 +162,8 @@ impl<'a> StreamingMergeBuilder<'a> { /// Bypass the mempool and avoid using the memory reservation. /// /// This is not marked as `pub` because it is not recommended to use this method - pub(super) fn with_bypass_mempool(self, memory_limit: usize) -> Self { - // TODO(ding-young) Bypass main memory pool and use separate GreedyMemoryPool for sanity check - // let mem_pool: Arc = Arc::new(UnboundedMemoryPool::default()); - let mem_pool: Arc = Arc::new(GreedyMemoryPool::new(memory_limit)); + pub(super) fn with_bypass_mempool(self) -> Self { + let mem_pool: Arc = Arc::new(UnboundedMemoryPool::default()); self.with_reservation( MemoryConsumer::new("merge stream mock memory").register(&mem_pool), diff --git a/datafusion/physical-plan/src/spill/mod.rs b/datafusion/physical-plan/src/spill/mod.rs index a81221c8b6a9..cc774962342f 100644 --- a/datafusion/physical-plan/src/spill/mod.rs +++ b/datafusion/physical-plan/src/spill/mod.rs @@ -54,8 +54,13 @@ use futures::{FutureExt as _, Stream}; struct SpillReaderStream { schema: SchemaRef, state: SpillReaderStreamState, + /// how much memory the largest memory batch is taking + pub max_record_batch_memory: Option, } +// Small margin allowed to accommodate slight memory accounting variation +const MEMORY_MARGIN: usize = 4096; + /// When we poll for the next batch, we will get back both the batch and the reader, /// so we can call `next` again. type NextRecordBatchResult = Result<(StreamReader>, Option)>; @@ -76,10 +81,15 @@ enum SpillReaderStreamState { } impl SpillReaderStream { - fn new(schema: SchemaRef, spill_file: RefCountedTempFile) -> Self { + fn new( + schema: SchemaRef, + spill_file: RefCountedTempFile, + max_record_batch_memory: Option, + ) -> Self { Self { schema, state: SpillReaderStreamState::Uninitialized(spill_file), + max_record_batch_memory, } } @@ -125,6 +135,27 @@ impl SpillReaderStream { Ok((reader, batch)) => { match batch { Some(batch) => { + if let Some(max_record_batch_memory) = + self.max_record_batch_memory + { + let actual_size = + get_record_batch_memory_size(&batch); + if actual_size + > max_record_batch_memory + MEMORY_MARGIN + { + return Poll::Ready(Some(Err( + DataFusionError::ResourcesExhausted( + format!( + "Record batch memory usage ({actual_size} bytes) exceeds the expected limit ({max_record_batch_memory} bytes)\n + by more than the allowed tolerance ({MEMORY_MARGIN} bytes).\n + This likely indicates a bug in memory accounting during spilling.\n + Please report this issue", + ) + .to_owned(), + ), + ))); + } + } self.state = SpillReaderStreamState::Waiting(reader); Poll::Ready(Some(Ok(batch))) @@ -389,7 +420,7 @@ mod tests { let spilled_rows = spill_manager.metrics.spilled_rows.value(); assert_eq!(spilled_rows, num_rows); - let stream = spill_manager.read_spill_as_stream(spill_file)?; + let stream = spill_manager.read_spill_as_stream(spill_file, None)?; assert_eq!(stream.schema(), schema); let batches = collect(stream).await?; @@ -453,7 +484,7 @@ mod tests { let spilled_rows = spill_manager.metrics.spilled_rows.value(); assert_eq!(spilled_rows, num_rows); - let stream = spill_manager.read_spill_as_stream(spill_file)?; + let stream = spill_manager.read_spill_as_stream(spill_file, None)?; assert_eq!(stream.schema(), dict_schema); let batches = collect(stream).await?; assert_eq!(batches.len(), 2); @@ -479,7 +510,7 @@ mod tests { .unwrap(); assert!(spill_file.path().exists()); - let stream = spill_manager.read_spill_as_stream(spill_file)?; + let stream = spill_manager.read_spill_as_stream(spill_file, None)?; assert_eq!(stream.schema(), schema); let batches = collect(stream).await?; @@ -514,7 +545,7 @@ mod tests { let spilled_rows = spill_manager.metrics.spilled_rows.value(); assert_eq!(spilled_rows, num_rows); - let stream = spill_manager.read_spill_as_stream(spill_file)?; + let stream = spill_manager.read_spill_as_stream(spill_file, None)?; assert_eq!(stream.schema(), schema); let batches = collect(stream).await?; @@ -894,8 +925,10 @@ mod tests { .spill_record_batch_and_finish(&batches, "Test2")? .unwrap(); - let mut stream_1 = spill_manager.read_spill_as_stream(spill_file_1)?; - let mut stream_2 = spill_manager.read_spill_as_stream(spill_file_2)?; + let mut stream_1 = + spill_manager.read_spill_as_stream(spill_file_1, None)?; + let mut stream_2 = + spill_manager.read_spill_as_stream(spill_file_2, None)?; stream_1.next().await; stream_2.next().await; diff --git a/datafusion/physical-plan/src/spill/spill_manager.rs b/datafusion/physical-plan/src/spill/spill_manager.rs index 6c47af129fce..e48cb621c1d1 100644 --- a/datafusion/physical-plan/src/spill/spill_manager.rs +++ b/datafusion/physical-plan/src/spill/spill_manager.rs @@ -201,10 +201,12 @@ impl SpillManager { pub fn read_spill_as_stream( &self, spill_file_path: RefCountedTempFile, + max_record_batch_memory: Option, ) -> Result { let stream = Box::pin(cooperative(SpillReaderStream::new( Arc::clone(&self.schema), spill_file_path, + max_record_batch_memory, ))); Ok(spawn_buffered(stream, self.batch_read_buffer_capacity)) From 68cffc684a4e0bb11e29fb413200e02c548c2aef Mon Sep 17 00:00:00 2001 From: SeoYoung Lee Date: Fri, 8 Aug 2025 05:29:25 +0000 Subject: [PATCH 3/3] chore: remove printlns --- datafusion/core/tests/sql/runtime_config.rs | 2 +- datafusion/execution/src/memory_pool/pool.rs | 3 --- datafusion/physical-plan/src/sorts/streaming_merge.rs | 3 +-- 3 files changed, 2 insertions(+), 6 deletions(-) diff --git a/datafusion/core/tests/sql/runtime_config.rs b/datafusion/core/tests/sql/runtime_config.rs index d8207f0d48d2..b05c36e335f3 100644 --- a/datafusion/core/tests/sql/runtime_config.rs +++ b/datafusion/core/tests/sql/runtime_config.rs @@ -193,7 +193,7 @@ async fn test_max_temp_directory_size_enforcement() { .unwrap(); let result = ctx.sql(query).await.unwrap().collect().await; - println!("result is {result:?}"); + assert!( result.is_ok(), "Should not fail due to max temp directory size limit" diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index bfb3f4edac98..e3b70c37dd24 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -44,7 +44,6 @@ impl MemoryPool for UnboundedMemoryPool { fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> { self.grow(reservation, additional); - println!("[mem pool] {} used", self.reserved()); Ok(()) } @@ -71,8 +70,6 @@ pub struct GreedyMemoryPool { impl GreedyMemoryPool { /// Create a new pool that can allocate up to `pool_size` bytes pub fn new(pool_size: usize) -> Self { - // debug!("Created new GreedyMemoryPool(pool_size={pool_size})"); - println!("Created new GreedyMemoryPool(pool_size={pool_size})"); Self { pool_size, used: AtomicUsize::new(0), diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs b/datafusion/physical-plan/src/sorts/streaming_merge.rs index f980a0aa35ba..191b13575341 100644 --- a/datafusion/physical-plan/src/sorts/streaming_merge.rs +++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs @@ -29,9 +29,8 @@ use arrow::array::*; use arrow::datatypes::{DataType, SchemaRef}; use datafusion_common::{internal_err, Result}; use datafusion_execution::disk_manager::RefCountedTempFile; -#[allow(unused_imports)] use datafusion_execution::memory_pool::{ - human_readable_size, GreedyMemoryPool, MemoryConsumer, MemoryPool, MemoryReservation, + human_readable_size, MemoryConsumer, MemoryPool, MemoryReservation, UnboundedMemoryPool, }; use datafusion_physical_expr_common::sort_expr::LexOrdering;