[SPARK-53805][SQL] Push Variant into DSv2 scan #52522
Conversation
dongjoon-hyun left a comment
+1, LGTM from my side.
cc @peter-toth, too.
```scala
      hadoopFsRelation @ HadoopFsRelation(_, _, _, _, _: ParquetFileFormat, _), _)) =>
    rewritePlan(p, projectList, filters, relation, hadoopFsRelation)
  case p @ PhysicalOperation(projectList, filters, relation: DataSourceV2Relation) =>
    rewriteV2RelationPlan(p, projectList, filters, relation.output, relation)
```
If we are already passing the relation, do we need to pass `relation.output` separately?
I overlooked this. Removed.
```scala
  SchemaPruning,
  GroupBasedRowLevelOperationScanPlanning,
  V1Writes,
  PushVariantIntoScan,
```
Now `PushVariantIntoScan` runs before `PruneFileSourcePartitions`, which I think is for v1 sources. Does this ordering matter, or was the rule just added later in the list because it was new?
I don't think variant columns will ever be used in the partition schema. Schema transformations by PushVariantIntoScan shouldn't affect partition pruning in v1 sources.
```scala
    relation @ LogicalRelationWithTable(
      hadoopFsRelation @ HadoopFsRelation(_, _, _, _, _: ParquetFileFormat, _), _)) =>
    rewritePlan(p, projectList, filters, relation, hadoopFsRelation)
  case p @ PhysicalOperation(projectList, filters, relation: DataSourceV2Relation) =>
```
Is there any code we can share between the v1 rewritePlan and the v2 rewriteV2RelationPlan?
Yes, there’s shared logic. I intentionally left the v1 rewritePlan unchanged in this PR to keep the diff small and easier to review. After this merges, I’ll do a small follow-up to have v1 rewritePlan reuse the common code. If you prefer, I can fold that refactor into this PR.
It's actually harder to review this way, as I can't tell what the key difference is between the v1 and v2 versions in the current PR...
Sorry for the confusion. I have updated the code.
The logic for transforming variant columns to structs is identical between DSv1 and DSv2; both now use the same helper methods (`collectAndRewriteVariants`, `buildAttributeMap`, `buildFilterAndProject`).
The only difference is how the transformed schema is communicated to the data source. DSv1 stores the new schema in `HadoopFsRelation.dataSchema`, and the file source reads this field directly; DSv2 has no schema field to update, so the schema is communicated later, when `V2ScanRelationPushDown` calls `pruneColumns`.
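To make the two paths concrete, here is a minimal, self-contained sketch. Every type in it is a simplified stand-in for the Spark internals named above (`HadoopFsRelation`, a scan builder with `SupportsPushDownRequiredColumns`-style `pruneColumns`), not the PR's actual code:

```scala
// Self-contained sketch; all types are simplified stand-ins, not Spark classes.
case class Field(name: String, dataType: String)
case class Schema(fields: Seq[Field])

// DSv1: the relation carries its data schema, so the variant rewrite can
// store the new struct schema directly on the relation.
case class HadoopFsRelationLike(dataSchema: Schema)

// DSv2: the relation has no schema field to update; the rewritten schema
// reaches the source later, when scan planning calls pruneColumns on the
// scan builder.
trait PruneColumnsLike {
  def pruneColumns(requiredSchema: Schema): Unit
}

object SchemaCommunicationDemo extends App {
  // variant column `v` rewritten to a struct of only the accessed fields
  val rewritten = Schema(Seq(Field("v", "struct<a:int>")))

  // v1 path: a new relation with the rewritten schema baked in
  val v1 = HadoopFsRelationLike(dataSchema = rewritten)
  println(s"v1 relation schema: ${v1.dataSchema}")

  // v2 path: schema handed over during pushdown, not stored on the relation
  val v2ScanBuilder = new PruneColumnsLike {
    def pruneColumns(requiredSchema: Schema): Unit =
      println(s"v2 scan builder received: $requiredSchema")
  }
  v2ScanBuilder.pruneColumns(rewritten)
}
```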
Merged to master. Thanks everyone for the review!

Thank you, @huaxingao and all!
…PushDownVariants

### What changes were proposed in this pull request?

This patch adds DSv2 support to the optimization rule `PushVariantIntoScan`. The `PushVariantIntoScan` rule only supports the DSv1 Parquet (`ParquetFileFormat`) source, which limits the effectiveness of the variant type on DSv2.

### Why are the changes needed?

Although #52522 recently tried to add DSv2 support, the implementation implicitly binds `pruneColumns` to this variant access pushdown, which could cause unexpected errors on DSv2 datasources that don't support it, and it breaks the API semantics. We need an explicit API between Spark and DSv2 datasources for this feature. #52522 also didn't exercise the DSv2 variant pushdown on the built-in DSv2 Parquet datasource, only on `InMemoryTable`. This patch reverts #52522 and proposes a new approach with comprehensive test coverage.

### Does this PR introduce _any_ user-facing change?

Yes. After this PR, if users enable `spark.sql.variant.pushVariantIntoScan`, variant column accesses can be pushed down into DSv2 datasources that support it.

### How was this patch tested?

Added new unit test suites `PushVariantIntoScanV2Suite` and `PushVariantIntoScanV2VectorizedSuite`.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code v2.0.13

Closes #52578 from viirya/pushvariantdsv2-pr.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
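That explicit API became `SupportsPushDownVariants` (later marked `Experimental`; see the commit message further down). Its exact shape isn't shown in this thread, so the following is only a hypothetical sketch of what an explicit, opt-in variant-pushdown mix-in could look like; the names and types are illustrative guesses, not the merged interface:

```scala
// Hypothetical sketch only; names and types are guesses, not the interface
// actually merged in #52578.

// One requested variant access, e.g. column `v`, path `$.a`, read as int.
case class VariantAccessSketch(column: String, path: String, targetType: String)

trait ScanBuilderSketch

// An explicit mix-in keeps pruneColumns' contract untouched: sources that
// don't implement it never see variant pushdown, instead of failing on a
// rewritten schema they can't handle.
trait SupportsPushDownVariantsSketch extends ScanBuilderSketch {
  // Returns the accesses this source can serve; Spark evaluates the rest.
  def pushVariantAccesses(accesses: Seq[VariantAccessSketch]): Seq[VariantAccessSketch]
}
```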
### What changes were proposed in this pull request?

Push Variant into DSv2 scan.

### Why are the changes needed?

With this change, the DSv2 scan only needs to fetch the shredded columns required by the plan.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#52522 from huaxingao/variant-v2-pushdown.

Authored-by: Huaxin Gao <[email protected]>
Signed-off-by: Huaxin Gao <[email protected]>
For the record, Apache Spark 4.1.0 RC2 received a -1 because of this whole feature, which spans three PRs including this one.
Inevitably, to unblock Apache Spark 4.1.0, we are re-evaluating the entire feature.
…tal`

### What changes were proposed in this pull request?

This PR aims to mark `SupportsPushDownVariants` as `Experimental` instead of `Evolving` in Apache Spark 4.1.x.

### Why are the changes needed?

During Apache Spark 4.1.0 RC2, it turned out that this new `Variant` improvement feature still needs more time to stabilize.

- #52522
- #52578
- #53276
- [[VOTE] Release Spark 4.1.0 (RC2)](https://lists.apache.org/thread/og4dn0g7r92qj22fdsmqoqs518k324q5)

We had better mark this interface itself as `Experimental` in Apache Spark 4.1.0 while keeping it `Evolving` in the `master` branch.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual review.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #53354 from dongjoon-hyun/SPARK-54616.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
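For reference, the change itself is just a stability-annotation swap on the interface. A minimal stand-in sketch (the dummy annotations below are local stand-ins for `org.apache.spark.annotation.Experimental` and `Evolving`, where Spark's real markers live; the real interface is in the Java connector API):

```scala
import scala.annotation.StaticAnnotation

// Local dummy markers standing in for org.apache.spark.annotation.*
class Experimental extends StaticAnnotation
class Evolving extends StaticAnnotation

// branch-4.1: flagged as not yet stable
@Experimental
trait SupportsPushDownVariants

// master keeps the usual new-API marker instead:
// @Evolving
// trait SupportsPushDownVariants
```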
What changes were proposed in this pull request?
Push Variant into DSv2 scan
Why are the changes needed?
With this change, the DSv2 scan only needs to fetch the shredded columns that the plan actually requires.
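As a hedged usage sketch of that claim (the table path and data are hypothetical, and whether a Parquet read goes through DSv2 depends on `spark.sql.sources.useV1SourceList`):

```scala
import org.apache.spark.sql.SparkSession

object VariantPushdownExample extends App {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()

  // Enable the variant pushdown rule (flag name quoted earlier in the thread).
  spark.conf.set("spark.sql.variant.pushVariantIntoScan", "true")
  // Route parquet reads through DSv2 rather than the v1 file source.
  spark.conf.set("spark.sql.sources.useV1SourceList", "")

  // With pushdown, only the accessed path ($.a) of variant column `v`
  // should need to be fetched by the scan. Hypothetical path and column.
  spark.read.parquet("/tmp/variant_table")
    .selectExpr("variant_get(v, '$.a', 'int') AS a")
    .show()
}
```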
Does this PR introduce any user-facing change?
No
How was this patch tested?
New tests.
Was this patch authored or co-authored using generative AI tooling?
No