Only update TopK dynamic filters if the new ones are more selective #16433
Conversation
/// Filter matching the state of the sort for dynamic filter pushdown.
/// If `fetch` is `Some`, this will also be set and a TopK operator may be used.
/// If `fetch` is `None`, this will be `None`.
filter: Option<TopKDynamicFilters>,
I feel like there's some further refactoring that could happen here, e.g. splitting up SortExec, but I'm leaving that for another day.
pub fn with_filter(mut self, filter: Arc<DynamicFilterPhysicalExpr>) -> Self {
    self.filter = Some(filter);
    self
}
This was unused and had slipped through the cracks. I can make a new PR to just remove these methods.
self.filter
    .as_ref()
    .expect("Filter should be set when fetch is Some")
    .clone(),
I refactored so that the `TopK` struct always expects this parameter, which better reflects the reality of execution. But since it's strangely tied to the `fetch` param I am doing an `expect` assertion here. It should never fail at runtime.
I recommend turning this into an internal error so that if someone does hit this for some reason the symptom is less severe
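A minimal sketch of that suggestion, assuming the surrounding function returns a `Result`; the helper name is hypothetical, and `DataFusionError::Internal` (or DataFusion's `internal_err!` macro) stands in for the `expect`:

```rust
use datafusion_common::DataFusionError;

/// Hypothetical helper: surface a violated invariant as an internal error
/// instead of panicking, so a bug shows up as a query error rather than a crash.
fn require_filter<T: Clone>(filter: Option<&T>) -> Result<T, DataFusionError> {
    filter.cloned().ok_or_else(|| {
        DataFusionError::Internal(
            "TopK filter should be set when fetch is Some".to_string(),
        )
    })
}
```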
It seems in some cases it's faster:
And in other cases it's mixed / slower:
Seems like a bug in my implementation, right? I'd be surprised if the update checks I added are that heavy compared to other work...
let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? else {
    return Ok(());
};

// Are the new thresholds more selective than our existing ones?
let should_update = {
    let mut current = self.filter.thresholds.write();
Perhaps this lock takes too long, plus there's the overhead of doing this for all updates and all partitions quite often? We could also store a `Row` instead of `thresholds` to make the comparison much quicker (should also be able to avoid allocations)?
I think this would be a good idea anyway to simplify the code.
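A tiny sketch of the byte-comparison idea (the function name is made up, and a plain `&[u8]` stands in for the encoded sort row); the diff further down in this thread uses a similar byte-wise comparison:

```rust
use std::cmp::Ordering;

/// Hypothetical check: with the threshold kept as encoded row bytes, deciding
/// whether to update is a single byte-wise comparison, with no ScalarValue
/// materialization or extra allocation.
fn is_more_selective(current: Option<&[u8]>, candidate: &[u8]) -> bool {
    match current {
        // No threshold published yet, so any candidate is an improvement.
        None => true,
        // In the TopK heap, a row that sorts lower is more selective.
        Some(cur) => cur.cmp(candidate) == Ordering::Greater,
    }
}
```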
Hm, my earlier benchmarks didn't seem correct. Not sure where the earlier run came from 🤔
In hindsight, I think another reason we don't see TopK being as effective with more partitions is that spreading the rows over partitions essentially makes synchronizing based on the per-partition heaps less effective.
(Removed a non-working proposal.) I am wondering if we can somehow synchronize the values in the heap efficiently in order to make the filter for a given partition more selective.
I think the only way to do that would be to make a global heap and put a lock on it, similar to what we do with the filters?
What do the current ones show? Not much improvement?
Yes, about the same compared to main.
We could try a shared heap. It might work? I guess it will be a sort of balance between lock contention and better selectivity. Maybe we can balance it by having distinct heaps for writes with no locks, but read-only references to all of them, so that when we do reads we compute the "combined" heap on the fly? Then we don't need any locks. The cost is that computations on the heap are larger, but as long as K times the number of partitions stays small that should be fine.
How would you compute the shared heap on the fly? I was thinking something similar: each partition writes to its own heap, and only values that updated the local heap get written to the shared heap (limiting the lock access time).
I was thinking you'd compute the top K of the top K * partitions on the fly. But maybe your proposal makes more sense.
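A rough sketch of the "top K of the top K * partitions" idea, with `u64` keys standing in for encoded sort rows (all names here are hypothetical, not from the PR):

```rust
/// Hypothetical merge: given each partition's current top-k values, the global
/// threshold is the k-th smallest value across all of them.
fn combined_threshold(per_partition: &[Vec<u64>], k: usize) -> Option<u64> {
    let mut all: Vec<u64> = per_partition.iter().flatten().copied().collect();
    if all.len() < k {
        // Not enough rows seen yet to bound the global top k.
        return None;
    }
    all.sort_unstable();
    Some(all[k - 1])
}
```

The per-read cost grows with k * partitions, which is the trade-off against lock contention discussed above.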
Marking as draft until we sort out #16501.
I did a bench run, confounding results:
Mostly... slower.
I am taking a look now to see if I can find anything.
if let Some(current_row) = threshold_guard.as_ref() {
    match current_row.as_slice().cmp(new_threshold_row) {
        Ordering::Greater => {
            // new < current, so new threshold is more selective
I think this was wrong before @adriangb - in the heap, lower means more selective.
The current solution seems roughly on par with main; I don't observe a speedup.
I guess we should try changing it to the global heap approach: write updates to the shared/global heap and update the filter based on the global heap.
I tried some stuff. Still not an obvious improvement:
ClickBench results for partitioned datasets look much better; the only query that is truly slower is a very fast one anyway (so probably noise). But it also seems like most of these should not be faster 🤔 so maybe my measurement is off.
@alamb I think you've reviewed but not approved; I thought you had approved. Can you take another look at this PR when you get a chance? I think I've addressed all of the feedback.
Looking at it now -- I kicked off the benchmarks again after making some changes to my GCP machine that will hopefully make the results more consistent.
🤖: Benchmark completed
🤖: Benchmark completed
Thank you @adriangb -- I went over this carefully again.
As long as the benchmark runs look good I think this PR is ready to go. A very nice optimization 👏
I found one small potential improvement (which you already merged :) )
I am working on a follow-on PR to extract the early stream stop logic.
Again, really nice work, and thank you!
let new_threshold_row = &max_row.row;

// Extract scalar values BEFORE acquiring lock to reduce critical section
let thresholds = match self.heap.get_threshold_values(&self.expr)? {
I think we can move this down after the check for update too:
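As a generic sketch of that kind of reordering (a closure stands in for `heap.get_threshold_values`; all names are hypothetical, not the reviewer's actual suggestion):

```rust
use std::cmp::Ordering;

/// Hypothetical ordering: decide whether the new threshold row is more
/// selective before doing the expensive scalar extraction, so the common
/// "no update needed" path stays cheap.
fn maybe_extract_thresholds<T>(
    current_row: Option<&[u8]>,
    new_row: &[u8],
    extract_scalars: impl FnOnce() -> Option<T>,
) -> Option<T> {
    let more_selective = match current_row {
        None => true,
        Some(cur) => cur.cmp(new_row) == Ordering::Greater,
    };
    if !more_selective {
        return None; // skip the scalar extraction entirely
    }
    extract_scalars()
}
```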
    .and_then(|b| schema_mapping.map_batch(b).map_err(Into::into))
});
// Create a stateful stream that can check pruning after each batch
let adapted = {
I found this code somewhat 🤯 (and this function is already 100s of lines long). I spent some time refactoring it into its own stream for readability, and I also understand it better now. I'll put up a follow-on PR to extract this logic -- no need to do it in this one.
🤖: Benchmark completed
Benchmarks look good: several faster queries, no queries really slower! I'll merge this in the next couple of hours if you don't first, Andrew.
Closes #16432
The idea here is to introduce a global thresholds reference that gets updated across all partitions.
This could drastically speed up early termination and will also avoid re-evaluating file-level statistics pruning in ParquetOpener.
I've also swapped our use of `Arc<RwLock<T>>` to `Arc<ArcSwap<T>>`, which should offer some perf improvements.
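A minimal sketch of what the `ArcSwap`-based shared threshold could look like (type and field names are hypothetical, and the real code stores richer state than raw row bytes): readers call `load()` without blocking, and writers publish a whole new value atomically.

```rust
use arc_swap::ArcSwap;
use std::sync::Arc;

/// Hypothetical shared threshold published across all partitions.
struct SharedThreshold {
    /// Encoded sort-row bytes; `None` means no threshold has been set yet.
    current: ArcSwap<Option<Vec<u8>>>,
}

impl SharedThreshold {
    fn new() -> Self {
        Self {
            current: ArcSwap::from_pointee(None),
        }
    }

    /// Publish `candidate` only if it sorts lower (is more selective) than the
    /// current threshold. The load-then-store pair is not atomic, so two
    /// writers can race; the worst case is briefly keeping a slightly less
    /// selective threshold, which is still correct.
    fn maybe_update(&self, candidate: Vec<u8>) {
        let more_selective = match &**self.current.load() {
            None => true,
            Some(existing) => candidate.as_slice() < existing.as_slice(),
        };
        if more_selective {
            self.current.store(Arc::new(Some(candidate)));
        }
    }

    /// Lock-free read, e.g. for pruning decisions in a Parquet opener.
    fn snapshot(&self) -> Arc<Option<Vec<u8>>> {
        self.current.load_full()
    }
}
```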