Skip to content

Commit 60eb10d

Browse files
committed
ORCA: allow different strategy control the redistribute key below aggregate
In CBDB, if there is an AGG operator (one-step AGG or final AGG operator) that requires data redistribution, then the redistribution motion operator will used all `GROUP BY` keys as the redistribute keys. In fact, only a single key needs to be redistributed, and the results of AGG will be the same. Reducing the number of redistributed keys can effectively reduce the overhead of hash function calls in motion operator. However, this may lead to data skew. Therefore, the current commit provides several different strategies for deciding how redistribution keys should be selected during redistribution motion operator (which under the AGG operator). User can use the GUC `optimizer_agg_pds_strategy` to select the strategies. - OPTIMIZER_AGG_PDS_ALL_KEY(value: 0): default one, select all `GROUP BY` key as the redistributed keys. - OPTIMIZER_AGG_PDS_FIRST_KEY(value: 1): select the first `GROUP BY` key as the redistributed keys. - OPTIMIZER_AGG_PDS_MINIMAL_LEN_KEY(value: 2): select a `GROUP BY` key which has the minimal and positive typlen as the redistributed keys. If only non-fixed type (such as text and varchar) exist, select the first `GROUP BY` key. - OPTIMIZER_AGG_PDS_EXCLUDE_NON_FIXED(value: 3): select the `GROUP BY` key which is fixed typlen the redistributed keys. If only non-fixed type (such as text and varchar) exist, select the first `GROUP BY` key.
1 parent b9849f3 commit 60eb10d

File tree

9 files changed

+301
-2
lines changed

9 files changed

+301
-2
lines changed

src/backend/gpopt/config/CConfigParamMapping.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -578,6 +578,14 @@ CConfigParamMapping::PackConfigParamInBitset(
578578
traceflag_bitset->ExchangeSet(EopttraceEnableWindowHashAgg);
579579
}
580580

581+
if (optimizer_agg_pds_strategy == OPTIMIZER_AGG_PDS_FIRST_KEY) {
582+
traceflag_bitset->ExchangeSet(EopttraceAggRRSFirstKey);
583+
} else if (optimizer_agg_pds_strategy == OPTIMIZER_AGG_PDS_MINIMAL_LEN_KEY) {
584+
traceflag_bitset->ExchangeSet(EopttraceAggRRSMinimalLenKey);
585+
} else if (optimizer_agg_pds_strategy == OPTIMIZER_AGG_PDS_EXCLUDE_NON_FIXED) {
586+
traceflag_bitset->ExchangeSet(EopttraceAggRRSExcludeNonFixedKey);
587+
}
588+
581589
return traceflag_bitset;
582590
}
583591

src/backend/gporca/libgpopt/src/operators/CPhysicalAgg.cpp

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
// @doc:
99
// Implementation of basic aggregate operator
1010
//---------------------------------------------------------------------------
11-
1211
#include "gpopt/operators/CPhysicalAgg.h"
1312

1413
#include "gpos/base.h"
@@ -295,11 +294,65 @@ CDistributionSpec *
295294
CPhysicalAgg::PdsMaximalHashed(CMemoryPool *mp, CColRefArray *colref_array)
296295
{
297296
GPOS_ASSERT(nullptr != colref_array);
297+
CColRefArray *pcraResHashs = nullptr;
298+
299+
if (GPOS_FTRACE(EopttraceAggRRSFirstKey)) {
300+
pcraResHashs = GPOS_NEW(mp) CColRefArray(mp);
301+
if (colref_array->Size() > 0) {
302+
CColRef *colref = (*colref_array)[0];
303+
pcraResHashs->Append(colref);
304+
}
305+
} else if (GPOS_FTRACE(EopttraceAggRRSMinimalLenKey)) {
306+
ULONG ulsz = colref_array->Size();
307+
pcraResHashs = GPOS_NEW(mp) CColRefArray(mp);
308+
309+
LINT minimal_typlen = gpos::lint_max; // less than minimal typlen
310+
LINT minimal_typlen_ul = -1;
311+
for (ULONG ul = 0; ul < ulsz; ul++) {
312+
CColRef *pcr =(*colref_array)[ul];
313+
const gpmd::IMDType *pmdtyp = pcr->RetrieveType();
314+
LINT typlen = pmdtyp->IsFixedLength() ? pmdtyp->Length() : (gpos::lint_max - 1);
315+
316+
if (typlen < minimal_typlen) {
317+
minimal_typlen = typlen;
318+
minimal_typlen_ul = ul;
319+
}
320+
}
321+
322+
if (minimal_typlen_ul != -1) {
323+
pcraResHashs->Append((*colref_array)[minimal_typlen_ul]);
324+
}
325+
} else if (GPOS_FTRACE(EopttraceAggRRSExcludeNonFixedKey)) {
326+
ULONG ulsz = colref_array->Size();
327+
pcraResHashs = GPOS_NEW(mp) CColRefArray(mp);
328+
329+
for (ULONG ul = 0; ul < ulsz; ul++) {
330+
CColRef *pcr =(*colref_array)[ul];
331+
const gpmd::IMDType *pmdtyp = pcr->RetrieveType();
332+
if (pmdtyp->IsFixedLength()) {
333+
pcraResHashs->Append(pcr);
334+
}
335+
}
336+
337+
// no key in result
338+
if (pcraResHashs->Size() == 0) {
339+
colref_array->AddRef();
340+
pcraResHashs = colref_array;
341+
}
342+
} else {
343+
colref_array->AddRef();
344+
pcraResHashs = colref_array;
345+
}
346+
347+
GPOS_ASSERT(nullptr != pcraResHashs);
348+
GPOS_ASSERT_IMP(colref_array->Size() > 0, pcraResHashs->Size() > 0);
298349

299350
CDistributionSpecHashed *pdshashedMaximal =
300351
CDistributionSpecHashed::PdshashedMaximal(
301-
mp, colref_array, true /*fNullsColocated*/
352+
mp, pcraResHashs, true /*fNullsColocated*/
302353
);
354+
355+
pcraResHashs->Release();
303356
if (nullptr != pdshashedMaximal)
304357
{
305358
return pdshashedMaximal;

src/backend/gporca/libnaucrates/include/naucrates/traceflags/traceflags.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,15 @@ enum EOptTraceFlag
244244
// Enable window hash agg
245245
EopttraceEnableWindowHashAgg = 103050,
246246

247+
// Use the first key in AGG Pds
248+
EopttraceAggRRSFirstKey = 103051,
249+
250+
// Use the minimal length key in AGG Pds
251+
EopttraceAggRRSMinimalLenKey = 103052,
252+
253+
// Use the all key exclude the non-fixed key in AGG pds
254+
EopttraceAggRRSExcludeNonFixedKey = 103053,
255+
247256
///////////////////////////////////////////////////////
248257
///////////////////// statistics flags ////////////////
249258
//////////////////////////////////////////////////////

src/backend/utils/misc/guc_gp.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,7 @@ bool optimizer_enable_foreign_table;
365365
bool optimizer_enable_right_outer_join;
366366
bool optimizer_enable_query_parameter;
367367
bool optimizer_force_window_hash_agg;
368+
int optimizer_agg_pds_strategy;
368369

369370
/* Optimizer plan enumeration related GUCs */
370371
bool optimizer_enumerate_plans;
@@ -4490,6 +4491,17 @@ struct config_int ConfigureNamesInt_gp[] =
44904491
NULL, NULL, NULL
44914492
},
44924493

4494+
{
4495+
{"optimizer_agg_pds_strategy", PGC_USERSET, DEVELOPER_OPTIONS,
4496+
gettext_noop("Set the strategy of agg required distribution."),
4497+
NULL,
4498+
GUC_NOT_IN_SAMPLE
4499+
},
4500+
&optimizer_agg_pds_strategy,
4501+
OPTIMIZER_AGG_PDS_ALL_KEY, OPTIMIZER_AGG_PDS_ALL_KEY, OPTIMIZER_AGG_PDS_EXCLUDE_NON_FIXED,
4502+
NULL, NULL, NULL
4503+
},
4504+
44934505
{
44944506
{"memory_profiler_dataset_size", PGC_USERSET, DEVELOPER_OPTIONS,
44954507
gettext_noop("Set the size in GB"),

src/include/utils/guc.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,12 @@ extern bool create_restartpoint_on_ckpt_record_replay;
466466
#define OPTIMIZER_GPDB_CALIBRATED 1 /* GPDB's calibrated cost model */
467467
#define OPTIMIZER_GPDB_EXPERIMENTAL 2 /* GPDB's experimental cost model */
468468

469+
/* optimizer cost model */
470+
#define OPTIMIZER_AGG_PDS_ALL_KEY 0
471+
#define OPTIMIZER_AGG_PDS_FIRST_KEY 1
472+
#define OPTIMIZER_AGG_PDS_MINIMAL_LEN_KEY 2
473+
#define OPTIMIZER_AGG_PDS_EXCLUDE_NON_FIXED 3
474+
469475

470476
/* Optimizer related gucs */
471477
extern bool optimizer;
@@ -546,6 +552,7 @@ extern bool optimizer_enable_foreign_table;
546552
extern bool optimizer_enable_right_outer_join;
547553
extern bool optimizer_enable_query_parameter;
548554
extern bool optimizer_force_window_hash_agg;
555+
extern int optimizer_agg_pds_strategy;
549556

550557
/* Optimizer plan enumeration related GUCs */
551558
extern bool optimizer_enumerate_plans;

src/include/utils/unsync_guc_name.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,7 @@
440440
"optimizer_enable_right_outer_join",
441441
"optimizer_enable_query_parameter",
442442
"optimizer_force_window_hash_agg",
443+
"optimizer_agg_pds_strategy",
443444
"optimizer_enforce_subplans",
444445
"optimizer_enumerate_plans",
445446
"optimizer_expand_fulljoin",

src/test/regress/expected/aggregates.out

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3404,3 +3404,93 @@ having sum(tgb1.v3 * tgb2.v3) > 100 and
34043404

34053405
reset debug_print_aggref_in_explain;
34063406
reset optimizer_force_multistage_agg;
3407+
-- test the optimizer_agg_pds_strategy
3408+
DROP TABLE IF EXISTS pds_t1;
3409+
NOTICE: table "pds_t1" does not exist, skipping
3410+
create table pds_t1(v1 int, v2 text, v3 int, v4 text, v5 int, v6 text) DISTRIBUTED BY (v1);
3411+
set optimizer_agg_pds_strategy to 0;
3412+
explain (costs off) select v2,v3 from pds_t1 group by v2,v3;
3413+
QUERY PLAN
3414+
------------------------------------------------------------
3415+
Gather Motion 3:1 (slice1; segments: 3)
3416+
-> HashAggregate
3417+
Group Key: v2, v3
3418+
-> Redistribute Motion 3:3 (slice2; segments: 3)
3419+
Hash Key: v2, v3
3420+
-> HashAggregate
3421+
Group Key: v2, v3
3422+
-> Seq Scan on pds_t1
3423+
Optimizer: Postgres query optimizer
3424+
(9 rows)
3425+
3426+
set optimizer_agg_pds_strategy to 1;
3427+
explain (costs off) select v2,v3 from pds_t1 group by v2,v3;
3428+
QUERY PLAN
3429+
------------------------------------------------------------
3430+
Gather Motion 3:1 (slice1; segments: 3)
3431+
-> HashAggregate
3432+
Group Key: v2, v3
3433+
-> Redistribute Motion 3:3 (slice2; segments: 3)
3434+
Hash Key: v2, v3
3435+
-> HashAggregate
3436+
Group Key: v2, v3
3437+
-> Seq Scan on pds_t1
3438+
Optimizer: Postgres query optimizer
3439+
(9 rows)
3440+
3441+
set optimizer_agg_pds_strategy to 2;
3442+
explain (costs off) select v2,v3 from pds_t1 group by v2,v3;
3443+
QUERY PLAN
3444+
------------------------------------------------------------
3445+
Gather Motion 3:1 (slice1; segments: 3)
3446+
-> HashAggregate
3447+
Group Key: v2, v3
3448+
-> Redistribute Motion 3:3 (slice2; segments: 3)
3449+
Hash Key: v2, v3
3450+
-> HashAggregate
3451+
Group Key: v2, v3
3452+
-> Seq Scan on pds_t1
3453+
Optimizer: Postgres query optimizer
3454+
(9 rows)
3455+
3456+
set optimizer_agg_pds_strategy to 3;
3457+
explain (costs off) select v2,v3,v4,v5,v6 from pds_t1 group by v2,v3,v4,v5,v6;
3458+
QUERY PLAN
3459+
------------------------------------------------------------
3460+
Gather Motion 3:1 (slice1; segments: 3)
3461+
-> HashAggregate
3462+
Group Key: v2, v3, v4, v5, v6
3463+
-> Redistribute Motion 3:3 (slice2; segments: 3)
3464+
Hash Key: v2, v3, v4, v5, v6
3465+
-> HashAggregate
3466+
Group Key: v2, v3, v4, v5, v6
3467+
-> Seq Scan on pds_t1
3468+
Optimizer: Postgres query optimizer
3469+
(9 rows)
3470+
3471+
-- We can't dedup the "Redistribute Motion", cause in this step we can't know the
3472+
-- distribution of output column which from the underlying operators.
3473+
-- So you need to be cautious when opening this guc.
3474+
set optimizer_agg_pds_strategy to 1;
3475+
explain (costs off) select v1,v2,v3 from pds_t1 group by v3,v2,v1;
3476+
QUERY PLAN
3477+
------------------------------------------
3478+
Gather Motion 3:1 (slice1; segments: 3)
3479+
-> HashAggregate
3480+
Group Key: v3, v2, v1
3481+
-> Seq Scan on pds_t1
3482+
Optimizer: Postgres query optimizer
3483+
(5 rows)
3484+
3485+
set optimizer_agg_pds_strategy to 0;
3486+
explain (costs off) select v1,v2,v3 from pds_t1 group by v3,v2,v1;
3487+
QUERY PLAN
3488+
------------------------------------------
3489+
Gather Motion 3:1 (slice1; segments: 3)
3490+
-> HashAggregate
3491+
Group Key: v3, v2, v1
3492+
-> Seq Scan on pds_t1
3493+
Optimizer: Postgres query optimizer
3494+
(5 rows)
3495+
3496+
reset optimizer_agg_pds_strategy;

src/test/regress/expected/aggregates_optimizer.out

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3573,3 +3573,99 @@ having sum(tgb1.v3 * tgb2.v3) > 100 and
35733573

35743574
reset debug_print_aggref_in_explain;
35753575
reset optimizer_force_multistage_agg;
3576+
-- test the optimizer_agg_pds_strategy
3577+
DROP TABLE IF EXISTS pds_t1;
3578+
NOTICE: table "pds_t1" does not exist, skipping
3579+
create table pds_t1(v1 int, v2 text, v3 int, v4 text, v5 int, v6 text) DISTRIBUTED BY (v1);
3580+
set optimizer_agg_pds_strategy to 0;
3581+
explain (costs off) select v2,v3 from pds_t1 group by v2,v3;
3582+
QUERY PLAN
3583+
------------------------------------------------------------------
3584+
Gather Motion 3:1 (slice1; segments: 3)
3585+
-> GroupAggregate
3586+
Group Key: v2, v3
3587+
-> Sort
3588+
Sort Key: v2, v3
3589+
-> Redistribute Motion 3:3 (slice2; segments: 3)
3590+
Hash Key: v2, v3
3591+
-> Seq Scan on pds_t1
3592+
Optimizer: GPORCA
3593+
(9 rows)
3594+
3595+
set optimizer_agg_pds_strategy to 1;
3596+
explain (costs off) select v2,v3 from pds_t1 group by v2,v3;
3597+
QUERY PLAN
3598+
------------------------------------------------------------------
3599+
Gather Motion 3:1 (slice1; segments: 3)
3600+
-> GroupAggregate
3601+
Group Key: v2, v3
3602+
-> Sort
3603+
Sort Key: v2, v3
3604+
-> Redistribute Motion 3:3 (slice2; segments: 3)
3605+
Hash Key: v2
3606+
-> Seq Scan on pds_t1
3607+
Optimizer: GPORCA
3608+
(9 rows)
3609+
3610+
set optimizer_agg_pds_strategy to 2;
3611+
explain (costs off) select v2,v3 from pds_t1 group by v2,v3;
3612+
QUERY PLAN
3613+
------------------------------------------------------------------
3614+
Gather Motion 3:1 (slice1; segments: 3)
3615+
-> GroupAggregate
3616+
Group Key: v2, v3
3617+
-> Sort
3618+
Sort Key: v2, v3
3619+
-> Redistribute Motion 3:3 (slice2; segments: 3)
3620+
Hash Key: v3
3621+
-> Seq Scan on pds_t1
3622+
Optimizer: GPORCA
3623+
(9 rows)
3624+
3625+
set optimizer_agg_pds_strategy to 3;
3626+
explain (costs off) select v2,v3,v4,v5,v6 from pds_t1 group by v2,v3,v4,v5,v6;
3627+
QUERY PLAN
3628+
------------------------------------------------------------------
3629+
Gather Motion 3:1 (slice1; segments: 3)
3630+
-> GroupAggregate
3631+
Group Key: v2, v3, v4, v5, v6
3632+
-> Sort
3633+
Sort Key: v2, v3, v4, v5, v6
3634+
-> Redistribute Motion 3:3 (slice2; segments: 3)
3635+
Hash Key: v3, v5
3636+
-> Seq Scan on pds_t1
3637+
Optimizer: GPORCA
3638+
(9 rows)
3639+
3640+
-- We can't dedup the "Redistribute Motion", cause in this step we can't know the
3641+
-- distribution of output column which from the underlying operators.
3642+
-- So you need to be cautious when opening this guc.
3643+
set optimizer_agg_pds_strategy to 1;
3644+
explain (costs off) select v1,v2,v3 from pds_t1 group by v3,v2,v1;
3645+
QUERY PLAN
3646+
------------------------------------------------------------------
3647+
Gather Motion 3:1 (slice1; segments: 3)
3648+
-> GroupAggregate
3649+
Group Key: v3, v2, v1
3650+
-> Sort
3651+
Sort Key: v3, v2, v1
3652+
-> Redistribute Motion 3:3 (slice2; segments: 3)
3653+
Hash Key: v3
3654+
-> Seq Scan on pds_t1
3655+
Optimizer: GPORCA
3656+
(9 rows)
3657+
3658+
set optimizer_agg_pds_strategy to 0;
3659+
explain (costs off) select v1,v2,v3 from pds_t1 group by v3,v2,v1;
3660+
QUERY PLAN
3661+
------------------------------------------
3662+
Gather Motion 3:1 (slice1; segments: 3)
3663+
-> GroupAggregate
3664+
Group Key: v3, v2, v1
3665+
-> Sort
3666+
Sort Key: v3, v2, v1
3667+
-> Seq Scan on pds_t1
3668+
Optimizer: GPORCA
3669+
(7 rows)
3670+
3671+
reset optimizer_agg_pds_strategy;

src/test/regress/sql/aggregates.sql

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1528,3 +1528,26 @@ having sum(tgb1.v3 * tgb2.v3) > 100 and
15281528

15291529
reset debug_print_aggref_in_explain;
15301530
reset optimizer_force_multistage_agg;
1531+
1532+
-- test the optimizer_agg_pds_strategy
1533+
DROP TABLE IF EXISTS pds_t1;
1534+
create table pds_t1(v1 int, v2 text, v3 int, v4 text, v5 int, v6 text) DISTRIBUTED BY (v1);
1535+
1536+
set optimizer_agg_pds_strategy to 0;
1537+
explain (costs off) select v2,v3 from pds_t1 group by v2,v3;
1538+
set optimizer_agg_pds_strategy to 1;
1539+
explain (costs off) select v2,v3 from pds_t1 group by v2,v3;
1540+
set optimizer_agg_pds_strategy to 2;
1541+
explain (costs off) select v2,v3 from pds_t1 group by v2,v3;
1542+
set optimizer_agg_pds_strategy to 3;
1543+
explain (costs off) select v2,v3,v4,v5,v6 from pds_t1 group by v2,v3,v4,v5,v6;
1544+
1545+
-- We can't dedup the "Redistribute Motion", cause in this step we can't know the
1546+
-- distribution of output column which from the underlying operators.
1547+
-- So you need to be cautious when opening this guc.
1548+
set optimizer_agg_pds_strategy to 1;
1549+
explain (costs off) select v1,v2,v3 from pds_t1 group by v3,v2,v1;
1550+
set optimizer_agg_pds_strategy to 0;
1551+
explain (costs off) select v1,v2,v3 from pds_t1 group by v3,v2,v1;
1552+
1553+
reset optimizer_agg_pds_strategy;

0 commit comments

Comments
 (0)