Skip to content

Commit d39dd1b

Browse files
committed
don't hold previous iterator reference after advancing to next file in ParquetPartitionReaderFactory
1 parent 4cc8101 commit d39dd1b

File tree

1 file changed

+36
-2
lines changed

1 file changed

+36
-2
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ case class ParquetPartitionReaderFactory(
8484
private val datetimeRebaseModeInRead = options.datetimeRebaseModeInRead
8585
private val int96RebaseModeInRead = options.int96RebaseModeInRead
8686

87+
private val parquetReaderCallback = new ParquetReaderCallback()
88+
8789
private def getFooter(file: PartitionedFile): ParquetMetadata = {
8890
val conf = broadcastedConf.value.value
8991
if (aggregation.isDefined || enableVectorizedReader) {
@@ -309,7 +311,8 @@ case class ParquetPartitionReaderFactory(
309311
reader, readDataSchema)
310312
val iter = new RecordReaderIterator(readerWithRowIndexes)
311313
// SPARK-23457 Register a task completion listener before `initialization`.
312-
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
314+
parquetReaderCallback.advanceFile(iter)
315+
taskContext.foreach(parquetReaderCallback.initIfNotAlready)
313316
readerWithRowIndexes
314317
}
315318

@@ -337,8 +340,39 @@ case class ParquetPartitionReaderFactory(
337340
capacity)
338341
val iter = new RecordReaderIterator(vectorizedReader)
339342
// SPARK-23457 Register a task completion listener before `initialization`.
340-
taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
343+
parquetReaderCallback.advanceFile(iter)
344+
taskContext.foreach(parquetReaderCallback.initIfNotAlready)
341345
logDebug(s"Appending $partitionSchema $partitionValues")
342346
vectorizedReader
343347
}
344348
}
349+
350+
/**
 * Tracks the [[RecordReaderIterator]] of the file currently being read so that it can be closed
 * both when reading advances to the next file and when the task completes.
 *
 * A single task completion listener is registered per callback instance (see
 * `initIfNotAlready`); that listener closes whichever iterator is current at the time it fires,
 * so only the most recently registered iterator is referenced by this class.
 */
private class ParquetReaderCallback extends Serializable {
  // True once the task completion listener has been registered, so that advancing through many
  // files does not add one listener per file.
  private var init: Boolean = false
  // Iterator of the file currently being read, or null when there is nothing left to close.
  private var iter: RecordReaderIterator[_] = null

  /**
   * Registers a task completion listener that closes the current iterator, unless this callback
   * has already registered one.
   *
   * @param taskContext the context of the running task on which the listener is registered
   */
  def initIfNotAlready(taskContext: TaskContext): Unit = {
    if (!init) {
      taskContext.addTaskCompletionListener[Unit](_ => execute())
      init = true
    }
  }

  /**
   * Makes `iter` the current iterator and closes the one that was current before it.
   *
   * @param iter the iterator of the file that is about to be read
   */
  def advanceFile(iter: RecordReaderIterator[_]): Unit = {
    val previous = this.iter
    // Track the new iterator before closing the old one: if closing the previous iterator
    // throws, the new iterator is still the one the completion listener will close.
    this.iter = iter
    if (previous != null) {
      previous.close()
    }
  }

  /**
   * Closes the current iterator, if any, and stops referencing it.
   */
  def execute(): Unit = {
    val current = iter
    // Drop the reference first so a closed iterator is not retained by this callback, even if
    // `close()` throws.
    iter = null
    if (current != null) {
      current.close()
    }
  }
}

0 commit comments

Comments
 (0)