-
Notifications
You must be signed in to change notification settings - Fork 1.6k
[PoC] Add API for tracking distinct buffers in MemoryPool
by reference count
#16359
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
Dandandan
wants to merge
18
commits into
apache:main
Choose a base branch
from
Dandandan:register_arc
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
5ea8c05
Add API for tracking distinct arrays
Dandandan 447cc48
Add API for tracking distinct arrays
Dandandan ba25081
Add API for tracking distinct arrays
Dandandan 05f3827
Add API for tracking distinct arrays
Dandandan c419fa5
Add API for tracking distinct arrays
Dandandan da36b88
Add API for tracking distinct arrays
Dandandan 618e17d
Add API for tracking distinct arrays
Dandandan 98216a8
Add API for tracking distinct arrays
Dandandan d3c4895
Add API for tracking distinct arrays
Dandandan 449e00f
Add API for tracking distinct arrays
Dandandan 9c9162c
Add API for tracking distinct arrays
Dandandan 5110d3f
Add API for tracking distinct arrays
Dandandan da7a103
Add API for tracking distinct arrays
Dandandan 2747ca9
Add API for tracking distinct arrays
Dandandan f904b56
Add API for tracking distinct arrays
Dandandan f313f4c
Add API for tracking distinct arrays
Dandandan 75531c9
Add API for tracking distinct arrays
Dandandan 9d43e6b
Add API for tracking distinct arrays
Dandandan File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,6 +22,7 @@ use datafusion_common::HashMap; | |
use datafusion_common::{resources_datafusion_err, DataFusionError, Result}; | ||
use log::debug; | ||
use parking_lot::Mutex; | ||
use std::sync::Arc; | ||
use std::{ | ||
num::NonZeroUsize, | ||
sync::atomic::{AtomicUsize, Ordering}, | ||
|
@@ -112,6 +113,144 @@ impl MemoryPool for GreedyMemoryPool { | |
} | ||
} | ||
|
||
// A [`MemoryPool`] that implements a greedy first-come first-serve limit. | ||
/// and tracks the memory usage based on the references to the arrays. | ||
/// | ||
/// This pool works well for queries that do not need to spill or have | ||
/// a single spillable operator. See [`FairSpillPool`] if there are | ||
/// multiple spillable operators that all will spill. | ||
#[derive(Debug)] | ||
pub struct GreedyMemoryPoolWithTracking { | ||
pool_size: usize, | ||
used: AtomicUsize, | ||
references: Mutex<HashMap<usize, usize>>, | ||
} | ||
|
||
impl GreedyMemoryPoolWithTracking { | ||
/// Create a new pool that can allocate up to `pool_size` bytes | ||
pub fn new(pool_size: usize) -> Self { | ||
debug!("Created new GreedyMemoryPool(pool_size={pool_size})"); | ||
Self { | ||
pool_size, | ||
used: AtomicUsize::new(0), | ||
references: Mutex::new(HashMap::new()), | ||
} | ||
} | ||
} | ||
|
||
impl MemoryPool for GreedyMemoryPoolWithTracking { | ||
fn grow(&self, _reservation: &MemoryReservation, additional: usize) { | ||
self.used.fetch_add(additional, Ordering::Relaxed); | ||
} | ||
|
||
fn grow_with_arrays( | ||
&self, | ||
reservation: &MemoryReservation, | ||
arrays: &[Arc<dyn arrow::array::Array>], | ||
) { | ||
for array in arrays { | ||
let array_data = array.to_data(); | ||
for buffer in array_data.buffers() { | ||
let addr = buffer.data_ptr().as_ptr() as usize; | ||
let ref_count = *self | ||
.references | ||
.lock() | ||
.entry(addr) | ||
.and_modify(|ref_count| *ref_count += array.get_array_memory_size()) | ||
.or_insert(1); | ||
|
||
// If this is the first time we see this buffer, we need to grow the pool | ||
if ref_count == 1 { | ||
let additional = buffer.capacity(); | ||
self.grow(reservation, additional); | ||
} | ||
} | ||
} | ||
} | ||
|
||
fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) { | ||
self.used.fetch_sub(shrink, Ordering::Relaxed); | ||
} | ||
|
||
fn shrink_with_arrays( | ||
&self, | ||
reservation: &MemoryReservation, | ||
arrays: &[Arc<dyn arrow::array::Array>], | ||
) { | ||
for array in arrays { | ||
let array_data = array.to_data(); | ||
for buffer in array_data.buffers() { | ||
// We need to track the memory usage of the buffers | ||
let addr = buffer.data_ptr().as_ptr() as usize; | ||
let ref_count = *self | ||
.references | ||
.lock() | ||
.entry(addr) | ||
.and_modify(|ref_count| *ref_count -= buffer.len()) | ||
.or_insert(1); | ||
Comment on lines
+185
to
+190
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should remove the entry here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good one |
||
|
||
// If this is the last reference to this buffer, we need to shrink the pool | ||
if ref_count == 0 { | ||
let additional = buffer.capacity(); | ||
self.shrink(reservation, additional); | ||
} | ||
} | ||
} | ||
} | ||
|
||
fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> { | ||
self.used | ||
.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| { | ||
let new_used = used + additional; | ||
(new_used <= self.pool_size).then_some(new_used) | ||
}) | ||
.map_err(|used| { | ||
insufficient_capacity_err( | ||
reservation, | ||
additional, | ||
self.pool_size.saturating_sub(used), | ||
) | ||
})?; | ||
Ok(()) | ||
} | ||
|
||
fn try_grow_with_arrays( | ||
&self, | ||
reservation: &MemoryReservation, | ||
arrays: &[Arc<dyn arrow::array::Array>], | ||
) -> Result<()> { | ||
for array in arrays.iter() { | ||
// also take into account overhead | ||
let array_data = array.to_data(); | ||
let buffers = array_data.buffers(); | ||
for buffer in buffers { | ||
let addr = buffer.data_ptr().as_ptr() as usize; | ||
let ref_count = *self | ||
.references | ||
.lock() | ||
.entry(addr) | ||
.and_modify(|ref_count| *ref_count += 1) | ||
.or_insert(1); | ||
|
||
// If this is the first time we see this buffer, we need to grow the pool | ||
if ref_count == 1 { | ||
let additional = buffer.capacity(); | ||
self.try_grow(reservation, additional)?; | ||
} | ||
} | ||
} | ||
Ok(()) | ||
} | ||
|
||
fn reserved(&self) -> usize { | ||
self.used.load(Ordering::Relaxed) | ||
} | ||
|
||
fn memory_limit(&self) -> MemoryLimit { | ||
MemoryLimit::Finite(self.pool_size) | ||
} | ||
} | ||
|
||
/// A [`MemoryPool`] that prevents spillable reservations from using more than | ||
/// an even fraction of the available memory sans any unspillable reservations | ||
/// (i.e. `(pool_size - unspillable_memory) / num_spillable_reservations`) | ||
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make the API take
RecordBatch
instead of arrays? Since inside df it's more common to passing batches around, and we can use a utility function to dobatch -> [Array]
for array usages.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thats possible, let me have a look at whether we can use recordbatch always. My corcern was we might not always have a RecordBatch, but might have an Array. In that case conversion to recordbatch would be strange.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think that works since
MemoryReservation
holds array refs, we don't have record batch refsThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes @joroKr21 but we can extract the arrays / buffers from a
&RecordBatch
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah I mean you can change the API of
MemoryReservation
but not ofMemoryPool
because ofMemoryReservation::free