diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index ecfe63fa343f..c553c2998bda 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -25,13 +25,14 @@ use arrow::datatypes::SchemaRef; use async_trait::async_trait; use datafusion_common::Result; use datafusion_common::{not_impl_err, Constraints, Statistics}; -use datafusion_expr::Expr; +use datafusion_expr::{Expr, SortExpr}; use datafusion_expr::dml::InsertOp; use datafusion_expr::{ CreateExternalTable, LogicalPlan, TableProviderFilterPushDown, TableType, }; use datafusion_physical_plan::ExecutionPlan; +use itertools::Itertools; /// A table which can be queried and modified. /// @@ -171,6 +172,34 @@ pub trait TableProvider: Debug + Sync + Send { limit: Option<usize>, ) -> Result<Arc<dyn ExecutionPlan>>; + /// Scan the table using the bundled [`ScanArgs`], returning a [`ScanResult`] that carries + /// the plan together with any filters that were not evaluated exactly. The default + /// implementation delegates to [`Self::scan`] and ignores `preferred_ordering`. + async fn scan_with_args( + &self, + state: &dyn Session, + args: ScanArgs, + ) -> Result<ScanResult> { + let ScanArgs { + preferred_ordering: _, + filters, + projection, + limit, + } = args; + let filters = filters.unwrap_or_default(); + let unsupported_filters = self + .supports_filters_pushdown(&filters.iter().collect_vec())? + .into_iter() + .zip(&filters) + .filter_map(|(support, expr)| match support { + TableProviderFilterPushDown::Inexact + | TableProviderFilterPushDown::Unsupported => Some(expr.clone()), + TableProviderFilterPushDown::Exact => None, + }) + .collect_vec(); + let plan = self + .scan(state, projection.as_ref(), &filters, limit) + .await?; + Ok(ScanResult::new(plan, unsupported_filters)) + } + /// Specify if DataFusion should provide filter expressions to the /// TableProvider to apply *during* the scan. /// @@ -299,6 +328,75 @@ pub trait TableProvider: Debug + Sync + Send { } } +/// Arguments for [`TableProvider::scan_with_args`]. +#[derive(Debug, Clone, Default)] +pub struct ScanArgs { + preferred_ordering: Option<Vec<SortExpr>>, + filters: Option<Vec<Expr>>, + projection: Option<Vec<usize>>, + limit: Option<usize>, +} + +impl ScanArgs { + pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self { + self.projection = projection; + self + } + + pub fn projection(&self) -> Option<Vec<usize>> { + self.projection.clone() + } + + pub fn with_filters(mut self, filters: Option<Vec<Expr>>) -> Self { + self.filters = filters; + self + } + + pub fn filters(&self) -> Option<&[Expr]> { + self.filters.as_deref() + } + + pub fn with_limit(mut self, limit: Option<usize>) -> Self { + self.limit = limit; + self + } + + pub fn limit(&self) -> Option<usize> { + self.limit + } + + pub fn with_preferred_ordering(mut self, ordering: Option<Vec<SortExpr>>) -> Self { + self.preferred_ordering = ordering; + self + } + + pub fn preferred_ordering(&self) -> Option<&[SortExpr]> { + self.preferred_ordering.as_deref() + } +} + +/// The result of [`TableProvider::scan_with_args`]. +#[derive(Debug, Clone)] +pub struct ScanResult { + /// The ExecutionPlan to run. + plan: Arc<dyn ExecutionPlan>, + /// Remaining filters that were not completely evaluated during `scan_with_args()`. + /// These were previously referred to as "unsupported filters" or "inexact filters". + filters: Vec<Expr>, +} + +impl ScanResult { + pub fn new(plan: Arc<dyn ExecutionPlan>, filters: Vec<Expr>) -> Self { + Self { plan, filters } + } + + pub fn plan(&self) -> Arc<dyn ExecutionPlan> { + Arc::clone(&self.plan) + } + + pub fn filters(&self) -> &[Expr] { + &self.filters + } +}
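A minimal sketch of driving the new unified scan API from a call site (not part of the diff; the `provider` and `state` bindings and the projected column indices are assumptions for illustration):

    use datafusion_catalog::{ScanArgs, Session, TableProvider};
    use datafusion_common::Result;
    use datafusion_expr::{col, lit};

    async fn scan_example(provider: &dyn TableProvider, state: &dyn Session) -> Result<()> {
        let args = ScanArgs::default()
            .with_projection(Some(vec![0, 2]))
            .with_filters(Some(vec![col("x").gt(lit(5))]))
            .with_preferred_ordering(Some(vec![col("x").sort(true, false)]))
            .with_limit(Some(10));
        let result = provider.scan_with_args(state, args).await?;
        let _plan = result.plan(); // ExecutionPlan to execute
        let _remaining = result.filters(); // filters the provider did not apply exactly
        Ok(())
    }

Note that the preferred ordering is only a hint: the default implementation above discards it and falls back to `scan`, so providers that never override `scan_with_args` behave exactly as before.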
/// A factory which creates [`TableProvider`]s at runtime given a URL. /// /// For example, this can be used to create a table "on the fly" diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index d289a1d07129..a7911e6ea09c 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -29,16 +29,18 @@ use crate::{ use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef}; use arrow_schema::Schema; use async_trait::async_trait; -use datafusion_catalog::{Session, TableProvider}; +use datafusion_catalog::{ScanArgs, ScanResult, Session, TableProvider}; use datafusion_common::{ config_datafusion_err, config_err, internal_err, plan_err, project_schema, - stats::Precision, Constraints, DataFusionError, Result, SchemaExt, + stats::Precision, + tree_node::{TreeNodeContainer, TreeNodeRecursion}, + Constraints, DataFusionError, Result, SchemaExt, }; use datafusion_datasource::{ compute_all_files_statistics, file::FileSource, file_groups::FileGroup, - file_scan_config::{FileScanConfig, FileScanConfigBuilder}, + file_scan_config::{FileGroupPartitioning, FileScanConfig, FileScanConfigBuilder}, schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapter, SchemaAdapterFactory}, }; use datafusion_execution::{ @@ -1129,6 +1131,175 @@ impl ListingTable { fn try_create_output_ordering(&self) -> Result<Vec<LexOrdering>> { create_ordering(&self.table_schema, &self.options.file_sort_order) } + + /// Checks if the requested ordering can be satisfied using file statistics. + /// + /// Only simple column references (not expressions) can be used for file ordering + /// because statistics are typically available only at the column level. + fn can_use_ordering_from_statistics(&self, ordering: &[SortExpr]) -> bool { + ordering.iter().all(|sort_expr| { + // Check if the sort expression contains only simple column references + let mut is_simple_column = true; + let _ = sort_expr.apply_elements(|e| { + if !matches!(e, Expr::Column(_)) { + is_simple_column = false; + Ok(TreeNodeRecursion::Stop) + } else { + Ok(TreeNodeRecursion::Continue) + } + }); + is_simple_column + }) + } + + /// Resolves the desired file ordering for file arrangement purposes. + /// + /// This method returns the ordering to use for arranging files optimally, + /// prioritizing the query-requested ordering if it can be satisfied using statistics, + /// otherwise using any configured file_sort_order. + /// + /// Note: This is for file arrangement only - output ordering should be determined + /// separately based on whether file_sort_order is configured. + fn resolve_desired_file_arrangement_ordering( + &self, + requested_ordering: Option<&[SortExpr]>, + ) -> Result<Option<LexOrdering>> { + // Check if the query requested a specific ordering that we can use for file arrangement + if let Some(ordering) = requested_ordering { + if !ordering.is_empty() && self.can_use_ordering_from_statistics(ordering) { + return create_ordering(&self.table_schema, &[ordering.to_vec()]) + .map(|orderings| orderings.first().cloned()); + } + } + + // Fall back to file_sort_order for arrangement if configured + self.try_create_output_ordering() + .map(|orderings| orderings.first().cloned()) + } + + /// Gets the output orderings that can be guaranteed by the scan. + /// + /// Only returns orderings if file_sort_order is explicitly configured, + /// as this represents a promise about the physical file layout. + /// Returns all configured equivalent orderings.
+ /// + /// TODO: For formats like Parquet that store ordering metadata in the file, + /// we could read and use that information instead of requiring explicit configuration. + fn get_guaranteed_output_orderings(&self) -> Result<Option<Vec<LexOrdering>>> { + if self.options.file_sort_order.is_empty() { + // No explicit file sort order configured + Ok(None) + } else { + // Return all configured file sort orderings + self.try_create_output_ordering().map(|orderings| { + if orderings.is_empty() { + None + } else { + Some(orderings) + } + }) + } + }
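For context, the guarantee above only kicks in when a user has declared a sort order on the table. A hedged sketch of that configuration, using DataFusion's existing listing APIs (the `ts` column is hypothetical):

    use std::sync::Arc;
    use datafusion::datasource::file_format::parquet::ParquetFormat;
    use datafusion::datasource::listing::ListingOptions;
    use datafusion_expr::col;

    // Declare that every file is already sorted by "ts" ascending, so
    // get_guaranteed_output_orderings() can promise this ordering to the plan.
    let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
        .with_file_sort_order(vec![vec![col("ts").sort(true, false)]]);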
+ + /// Determines the optimal file grouping and ordering strategy. + /// + /// This method orchestrates the file grouping process by: + /// 1. Resolving the desired ordering for file arrangement (query-requested vs file_sort_order) + /// 2. Applying statistics-based splitting if enabled and available + /// 3. Determining output ordering separately based on file_sort_order configuration + /// 4. Returning both the optimized file groups and any guaranteed output ordering + /// + /// # Arguments + /// * `state` - The session state containing configuration options + /// * `partitioned_file_lists` - Original file groups to potentially reorganize + /// * `requested_ordering` - Ordering requested by the query, if any + /// + /// # Returns + /// A tuple of (file_groups, optional_output_ordering) where: + /// - file_groups: The optimized file group arrangement + /// - optional_output_ordering: Output ordering guaranteed by the scan (only if file_sort_order is configured) + fn determine_file_groups_and_ordering( + &self, + state: &dyn Session, + partitioned_file_lists: Vec<FileGroup>, + requested_ordering: Option<&[SortExpr]>, + ) -> Result<(Vec<FileGroup>, Option<Vec<LexOrdering>>)> { + // 1. Determine desired ordering for file arrangement (query-requested vs configured) + let arrangement_ordering = + self.resolve_desired_file_arrangement_ordering(requested_ordering)?; + + // 2. Check if statistics-based splitting is enabled + if !state + .config_options() + .execution + .split_file_groups_by_statistics + { + // No statistics-based splitting - return original groups with guaranteed ordering if any + let guaranteed_orderings = self.get_guaranteed_output_orderings()?; + return Ok((partitioned_file_lists, guaranteed_orderings)); + } + + // 3. Apply statistics-based splitting if we have an arrangement ordering requirement + let Some(ordering) = arrangement_ordering else { + // No ordering requirement for arrangement, keep original groups + let guaranteed_orderings = self.get_guaranteed_output_orderings()?; + return Ok((partitioned_file_lists, guaranteed_orderings)); + }; + + match FileScanConfig::split_groups_by_statistics_with_overlap_handling( + &self.table_schema, + &partitioned_file_lists, + &ordering, + self.options.target_partitions, + ) { + Ok(FileGroupPartitioning::TotalOrder(groups)) => { + // Files don't overlap and are arranged in total order + log::debug!( + "Files arranged in total order across {} partitions", + groups.len() + ); + + // Only guarantee output ordering if file_sort_order is configured + // and the arrangement ordering matches one of the configured orderings + let guaranteed_orderings = self.get_guaranteed_output_orderings()?; + let output_orderings = + if let Some(configured_orderings) = &guaranteed_orderings { + // Check if the arrangement ordering matches any of the configured file_sort_orders + if configured_orderings.contains(&ordering) { + Some(configured_orderings.clone()) + } else { + None // Arrangement was for query optimization, not scan output + } + } else { + None // No file_sort_order configured + }; + + Ok((groups, output_orderings)) + } + Ok(FileGroupPartitioning::PartialOrder(groups)) => { + // Files overlap but are ordered within partitions - cannot guarantee total ordering + log::debug!( + "Files arranged in partial order across {} partitions", + groups.len() + ); + Ok((groups, None)) + } + Ok(FileGroupPartitioning::Unordered(groups)) => { + // No statistics available, files ordered by path + log::debug!( + "Files arranged by path across {} partitions (no statistics)", + groups.len() + ); + Ok((groups, None)) + } + Err(e) => { + log::debug!("Failed to split file groups by statistics: {e}"); + // Fall back to the original groups, but still check for guaranteed ordering + let guaranteed_orderings = self.get_guaranteed_output_orderings()?; + Ok((partitioned_file_lists, guaranteed_orderings)) + } + } + } } // Expressions can be used for partition pruning if they can be evaluated using @@ -1166,6 +1337,22 @@ impl TableProvider for ListingTable { filters: &[Expr], limit: Option<usize>, ) -> Result<Arc<dyn ExecutionPlan>> { + let options = ScanArgs::default() + .with_projection(projection.cloned()) + .with_filters(Some(filters.to_vec())) + .with_limit(limit); + Ok(self.scan_with_args(state, options).await?.plan()) + } + + async fn scan_with_args( + &self, + state: &dyn Session, + args: ScanArgs, + ) -> Result<ScanResult> { + let projection = args.projection(); + let filters = args.filters().map(|f| f.to_vec()).unwrap_or_default(); + let limit = args.limit(); + // extract types of partition columns let table_partition_cols = self .options @@ -1189,54 +1376,41 @@ // at the same time. This is because the limit should be applied after the filters are applied.
let statistic_file_limit = if filters.is_empty() { limit } else { None }; - let (mut partitioned_file_lists, statistics) = self + let (partitioned_file_lists, statistics) = self .list_files_for_scan(state, &partition_filters, statistic_file_limit) .await?; // if no files need to be read, return an `EmptyExec` if partitioned_file_lists.is_empty() { - let projected_schema = project_schema(&self.schema(), projection)?; - return Ok(Arc::new(EmptyExec::new(projected_schema))); + let projected_schema = project_schema(&self.schema(), projection.as_ref())?; + return Ok(ScanResult::new( + Arc::new(EmptyExec::new(projected_schema)), + filters.clone(), + )); } - let output_ordering = self.try_create_output_ordering()?; - match state - .config_options() - .execution - .split_file_groups_by_statistics - .then(|| { - output_ordering.first().map(|output_ordering| { - FileScanConfig::split_groups_by_statistics_with_target_partitions( - &self.table_schema, - &partitioned_file_lists, - output_ordering, - self.options.target_partitions, - ) - }) - }) - .flatten() - { - Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"), - Some(Ok(new_groups)) => { - if new_groups.len() <= self.options.target_partitions { - partitioned_file_lists = new_groups; - } else { - log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered") - } - } - None => {} // no ordering required - }; + // Determine optimal file grouping and ordering strategy + let (partitioned_file_lists, output_ordering) = self + .determine_file_groups_and_ordering( + state, + partitioned_file_lists, + args.preferred_ordering(), + )?; let Some(object_store_url) = self.table_paths.first().map(ListingTableUrl::object_store) else { - return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty())))); + return Ok(ScanResult::new( + Arc::new(EmptyExec::new(Arc::new(Schema::empty()))), + filters.clone(), + )); }; let file_source = self.create_file_source_with_schema_adapter()?; // create the execution plan - self.options + let plan = self + .options .format .create_physical_plan( state, @@ -1248,14 +1422,16 @@ impl TableProvider for ListingTable { .with_file_groups(partitioned_file_lists) .with_constraints(self.constraints.clone()) .with_statistics(statistics) - .with_projection(projection.cloned()) + .with_projection(projection) .with_limit(limit) - .with_output_ordering(output_ordering) + .with_output_ordering(output_ordering.unwrap_or_default()) .with_table_partition_cols(table_partition_cols) .with_expr_adapter(self.expr_adapter_factory.clone()) .build(), ) - .await + .await?; + + Ok(ScanResult::new(plan, filters.clone())) } fn supports_filters_pushdown( diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 0ce5621ac89f..f9ead2a7443a 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -60,6 +60,7 @@ use crate::schema_equivalence::schema_satisfied_by; use arrow::array::{builder::StringBuilder, RecordBatch}; use arrow::compute::SortOptions; use arrow::datatypes::{Schema, SchemaRef}; +use datafusion_catalog::ScanArgs; use datafusion_common::display::ToStringifiedPlan; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeVisitor, @@ -452,6 +453,7 @@ impl DefaultPhysicalPlanner { projection, filters, fetch, + preferred_ordering, .. 
}) => { let source = source_as_provider(source)?; @@ -459,9 +461,14 @@ impl DefaultPhysicalPlanner { // doesn't know (nor should care) how the relation was // referred to in the query let filters = unnormalize_cols(filters.iter().cloned()); - source - .scan(session_state, projection.as_ref(), &filters, *fetch) - .await? + let opts = ScanArgs::default() + .with_projection(projection.clone()) + .with_filters(Some(filters)) + .with_preferred_ordering(preferred_ordering.clone()) + .with_limit(*fetch); + let res = source.scan_with_args(session_state, opts).await?; + // TODO: move FilterExec wrapping logic from the filter pushdown rule to here? + res.plan() } LogicalPlan::Values(Values { values, schema }) => { let exec_schema = schema.as_ref().to_owned().into(); diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 7088f811bbce..08c7741c4ced 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -43,6 +43,7 @@ use arrow::{ }, buffer::Buffer, datatypes::{ArrowNativeType, DataType, Field, Schema, SchemaRef, UInt16Type}, + row::Row, }; use datafusion_common::config::ConfigOptions; use datafusion_common::{ @@ -69,6 +70,41 @@ use datafusion_physical_plan::coop::cooperative; use datafusion_physical_plan::execution_plan::SchedulingType; use log::{debug, warn}; +/// Result of file group partitioning that indicates the ordering properties +/// of the resulting file groups. +#[derive(Debug, Clone)] +pub enum FileGroupPartitioning { + /// Files are globally ordered and non-overlapping across all partitions. + /// This means the entire scan result maintains the requested sort order. + TotalOrder(Vec<FileGroup>), + /// Files are ordered within partitions but may overlap across partitions. + /// Individual partitions maintain sort order but global ordering is not guaranteed. + PartialOrder(Vec<FileGroup>), + /// Files have no specific ordering guarantees. + Unordered(Vec<FileGroup>), +} + +impl FileGroupPartitioning { + /// Extract the file groups regardless of their ordering properties. + pub fn file_groups(self) -> Vec<FileGroup> { + match self { + Self::TotalOrder(groups) + | Self::PartialOrder(groups) + | Self::Unordered(groups) => groups, + } + } + + /// Returns true if the file groups maintain total ordering across all partitions. + pub fn is_total_order(&self) -> bool { + matches!(self, Self::TotalOrder(_)) + } + + /// Returns true if the file groups have some ordering (partial or total). + pub fn is_ordered(&self) -> bool { + matches!(self, Self::TotalOrder(_) | Self::PartialOrder(_)) + } +} +
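A quick sketch of how a caller is expected to branch on this enum (`partitioning` is a hypothetical value of `FileGroupPartitioning`):

    match partitioning {
        FileGroupPartitioning::TotalOrder(_groups) => {
            // the scan can advertise the requested sort order globally
        }
        FileGroupPartitioning::PartialOrder(_groups) => {
            // order holds within each partition; a merge or sort is still needed globally
        }
        FileGroupPartitioning::Unordered(_groups) => {
            // no ordering guarantee; plan a full sort if one is required
        }
    }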
 /// The base configurations for a [`DataSourceExec`], the physical plan for /// any given file format. /// @@ -890,6 +926,215 @@ impl FileScanConfig { .collect()) } + /// Distributes sorted files evenly across the target number of partitions using round-robin. + /// + /// This helper function takes pre-sorted files and distributes them across `target_partitions` + /// partitions in a round-robin fashion, which maintains the relative ordering within each + /// partition while ensuring even distribution. + /// + /// # Arguments + /// * `sorted_files` - Files that have already been sorted (by statistics or path) + /// * `target_partitions` - The desired number of output partitions + /// + /// # Returns + /// A vector of file groups, one per partition, with files distributed evenly + fn distribute_sorted_files_evenly( + sorted_files: Vec<PartitionedFile>, + target_partitions: usize, + ) -> Vec<FileGroup> { + if sorted_files.is_empty() || target_partitions == 0 { + return vec![]; + } + + let mut groups: Vec<Vec<PartitionedFile>> = vec![vec![]; target_partitions]; + + // Round-robin distribution maintaining order + for (i, file) in sorted_files.into_iter().enumerate() { + groups[i % target_partitions].push(file); + } + + // Convert to FileGroups, filtering out empty groups + groups + .into_iter() + .filter(|g| !g.is_empty()) + .map(FileGroup::new) + .collect() + } + + /// Splits file groups by statistics with overlap handling, implementing a three-tier strategy. + /// + /// This method attempts to arrange files in the optimal order for query execution while + /// respecting the target number of partitions. It uses a fallback strategy: + /// + /// 1. **Tier 1 (TotalOrder)**: Try to arrange files as non-overlapping and ordered + /// 2. **Tier 2 (PartialOrder)**: If files overlap, distribute them evenly while maintaining order + /// 3. **Tier 3 (Unordered)**: If no statistics, sort by file path and distribute + /// + /// # Arguments + /// * `table_schema` - Schema of the table for statistics extraction + /// * `file_groups` - Original file groups to split + /// * `sort_order` - The desired lexicographical ordering + /// * `target_partitions` - The desired number of output partitions + /// + /// # Returns + /// A `FileGroupPartitioning` enum indicating the achieved ordering and containing the file groups + pub fn split_groups_by_statistics_with_overlap_handling( + table_schema: &SchemaRef, + file_groups: &[FileGroup], + sort_order: &LexOrdering, + target_partitions: usize, + ) -> Result<FileGroupPartitioning> { + if target_partitions == 0 { + return Err(DataFusionError::Internal( + "target_partitions must be greater than 0".to_string(), + )); + } + + let flattened_files: Vec<_> = + file_groups.iter().flat_map(FileGroup::iter).collect(); + + if flattened_files.is_empty() { + return Ok(FileGroupPartitioning::Unordered(vec![])); + } + + // Try to get statistics - if this fails, use Tier 3 (path-based ordering) + let statistics = match MinMaxStatistics::new_from_files( + sort_order, + table_schema, + None, + flattened_files.iter().copied(), + ) { + Ok(stats) => stats, + Err(e) => { + debug!("Unable to use statistics for file ordering: {e}, falling back to path-based ordering"); + return Ok(Self::arrange_files_by_path( + &flattened_files, + target_partitions, + )); + } + }; + + let indices_sorted_by_min = statistics.min_values_sorted(); + + // Try Tier 1: Non-overlapping arrangement + if let Some(groups) = Self::try_arrange_files_non_overlapping( + &flattened_files, + &statistics, + &indices_sorted_by_min, + target_partitions, + ) { + debug!( + "Successfully arranged {} files in {} non-overlapping groups", + flattened_files.len(), + groups.len() + ); + return Ok(FileGroupPartitioning::TotalOrder(groups)); + } + + // Tier 2: Overlapping arrangement with statistics-based ordering + debug!("Files have overlapping statistics, using overlapping arrangement with {target_partitions} target partitions"); + let groups = Self::arrange_files_with_overlap( + &flattened_files, + &indices_sorted_by_min, + target_partitions, + ); + debug!( + "Arranged {} files in {} groups with 
statistics-based ordering (overlapping)", + flattened_files.len(), + groups.len() + ); + Ok(FileGroupPartitioning::PartialOrder(groups)) + } + + /// Tier 1: Attempts to arrange files without overlap across partitions. + /// Returns Some(groups) if successful, None if files have overlapping statistics. + fn try_arrange_files_non_overlapping( + flattened_files: &[&PartitionedFile], + statistics: &MinMaxStatistics, + indices_sorted_by_min: &[(usize, Row<'_>)], + target_partitions: usize, + ) -> Option<Vec<FileGroup>> { + // Initialize with target_partitions empty groups + let mut file_groups_indices: Vec<Vec<usize>> = vec![vec![]; target_partitions]; + + for (idx, min) in indices_sorted_by_min.iter().copied() { + if let Some((_, group)) = file_groups_indices + .iter_mut() + .enumerate() + .filter(|(_, group)| { + group.is_empty() + || min + > statistics + .max(*group.last().expect("groups should not be empty")) + }) + .min_by_key(|(_, group)| group.len()) + { + group.push(idx); + } else { + // No existing group can fit this file without overlap + return None; + } + } + + // Remove any empty groups + file_groups_indices.retain(|group| !group.is_empty()); + + // Success: files are non-overlapping and fit in target partitions + let groups = file_groups_indices + .into_iter() + .map(|file_group_indices| { + FileGroup::new( + file_group_indices + .into_iter() + .map(|idx| flattened_files[idx].clone()) + .collect(), + ) + }) + .collect(); + + Some(groups) + } + + /// Tier 2: Arranges files with overlapping statistics by distributing them evenly + /// across partitions while maintaining statistical ordering within each partition. + fn arrange_files_with_overlap( + flattened_files: &[&PartitionedFile], + indices_sorted_by_min: &[(usize, Row<'_>)], + target_partitions: usize, + ) -> Vec<FileGroup> { + let sorted_files: Vec<_> = indices_sorted_by_min + .iter() + .map(|(idx, _)| flattened_files[*idx].clone()) + .collect(); + + Self::distribute_sorted_files_evenly(sorted_files, target_partitions) + } + + /// Tier 3: Arranges files without statistics by sorting them by path and distributing evenly. + fn arrange_files_by_path( + flattened_files: &[&PartitionedFile], + target_partitions: usize, + ) -> FileGroupPartitioning { + let mut files_with_paths: Vec<_> = flattened_files + .iter() + .map(|f| (f.path().to_string(), (*f).clone())) + .collect(); + + files_with_paths.sort_by(|a, b| a.0.cmp(&b.0)); + + let sorted_files: Vec<_> = + files_with_paths.into_iter().map(|(_, file)| file).collect(); + + let groups = + Self::distribute_sorted_files_evenly(sorted_files, target_partitions); + debug!( + "Arranged {} files in {} groups ordered by file path (no statistics)", + flattened_files.len(), + groups.len() + ); + FileGroupPartitioning::Unordered(groups) + } + /// Attempts to do a bin-packing on files into file groups, such that any two files /// in a file group are ordered and non-overlapping with respect to their statistics. /// It will produce the smallest number of file groups possible.
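A hedged sketch of driving the three-tier splitter end to end (the `table_schema`, `file_groups`, and `sort_order` bindings are assumed to come from the caller's context, e.g. `ListingTable` above):

    let partitioning = FileScanConfig::split_groups_by_statistics_with_overlap_handling(
        &table_schema, // SchemaRef of the table
        &file_groups,  // &[FileGroup] discovered by listing
        &sort_order,   // &LexOrdering requested by the query
        8,             // target partitions
    )?;
    // Tier 1 yields TotalOrder, Tier 2 PartialOrder, Tier 3 Unordered.
    if partitioning.is_total_order() {
        // safe to advertise the sort order on the scan's output
    }
    let groups = partitioning.file_groups();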
@@ -2429,4 +2674,372 @@ mod tests { Ok(()) } + + #[test] + fn test_try_arrange_files_non_overlapping() -> Result<()> { + use datafusion_common::DFSchema; + use datafusion_expr::{col, execution_props::ExecutionProps}; + + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Float64, + false, + )])); + + let exec_props = ExecutionProps::new(); + let df_schema = DFSchema::try_from_qualified_schema("test", schema.as_ref())?; + let sort_expr = [col("value").sort(true, false)]; + let sort_ordering = sort_expr + .map(|expr| { + create_physical_sort_expr(&expr, &df_schema, &exec_props).unwrap() + }) + .into(); + + // Test case 1: Non-overlapping files should succeed + let non_overlapping_files = generate_test_files(4, 0.0); + let flattened_files: Vec<_> = non_overlapping_files + .iter() + .flat_map(FileGroup::iter) + .collect(); + + let statistics = MinMaxStatistics::new_from_files( + &sort_ordering, + &schema, + None, + flattened_files.iter().copied(), + )?; + let indices_sorted_by_min = statistics.min_values_sorted(); + + let result = FileScanConfig::try_arrange_files_non_overlapping( + &flattened_files, + &statistics, + &indices_sorted_by_min, + 3, + ); + + assert!(result.is_some(), "Non-overlapping files should succeed"); + let groups = result.unwrap(); + assert!(groups.len() <= 3, "Should not exceed target partitions"); + + // Verify total file count is preserved + let total_files: usize = groups.iter().map(FileGroup::len).sum(); + assert_eq!(total_files, flattened_files.len()); + + // Test case 2: Overlapping files should fail + let overlapping_files = generate_test_files(8, 0.9); // High overlap + let flattened_overlapping: Vec<_> = + overlapping_files.iter().flat_map(FileGroup::iter).collect(); + + let overlapping_stats = MinMaxStatistics::new_from_files( + &sort_ordering, + &schema, + None, + flattened_overlapping.iter().copied(), + )?; + let overlapping_indices = overlapping_stats.min_values_sorted(); + + let overlapping_result = FileScanConfig::try_arrange_files_non_overlapping( + &flattened_overlapping, + &overlapping_stats, + &overlapping_indices, + 3, + ); + + // This might succeed or fail depending on the specific overlap pattern + match overlapping_result { + Some(groups) => { + println!( + "✓ Overlapping files surprisingly fit in non-overlapping arrangement" + ); + assert!(groups.len() <= 3); + } + None => { + println!("✓ Overlapping files correctly rejected by non-overlapping arrangement"); + } + } + + println!("✓ try_arrange_files_non_overlapping tests passed"); + Ok(()) + } + + #[test] + fn test_arrange_files_with_overlap() -> Result<()> { + use datafusion_common::DFSchema; + use datafusion_expr::{col, execution_props::ExecutionProps}; + + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Float64, + false, + )])); + + let exec_props = ExecutionProps::new(); + let df_schema = DFSchema::try_from_qualified_schema("test", schema.as_ref())?; + let sort_expr = [col("value").sort(true, false)]; + let sort_ordering = sort_expr + .map(|expr| { + create_physical_sort_expr(&expr, &df_schema, &exec_props).unwrap() + }) + .into(); + + // Test with overlapping files + let overlapping_files = generate_test_files(10, 0.8); + let flattened_files: Vec<_> = + overlapping_files.iter().flat_map(FileGroup::iter).collect(); + + let statistics = MinMaxStatistics::new_from_files( + &sort_ordering, + &schema, + None, + flattened_files.iter().copied(), + )?; + let indices_sorted_by_min = statistics.min_values_sorted(); + + let target_partitions = 3; + let 
result = FileScanConfig::arrange_files_with_overlap( + &flattened_files, + &indices_sorted_by_min, + target_partitions, + ); + + // Should create exactly target_partitions groups + assert_eq!(result.len(), target_partitions); + + // Verify total file count is preserved + let total_files: usize = result.iter().map(FileGroup::len).sum(); + assert_eq!(total_files, flattened_files.len()); + + // Verify files are distributed somewhat evenly + let group_sizes: Vec<usize> = result.iter().map(FileGroup::len).collect(); + let max_size = *group_sizes.iter().max().unwrap(); + let min_size = *group_sizes.iter().min().unwrap(); + let size_diff = max_size - min_size; + + // Difference should not be more than 1 (round-robin distribution) + assert!( + size_diff <= 1, + "Files should be distributed evenly, got sizes: {group_sizes:?}" + ); + + println!("✓ arrange_files_with_overlap tests passed"); + Ok(()) + } + + #[test] + fn test_arrange_files_by_path() { + use crate::PartitionedFile; + use object_store::path::Path; + + // Create files with specific names to test path sorting + let files = vec![ + PartitionedFile::new(Path::from("file_c.parquet"), 1000), + PartitionedFile::new(Path::from("file_a.parquet"), 2000), + PartitionedFile::new(Path::from("file_b.parquet"), 1500), + ]; + + let file_refs: Vec<_> = files.iter().collect(); + let target_partitions = 2; + + let result = FileScanConfig::arrange_files_by_path(&file_refs, target_partitions); + + match result { + FileGroupPartitioning::Unordered(groups) => { + // Should not exceed target partitions + assert!(groups.len() <= target_partitions); + + // Collect all files in the order they appear in groups + let all_files: Vec<_> = groups + .iter() + .flat_map(FileGroup::iter) + .map(|f| f.path().to_string()) + .collect(); + + // The files are sorted before distribution, but round-robin distribution + // means the global order may not be maintained across partitions. + // Instead, verify that all original files are present.
+ let mut sorted_all_files = all_files.clone(); + sorted_all_files.sort(); + let expected_files = vec![ + "file_a.parquet".to_string(), + "file_b.parquet".to_string(), + "file_c.parquet".to_string(), + ]; + assert_eq!( + sorted_all_files, expected_files, + "All files should be present" + ); + + // Verify that within each partition, files maintain relative order if they exist + for group in &groups { + let group_files: Vec<_> = + group.iter().map(|f| f.path().to_string()).collect(); + let mut sorted_group_files = group_files.clone(); + sorted_group_files.sort(); + assert_eq!( + group_files, sorted_group_files, + "Files within each partition should be sorted" + ); + } + + // Verify total count + assert_eq!(all_files.len(), files.len()); + + println!("✓ arrange_files_by_path correctly sorted files by path"); + } + _ => panic!("Expected Unordered result"), + } + + println!("✓ arrange_files_by_path tests passed"); + } + + #[test] + fn test_split_groups_by_statistics_with_overlap_handling_integration() -> Result<()> { + use datafusion_common::DFSchema; + use datafusion_expr::{col, execution_props::ExecutionProps}; + + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Float64, + false, + )])); + + let exec_props = ExecutionProps::new(); + let df_schema = DFSchema::try_from_qualified_schema("test", schema.as_ref())?; + let sort_expr = [col("value").sort(true, false)]; + let sort_ordering = sort_expr + .map(|expr| { + create_physical_sort_expr(&expr, &df_schema, &exec_props).unwrap() + }) + .into(); + + let target_partitions = 3; + + // Test case 1: Non-overlapping files (Tier 1) + let non_overlapping_files = generate_test_files(5, 0.0); + let result = FileScanConfig::split_groups_by_statistics_with_overlap_handling( + &schema, + &non_overlapping_files, + &sort_ordering, + target_partitions, + )?; + + match result { + FileGroupPartitioning::TotalOrder(groups) => { + println!("✓ Non-overlapping files -> TotalOrder"); + assert!(groups.len() <= target_partitions); + } + _ => panic!("Expected TotalOrder for non-overlapping files"), + } + + // Test case 2: Overlapping files (Tier 2) + let overlapping_files = generate_test_files(10, 0.8); + let result = FileScanConfig::split_groups_by_statistics_with_overlap_handling( + &schema, + &overlapping_files, + &sort_ordering, + target_partitions, + )?; + + match result { + FileGroupPartitioning::PartialOrder(groups) => { + println!("✓ Overlapping files -> PartialOrder"); + assert_eq!(groups.len(), target_partitions); + } + FileGroupPartitioning::TotalOrder(groups) => { + println!("✓ Overlapping files fit in TotalOrder"); + assert!(groups.len() <= target_partitions); + } + _ => panic!("Expected PartialOrder or TotalOrder for files with statistics"), + } + + // Test case 3: Files without statistics (Tier 3) + use crate::PartitionedFile; + use object_store::path::Path; + + let files_no_stats = vec![FileGroup::new(vec![ + PartitionedFile::new(Path::from("file1.parquet"), 1000), + PartitionedFile::new(Path::from("file2.parquet"), 2000), + PartitionedFile::new(Path::from("file3.parquet"), 1500), + ])]; + + let result = FileScanConfig::split_groups_by_statistics_with_overlap_handling( + &schema, + &files_no_stats, + &sort_ordering, + target_partitions, + )?; + + match result { + FileGroupPartitioning::Unordered(groups) => { + println!("✓ Files without statistics -> Unordered"); + assert!(groups.len() <= target_partitions); + } + _ => panic!("Expected Unordered for files without statistics"), + } + + // Test case 4: Empty file groups + let 
empty_groups: Vec<FileGroup> = vec![]; + let result = FileScanConfig::split_groups_by_statistics_with_overlap_handling( + &schema, + &empty_groups, + &sort_ordering, + target_partitions, + )?; + + match result { + FileGroupPartitioning::Unordered(groups) => { + assert!(groups.is_empty()); + println!("✓ Empty file groups handled correctly"); + } + _ => panic!("Expected empty Unordered for empty file groups"), + } + + // Test case 5: Zero target partitions + let result = FileScanConfig::split_groups_by_statistics_with_overlap_handling( + &schema, + &non_overlapping_files, + &sort_ordering, + 0, + ); + + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("target_partitions must be greater than 0")); + + println!("✓ Integration tests passed for split_groups_by_statistics_with_overlap_handling"); + Ok(()) + } + + #[test] + fn test_file_group_partitioning_methods() { + // Test the enum helper methods + let groups = vec![FileGroup::new(vec![]), FileGroup::new(vec![])]; + + let total_order = FileGroupPartitioning::TotalOrder(groups.clone()); + let partial_order = FileGroupPartitioning::PartialOrder(groups.clone()); + let unordered = FileGroupPartitioning::Unordered(groups.clone()); + + // Test is_total_order + assert!(total_order.is_total_order()); + assert!(!partial_order.is_total_order()); + assert!(!unordered.is_total_order()); + + // Test is_ordered + assert!(total_order.is_ordered()); + assert!(partial_order.is_ordered()); + assert!(!unordered.is_ordered()); + + // Test file_groups extraction + let extracted_total = total_order.file_groups(); + let extracted_partial = partial_order.file_groups(); + let extracted_unordered = unordered.file_groups(); + + assert_eq!(extracted_total.len(), 2); + assert_eq!(extracted_partial.len(), 2); + assert_eq!(extracted_unordered.len(), 2); + + println!("✓ FileGroupPartitioning helper methods work correctly"); + } } diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 887afd7cde3e..ea879a854b60 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -2525,6 +2525,8 @@ pub struct TableScan { pub filters: Vec<Expr>, /// Optional number of rows to read pub fetch: Option<usize>, + /// Preferred read ordering of the table + pub preferred_ordering: Option<Vec<SortExpr>>, } impl Debug for TableScan { @@ -2643,8 +2645,14 @@ impl TableScan { projected_schema, filters, fetch, + preferred_ordering: None, }) } + + /// Sets the preferred read ordering hint for this scan. + pub fn with_preferred_ordering(mut self, ordering: Option<Vec<SortExpr>>) -> Self { + self.preferred_ordering = ordering; + self + } } // Repartition the plan based on a partitioning scheme.
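A minimal sketch of how a rule or plan builder might attach the new field (the `source` table source and column name are assumptions for illustration):

    use datafusion_expr::{col, logical_plan::TableScan, LogicalPlan};

    // `source: Arc<dyn TableSource>` is assumed to come from elsewhere.
    let scan = TableScan::try_new("t", source, None, vec![], None)?
        .with_preferred_ordering(Some(vec![col("a").sort(true, false)]));
    assert!(scan.preferred_ordering.is_some());
    let plan = LogicalPlan::TableScan(scan);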
@@ -4814,6 +4822,7 @@ mod tests { projected_schema: Arc::clone(&schema), filters: vec![], fetch: None, + preferred_ordering: None, })); let col = schema.field_names()[0].clone(); @@ -4844,6 +4853,7 @@ mod tests { projected_schema: Arc::clone(&unique_schema), filters: vec![], fetch: None, + preferred_ordering: None, })); let col = schema.field_names()[0].clone(); diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index 47088370a1d9..37244ebf9437 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -599,6 +599,7 @@ impl LogicalPlan { projected_schema, filters, fetch, + preferred_ordering, }) => filters.map_elements(f)?.update_data(|filters| { LogicalPlan::TableScan(TableScan { table_name, @@ -607,6 +608,7 @@ impl LogicalPlan { projected_schema, filters, fetch, + preferred_ordering, }) }), LogicalPlan::Distinct(Distinct::On(DistinctOn { diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index 280010e3d92c..8d6088cc9d91 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -58,6 +58,7 @@ pub mod optimizer; pub mod propagate_empty_relation; pub mod push_down_filter; pub mod push_down_limit; +pub mod push_down_sort; pub mod replace_distinct_aggregate; pub mod scalar_subquery_to_join; pub mod simplify_expressions; diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index 97402c990b83..d99543993f65 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -242,6 +242,7 @@ fn optimize_projections( filters, fetch, projected_schema: _, + preferred_ordering, } = table_scan; // Get indices referred to in the original (schema with all fields) @@ -257,6 +258,7 @@ fn optimize_projections( filters, fetch, ) + .map(|s| s.with_preferred_ordering(preferred_ordering)) .map(LogicalPlan::TableScan) .map(Transformed::yes); } diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 49806d6db344..df04f65fbcd9 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -51,6 +51,7 @@ use crate::plan_signature::LogicalPlanSignature; use crate::propagate_empty_relation::PropagateEmptyRelation; use crate::push_down_filter::PushDownFilter; use crate::push_down_limit::PushDownLimit; +use crate::push_down_sort::PushDownSort; use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate; use crate::scalar_subquery_to_join::ScalarSubqueryToJoin; use crate::simplify_expressions::SimplifyExpressions; @@ -243,6 +244,8 @@ impl Optimizer { // Filters can't be pushed down past Limits, we should do PushDownFilter after PushDownLimit Arc::new(PushDownLimit::new()), Arc::new(PushDownFilter::new()), + // Push down sort requirements to TableScan preferred_ordering + Arc::new(PushDownSort::new()), Arc::new(SingleDistinctToGroupBy::new()), // The previous optimizations added expressions and projections, // that might benefit from the following rules diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 27c2499c8a26..50e7d00b7788 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -3055,6 +3055,7 @@ mod tests { projection, source: Arc::new(test_provider), fetch: None, + preferred_ordering: None, }); 
Ok(LogicalPlanBuilder::from(table_scan)) diff --git a/datafusion/optimizer/src/push_down_sort.rs b/datafusion/optimizer/src/push_down_sort.rs new file mode 100644 index 000000000000..38408d3a6bba --- /dev/null +++ b/datafusion/optimizer/src/push_down_sort.rs @@ -0,0 +1,235 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`PushDownSort`] pushes `SORT` requirements into `TableScan` + +use std::sync::Arc; + +use crate::optimizer::ApplyOrder; +use crate::{OptimizerConfig, OptimizerRule}; + +use datafusion_common::tree_node::Transformed; +use datafusion_common::{ExprSchema, Result}; +use datafusion_expr::logical_plan::{LogicalPlan, Sort}; +use datafusion_expr::SortExpr; + +/// Optimization rule that tries to push down `SORT` ordering requirements into `TableScan`. +/// +/// This optimization allows table providers to potentially provide pre-sorted data, +/// eliminating the need for expensive Sort operations. +#[derive(Default, Debug)] +pub struct PushDownSort {} + +impl PushDownSort { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } + + /// Attempts to push sort expressions down to a TableScan + fn try_push_sort_to_table_scan( + sort_exprs: &[SortExpr], + mut plan: LogicalPlan, + ) -> Result<Transformed<LogicalPlan>> { + match plan { + LogicalPlan::TableScan(ref mut scan) => { + // If the table scan already has a preferred ordering, don't override it + if scan.preferred_ordering.is_some() { + return Ok(Transformed::no(plan)); + } + + // Set the preferred ordering on the table scan + scan.preferred_ordering = Some(sort_exprs.to_vec()); + Ok(Transformed::yes(plan)) + } + LogicalPlan::Filter(mut filter) => { + // Recursively try to push down through the filter + let filter_input = Arc::clone(&filter.input); + let input_result = Self::try_push_sort_to_table_scan( + sort_exprs, + Arc::unwrap_or_clone(filter_input), + )?; + if input_result.transformed { + filter.input = Arc::new(input_result.data); + Ok(Transformed::yes(LogicalPlan::Filter(filter))) + } else { + Ok(Transformed::no(LogicalPlan::Filter(filter))) + } + } + LogicalPlan::Projection(mut projection) => { + // For projections, we need to check if the sort expressions are still valid. + // For simplicity, we only push down if all sort expressions reference + // columns that exist in the projection's input + let input_schema = projection.input.schema(); + let sort_is_valid = sort_exprs.iter().all(|sort_expr| { + // Check if the sort expression is a simple column reference + if let datafusion_expr::Expr::Column(col) = &sort_expr.expr { + input_schema.field_from_column(col).is_ok() + } else { + // For non-column expressions, we'd need more sophisticated analysis + false + } + }); + + if sort_is_valid { + let projection_input = Arc::clone(&projection.input); + let input_result = 
Self::try_push_sort_to_table_scan( + sort_exprs, + Arc::unwrap_or_clone(projection_input), + )?; + if input_result.transformed { + projection.input = Arc::new(input_result.data); + return Ok(Transformed::yes(LogicalPlan::Projection(projection))); + } + } + Ok(Transformed::no(LogicalPlan::Projection(projection))) + } + LogicalPlan::SubqueryAlias(mut alias) => { + // Recursively try to push down through the subquery alias + let alias_input = Arc::clone(&alias.input); + let input_result = Self::try_push_sort_to_table_scan( + sort_exprs, + Arc::unwrap_or_clone(alias_input), + )?; + if input_result.transformed { + alias.input = Arc::new(input_result.data); + Ok(Transformed::yes(LogicalPlan::SubqueryAlias(alias))) + } else { + Ok(Transformed::no(LogicalPlan::SubqueryAlias(alias))) + } + } + _ => { + // Cannot push sort through other operations + Ok(Transformed::no(plan)) + } + } + } +} + +impl OptimizerRule for PushDownSort { + fn name(&self) -> &str { + "push_down_sort" + } + + fn apply_order(&self) -> Option<ApplyOrder> { + Some(ApplyOrder::TopDown) + } + + fn supports_rewrite(&self) -> bool { + true + } + + fn rewrite( + &self, + plan: LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result<Transformed<LogicalPlan>> { + let LogicalPlan::Sort(sort) = plan else { + return Ok(Transformed::no(plan)); + }; + + // Try to push the sort requirements down to a table scan + let sort_exprs = sort.expr.clone(); + let sort_input = Arc::clone(&sort.input); + let sort_fetch = sort.fetch; + + let input_result = Self::try_push_sort_to_table_scan( + &sort_exprs, + Arc::unwrap_or_clone(sort_input), + )?; + + if input_result.transformed { + // If we successfully pushed the sort down, we can potentially eliminate the sort. + // For now, we keep the sort for safety, but in a full implementation + // we could eliminate it if we're confident the table provider will provide sorted data + Ok(Transformed::yes(LogicalPlan::Sort(Sort { + expr: sort_exprs, + input: Arc::new(input_result.data), + fetch: sort_fetch, + }))) + } else { + // Could not push down the sort + Ok(Transformed::no(LogicalPlan::Sort(sort))) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test::*; + use crate::OptimizerContext; + use datafusion_expr::{col, logical_plan::builder::LogicalPlanBuilder}; + + fn optimize(plan: LogicalPlan) -> Result<LogicalPlan> { + let rule = PushDownSort::new(); + let config = &OptimizerContext::new(); + let result = rule.rewrite(plan, config)?; + Ok(result.data) + } + + #[test] + fn test_sort_table_scan() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .sort(vec![col("a").sort(true, true)])? + .build()?; + + let optimized = optimize(plan)?; + + // The optimized plan should have the sort pushed down to the table scan + match &optimized { + LogicalPlan::Sort(sort) => match sort.input.as_ref() { + LogicalPlan::TableScan(scan) => { + assert!(scan.preferred_ordering.is_some()); + assert_eq!(scan.preferred_ordering.as_ref().unwrap().len(), 1); + } + _ => panic!("Expected TableScan after Sort"), + }, + _ => panic!("Expected Sort at root"), + } + + Ok(()) + } + + #[test] + fn test_sort_through_filter() -> Result<()> { + let table_scan = test_table_scan()?; + let plan = LogicalPlanBuilder::from(table_scan) + .filter(col("a").gt(col("b")))? + .sort(vec![col("c").sort(true, true)])? 
+ .build()?; + + let optimized = optimize(plan)?; + + // Should push sort through filter to table scan + match &optimized { + LogicalPlan::Sort(sort) => match sort.input.as_ref() { + LogicalPlan::Filter(filter) => match filter.input.as_ref() { + LogicalPlan::TableScan(scan) => { + assert!(scan.preferred_ordering.is_some()); + } + _ => panic!("Expected TableScan after Filter"), + }, + _ => panic!("Expected Filter after Sort"), + }, + _ => panic!("Expected Sort at root"), + } + + Ok(()) + } +} diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 80f1de5a5b6f..dc2e12b6cd80 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -290,6 +290,12 @@ impl SharedBoundsAccumulator { // Combine all column predicates for this partition with AND if !column_predicates.is_empty() { + #[cfg(debug_assertions)] + { + // Sort predicates for consistent ordering in debug builds + column_predicates + .sort_by_cached_key(|expr| format!("{}", fmt_sql(expr.as_ref()))); + } let partition_predicate = column_predicates .into_iter() .reduce(|acc, pred| { diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 576a51707c96..0cd4b4bf9279 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -271,6 +271,7 @@ fn from_table_source( projected_schema, filters: vec![], fetch: None, + preferred_ordering: None, }); LogicalPlanNode::try_from_logical_plan(&r, extension_codec) diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index cccffe08ad7c..05803270c7c8 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -195,6 +195,7 @@ logical_plan after filter_null_join_keys SAME TEXT AS ABOVE logical_plan after eliminate_outer_join SAME TEXT AS ABOVE logical_plan after push_down_limit SAME TEXT AS ABOVE logical_plan after push_down_filter SAME TEXT AS ABOVE +logical_plan after push_down_sort SAME TEXT AS ABOVE logical_plan after single_distinct_aggregation_to_group_by SAME TEXT AS ABOVE logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE @@ -217,6 +218,7 @@ logical_plan after filter_null_join_keys SAME TEXT AS ABOVE logical_plan after eliminate_outer_join SAME TEXT AS ABOVE logical_plan after push_down_limit SAME TEXT AS ABOVE logical_plan after push_down_filter SAME TEXT AS ABOVE +logical_plan after push_down_sort SAME TEXT AS ABOVE logical_plan after single_distinct_aggregation_to_group_by SAME TEXT AS ABOVE logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE diff --git a/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt b/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt index fe909e70ffb0..e675a9a8cd49 100644 --- a/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt +++ b/datafusion/sqllogictest/test_files/parquet_sorted_statistics.slt @@ -275,4 +275,4 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [constant_col@0 ASC NULLS LAST] 02)--SortExec: expr=[constant_col@0 ASC NULLS LAST], preserve_partitioning=[true] -03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, 
WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet]]}, projection=[constant_col], file_type=parquet +03)----DataSourceExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=A/0.parquet, WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=C/2.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_sorted_statistics/test_table/partition_col=B/1.parquet]]}, projection=[constant_col], file_type=parquet