@@ -1498,7 +1498,7 @@ mod conn {
1498
1498
use futures_util:: future:: { self , poll_fn, FutureExt , TryFutureExt } ;
1499
1499
use http_body_util:: { BodyExt , Empty , Full , StreamBody } ;
1500
1500
use hyper:: rt:: Timer ;
1501
- use tokio:: io:: { AsyncReadExt as _, AsyncWriteExt as _} ;
1501
+ use tokio:: io:: { AsyncReadExt as _, AsyncWriteExt as _, DuplexStream } ;
1502
1502
use tokio:: net:: { TcpListener as TkTcpListener , TcpStream } ;
1503
1503
1504
1504
use hyper:: body:: { Body , Frame } ;
@@ -1530,6 +1530,21 @@ mod conn {
1530
1530
( listener, addr)
1531
1531
}
1532
1532
1533
+ fn setup_duplex_test_server ( ) -> ( DuplexStream , DuplexStream , SocketAddr ) {
1534
+ use std:: net:: { IpAddr , Ipv6Addr } ;
1535
+ setup_logger ( ) ;
1536
+
1537
+ const BUF_SIZE : usize = 1024 ;
1538
+ let ( ioa, iob) = tokio:: io:: duplex ( BUF_SIZE ) ;
1539
+
1540
+ /// A test address inside the 'documentation' address range.
1541
+ /// See: <https://www.iana.org/assignments/iana-ipv6-special-registry/iana-ipv6-special-registry.xhtml>
1542
+ const TEST_ADDR : IpAddr = IpAddr :: V6 ( Ipv6Addr :: new ( 0x3fff , 0 , 0 , 0 , 0 , 0 , 0 , 1 ) ) ;
1543
+ const TEST_SOCKET : SocketAddr = SocketAddr :: new ( TEST_ADDR , 8080 ) ;
1544
+
1545
+ ( ioa, iob, TEST_SOCKET )
1546
+ }
1547
+
1533
1548
#[ tokio:: test]
1534
1549
async fn get ( ) {
1535
1550
let ( listener, addr) = setup_tk_test_server ( ) . await ;
@@ -2307,16 +2322,14 @@ mod conn {
2307
2322
// Regression test for failure to fully close connections when using HTTP2 CONNECT
2308
2323
// We send 2 requests and then drop them. We should see the connection gracefully close.
2309
2324
use futures_util:: future;
2310
- let ( listener , addr) = setup_tk_test_server ( ) . await ;
2325
+ let ( client_io , server_io , addr) = setup_duplex_test_server ( ) ;
2311
2326
let ( tx, rxx) = oneshot:: channel :: < ( ) > ( ) ;
2312
2327
2313
2328
tokio:: task:: spawn ( async move {
2314
2329
use hyper:: server:: conn:: http2;
2315
2330
use hyper:: service:: service_fn;
2316
2331
2317
- let res = listener. accept ( ) . await ;
2318
- let ( stream, _) = res. unwrap ( ) ;
2319
- let stream = TokioIo :: new ( stream) ;
2332
+ let stream = TokioIo :: new ( server_io) ;
2320
2333
2321
2334
let service = service_fn ( move |req : Request < hyper:: body:: Incoming > | {
2322
2335
tokio:: task:: spawn ( async move {
@@ -2334,7 +2347,7 @@ mod conn {
2334
2347
} ) ;
2335
2348
} ) ;
2336
2349
2337
- let io = tcp_connect ( & addr ) . await . expect ( "tcp connect" ) ;
2350
+ let io = TokioIo :: new ( client_io ) ;
2338
2351
let ( mut client, conn) = conn:: http2:: Builder :: new ( TokioExecutor )
2339
2352
. handshake ( io)
2340
2353
. await
@@ -2380,11 +2393,11 @@ mod conn {
2380
2393
2381
2394
#[ tokio:: test]
2382
2395
async fn http2_keep_alive_detects_unresponsive_server ( ) {
2383
- let ( listener , addr ) = setup_tk_test_server ( ) . await ;
2396
+ let ( client_io , server_io , _ ) = setup_duplex_test_server ( ) ;
2384
2397
2385
2398
// spawn a server that reads but doesn't write
2386
2399
tokio:: spawn ( async move {
2387
- let mut sock = listener . accept ( ) . await . unwrap ( ) . 0 ;
2400
+ let mut sock = server_io ;
2388
2401
let mut buf = [ 0u8 ; 1024 ] ;
2389
2402
loop {
2390
2403
let n = sock. read ( & mut buf) . await . expect ( "server read" ) ;
@@ -2395,7 +2408,7 @@ mod conn {
2395
2408
}
2396
2409
} ) ;
2397
2410
2398
- let io = tcp_connect ( & addr ) . await . expect ( "tcp connect" ) ;
2411
+ let io = TokioIo :: new ( client_io ) ;
2399
2412
let ( _client, conn) = conn:: http2:: Builder :: new ( TokioExecutor )
2400
2413
. timer ( TokioTimer )
2401
2414
. keep_alive_interval ( Duration :: from_secs ( 1 ) )
@@ -2415,15 +2428,14 @@ mod conn {
2415
2428
// will use the default behavior which will NOT detect the server
2416
2429
// is unresponsive while no streams are active.
2417
2430
2418
- let ( listener , addr ) = setup_tk_test_server ( ) . await ;
2431
+ let ( client_io , server_io , _ ) = setup_duplex_test_server ( ) ;
2419
2432
2420
2433
// spawn a server that reads but doesn't write
2421
2434
tokio:: spawn ( async move {
2422
- let sock = listener. accept ( ) . await . unwrap ( ) . 0 ;
2423
- drain_til_eof ( sock) . await . expect ( "server read" ) ;
2435
+ drain_til_eof ( server_io) . await . expect ( "server read" ) ;
2424
2436
} ) ;
2425
2437
2426
- let io = tcp_connect ( & addr ) . await . expect ( "tcp connect" ) ;
2438
+ let io = TokioIo :: new ( client_io ) ;
2427
2439
let ( mut client, conn) = conn:: http2:: Builder :: new ( TokioExecutor )
2428
2440
. timer ( TokioTimer )
2429
2441
. keep_alive_interval ( Duration :: from_secs ( 1 ) )
@@ -2446,15 +2458,14 @@ mod conn {
2446
2458
2447
2459
#[ tokio:: test]
2448
2460
async fn http2_keep_alive_closes_open_streams ( ) {
2449
- let ( listener , addr ) = setup_tk_test_server ( ) . await ;
2461
+ let ( client_io , server_io , _addr ) = setup_duplex_test_server ( ) ;
2450
2462
2451
2463
// spawn a server that reads but doesn't write
2452
2464
tokio:: spawn ( async move {
2453
- let sock = listener. accept ( ) . await . unwrap ( ) . 0 ;
2454
- drain_til_eof ( sock) . await . expect ( "server read" ) ;
2465
+ drain_til_eof ( server_io) . await . expect ( "server read" ) ;
2455
2466
} ) ;
2456
2467
2457
- let io = tcp_connect ( & addr ) . await . expect ( "tcp connect" ) ;
2468
+ let io = TokioIo :: new ( client_io ) ;
2458
2469
let ( mut client, conn) = conn:: http2:: Builder :: new ( TokioExecutor )
2459
2470
. timer ( TokioTimer )
2460
2471
. keep_alive_interval ( Duration :: from_secs ( 1 ) )
@@ -2491,11 +2502,11 @@ mod conn {
2491
2502
// alive is enabled
2492
2503
use hyper:: service:: service_fn;
2493
2504
2494
- let ( listener , addr ) = setup_tk_test_server ( ) . await ;
2505
+ let ( client_io , server_io , _addr ) = setup_duplex_test_server ( ) ;
2495
2506
2496
2507
// Spawn an HTTP2 server that reads the whole body and responds
2497
2508
tokio:: spawn ( async move {
2498
- let sock = TokioIo :: new ( listener . accept ( ) . await . unwrap ( ) . 0 ) ;
2509
+ let sock = TokioIo :: new ( server_io ) ;
2499
2510
hyper:: server:: conn:: http2:: Builder :: new ( TokioExecutor )
2500
2511
. timer ( TokioTimer )
2501
2512
. serve_connection (
@@ -2513,7 +2524,7 @@ mod conn {
2513
2524
. expect ( "serve_connection" ) ;
2514
2525
} ) ;
2515
2526
2516
- let io = tcp_connect ( & addr ) . await . expect ( "tcp connect" ) ;
2527
+ let io = TokioIo :: new ( client_io ) ;
2517
2528
let ( mut client, conn) = conn:: http2:: Builder :: new ( TokioExecutor )
2518
2529
. timer ( TokioTimer )
2519
2530
. keep_alive_interval ( Duration :: from_secs ( 1 ) )
@@ -2598,11 +2609,11 @@ mod conn {
2598
2609
2599
2610
#[ tokio:: test]
2600
2611
async fn h2_connect ( ) {
2601
- let ( listener , addr ) = setup_tk_test_server ( ) . await ;
2612
+ let ( client_io , server_io , _ ) = setup_duplex_test_server ( ) ;
2602
2613
2603
2614
// Spawn an HTTP2 server that asks for bread and responds with baguette.
2604
2615
tokio:: spawn ( async move {
2605
- let sock = listener . accept ( ) . await . unwrap ( ) . 0 ;
2616
+ let sock = server_io ;
2606
2617
let mut h2 = h2:: server:: handshake ( sock) . await . unwrap ( ) ;
2607
2618
2608
2619
let ( req, mut respond) = h2. accept ( ) . await . unwrap ( ) . unwrap ( ) ;
@@ -2624,7 +2635,7 @@ mod conn {
2624
2635
assert ! ( body. data( ) . await . is_none( ) ) ;
2625
2636
} ) ;
2626
2637
2627
- let io = tcp_connect ( & addr ) . await . expect ( "tcp connect" ) ;
2638
+ let io = TokioIo :: new ( client_io ) ;
2628
2639
let ( mut client, conn) = conn:: http2:: Builder :: new ( TokioExecutor )
2629
2640
. handshake ( io)
2630
2641
. await
@@ -2653,11 +2664,11 @@ mod conn {
2653
2664
2654
2665
#[ tokio:: test]
2655
2666
async fn h2_connect_rejected ( ) {
2656
- let ( listener , addr ) = setup_tk_test_server ( ) . await ;
2667
+ let ( client_io , server_io , _ ) = setup_duplex_test_server ( ) ;
2657
2668
let ( done_tx, done_rx) = oneshot:: channel ( ) ;
2658
2669
2659
2670
tokio:: spawn ( async move {
2660
- let sock = listener . accept ( ) . await . unwrap ( ) . 0 ;
2671
+ let sock = server_io ;
2661
2672
let mut h2 = h2:: server:: handshake ( sock) . await . unwrap ( ) ;
2662
2673
2663
2674
let ( req, mut respond) = h2. accept ( ) . await . unwrap ( ) . unwrap ( ) ;
@@ -2674,7 +2685,7 @@ mod conn {
2674
2685
done_rx. await . unwrap ( ) ;
2675
2686
} ) ;
2676
2687
2677
- let io = tcp_connect ( & addr ) . await . expect ( "tcp connect" ) ;
2688
+ let io = TokioIo :: new ( client_io ) ;
2678
2689
let ( mut client, conn) = conn:: http2:: Builder :: new ( TokioExecutor )
2679
2690
. handshake :: < _ , Empty < Bytes > > ( io)
2680
2691
. await
@@ -2703,16 +2714,15 @@ mod conn {
2703
2714
2704
2715
#[ tokio:: test]
2705
2716
async fn test_body_panics ( ) {
2706
- let ( listener , addr ) = setup_tk_test_server ( ) . await ;
2717
+ let ( client_io , server_io , _ ) = setup_duplex_test_server ( ) ;
2707
2718
2708
2719
// spawn a server that reads but doesn't write
2709
2720
tokio:: spawn ( async move {
2710
- let sock = listener . accept ( ) . await . unwrap ( ) . 0 ;
2721
+ let sock = server_io ;
2711
2722
drain_til_eof ( sock) . await . expect ( "server read" ) ;
2712
2723
} ) ;
2713
2724
2714
- let io = tcp_connect ( & addr) . await . expect ( "tcp connect" ) ;
2715
-
2725
+ let io = TokioIo :: new ( client_io) ;
2716
2726
let ( mut client, conn) = conn:: http1:: Builder :: new ( )
2717
2727
. handshake ( io)
2718
2728
. await
0 commit comments