Skip to content

Commit eab58d4

Browse files
authored
feat: auto scan mode should check for supported file location (#1930)
1 parent 32ac269 commit eab58d4

File tree

5 files changed

+76
-3
lines changed

5 files changed

+76
-3
lines changed

dev/diffs/3.4.3.diff

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2868,6 +2868,28 @@ index 52abd248f3a..7a199931a08 100644
28682868
case h: HiveTableScanExec => h.partitionPruningPred.collect {
28692869
case d: DynamicPruningExpression => d.child
28702870
}
2871+
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
2872+
index de3b1ffccf0..2a76d127093 100644
2873+
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
2874+
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
2875+
@@ -23,14 +23,15 @@ import java.util.concurrent.{Executors, TimeUnit}
2876+
import org.scalatest.BeforeAndAfterEach
2877+
2878+
import org.apache.spark.metrics.source.HiveCatalogMetrics
2879+
-import org.apache.spark.sql.QueryTest
2880+
+import org.apache.spark.sql.{IgnoreCometSuite, QueryTest}
2881+
import org.apache.spark.sql.execution.datasources.FileStatusCache
2882+
import org.apache.spark.sql.hive.test.TestHiveSingleton
2883+
import org.apache.spark.sql.internal.SQLConf
2884+
import org.apache.spark.sql.test.SQLTestUtils
2885+
2886+
class PartitionedTablePerfStatsSuite
2887+
- extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach {
2888+
+ extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach
2889+
+ with IgnoreCometSuite {
2890+
2891+
override def beforeEach(): Unit = {
2892+
super.beforeEach()
28712893
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
28722894
index a902cb3a69e..800a3acbe99 100644
28732895
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

dev/diffs/3.5.6.diff

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2883,6 +2883,28 @@ index 549431ef4f4..e48f1730da6 100644
28832883
withTempDir { dir =>
28842884
withSQLConf(
28852885
"parquet.crypto.factory.class" ->
2886+
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
2887+
index de3b1ffccf0..2a76d127093 100644
2888+
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
2889+
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
2890+
@@ -23,14 +23,15 @@ import java.util.concurrent.{Executors, TimeUnit}
2891+
import org.scalatest.BeforeAndAfterEach
2892+
2893+
import org.apache.spark.metrics.source.HiveCatalogMetrics
2894+
-import org.apache.spark.sql.QueryTest
2895+
+import org.apache.spark.sql.{IgnoreCometSuite, QueryTest}
2896+
import org.apache.spark.sql.execution.datasources.FileStatusCache
2897+
import org.apache.spark.sql.hive.test.TestHiveSingleton
2898+
import org.apache.spark.sql.internal.SQLConf
2899+
import org.apache.spark.sql.test.SQLTestUtils
2900+
2901+
class PartitionedTablePerfStatsSuite
2902+
- extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach {
2903+
+ extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach
2904+
+ with IgnoreCometSuite {
2905+
2906+
override def beforeEach(): Unit = {
2907+
super.beforeEach()
28862908
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
28872909
index 6160c3e5f6c..0956d7d9edc 100644
28882910
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

dev/diffs/4.0.0-preview1.diff

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3079,6 +3079,28 @@ index 52abd248f3a..7a199931a08 100644
30793079
case h: HiveTableScanExec => h.partitionPruningPred.collect {
30803080
case d: DynamicPruningExpression => d.child
30813081
}
3082+
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
3083+
index de3b1ffccf0..2a76d127093 100644
3084+
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
3085+
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
3086+
@@ -23,14 +23,15 @@ import java.util.concurrent.{Executors, TimeUnit}
3087+
import org.scalatest.BeforeAndAfterEach
3088+
3089+
import org.apache.spark.metrics.source.HiveCatalogMetrics
3090+
-import org.apache.spark.sql.QueryTest
3091+
+import org.apache.spark.sql.{IgnoreCometSuite, QueryTest}
3092+
import org.apache.spark.sql.execution.datasources.FileStatusCache
3093+
import org.apache.spark.sql.hive.test.TestHiveSingleton
3094+
import org.apache.spark.sql.internal.SQLConf
3095+
import org.apache.spark.sql.test.SQLTestUtils
3096+
3097+
class PartitionedTablePerfStatsSuite
3098+
- extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach {
3099+
+ extends QueryTest with TestHiveSingleton with SQLTestUtils with BeforeAndAfterEach
3100+
+ with IgnoreCometSuite {
3101+
3102+
override def beforeEach(): Unit = {
3103+
super.beforeEach()
30823104
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
30833105
index 0bcac639443..8957c76886f 100644
30843106
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -258,11 +258,15 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
258258
}
259259

260260
private def selectScan(scanExec: FileSourceScanExec, partitionSchema: StructType): String = {
261-
// TODO these checks are not yet exhaustive. For example, native_iceberg_compat does
262-
// not support reading from S3
263261

264262
val fallbackReasons = new ListBuffer[String]()
265263

264+
// native_iceberg_compat only supports local filesystem and S3
265+
if (!scanExec.relation.inputFiles
266+
.forall(path => path.startsWith("file://") || path.startsWith("s3a://"))) {
267+
fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT only supports local filesystem and S3"
268+
}
269+
266270
val typeChecker = CometScanTypeChecker(SCAN_NATIVE_ICEBERG_COMPAT)
267271
val schemaSupported =
268272
typeChecker.isSchemaSupported(scanExec.requiredSchema, fallbackReasons)
@@ -297,7 +301,8 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] {
297301
fallbackReasons += s"$SCAN_NATIVE_ICEBERG_COMPAT requires ${COMET_EXEC_ENABLED.key}=true"
298302
}
299303

300-
if (cometExecEnabled && schemaSupported && partitionSchemaSupported && !knownIssues) {
304+
if (cometExecEnabled && schemaSupported && partitionSchemaSupported && !knownIssues &&
305+
fallbackReasons.isEmpty) {
301306
logInfo(s"Auto scan mode selecting $SCAN_NATIVE_ICEBERG_COMPAT")
302307
SCAN_NATIVE_ICEBERG_COMPAT
303308
} else {

spark/src/main/scala/org/apache/spark/sql/comet/CometScanExec.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ case class CometScanExec(
7474
with ShimCometScanExec
7575
with CometPlan {
7676

77+
assert(scanImpl != CometConf.SCAN_AUTO)
78+
7779
// FIXME: ideally we should reuse wrapped.supportsColumnar, however that fails many tests
7880
override lazy val supportsColumnar: Boolean =
7981
relation.fileFormat.supportBatch(relation.sparkSession, schema)

0 commit comments

Comments
 (0)