Conversation

alamb
Contributor

@alamb alamb commented Aug 22, 2025

📰 See rendered preview here: https://datafusion.staged.apache.org/blog/2025/09/10/dynamic-filters/ 📰

Notes:
This is based on @adriangb's PR in #102, but is hosted in the apache/datafusion-site fork so that:

  1. Other people can push commits to it
  2. It benefits from the staging site, as described here

@alamb alamb mentioned this pull request Aug 22, 2025
@alamb
Contributor Author

alamb commented Aug 22, 2025

@adriangb an update

  1. I added several diagrams to describe TopK and the background
  2. I rewrote some of the intro and restructured the flow so it builds up the explanation rather than following the implementation timeline
  3. I moved some of the content around

This has resulted in a bit of a Frankenstein 👹 at the moment -- I will fix it up shortly

You can see the current setup here: https://datafusion.staged.apache.org/blog/2025/09/01/dynamic-filters/

Question: Do you mind if I add myself as the second author?

@alamb
Contributor Author

alamb commented Aug 22, 2025

[Screenshot 2025-08-22 at 11:46:27 AM]

@adriangb
Contributor

Question: Do you mind if I add myself as the second author?

You're making extensive edits, you should add yourself 😄

@adriangb
Contributor

FWIW I like the new figures, but I also like the idea of annotating a plan that looks ~like the output from running `explain` on a query.

@alamb
Contributor Author

alamb commented Aug 22, 2025

FWIW I like the new figures, but I also like the idea of annotating a plan that looks ~like the output from running `explain` on a query.

Yes, absolutely -- I think there is room for both. Basically, I want to write an "everyman's explanation" of TopK/dynamic filters and then show how that translates into DataFusion.

@adriangb
Contributor

Sounds like a good plan to me!

@alamb
Contributor Author

alamb commented Aug 24, 2025

I absolutely have reviewing / working on this article on my list this week, but it will likely take me a few days

@alamb
Contributor Author

alamb commented Sep 4, 2025

I am very sorry for the delay -- I have been away, but plan to keep working on this post now that I have returned.

Thank you for your patience

@XiangpengHao
Contributor

Love this blog! Thank you @adriangb and @alamb for the hard work and putting it together -- easily my favorite read of the year!

@alamb
Contributor Author

alamb commented Sep 9, 2025

I plan to publish this blog post tomorrow unless there are further comments


@djanderson djanderson left a comment


Love this post, really cool.

Contributor

@nuno-faria nuno-faria left a comment


Really great read!

@comphead
Contributor

comphead commented Sep 9, 2025

@adriangb @alamb what is still unclear to me is how the dynamic filter gets the min value. Is it, under the hood, by scanning stats, let's say Parquet file footers or histograms?

@adriangb
Contributor

adriangb commented Sep 9, 2025

@adriangb @alamb what is still unclear to me is how the dynamic filter gets the min value. Is it, under the hood, by scanning stats, let's say Parquet file footers or histograms?

We can probably improve the wording, but:

  1. For the TopK operator, it comes from the TopK heap.
  2. For joins, we accumulate min/max values of the join keys for each partition as we build the build side.
    So the values come from the data itself.
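To make point 2 concrete, here is a minimal sketch in Rust of accumulating min/max bounds of a join key over build-side batches. This is illustrative code, not DataFusion's implementation; the function name and the use of plain `Vec<i64>` batches are assumptions for the example.

```rust
// Hypothetical sketch: accumulate min/max bounds of a join key as
// build-side batches arrive (not the actual DataFusion code).
fn accumulate_min_max(batches: &[Vec<i64>]) -> Option<(i64, i64)> {
    let mut bounds: Option<(i64, i64)> = None;
    for batch in batches {
        for &v in batch {
            bounds = Some(match bounds {
                None => (v, v),                        // first value seen
                Some((lo, hi)) => (lo.min(v), hi.max(v)),
            });
        }
    }
    bounds
}
```

Once the build side is complete, bounds like these could seed a dynamic filter of the form `key >= min AND key <= max` on the probe-side scan.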

@comphead
Contributor

comphead commented Sep 9, 2025

  1. For the TopK operator, it comes from the TopK heap.
  2. For joins, we accumulate min/max values of the join keys for each partition as we build the build side.
    So the values come from the data itself.

Maybe it requires slightly more detail for the reader. I'm still trying to grasp the idea. 🤔
First of all, having the filter makes a lot of sense, as we do not scan what's unnecessary.

However, to get the filter value (it doesn't have to be super accurate, just close enough to reduce the reading scope), it would be possible to run select min(ts) from t1 first; this refers to a single column, which might be cheap (even cheaper if min/max can be derived from the footer), and then apply the value to the TopK filter.

For the heap, though, the algorithm is still not clear to me. How does it make sure we don't need to scan 100M rows as before? Is it for any scenario, or only when the underlying files' data is sorted? If the heap stores the TopK, does it still need to read all the rows? Is the benefit that we don't pay for a full sort, just for rebuilding a heap?

@alamb
Contributor Author

alamb commented Sep 10, 2025

However, to get the filter value (it doesn't have to be super accurate, just close enough to reduce the reading scope), it would be possible to run select min(ts) from t1 first; this refers to a single column, which might be cheap (even cheaper if min/max can be derived from the footer), and then apply the value to the TopK filter.

I think one major idea is to reuse state / information that is already present in the operators -- so for example the TopK operator already has a topK heap, and the dynamic filter concept allows this information to be passed down to the scan.

How does it make sure we don't need to scan 100M rows as before? Is it for any scenario, or only when the underlying files' data is sorted?

I don't think the dynamic filter has any guarantee that it will filter rows -- for example, in the pathological case where the data is scanned in exactly the reverse order, it will not filter any rows.

However, the idea is that updating the dynamic filter is cheap and it does help in many real-world settings, so overall it is a good optimization.

@adriangb
Contributor

Besides, scanning the data in precisely the reverse order of the query is bad, dynamic filters or not, and we should fix that

@alamb
Contributor Author

alamb commented Sep 10, 2025

I am going to incorporate the feedback from @comphead and @nuno-faria in the next few hours

@alamb
Contributor Author

alamb commented Sep 10, 2025

Besides, scanning the data in precisely the reverse order of the query is bad, dynamic filters or not, and we should fix that

If it is known that the data is in the wrong order, for sure. I am not sure DataFusion always knows how data is distributed across files.

Another potentially pathological case is when the data is randomly distributed throughout the files (so no files or row groups can be pruned).

@alamb
Contributor Author

alamb commented Sep 10, 2025

@adriangb @alamb what is still unclear to me is how the dynamic filter gets the min value. Is it, under the hood, by scanning stats, let's say Parquet file footers or histograms?

@comphead -- the min value is the minimum value of what has already been seen during query execution.

  • When the query starts, there is no min value.
  • As data flows through and the TopK operator starts updating the heap, the min value is set.
  • As more data flows through and the heap is refined (new values potentially replace existing values), the min value can be updated as well.

The TopK operator heap is here:
https://github.com/apache/datafusion/blob/ab108a50d75e4e12fb6ebbfac0d0bffa24c265ea/datafusion/physical-plan/src/topk/mod.rs#L119
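As a rough illustration of those bullet points (a hypothetical sketch, not the actual DataFusion implementation linked above): for a query like `ORDER BY ts DESC LIMIT k`, the TopK operator can keep a min-heap of the k largest values seen so far; the heap's minimum is exactly the dynamic filter's min value, and it only tightens as more data flows through.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Hypothetical sketch of a TopK heap producing a dynamic filter value.
struct TopK {
    k: usize,
    heap: BinaryHeap<Reverse<i64>>, // min-heap of the k largest values seen
}

impl TopK {
    fn new(k: usize) -> Self {
        TopK { k, heap: BinaryHeap::new() }
    }

    fn insert(&mut self, v: i64) {
        if self.heap.len() < self.k {
            self.heap.push(Reverse(v));
        } else if v > self.heap.peek().unwrap().0 {
            // v beats the current k-th largest value: replace it.
            self.heap.pop();
            self.heap.push(Reverse(v));
        }
    }

    // The dynamic filter's min value: None until the heap is full,
    // after which the scan can skip any row with ts <= min_value.
    fn min_value(&self) -> Option<i64> {
        if self.heap.len() == self.k {
            self.heap.peek().map(|r| r.0)
        } else {
            None
        }
    }
}
```

For example, with k = 3, after seeing 10, 5, 8, 1, 20, 2 the heap holds {8, 10, 20} and the filter becomes `ts > 8`.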

@alamb
Contributor Author

alamb commented Sep 10, 2025

I am happy to keep making clarifications etc. on tickets and this blog in follow-on PRs. But for now, let's publish it, as I have already delayed it by several weeks and I would like it to be available before the NYC meetup next week

Onwards 🚀

@alamb alamb merged commit f69c5cc into main Sep 10, 2025
1 check passed
@comphead
Contributor

@adriangb @alamb what is still unclear to me is how the dynamic filter gets the min value. Is it, under the hood, by scanning stats, let's say Parquet file footers or histograms?

@comphead -- the min value is the minimum value of what has already been seen during query execution.

  • When the query starts, there is no min value.
  • As data flows through and the TopK operator starts updating the heap, the min value is set.
  • As more data flows through and the heap is refined (new values potentially replace existing values), the min value can be updated as well.

The TopK operator heap is here: https://github.com/apache/datafusion/blob/ab108a50d75e4e12fb6ebbfac0d0bffa24c265ea/datafusion/physical-plan/src/topk/mod.rs#L119

Thanks @alamb, I was referring to the statement

Figure 3 is better, but it still reads and decodes all 100M rows of the hits table, which is often unnecessary once we have found the top 10 rows

which implies the optimization doesn't have to read and decode 100M rows to get the top 10, and I cannot see how exactly this is happening 🤔 It makes sense with the filter, but to get the min value for the filter we still need a full scan; that is something I'm still missing. Let's go ahead, yes, thanks for the explanations

@alamb alamb deleted the site/dynamic-filters branch September 10, 2025 16:15
@alamb
Contributor Author

alamb commented Sep 10, 2025

It makes sense with the filter, but to get the min value for the filter we still need a full scan; that is something I'm still missing. Let's go ahead, yes, thanks for the explanations

Let's take the best case, which is:

  • after reading the first batch from the first file, DataFusion has read the actual minimum value

While it is true DataFusion still needs to check all remaining files to ensure this is actually the minimum value, it may not have to actually open, read, and decode the rows in each file -- for example, it could potentially prune (skip) all remaining files using statistics. And even if it can't prune out an entire file, it may be able to prune row groups, or ranges of rows (if pushdown_filters is turned on)
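Here is a minimal sketch of that statistics-based pruning step (illustrative names, not DataFusion's actual pruning APIs): given a dynamic filter `ts > cutoff`, any file whose `max_ts` statistic is at or below the cutoff cannot contribute a top-k row and can be skipped without being opened or decoded.

```rust
// Hypothetical per-file statistics, e.g. from a Parquet footer.
struct FileStats {
    min_ts: i64, // unused here, but typically stored alongside max_ts
    max_ts: i64,
}

// Return the indices of files that must still be scanned under the
// dynamic filter `ts > cutoff` (all files when no filter exists yet).
fn files_to_scan(files: &[FileStats], cutoff: Option<i64>) -> Vec<usize> {
    files
        .iter()
        .enumerate()
        .filter(|(_, f)| match cutoff {
            None => true,            // no filter yet: scan everything
            Some(c) => f.max_ts > c, // keep only files that may match
        })
        .map(|(i, _)| i)
        .collect()
}
```

The same check can be repeated at finer granularities (row groups, then ranges of rows) as the dynamic filter tightens during execution.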

@comphead
Contributor

It makes sense with the filter, but to get the min value for the filter we still need a full scan; that is something I'm still missing. Let's go ahead, yes, thanks for the explanations

Let's take the best case, which is:

  • after reading the first batch from the first file, DataFusion has read the actual minimum value

While it is true DataFusion still needs to check all remaining files to ensure this is actually the minimum value, it may not have to actually open, read, and decode the rows in each file -- for example, it could potentially prune (skip) all remaining files using statistics. And even if it can't prune out an entire file, it may be able to prune row groups, or ranges of rows (if pushdown_filters is turned on)

Oh, I think I'm getting the picture now. So it is not only derived from the data itself (as I was told); it is a hybrid, data + Parquet stats. That makes sense now: we accept that some values in the heap are approximate, just to remove unnecessary reads, because it is still better than a full scan. Best case, we get the min value from the first batch; the worst case should still be cheaper than a full scan.

@alamb
Contributor Author

alamb commented Sep 10, 2025

Oh, I think I'm getting the picture now. So it is not only derived from the data itself (as I was told); it is a hybrid, data + Parquet stats.

Yeah, I think the predicate (min value) itself is derived only from the data, but actually using it to make the query faster relies on statistics (and all the other parts of multi-level pruning)

@adriangb
Contributor

@nuno-faria what computer (or at least how many cores) did you run #103 (comment) on?

@nuno-faria
Contributor

@nuno-faria what computer (or at least how many cores) did you run #103 (comment) on?

@adriangb I used a 6-core CPU (12 execution threads).

Development

Successfully merging this pull request may close these issues.

Blog post about TopK filter pushdown
7 participants