
Conversation

@adriangb (Contributor) commented Aug 21, 2025:

I think the refactor to scan() is overall positive: there's already several arguments, it's conceivable we want to add more or allow the TableProvider to return more information.
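
For context, a simplified sketch of the shapes involved, assembled from fragments quoted later in this thread (the exact field names, types, and visibility are assumptions and may differ from the PR):

use std::sync::Arc;

use datafusion::logical_expr::{Expr, SortExpr};
use datafusion::physical_plan::ExecutionPlan;

// Bundles what `scan` takes as separate parameters, so new inputs (like a
// preferred ordering) can be added later without breaking the trait.
pub struct ScanArgs {
    pub projection: Option<Vec<usize>>,
    pub filters: Option<Vec<Expr>>,
    pub limit: Option<usize>,
    pub preferred_ordering: Option<Vec<SortExpr>>,
}

// Lets the provider return more than a bare plan, e.g. which filters it could
// not evaluate exactly and the planner must re-apply on top of `plan`.
pub struct ScanResult {
    pub plan: Arc<dyn ExecutionPlan>,
    pub filters: Vec<Expr>,
}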

@github-actions bot added labels logical-expr, optimizer, core, sqllogictest, catalog, proto on Aug 21, 2025
@adriangb (Contributor, Author):

Lots of warnings to resolve, and I'm not sure my handling of Vec<Vec<SortExpr>> is correct... but I wanted to post to get the initial sketch out there.

@github-actions bot added labels ffi, physical-plan on Aug 21, 2025
@@ -138,6 +138,7 @@ impl MemTable {
    ) -> Result<Self> {
        let schema = t.schema();
        let constraints = t.constraints();
        #[expect(deprecated)]
@adriangb (Contributor, Author):

We could just not deprecate the existing method to (1) reduce the diff here and (2) make sure we want to commit to the new method before deprecating the old one

} = args;
let filters = filters.unwrap_or_default();
let unsupported_filters = self
    .supports_filters_pushdown(&filters.iter().collect_vec())?
@adriangb (Contributor, Author):

I'm tempted to also do something about supports_filters_pushdown. I think it could be folded into scan() (and this PR somewhat already sets it up that way, since scan_with_args gets to return unsupported/inexact filters).

I would rework the filter pushdown optimizer so that it pushes all filters into the TableScan structure (without caring about supported / unsupported), and then, when we go from logical to physical plan, we apply a FilterExec for any non-exact filters and a ProjectionExec for the projections.
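
A minimal sketch of that planner-side step, assuming a helper that runs after scan_with_args returns (names and placement are illustrative, not the PR's actual code):

use std::sync::Arc;

use datafusion::common::{DFSchema, Result};
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_expr::Expr;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_plan::{filter::FilterExec, ExecutionPlan};

/// Wrap any filters the provider reported as non-exact in a FilterExec, so
/// correctness never depends on what the provider actually pushed down.
fn apply_remaining_filters(
    plan: Arc<dyn ExecutionPlan>,
    remaining: Vec<Expr>,
) -> Result<Arc<dyn ExecutionPlan>> {
    // Fold the remaining predicates into a single conjunction.
    let Some(predicate) = remaining.into_iter().reduce(Expr::and) else {
        return Ok(plan); // nothing left to apply
    };
    let df_schema = DFSchema::try_from(plan.schema())?;
    let physical =
        create_physical_expr(&predicate, &df_schema, &ExecutionProps::new())?;
    Ok(Arc::new(FilterExec::try_new(physical, plan)?))
}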

@alamb (Contributor):

I recommend doing this as a follow-on PR -- and the key consideration would be how disruptive it would be to existing users vs. the benefit to existing and new users.

Comment on lines 1221 to 1233
let known_file_ordering = self.try_create_output_ordering()?;
let desired_file_ordering = match args.preferred_ordering() {
    Some(ordering) if !ordering.is_empty() => {
        // Prefer the ordering requested by the query to any inherent file ordering
        create_ordering(&self.table_schema, &[ordering.to_vec()])?
            .first()
            .cloned()
    }
    Some(_) | None => {
        // If the query did not request a specific ordering, fall back to any inherent file ordering
        known_file_ordering.first().cloned()
    }
};
@adriangb (Contributor, Author) commented Aug 21, 2025:

Not too sure about correctness here. The Vec<Vec<SortExpr>> is a bit wonky. I think the answer is to add tests / benchmarks proving that the "correct" ordering for files is being chosen.

@github-actions bot removed label ffi on Aug 22, 2025
@adriangb (Contributor, Author) commented Aug 22, 2025:

The actual sort optimization is not working. split_groups_by_statistics_with_target_partitions does things I do not understand: it is not putting out groups that are internally sorted. It seems it was all designed for the use case of avoiding sorts altogether with non-overlapping statistics, i.e. the ProgressiveEval use case, which as far as I know never fully materialized in DataFusion.

@adriangb (Contributor, Author):

Okay, this mostly works, except that split_groups_by_statistics_with_target_partitions does not respect target partitions and instead tries to force files into groups so that the ranges don't overlap. @alamb this seems like an over-specialization for the InfluxDB use case, but maybe it is a more widespread use case than I am considering. The tradeoffs of forcing that behavior vs. the alternative (e.g. making a single group where all of the files are sorted but have overlapping ranges) are not clear to me.

This test illustrates what happens:

copy (select i from generate_series(1, 10) as t(i)) to 'data/sort/t10.parquet';
copy (select i from generate_series(9, 20) as t(i)) to 'data/sort/t9.parquet';
copy (select i from generate_series(19, 30) as t(i)) to 'data/sort/t8.parquet';
copy (select i from generate_series(29, 40) as t(i)) to 'data/sort/t7.parquet';
copy (select i from generate_series(39, 50) as t(i)) to 'data/sort/t6.parquet';
copy (select i from generate_series(49, 60) as t(i)) to 'data/sort/t5.parquet';
copy (select i from generate_series(59, 70) as t(i)) to 'data/sort/t4.parquet';
copy (select i from generate_series(69, 80) as t(i)) to 'data/sort/t3.parquet';
copy (select i from generate_series(79, 90) as t(i)) to 'data/sort/t2.parquet';
copy (select i from generate_series(89, 100) as t(i)) to 'data/sort/t1.parquet';
copy (select i from generate_series(99, 110) as t(i)) to 'data/sort/t0.parquet';
SET datafusion.execution.target_partitions = '1';
SET datafusion.execution.split_file_groups_by_statistics = 'true';
create external table t stored as parquet location 'data/sort/';
explain analyze
select *
from t
order by i asc
limit 1;
  1. split_groups_by_statistics_with_target_partitions will split these up into 2 groups (roughly [(1, 10), (19, 30), ...] and [(9, 20), (29, 40), ...]):

     for (idx, min) in indices_sorted_by_min {
         if let Some((_, group)) = file_groups_indices
             .iter_mut()
             .enumerate()
             .filter(|(_, group)| {
                 group.is_empty()
                     || min
                         > statistics
                             .max(*group.last().expect("groups should not be empty"))
             })
             .min_by_key(|(_, group)| group.len())
         {
             group.push(idx);
         } else {
             // Create a new group if no existing group fits
             file_groups_indices.push(vec![idx]);
         }
     }

  2. Because we have 2 groups and 1 target partition, this line gets hit and we fall back to ordering by file path:

     log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered")
That said, we (Pydantic) don't use that function (nor do we use ListingTable), so just the refactor to make the sorting information available to TableProvider is good enough for us. To get that optimization to everyone using ListingTable / datafusion-cli, I think we'll have to think about how hardcoded the assumption of wanting non-overlapping ranges is.

@adriangb (Contributor, Author):

I think one proposal that may work is:

  1. Try to arrange the files as non-overlapping, ordered groups across the number of partitions requested.
  2. If that is not possible (there is overlap), simply place them, ordered, into the number of partitions requested.
  3. If that is not possible (e.g. no statistics), order them by some deterministic property (file path).

I believe that keeps backwards compatibility in all cases, but unlocks case (2), which will be beneficial to many. A sketch of this fallback chain follows.
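
A minimal sketch of that fallback chain; the two helpers are hypothetical names for illustration, not existing DataFusion APIs:

use datafusion::datasource::listing::PartitionedFile;

// Hypothetical: split into at most `target_partitions` groups whose value
// ranges are disjoint; None if statistics are missing or overlap prevents it.
fn try_split_non_overlapping(
    _files: &[PartitionedFile],
    _target_partitions: usize,
) -> Option<Vec<Vec<PartitionedFile>>> {
    unimplemented!()
}

// Hypothetical: groups may overlap, but each group is internally sorted by
// min value; None if statistics are missing.
fn try_split_preserving_order(
    _files: &[PartitionedFile],
    _target_partitions: usize,
) -> Option<Vec<Vec<PartitionedFile>>> {
    unimplemented!()
}

fn plan_file_groups(
    mut files: Vec<PartitionedFile>,
    target_partitions: usize,
) -> Vec<Vec<PartitionedFile>> {
    // 1. Best case: disjoint ranges, so each partition streams without merging.
    if let Some(groups) = try_split_non_overlapping(&files, target_partitions) {
        return groups;
    }
    // 2. Ranges overlap: still emit internally sorted groups, so downstream a
    //    merge (rather than a full sort) suffices.
    if let Some(groups) = try_split_preserving_order(&files, target_partitions) {
        return groups;
    }
    // 3. No usable statistics: deterministic fallback, ordered by file path.
    files.sort_by(|a, b| a.object_meta.location.cmp(&b.object_meta.location));
    let chunk = files.len().div_ceil(target_partitions.max(1)).max(1);
    files.chunks(chunk).map(|c| c.to_vec()).collect()
}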

}

/// Attempts to push sort expressions down to a TableScan
fn try_push_sort_to_table_scan(
@adriangb (Contributor, Author):

This is another place to pay attention to.

@alamb (Contributor) commented Aug 24, 2025:

I am out on vacation this coming week, but I'll be around and will try to review this PR; it may take me a few days.

@alamb (Contributor) commented Aug 24, 2025:

I can't remember who, but there are other people who have asked about pushing ORDER BY down into scans and might be interested as well.

Maybe @suremarc remembers 🤔

@github-actions bot added label datasource on Aug 25, 2025
@adriangb changed the title from "[DRAFT]: refactor TableProvider::scan to push down sorting" to "Refactor TableProvider::scan to push down sorting" on Aug 25, 2025
@adriangb (Contributor, Author) commented Aug 25, 2025:

I think one proposal that may work is:

  1. Try to arrange the files as non-overlapping, ordered groups across the number of partitions requested.
  2. If that is not possible (there is overlap), simply place them, ordered, into the number of partitions requested.
  3. If that is not possible (e.g. no statistics), order them by some deterministic property (file path).

I believe that keeps backwards compatibility in all cases, but unlocks case (2), which will be beneficial to many.

I've now implemented this, but the diff is much larger (full disclosure: I gave Claude the plan and coached it through some mistakes, but had it do most of the work; the output looks good to me in general, though it needs more human review by me and others). I propose that if we want to move forward, this top-level PR get some review, and if we agree on the big picture I'll split it up into:

  1. Refactor scan -> scan_with_args
  2. Implement the sort pushdown rule and add the field to TableScan
  3. Update ListingTable to use this preferred sort order
  4. Read the sort order of the files at the same time as we read stats. May need to refactor some traits from collect_stats to collect_file_properties or something like that. Should be free for parquet (the order is in the metadata, which we already pull down to get stats); see the sketch below. (This work is not in this PR)
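
For step 4, a minimal sketch of recovering a file-level sort order from parquet metadata, assuming the writer recorded sorting_columns (mapping the column indices back to SortExprs is omitted):

use parquet::file::metadata::ParquetMetaData;
use parquet::format::SortingColumn;

// If every row group declares the same sorting columns, the whole file can be
// treated as sorted by them; otherwise report no usable ordering.
fn file_sort_order(meta: &ParquetMetaData) -> Option<Vec<SortingColumn>> {
    let mut row_groups = meta.row_groups().iter().map(|rg| rg.sorting_columns());
    let first = row_groups.next()??.clone();
    row_groups.all(|sc| sc == Some(&first)).then_some(first)
}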

I can make an [EPIC] issue to track this if we agree on the plan.

@xudong963 self-requested a review on August 25, 2025
@alamb changed the title from "Refactor TableProvider::scan to push down sorting" to "Add TableProvider::scan_with_args to support pushdown sorting" on Aug 27, 2025
@alamb (Contributor) left a comment:

Thank you for this PR @adriangb

There are multiple things going on in this PR:

  1. Update TableProvider API
  2. Pushdown sorts
  3. Rework how filter pushdown works during physical planning

It might help to split them up into separate PRs

Also in terms of pushdown sort, I feel like it would really help to add an example of how to use it, or even better find some way to use that information to improve one of the built-in features of DataFusion (so more users could benefit).

However, I can't think of a good improvement at the moment.

Maybe just an example showing how to use it in a custom table provider / extend the existing "custom file format" example.

@@ -171,6 +172,34 @@ pub trait TableProvider: Debug + Sync + Send {
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>>;

    async fn scan_with_args(
@alamb (Contributor):

I think this is a great new API and makes a lot of sense as it allows us to iterate / expand the scan API over time with less API disruption

It is also consistent with ScalarUDFImpl::invoke_with_args

To merge this PR I think we should:

  1. Document this function (perhaps move the docs from scan here, and then direct people to use scan_with_args for new TableProviders)
  2. Migrate all existing code in DataFusion to use scan_with_args
  3. Consider deprecating scan

(maybe we can do this as a follow on PR)
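
To make the compatibility story concrete, here is a sketch of how a default scan_with_args could bridge to the existing scan; it reuses the hypothetical ScanArgs / ScanResult field names from the sketch earlier in this thread:

// Inside `trait TableProvider`, as a provided default method (simplified,
// error handling and edge cases omitted):
async fn scan_with_args(
    &self,
    state: &dyn Session,
    args: ScanArgs,
) -> Result<ScanResult> {
    let filters = args.filters.unwrap_or_default();
    // Ask the provider which of the filters it can evaluate exactly.
    let pushdown =
        self.supports_filters_pushdown(&filters.iter().collect::<Vec<_>>())?;
    // Anything not `Exact` must be reported back so the planner re-applies it.
    let mut remaining = Vec::new();
    for (expr, support) in filters.iter().zip(pushdown) {
        if !matches!(support, TableProviderFilterPushDown::Exact) {
            remaining.push(expr.clone());
        }
    }
    // Delegate to the old entry point so existing providers keep working.
    let plan = self
        .scan(state, args.projection.as_ref(), &filters, args.limit)
        .await?;
    Ok(ScanResult { plan, filters: remaining })
}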

    plan: Arc<dyn ExecutionPlan>,
    // Remaining filters that were not completely evaluated during `scan_with_args()`.
    // These were previously referred to as "unsupported filters" or "inexact filters".
    filters: Vec<Expr>,
@alamb (Contributor):

I think this is confusing as the "can the filters be supported" is currently a separate API. I think we should have one or the other but not both.

I realize that scan_with_args is basically the "create the appropriate physical plan" step, so logically it makes sense here.

@adriangb (Contributor, Author):

There are multiple things going on in this PR:

  1. Update TableProvider API
  2. Pushdown sorts
  3. Rework how filter pushdown works during physical planning

It might help to split them up into separate PRs

Yep agreed, that's why I laid out a plan in #17273 (comment):

  1. Refactor scan -> scan_with_args
  2. Implement the sort pushdown rule and add the field to TableScan
  3. Update ListingTable to use this preferred sort order
  4. Read the sort order of the files at the same time as we read stats. May need to refactor some traits from collect_stats to collect_file_properties or something like that. Should be free for parquet (the order is in the metadata, which we already pull down to get stats). (This work is not in this PR)

Also in terms of pushdown sort, I feel like it would really help to add an example of how to use it, or even better find some way to use that information to improve one of the built-in features of DataFusion (so more users could benefit)

Agreed as well, that's why I've kept this "larger" PR so we can see the e2e change. My hope is that this e2e change can demonstrate a performance improvement using datafusion-cli as the "example".

It sounds like you're generally on board with the vision here, so I'll start the work of splitting this PR up.

adriangb added a commit to pydantic/datafusion that referenced this pull request Aug 27, 2025
This commit adds a new optional field `preferred_ordering` to the `TableScan`
logical plan node to support sort pushdown optimizations.

Changes include:
- Add `preferred_ordering: Option<Vec<SortExpr>>` field to `TableScan` struct
- Add `try_new_with_preferred_ordering` constructor method
- Update all `TableScan` constructors throughout the codebase to include the new field
- Update `Debug`, `PartialEq`, `Hash`, and `PartialOrd` implementations
- Update pattern matching in optimizer and other modules

The preferred_ordering field is currently not used by any optimization rules
but provides the foundation for future sort pushdown implementations.

This is part 2 of 2 PRs split from apache#17273 as requested in
apache#17273 (comment)

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <[email protected]>
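
Abbreviated, the shape the commit message describes (the real struct has more fields; only the new one is shown):

pub struct TableScan {
    // ... existing fields: table_name, source, projection, filters, fetch ...
    /// Ordering the query would like the scan to produce, if the provider can
    /// honor it; None means the query expressed no preference.
    pub preferred_ordering: Option<Vec<SortExpr>>,
}
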
@adriangb (Contributor, Author):

@alamb I've started with #17336 and #17337, two relatively small independent PRs that build the foundation for this change

@adriangb (Contributor, Author):

I'll close this PR for now and focus on the breakouts

@adriangb closed this on Aug 27, 2025
@adriangb (Contributor, Author):

I've made an epic to track this work: #17348

@alamb (Contributor) commented Sep 4, 2025:

Thanks -- sorry I have been away. I am back now and will be working through the review backlog

adriangb added a commit to pydantic/datafusion that referenced this pull request Sep 7, 2025
adriangb added a commit to pydantic/datafusion that referenced this pull request Sep 9, 2025
Labels: catalog, core, datasource, logical-expr, optimizer, physical-plan, proto, sqllogictest

Successfully merging this pull request may close the issue: Control ordering of file opened based on statistics