
Commit d306b04

fix DSv2 in PushVariantIntoScan by adding SupportsPushDownVariants
1 parent 9aab260 commit d306b04

7 files changed: +779 −220 lines

7 files changed

+779
-220
lines changed
org/apache/spark/sql/connector/read/SupportsPushDownVariants.java (new file)

Lines changed: 77 additions & 0 deletions

@@ -0,0 +1,77 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.connector.read;

import org.apache.spark.annotation.Evolving;

/**
 * A mix-in interface for {@link Scan}. Data sources can implement this interface to
 * support pushing down variant field access operations to the data source.
 * <p>
 * When variant columns are accessed with specific field extractions (e.g., variant_get),
 * the optimizer can push these accesses down to the data source. The data source can then
 * read only the required fields from variant columns, reducing I/O and improving performance.
 * <p>
 * The typical workflow is:
 * <ol>
 *   <li>Optimizer analyzes the query plan and identifies variant field accesses</li>
 *   <li>Optimizer calls {@link #pushVariantAccess} with the access information</li>
 *   <li>Data source validates and stores the variant access information</li>
 *   <li>Optimizer retrieves pushed information via {@link #pushedVariantAccess}</li>
 *   <li>Data source uses the information to optimize reading in {@link #readSchema()}
 *   and readers</li>
 * </ol>
 *
 * @since 4.1.0
 */
@Evolving
public interface SupportsPushDownVariants extends Scan {

  /**
   * Pushes down variant field access information to the data source.
   * <p>
   * Implementations should validate whether the variant accesses can be pushed down based on
   * the data source's capabilities. If some accesses cannot be pushed down, the implementation
   * can choose to:
   * <ul>
   *   <li>Push down only the supported accesses and return true</li>
   *   <li>Reject all pushdown and return false</li>
   * </ul>
   * <p>
   * The implementation should store the variant access information that can be pushed down.
   * The stored information will be retrieved later via {@link #pushedVariantAccess()}.
   *
   * @param variantAccessInfo Array of variant access information, one per variant column
   * @return true if at least some variant accesses were pushed down, false if none were pushed
   */
  boolean pushVariantAccess(VariantAccessInfo[] variantAccessInfo);

  /**
   * Returns the variant access information that has been pushed down to this scan.
   * <p>
   * This method is called by the optimizer after {@link #pushVariantAccess} to retrieve
   * what variant accesses were actually accepted by the data source. The optimizer uses
   * this information to rewrite the query plan.
   * <p>
   * If {@link #pushVariantAccess} was not called or returned false, this should return
   * an empty array.
   *
   * @return Array of pushed down variant access information
   */
  VariantAccessInfo[] pushedVariantAccess();
}
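
For a sense of how a source might satisfy this contract, below is a minimal sketch of an implementing Scan. It is not part of the commit: the class name SimpleVariantScan, its fields, and the accept-everything policy are hypothetical; only the SupportsPushDownVariants and Scan contracts are real.

// A minimal sketch, assuming a hypothetical source; SimpleVariantScan and
// dataSchema do not appear in the commit.
import org.apache.spark.sql.connector.read.SupportsPushDownVariants;
import org.apache.spark.sql.connector.read.VariantAccessInfo;
import org.apache.spark.sql.types.StructType;

public class SimpleVariantScan implements SupportsPushDownVariants {
  private final StructType dataSchema;
  // Accepted accesses; stays empty until pushVariantAccess succeeds.
  private VariantAccessInfo[] pushed = new VariantAccessInfo[0];

  public SimpleVariantScan(StructType dataSchema) {
    this.dataSchema = dataSchema;
  }

  @Override
  public boolean pushVariantAccess(VariantAccessInfo[] variantAccessInfo) {
    // A real source would check each access against its capabilities and
    // might keep only a subset; this sketch accepts everything it is given.
    this.pushed = variantAccessInfo;
    return pushed.length > 0;
  }

  @Override
  public VariantAccessInfo[] pushedVariantAccess() {
    return pushed;
  }

  @Override
  public StructType readSchema() {
    // A real source would substitute each pushed variant column with its
    // extractedSchema() so readers produce only the requested fields; the
    // sketch keeps the original schema for brevity.
    return dataSchema;
  }
}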
org/apache/spark/sql/connector/read/VariantAccessInfo.java (new file)

Lines changed: 105 additions & 0 deletions

@@ -0,0 +1,105 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.connector.read;

import java.io.Serializable;
import java.util.Objects;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.types.StructType;

/**
 * Variant access information that describes how variant fields are accessed in a query.
 * <p>
 * This class captures the information needed by data sources to optimize reading variant columns.
 * Instead of reading the entire variant value, the data source can read only the fields that
 * are actually accessed, represented as a structured schema.
 * <p>
 * For example, if a query accesses `variant_get(v, '$.a', 'int')` and
 * `variant_get(v, '$.b', 'string')`, the extracted schema would be
 * `struct&lt;0:int, 1:string&gt;` where field ordinals correspond to the access order.
 *
 * @since 4.1.0
 */
@Evolving
public final class VariantAccessInfo implements Serializable {
  private final String columnName;
  private final StructType extractedSchema;

  /**
   * Creates variant access information for a variant column.
   *
   * @param columnName The name of the variant column
   * @param extractedSchema The schema representing extracted fields from the variant.
   *                        Each field represents one variant field access, with field names
   *                        typically being ordinals (e.g., "0", "1", "2") and metadata
   *                        containing variant-specific information like JSON path.
   */
  public VariantAccessInfo(String columnName, StructType extractedSchema) {
    this.columnName = Objects.requireNonNull(columnName, "columnName cannot be null");
    this.extractedSchema =
      Objects.requireNonNull(extractedSchema, "extractedSchema cannot be null");
  }

  /**
   * Returns the name of the variant column.
   */
  public String columnName() {
    return columnName;
  }

  /**
   * Returns the schema representing fields extracted from the variant column.
   * <p>
   * The schema structure is:
   * <ul>
   *   <li>Field names: Typically ordinals ("0", "1", "2", ...) representing access order</li>
   *   <li>Field types: The target data type for each field extraction</li>
   *   <li>Field metadata: Contains variant-specific information such as JSON path,
   *   timezone, and error handling mode</li>
   * </ul>
   * <p>
   * Data sources should use this schema to determine what fields to extract from the variant
   * and what types they should be converted to.
   */
  public StructType extractedSchema() {
    return extractedSchema;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    VariantAccessInfo that = (VariantAccessInfo) o;
    return columnName.equals(that.columnName) &&
      extractedSchema.equals(that.extractedSchema);
  }

  @Override
  public int hashCode() {
    return Objects.hash(columnName, extractedSchema);
  }

  @Override
  public String toString() {
    return "VariantAccessInfo{" +
      "columnName='" + columnName + '\'' +
      ", extractedSchema=" + extractedSchema +
      '}';
  }
}
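
As a concrete illustration of the javadoc example above, the following sketch builds the VariantAccessInfo for variant_get(v, '$.a', 'int') and variant_get(v, '$.b', 'string'). The metadata key "path" is an assumption made for illustration; the commit does not show which metadata keys the optimizer actually attaches.

// Sketch only: the metadata key "path" is hypothetical, not from the commit.
import org.apache.spark.sql.connector.read.VariantAccessInfo;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructType;

public class VariantAccessInfoExample {
  public static void main(String[] args) {
    Metadata pathA = new MetadataBuilder().putString("path", "$.a").build();
    Metadata pathB = new MetadataBuilder().putString("path", "$.b").build();

    // Field names are ordinals in access order, per extractedSchema()'s javadoc;
    // field types are the target types of the two variant_get calls.
    StructType extracted = new StructType()
        .add("0", DataTypes.IntegerType, true, pathA)
        .add("1", DataTypes.StringType, true, pathB);

    VariantAccessInfo info = new VariantAccessInfo("v", extracted);
    // Prints: VariantAccessInfo{columnName='v', extractedSchema=...}
    System.out.println(info);
  }
}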

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala

Lines changed: 2 additions & 2 deletions
@@ -40,11 +40,11 @@ class SparkOptimizer(
     SchemaPruning,
     GroupBasedRowLevelOperationScanPlanning,
     V1Writes,
-    PushVariantIntoScan,
     V2ScanRelationPushDown,
     V2ScanPartitioningAndOrdering,
     V2Writes,
-    PruneFileSourcePartitions)
+    PruneFileSourcePartitions,
+    PushVariantIntoScan)

   override def preCBORules: Seq[Rule[LogicalPlan]] =
     Seq(OptimizeMetadataOnlyDeleteFromTable)
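
PushVariantIntoScan previously ran before V2ScanRelationPushDown and now runs last, after the DSv2 pushdown rules; per the commit message, this reordering presumably lets the rule operate on planned DSv2 scans through the new SupportsPushDownVariants mix-in.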
