diff --git a/datafusion/execution/src/memory_pool/mod.rs b/datafusion/execution/src/memory_pool/mod.rs
index 19e509d263ea..3d538eb86d3b 100644
--- a/datafusion/execution/src/memory_pool/mod.rs
+++ b/datafusion/execution/src/memory_pool/mod.rs
@@ -18,8 +18,10 @@
 //! [`MemoryPool`] for memory management during query execution, [`proxy`] for
 //! help with allocation accounting.
 
+use arrow::array::Array;
 use datafusion_common::{internal_err, Result};
 use std::hash::{Hash, Hasher};
+use std::vec;
 use std::{cmp::Ordering, sync::atomic, sync::Arc};
 
 mod pool;
@@ -131,14 +133,58 @@ pub trait MemoryPool: Send + Sync + std::fmt::Debug {
     /// This must always succeed
     fn grow(&self, reservation: &MemoryReservation, additional: usize);
 
+    /// Infallibly grow the provided `reservation` by the bytes held in `&[Arc<dyn Array>]`
+    ///
+    /// This defaults to summing the memory size of all arrays, but can be
+    /// overridden by implementations that track the memory size of Array usages
+    fn grow_with_arrays(
+        &self,
+        reservation: &MemoryReservation,
+        arrays: &[Arc<dyn Array>],
+    ) {
+        let additional = arrays
+            .iter()
+            .map(|array| array.get_array_memory_size())
+            .sum();
+        self.grow(reservation, additional);
+    }
+
     /// Infallibly shrink the provided `reservation` by `shrink` bytes
     fn shrink(&self, reservation: &MemoryReservation, shrink: usize);
 
+    fn shrink_with_arrays(
+        &self,
+        reservation: &MemoryReservation,
+        arrays: &[Arc<dyn Array>],
+    ) {
+        let shrink = arrays
+            .iter()
+            .map(|array| array.get_array_memory_size())
+            .sum();
+        self.shrink(reservation, shrink);
+    }
+
     /// Attempt to grow the provided `reservation` by `additional` bytes
     ///
     /// On error the `allocation` will not be increased in size
     fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()>;
 
+    /// Attempt to grow the provided `reservation` by the bytes held in `&[Arc<dyn Array>]`
+    ///
+    /// This defaults to summing the memory size of all arrays, but can be
+    /// overridden by implementations that track the memory size of Array usages
fn try_grow_with_arrays(
+        &self,
+        reservation: &MemoryReservation,
+        arrays: &[Arc<dyn Array>],
+    ) -> Result<()> {
+        let additional = arrays
+            .iter()
+            .map(|array| array.get_array_memory_size())
+            .sum();
+        self.try_grow(reservation, additional)
+    }
+
     /// Return the total amount of memory reserved
     fn reserved(&self) -> usize;
 
@@ -261,6 +307,7 @@ impl MemoryConsumer {
                 consumer: self,
             }),
             size: 0,
+            arrays: Vec::new(),
         }
     }
 }
@@ -290,6 +337,8 @@ impl Drop for SharedRegistration {
 pub struct MemoryReservation {
     registration: Arc<SharedRegistration>,
     size: usize,
+    // arrays tracked by this reservation
+    arrays: Vec<Arc<dyn Array>>,
 }
 
 impl MemoryReservation {
@@ -310,6 +359,11 @@ impl MemoryReservation {
         if size != 0 {
             self.shrink(size)
         }
+
+        self.registration
+            .pool
+            .shrink_with_arrays(self, &self.arrays);
+        self.arrays.clear();
         size
     }
 
@@ -375,6 +429,18 @@ impl MemoryReservation {
         Ok(())
     }
 
+    /// Increase the size of this reservation by the bytes held in
+    /// the provided `arrays`.
+    pub fn try_grow_with_arrays(&mut self, arrays: &[Arc<dyn Array>]) -> Result<()> {
+        self.registration.pool.try_grow_with_arrays(self, arrays)?;
+        // don't increase `self.size` of this reservation; the tracked
+        // arrays are accounted for separately by the pool
+        arrays
+            .iter()
+            .for_each(|array| self.arrays.push(Arc::clone(array)));
+
+        Ok(())
+    }
+
     /// Splits off `capacity` bytes from this [`MemoryReservation`]
     /// into a new [`MemoryReservation`] with the same
     /// [`MemoryConsumer`].
@@ -390,6 +456,7 @@ impl MemoryReservation {
         Self {
             size: capacity,
             registration: Arc::clone(&self.registration),
+            arrays: vec![],
         }
     }
 
@@ -398,6 +465,7 @@ impl MemoryReservation {
         Self {
             size: 0,
             registration: Arc::clone(&self.registration),
+            arrays: vec![],
         }
     }
 
diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs
index 11467f69be1c..1d2977ebd604 100644
--- a/datafusion/execution/src/memory_pool/pool.rs
+++ b/datafusion/execution/src/memory_pool/pool.rs
@@ -22,6 +22,7 @@ use datafusion_common::HashMap;
 use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
 use log::debug;
 use parking_lot::Mutex;
+use std::sync::Arc;
 use std::{
     num::NonZeroUsize,
     sync::atomic::{AtomicUsize, Ordering},
@@ -112,6 +113,144 @@ impl MemoryPool for GreedyMemoryPool {
     }
 }
 
+/// A [`MemoryPool`] that implements a greedy first-come first-serve limit
+/// and tracks the memory usage based on the references to the arrays.
+///
+/// This pool works well for queries that do not need to spill or have
+/// a single spillable operator. See [`FairSpillPool`] if there are
+/// multiple spillable operators that all will spill.
+#[derive(Debug)]
+pub struct GreedyMemoryPoolWithTracking {
+    pool_size: usize,
+    used: AtomicUsize,
+    // Map from a buffer's data-pointer address to the number of tracked
+    // arrays currently referencing that buffer
+    references: Mutex<HashMap<usize, usize>>,
+}
+
+impl GreedyMemoryPoolWithTracking {
+    /// Create a new pool that can allocate up to `pool_size` bytes
+    pub fn new(pool_size: usize) -> Self {
+        debug!("Created new GreedyMemoryPoolWithTracking(pool_size={pool_size})");
+        Self {
+            pool_size,
+            used: AtomicUsize::new(0),
+            references: Mutex::new(HashMap::new()),
+        }
+    }
+}
+
+impl MemoryPool for GreedyMemoryPoolWithTracking {
+    fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
+        self.used.fetch_add(additional, Ordering::Relaxed);
+    }
+
+    fn grow_with_arrays(
+        &self,
+        reservation: &MemoryReservation,
+        arrays: &[Arc<dyn Array>],
+    ) {
+        for array in arrays {
+            let array_data = array.to_data();
+            for buffer in array_data.buffers() {
+                let addr = buffer.data_ptr().as_ptr() as usize;
+                let ref_count = *self
+                    .references
+                    .lock()
+                    .entry(addr)
+                    .and_modify(|ref_count| *ref_count += 1)
+                    .or_insert(1);
+
+                // If this is the first time we see this buffer, we need to grow the pool
+                if ref_count == 1 {
+                    let additional = buffer.capacity();
+                    self.grow(reservation, additional);
+                }
+            }
+        }
+    }
+
+    fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
+        self.used.fetch_sub(shrink, Ordering::Relaxed);
+    }
+
+    fn shrink_with_arrays(
+        &self,
+        reservation: &MemoryReservation,
+        arrays: &[Arc<dyn Array>],
+    ) {
+        for array in arrays {
+            let array_data = array.to_data();
+            for buffer in array_data.buffers() {
+                // Buffers are tracked by their data-pointer address
+                let addr = buffer.data_ptr().as_ptr() as usize;
+                let mut references = self.references.lock();
+                let Some(ref_count) = references.get_mut(&addr) else {
+                    // Buffer was never tracked by this pool; nothing to shrink
+                    continue;
+                };
+                *ref_count -= 1;
+
+                // If this is the last reference to this buffer, we need to shrink the pool
+                if *ref_count == 0 {
+                    references.remove(&addr);
+                    let additional = buffer.capacity();
+                    self.shrink(reservation, additional);
+                }
+            }
+        }
+    }
+
+    fn try_grow(&self,
reservation: &MemoryReservation, additional: usize) -> Result<()> {
+        self.used
+            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
+                let new_used = used + additional;
+                (new_used <= self.pool_size).then_some(new_used)
+            })
+            .map_err(|used| {
+                insufficient_capacity_err(
+                    reservation,
+                    additional,
+                    self.pool_size.saturating_sub(used),
+                )
+            })?;
+        Ok(())
+    }
+
+    fn try_grow_with_arrays(
+        &self,
+        reservation: &MemoryReservation,
+        arrays: &[Arc<dyn Array>],
+    ) -> Result<()> {
+        for array in arrays.iter() {
+            // also take into account overhead
+            let array_data = array.to_data();
+            let buffers = array_data.buffers();
+            for buffer in buffers {
+                let addr = buffer.data_ptr().as_ptr() as usize;
+                let mut references = self.references.lock();
+                match references.get_mut(&addr) {
+                    Some(ref_count) => *ref_count += 1,
+                    None => {
+                        // First time we see this buffer: reserve its capacity
+                        // before tracking it, so a failed reservation does not
+                        // leave a stale reference count behind
+                        self.try_grow(reservation, buffer.capacity())?;
+                        references.insert(addr, 1);
+                    }
+                }
+            }
+        }
+        Ok(())
+    }
+
+    fn reserved(&self) -> usize {
+        self.used.load(Ordering::Relaxed)
+    }
+
+    fn memory_limit(&self) -> MemoryLimit {
+        MemoryLimit::Finite(self.pool_size)
+    }
+}
+
 /// A [`MemoryPool`] that prevents spillable reservations from using more than
 /// an even fraction of the available memory sans any unspillable reservations
 /// (i.e.
`(pool_size - unspillable_memory) / num_spillable_reservations`)
diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs
index e4d554ceb62c..88a50fdbb064 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -201,7 +201,7 @@ async fn load_left_input(
             |(mut batches, metrics, mut reservation), batch| async {
                 let batch_size = batch.get_array_memory_size();
                 // Reserve memory for incoming batch
-                reservation.try_grow(batch_size)?;
+                reservation.try_grow_with_arrays(batch.columns())?;
                 // Update metrics
                 metrics.build_mem_used.add(batch_size);
                 metrics.build_input_batches.add(1);
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index 96cb09b4cb81..fa9e73e9a7f2 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -967,7 +967,7 @@ async fn collect_left_input(
         .try_fold(initial, |mut acc, batch| async {
             let batch_size = get_record_batch_memory_size(&batch);
             // Reserve memory for incoming batch
-            acc.3.try_grow(batch_size)?;
+            acc.3.try_grow_with_arrays(batch.columns())?;
             // Update metrics
             acc.2.build_mem_used.add(batch_size);
             acc.2.build_input_batches.add(1);
diff --git a/datafusion/physical-plan/src/recursive_query.rs b/datafusion/physical-plan/src/recursive_query.rs
index 210db90c3c7f..4b56b78a8eaf 100644
--- a/datafusion/physical-plan/src/recursive_query.rs
+++ b/datafusion/physical-plan/src/recursive_query.rs
@@ -303,7 +303,7 @@ impl RecursiveQueryStream {
         mut self: std::pin::Pin<&mut Self>,
         batch: RecordBatch,
     ) -> Poll<Option<Result<RecordBatch>>> {
-        if let Err(e) = self.reservation.try_grow(batch.get_array_memory_size()) {
+        if let Err(e) = self.reservation.try_grow_with_arrays(batch.columns()) {
             return Poll::Ready(Some(Err(e)));
         }