fix EquivalenceProperties calculation in DataSourceExec #17323
Conversation
TODOs:
```diff
@@ -595,7 +595,7 @@ fn test_no_pushdown_through_aggregates() {
   Ok:
   - FilterExec: b@1 = bar
   - CoalesceBatchesExec: target_batch_size=100
-  - AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt], ordering_mode=PartiallySorted([0])
+  - AggregateExec: mode=Final, gby=[a@0 as a, b@1 as b], aggr=[cnt]
```
Not sure about this, need to think about it
I'm guessing some order info is missing
The fix was trivial: 2a80495
I had just forgotten to update the test code.
```rust
fn update_oeq_cache(&mut self) -> Result<()> {
    // Renormalize orderings if the equivalence group changes:
    let normal_cls = mem::take(&mut self.oeq_cache.normal_cls);
    let normal_orderings = normal_cls
        .into_iter()
        .map(|o| self.eq_group.normalize_sort_exprs(o));
    self.oeq_cache.normal_cls = OrderingEquivalenceClass::new(normal_orderings);
    self.oeq_cache.update_map();
    // Discover any new orderings based on the new equivalence classes:
    let leading_exprs: Vec<_> = self.oeq_cache.leading_map.keys().cloned().collect();
    for expr in leading_exprs {
        self.discover_new_orderings(expr)?;
    }
    Ok(())
}
```
This ~same code was in two different places; this is a drive-by deduplication.
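To illustrate what the deduplicated helper does, here is a toy, self-contained analogue (the names and data model below are illustrative, not DataFusion's actual API): orderings are lists of column names cached from earlier analysis, and when the equivalence group changes, each column is renormalized to its class representative, after which orderings that became identical are deduplicated.

```rust
use std::collections::HashMap;

/// Toy analogue of `update_oeq_cache`: renormalize every cached ordering
/// through the (hypothetical) equivalence map, then deduplicate orderings
/// that collapsed into the same thing.
fn renormalize_orderings(
    orderings: Vec<Vec<&'static str>>,
    eq_classes: &HashMap<&'static str, &'static str>,
) -> Vec<Vec<&'static str>> {
    let mut normal: Vec<Vec<&'static str>> = orderings
        .into_iter()
        .map(|ord| {
            ord.into_iter()
                // Map each column to its canonical representative, if any:
                .map(|col| *eq_classes.get(col).unwrap_or(&col))
                .collect()
        })
        .collect();
    // Orderings may become identical after normalization; keep one copy.
    normal.sort();
    normal.dedup();
    normal
}

fn main() {
    // Suppose columns `a` and `b` were just discovered to be equal
    // (canonical representative: `a`). Two previously distinct cached
    // orderings collapse into one after renormalization.
    let eq: HashMap<_, _> = [("b", "a")].into_iter().collect();
    let cache = vec![vec!["a", "c"], vec!["b", "c"]];
    let normalized = renormalize_orderings(cache, &eq);
    assert_eq!(normalized, vec![vec!["a", "c"]]);
}
```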
```diff
@@ -1000,7 +1000,7 @@ fn roundtrip_parquet_exec_with_custom_predicate_expr() -> Result<()> {
     self: Arc<Self>,
     _children: Vec<Arc<dyn PhysicalExpr>>,
 ) -> Result<Arc<dyn PhysicalExpr>> {
-    todo!()
+    Ok(self)
```
Now gets called by reassign_predicate_columns
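For context, here is a minimal sketch (plain Rust, not DataFusion's actual trait) of why a leaf expression must return itself from `with_new_children` rather than panicking with `todo!()`: generic tree rewrites like `reassign_predicate_columns` call `with_new_children` on every node, leaves included, passing an empty child list.

```rust
use std::sync::Arc;

// Hypothetical, stripped-down stand-in for a physical expression trait.
trait Expr: std::fmt::Debug {
    fn children(&self) -> Vec<Arc<dyn Expr>>;
    fn with_new_children(self: Arc<Self>, children: Vec<Arc<dyn Expr>>) -> Arc<dyn Expr>;
}

#[derive(Debug)]
struct Leaf(&'static str);

impl Expr for Leaf {
    fn children(&self) -> Vec<Arc<dyn Expr>> {
        vec![]
    }
    // A leaf has no children, so the rewrite can just return itself --
    // the analogue of replacing `todo!()` with `Ok(self)` in this diff.
    fn with_new_children(self: Arc<Self>, _children: Vec<Arc<dyn Expr>>) -> Arc<dyn Expr> {
        self
    }
}

// A generic bottom-up rewrite: rebuild every node from its rewritten children.
fn rewrite(expr: Arc<dyn Expr>) -> Arc<dyn Expr> {
    let new_children: Vec<_> = expr.children().into_iter().map(rewrite).collect();
    expr.with_new_children(new_children)
}

fn main() {
    let leaf: Arc<dyn Expr> = Arc::new(Leaf("a"));
    // With `todo!()` in the leaf impl this call would panic;
    // returning `self` makes the rewrite a no-op for leaves.
    let rewritten = rewrite(leaf);
    println!("{:?}", rewritten);
}
```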
@berkaysynnada could you review this since you folks wrote a lot of the equivalence property machinery? The main question in my mind is what version of filters equivalence properties should be working on: projected or unprojected. It seems that right now they deal with projected, but I'm not sure why, and it leads to the weirdness where we have to reassign but ignore missing columns.
FWIW we've put this into production and haven't had issues stemming from this change.
the big picture problem imo is that the equivalence info / cache is at the DataSourceExec level, the projection is at the FileScanConfig level and the filter is at the FileSource level. So the information / code is distributed across 3 places
I agree with this.
The PR generally looks good to me. Do we want to wait for #17242 or apply this change first?
I think we should apply this first, especially since we can pretty easily put it out as a hot fix: this PR has no or very minimal breaking changes, whereas the other one is going to have pretty extensive breaking changes.
similar to #17077, equivalence info was being lost. this time it happens when a dynamic filter is pushed down (which is now unblocked in some queries thanks to #17238).
the tldr is that the second filter comes around and we try to re-compute equivalence info using only the second filter; since we no longer have the first filter, we ignore it -> that info gets lost. my solution in this PR is to re-calculate all of the properties every time in one place instead of trying to stitch together old and new properties, which is error prone and more code anyway.
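A toy sketch of the "recompute in one place" idea (the names below are hypothetical, not DataFusion's API): the source keeps the full list of pushed-down filters and derives equivalence pairs from that complete list each time, rather than merging pairs derived from only the newest filter into previously-derived properties.

```rust
/// Hypothetical stand-in for a data source that accumulates pushed-down
/// equality filters, e.g. ("a", "b") meaning `a = b`.
#[derive(Default)]
struct Source {
    filters: Vec<(&'static str, &'static str)>,
}

impl Source {
    fn push_filter(&mut self, f: (&'static str, &'static str)) {
        self.filters.push(f);
    }

    /// Derived from *all* filters in one place, so nothing is lost when a
    /// second filter arrives after the first.
    fn equivalences(&self) -> Vec<(&'static str, &'static str)> {
        self.filters.clone()
    }
}

fn main() {
    let mut src = Source::default();
    src.push_filter(("a", "b"));
    // A second (e.g. dynamic) filter is pushed down later; recomputing
    // from the stored list keeps the `a = b` equivalence instead of
    // dropping it the way stitching old and new properties did.
    src.push_filter(("c", "d"));
    assert_eq!(src.equivalences(), vec![("a", "b"), ("c", "d")]);
}
```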
the big picture problem imo is that the equivalence info / cache is at the DataSourceExec level, the projection is at the FileScanConfig level and the filter is at the FileSource level. So the information / code is distributed across 3 places. I'm hoping that #17242 is the long term fix to that but in the meantime this should make things at least a bit more robust by not trying to combine info from two different places.
MRE: