Optimize merging of partial case expression results #18152
Conversation
macOS test failure is unrelated afaict. Looks like a DNS issue on the test runner.
@alamb could you run the benchmarks against this?
The test failure for f49d3ea seems unrelated. Pulling in changes from
Reviewed a little bit, will try to review more later.
// Merge into a single array.
let data_refs = self.partial_results.iter().collect();
let mut mutable = MutableArrayData::new(
Using MutableArrayData is not very performant for the case when there can be a small range of values from the same array.
Isn't it basically an interleave?
No, what I'm doing here is less general than interleave
. I tried that at first, but it gave pretty bad performance in certain cases. This is more like a multi array zip
. In contrast to zip
the rows are not expected to be lined up here. Instead values are taken from the start of each array. I'll try to make an ascii art drawing of what's going on.
Regarding the usage of MutableArrayData
, I took inspiration from the zip
implementation. I tried to avoid reaching this point in the trivial cases to avoid overhead where possible. This code path is not taken for the simple evaluation methods either.
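The "multi array zip" described above can be sketched with plain Rust Vecs standing in for Arrow arrays. This is illustrative only, not the PR's actual code: unlike zip or interleave, no per-row offset is needed, because each branch's values are already in row order, so the merge just consumes from the front of each partial result.

```rust
// Hedged sketch: merge partial branch results by consuming values from the
// front of each partial array, in the order a per-row source index dictates.
// Plain Vecs stand in for Arrow arrays; names here are hypothetical.
fn merge_partials(
    sources: &[usize],
    mut partials: Vec<std::vec::IntoIter<i32>>,
) -> Vec<i32> {
    // `sources[row]` says which branch produced the value for `row`.
    sources
        .iter()
        .map(|&s| partials[s].next().expect("partial array exhausted"))
        .collect()
}

fn main() {
    // Rows 0..5; branch 0 matched rows 0, 2, 4 and branch 1 matched rows 1, 3.
    let sources = vec![0, 1, 0, 1, 0];
    let partials = vec![vec![10, 12, 14].into_iter(), vec![21, 23].into_iter()];
    let merged = merge_partials(&sources, partials);
    assert_eq!(merged, vec![10, 21, 12, 23, 14]);
    println!("{merged:?}");
}
```

The real implementation works on ArrayData via MutableArrayData and copies runs rather than single values, but the consumption order is the same idea.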
It might be useful to explicitly point out that the big change in this PR is that the per-branch results are no longer scattered back to the length of the input record batch. Instead the potentially small results are held as-is in small arrays. Only at the end does everything get consolidated.
What's not yet handled in an optimal fashion by this code is the variant you're working on, where you know up front that all the values are going to be scalars. That's intentional, I'm only trying to improve the general case first. In other words, please compare with the status quo on main and not with all the potential further optimisations that might be possible.
One further optimisation that would be nice to add would be based on apache/arrow-rs#8658. That would allow us to avoid expanding scalars to arrays and instead fold that into the merge operation. Not available for use just yet though, so that will have to wait for later.
apache/arrow-rs#8653 will be useful for ExprOrExpr
, but is going to be of more limited use in the general eval methods. You can only zip two scalars and at that point you have a scalar and a non scalar. When we have to reduce more than two arrays it's back to regular zip (which is kind of what I'm doing here, but without the alignment requirement and in a single pass for all arrays).
Ok(())
}

fn add_partial_result(&mut self, rows: &ArrayRef, data: ArrayData) {
Can you add comments explaining what rows and data are for?
And rename if needed.
Isn't that just going to be repeating the comments from add_branch_result? I don't think it's very useful to repeat the same thing over and over again. It's not like this is public API.
// An optional result that is the covering result for all rows.
// This is used as an optimisation to avoid the cost of merging when all rows
// evaluate to the same case branch.
covering_result: Option<ColumnarValue>,
What does covering mean?
I couldn't come up with a better word for this. It's in contrast with 'partial', and as in 'covering index'.
This gets filled in when one branch of the case expression matches all the input rows from the record batch. In that case there's no need to do the complex merge logic.
Open to suggestions for a better name.
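The fast path being described can be sketched as follows. This is a toy illustration with hypothetical names (the real field holds an Option<ColumnarValue> on the evaluator): when one branch covered every input row, its result is returned directly and the merge is skipped.

```rust
// Hedged sketch of the "covering result" fast path; not the PR's API.
#[derive(Debug, PartialEq)]
enum BranchResult {
    Covering(Vec<i32>),              // one branch matched all input rows
    Partial(Vec<(usize, Vec<i32>)>), // (branch index, values) still to merge
}

fn finish(result: BranchResult) -> Vec<i32> {
    match result {
        // Fast path: no merge work at all.
        BranchResult::Covering(values) => values,
        // Stand-in for the slow path: the real code interleaves the partial
        // results according to a per-row source index.
        BranchResult::Partial(parts) => {
            parts.into_iter().flat_map(|(_, values)| values).collect()
        }
    }
}

fn main() {
    let all = BranchResult::Covering(vec![1, 2, 3]);
    assert_eq!(finish(all), vec![1, 2, 3]);
    let parts = BranchResult::Partial(vec![(0, vec![1]), (1, vec![2])]);
    assert_eq!(finish(parts), vec![1, 2]);
}
```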
.iter()
.map(|a| filter_array(a, filter))
.collect::<std::result::Result<Vec<_>, _>>()?;
unsafe {
Is this a hot loop? If not, why is unsafe needed? The check is inexpensive.
Generally when writing unsafe code you need to write a comment explaining why it is safe to use and what the benefit is.
It depends on the case expression, but it can be. We need to create subset record batches for each when branch and possibly the else branch. The validation checks that try_new would do are completely redundant here.
I've added some comments and a pointer to apache/arrow-rs#8693.
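The checked/unchecked constructor pattern under discussion can be illustrated with a toy Batch type (not Arrow's RecordBatch; all names here are hypothetical), including the safety comment the reviewer asks for:

```rust
// Illustrative only: a checked constructor next to an unchecked one that
// skips redundant validation, in the style discussed above.
struct Batch {
    columns: Vec<Vec<i32>>,
}

impl Batch {
    /// Checked constructor: validates that all columns have equal length.
    fn try_new(columns: Vec<Vec<i32>>) -> Result<Self, String> {
        let len = columns.first().map_or(0, |c| c.len());
        if columns.iter().any(|c| c.len() != len) {
            return Err("column length mismatch".to_string());
        }
        Ok(Self { columns })
    }

    /// Unchecked constructor.
    ///
    /// # Safety
    /// The caller must guarantee all columns have equal length. Useful on hot
    /// paths where the columns were just produced by the same filter, so the
    /// invariant already holds and re-validation is redundant.
    unsafe fn new_unchecked(columns: Vec<Vec<i32>>) -> Self {
        Self { columns }
    }
}

fn main() {
    let cols = vec![vec![1, 2], vec![3, 4]];
    // SAFETY: both columns were built with the same length above.
    let b = unsafe { Batch::new_unchecked(cols) };
    assert_eq!(b.columns.len(), 2);
    assert!(Batch::try_new(vec![vec![1], vec![2, 3]]).is_err());
}
```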
else_expr: Option<Arc<dyn PhysicalExpr>>,
/// Evaluation method to use
eval_method: EvalMethod,
pub eval_method: EvalMethod,
Why is the pub needed here?
Also, this is an implementation detail that I don't think needs to be exposed.
That's a debugging remnant. I'll get rid of that.
#[derive(Debug, Hash, PartialEq, Eq)]
enum EvalMethod {
pub enum EvalMethod {
Why pub here? This is an implementation detail that I don't think we should expose.
Removed. Was committed by accident. I had made this public temporarily so I could print out the eval method in the benchmarks.
Which issue does this PR close?
Rationale for this change
Case evaluation currently uses PhysicalExpr::evaluate_selection for each branch of the case expression. This implementation is fine, but because evaluate_selection is not specific to the case logic we're missing some optimisation opportunities. The main consequence is that too much work is being done filtering record batches and scattering results. This PR introduces specialised filtering logic and result interleaving for case.

A more detailed description and diagrams are available at #18075 (comment)
What changes are included in this PR?
Rewrite the case_when_no_expr and case_when_with_expr evaluation loops to avoid as much unnecessary work as possible. In particular the remaining rows to be evaluated are retained across loop iterations. This allows the record batch that needs to be filtered to shrink as the loop is evaluated, which reduces the number of rows that need to be refiltered. If a when predicate does not match any rows at all, filtering is avoided entirely.

The final result is also not merged on every loop iteration. Instead an index vector is constructed which is used to compose the final result once using a custom 'multi zip'/'interleave' like operation.
Are these changes tested?
Covered by existing unit tests and SLTs
Are there any user-facing changes?
No