Skip to content

Commit 83dcbe4

Browse files
gabotechs authored and hknlof committed
Address memory over-accounting in array_agg (apache#16816)
* Use get_slice_memory_size() instead of get_array_memory_size() for measuring array_agg accumulator size
* Add comment explaining the rationale for using `.get_slice_memory_size()`
1 parent c9cd2cf commit 83dcbe4

File tree

1 file changed

+15
-11
lines changed

1 file changed

+15
-11
lines changed

datafusion/functions-aggregate/src/array_agg.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,12 @@ use std::mem::{size_of, size_of_val, take};
2323
use std::sync::Arc;
2424

2525
use arrow::array::{
26-
make_array, new_empty_array, Array, ArrayRef, AsArray, BooleanArray, ListArray,
27-
StructArray,
26+
new_empty_array, Array, ArrayRef, AsArray, BooleanArray, ListArray, StructArray,
2827
};
2928
use arrow::compute::{filter, SortOptions};
3029
use arrow::datatypes::{DataType, Field, FieldRef, Fields};
3130

3231
use datafusion_common::cast::as_list_array;
33-
use datafusion_common::scalar::copy_array_data;
3432
use datafusion_common::utils::{
3533
compare_rows, get_row_at_idx, take_function_args, SingleRowListArrayBuilder,
3634
};
@@ -335,11 +333,7 @@ impl Accumulator for ArrayAggAccumulator {
335333
};
336334

337335
if !val.is_empty() {
338-
// The ArrayRef might be holding a reference to its original input buffer, so
339-
// storing it here directly copied/compacted avoids over accounting memory
340-
// not used here.
341-
self.values
342-
.push(make_array(copy_array_data(&val.to_data())));
336+
self.values.push(val)
343337
}
344338

345339
Ok(())
@@ -398,7 +392,18 @@ impl Accumulator for ArrayAggAccumulator {
398392
+ self
399393
.values
400394
.iter()
401-
.map(|arr| arr.get_array_memory_size())
395+
// Each ArrayRef might be just a reference to a bigger array, and many
396+
// ArrayRefs here might be referencing exactly the same array, so if we
397+
// were to call `arr.get_array_memory_size()`, we would be double-counting
398+
// the same underlying data many times.
399+
//
400+
// Instead, we do an approximation by estimating how much memory each
401+
// ArrayRef would occupy if its underlying data was fully owned by this
402+
// accumulator.
403+
//
404+
// Note that this is just an estimation, but the reality is that this
405+
// accumulator might not own any data.
406+
.map(|arr| arr.to_data().get_slice_memory_size().unwrap_or_default())
402407
.sum::<usize>()
403408
+ self.datatype.size()
404409
- size_of_val(&self.datatype)
@@ -1064,8 +1069,7 @@ mod tests {
10641069
acc2.update_batch(&[data(["b", "c", "a"])])?;
10651070
acc1 = merge(acc1, acc2)?;
10661071

1067-
// without compaction, the size is 2652.
1068-
assert_eq!(acc1.size(), 732);
1072+
assert_eq!(acc1.size(), 266);
10691073

10701074
Ok(())
10711075
}

0 commit comments

Comments
 (0)