Skip to content

Commit 20b97eb

Browse files
committed
test: handoff p
1 parent b780b6f commit 20b97eb

File tree

1 file changed

+118
-12
lines changed

1 file changed

+118
-12
lines changed

connstate/conn_test.go

Lines changed: 118 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"errors"
1919
"io"
2020
"net"
21+
"runtime"
2122
"sync"
2223
"syscall"
2324
"testing"
@@ -26,11 +27,7 @@ import (
2627
"github.com/stretchr/testify/assert"
2728
)
2829

29-
var testMutex sync.Mutex
30-
3130
func TestListenConnState(t *testing.T) {
32-
testMutex.Lock()
33-
defer testMutex.Unlock()
3431
ln, err := net.Listen("tcp", "localhost:0")
3532
if err != nil {
3633
panic(err)
@@ -39,12 +36,12 @@ func TestListenConnState(t *testing.T) {
3936
for {
4037
conn, err := ln.Accept()
4138
assert.Nil(t, err)
42-
go func() {
39+
go func(conn net.Conn) {
4340
buf := make([]byte, 11)
4441
_, err := conn.Read(buf)
4542
assert.Nil(t, err)
4643
conn.Close()
47-
}()
44+
}(conn)
4845
}
4946
}()
5047
conn, err := net.Dial("tcp", ln.Addr().String())
@@ -97,8 +94,6 @@ func (r *mockRawConn) Control(f func(fd uintptr)) error {
9794
}
9895

9996
func TestListenConnState_Err(t *testing.T) {
100-
testMutex.Lock()
101-
defer testMutex.Unlock()
10297
// replace poll
10398
pollInitOnce.Do(createPoller)
10499
oldPoll := poll
@@ -166,8 +161,6 @@ func TestListenConnState_Err(t *testing.T) {
166161
}
167162

168163
func BenchmarkListenConnState(b *testing.B) {
169-
testMutex.Lock()
170-
defer testMutex.Unlock()
171164
ln, err := net.Listen("tcp", "localhost:0")
172165
if err != nil {
173166
panic(err)
@@ -176,14 +169,15 @@ func BenchmarkListenConnState(b *testing.B) {
176169
for {
177170
conn, err := ln.Accept()
178171
assert.Nil(b, err)
179-
go func() {
172+
go func(conn net.Conn) {
180173
buf := make([]byte, 11)
181174
_, err := conn.Read(buf)
182175
assert.Nil(b, err)
183176
conn.Close()
184-
}()
177+
}(conn)
185178
}
186179
}()
180+
b.ResetTimer()
187181
b.RunParallel(func(pb *testing.PB) {
188182
for pb.Next() {
189183
conn, err := net.Dial("tcp", ln.Addr().String())
@@ -204,3 +198,115 @@ func BenchmarkListenConnState(b *testing.B) {
204198
}
205199
})
206200
}
201+
202+
type statefulConn struct {
203+
net.Conn
204+
stater ConnStater
205+
}
206+
207+
func (s *statefulConn) Close() error {
208+
s.stater.Close()
209+
return s.Conn.Close()
210+
}
211+
212+
type connpool struct {
213+
mu sync.Mutex
214+
conns []*statefulConn
215+
}
216+
217+
func (p *connpool) get(dialFunc func() *statefulConn) *statefulConn {
218+
p.mu.Lock()
219+
if len(p.conns) == 0 {
220+
p.mu.Unlock()
221+
return dialFunc()
222+
}
223+
for i := len(p.conns) - 1; i >= 0; i-- {
224+
conn := p.conns[i]
225+
if conn.stater.State() == StateOK {
226+
p.conns = p.conns[:i]
227+
p.mu.Unlock()
228+
return conn
229+
} else {
230+
conn.Close()
231+
}
232+
}
233+
p.conns = p.conns[:0]
234+
p.mu.Unlock()
235+
return dialFunc()
236+
}
237+
238+
func (p *connpool) put(conn *statefulConn) {
239+
p.mu.Lock()
240+
defer p.mu.Unlock()
241+
p.conns = append(p.conns, conn)
242+
}
243+
244+
// BenchmarkHandoffP is used to verify the impact on performance caused by P being occupied by the poller
245+
// when using syscall.EpollWait() in a high-load scenario.
246+
// To compare with syscall.EpollWait(), you could run `go test -bench=BenchmarkHandoffP -benchtime=10s .`
247+
// to test the first time, and replace isyscall.EpollWait() with syscall.EpollWait() to test the second time.
248+
func BenchmarkHandoffP(b *testing.B) {
249+
// set GOMAXPROCS to 1 to make P resources scarce
250+
runtime.GOMAXPROCS(1)
251+
ln, err := net.Listen("tcp", "localhost:0")
252+
if err != nil {
253+
panic(err)
254+
}
255+
go func() {
256+
for {
257+
conn, err := ln.Accept()
258+
assert.Nil(b, err)
259+
go func(conn net.Conn) {
260+
var count uint64
261+
for {
262+
buf := make([]byte, 11)
263+
_, err := conn.Read(buf)
264+
if err != nil {
265+
conn.Close()
266+
return
267+
}
268+
_, err = conn.Write(buf)
269+
if err != nil {
270+
conn.Close()
271+
return
272+
}
273+
count++
274+
if count == 1000 {
275+
conn.Close()
276+
return
277+
}
278+
}
279+
}(conn)
280+
}
281+
}()
282+
cp := &connpool{}
283+
dialFunc := func() *statefulConn {
284+
conn, err := net.Dial("tcp", ln.Addr().String())
285+
assert.Nil(b, err)
286+
stater, err := ListenConnState(conn)
287+
assert.Nil(b, err)
288+
return &statefulConn{
289+
Conn: conn,
290+
stater: stater,
291+
}
292+
}
293+
b.ResetTimer()
294+
b.RunParallel(func(pb *testing.PB) {
295+
for pb.Next() {
296+
var conn *statefulConn
297+
conn = cp.get(dialFunc)
298+
buf := make([]byte, 11)
299+
_, err := conn.Write(buf)
300+
if err != nil {
301+
conn.Close()
302+
continue
303+
}
304+
_, err = conn.Read(buf)
305+
if err != nil {
306+
conn.Close()
307+
continue
308+
}
309+
cp.put(conn)
310+
}
311+
})
312+
}

0 commit comments

Comments
 (0)