@avamingli
Contributor

PostgreSQL's parallel query cannot run window functions in parallel. In contrast, our distributed environment can execute window functions in parallel across multiple processes on multiple segments. For example:

  sum(a) over(partition by b order by c)

The window function can be processed by redistributing data based on column b to ensure all rows with the same b value are processed by the same worker, significantly improving efficiency.
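The redistribution idea can be illustrated with a small Python simulation. This is only a sketch, not the planner's implementation: the helper names `redistribute` and `window_sum` are hypothetical, Python's built-in `hash` stands in for the real distribution hash, and the values of c are assumed to be distinct within each partition.

```python
from collections import defaultdict

def redistribute(rows, key, n_workers):
    """Send each row to the worker chosen by hashing its partition key."""
    buckets = [[] for _ in range(n_workers)]
    for row in rows:
        buckets[hash(row[key]) % n_workers].append(row)
    return buckets

def window_sum(rows):
    """sum(a) OVER (PARTITION BY b ORDER BY c) for the rows held by one worker.

    Assumes c is distinct within a partition, so the running sum matches
    the default window frame.
    """
    partitions = defaultdict(list)
    for row in rows:
        partitions[row["b"]].append(row)
    out = []
    for part in partitions.values():
        running = 0
        for row in sorted(part, key=lambda r: r["c"]):
            running += row["a"]
            out.append((row["b"], row["c"], running))
    return out

rows = [
    {"a": 10, "b": 1, "c": 1},
    {"a": 20, "b": 1, "c": 2},
    {"a": 5, "b": 2, "c": 1},
    {"a": 7, "b": 2, "c": 3},
    {"a": 1, "b": 3, "c": 2},
]

# Serial plan: one process sees every row.
serial = sorted(window_sum(rows))

# Parallel plan: redistribute on b, then each worker runs the window independently.
buckets = redistribute(rows, "b", 4)
parallel = sorted(r for bucket in buckets for r in window_sum(bucket))
```

Because every row with a given b lands in the same bucket, each worker sees complete partitions and the combined result matches the serial one.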

Even without a PARTITION BY clause, we can still enable parallelism by allowing a partial_path for the window function and its subpaths, so that the underlying tables are scanned and filtered in parallel.
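One plausible reading of this case, sketched in Python: the scan and filter run in parallel over chunks of the table, and the window function is then evaluated once over the gathered rows. The names `scan_chunk` and `rank_over_all`, the thread pool, and the sample data are illustrative assumptions, not part of the patch.

```python
from concurrent.futures import ThreadPoolExecutor

def scan_chunk(chunk, predicate):
    """Parallel scan step: each worker filters its own share of the table."""
    return [value for value in chunk if predicate(value)]

def rank_over_all(values):
    """rank() OVER (ORDER BY value DESC), computed once on the gathered rows."""
    ordered = sorted(values, reverse=True)
    ranks = []
    for i, value in enumerate(ordered):
        if i > 0 and value == ordered[i - 1]:
            ranks.append((value, ranks[-1][1]))  # ties share a rank
        else:
            ranks.append((value, i + 1))
    return ranks

table = list(range(1, 21))
n_workers = 4
chunks = [table[i::n_workers] for i in range(n_workers)]

def keep(value):
    # Stand-in for a WHERE clause.
    return value % 5 == 0

with ThreadPoolExecutor(max_workers=n_workers) as pool:
    filtered = list(pool.map(scan_chunk, chunks, [keep] * n_workers))

# Gather step: merge worker outputs, then run the window function serially.
gathered = [value for part in filtered for value in part]
result = rank_over_all(gathered)
```

The window itself is still evaluated by a single process here; the benefit in this sketch comes from the filtering work below it being split across workers.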

Window functions that contain CASE WHEN expressions are excluded, because they complicate parallelization and make it difficult to guarantee correct data ordering.

Example non-parallel execution plan:

SELECT sum(salary) OVER w, rank() OVER w FROM empsalary WINDOW w AS (PARTITION BY depname ORDER BY salary DESC);
                  QUERY PLAN
----------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)
   ->  WindowAgg
         Partition By: depname
         Order By: salary
         ->  Sort
               Sort Key: depname, salary DESC
               ->  Seq Scan on empsalary

Parallel execution plan (4-parallel):

SELECT sum(salary) OVER w, rank() OVER w FROM empsalary WINDOW w AS (PARTITION BY depname ORDER BY salary DESC);
                             QUERY PLAN
---------------------------------------------------------------------
 Gather Motion 12:1  (slice1; segments: 12)
   ->  WindowAgg
         Partition By: depname
         Order By: salary
         ->  Sort
               Sort Key: depname, salary DESC
               ->  Redistribute Motion 12:12  (slice2; segments: 12)
                     Hash Key: depname
                     Hash Module: 3
                     ->  Parallel Seq Scan on empsalary

For window function plans that can be parallelized, performance typically improves as the degree of parallelism increases.

In complex queries containing window functions, parallel processing may sometimes be inhibited due to cost considerations or other constraints. However, our approach still provides valuable parallelization opportunities for window function subpaths, delivering measurable query efficiency improvements. We have observed significant performance gains in TPC-DS benchmarks through this partial parallelization capability.

TPC-DS queries via parallel execution plans (50G AOCS, 4 workers):

 Query | Before (ms) | After (ms) | Saved (ms) | Gain  | Plan change
-------+-------------+------------+------------+-------+-----------------
 q12   |   10,439.08 |   4,613.52 |   5,825.56 | 55.8% | serial→parallel
 q20   |   21,487.08 |   8,723.74 |  12,763.34 | 59.4% | serial→parallel
 q44   |   33,816.75 |  22,515.03 |  11,301.72 | 33.4% | better parallel
 q49   |   60,039.45 |  28,603.51 |  31,435.95 | 52.4% | serial→parallel
 q98   |   40,114.21 |  17,052.78 |  23,061.43 | 57.5% | serial→parallel

Changes:

  • Enabled parallel plans for q12/q20/q49/q98 (prev. serial)
  • Optimized parallel plan for q44
  • Avg gain: 52% (best: q20 59.4%, saved 12.8s)
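The gain figures can be recomputed from the Before and After timings reported above. The short Python check below assumes gain is defined as (before - after) / before, which is consistent with the reported percentages; the variable names are illustrative.

```python
# Before/After execution times in milliseconds, copied from the table above.
timings_ms = {
    "q12": (10439.08, 4613.52),
    "q20": (21487.08, 8723.74),
    "q44": (33816.75, 22515.03),
    "q49": (60039.45, 28603.51),
    "q98": (40114.21, 17052.78),
}

def gain_percent(before, after):
    """Share of the original run time that was saved, in percent."""
    return (before - after) / before * 100.0

gains = {q: round(gain_percent(b, a), 1) for q, (b, a) in timings_ms.items()}
avg_gain = sum(gains.values()) / len(gains)
best_query = max(gains, key=gains.get)

for query, pct in gains.items():
    print(f"{query}: {pct:.1f}%")
print(f"average: {avg_gain:.1f}%, best: {best_query}")
```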

Authored-by: Zhang Mingli [email protected]

Fixes #ISSUE_Number

What does this PR do?

Type of Change

  • Bug fix (non-breaking change)
  • New feature (non-breaking change)
  • Breaking change (fix or feature with breaking changes)
  • Documentation update

Breaking Changes

Test Plan

  • Unit tests added/updated
  • Integration tests added/updated
  • Passed make installcheck
  • Passed make -C src/test installcheck-cbdb-parallel

Impact

Performance:

User-facing changes:

Dependencies:

Checklist

Additional Context

CI Skip Instructions


@avamingli
Contributor Author

TPC-DS q12, details:

-- q12 old
                                                                                                  QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit (actual time=10436.303..10436.303 rows=100 loops=1)
   ->  Gather Motion 3:1  (slice1; segments: 3) (actual time=10436.303..10436.303 rows=100 loops=1)
         Merge Key: item.i_category, item.i_class, item.i_item_id, item.i_item_desc, ((((sum(web_sales.ws_ext_sales_price)) * '100'::numeric) / sum((sum(web_sales.ws_ext_sales_price))) OVER (?)))
         ->  Limit (actual time=10436.303..10436.303 rows=100 loops=1)
               ->  Sort (actual time=10436.303..10436.303 rows=100 loops=1)
                     Sort Key: item.i_category, item.i_class, item.i_item_id, item.i_item_desc, ((((sum(web_sales.ws_ext_sales_price)) * '100'::numeric) / sum((sum(web_sales.ws_ext_sales_price))) OVER (?)))
                     Sort Method:  top-N heapsort  Memory: 209kB
                     ->  WindowAgg (actual time=10420.303..10428.303 rows=3460 loops=1)
                           Partition By: item.i_class
                           ->  Sort (actual time=10420.303..10424.303 rows=3460 loops=1)
                                 Sort Key: item.i_class
                                 Sort Method:  quicksort  Memory: 2607kB
                                 ->  Redistribute Motion 3:3  (slice2; segments: 3) (actual time=10288.299..10420.303 rows=3460 loops=1)
                                       Hash Key: item.i_class
                                       ->  GroupAggregate (actual time=10332.300..10408.302 rows=3152 loops=1)
                                             Group Key: item.i_item_id, item.i_item_desc, item.i_category, item.i_class, item.i_current_price
                                             ->  Sort (actual time=10332.300..10344.301 rows=34669 loops=1)
                                                   Sort Key: item.i_item_id, item.i_item_desc, item.i_category, item.i_class, item.i_current_price
                                                   Sort Method:  quicksort  Memory: 29848kB
                                                   ->  Redistribute Motion 3:3  (slice3; segments: 3) (actual time=84.002..10116.294 rows=34669 loops=1)
                                                         Hash Key: item.i_category, item.i_class, item.i_item_id, item.i_item_desc, item.i_current_price
                                                         ->  Hash Join (actual time=60.002..10020.291 rows=34690 loops=1)
                                                               Hash Cond: (web_sales.ws_item_sk = item.i_item_sk)
                                                               Extra Text: (seg0)   Hash chain length 1.0 avg, 3 max, using 6099 of 262144 buckets.
                                                               ->  Hash Join (actual time=20.001..9580.278 rows=115483 loops=1)
                                                                     Hash Cond: (web_sales.ws_sold_date_sk = date_dim.d_date_sk)
                                                                     Extra Text: (seg1)   Hash chain length 1.0 avg, 1 max, using 31 of 1048576 buckets.
                                                                     ->  Seq Scan on web_sales (actual time=8.000..7048.205 rows=12081030 loops=1)
                                                                     ->  Hash (actual time=0.000..0.000 rows=31 loops=1)
                                                                           Buckets: 1048576  Batches: 1  Memory Usage: 8194kB
                                                                           ->  Broadcast Motion 3:3  (slice4; segments: 3) (actual time=0.000..0.000 rows=31 loops=1)
                                                                                 ->  Seq Scan on date_dim (actual time=4.000..8.000 rows=13 loops=1)
                                                                                       Filter: ((d_date >= '1998-02-03'::date) AND (d_date <= '1998-03-05 00:00:00'::timestamp without time zone))
                                                               ->  Hash (actual time=36.001..36.001 rows=6173 loops=1)
                                                                     Buckets: 262144  Batches: 1  Memory Usage: 3134kB
                                                                     ->  Seq Scan on item (actual time=12.000..28.001 rows=6173 loops=1)
                                                                           Filter: ((i_category)::text = ANY ('{Sports,Men,Jewelry}'::text[]))
 Planning Time: 2.614 ms
   (slice0)    Executor memory: 189K bytes.
   (slice1)    Executor memory: 988K bytes avg x 3x(0) workers, 1111K bytes max (seg2).  Work_mem: 783K bytes max.
   (slice2)    Executor memory: 9205K bytes avg x 3x(0) workers, 9404K bytes max (seg2).  Work_mem: 7814K bytes max.
   (slice3)    Executor memory: 41241K bytes avg x 3x(0) workers, 41241K bytes max (seg1).  Work_mem: 8194K bytes max.
   (slice4)    Executor memory: 6413K bytes avg x 3x(0) workers, 6413K bytes max (seg0).
 Memory used:  1048576kB
 Optimizer: Postgres query optimizer
 Execution Time: 10439.077 ms
(46 rows)

-- q12 new
                                                                                                  QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit (actual time=4588.139..4588.139 rows=100 loops=1)
   ->  Gather Motion 3:1  (slice1; segments: 3) (actual time=4588.139..4588.139 rows=100 loops=1)
         Merge Key: item.i_category, item.i_class, item.i_item_id, item.i_item_desc, ((((sum(web_sales.ws_ext_sales_price)) * '100'::numeric) / sum((sum(web_sales.ws_ext_sales_price))) OVER (?)))
         ->  Limit (actual time=4584.139..4584.139 rows=100 loops=1)
               ->  Sort (actual time=4584.139..4584.139 rows=100 loops=1)
                     Sort Key: item.i_category, item.i_class, item.i_item_id, item.i_item_desc, ((((sum(web_sales.ws_ext_sales_price)) * '100'::numeric) / sum((sum(web_sales.ws_ext_sales_price))) OVER (?)))
                     Sort Method:  top-N heapsort  Memory: 211kB
                     ->  WindowAgg (actual time=4572.138..4580.138 rows=3460 loops=1)
                           Partition By: item.i_class
                           ->  Sort (actual time=4568.138..4572.138 rows=3460 loops=1)
                                 Sort Key: item.i_class
                                 Sort Method:  quicksort  Memory: 2607kB
                                 ->  Redistribute Motion 3:3  (slice2; segments: 3) (actual time=4456.135..4564.138 rows=3460 loops=1)
                                       Hash Key: item.i_class
                                       ->  GroupAggregate (actual time=4452.135..4516.136 rows=3152 loops=1)
                                             Group Key: item.i_item_id, item.i_item_desc, item.i_category, item.i_class, item.i_current_price
                                             ->  Sort (actual time=4452.135..4464.135 rows=34669 loops=1)
                                                   Sort Key: item.i_item_id, item.i_item_desc, item.i_category, item.i_class, item.i_current_price
                                                   Sort Method:  quicksort  Memory: 29848kB
                                                   ->  Redistribute Motion 3:3  (slice3; segments: 3) (actual time=72.002..4256.129 rows=34669 loops=1)
                                                         Hash Key: item.i_category, item.i_class, item.i_item_id, item.i_item_desc, item.i_current_price
                                                         ->  Hash Join (actual time=72.002..4184.126 rows=34690 loops=1)
                                                               Hash Cond: (web_sales.ws_item_sk = item.i_item_sk)
                                                               Extra Text: (seg0)   Hash chain length 1.0 avg, 3 max, using 6099 of 262144 buckets.
                                                               ->  Redistribute Motion 12:3  (slice4; segments: 12) (actual time=32.001..4056.123 rows=115483 loops=1)
                                                                     Hash Key: web_sales.ws_item_sk
                                                                     ->  Hash Join (actual time=48.001..3592.109 rows=28720 loops=1)
                                                                           Hash Cond: (web_sales.ws_sold_date_sk = date_dim.d_date_sk)
                                                                           Extra Text: (seg1)   Hash chain length 1.0 avg, 1 max, using 31 of 1048576 buckets.
                                                                           ->  Parallel Seq Scan on web_sales (actual time=20.001..2060.062 rows=3000000 loops=1)
                                                                           ->  Hash (actual time=12.000..12.000 rows=31 loops=1)
                                                                                 Buckets: 1048576  Batches: 1  Memory Usage: 8194kB
                                                                                 ->  Broadcast Motion 3:12  (slice5; segments: 3) (actual time=0.000..12.000 rows=31 loops=1)
                                                                                       ->  Seq Scan on date_dim (actual time=12.000..20.001 rows=13 loops=1)
                                                                                             Filter: ((d_date >= '1998-02-03'::date) AND (d_date <= '1998-03-05 00:00:00'::timestamp without time zone))
                                                               ->  Hash (actual time=40.001..40.001 rows=6173 loops=1)
                                                                     Buckets: 262144  Batches: 1  Memory Usage: 3134kB
                                                                     ->  Seq Scan on item (actual time=12.000..36.001 rows=6173 loops=1)
                                                                           Filter: ((i_category)::text = ANY ('{Sports,Men,Jewelry}'::text[]))
 Planning Time: 2.708 ms
   (slice0)    Executor memory: 194K bytes.
   (slice1)    Executor memory: 987K bytes avg x 3x(0) workers, 1111K bytes max (seg2).  Work_mem: 783K bytes max.
   (slice2)    Executor memory: 9292K bytes avg x 3x(0) workers, 9491K bytes max (seg2).  Work_mem: 7814K bytes max.
   (slice3)    Executor memory: 23420K bytes avg x 3x(0) workers, 23420K bytes max (seg1).  Work_mem: 3134K bytes max.
   (slice4)    Executor memory: 17895K bytes avg x 12x(0) workers, 17895K bytes max (seg0).  Work_mem: 8194K bytes max.
   (slice5)    Executor memory: 6413K bytes avg x 3x(0) workers, 6413K bytes max (seg0).
 Memory used:  1048576kB
 Optimizer: Postgres query optimizer
 Execution Time: 4613.516 ms
(49 rows)
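When comparing plans like the two above, the end-to-end figure is the "Execution Time" line at the bottom of each EXPLAIN ANALYZE output. A minimal Python sketch for pulling it out and computing the speedup follows; the helper name `execution_time_ms` is hypothetical, and the two strings are just the tails of the q12 plans shown above.

```python
import re

# Matches the summary line printed at the end of EXPLAIN ANALYZE output.
EXEC_TIME = re.compile(r"Execution Time:\s*([\d.]+)\s*ms")

def execution_time_ms(explain_text):
    """Return the total execution time in milliseconds from EXPLAIN ANALYZE text."""
    match = EXEC_TIME.search(explain_text)
    if match is None:
        raise ValueError("no 'Execution Time' line found")
    return float(match.group(1))

old_plan_tail = (
    " Optimizer: Postgres query optimizer\n"
    " Execution Time: 10439.077 ms\n"
    "(46 rows)\n"
)
new_plan_tail = (
    " Optimizer: Postgres query optimizer\n"
    " Execution Time: 4613.516 ms\n"
    "(49 rows)\n"
)

old_ms = execution_time_ms(old_plan_tail)
new_ms = execution_time_ms(new_plan_tail)
speedup = old_ms / new_ms
print(f"q12: {old_ms:.0f} ms -> {new_ms:.0f} ms ({speedup:.2f}x)")
```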

@avamingli
Contributor Author

TPC-DS q20, details:

-- q20 old
                                                                                                      QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit (actual time=21484.455..21484.455 rows=100 loops=1)
   ->  Gather Motion 3:1  (slice1; segments: 3) (actual time=21484.455..21484.455 rows=100 loops=1)
         Merge Key: item.i_category, item.i_class, item.i_item_id, item.i_item_desc, ((((sum(catalog_sales.cs_ext_sales_price)) * '100'::numeric) / sum((sum(catalog_sales.cs_ext_sales_price))) OVER (?)))
         ->  Limit (actual time=21484.455..21484.455 rows=100 loops=1)
               ->  Sort (actual time=21484.455..21484.455 rows=100 loops=1)
                     Sort Key: item.i_category, item.i_class, item.i_item_id, item.i_item_desc, ((((sum(catalog_sales.cs_ext_sales_price)) * '100'::numeric) / sum((sum(catalog_sales.cs_ext_sales_price))) OVER (?)))
                     Sort Method:  top-N heapsort  Memory: 221kB
                     ->  WindowAgg (actual time=21452.455..21472.455 rows=3865 loops=1)
                           Partition By: item.i_class
                           ->  Sort (actual time=21452.455..21452.455 rows=3865 loops=1)
                                 Sort Key: item.i_class
                                 Sort Method:  quicksort  Memory: 2629kB
                                 ->  Redistribute Motion 3:3  (slice2; segments: 3) (actual time=21096.447..21448.455 rows=3865 loops=1)
                                       Hash Key: item.i_class
                                       ->  GroupAggregate (actual time=21268.451..21440.455 rows=3161 loops=1)
                                             Group Key: item.i_item_id, item.i_item_desc, item.i_category, item.i_class, item.i_current_price
                                             ->  Sort (actual time=21268.451..21292.451 rows=71690 loops=1)
                                                   Sort Key: item.i_item_id, item.i_item_desc, item.i_category, item.i_class, item.i_current_price
                                                   Sort Method:  quicksort  Memory: 61365kB
                                                   ->  Redistribute Motion 3:3  (slice3; segments: 3) (actual time=3824.081..20680.438 rows=71690 loops=1)
                                                         Hash Key: item.i_category, item.i_class, item.i_item_id, item.i_item_desc, item.i_current_price
                                                         ->  Hash Join (actual time=3816.081..20460.434 rows=72694 loops=1)
                                                               Hash Cond: (catalog_sales.cs_item_sk = item.i_item_sk)
                                                               Extra Text: (seg1)   Hash chain length 1.0 avg, 3 max, using 6167 of 262144 buckets.
                                                               ->  Hash Join (actual time=3732.079..20264.430 rows=236599 loops=1)
                                                                     Hash Cond: (catalog_sales.cs_sold_date_sk = date_dim.d_date_sk)
                                                                     Extra Text: (seg1)   Hash chain length 1.0 avg, 1 max, using 31 of 1048576 buckets.
                                                                     ->  Seq Scan on catalog_sales (actual time=12.000..14928.316 rows=24158497 loops=1)
                                                                     ->  Hash (actual time=0.000..0.000 rows=31 loops=1)
                                                                           Buckets: 1048576  Batches: 1  Memory Usage: 8194kB
                                                                           ->  Broadcast Motion 3:3  (slice4; segments: 3) (actual time=0.000..0.000 rows=31 loops=1)
                                                                                 ->  Seq Scan on date_dim (actual time=4.000..8.000 rows=12 loops=1)
                                                                                       Filter: ((d_date >= '1999-05-15'::date) AND (d_date <= '1999-06-14 00:00:00'::timestamp without time zone))
                                                               ->  Hash (actual time=80.002..80.002 rows=6252 loops=1)
                                                                     Buckets: 262144  Batches: 1  Memory Usage: 3138kB
                                                                     ->  Seq Scan on item (actual time=24.001..64.001 rows=6252 loops=1)
                                                                           Filter: ((i_category)::text = ANY ('{Books,Music,Home}'::text[]))
 Planning Time: 4.175 ms
   (slice0)    Executor memory: 189K bytes.
   (slice1)    Executor memory: 1044K bytes avg x 3x(0) workers, 1204K bytes max (seg0).  Work_mem: 879K bytes max.
   (slice2)    Executor memory: 18851K bytes avg x 3x(0) workers, 19092K bytes max (seg1).  Work_mem: 15966K bytes max.
   (slice3)    Executor memory: 41230K bytes avg x 3x(0) workers, 41240K bytes max (seg0).  Work_mem: 8194K bytes max.
   (slice4)    Executor memory: 6413K bytes avg x 3x(0) workers, 6413K bytes max (seg0).
 Memory used:  1048576kB
 Optimizer: Postgres query optimizer
 Execution Time: 21487.075 ms
(46 rows)

-- q20 new
                                                                                                      QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit (actual time=8704.184..8704.184 rows=100 loops=1)
   ->  Gather Motion 3:1  (slice1; segments: 3) (actual time=8704.184..8704.184 rows=100 loops=1)
         Merge Key: item.i_category, item.i_class, item.i_item_id, item.i_item_desc, ((((sum(catalog_sales.cs_ext_sales_price)) * '100'::numeric) / sum((sum(catalog_sales.cs_ext_sales_price))) OVER (?)))
         ->  Limit (actual time=8700.184..8700.184 rows=100 loops=1)
               ->  Sort (actual time=8700.184..8700.184 rows=100 loops=1)
                     Sort Key: item.i_category, item.i_class, item.i_item_id, item.i_item_desc, ((((sum(catalog_sales.cs_ext_sales_price)) * '100'::numeric) / sum((sum(catalog_sales.cs_ext_sales_price))) OVER (?)))
                     Sort Method:  top-N heapsort  Memory: 222kB
                     ->  WindowAgg (actual time=8684.184..8696.184 rows=3865 loops=1)
                           Partition By: item.i_class
                           ->  Sort (actual time=8680.184..8680.184 rows=3865 loops=1)
                                 Sort Key: item.i_class
                                 Sort Method:  quicksort  Memory: 2629kB
                                 ->  Redistribute Motion 3:3  (slice2; segments: 3) (actual time=8420.178..8680.184 rows=3865 loops=1)
                                       Hash Key: item.i_class
                                       ->  GroupAggregate (actual time=8428.179..8564.181 rows=3161 loops=1)
                                             Group Key: item.i_item_id, item.i_item_desc, item.i_category, item.i_class, item.i_current_price
                                             ->  Sort (actual time=8428.179..8432.179 rows=71690 loops=1)
                                                   Sort Key: item.i_item_id, item.i_item_desc, item.i_category, item.i_class, item.i_current_price
                                                   Sort Method:  quicksort  Memory: 61365kB
                                                   ->  Redistribute Motion 3:3  (slice3; segments: 3) (actual time=4128.087..8024.170 rows=71690 loops=1)
                                                         Hash Key: item.i_category, item.i_class, item.i_item_id, item.i_item_desc, item.i_current_price
                                                         ->  Hash Join (actual time=4116.087..7780.165 rows=72694 loops=1)
                                                               Hash Cond: (catalog_sales.cs_item_sk = item.i_item_sk)
                                                               Extra Text: (seg1)   Hash chain length 1.0 avg, 3 max, using 6167 of 262144 buckets.
                                                               ->  Redistribute Motion 12:3  (slice4; segments: 12) (actual time=4068.086..7580.161 rows=236599 loops=1)
                                                                     Hash Key: catalog_sales.cs_item_sk
                                                                     ->  Hash Join (actual time=4112.087..5376.114 rows=48209 loops=1)
                                                                           Hash Cond: (catalog_sales.cs_sold_date_sk = date_dim.d_date_sk)
                                                                           Extra Text: (seg1)   Hash chain length 1.0 avg, 1 max, using 31 of 1048576 buckets.
                                                                           ->  Parallel Seq Scan on catalog_sales (actual time=28.001..3356.071 rows=6000000 loops=1)
                                                                           ->  Hash (actual time=0.000..0.000 rows=31 loops=1)
                                                                                 Buckets: 1048576  Batches: 1  Memory Usage: 8194kB
                                                                                 ->  Broadcast Motion 3:12  (slice5; segments: 3) (actual time=0.000..0.000 rows=31 loops=1)
                                                                                       ->  Seq Scan on date_dim (actual time=8.000..12.000 rows=12 loops=1)
                                                                                             Filter: ((d_date >= '1999-05-15'::date) AND (d_date <= '1999-06-14 00:00:00'::timestamp without time zone))
                                                               ->  Hash (actual time=48.001..48.001 rows=6252 loops=1)
                                                                     Buckets: 262144  Batches: 1  Memory Usage: 3138kB
                                                                     ->  Seq Scan on item (actual time=12.000..32.001 rows=6252 loops=1)
                                                                           Filter: ((i_category)::text = ANY ('{Books,Music,Home}'::text[]))
 Planning Time: 2.593 ms
   (slice0)    Executor memory: 194K bytes.
   (slice1)    Executor memory: 1044K bytes avg x 3x(0) workers, 1204K bytes max (seg0).  Work_mem: 879K bytes max.
   (slice2)    Executor memory: 18938K bytes avg x 3x(0) workers, 19179K bytes max (seg1).  Work_mem: 15966K bytes max.
   (slice3)    Executor memory: 23409K bytes avg x 3x(0) workers, 23420K bytes max (seg1).  Work_mem: 3138K bytes max.
   (slice4)    Executor memory: 17894K bytes avg x 12x(0) workers, 17894K bytes max (seg2).  Work_mem: 8194K bytes max.
   (slice5)    Executor memory: 6413K bytes avg x 3x(0) workers, 6413K bytes max (seg0).
 Memory used:  1048576kB
 Optimizer: Postgres query optimizer
 Execution Time: 8723.736 ms
(49 rows)

@avamingli added the "type: Performance" and "planner" labels on Jul 28, 2025
@my-ship-it
Contributor

my-ship-it commented Jul 29, 2025

Do we have test results against TPC-DS compared with ORCA?

@avamingli
Contributor Author

Do we have test results against TPC-DS compared with ORCA?

This feature is implemented in the PG optimizer and does not touch ORCA, so ORCA plans are unchanged. Overall results should be somewhat better than with the previous PG planner, which may narrow the gap with ORCA, although window functions account for a relatively small portion of the entire TPC-DS test. We'll need comprehensive testing to verify.

@avamingli
Contributor Author

TPC-DS q44, details:

-- q44 old
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit (actual time=20508.439..20508.439 rows=0 loops=1)
   ->  Gather Motion 3:1  (slice1; segments: 3) (actual time=20508.439..20508.439 rows=0 loops=1)
         Merge Key: v11.rnk
         ->  Limit (actual time=20496.439..20496.439 rows=0 loops=1)
               ->  Sort (actual time=20496.439..20496.439 rows=0 loops=1)
                     Sort Key: v11.rnk
                     Sort Method:  quicksort  Memory: 75kB
                     ->  Hash Join (actual time=20496.439..20496.439 rows=0 loops=1)
                           Hash Cond: (i2.i_item_sk = v21.item_sk)
                           ->  Seq Scan on item i2 (never executed)
                           ->  Hash (actual time=20476.439..20476.439 rows=0 loops=1)
                                 Buckets: 1048576  Batches: 1  Memory Usage: 8192kB
                                 ->  Redistribute Motion 3:3  (slice2; segments: 3) (actual time=20476.439..20476.439 rows=0 loops=1)
                                       Hash Key: v21.item_sk
                                       ->  Hash Join (actual time=20472.439..20472.439 rows=0 loops=1)
                                             Hash Cond: (i1.i_item_sk = v11.item_sk)
                                             ->  Seq Scan on item i1 (never executed)
                                             ->  Hash (actual time=20440.438..20440.438 rows=0 loops=1)
                                                   Buckets: 1048576  Batches: 1  Memory Usage: 8192kB
                                                   ->  Redistribute Motion 1:3  (slice3; segments: 1) (actual time=20440.438..20440.438 rows=0 loops=1)
                                                         Hash Key: v11.item_sk
                                                         ->  Hash Join (actual time=20448.438..20448.438 rows=0 loops=1)
                                                               Hash Cond: (v11.rnk = v21.rnk)
                                                               ->  Subquery Scan on v11 (actual time=20448.438..20448.438 rows=0 loops=1)
                                                                     Filter: (v11.rnk < 11)
                                                                     ->  WindowAgg (actual time=20448.438..20448.438 rows=0 loops=1)
                                                                           Order By: v1.rank_col
                                                                           ->  Gather Motion 3:1  (slice4; segments: 3) (actual time=20448.438..20448.438 rows=0 loops=1)
                                                                                 Merge Key: v1.rank_col
                                                                                 ->  Sort (actual time=17764.381..17764.381 rows=0 loops=1)
                                                                                       Sort Key: v1.rank_col
                                                                                       Sort Method:  quicksort  Memory: 75kB
                                                                                       ->  Subquery Scan on v1 (actual time=17764.381..17764.381 rows=0 loops=1)
                                                                                             ->  GroupAggregate (actual time=17764.381..17764.381 rows=0 loops=1)
                                                                                                   Group Key: ss1.ss_item_sk
                                                                                                   Filter: (avg(ss1.ss_net_profit) > (0.9 * $0))
                                                                                                   InitPlan 1 (returns $0)  (slice5)
                                                                                                     ->  Gather Motion 12:1  (slice6; segments: 12) (actual time=6348.136..0.000 rows=0 loops=1)
                                                                                                           ->  GroupAggregate (actual time=6336.136..6336.136 rows=0 loops=1)
                                                                                                                 Group Key: store_sales.ss_store_sk
                                                                                                                 ->  Redistribute Motion 12:12  (slice7; segments: 12) (actual time=6336.136..6336.136 rows=0 loops=1)
                                                                                                                       Hash Key: store_sales.ss_store_sk
                                                                                                                       Hash Module: 3
                                                                                                                       ->  Parallel Seq Scan on store_sales (actual time=6332.136..6332.136 rows=0 loops=1)
                                                                                                                             Filter: ((ss_cdemo_sk IS NULL) AND (ss_store_sk = 5))
                                                                                                   ->  Sort (actual time=17764.381..17764.381 rows=0 loops=1)
                                                                                                         Sort Key: ss1.ss_item_sk
                                                                                                         Sort Method:  quicksort  Memory: 75kB
                                                                                                         ->  Seq Scan on store_sales ss1 (actual time=17764.381..17764.381 rows=0 loops=1)
                                                                                                               Filter: (ss_store_sk = 5)
                                                               ->  Hash (never executed)
                                                                     ->  Subquery Scan on v21 (never executed)
                                                                           Filter: (v21.rnk < 11)
                                                                           ->  WindowAgg (never executed)
                                                                                 Order By: v2.rank_col
                                                                                 ->  Gather Motion 3:1  (slice8; segments: 3) (never executed)
                                                                                       Merge Key: v2.rank_col
                                                                                       ->  Sort (actual time=20720.444..20720.444 rows=0 loops=1)
                                                                                             Sort Key: v2.rank_col DESC
                                                                                             Sort Method:  quicksort  Memory: 75kB
                                                                                             ->  Subquery Scan on v2 (actual time=20720.444..20720.444 rows=0 loops=1)
                                                                                                   ->  GroupAggregate (actual time=20720.444..20720.444 rows=0 loops=1)
                                                                                                         Group Key: ss1_1.ss_item_sk
                                                                                                         Filter: (avg(ss1_1.ss_net_profit) > (0.9 * $1))
                                                                                                         InitPlan 2 (returns $1)  (slice9)
                                                                                                           ->  Gather Motion 12:1  (slice10; segments: 12) (actual time=6680.143..0.000 rows=0 loops=1)
                                                                                                                 ->  GroupAggregate (actual time=6676.143..6676.143 rows=0 loops=1)
                                                                                                                       Group Key: store_sales_1.ss_store_sk
                                                                                                                       ->  Redistribute Motion 12:12  (slice11; segments: 12) (actual time=6676.143..6676.143 rows=0 loops=1)
                                                                                                                             Hash Key: store_sales_1.ss_store_sk
                                                                                                                             Hash Module: 3
                                                                                                                             ->  Parallel Seq Scan on store_sales store_sales_1 (actual time=4988.107..4988.107 rows=0 loops=1)
                                                                                                                                   Filter: ((ss_cdemo_sk IS NULL) AND (ss_store_sk = 5))
                                                                                                         ->  Sort (actual time=20720.444..20720.444 rows=0 loops=1)
                                                                                                               Sort Key: ss1_1.ss_item_sk
                                                                                                               Sort Method:  quicksort  Memory: 75kB
                                                                                                               ->  Seq Scan on store_sales ss1_1 (actual time=20720.444..20720.444 rows=0 loops=1)
                                                                                                                     Filter: (ss_store_sk = 5)
 Planning Time: 6.328 ms
   (slice0)    Executor memory: 390K bytes.
   (slice1)    Executor memory: 8225K bytes avg x 3x(0) workers, 8225K bytes max (seg0).  Work_mem: 8192K bytes max.
   (slice2)    Executor memory: 8221K bytes avg x 3x(0) workers, 8221K bytes max (seg0).  Work_mem: 8192K bytes max.
   (slice3)    Executor memory: 137K bytes (seg1).
   (slice4)    Executor memory: 9622K bytes avg x 3x(0) workers, 9622K bytes max (seg1).
   (slice5)    Executor memory: 271K bytes.
 _ (slice6)    Workers: Workers: 12 not dispatched;.
 Executor memory: 81K bytes avg x 12x(0) workers, 123K bytes max (seg0).
 _ (slice7)    Workers: Workers: 12 not dispatched;.
 Executor memory: 9642K bytes avg x 12x(0) workers, 9643K bytes max (seg0).
   (slice8)    Executor memory: 9622K bytes avg x 3x(0) workers, 9622K bytes max (seg1).
   (slice9)    Executor memory: 330K bytes.
 _ (slice10)   Workers: Workers: 12 not dispatched;.
 Executor memory: 107K bytes avg x 12x(0) workers, 123K bytes max (seg0).
 _ (slice11)   Workers: Workers: 12 not dispatched;.
 Executor memory: 9642K bytes avg x 12x(0) workers, 9643K bytes max (seg0).
 Memory used:  1048576kB
 Optimizer: Postgres query optimizer
 Execution Time: 33816.747 ms
(98 rows)


-- q44 new
                                                                                                     QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit (actual time=8072.172..8072.172 rows=0 loops=1)
   ->  Gather Motion 3:1  (slice1; segments: 3) (actual time=8072.172..8072.172 rows=0 loops=1)
         Merge Key: v11.rnk
         ->  Limit (actual time=8072.172..8072.172 rows=0 loops=1)
               ->  Sort (actual time=8072.172..8072.172 rows=0 loops=1)
                     Sort Key: v11.rnk
                     Sort Method:  quicksort  Memory: 75kB
                     ->  Hash Join (actual time=8072.172..8072.172 rows=0 loops=1)
                           Hash Cond: (v21.item_sk = i2.i_item_sk)
                           ->  Redistribute Motion 3:3  (slice2; segments: 3) (actual time=8036.171..8036.171 rows=0 loops=1)
                                 Hash Key: v21.item_sk
                                 ->  Hash Join (actual time=8060.172..8060.172 rows=0 loops=1)
                                       Hash Cond: (v11.item_sk = i1.i_item_sk)
                                       ->  Redistribute Motion 1:3  (slice3; segments: 1) (actual time=7996.171..7996.171 rows=0 loops=1)
                                             Hash Key: v11.item_sk
                                             ->  Parallel Hash Join (actual time=8048.172..8048.172 rows=0 loops=1)
                                                   Hash Cond: (v11.rnk = v21.rnk)
                                                   ->  Subquery Scan on v11 (actual time=8048.172..8048.172 rows=0 loops=1)
                                                         Filter: (v11.rnk < 11)
                                                         ->  WindowAgg (actual time=8048.172..8048.172 rows=0 loops=1)
                                                               Order By: v1.rank_col
                                                               ->  Gather Motion 12:1  (slice4; segments: 12) (actual time=8048.172..8048.172 rows=0 loops=1)
                                                                     Merge Key: v1.rank_col
                                                                     ->  Sort (actual time=8044.172..8044.172 rows=0 loops=1)
                                                                           Sort Key: v1.rank_col
                                                                           Sort Method:  quicksort  Memory: 300kB
                                                                           ->  Subquery Scan on v1 (actual time=8036.171..8036.171 rows=0 loops=1)
                                                                                 ->  GroupAggregate (actual time=8036.171..8036.171 rows=0 loops=1)
                                                                                       Group Key: ss1.ss_item_sk
                                                                                       Filter: (avg(ss1.ss_net_profit) > (0.9 * $0))
                                                                                       InitPlan 1 (returns $0)  (slice6)
                                                                                         ->  Gather Motion 12:1  (slice7; segments: 12) (actual time=6816.145..0.000 rows=0 loops=1)
                                                                                               ->  GroupAggregate (actual time=6808.145..6808.145 rows=0 loops=1)
                                                                                                     Group Key: store_sales.ss_store_sk
                                                                                                     ->  Redistribute Motion 12:12  (slice8; segments: 12) (actual time=6808.145..6808.145 rows=0 loops=1)
                                                                                                           Hash Key: store_sales.ss_store_sk
                                                                                                           Hash Module: 3
                                                                                                           ->  Parallel Seq Scan on store_sales (actual time=5988.128..5988.128 rows=0 loops=1)
                                                                                                                 Filter: ((ss_cdemo_sk IS NULL) AND (ss_store_sk = 5))
                                                                                       ->  Sort (actual time=8036.171..8036.171 rows=0 loops=1)
                                                                                             Sort Key: ss1.ss_item_sk
                                                                                             Sort Method:  quicksort  Memory: 300kB
                                                                                             ->  Redistribute Motion 12:12  (slice5; segments: 12) (actual time=8036.171..8036.171 rows=0 loops=1)
                                                                                                   Hash Key: ss1.ss_item_sk
                                                                                                   Hash Module: 3
                                                                                                   ->  Parallel Seq Scan on store_sales ss1 (actual time=7916.169..7916.169 rows=0 loops=1)
                                                                                                         Filter: (ss_store_sk = 5)
                                                   ->  Parallel Hash (never executed)
                                                         ->  Subquery Scan on v21 (never executed)
                                                               Filter: (v21.rnk < 11)
                                                               ->  WindowAgg (never executed)
                                                                     Order By: v2.rank_col
                                                                     ->  Gather Motion 12:1  (slice9; segments: 12) (never executed)
                                                                           Merge Key: v2.rank_col
                                                                           ->  Sort (actual time=7912.169..7912.169 rows=0 loops=1)
                                                                                 Sort Key: v2.rank_col DESC
                                                                                 Sort Method:  quicksort  Memory: 300kB
                                                                                 ->  Subquery Scan on v2 (actual time=7912.169..7912.169 rows=0 loops=1)
                                                                                       ->  GroupAggregate (actual time=7912.169..7912.169 rows=0 loops=1)
                                                                                             Group Key: ss1_1.ss_item_sk
                                                                                             Filter: (avg(ss1_1.ss_net_profit) > (0.9 * $1))
                                                                                             InitPlan 2 (returns $1)  (slice11)
                                                                                               ->  Gather Motion 12:1  (slice12; segments: 12) (actual time=7488.160..0.000 rows=0 loops=1)
                                                                                                     ->  GroupAggregate (actual time=7484.160..7484.160 rows=0 loops=1)
                                                                                                           Group Key: store_sales_1.ss_store_sk
                                                                                                           ->  Redistribute Motion 12:12  (slice13; segments: 12) (actual time=7484.160..7484.160 rows=0 loops=1)
                                                                                                                 Hash Key: store_sales_1.ss_store_sk
                                                                                                                 Hash Module: 3
                                                                                                                 ->  Parallel Seq Scan on store_sales store_sales_1 (actual time=6832.146..6832.146 rows=0 loops=1)
                                                                                                                       Filter: ((ss_cdemo_sk IS NULL) AND (ss_store_sk = 5))
                                                                                             ->  Sort (actual time=7912.169..7912.169 rows=0 loops=1)
                                                                                                   Sort Key: ss1_1.ss_item_sk
                                                                                                   Sort Method:  quicksort  Memory: 300kB
                                                                                                   ->  Redistribute Motion 12:12  (slice10; segments: 12) (actual time=7912.169..7912.169 rows=0 loops=1)
                                                                                                         Hash Key: ss1_1.ss_item_sk
                                                                                                         Hash Module: 3
                                                                                                         ->  Parallel Seq Scan on store_sales ss1_1 (actual time=7428.158..7428.158 rows=0 loops=1)
                                                                                                               Filter: (ss_store_sk = 5)
                                       ->  Hash (actual time=44.001..44.001 rows=20772 loops=1)
                                             Buckets: 524288  Batches: 1  Memory Usage: 5266kB
                                             ->  Seq Scan on item i1 (actual time=4.000..20.000 rows=20772 loops=1)
                           ->  Hash (actual time=60.001..60.001 rows=20772 loops=1)
                                 Buckets: 524288  Batches: 1  Memory Usage: 5266kB
                                 ->  Seq Scan on item i2 (actual time=4.000..40.001 rows=20772 loops=1)
 Planning Time: 4.783 ms
   (slice0)    Executor memory: 403K bytes.
   (slice1)    Executor memory: 11895K bytes avg x 3x(0) workers, 11895K bytes max (seg2).  Work_mem: 5266K bytes max.
   (slice2)    Executor memory: 11890K bytes avg x 3x(0) workers, 11891K bytes max (seg2).  Work_mem: 5266K bytes max.
   (slice3)    Executor memory: 138K bytes (seg1).
   (slice4)    Executor memory: 268K bytes avg x 12x(0) workers, 268K bytes max (seg0).
   (slice5)    Executor memory: 9639K bytes avg x 12x(0) workers, 9639K bytes max (seg1).
   (slice6)    Executor memory: 277K bytes.
 _ (slice7)    Workers: Workers: 12 not dispatched;.
 Executor memory: 92K bytes avg x 12x(0) workers, 124K bytes max (seg0).
 _ (slice8)    Workers: Workers: 12 not dispatched;.
 Executor memory: 9643K bytes avg x 12x(0) workers, 9643K bytes max (seg0).
   (slice9)    Executor memory: 268K bytes avg x 12x(0) workers, 268K bytes max (seg0).
   (slice10)   Executor memory: 9639K bytes avg x 12x(0) workers, 9639K bytes max (seg1).
   (slice11)   Executor memory: 338K bytes.
 _ (slice12)   Workers: Workers: 12 not dispatched;.
 Executor memory: 108K bytes avg x 12x(0) workers, 124K bytes max (seg0).
 _ (slice13)   Workers: Workers: 12 not dispatched;.
 Executor memory: 9643K bytes avg x 12x(0) workers, 9643K bytes max (seg0).
 Memory used:  1048576kB
 Optimizer: Postgres query optimizer
 Execution Time: 22515.025 ms
(106 rows)
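
For a rough comparison of the two q44 runs, here is a minimal sketch that turns the two `Execution Time` lines into a speedup figure. It assumes the plan ending just above `-- q44 new` is the q44 run without the patch. The helper name `compare_runs` is made up for illustration and is not part of this PR. Both timings are single runs, so the result is indicative only.

```python
def compare_runs(old_ms: float, new_ms: float) -> tuple[float, float]:
    """Return (speedup factor, fractional reduction in wall time)."""
    if old_ms <= 0 or new_ms <= 0:
        raise ValueError("execution times must be positive")
    return old_ms / new_ms, 1.0 - new_ms / old_ms


# "Execution Time" values copied from the two q44 plans above (milliseconds).
Q44_OLD_MS = 33816.747  # assumed: plan without the patch, WindowAgg fed by Gather Motion 3:1
Q44_NEW_MS = 22515.025  # plan with the patch, WindowAgg fed by Gather Motion 12:1

speedup, reduction = compare_runs(Q44_OLD_MS, Q44_NEW_MS)
print(f"q44 speedup: {speedup:.2f}x, time reduction: {reduction:.1%}")
```

On these numbers q44 comes out at roughly 1.5x faster, about a third less wall time. Most of the difference appears to come from the `store_sales` scans under the GroupAggregate nodes, which run as `Parallel Seq Scan` plus `Redistribute Motion 12:12` in the new plan instead of a plain `Seq Scan`.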

@avamingli
Contributor Author

Q49 old

                                                                                         QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit (actual time=60021.352..60021.352 rows=78 loops=1)
   ->  Sort (actual time=60021.352..60021.352 rows=78 loops=1)
         Sort Key: ('web'::text), web.return_rank, web.currency_rank
         Sort Method:  quicksort  Memory: 31kB
         ->  HashAggregate (actual time=60021.352..60021.352 rows=78 loops=1)
               Group Key: ('web'::text), web.item, web.return_ratio, web.return_rank, web.currency_rank
               Batches: 1  Memory Usage: 48kB
               Extra Text: hash table(s): 1; chain length 2.3 avg, 4 max; using 78 of 128 buckets; total 3 expansions.

               ->  Append (actual time=15556.351..60021.352 rows=78 loops=1)
                     ->  Subquery Scan on web (actual time=15556.351..15560.351 rows=28 loops=1)
                           Filter: ((web.return_rank <= 10) OR (web.currency_rank <= 10))
                           Rows Removed by Filter: 884
                           ->  WindowAgg (actual time=15556.351..15560.351 rows=912 loops=1)
                                 Order By: in_web.return_ratio
                                 ->  Sort (actual time=15556.351..15556.351 rows=912 loops=1)
                                       Sort Key: in_web.return_ratio
                                       Sort Method:  quicksort  Memory: 103kB
                                       ->  WindowAgg (actual time=15552.351..15556.351 rows=912 loops=1)
                                             Order By: in_web.currency_ratio
                                             ->  Gather Motion 3:1  (slice1; segments: 3) (actual time=15552.351..15552.351 rows=912 loops=1)
                                                   Merge Key: in_web.currency_ratio
                                                   ->  Sort (actual time=14908.337..14908.337 rows=323 loops=1)
                                                         Sort Key: in_web.currency_ratio
                                                         Sort Method:  quicksort  Memory: 149kB
                                                         ->  Subquery Scan on in_web (actual time=14904.337..14904.337 rows=323 loops=1)
                                                               ->  GroupAggregate (actual time=14904.337..14904.337 rows=323 loops=1)
                                                                     Group Key: ws.ws_item_sk
                                                                     ->  Sort (actual time=14904.337..14904.337 rows=329 loops=1)
                                                                           Sort Key: ws.ws_item_sk
                                                                           Sort Method:  quicksort  Memory: 153kB
                                                                           ->  Hash Join (actual time=784.018..14896.336 rows=329 loops=1)
                                                                                 Hash Cond: ((ws.ws_order_number = wr.wr_order_number) AND (ws.ws_item_sk = wr.wr_item_sk))
                                                                                 Extra Text: (seg2)   Hash chain length 1.0 avg, 3 max, using 9637 of 262144 buckets.
                                                                                 ->  Hash Join (actual time=28.001..14336.324 rows=160062 loops=1)
                                                                                       Hash Cond: (ws.ws_sold_date_sk = date_dim.d_date_sk)
                                                                                       Extra Text: (seg0)   Hash chain length 1.0 avg, 1 max, using 31 of 524288 buckets.
                                                                                       ->  Seq Scan on web_sales ws (actual time=24.001..13304.300 rows=4698728 loops=1)
                                                                                             Filter: ((ws_net_profit > '1'::numeric) AND (ws_net_paid > '0'::numeric) AND (ws_quantity > 0))
                                                                                       ->  Hash (actual time=0.000..0.000 rows=31 loops=1)
                                                                                             Buckets: 524288  Batches: 1  Memory Usage: 4098kB
                                                                                             ->  Broadcast Motion 3:3  (slice2; segments: 3) (actual time=0.000..0.000 rows=31 loops=1)
                                                                                                   ->  Seq Scan on date_dim (actual time=8.000..12.000 rows=13 loops=1)
                                                                                                         Filter: ((d_year = 2000) AND (d_moy = 12))
                                                                                 ->  Hash (actual time=760.017..760.017 rows=10091 loops=1)
                                                                                       Buckets: 262144  Batches: 1  Memory Usage: 2640kB
                                                                                       ->  Seq Scan on web_returns wr (actual time=12.000..752.017 rows=10091 loops=1)
                                                                                             Filter: (wr_return_amt > '10000'::numeric)
                     ->  Subquery Scan on catalog (actual time=18048.407..18056.407 rows=29 loops=1)
                           Filter: ((catalog.return_rank <= 10) OR (catalog.currency_rank <= 10))
                           Rows Removed by Filter: 1728
                           ->  WindowAgg (actual time=18048.407..18056.407 rows=1757 loops=1)
                                 Order By: in_cat.return_ratio
                                 ->  Sort (actual time=18048.407..18048.407 rows=1757 loops=1)
                                       Sort Key: in_cat.return_ratio
                                       Sort Method:  quicksort  Memory: 200kB
                                       ->  WindowAgg (actual time=18036.406..18040.407 rows=1757 loops=1)
                                             Order By: in_cat.currency_ratio
                                             ->  Gather Motion 3:1  (slice3; segments: 3) (actual time=18036.406..18036.406 rows=1757 loops=1)
                                                   Merge Key: in_cat.currency_ratio
                                                   ->  Sort (actual time=33596.758..33596.758 rows=606 loops=1)
                                                         Sort Key: in_cat.currency_ratio
                                                         Sort Method:  quicksort  Memory: 218kB
                                                         ->  Subquery Scan on in_cat (actual time=33592.758..33596.758 rows=606 loops=1)
                                                               ->  GroupAggregate (actual time=33592.758..33596.758 rows=606 loops=1)
                                                                     Group Key: cs.cs_item_sk
                                                                     ->  Sort (actual time=33592.758..33592.758 rows=627 loops=1)
                                                                           Sort Key: cs.cs_item_sk
                                                                           Sort Method:  quicksort  Memory: 229kB
                                                                           ->  Hash Join (actual time=7564.171..33592.758 rows=627 loops=1)
                                                                                 Hash Cond: ((cs.cs_order_number = cr.cr_order_number) AND (cs.cs_item_sk = cr.cr_item_sk))
                                                                                 Extra Text: (seg0)   Hash chain length 1.0 avg, 4 max, using 19910 of 262144 buckets.
                                                                                 ->  Hash Join (actual time=6028.136..31924.720 rows=316758 loops=1)
                                                                                       Hash Cond: (cs.cs_sold_date_sk = date_dim_1.d_date_sk)
                                                                                       Extra Text: (seg0)   Hash chain length 1.0 avg, 1 max, using 31 of 524288 buckets.
                                                                                       ->  Seq Scan on catalog_sales cs (actual time=28.001..26060.588 rows=9329378 loops=1)
                                                                                             Filter: ((cs_net_profit > '1'::numeric) AND (cs_net_paid > '0'::numeric) AND (cs_quantity > 0))
                                                                                       ->  Hash (actual time=0.000..0.000 rows=31 loops=1)
                                                                                             Buckets: 524288  Batches: 1  Memory Usage: 4098kB
                                                                                             ->  Broadcast Motion 3:3  (slice4; segments: 3) (actual time=0.000..0.000 rows=31 loops=1)
                                                                                                   ->  Seq Scan on date_dim date_dim_1 (actual time=8.000..12.000 rows=13 loops=1)
                                                                                                         Filter: ((d_year = 2000) AND (d_moy = 12))
                                                                                 ->  Hash (actual time=1524.034..1524.034 rows=20673 loops=1)
                                                                                       Buckets: 262144  Batches: 1  Memory Usage: 3260kB
                                                                                       ->  Seq Scan on catalog_returns cr (actual time=20.000..1512.034 rows=20673 loops=1)
                                                                                             Filter: (cr_return_amount > '10000'::numeric)
                     ->  Subquery Scan on store (actual time=26400.593..26404.593 rows=21 loops=1)
                           Filter: ((store.return_rank <= 10) OR (store.currency_rank <= 10))
                           Rows Removed by Filter: 612
                           ->  WindowAgg (actual time=26400.593..26404.593 rows=633 loops=1)
                                 Order By: in_store.return_ratio
                                 ->  Sort (actual time=26400.593..26400.593 rows=633 loops=1)
                                       Sort Key: in_store.return_ratio
                                       Sort Method:  quicksort  Memory: 79kB
                                       ->  WindowAgg (actual time=26392.593..26400.593 rows=633 loops=1)
                                             Order By: in_store.currency_ratio
                                             ->  Gather Motion 3:1  (slice5; segments: 3) (actual time=26392.593..26392.593 rows=633 loops=1)
                                                   Merge Key: in_store.currency_ratio
                                                   ->  Sort (actual time=52169.175..52169.175 rows=226 loops=1)
                                                         Sort Key: in_store.currency_ratio
                                                         Sort Method:  quicksort  Memory: 125kB
                                                         ->  Subquery Scan on in_store (actual time=52169.175..52169.175 rows=226 loops=1)
                                                               ->  GroupAggregate (actual time=52169.175..52169.175 rows=226 loops=1)
                                                                     Group Key: sts.ss_item_sk
                                                                     ->  Sort (actual time=52169.175..52169.175 rows=231 loops=1)
                                                                           Sort Key: sts.ss_item_sk
                                                                           Sort Method:  quicksort  Memory: 129kB
                                                                           ->  Hash Join (actual time=3140.071..52165.175 rows=231 loops=1)
                                                                                 Hash Cond: ((sts.ss_ticket_number = sr.sr_ticket_number) AND (sts.ss_item_sk = sr.sr_item_sk))
                                                                                 Extra Text: (seg0)   Hash chain length 1.0 avg, 3 max, using 7664 of 262144 buckets.
                                                                                 ->  Hash Join (actual time=28.001..48901.102 rows=390494 loops=1)
                                                                                       Hash Cond: (sts.ss_sold_date_sk = date_dim_2.d_date_sk)
                                                                                       Extra Text: (seg0)   Hash chain length 1.0 avg, 1 max, using 31 of 524288 buckets.
                                                                                       ->  Seq Scan on store_sales sts (actual time=28.001..51581.161 rows=11508189 loops=1)
                                                                                             Filter: ((ss_net_profit > '1'::numeric) AND (ss_net_paid > '0'::numeric) AND (ss_quantity > 0))
                                                                                       ->  Hash (actual time=0.000..0.000 rows=31 loops=1)
                                                                                             Buckets: 524288  Batches: 1  Memory Usage: 4098kB
                                                                                             ->  Broadcast Motion 3:3  (slice6; segments: 3) (actual time=0.000..0.000 rows=31 loops=1)
                                                                                                   ->  Seq Scan on date_dim date_dim_2 (actual time=12.000..16.000 rows=13 loops=1)
                                                                                                         Filter: ((d_year = 2000) AND (d_moy = 12))
                                                                                 ->  Hash (actual time=4744.107..4744.107 rows=7997 loops=1)
                                                                                       Buckets: 262144  Batches: 1  Memory Usage: 2517kB
                                                                                       ->  Seq Scan on store_returns sr (actual time=28.001..4736.107 rows=7997 loops=1)
                                                                                             Filter: (sr_return_amt > '10000'::numeric)
 Planning Time: 8.318 ms
 * (slice0)    Executor memory: 560K bytes.  Work_mem: 110K bytes max, 48K bytes wanted.
   (slice1)    Executor memory: 38711K bytes avg x 3x(0) workers, 38711K bytes max (seg1).  Work_mem: 4098K bytes max.
   (slice2)    Executor memory: 9556K bytes avg x 3x(0) workers, 9556K bytes max (seg0).
   (slice3)    Executor memory: 39388K bytes avg x 3x(0) workers, 39388K bytes max (seg2).  Work_mem: 4098K bytes max.
   (slice4)    Executor memory: 9556K bytes avg x 3x(0) workers, 9556K bytes max (seg0).
   (slice5)    Executor memory: 38564K bytes avg x 3x(0) workers, 38564K bytes max (seg1).  Work_mem: 4098K bytes max.
   (slice6)    Executor memory: 9556K bytes avg x 3x(0) workers, 9556K bytes max (seg0).
 Memory used:  1048576kB
 Memory wanted:  4304kB
 Optimizer: Postgres query optimizer
 Execution Time: 60039.451 ms
(136 rows)

Q49 new

                                                                                               QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Limit (actual time=28468.665..28468.665 rows=78 loops=1)
   ->  Sort (actual time=28468.665..28468.665 rows=78 loops=1)
         Sort Key: ('web'::text), web.return_rank, web.currency_rank
         Sort Method:  quicksort  Memory: 31kB
         ->  HashAggregate (actual time=28468.665..28468.665 rows=78 loops=1)
               Group Key: ('web'::text), web.item, web.return_ratio, web.return_rank, web.currency_rank
               Batches: 1  Memory Usage: 48kB
               Extra Text: hash table(s): 1; chain length 2.3 avg, 4 max; using 78 of 128 buckets; total 3 expansions.

               ->  Append (actual time=13776.322..28468.665 rows=78 loops=1)
                     ->  Subquery Scan on web (actual time=13776.322..13780.322 rows=28 loops=1)
                           Filter: ((web.return_rank <= 10) OR (web.currency_rank <= 10))
                           Rows Removed by Filter: 884
                           ->  WindowAgg (actual time=13776.322..13780.322 rows=912 loops=1)
                                 Order By: in_web.return_ratio
                                 ->  Sort (actual time=13776.322..13776.322 rows=912 loops=1)
                                       Sort Key: in_web.return_ratio
                                       Sort Method:  quicksort  Memory: 103kB
                                       ->  WindowAgg (actual time=13772.322..13776.322 rows=912 loops=1)
                                             Order By: in_web.currency_ratio
                                             ->  Gather Motion 12:1  (slice1; segments: 12) (actual time=13772.322..13772.322 rows=912 loops=1)
                                                   Merge Key: in_web.currency_ratio
                                                   ->  Sort (actual time=13732.321..13732.321 rows=72 loops=1)
                                                         Sort Key: in_web.currency_ratio
                                                         Sort Method:  quicksort  Memory: 367kB
                                                         ->  Subquery Scan on in_web (actual time=13732.321..13732.321 rows=72 loops=1)
                                                               ->  GroupAggregate (actual time=13732.321..13732.321 rows=72 loops=1)
                                                                     Group Key: ws.ws_item_sk
                                                                     ->  Sort (actual time=13732.321..13732.321 rows=72 loops=1)
                                                                           Sort Key: ws.ws_item_sk
                                                                           Sort Method:  quicksort  Memory: 375kB
                                                                           ->  Redistribute Motion 12:12  (slice2; segments: 12) (actual time=13688.320..13732.321 rows=72 loops=1)
                                                                                 Hash Key: ws.ws_item_sk
                                                                                 Hash Module: 3
                                                                                 ->  Parallel Hash Join (actual time=12880.301..13076.306 rows=74 loops=1)
                                                                                       Hash Cond: ((wr.wr_order_number = ws.ws_order_number) AND (wr.wr_item_sk = ws.ws_item_sk))
                                                                                       ->  Parallel Seq Scan on web_returns wr (actual time=4.000..220.005 rows=2432 loops=1)
                                                                                             Filter: (wr_return_amt > '10000'::numeric)
                                                                                       ->  Parallel Hash (actual time=13484.316..13484.316 rows=40145 loops=1)
                                                                                             Buckets: 1048576  Batches: 1  Memory Usage: 17120kB
                                                                                             ->  Hash Join (actual time=144.003..12856.301 rows=40145 loops=1)
                                                                                                   Hash Cond: (ws.ws_sold_date_sk = date_dim.d_date_sk)
                                                                                                   Extra Text: (seg0)   Hash chain length 1.0 avg, 1 max, using 31 of 524288 buckets.
                                                                                                   ->  Parallel Seq Scan on web_sales ws (actual time=196.005..6116.143 rows=1167817 loops=1)
                                                                                                         Filter: ((ws_net_profit > '1'::numeric) AND (ws_net_paid > '0'::numeric) AND (ws_quantity > 0))
                                                                                                   ->  Hash (actual time=0.000..0.000 rows=31 loops=1)
                                                                                                         Buckets: 524288  Batches: 1  Memory Usage: 4098kB
                                                                                                         ->  Broadcast Motion 3:12  (slice3; segments: 3) (actual time=0.000..0.000 rows=31 loops=1)
                                                                                                               ->  Seq Scan on date_dim (actual time=4.000..4.000 rows=13 loops=1)
                                                                                                                     Filter: ((d_year = 2000) AND (d_moy = 12))
                     ->  Subquery Scan on catalog (actual time=6868.160..6888.161 rows=29 loops=1)
                           Filter: ((catalog.return_rank <= 10) OR (catalog.currency_rank <= 10))
                           Rows Removed by Filter: 1728
                           ->  WindowAgg (actual time=6868.160..6888.161 rows=1757 loops=1)
                                 Order By: in_cat.return_ratio
                                 ->  Sort (actual time=6868.160..6868.160 rows=1757 loops=1)
                                       Sort Key: in_cat.return_ratio
                                       Sort Method:  quicksort  Memory: 200kB
                                       ->  WindowAgg (actual time=6852.160..6864.160 rows=1757 loops=1)
                                             Order By: in_cat.currency_ratio
                                             ->  Gather Motion 12:1  (slice4; segments: 12) (actual time=6852.160..6860.160 rows=1757 loops=1)
                                                   Merge Key: in_cat.currency_ratio
                                                   ->  Sort (actual time=20612.482..20612.482 rows=166 loops=1)
                                                         Sort Key: in_cat.currency_ratio
                                                         Sort Method:  quicksort  Memory: 438kB
                                                         ->  Subquery Scan on in_cat (actual time=20612.482..20612.482 rows=166 loops=1)
                                                               ->  GroupAggregate (actual time=20612.482..20612.482 rows=166 loops=1)
                                                                     Group Key: cs.cs_item_sk
                                                                     ->  Sort (actual time=20612.482..20612.482 rows=173 loops=1)
                                                                           Sort Key: cs.cs_item_sk
                                                                           Sort Method:  quicksort  Memory: 450kB
                                                                           ->  Redistribute Motion 12:12  (slice5; segments: 12) (actual time=19332.452..20612.482 rows=173 loops=1)
                                                                                 Hash Key: cs.cs_item_sk
                                                                                 Hash Module: 3
                                                                                 ->  Parallel Hash Join (actual time=19212.449..19300.451 rows=109 loops=1)
                                                                                       Hash Cond: ((cr.cr_order_number = cs.cs_order_number) AND (cr.cr_item_sk = cs.cs_item_sk))
                                                                                       ->  Parallel Seq Scan on catalog_returns cr (actual time=8.000..372.009 rows=5336 loops=1)
                                                                                             Filter: (cr_return_amount > '10000'::numeric)
                                                                                       ->  Parallel Hash (actual time=19604.458..19604.458 rows=82094 loops=1)
                                                                                             Buckets: 1048576  Batches: 1  Memory Usage: 25824kB
                                                                                             ->  Hash Join (actual time=15072.353..18756.439 rows=82094 loops=1)
                                                                                                   Hash Cond: (cs.cs_sold_date_sk = date_dim_1.d_date_sk)
                                                                                                   Extra Text: (seg0)   Hash chain length 1.0 avg, 1 max, using 31 of 524288 buckets.
                                                                                                   ->  Parallel Seq Scan on catalog_sales cs (actual time=52.001..10884.255 rows=2339330 loops=1)
                                                                                                         Filter: ((cs_net_profit > '1'::numeric) AND (cs_net_paid > '0'::numeric) AND (cs_quantity > 0))
                                                                                                   ->  Hash (actual time=0.000..0.000 rows=31 loops=1)
                                                                                                         Buckets: 524288  Batches: 1  Memory Usage: 4098kB
                                                                                                         ->  Broadcast Motion 3:12  (slice6; segments: 3) (actual time=0.000..0.000 rows=31 loops=1)
                                                                                                               ->  Seq Scan on date_dim date_dim_1 (actual time=4.000..4.000 rows=13 loops=1)
                                                                                                                     Filter: ((d_year = 2000) AND (d_moy = 12))
                     ->  Subquery Scan on store (actual time=7796.182..7796.182 rows=21 loops=1)
                           Filter: ((store.return_rank <= 10) OR (store.currency_rank <= 10))
                           Rows Removed by Filter: 612
                           ->  WindowAgg (actual time=7796.182..7796.182 rows=633 loops=1)
                                 Order By: in_store.return_ratio
                                 ->  Sort (actual time=7796.182..7796.182 rows=633 loops=1)
                                       Sort Key: in_store.return_ratio
                                       Sort Method:  quicksort  Memory: 79kB
                                       ->  WindowAgg (actual time=7796.182..7796.182 rows=633 loops=1)
                                             Order By: in_store.currency_ratio
                                             ->  Gather Motion 12:1  (slice7; segments: 12) (actual time=7796.182..7796.182 rows=633 loops=1)
                                                   Merge Key: in_store.currency_ratio
                                                   ->  Sort (actual time=28448.665..28448.665 rows=60 loops=1)
                                                         Sort Key: in_store.currency_ratio
                                                         Sort Method:  quicksort  Memory: 346kB
                                                         ->  Subquery Scan on in_store (actual time=28448.665..28448.665 rows=60 loops=1)
                                                               ->  GroupAggregate (actual time=28448.665..28448.665 rows=60 loops=1)
                                                                     Group Key: sts.ss_item_sk
                                                                     ->  Sort (actual time=28448.665..28448.665 rows=60 loops=1)
                                                                           Sort Key: sts.ss_item_sk
                                                                           Sort Method:  quicksort  Memory: 351kB
                                                                           ->  Redistribute Motion 12:12  (slice8; segments: 12) (actual time=25280.591..28448.665 rows=60 loops=1)
                                                                                 Hash Key: sts.ss_item_sk
                                                                                 Hash Module: 3
                                                                                 ->  Parallel Hash Join (actual time=26736.625..27640.646 rows=67 loops=1)
                                                                                       Hash Cond: ((sr.sr_ticket_number = sts.ss_ticket_number) AND (sr.sr_item_sk = sts.ss_item_sk))
                                                                                       ->  Parallel Seq Scan on store_returns sr (actual time=12.000..880.021 rows=1819 loops=1)
                                                                                             Filter: (sr_return_amt > '10000'::numeric)
                                                                                       ->  Parallel Hash (actual time=26716.624..26716.624 rows=96830 loops=1)
                                                                                             Buckets: 1048576  Batches: 1  Memory Usage: 29760kB
                                                                                             ->  Hash Join (actual time=136.003..23280.544 rows=96830 loops=1)
                                                                                                   Hash Cond: (sts.ss_sold_date_sk = date_dim_2.d_date_sk)
                                                                                                   Extra Text: (seg0)   Hash chain length 1.0 avg, 1 max, using 31 of 524288 buckets.
                                                                                                   ->  Parallel Seq Scan on store_sales sts (actual time=120.003..18128.424 rows=2865268 loops=1)
                                                                                                         Filter: ((ss_net_profit > '1'::numeric) AND (ss_net_paid > '0'::numeric) AND (ss_quantity > 0))
                                                                                                   ->  Hash (actual time=0.000..0.000 rows=31 loops=1)
                                                                                                         Buckets: 524288  Batches: 1  Memory Usage: 4098kB
                                                                                                         ->  Broadcast Motion 3:12  (slice9; segments: 3) (actual time=0.000..0.000 rows=31 loops=1)
                                                                                                               ->  Seq Scan on date_dim date_dim_2 (actual time=8.000..8.000 rows=13 loops=1)
                                                                                                                     Filter: ((d_year = 2000) AND (d_moy = 12))
 Planning Time: 8.620 ms
 * (slice0)    Executor memory: 673K bytes.  Work_mem: 110K bytes max, 48K bytes wanted.
   (slice1)    Executor memory: 234K bytes avg x 12x(0) workers, 275K bytes max (seg2).  Work_mem: 7K bytes max.
   (slice2)    Executor memory: 31909K bytes avg x 12x(0) workers, 31909K bytes max (seg1).  Work_mem: 4098K bytes max.
   (slice3)    Executor memory: 9557K bytes avg x 3x(0) workers, 9557K bytes max (seg0).
   (slice4)    Executor memory: 279K bytes avg x 12x(0) workers, 280K bytes max (seg2).  Work_mem: 11K bytes max.
   (slice5)    Executor memory: 31913K bytes avg x 12x(0) workers, 31913K bytes max (seg2).  Work_mem: 4098K bytes max.
   (slice6)    Executor memory: 9557K bytes avg x 3x(0) workers, 9557K bytes max (seg0).
   (slice7)    Executor memory: 273K bytes avg x 12x(0) workers, 274K bytes max (seg1).  Work_mem: 5K bytes max.
   (slice8)    Executor memory: 31890K bytes avg x 12x(0) workers, 31890K bytes max (seg1).  Work_mem: 4098K bytes max.
   (slice9)    Executor memory: 9557K bytes avg x 3x(0) workers, 9557K bytes max (seg0).
 Memory used:  1048576kB
 Memory wanted:  4604kB
 Optimizer: Postgres query optimizer
 Execution Time: 28603.506 ms
(145 rows)
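
As a quick reading of the two Q49 plans above, the speedup can be worked out from their `Execution Time` lines. The sketch below is illustrative only: the variable names are made up here, and the process counts are taken from the per-slice memory lines (`3x(0) workers` in the old plan, `12x(0) workers` in the new one).

```python
# Execution times copied from the two Q49 EXPLAIN ANALYZE outputs above (ms).
q49_old_ms = 60039.451
q49_new_ms = 28603.506

# Processes per slice as reported in the plans: 3 (old) vs 12 (new).
# Treating their ratio as the "parallel factor" is an assumption of this sketch.
old_procs, new_procs = 3, 12

speedup = q49_old_ms / q49_new_ms
parallel_factor = new_procs / old_procs
efficiency = speedup / parallel_factor

print(f"speedup: {speedup:.2f}x")
print(f"parallel factor: {parallel_factor:.0f}x")
print(f"efficiency: {efficiency:.0%}")
```

With these figures the query runs roughly 2.1x faster on 4x as many processes, which is consistent with the top-level WindowAgg and Sort nodes still running on a single process above the Gather Motion.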

@avamingli
Contributor Author

Window function operations that could not be parallelized before, such as window functions within subqueries, can now run in parallel in CBDB.

In the upstream select_parallel.sql regression test:

-- Window function calculation can't be pushed to workers.

explain (costs off, verbose)
  select count(*) from tenk1 a where (unique1, two) in
    (select unique1, row_number() over() from tenk1 b);
                                                                                               QUERY PLAN                                                                                                
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate
   Output: count(*)
   ->  Gather Motion 6:1  (slice1; segments: 6)
         Output: (PARTIAL count(*))
         ->  Partial Aggregate
               Output: PARTIAL count(*)
               ->  Parallel Hash Semi Join
                     Hash Cond: ((a.unique1 = b.unique1) AND (a.two = (row_number() OVER (?))))
                     ->  Parallel Seq Scan on public.tenk1 a
                           Output: a.unique1, a.unique2, a.two, a.four, a.ten, a.twenty, a.hundred, a.thousand, a.twothousand, a.fivethous, a.tenthous, a.odd, a.even, a.stringu1, a.stringu2, a.string4
                     ->  Parallel Hash
                           Output: b.unique1, (row_number() OVER (?))
                           ->  Redistribute Motion 1:6  (slice2; segments: 1)
                                 Output: b.unique1, (row_number() OVER (?))
                                 Hash Key: b.unique1
                                 Hash Module: 3
                                 ->  WindowAgg
                                       Output: b.unique1, row_number() OVER (?)
                                       ->  Gather Motion 6:1  (slice3; segments: 6)
                                             Output: b.unique1
                                             ->  Parallel Seq Scan on public.tenk1 b
                                                   Output: b.unique1
 Settings: enable_parallel = 'on', min_parallel_table_scan_size = '0', optimizer = 'off', parallel_setup_cost = '0', parallel_tuple_cost = '0'
 Optimizer: Postgres query optimizer
(24 rows)

@avamingli
Contributor Author

TPC-DS Q98

Q98 old
                                                                                              QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3) (actual time=40068.731..40084.732 rows=9145 loops=1)
   Merge Key: item.i_category, item.i_class, item.i_item_id, item.i_item_desc, ((((sum(store_sales.ss_ext_sales_price)) * '100'::numeric) / sum((sum(store_sales.ss_ext_sales_price))) OVER (?)))
   ->  Sort (actual time=40064.731..40064.731 rows=3223 loops=1)
         Sort Key: item.i_category, item.i_class, item.i_item_id, item.i_item_desc, ((((sum(store_sales.ss_ext_sales_price)) * '100'::numeric) / sum((sum(store_sales.ss_ext_sales_price))) OVER (?)))
         Sort Method:  quicksort  Memory: 2781kB
         ->  WindowAgg (actual time=40036.731..40048.731 rows=3223 loops=1)
               Partition By: item.i_class
               ->  Sort (actual time=40036.731..40036.731 rows=3223 loops=1)
                     Sort Key: item.i_class
                     Sort Method:  quicksort  Memory: 2556kB
                     ->  Redistribute Motion 3:3  (slice2; segments: 3) (actual time=39732.725..40032.731 rows=3223 loops=1)
                           Hash Key: item.i_class
                           ->  GroupAggregate (actual time=39716.725..40024.731 rows=3121 loops=1)
                                 Group Key: item.i_item_id, item.i_item_desc, item.i_category, item.i_class, item.i_current_price
                                 ->  Sort (actual time=39716.725..39764.726 rows=135884 loops=1)
                                       Sort Key: item.i_item_id, item.i_item_desc, item.i_category, item.i_class, item.i_current_price
                                       Sort Method:  quicksort  Memory: 108627kB
                                       ->  Redistribute Motion 3:3  (slice3; segments: 3) (actual time=112.002..38672.706 rows=135884 loops=1)
                                             Hash Key: item.i_category, item.i_class, item.i_item_id, item.i_item_desc, item.i_current_price
                                             ->  Hash Join (actual time=80.001..38264.698 rows=135710 loops=1)
                                                   Hash Cond: (store_sales.ss_item_sk = item.i_item_sk)
                                                   Extra Text: (seg1)   Hash chain length 1.0 avg, 2 max, using 6147 of 262144 buckets.
                                                   ->  Hash Join (actual time=36.001..37884.691 rows=452779 loops=1)
                                                         Hash Cond: (store_sales.ss_sold_date_sk = date_dim.d_date_sk)
                                                         Extra Text: (seg1)   Hash chain length 1.0 avg, 1 max, using 31 of 1048576 buckets.
                                                         ->  Seq Scan on store_sales (actual time=20.000..28016.511 rows=48321834 loops=1)
                                                         ->  Hash (actual time=0.000..0.000 rows=31 loops=1)
                                                               Buckets: 1048576  Batches: 1  Memory Usage: 8194kB
                                                               ->  Broadcast Motion 3:3  (slice4; segments: 3) (actual time=0.000..0.000 rows=31 loops=1)
                                                                     ->  Seq Scan on date_dim (actual time=8.000..12.000 rows=14 loops=1)
                                                                           Filter: ((d_date >= '1998-03-26'::date) AND (d_date <= '1998-04-25 00:00:00'::timestamp without time zone))
                                                   ->  Hash (actual time=40.001..40.001 rows=6218 loops=1)
                                                         Buckets: 262144  Batches: 1  Memory Usage: 3133kB
                                                         ->  Seq Scan on item (actual time=16.000..40.001 rows=6218 loops=1)
                                                               Filter: ((i_category)::text = ANY ('{Home,Women,Men}'::text[]))
 Planning Time: 3.490 ms
   (slice0)    Executor memory: 176K bytes.
   (slice1)    Executor memory: 1543K bytes avg x 3x(0) workers, 1649K bytes max (seg0).  Work_mem: 814K bytes max.
   (slice2)    Executor memory: 33250K bytes avg x 3x(0) workers, 35754K bytes max (seg2).  Work_mem: 29469K bytes max.
   (slice3)    Executor memory: 41218K bytes avg x 3x(0) workers, 41228K bytes max (seg1).  Work_mem: 8194K bytes max.
   (slice4)    Executor memory: 6413K bytes avg x 3x(0) workers, 6413K bytes max (seg0).
 Memory used:  1048576kB
 Optimizer: Postgres query optimizer
 Execution Time: 40114.212 ms
(44 rows)

Q98 new

                                                                                              QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3) (actual time=17012.314..17028.314 rows=9145 loops=1)
   Merge Key: item.i_category, item.i_class, item.i_item_id, item.i_item_desc, ((((sum(store_sales.ss_ext_sales_price)) * '100'::numeric) / sum((sum(store_sales.ss_ext_sales_price))) OVER (?)))
   ->  Sort (actual time=17008.314..17008.314 rows=3223 loops=1)
         Sort Key: item.i_category, item.i_class, item.i_item_id, item.i_item_desc, ((((sum(store_sales.ss_ext_sales_price)) * '100'::numeric) / sum((sum(store_sales.ss_ext_sales_price))) OVER (?)))
         Sort Method:  quicksort  Memory: 2781kB
         ->  WindowAgg (actual time=16980.313..16988.313 rows=3223 loops=1)
               Partition By: item.i_class
               ->  Sort (actual time=16980.313..16980.313 rows=3223 loops=1)
                     Sort Key: item.i_class
                     Sort Method:  quicksort  Memory: 2556kB
                     ->  Redistribute Motion 3:3  (slice2; segments: 3) (actual time=16520.305..16980.313 rows=3223 loops=1)
                           Hash Key: item.i_class
                           ->  GroupAggregate (actual time=16528.305..16744.309 rows=3121 loops=1)
                                 Group Key: item.i_item_id, item.i_item_desc, item.i_category, item.i_class, item.i_current_price
                                 ->  Sort (actual time=16528.305..16564.305 rows=135884 loops=1)
                                       Sort Key: item.i_item_id, item.i_item_desc, item.i_category, item.i_class, item.i_current_price
                                       Sort Method:  quicksort  Memory: 108627kB
                                       ->  Redistribute Motion 3:3  (slice3; segments: 3) (actual time=104.002..15684.289 rows=135884 loops=1)
                                             Hash Key: item.i_category, item.i_class, item.i_item_id, item.i_item_desc, item.i_current_price
                                             ->  Hash Join (actual time=208.004..15252.281 rows=135710 loops=1)
                                                   Hash Cond: (store_sales.ss_item_sk = item.i_item_sk)
                                                   Extra Text: (seg1)   Hash chain length 1.0 avg, 2 max, using 6147 of 262144 buckets.
                                                   ->  Redistribute Motion 12:3  (slice4; segments: 12) (actual time=0.000..14548.268 rows=452779 loops=1)
                                                         Hash Key: store_sales.ss_item_sk
                                                         ->  Hash Join (actual time=20.000..12688.234 rows=113983 loops=1)
                                                               Hash Cond: (store_sales.ss_sold_date_sk = date_dim.d_date_sk)
                                                               Extra Text: (seg1)   Hash chain length 1.0 avg, 1 max, using 31 of 1048576 buckets.
                                                               ->  Parallel Seq Scan on store_sales (actual time=36.001..8408.155 rows=12100000 loops=1)
                                                               ->  Hash (actual time=0.000..0.000 rows=31 loops=1)
                                                                     Buckets: 1048576  Batches: 1  Memory Usage: 8194kB
                                                                     ->  Broadcast Motion 3:12  (slice5; segments: 3) (actual time=0.000..0.000 rows=31 loops=1)
                                                                           ->  Seq Scan on date_dim (actual time=8.000..12.000 rows=14 loops=1)
                                                                                 Filter: ((d_date >= '1998-03-26'::date) AND (d_date <= '1998-04-25 00:00:00'::timestamp without time zone))
                                                   ->  Hash (actual time=208.004..208.004 rows=6218 loops=1)
                                                         Buckets: 262144  Batches: 1  Memory Usage: 3133kB
                                                         ->  Seq Scan on item (actual time=16.000..112.002 rows=6218 loops=1)
                                                               Filter: ((i_category)::text = ANY ('{Home,Women,Men}'::text[]))
 Planning Time: 2.730 ms
   (slice0)    Executor memory: 181K bytes.
   (slice1)    Executor memory: 1543K bytes avg x 3x(0) workers, 1649K bytes max (seg0).  Work_mem: 814K bytes max.
   (slice2)    Executor memory: 33250K bytes avg x 3x(0) workers, 35754K bytes max (seg2).  Work_mem: 29469K bytes max.
   (slice3)    Executor memory: 23409K bytes avg x 3x(0) workers, 23420K bytes max (seg0).  Work_mem: 3133K bytes max.
   (slice4)    Executor memory: 17881K bytes avg x 12x(0) workers, 17882K bytes max (seg2).  Work_mem: 8194K bytes max.
   (slice5)    Executor memory: 6413K bytes avg x 3x(0) workers, 6413K bytes max (seg0).
 Memory used:  1048576kB
 Optimizer: Postgres query optimizer
 Execution Time: 17052.781 ms
(47 rows)

Contributor

@my-ship-it left a comment

LGTM

PostgreSQL's parallel processing cannot handle window functions. In
contrast, our distributed environment enables parallel execution of
window functions across multiple processes on multiple segments.
For example:
  sum(a) over(partition by b order by c)
The window function can be processed by redistributing data
based on column b to ensure all rows with the same b value are processed
by the same worker, significantly improving efficiency.
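
The redistribution argument above can be checked with a small simulation. This is only a sketch, not the planner's code: the helper names (`window_sum`, `redistribute`, `parallel_window_sum`) are hypothetical, Python's built-in `hash` stands in for the real motion hash, and `c` is assumed unique within each `b` partition so that no peer rows share a frame.

```python
from collections import defaultdict
from itertools import groupby


def window_sum(rows):
    """Serial reference: sum(a) OVER (PARTITION BY b ORDER BY c).

    rows are (a, b, c) tuples; c is assumed unique within each b partition,
    so every row's frame ends at the row itself.
    Returns {(b, c): running_sum}.
    """
    result = {}
    ordered = sorted(rows, key=lambda r: (r[1], r[2]))
    for _, partition in groupby(ordered, key=lambda r: r[1]):
        running = 0
        for a, b, c in partition:
            running += a
            result[(b, c)] = running
    return result


def redistribute(rows, n_workers):
    """Hash each row on partition column b so equal b values share a worker."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[hash(row[1]) % n_workers].append(row)
    return buckets


def parallel_window_sum(rows, n_workers):
    """Each worker evaluates the window function on its own bucket only."""
    result = {}
    for worker_rows in redistribute(rows, n_workers).values():
        result.update(window_sum(worker_rows))
    return result


rows = [(10, "dev", 1), (20, "dev", 2), (5, "sales", 1), (7, "sales", 3), (1, "hr", 2)]
# 3 segments x 4 workers per segment = 12 processes (illustrative worker count).
assert parallel_window_sum(rows, 12) == window_sum(rows)
```

Because a partition never straddles two workers, each worker can sort and aggregate its bucket independently, and the union of the per-worker results equals the serial result.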

Even without a PARTITION BY clause, we can still enable parallelism by
allowing partial paths for window functions and their subpaths, so that
the underlying tables are scanned and filtered in parallel.

Window functions that contain CASE WHEN expressions are excluded,
because they complicate parallelization and make it difficult to
guarantee correct data ordering.

Example non-parallel execution plan:
SELECT sum(salary) OVER w, rank() OVER w FROM empsalary WINDOW w AS
(PARTITION BY depname ORDER BY salary DESC);
                  QUERY PLAN
----------------------------------------------
 Gather Motion 3:1  (slice1; segments: 3)
   ->  WindowAgg
         Partition By: depname
         Order By: salary
         ->  Sort
               Sort Key: depname, salary DESC
               ->  Seq Scan on empsalary

Parallel execution plan (4 parallel workers per segment, 12 processes across 3 segments):
SELECT sum(salary) OVER w, rank() OVER w FROM empsalary WINDOW w AS
(PARTITION BY depname ORDER BY salary DESC);
                             QUERY PLAN
---------------------------------------------------------------------
 Gather Motion 12:1  (slice1; segments: 12)
   ->  WindowAgg
         Partition By: depname
         Order By: salary
         ->  Sort
               Sort Key: depname, salary DESC
               ->  Redistribute Motion 12:12  (slice2; segments: 12)
                     Hash Key: depname
                     Hash Module: 3
                     ->  Parallel Seq Scan on empsalary

In complex queries containing window functions, the planner may still
reject a fully parallel plan because of cost or other constraints.
Even then, the subpaths beneath the window function can run in
parallel, which still shortens query time. We have observed significant
performance gains in TPC-DS benchmarks from this partial
parallelization.

TPC-DS queries via parallel execution plans (50G AOCS, 4 workers):

| Query | Before(ms) | After(ms) | Saved(ms) | Gain  | Plan Change     |
|-------|-----------:|----------:|----------:|------:|-----------------|
| q12   |  10,439.08 |  4,613.52 |  5,825.56 | 55.8% | serial→parallel |
| q20   |  21,487.08 |  8,723.74 | 12,763.34 | 59.4% | serial→parallel |
| q44   |  33,816.75 | 22,515.03 | 11,301.72 | 33.4% | better parallel |
| q49   |  60,039.45 | 28,603.51 | 31,435.95 | 52.4% | serial→parallel |
| q98   |  40,114.21 | 17,052.78 | 23,061.43 | 57.5% | serial→parallel |

Changes:
- Enabled parallel plans for q12, q20, q49, and q98 (previously serial)
- Improved the existing parallel plan for q44
- Average gain: 52% (best: q20 at 59.4%, saving 12.8 s)
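
The Saved and Gain columns follow from the Before and After timings as `saved = before - after` and `gain = saved / before`. A minimal sketch that recomputes them from the figures in the table (the variable and function names are illustrative only):

```python
# (before_ms, after_ms) copied from the TPC-DS table above.
timings_ms = {
    "q12": (10439.08, 4613.52),
    "q20": (21487.08, 8723.74),
    "q44": (33816.75, 22515.03),
    "q49": (60039.45, 28603.51),
    "q98": (40114.21, 17052.78),
}


def saved_and_gain(before_ms, after_ms):
    """Return (milliseconds saved, percentage gain relative to the old time)."""
    saved = before_ms - after_ms
    return saved, 100.0 * saved / before_ms


gains = {q: saved_and_gain(b, a)[1] for q, (b, a) in timings_ms.items()}
average_gain = sum(gains.values()) / len(gains)
best_query = max(gains, key=gains.get)

for query, gain in gains.items():
    print(f"{query}: {gain:.1f}%")
print(f"average: {average_gain:.0f}%, best: {best_query}")
```

The average here is the unweighted mean of the per-query gains, which is how the 52% figure is obtained; a mean weighted by total runtime would differ.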

Authored-by: Zhang Mingli [email protected]
@avamingli avamingli merged commit b8c6a63 into apache:main Aug 4, 2025
27 checks passed
@avamingli avamingli deleted the dev branch August 4, 2025 08:08
Labels

planner type: Performance cloudberry runs slow on some particular query
