Skip to content

Commit 8ce1636

Browse files
committed
perf: backport CountGroupsAccumulator
1 parent 51d6d74 commit 8ce1636

File tree

1 file changed

+202
-8
lines changed
  • datafusion/src/physical_plan/expressions

1 file changed

+202
-8
lines changed

datafusion/src/physical_plan/expressions/count.rs

Lines changed: 202 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,16 @@
2020
use std::any::Any;
2121
use std::sync::Arc;
2222

23-
use crate::error::Result;
24-
use crate::physical_plan::groups_accumulator::GroupsAccumulator;
23+
use crate::error::{DataFusionError, Result};
24+
use crate::physical_plan::groups_accumulator::{EmitTo, GroupsAccumulator};
2525
use crate::physical_plan::groups_accumulator_flat_adapter::GroupsAccumulatorFlatAdapter;
26+
use crate::physical_plan::null_state::accumulate_indices;
2627
use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
2728
use crate::scalar::ScalarValue;
29+
use arrow::array::{Array, ArrayData, BooleanArray, Int64Array, PrimitiveArray};
30+
use arrow::buffer::Buffer;
2831
use arrow::compute;
29-
use arrow::datatypes::DataType;
32+
use arrow::datatypes::{DataType, Int64Type, UInt64Type};
3033
use arrow::{
3134
array::{ArrayRef, UInt64Array},
3235
datatypes::Field,
@@ -99,11 +102,7 @@ impl AggregateExpr for Count {
99102
fn create_groups_accumulator(
100103
&self,
101104
) -> arrow::error::Result<Option<Box<dyn GroupsAccumulator>>> {
102-
Ok(Some(Box::new(GroupsAccumulatorFlatAdapter::<
103-
CountAccumulator,
104-
>::new(move || {
105-
Ok(CountAccumulator::new())
106-
}))))
105+
Ok(Some(Box::new(CountGroupsAccumulator::new())))
107106
}
108107

109108
fn name(&self) -> &str {
@@ -170,6 +169,201 @@ impl Accumulator for CountAccumulator {
170169
}
171170
}
172171

172+
/// An accumulator to compute the counts of [`PrimitiveArray<T>`].
173+
/// Stores values as native types, and does overflow checking
174+
///
175+
/// Unlike most other accumulators, COUNT never produces NULLs. If no
176+
/// non-null values are seen in any group the output is 0. Thus, this
177+
/// accumulator has no additional null or seen filter tracking.
178+
#[derive(Debug)]
179+
struct CountGroupsAccumulator {
180+
/// Count per group.
181+
///
182+
/// Note that in upstream this is a Vec<i64>, and the count output and intermediate data type is
183+
/// `DataType::Int64`. But here we are still using UInt64.
184+
counts: Vec<u64>,
185+
}
186+
187+
impl CountGroupsAccumulator {
188+
pub fn new() -> Self {
189+
Self { counts: vec![] }
190+
}
191+
}
192+
193+
impl GroupsAccumulator for CountGroupsAccumulator {
194+
fn update_batch(
195+
&mut self,
196+
values: &[ArrayRef],
197+
group_indices: &[usize],
198+
opt_filter: Option<&BooleanArray>,
199+
total_num_groups: usize,
200+
) -> Result<()> {
201+
assert_eq!(values.len(), 1, "single argument to update_batch");
202+
let values = &values[0];
203+
204+
// Add one to each group's counter for each non null, non
205+
// filtered value
206+
self.counts.resize(total_num_groups, 0);
207+
accumulate_indices(
208+
group_indices,
209+
values.data_ref().null_bitmap().as_ref().map(|bitmap| (bitmap, values.offset(), values.len())),
210+
opt_filter,
211+
|group_index| {
212+
self.counts[group_index] += 1;
213+
},
214+
);
215+
216+
Ok(())
217+
}
218+
219+
fn merge_batch(
220+
&mut self,
221+
values: &[ArrayRef],
222+
group_indices: &[usize],
223+
opt_filter: Option<&BooleanArray>,
224+
total_num_groups: usize,
225+
) -> Result<()> {
226+
assert_eq!(values.len(), 1, "one argument to merge_batch");
227+
// first batch is counts, second is partial sums
228+
let partial_counts = match values[0].as_any().downcast_ref::<PrimitiveArray<UInt64Type>>() {
229+
Some(x) => x,
230+
None => {
231+
panic!("values[0] is of unexpected type {:?}, expecting UInt64Type for intermediate count batch", values[0].data_type());
232+
}
233+
};
234+
235+
// intermediate counts are always created as non null
236+
assert_eq!(partial_counts.null_count(), 0);
237+
let partial_counts = partial_counts.values();
238+
239+
// Adds the counts with the partial counts
240+
self.counts.resize(total_num_groups, 0);
241+
match opt_filter {
242+
Some(filter) => filter
243+
.iter()
244+
.zip(group_indices.iter())
245+
.zip(partial_counts.iter())
246+
.for_each(|((filter_value, &group_index), partial_count)| {
247+
if let Some(true) = filter_value {
248+
self.counts[group_index] += partial_count;
249+
}
250+
}),
251+
None => group_indices.iter().zip(partial_counts.iter()).for_each(
252+
|(&group_index, partial_count)| {
253+
self.counts[group_index] += partial_count;
254+
},
255+
),
256+
}
257+
258+
Ok(())
259+
}
260+
261+
fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
262+
let counts = emit_to.take_needed(&mut self.counts);
263+
264+
// Count is always non null (null inputs just don't contribute to the overall values)
265+
266+
// TODO: This copies. Ideally, don't. Note: Avoiding this memcpy had minimal effect in PrimitiveGroupsAccumulator
267+
let buffers = vec![Buffer::from_slice_ref(&counts)];
268+
269+
let data = ArrayData::new(
270+
DataType::UInt64,
271+
counts.len(),
272+
None,
273+
None,
274+
0, /* offset */
275+
buffers,
276+
vec![]
277+
);
278+
Ok(Arc::new(PrimitiveArray::<UInt64Type>::from(data)))
279+
}
280+
281+
// return arrays for counts
282+
fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
283+
let counts = emit_to.take_needed(&mut self.counts);
284+
// Backporting note: UInt64Array::from actually does copy here in old DF.
285+
let counts: PrimitiveArray<UInt64Type> = UInt64Array::from(counts); // zero copy, no nulls
286+
Ok(vec![Arc::new(counts) as ArrayRef])
287+
}
288+
289+
/// Converts an input batch directly to a state batch
290+
///
291+
/// The state of `COUNT` is always a single Int64Array (backporting note: it is a UInt64Array):
292+
/// * `1` (for non-null, non filtered values)
293+
/// * `0` (for null values)
294+
fn convert_to_state(
295+
&self,
296+
_values: &[ArrayRef],
297+
_opt_filter: Option<&BooleanArray>,
298+
) -> Result<Vec<ArrayRef>> {
299+
// convert_to_state only gets used in upstream datafusion, and we set
300+
// supports_convert_to_state to false. Because values.data_ref().offset() and the null
301+
// bitmap have differences that require care to backport, we comment this out instead.
302+
return Err(DataFusionError::NotImplemented("Input batch conversion to state not implemented".to_owned()));
303+
/*
304+
let values = &values[0];
305+
306+
let state_array = match (values.logical_nulls(), opt_filter) {
307+
(None, None) => {
308+
// In case there is no nulls in input and no filter, returning array of 1
309+
Arc::new(Int64Array::from_value(1, values.len()))
310+
}
311+
(Some(nulls), None) => {
312+
// If there are any nulls in input values -- casting `nulls` (true for values, false for nulls)
313+
// of input array to Int64
314+
let nulls = BooleanArray::new(nulls.into_inner(), None);
315+
compute::cast(&nulls, &DataType::Int64)?
316+
}
317+
(None, Some(filter)) => {
318+
// If there is only filter
319+
// - applying filter null mask to filter values by bitand filter values and nulls buffers
320+
// (using buffers guarantees absence of nulls in result)
321+
// - casting result of bitand to Int64 array
322+
let (filter_values, filter_nulls) = filter.clone().into_parts();
323+
324+
let state_buf = match filter_nulls {
325+
Some(filter_nulls) => &filter_values & filter_nulls.inner(),
326+
None => filter_values,
327+
};
328+
329+
let boolean_state = BooleanArray::new(state_buf, None);
330+
compute::cast(&boolean_state, &DataType::Int64)?
331+
}
332+
(Some(nulls), Some(filter)) => {
333+
// For both input nulls and filter
334+
// - applying filter null mask to filter values by bitand filter values and nulls buffers
335+
// (using buffers guarantees absence of nulls in result)
336+
// - applying values null mask to filter buffer by another bitand on filter result and
337+
// nulls from input values
338+
// - casting result to Int64 array
339+
let (filter_values, filter_nulls) = filter.clone().into_parts();
340+
341+
let filter_buf = match filter_nulls {
342+
Some(filter_nulls) => &filter_values & filter_nulls.inner(),
343+
None => filter_values,
344+
};
345+
let state_buf = &filter_buf & nulls.inner();
346+
347+
let boolean_state = BooleanArray::new(state_buf, None);
348+
compute::cast(&boolean_state, &DataType::Int64)?
349+
}
350+
};
351+
352+
Ok(vec![state_array])
353+
*/
354+
}
355+
356+
fn supports_convert_to_state(&self) -> bool {
357+
// Is set to true in upstream (as it's implemented above in upstream). But convert_to_state
358+
// is not used in this branch anyway.
359+
false
360+
}
361+
362+
fn size(&self) -> usize {
363+
self.counts.capacity() * size_of::<usize>()
364+
}
365+
}
366+
173367
#[cfg(test)]
174368
mod tests {
175369
use super::*;

0 commit comments

Comments
 (0)