
Spark: Doing a coalesce and foreachPartition in Spark directly on an Iceberg table is leaking memory-heavy iterators #13297

@jkolash

Description


Apache Iceberg version

1.5.0

Query engine

Spark

Please describe the bug 🐞

Summary

Doing the following should not leak any significant amount of memory.

sparkSession.sql("select * from icebergcatalog.db.table").coalesce(4).foreachPartition( (iterator) -> {
 while (iterator.hasNext()) iterator.next();
});

A workaround is to use repartition() instead; however, this requires more resources to handle spilling, shuffling, etc.
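For comparison, a sketch of the workaround (the catalog and table names are placeholders). With repartition() the scan stage keeps one task per input split, so each short scan task should be able to release its reader as soon as that split is done, at the cost of a full shuffle:

    sparkSession.sql("select * from icebergcatalog.db.table")
        .repartition(4)  // full shuffle instead of coalesce; heavier, but avoids accumulating readers per task
        .foreachPartition((iterator) -> {
            while (iterator.hasNext()) iterator.next();
        });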

Spark version: Spark 3.4.X

Details

The code below can be run against a sufficiently large Iceberg table.

    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.concurrent.atomic.AtomicLong;

    import org.apache.spark.sql.SparkSession;

    static AtomicInteger partitionCounter = new AtomicInteger(0);

    static void reproduceBug(SparkSession sparkSession, String table) {
        sparkSession.sql("select * from " + table).coalesce(4).foreachPartition((iterator) -> {
            // naive per-JVM partition index, used only for log output
            int partition = partitionCounter.getAndIncrement();
            AtomicLong rowCounter = new AtomicLong(0);

            // drain the partition iterator, logging progress every 100k rows
            while (iterator.hasNext()) {
                iterator.next();
                if (rowCounter.getAndIncrement() % 100000 == 0) {
                    System.out.println(partition + " " + rowCounter.get());
                }
            }
        });
    }
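
For completeness, here is a minimal driver around the method above. This is only a sketch: it assumes reproduceBug (and its partitionCounter field) sit on the same class, and the catalog name, catalog type, warehouse path, and table name are placeholders that need to match your environment.

    import org.apache.spark.sql.SparkSession;

    public class CoalesceLeakRepro {

        public static void main(String[] args) {
            // Local session wired to a Hadoop-type Iceberg catalog; adjust the
            // placeholder catalog name, warehouse location, and table name.
            SparkSession spark = SparkSession.builder()
                .appName("iceberg-coalesce-foreachPartition-leak")
                .master("local[4]")
                .config("spark.sql.extensions",
                    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
                .config("spark.sql.catalog.icebergcatalog", "org.apache.iceberg.spark.SparkCatalog")
                .config("spark.sql.catalog.icebergcatalog.type", "hadoop")
                .config("spark.sql.catalog.icebergcatalog.warehouse", "/tmp/iceberg-warehouse")
                .getOrCreate();

            // Attach VisualVM to this JVM while reproduceBug runs to inspect the retained readers.
            reproduceBug(spark, "icebergcatalog.db.table");
        }
    }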

The following image shows the reproduceBug method running over a sufficiently large table in our environment with ~500 columns.

Image

The following image shows the "Dominators" report in VisualVM for org.apache.spark.TaskContextImpl.

Image

Digging deeper, we see that onCallbacks is keeping an anonymous class inside org.apache.spark.sql.execution.datasources.v2.DataSourceRDD, which in turn holds a reference to org.apache.iceberg.spark.source.RowDataReader.

I believe this callback is added here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceRDD.scala#L90
We also see the org.apache.iceberg.util.Filter iterator holding a heavy reference.
Image
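
To make the mechanism concrete: DataSourceRDD registers a per-split completion callback on the current TaskContext (the linked Scala line), and a coalesced task walks many input splits inside a single task, so TaskContextImpl accumulates one callback, and one captured reader, per split until the whole task finishes. Below is a rough Java sketch of that pattern only; registerForCleanup and the generic reader parameter are illustrative names, not the actual Spark or Iceberg code.

    import java.io.Closeable;
    import java.io.IOException;

    import org.apache.spark.TaskContext;
    import org.apache.spark.util.TaskCompletionListener;

    // Sketch of the callback pattern only; `reader` stands in for the per-split
    // Iceberg reader (e.g. RowDataReader).
    public final class CompletionCallbackSketch {

        private CompletionCallbackSketch() {
        }

        // Called once per input split. Under coalesce(4) a single task iterates many
        // splits, so the same TaskContext collects one anonymous listener per split,
        // and each listener pins its captured reader until the task ends.
        public static <T extends Closeable> T registerForCleanup(final T reader) {
            TaskContext.get().addTaskCompletionListener(new TaskCompletionListener() {
                @Override
                public void onTaskCompletion(TaskContext ctx) {
                    try {
                        reader.close();
                    } catch (IOException e) {
                        // ignored in this sketch
                    }
                }
            });
            return reader;
        }
    }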

Exploring the problem

Is this inherently a bug in org.apache.spark.sql.execution.datasources.v2.DataSourceRDD, or should iterators not hold onto state that is no longer needed once they have been advanced to the end? Is the iterator even exhausted here? Once an iterator is exhausted there is no longer any need for it to reference its underlying resources. That does, however, strain the concept of a CloseableIterator, which has an explicit close(): the alternative is an implicit close, where the iterator notices that hasNext() has returned false, auto-closes, and then treats a later explicit close() as a no-op because exhaustion already closed it. Either way, an iterator accumulating hundreds of megabytes of state breaks the implicit "expected" contract of an iterator being a streaming view over a set of objects. There may also be a distinction between closing the iterator and merely releasing its large object references.
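
As a thought experiment, here is what the implicit-close behavior described above could look like. This is not an existing Iceberg class, just a sketch of a wrapper that closes its underlying resource the first time hasNext() returns false and treats a later explicit close() as a no-op.

    import java.io.Closeable;
    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.util.Iterator;
    import java.util.NoSuchElementException;

    // Hypothetical wrapper illustrating "implicit close on exhaustion":
    // the first hasNext() == false closes the underlying resource, a later
    // explicit close() is ignored, and the heavy references are dropped.
    public final class AutoClosingIterator<T> implements Iterator<T>, Closeable {

        private Iterator<T> delegate;   // nulled out on close so the GC can reclaim reader state
        private Closeable resource;
        private boolean closed = false;

        public <I extends Iterator<T> & Closeable> AutoClosingIterator(I source) {
            this.delegate = source;
            this.resource = source;
        }

        @Override
        public boolean hasNext() {
            if (closed) {
                return false;
            }
            boolean hasNext = delegate.hasNext();
            if (!hasNext) {
                close();   // implicit close once the iterator is exhausted
            }
            return hasNext;
        }

        @Override
        public T next() {
            if (closed) {
                throw new NoSuchElementException("iterator already exhausted and closed");
            }
            return delegate.next();
        }

        @Override
        public void close() {
            if (closed) {
                return;    // duplicate close() after the implicit close is a no-op
            }
            closed = true;
            try {
                resource.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } finally {
                delegate = null;
                resource = null;
            }
        }
    }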

Digging deeper, I see items = org.apache.iceberg.parquet.ParquetReader$FileIterator#6 holding onto a model reference. It might be possible to null out the model reference once hasNext() returns false:

    @Override
    public boolean hasNext() {
      boolean hasNext = valuesRead < totalValues;
      if (!hasNext) {
        // drop the heavy reader model once the iterator is exhausted
        this.model = null;
      }
      return hasNext;
    }
Image

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time
