Skip to content

Conversation

@petern48
Copy link
Contributor

@petern48 petern48 commented Nov 3, 2025

Which issue does this PR close?

Rationale for this change

What changes are included in this PR?

This PR adds the reduction_factor metric to the AggregateExec mode=Partial case.

e.g from the issue

create table t1(a int, b int);
insert into t1 values (1,10), (1, 20), (2,10), (2,30);
explain analyze select a, sum(b) from t1 group by a;

+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                         |
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[sum(t1.b)], metrics=[output_rows=2, elapsed_compute=7.856539ms, output_bytes=544.0 B]                            |
|                   |   CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=2, elapsed_compute=192.334µs, output_bytes=96.0 KB]                                                      |
|                   |     RepartitionExec: partitioning=Hash([a@0], 10), input_partitions=10, metrics=[]                                                                                           |
|                   |       RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1, metrics=[]                                                                                      |
|                   |         AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[sum(t1.b)], metrics=[output_rows=2, elapsed_compute=2.581625ms, output_bytes=544.0 B, reduction_factor=50% (2/4)] |
|                   |           DataSourceExec: partitions=1, partition_sizes=[1], metrics=[]                                                                                                      |
|                   |                                                                                                                                                                              |
+-------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Note: For AggregateExec cases where this doesn't apply, the reduction_factor metric won't be shown. Here's an example of the explain analyze from the modified test in explain_analyze.rs.

running query: EXPLAIN ANALYZE SELECT count(*) as cnt FROM (SELECT count(*), c1 FROM aggregate_test_100 WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434' GROUP BY c1 ORDER BY c1 ) AS a UNION ALL SELECT 1 as cnt UNION ALL SELECT lead(c1, 1) OVER () as cnt FROM (select 1 as c1) AS b LIMIT 3
Query Output:

+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | CoalescePartitionsExec: fetch=3, metrics=[output_rows=3, elapsed_compute=6.084µs, output_bytes=25.0 B]                                                                                                                                                                                                                                                                                                                                                                                                    |
|                   |   UnionExec, metrics=[output_rows=3, elapsed_compute=117.208µs, output_bytes=25.0 B]                                                                                                                                                                                                                                                                                                                                                                                                                      |
|                   |     ProjectionExec: expr=[count(Int64(1))@0 as cnt], metrics=[output_rows=1, elapsed_compute=1.333µs, output_bytes=8.0 B]                                                                                                                                                                                                                                                                                                                                                                                 |
|                   |       AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))], metrics=[output_rows=1, elapsed_compute=70.542µs, output_bytes=8.0 B]                                                                                                                                                                                                                                                                                                                                                                    |
|                   |         CoalescePartitionsExec, metrics=[output_rows=3, elapsed_compute=4.958µs, output_bytes=24.0 B]                                                                                                                                                                                                                                                                                                                                                                                                     |
|                   |           AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))], metrics=[output_rows=3, elapsed_compute=51.835µs, output_bytes=24.0 B]                                                                                                                                                                                                                                                                                                                                                             |
|                   |             ProjectionExec: expr=[], metrics=[output_rows=5, elapsed_compute=2.251µs, output_bytes=0.0 B]                                                                                                                                                                                                                                                                                                                                                                                                 |
|                   |               AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[], metrics=[output_rows=5, elapsed_compute=76.666µs, output_bytes=48.0 KB, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, peak_mem_used=50544, aggregate_arguments_time=3ns, aggregation_time=3ns, emitting_time=5.875µs, time_calculating_group_ids=9.459µs]                                                                                                                                                            |
|                   |                 CoalesceBatchesExec: target_batch_size=4096, metrics=[output_rows=5, elapsed_compute=11.249µs, output_bytes=192.0 KB]                                                                                                                                                                                                                                                                                                                                                                     |
|                   |                   RepartitionExec: partitioning=Hash([c1@0], 3), input_partitions=3, metrics=[spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, fetch_time=15.064041ms, repartition_time=149.418µs, send_time=8.672µs]                                                                                                                                                                                                                                                                                  |
|                   |                     AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[], metrics=[output_rows=5, elapsed_compute=248.667µs, output_bytes=16.0 KB, spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, skipped_aggregation_rows=0, peak_mem_used=52168, aggregate_arguments_time=3ns, aggregation_time=3ns, emitting_time=7.377µs, time_calculating_group_ids=128.46µs, reduction_factor=5.1% (5/99)]                                                                                                   |
|                   |                       CoalesceBatchesExec: target_batch_size=4096, metrics=[output_rows=99, elapsed_compute=81.459µs, output_bytes=64.0 KB]                                                                                                                                                                                                                                                                                                                                                               |
|                   |                         FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434, projection=[c1@0], metrics=[output_rows=99, elapsed_compute=503.793µs, output_bytes=1584.0 B, selectivity=99% (99/100)]                                                                                                                                                                                                                                                                                                      |
|                   |                           RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1, metrics=[spill_count=0, spilled_bytes=0.0 B, spilled_rows=0, fetch_time=4.160958ms, repartition_time=1ns, send_time=16.085µs]                                                                                                                                                                                                                                                                             |
|                   |                             DataSourceExec: file_groups={1 group: [[Users/peter/Documents/open-source/datafusion/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c13], file_type=csv, has_header=true, metrics=[output_rows=100, elapsed_compute=1ns, output_bytes=19.1 KB, batches_split=0, file_open_errors=0, file_scan_errors=0, time_elapsed_opening=313.458µs, time_elapsed_processing=3.974624ms, time_elapsed_scanning_total=3.771208ms, time_elapsed_scanning_until_data=3.714625ms] |
|                   |     ProjectionExec: expr=[1 as cnt], metrics=[output_rows=1, elapsed_compute=20.792µs, output_bytes=8.0 B]                                                                                                                                                                                                                                                                                                                                                                                                |
|                   |       PlaceholderRowExec, metrics=[]                                                                                                                                                                                                                                                                                                                                                                                                                                                                      |
|                   |     ProjectionExec: expr=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as cnt], metrics=[output_rows=1, elapsed_compute=1.333µs, output_bytes=9.0 B]                                                                                                                                                                                                                                                                                                                    |
|                   |       BoundedWindowAggExec: wdw=[lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "lead(b.c1,Int64(1)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": nullable Int64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted], metrics=[output_rows=1, elapsed_compute=560µs, output_bytes=17.0 B]                                                                                                                             |
|                   |         ProjectionExec: expr=[1 as c1], metrics=[output_rows=1, elapsed_compute=2.459µs, output_bytes=8.0 B]                                                                                                                                                                                                                                                                                                                                                                                              |
|                   |           PlaceholderRowExec, metrics=[]                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
|                   |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

The following cases don't include reduction_factor metric

  • AggregateExec: mode=Final, gby=[], aggr=[count(Int64(1))]
  • AggregateExec: mode=Partial, gby=[], aggr=[count(Int64(1))]
  • AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[]

While this case does:

  • AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[] -> reduction_factor=5.1% (5/99)

Are these changes tested?

Yes

Are there any user-facing changes?

Yes, a new metric will be visible when running EXPLAIN ANALYZE

@github-actions github-actions bot added core Core DataFusion crate physical-plan Changes to the physical-plan crate labels Nov 3, 2025
Copy link
Contributor

@2010YOUY01 2010YOUY01 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! Good to go after CI passes.

@petern48 petern48 marked this pull request as ready for review November 3, 2025 03:35
@petern48 petern48 requested a review from 2010YOUY01 November 3, 2025 03:36
@petern48
Copy link
Contributor Author

petern48 commented Nov 3, 2025

Oh my bad, I didn't mean to click re-request review. (I forgot this is datafusion with the 24 hr merge delay)

assert_metrics!(
&formatted,
"AggregateExec: mode=Partial, gby=[c1@0 as c1]",
"reduction_factor=5.1% (5/99)"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is really nice

@alamb
Copy link
Contributor

alamb commented Nov 3, 2025

Thank you @2010YOUY01 and @petern48

@alamb alamb added this pull request to the merge queue Nov 3, 2025
Merged via the queue into apache:main with commit ac41f44 Nov 3, 2025
32 checks passed
@petern48 petern48 deleted the reduction_factor_metric branch November 4, 2025 04:51
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate physical-plan Changes to the physical-plan crate

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support reduction_fator metric (for EXPLAIN ANALYZE) in AggregateExec

3 participants