Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 140 additions & 8 deletions mem/buffer_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package mem

import (
"math/bits"
"slices"
"sort"
"sync"

Expand All @@ -38,20 +40,21 @@ type BufferPool interface {
Put(*[]byte)
}

const goPageSize = 4 << 10 // 4KiB. N.B. this must be a power of 2.
const (
	// goPageSizeExponent is the base-2 exponent of goPageSize.
	goPageSizeExponent = 12
	// goPageSize is 4KiB. Because it is derived by shifting, it is always a
	// power of 2.
	goPageSize = 1 << goPageSizeExponent
)

var defaultBufferPoolSizes = []int{
256,
goPageSize,
16 << 10, // 16KB (max HTTP/2 frame size used by gRPC)
32 << 10, // 32KB (default buffer size for io.Copy)
1 << 20, // 1MB
// defaultBufferPoolSizeExponents lists the base-2 exponents of the buffer
// capacities used to build the default buffer pool; each entry e stands for a
// capacity of 2^e bytes.
var defaultBufferPoolSizeExponents = []uint8{
	8,                  // 256B
	goPageSizeExponent, // one page (1 << goPageSizeExponent bytes)
	14,                 // 16KB (max HTTP/2 frame size used by gRPC)
	15,                 // 32KB (default buffer size for io.Copy)
	20,                 // 1MB
}

var defaultBufferPool BufferPool

func init() {
defaultBufferPool = NewTieredBufferPool(defaultBufferPoolSizes...)
defaultBufferPool = NewBinaryTieredBufferPool(defaultBufferPoolSizeExponents...)

internal.SetDefaultBufferPoolForTesting = func(pool BufferPool) {
defaultBufferPool = pool
Expand Down Expand Up @@ -109,6 +112,135 @@ func (p *tieredBufferPool) getPool(size int) BufferPool {
return p.sizedPools[poolIdx]
}

// binaryTieredBufferPool is a BufferPool backed by several sizedBufferPools
// whose capacities are powers of two. Two lookup tables indexed by exponent
// let Get and Put select a pool without searching.
type binaryTieredBufferPool struct {
	// exponentToNextLargestPoolMap maps a power-of-two exponent (e.g., 12 for
	// 4KB) to the index in sizedPools of the smallest pool whose capacity is
	// at least 2^exponent. It is consulted on the Get path. Entries for
	// exponents above the largest configured pool hold -1.
	exponentToNextLargestPoolMap []int
	// exponentToPreviousLargestPoolMap maps a power-of-two exponent to the
	// index in sizedPools of the largest pool whose capacity is at most
	// 2^exponent. It is consulted on the Put path. Entries for exponents
	// below the smallest configured pool hold -1.
	exponentToPreviousLargestPoolMap []int
	// sizedPools holds one pool per configured capacity.
	sizedPools []*sizedBufferPool
	// fallbackPool serves Get requests of size 0 or larger than maxPoolCap,
	// and receives Put buffers whose capacity exceeds maxPoolCap.
	fallbackPool simpleBufferPool
	// maxPoolCap caches the largest configured pool capacity so that both
	// paths can reject oversized requests with a single comparison.
	maxPoolCap int
}

// NewBinaryTieredBufferPool returns a BufferPool implementation that uses
// multiple underlying pools of the given pool sizes. The pool sizes must be
// powers of 2. This enables O(1) lookup when getting or putting a buffer.
//
// Note that the argument passed to this functions are the powers of 2 of the
// capacity of the buffers in the pool, not the capacities of the buffers
// themselves. For example, if you wanted a pool that had buffers with a capacity
// of 16kb, you would pass 14 as the argument to this function.
func NewBinaryTieredBufferPool(powerOfTwoExponents ...uint8) BufferPool {
slices.Sort(powerOfTwoExponents)

// Determine the maximum exponent we need to support.
// bits.Len64(math.MaxUint64) is 63.
const maxExponent = 63
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this assume a 64-bit machine?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It works for machines up to 64 bits, i.e. both 32 and 64 bits.

indexOfNextLargestBit := slices.Repeat([]int{-1}, maxExponent+1)
indexOfPreviousLargestBit := slices.Repeat([]int{-1}, maxExponent+1)

maxCap := 0
pools := make([]*sizedBufferPool, 0, len(powerOfTwoExponents))

for i, exp := range powerOfTwoExponents {
// Allocating slices of size > 2^maxExponent isn't possible on 64-bit
// machines.
if exp > maxExponent {
continue
}
capSize := 1 << exp
pools = append(pools, newSizedBufferPool(capSize))
if capSize > maxCap {
maxCap = capSize
}

// Map the exact power of 2 to this pool index.
indexOfNextLargestBit[exp] = i
indexOfPreviousLargestBit[exp] = i
}

// Fill gaps for Get() (Next Largest)
// We iterate backwards. If current is empty, take the value from the right (larger).
for i := maxExponent - 1; i >= 0; i-- {
if indexOfNextLargestBit[i] == -1 {
indexOfNextLargestBit[i] = indexOfNextLargestBit[i+1]
}
}

// Fill gaps for Put() (Previous Largest)
// We iterate forwards. If current is empty, take the value from the left (smaller).
for i := 1; i <= maxExponent; i++ {
if indexOfPreviousLargestBit[i] == -1 {
indexOfPreviousLargestBit[i] = indexOfPreviousLargestBit[i-1]
}
}

return &binaryTieredBufferPool{
exponentToNextLargestPoolMap: indexOfNextLargestBit,
exponentToPreviousLargestPoolMap: indexOfPreviousLargestBit,
sizedPools: pools,
maxPoolCap: maxCap,
}
}

// Get returns a buffer of length size, obtained from the pool that poolForGet
// selects for that size.
func (b *binaryTieredBufferPool) Get(size int) *[]byte {
	pool := b.poolForGet(size)
	return pool.Get(size)
}

// poolForGet picks the pool that should serve a request for size bytes: the
// fallback pool when size is 0 or exceeds the largest configured capacity,
// otherwise the smallest sized pool whose capacity is at least size.
func (b *binaryTieredBufferPool) poolForGet(size int) BufferPool {
	switch {
	case size == 0, size > b.maxPoolCap:
		return &b.fallbackPool
	}

	// The bit length of size-1 is the exponent of the smallest power of 2
	// that is >= size; subtracting 1 first keeps exact powers of 2 in their
	// own bucket.
	//
	//   size=16 -> size-1=0b01111 -> bit length 4 -> pool for 2^4
	//   size=17 -> size-1=0b10000 -> bit length 5 -> pool for 2^5
	exponent := bits.Len(uint(size - 1))
	return b.sizedPools[b.exponentToNextLargestPoolMap[exponent]]
}

// Put hands buf to the pool that poolForPut selects for the buffer's
// capacity.
func (b *binaryTieredBufferPool) Put(buf *[]byte) {
	pool := b.poolForPut(cap(*buf))
	pool.Put(buf)
}

// poolForPut picks the pool that should receive a buffer of capacity bCap:
// a NopBufferPool for zero-capacity buffers and for buffers smaller than the
// smallest configured pool, the fallback pool for buffers larger than the
// largest configured pool, and otherwise the largest sized pool whose capacity
// is at most bCap.
func (b *binaryTieredBufferPool) poolForPut(bCap int) BufferPool {
	switch {
	case bCap == 0:
		return NopBufferPool{}
	case bCap > b.maxPoolCap:
		return &b.fallbackPool
	}

	// bits.Len counts the bits up to and including the most significant set
	// bit, so Len-1 is that bit's 0-based position, i.e. the exponent of the
	// largest power of 2 that is <= bCap.
	//
	//   cap=16 (0b10000) -> Len=5 -> exponent 4 -> 2^4
	//   cap=15 (0b01111) -> Len=4 -> exponent 3 -> 2^3
	exponent := bits.Len(uint(bCap)) - 1
	if idx := b.exponentToPreviousLargestPoolMap[exponent]; idx != -1 {
		return b.sizedPools[idx]
	}
	// No configured pool is small enough to hold this buffer; discard it.
	return NopBufferPool{}
}

// sizedBufferPool is a BufferPool implementation that is optimized for specific
// buffer sizes. For example, HTTP/2 frames within gRPC have a default max size
// of 16kb and a sizedBufferPool can be configured to only return buffers with a
Expand Down
38 changes: 38 additions & 0 deletions mem/buffer_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package mem_test

import (
"bytes"
"fmt"
"testing"
"unsafe"

Expand Down Expand Up @@ -105,3 +106,40 @@ func (s) TestBufferPoolIgnoresShortBuffers(t *testing.T) {
// pool, it could cause a panic.
pool.Get(10)
}

// TestBinaryBufferPool checks, for a pool configured with capacities 1, 4, 8
// and 16, that Get returns a buffer with the requested length and with the
// capacity of the smallest tier that fits the request, and that requests
// larger than every tier are served by the fallback pool.
func TestBinaryBufferPool(t *testing.T) {
	poolSizes := []uint8{0, 2, 3, 4}

	type testCase struct {
		requestSize  int
		wantCapacity int
	}
	testCases := []testCase{
		{0, 0},
		{1, 1},
		{2, 4},
		{3, 4},
		{4, 4},
		{5, 8},
		{6, 8},
		{7, 8},
		{8, 8},
		{9, 16},
		{15, 16},
		{16, 16},
		// The fallback pool returns sizes in multiples of 4096.
		{17, 4096},
	}

	for _, tc := range testCases {
		name := fmt.Sprintf("requestSize=%d", tc.requestSize)
		t.Run(name, func(t *testing.T) {
			pool := mem.NewBinaryTieredBufferPool(poolSizes...)
			buf := pool.Get(tc.requestSize)
			if gotCap := cap(*buf); gotCap != tc.wantCapacity {
				t.Errorf("Get(%d) returned buffer with capacity: %d, want %d", tc.requestSize, gotCap, tc.wantCapacity)
			}
			if gotLen := len(*buf); gotLen != tc.requestSize {
				t.Errorf("Get(%d) returned buffer with length: %d, want %d", tc.requestSize, gotLen, tc.requestSize)
			}
			pool.Put(buf)
		})
	}
}