Skip to content

Commit a82a0b9

Browse files
committed
feat: optimize poll cache
1 parent 1f9f12c commit a82a0b9

File tree

4 files changed

+303
-21
lines changed

4 files changed

+303
-21
lines changed

connstate/poll_bsd.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func (p *kqueue) wait() error {
3535
var err error
3636
for {
3737
// timeout=0 must be set to avoid getting stuck in a blocking syscall,
38-
// which could cause a P to fail to be released in a timely manner.
38+
// which could occupy a P until runtime.sysmon thread handoff it.
3939
n, err = syscall.Kevent(p.fd, nil, events, timeout)
4040
if err != nil && err != syscall.EINTR {
4141
// exit gracefully

connstate/poll_cache.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package connstate
1616

1717
import (
1818
"sync"
19+
"sync/atomic"
1920
"unsafe"
2021
)
2122

@@ -38,6 +39,7 @@ type pollCache struct {
3839
// freelist store the freeable operator
3940
// to reduce GC pressure, we only store op index here
4041
freelist []int32
42+
freeack int32
4143
}
4244

4345
func (c *pollCache) alloc() *fdOperator {
@@ -67,17 +69,19 @@ func (c *pollCache) alloc() *fdOperator {
6769
// only poller could do the real free action
6870
func (c *pollCache) freeable(op *fdOperator) {
6971
c.lock.Lock()
72+
// reset all state
73+
if atomic.CompareAndSwapInt32(&c.freeack, 1, 0) {
74+
for _, idx := range c.freelist {
75+
op := c.cache[idx]
76+
op.link = c.first
77+
c.first = op
78+
}
79+
c.freelist = c.freelist[:0]
80+
}
7081
c.freelist = append(c.freelist, op.index)
7182
c.lock.Unlock()
7283
}
7384

7485
func (c *pollCache) free() {
75-
c.lock.Lock()
76-
for _, idx := range c.freelist {
77-
op := c.cache[idx]
78-
op.link = c.first
79-
c.first = op
80-
}
81-
c.freelist = c.freelist[:0]
82-
c.lock.Unlock()
86+
atomic.StoreInt32(&c.freeack, 1)
8387
}

connstate/poll_cache_test.go

Lines changed: 289 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,298 @@
1515
package connstate
1616

1717
import (
18+
"sync"
19+
"sync/atomic"
1820
"testing"
21+
"time"
22+
"unsafe"
1923

2024
"github.com/stretchr/testify/assert"
25+
"github.com/stretchr/testify/require"
2126
)
2227

23-
func TestPollCache(t *testing.T) {
24-
pollcache.free()
25-
fd := pollcache.alloc()
26-
pollcache.freeable(fd)
27-
assert.Equal(t, 1, len(pollcache.freelist))
28-
fd1 := pollcache.alloc()
29-
assert.NotEqual(t, fd, fd1)
30-
pollcache.free()
31-
assert.Equal(t, 0, len(pollcache.freelist))
32-
fd2 := pollcache.alloc()
33-
assert.Equal(t, fd, fd2)
28+
func TestPollCacheAlloc(t *testing.T) {
29+
cache := &pollCache{}
30+
31+
// Test initial allocation
32+
op1 := cache.alloc()
33+
require.NotNil(t, op1)
34+
assert.GreaterOrEqual(t, op1.index, int32(0))
35+
assert.Equal(t, int(0), op1.fd)
36+
37+
// Test multiple allocations
38+
op2 := cache.alloc()
39+
require.NotNil(t, op2)
40+
assert.Equal(t, int(0), op2.fd)
41+
42+
// Test that allocated operators are different
43+
assert.NotEqual(t, op1, op2)
44+
45+
// Verify they both have valid indices
46+
assert.GreaterOrEqual(t, op1.index, int32(0))
47+
assert.GreaterOrEqual(t, op2.index, int32(0))
48+
}
49+
50+
func TestPollCacheAllocReuse(t *testing.T) {
51+
cache := &pollCache{}
52+
53+
// Allocate all operators
54+
var ops []*fdOperator
55+
for i := 0; i < 10; i++ {
56+
op := cache.alloc()
57+
require.NotNil(t, op)
58+
ops = append(ops, op)
59+
}
60+
61+
// Mark some as freeable
62+
for i := 0; i < 5; i++ {
63+
cache.freeable(ops[i])
64+
}
65+
66+
// Set freeack to trigger cleanup
67+
cache.free()
68+
69+
// Allocate again, should reuse freed operators
70+
reusedOp := cache.alloc()
71+
require.NotNil(t, reusedOp)
72+
73+
// The reused operator should have a high index (from cache)
74+
assert.GreaterOrEqual(t, reusedOp.index, int32(10))
75+
}
76+
77+
func TestPollCacheFreeable(t *testing.T) {
78+
cache := &pollCache{}
79+
80+
// Allocate operators
81+
op1 := cache.alloc()
82+
op2 := cache.alloc()
83+
84+
require.NotNil(t, op1)
85+
require.NotNil(t, op2)
86+
87+
// Mark operators as freeable
88+
cache.freeable(op1)
89+
cache.freeable(op2)
90+
91+
// Verify they are in freelist
92+
cache.lock.Lock()
93+
assert.Len(t, cache.freelist, 2)
94+
assert.Contains(t, cache.freelist, op1.index)
95+
assert.Contains(t, cache.freelist, op2.index)
96+
cache.lock.Unlock()
97+
}
98+
99+
func TestPollCacheFree(t *testing.T) {
100+
cache := &pollCache{}
101+
102+
// Allocate and mark operators as freeable
103+
var ops []*fdOperator
104+
for i := 0; i < 5; i++ {
105+
op := cache.alloc()
106+
require.NotNil(t, op)
107+
ops = append(ops, op)
108+
cache.freeable(op)
109+
}
110+
111+
// Verify they are in freelist
112+
cache.lock.Lock()
113+
freelistLen := len(cache.freelist)
114+
cache.lock.Unlock()
115+
assert.Equal(t, 5, freelistLen)
116+
117+
// Set freeack flag
118+
cache.free()
119+
120+
// Verify freeack is set
121+
assert.Equal(t, int32(1), atomic.LoadInt32(&cache.freeack))
122+
123+
// Call freeable again to trigger cleanup
124+
cache.freeable(ops[0])
125+
126+
// Verify freelist was cleared (should be 1 for the newly added operator)
127+
cache.lock.Lock()
128+
finalFreelistLen := len(cache.freelist)
129+
cache.lock.Unlock()
130+
assert.Equal(t, 1, finalFreelistLen) // Only the newly added operator
131+
}
132+
133+
func TestPollCacheConcurrent(t *testing.T) {
134+
cache := &pollCache{}
135+
136+
const numGoroutines = 10
137+
const numAllocations = 100
138+
139+
var wg sync.WaitGroup
140+
wg.Add(numGoroutines)
141+
142+
// Concurrent allocations
143+
for i := 0; i < numGoroutines; i++ {
144+
go func() {
145+
defer wg.Done()
146+
147+
var ops []*fdOperator
148+
for j := 0; j < numAllocations; j++ {
149+
op := cache.alloc()
150+
if op != nil {
151+
ops = append(ops, op)
152+
}
153+
154+
// Randomly mark some as freeable
155+
if j%3 == 0 && len(ops) > 0 {
156+
freeableOp := ops[0]
157+
ops = ops[1:]
158+
cache.freeable(freeableOp)
159+
}
160+
}
161+
162+
// Mark remaining as freeable
163+
for _, op := range ops {
164+
cache.freeable(op)
165+
}
166+
}()
167+
}
168+
169+
wg.Wait()
170+
171+
// Verify cache is still functional
172+
finalOp := cache.alloc()
173+
require.NotNil(t, finalOp)
174+
}
175+
176+
func TestFDOperatorFields(t *testing.T) {
177+
op := &fdOperator{
178+
index: 42,
179+
fd: 123,
180+
}
181+
182+
assert.Equal(t, int32(42), op.index)
183+
assert.Equal(t, int(123), op.fd)
184+
assert.Nil(t, op.link)
185+
assert.Nil(t, op.conn)
186+
}
187+
188+
func TestFDOperatorSize(t *testing.T) {
189+
// Test that fdOperator has consistent size
190+
size1 := unsafe.Sizeof(fdOperator{})
191+
size2 := unsafe.Sizeof(fdOperator{})
192+
assert.Equal(t, size1, size2)
193+
194+
// Should have reasonable size (not too large, not too small)
195+
assert.Greater(t, size1, uintptr(16)) // At least contains fields
196+
assert.Less(t, size1, uintptr(256)) // Not excessively large
197+
}
198+
199+
func TestPollCacheBlockAllocation(t *testing.T) {
200+
cache := &pollCache{}
201+
202+
// Calculate expected number of operators per block
203+
pdSize := unsafe.Sizeof(fdOperator{})
204+
expectedPerBlock := pollBlockSize / pdSize
205+
if expectedPerBlock == 0 {
206+
expectedPerBlock = 1
207+
}
208+
209+
// Allocate more than one block worth
210+
var ops []*fdOperator
211+
allocations := int(expectedPerBlock) + 10
212+
213+
for i := 0; i < allocations; i++ {
214+
op := cache.alloc()
215+
require.NotNil(t, op, "Allocation %d should succeed", i)
216+
ops = append(ops, op)
217+
}
218+
219+
// Verify all have unique indices
220+
indices := make(map[int32]struct{})
221+
for _, op := range ops {
222+
_, exists := indices[op.index]
223+
assert.False(t, exists, "Index %d should be unique", op.index)
224+
indices[op.index] = struct{}{}
225+
}
226+
}
227+
228+
func TestPollCacheFreeAckRace(t *testing.T) {
229+
cache := &pollCache{}
230+
231+
const numOperations = 1000
232+
var freeCount int64
233+
234+
// Start goroutine that marks operators as freeable
235+
go func() {
236+
for i := 0; i < numOperations; i++ {
237+
op := cache.alloc()
238+
if op != nil {
239+
cache.freeable(op)
240+
atomic.AddInt64(&freeCount, 1)
241+
}
242+
time.Sleep(time.Microsecond) // Small delay to increase race chance
243+
}
244+
}()
245+
246+
// Start goroutine that calls free() periodically
247+
go func() {
248+
for i := 0; i < numOperations/10; i++ {
249+
cache.free()
250+
time.Sleep(10 * time.Microsecond)
251+
}
252+
}()
253+
254+
time.Sleep(100 * time.Millisecond) // Let goroutines work
255+
256+
// Verify no panics or corruption
257+
finalOp := cache.alloc()
258+
require.NotNil(t, finalOp)
259+
260+
// Verify some operations completed
261+
assert.Greater(t, atomic.LoadInt64(&freeCount), int64(0))
262+
}
263+
264+
func BenchmarkPollCacheAlloc(b *testing.B) {
265+
cache := &pollCache{}
266+
267+
b.ResetTimer()
268+
b.RunParallel(func(pb *testing.PB) {
269+
for pb.Next() {
270+
op := cache.alloc()
271+
if op != nil {
272+
// Simulate some usage
273+
op.fd = 42
274+
op.index = 1
275+
}
276+
}
277+
})
278+
}
279+
280+
func BenchmarkPollCacheFreeable(b *testing.B) {
281+
cache := &pollCache{}
282+
283+
// Pre-allocate some operators
284+
ops := make([]*fdOperator, 1000)
285+
for i := range ops {
286+
ops[i] = cache.alloc()
287+
}
288+
289+
b.ResetTimer()
290+
b.RunParallel(func(pb *testing.PB) {
291+
i := 0
292+
for pb.Next() {
293+
cache.freeable(ops[i%len(ops)])
294+
i++
295+
}
296+
})
297+
}
298+
299+
func BenchmarkPollCacheAllocFreeCycle(b *testing.B) {
300+
cache := &pollCache{}
301+
302+
b.ResetTimer()
303+
for i := 0; i < b.N; i++ {
304+
op := cache.alloc()
305+
if op != nil {
306+
cache.freeable(op)
307+
if i%100 == 0 {
308+
cache.free() // Trigger cleanup occasionally
309+
}
310+
}
311+
}
34312
}

connstate/poll_linux.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func (p *epoller) wait() error {
3434
var err error
3535
for {
3636
// timeout=0 must be set to avoid getting stuck in a blocking syscall,
37-
// which could cause a P to fail to be released in a timely manner.
37+
// which could occupy a P until runtime.sysmon thread handoff it.
3838
n, err = syscall.EpollWait(p.epfd, events, 0)
3939
if err != nil && err != syscall.EINTR {
4040
return err

0 commit comments

Comments
 (0)