@@ -65,6 +65,7 @@
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferAllocator;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.ByteBufferReleaser;
import org.apache.parquet.bytes.BytesInput;
@@ -1324,6 +1325,33 @@ private boolean arePartsValidForVectoredIo(List<ConsecutivePartList> allParts) {
return true;
}

+  /**
+   * Wraps a ByteBufferAllocator so that every buffer it allocates is also
+   * registered with a ByteBufferReleaser, deferring the actual release until
+   * the buffers are no longer needed.
+   */
+  static class ReleasingAllocator implements ByteBufferAllocator {
+    private final ByteBufferAllocator base;
+    private final ByteBufferReleaser releaser;
+
+    public ReleasingAllocator(ByteBufferAllocator base, ByteBufferReleaser releaser) {
+      this.base = base;
+      this.releaser = releaser;
+    }
+
+    @Override
+    public ByteBuffer allocate(int size) {
+      ByteBuffer res = base.allocate(size);
+      releaser.releaseLater(res);
+      return res;
+    }
+
+    @Override
+    public void release(ByteBuffer b) {
+      base.release(b);
+    }
+
+    @Override
+    public boolean isDirect() {
+      return base.isDirect();
+    }
+  }

/**
* Read all parts through vectored IO.
* <p>
@@ -1354,7 +1382,7 @@ private void readVectored(List<ConsecutivePartList> allParts, ChunkListBuilder b
}
LOG.debug("Reading {} bytes of data with vectored IO in {} ranges", totalSize, ranges.size());
// Request a vectored read;
- f.readVectored(ranges, options.getAllocator());
+ f.readVectored(ranges, new ReleasingAllocator(options.getAllocator(), builder.releaser));

Member:
I'm not sure this is the right direction. Would it be better to make it part of the ByteBufferAllocator contract that implementations take on this responsibility themselves?

WDYT? @gszadovszky
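
One way to picture that alternative (an illustrative sketch only, not code from this PR; the class name, the AutoCloseable contract, and the close-time cleanup are assumptions): the allocator implementation itself could remember every buffer it hands out and release anything still outstanding when it is closed, so callers would not need a separate ByteBufferReleaser.

import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import org.apache.parquet.bytes.ByteBufferAllocator;

class SelfReleasingAllocator implements ByteBufferAllocator, AutoCloseable {
  private final ByteBufferAllocator base;
  // Identity-based set: ByteBuffer.equals() compares contents, which is not
  // what we want when tracking individual allocations.
  private final Set<ByteBuffer> outstanding =
      Collections.newSetFromMap(new IdentityHashMap<>());

  SelfReleasingAllocator(ByteBufferAllocator base) {
    this.base = base;
  }

  @Override
  public synchronized ByteBuffer allocate(int size) {
    ByteBuffer b = base.allocate(size);
    outstanding.add(b); // remember it so close() can release it if the caller never does
    return b;
  }

  @Override
  public synchronized void release(ByteBuffer b) {
    outstanding.remove(b);
    base.release(b);
  }

  @Override
  public boolean isDirect() {
    return base.isDirect();
  }

  @Override
  public synchronized void close() {
    // Release whatever was never returned explicitly.
    for (ByteBuffer b : outstanding) {
      base.release(b);
    }
    outstanding.clear();
  }
}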

Contributor:
I'm not sure I get the concept of this PR. ByteBufferAllocator already has the contract of releasing the ByteBuffers it allocated. The only thing we need to do is invoke its release method at the right time, once the related buffer is no longer needed.

The ByteBufferReleaser concept was introduced only to make it easy to postpone the release invocation until the related ByteBuffers can actually be released. (With BytesInput we may pass the related buffers around, and it is not always clear when to release them.)

@annimesh2809, I would suggest implementing a unit test that reproduces the issue first. You may use TrackingByteBufferAllocator to fail the test if any allocated buffer is not released during execution; you can find examples of its usage among the existing unit tests. Once you have reproduced the issue, you'll need to ensure that the related buffers get back to their allocator to be released. You may use the patterns we already have or invent new ones if necessary.
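
A minimal sketch of such a test (assuming the TrackingByteBufferAllocator.wrap(...) factory used in the existing tests, the parquet.hadoop.vectored.io.enabled configuration key, and a hypothetical writeTestFile() helper; not a drop-in test):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.HadoopReadOptions;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.bytes.TrackingByteBufferAllocator;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.junit.Test;

public class TestVectoredReadBufferRelease {
  @Test
  public void vectoredReadReleasesAllBuffers() throws IOException {
    Path file = writeTestFile(); // hypothetical helper writing a small Parquet file
    Configuration conf = new Configuration();
    conf.setBoolean("parquet.hadoop.vectored.io.enabled", true); // assumed config key
    // allocator.close() below throws if any allocated buffer was never released
    try (TrackingByteBufferAllocator allocator =
        TrackingByteBufferAllocator.wrap(new HeapByteBufferAllocator())) {
      ParquetReadOptions options = HadoopReadOptions.builder(conf, file)
          .withAllocator(allocator)
          .build();
      try (ParquetFileReader reader =
          ParquetFileReader.open(HadoopInputFile.fromPath(file, conf), options)) {
        reader.readNextRowGroup(); // exercise the vectored read path
      }
    }
  }

  private Path writeTestFile() throws IOException {
    throw new UnsupportedOperationException("test fixture elided in this sketch");
  }
}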

Author:
When I build parquet-mr against Hadoop 3.4.2 without any additional changes, the testRangeFiltering test case (and some others) in the TestParquetReader suite fails. TrackingByteBufferAllocator reveals that the unreleased allocation happens here:

  Cause: org.apache.parquet.bytes.TrackingByteBufferAllocator$ByteBufferAllocationStacktraceException: Allocation stacktrace of the first ByteBuffer:
  at org.apache.parquet.bytes.TrackingByteBufferAllocator$ByteBufferAllocationStacktraceException.create(TrackingByteBufferAllocator.java:96)
  at org.apache.parquet.bytes.TrackingByteBufferAllocator.allocate(TrackingByteBufferAllocator.java:136)
  at org.apache.hadoop.fs.impl.VectorIOBufferPool.getBuffer(VectorIOBufferPool.java:65)
  at org.apache.hadoop.fs.RawLocalFileSystem$AsyncHandler.initiateRead(RawLocalFileSystem.java:400)
  at org.apache.hadoop.fs.RawLocalFileSystem$AsyncHandler.access$000(RawLocalFileSystem.java:360)
  at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.readVectored(RawLocalFileSystem.java:345)
  at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileInputStream.readVectored(RawLocalFileSystem.java:324)
  at org.apache.hadoop.fs.BufferedFSInputStream.readVectored(BufferedFSInputStream.java:183)
  at org.apache.hadoop.fs.FSDataInputStream.readVectored(FSDataInputStream.java:308)
  at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readVectored(ChecksumFileSystem.java:474)
  at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.readVectored(ChecksumFileSystem.java:463)

The root cause seems to be that ChecksumFileSystem (from Hadoop) now supports readVectored: https://github.com/apache/hadoop/blob/branch-3.4.2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java#L460-L513.
ChecksumFileSystem.readVectored internally performs further allocations, e.g.:

sums.readVectored(checksumRanges, allocate, release);
datas.readVectored(dataRanges, allocate, release);

which are not marked for release by ByteBufferReleaser.

Also, with vectored reads it is not sufficient to mark the buffers returned by the allocator for release: they are sliced internally, so the buffer object handed back is a different object even though the underlying memory is the same.
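
To illustrate the slicing point (a standalone snippet, not code from the PR): identity-based release tracking cannot match a slice back to the buffer the allocator returned, even though both views share the same memory.

import java.nio.ByteBuffer;

public class SliceIdentity {
  public static void main(String[] args) {
    ByteBuffer allocated = ByteBuffer.allocate(1024);
    // A vectored-read implementation may slice one large buffer into
    // per-range views; each slice is a distinct object over the same memory.
    ByteBuffer slice = allocated.slice();

    System.out.println(slice == allocated);                 // false: different objects
    System.out.println(slice.array() == allocated.array()); // true: same backing memory
    // A releaser keyed on the object returned by allocate(...) never sees
    // `slice`, so the allocation looks leaked even after the slice is freed.
  }
}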

Contributor:
Thanks for the context, @annimesh2809.
Why do you need to track the allocated buffers and release them later, instead of simply passing the allocate and release methods of the ByteBufferAllocator instance to the relevant Hadoop API via the implementations of SeekableInputStream.readVectored? I assume the Hadoop code would then release the allocated buffers as soon as they are no longer needed.
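
A sketch of what that could look like (assuming Hadoop's three-argument readVectored overload visible in the stack trace above; the wrapper class and method shown are illustrative, not the actual SeekableInputStream implementation):

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.parquet.bytes.ByteBufferAllocator;

class VectoredReadSketch {
  // Hand the allocator's own allocate/release methods to Hadoop so that the
  // buffers Hadoop allocates internally (e.g. inside ChecksumFileSystem) are
  // returned to the same allocator as soon as Hadoop is done with them.
  static void readVectored(FSDataInputStream stream, List<FileRange> ranges, ByteBufferAllocator allocator)
      throws IOException {
    stream.readVectored(ranges, allocator::allocate, allocator::release);
  }
}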

int k = 0;
for (ConsecutivePartList consecutivePart : allParts) {
ParquetFileRange currRange = ranges.get(k++);
@@ -1842,6 +1870,8 @@ void add(ChunkDescriptor descriptor, List<ByteBuffer> buffers, SeekableInputStre
this.f = f;
}

+ ByteBufferReleaser getReleaser() { return releaser; }

void addBuffersToRelease(List<ByteBuffer> toRelease) {
toRelease.forEach(releaser::releaseLater);
}