Skip to content

Commit ae5434c

Browse files
cyningsun, ndyakov, htemelski-redis
authored
feat(pool): Improve success rate of new connections (#3518)
* async create conn
* update default values and test case
* fix comments
* fix data race
* remove context.WithoutCancel, which is a function introduced in Go 1.21
* fix TestDialerRetryConfiguration/DefaultDialerRetries, because the tryDial calls are likely done in the async flow
* change to share a connection that failed to be delivered with the other waiting requests
* remove Chinese comment
* fix: optimize WantConnQueue benchmarks to prevent memory exhaustion
  - Fix BenchmarkWantConnQueue_Dequeue timeout issue by limiting pre-population
  - Use object pooling in BenchmarkWantConnQueue_Enqueue to reduce allocations
  - Optimize BenchmarkWantConnQueue_EnqueueDequeue with reusable wantConn pool
  - Prevent GitHub Actions benchmark failures due to excessive memory usage
  Before: BenchmarkWantConnQueue_Dequeue ran for 11+ minutes and was killed
  After: All benchmarks complete in ~8 seconds with consistent performance
* format
* fix turn leaks
---------
Co-authored-by: Nedyalko Dyakov <[email protected]>
Co-authored-by: Hristo Temelski <[email protected]>
1 parent 7f48276 commit ae5434c

File tree

11 files changed

+1361
-97
lines changed

11 files changed

+1361
-97
lines changed

async_handoff_integration_test.go

Lines changed: 12 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -53,8 +53,9 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
5353
Dialer: func(ctx context.Context) (net.Conn, error) {
5454
return &mockNetConn{addr: "original:6379"}, nil
5555
},
56-
PoolSize: int32(5),
57-
PoolTimeout: time.Second,
56+
PoolSize: int32(5),
57+
MaxConcurrentDials: 5,
58+
PoolTimeout: time.Second,
5859
})
5960

6061
// Add the hook to the pool after creation
@@ -153,8 +154,9 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
153154
return &mockNetConn{addr: "original:6379"}, nil
154155
},
155156

156-
PoolSize: int32(10),
157-
PoolTimeout: time.Second,
157+
PoolSize: int32(10),
158+
MaxConcurrentDials: 10,
159+
PoolTimeout: time.Second,
158160
})
159161
defer testPool.Close()
160162

@@ -225,8 +227,9 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
225227
return &mockNetConn{addr: "original:6379"}, nil
226228
},
227229

228-
PoolSize: int32(3),
229-
PoolTimeout: time.Second,
230+
PoolSize: int32(3),
231+
MaxConcurrentDials: 3,
232+
PoolTimeout: time.Second,
230233
})
231234
defer testPool.Close()
232235

@@ -288,8 +291,9 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
288291
return &mockNetConn{addr: "original:6379"}, nil
289292
},
290293

291-
PoolSize: int32(2),
292-
PoolTimeout: time.Second,
294+
PoolSize: int32(2),
295+
MaxConcurrentDials: 2,
296+
PoolTimeout: time.Second,
293297
})
294298
defer testPool.Close()
295299

internal/pool/bench_test.go

Lines changed: 12 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -31,11 +31,12 @@ func BenchmarkPoolGetPut(b *testing.B) {
3131
for _, bm := range benchmarks {
3232
b.Run(bm.String(), func(b *testing.B) {
3333
connPool := pool.NewConnPool(&pool.Options{
34-
Dialer: dummyDialer,
35-
PoolSize: int32(bm.poolSize),
36-
PoolTimeout: time.Second,
37-
DialTimeout: 1 * time.Second,
38-
ConnMaxIdleTime: time.Hour,
34+
Dialer: dummyDialer,
35+
PoolSize: int32(bm.poolSize),
36+
MaxConcurrentDials: bm.poolSize,
37+
PoolTimeout: time.Second,
38+
DialTimeout: 1 * time.Second,
39+
ConnMaxIdleTime: time.Hour,
3940
})
4041

4142
b.ResetTimer()
@@ -75,11 +76,12 @@ func BenchmarkPoolGetRemove(b *testing.B) {
7576
for _, bm := range benchmarks {
7677
b.Run(bm.String(), func(b *testing.B) {
7778
connPool := pool.NewConnPool(&pool.Options{
78-
Dialer: dummyDialer,
79-
PoolSize: int32(bm.poolSize),
80-
PoolTimeout: time.Second,
81-
DialTimeout: 1 * time.Second,
82-
ConnMaxIdleTime: time.Hour,
79+
Dialer: dummyDialer,
80+
PoolSize: int32(bm.poolSize),
81+
MaxConcurrentDials: bm.poolSize,
82+
PoolTimeout: time.Second,
83+
DialTimeout: 1 * time.Second,
84+
ConnMaxIdleTime: time.Hour,
8385
})
8486

8587
b.ResetTimer()

internal/pool/buffer_size_test.go

Lines changed: 20 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -24,9 +24,10 @@ var _ = Describe("Buffer Size Configuration", func() {
2424

2525
It("should use default buffer sizes when not specified", func() {
2626
connPool = pool.NewConnPool(&pool.Options{
27-
Dialer: dummyDialer,
28-
PoolSize: int32(1),
29-
PoolTimeout: 1000,
27+
Dialer: dummyDialer,
28+
PoolSize: int32(1),
29+
MaxConcurrentDials: 1,
30+
PoolTimeout: 1000,
3031
})
3132

3233
cn, err := connPool.NewConn(ctx)
@@ -46,11 +47,12 @@ var _ = Describe("Buffer Size Configuration", func() {
4647
customWriteSize := 64 * 1024 // 64KB
4748

4849
connPool = pool.NewConnPool(&pool.Options{
49-
Dialer: dummyDialer,
50-
PoolSize: int32(1),
51-
PoolTimeout: 1000,
52-
ReadBufferSize: customReadSize,
53-
WriteBufferSize: customWriteSize,
50+
Dialer: dummyDialer,
51+
PoolSize: int32(1),
52+
MaxConcurrentDials: 1,
53+
PoolTimeout: 1000,
54+
ReadBufferSize: customReadSize,
55+
WriteBufferSize: customWriteSize,
5456
})
5557

5658
cn, err := connPool.NewConn(ctx)
@@ -67,11 +69,12 @@ var _ = Describe("Buffer Size Configuration", func() {
6769

6870
It("should handle zero buffer sizes by using defaults", func() {
6971
connPool = pool.NewConnPool(&pool.Options{
70-
Dialer: dummyDialer,
71-
PoolSize: int32(1),
72-
PoolTimeout: 1000,
73-
ReadBufferSize: 0, // Should use default
74-
WriteBufferSize: 0, // Should use default
72+
Dialer: dummyDialer,
73+
PoolSize: int32(1),
74+
MaxConcurrentDials: 1,
75+
PoolTimeout: 1000,
76+
ReadBufferSize: 0, // Should use default
77+
WriteBufferSize: 0, // Should use default
7578
})
7679

7780
cn, err := connPool.NewConn(ctx)
@@ -103,9 +106,10 @@ var _ = Describe("Buffer Size Configuration", func() {
103106
// Test the scenario where someone creates a pool directly (like in tests)
104107
// without setting ReadBufferSize and WriteBufferSize
105108
connPool = pool.NewConnPool(&pool.Options{
106-
Dialer: dummyDialer,
107-
PoolSize: int32(1),
108-
PoolTimeout: 1000,
109+
Dialer: dummyDialer,
110+
PoolSize: int32(1),
111+
MaxConcurrentDials: 1,
112+
PoolTimeout: 1000,
109113
// ReadBufferSize and WriteBufferSize are not set (will be 0)
110114
})
111115

internal/pool/hooks_test.go

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -191,8 +191,9 @@ func TestPoolWithHooks(t *testing.T) {
191191
Dialer: func(ctx context.Context) (net.Conn, error) {
192192
return &net.TCPConn{}, nil // Mock connection
193193
},
194-
PoolSize: 1,
195-
DialTimeout: time.Second,
194+
PoolSize: 1,
195+
MaxConcurrentDials: 1,
196+
DialTimeout: time.Second,
196197
}
197198

198199
pool := NewConnPool(opt)

internal/pool/pool.go

Lines changed: 104 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -98,6 +98,7 @@ type Options struct {
9898

9999
PoolFIFO bool
100100
PoolSize int32
101+
MaxConcurrentDials int
101102
DialTimeout time.Duration
102103
PoolTimeout time.Duration
103104
MinIdleConns int32
@@ -126,7 +127,9 @@ type ConnPool struct {
126127
dialErrorsNum uint32 // atomic
127128
lastDialError atomic.Value
128129

129-
queue chan struct{}
130+
queue chan struct{}
131+
dialsInProgress chan struct{}
132+
dialsQueue *wantConnQueue
130133

131134
connsMu sync.Mutex
132135
conns map[uint64]*Conn
@@ -152,9 +155,11 @@ func NewConnPool(opt *Options) *ConnPool {
152155
p := &ConnPool{
153156
cfg: opt,
154157

155-
queue: make(chan struct{}, opt.PoolSize),
156-
conns: make(map[uint64]*Conn),
157-
idleConns: make([]*Conn, 0, opt.PoolSize),
158+
queue: make(chan struct{}, opt.PoolSize),
159+
conns: make(map[uint64]*Conn),
160+
dialsInProgress: make(chan struct{}, opt.MaxConcurrentDials),
161+
dialsQueue: newWantConnQueue(),
162+
idleConns: make([]*Conn, 0, opt.PoolSize),
158163
}
159164

160165
// Only create MinIdleConns if explicitly requested (> 0)
@@ -233,6 +238,7 @@ func (p *ConnPool) checkMinIdleConns() {
233238
return
234239
}
235240
}
241+
236242
}
237243

238244
func (p *ConnPool) addIdleConn() error {
@@ -491,9 +497,8 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
491497

492498
atomic.AddUint32(&p.stats.Misses, 1)
493499

494-
newcn, err := p.newConn(ctx, true)
500+
newcn, err := p.queuedNewConn(ctx)
495501
if err != nil {
496-
p.freeTurn()
497502
return nil, err
498503
}
499504

@@ -512,6 +517,99 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
512517
return newcn, nil
513518
}
514519

520+
// queuedNewConn starts a dial in a background goroutine and waits until
// either a result arrives on the request's result channel or ctx is done.
//
// The number of dials running at once is bounded by the capacity of
// p.dialsInProgress. The dial itself runs under its own context derived from
// context.Background() with p.cfg.DialTimeout, so it is not aborted when the
// caller's ctx is cancelled; a connection the caller no longer wants is
// passed to putIdleConn instead.
//
// It returns the delivered connection and a nil error on success, ctx.Err()
// if ctx is done first, or the error carried by the delivered result.
// Every error path visible here ends in p.freeTurn(), called either by this
// function or by the dial goroutine, so the caller presumably must not free
// the turn again on error (confirm against getConn).
func (p *ConnPool) queuedNewConn(ctx context.Context) (*Conn, error) {
521+
// Take a dial slot, or give up (releasing the pool turn) if ctx ends first.
select {
522+
case p.dialsInProgress <- struct{}{}:
523+
// Got permission, proceed to create connection
524+
case <-ctx.Done():
525+
p.freeTurn()
526+
return nil, ctx.Err()
527+
}
528+
529+
// Detached from the caller's ctx on purpose: only DialTimeout bounds the dial.
dialCtx, cancel := context.WithTimeout(context.Background(), p.cfg.DialTimeout)
530+
531+
// result is buffered (size 1) so one delivery can complete without a reader.
w := &wantConn{
532+
ctx: dialCtx,
533+
cancelCtx: cancel,
534+
result: make(chan wantConnResult, 1),
535+
}
536+
// err mirrors the value returned below so the deferred cleanup can see it.
var err error
537+
defer func() {
538+
if err != nil {
539+
// presumably w.cancel marks the request abandoned and returns a connection
// that was already delivered, if any - its body is not visible here.
// Such a connection is offered to other waiters / the idle list and the
// turn is released.
if cn := w.cancel(); cn != nil {
540+
p.putIdleConn(ctx, cn)
541+
p.freeTurn()
542+
}
543+
}
544+
}()
545+
546+
// Queue the request so putIdleConn can hand it a spare connection.
p.dialsQueue.enqueue(w)
547+
548+
go func(w *wantConn) {
549+
// Tracks whether this goroutine already released the turn, so the panic
// handler below does not release it a second time.
var freeTurnCalled bool
550+
defer func() {
551+
if err := recover(); err != nil {
552+
if !freeTurnCalled {
553+
p.freeTurn()
554+
}
555+
// NOTE(review): nothing is delivered to w on this path, so the waiting
// caller is only released by ctx being done or by another goroutine
// delivering through putIdleConn - confirm this is intended.
internal.Logger.Printf(context.Background(), "queuedNewConn panic: %+v", err)
556+
}
557+
}()
558+
559+
// Deferred calls run last-in-first-out: the dial slot is released first,
// then the dial context is cancelled, then the recover handler runs.
defer w.cancelCtx()
560+
defer func() { <-p.dialsInProgress }() // Release connection creation permission
561+
562+
dialCtx := w.getCtxForDial()
563+
cn, cnErr := p.newConn(dialCtx, true)
564+
delivered := w.tryDeliver(cn, cnErr)
565+
// Three outcomes: dialed and delivered (the receiver now owns the connection
// and the turn); dialed but not delivered (pass the connection on and release
// the turn); dial failed (release the turn).
if cnErr == nil && delivered {
566+
return
567+
} else if cnErr == nil && !delivered {
568+
p.putIdleConn(dialCtx, cn)
569+
p.freeTurn()
570+
freeTurnCalled = true
571+
} else {
572+
p.freeTurn()
573+
freeTurnCalled = true
574+
}
575+
}(w)
576+
577+
// Wait for whichever comes first: caller cancellation or a delivered result.
select {
578+
case <-ctx.Done():
579+
err = ctx.Err()
580+
return nil, err
581+
case result := <-w.result:
582+
err = result.err
583+
return result.cn, err
584+
}
585+
}
586+
587+
// putIdleConn finds a home for a connection that has no owner.
//
// It first drains p.dialsQueue, offering cn to each dequeued request via
// tryDeliver, and returns as soon as one accepts it. If no request takes it,
// cn is marked usable and appended to p.idleConns, or closed when the pool is
// already closed. The ctx parameter is not used in the body.
func (p *ConnPool) putIdleConn(ctx context.Context, cn *Conn) {
588+
// Requests for which tryDeliver reports false are dropped from the queue
// and the next one is tried.
for {
589+
w, ok := p.dialsQueue.dequeue()
590+
if !ok {
591+
break
592+
}
593+
if w.tryDeliver(cn, nil) {
594+
return
595+
}
596+
}
597+
598+
// No waiter took the connection: park it as an idle connection.
cn.SetUsable(true)
599+
600+
p.connsMu.Lock()
601+
defer p.connsMu.Unlock()
602+
603+
// A closed pool must not keep the connection; the Close error is deliberately ignored.
if p.closed() {
604+
_ = cn.Close()
605+
return
606+
}
607+
608+
// poolSize is increased in newConn
609+
p.idleConns = append(p.idleConns, cn)
610+
p.idleConnsLen.Add(1)
611+
}
612+
515613
func (p *ConnPool) waitTurn(ctx context.Context) error {
516614
select {
517615
case <-ctx.Done():

0 commit comments

Comments
 (0)