@@ -142,6 +142,10 @@ impl FileSource for TestSource {
         })
     }
 
+    fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
+        self.predicate.clone()
+    }
+
     fn as_any(&self) -> &dyn Any {
         todo!("should not be called")
     }
4 changes: 4 additions & 0 deletions datafusion/datasource-parquet/src/source.rs
@@ -587,6 +587,10 @@ impl FileSource for ParquetSource {
         self
     }
 
+    fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
+        self.predicate.clone()
+    }
+
     fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
         let mut conf = self.clone();
         conf.batch_size = Some(batch_size);
4 changes: 4 additions & 0 deletions datafusion/datasource/src/file.rs
@@ -69,6 +69,10 @@ pub trait FileSource: Send + Sync {
     fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource>;
     /// Initialize new instance with projected statistics
     fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource>;
+    /// Returns the filter expression that will be applied during the file scan.
+    fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
+        None
+    }
     /// Return execution plan metrics
     fn metrics(&self) -> &ExecutionPlanMetricsSet;
     /// Return projected statistics
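Because `filter()` has a default body returning `None`, this trait addition is source-compatible: existing `FileSource` implementations keep compiling unchanged, and only sources that actually track a pushed-down predicate (the `ParquetSource` above, plus the `TestSource` used in tests) need to override it.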
44 changes: 40 additions & 4 deletions datafusion/datasource/src/file_scan_config.rs
@@ -52,18 +52,20 @@ use datafusion_common::{
 use datafusion_execution::{
     object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext,
 };
-use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr::{expressions::Column, utils::reassign_predicate_columns};
 use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
 use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
-use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
 use datafusion_physical_plan::{
     display::{display_orderings, ProjectSchemaDisplay},
     metrics::ExecutionPlanMetricsSet,
     projection::{all_alias_free_columns, new_projections_for_columns, ProjectionExec},
     DisplayAs, DisplayFormatType, ExecutionPlan,
 };
+use datafusion_physical_plan::{
+    filter::collect_columns_from_predicate, filter_pushdown::FilterPushdownPropagation,
+};
 
 use datafusion_physical_plan::coop::cooperative;
 use datafusion_physical_plan::execution_plan::SchedulingType;
@@ -577,8 +579,31 @@ impl DataSource for FileScanConfig {
 
     fn eq_properties(&self) -> EquivalenceProperties {
         let (schema, constraints, _, orderings) = self.project();
-        EquivalenceProperties::new_with_orderings(schema, orderings)
-            .with_constraints(constraints)
+        let mut eq_properties =
+            EquivalenceProperties::new_with_orderings(Arc::clone(&schema), orderings)
+                .with_constraints(constraints);
+        if let Some(filter) = self.file_source.filter() {
+            // We need to remap column indexes to match the projected schema since that's what the equivalence properties deal with.
+            // Note that this will *ignore* any non-projected columns: these don't factor into ordering / equivalence.
+            match reassign_predicate_columns(filter, &schema, true) {
+                Ok(filter) => {
+                    match Self::add_filter_equivalence_info(filter, &mut eq_properties) {
+                        Ok(()) => {}
+                        Err(e) => {
+                            warn!("Failed to add filter equivalence info: {e}");
+                            #[cfg(debug_assertions)]
+                            panic!("Failed to add filter equivalence info: {e}");
+                        }
+                    }
+                }
+                Err(e) => {
+                    warn!("Failed to reassign predicate columns: {e}");
+                    #[cfg(debug_assertions)]
+                    panic!("Failed to reassign predicate columns: {e}");
+                }
+            };
+        }
+        eq_properties
     }
 
     fn scheduling_type(&self) -> SchedulingType {
@@ -724,6 +749,17 @@ impl FileScanConfig {
         ))
     }
 
+    fn add_filter_equivalence_info(
+        filter: Arc<dyn PhysicalExpr>,
+        eq_properties: &mut EquivalenceProperties,
+    ) -> Result<()> {
+        let (equal_pairs, _) = collect_columns_from_predicate(&filter);
+        for (lhs, rhs) in equal_pairs {
+            eq_properties.add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
+        }
+        Ok(())
+    }
+
     pub fn projected_constraints(&self) -> Constraints {
         let indexes = self.projection_indices();
         self.constraints.project(&indexes).unwrap_or_default()
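The remapping comment in `eq_properties` above is the subtle part of this change: the predicate returned by `FileSource::filter()` is bound to the full file schema, while equivalence properties are tracked against the projected schema. Below is a minimal sketch of that remapping (not part of the PR; the schema, values, and `main` wrapper are invented for illustration) using the same `reassign_predicate_columns` helper imported in this diff, plus standard `datafusion-physical-expr` expression constructors:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{lit, BinaryExpr, Column};
use datafusion_physical_expr::utils::reassign_predicate_columns;
use datafusion_physical_expr::PhysicalExpr;

fn main() -> Result<()> {
    // Suppose the file schema is (a, b) but the scan projects only `b`,
    // so the projected schema has `b` at index 0.
    let projected = Arc::new(Schema::new(vec![Field::new("b", DataType::Utf8, true)]));

    // A predicate bound to the *file* schema: `b@1 = 'x'`.
    let predicate: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
        Arc::new(Column::new("b", 1)),
        Operator::Eq,
        lit(ScalarValue::from("x")),
    ));

    // Rebind columns by name against the projected schema. The `true` flag
    // skips columns that were projected away instead of returning an error.
    let rebound = reassign_predicate_columns(predicate, &projected, true)?;
    println!("{rebound}"); // prints something like: b@0 = x
    Ok(())
}
```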
32 changes: 2 additions & 30 deletions datafusion/datasource/src/source.rs
@@ -39,11 +39,8 @@ use crate::file_scan_config::FileScanConfig;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::{Constraints, Result, Statistics};
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
-use datafusion_physical_expr::{
-    conjunction, EquivalenceProperties, Partitioning, PhysicalExpr,
-};
+use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr};
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
-use datafusion_physical_plan::filter::collect_columns_from_predicate;
 use datafusion_physical_plan::filter_pushdown::{
     ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation, PushedDown,
 };
@@ -375,21 +372,10 @@ impl ExecutionPlan for DataSourceExec {
             Some(data_source) => {
                 let mut new_node = self.clone();
                 new_node.data_source = data_source;
+                // Re-compute properties since we have new filters which will impact equivalence info
                 new_node.cache =
                     Self::compute_properties(Arc::clone(&new_node.data_source));
 
-                // Recompute equivalence info using new filters
-                let filter = conjunction(
-                    res.filters
-                        .iter()
-                        .zip(parent_filters)
-                        .filter_map(|(s, f)| match s {
-                            PushedDown::Yes => Some(f),
-                            PushedDown::No => None,
-                        })
-                        .collect_vec(),
-                );
-                new_node = new_node.add_filter_equivalence_info(filter)?;
                 Ok(FilterPushdownPropagation {
                     filters: res.filters,
                     updated_node: Some(Arc::new(new_node)),
@@ -437,20 +423,6 @@ impl DataSourceExec {
         self
     }
 
-    /// Add filters' equivalence info
-    fn add_filter_equivalence_info(
-        mut self,
-        filter: Arc<dyn PhysicalExpr>,
-    ) -> Result<Self> {
-        let (equal_pairs, _) = collect_columns_from_predicate(&filter);
-        for (lhs, rhs) in equal_pairs {
-            self.cache
-                .eq_properties
-                .add_equal_conditions(Arc::clone(lhs), Arc::clone(rhs))?
-        }
-        Ok(self)
-    }
-
     fn compute_properties(data_source: Arc<dyn DataSource>) -> PlanProperties {
         PlanProperties::new(
             data_source.eq_properties(),
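The net effect of this file's changes: `DataSourceExec` no longer patches equivalence info onto its cached `PlanProperties` after a filter pushdown. Instead, `compute_properties` re-derives everything from `DataSource::eq_properties`, and for file scans that now includes the filter-derived equivalences added in `FileScanConfig::eq_properties` above. One apparent benefit of this design is that the derived equivalences can no longer drift out of sync with the cache when properties are recomputed for other reasons.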
41 changes: 19 additions & 22 deletions datafusion/physical-expr/src/equivalence/properties/mod.rs
@@ -326,6 +326,22 @@ impl EquivalenceProperties {
         self.add_orderings(std::iter::once(ordering));
     }
 
+    fn update_oeq_cache(&mut self) -> Result<()> {
+        // Renormalize orderings if the equivalence group changes:
+        let normal_cls = mem::take(&mut self.oeq_cache.normal_cls);
+        let normal_orderings = normal_cls
+            .into_iter()
+            .map(|o| self.eq_group.normalize_sort_exprs(o));
+        self.oeq_cache.normal_cls = OrderingEquivalenceClass::new(normal_orderings);
+        self.oeq_cache.update_map();
+        // Discover any new orderings based on the new equivalence classes:
+        let leading_exprs: Vec<_> = self.oeq_cache.leading_map.keys().cloned().collect();
+        for expr in leading_exprs {
+            self.discover_new_orderings(expr)?;
+        }
+        Ok(())
+    }
+
Comment on lines +329 to +343 (Contributor Author):

This ~same code was in two different places; this is a drive-by deduplication.

     /// Incorporates the given equivalence group to into the existing
     /// equivalence group within.
     pub fn add_equivalence_group(
@@ -334,19 +350,7 @@
     ) -> Result<()> {
         if !other_eq_group.is_empty() {
             self.eq_group.extend(other_eq_group);
-            // Renormalize orderings if the equivalence group changes:
-            let normal_cls = mem::take(&mut self.oeq_cache.normal_cls);
-            let normal_orderings = normal_cls
-                .into_iter()
-                .map(|o| self.eq_group.normalize_sort_exprs(o));
-            self.oeq_cache.normal_cls = OrderingEquivalenceClass::new(normal_orderings);
-            self.oeq_cache.update_map();
-            // Discover any new orderings based on the new equivalence classes:
-            let leading_exprs: Vec<_> =
-                self.oeq_cache.leading_map.keys().cloned().collect();
-            for expr in leading_exprs {
-                self.discover_new_orderings(expr)?;
-            }
+            self.update_oeq_cache()?;
         }
         Ok(())
     }
@@ -373,16 +377,9 @@
     ) -> Result<()> {
         // Add equal expressions to the state:
        if self.eq_group.add_equal_conditions(Arc::clone(&left), right) {
-            // Renormalize orderings if the equivalence group changes:
-            let normal_cls = mem::take(&mut self.oeq_cache.normal_cls);
-            let normal_orderings = normal_cls
-                .into_iter()
-                .map(|o| self.eq_group.normalize_sort_exprs(o));
-            self.oeq_cache.normal_cls = OrderingEquivalenceClass::new(normal_orderings);
-            self.oeq_cache.update_map();
-            // Discover any new orderings:
-            self.discover_new_orderings(left)?;
+            self.update_oeq_cache()?;
         }
+        self.update_oeq_cache()?;
         Ok(())
     }
 
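To make the deduplicated behavior concrete for callers such as `add_equal_conditions`: equating two expressions merges their equivalence classes, after which `update_oeq_cache` renormalizes any cached orderings so that what is known about one expression holds for the other. A small sketch against the public API used in this diff (the schema and columns are invented; `add_equal_conditions` takes the two expressions and returns `Result<()>`, as shown above):

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_physical_expr::{expressions::Column, EquivalenceProperties};

fn main() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Int64, false),
    ]));
    let mut eq_properties = EquivalenceProperties::new(schema);

    // Record `a = b`, e.g. learned from a pushed-down filter predicate.
    eq_properties.add_equal_conditions(
        Arc::new(Column::new("a", 0)),
        Arc::new(Column::new("b", 1)),
    )?;

    // `a` and `b` now share an equivalence class; cached orderings are
    // renormalized, so an ordering on `a` also counts as an ordering on `b`.
    Ok(())
}
```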
2 changes: 1 addition & 1 deletion datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -1000,7 +1000,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> {
         self: Arc<Self>,
         _children: Vec<Arc<dyn PhysicalExpr>>,
     ) -> Result<Arc<dyn PhysicalExpr>> {
-        todo!()
+        Ok(self)
Comment (Contributor Author):

Now gets called by `reassign_predicate_columns`.

     }
 
     fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
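For context on the `Ok(self)` change: `reassign_predicate_columns` rewrites a predicate via a tree transformation that invokes `with_new_children` on every node, leaves included. A leaf expression has no children to replace, so returning itself unchanged is the correct (and cheap) behavior, whereas the previous `todo!()` would panic as soon as the new `eq_properties` code path rebinds this custom predicate during the roundtrip test.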
@@ -575,3 +575,12 @@ WHERE trace_id = '00000000000000000000000000000002'
 ORDER BY start_timestamp, trace_id;
 ----
 staging
+
+query P
+SELECT start_timestamp
+FROM t1
+WHERE trace_id = '00000000000000000000000000000002' AND deployment_environment = 'staging'
+ORDER BY start_timestamp, trace_id
+LIMIT 1;
+----
+2024-10-01T00:00:00Z
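Informally, this new test appears to exercise the equivalence derivation end to end: the predicate pins `trace_id` and `deployment_environment` to constants, so the scan's equivalence properties should let the planner satisfy `ORDER BY start_timestamp, trace_id` from an ordering on `start_timestamp` alone and stop after the first row for `LIMIT 1`, while the expected single timestamp confirms the results stay correct.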