-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Closed
Labels
bug (Something isn't working), performance (Make DataFusion faster)
Description
Describe the bug
Query 1:
SELECT col1, col2, count(*)
FROM test_data
WHERE (col1 = 'category_1' AND col2 = 'type_1')
OR (col1 = 'category_2' AND col2 = 'type_2')
GROUP BY col1, col2
DataSourceExec: file_groups={1 group: [[var/folders/6z/kt4t6jkd4ss1_fj16dv_05xc0000gn/T/.tmpl2ljbz/bloom_filter_data.parquet]]}, projection=[col1, col2], file_type=parquet, predicate=col1@0 = category_1 AND col2@1 = type_1 OR col1@0 = category_2 AND col2@1 = type_2, pruning_predicate=col1_null_count@2 != row_count@3 AND col1_min@0 <= category_1 AND category_1 <= col1_max@1 AND col2_null_count@6 != row_count@3 AND col2_min@4 <= type_1 AND type_1 <= col2_max@5 OR col1_null_count@2 != row_count@3 AND col1_min@0 <= category_2 AND category_2 <= col1_max@1 AND col2_null_count@6 != row_count@3 AND col2_min@4 <= type_2 AND type_2 <= col2_max@5, required_guarantees=[], metrics=[output_rows=10000, elapsed_compute=1ns, bytes_scanned=18452, file_open_errors=0, file_scan_errors=0, num_predicate_creation_errors=0, page_index_rows_matched=0, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=0, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=217.585µs, metadata_load_time=330.042µs, page_index_eval_time=43ns, row_pushdown_eval_time=2ns, statistics_eval_time=480.376µs, time_elapsed_opening=1.069667ms, time_elapsed_processing=2.172875ms, time_elapsed_scanning_total=1.259999ms, time_elapsed_scanning_until_data=1.026791ms]
From the query above, `col1` can only be `category_1` or `category_2` in any matching row. Yet the plan reports `required_guarantees=[]` and `row_groups_matched_bloom_filter=0`: the bloom filter is not used.
Query 2:
SELECT col1, col2, count(*)
FROM test_data
WHERE ((col1 = 'category_1' AND col2 = 'type_1')
OR (col1 = 'category_2' AND col2 = 'type_2'))
AND col1 IN ('category_1', 'category_2')
GROUP BY col1, col2
DataSourceExec: file_groups={1 group: [[var/folders/6z/kt4t6jkd4ss1_fj16dv_05xc0000gn/T/.tmpl2ljbz/bloom_filter_data.parquet]]}, projection=[col1, col2], file_type=parquet, predicate=(col1@0 = category_1 AND col2@1 = type_1 OR col1@0 = category_2 AND col2@1 = type_2) AND (col1@0 = category_1 OR col1@0 = category_2), pruning_predicate=(col1_null_count@2 != row_count@3 AND col1_min@0 <= category_1 AND category_1 <= col1_max@1 AND col2_null_count@6 != row_count@3 AND col2_min@4 <= type_1 AND type_1 <= col2_max@5 OR col1_null_count@2 != row_count@3 AND col1_min@0 <= category_2 AND category_2 <= col1_max@1 AND col2_null_count@6 != row_count@3 AND col2_min@4 <= type_2 AND type_2 <= col2_max@5) AND (col1_null_count@2 != row_count@3 AND col1_min@0 <= category_1 AND category_1 <= col1_max@1 OR col1_null_count@2 != row_count@3 AND col1_min@0 <= category_2 AND category_2 <= col1_max@1), required_guarantees=[col1 in (category_1, category_2)], metrics=[output_rows=10000, elapsed_compute=1ns, bytes_scanned=18695, file_open_errors=0, file_scan_errors=0, num_predicate_creation_errors=0, page_index_rows_matched=10000, page_index_rows_pruned=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, row_groups_matched_bloom_filter=1, row_groups_matched_statistics=1, row_groups_pruned_bloom_filter=0, row_groups_pruned_statistics=0, bloom_filter_eval_time=199.459µs, metadata_load_time=452.793µs, page_index_eval_time=85.71µs, row_pushdown_eval_time=2ns, statistics_eval_time=97.667µs, time_elapsed_opening=896.792µs, time_elapsed_processing=1.827918ms, time_elapsed_scanning_total=1.140416ms, time_elapsed_scanning_until_data=941.917µs]
Here the plan reports `required_guarantees=[col1 in (category_1, category_2)]` and `row_groups_matched_bloom_filter=1`, thanks to the explicit `col1 IN ('category_1', 'category_2')` filter.
In my production use case, adding the redundant `col1 IN ('category_1', 'category_2')` filter yields roughly a 40% improvement, because less data has to be processed.
To Reproduce
use arrow::array::StringArray;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion::prelude::*;
use parquet::arrow::arrow_writer::ArrowWriter;
use parquet::file::properties::WriterProperties;
use std::fs::File;
use std::sync::Arc;
use tempfile::TempDir;
/// Reproduces a missed bloom-filter opportunity in DataFusion's Parquet scan.
///
/// Writes a Parquet file with bloom filters enabled into a temporary directory,
/// registers it as the table `test_data`, and runs `EXPLAIN ANALYZE` on two
/// logically equivalent queries:
///
/// 1. a disjunction of `(col1 = .. AND col2 = ..)` conjunctions, and
/// 2. the same disjunction AND-ed with a redundant `col1 IN (..)` filter.
///
/// Comparing `required_guarantees` and `row_groups_matched_bloom_filter` in the
/// two printed plans shows whether the bloom filter on `col1` is consulted.
///
/// # Errors
///
/// Returns an error if the temporary directory or Parquet file cannot be
/// created or written, if the temporary file path is not valid UTF-8, if
/// building the session configuration from the environment fails, or if
/// registering the table or running any query fails.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// Total number of rows written to the Parquet file.
    const NUM_ROWS: u64 = 10_000;
    /// Number of distinct `col1` values (`category_0` .. `category_99`).
    /// Also used as the bloom filter NDV so the two cannot drift apart.
    const NUM_CATEGORIES: u64 = 100;
    /// Number of distinct `col2` values (`type_0` .. `type_49`).
    const NUM_TYPES: u64 = 50;

    println!("Creating parquet file with bloom filters...");
    // `temp_dir` must stay alive until the queries below have run:
    // dropping a `TempDir` removes the directory and the file inside it.
    let temp_dir = TempDir::new()?;
    let parquet_path = temp_dir.path().join("bloom_filter_data.parquet");

    // Generate data with repetitive patterns: `col1` cycles through
    // NUM_CATEGORIES values and `col2` through NUM_TYPES values.
    let col1_values: Vec<String> = (0..NUM_ROWS)
        .map(|i| format!("category_{}", i % NUM_CATEGORIES))
        .collect();
    let col2_values: Vec<String> = (0..NUM_ROWS)
        .map(|i| format!("type_{}", i % NUM_TYPES))
        .collect();

    let col1 = StringArray::from(col1_values);
    let col2 = StringArray::from(col2_values);

    let schema = Arc::new(Schema::new(vec![
        Field::new("col1", DataType::Utf8, false),
        Field::new("col2", DataType::Utf8, false),
    ]));

    let record_batch =
        RecordBatch::try_new(schema.clone(), vec![Arc::new(col1), Arc::new(col2)])?;

    let file = File::create(&parquet_path)?;

    // Enable bloom filters, sized for the number of distinct `col1` values.
    let props = WriterProperties::builder()
        .set_bloom_filter_enabled(true)
        .set_bloom_filter_ndv(NUM_CATEGORIES)
        .build();

    let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
    writer.write(&record_batch)?;
    // `close` writes the footer; without it the file is not valid Parquet.
    writer.close()?;

    println!("Parquet file created at: {:?}", parquet_path);
    println!("Bloom filters enabled for columns");

    // Propagate a non-UTF-8 temp path as an error instead of panicking.
    let parquet_path_str = parquet_path
        .to_str()
        .ok_or("temporary parquet path is not valid UTF-8")?;

    // Picks up the DATAFUSION_* environment variables listed in the issue.
    let ctx = SessionContext::new_with_config(SessionConfig::from_env()?);
    ctx.register_parquet("test_data", parquet_path_str, ParquetReadOptions::default())
        .await?;

    println!("\nSample data:");
    let df = ctx.sql("SELECT col1, col2 FROM test_data LIMIT 10").await?;
    df.show().await?;

    println!("\n=== First Query: OR conditions with bloom filter ===");
    let query1 = "
EXPLAIN ANALYZE
SELECT col1, col2, count(*)
FROM test_data
WHERE (col1 = 'category_1' AND col2 = 'type_1')
OR (col1 = 'category_2' AND col2 = 'type_2')
GROUP BY col1, col2
";
    let result1 = ctx.sql(query1).await?;
    result1.show().await?;

    println!("\n=== Second Query: OR conditions with IN clause ===");
    // Same predicate as query1 plus a redundant IN list on `col1`.
    let query2 = "
EXPLAIN ANALYZE
SELECT col1, col2, count(*)
FROM test_data
WHERE ((col1 = 'category_1' AND col2 = 'type_1')
OR (col1 = 'category_2' AND col2 = 'type_2'))
AND col1 IN ('category_1', 'category_2')
GROUP BY col1, col2
";
    let result2 = ctx.sql(query2).await?;
    result2.show().await?;

    Ok(())
}
I am using DataFusion 47 with the following environment settings:
❯ env | grep DATAFUSION
DATAFUSION_EXECUTION_PARQUET_REORDER_FILTERS=true
DATAFUSION_EXECUTION_TARGET_PARTITIONS=8
DATAFUSION_EXECUTION_USE_ROW_NUMBER_ESTIMATES_TO_OPTIMIZE_PARTITIONING=true
DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true
DATAFUSION_EXECUTION_META_FETCH_CONCURRENCY=10
DATAFUSION_EXECUTION_PARQUET_BINARY_AS_STRING=true
DATAFUSION_EXECUTION_PARQUET_SCHEMA_FORCE_VIEW_TYPES=true
DATAFUSION_EXECUTION_BATCH_SIZE=20000
Expected behavior
WHERE (col1 = 'category_1' AND col2 = 'type_1')
OR (col1 = 'category_2' AND col2 = 'type_2')
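The pattern above should produce `required_guarantees=[col1 in (category_1, category_2)]` (or an equivalent guarantee) and use the bloom filter. Every branch of the `OR` pins `col1` to a literal with `=`, so any matching row must have `col1` equal to one of those literals.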
Additional context
No response
alamb, haohuaijin and chiragjn
Metadata
Assignees
Labels
bug (Something isn't working), performance (Make DataFusion faster)