Skip to content

Commit e83fc79

Browse files
committed
async create conn
1 parent 8aecdb8 commit e83fc79

File tree

9 files changed

+614
-96
lines changed

9 files changed

+614
-96
lines changed

async_handoff_integration_test.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,9 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
5353
Dialer: func(ctx context.Context) (net.Conn, error) {
5454
return &mockNetConn{addr: "original:6379"}, nil
5555
},
56-
PoolSize: int32(5),
57-
PoolTimeout: time.Second,
56+
PoolSize: int32(5),
57+
MaxConcurrentDials: 5,
58+
PoolTimeout: time.Second,
5859
})
5960

6061
// Add the hook to the pool after creation
@@ -153,8 +154,9 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
153154
return &mockNetConn{addr: "original:6379"}, nil
154155
},
155156

156-
PoolSize: int32(10),
157-
PoolTimeout: time.Second,
157+
PoolSize: int32(10),
158+
MaxConcurrentDials: 10,
159+
PoolTimeout: time.Second,
158160
})
159161
defer testPool.Close()
160162

@@ -225,8 +227,9 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
225227
return &mockNetConn{addr: "original:6379"}, nil
226228
},
227229

228-
PoolSize: int32(3),
229-
PoolTimeout: time.Second,
230+
PoolSize: int32(3),
231+
MaxConcurrentDials: 3,
232+
PoolTimeout: time.Second,
230233
})
231234
defer testPool.Close()
232235

@@ -288,8 +291,9 @@ func TestEventDrivenHandoffIntegration(t *testing.T) {
288291
return &mockNetConn{addr: "original:6379"}, nil
289292
},
290293

291-
PoolSize: int32(2),
292-
PoolTimeout: time.Second,
294+
PoolSize: int32(2),
295+
MaxConcurrentDials: 2,
296+
PoolTimeout: time.Second,
293297
})
294298
defer testPool.Close()
295299

internal/pool/bench_test.go

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,12 @@ func BenchmarkPoolGetPut(b *testing.B) {
3131
for _, bm := range benchmarks {
3232
b.Run(bm.String(), func(b *testing.B) {
3333
connPool := pool.NewConnPool(&pool.Options{
34-
Dialer: dummyDialer,
35-
PoolSize: int32(bm.poolSize),
36-
PoolTimeout: time.Second,
37-
DialTimeout: 1 * time.Second,
38-
ConnMaxIdleTime: time.Hour,
34+
Dialer: dummyDialer,
35+
PoolSize: int32(bm.poolSize),
36+
MaxConcurrentDials: bm.poolSize,
37+
PoolTimeout: time.Second,
38+
DialTimeout: 1 * time.Second,
39+
ConnMaxIdleTime: time.Hour,
3940
})
4041

4142
b.ResetTimer()
@@ -75,11 +76,12 @@ func BenchmarkPoolGetRemove(b *testing.B) {
7576
for _, bm := range benchmarks {
7677
b.Run(bm.String(), func(b *testing.B) {
7778
connPool := pool.NewConnPool(&pool.Options{
78-
Dialer: dummyDialer,
79-
PoolSize: int32(bm.poolSize),
80-
PoolTimeout: time.Second,
81-
DialTimeout: 1 * time.Second,
82-
ConnMaxIdleTime: time.Hour,
79+
Dialer: dummyDialer,
80+
PoolSize: int32(bm.poolSize),
81+
MaxConcurrentDials: bm.poolSize,
82+
PoolTimeout: time.Second,
83+
DialTimeout: 1 * time.Second,
84+
ConnMaxIdleTime: time.Hour,
8385
})
8486

8587
b.ResetTimer()

internal/pool/buffer_size_test.go

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@ var _ = Describe("Buffer Size Configuration", func() {
2525

2626
It("should use default buffer sizes when not specified", func() {
2727
connPool = pool.NewConnPool(&pool.Options{
28-
Dialer: dummyDialer,
29-
PoolSize: int32(1),
30-
PoolTimeout: 1000,
28+
Dialer: dummyDialer,
29+
PoolSize: int32(1),
30+
MaxConcurrentDials: 1,
31+
PoolTimeout: 1000,
3132
})
3233

3334
cn, err := connPool.NewConn(ctx)
@@ -47,11 +48,12 @@ var _ = Describe("Buffer Size Configuration", func() {
4748
customWriteSize := 64 * 1024 // 64KB
4849

4950
connPool = pool.NewConnPool(&pool.Options{
50-
Dialer: dummyDialer,
51-
PoolSize: int32(1),
52-
PoolTimeout: 1000,
53-
ReadBufferSize: customReadSize,
54-
WriteBufferSize: customWriteSize,
51+
Dialer: dummyDialer,
52+
PoolSize: int32(1),
53+
MaxConcurrentDials: 1,
54+
PoolTimeout: 1000,
55+
ReadBufferSize: customReadSize,
56+
WriteBufferSize: customWriteSize,
5557
})
5658

5759
cn, err := connPool.NewConn(ctx)
@@ -68,11 +70,12 @@ var _ = Describe("Buffer Size Configuration", func() {
6870

6971
It("should handle zero buffer sizes by using defaults", func() {
7072
connPool = pool.NewConnPool(&pool.Options{
71-
Dialer: dummyDialer,
72-
PoolSize: int32(1),
73-
PoolTimeout: 1000,
74-
ReadBufferSize: 0, // Should use default
75-
WriteBufferSize: 0, // Should use default
73+
Dialer: dummyDialer,
74+
PoolSize: int32(1),
75+
MaxConcurrentDials: 1,
76+
PoolTimeout: 1000,
77+
ReadBufferSize: 0, // Should use default
78+
WriteBufferSize: 0, // Should use default
7679
})
7780

7881
cn, err := connPool.NewConn(ctx)
@@ -104,9 +107,10 @@ var _ = Describe("Buffer Size Configuration", func() {
104107
// Test the scenario where someone creates a pool directly (like in tests)
105108
// without setting ReadBufferSize and WriteBufferSize
106109
connPool = pool.NewConnPool(&pool.Options{
107-
Dialer: dummyDialer,
108-
PoolSize: int32(1),
109-
PoolTimeout: 1000,
110+
Dialer: dummyDialer,
111+
PoolSize: int32(1),
112+
MaxConcurrentDials: 1,
113+
PoolTimeout: 1000,
110114
// ReadBufferSize and WriteBufferSize are not set (will be 0)
111115
})
112116

internal/pool/hooks_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,9 @@ func TestPoolWithHooks(t *testing.T) {
177177
Dialer: func(ctx context.Context) (net.Conn, error) {
178178
return &net.TCPConn{}, nil // Mock connection
179179
},
180-
PoolSize: 1,
181-
DialTimeout: time.Second,
180+
PoolSize: 1,
181+
MaxConcurrentDials: 1,
182+
DialTimeout: time.Second,
182183
}
183184

184185
pool := NewConnPool(opt)

internal/pool/pool.go

Lines changed: 108 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ type Options struct {
9191

9292
PoolFIFO bool
9393
PoolSize int32
94+
MaxConcurrentDials int
9495
DialTimeout time.Duration
9596
PoolTimeout time.Duration
9697
MinIdleConns int32
@@ -113,13 +114,65 @@ type lastDialErrorWrap struct {
113114
err error
114115
}
115116

117+
type wantConn struct {
118+
mu sync.Mutex // protects ctx, done and sending of the result
119+
ctx context.Context // context for dial, cleared after delivered or canceled
120+
cancelCtx context.CancelFunc
121+
done bool // true after delivered or canceled
122+
result chan wantConnResult // channel to deliver connection or error
123+
}
124+
125+
func (w *wantConn) tryDeliver(cn *Conn, err error) bool {
126+
w.mu.Lock()
127+
defer w.mu.Unlock()
128+
if w.done {
129+
return false
130+
}
131+
132+
w.done = true
133+
w.ctx = nil
134+
135+
w.result <- wantConnResult{cn: cn, err: err}
136+
close(w.result)
137+
138+
return true
139+
}
140+
141+
func (w *wantConn) cancel(ctx context.Context, p *ConnPool) {
142+
w.mu.Lock()
143+
var cn *Conn
144+
if w.done {
145+
select {
146+
case result := <-w.result:
147+
cn = result.cn
148+
default:
149+
}
150+
} else {
151+
close(w.result)
152+
}
153+
154+
w.done = true
155+
w.ctx = nil
156+
w.mu.Unlock()
157+
158+
if cn != nil {
159+
p.Put(ctx, cn)
160+
}
161+
}
162+
163+
type wantConnResult struct {
164+
cn *Conn
165+
err error
166+
}
167+
116168
type ConnPool struct {
117169
cfg *Options
118170

119171
dialErrorsNum uint32 // atomic
120172
lastDialError atomic.Value
121173

122-
queue chan struct{}
174+
queue chan struct{}
175+
dialsInProgress chan struct{}
123176

124177
connsMu sync.Mutex
125178
conns map[uint64]*Conn
@@ -145,9 +198,10 @@ func NewConnPool(opt *Options) *ConnPool {
145198
p := &ConnPool{
146199
cfg: opt,
147200

148-
queue: make(chan struct{}, opt.PoolSize),
149-
conns: make(map[uint64]*Conn),
150-
idleConns: make([]*Conn, 0, opt.PoolSize),
201+
queue: make(chan struct{}, opt.PoolSize),
202+
conns: make(map[uint64]*Conn),
203+
dialsInProgress: make(chan struct{}, opt.MaxConcurrentDials),
204+
idleConns: make([]*Conn, 0, opt.PoolSize),
151205
}
152206

153207
// Only create MinIdleConns if explicitly requested (> 0)
@@ -473,9 +527,8 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
473527

474528
atomic.AddUint32(&p.stats.Misses, 1)
475529

476-
newcn, err := p.newConn(ctx, true)
530+
newcn, err := p.asyncNewConn(ctx)
477531
if err != nil {
478-
p.freeTurn()
479532
return nil, err
480533
}
481534

@@ -495,6 +548,55 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
495548
return newcn, nil
496549
}
497550

551+
func (p *ConnPool) asyncNewConn(ctx context.Context) (*Conn, error) {
552+
// First try to acquire permission to create a connection
553+
select {
554+
case p.dialsInProgress <- struct{}{}:
555+
// Got permission, proceed to create connection
556+
case <-ctx.Done():
557+
p.freeTurn()
558+
return nil, ctx.Err()
559+
}
560+
561+
dialCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), p.cfg.DialTimeout)
562+
563+
w := &wantConn{
564+
ctx: dialCtx,
565+
cancelCtx: cancel,
566+
result: make(chan wantConnResult, 1),
567+
}
568+
var err error
569+
defer func() {
570+
if err != nil {
571+
w.cancel(ctx, p)
572+
}
573+
}()
574+
575+
go func(w *wantConn) {
576+
defer w.cancelCtx()
577+
defer func() { <-p.dialsInProgress }() // Release connection creation permission
578+
579+
cn, cnErr := p.newConn(w.ctx, true)
580+
delivered := w.tryDeliver(cn, cnErr)
581+
if cnErr == nil && delivered {
582+
return
583+
} else if cnErr == nil && !delivered {
584+
p.Put(w.ctx, cn)
585+
} else { // freeTurn after error
586+
p.freeTurn()
587+
}
588+
}(w)
589+
590+
select {
591+
case <-ctx.Done():
592+
err = ctx.Err()
593+
return nil, err
594+
case result := <-w.result:
595+
err = result.err
596+
return result.cn, err
597+
}
598+
}
599+
498600
func (p *ConnPool) waitTurn(ctx context.Context) error {
499601
select {
500602
case <-ctx.Done():

0 commit comments

Comments
 (0)