From c61755021962ed31c82c0a0ed367439406871725 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 21 Aug 2025 12:18:33 -0500 Subject: [PATCH 01/12] initial stab --- .../catalog/src/default_table_source.rs | 1 + datafusion/catalog/src/table.rs | 92 ++++++- .../core/src/datasource/listing/table.rs | 59 ++++- datafusion/core/src/physical_planner.rs | 15 +- datafusion/expr/src/logical_plan/plan.rs | 10 + datafusion/expr/src/logical_plan/tree_node.rs | 2 + datafusion/optimizer/src/lib.rs | 1 + .../optimizer/src/optimize_projections/mod.rs | 2 + datafusion/optimizer/src/optimizer.rs | 3 + datafusion/optimizer/src/push_down_filter.rs | 1 + datafusion/optimizer/src/push_down_sort.rs | 235 ++++++++++++++++++ datafusion/proto/src/logical_plan/mod.rs | 1 + .../sqllogictest/test_files/explain.slt | 2 + 13 files changed, 408 insertions(+), 16 deletions(-) create mode 100644 datafusion/optimizer/src/push_down_sort.rs diff --git a/datafusion/catalog/src/default_table_source.rs b/datafusion/catalog/src/default_table_source.rs index 11963c06c88f..b3e76a2e9931 100644 --- a/datafusion/catalog/src/default_table_source.rs +++ b/datafusion/catalog/src/default_table_source.rs @@ -73,6 +73,7 @@ impl TableSource for DefaultTableSource { &self, filter: &[&Expr], ) -> datafusion_common::Result> { + #[allow(deprecated)] self.table_provider.supports_filters_pushdown(filter) } diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index ecfe63fa343f..7b1dcaebbbca 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -25,13 +25,14 @@ use arrow::datatypes::SchemaRef; use async_trait::async_trait; use datafusion_common::Result; use datafusion_common::{not_impl_err, Constraints, Statistics}; -use datafusion_expr::Expr; +use datafusion_expr::{Expr, SortExpr}; use datafusion_expr::dml::InsertOp; use datafusion_expr::{ CreateExternalTable, LogicalPlan, TableProviderFilterPushDown, TableType, }; use datafusion_physical_plan::ExecutionPlan; +use itertools::Itertools; /// A table which can be queried and modified. /// @@ -163,6 +164,7 @@ pub trait TableProvider: Debug + Sync + Send { /// because inexact filters do not guarantee that every filtered row is /// removed, so applying the limit could lead to too few rows being available /// to return as a final result. + #[deprecated(since = "50.0.0", note = "Use `scan_with_options` instead")] async fn scan( &self, state: &dyn Session, @@ -171,6 +173,25 @@ pub trait TableProvider: Debug + Sync + Send { limit: Option, ) -> Result>; + async fn scan_with_options( + &self, + state: &dyn Session, + options: ScanArgs, + ) -> Result { + let ScanArgs { + preferred_ordering: _, + filters, + projection, + limit, + } = options; + let filters = filters.unwrap_or_default(); + #[allow(deprecated)] + let plan = self + .scan(state, projection.as_ref(), &filters, limit) + .await?; + Ok(ScanResult::new(plan, vec![])) + } + /// Specify if DataFusion should provide filter expressions to the /// TableProvider to apply *during* the scan. 
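For orientation, here is a minimal provider-side sketch of the new API introduced above. It is not part of the patch: `MyProvider` is a hypothetical table provider, the returned `EmptyExec` is only a placeholder plan, and the method shown as `scan_with_options` is renamed to `scan_with_args` later in this series.

use std::any::Any;
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider};
use datafusion_common::{not_impl_err, project_schema, Result};
use datafusion_expr::{Expr, TableType};
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::ExecutionPlan;

#[derive(Debug)]
struct MyProvider {
    schema: SchemaRef,
}

#[async_trait]
impl TableProvider for MyProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    #[allow(deprecated)]
    async fn scan(
        &self,
        _state: &dyn Session,
        _projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        not_impl_err!("scan_with_options is used instead")
    }

    async fn scan_with_options(
        &self,
        _state: &dyn Session,
        options: ScanArgs,
    ) -> Result<ScanResult> {
        // The preferred ordering is only a hint; a provider may ignore it.
        let _ordering_hint = options.preferred_ordering();
        // Placeholder: a real provider would build its own ExecutionPlan here.
        let projected = project_schema(&self.schema, options.projection().as_ref())?;
        let plan: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(projected));
        // Filters that were not applied exactly are handed back so the caller
        // can re-apply them on top of the returned plan.
        let remaining: Vec<Expr> = options.filters().map(|f| f.to_vec()).unwrap_or_default();
        Ok(ScanResult::new(plan, remaining))
    }
}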
/// @@ -251,6 +272,7 @@ pub trait TableProvider: Debug + Sync + Send { /// } /// } /// ``` + #[deprecated(since = "50.0.0", note = "Use `scan_with_options` instead")] fn supports_filters_pushdown( &self, filters: &[&Expr], @@ -299,6 +321,74 @@ pub trait TableProvider: Debug + Sync + Send { } } +#[derive(Debug, Clone, Default)] +pub struct ScanArgs { + preferred_ordering: Option>, + filters: Option>, + projection: Option>, + limit: Option, +} + +impl ScanArgs { + pub fn with_projection(mut self, projection: Option>) -> Self { + self.projection = projection; + self + } + + pub fn projection(&self) -> Option> { + self.projection.clone() + } + + pub fn with_filters(mut self, filters: Option>) -> Self { + self.filters = filters; + self + } + + pub fn filters(&self) -> Option<&[Expr]> { + self.filters.as_deref() + } + + pub fn with_limit(mut self, limit: Option) -> Self { + self.limit = limit; + self + } + + pub fn limit(&self) -> Option { + self.limit + } + + pub fn with_preferred_ordering(mut self, ordering: Option>) -> Self { + self.preferred_ordering = ordering; + self + } + + pub fn preferred_ordering(&self) -> Option<&[SortExpr]> { + self.preferred_ordering.as_deref() + } +} + +#[derive(Debug, Clone)] +pub struct ScanResult { + /// The ExecutionPlan to run. + plan: Arc, + // Remaining filters that were not completely evaluated during `scan_with_options()`. + filters: Vec, +} + +impl ScanResult { + pub fn new(plan: Arc, filters: Vec) -> Self { + Self { plan, filters } + } + + pub fn plan(&self) -> Arc { + Arc::clone(&self.plan) + } + + pub fn filters(&self) -> &[Expr] { + &self.filters + } +} + /// A factory which creates [`TableProvider`]s at runtime given a URL. /// /// For example, this can be used to create a table "on the fly" diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index d289a1d07129..2a3dc0c96fef 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -29,7 +29,7 @@ use crate::{ use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef}; use arrow_schema::Schema; use async_trait::async_trait; -use datafusion_catalog::{Session, TableProvider}; +use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider}; use datafusion_common::{ config_datafusion_err, config_err, internal_err, plan_err, project_schema, stats::Precision, Constraints, DataFusionError, Result, SchemaExt, @@ -1166,6 +1166,22 @@ impl TableProvider for ListingTable { filters: &[Expr], limit: Option, ) -> Result> { + let options = ScanArgs::default() + .with_projection(projection.cloned()) + .with_filters(Some(filters.to_vec())) + .with_limit(limit); + Ok(self.scan_with_options(state, options).await?.plan()) + } + + async fn scan_with_options( + &self, + state: &dyn Session, + options: ScanArgs, + ) -> Result { + let projection = options.projection(); + let filters = options.filters().map(|f| f.to_vec()).unwrap_or_default(); + let limit = options.limit(); + // extract types of partition columns let table_partition_cols = self .options @@ -1195,21 +1211,36 @@ impl TableProvider for ListingTable { // if no files need to be read, return an `EmptyExec` if partitioned_file_lists.is_empty() { - let projected_schema = project_schema(&self.schema(), projection)?; - return Ok(Arc::new(EmptyExec::new(projected_schema))); + let projected_schema = project_schema(&self.schema(), projection.as_ref())?; + return Ok(ScanResult::new( + Arc::new(EmptyExec::new(projected_schema)), + 
filters.clone(), + )); } - let output_ordering = self.try_create_output_ordering()?; + let known_file_ordering = self.try_create_output_ordering()?; + let desired_file_ordering = match options.preferred_ordering() { + Some(ordering) if !ordering.is_empty() => { + // Prefer the ordering requested by the query to any inherint file ordering + create_ordering(&self.table_schema, &[ordering.to_vec()])? + .first() + .cloned() + } + Some(_) | None => { + // If the query did not request a specific ordering, fall back to any inherent file ordering + known_file_ordering.first().cloned() + } + }; match state .config_options() .execution .split_file_groups_by_statistics .then(|| { - output_ordering.first().map(|output_ordering| { + desired_file_ordering.map(|ordering| { FileScanConfig::split_groups_by_statistics_with_target_partitions( &self.table_schema, &partitioned_file_lists, - output_ordering, + &ordering, self.options.target_partitions, ) }) @@ -1230,13 +1261,17 @@ impl TableProvider for ListingTable { let Some(object_store_url) = self.table_paths.first().map(ListingTableUrl::object_store) else { - return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty())))); + return Ok(ScanResult::new( + Arc::new(EmptyExec::new(Arc::new(Schema::empty()))), + filters.clone(), + )); }; let file_source = self.create_file_source_with_schema_adapter()?; // create the execution plan - self.options + let plan = self + .options .format .create_physical_plan( state, @@ -1248,14 +1283,16 @@ impl TableProvider for ListingTable { .with_file_groups(partitioned_file_lists) .with_constraints(self.constraints.clone()) .with_statistics(statistics) - .with_projection(projection.cloned()) + .with_projection(projection) .with_limit(limit) - .with_output_ordering(output_ordering) + .with_output_ordering(known_file_ordering) .with_table_partition_cols(table_partition_cols) .with_expr_adapter(self.expr_adapter_factory.clone()) .build(), ) - .await + .await?; + + Ok(ScanResult::new(plan, filters.clone())) } fn supports_filters_pushdown( diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 0ce5621ac89f..c8add16aab45 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -60,6 +60,7 @@ use crate::schema_equivalence::schema_satisfied_by; use arrow::array::{builder::StringBuilder, RecordBatch}; use arrow::compute::SortOptions; use arrow::datatypes::{Schema, SchemaRef}; +use datafusion_catalog::ScanArgs; use datafusion_common::display::ToStringifiedPlan; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeVisitor, @@ -85,7 +86,7 @@ use datafusion_expr::{ use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; use datafusion_physical_expr::expressions::{Column, Literal}; use datafusion_physical_expr::{ - create_physical_sort_exprs, LexOrdering, PhysicalSortExpr, + conjunction, create_physical_sort_exprs, LexOrdering, PhysicalSortExpr, }; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::empty::EmptyExec; @@ -452,6 +453,7 @@ impl DefaultPhysicalPlanner { projection, filters, fetch, + preferred_ordering, .. }) => { let source = source_as_provider(source)?; @@ -459,9 +461,14 @@ impl DefaultPhysicalPlanner { // doesn't know (nor should care) how the relation was // referred to in the query let filters = unnormalize_cols(filters.iter().cloned()); - source - .scan(session_state, projection.as_ref(), &filters, *fetch) - .await? 
+ let opts = ScanArgs::default() + .with_projection(projection.clone()) + .with_filters(Some(filters).clone()) + .with_preferred_ordering(preferred_ordering.clone()) + .with_limit(*fetch); + let res = source.scan_with_options(session_state, opts).await?; + // TODO: move FilterExec wrapping logic from filter pushdown rule to here? + res.plan() } LogicalPlan::Values(Values { values, schema }) => { let exec_schema = schema.as_ref().to_owned().into(); diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 887afd7cde3e..ea879a854b60 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2525,6 +2525,8 @@ pub struct TableScan { pub filters: Vec, /// Optional number of rows to read pub fetch: Option, + /// Preferred read ordering of the table + pub preferred_ordering: Option>, } impl Debug for TableScan { @@ -2643,8 +2645,14 @@ impl TableScan { projected_schema, filters, fetch, + preferred_ordering: None, }) } + + pub fn with_preferred_ordering(mut self, ordering: Option>) -> Self { + self.preferred_ordering = ordering; + self + } } // Repartition the plan based on a partitioning scheme. @@ -4814,6 +4822,7 @@ mod tests { projected_schema: Arc::clone(&schema), filters: vec![], fetch: None, + preferred_ordering: None, })); let col = schema.field_names()[0].clone(); @@ -4844,6 +4853,7 @@ mod tests { projected_schema: Arc::clone(&unique_schema), filters: vec![], fetch: None, + preferred_ordering: None, })); let col = schema.field_names()[0].clone(); diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index 47088370a1d9..37244ebf9437 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -599,6 +599,7 @@ impl LogicalPlan { projected_schema, filters, fetch, + preferred_ordering, }) => filters.map_elements(f)?.update_data(|filters| { LogicalPlan::TableScan(TableScan { table_name, @@ -607,6 +608,7 @@ impl LogicalPlan { projected_schema, filters, fetch, + preferred_ordering, }) }), LogicalPlan::Distinct(Distinct::On(DistinctOn { diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index 280010e3d92c..8d6088cc9d91 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -58,6 +58,7 @@ pub mod optimizer; pub mod propagate_empty_relation; pub mod push_down_filter; pub mod push_down_limit; +pub mod push_down_sort; pub mod replace_distinct_aggregate; pub mod scalar_subquery_to_join; pub mod simplify_expressions; diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 97402c990b83..d99543993f65 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -242,6 +242,7 @@ fn optimize_projections( filters, fetch, projected_schema: _, + preferred_ordering, } = table_scan; // Get indices referred to in the original (schema with all fields) @@ -257,6 +258,7 @@ fn optimize_projections( filters, fetch, ) + .map(|s| s.with_preferred_ordering(preferred_ordering)) .map(LogicalPlan::TableScan) .map(Transformed::yes); } diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 49806d6db344..df04f65fbcd9 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -51,6 +51,7 @@ use crate::plan_signature::LogicalPlanSignature; use 
crate::propagate_empty_relation::PropagateEmptyRelation; use crate::push_down_filter::PushDownFilter; use crate::push_down_limit::PushDownLimit; +use crate::push_down_sort::PushDownSort; use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate; use crate::scalar_subquery_to_join::ScalarSubqueryToJoin; use crate::simplify_expressions::SimplifyExpressions; @@ -243,6 +244,8 @@ impl Optimizer { // Filters can't be pushed down past Limits, we should do PushDownFilter after PushDownLimit Arc::new(PushDownLimit::new()), Arc::new(PushDownFilter::new()), + // Push down sort requirements to TableScan preferred_ordering + Arc::new(PushDownSort::new()), Arc::new(SingleDistinctToGroupBy::new()), // The previous optimizations added expressions and projections, // that might benefit from the following rules diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 27c2499c8a26..50e7d00b7788 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -3055,6 +3055,7 @@ mod tests { projection, source: Arc::new(test_provider), fetch: None, + preferred_ordering: None, }); Ok(LogicalPlanBuilder::from(table_scan)) diff --git a/datafusion/optimizer/src/push_down_sort.rs b/datafusion/optimizer/src/push_down_sort.rs new file mode 100644 index 000000000000..38408d3a6bba --- /dev/null +++ b/datafusion/optimizer/src/push_down_sort.rs @@ -0,0 +1,235 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`PushDownSort`] pushes `SORT` requirements into `TableScan` + +use std::sync::Arc; + +use crate::optimizer::ApplyOrder; +use crate::{OptimizerConfig, OptimizerRule}; + +use datafusion_common::tree_node::Transformed; +use datafusion_common::{ExprSchema, Result}; +use datafusion_expr::logical_plan::{LogicalPlan, Sort}; +use datafusion_expr::SortExpr; + +/// Optimization rule that tries to push down `SORT` ordering requirements into `TableScan`. +/// +/// This optimization allows table providers to potentially provide pre-sorted data, +/// eliminating the need for expensive Sort operations. 
+#[derive(Default, Debug)] +pub struct PushDownSort {} + +impl PushDownSort { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } + + /// Attempts to push sort expressions down to a TableScan + fn try_push_sort_to_table_scan( + sort_exprs: &[SortExpr], + mut plan: LogicalPlan, + ) -> Result> { + match plan { + LogicalPlan::TableScan(ref mut scan) => { + // If the table scan already has a preferred ordering, don't override it + if scan.preferred_ordering.is_some() { + return Ok(Transformed::no(plan)); + } + + // Set the preferred ordering on the table scan + scan.preferred_ordering = Some(sort_exprs.to_vec()); + Ok(Transformed::yes(plan)) + } + LogicalPlan::Filter(mut filter) => { + // Recursively try to push down through the filter + let filter_input = Arc::clone(&filter.input); + let input_result = Self::try_push_sort_to_table_scan( + sort_exprs, + Arc::unwrap_or_clone(filter_input), + )?; + if input_result.transformed { + filter.input = Arc::new(input_result.data); + Ok(Transformed::yes(LogicalPlan::Filter(filter))) + } else { + Ok(Transformed::no(LogicalPlan::Filter(filter))) + } + } + LogicalPlan::Projection(mut projection) => { + // For projections, we need to check if the sort expressions are still valid + // For simplicity, we'll only push down if all sort expressions reference + // columns that exist in the projection's input + let input_schema = projection.input.schema(); + let sort_is_valid = sort_exprs.iter().all(|sort_expr| { + // Check if the sort expression is a simple column reference + if let datafusion_expr::Expr::Column(col) = &sort_expr.expr { + input_schema.field_from_column(col).is_ok() + } else { + // For non-column expressions, we'd need more sophisticated analysis + false + } + }); + + if sort_is_valid { + let projection_input = Arc::clone(&projection.input); + let input_result = Self::try_push_sort_to_table_scan( + sort_exprs, + Arc::unwrap_or_clone(projection_input), + )?; + if input_result.transformed { + projection.input = Arc::new(input_result.data); + return Ok(Transformed::yes(LogicalPlan::Projection(projection))); + } + } + Ok(Transformed::no(LogicalPlan::Projection(projection))) + } + LogicalPlan::SubqueryAlias(mut alias) => { + // Recursively try to push down through the subquery alias + let alias_input = Arc::clone(&alias.input); + let input_result = Self::try_push_sort_to_table_scan( + sort_exprs, + Arc::unwrap_or_clone(alias_input), + )?; + if input_result.transformed { + alias.input = Arc::new(input_result.data); + Ok(Transformed::yes(LogicalPlan::SubqueryAlias(alias))) + } else { + Ok(Transformed::no(LogicalPlan::SubqueryAlias(alias))) + } + } + _ => { + // Cannot push sort through other operations + Ok(Transformed::no(plan)) + } + } + } +} + +impl OptimizerRule for PushDownSort { + fn name(&self) -> &str { + "push_down_sort" + } + + fn apply_order(&self) -> Option { + Some(ApplyOrder::TopDown) + } + + fn supports_rewrite(&self) -> bool { + true + } + + fn rewrite( + &self, + plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result> { + let LogicalPlan::Sort(sort) = plan else { + return Ok(Transformed::no(plan)); + }; + + // Try to push the sort requirements down to a table scan + let sort_exprs = sort.expr.clone(); + let sort_input = Arc::clone(&sort.input); + let sort_fetch = sort.fetch; + + let input_result = Self::try_push_sort_to_table_scan( + &sort_exprs, + Arc::unwrap_or_clone(sort_input), + )?; + + if input_result.transformed { + // If we successfully pushed the sort down, we can potentially eliminate the sort + // For 
now, we'll keep the sort for safety, but in a full implementation + // we could eliminate it if we're confident the table provider will provide sorted data + Ok(Transformed::yes(LogicalPlan::Sort(Sort { + expr: sort_exprs, + input: Arc::new(input_result.data), + fetch: sort_fetch, + }))) + } else { + // Could not push down the sort + Ok(Transformed::no(LogicalPlan::Sort(sort))) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test::*; + use crate::OptimizerContext; + use datafusion_expr::{col, logical_plan::builder::LogicalPlanBuilder}; + + fn optimize(plan: LogicalPlan) -> Result { + let rule = PushDownSort::new(); + let config = &OptimizerContext::new(); + let result = rule.rewrite(plan, config)?; + Ok(result.data) + } + + #[test] + fn test_sort_table_scan() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .sort(vec![col("a").sort(true, true)])? + .build()?; + + let optimized = optimize(plan)?; + + // The optimized plan should have the sort pushed down to the table scan + match &optimized { + LogicalPlan::Sort(sort) => match sort.input.as_ref() { + LogicalPlan::TableScan(scan) => { + assert!(scan.preferred_ordering.is_some()); + assert_eq!(scan.preferred_ordering.as_ref().unwrap().len(), 1); + } + _ => panic!("Expected TableScan after Sort"), + }, + _ => panic!("Expected Sort at root"), + } + + Ok(()) + } + + #[test] + fn test_sort_through_filter() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(col("a").gt(col("b")))? + .sort(vec![col("c").sort(true, true)])? + .build()?; + + let optimized = optimize(plan)?; + + // Should push sort through filter to table scan + match &optimized { + LogicalPlan::Sort(sort) => match sort.input.as_ref() { + LogicalPlan::Filter(filter) => match filter.input.as_ref() { + LogicalPlan::TableScan(scan) => { + assert!(scan.preferred_ordering.is_some()); + } + _ => panic!("Expected TableScan after Filter"), + }, + _ => panic!("Expected Filter after Sort"), + }, + _ => panic!("Expected Sort at root"), + } + + Ok(()) + } +} diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 576a51707c96..0cd4b4bf9279 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -271,6 +271,7 @@ fn from_table_source( projected_schema, filters: vec![], fetch: None, + preferred_ordering: None, }); LogicalPlanNode::try_from_logical_plan(&r, extension_codec) diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index cccffe08ad7c..05803270c7c8 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -195,6 +195,7 @@ logical_plan after filter_null_join_keys SAME TEXT AS ABOVE logical_plan after eliminate_outer_join SAME TEXT AS ABOVE logical_plan after push_down_limit SAME TEXT AS ABOVE logical_plan after push_down_filter SAME TEXT AS ABOVE +logical_plan after push_down_sort SAME TEXT AS ABOVE logical_plan after single_distinct_aggregation_to_group_by SAME TEXT AS ABOVE logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE @@ -217,6 +218,7 @@ logical_plan after filter_null_join_keys SAME TEXT AS ABOVE logical_plan after eliminate_outer_join SAME TEXT AS ABOVE logical_plan after push_down_limit SAME TEXT AS ABOVE logical_plan after push_down_filter SAME TEXT AS 
ABOVE +logical_plan after push_down_sort SAME TEXT AS ABOVE logical_plan after single_distinct_aggregation_to_group_by SAME TEXT AS ABOVE logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE From 527cdcdb9018753b8196947af5a39fd5c0ddfbf2 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 21 Aug 2025 12:21:07 -0500 Subject: [PATCH 02/12] rename --- datafusion/catalog/src/table.rs | 10 +++++----- datafusion/core/src/datasource/listing/table.rs | 6 +++--- datafusion/core/src/physical_planner.rs | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index 7b1dcaebbbca..c5855910dcaf 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -164,7 +164,7 @@ pub trait TableProvider: Debug + Sync + Send { /// because inexact filters do not guarantee that every filtered row is /// removed, so applying the limit could lead to too few rows being available /// to return as a final result. - #[deprecated(since = "50.0.0", note = "Use `scan_with_options` instead")] + #[deprecated(since = "50.0.0", note = "Use `scan_with_args` instead")] async fn scan( &self, state: &dyn Session, @@ -173,10 +173,10 @@ pub trait TableProvider: Debug + Sync + Send { limit: Option, ) -> Result>; - async fn scan_with_options( + async fn scan_with_args( &self, state: &dyn Session, - options: ScanArgs, + args: ScanArgs, ) -> Result { let ScanArgs { preferred_ordering: _, @@ -272,7 +272,7 @@ pub trait TableProvider: Debug + Sync + Send { /// } /// } /// ``` - #[deprecated(since = "50.0.0", note = "Use `scan_with_options` instead")] + #[deprecated(since = "50.0.0", note = "Use `scan_with_args` instead")] fn supports_filters_pushdown( &self, filters: &[&Expr], @@ -371,7 +371,7 @@ impl ScanArgs { pub struct ScanResult { /// The ExecutionPlan to run. plan: Arc, - // Remaining filters that were not completely evaluated during `scan_with_options()`. + // Remaining filters that were not completely evaluated during `scan_with_args()`. 
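To illustrate the renamed caller-facing API, a small sketch of building `ScanArgs` and consuming the `ScanResult`. The provider and session are assumed to exist in the surrounding code, and the column names (`id`, `ts`), projection indices, and limit are only illustrative.

use std::sync::Arc;

use datafusion_catalog::{ScanArgs, Session, TableProvider};
use datafusion_common::Result;
use datafusion_expr::{col, lit, Expr, SortExpr};
use datafusion_physical_plan::ExecutionPlan;

async fn plan_scan(
    provider: &dyn TableProvider,
    session: &dyn Session,
) -> Result<Arc<dyn ExecutionPlan>> {
    let filters: Vec<Expr> = vec![col("id").gt(lit(1))];
    // ASC, NULLS LAST ordering hint on a hypothetical `ts` column
    let ordering: Vec<SortExpr> = vec![col("ts").sort(true, false)];

    let args = ScanArgs::default()
        .with_projection(Some(vec![0, 2]))
        .with_filters(Some(filters))
        .with_preferred_ordering(Some(ordering))
        .with_limit(Some(10));

    let result = provider.scan_with_args(session, args).await?;

    // Filters returned here were not evaluated exactly by the provider and
    // still need to be applied on top of the returned plan.
    let _remaining: &[Expr] = result.filters();

    Ok(result.plan())
}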
filters: Vec, } diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 2a3dc0c96fef..9bc4b053370a 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1170,13 +1170,13 @@ impl TableProvider for ListingTable { .with_projection(projection.cloned()) .with_filters(Some(filters.to_vec())) .with_limit(limit); - Ok(self.scan_with_options(state, options).await?.plan()) + Ok(self.scan_with_args(state, options).await?.plan()) } - async fn scan_with_options( + async fn scan_with_args( &self, state: &dyn Session, - options: ScanArgs, + args: ScanArgs, ) -> Result { let projection = options.projection(); let filters = options.filters().map(|f| f.to_vec()).unwrap_or_default(); diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index c8add16aab45..3084cb74c8de 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -466,7 +466,7 @@ impl DefaultPhysicalPlanner { .with_filters(Some(filters).clone()) .with_preferred_ordering(preferred_ordering.clone()) .with_limit(*fetch); - let res = source.scan_with_options(session_state, opts).await?; + let res = source.scan_with_args(session_state, opts).await?; // TODO: move FilterExec wrapping logic from filter pushdown rule to here? res.plan() } From 6c34c72136a427479094c3823d8bd5c55448dc38 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 21 Aug 2025 13:56:35 -0500 Subject: [PATCH 03/12] fix --- datafusion/catalog/src/table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index c5855910dcaf..75379c295fae 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -183,7 +183,7 @@ pub trait TableProvider: Debug + Sync + Send { filters, projection, limit, - } = options; + } = args; let filters = filters.unwrap_or_default(); #[allow(deprecated)] let plan = self From b063b3b6cf0f6c47ce3358cef541b77adb77b743 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 21 Aug 2025 14:55:38 -0500 Subject: [PATCH 04/12] fix --- datafusion/catalog/src/default_table_source.rs | 1 - datafusion/catalog/src/memory/table.rs | 1 + datafusion/catalog/src/table.rs | 16 +++++++++++++--- datafusion/core/src/datasource/listing/table.rs | 15 +++++++++++---- datafusion/core/src/datasource/memory_test.rs | 4 ++++ datafusion/core/src/physical_planner.rs | 2 +- datafusion/core/tests/parquet/file_statistics.rs | 8 ++++++++ .../physical_optimizer/partition_statistics.rs | 1 + datafusion/ffi/src/table_provider.rs | 1 + 9 files changed, 40 insertions(+), 9 deletions(-) diff --git a/datafusion/catalog/src/default_table_source.rs b/datafusion/catalog/src/default_table_source.rs index b3e76a2e9931..11963c06c88f 100644 --- a/datafusion/catalog/src/default_table_source.rs +++ b/datafusion/catalog/src/default_table_source.rs @@ -73,7 +73,6 @@ impl TableSource for DefaultTableSource { &self, filter: &[&Expr], ) -> datafusion_common::Result> { - #[allow(deprecated)] self.table_provider.supports_filters_pushdown(filter) } diff --git a/datafusion/catalog/src/memory/table.rs b/datafusion/catalog/src/memory/table.rs index 90224f6a37bc..32e25ba5be45 100644 --- a/datafusion/catalog/src/memory/table.rs +++ b/datafusion/catalog/src/memory/table.rs @@ -138,6 +138,7 @@ impl 
MemTable { ) -> Result { let schema = t.schema(); let constraints = t.constraints(); + #[expect(deprecated)] let exec = t.scan(state, None, &[], None).await?; let partition_count = exec.output_partitioning().partition_count(); diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index 75379c295fae..81618306efa0 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -185,11 +185,22 @@ pub trait TableProvider: Debug + Sync + Send { limit, } = args; let filters = filters.unwrap_or_default(); - #[allow(deprecated)] + let unsupported_filters = self + .supports_filters_pushdown(&filters.iter().collect_vec())? + .into_iter() + .zip(&filters) + .filter_map(|(support, expr)| match support { + TableProviderFilterPushDown::Inexact | TableProviderFilterPushDown::Unsupported => { + Some(expr.clone()) + } + TableProviderFilterPushDown::Exact => None, + }) + .collect_vec(); + #[expect(deprecated)] let plan = self .scan(state, projection.as_ref(), &filters, limit) .await?; - Ok(ScanResult::new(plan, vec![])) + Ok(ScanResult::new(plan, unsupported_filters)) } /// Specify if DataFusion should provide filter expressions to the @@ -272,7 +283,6 @@ pub trait TableProvider: Debug + Sync + Send { /// } /// } /// ``` - #[deprecated(since = "50.0.0", note = "Use `scan_with_args` instead")] fn supports_filters_pushdown( &self, filters: &[&Expr], diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 9bc4b053370a..6dff7e4ccea5 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1178,9 +1178,9 @@ impl TableProvider for ListingTable { state: &dyn Session, args: ScanArgs, ) -> Result { - let projection = options.projection(); - let filters = options.filters().map(|f| f.to_vec()).unwrap_or_default(); - let limit = options.limit(); + let projection = args.projection(); + let filters = args.filters().map(|f| f.to_vec()).unwrap_or_default(); + let limit = args.limit(); // extract types of partition columns let table_partition_cols = self @@ -1219,7 +1219,7 @@ impl TableProvider for ListingTable { } let known_file_ordering = self.try_create_output_ordering()?; - let desired_file_ordering = match options.preferred_ordering() { + let desired_file_ordering = match args.preferred_ordering() { Some(ordering) if !ordering.is_empty() => { // Prefer the ordering requested by the query to any inherint file ordering create_ordering(&self.table_schema, &[ordering.to_vec()])? 
@@ -1696,6 +1696,7 @@ mod tests { let table = load_table(&ctx, "alltypes_plain.parquet").await?; let projection = None; + #[expect(deprecated)] let exec = table .scan(&ctx.state(), projection, &[], None) .await @@ -1833,6 +1834,7 @@ mod tests { // this will filter out the only file in the store let filter = Expr::not_eq(col("p1"), lit("v1")); + #[expect(deprecated)] let scan = table .scan(&ctx.state(), None, &[filter], None) .await @@ -2733,6 +2735,7 @@ mod tests { .with_schema(schema_default); let table_default = ListingTable::try_new(config_default)?; + #[expect(deprecated)] let exec_default = table_default.scan(&state, None, &[], None).await?; assert_eq!( exec_default.partition_statistics(None)?.num_rows, @@ -2754,6 +2757,7 @@ mod tests { .with_schema(schema_disabled); let table_disabled = ListingTable::try_new(config_disabled)?; + #[expect(deprecated)] let exec_disabled = table_disabled.scan(&state, None, &[], None).await?; assert_eq!( exec_disabled.partition_statistics(None)?.num_rows, @@ -2773,6 +2777,7 @@ mod tests { .with_schema(schema_enabled); let table_enabled = ListingTable::try_new(config_enabled)?; + #[expect(deprecated)] let exec_enabled = table_enabled.scan(&state, None, &[], None).await?; assert_eq!( exec_enabled.partition_statistics(None)?.num_rows, @@ -2879,6 +2884,7 @@ mod tests { assert!(table.schema_adapter_factory().is_none()); // The scan should work correctly with the default schema adapter + #[expect(deprecated)] let scan_result = table.scan(&ctx.state(), None, &[], None).await; assert!( scan_result.is_ok(), @@ -2915,6 +2921,7 @@ mod tests { )?; // The error should bubble up from the scan operation when schema mapping fails + #[expect(deprecated)] let scan_result = table.scan(&ctx.state(), None, &[], None).await; assert!(scan_result.is_err()); diff --git a/datafusion/core/src/datasource/memory_test.rs b/datafusion/core/src/datasource/memory_test.rs index c16837c73b4f..b7b80b4a9b11 100644 --- a/datafusion/core/src/datasource/memory_test.rs +++ b/datafusion/core/src/datasource/memory_test.rs @@ -59,6 +59,7 @@ mod tests { let provider = MemTable::try_new(schema, vec![vec![batch]])?; // scan with projection + #[expect(deprecated)] let exec = provider .scan(&session_ctx.state(), Some(&vec![2, 1]), &[], None) .await?; @@ -94,6 +95,7 @@ mod tests { let provider = MemTable::try_new(schema, vec![vec![batch]])?; + #[expect(deprecated)] let exec = provider.scan(&session_ctx.state(), None, &[], None).await?; let mut it = exec.execute(0, task_ctx)?; let batch1 = it.next().await.unwrap()?; @@ -126,6 +128,7 @@ mod tests { let projection: Vec = vec![0, 4]; + #[expect(deprecated)] match provider .scan(&session_ctx.state(), Some(&projection), &[], None) .await @@ -254,6 +257,7 @@ mod tests { let provider = MemTable::try_new(Arc::new(merged_schema), vec![vec![batch1, batch2]])?; + #[expect(deprecated)] let exec = provider.scan(&session_ctx.state(), None, &[], None).await?; let mut it = exec.execute(0, task_ctx)?; let batch1 = it.next().await.unwrap()?; diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 3084cb74c8de..f9ead2a7443a 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -86,7 +86,7 @@ use datafusion_expr::{ use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; use datafusion_physical_expr::expressions::{Column, Literal}; use datafusion_physical_expr::{ - conjunction, create_physical_sort_exprs, LexOrdering, PhysicalSortExpr, + 
create_physical_sort_exprs, LexOrdering, PhysicalSortExpr, }; use datafusion_physical_optimizer::PhysicalOptimizerRule; use datafusion_physical_plan::empty::EmptyExec; diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs index 64ee92eda254..d9b9775cd689 100644 --- a/datafusion/core/tests/parquet/file_statistics.rs +++ b/datafusion/core/tests/parquet/file_statistics.rs @@ -60,6 +60,7 @@ async fn check_stats_precision_with_filter_pushdown() { options.execution.parquet.pushdown_filters = true; // Scan without filter, stats are exact + #[expect(deprecated)] let exec = table.scan(&state, None, &[], None).await.unwrap(); assert_eq!( exec.partition_statistics(None).unwrap().num_rows, @@ -71,6 +72,7 @@ async fn check_stats_precision_with_filter_pushdown() { // (it is not a partition filter). Therefore; it will be pushed down to the // source operator after the appropriate optimizer pass. let filter_expr = Expr::gt(col("id"), lit(1)); + #[expect(deprecated)] let exec_with_filter = table .scan(&state, None, std::slice::from_ref(&filter_expr), None) .await @@ -119,6 +121,7 @@ async fn load_table_stats_with_session_level_cache() { //Session 1 first time list files assert_eq!(get_static_cache_size(&state1), 0); + #[expect(deprecated)] let exec1 = table1.scan(&state1, None, &[], None).await.unwrap(); assert_eq!( @@ -135,6 +138,7 @@ async fn load_table_stats_with_session_level_cache() { //Session 2 first time list files //check session 1 cache result not show in session 2 assert_eq!(get_static_cache_size(&state2), 0); + #[expect(deprecated)] let exec2 = table2.scan(&state2, None, &[], None).await.unwrap(); assert_eq!( exec2.partition_statistics(None).unwrap().num_rows, @@ -150,6 +154,7 @@ async fn load_table_stats_with_session_level_cache() { //Session 1 second time list files //check session 1 cache result not show in session 2 assert_eq!(get_static_cache_size(&state1), 1); + #[expect(deprecated)] let exec3 = table1.scan(&state1, None, &[], None).await.unwrap(); assert_eq!( exec3.partition_statistics(None).unwrap().num_rows, @@ -195,6 +200,7 @@ async fn list_files_with_session_level_cache() { //Session 1 first time list files assert_eq!(get_list_file_cache_size(&state1), 0); + #[expect(deprecated)] let exec1 = table1.scan(&state1, None, &[], None).await.unwrap(); let data_source_exec = exec1.as_any().downcast_ref::().unwrap(); let data_source = data_source_exec.data_source(); @@ -211,6 +217,7 @@ async fn list_files_with_session_level_cache() { //Session 2 first time list files //check session 1 cache result not show in session 2 assert_eq!(get_list_file_cache_size(&state2), 0); + #[expect(deprecated)] let exec2 = table2.scan(&state2, None, &[], None).await.unwrap(); let data_source_exec = exec2.as_any().downcast_ref::().unwrap(); let data_source = data_source_exec.data_source(); @@ -227,6 +234,7 @@ async fn list_files_with_session_level_cache() { //Session 1 second time list files //check session 1 cache result not show in session 2 assert_eq!(get_list_file_cache_size(&state1), 1); + #[expect(deprecated)] let exec3 = table1.scan(&state1, None, &[], None).await.unwrap(); let data_source_exec = exec3.as_any().downcast_ref::().unwrap(); let data_source = data_source_exec.data_source(); diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index df1032e0652e..cc9d852cd1f4 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ 
b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -104,6 +104,7 @@ mod test { .downcast_ref::() .unwrap() .clone(); + #[expect(deprecated)] listing_table .scan(&ctx.state(), None, &[], None) .await diff --git a/datafusion/ffi/src/table_provider.rs b/datafusion/ffi/src/table_provider.rs index 890511997a70..c11381e5b12b 100644 --- a/datafusion/ffi/src/table_provider.rs +++ b/datafusion/ffi/src/table_provider.rs @@ -260,6 +260,7 @@ unsafe extern "C" fn scan_fn_wrapper( let projections: Vec<_> = projections.into_iter().collect(); + #[expect(deprecated)] let plan = rresult_return!( internal_provider .scan(&ctx.state(), Some(&projections), &filters, limit.into()) From 23857eb4b0d051c52538bdd977d2431ae088a1b1 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 21 Aug 2025 15:15:41 -0500 Subject: [PATCH 05/12] fix unrelated flakiness? --- datafusion/catalog/src/table.rs | 5 ++--- datafusion/physical-plan/src/joins/hash_join.rs | 5 +++++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index 81618306efa0..36dedba54bdd 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -190,9 +190,8 @@ pub trait TableProvider: Debug + Sync + Send { .into_iter() .zip(&filters) .filter_map(|(support, expr)| match support { - TableProviderFilterPushDown::Inexact | TableProviderFilterPushDown::Unsupported => { - Some(expr.clone()) - } + TableProviderFilterPushDown::Inexact + | TableProviderFilterPushDown::Unsupported => Some(expr.clone()), TableProviderFilterPushDown::Exact => None, }) .collect_vec(); diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 80f1de5a5b6f..a407c15bf0cd 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -290,6 +290,11 @@ impl SharedBoundsAccumulator { // Combine all column predicates for this partition with AND if !column_predicates.is_empty() { + #[cfg(debug_assertions)] + { + // Sort predicates for consistent ordering in debug builds + column_predicates.sort_by_cached_key(|expr| format!("{}", fmt_sql(expr.as_ref()))); + } let partition_predicate = column_predicates .into_iter() .reduce(|acc, pred| { From b711b048651835f8c2f706652455b05a858fca89 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 21 Aug 2025 15:19:34 -0500 Subject: [PATCH 06/12] fmt --- datafusion/physical-plan/src/joins/hash_join.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index a407c15bf0cd..dc2e12b6cd80 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -293,7 +293,8 @@ impl SharedBoundsAccumulator { #[cfg(debug_assertions)] { // Sort predicates for consistent ordering in debug builds - column_predicates.sort_by_cached_key(|expr| format!("{}", fmt_sql(expr.as_ref()))); + column_predicates + .sort_by_cached_key(|expr| format!("{}", fmt_sql(expr.as_ref()))); } let partition_predicate = column_predicates .into_iter() From af0e9ea83e3e0adc9f0e460a0d92f7f6a72d41cf Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 21 Aug 2025 19:21:25 -0500 Subject: [PATCH 07/12] un-deprecate to minimize diff --- 
datafusion/catalog/src/memory/table.rs | 1 - datafusion/catalog/src/table.rs | 3 +-- datafusion/core/src/datasource/listing/table.rs | 7 ------- datafusion/core/src/datasource/memory_test.rs | 4 ---- datafusion/core/tests/parquet/file_statistics.rs | 8 -------- .../core/tests/physical_optimizer/partition_statistics.rs | 1 - datafusion/ffi/src/table_provider.rs | 1 - 7 files changed, 1 insertion(+), 24 deletions(-) diff --git a/datafusion/catalog/src/memory/table.rs b/datafusion/catalog/src/memory/table.rs index 32e25ba5be45..90224f6a37bc 100644 --- a/datafusion/catalog/src/memory/table.rs +++ b/datafusion/catalog/src/memory/table.rs @@ -138,7 +138,6 @@ impl MemTable { ) -> Result { let schema = t.schema(); let constraints = t.constraints(); - #[expect(deprecated)] let exec = t.scan(state, None, &[], None).await?; let partition_count = exec.output_partitioning().partition_count(); diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index 36dedba54bdd..c553c2998bda 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -164,7 +164,6 @@ pub trait TableProvider: Debug + Sync + Send { /// because inexact filters do not guarantee that every filtered row is /// removed, so applying the limit could lead to too few rows being available /// to return as a final result. - #[deprecated(since = "50.0.0", note = "Use `scan_with_args` instead")] async fn scan( &self, state: &dyn Session, @@ -195,7 +194,6 @@ pub trait TableProvider: Debug + Sync + Send { TableProviderFilterPushDown::Exact => None, }) .collect_vec(); - #[expect(deprecated)] let plan = self .scan(state, projection.as_ref(), &filters, limit) .await?; @@ -381,6 +379,7 @@ pub struct ScanResult { /// The ExecutionPlan to run. plan: Arc, // Remaining filters that were not completely evaluated during `scan_with_args()`. + // These were previously referred to as "unsupported filters" or "inexact filters". 
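As a sketch of how the Exact/Inexact split feeds this field: assuming a provider that can only evaluate filters on a single partition column exactly, its `supports_filters_pushdown` might classify filters like the helper below, so the default `scan_with_args` keeps everything else in `ScanResult::filters` for the caller to re-apply. The helper and the partition-column argument are hypothetical.

use datafusion_common::Result;
use datafusion_expr::{Expr, TableProviderFilterPushDown};

/// Classify filters the way a partition-pruning provider might: filters that
/// reference only the partition column are handled exactly, everything else
/// is inexact and therefore returned as a remaining filter.
fn classify(
    filters: &[&Expr],
    partition_col: &str,
) -> Result<Vec<TableProviderFilterPushDown>> {
    Ok(filters
        .iter()
        .map(|f| {
            let refs = f.column_refs();
            let only_partition_col =
                !refs.is_empty() && refs.iter().all(|c| c.name == partition_col);
            if only_partition_col {
                TableProviderFilterPushDown::Exact
            } else {
                TableProviderFilterPushDown::Inexact
            }
        })
        .collect())
}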
filters: Vec, } diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 6dff7e4ccea5..94569e2130df 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1696,7 +1696,6 @@ mod tests { let table = load_table(&ctx, "alltypes_plain.parquet").await?; let projection = None; - #[expect(deprecated)] let exec = table .scan(&ctx.state(), projection, &[], None) .await @@ -1834,7 +1833,6 @@ mod tests { // this will filter out the only file in the store let filter = Expr::not_eq(col("p1"), lit("v1")); - #[expect(deprecated)] let scan = table .scan(&ctx.state(), None, &[filter], None) .await @@ -2735,7 +2733,6 @@ mod tests { .with_schema(schema_default); let table_default = ListingTable::try_new(config_default)?; - #[expect(deprecated)] let exec_default = table_default.scan(&state, None, &[], None).await?; assert_eq!( exec_default.partition_statistics(None)?.num_rows, @@ -2757,7 +2754,6 @@ mod tests { .with_schema(schema_disabled); let table_disabled = ListingTable::try_new(config_disabled)?; - #[expect(deprecated)] let exec_disabled = table_disabled.scan(&state, None, &[], None).await?; assert_eq!( exec_disabled.partition_statistics(None)?.num_rows, @@ -2777,7 +2773,6 @@ mod tests { .with_schema(schema_enabled); let table_enabled = ListingTable::try_new(config_enabled)?; - #[expect(deprecated)] let exec_enabled = table_enabled.scan(&state, None, &[], None).await?; assert_eq!( exec_enabled.partition_statistics(None)?.num_rows, @@ -2884,7 +2879,6 @@ mod tests { assert!(table.schema_adapter_factory().is_none()); // The scan should work correctly with the default schema adapter - #[expect(deprecated)] let scan_result = table.scan(&ctx.state(), None, &[], None).await; assert!( scan_result.is_ok(), @@ -2921,7 +2915,6 @@ mod tests { )?; // The error should bubble up from the scan operation when schema mapping fails - #[expect(deprecated)] let scan_result = table.scan(&ctx.state(), None, &[], None).await; assert!(scan_result.is_err()); diff --git a/datafusion/core/src/datasource/memory_test.rs b/datafusion/core/src/datasource/memory_test.rs index b7b80b4a9b11..c16837c73b4f 100644 --- a/datafusion/core/src/datasource/memory_test.rs +++ b/datafusion/core/src/datasource/memory_test.rs @@ -59,7 +59,6 @@ mod tests { let provider = MemTable::try_new(schema, vec![vec![batch]])?; // scan with projection - #[expect(deprecated)] let exec = provider .scan(&session_ctx.state(), Some(&vec![2, 1]), &[], None) .await?; @@ -95,7 +94,6 @@ mod tests { let provider = MemTable::try_new(schema, vec![vec![batch]])?; - #[expect(deprecated)] let exec = provider.scan(&session_ctx.state(), None, &[], None).await?; let mut it = exec.execute(0, task_ctx)?; let batch1 = it.next().await.unwrap()?; @@ -128,7 +126,6 @@ mod tests { let projection: Vec = vec![0, 4]; - #[expect(deprecated)] match provider .scan(&session_ctx.state(), Some(&projection), &[], None) .await @@ -257,7 +254,6 @@ mod tests { let provider = MemTable::try_new(Arc::new(merged_schema), vec![vec![batch1, batch2]])?; - #[expect(deprecated)] let exec = provider.scan(&session_ctx.state(), None, &[], None).await?; let mut it = exec.execute(0, task_ctx)?; let batch1 = it.next().await.unwrap()?; diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs index d9b9775cd689..64ee92eda254 100644 --- a/datafusion/core/tests/parquet/file_statistics.rs +++ b/datafusion/core/tests/parquet/file_statistics.rs @@ 
-60,7 +60,6 @@ async fn check_stats_precision_with_filter_pushdown() { options.execution.parquet.pushdown_filters = true; // Scan without filter, stats are exact - #[expect(deprecated)] let exec = table.scan(&state, None, &[], None).await.unwrap(); assert_eq!( exec.partition_statistics(None).unwrap().num_rows, @@ -72,7 +71,6 @@ async fn check_stats_precision_with_filter_pushdown() { // (it is not a partition filter). Therefore; it will be pushed down to the // source operator after the appropriate optimizer pass. let filter_expr = Expr::gt(col("id"), lit(1)); - #[expect(deprecated)] let exec_with_filter = table .scan(&state, None, std::slice::from_ref(&filter_expr), None) .await @@ -121,7 +119,6 @@ async fn load_table_stats_with_session_level_cache() { //Session 1 first time list files assert_eq!(get_static_cache_size(&state1), 0); - #[expect(deprecated)] let exec1 = table1.scan(&state1, None, &[], None).await.unwrap(); assert_eq!( @@ -138,7 +135,6 @@ async fn load_table_stats_with_session_level_cache() { //Session 2 first time list files //check session 1 cache result not show in session 2 assert_eq!(get_static_cache_size(&state2), 0); - #[expect(deprecated)] let exec2 = table2.scan(&state2, None, &[], None).await.unwrap(); assert_eq!( exec2.partition_statistics(None).unwrap().num_rows, @@ -154,7 +150,6 @@ async fn load_table_stats_with_session_level_cache() { //Session 1 second time list files //check session 1 cache result not show in session 2 assert_eq!(get_static_cache_size(&state1), 1); - #[expect(deprecated)] let exec3 = table1.scan(&state1, None, &[], None).await.unwrap(); assert_eq!( exec3.partition_statistics(None).unwrap().num_rows, @@ -200,7 +195,6 @@ async fn list_files_with_session_level_cache() { //Session 1 first time list files assert_eq!(get_list_file_cache_size(&state1), 0); - #[expect(deprecated)] let exec1 = table1.scan(&state1, None, &[], None).await.unwrap(); let data_source_exec = exec1.as_any().downcast_ref::().unwrap(); let data_source = data_source_exec.data_source(); @@ -217,7 +211,6 @@ async fn list_files_with_session_level_cache() { //Session 2 first time list files //check session 1 cache result not show in session 2 assert_eq!(get_list_file_cache_size(&state2), 0); - #[expect(deprecated)] let exec2 = table2.scan(&state2, None, &[], None).await.unwrap(); let data_source_exec = exec2.as_any().downcast_ref::().unwrap(); let data_source = data_source_exec.data_source(); @@ -234,7 +227,6 @@ async fn list_files_with_session_level_cache() { //Session 1 second time list files //check session 1 cache result not show in session 2 assert_eq!(get_list_file_cache_size(&state1), 1); - #[expect(deprecated)] let exec3 = table1.scan(&state1, None, &[], None).await.unwrap(); let data_source_exec = exec3.as_any().downcast_ref::().unwrap(); let data_source = data_source_exec.data_source(); diff --git a/datafusion/core/tests/physical_optimizer/partition_statistics.rs b/datafusion/core/tests/physical_optimizer/partition_statistics.rs index cc9d852cd1f4..df1032e0652e 100644 --- a/datafusion/core/tests/physical_optimizer/partition_statistics.rs +++ b/datafusion/core/tests/physical_optimizer/partition_statistics.rs @@ -104,7 +104,6 @@ mod test { .downcast_ref::() .unwrap() .clone(); - #[expect(deprecated)] listing_table .scan(&ctx.state(), None, &[], None) .await diff --git a/datafusion/ffi/src/table_provider.rs b/datafusion/ffi/src/table_provider.rs index c11381e5b12b..890511997a70 100644 --- a/datafusion/ffi/src/table_provider.rs +++ b/datafusion/ffi/src/table_provider.rs @@ 
-260,7 +260,6 @@ unsafe extern "C" fn scan_fn_wrapper( let projections: Vec<_> = projections.into_iter().collect(); - #[expect(deprecated)] let plan = rresult_return!( internal_provider .scan(&ctx.state(), Some(&projections), &filters, limit.into()) From ceaa07d807bf7237f510892c5029697679625b7f Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Thu, 21 Aug 2025 19:45:36 -0500 Subject: [PATCH 08/12] fix? --- .../core/src/datasource/listing/table.rs | 33 ++++++++++++++++--- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 94569e2130df..831f55c8cf23 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -32,7 +32,9 @@ use async_trait::async_trait; use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider}; use datafusion_common::{ config_datafusion_err, config_err, internal_err, plan_err, project_schema, - stats::Precision, Constraints, DataFusionError, Result, SchemaExt, + stats::Precision, + tree_node::{TreeNodeContainer, TreeNodeRecursion}, + Constraints, DataFusionError, Result, SchemaExt, }; use datafusion_datasource::{ compute_all_files_statistics, @@ -1221,10 +1223,31 @@ impl TableProvider for ListingTable { let known_file_ordering = self.try_create_output_ordering()?; let desired_file_ordering = match args.preferred_ordering() { Some(ordering) if !ordering.is_empty() => { - // Prefer the ordering requested by the query to any inherint file ordering - create_ordering(&self.table_schema, &[ordering.to_vec()])? - .first() - .cloned() + // Prefer the ordering requested by the query to any natural file ordering. + // We'll try to re-order the file reads to match the requested ordering as best we can using statistics. + // Whatever the result is, it's likely better than a natural file ordering that doesn't match the query's ordering. + // But we can only do this if the query's ordering is a simple ordering of columns (no expressions). + let can_use_preferred_ordering = ordering.iter().all(|sort_expr| { + let mut contains_only_columns = true; + sort_expr + .apply_elements(|e| { + if !matches!(e, Expr::Column(_)) { + contains_only_columns = false; + Ok(TreeNodeRecursion::Stop) + } else { + Ok(TreeNodeRecursion::Continue) + } + }) + .expect("infallible closure cannot fail"); + contains_only_columns + }); + if can_use_preferred_ordering { + create_ordering(&self.table_schema, &[ordering.to_vec()])? 
+ .first() + .cloned() + } else { + known_file_ordering.first().cloned() + } } Some(_) | None => { // If the query did not request a specific ordering, fall back to any inherent file ordering From 2b7bf6dd8d8d3ae132542213b73ef0c4de3625af Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 24 Aug 2025 23:01:46 -0500 Subject: [PATCH 09/12] wip on tiered ordering of files --- .../core/src/datasource/listing/table.rs | 191 ++++-- datafusion/datasource/src/file_scan_config.rs | 615 ++++++++++++++++++ 2 files changed, 743 insertions(+), 63 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 831f55c8cf23..401756a3a754 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -40,7 +40,7 @@ use datafusion_datasource::{ compute_all_files_statistics, file::FileSource, file_groups::FileGroup, - file_scan_config::{FileScanConfig, FileScanConfigBuilder}, + file_scan_config::{FileGroupPartitioning, FileScanConfig, FileScanConfigBuilder}, schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory}, }; use datafusion_execution::{ @@ -1131,6 +1131,124 @@ impl ListingTable { fn try_create_output_ordering(&self) -> Result> { create_ordering(&self.table_schema, &self.options.file_sort_order) } + + /// Checks if the requested ordering can be satisfied using file statistics. + /// + /// Only simple column references (not expressions) can be used for file ordering + /// because statistics are typically available only at the column level. + fn can_use_ordering_from_statistics(&self, ordering: &[SortExpr]) -> bool { + ordering.iter().all(|sort_expr| { + // Check if sort expression contains only simple column references + let mut is_simple_column = true; + let _ = sort_expr.apply_elements(|e| { + if !matches!(e, Expr::Column(_)) { + is_simple_column = false; + Ok(TreeNodeRecursion::Stop) + } else { + Ok(TreeNodeRecursion::Continue) + } + }); + is_simple_column + }) + } + + /// Resolves the desired file ordering based on query requirements and natural ordering. + /// + /// This method prioritizes query-requested ordering if it can be satisfied using statistics, + /// otherwise falls back to any natural file ordering defined in the table configuration. + fn resolve_desired_ordering( + &self, + requested_ordering: Option<&[SortExpr]>, + ) -> Result> { + // Check if query requested specific ordering that we can use + if let Some(ordering) = requested_ordering { + if !ordering.is_empty() && self.can_use_ordering_from_statistics(ordering) { + return create_ordering(&self.table_schema, &[ordering.to_vec()]) + .map(|orderings| orderings.first().cloned()); + } + } + + // Fall back to natural file ordering if any + self.try_create_output_ordering() + .map(|orderings| orderings.first().cloned()) + } + + /// Determines the optimal file grouping and ordering strategy. + /// + /// This method orchestrates the file grouping process by: + /// 1. Resolving the desired ordering (query-requested vs natural) + /// 2. Applying statistics-based splitting if enabled and available + /// 3. 
Returning both the file groups and any output ordering that can be guaranteed + /// + /// # Arguments + /// * `state` - The session state containing configuration options + /// * `partitioned_file_lists` - Original file groups to potentially reorganize + /// * `requested_ordering` - Ordering requested by the query, if any + /// + /// # Returns + /// A tuple of (file_groups, optional_output_ordering) where: + /// - file_groups: The optimized file group arrangement + /// - optional_output_ordering: Output ordering that can be guaranteed (if any) + fn determine_file_groups_and_ordering( + &self, + state: &dyn Session, + partitioned_file_lists: Vec, + requested_ordering: Option<&[SortExpr]>, + ) -> Result<(Vec, Option>)> { + // 1. Determine desired ordering (query-requested vs natural) + let desired_ordering = self.resolve_desired_ordering(requested_ordering)?; + + // 2. Check if statistics-based splitting is enabled + if !state + .config_options() + .execution + .split_file_groups_by_statistics + { + return Ok((partitioned_file_lists, desired_ordering.map(|o| vec![o]))); + } + + // 3. Apply statistics-based splitting if we have an ordering requirement + let Some(ordering) = desired_ordering else { + // No ordering requirement, keep original groups + return Ok((partitioned_file_lists, None)); + }; + + match FileScanConfig::split_groups_by_statistics_with_overlap_handling( + &self.table_schema, + &partitioned_file_lists, + &ordering, + self.options.target_partitions, + ) { + Ok(FileGroupPartitioning::TotalOrder(groups)) => { + // Files don't overlap - can guarantee output ordering + log::debug!( + "Files arranged in total order across {} partitions", + groups.len() + ); + Ok((groups, Some(vec![ordering]))) + } + Ok(FileGroupPartitioning::PartialOrder(groups)) => { + // Files overlap but are ordered within partitions + log::debug!( + "Files arranged in partial order across {} partitions", + groups.len() + ); + Ok((groups, None)) + } + Ok(FileGroupPartitioning::Unordered(groups)) => { + // No statistics available, files ordered by path + log::debug!( + "Files arranged by path across {} partitions (no statistics)", + groups.len() + ); + Ok((groups, None)) + } + Err(e) => { + log::debug!("Failed to split file groups by statistics: {e}"); + Ok((partitioned_file_lists, None)) + } + } + } } // Expressions can be used for parttion pruning if they can be evaluated using @@ -1207,7 +1325,7 @@ impl TableProvider for ListingTable { // at the same time. This is because the limit should be applied after the filters are applied. let statistic_file_limit = if filters.is_empty() { limit } else { None }; - let (mut partitioned_file_lists, statistics) = self + let (partitioned_file_lists, statistics) = self .list_files_for_scan(state, &partition_filters, statistic_file_limit) .await?; @@ -1220,66 +1338,13 @@ impl TableProvider for ListingTable { )); } - let known_file_ordering = self.try_create_output_ordering()?; - let desired_file_ordering = match args.preferred_ordering() { - Some(ordering) if !ordering.is_empty() => { - // Prefer the ordering requested by the query to any natural file ordering. - // We'll try to re-order the file reads to match the requested ordering as best we can using statistics. - // Whatever the result is, it's likely better than a natural file ordering that doesn't match the query's ordering. - // But we can only do this if the query's ordering is a simple ordering of columns (no expressions). 
-                let can_use_preferred_ordering = ordering.iter().all(|sort_expr| {
-                    let mut contains_only_columns = true;
-                    sort_expr
-                        .apply_elements(|e| {
-                            if !matches!(e, Expr::Column(_)) {
-                                contains_only_columns = false;
-                                Ok(TreeNodeRecursion::Stop)
-                            } else {
-                                Ok(TreeNodeRecursion::Continue)
-                            }
-                        })
-                        .expect("infallible closure cannot fail");
-                    contains_only_columns
-                });
-                if can_use_preferred_ordering {
-                    create_ordering(&self.table_schema, &[ordering.to_vec()])?
-                        .first()
-                        .cloned()
-                } else {
-                    known_file_ordering.first().cloned()
-                }
-            }
-            Some(_) | None => {
-                // If the query did not request a specific ordering, fall back to any inherent file ordering
-                known_file_ordering.first().cloned()
-            }
-        };
-        match state
-            .config_options()
-            .execution
-            .split_file_groups_by_statistics
-            .then(|| {
-                desired_file_ordering.map(|ordering| {
-                    FileScanConfig::split_groups_by_statistics_with_target_partitions(
-                        &self.table_schema,
-                        &partitioned_file_lists,
-                        &ordering,
-                        self.options.target_partitions,
-                    )
-                })
-            })
-            .flatten()
-        {
-            Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"),
-            Some(Ok(new_groups)) => {
-                if new_groups.len() <= self.options.target_partitions {
-                    partitioned_file_lists = new_groups;
-                } else {
-                    log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered")
-                }
-            }
-            None => {} // no ordering required
-        };
+        // Determine optimal file grouping and ordering strategy
+        let (partitioned_file_lists, output_ordering) = self
+            .determine_file_groups_and_ordering(
+                state,
+                partitioned_file_lists,
+                args.preferred_ordering(),
+            )?;
 
         let Some(object_store_url) =
             self.table_paths.first().map(ListingTableUrl::object_store)
@@ -1308,7 +1373,7 @@ impl TableProvider for ListingTable {
             .with_statistics(statistics)
             .with_projection(projection)
             .with_limit(limit)
-            .with_output_ordering(known_file_ordering)
+            .with_output_ordering(output_ordering.unwrap_or_default())
             .with_table_partition_cols(table_partition_cols)
             .with_expr_adapter(self.expr_adapter_factory.clone())
             .build(),
diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs
index 7088f811bbce..003f8cd40a6b 100644
--- a/datafusion/datasource/src/file_scan_config.rs
+++ b/datafusion/datasource/src/file_scan_config.rs
@@ -43,6 +43,7 @@ use arrow::{
     },
     buffer::Buffer,
     datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type},
+    row::Row,
 };
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::{
@@ -69,6 +70,41 @@ use datafusion_physical_plan::coop::cooperative;
 use datafusion_physical_plan::execution_plan::SchedulingType;
 use log::{debug, warn};
 
+/// Result of file group partitioning that indicates the ordering properties
+/// of the resulting file groups.
+#[derive(Debug, Clone)]
+pub enum FileGroupPartitioning {
+    /// Files are globally ordered and non-overlapping across all partitions.
+    /// This means the entire scan result maintains the requested sort order.
+    TotalOrder(Vec<FileGroup>),
+    /// Files are ordered within partitions but may overlap across partitions.
+    /// Individual partitions maintain sort order but global ordering is not guaranteed.
+    PartialOrder(Vec<FileGroup>),
+    /// Files have no specific ordering guarantees.
+    Unordered(Vec<FileGroup>),
+}
+
+impl FileGroupPartitioning {
+    /// Extract the file groups regardless of their ordering properties.
+    pub fn file_groups(self) -> Vec<FileGroup> {
+        match self {
+            Self::TotalOrder(groups)
+            | Self::PartialOrder(groups)
+            | Self::Unordered(groups) => groups,
+        }
+    }
+
+    /// Returns true if the file groups maintain total ordering across all partitions.
+    pub fn is_total_order(&self) -> bool {
+        matches!(self, Self::TotalOrder(_))
+    }
+
+    /// Returns true if the file groups have some ordering (partial or total).
+    pub fn is_ordered(&self) -> bool {
+        matches!(self, Self::TotalOrder(_) | Self::PartialOrder(_))
+    }
+}
+
 /// The base configurations for a [`DataSourceExec`], the a physical plan for
 /// any given file format.
 ///
@@ -890,6 +926,216 @@ impl FileScanConfig {
         .collect())
     }
 
+    /// Distributes sorted files evenly across the target number of partitions using round-robin.
+    ///
+    /// This helper function takes pre-sorted files and distributes them across `target_partitions`
+    /// partitions in a round-robin fashion, which maintains the relative ordering within each
+    /// partition while ensuring even distribution.
+    ///
+    /// # Arguments
+    /// * `sorted_files` - Files that have already been sorted (by statistics or path)
+    /// * `target_partitions` - The desired number of output partitions
+    ///
+    /// # Returns
+    /// A vector of file groups, one per partition, with files distributed evenly
+    fn distribute_sorted_files_evenly(
+        sorted_files: Vec<PartitionedFile>,
+        target_partitions: usize,
+    ) -> Vec<FileGroup> {
+        if sorted_files.is_empty() || target_partitions == 0 {
+            return vec![];
+        }
+
+        let mut groups: Vec<Vec<PartitionedFile>> = vec![vec![]; target_partitions];
+
+        // Round-robin distribution maintaining order
+        for (i, file) in sorted_files.into_iter().enumerate() {
+            groups[i % target_partitions].push(file);
+        }
+
+        // Convert to FileGroups, filtering out empty groups
+        groups
+            .into_iter()
+            .filter(|g| !g.is_empty())
+            .map(FileGroup::new)
+            .collect()
+    }
+
+    /// Splits file groups by statistics with overlap handling, implementing a three-tier strategy.
+    ///
+    /// This method attempts to arrange files in the optimal order for query execution while
+    /// respecting the target number of partitions. It uses a fallback strategy:
+    ///
+    /// 1. **Tier 1 (TotalOrder)**: Try to arrange files as non-overlapping and ordered
+    /// 2. **Tier 2 (PartialOrder)**: If files overlap, distribute them evenly while maintaining order
+    /// 3. **Tier 3 (Unordered)**: If no statistics, sort by file path and distribute
+    ///
+    /// # Arguments
+    /// * `table_schema` - Schema of the table for statistics extraction
+    /// * `file_groups` - Original file groups to split
+    /// * `sort_order` - The desired lexicographical ordering
+    /// * `target_partitions` - The desired number of output partitions
+    ///
+    /// # Returns
+    /// A `FileGroupPartitioning` enum indicating the achieved ordering and containing the file groups
+    pub fn split_groups_by_statistics_with_overlap_handling(
+        table_schema: &SchemaRef,
+        file_groups: &[FileGroup],
+        sort_order: &LexOrdering,
+        target_partitions: usize,
+    ) -> Result<FileGroupPartitioning> {
+        if target_partitions == 0 {
+            return Err(DataFusionError::Internal(
+                "target_partitions must be greater than 0".to_string(),
+            ));
+        }
+
+        let flattened_files: Vec<_> =
+            file_groups.iter().flat_map(FileGroup::iter).collect();
+
+        if flattened_files.is_empty() {
+            return Ok(FileGroupPartitioning::Unordered(vec![]));
+        }
+
+        // Try to get statistics - if this fails, use Tier 3 (path-based ordering)
+        let statistics = match MinMaxStatistics::new_from_files(
+            sort_order,
+            table_schema,
+            None,
+            flattened_files.iter().copied(),
+        ) {
+            Ok(stats) => stats,
+            Err(e) => {
+                debug!("Unable to use statistics for file ordering: {e}, falling back to path-based ordering");
+                return Ok(Self::arrange_files_by_path(
+                    &flattened_files,
+                    target_partitions,
+                ));
+            }
+        };
+
+        let indices_sorted_by_min = statistics.min_values_sorted();
+
+        // Try Tier 1: Non-overlapping arrangement
+        if let Some(groups) = Self::try_arrange_files_non_overlapping(
+            &flattened_files,
+            &statistics,
+            &indices_sorted_by_min,
+            target_partitions,
+        ) {
+            debug!(
+                "Successfully arranged {} files in {} non-overlapping groups",
+                flattened_files.len(),
+                groups.len()
+            );
+            return Ok(FileGroupPartitioning::TotalOrder(groups));
+        }
+
+        // Tier 2: Overlapping arrangement with statistics-based ordering
+        debug!("Files have overlapping statistics, using overlapping arrangement with {} target partitions",
+            target_partitions);
+        let groups = Self::arrange_files_with_overlap(
+            &flattened_files,
+            &indices_sorted_by_min,
+            target_partitions,
+        );
+        debug!(
+            "Arranged {} files in {} groups with statistics-based ordering (overlapping)",
+            flattened_files.len(),
+            groups.len()
+        );
+        Ok(FileGroupPartitioning::PartialOrder(groups))
+    }
+
+    /// Tier 1: Attempts to arrange files without overlap across partitions.
+    /// Returns Some(groups) if successful, None if files have overlapping statistics.
+    fn try_arrange_files_non_overlapping(
+        flattened_files: &[&PartitionedFile],
+        statistics: &MinMaxStatistics,
+        indices_sorted_by_min: &[(usize, Row<'_>)],
+        target_partitions: usize,
+    ) -> Option<Vec<FileGroup>> {
+        // Initialize with target_partitions empty groups
+        let mut file_groups_indices: Vec<Vec<usize>> = vec![vec![]; target_partitions];
+
+        for (idx, min) in indices_sorted_by_min.iter().copied() {
+            if let Some((_, group)) = file_groups_indices
+                .iter_mut()
+                .enumerate()
+                .filter(|(_, group)| {
+                    group.is_empty()
+                        || min
+                            > statistics
+                                .max(*group.last().expect("groups should not be empty"))
+                })
+                .min_by_key(|(_, group)| group.len())
+            {
+                group.push(idx);
+            } else {
+                // No existing group can fit this file without overlap
+                return None;
+            }
+        }
+
+        // Remove any empty groups
+        file_groups_indices.retain(|group| !group.is_empty());
+
+        // Success: files are non-overlapping and fit in target partitions
+        let groups = file_groups_indices
+            .into_iter()
+            .map(|file_group_indices| {
+                FileGroup::new(
+                    file_group_indices
+                        .into_iter()
+                        .map(|idx| flattened_files[idx].clone())
+                        .collect(),
+                )
+            })
+            .collect();
+
+        Some(groups)
+    }
+
+    /// Tier 2: Arranges files with overlapping statistics by distributing them evenly
+    /// across partitions while maintaining statistical ordering within each partition.
+    fn arrange_files_with_overlap(
+        flattened_files: &[&PartitionedFile],
+        indices_sorted_by_min: &[(usize, Row<'_>)],
+        target_partitions: usize,
+    ) -> Vec<FileGroup> {
+        let sorted_files: Vec<_> = indices_sorted_by_min
+            .iter()
+            .map(|(idx, _)| flattened_files[*idx].clone())
+            .collect();
+
+        Self::distribute_sorted_files_evenly(sorted_files, target_partitions)
+    }
+
+    /// Tier 3: Arranges files without statistics by sorting them by path and distributing evenly.
+    fn arrange_files_by_path(
+        flattened_files: &[&PartitionedFile],
+        target_partitions: usize,
+    ) -> FileGroupPartitioning {
+        let mut files_with_paths: Vec<_> = flattened_files
+            .iter()
+            .map(|f| (f.path().to_string(), (*f).clone()))
+            .collect();
+
+        files_with_paths.sort_by(|a, b| a.0.cmp(&b.0));
+
+        let sorted_files: Vec<_> =
+            files_with_paths.into_iter().map(|(_, file)| file).collect();
+
+        let groups =
+            Self::distribute_sorted_files_evenly(sorted_files, target_partitions);
+        debug!(
+            "Arranged {} files in {} groups ordered by file path (no statistics)",
+            flattened_files.len(),
+            groups.len()
+        );
+        FileGroupPartitioning::Unordered(groups)
+    }
+
     /// Attempts to do a bin-packing on files into file groups, such that any two files
     /// in a file group are ordered and non-overlapping with respect to their statistics.
     /// It will produce the smallest number of file groups possible.
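For intuition, here is a minimal standalone sketch of the placement strategy implemented above: greedy non-overlapping placement by min/max statistics with a round-robin fallback. Files are reduced to plain integer (min, max) ranges in place of `PartitionedFile` statistics, so all names and data below are illustrative only and not part of the patch:

// Illustrative sketch only: ranges are assumed pre-sorted by their min value.
fn try_place_non_overlapping(
    ranges: &[(i64, i64)],
    target_partitions: usize,
) -> Option<Vec<Vec<(i64, i64)>>> {
    let mut groups: Vec<Vec<(i64, i64)>> = vec![vec![]; target_partitions];
    for &(min, max) in ranges {
        // Least-full group whose last range ends before this one starts.
        let candidate = groups
            .iter_mut()
            .filter(|g| g.last().map_or(true, |&(_, last_max)| min > last_max))
            .min_by_key(|g| g.len());
        match candidate {
            Some(group) => group.push((min, max)),
            None => return None, // overlaps every group: no total order possible
        }
    }
    groups.retain(|g| !g.is_empty());
    Some(groups)
}

fn round_robin(ranges: &[(i64, i64)], target_partitions: usize) -> Vec<Vec<(i64, i64)>> {
    let mut groups: Vec<Vec<(i64, i64)>> = vec![vec![]; target_partitions];
    for (i, r) in ranges.iter().enumerate() {
        groups[i % target_partitions].push(*r); // keeps per-partition order
    }
    groups.retain(|g| !g.is_empty());
    groups
}

fn main() {
    // Disjoint ranges: a non-overlapping arrangement exists (Tier 1).
    let disjoint = [(0, 9), (10, 19), (20, 29), (30, 39)];
    println!("{:?}", try_place_non_overlapping(&disjoint, 2));

    // Heavily overlapping ranges: fall back to round-robin (Tier 2).
    let overlapping = [(0, 100), (10, 90), (20, 80), (30, 70)];
    if try_place_non_overlapping(&overlapping, 2).is_none() {
        println!("{:?}", round_robin(&overlapping, 2));
    }
}

The round-robin fallback trades the global ordering guarantee for balanced partitions while keeping each partition internally sorted by the minimum statistic, which mirrors the TotalOrder/PartialOrder distinction above.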
@@ -2429,4 +2675,373 @@ mod tests { Ok(()) } + + #[test] + fn test_try_arrange_files_non_overlapping() -> Result<()> { + use datafusion_common::DFSchema; + use datafusion_expr::{col, execution_props::ExecutionProps}; + + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Float64, + false, + )])); + + let exec_props = ExecutionProps::new(); + let df_schema = DFSchema::try_from_qualified_schema("test", schema.as_ref())?; + let sort_expr = [col("value").sort(true, false)]; + let sort_ordering = sort_expr + .map(|expr| { + create_physical_sort_expr(&expr, &df_schema, &exec_props).unwrap() + }) + .into(); + + // Test case 1: Non-overlapping files should succeed + let non_overlapping_files = generate_test_files(4, 0.0); + let flattened_files: Vec<_> = non_overlapping_files + .iter() + .flat_map(FileGroup::iter) + .collect(); + + let statistics = MinMaxStatistics::new_from_files( + &sort_ordering, + &schema, + None, + flattened_files.iter().copied(), + )?; + let indices_sorted_by_min = statistics.min_values_sorted(); + + let result = FileScanConfig::try_arrange_files_non_overlapping( + &flattened_files, + &statistics, + &indices_sorted_by_min, + 3, + ); + + assert!(result.is_some(), "Non-overlapping files should succeed"); + let groups = result.unwrap(); + assert!(groups.len() <= 3, "Should not exceed target partitions"); + + // Verify total file count is preserved + let total_files: usize = groups.iter().map(FileGroup::len).sum(); + assert_eq!(total_files, flattened_files.len()); + + // Test case 2: Overlapping files should fail + let overlapping_files = generate_test_files(8, 0.9); // High overlap + let flattened_overlapping: Vec<_> = + overlapping_files.iter().flat_map(FileGroup::iter).collect(); + + let overlapping_stats = MinMaxStatistics::new_from_files( + &sort_ordering, + &schema, + None, + flattened_overlapping.iter().copied(), + )?; + let overlapping_indices = overlapping_stats.min_values_sorted(); + + let overlapping_result = FileScanConfig::try_arrange_files_non_overlapping( + &flattened_overlapping, + &overlapping_stats, + &overlapping_indices, + 3, + ); + + // This might succeed or fail depending on the specific overlap pattern + match overlapping_result { + Some(groups) => { + println!( + "✓ Overlapping files surprisingly fit in non-overlapping arrangement" + ); + assert!(groups.len() <= 3); + } + None => { + println!("✓ Overlapping files correctly rejected by non-overlapping arrangement"); + } + } + + println!("✓ try_arrange_files_non_overlapping tests passed"); + Ok(()) + } + + #[test] + fn test_arrange_files_with_overlap() -> Result<()> { + use datafusion_common::DFSchema; + use datafusion_expr::{col, execution_props::ExecutionProps}; + + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Float64, + false, + )])); + + let exec_props = ExecutionProps::new(); + let df_schema = DFSchema::try_from_qualified_schema("test", schema.as_ref())?; + let sort_expr = [col("value").sort(true, false)]; + let sort_ordering = sort_expr + .map(|expr| { + create_physical_sort_expr(&expr, &df_schema, &exec_props).unwrap() + }) + .into(); + + // Test with overlapping files + let overlapping_files = generate_test_files(10, 0.8); + let flattened_files: Vec<_> = + overlapping_files.iter().flat_map(FileGroup::iter).collect(); + + let statistics = MinMaxStatistics::new_from_files( + &sort_ordering, + &schema, + None, + flattened_files.iter().copied(), + )?; + let indices_sorted_by_min = statistics.min_values_sorted(); + + let target_partitions = 3; + let 
result = FileScanConfig::arrange_files_with_overlap( + &flattened_files, + &indices_sorted_by_min, + target_partitions, + ); + + // Should create exactly target_partitions groups + assert_eq!(result.len(), target_partitions); + + // Verify total file count is preserved + let total_files: usize = result.iter().map(FileGroup::len).sum(); + assert_eq!(total_files, flattened_files.len()); + + // Verify files are distributed somewhat evenly + let group_sizes: Vec = result.iter().map(FileGroup::len).collect(); + let max_size = *group_sizes.iter().max().unwrap(); + let min_size = *group_sizes.iter().min().unwrap(); + let size_diff = max_size - min_size; + + // Difference should not be more than 1 (round-robin distribution) + assert!( + size_diff <= 1, + "Files should be distributed evenly, got sizes: {:?}", + group_sizes + ); + + println!("✓ arrange_files_with_overlap tests passed"); + Ok(()) + } + + #[test] + fn test_arrange_files_by_path() { + use crate::PartitionedFile; + use object_store::path::Path; + + // Create files with specific names to test path sorting + let files = vec![ + PartitionedFile::new(Path::from("file_c.parquet"), 1000), + PartitionedFile::new(Path::from("file_a.parquet"), 2000), + PartitionedFile::new(Path::from("file_b.parquet"), 1500), + ]; + + let file_refs: Vec<_> = files.iter().collect(); + let target_partitions = 2; + + let result = FileScanConfig::arrange_files_by_path(&file_refs, target_partitions); + + match result { + FileGroupPartitioning::Unordered(groups) => { + // Should not exceed target partitions + assert!(groups.len() <= target_partitions); + + // Collect all files in order they appear in groups + let all_files: Vec<_> = groups + .iter() + .flat_map(FileGroup::iter) + .map(|f| f.path().to_string()) + .collect(); + + // The files are sorted before distribution, but round-robin distribution + // means the global order may not be maintained across partitions. + // Instead, verify that all original files are present. 
+ let mut sorted_all_files = all_files.clone(); + sorted_all_files.sort(); + let expected_files = vec![ + "file_a.parquet".to_string(), + "file_b.parquet".to_string(), + "file_c.parquet".to_string(), + ]; + assert_eq!( + sorted_all_files, expected_files, + "All files should be present" + ); + + // Verify that within each partition, files maintain relative order if they exist + for group in &groups { + let group_files: Vec<_> = + group.iter().map(|f| f.path().to_string()).collect(); + let mut sorted_group_files = group_files.clone(); + sorted_group_files.sort(); + assert_eq!( + group_files, sorted_group_files, + "Files within each partition should be sorted" + ); + } + + // Verify total count + assert_eq!(all_files.len(), files.len()); + + println!("✓ arrange_files_by_path correctly sorted files by path"); + } + _ => panic!("Expected Unordered result"), + } + + println!("✓ arrange_files_by_path tests passed"); + } + + #[test] + fn test_split_groups_by_statistics_with_overlap_handling_integration() -> Result<()> { + use datafusion_common::DFSchema; + use datafusion_expr::{col, execution_props::ExecutionProps}; + + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Float64, + false, + )])); + + let exec_props = ExecutionProps::new(); + let df_schema = DFSchema::try_from_qualified_schema("test", schema.as_ref())?; + let sort_expr = [col("value").sort(true, false)]; + let sort_ordering = sort_expr + .map(|expr| { + create_physical_sort_expr(&expr, &df_schema, &exec_props).unwrap() + }) + .into(); + + let target_partitions = 3; + + // Test case 1: Non-overlapping files (Tier 1) + let non_overlapping_files = generate_test_files(5, 0.0); + let result = FileScanConfig::split_groups_by_statistics_with_overlap_handling( + &schema, + &non_overlapping_files, + &sort_ordering, + target_partitions, + )?; + + match result { + FileGroupPartitioning::TotalOrder(groups) => { + println!("✓ Non-overlapping files -> TotalOrder"); + assert!(groups.len() <= target_partitions); + } + _ => panic!("Expected TotalOrder for non-overlapping files"), + } + + // Test case 2: Overlapping files (Tier 2) + let overlapping_files = generate_test_files(10, 0.8); + let result = FileScanConfig::split_groups_by_statistics_with_overlap_handling( + &schema, + &overlapping_files, + &sort_ordering, + target_partitions, + )?; + + match result { + FileGroupPartitioning::PartialOrder(groups) => { + println!("✓ Overlapping files -> PartialOrder"); + assert_eq!(groups.len(), target_partitions); + } + FileGroupPartitioning::TotalOrder(groups) => { + println!("✓ Overlapping files fit in TotalOrder"); + assert!(groups.len() <= target_partitions); + } + _ => panic!("Expected PartialOrder or TotalOrder for files with statistics"), + } + + // Test case 3: Files without statistics (Tier 3) + use crate::PartitionedFile; + use object_store::path::Path; + + let files_no_stats = vec![FileGroup::new(vec![ + PartitionedFile::new(Path::from("file1.parquet"), 1000), + PartitionedFile::new(Path::from("file2.parquet"), 2000), + PartitionedFile::new(Path::from("file3.parquet"), 1500), + ])]; + + let result = FileScanConfig::split_groups_by_statistics_with_overlap_handling( + &schema, + &files_no_stats, + &sort_ordering, + target_partitions, + )?; + + match result { + FileGroupPartitioning::Unordered(groups) => { + println!("✓ Files without statistics -> Unordered"); + assert!(groups.len() <= target_partitions); + } + _ => panic!("Expected Unordered for files without statistics"), + } + + // Test case 4: Empty file groups + let 
empty_groups: Vec = vec![]; + let result = FileScanConfig::split_groups_by_statistics_with_overlap_handling( + &schema, + &empty_groups, + &sort_ordering, + target_partitions, + )?; + + match result { + FileGroupPartitioning::Unordered(groups) => { + assert!(groups.is_empty()); + println!("✓ Empty file groups handled correctly"); + } + _ => panic!("Expected empty Unordered for empty file groups"), + } + + // Test case 5: Zero target partitions + let result = FileScanConfig::split_groups_by_statistics_with_overlap_handling( + &schema, + &non_overlapping_files, + &sort_ordering, + 0, + ); + + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("target_partitions must be greater than 0")); + + println!("✓ Integration tests passed for split_groups_by_statistics_with_overlap_handling"); + Ok(()) + } + + #[test] + fn test_file_group_partitioning_methods() { + // Test the enum helper methods + let groups = vec![FileGroup::new(vec![]), FileGroup::new(vec![])]; + + let total_order = FileGroupPartitioning::TotalOrder(groups.clone()); + let partial_order = FileGroupPartitioning::PartialOrder(groups.clone()); + let unordered = FileGroupPartitioning::Unordered(groups.clone()); + + // Test is_total_order + assert!(total_order.is_total_order()); + assert!(!partial_order.is_total_order()); + assert!(!unordered.is_total_order()); + + // Test is_ordered + assert!(total_order.is_ordered()); + assert!(partial_order.is_ordered()); + assert!(!unordered.is_ordered()); + + // Test file_groups extraction + let extracted_total = total_order.file_groups(); + let extracted_partial = partial_order.file_groups(); + let extracted_unordered = unordered.file_groups(); + + assert_eq!(extracted_total.len(), 2); + assert_eq!(extracted_partial.len(), 2); + assert_eq!(extracted_unordered.len(), 2); + + println!("✓ FileGroupPartitioning helper methods work correctly"); + } } From c352b07c8a1883f3bec4120b657534b936e1aa52 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 24 Aug 2025 23:43:05 -0500 Subject: [PATCH 10/12] tests passing --- .../core/src/datasource/listing/table.rs | 93 ++++++++++++++----- 1 file changed, 72 insertions(+), 21 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 401756a3a754..a7911e6ea09c 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1152,15 +1152,19 @@ impl ListingTable { }) } - /// Resolves the desired file ordering based on query requirements and natural ordering. + /// Resolves the desired file ordering for file arrangement purposes. /// - /// This method prioritizes query-requested ordering if it can be satisfied using statistics, - /// otherwise falls back to any natural file ordering defined in the table configuration. - fn resolve_desired_ordering( + /// This method returns the ordering to use for arranging files optimally, + /// prioritizing query-requested ordering if it can be satisfied using statistics, + /// otherwise using any configured file_sort_order. + /// + /// Note: This is for file arrangement only - output ordering should be determined + /// separately based on whether file_sort_order is configured. 
+ fn resolve_desired_file_arrangement_ordering( &self, requested_ordering: Option<&[SortExpr]>, ) -> Result> { - // Check if query requested specific ordering that we can use + // Check if query requested specific ordering that we can use for file arrangement if let Some(ordering) = requested_ordering { if !ordering.is_empty() && self.can_use_ordering_from_statistics(ordering) { return create_ordering(&self.table_schema, &[ordering.to_vec()]) @@ -1168,17 +1172,42 @@ impl ListingTable { } } - // Fall back to natural file ordering if any + // Fall back to file_sort_order for arrangement if configured self.try_create_output_ordering() .map(|orderings| orderings.first().cloned()) } + /// Gets the output orderings that can be guaranteed by the scan. + /// + /// Only returns orderings if file_sort_order is explicitly configured, + /// as this represents a promise about the physical file layout. + /// Returns all configured equivalent orderings. + /// + /// TODO: For formats like Parquet that store ordering metadata in the file, + /// we could read and use that information instead of requiring explicit configuration. + fn get_guaranteed_output_orderings(&self) -> Result>> { + if self.options.file_sort_order.is_empty() { + // No explicit file sort order configured + Ok(None) + } else { + // Return all configured file sort orderings + self.try_create_output_ordering().map(|orderings| { + if orderings.is_empty() { + None + } else { + Some(orderings) + } + }) + } + } + /// Determines the optimal file grouping and ordering strategy. /// /// This method orchestrates the file grouping process by: - /// 1. Resolving the desired ordering (query-requested vs natural) - /// 2. Applying statistics-based splitting if enabled and available - /// 3. Returning both the file groups and any output ordering that can be guaranteed + /// 1. Resolving the desired ordering for file arrangement (query-requested vs file_sort_order) + /// 2. Applying statistics-based splitting if enabled and available + /// 3. Determining output ordering separately based on file_sort_order configuration + /// 4. Returning both the optimized file groups and any guaranteed output ordering /// /// # Arguments /// * `state` - The session state containing configuration options @@ -1188,15 +1217,16 @@ impl ListingTable { /// # Returns /// A tuple of (file_groups, optional_output_ordering) where: /// - file_groups: The optimized file group arrangement - /// - optional_output_ordering: Output ordering that can be guaranteed (if any) + /// - optional_output_ordering: Output ordering guaranteed by scan (only if file_sort_order configured) fn determine_file_groups_and_ordering( &self, state: &dyn Session, partitioned_file_lists: Vec, requested_ordering: Option<&[SortExpr]>, ) -> Result<(Vec, Option>)> { - // 1. Determine desired ordering (query-requested vs natural) - let desired_ordering = self.resolve_desired_ordering(requested_ordering)?; + // 1. Determine desired ordering for file arrangement (query-requested vs configured) + let arrangement_ordering = + self.resolve_desired_file_arrangement_ordering(requested_ordering)?; // 2. 
Check if statistics-based splitting is enabled if !state @@ -1204,13 +1234,16 @@ impl ListingTable { .execution .split_file_groups_by_statistics { - return Ok((partitioned_file_lists, desired_ordering.map(|o| vec![o]))); + // No statistics-based splitting - return original groups with guaranteed ordering if any + let guaranteed_orderings = self.get_guaranteed_output_orderings()?; + return Ok((partitioned_file_lists, guaranteed_orderings)); } - // 3. Apply statistics-based splitting if we have an ordering requirement - let Some(ordering) = desired_ordering else { - // No ordering requirement, keep original groups - return Ok((partitioned_file_lists, None)); + // 3. Apply statistics-based splitting if we have an arrangement ordering requirement + let Some(ordering) = arrangement_ordering else { + // No ordering requirement for arrangement, keep original groups + let guaranteed_orderings = self.get_guaranteed_output_orderings()?; + return Ok((partitioned_file_lists, guaranteed_orderings)); }; match FileScanConfig::split_groups_by_statistics_with_overlap_handling( @@ -1220,15 +1253,31 @@ impl ListingTable { self.options.target_partitions, ) { Ok(FileGroupPartitioning::TotalOrder(groups)) => { - // Files don't overlap - can guarantee output ordering + // Files don't overlap and are arranged in total order log::debug!( "Files arranged in total order across {} partitions", groups.len() ); - Ok((groups, Some(vec![ordering]))) + + // Only guarantee output ordering if file_sort_order is configured + // and the arrangement ordering matches one of the configured orderings + let guaranteed_orderings = self.get_guaranteed_output_orderings()?; + let output_orderings = + if let Some(configured_orderings) = &guaranteed_orderings { + // Check if the arrangement ordering matches any of the configured file_sort_orders + if configured_orderings.contains(&ordering) { + Some(configured_orderings.clone()) + } else { + None // Arrangement was for query optimization, not scan output + } + } else { + None // No file_sort_order configured + }; + + Ok((groups, output_orderings)) } Ok(FileGroupPartitioning::PartialOrder(groups)) => { - // Files overlap but are ordered within partitions + // Files overlap but are ordered within partitions - cannot guarantee total ordering log::debug!( "Files arranged in partial order across {} partitions", groups.len() @@ -1245,7 +1294,9 @@ impl ListingTable { } Err(e) => { log::debug!("Failed to split file groups by statistics: {e}"); - Ok((partitioned_file_lists, None)) + // Fallback to original groups, but still check for guaranteed ordering + let guaranteed_orderings = self.get_guaranteed_output_orderings()?; + Ok((partitioned_file_lists, guaranteed_orderings)) } } } From ebc950a98694c116fde5b41fd12521fe33d1ccb9 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Sun, 24 Aug 2025 23:54:17 -0500 Subject: [PATCH 11/12] lint, fix --- datafusion/datasource/src/file_scan_config.rs | 3 +-- .../sqllogictest/test_files/parquet_sorted_statistics.slt | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 003f8cd40a6b..01aa308ab071 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -1032,8 +1032,7 @@ impl FileScanConfig { } // Tier 2: Overlapping arrangement with statistics-based ordering - debug!("Files have overlapping statistics, using overlapping arrangement with 
{} target partitions", - target_partitions); + debug!("Files have overlapping statistics, using overlapping arrangement with {target_partitions} target partitions"); let groups = Self::arrange_files_with_overlap( &flattened_files, &indices_sorted_by_min, diff --git a/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt b/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt index fe909e70ffb0..e675a9a8cd49 100644 --- a/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt +++ b/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt @@ -275,4 +275,4 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [constant_col@0 ASC NULLS LAST] 02)--SortExec: expr=[constant_col@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet]]}, projection=[constant_col], file_type=parquet +03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[constant_col], file_type=parquet From 932b36e1a7848a249b8f8658366e89f9759f024b Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Mon, 25 Aug 2025 00:11:02 -0500 Subject: [PATCH 12/12] lint --- datafusion/datasource/src/file_scan_config.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 01aa308ab071..08c7741c4ced 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -2818,8 +2818,7 @@ mod tests { // Difference should not be more than 1 (round-robin distribution) assert!( size_diff <= 1, - "Files should be distributed evenly, got sizes: {:?}", - group_sizes + "Files should be distributed evenly, got sizes: {group_sizes:?}" ); println!("✓ arrange_files_with_overlap tests passed");