
[iceberg] Iceberg data files not found when Comet vectorized reader is enabled #2116

@hsiang-c

Description

Describe the bug

Running Iceberg's TestRuntimeFiltering with the Comet vectorized reader enabled fails with a NotFoundException on an Iceberg data file:

TestRuntimeFiltering > testBucketedTableWithMultipleSpecs() > catalogName = testhadoop, implementation = org.apache.iceberg.spark.SparkCatalog, config = {type=hadoop, cache-enabled=false}, planningMode = LOCAL FAILED
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 1 times, most recent failure: Lost task 0.0 in stage 10.0 (TID 8) (localhost executor driver): org.apache.iceberg.exceptions.NotFoundException: File does not exist: file:/tmp/warehouse5030381489500028172.tmp/default/table/data/id_bucket_8=6/00000-2-e3fa66fe-9a9a-49bd-ba46-e386f0db7577-0-00002.parquet
        at org.apache.iceberg.hadoop.HadoopInputFile.lazyStat(HadoopInputFile.java:164)
        at org.apache.iceberg.hadoop.HadoopInputFile.getStat(HadoopInputFile.java:200)
        at org.apache.iceberg.parquet.ParquetIO.file(ParquetIO.java:51)
        at org.apache.iceberg.parquet.ReadConf.newReader(ReadConf.java:194)
        at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:76)
        at org.apache.iceberg.parquet.VectorizedParquetReader.init(VectorizedParquetReader.java:90)
        at org.apache.iceberg.parquet.VectorizedParquetReader.iterator(VectorizedParquetReader.java:99)
        at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:116)
        at org.apache.iceberg.spark.source.BatchDataReader.open(BatchDataReader.java:43)
        at org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:134)
        at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:120)
        at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:158)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63)
        at scala.Option.exists(Option.scala:406)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:97)
        at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at org.apache.spark.sql.comet.CometBatchScanExec$$anon$1.hasNext(CometBatchScanExec.scala:62)
        at org.apache.comet.CometBatchIterator.hasNext(CometBatchIterator.java:50)
        at org.apache.comet.Native.executePlan(Native Method)
        at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$2(CometExecIterator.scala:155)
        at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$2$adapted(CometExecIterator.scala:154)
        at org.apache.comet.vector.NativeUtil.getNextBatch(NativeUtil.scala:157)
        at org.apache.comet.CometExecIterator.$anonfun$getNextBatch$1(CometExecIterator.scala:154)
        at org.apache.comet.Tracing$.withTrace(Tracing.scala:31)
        at org.apache.comet.CometExecIterator.getNextBatch(CometExecIterator.scala:152)
        at org.apache.comet.CometExecIterator.hasNext(CometExecIterator.scala:203)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:601)
        at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
        at scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:583)
        at org.apache.spark.util.random.SamplingUtils$.reservoirSampleAndCount(SamplingUtils.scala:41)
        at org.apache.spark.RangePartitioner$.$anonfun$sketch$1(Partitioner.scala:322)
        at org.apache.spark.RangePartitioner$.$anonfun$sketch$1$adapted(Partitioner.scala:320)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:910)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:910)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
        at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
        at org.apache.spark.scheduler.Task.run(Task.scala:141)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:621)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
        at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:624)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)
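
For context, here is a minimal sketch of the scenario the failing test appears to exercise, inferred from the test name and the id_bucket_8=6 partition path in the trace: an Iceberg table that accumulates multiple partition specs, then a join that triggers Spark runtime filtering. The table names, schema, and SQL below are hypothetical illustrations, not the actual TestRuntimeFiltering code; they assume a `spark` session configured as in the next section, with the `testhadoop` catalog from the test parameters.

    // Hypothetical sketch; requires the Iceberg Spark runtime and SQL extensions.
    // Write files under the table's initial (unpartitioned) spec.
    spark.sql("CREATE TABLE testhadoop.default.tbl (id BIGINT, data STRING) USING iceberg")
    spark.sql("INSERT INTO testhadoop.default.tbl SELECT id, CAST(id AS STRING) FROM range(100)")

    // Evolve the partition spec so the table carries multiple specs.
    spark.sql("ALTER TABLE testhadoop.default.tbl ADD PARTITION FIELD bucket(8, id)")
    spark.sql("INSERT INTO testhadoop.default.tbl SELECT id, CAST(id AS STRING) FROM range(100, 200)")

    // A small dimension table drives a runtime filter on the join key.
    spark.sql("CREATE TABLE testhadoop.default.dim (id BIGINT) USING iceberg")
    spark.sql("INSERT INTO testhadoop.default.dim VALUES (1), (7), (42)")

    // Runtime filtering prunes data files at execution time; with the Comet
    // vectorized reader enabled, the task fails here with NotFoundException.
    spark.sql(
      """SELECT t.id, t.data
        |FROM testhadoop.default.tbl t
        |JOIN testhadoop.default.dim d ON t.id = d.id""".stripMargin).collect()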

Steps to reproduce

SparkSession configs used

            .config("spark.plugins", "org.apache.spark.CometPlugin")
            .config("spark.shuffle.manager", "org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager")
            .config("spark.comet.explainFallback.enabled", "true")
            .config("spark.sql.iceberg.parquet.reader-type", "COMET")
            .config("spark.memory.offHeap.enabled", "true")
            .config("spark.memory.offHeap.size", "10g")
            .config("spark.comet.use.lazyMaterialization", "false")
            .config("spark.comet.schemaEvolution.enabled", "true")

Expected behavior

The test passes: data files written by the test remain readable when the Comet vectorized reader is enabled.

Additional context

No response
