diff --git a/src/backend/gporca/libgpopt/include/gpopt/operators/CLogicalGbAgg.h b/src/backend/gporca/libgpopt/include/gpopt/operators/CLogicalGbAgg.h index a7f1b01c202..005b78db1e8 100644 --- a/src/backend/gporca/libgpopt/include/gpopt/operators/CLogicalGbAgg.h +++ b/src/backend/gporca/libgpopt/include/gpopt/operators/CLogicalGbAgg.h @@ -131,6 +131,20 @@ class CLogicalGbAgg : public CLogicalUnary return m_fGeneratesDuplicates; } + // is this aggregate generated by pushdown? + BOOL + FAggPushdown() const + { + return m_aggpushdown; + } + + // mark this aggregate as generated by pushdown + void + MarkAggPushdown() + { + m_aggpushdown = true; + } + // match function BOOL Matches(COperator *pop) const override; @@ -274,6 +288,8 @@ class CLogicalGbAgg : public CLogicalUnary // which type of multi-stage agg it is EAggStage m_aggStage; + // is this agg generated by pushdown? + BOOL m_aggpushdown; }; // class CLogicalGbAgg } // namespace gpopt diff --git a/src/backend/gporca/libgpopt/include/gpopt/operators/CPhysicalAgg.h b/src/backend/gporca/libgpopt/include/gpopt/operators/CPhysicalAgg.h index a5a44e61d0c..0a1c5b5000e 100644 --- a/src/backend/gporca/libgpopt/include/gpopt/operators/CPhysicalAgg.h +++ b/src/backend/gporca/libgpopt/include/gpopt/operators/CPhysicalAgg.h @@ -44,6 +44,9 @@ class CPhysicalAgg : public CPhysical CLogicalGbAgg::EAggStage m_aggStage; + // is this aggregate generated by pushdown? + BOOL m_aggpushdown; + // compute required distribution of the n-th child of an intermediate aggregate CDistributionSpec *PdsRequiredIntermediateAgg(CMemoryPool *mp, ULONG ulOptReq) const; @@ -111,7 +114,7 @@ class CPhysicalAgg : public CPhysical COperator::EGbAggType egbaggtype, BOOL fGeneratesDuplicates, CColRefArray *pdrgpcrArgDQA, BOOL fMultiStage, BOOL isAggFromSplitDQA, CLogicalGbAgg::EAggStage aggStage, - BOOL should_enforce_distribution); + BOOL isAggPushdown, BOOL should_enforce_distribution); // is this agg generated by CXformSplitDQA BOOL IsAggFromSplitDQA() const; diff --git 
a/src/backend/gporca/libgpopt/include/gpopt/operators/CPhysicalHashAgg.h b/src/backend/gporca/libgpopt/include/gpopt/operators/CPhysicalHashAgg.h index 87c817d7aa2..ff63514e41f 100644 --- a/src/backend/gporca/libgpopt/include/gpopt/operators/CPhysicalHashAgg.h +++ b/src/backend/gporca/libgpopt/include/gpopt/operators/CPhysicalHashAgg.h @@ -41,7 +41,7 @@ class CPhysicalHashAgg : public CPhysicalAgg BOOL fGeneratesDuplicates, CColRefArray *pdrgpcrArgDQA, BOOL fMultiStage, BOOL isAggFromSplitDQA, CLogicalGbAgg::EAggStage aggStage, - BOOL should_enforce_distribution = true + BOOL isAggPushdown, BOOL should_enforce_distribution // should_enforce_distribution should be set to false if // 'local' and 'global' splits don't need to have different // distributions. This flag is set to false if the local diff --git a/src/backend/gporca/libgpopt/include/gpopt/operators/CPhysicalHashAggDeduplicate.h b/src/backend/gporca/libgpopt/include/gpopt/operators/CPhysicalHashAggDeduplicate.h index 75fefc1535c..204a3bbe4b5 100644 --- a/src/backend/gporca/libgpopt/include/gpopt/operators/CPhysicalHashAggDeduplicate.h +++ b/src/backend/gporca/libgpopt/include/gpopt/operators/CPhysicalHashAggDeduplicate.h @@ -42,6 +42,7 @@ class CPhysicalHashAggDeduplicate : public CPhysicalHashAgg BOOL fGeneratesDuplicates, BOOL fMultiStage, BOOL isAggFromSplitDQA, CLogicalGbAgg::EAggStage aggStage, + BOOL isAggPushdown, BOOL should_enforce_distribution); // dtor diff --git a/src/backend/gporca/libgpopt/include/gpopt/operators/CPhysicalScalarAgg.h b/src/backend/gporca/libgpopt/include/gpopt/operators/CPhysicalScalarAgg.h index 17aee5e65d8..291b49f0732 100644 --- a/src/backend/gporca/libgpopt/include/gpopt/operators/CPhysicalScalarAgg.h +++ b/src/backend/gporca/libgpopt/include/gpopt/operators/CPhysicalScalarAgg.h @@ -40,7 +40,8 @@ class CPhysicalScalarAgg : public CPhysicalAgg CColRefArray *pdrgpcrMinimal, // minimal grouping columns based on FD's COperator::EGbAggType egbaggtype, BOOL fGeneratesDuplicates, 
CColRefArray *pdrgpcrArgDQA, BOOL fMultiStage, BOOL isAggFromSplitDQA, - CLogicalGbAgg::EAggStage aggStage, BOOL should_enforce_distribution); + CLogicalGbAgg::EAggStage aggStage, BOOL isAggPushdown, + BOOL should_enforce_distribution); // dtor ~CPhysicalScalarAgg() override; diff --git a/src/backend/gporca/libgpopt/include/gpopt/operators/CPhysicalStreamAgg.h b/src/backend/gporca/libgpopt/include/gpopt/operators/CPhysicalStreamAgg.h index 1cb0dbf9490..21dc3464638 100644 --- a/src/backend/gporca/libgpopt/include/gpopt/operators/CPhysicalStreamAgg.h +++ b/src/backend/gporca/libgpopt/include/gpopt/operators/CPhysicalStreamAgg.h @@ -60,8 +60,8 @@ class CPhysicalStreamAgg : public CPhysicalAgg CColRefArray *pdrgpcrMinimal, // minimal grouping columns based on FD's COperator::EGbAggType egbaggtype, BOOL fGeneratesDuplicates, CColRefArray *pdrgpcrArgDQA, BOOL fMultiStage, BOOL isAggFromSplitDQA, - CLogicalGbAgg::EAggStage aggStage, - BOOL should_enforce_distribution = true + CLogicalGbAgg::EAggStage aggStage, BOOL isAggPushdown, + BOOL should_enforce_distribution // should_enforce_distribution should be set to false if // 'local' and 'global' splits don't need to have different // distributions. 
This flag is set to false if the local diff --git a/src/backend/gporca/libgpopt/include/gpopt/operators/CPhysicalStreamAggDeduplicate.h b/src/backend/gporca/libgpopt/include/gpopt/operators/CPhysicalStreamAggDeduplicate.h index 6a79c3690d3..a05479aa4bf 100644 --- a/src/backend/gporca/libgpopt/include/gpopt/operators/CPhysicalStreamAggDeduplicate.h +++ b/src/backend/gporca/libgpopt/include/gpopt/operators/CPhysicalStreamAggDeduplicate.h @@ -43,6 +43,7 @@ class CPhysicalStreamAggDeduplicate : public CPhysicalStreamAgg BOOL fGeneratesDuplicates, BOOL fMultiStage, BOOL isAggFromSplitDQA, CLogicalGbAgg::EAggStage aggStage, + BOOL isAggPushdown, BOOL should_enforce_distribution); // dtor diff --git a/src/backend/gporca/libgpopt/include/gpopt/xforms/CXform.h b/src/backend/gporca/libgpopt/include/gpopt/xforms/CXform.h index 904e6b4b1c8..225371097c5 100644 --- a/src/backend/gporca/libgpopt/include/gpopt/xforms/CXform.h +++ b/src/backend/gporca/libgpopt/include/gpopt/xforms/CXform.h @@ -165,6 +165,7 @@ class CXform : public CRefCount, public DbgPrintMixin ExfPushGbWithHavingBelowJoin, ExfPushGbBelowUnion, ExfPushGbBelowUnionAll, + ExfPushPartialAggBelowJoin, ExfSplitGbAgg, ExfSplitGbAggDedup, ExfSplitDQA, diff --git a/src/backend/gporca/libgpopt/include/gpopt/xforms/CXformPushPartialAggBelowJoin.h b/src/backend/gporca/libgpopt/include/gpopt/xforms/CXformPushPartialAggBelowJoin.h new file mode 100644 index 00000000000..4c4d858c813 --- /dev/null +++ b/src/backend/gporca/libgpopt/include/gpopt/xforms/CXformPushPartialAggBelowJoin.h @@ -0,0 +1,97 @@ +//--------------------------------------------------------------------------- +// Greenplum Database +// Copyright (C) 2012 EMC Corp. 
+// +// @filename: +// CXformPushPartialAggBelowJoin.h +// +// @doc: +// Push partial (local-stage) group by below join transform +//--------------------------------------------------------------------------- +#ifndef GPOPT_CXformPushPartialAggBelowJoin_H +#define GPOPT_CXformPushPartialAggBelowJoin_H + +#include <utility> + +#include "gpos/base.h" + +#include "gpopt/xforms/CXformExploration.h" +namespace gpopt +{ +using namespace gpos; + +//--------------------------------------------------------------------------- +// @class: +// CXformPushPartialAggBelowJoin +// +// @doc: +// Push partial (local-stage) group by below join transform +// +//--------------------------------------------------------------------------- +class CXformPushPartialAggBelowJoin : public CXformExploration +{ +private: + static BOOL FLocalGbAggAlreadyPushed(CExpression *pexprGlobalGb); + + static std::pair<BOOL, BOOL> FCanPushLocalGbAggBelowJoin( + CMemoryPool *mp, CExpression *pexpr); + + static CExpression *ExchangeGbkeyFromJoinKey(CMemoryPool *mp, + CExpression *pexpr); + static CColRefSet *PcrsJoinKey(CMemoryPool *mp, CExpression *pexprOuter, + CExpression *pexprInner, + CExpression *pexprScalar); + + static CExpression *PushLocalGbAggBelowJoin(CMemoryPool *mp, + CExpression *pexprGlobalGb); + + static CColRefSet *PexprGetGbAggkey(CMemoryPool *mp, + CExpression *pexprGbAgg); + +public: + CXformPushPartialAggBelowJoin(const CXformPushPartialAggBelowJoin &) = + delete; + + // ctor + explicit CXformPushPartialAggBelowJoin(CMemoryPool *mp); + + // ctor + explicit CXformPushPartialAggBelowJoin(CExpression *pexprPattern); + + // dtor + ~CXformPushPartialAggBelowJoin() override = default; + + // ident accessors + EXformId + Exfid() const override + { + return ExfPushPartialAggBelowJoin; + } + + const CHAR * + SzId() const override + { + return "CXformPushPartialAggBelowJoin"; + } + + // Compatibility function + BOOL + FCompatible(CXform::EXformId exfid) override + { + return (CXform::ExfPushPartialAggBelowJoin != exfid); + } + + // compute xform promise for a 
given expression handle + EXformPromise Exfp(CExpressionHandle &exprhdl) const override; + + // actual transform + void Transform(CXformContext *pxfctxt, CXformResult *pxfres, + CExpression *pexpr) const override; + +}; // class CXformPushPartialAggBelowJoin + +} // namespace gpopt + +#endif // !GPOPT_CXformPushPartialAggBelowJoin_H + +// EOF diff --git a/src/backend/gporca/libgpopt/include/gpopt/xforms/CXformSplitDQA.h b/src/backend/gporca/libgpopt/include/gpopt/xforms/CXformSplitDQA.h index 9a62c9a5d4f..4c7512b6e41 100644 --- a/src/backend/gporca/libgpopt/include/gpopt/xforms/CXformSplitDQA.h +++ b/src/backend/gporca/libgpopt/include/gpopt/xforms/CXformSplitDQA.h @@ -117,7 +117,8 @@ class CXformSplitDQA : public CXformExploration FCompatible(CXform::EXformId exfid) override { return (CXform::ExfSplitDQA != exfid) && - (CXform::ExfSplitGbAgg != exfid); + (CXform::ExfSplitGbAgg != exfid) && + (CXform::ExfPushPartialAggBelowJoin != exfid); } // compute xform promise for a given expression handle diff --git a/src/backend/gporca/libgpopt/include/gpopt/xforms/CXformUtils.h b/src/backend/gporca/libgpopt/include/gpopt/xforms/CXformUtils.h index 65651204184..6494cfc992a 100644 --- a/src/backend/gporca/libgpopt/include/gpopt/xforms/CXformUtils.h +++ b/src/backend/gporca/libgpopt/include/gpopt/xforms/CXformUtils.h @@ -82,10 +82,6 @@ class CXformUtils CTableDescriptor *ptabdesc, CColRefArray *colref_array); - // helper for extracting foreign key - static CColRefSet *PcrsFKey(CMemoryPool *mp, CExpressionArray *pdrgpexpr, - CColRefSet *prcsOutput, CColRefSet *pcrsKey); - // return the set of columns from the given array of columns which appear // in the index included / key columns static CColRefSet *PcrsIndexColumns(CMemoryPool *mp, @@ -172,13 +168,6 @@ class CXformUtils CColRefSet *prcsOutput, CColRefSet *pcrsGrpCols); - // check if the preconditions for pushing down Group by through join are satisfied - static BOOL FCanPushGbAggBelowJoin(CColRefSet *pcrsGrpCols, - 
CColRefSet *pcrsJoinOuterChildOutput, - CColRefSet *pcrsJoinScalarUsedFromOuter, - CColRefSet *pcrsGrpByOutput, - CColRefSet *pcrsGrpByUsed, - CColRefSet *pcrsFKey); // create a dynamic operator for a btree index plan template static CLogical * @@ -295,11 +284,23 @@ class CXformUtils static CXform::EXformPromise ExfpExpandJoinOrder(CExpressionHandle &exprhdl, const CXform *xform); + // extract foreign key + static CColRefSet *PcrsFKey(CMemoryPool *mp, CExpressionArray *pdrgpexpr, + CColRefSet *prcsOutput, CColRefSet *pcrsKey); + // extract foreign key static CColRefSet *PcrsFKey(CMemoryPool *mp, CExpression *pexprOuter, CExpression *pexprInner, CExpression *pexprScalar); + // check if the preconditions for pushing down Group by through join are satisfied + static BOOL FCanPushGbAggBelowJoin(CColRefSet *pcrsGrpCols, + CColRefSet *pcrsJoinOuterChildOutput, + CColRefSet *pcrsJoinScalarUsedFromOuter, + CColRefSet *pcrsGrpByOutput, + CColRefSet *pcrsGrpByUsed, + CColRefSet *pcrsFKey); + // compute a swap of the two given joins static CExpression *PexprSwapJoins(CMemoryPool *mp, CExpression *pexprTopJoin, diff --git a/src/backend/gporca/libgpopt/include/gpopt/xforms/xforms.h b/src/backend/gporca/libgpopt/include/gpopt/xforms/xforms.h index d0972b2be2c..fb5ba6b1e3c 100644 --- a/src/backend/gporca/libgpopt/include/gpopt/xforms/xforms.h +++ b/src/backend/gporca/libgpopt/include/gpopt/xforms/xforms.h @@ -132,6 +132,7 @@ #include "gpopt/xforms/CXformPushGbBelowJoin.h" #include "gpopt/xforms/CXformPushGbBelowUnion.h" #include "gpopt/xforms/CXformPushGbBelowUnionAll.h" +#include "gpopt/xforms/CXformPushPartialAggBelowJoin.h" #include "gpopt/xforms/CXformPushGbDedupBelowJoin.h" #include "gpopt/xforms/CXformPushGbWithHavingBelowJoin.h" #include "gpopt/xforms/CXformPushJoinBelowLeftUnionAll.h" diff --git a/src/backend/gporca/libgpopt/src/operators/CLogicalGbAgg.cpp b/src/backend/gporca/libgpopt/src/operators/CLogicalGbAgg.cpp index 12a367dba3e..6d70f7df0a3 100644 --- 
a/src/backend/gporca/libgpopt/src/operators/CLogicalGbAgg.cpp +++ b/src/backend/gporca/libgpopt/src/operators/CLogicalGbAgg.cpp @@ -40,7 +40,8 @@ CLogicalGbAgg::CLogicalGbAgg(CMemoryPool *mp) m_pdrgpcr(nullptr), m_pdrgpcrMinimal(nullptr), m_egbaggtype(COperator::EgbaggtypeSentinel), - m_aggStage(EasOthers) + m_aggStage(EasOthers), + m_aggpushdown(false) { m_fPattern = true; } @@ -62,7 +63,8 @@ CLogicalGbAgg::CLogicalGbAgg(CMemoryPool *mp, CColRefArray *colref_array, m_pdrgpcr(colref_array), m_pdrgpcrMinimal(nullptr), m_egbaggtype(egbaggtype), - m_aggStage(EasOthers) + m_aggStage(EasOthers), + m_aggpushdown(false) { if (COperator::EgbaggtypeLocal == egbaggtype) { @@ -86,7 +88,8 @@ CLogicalGbAgg::CLogicalGbAgg(CMemoryPool *mp, CColRefArray *colref_array, m_pdrgpcr(colref_array), m_pdrgpcrMinimal(nullptr), m_egbaggtype(egbaggtype), - m_aggStage(aggStage) + m_aggStage(aggStage), + m_aggpushdown(false) { if (COperator::EgbaggtypeLocal == egbaggtype) { @@ -120,7 +123,8 @@ CLogicalGbAgg::CLogicalGbAgg(CMemoryPool *mp, CColRefArray *colref_array, m_pdrgpcr(colref_array), m_pdrgpcrMinimal(nullptr), m_egbaggtype(egbaggtype), - m_aggStage(EasOthers) + m_aggStage(EasOthers), + m_aggpushdown(false) { GPOS_ASSERT(nullptr != colref_array); GPOS_ASSERT(COperator::EgbaggtypeSentinel > egbaggtype); @@ -142,7 +146,8 @@ CLogicalGbAgg::CLogicalGbAgg(CMemoryPool *mp, CColRefArray *colref_array, m_pdrgpcr(colref_array), m_pdrgpcrMinimal(nullptr), m_egbaggtype(egbaggtype), - m_aggStage(aggStage) + m_aggStage(aggStage), + m_aggpushdown(false) { GPOS_ASSERT(nullptr != colref_array); GPOS_ASSERT(COperator::EgbaggtypeSentinel > egbaggtype); @@ -171,7 +176,8 @@ CLogicalGbAgg::CLogicalGbAgg(CMemoryPool *mp, CColRefArray *colref_array, m_pdrgpcr(colref_array), m_pdrgpcrMinimal(pdrgpcrMinimal), m_egbaggtype(egbaggtype), - m_aggStage(EasOthers) + m_aggStage(EasOthers), + m_aggpushdown(false) { GPOS_ASSERT(nullptr != colref_array); GPOS_ASSERT(COperator::EgbaggtypeSentinel > egbaggtype); @@ -209,7 
+215,8 @@ CLogicalGbAgg::CLogicalGbAgg(CMemoryPool *mp, CColRefArray *colref_array, m_pdrgpcr(colref_array), m_pdrgpcrMinimal(pdrgpcrMinimal), m_egbaggtype(egbaggtype), - m_aggStage(EasOthers) + m_aggStage(EasOthers), + m_aggpushdown(false) { GPOS_ASSERT(nullptr != colref_array); GPOS_ASSERT(COperator::EgbaggtypeSentinel > egbaggtype); @@ -592,6 +599,7 @@ CLogicalGbAgg::PxfsCandidates(CMemoryPool *mp) const (void) xform_set->ExchangeSet(CXform::ExfPushGbBelowUnionAll); if (FGlobal()) { + (void) xform_set->ExchangeSet(CXform::ExfPushPartialAggBelowJoin); (void) xform_set->ExchangeSet(CXform::ExfSplitGbAgg); } (void) xform_set->ExchangeSet(CXform::ExfSplitDQA); @@ -698,7 +706,7 @@ CLogicalGbAgg::OsPrint(IOstream &os) const os << SzId() << "( "; OsPrintGbAggType(os, m_egbaggtype); - os << " )"; + os << (m_aggpushdown ? ", pushdown )" : " )"); os << " Grp Cols: ["; CUtils::OsPrintDrgPcr(os, m_pdrgpcr); os << "]" diff --git a/src/backend/gporca/libgpopt/src/operators/CPhysicalAgg.cpp b/src/backend/gporca/libgpopt/src/operators/CPhysicalAgg.cpp index e8f39bc59b8..0b35e399eb2 100644 --- a/src/backend/gporca/libgpopt/src/operators/CPhysicalAgg.cpp +++ b/src/backend/gporca/libgpopt/src/operators/CPhysicalAgg.cpp @@ -39,12 +39,14 @@ CPhysicalAgg::CPhysicalAgg( CColRefArray *pdrgpcrMinimal, // minimal grouping columns based on FD's COperator::EGbAggType egbaggtype, BOOL fGeneratesDuplicates, CColRefArray *pdrgpcrArgDQA, BOOL fMultiStage, BOOL isAggFromSplitDQA, - CLogicalGbAgg::EAggStage aggStage, BOOL should_enforce_distribution) + CLogicalGbAgg::EAggStage aggStage, BOOL isAggPushdown, + BOOL should_enforce_distribution) : CPhysical(mp), m_pdrgpcr(colref_array), m_egbaggtype(egbaggtype), m_isAggFromSplitDQA(isAggFromSplitDQA), m_aggStage(aggStage), + m_aggpushdown(isAggPushdown), m_pdrgpcrMinimal(nullptr), m_fGeneratesDuplicates(fGeneratesDuplicates), m_pdrgpcrArgDQA(pdrgpcrArgDQA), @@ -696,6 +698,10 @@ CPhysicalAgg::OsPrint(IOstream &os) const { os << ", multi-stage"; } + 
if (m_aggpushdown) + { + os << ", pushdown"; + } os << " )"; diff --git a/src/backend/gporca/libgpopt/src/operators/CPhysicalHashAgg.cpp b/src/backend/gporca/libgpopt/src/operators/CPhysicalHashAgg.cpp index 198f1e8f7c2..82fd5d09799 100644 --- a/src/backend/gporca/libgpopt/src/operators/CPhysicalHashAgg.cpp +++ b/src/backend/gporca/libgpopt/src/operators/CPhysicalHashAgg.cpp @@ -33,10 +33,12 @@ CPhysicalHashAgg::CPhysicalHashAgg( CMemoryPool *mp, CColRefArray *colref_array, CColRefArray *pdrgpcrMinimal, COperator::EGbAggType egbaggtype, BOOL fGeneratesDuplicates, CColRefArray *pdrgpcrArgDQA, BOOL fMultiStage, BOOL isAggFromSplitDQA, - CLogicalGbAgg::EAggStage aggStage, BOOL should_enforce_distribution) + CLogicalGbAgg::EAggStage aggStage, BOOL isAggPushdown, + BOOL should_enforce_distribution) : CPhysicalAgg(mp, colref_array, pdrgpcrMinimal, egbaggtype, fGeneratesDuplicates, pdrgpcrArgDQA, fMultiStage, - isAggFromSplitDQA, aggStage, should_enforce_distribution) + isAggFromSplitDQA, aggStage, isAggPushdown, + should_enforce_distribution) { } diff --git a/src/backend/gporca/libgpopt/src/operators/CPhysicalHashAggDeduplicate.cpp b/src/backend/gporca/libgpopt/src/operators/CPhysicalHashAggDeduplicate.cpp index 089f431e261..41aab57db29 100644 --- a/src/backend/gporca/libgpopt/src/operators/CPhysicalHashAggDeduplicate.cpp +++ b/src/backend/gporca/libgpopt/src/operators/CPhysicalHashAggDeduplicate.cpp @@ -33,11 +33,12 @@ CPhysicalHashAggDeduplicate::CPhysicalHashAggDeduplicate( CMemoryPool *mp, CColRefArray *colref_array, CColRefArray *pdrgpcrMinimal, COperator::EGbAggType egbaggtype, CColRefArray *pdrgpcrKeys, BOOL fGeneratesDuplicates, BOOL fMultiStage, BOOL isAggFromSplitDQA, - CLogicalGbAgg::EAggStage aggStage, BOOL should_enforce_distribution) + CLogicalGbAgg::EAggStage aggStage, BOOL isAggPushdown, + BOOL should_enforce_distribution) : CPhysicalHashAgg(mp, colref_array, pdrgpcrMinimal, egbaggtype, fGeneratesDuplicates, nullptr /*pdrgpcrGbMinusDistinct*/, 
fMultiStage, isAggFromSplitDQA, aggStage, - should_enforce_distribution), + isAggPushdown, should_enforce_distribution), m_pdrgpcrKeys(pdrgpcrKeys) { GPOS_ASSERT(nullptr != pdrgpcrKeys); diff --git a/src/backend/gporca/libgpopt/src/operators/CPhysicalScalarAgg.cpp b/src/backend/gporca/libgpopt/src/operators/CPhysicalScalarAgg.cpp index 6b1f54fdfb4..c63f2790376 100644 --- a/src/backend/gporca/libgpopt/src/operators/CPhysicalScalarAgg.cpp +++ b/src/backend/gporca/libgpopt/src/operators/CPhysicalScalarAgg.cpp @@ -33,10 +33,12 @@ CPhysicalScalarAgg::CPhysicalScalarAgg( CMemoryPool *mp, CColRefArray *colref_array, CColRefArray *pdrgpcrMinimal, COperator::EGbAggType egbaggtype, BOOL fGeneratesDuplicates, CColRefArray *pdrgpcrArgDQA, BOOL fMultiStage, BOOL isAggFromSplitDQA, - CLogicalGbAgg::EAggStage aggStage, BOOL should_enforce_distribution) + CLogicalGbAgg::EAggStage aggStage, BOOL isAggPushdown, + BOOL should_enforce_distribution) : CPhysicalAgg(mp, colref_array, pdrgpcrMinimal, egbaggtype, fGeneratesDuplicates, pdrgpcrArgDQA, fMultiStage, - isAggFromSplitDQA, aggStage, should_enforce_distribution) + isAggFromSplitDQA, aggStage, isAggPushdown, + should_enforce_distribution) { } diff --git a/src/backend/gporca/libgpopt/src/operators/CPhysicalStreamAgg.cpp b/src/backend/gporca/libgpopt/src/operators/CPhysicalStreamAgg.cpp index 156f24d92ee..4f4b31de650 100644 --- a/src/backend/gporca/libgpopt/src/operators/CPhysicalStreamAgg.cpp +++ b/src/backend/gporca/libgpopt/src/operators/CPhysicalStreamAgg.cpp @@ -36,10 +36,12 @@ CPhysicalStreamAgg::CPhysicalStreamAgg( CMemoryPool *mp, CColRefArray *colref_array, CColRefArray *pdrgpcrMinimal, COperator::EGbAggType egbaggtype, BOOL fGeneratesDuplicates, CColRefArray *pdrgpcrArgDQA, BOOL fMultiStage, BOOL isAggFromSplitDQA, - CLogicalGbAgg::EAggStage aggStage, BOOL should_enforce_distribution) + CLogicalGbAgg::EAggStage aggStage, BOOL isAggPushdown, + BOOL should_enforce_distribution) : CPhysicalAgg(mp, colref_array, pdrgpcrMinimal, 
egbaggtype, fGeneratesDuplicates, pdrgpcrArgDQA, fMultiStage, - isAggFromSplitDQA, aggStage, should_enforce_distribution), + isAggFromSplitDQA, aggStage, isAggPushdown, + should_enforce_distribution), m_pos(nullptr) { GPOS_ASSERT(nullptr != m_pdrgpcrMinimal); diff --git a/src/backend/gporca/libgpopt/src/operators/CPhysicalStreamAggDeduplicate.cpp b/src/backend/gporca/libgpopt/src/operators/CPhysicalStreamAggDeduplicate.cpp index 500c9cdcb8e..8af573b6285 100644 --- a/src/backend/gporca/libgpopt/src/operators/CPhysicalStreamAggDeduplicate.cpp +++ b/src/backend/gporca/libgpopt/src/operators/CPhysicalStreamAggDeduplicate.cpp @@ -31,11 +31,12 @@ CPhysicalStreamAggDeduplicate::CPhysicalStreamAggDeduplicate( CMemoryPool *mp, CColRefArray *colref_array, CColRefArray *pdrgpcrMinimal, COperator::EGbAggType egbaggtype, CColRefArray *pdrgpcrKeys, BOOL fGeneratesDuplicates, BOOL fMultiStage, BOOL isAggFromSplitDQA, - CLogicalGbAgg::EAggStage aggStage, BOOL should_enforce_distribution) + CLogicalGbAgg::EAggStage aggStage, BOOL isAggPushdown, + BOOL should_enforce_distribution) : CPhysicalStreamAgg( mp, colref_array, pdrgpcrMinimal, egbaggtype, fGeneratesDuplicates, nullptr /*pdrgpcrGbMinusDistinct*/, fMultiStage, isAggFromSplitDQA, - aggStage, should_enforce_distribution), + aggStage, isAggPushdown, should_enforce_distribution), m_pdrgpcrKeys(pdrgpcrKeys) { GPOS_ASSERT(nullptr != pdrgpcrKeys); diff --git a/src/backend/gporca/libgpopt/src/xforms/CXformFactory.cpp b/src/backend/gporca/libgpopt/src/xforms/CXformFactory.cpp index f54aa957ebc..e21c24511dd 100644 --- a/src/backend/gporca/libgpopt/src/xforms/CXformFactory.cpp +++ b/src/backend/gporca/libgpopt/src/xforms/CXformFactory.cpp @@ -243,6 +243,7 @@ CXformFactory::Instantiate() Add(GPOS_NEW(m_mp) CXformPushGbWithHavingBelowJoin(m_mp)); Add(GPOS_NEW(m_mp) CXformPushGbBelowUnion(m_mp)); Add(GPOS_NEW(m_mp) CXformPushGbBelowUnionAll(m_mp)); + Add(GPOS_NEW(m_mp) CXformPushPartialAggBelowJoin(m_mp)); Add(GPOS_NEW(m_mp) 
CXformSplitGbAgg(m_mp)); Add(GPOS_NEW(m_mp) CXformSplitGbAggDedup(m_mp)); Add(GPOS_NEW(m_mp) CXformSplitDQA(m_mp)); diff --git a/src/backend/gporca/libgpopt/src/xforms/CXformGbAgg2HashAgg.cpp b/src/backend/gporca/libgpopt/src/xforms/CXformGbAgg2HashAgg.cpp index c94a6bdff55..f739779762c 100644 --- a/src/backend/gporca/libgpopt/src/xforms/CXformGbAgg2HashAgg.cpp +++ b/src/backend/gporca/libgpopt/src/xforms/CXformGbAgg2HashAgg.cpp @@ -135,6 +135,7 @@ CXformGbAgg2HashAgg::Transform(CXformContext *pxfctxt, CXformResult *pxfres, popAgg->FGeneratesDuplicates(), pdrgpcrArgDQA, CXformUtils::FMultiStageAgg(pexpr), CXformUtils::FAggGenBySplitDQAXform(pexpr), popAgg->AggStage(), + popAgg->FAggPushdown(), !CXformUtils::FLocalAggCreatedByEagerAggXform(pexpr)), pexprRel, pexprScalar); diff --git a/src/backend/gporca/libgpopt/src/xforms/CXformGbAgg2ScalarAgg.cpp b/src/backend/gporca/libgpopt/src/xforms/CXformGbAgg2ScalarAgg.cpp index b270371663a..7564d830d46 100644 --- a/src/backend/gporca/libgpopt/src/xforms/CXformGbAgg2ScalarAgg.cpp +++ b/src/backend/gporca/libgpopt/src/xforms/CXformGbAgg2ScalarAgg.cpp @@ -107,6 +107,7 @@ CXformGbAgg2ScalarAgg::Transform(CXformContext *pxfctxt, CXformResult *pxfres, popAgg->FGeneratesDuplicates(), pdrgpcrArgDQA, CXformUtils::FMultiStageAgg(pexpr), CXformUtils::FAggGenBySplitDQAXform(pexpr), popAgg->AggStage(), + popAgg->FAggPushdown(), !CXformUtils::FLocalAggCreatedByEagerAggXform(pexpr)), pexprRel, pexprScalar); diff --git a/src/backend/gporca/libgpopt/src/xforms/CXformGbAgg2StreamAgg.cpp b/src/backend/gporca/libgpopt/src/xforms/CXformGbAgg2StreamAgg.cpp index a4875cf222c..2cca4071ce2 100644 --- a/src/backend/gporca/libgpopt/src/xforms/CXformGbAgg2StreamAgg.cpp +++ b/src/backend/gporca/libgpopt/src/xforms/CXformGbAgg2StreamAgg.cpp @@ -121,6 +121,7 @@ CXformGbAgg2StreamAgg::Transform(CXformContext *pxfctxt, CXformResult *pxfres, popAgg->FGeneratesDuplicates(), pdrgpcrArgDQA, CXformUtils::FMultiStageAgg(pexpr), 
CXformUtils::FAggGenBySplitDQAXform(pexpr), popAgg->AggStage(), + popAgg->FAggPushdown(), !CXformUtils::FLocalAggCreatedByEagerAggXform(pexpr)), pexprRel, pexprScalar); diff --git a/src/backend/gporca/libgpopt/src/xforms/CXformGbAggDedup2HashAggDedup.cpp b/src/backend/gporca/libgpopt/src/xforms/CXformGbAggDedup2HashAggDedup.cpp index b139e6c9f63..2c7994869c5 100644 --- a/src/backend/gporca/libgpopt/src/xforms/CXformGbAggDedup2HashAggDedup.cpp +++ b/src/backend/gporca/libgpopt/src/xforms/CXformGbAggDedup2HashAggDedup.cpp @@ -83,6 +83,7 @@ CXformGbAggDedup2HashAggDedup::Transform(CXformContext *pxfctxt, popAggDedup->FGeneratesDuplicates(), CXformUtils::FMultiStageAgg(pexpr), CXformUtils::FAggGenBySplitDQAXform(pexpr), popAggDedup->AggStage(), + popAggDedup->FAggPushdown(), !CXformUtils::FLocalAggCreatedByEagerAggXform(pexpr)), pexprRel, pexprScalar); diff --git a/src/backend/gporca/libgpopt/src/xforms/CXformGbAggDedup2StreamAggDedup.cpp b/src/backend/gporca/libgpopt/src/xforms/CXformGbAggDedup2StreamAggDedup.cpp index 388e6eed2a0..f6341a96eec 100644 --- a/src/backend/gporca/libgpopt/src/xforms/CXformGbAggDedup2StreamAggDedup.cpp +++ b/src/backend/gporca/libgpopt/src/xforms/CXformGbAggDedup2StreamAggDedup.cpp @@ -84,6 +84,7 @@ CXformGbAggDedup2StreamAggDedup::Transform(CXformContext *pxfctxt, popAggDedup->FGeneratesDuplicates(), CXformUtils::FMultiStageAgg(pexpr), CXformUtils::FAggGenBySplitDQAXform(pexpr), popAggDedup->AggStage(), + popAggDedup->FAggPushdown(), !CXformUtils::FLocalAggCreatedByEagerAggXform(pexpr)), pexprRel, pexprScalar); diff --git a/src/backend/gporca/libgpopt/src/xforms/CXformPushPartialAggBelowJoin.cpp b/src/backend/gporca/libgpopt/src/xforms/CXformPushPartialAggBelowJoin.cpp new file mode 100644 index 00000000000..f551b4a3aef --- /dev/null +++ b/src/backend/gporca/libgpopt/src/xforms/CXformPushPartialAggBelowJoin.cpp @@ -0,0 +1,422 @@ +//--------------------------------------------------------------------------- +// Greenplum Database +// 
Copyright (C) 2012 EMC Corp. +// +// @filename: +// CXformPushPartialAggBelowJoin.cpp +// +// @doc: +// Implementation of pushing partial (local-stage) group by below join transform +//--------------------------------------------------------------------------- + +#include "gpopt/xforms/CXformPushPartialAggBelowJoin.h" + +#include "gpos/base.h" + +#include "gpopt/operators/CLogicalGbAgg.h" +#include "gpopt/operators/CLogicalInnerJoin.h" +#include "gpopt/operators/CPatternLeaf.h" +#include "gpopt/xforms/CXformUtils.h" + +using namespace gpopt; + +// ctor +CXformPushPartialAggBelowJoin::CXformPushPartialAggBelowJoin(CMemoryPool *mp) + : // pattern + CXformExploration(GPOS_NEW(mp) CExpression( + mp, GPOS_NEW(mp) CLogicalGbAgg(mp), // global-stage agg + GPOS_NEW(mp) CExpression( + mp, GPOS_NEW(mp) CLogicalGbAgg(mp), // local-stage agg + GPOS_NEW(mp) CExpression( + mp, GPOS_NEW(mp) CLogicalInnerJoin(mp), + GPOS_NEW(mp) CExpression( + mp, GPOS_NEW(mp) CPatternLeaf(mp)), // join outer child + GPOS_NEW(mp) CExpression( + mp, GPOS_NEW(mp) CPatternLeaf(mp)), // join inner child + GPOS_NEW(mp) CExpression( + mp, GPOS_NEW(mp) CPatternTree(mp)) // join predicate + ), + GPOS_NEW(mp) CExpression( + mp, + GPOS_NEW(mp) CPatternTree(mp)) // local scalar project list + ), + GPOS_NEW(mp) CExpression( + mp, GPOS_NEW(mp) CPatternTree(mp)) // global scalar project list + )) +{ +} + +// ctor +CXformPushPartialAggBelowJoin::CXformPushPartialAggBelowJoin( + CExpression *pexprPattern) + : CXformExploration(pexprPattern) +{ +} + +// check whether the join outer child is already a local-stage GbAgg +BOOL +CXformPushPartialAggBelowJoin::FLocalGbAggAlreadyPushed( + CExpression *pexprGlobalGb) +{ + CExpression *pexprLocalGb = (*pexprGlobalGb)[0]; + CExpression *pexprJoin = (*pexprLocalGb)[0]; + CExpression *pexprOuter = (*pexprJoin)[0]; + return pexprOuter->Pop()->Eopid() == COperator::EopLogicalGbAgg && + CLogicalGbAgg::PopConvert(pexprOuter->Pop())->Egbaggtype() == + COperator::EgbaggtypeLocal; +} + + +// compute xform promise for a given expression handle +CXform::EXformPromise 
+CXformPushPartialAggBelowJoin::Exfp(CExpressionHandle &exprhdl) const +{ + CLogicalGbAgg *popGbAgg = CLogicalGbAgg::PopConvert(exprhdl.Pop()); + if (!popGbAgg->FGlobal()) // multi-stage-but-local-agg + { + return CXform::ExfpNone; + } + + return CXform::ExfpHigh; +} + +CColRefSet * +CXformPushPartialAggBelowJoin::PexprGetGbAggkey(CMemoryPool *mp, + CExpression *pexprGbAgg) +{ + // get the group by key set + CLogicalGbAgg *popGbAgg = CLogicalGbAgg::PopConvert(pexprGbAgg->Pop()); + + CColRefSet *pcrsGbKey = GPOS_NEW(mp) CColRefSet(mp); + CColRefArray *colref_array = popGbAgg->PdrgpcrMinimal(); + if (nullptr == colref_array) + { + colref_array = popGbAgg->Pdrgpcr(); + } + pcrsGbKey->Include(colref_array); + return pcrsGbKey; +} + +// collect the outer-side columns of the plain-equality join conjuncts; returns nullptr if there are none +CColRefSet * +CXformPushPartialAggBelowJoin::PcrsJoinKey(CMemoryPool *mp, + CExpression *pexprOuter, + CExpression *pexprInner, + CExpression *pexprScalar) +{ + CColRefSet *prcsOuterOutput = pexprOuter->DeriveOutputColumns(); + CColRefSet *prcsInnerOutput = pexprInner->DeriveOutputColumns(); + CColRefSet *pcrsFKey = GPOS_NEW(mp) CColRefSet(mp); + + CExpressionArray *pdrgpexpr = + CPredicateUtils::PdrgpexprConjuncts(mp, pexprScalar); + + const ULONG ulConjuncts = pdrgpexpr->Size(); + for (ULONG ul = 0; ul < ulConjuncts; ul++) + { + CExpression *pexprConjunct = (*pdrgpexpr)[ul]; + if (!CPredicateUtils::FPlainEquality(pexprConjunct)) + { + continue; + } + + CColRef *pcrFst = const_cast<CColRef *>( + CScalarIdent::PopConvert((*pexprConjunct)[0]->Pop())->Pcr()); + CColRef *pcrSnd = const_cast<CColRef *>( + CScalarIdent::PopConvert((*pexprConjunct)[1]->Pop())->Pcr()); + if (prcsOuterOutput->FMember(pcrFst) && + prcsInnerOutput->FMember(pcrSnd)) + { + pcrsFKey->Include(pcrFst); + } + else if (prcsOuterOutput->FMember(pcrSnd) && + prcsInnerOutput->FMember(pcrFst)) + { + pcrsFKey->Include(pcrSnd); + } + } + + pdrgpexpr->Release(); + if (pcrsFKey->Size() == 0) + { + pcrsFKey->Release(); + pcrsFKey = nullptr; + } + + return pcrsFKey; +} + 
+ +// Unlike CXformUtils::PexprPushGbBelowJoin (which pushes down a one-stage agg), +// a partial agg is pushed below the join only when these three rules hold: +// 1. The group-by key set contains all of the outer-side join keys +// 2. The output of the join outer child contains the columns used by the group by +// - these are the columns used by the agg project list, not the group-by keys +// 3. The output of the group by contains all join scalar columns used from the outer child +// +// A partial agg cannot be pushed through the join when +// (1) the group-by key set does not contain all of the join keys, or +// (2) Gb uses columns from both join children, or +// (3) Gb hides columns required for the join scalar child +// +// Unlike CXformUtils::PexprPushGbBelowJoin, pushing down a partial agg does **NOT REQUIRE** +// a unique key of the join inner child among the join keys. +// +// Also, we must **NEVER** push down a multi-stage agg produced by `CXformSplitDQA` (the DISTINCT case). +// +std::pair<BOOL, BOOL> +CXformPushPartialAggBelowJoin::FCanPushLocalGbAggBelowJoin( + CMemoryPool *mp, CExpression *pexprGlobalGb) +{ + // safe to extract the child expressions directly + CExpression *pexprLocalGb = (*pexprGlobalGb)[0]; + + CExpression *pexprJoin = (*pexprLocalGb)[0]; + CExpression *pexprLocalPrjList = (*pexprLocalGb)[1]; + + CExpression *pexprOuter = (*pexprJoin)[0]; + CExpression *pexprInner = (*pexprJoin)[1]; + CExpression *pexprScalar = (*pexprJoin)[2]; + + // prepare the args for the `FCanPushGbAggBelowJoin` + CColRefSet *pcrsJoinOuterChildOutput = pexprOuter->DeriveOutputColumns(); + CColRefSet *pcrsGrpByUsed = pexprLocalPrjList->DeriveUsedColumns(); + CColRefSet *pcrsGrpByOutput = pexprLocalGb->DeriveOutputColumns(); + + BOOL fCanPush; + + // skip when the local project list is empty; also, an agg from DQA can't be pushed, otherwise the result may be wrong + if (!pexprLocalPrjList->PdrgPexpr() || + pexprLocalPrjList->PdrgPexpr()->Size() == 0 || + CLogicalGbAgg::PopConvert(pexprLocalGb->Pop())->PdrgpcrArgDQA() != + nullptr) + { + return std::make_pair(false, true); + } + + // get the 
group by key set + CColRefSet *pcrsGbKey = PexprGetGbAggkey(mp, pexprLocalGb); + + // get the keyset group by key in join key + CColRefSet *pcrsGbInJoinKey = + PcrsJoinKey(mp, pexprOuter, pexprInner, pexprScalar); + + // get the join scalar used from outer child + CColRefSet *pcrsJoinScalarUsedFromOuter = + GPOS_NEW(mp) CColRefSet(mp, *(pexprScalar->DeriveUsedColumns())); + pcrsJoinScalarUsedFromOuter->Intersection(pcrsJoinOuterChildOutput); + + // check can we pushdown the local agg + fCanPush = CXformUtils::FCanPushGbAggBelowJoin( + pcrsGbKey, pcrsJoinOuterChildOutput, pcrsJoinScalarUsedFromOuter, + pcrsGrpByOutput, pcrsGrpByUsed, pcrsGbInJoinKey); + + pcrsJoinScalarUsedFromOuter->Release(); + pcrsGbKey->Release(); + CRefCount::SafeRelease(pcrsGbInJoinKey); + return std::make_pair(fCanPush, false); +} + +CExpression * +CXformPushPartialAggBelowJoin::PushLocalGbAggBelowJoin( + CMemoryPool *mp, CExpression *pexprGlobalGb) +{ + CExpression *pexprLocalGb = (*pexprGlobalGb)[0]; + CLogicalGbAgg *popGbAgg = CLogicalGbAgg::PopConvert(pexprLocalGb->Pop()); + CExpression *pexprGlobalPrjList = (*pexprGlobalGb)[1]; + + CExpression *pexprJoin = (*pexprLocalGb)[0]; + CExpression *pexprLocalPrjList = (*pexprLocalGb)[1]; + + CExpression *pexprOuter = (*pexprJoin)[0]; + CExpression *pexprInner = (*pexprJoin)[1]; + CExpression *pexprScalar = (*pexprJoin)[2]; + + // get the group by key set + CColRefSet *pcrsGbKey = PexprGetGbAggkey(mp, pexprLocalGb); + + CColRefSet *pcrsJoinOuterChildOutput = pexprOuter->DeriveOutputColumns(); + CLogicalGbAgg *popGbAggNew; + + if (!pcrsJoinOuterChildOutput->ContainsAll(pcrsGbKey)) + { + CColRefSet *pcrsGrpColsNew = GPOS_NEW(mp) CColRefSet(mp); + pcrsGrpColsNew->Include(pcrsGbKey); + pcrsGrpColsNew->Intersection(pcrsJoinOuterChildOutput); + + popGbAggNew = GPOS_NEW(mp) + CLogicalGbAgg(mp, pcrsGrpColsNew->Pdrgpcr(mp), + popGbAgg->Egbaggtype(), popGbAgg->AggStage()); + + pcrsGrpColsNew->Release(); + } + else + { + popGbAgg->AddRef(); + popGbAggNew = 
popGbAgg; + } + + pcrsGbKey->Release(); + + // recreate a local agg below join, above the join outer expression + pexprOuter->AddRef(); + pexprLocalPrjList->AddRef(); + + popGbAggNew->MarkAggPushdown(); + CExpression *pexprNewGb = GPOS_NEW(mp) + CExpression(mp, popGbAggNew, pexprOuter, pexprLocalPrjList); + + // recreate the join which outer is the local agg + COperator *popJoin = pexprJoin->Pop(); + popJoin->AddRef(); + pexprInner->AddRef(); + pexprScalar->AddRef(); + CExpression *pexprNewJoin = GPOS_NEW(mp) + CExpression(mp, popJoin, pexprNewGb, pexprInner, pexprScalar); + + // recreate the global agg below the join + CLogicalGbAgg *popGlobalGb = + CLogicalGbAgg::PopConvert(pexprGlobalGb->Pop()); + popGlobalGb->Pdrgpcr()->AddRef(); + CLogicalGbAgg *popGlobalGbNew = GPOS_NEW(mp) + CLogicalGbAgg(mp, popGlobalGb->Pdrgpcr(), popGlobalGb->Egbaggtype(), + popGlobalGb->AggStage()); + popGlobalGbNew->MarkAggPushdown(); + + pexprGlobalPrjList->AddRef(); + return GPOS_NEW(mp) + CExpression(mp, popGlobalGbNew, pexprNewJoin, pexprGlobalPrjList); +} + + +// Used to exchange the group by key with join key +// Current groupby should not equal with outer join key but equal with inner join key. +// Also should not be the multi groupby key. 
+// Then exchange the local agg group by key to the outer join key +CExpression * +CXformPushPartialAggBelowJoin::ExchangeGbkeyFromJoinKey( + CMemoryPool *mp, CExpression *pexprGlobalGb) +{ + CExpression *pexprLocalGb = (*pexprGlobalGb)[0]; + CLogicalGbAgg *popGbAgg = CLogicalGbAgg::PopConvert(pexprLocalGb->Pop()); + CExpression *pexprGlobalPrjList = (*pexprGlobalGb)[1]; + + CExpression *pexprJoin = (*pexprLocalGb)[0]; + CExpression *pexprLocalPrjList = (*pexprLocalGb)[1]; + + CExpression *pexprOuter = (*pexprJoin)[0]; + CExpression *pexprInner = (*pexprJoin)[1]; + CExpression *pexprScalar = (*pexprJoin)[2]; + + CExpression *pexprNew = nullptr; + + // get the group by key set + CColRefSet *pcrsGbKey = PexprGetGbAggkey(mp, pexprLocalGb); + CColRefSet *pcrsGbInJoinKey = + PcrsJoinKey(mp, pexprOuter, pexprInner, pexprScalar); + + if (pcrsGbKey->Size() == 1 && pcrsGbInJoinKey && + pcrsGbInJoinKey->Size() == 1) + { + CColRefSet *pcrsJoinInnerKey = + PcrsJoinKey(mp, pexprInner, pexprOuter, pexprScalar); + + // current group by not in outer and inner join key + if (!pcrsJoinInnerKey || !pcrsGbKey->ContainsAll(pcrsJoinInnerKey)) + { + pcrsGbKey->Release(); + CRefCount::SafeRelease(pcrsJoinInnerKey); + CRefCount::SafeRelease(pcrsGbInJoinKey); + return nullptr; + } + else + { + CRefCount::SafeRelease(pcrsJoinInnerKey); + } + + CColRefArray *pcraGbInJoinKey = pcrsGbInJoinKey->Pdrgpcr(mp); + + // change group by to the join key(another side)mak + CLogicalGbAgg *popGbAggNew = GPOS_NEW(mp) CLogicalGbAgg( + mp, pcraGbInJoinKey, popGbAgg->Egbaggtype(), popGbAgg->AggStage()); + + pexprJoin->AddRef(); + pexprLocalPrjList->AddRef(); + + CExpression *pexprNewLocalGb = GPOS_NEW(mp) + CExpression(mp, popGbAggNew, pexprJoin, pexprLocalPrjList); + + // recreate the global agg also exchange the group by key + CLogicalGbAgg *popGlobalGb = + CLogicalGbAgg::PopConvert(pexprGlobalGb->Pop()); + popGlobalGb->Pdrgpcr()->AddRef(); + COperator *popGlobalGbNew = GPOS_NEW(mp) + CLogicalGbAgg(mp, 
popGlobalGb->Pdrgpcr(), popGlobalGb->Egbaggtype(), + popGlobalGb->AggStage()); + + pexprGlobalPrjList->AddRef(); + pexprNew = GPOS_NEW(mp) CExpression(mp, popGlobalGbNew, pexprNewLocalGb, + pexprGlobalPrjList); + } + + pcrsGbKey->Release(); + CRefCount::SafeRelease(pcrsGbInJoinKey); + + return pexprNew; +} + +// actual transform +void +CXformPushPartialAggBelowJoin::Transform(CXformContext *pxfctxt, + CXformResult *pxfres, + CExpression *pexpr) const +{ + GPOS_ASSERT(nullptr != pxfctxt); + GPOS_ASSERT(FPromising(pxfctxt->Pmp(), this, pexpr)); + GPOS_ASSERT(FCheckPattern(pexpr)); + + BOOL fCanPush; + BOOL fFailDueDQA; + CMemoryPool *mp = pxfctxt->Pmp(); + + // check current agg already been pushdowned + if (FLocalGbAggAlreadyPushed(pexpr)) + { + return; + } + + CExpression *pexprResult = nullptr; + std::tie(fCanPush, fFailDueDQA) = FCanPushLocalGbAggBelowJoin(mp, pexpr); + + // check the expression can do the pushdown. + if (fCanPush) + { + // do the pushdown + pexprResult = PushLocalGbAggBelowJoin(mp, pexpr); + GPOS_ASSERT(pexprResult); + } + else if (!fFailDueDQA) // no DQA, we can exchange the gb key and try again + { + // exchange the groupby by key + CExpression *pexprNew = ExchangeGbkeyFromJoinKey(mp, pexpr); + if (pexprNew) + { + std::tie(fCanPush, fFailDueDQA) = + FCanPushLocalGbAggBelowJoin(mp, pexprNew); + + // should not fail due the DQA + GPOS_ASSERT(!fFailDueDQA); + if (fCanPush) + { + pexprResult = PushLocalGbAggBelowJoin(mp, pexprNew); + GPOS_ASSERT(pexprResult); + } + } + + // release the expression + CRefCount::SafeRelease(pexprNew); + } + + if (pexprResult) + pxfres->Add(pexprResult); +} \ No newline at end of file diff --git a/src/backend/gporca/libgpopt/src/xforms/CXformSimplifyGbAgg.cpp b/src/backend/gporca/libgpopt/src/xforms/CXformSimplifyGbAgg.cpp index 3abb4ffa69a..b7483342bff 100644 --- a/src/backend/gporca/libgpopt/src/xforms/CXformSimplifyGbAgg.cpp +++ b/src/backend/gporca/libgpopt/src/xforms/CXformSimplifyGbAgg.cpp @@ -61,9 +61,8 @@ 
CXformSimplifyGbAgg::Exfp(CExpressionHandle &exprhdl) const { CLogicalGbAgg *popAgg = CLogicalGbAgg::PopConvert(exprhdl.Pop()); - GPOS_ASSERT(COperator::EgbaggtypeGlobal == popAgg->Egbaggtype()); - - if (0 == popAgg->Pdrgpcr()->Size() || nullptr != popAgg->PdrgpcrMinimal()) + if (0 == popAgg->Pdrgpcr()->Size() || nullptr != popAgg->PdrgpcrMinimal() + || popAgg->Egbaggtype() != COperator::EgbaggtypeGlobal) { return CXform::ExfpNone; } diff --git a/src/backend/gporca/libgpopt/src/xforms/CXformSplitDQA.cpp b/src/backend/gporca/libgpopt/src/xforms/CXformSplitDQA.cpp index a5ad355f5d3..c4008a885f3 100644 --- a/src/backend/gporca/libgpopt/src/xforms/CXformSplitDQA.cpp +++ b/src/backend/gporca/libgpopt/src/xforms/CXformSplitDQA.cpp @@ -62,9 +62,11 @@ CXformSplitDQA::CXformSplitDQA(CMemoryPool *mp) CXform::EXformPromise CXformSplitDQA::Exfp(CExpressionHandle &exprhdl) const { + CLogicalGbAgg *popGbAgg = CLogicalGbAgg::PopConvert(exprhdl.Pop()); // do not split aggregate if it is not a global aggregate, has no distinct aggs, has MDQAs, has outer references, // or return types of Agg functions are ambiguous - if (!CLogicalGbAgg::PopConvert(exprhdl.Pop())->FGlobal() || + if (!popGbAgg->FGlobal() || + popGbAgg->FAggPushdown() || 0 == exprhdl.DeriveTotalDistinctAggs(1) || exprhdl.DeriveHasMultipleDistinctAggs(1) || 0 < exprhdl.DeriveOuterReferences()->Size() || diff --git a/src/backend/gporca/libgpopt/src/xforms/CXformSplitGbAgg.cpp b/src/backend/gporca/libgpopt/src/xforms/CXformSplitGbAgg.cpp index 2a55afdc752..25a46d1519a 100644 --- a/src/backend/gporca/libgpopt/src/xforms/CXformSplitGbAgg.cpp +++ b/src/backend/gporca/libgpopt/src/xforms/CXformSplitGbAgg.cpp @@ -76,9 +76,11 @@ CXformSplitGbAgg::CXformSplitGbAgg(CExpression *pexprPattern) CXform::EXformPromise CXformSplitGbAgg::Exfp(CExpressionHandle &exprhdl) const { + CLogicalGbAgg *popGbAgg = CLogicalGbAgg::PopConvert(exprhdl.Pop()); // do not split aggregate if it is a local aggregate, has distinct aggs, has outer 
references, // or return types of Agg functions are ambiguous - if (!CLogicalGbAgg::PopConvert(exprhdl.Pop())->FGlobal() || + if (!popGbAgg->FGlobal() || + popGbAgg->FAggPushdown() || // current agg have been pushdown 0 < exprhdl.DeriveTotalDistinctAggs(1) || 0 < exprhdl.DeriveOuterReferences()->Size() || nullptr == exprhdl.PexprScalarExactChild(1) || diff --git a/src/backend/gporca/libgpopt/src/xforms/CXformUtils.cpp b/src/backend/gporca/libgpopt/src/xforms/CXformUtils.cpp index a21e3d963de..b810b058f4d 100644 --- a/src/backend/gporca/libgpopt/src/xforms/CXformUtils.cpp +++ b/src/backend/gporca/libgpopt/src/xforms/CXformUtils.cpp @@ -560,6 +560,7 @@ CXformUtils::PexprPushGbBelowJoin(CMemoryPool *mp, CExpression *pexpr) pexprOuter->AddRef(); pexprPrjList->AddRef(); + popGbAggNew->MarkAggPushdown(); CExpression *pexprNewGb = GPOS_NEW(mp) CExpression(mp, popGbAggNew, pexprOuter, pexprPrjList); diff --git a/src/backend/gporca/libgpopt/src/xforms/Makefile b/src/backend/gporca/libgpopt/src/xforms/Makefile index a51ddd6f564..03f6293b36d 100644 --- a/src/backend/gporca/libgpopt/src/xforms/Makefile +++ b/src/backend/gporca/libgpopt/src/xforms/Makefile @@ -114,6 +114,7 @@ OBJS = CDecorrelator.o \ CXformPushGbBelowJoin.o \ CXformPushGbDedupBelowJoin.o \ CXformPushGbWithHavingBelowJoin.o \ + CXformPushPartialAggBelowJoin.o \ CXformRemoveSubqDistinct.o \ CXformResult.o \ CXformRightOuterJoin2HashJoin.o \ diff --git a/src/test/regress/expected/agg_pushdown.out b/src/test/regress/expected/agg_pushdown.out index 8e6b8e478ba..2895c8e69a8 100644 --- a/src/test/regress/expected/agg_pushdown.out +++ b/src/test/regress/expected/agg_pushdown.out @@ -1,23 +1,23 @@ --- disable ORCA -SET optimizer TO off; -- Test case group 1: basic functions CREATE TABLE agg_pushdown_parent ( - i int primary key, - x int); + i int, + x int); +NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'i' as the Apache Cloudberry data distribution key for this table. 
+HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. CREATE TABLE agg_pushdown_child1 ( - j int, - parent int, - v double precision, - PRIMARY KEY (j, parent)); -CREATE INDEX ON agg_pushdown_child1(parent); + j int, + parent int, + v double precision + ) + DISTRIBUTED BY (j, parent); CREATE TABLE agg_pushdown_child2 ( - k int, - parent int, - v double precision, - PRIMARY KEY (k, parent));; + k int, + parent int, + v double precision) + DISTRIBUTED BY (k, parent); INSERT INTO agg_pushdown_parent(i, x) SELECT n, n -FROM generate_series(0, 7) AS s(n); +FROM generate_series(0, 10) AS s(n); INSERT INTO agg_pushdown_child1(j, parent, v) SELECT 128 * i + n, i, random() FROM generate_series(0, 127) AS s(n), agg_pushdown_parent; @@ -27,79 +27,12 @@ FROM generate_series(0, 127) AS s(n), agg_pushdown_parent; ANALYZE agg_pushdown_parent; ANALYZE agg_pushdown_child1; ANALYZE agg_pushdown_child2; -SET enable_nestloop TO on; -SET enable_hashjoin TO off; -SET enable_mergejoin TO off; -- Perform scan of a table, aggregate the result, join it to the other table -- and finalize the aggregation. --- --- In addition, check that functionally dependent column "p.x" can be --- referenced by SELECT although GROUP BY references "p.i". 
-SET gp_enable_agg_pushdown TO off; -EXPLAIN (VERBOSE on, COSTS off) -SELECT p.x, avg(c1.v) FROM agg_pushdown_parent AS p JOIN agg_pushdown_child1 -AS c1 ON c1.parent = p.i GROUP BY p.i; - QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------- - Gather Motion 3:1 (slice1; segments: 3) - Output: p.x, (avg(c1.v)), p.i - -> HashAggregate - Output: p.x, avg(c1.v), p.i - Group Key: p.i - -> Nested Loop - Output: p.i, p.x, c1.v - Inner Unique: true - -> Redistribute Motion 3:3 (slice2; segments: 3) - Output: c1.v, c1.parent - Hash Key: c1.parent - -> Seq Scan on public.agg_pushdown_child1 c1 - Output: c1.v, c1.parent - -> Memoize - Output: p.x, p.i - Cache Key: c1.parent - Cache Mode: logical - -> Index Scan using agg_pushdown_parent_pkey on public.agg_pushdown_parent p - Output: p.x, p.i - Index Cond: (p.i = c1.parent) - Settings: enable_hashjoin = 'off', enable_mergejoin = 'off', enable_nestloop = 'on', gp_enable_agg_pushdown = 'off', optimizer = 'off' - Optimizer: Postgres query optimizer -(22 rows) - -SET gp_enable_agg_pushdown TO on; -EXPLAIN (VERBOSE on, COSTS off) -SELECT p.x, avg(c1.v) FROM agg_pushdown_parent AS p JOIN agg_pushdown_child1 -AS c1 ON c1.parent = p.i GROUP BY p.i; - QUERY PLAN ---------------------------------------------------------------------------------------------------------------------------------------- - Gather Motion 3:1 (slice1; segments: 3) - Output: p.x, (avg(c1.v)), p.i - -> Finalize GroupAggregate - Output: p.x, avg(c1.v), p.i - Group Key: p.i - -> Sort - Output: p.i, p.x, (PARTIAL avg(c1.v)) - Sort Key: p.i - -> Nested Loop - Output: p.i, p.x, (PARTIAL avg(c1.v)) - Inner Unique: true - -> Redistribute Motion 3:3 (slice2; segments: 3) - Output: c1.parent, (PARTIAL avg(c1.v)) - Hash Key: c1.parent - -> Partial HashAggregate - Output: c1.parent, PARTIAL avg(c1.v) - Group Key: c1.parent - -> Seq Scan on public.agg_pushdown_child1 c1 - Output: 
c1.j, c1.parent, c1.v - -> Index Scan using agg_pushdown_parent_pkey on public.agg_pushdown_parent p - Output: p.i, p.x - Index Cond: (p.i = c1.parent) - Settings: enable_hashjoin = 'off', enable_mergejoin = 'off', enable_nestloop = 'on', gp_enable_agg_pushdown = 'on', optimizer = 'off' - Optimizer: Postgres query optimizer -(24 rows) - --- The same for hash join. +SET enable_mergejoin TO off; SET enable_nestloop TO off; SET enable_hashjoin TO on; +SET gp_enable_agg_pushdown TO on; EXPLAIN (VERBOSE on, COSTS off) SELECT p.i, avg(c1.v) FROM agg_pushdown_parent AS p JOIN agg_pushdown_child1 AS c1 ON c1.parent = p.i GROUP BY p.i; @@ -182,63 +115,73 @@ SET enable_seqscan TO off; EXPLAIN (VERBOSE on, COSTS off) SELECT p.i, avg(c1.v) FROM agg_pushdown_parent AS p JOIN agg_pushdown_child1 AS c1 ON c1.parent = p.i GROUP BY p.i; - QUERY PLAN --------------------------------------------------------------------------------------------------------------------------------------------------------------- + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Gather Motion 3:1 (slice1; segments: 3) Output: p.i, (avg(c1.v)) - -> HashAggregate + -> Finalize HashAggregate Output: p.i, avg(c1.v) Group Key: p.i - -> Nested Loop - Output: p.i, c1.v - Inner Unique: true - -> Redistribute Motion 3:3 (slice2; segments: 3) - Output: c1.v, c1.parent - Hash Key: c1.parent - -> Index Scan using agg_pushdown_child1_parent_idx on public.agg_pushdown_child1 c1 - Output: c1.v, c1.parent - -> Memoize - Output: p.i - Cache Key: c1.parent - Cache Mode: logical - -> Index Only Scan using agg_pushdown_parent_pkey on public.agg_pushdown_parent p - Output: p.i - Index Cond: (p.i = c1.parent) - Settings: enable_hashjoin = 'on', enable_mergejoin = 'on', enable_nestloop = 'on', enable_seqscan = 'off', gp_enable_agg_pushdown = 'off', optimizer = 'off' + -> 
Redistribute Motion 3:3 (slice2; segments: 3) + Output: p.i, (PARTIAL avg(c1.v)) + Hash Key: p.i + -> Partial GroupAggregate + Output: p.i, PARTIAL avg(c1.v) + Group Key: p.i + -> Merge Join + Output: p.i, c1.v + Merge Cond: (p.i = c1.parent) + -> Sort + Output: p.i + Sort Key: p.i + -> Broadcast Motion 3:3 (slice3; segments: 3) + Output: p.i + -> Seq Scan on public.agg_pushdown_parent p + Output: p.i + -> Sort + Output: c1.v, c1.parent + Sort Key: c1.parent + -> Seq Scan on public.agg_pushdown_child1 c1 + Output: c1.v, c1.parent + Settings: enable_hashjoin = 'on', enable_mergejoin = 'on', enable_nestloop = 'on', enable_parallel = 'off', enable_seqscan = 'off', gp_enable_agg_pushdown = 'off', optimizer = 'off' Optimizer: Postgres query optimizer -(22 rows) +(28 rows) SET gp_enable_agg_pushdown TO on; EXPLAIN (VERBOSE on, COSTS off) SELECT p.i, avg(c1.v) FROM agg_pushdown_parent AS p JOIN agg_pushdown_child1 AS c1 ON c1.parent = p.i GROUP BY p.i; - QUERY PLAN -------------------------------------------------------------------------------------------------------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Gather Motion 3:1 (slice1; segments: 3) Output: p.i, (avg(c1.v)) - -> Finalize GroupAggregate + -> Finalize HashAggregate Output: p.i, avg(c1.v) Group Key: p.i - -> Sort + -> Redistribute Motion 3:3 (slice2; segments: 3) Output: p.i, (PARTIAL avg(c1.v)) - Sort Key: p.i - -> Nested Loop + Hash Key: p.i + -> Merge Join Output: p.i, (PARTIAL avg(c1.v)) - Inner Unique: true - -> Redistribute Motion 3:3 (slice2; segments: 3) + Merge Cond: (p.i = c1.parent) + -> Sort + Output: p.i + Sort Key: p.i + -> Broadcast Motion 3:3 (slice3; segments: 3) + Output: p.i + -> Seq Scan on public.agg_pushdown_parent p + Output: p.i + -> Sort Output: c1.parent, (PARTIAL avg(c1.v)) - 
Hash Key: c1.parent - -> Partial GroupAggregate + Sort Key: c1.parent + -> Partial HashAggregate Output: c1.parent, PARTIAL avg(c1.v) Group Key: c1.parent - -> Index Scan using agg_pushdown_child1_parent_idx on public.agg_pushdown_child1 c1 + -> Seq Scan on public.agg_pushdown_child1 c1 Output: c1.j, c1.parent, c1.v - -> Index Only Scan using agg_pushdown_parent_pkey on public.agg_pushdown_parent p - Output: p.i - Index Cond: (p.i = c1.parent) - Settings: enable_hashjoin = 'on', enable_mergejoin = 'on', enable_nestloop = 'on', enable_seqscan = 'off', gp_enable_agg_pushdown = 'on', optimizer = 'off' + Settings: enable_hashjoin = 'on', enable_mergejoin = 'on', enable_nestloop = 'on', enable_parallel = 'off', enable_seqscan = 'off', gp_enable_agg_pushdown = 'on', optimizer = 'off' Optimizer: Postgres query optimizer -(24 rows) +(28 rows) SET enable_seqscan TO on; -- Join "c1" to "p.x" column, i.e. one that is not in the GROUP BY clause. The @@ -324,45 +267,49 @@ EXPLAIN (VERBOSE on, COSTS off) SELECT p.i, avg(c1.v + c2.v) FROM agg_pushdown_parent AS p JOIN agg_pushdown_child1 AS c1 ON c1.parent = p.i JOIN agg_pushdown_child2 AS c2 ON c2.parent = p.i WHERE c1.j = c2.k GROUP BY p.i; - QUERY PLAN ---------------------------------------------------------------------------------------------------------------------------------------------------------------- - Gather Motion 3:1 (slice1; segments: 3) - Output: p.i, (avg((c1.v + c2.v))) - -> HashAggregate - Output: p.i, avg((c1.v + c2.v)) - Group Key: p.i - -> Nested Loop - Output: p.i, c1.v, c2.v - Inner Unique: true - -> Redistribute Motion 3:3 (slice2; segments: 3) - Output: c1.v, c1.parent, c2.v, c2.parent - Hash Key: c1.parent + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Finalize GroupAggregate + Output: p.i, avg((c1.v + c2.v)) + Group Key: p.i + -> Gather Motion 3:1 
(slice1; segments: 3) + Output: p.i, (PARTIAL avg((c1.v + c2.v))) + Merge Key: p.i + -> Partial GroupAggregate + Output: p.i, PARTIAL avg((c1.v + c2.v)) + Group Key: p.i + -> Sort + Output: p.i, c1.v, c2.v + Sort Key: p.i -> Nested Loop - Output: c1.v, c1.parent, c2.v, c2.parent - Inner Unique: true - -> Seq Scan on public.agg_pushdown_child1 c1 - Output: c1.j, c1.parent, c1.v - -> Index Scan using agg_pushdown_child2_pkey on public.agg_pushdown_child2 c2 - Output: c2.k, c2.parent, c2.v - Index Cond: ((c2.k = c1.j) AND (c2.parent = c1.parent)) - -> Memoize - Output: p.i - Cache Key: c1.parent - Cache Mode: logical - -> Index Only Scan using agg_pushdown_parent_pkey on public.agg_pushdown_parent p - Output: p.i - Index Cond: (p.i = c1.parent) - Settings: enable_hashjoin = 'off', enable_mergejoin = 'off', enable_nestloop = 'on', enable_seqscan = 'on', gp_enable_agg_pushdown = 'off', optimizer = 'off' + Output: p.i, c1.v, c2.v + Join Filter: (c1.parent = p.i) + -> Broadcast Motion 3:3 (slice2; segments: 3) + Output: p.i + -> Seq Scan on public.agg_pushdown_parent p + Output: p.i + -> Materialize + Output: c1.v, c1.parent, c2.v, c2.parent + -> Nested Loop + Output: c1.v, c1.parent, c2.v, c2.parent + Join Filter: ((c1.parent = c2.parent) AND (c1.j = c2.k)) + -> Seq Scan on public.agg_pushdown_child1 c1 + Output: c1.j, c1.parent, c1.v + -> Materialize + Output: c2.v, c2.parent, c2.k + -> Seq Scan on public.agg_pushdown_child2 c2 + Output: c2.v, c2.parent, c2.k + Settings: enable_hashjoin = 'off', enable_mergejoin = 'off', enable_nestloop = 'on', enable_parallel = 'off', enable_seqscan = 'on', gp_enable_agg_pushdown = 'off', optimizer = 'off' Optimizer: Postgres query optimizer -(28 rows) +(32 rows) SET gp_enable_agg_pushdown TO on; EXPLAIN (VERBOSE on, COSTS off) SELECT p.i, avg(c1.v + c2.v) FROM agg_pushdown_parent AS p JOIN agg_pushdown_child1 AS c1 ON c1.parent = p.i JOIN agg_pushdown_child2 AS c2 ON c2.parent = p.i WHERE c1.j = c2.k GROUP BY p.i; - QUERY PLAN 
--------------------------------------------------------------------------------------------------------------------------------------------------------------- + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Gather Motion 3:1 (slice1; segments: 3) Output: p.i, (avg((c1.v + c2.v))) -> Finalize GroupAggregate @@ -371,29 +318,33 @@ c2.parent = p.i WHERE c1.j = c2.k GROUP BY p.i; -> Sort Output: p.i, (PARTIAL avg((c1.v + c2.v))) Sort Key: p.i - -> Nested Loop + -> Redistribute Motion 3:3 (slice2; segments: 3) Output: p.i, (PARTIAL avg((c1.v + c2.v))) - Inner Unique: true - -> Redistribute Motion 3:3 (slice2; segments: 3) - Output: c1.parent, c2.parent, (PARTIAL avg((c1.v + c2.v))) - Hash Key: c1.parent - -> Partial HashAggregate - Output: c1.parent, c2.parent, PARTIAL avg((c1.v + c2.v)) - Group Key: c1.parent - -> Nested Loop - Output: c1.v, c1.parent, c2.v, c2.parent - Inner Unique: true - -> Seq Scan on public.agg_pushdown_child1 c1 - Output: c1.j, c1.parent, c1.v - -> Index Scan using agg_pushdown_child2_pkey on public.agg_pushdown_child2 c2 - Output: c2.k, c2.parent, c2.v - Index Cond: ((c2.k = c1.j) AND (c2.parent = c1.parent)) - -> Index Only Scan using agg_pushdown_parent_pkey on public.agg_pushdown_parent p - Output: p.i - Index Cond: (p.i = c1.parent) - Settings: enable_hashjoin = 'off', enable_mergejoin = 'off', enable_nestloop = 'on', enable_seqscan = 'on', gp_enable_agg_pushdown = 'on', optimizer = 'off' + Hash Key: p.i + -> Nested Loop + Output: p.i, (PARTIAL avg((c1.v + c2.v))) + Join Filter: (c1.parent = p.i) + -> Broadcast Motion 3:3 (slice3; segments: 3) + Output: p.i + -> Seq Scan on public.agg_pushdown_parent p + Output: p.i + -> Materialize + Output: c1.parent, c2.parent, (PARTIAL avg((c1.v + c2.v))) + -> Partial HashAggregate + Output: c1.parent, c2.parent, PARTIAL avg((c1.v + c2.v)) + Group Key: 
c1.parent + -> Nested Loop + Output: c1.v, c1.parent, c2.v, c2.parent + Join Filter: ((c1.parent = c2.parent) AND (c1.j = c2.k)) + -> Seq Scan on public.agg_pushdown_child1 c1 + Output: c1.j, c1.parent, c1.v + -> Materialize + Output: c2.v, c2.parent, c2.k + -> Seq Scan on public.agg_pushdown_child2 c2 + Output: c2.v, c2.parent, c2.k + Settings: enable_hashjoin = 'off', enable_mergejoin = 'off', enable_nestloop = 'on', enable_parallel = 'off', enable_seqscan = 'on', gp_enable_agg_pushdown = 'on', optimizer = 'off' Optimizer: Postgres query optimizer -(30 rows) +(34 rows) -- The same for hash join. SET enable_nestloop TO off; @@ -402,8 +353,8 @@ EXPLAIN (VERBOSE on, COSTS off) SELECT p.i, avg(c1.v + c2.v) FROM agg_pushdown_parent AS p JOIN agg_pushdown_child1 AS c1 ON c1.parent = p.i JOIN agg_pushdown_child2 AS c2 ON c2.parent = p.i WHERE c1.j = c2.k GROUP BY p.i; - QUERY PLAN --------------------------------------------------------------------------------------------------------------------------------------------------------------- + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Gather Motion 3:1 (slice1; segments: 3) Output: p.i, (avg((c1.v + c2.v))) -> Finalize GroupAggregate @@ -429,7 +380,6 @@ c2.parent = p.i WHERE c1.j = c2.k GROUP BY p.i; Group Key: c1.parent -> Hash Join Output: c1.v, c1.parent, c2.v, c2.parent - Inner Unique: true Hash Cond: ((c1.parent = c2.parent) AND (c1.j = c2.k)) -> Seq Scan on public.agg_pushdown_child1 c1 Output: c1.j, c1.parent, c1.v @@ -437,9 +387,9 @@ c2.parent = p.i WHERE c1.j = c2.k GROUP BY p.i; Output: c2.v, c2.parent, c2.k -> Seq Scan on public.agg_pushdown_child2 c2 Output: c2.v, c2.parent, c2.k - Settings: enable_hashjoin = 'on', enable_mergejoin = 'off', enable_nestloop = 'off', enable_seqscan = 'on', gp_enable_agg_pushdown = 'on', optimizer = 'off' + Settings: 
enable_hashjoin = 'on', enable_mergejoin = 'off', enable_nestloop = 'off', enable_parallel = 'off', enable_seqscan = 'on', gp_enable_agg_pushdown = 'on', optimizer = 'off' Optimizer: Postgres query optimizer -(35 rows) +(34 rows) -- The same for merge join. SET enable_hashjoin TO off; @@ -449,39 +399,42 @@ EXPLAIN (VERBOSE on, COSTS off) SELECT p.i, avg(c1.v + c2.v) FROM agg_pushdown_parent AS p JOIN agg_pushdown_child1 AS c1 ON c1.parent = p.i JOIN agg_pushdown_child2 AS c2 ON c2.parent = p.i WHERE c1.j = c2.k GROUP BY p.i; - QUERY PLAN ---------------------------------------------------------------------------------------------------------------------------------------------------------------- + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Gather Motion 3:1 (slice1; segments: 3) Output: p.i, (avg((c1.v + c2.v))) - -> Finalize GroupAggregate + -> HashAggregate Output: p.i, avg((c1.v + c2.v)) Group Key: p.i - -> Merge Join - Output: p.i, (PARTIAL avg((c1.v + c2.v))) - Inner Unique: true - Merge Cond: (c1.parent = p.i) - -> Sort - Output: c1.parent, c2.parent, (PARTIAL avg((c1.v + c2.v))) - Sort Key: c1.parent - -> Redistribute Motion 3:3 (slice2; segments: 3) - Output: c1.parent, c2.parent, (PARTIAL avg((c1.v + c2.v))) - Hash Key: c1.parent - -> Partial HashAggregate - Output: c1.parent, c2.parent, PARTIAL avg((c1.v + c2.v)) - Group Key: c1.parent - -> Merge Join - Output: c1.v, c1.parent, c2.v, c2.parent - Inner Unique: true - Merge Cond: ((c1.j = c2.k) AND (c1.parent = c2.parent)) - -> Index Scan using agg_pushdown_child1_pkey on public.agg_pushdown_child1 c1 - Output: c1.j, c1.parent, c1.v - -> Index Scan using agg_pushdown_child2_pkey on public.agg_pushdown_child2 c2 - Output: c2.k, c2.parent, c2.v - -> Index Only Scan using agg_pushdown_parent_pkey on public.agg_pushdown_parent p - Output: p.i - Settings: 
enable_hashjoin = 'off', enable_mergejoin = 'on', enable_nestloop = 'off', enable_seqscan = 'off', gp_enable_agg_pushdown = 'on', optimizer = 'off' + -> Redistribute Motion 3:3 (slice2; segments: 3) + Output: p.i, c1.v, c2.v + Hash Key: p.i + -> Merge Join + Output: p.i, c1.v, c2.v + Merge Cond: (c1.parent = p.i) + -> Merge Join + Output: c1.v, c1.parent, c2.v, c2.parent + Merge Cond: ((c1.parent = c2.parent) AND (c1.j = c2.k)) + -> Sort + Output: c1.v, c1.parent, c1.j + Sort Key: c1.parent, c1.j + -> Seq Scan on public.agg_pushdown_child1 c1 + Output: c1.v, c1.parent, c1.j + -> Sort + Output: c2.v, c2.parent, c2.k + Sort Key: c2.parent, c2.k + -> Seq Scan on public.agg_pushdown_child2 c2 + Output: c2.v, c2.parent, c2.k + -> Sort + Output: p.i + Sort Key: p.i + -> Broadcast Motion 3:3 (slice3; segments: 3) + Output: p.i + -> Seq Scan on public.agg_pushdown_parent p + Output: p.i + Settings: enable_hashjoin = 'off', enable_mergejoin = 'on', enable_nestloop = 'off', enable_parallel = 'off', enable_seqscan = 'off', gp_enable_agg_pushdown = 'on', optimizer = 'off' Optimizer: Postgres query optimizer -(30 rows) +(33 rows) SET enable_seqscan TO on; -- Clear tables @@ -502,6 +455,7 @@ SET enable_nestloop TO off; SET enable_hashjoin TO on; SET enable_mergejoin TO off; SET gp_enable_agg_pushdown TO ON; +SET optimizer_force_multistage_agg to ON; -- Join key and group key are the same. EXPLAIN (VERBOSE on, COSTS off) SELECT t1.id, SUM(t1.val) FROM t1, t2 WHERE t1.id = t2.id GROUP BY t1.id; @@ -846,7 +800,7 @@ NOTICE: table "nation_pd" does not exist, skipping CREATE TABLE vendor_pd (v_id int, v_name VARCHAR(20)) WITH (APPENDONLY=true, ORIENTATION=column); NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'v_id' as the Apache Cloudberry data distribution key for this table. HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. 
-CREATE TABLE customer_pd (c_id int primary key, c_v_id int, c_n_id int, c_type int, c_consumption int); +CREATE TABLE customer_pd (c_id int, c_v_id int, c_n_id int, c_type int, c_consumption int); CREATE TABLE nation_pd (n_id int, n_name VARCHAR(20), n_type int, n_population int) WITH (APPENDONLY=true, ORIENTATION=column); NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'n_id' as the Apache Cloudberry data distribution key for this table. HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. @@ -1153,16 +1107,11 @@ DROP TABLE pagg_pd; CREATE TABLE pagg_pd_p (a int, b int); NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. -CREATE TABLE pagg_pd (c text, d int) inherits (pagg_pd_p) PARTITION BY LIST(c); -ERROR: cannot create partitioned table as inheritance child DROP TABLE IF EXISTS pagg_pd, pagg_pd_p; NOTICE: table "pagg_pd" does not exist, skipping CREATE TABLE pagg_pd_p (a int, b int, c text) PARTITION BY LIST(c); NOTICE: Table doesn't have 'DISTRIBUTED BY' clause -- Using column named 'a' as the Apache Cloudberry data distribution key for this table. HINT: The 'DISTRIBUTED BY' clause determines the distribution of data. Make sure column(s) chosen are the optimal data distribution key to minimize skew. 
-CREATE TABLE pagg_pd (d int) inherits (pagg_pd_p); -NOTICE: table has parent, setting distribution columns to match parent table -ERROR: cannot inherit from partitioned table "pagg_pd_p" DROP TABLE IF EXISTS pagg_pd, pagg_pd_p; NOTICE: table "pagg_pd" does not exist, skipping CREATE TABLE pagg_pd_p (a int, b int); @@ -1234,7 +1183,7 @@ SELECT t1.c, sum(t1.a) DROP TABLE pagg_pd, pagg_pd_p; RESET enable_incremental_sort; -- Clear settings -SET optimizer TO default; +RESET optimizer_force_multistage_agg; SET gp_enable_agg_pushdown TO off; SET enable_seqscan TO on; SET enable_nestloop TO on; diff --git a/src/test/regress/expected/agg_pushdown_optimizer.out b/src/test/regress/expected/agg_pushdown_optimizer.out new file mode 100644 index 00000000000..af0b16315e9 --- /dev/null +++ b/src/test/regress/expected/agg_pushdown_optimizer.out @@ -0,0 +1,1113 @@ +-- Test case group 1: basic functions +CREATE TABLE agg_pushdown_parent ( + i int, + x int); +CREATE TABLE agg_pushdown_child1 ( + j int, + parent int, + v double precision + ) + DISTRIBUTED BY (j, parent); +CREATE TABLE agg_pushdown_child2 ( + k int, + parent int, + v double precision) + DISTRIBUTED BY (k, parent); +INSERT INTO agg_pushdown_parent(i, x) +SELECT n, n +FROM generate_series(0, 10) AS s(n); +INSERT INTO agg_pushdown_child1(j, parent, v) +SELECT 128 * i + n, i, random() +FROM generate_series(0, 127) AS s(n), agg_pushdown_parent; +INSERT INTO agg_pushdown_child2(k, parent, v) +SELECT 128 * i + n, i, random() +FROM generate_series(0, 127) AS s(n), agg_pushdown_parent; +ANALYZE agg_pushdown_parent; +ANALYZE agg_pushdown_child1; +ANALYZE agg_pushdown_child2; +-- Perform scan of a table, aggregate the result, join it to the other table +-- and finalize the aggregation. 
+SET enable_mergejoin TO off; +SET enable_nestloop TO off; +SET enable_hashjoin TO on; +SET gp_enable_agg_pushdown TO on; +EXPLAIN (VERBOSE on, COSTS off) +SELECT p.i, avg(c1.v) FROM agg_pushdown_parent AS p JOIN agg_pushdown_child1 +AS c1 ON c1.parent = p.i GROUP BY p.i; + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + Output: p.i, (avg(c1.v)) + -> Finalize HashAggregate + Output: p.i, avg(c1.v) + Group Key: p.i + -> Hash Join + Output: p.i, (PARTIAL avg(c1.v)) + Hash Cond: (c1.parent = p.i) + -> Redistribute Motion 3:3 (slice2; segments: 3) + Output: c1.parent, (PARTIAL avg(c1.v)) + Hash Key: c1.parent + -> Streaming Partial HashAggregate + Output: c1.parent, PARTIAL avg(c1.v) + Group Key: c1.parent + -> Seq Scan on public.agg_pushdown_child1 c1 + Output: c1.parent, c1.v + -> Hash + Output: p.i + -> Seq Scan on public.agg_pushdown_parent p + Output: p.i + Settings: enable_hashjoin = 'on', enable_mergejoin = 'off', enable_nestloop = 'off', enable_parallel = 'off', gp_enable_agg_pushdown = 'on', optimizer = 'on' + Optimizer: GPORCA +(22 rows) + +-- The same for merge join. 
+SET enable_hashjoin TO off; +SET enable_mergejoin TO on; +EXPLAIN (VERBOSE on, COSTS off) +SELECT p.i, avg(c1.v) FROM agg_pushdown_parent AS p JOIN agg_pushdown_child1 +AS c1 ON c1.parent = p.i GROUP BY p.i; + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + Output: p.i, (avg(c1.v)) + -> Finalize HashAggregate + Output: p.i, avg(c1.v) + Group Key: p.i + -> Hash Join + Output: p.i, (PARTIAL avg(c1.v)) + Hash Cond: (c1.parent = p.i) + -> Redistribute Motion 3:3 (slice2; segments: 3) + Output: c1.parent, (PARTIAL avg(c1.v)) + Hash Key: c1.parent + -> Streaming Partial HashAggregate + Output: c1.parent, PARTIAL avg(c1.v) + Group Key: c1.parent + -> Seq Scan on public.agg_pushdown_child1 c1 + Output: c1.parent, c1.v + -> Hash + Output: p.i + -> Seq Scan on public.agg_pushdown_parent p + Output: p.i + Settings: enable_hashjoin = 'off', enable_mergejoin = 'on', enable_nestloop = 'off', enable_parallel = 'off', gp_enable_agg_pushdown = 'on', optimizer = 'on' + Optimizer: GPORCA +(22 rows) + +-- Restore the default values. +SET enable_nestloop TO on; +SET enable_hashjoin TO on; +-- Scan index on agg_pushdown_child1(parent) column and aggregate the result +-- using AGG_SORTED strategy. 
+SET gp_enable_agg_pushdown TO off; +SET enable_seqscan TO off; +EXPLAIN (VERBOSE on, COSTS off) +SELECT p.i, avg(c1.v) FROM agg_pushdown_parent AS p JOIN agg_pushdown_child1 +AS c1 ON c1.parent = p.i GROUP BY p.i; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + Output: p.i, (avg(c1.v)) + -> Finalize HashAggregate + Output: p.i, avg(c1.v) + Group Key: p.i + -> Hash Join + Output: p.i, (PARTIAL avg(c1.v)) + Hash Cond: (c1.parent = p.i) + -> Redistribute Motion 3:3 (slice2; segments: 3) + Output: c1.parent, (PARTIAL avg(c1.v)) + Hash Key: c1.parent + -> Streaming Partial HashAggregate + Output: c1.parent, PARTIAL avg(c1.v) + Group Key: c1.parent + -> Seq Scan on public.agg_pushdown_child1 c1 + Output: c1.parent, c1.v + -> Hash + Output: p.i + -> Seq Scan on public.agg_pushdown_parent p + Output: p.i + Settings: enable_hashjoin = 'on', enable_mergejoin = 'on', enable_nestloop = 'on', enable_parallel = 'off', enable_seqscan = 'off', gp_enable_agg_pushdown = 'off', optimizer = 'on' + Optimizer: GPORCA +(22 rows) + +SET gp_enable_agg_pushdown TO on; +EXPLAIN (VERBOSE on, COSTS off) +SELECT p.i, avg(c1.v) FROM agg_pushdown_parent AS p JOIN agg_pushdown_child1 +AS c1 ON c1.parent = p.i GROUP BY p.i; + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + Output: p.i, (avg(c1.v)) + -> Finalize HashAggregate + Output: p.i, avg(c1.v) + Group Key: p.i + -> Hash Join + Output: p.i, (PARTIAL avg(c1.v)) + Hash Cond: (c1.parent = p.i) + -> Redistribute Motion 3:3 (slice2; segments: 3) + Output: c1.parent, (PARTIAL avg(c1.v)) + Hash Key: c1.parent + -> Streaming Partial HashAggregate + Output: c1.parent, PARTIAL 
avg(c1.v) + Group Key: c1.parent + -> Seq Scan on public.agg_pushdown_child1 c1 + Output: c1.parent, c1.v + -> Hash + Output: p.i + -> Seq Scan on public.agg_pushdown_parent p + Output: p.i + Settings: enable_hashjoin = 'on', enable_mergejoin = 'on', enable_nestloop = 'on', enable_parallel = 'off', enable_seqscan = 'off', gp_enable_agg_pushdown = 'on', optimizer = 'on' + Optimizer: GPORCA +(22 rows) + +SET enable_seqscan TO on; +-- Join "c1" to "p.x" column, i.e. one that is not in the GROUP BY clause. The +-- planner should still use "c1.parent" as grouping expression for partial +-- aggregation, although it's not in the same equivalence class as the GROUP +-- BY expression ("p.i"). The reason to use "c1.parent" for partial +-- aggregation is that this is the only way for "c1" to provide the join +-- expression with input data. +SET gp_enable_agg_pushdown TO off; +EXPLAIN (VERBOSE on, COSTS off) +SELECT p.i, avg(c1.v) FROM agg_pushdown_parent AS p JOIN agg_pushdown_child1 +AS c1 ON c1.parent = p.x GROUP BY p.i; + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + Output: p.i, (avg(c1.v)) + -> Finalize HashAggregate + Output: p.i, avg(c1.v) + Group Key: p.i + -> Redistribute Motion 3:3 (slice2; segments: 3) + Output: p.i, (PARTIAL avg(c1.v)) + Hash Key: p.i + -> Streaming Partial HashAggregate + Output: p.i, PARTIAL avg(c1.v) + Group Key: p.i + -> Hash Join + Output: p.i, c1.v + Hash Cond: (c1.parent = p.x) + -> Seq Scan on public.agg_pushdown_child1 c1 + Output: c1.parent, c1.v + -> Hash + Output: p.i, p.x + -> Broadcast Motion 3:3 (slice3; segments: 3) + Output: p.i, p.x + -> Seq Scan on public.agg_pushdown_parent p + Output: p.i, p.x + Settings: enable_hashjoin = 'on', enable_mergejoin = 'on', enable_nestloop = 'on', enable_parallel = 'off', enable_seqscan = 'on', 
gp_enable_agg_pushdown = 'off', optimizer = 'on' + Optimizer: GPORCA +(24 rows) + +SET gp_enable_agg_pushdown TO on; +EXPLAIN (VERBOSE on, COSTS off) +SELECT p.i, avg(c1.v) FROM agg_pushdown_parent AS p JOIN agg_pushdown_child1 +AS c1 ON c1.parent = p.x GROUP BY p.i; + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ + Gather Motion 3:1 (slice1; segments: 3) + Output: p.i, (avg(c1.v)) + -> Finalize HashAggregate + Output: p.i, avg(c1.v) + Group Key: p.i + -> Redistribute Motion 3:3 (slice2; segments: 3) + Output: p.i, (PARTIAL avg(c1.v)) + Hash Key: p.i + -> Streaming Partial HashAggregate + Output: p.i, PARTIAL avg(c1.v) + Group Key: p.i + -> Hash Join + Output: p.i, c1.v + Hash Cond: (c1.parent = p.x) + -> Seq Scan on public.agg_pushdown_child1 c1 + Output: c1.parent, c1.v + -> Hash + Output: p.i, p.x + -> Broadcast Motion 3:3 (slice3; segments: 3) + Output: p.i, p.x + -> Seq Scan on public.agg_pushdown_parent p + Output: p.i, p.x + Settings: enable_hashjoin = 'on', enable_mergejoin = 'on', enable_nestloop = 'on', enable_parallel = 'off', enable_seqscan = 'on', gp_enable_agg_pushdown = 'on', optimizer = 'on' + Optimizer: GPORCA +(24 rows) + +-- Perform nestloop join between agg_pushdown_child1 and agg_pushdown_child2 +-- and aggregate the result. 
+SET enable_nestloop TO on; +SET enable_hashjoin TO off; +SET enable_mergejoin TO off; +SET gp_enable_agg_pushdown TO off; +EXPLAIN (VERBOSE on, COSTS off) +SELECT p.i, avg(c1.v + c2.v) FROM agg_pushdown_parent AS p JOIN +agg_pushdown_child1 AS c1 ON c1.parent = p.i JOIN agg_pushdown_child2 AS c2 ON +c2.parent = p.i WHERE c1.j = c2.k GROUP BY p.i; + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + Output: p.i, (avg((c1.v + c2.v))) + -> Finalize HashAggregate + Output: p.i, avg((c1.v + c2.v)) + Group Key: p.i + -> Hash Join + Output: p.i, (PARTIAL avg((c1.v + c2.v))) + Hash Cond: (c1.parent = p.i) + -> Redistribute Motion 3:3 (slice2; segments: 3) + Output: c1.parent, (PARTIAL avg((c1.v + c2.v))) + Hash Key: c1.parent + -> Streaming Partial HashAggregate + Output: c1.parent, PARTIAL avg((c1.v + c2.v)) + Group Key: c1.parent + -> Hash Join + Output: c1.parent, c1.v, c2.v + Hash Cond: ((c1.j = c2.k) AND (c1.parent = c2.parent)) + -> Seq Scan on public.agg_pushdown_child1 c1 + Output: c1.j, c1.parent, c1.v + -> Hash + Output: c2.k, c2.parent, c2.v + -> Seq Scan on public.agg_pushdown_child2 c2 + Output: c2.k, c2.parent, c2.v + -> Hash + Output: p.i + -> Seq Scan on public.agg_pushdown_parent p + Output: p.i + Settings: enable_hashjoin = 'off', enable_mergejoin = 'off', enable_nestloop = 'on', enable_parallel = 'off', enable_seqscan = 'on', gp_enable_agg_pushdown = 'off', optimizer = 'on' + Optimizer: GPORCA +(29 rows) + +SET gp_enable_agg_pushdown TO on; +EXPLAIN (VERBOSE on, COSTS off) +SELECT p.i, avg(c1.v + c2.v) FROM agg_pushdown_parent AS p JOIN +agg_pushdown_child1 AS c1 ON c1.parent = p.i JOIN agg_pushdown_child2 AS c2 ON +c2.parent = p.i WHERE c1.j = c2.k GROUP BY p.i; + QUERY PLAN 
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + Output: p.i, (avg((c1.v + c2.v))) + -> Finalize HashAggregate + Output: p.i, avg((c1.v + c2.v)) + Group Key: p.i + -> Hash Join + Output: p.i, (PARTIAL avg((c1.v + c2.v))) + Hash Cond: (c1.parent = p.i) + -> Redistribute Motion 3:3 (slice2; segments: 3) + Output: c1.parent, (PARTIAL avg((c1.v + c2.v))) + Hash Key: c1.parent + -> Streaming Partial HashAggregate + Output: c1.parent, PARTIAL avg((c1.v + c2.v)) + Group Key: c1.parent + -> Hash Join + Output: c1.parent, c1.v, c2.v + Hash Cond: ((c1.j = c2.k) AND (c1.parent = c2.parent)) + -> Seq Scan on public.agg_pushdown_child1 c1 + Output: c1.j, c1.parent, c1.v + -> Hash + Output: c2.k, c2.parent, c2.v + -> Seq Scan on public.agg_pushdown_child2 c2 + Output: c2.k, c2.parent, c2.v + -> Hash + Output: p.i + -> Seq Scan on public.agg_pushdown_parent p + Output: p.i + Settings: enable_hashjoin = 'off', enable_mergejoin = 'off', enable_nestloop = 'on', enable_parallel = 'off', enable_seqscan = 'on', gp_enable_agg_pushdown = 'on', optimizer = 'on' + Optimizer: GPORCA +(29 rows) + +-- The same for hash join. 
+SET enable_nestloop TO off; +SET enable_hashjoin TO on; +EXPLAIN (VERBOSE on, COSTS off) +SELECT p.i, avg(c1.v + c2.v) FROM agg_pushdown_parent AS p JOIN +agg_pushdown_child1 AS c1 ON c1.parent = p.i JOIN agg_pushdown_child2 AS c2 ON +c2.parent = p.i WHERE c1.j = c2.k GROUP BY p.i; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + Output: p.i, (avg((c1.v + c2.v))) + -> Finalize HashAggregate + Output: p.i, avg((c1.v + c2.v)) + Group Key: p.i + -> Hash Join + Output: p.i, (PARTIAL avg((c1.v + c2.v))) + Hash Cond: (c1.parent = p.i) + -> Redistribute Motion 3:3 (slice2; segments: 3) + Output: c1.parent, (PARTIAL avg((c1.v + c2.v))) + Hash Key: c1.parent + -> Streaming Partial HashAggregate + Output: c1.parent, PARTIAL avg((c1.v + c2.v)) + Group Key: c1.parent + -> Hash Join + Output: c1.parent, c1.v, c2.v + Hash Cond: ((c1.j = c2.k) AND (c1.parent = c2.parent)) + -> Seq Scan on public.agg_pushdown_child1 c1 + Output: c1.j, c1.parent, c1.v + -> Hash + Output: c2.k, c2.parent, c2.v + -> Seq Scan on public.agg_pushdown_child2 c2 + Output: c2.k, c2.parent, c2.v + -> Hash + Output: p.i + -> Seq Scan on public.agg_pushdown_parent p + Output: p.i + Settings: enable_hashjoin = 'on', enable_mergejoin = 'off', enable_nestloop = 'off', enable_parallel = 'off', enable_seqscan = 'on', gp_enable_agg_pushdown = 'on', optimizer = 'on' + Optimizer: GPORCA +(29 rows) + +-- The same for merge join. 
+SET enable_hashjoin TO off; +SET enable_mergejoin TO on; +SET enable_seqscan TO off; +EXPLAIN (VERBOSE on, COSTS off) +SELECT p.i, avg(c1.v + c2.v) FROM agg_pushdown_parent AS p JOIN +agg_pushdown_child1 AS c1 ON c1.parent = p.i JOIN agg_pushdown_child2 AS c2 ON +c2.parent = p.i WHERE c1.j = c2.k GROUP BY p.i; + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + Output: p.i, (avg((c1.v + c2.v))) + -> Finalize HashAggregate + Output: p.i, avg((c1.v + c2.v)) + Group Key: p.i + -> Hash Join + Output: p.i, (PARTIAL avg((c1.v + c2.v))) + Hash Cond: (c1.parent = p.i) + -> Redistribute Motion 3:3 (slice2; segments: 3) + Output: c1.parent, (PARTIAL avg((c1.v + c2.v))) + Hash Key: c1.parent + -> Streaming Partial HashAggregate + Output: c1.parent, PARTIAL avg((c1.v + c2.v)) + Group Key: c1.parent + -> Hash Join + Output: c1.parent, c1.v, c2.v + Hash Cond: ((c1.j = c2.k) AND (c1.parent = c2.parent)) + -> Seq Scan on public.agg_pushdown_child1 c1 + Output: c1.j, c1.parent, c1.v + -> Hash + Output: c2.k, c2.parent, c2.v + -> Seq Scan on public.agg_pushdown_child2 c2 + Output: c2.k, c2.parent, c2.v + -> Hash + Output: p.i + -> Seq Scan on public.agg_pushdown_parent p + Output: p.i + Settings: enable_hashjoin = 'off', enable_mergejoin = 'on', enable_nestloop = 'off', enable_parallel = 'off', enable_seqscan = 'off', gp_enable_agg_pushdown = 'on', optimizer = 'on' + Optimizer: GPORCA +(29 rows) + +SET enable_seqscan TO on; +-- Clear tables +DROP TABLE agg_pushdown_child1; +DROP TABLE agg_pushdown_child2; +DROP TABLE agg_pushdown_parent; +-- Test case group 2: Pushdown with different join keys and group keys. 
+DROP TABLE IF EXISTS t1, t2; +NOTICE: table "t1" does not exist, skipping +NOTICE: table "t2" does not exist, skipping +CREATE TABLE t1 (id int, val int, comment VARCHAR(20)); +CREATE TABLE t2 (id int, val int); +SET enable_nestloop TO off; +SET enable_hashjoin TO on; +SET enable_mergejoin TO off; +SET gp_enable_agg_pushdown TO ON; +SET optimizer_force_multistage_agg to ON; +-- Join key and group key are the same. +EXPLAIN (VERBOSE on, COSTS off) +SELECT t1.id, SUM(t1.val) FROM t1, t2 WHERE t1.id = t2.id GROUP BY t1.id; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + Output: t1.id, (sum(t1.val)) + -> Finalize HashAggregate + Output: t1.id, sum(t1.val) + Group Key: t1.id + -> Hash Join + Output: t1.id, (PARTIAL sum(t1.val)) + Hash Cond: (t1.id = t2.id) + -> Redistribute Motion 3:3 (slice2; segments: 3) + Output: t1.id, (PARTIAL sum(t1.val)) + Hash Key: t1.id + -> Streaming Partial HashAggregate + Output: t1.id, PARTIAL sum(t1.val) + Group Key: t1.id + -> Redistribute Motion 3:3 (slice3; segments: 3) + Output: t1.id, t1.val + -> Seq Scan on public.t1 + Output: t1.id, t1.val + -> Hash + Output: t2.id + -> Seq Scan on public.t2 + Output: t2.id + Settings: enable_hashjoin = 'on', enable_mergejoin = 'off', enable_nestloop = 'off', enable_parallel = 'off', enable_seqscan = 'on', gp_enable_agg_pushdown = 'on', optimizer = 'on' + Optimizer: GPORCA +(24 rows) + +-- Join key and group key are different. 
+EXPLAIN (VERBOSE on, COSTS off) +SELECT t1.val, SUM(t1.id) FROM t1, t2 WHERE t1.id = t2.id GROUP BY t1.val; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + Output: t1.val, (sum(t1.id)) + -> Finalize HashAggregate + Output: t1.val, sum(t1.id) + Group Key: t1.val + -> Redistribute Motion 3:3 (slice2; segments: 3) + Output: t1.val, (PARTIAL sum(t1.id)) + Hash Key: t1.val + -> Streaming Partial HashAggregate + Output: t1.val, PARTIAL sum(t1.id) + Group Key: t1.val + -> Hash Join + Output: t1.id, t1.val + Hash Cond: (t1.id = t2.id) + -> Seq Scan on public.t1 + Output: t1.id, t1.val + -> Hash + Output: t2.id + -> Seq Scan on public.t2 + Output: t2.id + Settings: enable_hashjoin = 'on', enable_mergejoin = 'off', enable_nestloop = 'off', enable_parallel = 'off', enable_seqscan = 'on', gp_enable_agg_pushdown = 'on', optimizer = 'on' + Optimizer: GPORCA +(22 rows) + +-- Pushdown with equivclass. 
+EXPLAIN (VERBOSE on, COSTS off) +SELECT t2.id, SUM(t1.val) FROM t1, t2 WHERE t1.id = t2.id GROUP BY t2.id; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + Output: t2.id, (sum(t1.val)) + -> Finalize HashAggregate + Output: t2.id, sum(t1.val) + Group Key: t2.id + -> Redistribute Motion 3:3 (slice2; segments: 3) + Output: t2.id, (PARTIAL sum(t1.val)) + Hash Key: t2.id + -> Streaming Partial HashAggregate + Output: t2.id, PARTIAL sum(t1.val) + Group Key: t2.id + -> Redistribute Motion 3:3 (slice3; segments: 3) + Output: t1.val, t2.id + -> Hash Join + Output: t1.val, t2.id + Hash Cond: (t1.id = t2.id) + -> Seq Scan on public.t1 + Output: t1.id, t1.val + -> Hash + Output: t2.id + -> Seq Scan on public.t2 + Output: t2.id + Settings: enable_hashjoin = 'on', enable_mergejoin = 'off', enable_nestloop = 'off', enable_parallel = 'off', enable_seqscan = 'on', gp_enable_agg_pushdown = 'on', optimizer = 'on' + Optimizer: GPORCA +(24 rows) + +-- Group by column from t2 and aggregate column from t1. 
+EXPLAIN (VERBOSE on, COSTS off) +SELECT t2.val, SUM(t1.val) FROM t1, t2 WHERE t1.id = t2.id GROUP BY t2.val; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + Output: t2.val, (sum(t1.val)) + -> Finalize HashAggregate + Output: t2.val, sum(t1.val) + Group Key: t2.val + -> Redistribute Motion 3:3 (slice2; segments: 3) + Output: t2.val, (PARTIAL sum(t1.val)) + Hash Key: t2.val + -> Streaming Partial HashAggregate + Output: t2.val, PARTIAL sum(t1.val) + Group Key: t2.val + -> Hash Join + Output: t1.val, t2.val + Hash Cond: (t1.id = t2.id) + -> Seq Scan on public.t1 + Output: t1.id, t1.val + -> Hash + Output: t2.id, t2.val + -> Seq Scan on public.t2 + Output: t2.id, t2.val + Settings: enable_hashjoin = 'on', enable_mergejoin = 'off', enable_nestloop = 'off', enable_parallel = 'off', enable_seqscan = 'on', gp_enable_agg_pushdown = 'on', optimizer = 'on' + Optimizer: GPORCA +(22 rows) + +-- Pushdown with multiply group keys. 
+EXPLAIN (VERBOSE on, COSTS off) +SELECT t1.id, t1.comment, SUM(t1.val) FROM t1, t2 WHERE t1.id = t2.id GROUP BY t1.id, t1.comment; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + Output: t1.id, t1.comment, (sum(t1.val)) + -> Finalize HashAggregate + Output: t1.id, t1.comment, sum(t1.val) + Group Key: t1.id, t1.comment + -> Hash Join + Output: t1.id, t1.comment, (PARTIAL sum(t1.val)) + Hash Cond: (t1.id = t2.id) + -> Redistribute Motion 3:3 (slice2; segments: 3) + Output: t1.id, t1.comment, (PARTIAL sum(t1.val)) + Hash Key: t1.id + -> Streaming Partial HashAggregate + Output: t1.id, t1.comment, PARTIAL sum(t1.val) + Group Key: t1.id, t1.comment + -> Redistribute Motion 3:3 (slice3; segments: 3) + Output: t1.id, t1.val, t1.comment + -> Seq Scan on public.t1 + Output: t1.id, t1.val, t1.comment + -> Hash + Output: t2.id + -> Seq Scan on public.t2 + Output: t2.id + Settings: enable_hashjoin = 'on', enable_mergejoin = 'off', enable_nestloop = 'off', enable_parallel = 'off', enable_seqscan = 'on', gp_enable_agg_pushdown = 'on', optimizer = 'on' + Optimizer: GPORCA +(24 rows) + +-- Pushdown with multiply join keys. 
+EXPLAIN (VERBOSE on, COSTS off) +SELECT t1.id, t1.comment, SUM(t1.val) FROM t1, t2 WHERE t1.id = t2.id and t1.val = t2.val GROUP BY t1.id, t1.comment; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + Output: t1.id, t1.comment, (sum(t1.val)) + -> HashAggregate + Output: t1.id, t1.comment, sum(t1.val) + Group Key: t1.id, t1.comment + -> Hash Join + Output: t1.id, t1.val, t1.comment + Hash Cond: ((t1.id = t2.id) AND (t1.val = t2.val)) + -> Seq Scan on public.t1 + Output: t1.id, t1.val, t1.comment + -> Hash + Output: t2.id, t2.val + -> Seq Scan on public.t2 + Output: t2.id, t2.val + Settings: enable_hashjoin = 'on', enable_mergejoin = 'off', enable_nestloop = 'off', enable_parallel = 'off', enable_seqscan = 'on', gp_enable_agg_pushdown = 'on', optimizer = 'on' + Optimizer: GPORCA +(16 rows) + +-- Test above case with different data distributions +INSERT INTO t1 SELECT i, i, 'asd' FROM generate_series(1, 10000) s(i); +ANALYZE t1; +EXPLAIN (VERBOSE on, COSTS off) +SELECT t1.id, t1.comment, SUM(t1.val) FROM t1, t2 WHERE t1.id = t2.id and t1.val = t2.val GROUP BY t1.id, t1.comment; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + Output: t1.id, t1.comment, (sum(t1.val)) + -> HashAggregate + Output: t1.id, t1.comment, sum(t1.val) + Group Key: t1.id, t1.comment + -> Hash Join + Output: t1.id, t1.val, t1.comment + Hash Cond: ((t1.id = t2.id) AND (t1.val = t2.val)) + -> Seq Scan on public.t1 + Output: t1.id, t1.val, t1.comment + -> Hash + Output: t2.id, t2.val + -> Seq Scan on public.t2 + Output: t2.id, t2.val + Settings: enable_hashjoin = 'on', enable_mergejoin = 'off', enable_nestloop = 'off', 
enable_parallel = 'off', enable_seqscan = 'on', gp_enable_agg_pushdown = 'on', optimizer = 'on' + Optimizer: GPORCA +(16 rows) + +DELETE FROM t1; +INSERT INTO t1 SELECT i % 10, 1, 'asd' FROM generate_series(1, 10000) s(i); +ANALYZE t1; +EXPLAIN (VERBOSE on, COSTS off) +SELECT t1.id, t1.comment, SUM(t1.val) FROM t1, t2 WHERE t1.id = t2.id and t1.val = t2.val GROUP BY t1.id, t1.comment; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + Output: t1.id, t1.comment, (sum(t1.val)) + -> HashAggregate + Output: t1.id, t1.comment, sum(t1.val) + Group Key: t1.id, t1.comment + -> Hash Join + Output: t1.id, t1.val, t1.comment + Hash Cond: ((t1.id = t2.id) AND (t1.val = t2.val)) + -> Seq Scan on public.t1 + Output: t1.id, t1.val, t1.comment + -> Hash + Output: t2.id, t2.val + -> Seq Scan on public.t2 + Output: t2.id, t2.val + Settings: enable_hashjoin = 'on', enable_mergejoin = 'off', enable_nestloop = 'off', enable_parallel = 'off', enable_seqscan = 'on', gp_enable_agg_pushdown = 'on', optimizer = 'on' + Optimizer: GPORCA +(16 rows) + +-- Clear tables +DROP TABLE t1, t2; +-- Test case group 3: Pushdown in subquery and group from subquery. +DROP TABLE IF EXISTS part, lineitem; +NOTICE: table "part" does not exist, skipping +NOTICE: table "lineitem" does not exist, skipping +CREATE TABLE part (p_partkey int, p_size int, p_price int); +CREATE TABLE lineitem (l_orderkey int, l_partkey int, l_amount int); +SET enable_nestloop TO off; +SET enable_hashjoin TO on; +SET enable_mergejoin TO off; +SET gp_enable_agg_pushdown TO ON; +-- Pushdown within subquery. 
+EXPLAIN (VERBOSE on, COSTS off) +SELECT SUM(slp) FROM + (SELECT l_partkey, SUM(p_price) from lineitem, part + WHERE l_partkey = p_partkey AND p_size < 40 + GROUP BY l_partkey + ORDER BY l_partkey + LIMIT 100) temp(lp, slp) + WHERE slp > 10; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Aggregate + Output: sum((sum(part.p_price))) + -> Result + Output: (sum(part.p_price)) + Filter: ((sum(part.p_price)) > 10) + -> Limit + Output: lineitem.l_partkey, (sum(part.p_price)) + -> Gather Motion 3:1 (slice1; segments: 3) + Output: lineitem.l_partkey, (sum(part.p_price)) + Merge Key: lineitem.l_partkey + -> Finalize GroupAggregate + Output: lineitem.l_partkey, sum(part.p_price) + Group Key: lineitem.l_partkey + -> Sort + Output: lineitem.l_partkey, (PARTIAL sum(part.p_price)) + Sort Key: lineitem.l_partkey + -> Redistribute Motion 3:3 (slice2; segments: 3) + Output: lineitem.l_partkey, (PARTIAL sum(part.p_price)) + Hash Key: lineitem.l_partkey + -> Streaming Partial HashAggregate + Output: lineitem.l_partkey, PARTIAL sum(part.p_price) + Group Key: lineitem.l_partkey + -> Redistribute Motion 3:3 (slice3; segments: 3) + Output: lineitem.l_partkey, part.p_price + -> Hash Join + Output: lineitem.l_partkey, part.p_price + Hash Cond: (part.p_partkey = lineitem.l_partkey) + -> Seq Scan on public.part + Output: part.p_partkey, part.p_price + Filter: (part.p_size < 40) + -> Hash + Output: lineitem.l_partkey + -> Redistribute Motion 3:3 (slice4; segments: 3) + Output: lineitem.l_partkey + Hash Key: lineitem.l_partkey + -> Seq Scan on public.lineitem + Output: lineitem.l_partkey + Settings: enable_hashjoin = 'on', enable_mergejoin = 'off', enable_nestloop = 'off', enable_parallel = 'off', enable_seqscan = 'on', gp_enable_agg_pushdown = 'on', optimizer = 'on' + Optimizer: GPORCA +(39 rows) + +-- Group base on subquery. 
+EXPLAIN (VERBOSE on, COSTS off) +SELECT p_partkey, SUM(l_amount) FROM + part, (SELECT l_partkey, l_amount + 10 + FROM lineitem ORDER BY l_partkey LIMIT 10000) li(l_partkey, l_amount) + WHERE l_partkey = p_partkey + GROUP BY p_partkey; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + Output: part.p_partkey, (sum(((lineitem.l_amount + 10)))) + -> Finalize HashAggregate + Output: part.p_partkey, sum(((lineitem.l_amount + 10))) + Group Key: part.p_partkey + -> Hash Join + Output: part.p_partkey, (PARTIAL sum(((lineitem.l_amount + 10)))) + Hash Cond: (lineitem.l_partkey = part.p_partkey) + -> Redistribute Motion 1:3 (slice2) + Output: lineitem.l_partkey, (PARTIAL sum(((lineitem.l_amount + 10)))) + Hash Key: lineitem.l_partkey + -> Partial GroupAggregate + Output: lineitem.l_partkey, PARTIAL sum(((lineitem.l_amount + 10))) + Group Key: lineitem.l_partkey + -> Limit + Output: lineitem.l_partkey, ((lineitem.l_amount + 10)) + -> Gather Motion 3:1 (slice3; segments: 3) + Output: lineitem.l_partkey, ((lineitem.l_amount + 10)) + Merge Key: lineitem.l_partkey + -> Result + Output: lineitem.l_partkey, (lineitem.l_amount + 10) + -> Sort + Output: lineitem.l_partkey, lineitem.l_amount + Sort Key: lineitem.l_partkey + -> Seq Scan on public.lineitem + Output: lineitem.l_partkey, lineitem.l_amount + -> Hash + Output: part.p_partkey + -> Seq Scan on public.part + Output: part.p_partkey + Settings: enable_hashjoin = 'on', enable_mergejoin = 'off', enable_nestloop = 'off', enable_parallel = 'off', enable_seqscan = 'on', gp_enable_agg_pushdown = 'on', optimizer = 'on' + Optimizer: GPORCA +(32 rows) + +-- Clear tables +DROP TABLE part, lineitem; +-- Test case group 4: construct grouped join rel from 2 plain rels +DROP TABLE IF EXISTS vendor_pd, customer_pd, nation_pd; +NOTICE: table "vendor_pd" does 
not exist, skipping +NOTICE: table "customer_pd" does not exist, skipping +NOTICE: table "nation_pd" does not exist, skipping +CREATE TABLE vendor_pd (v_id int, v_name VARCHAR(20)) WITH (APPENDONLY=true, ORIENTATION=column); +CREATE TABLE customer_pd (c_id int, c_v_id int, c_n_id int, c_type int, c_consumption int); +CREATE TABLE nation_pd (n_id int, n_name VARCHAR(20), n_type int, n_population int) WITH (APPENDONLY=true, ORIENTATION=column); +INSERT INTO nation_pd SELECT i, 'abc', 1, 1 from generate_series(1, 100) s(i); +INSERT INTO customer_pd SELECT i, i % 100, i % 100, 1, 100 from generate_series(1, 10000) s(i); +ANALYZE nation_pd, customer_pd; +-- For each vendor, calculate the total consumption of qualified customers +EXPLAIN (VERBOSE on, COSTS off) +SELECT v_id, v_name, SUM(c_consumption) + FROM vendor_pd, customer_pd, nation_pd + WHERE v_id = c_v_id AND c_n_id = n_id AND c_id > n_population + GROUP BY v_id, v_name; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + Output: vendor_pd.v_id, vendor_pd.v_name, (sum(customer_pd.c_consumption)) + -> Finalize HashAggregate + Output: vendor_pd.v_id, vendor_pd.v_name, sum(customer_pd.c_consumption) + Group Key: vendor_pd.v_id, vendor_pd.v_name + -> Redistribute Motion 3:3 (slice2; segments: 3) + Output: vendor_pd.v_id, vendor_pd.v_name, (PARTIAL sum(customer_pd.c_consumption)) + Hash Key: vendor_pd.v_id, vendor_pd.v_name + -> Streaming Partial HashAggregate + Output: vendor_pd.v_id, vendor_pd.v_name, PARTIAL sum(customer_pd.c_consumption) + Group Key: vendor_pd.v_id, vendor_pd.v_name + -> Hash Join + Output: vendor_pd.v_id, vendor_pd.v_name, customer_pd.c_consumption + Hash Cond: (nation_pd.n_id = customer_pd.c_n_id) + Join Filter: (customer_pd.c_id > nation_pd.n_population) + -> Seq Scan on public.nation_pd + Output: 
nation_pd.n_id, nation_pd.n_population + -> Hash + Output: vendor_pd.v_id, vendor_pd.v_name, customer_pd.c_id, customer_pd.c_n_id, customer_pd.c_consumption + -> Broadcast Motion 3:3 (slice3; segments: 3) + Output: vendor_pd.v_id, vendor_pd.v_name, customer_pd.c_id, customer_pd.c_n_id, customer_pd.c_consumption + -> Hash Join + Output: vendor_pd.v_id, vendor_pd.v_name, customer_pd.c_id, customer_pd.c_n_id, customer_pd.c_consumption + Hash Cond: (customer_pd.c_v_id = vendor_pd.v_id) + -> Seq Scan on public.customer_pd + Output: customer_pd.c_id, customer_pd.c_v_id, customer_pd.c_n_id, customer_pd.c_consumption + -> Hash + Output: vendor_pd.v_id, vendor_pd.v_name + -> Broadcast Motion 3:3 (slice4; segments: 3) + Output: vendor_pd.v_id, vendor_pd.v_name + -> Seq Scan on public.vendor_pd + Output: vendor_pd.v_id, vendor_pd.v_name + Settings: enable_hashjoin = 'on', enable_mergejoin = 'off', enable_nestloop = 'off', enable_parallel = 'off', enable_seqscan = 'on', gp_enable_agg_pushdown = 'on', optimizer = 'on' + Optimizer: GPORCA +(34 rows) + +-- For each vendor/c_type/n_type, calculate the total consumption of qualified customers +EXPLAIN (VERBOSE on, COSTS off) +SELECT v_id, c_type, n_type, SUM(c_consumption) + FROM vendor_pd, customer_pd, nation_pd + WHERE v_id = c_v_id AND c_n_id = n_id AND c_id > n_population + GROUP BY v_id, c_type, n_type; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + Output: vendor_pd.v_id, customer_pd.c_type, nation_pd.n_type, (sum(customer_pd.c_consumption)) + -> Finalize GroupAggregate + Output: vendor_pd.v_id, customer_pd.c_type, nation_pd.n_type, sum(customer_pd.c_consumption) + Group Key: vendor_pd.v_id, customer_pd.c_type, nation_pd.n_type + -> Sort + Output: vendor_pd.v_id, customer_pd.c_type, nation_pd.n_type, (PARTIAL 
sum(customer_pd.c_consumption)) + Sort Key: vendor_pd.v_id, customer_pd.c_type, nation_pd.n_type + -> Redistribute Motion 3:3 (slice2; segments: 3) + Output: vendor_pd.v_id, customer_pd.c_type, nation_pd.n_type, (PARTIAL sum(customer_pd.c_consumption)) + Hash Key: vendor_pd.v_id, customer_pd.c_type, nation_pd.n_type + -> Streaming Partial HashAggregate + Output: vendor_pd.v_id, customer_pd.c_type, nation_pd.n_type, PARTIAL sum(customer_pd.c_consumption) + Group Key: vendor_pd.v_id, customer_pd.c_type, nation_pd.n_type + -> Hash Join + Output: vendor_pd.v_id, customer_pd.c_type, customer_pd.c_consumption, nation_pd.n_type + Hash Cond: (nation_pd.n_id = customer_pd.c_n_id) + Join Filter: (customer_pd.c_id > nation_pd.n_population) + -> Seq Scan on public.nation_pd + Output: nation_pd.n_id, nation_pd.n_type, nation_pd.n_population + -> Hash + Output: vendor_pd.v_id, customer_pd.c_id, customer_pd.c_n_id, customer_pd.c_type, customer_pd.c_consumption + -> Broadcast Motion 3:3 (slice3; segments: 3) + Output: vendor_pd.v_id, customer_pd.c_id, customer_pd.c_n_id, customer_pd.c_type, customer_pd.c_consumption + -> Hash Join + Output: vendor_pd.v_id, customer_pd.c_id, customer_pd.c_n_id, customer_pd.c_type, customer_pd.c_consumption + Hash Cond: (customer_pd.c_v_id = vendor_pd.v_id) + -> Seq Scan on public.customer_pd + Output: customer_pd.c_id, customer_pd.c_v_id, customer_pd.c_n_id, customer_pd.c_type, customer_pd.c_consumption + -> Hash + Output: vendor_pd.v_id + -> Broadcast Motion 3:3 (slice4; segments: 3) + Output: vendor_pd.v_id + -> Seq Scan on public.vendor_pd + Output: vendor_pd.v_id + Settings: enable_hashjoin = 'on', enable_mergejoin = 'off', enable_nestloop = 'off', enable_parallel = 'off', enable_seqscan = 'on', gp_enable_agg_pushdown = 'on', optimizer = 'on' + Optimizer: GPORCA +(37 rows) + +-- For each vendor/n_type, calculate the total consumption of customers from nation with condition. 
+EXPLAIN (VERBOSE on, COSTS off) +SELECT v_id, v_name, n_type, SUM(c_consumption) + FROM vendor_pd, customer_pd, nation_pd + WHERE v_id = c_v_id AND c_n_id = n_id AND n_population > 100 + GROUP BY v_id, v_name, n_type; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + GroupAggregate + Output: vendor_pd.v_id, vendor_pd.v_name, nation_pd.n_type, sum(customer_pd.c_consumption) + Group Key: vendor_pd.v_id, vendor_pd.v_name, nation_pd.n_type + -> Sort + Output: vendor_pd.v_id, vendor_pd.v_name, customer_pd.c_consumption, nation_pd.n_type + Sort Key: vendor_pd.v_id, vendor_pd.v_name, nation_pd.n_type + -> Hash Join + Output: vendor_pd.v_id, vendor_pd.v_name, customer_pd.c_consumption, nation_pd.n_type + Hash Cond: (customer_pd.c_n_id = nation_pd.n_id) + -> Gather Motion 3:1 (slice1; segments: 3) + Output: vendor_pd.v_id, vendor_pd.v_name, customer_pd.c_n_id, customer_pd.c_consumption + -> Hash Join + Output: vendor_pd.v_id, vendor_pd.v_name, customer_pd.c_n_id, customer_pd.c_consumption + Hash Cond: (customer_pd.c_v_id = vendor_pd.v_id) + -> Seq Scan on public.customer_pd + Output: customer_pd.c_v_id, customer_pd.c_n_id, customer_pd.c_consumption + -> Hash + Output: vendor_pd.v_id, vendor_pd.v_name + -> Broadcast Motion 3:3 (slice2; segments: 3) + Output: vendor_pd.v_id, vendor_pd.v_name + -> Seq Scan on public.vendor_pd + Output: vendor_pd.v_id, vendor_pd.v_name + -> Hash + Output: nation_pd.n_id, nation_pd.n_type + -> Gather Motion 3:1 (slice3; segments: 3) + Output: nation_pd.n_id, nation_pd.n_type + -> Seq Scan on public.nation_pd + Output: nation_pd.n_id, nation_pd.n_type + Filter: (nation_pd.n_population > 100) + Settings: enable_hashjoin = 'on', enable_mergejoin = 'off', enable_nestloop = 'off', enable_parallel = 'off', enable_seqscan = 'on', gp_enable_agg_pushdown = 'on', optimizer = 'on' + Optimizer: GPORCA +(31 
rows) + +-- Clear tables +DROP TABLE vendor_pd, customer_pd, nation_pd; +-- Test case group 4: OLAP-like cases +DROP TABLE IF EXISTS fact, dim; +NOTICE: table "fact" does not exist, skipping +NOTICE: table "dim" does not exist, skipping +CREATE TABLE fact (id int, did int, fact_time int, val int) WITH (APPENDONLY=true, ORIENTATION=column); +CREATE TABLE dim (did int, proj_name varchar(20), brand int, model int); +INSERT INTO dim SELECT i % 100, 1, 1 FROM generate_series(1, 100) s(i); +INSERT INTO fact SELECT i % 10, i % 100, 30, 1 FROM generate_series(1, 10000) s(i); +ANALYZE dim, fact; +-- Test sum fact vals group by dim column +EXPLAIN (VERBOSE on, COSTS off) +SELECT dim.did, sum(val) + FROM fact JOIN dim ON fact.did = dim.did + WHERE fact_time > 10 AND fact_time < 2000 + GROUP BY dim.did; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + Output: dim.did, (sum(fact.val)) + -> Finalize HashAggregate + Output: dim.did, sum(fact.val) + Group Key: dim.did + -> Hash Join + Output: dim.did, (PARTIAL sum(fact.val)) + Hash Cond: (fact.did = dim.did) + -> Redistribute Motion 3:3 (slice2; segments: 3) + Output: fact.did, (PARTIAL sum(fact.val)) + Hash Key: fact.did + -> Streaming Partial HashAggregate + Output: fact.did, PARTIAL sum(fact.val) + Group Key: fact.did + -> Seq Scan on public.fact + Output: fact.did, fact.val + Filter: ((fact.fact_time > 10) AND (fact.fact_time < 2000)) + -> Hash + Output: dim.did + -> Seq Scan on public.dim + Output: dim.did + Settings: enable_hashjoin = 'on', enable_mergejoin = 'off', enable_nestloop = 'off', enable_parallel = 'off', enable_seqscan = 'on', gp_enable_agg_pushdown = 'on', optimizer = 'on' + Optimizer: GPORCA +(23 rows) + +EXPLAIN (VERBOSE on, COSTS off) +SELECT dim.proj_name, sum(val) + FROM fact JOIN dim ON fact.did = dim.did + WHERE 
fact_time > 10 AND fact_time < 2000 + GROUP BY dim.proj_name; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + Output: dim.proj_name, (sum(fact.val)) + -> Finalize HashAggregate + Output: dim.proj_name, sum(fact.val) + Group Key: dim.proj_name + -> Redistribute Motion 3:3 (slice2; segments: 3) + Output: dim.proj_name, (PARTIAL sum(fact.val)) + Hash Key: dim.proj_name + -> Streaming Partial HashAggregate + Output: dim.proj_name, PARTIAL sum(fact.val) + Group Key: dim.proj_name + -> Hash Join + Output: fact.val, dim.proj_name + Hash Cond: (fact.did = dim.did) + -> Seq Scan on public.fact + Output: fact.did, fact.val + Filter: ((fact.fact_time > 10) AND (fact.fact_time < 2000)) + -> Hash + Output: dim.did, dim.proj_name + -> Broadcast Motion 3:3 (slice3; segments: 3) + Output: dim.did, dim.proj_name + -> Seq Scan on public.dim + Output: dim.did, dim.proj_name + Settings: enable_hashjoin = 'on', enable_mergejoin = 'off', enable_nestloop = 'off', enable_parallel = 'off', enable_seqscan = 'on', gp_enable_agg_pushdown = 'on', optimizer = 'on' + Optimizer: GPORCA +(25 rows) + +-- Clear tables +DROP TABLE dim, fact; +-- Test case group 5: partition table and inherit table +SET enable_incremental_sort TO off; +DROP TABLE IF EXISTS pagg_pd; +NOTICE: table "pagg_pd" does not exist, skipping +CREATE TABLE pagg_pd (a int, b int, c text, d int) PARTITION BY LIST(c); +CREATE TABLE pagg_pd_p1 PARTITION OF pagg_pd FOR VALUES IN ('0000', '0001', '0002', '0003', '0004'); +NOTICE: table has parent, setting distribution columns to match parent table +CREATE TABLE pagg_pd_p2 PARTITION OF pagg_pd FOR VALUES IN ('0005', '0006', '0007', '0008'); +NOTICE: table has parent, setting distribution columns to match parent table +CREATE TABLE pagg_pd_p3 PARTITION OF pagg_pd FOR VALUES IN ('0009', '0010', 
'0011'); +NOTICE: table has parent, setting distribution columns to match parent table +INSERT INTO pagg_pd SELECT i % 20, i % 30, to_char(i % 12, 'FM0000'), i % 30 FROM generate_series(0, 2999) i; +ANALYZE pagg_pd; +EXPLAIN (VERBOSE on, COSTS off) +SELECT t1.c, sum(t1.a) + FROM pagg_pd t1 JOIN pagg_pd t2 ON t1.c < t2.c + GROUP BY t1.c + ORDER BY 1, 2; + QUERY PLAN +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + Output: t1.c, (sum(t1.a)) + Merge Key: t1.c, (sum(t1.a)) + -> Sort + Output: t1.c, (sum(t1.a)) + Sort Key: t1.c, (sum(t1.a)) + -> Finalize HashAggregate + Output: t1.c, sum(t1.a) + Group Key: t1.c + -> Redistribute Motion 3:3 (slice2; segments: 3) + Output: t1.c, (PARTIAL sum(t1.a)) + Hash Key: t1.c + -> Streaming Partial HashAggregate + Output: t1.c, PARTIAL sum(t1.a) + Group Key: t1.c + -> Nested Loop + Output: t1.a, t1.c + Join Filter: (t1.c < t2.c) + -> Dynamic Seq Scan on public.pagg_pd t1 + Output: t1.a, t1.c + Number of partitions to scan: 3 (out of 3) + -> Materialize + Output: t2.c + -> Broadcast Motion 3:3 (slice3; segments: 3) + Output: t2.c + -> Dynamic Seq Scan on public.pagg_pd t2 + Output: t2.c + Number of partitions to scan: 3 (out of 3) + Settings: enable_hashjoin = 'on', enable_incremental_sort = 'off', enable_mergejoin = 'off', enable_nestloop = 'off', enable_parallel = 'off', enable_seqscan = 'on', gp_enable_agg_pushdown = 'on', optimizer = 'on' + Optimizer: GPORCA +(30 rows) + +SELECT t1.c, sum(t1.a) + FROM pagg_pd t1 JOIN pagg_pd t2 ON t1.c < t2.c + GROUP BY t1.c + ORDER BY 1, 2; + c | sum +------+--------- + 0000 | 5500000 + 0001 | 5625000 + 0002 | 5625000 + 0003 | 5500000 + 0004 | 3500000 + 0005 | 3375000 + 0006 | 3125000 + 0007 | 2750000 + 0008 | 1500000 + 0009 | 1125000 + 0010 | 625000 +(11 rows) + +DROP TABLE pagg_pd; 
+CREATE TABLE pagg_pd_p (a int, b int); +DROP TABLE IF EXISTS pagg_pd, pagg_pd_p; +NOTICE: table "pagg_pd" does not exist, skipping +CREATE TABLE pagg_pd_p (a int, b int, c text) PARTITION BY LIST(c); +DROP TABLE IF EXISTS pagg_pd, pagg_pd_p; +NOTICE: table "pagg_pd" does not exist, skipping +CREATE TABLE pagg_pd_p (a int, b int); +CREATE TABLE pagg_pd (c text, d int) inherits (pagg_pd_p); +NOTICE: table has parent, setting distribution columns to match parent table +INSERT INTO pagg_pd SELECT i % 20, i % 30, to_char(i % 12, 'FM0000'), i % 30 FROM generate_series(0, 2999) i; +ANALYZE pagg_pd; +EXPLAIN (VERBOSE on, COSTS off) +SELECT t1.c, sum(t1.a) + FROM pagg_pd t1 JOIN pagg_pd t2 ON t1.c < t2.c + GROUP BY t1.c + ORDER BY 1, 2; + QUERY PLAN +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + Output: t1.c, (sum(t1.a)) + Merge Key: t1.c, (sum(t1.a)) + -> Sort + Output: t1.c, (sum(t1.a)) + Sort Key: t1.c, (sum(t1.a)) + -> Finalize HashAggregate + Output: t1.c, sum(t1.a) + Group Key: t1.c + -> Redistribute Motion 3:3 (slice2; segments: 3) + Output: t1.c, (PARTIAL sum(t1.a)) + Hash Key: t1.c + -> Streaming Partial HashAggregate + Output: t1.c, PARTIAL sum(t1.a) + Group Key: t1.c + -> Nested Loop + Output: t1.a, t1.c + Join Filter: (t1.c < t2.c) + -> Seq Scan on public.pagg_pd t1 + Output: t1.a, t1.c + -> Materialize + Output: t2.c + -> Broadcast Motion 3:3 (slice3; segments: 3) + Output: t2.c + -> Seq Scan on public.pagg_pd t2 + Output: t2.c + Settings: enable_hashjoin = 'on', enable_incremental_sort = 'off', enable_mergejoin = 'off', enable_nestloop = 'off', enable_parallel = 'off', enable_seqscan = 'on', gp_enable_agg_pushdown = 'on', optimizer = 'on' + Optimizer: GPORCA +(28 rows) + +SELECT t1.c, sum(t1.a) + FROM pagg_pd t1 JOIN pagg_pd t2 ON t1.c < t2.c + 
GROUP BY t1.c + ORDER BY 1, 2; + c | sum +------+--------- + 0000 | 5500000 + 0001 | 5625000 + 0002 | 5625000 + 0003 | 5500000 + 0004 | 3500000 + 0005 | 3375000 + 0006 | 3125000 + 0007 | 2750000 + 0008 | 1500000 + 0009 | 1125000 + 0010 | 625000 +(11 rows) + +DROP TABLE pagg_pd, pagg_pd_p; +RESET enable_incremental_sort; +-- Clear settings +RESET optimizer_force_multistage_agg; +SET gp_enable_agg_pushdown TO off; +SET enable_seqscan TO on; +SET enable_nestloop TO on; +SET enable_hashjoin TO on; +SET enable_mergejoin TO on; diff --git a/src/test/regress/sql/agg_pushdown.sql b/src/test/regress/sql/agg_pushdown.sql index 3378f8b64e6..3bdf1bd7571 100644 --- a/src/test/regress/sql/agg_pushdown.sql +++ b/src/test/regress/sql/agg_pushdown.sql @@ -1,28 +1,24 @@ --- disable ORCA -SET optimizer TO off; - -- Test case group 1: basic functions CREATE TABLE agg_pushdown_parent ( - i int primary key, - x int); + i int, + x int); CREATE TABLE agg_pushdown_child1 ( - j int, - parent int, - v double precision, - PRIMARY KEY (j, parent)); - -CREATE INDEX ON agg_pushdown_child1(parent); + j int, + parent int, + v double precision + ) + DISTRIBUTED BY (j, parent); CREATE TABLE agg_pushdown_child2 ( - k int, - parent int, - v double precision, - PRIMARY KEY (k, parent));; + k int, + parent int, + v double precision) + DISTRIBUTED BY (k, parent); INSERT INTO agg_pushdown_parent(i, x) SELECT n, n -FROM generate_series(0, 7) AS s(n); +FROM generate_series(0, 10) AS s(n); INSERT INTO agg_pushdown_child1(j, parent, v) SELECT 128 * i + n, i, random() @@ -36,29 +32,12 @@ ANALYZE agg_pushdown_parent; ANALYZE agg_pushdown_child1; ANALYZE agg_pushdown_child2; -SET enable_nestloop TO on; -SET enable_hashjoin TO off; -SET enable_mergejoin TO off; - -- Perform scan of a table, aggregate the result, join it to the other table -- and finalize the aggregation. --- --- In addition, check that functionally dependent column "p.x" can be --- referenced by SELECT although GROUP BY references "p.i". 
-SET gp_enable_agg_pushdown TO off; -EXPLAIN (VERBOSE on, COSTS off) -SELECT p.x, avg(c1.v) FROM agg_pushdown_parent AS p JOIN agg_pushdown_child1 -AS c1 ON c1.parent = p.i GROUP BY p.i; - -SET gp_enable_agg_pushdown TO on; -EXPLAIN (VERBOSE on, COSTS off) -SELECT p.x, avg(c1.v) FROM agg_pushdown_parent AS p JOIN agg_pushdown_child1 -AS c1 ON c1.parent = p.i GROUP BY p.i; - --- The same for hash join. +SET enable_mergejoin TO off; SET enable_nestloop TO off; SET enable_hashjoin TO on; - +SET gp_enable_agg_pushdown TO on; EXPLAIN (VERBOSE on, COSTS off) SELECT p.i, avg(c1.v) FROM agg_pushdown_parent AS p JOIN agg_pushdown_child1 AS c1 ON c1.parent = p.i GROUP BY p.i; @@ -160,6 +139,7 @@ SET enable_hashjoin TO on; SET enable_mergejoin TO off; SET gp_enable_agg_pushdown TO ON; +SET optimizer_force_multistage_agg to ON; -- Join key and group key are the same. EXPLAIN (VERBOSE on, COSTS off) @@ -235,7 +215,7 @@ DROP TABLE part, lineitem; -- Test case group 4: construct grouped join rel from 2 plain rels DROP TABLE IF EXISTS vendor_pd, customer_pd, nation_pd; CREATE TABLE vendor_pd (v_id int, v_name VARCHAR(20)) WITH (APPENDONLY=true, ORIENTATION=column); -CREATE TABLE customer_pd (c_id int primary key, c_v_id int, c_n_id int, c_type int, c_consumption int); +CREATE TABLE customer_pd (c_id int, c_v_id int, c_n_id int, c_type int, c_consumption int); CREATE TABLE nation_pd (n_id int, n_name VARCHAR(20), n_type int, n_population int) WITH (APPENDONLY=true, ORIENTATION=column); INSERT INTO nation_pd SELECT i, 'abc', 1, 1 from generate_series(1, 100) s(i); @@ -316,11 +296,9 @@ SELECT t1.c, sum(t1.a) DROP TABLE pagg_pd; CREATE TABLE pagg_pd_p (a int, b int); -CREATE TABLE pagg_pd (c text, d int) inherits (pagg_pd_p) PARTITION BY LIST(c); DROP TABLE IF EXISTS pagg_pd, pagg_pd_p; CREATE TABLE pagg_pd_p (a int, b int, c text) PARTITION BY LIST(c); -CREATE TABLE pagg_pd (d int) inherits (pagg_pd_p); DROP TABLE IF EXISTS pagg_pd, pagg_pd_p; CREATE TABLE pagg_pd_p (a int, b int); 
@@ -343,8 +321,7 @@ DROP TABLE pagg_pd, pagg_pd_p; RESET enable_incremental_sort; -- Clear settings -SET optimizer TO default; - +RESET optimizer_force_multistage_agg; SET gp_enable_agg_pushdown TO off; SET enable_seqscan TO on;