diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs index acb2b808ef8f..64cee011cca2 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs @@ -142,6 +142,10 @@ impl FileSource for TestSource { }) } + fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> { + self.predicate.clone() + } + fn as_any(&self) -> &dyn Any { todo!("should not be called") } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 1277ec52adf7..6f4dd39f29f5 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -587,6 +587,10 @@ impl FileSource for ParquetSource { self } + fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> { + self.predicate.clone() + } + fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> { let mut conf = self.clone(); conf.batch_size = Some(batch_size); diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index 29fa38a8ee36..7a2cf403fd8d 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -69,6 +69,10 @@ pub trait FileSource: Send + Sync { fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource>; /// Initialize new instance with projected statistics fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource>; + /// Returns the filter expression that will be applied during the file scan.
+ fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> { + None + } /// Return execution plan metrics fn metrics(&self) -> &ExecutionPlanMetricsSet; /// Return projected statistics diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 7088f811bbce..47f03cbb1bfe 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -52,18 +52,20 @@ use datafusion_common::{ use datafusion_execution::{ object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext, }; -use datafusion_physical_expr::expressions::Column; +use datafusion_physical_expr::{expressions::Column, utils::reassign_predicate_columns}; use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr}; -use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation; use datafusion_physical_plan::{ display::{display_orderings, ProjectSchemaDisplay}, metrics::ExecutionPlanMetricsSet, projection::{all_alias_free_columns, new_projections_for_columns, ProjectionExec}, DisplayAs, DisplayFormatType, ExecutionPlan, }; +use datafusion_physical_plan::{ + filter::collect_columns_from_predicate, filter_pushdown::FilterPushdownPropagation, }; use datafusion_physical_plan::coop::cooperative; use datafusion_physical_plan::execution_plan::SchedulingType; @@ -577,8 +579,31 @@ impl DataSource for FileScanConfig { fn eq_properties(&self) -> EquivalenceProperties { let (schema, constraints, _, orderings) = self.project(); - EquivalenceProperties::new_with_orderings(schema, orderings) - .with_constraints(constraints) + let mut eq_properties = + EquivalenceProperties::new_with_orderings(Arc::clone(&schema), orderings) + .with_constraints(constraints); + if let Some(filter) = self.file_source.filter() { + // We need
to remap column indexes to match the projected schema since that's what the equivalence properties deal with. + // Note that this will *ignore* any non-projected columns: these don't factor into ordering / equivalence. + match reassign_predicate_columns(filter, &schema, true) { + Ok(filter) => { + match Self::add_filter_equivalence_info(filter, &mut eq_properties) { + Ok(()) => {} + Err(e) => { + warn!("Failed to add filter equivalence info: {e}"); + #[cfg(debug_assertions)] + panic!("Failed to add filter equivalence info: {e}"); + } + } + } + Err(e) => { + warn!("Failed to reassign predicate columns: {e}"); + #[cfg(debug_assertions)] + panic!("Failed to reassign predicate columns: {e}"); + } + }; + } + eq_properties } fn scheduling_type(&self) -> SchedulingType { @@ -724,6 +749,17 @@ impl FileScanConfig { )) } + fn add_filter_equivalence_info( + filter: Arc<dyn PhysicalExpr>, + eq_properties: &mut EquivalenceProperties, + ) -> Result<()> { + let (equal_pairs, _) = collect_columns_from_predicate(&filter); + for (lhs, rhs) in equal_pairs { + eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
+ } + Ok(()) + } + pub fn projected_constraints(&self) -> Constraints { let indexes = self.projection_indices(); self.constraints.project(&indexes).unwrap_or_default() diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 153d03b3ab49..60be39bc637d 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -39,11 +39,8 @@ use crate::file_scan_config::FileScanConfig; use datafusion_common::config::ConfigOptions; use datafusion_common::{Constraints, Result, Statistics}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; -use datafusion_physical_expr::{ - conjunction, EquivalenceProperties, Partitioning, PhysicalExpr, -}; +use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; -use datafusion_physical_plan::filter::collect_columns_from_predicate; use datafusion_physical_plan::filter_pushdown::{ ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown, }; @@ -375,21 +372,10 @@ impl ExecutionPlan for DataSourceExec { Some(data_source) => { let mut new_node = self.clone(); new_node.data_source = data_source; + // Re-compute properties since we have new filters which will impact equivalence info new_node.cache = Self::compute_properties(Arc::clone(&new_node.data_source)); - // Recompute equivalence info using new filters - let filter = conjunction( - res.filters - .iter() - .zip(parent_filters) - .filter_map(|(s, f)| match s { - PushedDown::Yes => Some(f), - PushedDown::No => None, - }) - .collect_vec(), - ); - new_node = new_node.add_filter_equivalence_info(filter)?; Ok(FilterPushdownPropagation { filters: res.filters, updated_node: Some(Arc::new(new_node)), @@ -437,20 +423,6 @@ impl DataSourceExec { self } - /// Add filters' equivalence info - fn add_filter_equivalence_info( - mut self, - filter: Arc<dyn PhysicalExpr>, - ) -> Result<Self> { - let (equal_pairs, _) =
collect_columns_from_predicate(&filter); - for (lhs, rhs) in equal_pairs { - self.cache - .eq_properties - .add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))? - } - Ok(self) - } - fn compute_properties(data_source: Arc<dyn DataSource>) -> PlanProperties { PlanProperties::new( data_source.eq_properties(), diff --git a/datafusion/physical-expr/src/equivalence/properties/mod.rs b/datafusion/physical-expr/src/equivalence/properties/mod.rs index 6d18d34ca4de..822b4f5f7e64 100644 --- a/datafusion/physical-expr/src/equivalence/properties/mod.rs +++ b/datafusion/physical-expr/src/equivalence/properties/mod.rs @@ -326,6 +326,22 @@ impl EquivalenceProperties { self.add_orderings(std::iter::once(ordering)); } + fn update_oeq_cache(&mut self) -> Result<()> { + // Renormalize orderings if the equivalence group changes: + let normal_cls = mem::take(&mut self.oeq_cache.normal_cls); + let normal_orderings = normal_cls + .into_iter() + .map(|o| self.eq_group.normalize_sort_exprs(o)); + self.oeq_cache.normal_cls = OrderingEquivalenceClass::new(normal_orderings); + self.oeq_cache.update_map(); + // Discover any new orderings based on the new equivalence classes: + let leading_exprs: Vec<_> = self.oeq_cache.leading_map.keys().cloned().collect(); + for expr in leading_exprs { + self.discover_new_orderings(expr)?; + } + Ok(()) + } + /// Incorporates the given equivalence group to into the existing /// equivalence group within.
pub fn add_equivalence_group( @@ -334,19 +350,7 @@ impl EquivalenceProperties { ) -> Result<()> { if !other_eq_group.is_empty() { self.eq_group.extend(other_eq_group); - // Renormalize orderings if the equivalence group changes: - let normal_cls = mem::take(&mut self.oeq_cache.normal_cls); - let normal_orderings = normal_cls - .into_iter() - .map(|o| self.eq_group.normalize_sort_exprs(o)); - self.oeq_cache.normal_cls = OrderingEquivalenceClass::new(normal_orderings); - self.oeq_cache.update_map(); - // Discover any new orderings based on the new equivalence classes: - let leading_exprs: Vec<_> = - self.oeq_cache.leading_map.keys().cloned().collect(); - for expr in leading_exprs { - self.discover_new_orderings(expr)?; - } + self.update_oeq_cache()?; } Ok(()) } @@ -373,16 +377,8 @@ impl EquivalenceProperties { ) -> Result<()> { // Add equal expressions to the state: if self.eq_group.add_equal_conditions(Arc::clone(&left), right) { - // Renormalize orderings if the equivalence group changes: - let normal_cls = mem::take(&mut self.oeq_cache.normal_cls); - let normal_orderings = normal_cls - .into_iter() - .map(|o| self.eq_group.normalize_sort_exprs(o)); - self.oeq_cache.normal_cls = OrderingEquivalenceClass::new(normal_orderings); - self.oeq_cache.update_map(); - // Discover any new orderings: - self.discover_new_orderings(left)?; + self.update_oeq_cache()?; } Ok(()) } diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 86ad54d3f1eb..44e842e9356f 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -1000,7 +1000,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> { self: Arc<Self>, _children: Vec<Arc<dyn ExecutionPlan>>, ) -> Result<Arc<dyn ExecutionPlan>> { - todo!() + Ok(self) } fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result { diff --git
a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt index 6adf379b2171..6dc2c264aeb8 100644 --- a/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt +++ b/datafusion/sqllogictest/test_files/parquet_filter_pushdown.slt @@ -575,3 +575,12 @@ WHERE trace_id = '00000000000000000000000000000002' ORDER BY start_timestamp, trace_id; ---- staging + +query P +SELECT start_timestamp +FROM t1 +WHERE trace_id = '00000000000000000000000000000002' AND deployment_environment = 'staging' +ORDER BY start_timestamp, trace_id +LIMIT 1; +---- +2024-10-01T00:00:00Z