5 changes: 3 additions & 2 deletions src/backend/cdb/cdbgroupingpaths.c
@@ -2651,7 +2651,7 @@ cdb_prepare_path_for_sorted_agg(PlannerInfo *root,
return subpath;
}

if (is_sorted && group_pathkeys)
if (is_sorted && group_pathkeys && (subpath->locus.parallel_workers <= 1))
{
/*
* The input is already conveniently sorted. We could redistribute
@@ -2665,7 +2665,8 @@ cdb_prepare_path_for_sorted_agg(PlannerInfo *root,
group_pathkeys,
false, locus);
}
else if (!is_sorted && group_pathkeys)
else if ((is_sorted && group_pathkeys && (subpath->locus.parallel_workers > 1)) ||
(!is_sorted && group_pathkeys))
{
/*
* If we need to redistribute, it's usually best to redistribute
2 changes: 1 addition & 1 deletion src/backend/cdb/cdbpath.c
@@ -3661,7 +3661,7 @@ cdbpath_motion_for_parallel_join(PlannerInfo *root,
int sp; /* small rel parallel workers */

/* Consider locus when parallel_ware. */
if(parallel_aware)
if (parallel_aware)
{
/* can't parallel join if both are Hashed, it should be in non-parallel path */
if (CdbPathLocus_IsHashed(outer.locus) &&
165 changes: 165 additions & 0 deletions src/backend/optimizer/plan/planner.c
@@ -300,6 +300,20 @@ static split_rollup_data *make_new_rollups_for_hash_grouping_set(PlannerInfo *ro
Path *path,
grouping_sets_data *gd);

static bool
contain_case_expr(Node *clause);

static bool
contain_case_expr_walker(Node *node, void *context);

static void create_partial_window_path(PlannerInfo *root,
RelOptInfo *window_rel,
Path *path,
PathTarget *input_target,
PathTarget *output_target,
WindowFuncLists *wflists,
List *activeWindows);


/*****************************************************************************
*
@@ -4876,6 +4890,27 @@ create_window_paths(PlannerInfo *root,
activeWindows);
}

/*
* Unlike upstream, we can make window functions parallel by redistributing
* the tuples according to the PARTITION BY clause, similar to GROUP BY.
* Even when there is no PARTITION BY, the window function can still be
* parallelized on top of the input rel's partial paths.
*/
if (window_rel->consider_parallel &&
input_rel->partial_pathlist)
{
/* For partial paths, the best one is enough. */
Path *path = (Path *) linitial(input_rel->partial_pathlist);

create_partial_window_path(root,
window_rel,
path,
input_target,
output_target,
wflists,
activeWindows);
}

/*
* If there is an FDW that's responsible for all baserels of the query,
* let it consider adding ForeignPaths.
@@ -9103,3 +9138,133 @@ make_new_rollups_for_hash_grouping_set(PlannerInfo *root,

return srd;
}

static bool
contain_case_expr(Node *clause)
{
return contain_case_expr_walker(clause, NULL);
}

static bool
contain_case_expr_walker(Node *node, void *context)
{
if (node == NULL)
return false;

if (IsA(node, CaseExpr))
return true;

return expression_tree_walker(node, contain_case_expr_walker,
context);
}

/*
* Parallel processing of window functions.
*
* NB: it may produce non-deterministic results if the window function
* lacks ORDER BY and PARTITION BY clauses.
* SQL:2011 has clarified this behavior.
*/
static void
create_partial_window_path(PlannerInfo *root,
RelOptInfo *window_rel,
Path *path,
PathTarget *input_target,
PathTarget *output_target,
WindowFuncLists *wflists,
List *activeWindows)
{
PathTarget *window_target;
ListCell *l;
Bitmapset *sgrefs;

window_target = input_target;

sgrefs = NULL;

foreach(l, activeWindows)
{
WindowClause *wc = lfirst_node(WindowClause, l);
ListCell *lc2;

foreach(lc2, wc->partitionClause)
{
SortGroupClause *sortcl = lfirst_node(SortGroupClause, lc2);

sgrefs = bms_add_member(sgrefs, sortcl->tleSortGroupRef);
}
foreach(lc2, wc->orderClause)
{
SortGroupClause *sortcl = lfirst_node(SortGroupClause, lc2);

sgrefs = bms_add_member(sgrefs, sortcl->tleSortGroupRef);
}
}

int x = -1;
while ((x = bms_next_member(sgrefs, x)) >= 0)
{
Index sgref = get_pathtarget_sortgroupref(input_target, x);
if (sgref != 0)
{
ListCell *lc;
foreach(lc, input_target->exprs)
{
Expr *expr = (Expr *) lfirst(lc);
if (contain_case_expr((Node*)expr))
return;
}
}
}

foreach(l, activeWindows)
{
WindowClause *wc = lfirst_node(WindowClause, l);
List *window_pathkeys;
int presorted_keys;
bool is_sorted;

window_pathkeys = make_pathkeys_for_window(root,
wc,
root->processed_tlist);

is_sorted = pathkeys_count_contained_in(window_pathkeys,
path->pathkeys,
&presorted_keys);

path = cdb_prepare_path_for_sorted_agg(root,
is_sorted,
presorted_keys,
window_rel,
path,
path->pathtarget,
window_pathkeys,
-1.0,
wc->partitionClause,
NIL);
if (lnext(activeWindows, l))
{
ListCell *lc2;

window_target = copy_pathtarget(window_target);
foreach(lc2, wflists->windowFuncs[wc->winref])
{
WindowFunc *wfunc = lfirst_node(WindowFunc, lc2);

add_column_to_pathtarget(window_target, (Expr *) wfunc, 0);
window_target->width += get_typavgwidth(wfunc->wintype, -1);
}
}
else
{
window_target = output_target;
}

path = (Path *)
create_windowagg_path(root, window_rel, path, window_target,
wflists->windowFuncs[wc->winref],
wc);
}

add_partial_path(window_rel, path);
}
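For context, a minimal sketch of the query shape the new partial window path targets (the table t(a int, b int) here is hypothetical, not part of the patch): tuples are redistributed on the PARTITION BY key, after which each parallel worker can evaluate its own partitions independently.

    -- hypothetical example: with enable_parallel on, the planner may redistribute
    -- on b (the PARTITION BY key) and run the WindowAgg in parallel workers
    -- instead of forcing a single non-parallel WindowAgg over a gathered input
    EXPLAIN (COSTS OFF)
    SELECT a, b, sum(a) OVER (PARTITION BY b ORDER BY a) FROM t;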
8 changes: 7 additions & 1 deletion src/backend/optimizer/util/clauses.c
@@ -807,7 +807,13 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
*/
else if (IsA(node, WindowFunc))
{
if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context))
/*
* In Cloudberry, we process window functions by redistributing the tuples
* when there is a PARTITION BY clause.
* Each partition is processed individually, whether in a single process
* or in a distributed parallel-workers setup.
*/
if (max_parallel_hazard_test(PROPARALLEL_SAFE, context))
return true;
}

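The practical effect of relaxing the WindowFunc hazard from PROPARALLEL_RESTRICTED to PROPARALLEL_SAFE is that expressions referencing a window function's output may now be evaluated below a Gather, i.e. by parallel workers. The select_parallel.out change below shows a real plan; the query here is only a hedged illustration of the kind of expression the walker no longer flags.

    -- previously, the reference to row_number() marked this output as
    -- parallel-restricted, keeping workers out of plans that consume it;
    -- treated as parallel-safe, it no longer blocks worker participation
    SELECT unique1, row_number() OVER (PARTITION BY two) FROM tenk1;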
6 changes: 3 additions & 3 deletions src/test/regress/expected/cluster.out
@@ -467,9 +467,9 @@ set enable_indexscan = off;
set maintenance_work_mem = '1MB';
cluster clstr_4 using cluster_sort;
select * from
(select hundred, lag(hundred) over () as lhundred,
thousand, lag(thousand) over () as lthousand,
tenthous, lag(tenthous) over () as ltenthous from clstr_4) ss
(select hundred, lag(hundred) over (order by hundred) as lhundred,
thousand, lag(thousand) over (order by hundred) as lthousand,
tenthous, lag(tenthous) over (order by hundred) as ltenthous from clstr_4) ss
where row(hundred, thousand, tenthous) <= row(lhundred, lthousand, ltenthous);
hundred | lhundred | thousand | lthousand | tenthous | ltenthous
---------+----------+----------+-----------+----------+-----------
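The ORDER BY added in that test matters because lag() over an unordered, unpartitioned window sees rows in whatever order the executor delivers them, and once the input can arrive from parallel workers that order is not stable across runs (clstr_4 and hundred come from the test above; the sketch only contrasts the two forms).

    -- the first form may return different lag values on different runs,
    -- since the frame order is unspecified; the second form is deterministic
    SELECT hundred, lag(hundred) OVER ()                 FROM clstr_4;
    SELECT hundred, lag(hundred) OVER (ORDER BY hundred) FROM clstr_4;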
18 changes: 10 additions & 8 deletions src/test/regress/expected/select_parallel.out
@@ -1056,35 +1056,37 @@ select count(*) from tenk1;
reset force_parallel_mode;
reset role;
-- Window function calculation can't be pushed to workers.
-- CBDB_PARALLEL: window function's subpath could be parallel.
explain (costs off, verbose)
select count(*) from tenk1 a where (unique1, two) in
(select unique1, row_number() over() from tenk1 b);
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate
Output: count(*)
-> Gather Motion 3:1 (slice1; segments: 3)
-> Gather Motion 6:1 (slice1; segments: 6)
Output: (PARTIAL count(*))
-> Partial Aggregate
Output: PARTIAL count(*)
-> Hash Semi Join
-> Parallel Hash Semi Join
Hash Cond: ((a.unique1 = b.unique1) AND (a.two = (row_number() OVER (?))))
-> Seq Scan on public.tenk1 a
-> Parallel Seq Scan on public.tenk1 a
Output: a.unique1, a.unique2, a.two, a.four, a.ten, a.twenty, a.hundred, a.thousand, a.twothousand, a.fivethous, a.tenthous, a.odd, a.even, a.stringu1, a.stringu2, a.string4
-> Hash
-> Parallel Hash
Output: b.unique1, (row_number() OVER (?))
-> Redistribute Motion 1:3 (slice2; segments: 1)
-> Redistribute Motion 1:6 (slice2; segments: 1)
Output: b.unique1, (row_number() OVER (?))
Hash Key: b.unique1
Hash Module: 3
-> WindowAgg
Output: b.unique1, row_number() OVER (?)
-> Gather Motion 3:1 (slice3; segments: 3)
-> Gather Motion 6:1 (slice3; segments: 6)
Output: b.unique1
-> Seq Scan on public.tenk1 b
-> Parallel Seq Scan on public.tenk1 b
Output: b.unique1
Settings: enable_parallel = 'on', min_parallel_table_scan_size = '0', optimizer = 'off', parallel_setup_cost = '0', parallel_tuple_cost = '0'
Optimizer: Postgres query optimizer
(23 rows)
(24 rows)

-- LIMIT/OFFSET within sub-selects can't be pushed to workers.
explain (costs off)
2 changes: 2 additions & 0 deletions src/test/regress/expected/statement_mem_for_windowagg.out
@@ -2,6 +2,7 @@ CREATE TABLE dummy_table(x int, y int) DISTRIBUTED BY (y);
INSERT INTO dummy_table SELECT generate_series(0, 20000), 0;
INSERT INTO dummy_table SELECT generate_series(0, 20000), 3;
INSERT INTO dummy_table SELECT generate_series(0, 20000), 10;
set enable_parallel = off;
-- 1. Test that if we set statement_mem to a larger value, the tuplestore
-- for caching the tuples in partition used in WindowAgg is able to be fitted
-- in memory.
@@ -172,5 +173,6 @@ SELECT gp_inject_fault_infinite('distinct_winagg_perform_sort', 'reset', dbid)
(3 rows)

-- Do some clean-ups.
reset enable_parallel;
DROP TABLE dummy_table;
RESET statement_mem;