diff --git a/.github/actions/setup-iceberg-builder/action.yaml b/.github/actions/setup-iceberg-builder/action.yaml
index eb8bc0e32c..5b9f2d59b5 100644
--- a/.github/actions/setup-iceberg-builder/action.yaml
+++ b/.github/actions/setup-iceberg-builder/action.yaml
@@ -21,12 +21,6 @@ inputs:
   iceberg-version:
     description: 'The Apache Iceberg version (e.g., 1.8.1) to build'
     required: true
-  scala-version:
-    description: 'The Scala short version (e.g., 2.13) to build'
-    required: true
-  spark-short-version:
-    description: 'The Apache Spark short version (e.g., 3.5) to build'
-    required: true
 runs:
   using: "composite"
   steps:
@@ -43,8 +37,3 @@ runs:
       run: |
         cd apache-iceberg
         git apply ../dev/diffs/iceberg/${{inputs.iceberg-version}}.diff
-
-    - name: Build Comet
-      shell: bash
-      run: |
-        PROFILES="-Pspark-${{inputs.spark-short-version}} -Pscala-${{inputs.scala-version}}" make release
diff --git a/.github/actions/setup-spark-local-jar/action.yaml b/.github/actions/setup-spark-local-jar/action.yaml
deleted file mode 100644
index 5334bf1ea6..0000000000
--- a/.github/actions/setup-spark-local-jar/action.yaml
+++ /dev/null
@@ -1,48 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-name: Setup Spark Local Jar
-description: 'Build comet-patched Apache Spark for Iceberg Spark tests'
-inputs:
-  spark-short-version:
-    description: 'The Apache Spark short version (e.g., 3.5) to build'
-    required: true
-  spark-version:
-    description: 'The Apache Spark version (e.g., 3.5.6) to build'
-    required: true
-  scala-version:
-    description: 'The Scala short version (e.g., 2.13) to build'
-    required: true
-runs:
-  using: "composite"
-  steps:
-    - name: Clone Spark repo
-      uses: actions/checkout@v4
-      with:
-        repository: apache/spark
-        path: apache-spark
-        ref: v${{inputs.spark-version}}
-        fetch-depth: 1
-
-    - name: Publish local Spark snapshot w/ Comet
-      shell: bash
-      run: |
-        cd apache-spark
-        git apply ../dev/diffs/${{inputs.spark-version}}.diff
-        ./dev/change-scala-version.sh ${{inputs.scala-version}}
-        ./build/mvn versions:set -DnewVersion=${{inputs.spark-version}}-SNAPSHOT
-        ./build/mvn -Pscala-${{inputs.scala-version}} -Phive -Phive-thriftserver -DskipTests -Denforcer.skip=true clean install
diff --git a/.github/workflows/iceberg_spark_test.yml b/.github/workflows/iceberg_spark_test.yml
index bf850e11bf..7f8f2f8b74 100644
--- a/.github/workflows/iceberg_spark_test.yml
+++ b/.github/workflows/iceberg_spark_test.yml
@@ -40,7 +40,7 @@ env:
   RUST_VERSION: stable

 jobs:
-  iceberg-spark-sql:
+  iceberg-spark:
     if: contains(github.event.pull_request.title, '[iceberg]')
     strategy:
       matrix:
@@ -50,7 +50,7 @@ jobs:
         spark-version: [{short: '3.5', full: '3.5.6'}]
         scala-version: ['2.13']
       fail-fast: false
-    name: iceberg-spark-sql/${{ matrix.os }}/iceberg-${{ matrix.iceberg-version.full }}/spark-${{ matrix.spark-version.full }}/scala-${{ matrix.scala-version }}/java-${{ matrix.java-version }}
+    name: iceberg-spark/${{ matrix.os }}/iceberg-${{ matrix.iceberg-version.full }}/spark-${{ matrix.spark-version.full }}/scala-${{ matrix.scala-version }}/java-${{ matrix.java-version }}
     runs-on: ${{ matrix.os }}
     container:
       image: amd64/rust
@@ -63,24 +63,96 @@
         with:
           rust-version: ${{env.RUST_VERSION}}
           jdk-version: ${{ matrix.java-version }}
+      - name: Build Comet
+        shell: bash
+        run: |
+          PROFILES="-Pspark-${{matrix.spark-version.short}} -Pscala-${{matrix.scala-version}}" make release
       - name: Setup Iceberg
         uses: ./.github/actions/setup-iceberg-builder
         with:
           iceberg-version: ${{ matrix.iceberg-version.full }}
-          scala-version: ${{ matrix.scala-version }}
-          spark-short-version: ${{ matrix.spark-version.short }}
-      - name: Build local Spark jar with comet patch
-        uses: ./.github/actions/setup-spark-local-jar
-        with:
-          spark-short-version: ${{ matrix.spark-version.short }}
-          spark-version: ${{ matrix.spark-version.full }}
-          scala-version: ${{ matrix.scala-version }}
       - name: Run Iceberg Spark tests
         run: |
           cd apache-iceberg
           rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
           ENABLE_COMET=true ./gradlew -DsparkVersions=${{ matrix.spark-version.short }} -DscalaVersion=${{ matrix.scala-version }} -DflinkVersions= -DkafkaVersions= \
-            :iceberg-spark:iceberg-spark-${{ matrix.spark-version.short }}_${{ matrix.scala-version }}:check \
-            :iceberg-spark:iceberg-spark-extensions-${{ matrix.spark-version.short }}_${{ matrix.scala-version }}:check \
-            :iceberg-spark:iceberg-spark-runtime-${{ matrix.spark-version.short }}_${{ matrix.scala-version }}:check \
+            :iceberg-spark:iceberg-spark-${{ matrix.spark-version.short }}_${{ matrix.scala-version }}:test \
             -Pquick=true -x javadoc
+
+  iceberg-spark-extensions:
+    if: contains(github.event.pull_request.title, '[iceberg]')
+    strategy:
+      matrix:
+        os: [ubuntu-24.04]
+        java-version: [11, 17]
+        iceberg-version: [{short: '1.8', full: '1.8.1'}]
+        spark-version: [{short: '3.5', full: '3.5.6'}]
+        scala-version: ['2.13']
+      fail-fast: false
+    name: iceberg-spark-extensions/${{ matrix.os }}/iceberg-${{ matrix.iceberg-version.full }}/spark-${{ matrix.spark-version.full }}/scala-${{ matrix.scala-version }}/java-${{ matrix.java-version }}
+    runs-on: ${{ matrix.os }}
+    container:
+      image: amd64/rust
+    env:
+      SPARK_LOCAL_IP: localhost
+    steps:
+      - uses: actions/checkout@v4
+      - name: Setup Rust & Java toolchain
+        uses: ./.github/actions/setup-builder
+        with:
+          rust-version: ${{env.RUST_VERSION}}
+          jdk-version: ${{ matrix.java-version }}
+      - name: Build Comet
+        shell: bash
+        run: |
+          PROFILES="-Pspark-${{matrix.spark-version.short}} -Pscala-${{matrix.scala-version}}" make release
+      - name: Setup Iceberg
+        uses: ./.github/actions/setup-iceberg-builder
+        with:
+          iceberg-version: ${{ matrix.iceberg-version.full }}
+      - name: Run Iceberg Spark extensions tests
+        run: |
+          cd apache-iceberg
+          rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
+          ENABLE_COMET=true ./gradlew -DsparkVersions=${{ matrix.spark-version.short }} -DscalaVersion=${{ matrix.scala-version }} -DflinkVersions= -DkafkaVersions= \
+            :iceberg-spark:iceberg-spark-extensions-${{ matrix.spark-version.short }}_${{ matrix.scala-version }}:test \
+            -Pquick=true -x javadoc
+
+  iceberg-spark-runtime:
+    if: contains(github.event.pull_request.title, '[iceberg]')
+    strategy:
+      matrix:
+        os: [ubuntu-24.04]
+        java-version: [11, 17]
+        iceberg-version: [{short: '1.8', full: '1.8.1'}]
+        spark-version: [{short: '3.5', full: '3.5.6'}]
+        scala-version: ['2.13']
+      fail-fast: false
+    name: iceberg-spark-runtime/${{ matrix.os }}/iceberg-${{ matrix.iceberg-version.full }}/spark-${{ matrix.spark-version.full }}/scala-${{ matrix.scala-version }}/java-${{ matrix.java-version }}
+    runs-on: ${{ matrix.os }}
+    container:
+      image: amd64/rust
+    env:
+      SPARK_LOCAL_IP: localhost
+    steps:
+      - uses: actions/checkout@v4
+      - name: Setup Rust & Java toolchain
+        uses: ./.github/actions/setup-builder
+        with:
+          rust-version: ${{env.RUST_VERSION}}
+          jdk-version: ${{ matrix.java-version }}
+      - name: Build Comet
+        shell: bash
+        run: |
+          PROFILES="-Pspark-${{matrix.spark-version.short}} -Pscala-${{matrix.scala-version}}" make release
+      - name: Setup Iceberg
+        uses: ./.github/actions/setup-iceberg-builder
+        with:
+          iceberg-version: ${{ matrix.iceberg-version.full }}
+      - name: Run Iceberg Spark runtime tests
+        run: |
+          cd apache-iceberg
+          rm -rf /root/.m2/repository/org/apache/parquet # somehow parquet cache requires cleanups
+          ENABLE_COMET=true ./gradlew -DsparkVersions=${{ matrix.spark-version.short }} -DscalaVersion=${{ matrix.scala-version }} -DflinkVersions= -DkafkaVersions= \
+            :iceberg-spark:iceberg-spark-runtime-${{ matrix.spark-version.short }}_${{ matrix.scala-version }}:integrationTest \
+            -Pquick=true -x javadoc
\ No newline at end of file
diff --git a/dev/diffs/iceberg/1.8.1.diff b/dev/diffs/iceberg/1.8.1.diff
index 7c6fa0ad15..3b9dab9aec 100644
--- a/dev/diffs/iceberg/1.8.1.diff
+++ b/dev/diffs/iceberg/1.8.1.diff
@@ -1,160 +1,664 @@
+diff --git a/build.gradle b/build.gradle
+index 7327b38..7967109 100644
+--- a/build.gradle
++++ b/build.gradle
+@@ -780,6 +780,13 @@ project(':iceberg-parquet') {
+     implementation project(':iceberg-core')
+     implementation project(':iceberg-common')
+
++
implementation("org.apache.datafusion:comet-spark-spark${sparkVersionsString}_${scalaVersion}:${libs.versions.comet.get()}") { ++ exclude group: 'org.apache.arrow' ++ exclude group: 'org.apache.parquet' ++ exclude group: 'org.apache.spark' ++ exclude group: 'org.apache.iceberg' ++ } ++ + implementation(libs.parquet.avro) { + exclude group: 'org.apache.avro', module: 'avro' + // already shaded by Parquet diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml -index 04ffa8f..d4107be 100644 +index 04ffa8f..cc0099c 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml -@@ -81,7 +81,7 @@ slf4j = "2.0.16" +@@ -34,6 +34,7 @@ azuresdk-bom = "1.2.31" + awssdk-s3accessgrants = "2.3.0" + caffeine = "2.9.3" + calcite = "1.10.0" ++comet = "0.10.0-SNAPSHOT" + datasketches = "6.2.0" + delta-standalone = "3.3.0" + delta-spark = "3.3.0" +@@ -81,7 +82,7 @@ slf4j = "2.0.16" snowflake-jdbc = "3.22.0" spark-hive33 = "3.3.4" spark-hive34 = "3.4.4" -spark-hive35 = "3.5.4" -+spark-hive35 = "3.5.6-SNAPSHOT" ++spark-hive35 = "3.5.6" sqlite-jdbc = "3.48.0.0" testcontainers = "1.20.4" tez010 = "0.10.4" -diff --git a/spark/v3.4/build.gradle b/spark/v3.4/build.gradle -index 6eb26e8..50cefce 100644 ---- a/spark/v3.4/build.gradle -+++ b/spark/v3.4/build.gradle -@@ -75,7 +75,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") { - exclude group: 'org.roaringbitmap' - } - -- compileOnly "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.5.0" -+ compileOnly "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.10.0-SNAPSHOT" - - implementation libs.parquet.column - implementation libs.parquet.hadoop -@@ -185,7 +185,7 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer - testImplementation libs.avro.avro - testImplementation libs.parquet.hadoop - testImplementation libs.junit.vintage.engine -- testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.5.0" -+ testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.10.0-SNAPSHOT" - - // Required because we remove antlr plugin dependencies from the compile configuration, see note above - runtimeOnly libs.antlr.runtime -@@ -260,6 +260,8 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio - integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') - integrationImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts') - integrationImplementation project(path: ":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts') -+ integrationImplementation project(path: ':iceberg-parquet') -+ integrationImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.10.0-SNAPSHOT" - - // runtime dependencies for running Hive Catalog based integration test - integrationRuntimeOnly project(':iceberg-hive-metastore') -@@ -297,8 +299,8 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersio - relocate 'org.apache.avro', 'org.apache.iceberg.shaded.org.apache.avro' - relocate 'avro.shaded', 'org.apache.iceberg.shaded.org.apache.avro.shaded' - relocate 'com.thoughtworks.paranamer', 'org.apache.iceberg.shaded.com.thoughtworks.paranamer' -- relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet' -- relocate 
'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded' -+// relocate 'org.apache.parquet', 'org.apache.iceberg.shaded.org.apache.parquet' -+// relocate 'shaded.parquet', 'org.apache.iceberg.shaded.org.apache.parquet.shaded' - relocate 'org.apache.orc', 'org.apache.iceberg.shaded.org.apache.orc' - relocate 'io.airlift', 'org.apache.iceberg.shaded.io.airlift' - relocate 'org.apache.hc.client5', 'org.apache.iceberg.shaded.org.apache.hc.client5' -diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java -index 0ca1236..87daef4 100644 ---- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java -+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java -@@ -29,7 +29,7 @@ public class SparkSQLProperties { - - // Controls which Parquet reader implementation to use - public static final String PARQUET_READER_TYPE = "spark.sql.iceberg.parquet.reader-type"; -- public static final ParquetReaderType PARQUET_READER_TYPE_DEFAULT = ParquetReaderType.ICEBERG; -+ public static final ParquetReaderType PARQUET_READER_TYPE_DEFAULT = ParquetReaderType.COMET; - - // Controls whether reading/writing timestamps without timezones is allowed - @Deprecated -diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java -index 4794863..0be31c1 100644 ---- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java -+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java -@@ -20,11 +20,11 @@ package org.apache.iceberg.spark.data.vectorized; +diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java +new file mode 100644 +index 0000000..ddf6c7d +--- /dev/null ++++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometTypeUtils.java +@@ -0,0 +1,255 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. 
++ */ ++package org.apache.iceberg.parquet; ++ ++import java.util.Map; ++import org.apache.comet.parquet.ParquetColumnSpec; ++import org.apache.iceberg.relocated.com.google.common.collect.Maps; ++import org.apache.parquet.column.ColumnDescriptor; ++import org.apache.parquet.schema.LogicalTypeAnnotation; ++import org.apache.parquet.schema.PrimitiveType; ++import org.apache.parquet.schema.Type; ++import org.apache.parquet.schema.Types; ++ ++public class CometTypeUtils { ++ ++ private CometTypeUtils() {} ++ ++ public static ParquetColumnSpec descriptorToParquetColumnSpec(ColumnDescriptor descriptor) { ++ ++ String[] path = descriptor.getPath(); ++ PrimitiveType primitiveType = descriptor.getPrimitiveType(); ++ String physicalType = primitiveType.getPrimitiveTypeName().name(); ++ ++ int typeLength = ++ primitiveType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY ++ ? primitiveType.getTypeLength() ++ : 0; ++ ++ boolean isRepeated = primitiveType.getRepetition() == Type.Repetition.REPEATED; ++ ++ // ToDo: extract this into a Util method ++ String logicalTypeName = null; ++ Map logicalTypeParams = Maps.newHashMap(); ++ LogicalTypeAnnotation logicalType = primitiveType.getLogicalTypeAnnotation(); ++ ++ if (logicalType != null) { ++ logicalTypeName = logicalType.getClass().getSimpleName(); ++ ++ // Handle specific logical types ++ if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) { ++ LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimal = ++ (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType; ++ logicalTypeParams.put("precision", String.valueOf(decimal.getPrecision())); ++ logicalTypeParams.put("scale", String.valueOf(decimal.getScale())); ++ } else if (logicalType instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) { ++ LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestamp = ++ (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType; ++ logicalTypeParams.put("isAdjustedToUTC", String.valueOf(timestamp.isAdjustedToUTC())); ++ logicalTypeParams.put("unit", timestamp.getUnit().name()); ++ } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) { ++ LogicalTypeAnnotation.TimeLogicalTypeAnnotation time = ++ (LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType; ++ logicalTypeParams.put("isAdjustedToUTC", String.valueOf(time.isAdjustedToUTC())); ++ logicalTypeParams.put("unit", time.getUnit().name()); ++ } else if (logicalType instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) { ++ LogicalTypeAnnotation.IntLogicalTypeAnnotation intType = ++ (LogicalTypeAnnotation.IntLogicalTypeAnnotation) logicalType; ++ logicalTypeParams.put("isSigned", String.valueOf(intType.isSigned())); ++ logicalTypeParams.put("bitWidth", String.valueOf(intType.getBitWidth())); ++ } ++ } ++ ++ return new ParquetColumnSpec( ++ 1, // ToDo: pass in the correct id ++ path, ++ physicalType, ++ typeLength, ++ isRepeated, ++ descriptor.getMaxDefinitionLevel(), ++ descriptor.getMaxRepetitionLevel(), ++ logicalTypeName, ++ logicalTypeParams); ++ } ++ ++ public static ColumnDescriptor buildColumnDescriptor(ParquetColumnSpec columnSpec) { ++ PrimitiveType.PrimitiveTypeName primType = ++ PrimitiveType.PrimitiveTypeName.valueOf(columnSpec.getPhysicalType()); ++ ++ Type.Repetition repetition; ++ if (columnSpec.getMaxRepetitionLevel() > 0) { ++ repetition = Type.Repetition.REPEATED; ++ } else if (columnSpec.getMaxDefinitionLevel() > 0) { ++ repetition = Type.Repetition.OPTIONAL; ++ } else { 
++ repetition = Type.Repetition.REQUIRED; ++ } ++ ++ String name = columnSpec.getPath()[columnSpec.getPath().length - 1]; ++ // Reconstruct the logical type from parameters ++ LogicalTypeAnnotation logicalType = null; ++ if (columnSpec.getLogicalTypeName() != null) { ++ logicalType = ++ reconstructLogicalType( ++ columnSpec.getLogicalTypeName(), columnSpec.getLogicalTypeParams()); ++ } ++ ++ PrimitiveType primitiveType; ++ if (primType == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) { ++ primitiveType = ++ org.apache.parquet.schema.Types.primitive(primType, repetition) ++ .length(columnSpec.getTypeLength()) ++ .as(logicalType) ++ .id(columnSpec.getFieldId()) ++ .named(name); ++ } else { ++ primitiveType = ++ Types.primitive(primType, repetition) ++ .as(logicalType) ++ .id(columnSpec.getFieldId()) ++ .named(name); ++ } ++ ++ return new ColumnDescriptor( ++ columnSpec.getPath(), ++ primitiveType, ++ columnSpec.getMaxRepetitionLevel(), ++ columnSpec.getMaxDefinitionLevel()); ++ } ++ ++ private static LogicalTypeAnnotation reconstructLogicalType( ++ String logicalTypeName, java.util.Map params) { ++ ++ switch (logicalTypeName) { ++ // MAP ++ case "MapLogicalTypeAnnotation": ++ return LogicalTypeAnnotation.mapType(); ++ ++ // LIST ++ case "ListLogicalTypeAnnotation": ++ return LogicalTypeAnnotation.listType(); ++ ++ // STRING ++ case "StringLogicalTypeAnnotation": ++ return LogicalTypeAnnotation.stringType(); ++ ++ // MAP_KEY_VALUE ++ case "MapKeyValueLogicalTypeAnnotation": ++ return LogicalTypeAnnotation.MapKeyValueTypeAnnotation.getInstance(); ++ ++ // ENUM ++ case "EnumLogicalTypeAnnotation": ++ return LogicalTypeAnnotation.enumType(); ++ ++ // DECIMAL ++ case "DecimalLogicalTypeAnnotation": ++ if (!params.containsKey("scale") || !params.containsKey("precision")) { ++ throw new IllegalArgumentException( ++ "Missing required parameters for DecimalLogicalTypeAnnotation: " + params); ++ } ++ int scale = Integer.parseInt(params.get("scale")); ++ int precision = Integer.parseInt(params.get("precision")); ++ return LogicalTypeAnnotation.decimalType(scale, precision); ++ ++ // DATE ++ case "DateLogicalTypeAnnotation": ++ return LogicalTypeAnnotation.dateType(); ++ ++ // TIME ++ case "TimeLogicalTypeAnnotation": ++ if (!params.containsKey("isAdjustedToUTC") || !params.containsKey("unit")) { ++ throw new IllegalArgumentException( ++ "Missing required parameters for TimeLogicalTypeAnnotation: " + params); ++ } ++ ++ boolean isUTC = Boolean.parseBoolean(params.get("isAdjustedToUTC")); ++ String timeUnitStr = params.get("unit"); ++ ++ LogicalTypeAnnotation.TimeUnit timeUnit; ++ switch (timeUnitStr) { ++ case "MILLIS": ++ timeUnit = LogicalTypeAnnotation.TimeUnit.MILLIS; ++ break; ++ case "MICROS": ++ timeUnit = LogicalTypeAnnotation.TimeUnit.MICROS; ++ break; ++ case "NANOS": ++ timeUnit = LogicalTypeAnnotation.TimeUnit.NANOS; ++ break; ++ default: ++ throw new IllegalArgumentException("Unknown time unit: " + timeUnitStr); ++ } ++ return LogicalTypeAnnotation.timeType(isUTC, timeUnit); ++ ++ // TIMESTAMP ++ case "TimestampLogicalTypeAnnotation": ++ if (!params.containsKey("isAdjustedToUTC") || !params.containsKey("unit")) { ++ throw new IllegalArgumentException( ++ "Missing required parameters for TimestampLogicalTypeAnnotation: " + params); ++ } ++ boolean isAdjustedToUTC = Boolean.parseBoolean(params.get("isAdjustedToUTC")); ++ String unitStr = params.get("unit"); ++ ++ LogicalTypeAnnotation.TimeUnit unit; ++ switch (unitStr) { ++ case "MILLIS": ++ unit = 
LogicalTypeAnnotation.TimeUnit.MILLIS; ++ break; ++ case "MICROS": ++ unit = LogicalTypeAnnotation.TimeUnit.MICROS; ++ break; ++ case "NANOS": ++ unit = LogicalTypeAnnotation.TimeUnit.NANOS; ++ break; ++ default: ++ throw new IllegalArgumentException("Unknown timestamp unit: " + unitStr); ++ } ++ return LogicalTypeAnnotation.timestampType(isAdjustedToUTC, unit); ++ ++ // INTEGER ++ case "IntLogicalTypeAnnotation": ++ if (!params.containsKey("isSigned") || !params.containsKey("bitWidth")) { ++ throw new IllegalArgumentException( ++ "Missing required parameters for IntLogicalTypeAnnotation: " + params); ++ } ++ boolean isSigned = Boolean.parseBoolean(params.get("isSigned")); ++ int bitWidth = Integer.parseInt(params.get("bitWidth")); ++ return LogicalTypeAnnotation.intType(bitWidth, isSigned); ++ ++ // JSON ++ case "JsonLogicalTypeAnnotation": ++ return LogicalTypeAnnotation.jsonType(); ++ ++ // BSON ++ case "BsonLogicalTypeAnnotation": ++ return LogicalTypeAnnotation.bsonType(); ++ ++ // UUID ++ case "UUIDLogicalTypeAnnotation": ++ return LogicalTypeAnnotation.uuidType(); ++ ++ // INTERVAL ++ case "IntervalLogicalTypeAnnotation": ++ return LogicalTypeAnnotation.IntervalLogicalTypeAnnotation.getInstance(); ++ ++ default: ++ throw new IllegalArgumentException("Unknown logical type: " + logicalTypeName); ++ } ++ } ++} +diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java +new file mode 100644 +index 0000000..88b195b +--- /dev/null ++++ b/parquet/src/main/java/org/apache/iceberg/parquet/CometVectorizedParquetReader.java +@@ -0,0 +1,255 @@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, ++ * software distributed under the License is distributed on an ++ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ++ * KIND, either express or implied. See the License for the ++ * specific language governing permissions and limitations ++ * under the License. 
++ */ ++package org.apache.iceberg.parquet; ++ ++import java.io.IOException; ++import java.io.UncheckedIOException; ++import java.nio.ByteBuffer; ++import java.util.List; ++import java.util.Map; ++import java.util.NoSuchElementException; ++import java.util.function.Function; ++import org.apache.comet.parquet.FileReader; ++import org.apache.comet.parquet.ParquetColumnSpec; ++import org.apache.comet.parquet.ReadOptions; ++import org.apache.comet.parquet.RowGroupReader; ++import org.apache.comet.parquet.WrappedInputFile; ++import org.apache.hadoop.conf.Configuration; ++import org.apache.iceberg.Schema; ++import org.apache.iceberg.exceptions.RuntimeIOException; ++import org.apache.iceberg.expressions.Expression; ++import org.apache.iceberg.expressions.Expressions; ++import org.apache.iceberg.io.CloseableGroup; ++import org.apache.iceberg.io.CloseableIterable; ++import org.apache.iceberg.io.CloseableIterator; ++import org.apache.iceberg.io.InputFile; ++import org.apache.iceberg.mapping.NameMapping; ++import org.apache.iceberg.relocated.com.google.common.collect.Lists; ++import org.apache.iceberg.util.ByteBuffers; ++import org.apache.parquet.ParquetReadOptions; ++import org.apache.parquet.column.ColumnDescriptor; ++import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; ++import org.apache.parquet.hadoop.metadata.ColumnPath; ++import org.apache.parquet.schema.MessageType; ++ ++public class CometVectorizedParquetReader extends CloseableGroup ++ implements CloseableIterable { ++ private final InputFile input; ++ private final ParquetReadOptions options; ++ private final Schema expectedSchema; ++ private final Function> batchReaderFunc; ++ private final Expression filter; ++ private final boolean reuseContainers; ++ private final boolean caseSensitive; ++ private final int batchSize; ++ private final NameMapping nameMapping; ++ private final Map properties; ++ private Long start = null; ++ private Long length = null; ++ private ByteBuffer fileEncryptionKey = null; ++ private ByteBuffer fileAADPrefix = null; ++ ++ public CometVectorizedParquetReader( ++ InputFile input, ++ Schema expectedSchema, ++ ParquetReadOptions options, ++ Function> readerFunc, ++ NameMapping nameMapping, ++ Expression filter, ++ boolean reuseContainers, ++ boolean caseSensitive, ++ int maxRecordsPerBatch, ++ Map properties, ++ Long start, ++ Long length, ++ ByteBuffer fileEncryptionKey, ++ ByteBuffer fileAADPrefix) { ++ this.input = input; ++ this.expectedSchema = expectedSchema; ++ this.options = options; ++ this.batchReaderFunc = readerFunc; ++ // replace alwaysTrue with null to avoid extra work evaluating a trivial filter ++ this.filter = filter == Expressions.alwaysTrue() ? 
null : filter; ++ this.reuseContainers = reuseContainers; ++ this.caseSensitive = caseSensitive; ++ this.batchSize = maxRecordsPerBatch; ++ this.nameMapping = nameMapping; ++ this.properties = properties; ++ this.start = start; ++ this.length = length; ++ this.fileEncryptionKey = fileEncryptionKey; ++ this.fileAADPrefix = fileAADPrefix; ++ } ++ ++ private ReadConf conf = null; ++ ++ private ReadConf init() { ++ if (conf == null) { ++ ReadConf readConf = ++ new ReadConf( ++ input, ++ options, ++ expectedSchema, ++ filter, ++ null, ++ batchReaderFunc, ++ nameMapping, ++ reuseContainers, ++ caseSensitive, ++ batchSize); ++ this.conf = readConf.copy(); ++ return readConf; ++ } ++ return conf; ++ } ++ ++ @Override ++ public CloseableIterator iterator() { ++ FileIterator iter = ++ new FileIterator<>(init(), properties, start, length, fileEncryptionKey, fileAADPrefix); ++ addCloseable(iter); ++ return iter; ++ } ++ ++ private static class FileIterator implements CloseableIterator { ++ // private final ParquetFileReader reader; ++ private final boolean[] shouldSkip; ++ private final VectorizedReader model; ++ private final long totalValues; ++ private final int batchSize; ++ private final List> columnChunkMetadata; ++ private final boolean reuseContainers; ++ private int nextRowGroup = 0; ++ private long nextRowGroupStart = 0; ++ private long valuesRead = 0; ++ private T last = null; ++ private final FileReader cometReader; ++ ++ FileIterator( ++ ReadConf conf, ++ Map properties, ++ Long start, ++ Long length, ++ ByteBuffer fileEncryptionKey, ++ ByteBuffer fileAADPrefix) { ++ this.shouldSkip = conf.shouldSkip(); ++ this.totalValues = conf.totalValues(); ++ this.reuseContainers = conf.reuseContainers(); ++ this.model = conf.vectorizedModel(); ++ this.batchSize = conf.batchSize(); ++ this.model.setBatchSize(this.batchSize); ++ this.columnChunkMetadata = conf.columnChunkMetadataForRowGroups(); ++ this.cometReader = ++ newCometReader( ++ conf.file(), ++ conf.projection(), ++ properties, ++ start, ++ length, ++ fileEncryptionKey, ++ fileAADPrefix); ++ } ++ ++ private FileReader newCometReader( ++ InputFile file, ++ MessageType projection, ++ Map properties, ++ Long start, ++ Long length, ++ ByteBuffer fileEncryptionKey, ++ ByteBuffer fileAADPrefix) { ++ try { ++ ReadOptions cometOptions = ReadOptions.builder(new Configuration()).build(); ++ ++ FileReader fileReader = ++ new FileReader( ++ new WrappedInputFile(file), ++ cometOptions, ++ properties, ++ start, ++ length, ++ ByteBuffers.toByteArray(fileEncryptionKey), ++ ByteBuffers.toByteArray(fileAADPrefix)); ++ ++ List columnDescriptors = projection.getColumns(); ++ ++ List specs = Lists.newArrayList(); ++ ++ for (ColumnDescriptor descriptor : columnDescriptors) { ++ ParquetColumnSpec spec = CometTypeUtils.descriptorToParquetColumnSpec(descriptor); ++ specs.add(spec); ++ } ++ ++ fileReader.setRequestedSchemaFromSpecs(specs); ++ return fileReader; ++ } catch (IOException e) { ++ throw new UncheckedIOException("Failed to open Parquet file: " + file.location(), e); ++ } ++ } ++ ++ @Override ++ public boolean hasNext() { ++ return valuesRead < totalValues; ++ } ++ ++ @Override ++ public T next() { ++ if (!hasNext()) { ++ throw new NoSuchElementException(); ++ } ++ if (valuesRead >= nextRowGroupStart) { ++ advance(); ++ } ++ ++ // batchSize is an integer, so casting to integer is safe ++ int numValuesToRead = (int) Math.min(nextRowGroupStart - valuesRead, batchSize); ++ if (reuseContainers) { ++ this.last = model.read(last, numValuesToRead); ++ } else { ++ 
this.last = model.read(null, numValuesToRead); ++ } ++ valuesRead += numValuesToRead; ++ ++ return last; ++ } ++ ++ private void advance() { ++ while (shouldSkip[nextRowGroup]) { ++ nextRowGroup += 1; ++ cometReader.skipNextRowGroup(); ++ } ++ RowGroupReader pages; ++ try { ++ pages = cometReader.readNextRowGroup(); ++ } catch (IOException e) { ++ throw new RuntimeIOException(e); ++ } ++ ++ model.setRowGroupInfo(pages, columnChunkMetadata.get(nextRowGroup)); ++ nextRowGroupStart += pages.getRowCount(); ++ nextRowGroup += 1; ++ } ++ ++ @Override ++ public void close() throws IOException { ++ model.close(); ++ cometReader.close(); ++ } ++ } ++} +diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +index 2c37a52..3442cfc 100644 +--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java ++++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +@@ -1075,6 +1075,7 @@ public class Parquet { + private NameMapping nameMapping = null; + private ByteBuffer fileEncryptionKey = null; + private ByteBuffer fileAADPrefix = null; ++ private boolean isComet; - import java.io.IOException; - import java.util.Map; -+import org.apache.comet.CometSchemaImporter; - import org.apache.comet.parquet.AbstractColumnReader; - import org.apache.comet.parquet.ColumnReader; - import org.apache.comet.parquet.TypeUtil; - import org.apache.comet.parquet.Utils; --import org.apache.comet.shaded.arrow.c.CometSchemaImporter; - import org.apache.comet.shaded.arrow.memory.RootAllocator; - import org.apache.iceberg.parquet.VectorizedReader; - import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -@@ -96,7 +96,7 @@ class CometColumnReader implements VectorizedReader { + private ReadBuilder(InputFile file) { + this.file = file; +@@ -1172,6 +1173,11 @@ public class Parquet { + return this; } - this.importer = new CometSchemaImporter(new RootAllocator()); -- this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, false, false); -+ this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, true, false); - this.initialized = true; - } - -diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java -index a361a7f..9021cd5 100644 ---- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java -+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java -@@ -24,6 +24,7 @@ import java.util.Objects; - import java.util.Set; - import java.util.function.Supplier; - import java.util.stream.Collectors; -+import org.apache.comet.parquet.SupportsComet; - import org.apache.iceberg.DeleteFile; - import org.apache.iceberg.FileContent; - import org.apache.iceberg.FileScanTask; -@@ -63,7 +64,7 @@ import org.slf4j.Logger; - import org.slf4j.LoggerFactory; - - class SparkBatchQueryScan extends SparkPartitioningAwareScan -- implements SupportsRuntimeV2Filtering { -+ implements SupportsRuntimeV2Filtering, SupportsComet { ++ public ReadBuilder enableComet(boolean enableComet) { ++ this.isComet = enableComet; ++ return this; ++ } ++ + public ReadBuilder withFileEncryptionKey(ByteBuffer encryptionKey) { + this.fileEncryptionKey = encryptionKey; + return this; +@@ -1182,7 +1188,7 @@ public class Parquet { + return this; + } - private static final Logger LOG = 
LoggerFactory.getLogger(SparkBatchQueryScan.class); +- @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"}) ++ @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity", "MethodLength"}) + public CloseableIterable build() { + FileDecryptionProperties fileDecryptionProperties = null; + if (fileEncryptionKey != null) { +@@ -1234,16 +1240,35 @@ public class Parquet { + } -@@ -290,4 +291,9 @@ class SparkBatchQueryScan extends SparkPartitioningAwareScan - runtimeFilterExpressions, - caseSensitive()); + if (batchedReaderFunc != null) { +- return new VectorizedParquetReader<>( +- file, +- schema, +- options, +- batchedReaderFunc, +- mapping, +- filter, +- reuseContainers, +- caseSensitive, +- maxRecordsPerBatch); ++ if (isComet) { ++ LOG.info("Comet enabled"); ++ return new CometVectorizedParquetReader<>( ++ file, ++ schema, ++ options, ++ batchedReaderFunc, ++ mapping, ++ filter, ++ reuseContainers, ++ caseSensitive, ++ maxRecordsPerBatch, ++ properties, ++ start, ++ length, ++ fileEncryptionKey, ++ fileAADPrefix); ++ } else { ++ return new VectorizedParquetReader<>( ++ file, ++ schema, ++ options, ++ batchedReaderFunc, ++ mapping, ++ filter, ++ reuseContainers, ++ caseSensitive, ++ maxRecordsPerBatch); ++ } + } else { + return new org.apache.iceberg.parquet.ParquetReader<>( + file, schema, options, readerFunc, mapping, filter, reuseContainers, caseSensitive); +diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java +index 1fb2372..142e5fb 100644 +--- a/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java ++++ b/parquet/src/main/java/org/apache/iceberg/parquet/ReadConf.java +@@ -157,6 +157,14 @@ class ReadConf { + return newReader; } + ++ InputFile file() { ++ return file; ++ } + -+ @Override -+ public boolean isCometEnabled() { -+ return true; ++ MessageType projection() { ++ return projection; + } - } -diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java -index 47a0e87..531b7ce 100644 ---- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java -+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java -@@ -41,6 +41,7 @@ import org.apache.spark.sql.internal.SQLConf; - import org.junit.After; - import org.junit.Assert; - import org.junit.Before; -+import org.junit.Ignore; - import org.junit.Test; - - public class TestDataFrameWriterV2 extends SparkTestBaseWithCatalog { -@@ -214,7 +215,7 @@ public class TestDataFrameWriterV2 extends SparkTestBaseWithCatalog { - Assert.assertEquals(4, fields.size()); - } - -- @Test -+ @Ignore - public void testMergeSchemaIgnoreCastingLongToInt() throws Exception { - sql( - "ALTER TABLE %s SET TBLPROPERTIES ('%s'='true')", -@@ -254,7 +255,7 @@ public class TestDataFrameWriterV2 extends SparkTestBaseWithCatalog { - assertThat(idField.type().typeId()).isEqualTo(Type.TypeID.LONG); - } - -- @Test -+ @Ignore - public void testMergeSchemaIgnoreCastingDoubleToFloat() throws Exception { - removeTables(); - sql("CREATE TABLE %s (id double, data string) USING iceberg", tableName); ++ + ParquetValueReader model() { + return model; + } diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle -index e2d2c7a..d23acef 100644 +index e2d2c7a..f64232d 100644 --- a/spark/v3.5/build.gradle +++ b/spark/v3.5/build.gradle @@ -75,7 +75,7 @@ 
project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") { @@ -162,17 +666,16 @@ index e2d2c7a..d23acef 100644 } - compileOnly "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.5.0" -+ compileOnly "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.10.0-SNAPSHOT" ++ compileOnly "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:${libs.versions.comet.get()}" implementation libs.parquet.column implementation libs.parquet.hadoop -@@ -182,8 +182,8 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer - +@@ -183,7 +183,7 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVer testImplementation libs.avro.avro testImplementation libs.parquet.hadoop -+ testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.10.0-SNAPSHOT" testImplementation libs.awaitility - testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.5.0" ++ testImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:${libs.versions.comet.get()}" // Required because we remove antlr plugin dependencies from the compile configuration, see note above runtimeOnly libs.antlr.runtime @@ -180,7 +683,7 @@ index e2d2c7a..d23acef 100644 integrationImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts') integrationImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts') integrationImplementation project(path: ":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts') -+ integrationImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:0.10.0-SNAPSHOT" ++ integrationImplementation "org.apache.datafusion:comet-spark-spark${sparkMajorVersion}_${scalaVersion}:${libs.versions.comet.get()}" // runtime dependencies for running Hive Catalog based integration test integrationRuntimeOnly project(':iceberg-hive-metastore') @@ -195,103 +698,1009 @@ index e2d2c7a..d23acef 100644 relocate 'org.apache.orc', 'org.apache.iceberg.shaded.org.apache.orc' relocate 'io.airlift', 'org.apache.iceberg.shaded.io.airlift' relocate 'org.apache.hc.client5', 'org.apache.iceberg.shaded.org.apache.hc.client5' -diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java -index d6c16bb..123a300 100644 ---- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java -+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java -@@ -29,7 +29,7 @@ public class SparkSQLProperties { - - // Controls which Parquet reader implementation to use - public static final String PARQUET_READER_TYPE = "spark.sql.iceberg.parquet.reader-type"; -- public static final ParquetReaderType PARQUET_READER_TYPE_DEFAULT = ParquetReaderType.ICEBERG; -+ public static final ParquetReaderType PARQUET_READER_TYPE_DEFAULT = ParquetReaderType.COMET; - // Controls whether to perform the nullability check during writes - public static final String CHECK_NULLABILITY = "spark.sql.iceberg.check-nullability"; - public static final boolean CHECK_NULLABILITY_DEFAULT = true; +diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java +index 578845e..2b16f8e 100644 +--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java ++++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/ExtensionsTestBase.java +@@ -57,6 +57,14 @@ public abstract class ExtensionsTestBase extends CatalogTestBase { + .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true") + .config( + SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), String.valueOf(RANDOM.nextBoolean())) ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") + .enableHiveSupport() + .getOrCreate(); + +diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java +index ade19de..255c416 100644 +--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java ++++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCallStatementParser.java +@@ -56,6 +56,14 @@ public class TestCallStatementParser { + .master("local[2]") + .config("spark.sql.extensions", IcebergSparkSessionExtensions.class.getName()) + .config("spark.extra.prop", "value") ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") + .getOrCreate(); + TestCallStatementParser.parser = spark.sessionState().sqlParser(); + } +diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java +index 64edb10..ea4d76f 100644 +--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java ++++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/DeleteOrphanFilesBenchmark.java +@@ -179,6 +179,14 @@ public class DeleteOrphanFilesBenchmark { + .config("spark.sql.catalog.spark_catalog", SparkSessionCatalog.class.getName()) + .config("spark.sql.catalog.spark_catalog.type", "hadoop") + .config("spark.sql.catalog.spark_catalog.warehouse", catalogWarehouse()) ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") + .master("local"); + spark = 
builder.getOrCreate(); + } +diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java +index a5d0456..5aec87e 100644 +--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java ++++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/action/IcebergSortCompactionBenchmark.java +@@ -392,6 +392,14 @@ public class IcebergSortCompactionBenchmark { + "spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") + .config("spark.sql.catalog.spark_catalog.type", "hadoop") + .config("spark.sql.catalog.spark_catalog.warehouse", getCatalogWarehouse()) ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") + .master("local[*]"); + spark = builder.getOrCreate(); + Configuration sparkHadoopConf = spark.sessionState().newHadoopConf(); +diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java +index c6794e4..60bf66c 100644 +--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java ++++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVReaderBenchmark.java +@@ -239,6 +239,14 @@ public class DVReaderBenchmark { + .config("spark.sql.catalog.spark_catalog", SparkSessionCatalog.class.getName()) + .config("spark.sql.catalog.spark_catalog.type", "hadoop") + .config("spark.sql.catalog.spark_catalog.warehouse", newWarehouseDir()) ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") + .master("local[*]") + .getOrCreate(); + } +diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java +index ac74fb5..3fca3bd 100644 +--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java ++++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/DVWriterBenchmark.java +@@ -223,6 +223,14 @@ public class DVWriterBenchmark { + .config("spark.sql.catalog.spark_catalog", SparkSessionCatalog.class.getName()) + .config("spark.sql.catalog.spark_catalog.type", "hadoop") + .config("spark.sql.catalog.spark_catalog.warehouse", newWarehouseDir()) ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ 
.config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") + .master("local[*]") + .getOrCreate(); + } +diff --git a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java +index 68c537e..2b821f8 100644 +--- a/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java ++++ b/spark/v3.5/spark/src/jmh/java/org/apache/iceberg/spark/source/IcebergSourceBenchmark.java +@@ -94,7 +94,17 @@ public abstract class IcebergSourceBenchmark { + } + + protected void setupSpark(boolean enableDictionaryEncoding) { +- SparkSession.Builder builder = SparkSession.builder().config("spark.ui.enabled", false); ++ SparkSession.Builder builder = ++ SparkSession.builder() ++ .config("spark.ui.enabled", false) ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true"); + if (!enableDictionaryEncoding) { + builder + .config("parquet.dictionary.page.size", "1") diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java -index 4794863..0be31c1 100644 +index 4794863..8bb508f 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnReader.java -@@ -20,11 +20,11 @@ package org.apache.iceberg.spark.data.vectorized; +@@ -20,21 +20,25 @@ package org.apache.iceberg.spark.data.vectorized; import java.io.IOException; import java.util.Map; ++import org.apache.comet.CometConf; +import org.apache.comet.CometSchemaImporter; import org.apache.comet.parquet.AbstractColumnReader; import org.apache.comet.parquet.ColumnReader; ++import org.apache.comet.parquet.ParquetColumnSpec; ++import org.apache.comet.parquet.RowGroupReader; import org.apache.comet.parquet.TypeUtil; import org.apache.comet.parquet.Utils; -import org.apache.comet.shaded.arrow.c.CometSchemaImporter; import org.apache.comet.shaded.arrow.memory.RootAllocator; ++import org.apache.iceberg.parquet.CometTypeUtils; import org.apache.iceberg.parquet.VectorizedReader; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -@@ -96,7 +96,7 @@ class CometColumnReader implements VectorizedReader { + import org.apache.iceberg.spark.SparkSchemaUtil; + import org.apache.iceberg.types.Types; + import org.apache.parquet.column.ColumnDescriptor; + import org.apache.parquet.column.page.PageReadStore; +-import org.apache.parquet.column.page.PageReader; + import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; + import org.apache.parquet.hadoop.metadata.ColumnPath; ++import org.apache.spark.sql.internal.SQLConf; + import org.apache.spark.sql.types.DataType; + import org.apache.spark.sql.types.Metadata; + import org.apache.spark.sql.types.StructField; +@@ -46,23 +50,28 @@ class CometColumnReader implements VectorizedReader { + + private final ColumnDescriptor descriptor; + private final 
DataType sparkType; ++ private final int fieldId; + + // The delegated ColumnReader from Comet side + private AbstractColumnReader delegate; + private boolean initialized = false; + private int batchSize = DEFAULT_BATCH_SIZE; + private CometSchemaImporter importer; ++ private ParquetColumnSpec spec; + +- CometColumnReader(DataType sparkType, ColumnDescriptor descriptor) { ++ CometColumnReader(DataType sparkType, ColumnDescriptor descriptor, int fieldId) { + this.sparkType = sparkType; + this.descriptor = descriptor; ++ this.fieldId = fieldId; + } + + CometColumnReader(Types.NestedField field) { + DataType dataType = SparkSchemaUtil.convert(field.type()); + StructField structField = new StructField(field.name(), dataType, false, Metadata.empty()); + this.sparkType = dataType; +- this.descriptor = TypeUtil.convertToParquet(structField); ++ this.descriptor = ++ CometTypeUtils.buildColumnDescriptor(TypeUtil.convertToParquetSpec(structField)); ++ this.fieldId = field.fieldId(); + } + + public AbstractColumnReader delegate() { +@@ -96,7 +105,26 @@ class CometColumnReader implements VectorizedReader { } this.importer = new CometSchemaImporter(new RootAllocator()); - this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, false, false); -+ this.delegate = Utils.getColumnReader(sparkType, descriptor, importer, batchSize, true, false); ++ ++ spec = CometTypeUtils.descriptorToParquetColumnSpec(descriptor); ++ ++ boolean useLegacyTime = ++ Boolean.parseBoolean( ++ SQLConf.get() ++ .getConfString( ++ CometConf.COMET_EXCEPTION_ON_LEGACY_DATE_TIMESTAMP().key(), "false")); ++ boolean useLazyMaterialization = ++ Boolean.parseBoolean( ++ SQLConf.get().getConfString(CometConf.COMET_USE_LAZY_MATERIALIZATION().key(), "false")); ++ this.delegate = ++ Utils.getColumnReader( ++ sparkType, ++ spec, ++ importer, ++ batchSize, ++ true, // Comet sets this to true for native execution ++ useLazyMaterialization, ++ useLegacyTime); this.initialized = true; } -diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java -index a361a7f..9021cd5 100644 ---- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java -+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java -@@ -24,6 +24,7 @@ import java.util.Objects; - import java.util.Set; +@@ -115,9 +143,9 @@ class CometColumnReader implements VectorizedReader { + *

NOTE: this should be called before reading a new Parquet column chunk, and after {@link + * CometColumnReader#reset} is called. + */ +- public void setPageReader(PageReader pageReader) throws IOException { ++ public void setPageReader(RowGroupReader pageStore) throws IOException { + Preconditions.checkState(initialized, "Invalid state: 'reset' should be called first"); +- ((ColumnReader) delegate).setPageReader(pageReader); ++ ((ColumnReader) delegate).setRowGroupReader(pageStore, spec); + } + + @Override +diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java +index 1440e5d..fc6b283 100644 +--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java ++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometColumnarBatchReader.java +@@ -23,7 +23,8 @@ import java.io.UncheckedIOException; + import java.util.List; + import java.util.Map; + import org.apache.comet.parquet.AbstractColumnReader; +-import org.apache.comet.parquet.BatchReader; ++import org.apache.comet.parquet.IcebergCometBatchReader; ++import org.apache.comet.parquet.RowGroupReader; + import org.apache.iceberg.Schema; + import org.apache.iceberg.data.DeleteFilter; + import org.apache.iceberg.parquet.VectorizedReader; +@@ -55,7 +56,7 @@ class CometColumnarBatchReader implements VectorizedReader { + // calling BatchReader.nextBatch, the isDeleted value is not yet available, so + // DeleteColumnReader.readBatch must be called explicitly later, after the isDeleted value is + // available. +- private final BatchReader delegate; ++ private final IcebergCometBatchReader delegate; + private DeleteFilter deletes = null; + private long rowStartPosInBatch = 0; + +@@ -65,9 +66,7 @@ class CometColumnarBatchReader implements VectorizedReader { + this.hasIsDeletedColumn = + readers.stream().anyMatch(reader -> reader instanceof CometDeleteColumnReader); + +- AbstractColumnReader[] abstractColumnReaders = new AbstractColumnReader[readers.size()]; +- this.delegate = new BatchReader(abstractColumnReaders); +- delegate.setSparkSchema(SparkSchemaUtil.convert(schema)); ++ this.delegate = new IcebergCometBatchReader(readers.size(), SparkSchemaUtil.convert(schema)); + } + + @Override +@@ -85,19 +84,22 @@ class CometColumnarBatchReader implements VectorizedReader { + && !(readers[i] instanceof CometPositionColumnReader) + && !(readers[i] instanceof CometDeleteColumnReader)) { + readers[i].reset(); +- readers[i].setPageReader(pageStore.getPageReader(readers[i].descriptor())); ++ readers[i].setPageReader((RowGroupReader) pageStore); + } + } catch (IOException e) { + throw new UncheckedIOException("Failed to setRowGroupInfo for Comet vectorization", e); + } + } + ++ AbstractColumnReader[] delegateReaders = new AbstractColumnReader[readers.length]; + for (int i = 0; i < readers.length; i++) { +- delegate.getColumnReaders()[i] = this.readers[i].delegate(); ++ delegateReaders[i] = readers[i].delegate(); + } + ++ delegate.init(delegateReaders); ++ + this.rowStartPosInBatch = +- pageStore ++ ((RowGroupReader) pageStore) + .getRowIndexOffset() + .orElseThrow( + () -> +diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java +index 047c963..88d691a 100644 +--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java ++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometConstantColumnReader.java +@@ -21,6 +21,7 @@ package org.apache.iceberg.spark.data.vectorized; + import java.math.BigDecimal; + import java.nio.ByteBuffer; + import org.apache.comet.parquet.ConstantColumnReader; ++import org.apache.iceberg.parquet.CometTypeUtils; + import org.apache.iceberg.types.Types; + import org.apache.spark.sql.types.DataType; + import org.apache.spark.sql.types.DataTypes; +@@ -34,7 +35,11 @@ class CometConstantColumnReader extends CometColumnReader { + super(field); + // use delegate to set constant value on the native side to be consumed by native execution. + setDelegate( +- new ConstantColumnReader(sparkType(), descriptor(), convertToSparkValue(value), false)); ++ new ConstantColumnReader( ++ sparkType(), ++ CometTypeUtils.descriptorToParquetColumnSpec(descriptor()), ++ convertToSparkValue(value), ++ false)); + } + + @Override +diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java +index 6235bfe..cba108e 100644 +--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java ++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometDeleteColumnReader.java +@@ -51,10 +51,10 @@ class CometDeleteColumnReader extends CometColumnReader { + DeleteColumnReader() { + super( + DataTypes.BooleanType, +- TypeUtil.convertToParquet( ++ TypeUtil.convertToParquetSpec( + new StructField("_deleted", DataTypes.BooleanType, false, Metadata.empty())), + false /* useDecimal128 = false */, +- false /* isConstant = false */); ++ false /* isConstant */); + this.isDeleted = new boolean[0]; + } + +diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java +index bcc0e51..98e8006 100644 +--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java ++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometPositionColumnReader.java +@@ -20,6 +20,7 @@ package org.apache.iceberg.spark.data.vectorized; + + import org.apache.comet.parquet.MetadataColumnReader; + import org.apache.comet.parquet.Native; ++import org.apache.iceberg.parquet.CometTypeUtils; + import org.apache.iceberg.types.Types; + import org.apache.parquet.column.ColumnDescriptor; + import org.apache.spark.sql.types.DataTypes; +@@ -44,7 +45,7 @@ class CometPositionColumnReader extends CometColumnReader { + PositionColumnReader(ColumnDescriptor descriptor) { + super( + DataTypes.LongType, +- descriptor, ++ CometTypeUtils.descriptorToParquetColumnSpec(descriptor), + false /* useDecimal128 = false */, + false /* isConstant = false */); + } +diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java +index d36f1a7..56f8c9b 100644 +--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java ++++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/CometVectorizedReaderBuilder.java +@@ -142,6 +142,7 @@ class CometVectorizedReaderBuilder extends TypeWithSchemaVisitor extends BaseReader taskGroup = (ScanTaskGroup) task; ++ return taskGroup.tasks().stream().allMatch(this::supportsCometBatchReads); ++ ++ } else if (task.isFileScanTask() && !task.isDataTask()) { ++ FileScanTask fileScanTask = task.asFileScanTask(); ++ // Comet can't handle delete files for now ++ return fileScanTask.file().format() == FileFormat.PARQUET && fileScanTask.deletes().isEmpty(); ++ ++ } else { ++ return false; ++ } ++ } ++ + // conditions for using ORC batch reads: + // - ORC vectorization is enabled + // - all tasks are of type FileScanTask and read only ORC files with no delete files +diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java +index 019f391..656e060 100644 +--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java ++++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java +@@ -23,6 +23,7 @@ import java.util.List; + import java.util.Map; import java.util.function.Supplier; import java.util.stream.Collectors; +import org.apache.comet.parquet.SupportsComet; - import org.apache.iceberg.DeleteFile; - import org.apache.iceberg.FileContent; - import org.apache.iceberg.FileScanTask; -@@ -63,7 +64,7 @@ import org.slf4j.Logger; + import org.apache.iceberg.BlobMetadata; + import org.apache.iceberg.ScanTask; + import org.apache.iceberg.ScanTaskGroup; +@@ -94,7 +95,7 @@ import org.apache.spark.sql.types.StructType; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; - class SparkBatchQueryScan extends SparkPartitioningAwareScan -- implements SupportsRuntimeV2Filtering { -+ implements SupportsRuntimeV2Filtering, SupportsComet { +-abstract class SparkScan implements Scan, SupportsReportStatistics { ++abstract class SparkScan implements Scan, SupportsReportStatistics, SupportsComet { + private static final Logger LOG = LoggerFactory.getLogger(SparkScan.class); + private static final String NDV_KEY = "ndv"; - private static final Logger LOG = LoggerFactory.getLogger(SparkBatchQueryScan.class); - -@@ -290,4 +291,9 @@ class SparkBatchQueryScan extends SparkPartitioningAwareScan - runtimeFilterExpressions, - caseSensitive()); +@@ -348,4 +349,10 @@ abstract class SparkScan implements Scan, SupportsReportStatistics { + return splitSize; + } } + + @Override + public boolean isCometEnabled() { -+ return true; ++ SparkBatch batch = (SparkBatch) this.toBatch(); ++ return batch.useCometBatchReads(); + } } -diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java -index 7404b18..6ce9485 100644 ---- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java -+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataFrameWriterV2.java -@@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException; - import org.apache.spark.sql.internal.SQLConf; - import org.junit.jupiter.api.AfterEach; - import org.junit.jupiter.api.BeforeEach; -+import org.junit.jupiter.api.Disabled; - import org.junit.jupiter.api.TestTemplate; - - public class TestDataFrameWriterV2 extends TestBaseWithCatalog { -@@ -248,7 +249,7 @@ public class 
TestDataFrameWriterV2 extends TestBaseWithCatalog { - sql("select * from %s order by id", tableName)); - } - -- @TestTemplate -+ @Disabled - public void testMergeSchemaIgnoreCastingLongToInt() throws Exception { - sql( - "ALTER TABLE %s SET TBLPROPERTIES ('%s'='true')", -@@ -288,7 +289,7 @@ public class TestDataFrameWriterV2 extends TestBaseWithCatalog { - assertThat(idField.type().typeId()).isEqualTo(Type.TypeID.LONG); - } - -- @TestTemplate -+ @Disabled - public void testMergeSchemaIgnoreCastingDoubleToFloat() throws Exception { - removeTables(); - sql("CREATE TABLE %s (id double, data string) USING iceberg", tableName); +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java +index 404ba72..64c5e51 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java +@@ -90,6 +90,14 @@ public abstract class SparkDistributedDataScanTestBase + .master("local[2]") + .config("spark.serializer", serializer) + .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4") ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") + .getOrCreate(); + } + } +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java +index 659507e..ead2de6 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java +@@ -73,6 +73,14 @@ public class TestSparkDistributedDataScanDeletes + .master("local[2]") + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4") ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") + .getOrCreate(); + } + +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java +index a218f96..a11f574 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java +@@ -62,6 +62,14 @@ public class TestSparkDistributedDataScanFilterFiles + .master("local[2]") + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4") ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ 
.config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") + .getOrCreate(); + } + +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java +index 2665d7b..95647d7 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java +@@ -63,6 +63,14 @@ public class TestSparkDistributedDataScanReporting + .master("local[2]") + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .config(SQLConf.SHUFFLE_PARTITIONS().key(), "4") ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") + .getOrCreate(); + } + +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java +index de68351..248f927 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestBase.java +@@ -77,6 +77,15 @@ public abstract class TestBase extends SparkTestHelperBase { + .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic") + .config("spark.hadoop." 
+ METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname)) + .config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true") ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") ++ .config("spark.comet.exec.broadcastExchange.enabled", "false") + .enableHiveSupport() + .getOrCreate(); + +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java +index bc4e722..a2d8346 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java +@@ -59,7 +59,18 @@ public class TestParquetDictionaryEncodedVectorizedReads extends TestParquetVect + + @BeforeAll + public static void startSpark() { +- spark = SparkSession.builder().master("local[2]").getOrCreate(); ++ spark = ++ SparkSession.builder() ++ .master("local[2]") ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") ++ .getOrCreate(); + } + + @AfterAll +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java +index 3a26974..9be02b9 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/ScanTestBase.java +@@ -54,7 +54,18 @@ public abstract class ScanTestBase extends AvroDataTest { + + @BeforeAll + public static void startSpark() { +- ScanTestBase.spark = SparkSession.builder().master("local[2]").getOrCreate(); ++ ScanTestBase.spark = ++ SparkSession.builder() ++ .master("local[2]") ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") ++ .getOrCreate(); + ScanTestBase.sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + } + +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java +index f411920..f2a9593 100644 +--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestCompressionSettings.java +@@ -144,7 +144,18 @@ public class TestCompressionSettings extends CatalogTestBase { + + @BeforeAll + public static void startSpark() { +- TestCompressionSettings.spark = SparkSession.builder().master("local[2]").getOrCreate(); ++ TestCompressionSettings.spark = ++ SparkSession.builder() ++ .master("local[2]") ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") ++ .getOrCreate(); + } + + @BeforeEach +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +index c4ba96e..5a1073d 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestDataSourceOptions.java +@@ -75,7 +75,18 @@ public class TestDataSourceOptions extends TestBaseWithCatalog { + + @BeforeAll + public static void startSpark() { +- TestDataSourceOptions.spark = SparkSession.builder().master("local[2]").getOrCreate(); ++ TestDataSourceOptions.spark = ++ SparkSession.builder() ++ .master("local[2]") ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") ++ .getOrCreate(); + } + + @AfterAll +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java +index 3481735..688daba 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java +@@ -110,7 +110,18 @@ public class TestFilteredScan { + + @BeforeAll + public static void startSpark() { +- TestFilteredScan.spark = SparkSession.builder().master("local[2]").getOrCreate(); ++ TestFilteredScan.spark = ++ SparkSession.builder() ++ .master("local[2]") ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") ++ .getOrCreate(); + } + + @AfterAll +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java +index 84c99a5..58a414b 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestForwardCompatibility.java +@@ -93,7 +93,18 @@ public class TestForwardCompatibility { + + @BeforeAll + public static void startSpark() { +- TestForwardCompatibility.spark = SparkSession.builder().master("local[2]").getOrCreate(); ++ TestForwardCompatibility.spark = ++ SparkSession.builder() ++ .master("local[2]") ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") ++ .getOrCreate(); + } + + @AfterAll +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java +index 7eff93d..990f386 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSpark.java +@@ -46,7 +46,18 @@ public class TestIcebergSpark { + + @BeforeAll + public static void startSpark() { +- TestIcebergSpark.spark = SparkSession.builder().master("local[2]").getOrCreate(); ++ TestIcebergSpark.spark = ++ SparkSession.builder() ++ .master("local[2]") ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") ++ .getOrCreate(); + } + + @AfterAll +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java +index 9464f68..e8d08ec 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionPruning.java +@@ -112,7 +112,18 @@ public class TestPartitionPruning { + + @BeforeAll + public static void startSpark() { +- TestPartitionPruning.spark = SparkSession.builder().master("local[2]").getOrCreate(); ++ TestPartitionPruning.spark = ++ SparkSession.builder() ++ .master("local[2]") ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") ++ .getOrCreate(); + TestPartitionPruning.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); + + String optionKey = 
String.format("fs.%s.impl", CountOpenLocalFileSystem.scheme); +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java +index 5c218f2..829b67b 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java +@@ -107,7 +107,18 @@ public class TestPartitionValues { + + @BeforeAll + public static void startSpark() { +- TestPartitionValues.spark = SparkSession.builder().master("local[2]").getOrCreate(); ++ TestPartitionValues.spark = ++ SparkSession.builder() ++ .master("local[2]") ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") ++ .getOrCreate(); + } + + @AfterAll +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java +index a7334a5..abe55f2 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSnapshotSelection.java +@@ -87,7 +87,18 @@ public class TestSnapshotSelection { + + @BeforeAll + public static void startSpark() { +- TestSnapshotSelection.spark = SparkSession.builder().master("local[2]").getOrCreate(); ++ TestSnapshotSelection.spark = ++ SparkSession.builder() ++ .master("local[2]") ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") ++ .getOrCreate(); + } + + @AfterAll +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java +index 182b1ef..ffceac5 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataFile.java +@@ -120,7 +120,18 @@ public class TestSparkDataFile { + + @BeforeAll + public static void startSpark() { +- TestSparkDataFile.spark = SparkSession.builder().master("local[2]").getOrCreate(); ++ TestSparkDataFile.spark = ++ SparkSession.builder() ++ .master("local[2]") ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") ++ 
.getOrCreate(); + TestSparkDataFile.sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext()); + } + +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java +index fb2b312..58911fc 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java +@@ -96,7 +96,18 @@ public class TestSparkDataWrite { + + @BeforeAll + public static void startSpark() { +- TestSparkDataWrite.spark = SparkSession.builder().master("local[2]").getOrCreate(); ++ TestSparkDataWrite.spark = ++ SparkSession.builder() ++ .master("local[2]") ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") ++ .getOrCreate(); + } + + @AfterEach +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java +index becf6a0..b98c2f6 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadProjection.java +@@ -83,7 +83,18 @@ public class TestSparkReadProjection extends TestReadProjection { + + @BeforeAll + public static void startSpark() { +- TestSparkReadProjection.spark = SparkSession.builder().master("local[2]").getOrCreate(); ++ TestSparkReadProjection.spark = ++ SparkSession.builder() ++ .master("local[2]") ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") ++ .getOrCreate(); + ImmutableMap config = + ImmutableMap.of( + "type", "hive", +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +index 4f1cef5..f31c9b1 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderDeletes.java +@@ -136,6 +136,14 @@ public class TestSparkReaderDeletes extends DeleteReadTests { + .config("spark.ui.liveUpdate.period", 0) + .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), "dynamic") + .config("spark.hadoop." 
+ METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname)) ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") + .enableHiveSupport() + .getOrCreate(); + +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java +index baf7fa8..02cf1c8 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReaderWithBloomFilter.java +@@ -182,6 +182,14 @@ public class TestSparkReaderWithBloomFilter { + SparkSession.builder() + .master("local[2]") + .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname)) ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") + .enableHiveSupport() + .getOrCreate(); + +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java +index 17db46b..e6afced 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreaming.java +@@ -65,6 +65,14 @@ public class TestStructuredStreaming { + SparkSession.builder() + .master("local[2]") + .config("spark.sql.shuffle.partitions", 4) ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") + .getOrCreate(); + } + +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java +index 306444b..525ddb0 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestTimestampWithoutZone.java +@@ -75,7 +75,18 @@ public class TestTimestampWithoutZone extends TestBase { + + @BeforeAll + public static void startSpark() { +- TestTimestampWithoutZone.spark = SparkSession.builder().master("local[2]").getOrCreate(); ++ TestTimestampWithoutZone.spark = ++ SparkSession.builder() ++ .master("local[2]") ++ .config("spark.plugins", 
"org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") ++ .getOrCreate(); + } + + @AfterAll +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java +index 841268a..8da9ea2 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestWriteMetricsConfig.java +@@ -80,7 +80,18 @@ public class TestWriteMetricsConfig { + + @BeforeAll + public static void startSpark() { +- TestWriteMetricsConfig.spark = SparkSession.builder().master("local[2]").getOrCreate(); ++ TestWriteMetricsConfig.spark = ++ SparkSession.builder() ++ .master("local[2]") ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") ++ .getOrCreate(); + TestWriteMetricsConfig.sc = JavaSparkContext.fromSparkContext(spark.sparkContext()); + } + +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java +index 6e09252..f92dad4 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestAggregatePushDown.java +@@ -60,6 +60,14 @@ public class TestAggregatePushDown extends CatalogTestBase { + SparkSession.builder() + .master("local[2]") + .config("spark.sql.iceberg.aggregate_pushdown", "true") ++ .config("spark.plugins", "org.apache.spark.CometPlugin") ++ .config("spark.comet.exec.shuffle.enabled", "false") ++ .config("spark.comet.explainFallback.enabled", "true") ++ .config("spark.sql.iceberg.parquet.reader-type", "COMET") ++ .config("spark.memory.offHeap.enabled", "true") ++ .config("spark.memory.offHeap.size", "10g") ++ .config("spark.comet.use.lazyMaterialization", "false") ++ .config("spark.comet.schemaEvolution.enabled", "true") + .enableHiveSupport() + .getOrCreate(); + +diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java +index 9d2ce2b..5e23368 100644 +--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java ++++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestFilterPushDown.java +@@ -598,9 +598,7 @@ public class TestFilterPushDown extends TestBaseWithCatalog { + String planAsString = sparkPlan.toString().replaceAll("#(\\d+L?)", ""); + + if (sparkFilter != null) { +- assertThat(planAsString) +- .as("Post scan filter should match") +- .contains("Filter (" + sparkFilter + ")"); ++ assertThat(planAsString).as("Post 
scan filter should match").contains("CometFilter"); + } else { + assertThat(planAsString).as("Should be no post scan filter").doesNotContain("Filter ("); + } diff --git a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala index 6a328f4be2..ac04fe043a 100644 --- a/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala +++ b/spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala @@ -73,6 +73,26 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] { }) } + def isIcebergMetadataTable(scanExec: BatchScanExec): Boolean = { + // List of Iceberg metadata tables: + // https://iceberg.apache.org/docs/latest/spark-queries/#inspecting-tables + val metadataTableSuffix = Set( + "history", + "metadata_log_entries", + "snapshots", + "entries", + "files", + "manifests", + "partitions", + "position_deletes", + "all_data_files", + "all_delete_files", + "all_entries", + "all_manifests") + + metadataTableSuffix.exists(suffix => scanExec.table.name().endsWith(suffix)) + } + plan.transform { case scan if hasMetadataCol(scan) => withInfo(scan, "Metadata column is not supported") @@ -83,7 +103,11 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] { // data source V2 case scanExec: BatchScanExec => - transformV2Scan(scanExec) + if (isIcebergMetadataTable(scanExec)) { + withInfo(scanExec, "Iceberg Metadata tables are not supported") + } else { + transformV2Scan(scanExec) + } } } }
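
Note: every patched test suite above registers the same block of Comet settings on its SparkSession. The following is a minimal, self-contained sketch (not part of the patch): the class name, the main method, and the placeholder query are illustrative, while the .config keys and values are copied verbatim from the diff. It assumes Spark, Iceberg, and the Comet jar are on the classpath.

// Minimal sketch, not part of the patch: the Comet settings the modified Iceberg test
// suites pass to SparkSession.builder(), collected into one runnable example.
import org.apache.spark.sql.SparkSession;

public final class CometIcebergSessionSketch {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder()
            .master("local[2]")
            // load Comet as a Spark plugin on the driver and executors
            .config("spark.plugins", "org.apache.spark.CometPlugin")
            // keep Spark's own shuffle; only the scan path is exercised here
            .config("spark.comet.exec.shuffle.enabled", "false")
            // log the reason whenever an operator falls back to Spark
            .config("spark.comet.explainFallback.enabled", "true")
            // tell Iceberg's vectorized Parquet path to use the Comet readers
            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
            // Comet's native readers allocate from off-heap memory
            .config("spark.memory.offHeap.enabled", "true")
            .config("spark.memory.offHeap.size", "10g")
            .config("spark.comet.use.lazyMaterialization", "false")
            .config("spark.comet.schemaEvolution.enabled", "true")
            .getOrCreate();

    spark.sql("SELECT 1").show(); // placeholder query
    spark.stop();
  }
}

TestBase additionally sets spark.comet.exec.broadcastExchange.enabled=false, so the only Comet code path the suites exercise is the Parquet scan itself.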
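Note: the CometScanRule change above keeps Iceberg metadata tables on Spark's reader by matching the table-name suffix. Below is a rough Java analog of that Scala check, illustrative only: the class and method names here are hypothetical, and the real rule matches on BatchScanExec and inspects scanExec.table.name(); only the suffix list and the endsWith logic mirror the patch.

// Illustrative Java analog of the suffix check added to CometScanRule (the real rule is Scala).
import java.util.Set;

final class IcebergMetadataTableCheck {
  private static final Set<String> METADATA_TABLE_SUFFIXES =
      Set.of(
          "history", "metadata_log_entries", "snapshots", "entries", "files", "manifests",
          "partitions", "position_deletes", "all_data_files", "all_delete_files",
          "all_entries", "all_manifests");

  // e.g. "db.tbl.snapshots" ends with "snapshots", so the scan is not handed to Comet.
  static boolean isIcebergMetadataTable(String tableName) {
    return METADATA_TABLE_SUFFIXES.stream().anyMatch(tableName::endsWith);
  }
}

A possible caveat of plain suffix matching is that a regular table whose name happens to end in one of these words (for example one ending in "files") would also be kept off the Comet scan; the patch appears to accept that trade-off for simplicity.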