Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/java/org/apache/cassandra/net/BufferPoolAllocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ public long usedSizeInBytes()
return bufferPool.usedSizeInBytes();
}

@VisibleForTesting
long overflowMemoryInBytes()
{
return bufferPool.overflowMemoryInBytes();
}

void release()
{
}
Expand All @@ -117,6 +123,7 @@ public ByteBuf capacity(int newCapacity)

ByteBuf newBuffer = super.capacity(newCapacity);
ByteBuffer nioBuffer = newBuffer.nioBuffer(0, newBuffer.capacity());
nioBuffer = bufferPool.unwrapBufferPoolManagedBuffer(nioBuffer);

bufferPool.put(wrapped);
wrapped = nioBuffer;
Expand Down
26 changes: 26 additions & 0 deletions src/java/org/apache/cassandra/utils/memory/BufferPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -1613,4 +1613,30 @@ int unsafeNumChunks()
+ (pool.chunks.chunk1 != null ? 1 : 0)
+ (pool.chunks.chunk2 != null ? 1 : 0);
}

/**
* @return the inner buffer if it has a BufferPool.Chunk attached
* and originalBuffer in other cases
*/
public ByteBuffer unwrapBufferPoolManagedBuffer(ByteBuffer originalBuffer)
{
int MAX_DEPTH = 32; // a protection against possible loops in attachments
int depth = 0;
ByteBuffer buffer = originalBuffer;
do
{
if (buffer == null || !isExactlyDirect(buffer))
return originalBuffer;
if (Chunk.getParentChunk(buffer) != null)
return buffer;

Object attachment = MemoryUtil.getAttachment(buffer);
if (!(attachment instanceof ByteBuffer))
return originalBuffer;
Comment on lines +1629 to +1635
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the two places where we fallback to return originalBuffer; is there any difference if originalBuffer or the previous buffer value (not the current value of buffer) was returned ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I relied on a common sense logic here: if we get something non-expected as an input - do not modify it and return the original value (it is also a fallback to the original behaviour), it is just a safer approach by default, IMHO

buffer = (ByteBuffer) attachment;
depth++;
}
while (depth < MAX_DEPTH);
return originalBuffer;
}
}
107 changes: 62 additions & 45 deletions test/unit/org/apache/cassandra/net/BufferPoolAllocatorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,57 +22,68 @@
import java.util.Arrays;
import java.util.Random;

import org.junit.BeforeClass;
import org.junit.Test;

import io.netty.buffer.ByteBuf;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.assertj.core.api.Assertions;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;

public class BufferPoolAllocatorTest
{
@Test
public void testAdoptedBufferContentAfterResize() {

@BeforeClass
public static void beforeClass()
{
DatabaseDescriptor.clientInitialization();
ByteBuf buffer = GlobalBufferPoolAllocator.instance.buffer(200, 500);
assertEquals(200, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
// cache size hould be more than a macro chunk size for proper pool testing
// if it is 0 or less than a macro chunk size we actually do not pool
DatabaseDescriptor.getRawConfig().networking_cache_size_in_mb = 128;
}

@Test
public void testAdoptedBufferContentAfterResize() {
ByteBuf buffer = allocateByteBuf(200, 500);
int originalCapacity = buffer.capacity();
byte[] content = new byte[300];

Random rand = new Random();
rand.nextBytes(content);

buffer.writeBytes(Arrays.copyOfRange(content, 0, 200));
assertEquals(200, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
assertEquals(originalCapacity, GlobalBufferPoolAllocator.instance.usedSizeInBytes());

buffer.writeBytes(Arrays.copyOfRange(content, 200, 300));
int increasedCapacity = buffer.capacity();
assertEquals(increasedCapacity, GlobalBufferPoolAllocator.instance.usedSizeInBytes());

byte[] bufferContent = new byte[300];

BufferPoolAllocator.Wrapped wrapped = (BufferPoolAllocator.Wrapped) buffer;
ByteBuffer adopted = wrapped.adopt();
adopted.get(bufferContent);
assertArrayEquals(content, bufferContent);
assertEquals(500, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
assertEquals(increasedCapacity, GlobalBufferPoolAllocator.instance.usedSizeInBytes());

GlobalBufferPoolAllocator.instance.put(adopted);
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
ensureThatAllMemoryIsReturnedBackToBufferPool();
}

@Test
public void testAdoptedBufferContentBeforeResize() {
DatabaseDescriptor.clientInitialization();
ByteBuf buffer = GlobalBufferPoolAllocator.instance.buffer(200, 300);
assertEquals(200, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
ByteBuf buffer = allocateByteBuf(200, 300);
int originalCapacity = buffer.capacity();

byte[] content = new byte[200];

Random rand = new Random();
rand.nextBytes(content);

buffer.writeBytes(content);
assertEquals(200, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
assertEquals(originalCapacity, GlobalBufferPoolAllocator.instance.usedSizeInBytes());

byte[] bufferContent = new byte[200];

Expand All @@ -82,115 +93,121 @@ public void testAdoptedBufferContentBeforeResize() {
assertArrayEquals(content, bufferContent);

GlobalBufferPoolAllocator.instance.put(adopted);
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
ensureThatAllMemoryIsReturnedBackToBufferPool();
}

@Test
public void testPutPooledBufferBackIntoPool() {
DatabaseDescriptor.clientInitialization();
ByteBuf buffer = GlobalBufferPoolAllocator.instance.buffer(200, 500);
assertEquals(200, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
ByteBuf buffer = allocateByteBuf(200, 500);
buffer.writeBytes(new byte[200]);

buffer.release();
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
ensureThatAllMemoryIsReturnedBackToBufferPool();
}

@Test
public void testPutResizedBufferBackIntoPool() {
DatabaseDescriptor.clientInitialization();
ByteBuf buffer = GlobalBufferPoolAllocator.instance.buffer(200, 500);
assertEquals(200, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
ByteBuf buffer = allocateByteBuf(200, 500);
buffer.writeBytes(new byte[500]);

buffer.release();
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
ensureThatAllMemoryIsReturnedBackToBufferPool();
}

@Test
public void testBufferDefaultMaxCapacity()
{
DatabaseDescriptor.clientInitialization();
ByteBuf noMaxCapacity = GlobalBufferPoolAllocator.instance.buffer(100);
noMaxCapacity.writeBytes(new byte[100]);
assertEquals(100, noMaxCapacity.readableBytes());
noMaxCapacity.release();
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
ensureThatAllMemoryIsReturnedBackToBufferPool();
}

@Test
public void testBufferWithMaxCapacity()
{
DatabaseDescriptor.clientInitialization();
ByteBuf buffer = GlobalBufferPoolAllocator.instance.buffer(100, 500);
ByteBuf buffer = allocateByteBuf(100, 500);
buffer.writeBytes(new byte[500]);
assertEquals(500, buffer.readableBytes());
assertEquals(500, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
assertEquals(buffer.capacity(), GlobalBufferPoolAllocator.instance.usedSizeInBytes());
buffer.release();
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
ensureThatAllMemoryIsReturnedBackToBufferPool();
}

@Test
public void testBufferContentAfterResize()
{
DatabaseDescriptor.clientInitialization();
ByteBuf buffer = GlobalBufferPoolAllocator.instance.buffer(200, 300);
assertEquals(200, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
ByteBuf buffer = allocateByteBuf(200, 300);
int originalCapacity = buffer.capacity();

byte[] content = new byte[300];

Random rand = new Random();
rand.nextBytes(content);

buffer.writeBytes(Arrays.copyOfRange(content, 0, 200));
assertEquals(200, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
assertEquals(originalCapacity, GlobalBufferPoolAllocator.instance.usedSizeInBytes());

buffer.writeBytes(Arrays.copyOfRange(content, 200, 300));

byte[] bufferContent = new byte[300];
buffer.readBytes(bufferContent);
assertArrayEquals(content, bufferContent);
assertEquals(300, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
Assertions.assertThat(buffer.capacity()).isGreaterThanOrEqualTo(300);
assertEquals(buffer.capacity(), GlobalBufferPoolAllocator.instance.usedSizeInBytes());

buffer.release();
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
ensureThatAllMemoryIsReturnedBackToBufferPool();

}

@Test(expected = IndexOutOfBoundsException.class)
public void testBufferExceedMaxCapacity()
{
DatabaseDescriptor.clientInitialization();
ByteBuf maxCapacity = GlobalBufferPoolAllocator.instance.buffer(100, 200);
ByteBuf maxCapacity = allocateByteBuf(100, 200);
try
{
maxCapacity.writeBytes(new byte[300]);
} finally {
maxCapacity.release();
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
ensureThatAllMemoryIsReturnedBackToBufferPool();
}
}

@Test
public void testResizeBufferMultipleTimes()
{
DatabaseDescriptor.clientInitialization();
ByteBuf buffer = GlobalBufferPoolAllocator.instance.buffer(100, 2000);
ByteBuf buffer = allocateByteBuf(100, 2000);
buffer.writeBytes(new byte[200]);
assertEquals(200, buffer.readableBytes());
assertEquals(256, buffer.capacity());
assertEquals(256, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
assertEquals(buffer.capacity(), GlobalBufferPoolAllocator.instance.usedSizeInBytes());

buffer.writeBytes(new byte[100]);
assertEquals(300, buffer.readableBytes());
assertEquals(512, buffer.capacity());
assertEquals(512, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
assertEquals(buffer.capacity(), GlobalBufferPoolAllocator.instance.usedSizeInBytes());

buffer.writeBytes(new byte[300]);
assertEquals(600, buffer.readableBytes());
assertEquals(1024, buffer.capacity());
assertEquals(1024, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
assertEquals(buffer.capacity(), GlobalBufferPoolAllocator.instance.usedSizeInBytes());

buffer.release();
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
ensureThatAllMemoryIsReturnedBackToBufferPool();
}

private static ByteBuf allocateByteBuf(int initialCapacity, int maxCapacity)
{
ByteBuf buffer = GlobalBufferPoolAllocator.instance.buffer(initialCapacity, maxCapacity);
int originalCapacity = buffer.capacity();

// BufferPool can allocate more capacity than requested to avoid fragmentation
Assertions.assertThat(originalCapacity).isGreaterThanOrEqualTo(initialCapacity);
assertEquals(originalCapacity, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
return buffer;
}

private static void ensureThatAllMemoryIsReturnedBackToBufferPool()
{
assertEquals(0, GlobalBufferPoolAllocator.instance.usedSizeInBytes());
assertEquals(0, GlobalBufferPoolAllocator.instance.overflowMemoryInBytes());
}
}