diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 551b1bf6c7..9f547ac3cf 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -65,6 +65,7 @@
 import org.apache.parquet.HadoopReadOptions;
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.ByteBufferReleaser;
 import org.apache.parquet.bytes.BytesInput;
@@ -1324,6 +1325,47 @@ private boolean arePartsValidForVectoredIo(List allParts) {
     return true;
   }
 
+  /**
+   * Allocator decorator that registers every buffer it allocates with a {@link ByteBufferReleaser}, so
+   * buffers allocated on the reader's behalf (e.g. by a vectored read) are handed to that releaser.
+   */
+  static class ReleasingAllocator implements ByteBufferAllocator {
+    private final ByteBufferAllocator base;
+    private final ByteBufferReleaser releaser;
+
+    /**
+     * @param base allocator that actually provides the buffers
+     * @param releaser releaser that every allocated buffer is registered with
+     */
+    public ReleasingAllocator(ByteBufferAllocator base, ByteBufferReleaser releaser) {
+      this.base = base;
+      this.releaser = releaser;
+    }
+
+    /** Allocates from the base allocator and registers the new buffer with the releaser. */
+    @Override
+    public ByteBuffer allocate(int size) {
+      ByteBuffer res = base.allocate(size);
+      releaser.releaseLater(res);
+      return res;
+    }
+
+    /**
+     * Intentionally a no-op: every buffer from {@link #allocate(int)} is already registered with the
+     * releaser, so forwarding to the base allocator here would release the same buffer twice.
+     */
+    @Override
+    public void release(ByteBuffer b) {
+      // Nothing to do; the releaser owns the buffer.
+    }
+
+    /** Delegates to the base allocator. */
+    @Override
+    public boolean isDirect() {
+      return base.isDirect();
+    }
+  }
+
   /**
    * Read all parts through vectored IO.
    *
@@ -1354,7 +1396,7 @@ private void readVectored(List allParts, ChunkListBuilder b
     }
     LOG.debug("Reading {} bytes of data with vectored IO in {} ranges", totalSize, ranges.size());
     // Request a vectored read;
-    f.readVectored(ranges, options.getAllocator());
+    f.readVectored(ranges, new ReleasingAllocator(options.getAllocator(), builder.getReleaser()));
     int k = 0;
     for (ConsecutivePartList consecutivePart : allParts) {
       ParquetFileRange currRange = ranges.get(k++);
@@ -1842,6 +1884,11 @@ void add(ChunkDescriptor descriptor, List buffers, SeekableInputStre
       this.f = f;
     }
 
+    /** Returns the releaser that this builder registers its buffers with. */
+    ByteBufferReleaser getReleaser() {
+      return releaser;
+    }
+
     void addBuffersToRelease(List<ByteBuffer> toRelease) {
       toRelease.forEach(releaser::releaseLater);
     }