Skip to content

Commit fa95bd3

Browse files
authored
Use GroupsAccumulator exclusively in grouped hash aggregation (#175)
Makes other AggregateExprs in use GroupsAccumulatorFlatAdapter, and also uses a GroupsAccumulator implementation that has Box<dyn Accumulator> inside as a fallback accumulator if some AggregateExpr implementation does not support that. This fully removes a batch keys and hash table iteration and brings that performance benefit from Sum and Avg to other aggregation types.
1 parent a8f045a commit fa95bd3

File tree

8 files changed

+140
-142
lines changed

8 files changed

+140
-142
lines changed

datafusion/src/cube_ext/joinagg.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,6 @@ impl ExecutionPlan for CrossJoinAggExec {
259259
accumulators = hash_aggregate::group_aggregate_batch(
260260
&AggregateMode::Full,
261261
&group_expr,
262-
&self.agg_expr,
263262
joined,
264263
std::mem::take(&mut accumulators),
265264
&aggs,

datafusion/src/physical_plan/distinct_expressions.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ use arrow::datatypes::{DataType, Field};
2727

2828
use crate::error::{DataFusionError, Result};
2929
use crate::physical_plan::group_scalar::GroupByScalar;
30+
use crate::physical_plan::groups_accumulator::GroupsAccumulator;
31+
use crate::physical_plan::groups_accumulator_flat_adapter::GroupsAccumulatorFlatAdapter;
3032
use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
3133
use crate::scalar::ScalarValue;
3234
use itertools::Itertools;
@@ -122,6 +124,26 @@ impl AggregateExpr for DistinctCount {
122124
}))
123125
}
124126

127+
fn uses_groups_accumulator(&self) -> bool {
128+
return true;
129+
}
130+
131+
fn create_groups_accumulator(
132+
&self,
133+
) -> arrow::error::Result<Option<Box<dyn GroupsAccumulator>>> {
134+
let state_data_types = self.state_data_types.clone();
135+
let count_data_type = self.data_type.clone();
136+
Ok(Some(Box::new(GroupsAccumulatorFlatAdapter::<
137+
DistinctCountAccumulator,
138+
>::new(move || {
139+
Ok(DistinctCountAccumulator {
140+
values: HashSet::default(),
141+
state_data_types: state_data_types.clone(),
142+
count_data_type: count_data_type.clone(),
143+
})
144+
}))))
145+
}
146+
125147
fn name(&self) -> &str {
126148
&self.name
127149
}

datafusion/src/physical_plan/expressions/average.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,8 +118,6 @@ impl AggregateExpr for Avg {
118118
return true;
119119
}
120120

121-
/// the groups accumulator used to accumulate values from the expression. If this returns None,
122-
/// create_accumulator must be used.
123121
fn create_groups_accumulator(
124122
&self,
125123
) -> arrow::error::Result<Option<Box<dyn GroupsAccumulator>>> {

datafusion/src/physical_plan/expressions/count.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ use std::any::Any;
2121
use std::sync::Arc;
2222

2323
use crate::error::Result;
24+
use crate::physical_plan::groups_accumulator::GroupsAccumulator;
25+
use crate::physical_plan::groups_accumulator_flat_adapter::GroupsAccumulatorFlatAdapter;
2426
use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
2527
use crate::scalar::ScalarValue;
2628
use arrow::compute;
@@ -90,6 +92,20 @@ impl AggregateExpr for Count {
9092
Ok(Box::new(CountAccumulator::new()))
9193
}
9294

95+
fn uses_groups_accumulator(&self) -> bool {
96+
return true;
97+
}
98+
99+
fn create_groups_accumulator(
100+
&self,
101+
) -> arrow::error::Result<Option<Box<dyn GroupsAccumulator>>> {
102+
Ok(Some(Box::new(GroupsAccumulatorFlatAdapter::<
103+
CountAccumulator,
104+
>::new(move || {
105+
Ok(CountAccumulator::new())
106+
}))))
107+
}
108+
93109
fn name(&self) -> &str {
94110
&self.name
95111
}

datafusion/src/physical_plan/expressions/min_max.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ use std::convert::TryFrom;
2222
use std::sync::Arc;
2323

2424
use crate::error::{DataFusionError, Result};
25+
use crate::physical_plan::groups_accumulator::GroupsAccumulator;
26+
use crate::physical_plan::groups_accumulator_flat_adapter::GroupsAccumulatorFlatAdapter;
2527
use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
2628
use crate::scalar::ScalarValue;
2729
use arrow::compute;
@@ -99,6 +101,21 @@ impl AggregateExpr for Max {
99101
Ok(Box::new(MaxAccumulator::try_new(&self.data_type)?))
100102
}
101103

104+
fn uses_groups_accumulator(&self) -> bool {
105+
return true;
106+
}
107+
108+
fn create_groups_accumulator(
109+
&self,
110+
) -> arrow::error::Result<Option<Box<dyn GroupsAccumulator>>> {
111+
let data_type = self.data_type.clone();
112+
Ok(Some(Box::new(
113+
GroupsAccumulatorFlatAdapter::<MaxAccumulator>::new(move || {
114+
MaxAccumulator::try_new(&data_type)
115+
}),
116+
)))
117+
}
118+
102119
fn name(&self) -> &str {
103120
&self.name
104121
}
@@ -523,6 +540,21 @@ impl AggregateExpr for Min {
523540
Ok(Box::new(MinAccumulator::try_new(&self.data_type)?))
524541
}
525542

543+
fn uses_groups_accumulator(&self) -> bool {
544+
return true;
545+
}
546+
547+
fn create_groups_accumulator(
548+
&self,
549+
) -> arrow::error::Result<Option<Box<dyn GroupsAccumulator>>> {
550+
let data_type = self.data_type.clone();
551+
Ok(Some(Box::new(
552+
GroupsAccumulatorFlatAdapter::<MinAccumulator>::new(move || {
553+
MinAccumulator::try_new(&data_type)
554+
}),
555+
)))
556+
}
557+
526558
fn name(&self) -> &str {
527559
&self.name
528560
}

datafusion/src/physical_plan/expressions/sum.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,6 @@ impl AggregateExpr for Sum {
124124
return true;
125125
}
126126

127-
/// the groups accumulator used to accumulate values from the expression. If this returns None,
128-
/// create_accumulator must be used.
129127
fn create_groups_accumulator(
130128
&self,
131129
) -> arrow::error::Result<Option<Box<dyn GroupsAccumulator>>> {

0 commit comments

Comments
 (0)