Skip to content

Commit 7c71b24

Browse files
committed
Skip all regular Accumulators if there are none
1 parent cb58dc7 commit 7c71b24

File tree

1 file changed

+52
-51
lines changed

1 file changed

+52
-51
lines changed

datafusion/src/physical_plan/hash_aggregate.rs

Lines changed: 52 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,9 @@ pub(crate) fn group_aggregate_batch(
438438
// Make sure we can create the accumulators or otherwise return an error
439439
create_accumulators(aggr_expr).map_err(DataFusionError::into_arrow_external_error)?;
440440

441+
let all_groups_accumulators: bool =
442+
aggr_expr.iter().all(|expr| expr.uses_groups_accumulator());
443+
441444
// Keys received in this batch
442445
let mut batch_keys = BinaryBuilder::new(0);
443446

@@ -523,59 +526,57 @@ pub(crate) fn group_aggregate_batch(
523526
})
524527
.collect();
525528

526-
let mut group_indices = Vec::<usize>::new();
527-
528-
// 2.1 for each key in this batch
529-
// 2.2 for each aggregation
530-
// 2.3 `slice` from each of its arrays the keys' values
531-
// 2.4 update / merge the accumulator with the values
532-
// 2.5 clear indices
533-
batch_keys
534-
.iter()
535-
.zip(offsets.windows(2))
536-
.try_for_each(|(key, offsets)| {
537-
let AccumulationGroupState {
538-
accumulator_set,
539-
indices,
540-
group_index,
541-
..
542-
} = accumulation_state
543-
.accumulators
544-
.get_mut(key.unwrap())
545-
.unwrap();
546-
// 2.2
547-
accumulator_set
548-
.iter_mut()
549-
.zip(values.iter())
550-
.map(|(accumulator, aggr_array)| {
551-
(
552-
accumulator,
553-
aggr_array
554-
.iter()
555-
.map(|array| {
556-
// 2.3
557-
array.slice(offsets[0], offsets[1] - offsets[0])
558-
})
559-
.collect::<Vec<ArrayRef>>(),
560-
)
561-
})
562-
.try_for_each(|(accumulator, values)| {
563-
if let Some(accumulator) = accumulator {
564-
match mode {
565-
AggregateMode::Partial | AggregateMode::Full => {
566-
accumulator.update_batch(&values)
567-
}
568-
AggregateMode::FinalPartitioned | AggregateMode::Final => {
569-
// note: the aggregation here is over states, not values, thus the merge
570-
accumulator.merge_batch(&values)
529+
if !all_groups_accumulators {
530+
// 2.1 for each key in this batch
531+
// 2.2 for each aggregation
532+
// 2.3 `slice` from each of its arrays the keys' values
533+
// 2.4 update / merge the accumulator with the values
534+
// 2.5 clear indices
535+
batch_keys
536+
.iter()
537+
.zip(offsets.windows(2))
538+
.try_for_each(|(key, offsets)| {
539+
let AccumulationGroupState {
540+
accumulator_set, ..
541+
} = accumulation_state
542+
.accumulators
543+
.get_mut(key.unwrap())
544+
.unwrap();
545+
// 2.2
546+
accumulator_set
547+
.iter_mut()
548+
.zip(values.iter())
549+
.map(|(accumulator, aggr_array)| {
550+
(
551+
accumulator,
552+
aggr_array
553+
.iter()
554+
.map(|array| {
555+
// 2.3
556+
array.slice(offsets[0], offsets[1] - offsets[0])
557+
})
558+
.collect::<Vec<ArrayRef>>(),
559+
)
560+
})
561+
.try_for_each(|(accumulator, values)| {
562+
if let Some(accumulator) = accumulator {
563+
match mode {
564+
AggregateMode::Partial | AggregateMode::Full => {
565+
accumulator.update_batch(&values)
566+
}
567+
AggregateMode::FinalPartitioned
568+
| AggregateMode::Final => {
569+
// note: the aggregation here is over states, not values, thus the merge
570+
accumulator.merge_batch(&values)
571+
}
571572
}
573+
} else {
574+
// We do groups accumulator separately.
575+
Ok(())
572576
}
573-
} else {
574-
// We do groups accumulator separately.
575-
Ok(())
576-
}
577-
})
578-
})?;
577+
})
578+
})?;
579+
}
579580

580581
for (accumulator_index, accumulator) in accumulation_state
581582
.groups_accumulators

0 commit comments

Comments
 (0)