Skip to content

Commit ef3e06f

Browse files
committed
Merge remote-tracking branch 'origin/master' into ndyakov/state-machine-conn
2 parents d207749 + ae5434c commit ef3e06f

File tree

11 files changed

+1358
-101
lines changed

11 files changed

+1358
-101
lines changed

async_handoff_integration_test.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,9 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
5757
Dialer: func(ctx context.Context) (net.Conn, error) {
5858
return &mockNetConn{addr: "original:6379"}, nil
5959
},
60-
PoolSize: int32(5),
61-
PoolTimeout: time.Second,
60+
PoolSize: int32(5),
61+
MaxConcurrentDials: 5,
62+
PoolTimeout: time.Second,
6263
})
6364

6465
// Add the hook to the pool after creation
@@ -190,8 +191,9 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
190191
return &mockNetConn{addr: "original:6379"}, nil
191192
},
192193

193-
PoolSize: int32(10),
194-
PoolTimeout: time.Second,
194+
PoolSize: int32(10),
195+
MaxConcurrentDials: 10,
196+
PoolTimeout: time.Second,
195197
})
196198
defer testPool.Close()
197199

@@ -262,8 +264,9 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
262264
return &mockNetConn{addr: "original:6379"}, nil
263265
},
264266

265-
PoolSize: int32(3),
266-
PoolTimeout: time.Second,
267+
PoolSize: int32(3),
268+
MaxConcurrentDials: 3,
269+
PoolTimeout: time.Second,
267270
})
268271
defer testPool.Close()
269272

@@ -333,8 +336,9 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
333336
return &mockNetConn{addr: "original:6379"}, nil
334337
},
335338

336-
PoolSize: int32(2),
337-
PoolTimeout: time.Second,
339+
PoolSize: int32(2),
340+
MaxConcurrentDials: 2,
341+
PoolTimeout: time.Second,
338342
})
339343
defer testPool.Close()
340344

internal/pool/bench_test.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,12 @@ func BenchmarkPoolGetPut(b *testing.B) {
3131
for _, bm := range benchmarks {
3232
b.Run(bm.String(), func(b *testing.B) {
3333
connPool := pool.NewConnPool(&pool.Options{
34-
Dialer: dummyDialer,
35-
PoolSize: int32(bm.poolSize),
36-
PoolTimeout: time.Second,
37-
DialTimeout: 1 * time.Second,
38-
ConnMaxIdleTime: time.Hour,
34+
Dialer: dummyDialer,
35+
PoolSize: int32(bm.poolSize),
36+
MaxConcurrentDials: bm.poolSize,
37+
PoolTimeout: time.Second,
38+
DialTimeout: 1 * time.Second,
39+
ConnMaxIdleTime: time.Hour,
3940
})
4041

4142
b.ResetTimer()
@@ -75,11 +76,12 @@ func BenchmarkPoolGetRemove(b *testing.B) {
7576
for _, bm := range benchmarks {
7677
b.Run(bm.String(), func(b *testing.B) {
7778
connPool := pool.NewConnPool(&pool.Options{
78-
Dialer: dummyDialer,
79-
PoolSize: int32(bm.poolSize),
80-
PoolTimeout: time.Second,
81-
DialTimeout: 1 * time.Second,
82-
ConnMaxIdleTime: time.Hour,
79+
Dialer: dummyDialer,
80+
PoolSize: int32(bm.poolSize),
81+
MaxConcurrentDials: bm.poolSize,
82+
PoolTimeout: time.Second,
83+
DialTimeout: 1 * time.Second,
84+
ConnMaxIdleTime: time.Hour,
8385
})
8486

8587
b.ResetTimer()

internal/pool/buffer_size_test.go

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@ var _ = Describe("Buffer Size Configuration", func() {
2525

2626
It("should use default buffer sizes when not specified", func() {
2727
connPool = pool.NewConnPool(&pool.Options{
28-
Dialer: dummyDialer,
29-
PoolSize: int32(1),
30-
PoolTimeout: 1000,
28+
Dialer: dummyDialer,
29+
PoolSize: int32(1),
30+
MaxConcurrentDials: 1,
31+
PoolTimeout: 1000,
3132
})
3233

3334
cn, err := connPool.NewConn(ctx)
@@ -47,11 +48,12 @@ var _ = Describe("Buffer Size Configuration", func() {
4748
customWriteSize := 64 * 1024 // 64KB
4849

4950
connPool = pool.NewConnPool(&pool.Options{
50-
Dialer: dummyDialer,
51-
PoolSize: int32(1),
52-
PoolTimeout: 1000,
53-
ReadBufferSize: customReadSize,
54-
WriteBufferSize: customWriteSize,
51+
Dialer: dummyDialer,
52+
PoolSize: int32(1),
53+
MaxConcurrentDials: 1,
54+
PoolTimeout: 1000,
55+
ReadBufferSize: customReadSize,
56+
WriteBufferSize: customWriteSize,
5557
})
5658

5759
cn, err := connPool.NewConn(ctx)
@@ -68,11 +70,12 @@ var _ = Describe("Buffer Size Configuration", func() {
6870

6971
It("should handle zero buffer sizes by using defaults", func() {
7072
connPool = pool.NewConnPool(&pool.Options{
71-
Dialer: dummyDialer,
72-
PoolSize: int32(1),
73-
PoolTimeout: 1000,
74-
ReadBufferSize: 0, // Should use default
75-
WriteBufferSize: 0, // Should use default
73+
Dialer: dummyDialer,
74+
PoolSize: int32(1),
75+
MaxConcurrentDials: 1,
76+
PoolTimeout: 1000,
77+
ReadBufferSize: 0, // Should use default
78+
WriteBufferSize: 0, // Should use default
7679
})
7780

7881
cn, err := connPool.NewConn(ctx)
@@ -104,9 +107,10 @@ var _ = Describe("Buffer Size Configuration", func() {
104107
// Test the scenario where someone creates a pool directly (like in tests)
105108
// without setting ReadBufferSize and WriteBufferSize
106109
connPool = pool.NewConnPool(&pool.Options{
107-
Dialer: dummyDialer,
108-
PoolSize: int32(1),
109-
PoolTimeout: 1000,
110+
Dialer: dummyDialer,
111+
PoolSize: int32(1),
112+
MaxConcurrentDials: 1,
113+
PoolTimeout: 1000,
110114
// ReadBufferSize and WriteBufferSize are not set (will be 0)
111115
})
112116

internal/pool/hooks_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,8 +191,9 @@ func TestPoolWithHooks(t *testing.T) {
191191
Dialer: func(ctx context.Context) (net.Conn, error) {
192192
return &net.TCPConn{}, nil // Mock connection
193193
},
194-
PoolSize: 1,
195-
DialTimeout: time.Second,
194+
PoolSize: 1,
195+
MaxConcurrentDials: 1,
196+
DialTimeout: time.Second,
196197
}
197198

198199
pool := NewConnPool(opt)

internal/pool/pool.go

Lines changed: 101 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,6 @@ type Pooler interface {
7575
Put(context.Context, *Conn)
7676
Remove(context.Context, *Conn, error)
7777

78-
// RemoveWithoutTurn removes a connection from the pool without freeing a turn.
79-
// This should be used when removing a connection from a context that didn't acquire
80-
// a turn via Get() (e.g., background workers, cleanup tasks).
81-
// For normal removal after Get(), use Remove() instead.
82-
RemoveWithoutTurn(context.Context, *Conn, error)
83-
8478
Len() int
8579
IdleLen() int
8680
Stats() *Stats
@@ -102,6 +96,7 @@ type Options struct {
10296

10397
PoolFIFO bool
10498
PoolSize int32
99+
MaxConcurrentDials int
105100
DialTimeout time.Duration
106101
PoolTimeout time.Duration
107102
MinIdleConns int32
@@ -130,6 +125,9 @@ type ConnPool struct {
130125
dialErrorsNum uint32 // atomic
131126
lastDialError atomic.Value
132127

128+
queue chan struct{}
129+
dialsInProgress chan struct{}
130+
dialsQueue *wantConnQueue
133131
// Fast atomic semaphore for connection limiting
134132
// Replaces the old channel-based queue for better performance
135133
semaphore *internal.FastSemaphore
@@ -167,8 +165,11 @@ func NewConnPool(opt *Options) *ConnPool {
167165
p := &ConnPool{
168166
cfg: opt,
169167
semaphore: internal.NewFastSemaphore(semSize),
170-
conns: make(map[uint64]*Conn),
171-
idleConns: make([]*Conn, 0, opt.PoolSize),
168+
queue: make(chan struct{}, opt.PoolSize),
169+
conns: make(map[uint64]*Conn),
170+
dialsInProgress: make(chan struct{}, opt.MaxConcurrentDials),
171+
dialsQueue: newWantConnQueue(),
172+
idleConns: make([]*Conn, 0, opt.PoolSize),
172173
}
173174

174175
// Only create MinIdleConns if explicitly requested (> 0)
@@ -517,9 +518,8 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
517518

518519
atomic.AddUint32(&p.stats.Misses, 1)
519520

520-
newcn, err := p.newConn(ctx, true)
521+
newcn, err := p.queuedNewConn(ctx)
521522
if err != nil {
522-
p.freeTurn()
523523
return nil, err
524524
}
525525

@@ -538,6 +538,97 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
538538
return newcn, nil
539539
}
540540

541+
func (p *ConnPool) queuedNewConn(ctx context.Context) (*Conn, error) {
542+
select {
543+
case p.dialsInProgress <- struct{}{}:
544+
// Got permission, proceed to create connection
545+
case <-ctx.Done():
546+
p.freeTurn()
547+
return nil, ctx.Err()
548+
}
549+
550+
dialCtx, cancel := context.WithTimeout(context.Background(), p.cfg.DialTimeout)
551+
552+
w := &wantConn{
553+
ctx: dialCtx,
554+
cancelCtx: cancel,
555+
result: make(chan wantConnResult, 1),
556+
}
557+
var err error
558+
defer func() {
559+
if err != nil {
560+
if cn := w.cancel(); cn != nil {
561+
p.putIdleConn(ctx, cn)
562+
p.freeTurn()
563+
}
564+
}
565+
}()
566+
567+
p.dialsQueue.enqueue(w)
568+
569+
go func(w *wantConn) {
570+
var freeTurnCalled bool
571+
defer func() {
572+
if err := recover(); err != nil {
573+
if !freeTurnCalled {
574+
p.freeTurn()
575+
}
576+
internal.Logger.Printf(context.Background(), "queuedNewConn panic: %+v", err)
577+
}
578+
}()
579+
580+
defer w.cancelCtx()
581+
defer func() { <-p.dialsInProgress }() // Release connection creation permission
582+
583+
dialCtx := w.getCtxForDial()
584+
cn, cnErr := p.newConn(dialCtx, true)
585+
delivered := w.tryDeliver(cn, cnErr)
586+
if cnErr == nil && delivered {
587+
return
588+
} else if cnErr == nil && !delivered {
589+
p.putIdleConn(dialCtx, cn)
590+
p.freeTurn()
591+
freeTurnCalled = true
592+
} else {
593+
p.freeTurn()
594+
freeTurnCalled = true
595+
}
596+
}(w)
597+
598+
select {
599+
case <-ctx.Done():
600+
err = ctx.Err()
601+
return nil, err
602+
case result := <-w.result:
603+
err = result.err
604+
return result.cn, err
605+
}
606+
}
607+
608+
func (p *ConnPool) putIdleConn(ctx context.Context, cn *Conn) {
609+
for {
610+
w, ok := p.dialsQueue.dequeue()
611+
if !ok {
612+
break
613+
}
614+
if w.tryDeliver(cn, nil) {
615+
return
616+
}
617+
}
618+
619+
p.connsMu.Lock()
620+
defer p.connsMu.Unlock()
621+
622+
if p.closed() {
623+
_ = cn.Close()
624+
return
625+
}
626+
627+
// poolSize is increased in newConn
628+
p.idleConns = append(p.idleConns, cn)
629+
p.idleConnsLen.Add(1)
630+
}
631+
541632
func (p *ConnPool) waitTurn(ctx context.Context) error {
542633
// Fast path: check context first
543634
select {

0 commit comments

Comments
 (0)