POC: Test DataFusion with experimental Parquet Filter Pushdown (try 2) #16562

Closed · wants to merge 1 commit

4 changes: 2 additions & 2 deletions datafusion/common/src/config.rs
@@ -521,12 +521,12 @@ config_namespace! {

/// (reading) If true, filter expressions are applied during the parquet decoding operation to
/// reduce the number of rows decoded. This optimization is sometimes called "late materialization".
- pub pushdown_filters: bool, default = false
+ pub pushdown_filters: bool, default = true

/// (reading) If true, filter expressions evaluated during the parquet decoding operation
/// will be reordered heuristically to minimize the cost of evaluation. If false,
/// the filters are applied in the same order as written in the query
- pub reorder_filters: bool, default = false
+ pub reorder_filters: bool, default = true

/// (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`,
/// and `Binary/BinaryLarge` with `BinaryView`.
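
For context, these options can also be enabled explicitly rather than relying on the new defaults this POC flips. A minimal sketch (assumes the `datafusion` crate; the field paths mirror the `config_namespace!` definitions above):

```rust
use datafusion::prelude::*;

/// Build a SessionContext with experimental Parquet filter pushdown turned on.
/// Sketch only: the option paths follow the config keys shown in this PR.
fn pushdown_context() -> SessionContext {
    let mut config = SessionConfig::new();
    config.options_mut().execution.parquet.pushdown_filters = true;
    config.options_mut().execution.parquet.reorder_filters = true;
    SessionContext::new_with_config(config)
}
```
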
5 changes: 4 additions & 1 deletion datafusion/core/src/dataframe/parquet.rs
@@ -146,7 +146,10 @@ mod tests {
let plan = df.explain(false, false)?.collect().await?;
// Filters all the way to Parquet
let formatted = pretty::pretty_format_batches(&plan)?.to_string();
- assert!(formatted.contains("FilterExec: id@0 = 1"));
+ assert!(
+     formatted.contains("projection=[bool_col, int_col], file_type=parquet"),
+     "formatted:\n {formatted}"
+ );

Ok(())
}
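
The updated assertion reflects that, with pushdown enabled, the predicate no longer appears as a separate `FilterExec`; it is folded into the `DataSourceExec` scan. A rough, hypothetical reproduction outside the test suite (table name, file path, and column are illustrative and a `tokio` runtime is assumed):

```rust
use datafusion::arrow::util::pretty;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Hypothetical parquet file and table name, for illustration only.
    ctx.register_parquet("t", "data.parquet", ParquetReadOptions::default())
        .await?;
    let df = ctx.table("t").await?.filter(col("id").eq(lit(1)))?;
    let plan = df.explain(false, false)?.collect().await?;
    let formatted = pretty::pretty_format_batches(&plan)?.to_string();
    // With pushdown_filters enabled, expect the predicate on the scan itself,
    // mirroring the assertion added in view_test.rs below.
    assert!(
        formatted.contains("file_type=parquet, predicate=id@0 = 1"),
        "plan:\n{formatted}"
    );
    Ok(())
}
```
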
5 changes: 4 additions & 1 deletion datafusion/core/src/datasource/view_test.rs
@@ -326,7 +326,10 @@ mod tests {
let formatted = arrow::util::pretty::pretty_format_batches(&plan)
.unwrap()
.to_string();
- assert!(formatted.contains("FilterExec: id@0 = 1"));
+ assert!(
+     formatted.contains("file_type=parquet, predicate=id@0 = 1"),
+     "formatted:\n{formatted}",
+ );
Ok(())
}

2 changes: 1 addition & 1 deletion datafusion/core/tests/sql/explain_analyze.rs
@@ -720,7 +720,7 @@ async fn parquet_explain_analyze() {
.to_string();

// should contain aggregated stats
- assert_contains!(&formatted, "output_rows=8");
+ assert_contains!(&formatted, "output_rows=5");
assert_contains!(&formatted, "row_groups_matched_bloom_filter=0");
assert_contains!(&formatted, "row_groups_pruned_bloom_filter=0");
assert_contains!(&formatted, "row_groups_matched_statistics=1");
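
The expected `output_rows` drops from 8 to 5 because rows eliminated by the pushed-down predicate never leave the scan under late materialization. The same aggregated metrics can be inspected ad hoc with `EXPLAIN ANALYZE`; a hedged helper sketch (the query and any table names are placeholders, not taken from the test):

```rust
use datafusion::arrow::util::pretty;
use datafusion::error::Result;
use datafusion::prelude::*;

/// Run EXPLAIN ANALYZE for `query` and return the rendered metrics text,
/// which includes per-operator entries such as `output_rows=...`.
async fn explain_analyze(ctx: &SessionContext, query: &str) -> Result<String> {
    let batches = ctx
        .sql(&format!("EXPLAIN ANALYZE {query}"))
        .await?
        .collect()
        .await?;
    Ok(pretty::pretty_format_batches(&batches)?.to_string())
}
```
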
35 changes: 7 additions & 28 deletions datafusion/sqllogictest/test_files/explain_tree.slt
@@ -605,35 +605,14 @@ explain SELECT int_col FROM table2 WHERE string_col != 'foo';
----
physical_plan
01)┌───────────────────────────┐
- 02)│ CoalesceBatchesExec
+ 02)│ DataSourceExec
03)│ -------------------- │
- 04)│ target_batch_size: │
- 05)│ 8192 │
- 06)└─────────────┬─────────────┘
- 07)┌─────────────┴─────────────┐
- 08)│ FilterExec │
- 09)│ -------------------- │
- 10)│ predicate: │
- 11)│ string_col != foo │
- 12)└─────────────┬─────────────┘
- 13)┌─────────────┴─────────────┐
- 14)│ RepartitionExec │
- 15)│ -------------------- │
- 16)│ partition_count(in->out): │
- 17)│ 1 -> 4 │
- 18)│ │
- 19)│ partitioning_scheme: │
- 20)│ RoundRobinBatch(4) │
- 21)└─────────────┬─────────────┘
- 22)┌─────────────┴─────────────┐
- 23)│ DataSourceExec │
- 24)│ -------------------- │
- 25)│ files: 1 │
- 26)│ format: parquet │
- 27)│ │
- 28)│ predicate: │
- 29)│ string_col != foo │
- 30)└───────────────────────────┘
+ 04)│ files: 1 │
+ 05)│ format: parquet │
+ 06)│ │
+ 07)│ predicate: │
+ 08)│ string_col != foo │
+ 09)└───────────────────────────┘

# Query with filter on memory
query TT
8 changes: 4 additions & 4 deletions datafusion/sqllogictest/test_files/information_schema.slt
@@ -247,8 +247,8 @@ datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2
datafusion.execution.parquet.maximum_parallel_row_group_writers 1
datafusion.execution.parquet.metadata_size_hint NULL
datafusion.execution.parquet.pruning true
- datafusion.execution.parquet.pushdown_filters false
- datafusion.execution.parquet.reorder_filters false
+ datafusion.execution.parquet.pushdown_filters true
+ datafusion.execution.parquet.reorder_filters true
datafusion.execution.parquet.schema_force_view_types true
datafusion.execution.parquet.skip_arrow_metadata false
datafusion.execution.parquet.skip_metadata true
@@ -359,8 +359,8 @@ datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2 (writi
datafusion.execution.parquet.maximum_parallel_row_group_writers 1 (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame.
datafusion.execution.parquet.metadata_size_hint NULL (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer
datafusion.execution.parquet.pruning true (reading) If true, the parquet reader attempts to skip entire row groups based on the predicate in the query and the metadata (min/max values) stored in the parquet file
- datafusion.execution.parquet.pushdown_filters false (reading) If true, filter expressions are applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization".
- datafusion.execution.parquet.reorder_filters false (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query
+ datafusion.execution.parquet.pushdown_filters true (reading) If true, filter expressions are applied during the parquet decoding operation to reduce the number of rows decoded. This optimization is sometimes called "late materialization".
+ datafusion.execution.parquet.reorder_filters true (reading) If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query
datafusion.execution.parquet.schema_force_view_types true (reading) If true, parquet reader will read columns of `Utf8/Utf8Large` with `Utf8View`, and `Binary/BinaryLarge` with `BinaryView`.
datafusion.execution.parquet.skip_arrow_metadata false (writing) Skip encoding the embedded arrow metadata in the KV_meta This is analogous to the `ArrowWriterOptions::with_skip_arrow_metadata`. Refer to <https://docs.rs/parquet/53.3.0/parquet/arrow/arrow_writer/struct.ArrowWriterOptions.html#method.with_skip_arrow_metadata>
datafusion.execution.parquet.skip_metadata true (reading) If true, the parquet reader skip the optional embedded metadata that may be in the file Schema. This setting can help avoid schema conflicts when querying multiple parquet files with schemas containing compatible types but different metadata
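
The effective values above can also be confirmed at runtime by querying `information_schema.df_settings`, provided the information schema is enabled on the session. A small sketch, not part of this change:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

/// Print the current parquet filter pushdown settings for a fresh session.
async fn show_parquet_filter_settings() -> Result<()> {
    let config = SessionConfig::new().with_information_schema(true);
    let ctx = SessionContext::new_with_config(config);
    ctx.sql(
        "SELECT name, value FROM information_schema.df_settings \
         WHERE name LIKE 'datafusion.execution.parquet.%filters'",
    )
    .await?
    .show()
    .await?;
    Ok(())
}
```
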
24 changes: 4 additions & 20 deletions datafusion/sqllogictest/test_files/parquet.slt
@@ -455,11 +455,7 @@ EXPLAIN
logical_plan
01)Filter: CAST(binary_as_string_default.binary_col AS Utf8View) LIKE Utf8View("%a%") AND CAST(binary_as_string_default.largebinary_col AS Utf8View) LIKE Utf8View("%a%") AND CAST(binary_as_string_default.binaryview_col AS Utf8View) LIKE Utf8View("%a%")
02)--TableScan: binary_as_string_default projection=[binary_col, largebinary_col, binaryview_col], partial_filters=[CAST(binary_as_string_default.binary_col AS Utf8View) LIKE Utf8View("%a%"), CAST(binary_as_string_default.largebinary_col AS Utf8View) LIKE Utf8View("%a%"), CAST(binary_as_string_default.binaryview_col AS Utf8View) LIKE Utf8View("%a%")]
- physical_plan
- 01)CoalesceBatchesExec: target_batch_size=8192
- 02)--FilterExec: CAST(binary_col@0 AS Utf8View) LIKE %a% AND CAST(largebinary_col@1 AS Utf8View) LIKE %a% AND CAST(binaryview_col@2 AS Utf8View) LIKE %a%
- 03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
- 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/binary_as_string.parquet]]}, projection=[binary_col, largebinary_col, binaryview_col], file_type=parquet, predicate=CAST(binary_col@0 AS Utf8View) LIKE %a% AND CAST(largebinary_col@1 AS Utf8View) LIKE %a% AND CAST(binaryview_col@2 AS Utf8View) LIKE %a%
+ physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/binary_as_string.parquet]]}, projection=[binary_col, largebinary_col, binaryview_col], file_type=parquet, predicate=CAST(binary_col@0 AS Utf8View) LIKE %a% AND CAST(largebinary_col@1 AS Utf8View) LIKE %a% AND CAST(binaryview_col@2 AS Utf8View) LIKE %a%


statement ok
@@ -503,11 +499,7 @@ EXPLAIN
logical_plan
01)Filter: binary_as_string_option.binary_col LIKE Utf8View("%a%") AND binary_as_string_option.largebinary_col LIKE Utf8View("%a%") AND binary_as_string_option.binaryview_col LIKE Utf8View("%a%")
02)--TableScan: binary_as_string_option projection=[binary_col, largebinary_col, binaryview_col], partial_filters=[binary_as_string_option.binary_col LIKE Utf8View("%a%"), binary_as_string_option.largebinary_col LIKE Utf8View("%a%"), binary_as_string_option.binaryview_col LIKE Utf8View("%a%")]
- physical_plan
- 01)CoalesceBatchesExec: target_batch_size=8192
- 02)--FilterExec: binary_col@0 LIKE %a% AND largebinary_col@1 LIKE %a% AND binaryview_col@2 LIKE %a%
- 03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
- 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/binary_as_string.parquet]]}, projection=[binary_col, largebinary_col, binaryview_col], file_type=parquet, predicate=binary_col@0 LIKE %a% AND largebinary_col@1 LIKE %a% AND binaryview_col@2 LIKE %a%
+ physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/binary_as_string.parquet]]}, projection=[binary_col, largebinary_col, binaryview_col], file_type=parquet, predicate=binary_col@0 LIKE %a% AND largebinary_col@1 LIKE %a% AND binaryview_col@2 LIKE %a%


statement ok
@@ -554,11 +546,7 @@ EXPLAIN
logical_plan
01)Filter: binary_as_string_both.binary_col LIKE Utf8View("%a%") AND binary_as_string_both.largebinary_col LIKE Utf8View("%a%") AND binary_as_string_both.binaryview_col LIKE Utf8View("%a%")
02)--TableScan: binary_as_string_both projection=[binary_col, largebinary_col, binaryview_col], partial_filters=[binary_as_string_both.binary_col LIKE Utf8View("%a%"), binary_as_string_both.largebinary_col LIKE Utf8View("%a%"), binary_as_string_both.binaryview_col LIKE Utf8View("%a%")]
- physical_plan
- 01)CoalesceBatchesExec: target_batch_size=8192
- 02)--FilterExec: binary_col@0 LIKE %a% AND largebinary_col@1 LIKE %a% AND binaryview_col@2 LIKE %a%
- 03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
- 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/binary_as_string.parquet]]}, projection=[binary_col, largebinary_col, binaryview_col], file_type=parquet, predicate=binary_col@0 LIKE %a% AND largebinary_col@1 LIKE %a% AND binaryview_col@2 LIKE %a%
+ physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/binary_as_string.parquet]]}, projection=[binary_col, largebinary_col, binaryview_col], file_type=parquet, predicate=binary_col@0 LIKE %a% AND largebinary_col@1 LIKE %a% AND binaryview_col@2 LIKE %a%


statement ok
@@ -669,11 +657,7 @@ explain select * from foo where starts_with(column1, 'f');
logical_plan
01)Filter: foo.column1 LIKE Utf8View("f%")
02)--TableScan: foo projection=[column1], partial_filters=[foo.column1 LIKE Utf8View("f%")]
- physical_plan
- 01)CoalesceBatchesExec: target_batch_size=8192
- 02)--FilterExec: column1@0 LIKE f%
- 03)----RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
- 04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/foo.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 LIKE f%, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= g AND f <= column1_max@1, required_guarantees=[]
+ physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/foo.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 LIKE f%, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= g AND f <= column1_max@1, required_guarantees=[]

statement ok
drop table foo
21 changes: 6 additions & 15 deletions datafusion/sqllogictest/test_files/parquet_statistics.slt
@@ -59,11 +59,8 @@ query TT
EXPLAIN SELECT * FROM test_table WHERE column1 = 1;
----
physical_plan
- 01)CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Inexact(2), Bytes=Inexact(44), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0))]]
- 02)--FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(44), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0))]]
- 03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Inexact(5), Bytes=Inexact(173), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0))]]
- 04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)]
- 05), statistics=[Rows=Inexact(5), Bytes=Inexact(173), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0))]]
+ 01)DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)]
+ 02), statistics=[Rows=Inexact(5), Bytes=Inexact(173), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0))]]

# cleanup
statement ok
@@ -86,11 +83,8 @@ query TT
EXPLAIN SELECT * FROM test_table WHERE column1 = 1;
----
physical_plan
- 01)CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Inexact(2), Bytes=Inexact(44), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0))]]
- 02)--FilterExec: column1@0 = 1, statistics=[Rows=Inexact(2), Bytes=Inexact(44), [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)) Null=Inexact(0))]]
- 03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Inexact(5), Bytes=Inexact(173), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0))]]
- 04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)]
- 05), statistics=[Rows=Inexact(5), Bytes=Inexact(173), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0))]]
+ 01)DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)]
+ 02), statistics=[Rows=Inexact(5), Bytes=Inexact(173), [(Col[0]: Min=Inexact(Int64(1)) Max=Inexact(Int64(4)) Null=Inexact(0))]]

# cleanup
statement ok
@@ -114,11 +108,8 @@ query TT
EXPLAIN SELECT * FROM test_table WHERE column1 = 1;
----
physical_plan
- 01)CoalesceBatchesExec: target_batch_size=8192, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:)]]
- 02)--FilterExec: column1@0 = 1, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]: Min=Exact(Int64(1)) Max=Exact(Int64(1)))]]
- 03)----RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:)]]
- 04)------DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)]
- 05), statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:)]]
+ 01)DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/0.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_statistics/test_table/1.parquet]]}, projection=[column1], file_type=parquet, predicate=column1@0 = 1, pruning_predicate=column1_null_count@2 != row_count@3 AND column1_min@0 <= 1 AND 1 <= column1_max@1, required_guarantees=[column1 in (1)]
+ 02), statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:)]]

# cleanup
statement ok