Skip to content

Commit 84b1e76

Browse files
committed
feat: pushdown bloom filter to pax table am
This optimization pushes down Bloom Filter conditions for runtime filters to the Pax Table AM layer. By applying the filter earlier than the SeqNext() function, it eliminates the overhead of converting data from columnar format to TupleTableSlot, resulting in faster query execution.
1 parent 1c45f0c commit 84b1e76

File tree

14 files changed

+822
-57
lines changed

14 files changed

+822
-57
lines changed

contrib/pax_storage/src/cpp/access/pax_scanner.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ TableScanDesc PaxScanDesc::BeginScanExtractColumns(
372372
&& !(flags & SO_TYPE_VECTOR)
373373
#endif
374374
) {
375-
filter->InitRowFilter(rel, ps, filter->GetColumnProjection());
375+
filter->InitRowFilter(rel, ps, filter->GetColumnProjection(), key, nkeys);
376376
}
377377
}
378378
return BeginScan(rel, snapshot, nkeys, key, parallel_scan, flags,

contrib/pax_storage/src/cpp/comm/cbdb_api.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ extern "C" {
8080
#include "commands/progress.h"
8181
#include "commands/tablecmds.h"
8282
#include "funcapi.h"
83+
#include "lib/bloomfilter.h"
8384
#include "miscadmin.h"
8485
#include "nodes/bitmapset.h"
8586
#include "nodes/execnodes.h"

contrib/pax_storage/src/cpp/storage/filter/pax_filter.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,8 @@ namespace pax {
4343

4444
PaxFilter::PaxFilter() : sparse_filter_(nullptr), row_filter_(nullptr) {}
4545

46-
void PaxFilter::InitSparseFilter(Relation relation, List *quals,
47-
ScanKey key, int nkeys,
48-
bool allow_fallback_to_pg) {
46+
void PaxFilter::InitSparseFilter(Relation relation, List *quals, ScanKey key,
47+
int nkeys, bool allow_fallback_to_pg) {
4948
Assert(!sparse_filter_);
5049
sparse_filter_ =
5150
std::make_shared<PaxSparseFilter>(relation, allow_fallback_to_pg);
@@ -123,10 +122,11 @@ void PaxFilter::SetColumnProjection(const std::vector<int> &cols, int natts) {
123122
}
124123

125124
void PaxFilter::InitRowFilter(Relation relation, PlanState *ps,
126-
const std::vector<bool> &projection) {
125+
const std::vector<bool> &projection, ScanKey key,
126+
int nkeys) {
127127
Assert(!row_filter_);
128128
row_filter_ = std::make_shared<PaxRowFilter>();
129-
if (!row_filter_->Initialize(relation, ps, projection)) {
129+
if (!row_filter_->Initialize(relation, ps, projection, key, nkeys)) {
130130
row_filter_ = nullptr;
131131
}
132132
}

contrib/pax_storage/src/cpp/storage/filter/pax_filter.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ class PaxFilter final {
7373

7474
// The row filter
7575
void InitRowFilter(Relation relation, PlanState *ps,
76-
const std::vector<bool> &projection);
76+
const std::vector<bool> &projection, ScanKey key,
77+
int nkeys);
7778
std::shared_ptr<PaxRowFilter> GetRowFilter();
7879

7980
void LogStatistics() const;

contrib/pax_storage/src/cpp/storage/filter/pax_row_filter.cc

Lines changed: 60 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
*/
2727

2828
#include "storage/filter/pax_row_filter.h"
29+
2930
#include "comm/cbdb_wrappers.h"
3031

3132
namespace paxc {
@@ -45,19 +46,57 @@ static inline void FindAttrsInQual(Node *qual, bool *proj, int ncol,
4546
}
4647

4748
static bool BuildExecutionFilterForColumns(Relation rel, PlanState *ps,
48-
pax::ExecutionFilterContext *ctx) {
49+
pax::ExecutionFilterContext *ctx,
50+
ScanKey key, int nkeys) {
4951
List *qual = ps->plan->qual;
5052
List **qual_list;
5153
ListCell *lc;
5254
bool *proj;
5355
int *qual_atts;
5456
int natts = RelationGetNumberOfAttributes(rel);
5557

56-
if (!qual || !IsA(qual, List)) return false;
58+
int ret = false;
59+
60+
if (key && nkeys > 0) {
61+
// We don't need to support DynamicSeqScanState here. Even if the plan uses
62+
// DynamicSeqScanNode for partitioned tables, it's always a regular SeqScan
63+
// on a single table. So this will always be a SeqScanState.
64+
if (nodeTag(ps) != T_SeqScanState) {
65+
elog(ERROR, "runtime filter only support seqscan state, but got %d",
66+
nodeTag(ps));
67+
}
68+
69+
for (int i = 0; i < nkeys; i++) {
70+
if (key[i].sk_flags & SK_BLOOM_FILTER) {
71+
ctx->runtime_bloom_keys.emplace_back(key[i]);
72+
ret = true;
73+
}
74+
}
75+
76+
// register bloom filters
77+
for (int i = 0; i < (int)ctx->runtime_bloom_keys.size(); ++i) {
78+
pax::ExecutionFilterContext::FilterNode node;
79+
node.kind = pax::ExecutionFilterContext::FilterKind::kBloom;
80+
node.index = i;
81+
ctx->filter_nodes.emplace_back(node);
82+
}
83+
84+
if (ps->instrument) {
85+
ps->instrument->prf_work = true;
86+
}
87+
ctx->ps = ps;
88+
89+
// set filter_in_seqscan to false, so that the filter will not be executed
90+
// in SeqNext(), but will be executed in pax_row_filter
91+
auto seqscan = (SeqScanState *)ps;
92+
seqscan->filter_in_seqscan = false;
93+
}
94+
95+
if (!qual || !IsA(qual, List)) return ret;
5796

5897
if (list_length(qual) == 1 && IsA(linitial(qual), BoolExpr)) {
5998
auto boolexpr = (BoolExpr *)linitial(qual);
60-
if (boolexpr->boolop != AND_EXPR) return false;
99+
if (boolexpr->boolop != AND_EXPR) return ret;
61100
qual = boolexpr->args;
62101
}
63102
Assert(IsA(qual, List));
@@ -98,6 +137,11 @@ static bool BuildExecutionFilterForColumns(Relation rel, PlanState *ps,
98137
if (!qual_list[i]) continue;
99138
ctx->estates[k] = ExecInitQual(qual_list[i], ps);
100139
ctx->attnos[k] = i;
140+
// register expr filter node (by index k)
141+
pax::ExecutionFilterContext::FilterNode node;
142+
node.kind = pax::ExecutionFilterContext::FilterKind::kExpr;
143+
node.index = k;
144+
ctx->filter_nodes.emplace_back(node);
101145
list_free(qual_list[i]);
102146
k++;
103147
}
@@ -108,7 +152,11 @@ static bool BuildExecutionFilterForColumns(Relation rel, PlanState *ps,
108152
list_free(qual_list[0]);
109153
}
110154

111-
Assert(ctx->size > 0 || ctx->estate_final);
155+
Assert(ctx->size > 0 || ctx->estate_final ||
156+
ctx->runtime_bloom_keys.size() > 0);
157+
158+
// remove qual from plan state, so that the qual will not be executed in
159+
// executor, but will be executed in pax_row_filter
112160
ps->qual = nullptr;
113161

114162
pfree(proj);
@@ -117,20 +165,19 @@ static bool BuildExecutionFilterForColumns(Relation rel, PlanState *ps,
117165
return true;
118166
}
119167

120-
} // namespace paxc
121-
168+
} // namespace paxc
122169

123170
namespace pax {
124171

125172
PaxRowFilter::PaxRowFilter() {}
126173

127-
bool PaxRowFilter::Initialize(Relation rel, PlanState *ps, const std::vector<bool> &projection) {
174+
bool PaxRowFilter::Initialize(Relation rel, PlanState *ps,
175+
const std::vector<bool> &projection, ScanKey key,
176+
int nkeys) {
128177
bool ok = false;
129-
178+
130179
CBDB_WRAP_START;
131-
{
132-
ok = paxc::BuildExecutionFilterForColumns(rel, ps, &efctx_);
133-
}
180+
{ ok = paxc::BuildExecutionFilterForColumns(rel, ps, &efctx_, key, nkeys); }
134181
CBDB_WRAP_END;
135182

136183
if (ok) {
@@ -140,7 +187,8 @@ bool PaxRowFilter::Initialize(Relation rel, PlanState *ps, const std::vector<boo
140187
return ok;
141188
}
142189

143-
void PaxRowFilter::FillRemainingColumns(Relation rel, const std::vector<bool> &projection) {
190+
void PaxRowFilter::FillRemainingColumns(Relation rel,
191+
const std::vector<bool> &projection) {
144192
int natts = RelationGetNumberOfAttributes(rel);
145193
auto proj_len = projection.size();
146194
std::vector<bool> atts(natts);
@@ -162,5 +210,4 @@ void PaxRowFilter::FillRemainingColumns(Relation rel, const std::vector<bool> &p
162210
}
163211
}
164212

165-
166213
} // namespace pax

contrib/pax_storage/src/cpp/storage/filter/pax_row_filter.h

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,38 @@ struct ExecutionFilterContext {
4242
ExprState *estate_final = nullptr;
4343
ExprState **estates;
4444
AttrNumber *attnos;
45+
PlanState *ps;
4546
int size = 0;
4647
inline bool HasExecutionFilter() const { return size > 0 || estate_final; }
48+
49+
// runtime bloom filters pushed down via SeqScanState->filters
50+
// (SK_BLOOM_FILTER)
51+
std::vector<ScanKeyData> runtime_bloom_keys;
52+
53+
// unified filter nodes (expr + bloom) for execution ordering
54+
enum class FilterKind { kExpr, kBloom };
55+
struct FilterNode {
56+
FilterKind kind;
57+
int index; // index in estates (for kExpr) or in runtime_bloom_keys (for
58+
// kBloom)
59+
uint64 tested = 0; // number of rows tested during sampling
60+
uint64 passed = 0; // number of rows passed during sampling
61+
double score = 1.0; // pass rate used for ordering (lower is better)
62+
};
63+
std::vector<FilterNode> filter_nodes;
64+
65+
// sampling control to determine filter order
66+
bool sampling = true;
67+
uint64 sample_target = 65536; // number of rows for sampling phase
68+
uint64 sample_rows = 0; // rows seen in sampling
4769
};
4870

4971
class PaxRowFilter final {
50-
public:
72+
public:
5173
PaxRowFilter();
5274

5375
bool Initialize(Relation rel, PlanState *ps,
54-
const std::vector<bool> &projection);
76+
const std::vector<bool> &projection, ScanKey key, int nkeys);
5577

5678
inline const ExecutionFilterContext *GetExecutionFilterContext() const {
5779
return &efctx_;
@@ -60,17 +82,16 @@ class PaxRowFilter final {
6082
inline const std::vector<AttrNumber> &GetRemainingColumns() const {
6183
return remaining_attnos_;
6284
}
63-
64-
private:
85+
86+
private:
6587
void FillRemainingColumns(Relation rel, const std::vector<bool> &projection);
6688

67-
private:
89+
private:
6890
ExecutionFilterContext efctx_;
6991
// all selected columns - single row filting columns
7092
// before running final cross columns expression filtering, the remaining
7193
// columns should be filled.
7294
std::vector<AttrNumber> remaining_attnos_;
7395
};
7496

75-
76-
};
97+
}; // namespace pax

contrib/pax_storage/src/cpp/storage/filter/pax_sparse_pg_path.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,14 @@ void PaxSparseFilter::Initialize(List *quals, ScanKey key, int nkeys) {
5050

5151
// walk scan key and only support min/max filter now
5252
for (int i = 0; i < nkeys; i++) {
53-
// TODO: support bloom filter in PaxFilter
54-
// but now just skip it, SeqNext() will check bloom filter in PassByBloomFilter()
53+
// Now just skip bloom filter here, it will be handled in PaxRowFilter.
5554
if (key[i].sk_flags & SK_BLOOM_FILTER) {
5655
continue;
5756
}
5857

58+
// PushdownRuntimeFilter only supports BTGreaterEqualStrategyNumber and
59+
// BTLessEqualStrategyNumber. If the strategy is not supported, skip other
60+
// strategies.
5961
if (key[i].sk_strategy != BTGreaterEqualStrategyNumber &&
6062
key[i].sk_strategy != BTLessEqualStrategyNumber) {
6163
continue;

0 commit comments

Comments
 (0)