diff --git a/httpcore5-testing/pom.xml b/httpcore5-testing/pom.xml index cdaa813a0c..cf50172a30 100644 --- a/httpcore5-testing/pom.xml +++ b/httpcore5-testing/pom.xml @@ -171,6 +171,9 @@ -classpath org.openjdk.jmh.Main + + -prof + gc -rf json -rff diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/benchmark/ByteBufferAllocatorBenchmark.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/benchmark/ByteBufferAllocatorBenchmark.java new file mode 100644 index 0000000000..a74e30d704 --- /dev/null +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/benchmark/ByteBufferAllocatorBenchmark.java @@ -0,0 +1,217 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.benchmark; + +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; + +import org.apache.hc.core5.io.ByteBufferAllocator; +import org.apache.hc.core5.io.PooledByteBufferAllocator; +import org.apache.hc.core5.io.SimpleByteBufferAllocator; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS) +@Fork(1) +public class ByteBufferAllocatorBenchmark { + + public enum BufferKind { + HEAP, + DIRECT + } + + /** + * Per-thread state: each thread gets its own allocators. + * This measures the best-case hot-path behaviour with no contention + * on the pooled allocator. + */ + @State(Scope.Thread) + public static class ThreadLocalState { + + @Param({"1024", "8192", "65536"}) + public int bufferSize; + + @Param({"100"}) + public int iterations; + + @Param({"HEAP", "DIRECT"}) + public BufferKind kind; + + public ByteBufferAllocator simpleAllocator; + public ByteBufferAllocator pooledAllocator; + + public byte[] payload; + + @Setup + public void setUp() { + this.simpleAllocator = SimpleByteBufferAllocator.INSTANCE; + this.pooledAllocator = new PooledByteBufferAllocator( + 1024, // min pooled size + 256 * 1024, // max pooled size + 1024, // maxGlobalPerBucket + 64 // maxLocalPerBucket + ); + this.payload = new byte[bufferSize / 2]; + for (int i = 0; i < this.payload.length; i++) { + this.payload[i] = (byte) (i & 0xFF); + } + } + + } + + /** + * Shared state: all threads share the same allocators. + * This measures contention on the global buckets and any shared + * structures inside the pooled allocator. + */ + @State(Scope.Benchmark) + public static class SharedState { + + @Param({"1024", "8192", "65536"}) + public int bufferSize; + + @Param({"100"}) + public int iterations; + + @Param({"HEAP", "DIRECT"}) + public BufferKind kind; + + public ByteBufferAllocator simpleAllocator; + public ByteBufferAllocator pooledAllocator; + + public byte[] payload; + + @Setup + public void setUp() { + this.simpleAllocator = SimpleByteBufferAllocator.INSTANCE; + this.pooledAllocator = new PooledByteBufferAllocator( + 1024, + 256 * 1024, + 1024, + 64 + ); + this.payload = new byte[bufferSize / 2]; + for (int i = 0; i < this.payload.length; i++) { + this.payload[i] = (byte) (i & 0xFF); + } + } + + } + + private static ByteBuffer allocate( + final ByteBufferAllocator allocator, + final BufferKind kind, + final int bufferSize) { + if (kind == BufferKind.DIRECT) { + return allocator.allocateDirect(bufferSize); + } + return allocator.allocate(bufferSize); + } + + + @Benchmark + public void pooled_allocator_thread_local(final ThreadLocalState state, final Blackhole blackhole) { + final int bufferSize = state.bufferSize; + final int iterations = state.iterations; + final ByteBufferAllocator allocator = state.pooledAllocator; + final byte[] payload = state.payload; + final BufferKind kind = state.kind; + + for (int i = 0; i < iterations; i++) { + final ByteBuffer buf = allocate(allocator, kind, bufferSize); + buf.put(payload); + buf.flip(); + blackhole.consume(buf.get(0)); + allocator.release(buf); + } + } + + @Benchmark + public void simple_allocator_thread_local(final ThreadLocalState state, final Blackhole blackhole) { + final int bufferSize = state.bufferSize; + final int iterations = state.iterations; + final ByteBufferAllocator allocator = state.simpleAllocator; + final byte[] payload = state.payload; + final BufferKind kind = state.kind; + + for (int i = 0; i < iterations; i++) { + final ByteBuffer buf = allocate(allocator, kind, bufferSize); + buf.put(payload); + buf.flip(); + blackhole.consume(buf.get(0)); + allocator.release(buf); + } + } + + @Benchmark + public void pooled_allocator_shared(final SharedState state, final Blackhole blackhole) { + final int bufferSize = state.bufferSize; + final int iterations = state.iterations; + final ByteBufferAllocator allocator = state.pooledAllocator; + final byte[] payload = state.payload; + final BufferKind kind = state.kind; + + for (int i = 0; i < iterations; i++) { + final ByteBuffer buf = allocate(allocator, kind, bufferSize); + buf.put(payload); + buf.flip(); + blackhole.consume(buf.get(0)); + allocator.release(buf); + } + } + + @Benchmark + public void simple_allocator_shared(final SharedState state, final Blackhole blackhole) { + final int bufferSize = state.bufferSize; + final int iterations = state.iterations; + final ByteBufferAllocator allocator = state.simpleAllocator; + final byte[] payload = state.payload; + final BufferKind kind = state.kind; + + for (int i = 0; i < iterations; i++) { + final ByteBuffer buf = allocate(allocator, kind, bufferSize); + buf.put(payload); + buf.flip(); + blackhole.consume(buf.get(0)); + allocator.release(buf); + } + } + +} diff --git a/httpcore5/src/main/java/org/apache/hc/core5/io/ByteBufferAllocator.java b/httpcore5/src/main/java/org/apache/hc/core5/io/ByteBufferAllocator.java new file mode 100644 index 0000000000..cce19e9b1a --- /dev/null +++ b/httpcore5/src/main/java/org/apache/hc/core5/io/ByteBufferAllocator.java @@ -0,0 +1,71 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.io; + +import java.nio.ByteBuffer; + +import org.apache.hc.core5.annotation.Contract; +import org.apache.hc.core5.annotation.ThreadingBehavior; + +/** + * Strategy for allocating and releasing {@link ByteBuffer} instances. + *

+ * Implementations may allocate fresh buffers on every call or reuse + * buffers from a pool. + * + * @since 5.4 + */ +@Contract(threading = ThreadingBehavior.STATELESS) +public interface ByteBufferAllocator { + + /** + * Allocates a new heap buffer of the given capacity. + * + * @param capacity buffer capacity in bytes; non-negative. + * @return a heap {@link ByteBuffer} with the given capacity. + */ + ByteBuffer allocate(int capacity); + + /** + * Allocates a new direct buffer of the given capacity. + * + * @param capacity buffer capacity in bytes; non-negative. + * @return a direct {@link ByteBuffer} with the given capacity. + */ + ByteBuffer allocateDirect(int capacity); + + /** + * Releases a buffer back to the allocator. + *

+ * Implementations that do not pool buffers may choose to ignore + * this call. + * + * @param buffer the buffer to release; may be {@code null}. + */ + void release(ByteBuffer buffer); + +} diff --git a/httpcore5/src/main/java/org/apache/hc/core5/io/PooledByteBufferAllocator.java b/httpcore5/src/main/java/org/apache/hc/core5/io/PooledByteBufferAllocator.java new file mode 100644 index 0000000000..2a0881fae3 --- /dev/null +++ b/httpcore5/src/main/java/org/apache/hc/core5/io/PooledByteBufferAllocator.java @@ -0,0 +1,311 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.io; + +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hc.core5.annotation.Contract; +import org.apache.hc.core5.annotation.ThreadingBehavior; +import org.apache.hc.core5.util.Args; + +/** + * {@link ByteBufferAllocator} implementation backed by a bucketed buffer pool + * with global queues and per-thread caches, inspired by Netty's pooled buffer + * allocator. + *

+ * Buffer capacities are rounded up to the nearest power of two between + * {@code minCapacity} and {@code maxCapacity}. Released buffers are cached in + * a per-thread cache (up to {@code maxLocalPerBucket}) and then in a global + * bucket (up to {@code maxGlobalPerBucket}). + *

+ * The returned buffers may have an underlying {@link ByteBuffer#capacity()} + * greater than the requested capacity; however, their {@link ByteBuffer#limit(int)} + * is set to the requested capacity. + * + * @since 5.4 + */ +@Contract(threading = ThreadingBehavior.SAFE) +public final class PooledByteBufferAllocator implements ByteBufferAllocator { + + private static final class GlobalBucket { + + final ConcurrentLinkedQueue queue; + final AtomicInteger size; + + GlobalBucket() { + this.queue = new ConcurrentLinkedQueue<>(); + this.size = new AtomicInteger(); + } + + } + + private static final class LocalCache { + + final Deque[] heapBuckets; + final Deque[] directBuckets; + + LocalCache(final int bucketCount) { + @SuppressWarnings("unchecked") + final Deque[] heap = new Deque[bucketCount]; + @SuppressWarnings("unchecked") + final Deque[] direct = new Deque[bucketCount]; + for (int i = 0; i < bucketCount; i++) { + heap[i] = new ArrayDeque<>(); + direct[i] = new ArrayDeque<>(); + } + this.heapBuckets = heap; + this.directBuckets = direct; + } + + } + + private final int minCapacity; + private final int maxCapacity; + private final int minShift; + private final int[] bucketCapacities; + private final GlobalBucket[] heapBuckets; + private final GlobalBucket[] directBuckets; + private final int maxGlobalPerBucket; + private final int maxLocalPerBucket; + + private final ThreadLocal localCache; + + /** + * Creates a new pooled allocator. + * + * @param minCapacity minimum capacity (inclusive) to pool, in bytes. + * @param maxCapacity maximum capacity (inclusive) to pool, in bytes. + * @param maxGlobalPerBucket maximum number of buffers to keep per global + * bucket and memory type (heap/direct). + * @param maxLocalPerBucket maximum number of buffers to keep per-thread + * cache bucket and memory type (heap/direct). + */ + public PooledByteBufferAllocator( + final int minCapacity, + final int maxCapacity, + final int maxGlobalPerBucket, + final int maxLocalPerBucket) { + Args.notNegative(minCapacity, "Min capacity"); + Args.notNegative(maxCapacity, "Max capacity"); + Args.notNegative(maxGlobalPerBucket, "Max global per bucket"); + Args.notNegative(maxLocalPerBucket, "Max local per bucket"); + Args.check(maxCapacity >= minCapacity, "Max capacity must be >= min capacity"); + + this.minCapacity = normalizeCapacity(minCapacity); + this.maxCapacity = normalizeCapacity(maxCapacity); + this.maxGlobalPerBucket = maxGlobalPerBucket; + this.maxLocalPerBucket = maxLocalPerBucket; + + this.minShift = Integer.numberOfTrailingZeros(this.minCapacity); + final int maxShift = Integer.numberOfTrailingZeros(this.maxCapacity); + final int bucketCount = maxShift - this.minShift + 1; + + this.bucketCapacities = new int[bucketCount]; + this.heapBuckets = new GlobalBucket[bucketCount]; + this.directBuckets = new GlobalBucket[bucketCount]; + + int capacity = this.minCapacity; + for (int i = 0; i < bucketCount; i++) { + this.bucketCapacities[i] = capacity; + this.heapBuckets[i] = new GlobalBucket(); + this.directBuckets[i] = new GlobalBucket(); + capacity <<= 1; + } + + this.localCache = ThreadLocal.withInitial(() -> new LocalCache(bucketCount)); + } + + private static int normalizeCapacity(final int capacity) { + if (capacity <= 0) { + return 1; + } + int normalized = 1; + while (normalized < capacity) { + normalized <<= 1; + } + return normalized; + } + + /** + * Returns the bucket index for an arbitrary requested capacity, or {@code -1} + * if the capacity is greater than {@code maxCapacity}. + *

+ * This implementation runs in O(1) using bit operations. It assumes + * {@code minCapacity} and {@code maxCapacity} are powers of two. + */ + private int bucketIndexForRequest(final int capacity) { + if (capacity <= minCapacity) { + return 0; + } + if (capacity > maxCapacity) { + return -1; + } + // Ceil(log2(capacity)) using bit length of (capacity - 1). + final int neededShift = 32 - Integer.numberOfLeadingZeros(capacity - 1); + final int idx = neededShift - minShift; + if (idx < 0 || idx >= bucketCapacities.length) { + return -1; + } + return idx; + } + + /** + * Returns the bucket index for a pooled buffer {@link ByteBuffer#capacity()}. + *

+ * This assumes the capacity is a power of two in the configured range. If + * someone passes in an arbitrary foreign buffer, this returns {@code -1} + * and the buffer is ignored by the pool. + */ + private int bucketIndexForPooledCapacity(final int capacity) { + if (capacity < minCapacity || capacity > maxCapacity) { + return -1; + } + // Must be a power of two. + if ((capacity & (capacity - 1)) != 0) { + return -1; + } + final int idx = Integer.numberOfTrailingZeros(capacity) - minShift; + if (idx < 0 || idx >= bucketCapacities.length) { + return -1; + } + return idx; + } + + private ByteBuffer allocateInternal(final int requestedCapacity, final boolean direct) { + Args.notNegative(requestedCapacity, "Buffer capacity"); + final int idx = bucketIndexForRequest(requestedCapacity); + if (idx < 0) { + // Not pooled: allocate exact requested capacity. + return direct + ? ByteBuffer.allocateDirect(requestedCapacity) + : ByteBuffer.allocate(requestedCapacity); + } + + final LocalCache cache = localCache.get(); + final Deque[] localBuckets = direct + ? cache.directBuckets + : cache.heapBuckets; + final Deque local = localBuckets[idx]; + + ByteBuffer buf = local.pollFirst(); + if (buf != null) { + buf.clear(); + buf.limit(requestedCapacity); + return buf; + } + + if (maxGlobalPerBucket > 0) { + final GlobalBucket[] globalArray = direct ? directBuckets : heapBuckets; + final GlobalBucket global = globalArray[idx]; + + buf = global.queue.poll(); + if (buf != null) { + global.size.decrementAndGet(); + buf.clear(); + buf.limit(requestedCapacity); + return buf; + } + } + + final int bucketCapacity = bucketCapacities[idx]; + buf = direct + ? ByteBuffer.allocateDirect(bucketCapacity) + : ByteBuffer.allocate(bucketCapacity); + buf.limit(requestedCapacity); + return buf; + } + + @Override + public ByteBuffer allocate(final int capacity) { + return allocateInternal(capacity, false); + } + + @Override + public ByteBuffer allocateDirect(final int capacity) { + return allocateInternal(capacity, true); + } + + @Override + public void release(final ByteBuffer buffer) { + if (buffer == null) { + return; + } + final int capacity = buffer.capacity(); + final int idx = bucketIndexForPooledCapacity(capacity); + if (idx < 0) { + // Not a pooled buffer (or foreign capacity), drop it. + return; + } + + final boolean direct = buffer.isDirect(); + final LocalCache cache = localCache.get(); + final Deque[] localBuckets = direct + ? cache.directBuckets + : cache.heapBuckets; + final Deque local = localBuckets[idx]; + + buffer.clear(); + buffer.limit(bucketCapacities[idx]); + + if (local.size() < maxLocalPerBucket) { + local.addFirst(buffer); + return; + } + + if (maxGlobalPerBucket == 0) { + return; + } + + final GlobalBucket[] globalArray = direct ? directBuckets : heapBuckets; + final GlobalBucket global = globalArray[idx]; + + // One atomic on the hot success path instead of get() + increment(). + final int newSize = global.size.incrementAndGet(); + if (newSize > maxGlobalPerBucket) { + global.size.decrementAndGet(); + return; + } + global.queue.offer(buffer); + } + + @Override + public String toString() { + final StringBuilder buf = new StringBuilder(128); + buf.append("PooledByteBufferAllocator[") + .append("minCapacity=").append(minCapacity) + .append(", maxCapacity=").append(maxCapacity) + .append(", maxGlobalPerBucket=").append(maxGlobalPerBucket) + .append(", maxLocalPerBucket=").append(maxLocalPerBucket) + .append(']'); + return buf.toString(); + } + +} diff --git a/httpcore5/src/main/java/org/apache/hc/core5/io/SimpleByteBufferAllocator.java b/httpcore5/src/main/java/org/apache/hc/core5/io/SimpleByteBufferAllocator.java new file mode 100644 index 0000000000..adec88de06 --- /dev/null +++ b/httpcore5/src/main/java/org/apache/hc/core5/io/SimpleByteBufferAllocator.java @@ -0,0 +1,66 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.io; + +import java.nio.ByteBuffer; + +import org.apache.hc.core5.annotation.Contract; +import org.apache.hc.core5.annotation.ThreadingBehavior; +import org.apache.hc.core5.util.Args; + +/** + * Simple {@link ByteBufferAllocator} that allocates a new buffer for + * every request and does not pool released buffers. + * + * @since 5.4 + */ +@Contract(threading = ThreadingBehavior.STATELESS) +public final class SimpleByteBufferAllocator implements ByteBufferAllocator { + + public static final SimpleByteBufferAllocator INSTANCE = new SimpleByteBufferAllocator(); + + private SimpleByteBufferAllocator() { + } + + @Override + public ByteBuffer allocate(final int capacity) { + Args.notNegative(capacity, "Buffer capacity"); + return ByteBuffer.allocate(capacity); + } + + @Override + public ByteBuffer allocateDirect(final int capacity) { + Args.notNegative(capacity, "Buffer capacity"); + return ByteBuffer.allocateDirect(capacity); + } + + @Override + public void release(final ByteBuffer buffer) { + // No-op: GC will reclaim the buffer. + } + +} diff --git a/httpcore5/src/test/java/org/apache/hc/core5/io/TestPooledByteBufferAllocator.java b/httpcore5/src/test/java/org/apache/hc/core5/io/TestPooledByteBufferAllocator.java new file mode 100644 index 0000000000..ff735e909f --- /dev/null +++ b/httpcore5/src/test/java/org/apache/hc/core5/io/TestPooledByteBufferAllocator.java @@ -0,0 +1,148 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.io; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.nio.ByteBuffer; + +import org.junit.jupiter.api.Test; + +class TestPooledByteBufferAllocator { + + @Test + void testNonPooledExactCapacity() { + final PooledByteBufferAllocator allocator = new PooledByteBufferAllocator( + 64, 1024, 16, 8); + + final int requested = 2048; // > maxCapacity → non-pooled + + final ByteBuffer buf = allocator.allocate(requested); + assertNotNull(buf); + assertFalse(buf.isDirect()); + assertEquals(requested, buf.capacity()); + assertEquals(requested, buf.limit()); + + allocator.release(buf); // should be a no-op + } + + @Test + void testPooledRoundedCapacityAndLimit() { + final PooledByteBufferAllocator allocator = new PooledByteBufferAllocator( + 64, 1024, 16, 8); + + final int requested = 100; + + final ByteBuffer buf = allocator.allocate(requested); + assertNotNull(buf); + assertFalse(buf.isDirect()); + + // With min=64, max=1024, 100 should land in the 128 bucket. + assertEquals(128, buf.capacity()); + assertEquals(requested, buf.limit()); + assertEquals(0, buf.position()); + + allocator.release(buf); + } + + @Test + void testPooledReusesLocalCache() { + final PooledByteBufferAllocator allocator = new PooledByteBufferAllocator( + 64, 1024, 16, 8); + + final int requested = 128; + + final ByteBuffer buf1 = allocator.allocate(requested); + assertNotNull(buf1); + assertFalse(buf1.isDirect()); + assertEquals(requested, buf1.limit()); + + allocator.release(buf1); + + final ByteBuffer buf2 = allocator.allocate(requested); + assertSame(buf1, buf2); + assertEquals(0, buf2.position()); + assertEquals(requested, buf2.limit()); + } + + @Test + void testReleaseResetsLimitForNextUse() { + final PooledByteBufferAllocator allocator = new PooledByteBufferAllocator( + 64, 1024, 16, 8); + + final int requested = 200; + + final ByteBuffer buf1 = allocator.allocate(requested); + assertEquals(requested, buf1.limit()); + + // Mess with position/limit to ensure release() really resets them. + buf1.position(requested); + buf1.limit(requested); + allocator.release(buf1); + + final ByteBuffer buf2 = allocator.allocate(requested); + assertSame(buf1, buf2); + assertEquals(0, buf2.position()); + assertEquals(requested, buf2.limit()); + } + + @Test + void testDirectAndHeapArePooledSeparately() { + final PooledByteBufferAllocator allocator = new PooledByteBufferAllocator( + 64, 1024, 16, 8); + + final int requested = 128; + + final ByteBuffer direct1 = allocator.allocateDirect(requested); + assertTrue(direct1.isDirect()); + allocator.release(direct1); + + final ByteBuffer direct2 = allocator.allocateDirect(requested); + assertTrue(direct2.isDirect()); + assertSame(direct1, direct2); + + final ByteBuffer heap = allocator.allocate(requested); + assertFalse(heap.isDirect()); + assertNotSame(direct2, heap); + } + + @Test + void testToStringDoesNotThrow() { + final PooledByteBufferAllocator allocator = new PooledByteBufferAllocator( + 64, 1024, 16, 8); + + final String s = allocator.toString(); + assertNotNull(s); + assertTrue(s.contains("PooledByteBufferAllocator")); + } + +}