Skip to content

Commit 456a24a

Browse files
committed
add native_iceberg_compat test
1 parent 8d219c9 commit 456a24a

File tree

2 files changed

+41
-26
lines changed

2 files changed

+41
-26
lines changed

spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromFakeHadoopFsSuite.scala

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import java.util.UUID
2626
import org.apache.commons.io.FileUtils
2727
import org.apache.spark.SparkConf
2828
import org.apache.spark.sql.{CometTestBase, DataFrame, SaveMode}
29-
import org.apache.spark.sql.comet.CometNativeScanExec
29+
import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
3030
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
3131
import org.apache.spark.sql.functions.{col, sum}
3232

@@ -62,16 +62,21 @@ class ParquetReadFromFakeHadoopFsSuite extends CometTestBase with AdaptiveSparkP
6262
}
6363

6464
private def assertCometNativeScanOnFakeFs(df: DataFrame): Unit = {
65-
val scans = collect(df.queryExecution.executedPlan) { case p: CometNativeScanExec =>
66-
p
65+
val scans = collect(df.queryExecution.executedPlan) {
66+
case p: CometNativeScanExec =>
67+
assert(
68+
p.nativeOp.getNativeScan
69+
.getFilePartitions(0)
70+
.getPartitionedFile(0)
71+
.getFilePath
72+
.startsWith(FakeHDFSFileSystem.PREFIX))
73+
p
74+
case p: CometScanExec if p.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT =>
75+
assert(p.toString().contains(FakeHDFSFileSystem.PREFIX))
76+
p
6777
}
6878
assert(scans.size == 1)
69-
assert(
70-
scans.head.nativeOp.getNativeScan
71-
.getFilePartitions(0)
72-
.getPartitionedFile(0)
73-
.getFilePath
74-
.startsWith(FakeHDFSFileSystem.PREFIX))
79+
7580
}
7681

7782
test("test native_datafusion scan on fake fs") {
@@ -80,10 +85,13 @@ class ParquetReadFromFakeHadoopFsSuite extends CometTestBase with AdaptiveSparkP
8085
val testFilePath =
8186
s"${FakeHDFSFileSystem.PREFIX}${fake_root_dir.getAbsolutePath}/data/test-file.parquet"
8287
writeTestParquetFile(testFilePath)
83-
withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION) {
84-
val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")))
85-
assertCometNativeScanOnFakeFs(df)
86-
assert(df.first().getLong(0) == 499500)
88+
Seq(CometConf.SCAN_NATIVE_DATAFUSION, CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach {
89+
scanImpl =>
90+
withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl) {
91+
val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")))
92+
assertCometNativeScanOnFakeFs(df)
93+
assert(df.first().getLong(0) == 499500)
94+
}
8795
}
8896
}
8997
}

spark/src/test/scala/org/apache/comet/parquet/ParquetReadFromHdfsSuite.scala

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
package org.apache.comet.parquet
2121

2222
import org.apache.spark.sql.{CometTestBase, DataFrame, SaveMode}
23-
import org.apache.spark.sql.comet.CometNativeScanExec
23+
import org.apache.spark.sql.comet.{CometNativeScanExec, CometScanExec}
2424
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
2525
import org.apache.spark.sql.functions.{col, sum}
2626

@@ -50,16 +50,20 @@ class ParquetReadFromHdfsSuite
5050
}
5151

5252
private def assertCometNativeScanOnHDFS(df: DataFrame): Unit = {
53-
val scans = collect(df.queryExecution.executedPlan) { case p: CometNativeScanExec =>
54-
p
53+
val scans = collect(df.queryExecution.executedPlan) {
54+
case p: CometNativeScanExec =>
55+
assert(
56+
p.nativeOp.getNativeScan
57+
.getFilePartitions(0)
58+
.getPartitionedFile(0)
59+
.getFilePath
60+
.startsWith("hdfs://"))
61+
p
62+
case p: CometScanExec if p.scanImpl == CometConf.SCAN_NATIVE_ICEBERG_COMPAT =>
63+
assert(p.toString().contains("hdfs://"))
64+
p
5565
}
5666
assert(scans.size == 1)
57-
assert(
58-
scans.head.nativeOp.getNativeScan
59-
.getFilePartitions(0)
60-
.getPartitionedFile(0)
61-
.getFilePath
62-
.startsWith("hdfs://"))
6367
}
6468

6569
test("test native_datafusion scan on hdfs") {
@@ -69,10 +73,13 @@ class ParquetReadFromHdfsSuite
6973
{
7074
val testFilePath = dir.toString
7175
writeTestParquetFile(testFilePath)
72-
withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> CometConf.SCAN_NATIVE_DATAFUSION) {
73-
val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")))
74-
assertCometNativeScanOnHDFS(df)
75-
assert(df.first().getLong(0) == 499500)
76+
Seq(CometConf.SCAN_NATIVE_DATAFUSION, CometConf.SCAN_NATIVE_ICEBERG_COMPAT).foreach {
77+
scanImpl =>
78+
withSQLConf(CometConf.COMET_NATIVE_SCAN_IMPL.key -> scanImpl) {
79+
val df = spark.read.format("parquet").load(testFilePath).agg(sum(col("id")))
80+
assertCometNativeScanOnHDFS(df)
81+
assert(df.first().getLong(0) == 499500)
82+
}
7683
}
7784
}
7885
}

0 commit comments

Comments (0)