Commit 653c95d

Implement parallel processing for window functions.
PostgreSQL's parallel processing cannot handle window functions. In contrast, our distributed environment enables parallel execution of window functions across multiple processes on multiple segments. For example:

    sum(a) over (partition by b order by c)

The window function can be processed by redistributing data on column b, ensuring all rows with the same b value are processed by the same worker, which significantly improves efficiency.

Even without PARTITION BY clauses, we can still enable parallelism by allowing partial_path for window functions and subpaths, with parallel scanning of underlying tables for data filtering.

Exclude CASE WHEN expressions in window functions, as they complicate parallelization and make it difficult to guarantee correct data ordering.

Example non-parallel execution plan:

    SELECT sum(salary) OVER w, rank() OVER w
    FROM empsalary
    WINDOW w AS (PARTITION BY depname ORDER BY salary DESC);
                      QUERY PLAN
    ----------------------------------------------
     Gather Motion 3:1  (slice1; segments: 3)
       ->  WindowAgg
             Partition By: depname
             Order By: salary
             ->  Sort
                   Sort Key: depname, salary DESC
                   ->  Seq Scan on empsalary

Parallel execution plan (4-parallel):

    SELECT sum(salary) OVER w, rank() OVER w
    FROM empsalary
    WINDOW w AS (PARTITION BY depname ORDER BY salary DESC);
                                QUERY PLAN
    ---------------------------------------------------------------------
     Gather Motion 12:1  (slice1; segments: 12)
       ->  WindowAgg
             Partition By: depname
             Order By: salary
             ->  Sort
                   Sort Key: depname, salary DESC
                   ->  Redistribute Motion 12:12  (slice2; segments: 12)
                         Hash Key: depname
                         Hash Module: 3
                         ->  Parallel Seq Scan on empsalary

In complex queries containing window functions, parallel processing may sometimes be inhibited by cost considerations or other constraints. However, our approach still provides valuable parallelization opportunities for window function subpaths, delivering measurable query efficiency improvements. We have observed significant performance gains in TPC-DS benchmarks through this partial parallelization capability.

TPC-DS queries via parallel execution plans (50G AOCS, 4 workers):

| Query | Before (ms) | After (ms) | Saved (ms) | Gain  | Plan Change     |
|-------|------------:|-----------:|-----------:|------:|-----------------|
| q12   |   10,439.08 |   4,613.52 |   5,825.56 | 55.8% | serial→parallel |
| q20   |   21,487.08 |   8,723.74 |  12,763.34 | 59.4% | serial→parallel |
| q44   |   33,816.75 |  22,515.03 |  11,301.72 | 33.4% | better parallel |
| q49   |   60,039.45 |  28,603.51 |  31,435.95 | 52.4% | serial→parallel |
| q98   |   40,114.21 |  17,052.78 |  23,061.43 | 57.5% | serial→parallel |

Changes:
- Enabled parallel plans for q12/q20/q49/q98 (previously serial)
- Optimized the parallel plan for q44
- Avg gain: 52% (best: q20 at 59.4%, saving 12.7s)

Authored-by: Zhang Mingli [email protected]
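To reproduce the plan shape shown in the commit message, the sketch below reuses only settings that appear in this commit's regression output (the Settings line in select_parallel.out). It is a minimal illustration, not part of the commit: the empsalary definition and the sample data are assumptions.

    -- Sketch only: empsalary here is an assumed example table; the GUCs are the
    -- ones listed in the Settings line of select_parallel.out in this commit.
    CREATE TABLE empsalary (depname text, empno int, salary int) DISTRIBUTED BY (empno);
    INSERT INTO empsalary
    SELECT 'dep' || (i % 4), i, (random() * 10000)::int
    FROM generate_series(1, 100000) i;
    ANALYZE empsalary;

    SET enable_parallel = on;
    SET min_parallel_table_scan_size = 0;
    SET parallel_setup_cost = 0;
    SET parallel_tuple_cost = 0;

    EXPLAIN (COSTS OFF)
    SELECT sum(salary) OVER w, rank() OVER w
    FROM empsalary
    WINDOW w AS (PARTITION BY depname ORDER BY salary DESC);

With only a few distinct depname values, all rows of a partition hash to the same worker after the Redistribute Motion, which is the behavior the commit message describes.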
1 parent: 793971e

16 files changed (+1813 additions, -535 deletions)

src/backend/cdb/cdbgroupingpaths.c

Lines changed: 3 additions & 2 deletions
@@ -2651,7 +2651,7 @@ cdb_prepare_path_for_sorted_agg(PlannerInfo *root,
 		return subpath;
 	}
 
-	if (is_sorted && group_pathkeys)
+	if (is_sorted && group_pathkeys && (subpath->locus.parallel_workers <= 1))
 	{
 		/*
 		 * The input is already conveniently sorted. We could redistribute
@@ -2665,7 +2665,8 @@ cdb_prepare_path_for_sorted_agg(PlannerInfo *root,
 										  group_pathkeys,
 										  false, locus);
 	}
-	else if (!is_sorted && group_pathkeys)
+	else if ((is_sorted && group_pathkeys && (subpath->locus.parallel_workers > 1)) ||
+			 (!is_sorted && group_pathkeys))
 	{
 		/*
 		 * If we need to redistribute, it's usually best to redistribute

src/backend/cdb/cdbpath.c

Lines changed: 1 addition & 1 deletion
@@ -3661,7 +3661,7 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root,
 	int			sp;			/* small rel parallel workers */
 
 	/* Consider locus when parallel_ware. */
-	if(parallel_aware)
+	if (parallel_aware)
 	{
 		/* can't parallel join if both are Hashed, it should be in non-parallel path */
 		if (CdbPathLocus_IsHashed(outer.locus) &&

src/backend/optimizer/plan/planner.c

Lines changed: 165 additions & 0 deletions
@@ -300,6 +300,20 @@ static split_rollup_data *make_new_rollups_for_hash_grouping_set(PlannerInfo *ro
 													Path *path,
 													grouping_sets_data *gd);
 
+static bool
+contain_case_expr(Node *clause);
+
+static bool
+contain_case_expr_walker(Node *node, void *context);
+
+static void create_partial_window_path(PlannerInfo *root,
+									   RelOptInfo *window_rel,
+									   Path *path,
+									   PathTarget *input_target,
+									   PathTarget *output_target,
+									   WindowFuncLists *wflists,
+									   List *activeWindows);
+
 
 /*****************************************************************************
  *
@@ -4876,6 +4890,27 @@ create_window_paths(PlannerInfo *root,
 							activeWindows);
 	}
 
+	/*
+	 * Unlike upstream, we could make window functions parallel by redistributing
+	 * the tuples according to the PARTITION BY clause, which is similar to GROUP BY.
+	 * Even if there is no PARTITION BY, window functions could still be parallelized
+	 * from sub partial paths.
+	 */
+	if (window_rel->consider_parallel &&
+		input_rel->partial_pathlist)
+	{
+		/* For partial, only the best one is enough. */
+		Path	   *path = (Path *) linitial(input_rel->partial_pathlist);
+
+		create_partial_window_path(root,
+								   window_rel,
+								   path,
+								   input_target,
+								   output_target,
+								   wflists,
+								   activeWindows);
+	}
+
 	/*
 	 * If there is an FDW that's responsible for all baserels of the query,
 	 * let it consider adding ForeignPaths.
@@ -9103,3 +9138,133 @@ make_new_rollups_for_hash_grouping_set(PlannerInfo *root,
 
 	return srd;
 }
+
+static bool
+contain_case_expr(Node *clause)
+{
+	return contain_case_expr_walker(clause, NULL);
+}
+
+static bool
+contain_case_expr_walker(Node *node, void *context)
+{
+	if (node == NULL)
+		return false;
+
+	if (IsA(node, CaseExpr))
+		return true;
+
+	return expression_tree_walker(node, contain_case_expr_walker,
+								  context);
+}
+
+/*
+ * Parallel processing of window functions.
+ *
+ * NB: it may produce non-deterministic results if the window function
+ * lacks ORDER BY and PARTITION BY clauses.
+ * SQL:2011 has clarified this behavior.
+ */
+static void
+create_partial_window_path(PlannerInfo *root,
+						   RelOptInfo *window_rel,
+						   Path *path,
+						   PathTarget *input_target,
+						   PathTarget *output_target,
+						   WindowFuncLists *wflists,
+						   List *activeWindows)
+{
+	PathTarget *window_target;
+	ListCell   *l;
+	Bitmapset  *sgrefs;
+
+	window_target = input_target;
+
+	sgrefs = NULL;
+
+	foreach(l, activeWindows)
+	{
+		WindowClause *wc = lfirst_node(WindowClause, l);
+		ListCell   *lc2;
+
+		foreach(lc2, wc->partitionClause)
+		{
+			SortGroupClause *sortcl = lfirst_node(SortGroupClause, lc2);
+
+			sgrefs = bms_add_member(sgrefs, sortcl->tleSortGroupRef);
+		}
+		foreach(lc2, wc->orderClause)
+		{
+			SortGroupClause *sortcl = lfirst_node(SortGroupClause, lc2);
+
+			sgrefs = bms_add_member(sgrefs, sortcl->tleSortGroupRef);
+		}
+	}
+
+	int x = -1;
+	while ((x = bms_next_member(sgrefs, x)) >= 0)
+	{
+		Index		sgref = get_pathtarget_sortgroupref(input_target, x);
+		if (sgref != 0)
+		{
+			ListCell   *lc;
+			foreach(lc, input_target->exprs)
+			{
+				Expr	   *expr = (Expr *) lfirst(lc);
+				if (contain_case_expr((Node *) expr))
+					return;
+			}
+		}
+	}
+
+	foreach(l, activeWindows)
+	{
+		WindowClause *wc = lfirst_node(WindowClause, l);
+		List	   *window_pathkeys;
+		int			presorted_keys;
+		bool		is_sorted;
+
+		window_pathkeys = make_pathkeys_for_window(root,
+												   wc,
+												   root->processed_tlist);
+
+		is_sorted = pathkeys_count_contained_in(window_pathkeys,
+												path->pathkeys,
+												&presorted_keys);
+
+		path = cdb_prepare_path_for_sorted_agg(root,
+											   is_sorted,
+											   presorted_keys,
+											   window_rel,
+											   path,
+											   path->pathtarget,
+											   window_pathkeys,
+											   -1.0,
+											   wc->partitionClause,
+											   NIL);
+		if (lnext(activeWindows, l))
+		{
+			ListCell   *lc2;
+
+			window_target = copy_pathtarget(window_target);
+			foreach(lc2, wflists->windowFuncs[wc->winref])
+			{
+				WindowFunc *wfunc = lfirst_node(WindowFunc, lc2);
+
+				add_column_to_pathtarget(window_target, (Expr *) wfunc, 0);
+				window_target->width += get_typavgwidth(wfunc->wintype, -1);
+			}
+		}
+		else
+		{
+			window_target = output_target;
+		}
+
+		path = (Path *)
+			create_windowagg_path(root, window_rel, path, window_target,
+								  wflists->windowFuncs[wc->winref],
+								  wc);
+	}
+
+	add_partial_path(window_rel, path);
+}

src/backend/optimizer/util/clauses.c

Lines changed: 7 additions & 1 deletion
@@ -807,7 +807,13 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
 	 */
 	else if (IsA(node, WindowFunc))
 	{
-		if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context))
+		/*
+		 * In Cloudberry, we process window functions by redistributing the
+		 * tuples if there is a PARTITION BY clause.
+		 * Each partition is processed individually, whether in a single
+		 * process or in a distributed parallel-worker setup.
+		 */
+		if (max_parallel_hazard_test(PROPARALLEL_SAFE, context))
 			return true;
 	}
 

src/test/regress/expected/cluster.out

Lines changed: 3 additions & 3 deletions
@@ -467,9 +467,9 @@ set enable_indexscan = off;
 set maintenance_work_mem = '1MB';
 cluster clstr_4 using cluster_sort;
 select * from
-(select hundred, lag(hundred) over () as lhundred,
-        thousand, lag(thousand) over () as lthousand,
-        tenthous, lag(tenthous) over () as ltenthous from clstr_4) ss
+(select hundred, lag(hundred) over (order by hundred) as lhundred,
+        thousand, lag(thousand) over (order by hundred) as lthousand,
+        tenthous, lag(tenthous) over (order by hundred) as ltenthous from clstr_4) ss
 where row(hundred, thousand, tenthous) <= row(lhundred, lthousand, ltenthous);
  hundred | lhundred | thousand | lthousand | tenthous | ltenthous
 ---------+----------+----------+-----------+----------+-----------

src/test/regress/expected/select_parallel.out

Lines changed: 10 additions & 8 deletions
@@ -1056,35 +1056,37 @@ select count(*) from tenk1;
 reset force_parallel_mode;
 reset role;
 -- Window function calculation can't be pushed to workers.
+-- CBDB_PARALLEL: window function's subpath could be parallel.
 explain (costs off, verbose)
   select count(*) from tenk1 a where (unique1, two) in
   (select unique1, row_number() over() from tenk1 b);
                                               QUERY PLAN
 ---------------------------------------------------------------------------------------------------------
  Finalize Aggregate
    Output: count(*)
-   ->  Gather Motion 3:1  (slice1; segments: 3)
+   ->  Gather Motion 6:1  (slice1; segments: 6)
          Output: (PARTIAL count(*))
          ->  Partial Aggregate
                Output: PARTIAL count(*)
-              ->  Hash Semi Join
+              ->  Parallel Hash Semi Join
                     Hash Cond: ((a.unique1 = b.unique1) AND (a.two = (row_number() OVER (?))))
-                    ->  Seq Scan on public.tenk1 a
+                    ->  Parallel Seq Scan on public.tenk1 a
                           Output: a.unique1, a.unique2, a.two, a.four, a.ten, a.twenty, a.hundred, a.thousand, a.twothousand, a.fivethous, a.tenthous, a.odd, a.even, a.stringu1, a.stringu2, a.string4
-                    ->  Hash
+                    ->  Parallel Hash
                           Output: b.unique1, (row_number() OVER (?))
-                          ->  Redistribute Motion 1:3  (slice2; segments: 1)
+                          ->  Redistribute Motion 1:6  (slice2; segments: 1)
                                 Output: b.unique1, (row_number() OVER (?))
                                 Hash Key: b.unique1
+                                Hash Module: 3
                                 ->  WindowAgg
                                       Output: b.unique1, row_number() OVER (?)
-                                      ->  Gather Motion 3:1  (slice3; segments: 3)
+                                      ->  Gather Motion 6:1  (slice3; segments: 6)
                                             Output: b.unique1
-                                            ->  Seq Scan on public.tenk1 b
+                                            ->  Parallel Seq Scan on public.tenk1 b
                                                   Output: b.unique1
  Settings: enable_parallel = 'on', min_parallel_table_scan_size = '0', optimizer = 'off', parallel_setup_cost = '0', parallel_tuple_cost = '0'
  Optimizer: Postgres query optimizer
-(23 rows)
+(24 rows)
 
 -- LIMIT/OFFSET within sub-selects can't be pushed to workers.
 explain (costs off)

src/test/regress/expected/statement_mem_for_windowagg.out

Lines changed: 2 additions & 0 deletions
@@ -2,6 +2,7 @@ CREATE TABLE dummy_table(x int, y int) DISTRIBUTED BY (y);
 INSERT INTO dummy_table SELECT generate_series(0, 20000), 0;
 INSERT INTO dummy_table SELECT generate_series(0, 20000), 3;
 INSERT INTO dummy_table SELECT generate_series(0, 20000), 10;
+set enable_parallel = off;
 -- 1. Test that if we set statement_mem to a larger value, the tuplestore
 -- for caching the tuples in partition used in WindowAgg is able to be fitted
 -- in memory.
@@ -172,5 +173,6 @@ SELECT gp_inject_fault_infinite('distinct_winagg_perform_sort', 'reset', dbid)
 (3 rows)
 
 -- Do some clean-ups.
+reset enable_parallel;
 DROP TABLE dummy_table;
 RESET statement_mem;
