1
1
mod test_utils;
2
2
3
+ use std:: future:: Future ;
3
4
use std:: io:: { Read , Write } ;
4
5
use std:: net:: { SocketAddr , TcpListener } ;
5
- use std:: pin:: Pin ;
6
+ use std:: pin:: { pin , Pin } ;
6
7
use std:: sync:: atomic:: Ordering ;
7
8
use std:: sync:: Arc ;
8
9
use std:: task:: Poll ;
9
10
use std:: thread;
10
11
use std:: time:: Duration ;
11
12
12
13
use futures_channel:: { mpsc, oneshot} ;
14
+ use futures_core:: Stream ;
13
15
use futures_util:: future:: { self , FutureExt , TryFutureExt } ;
14
16
use futures_util:: stream:: StreamExt ;
15
- use futures_util:: { self , Stream } ;
16
17
use http_body_util:: BodyExt ;
17
18
use http_body_util:: { Empty , Full , StreamBody } ;
18
19
use tokio:: io:: { AsyncReadExt , AsyncWriteExt } ;
@@ -132,7 +133,7 @@ async fn drop_client_closes_idle_connections() {
132
133
res. unwrap ( ) ;
133
134
134
135
// not closed yet, just idle
135
- future:: poll_fn ( |ctx| {
136
+ std :: future:: poll_fn ( |ctx| {
136
137
assert ! ( Pin :: new( & mut closes) . poll_next( ctx) . is_pending( ) ) ;
137
138
Poll :: Ready ( ( ) )
138
139
} )
@@ -142,8 +143,7 @@ async fn drop_client_closes_idle_connections() {
142
143
drop ( client) ;
143
144
144
145
// and wait a few ticks for the connections to close
145
- let t = tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . map ( |_| panic ! ( "time out" ) ) ;
146
- futures_util:: pin_mut!( t) ;
146
+ let t = pin ! ( tokio:: time:: sleep( Duration :: from_millis( 100 ) ) . map( |_| panic!( "time out" ) ) ) ;
147
147
let close = closes. into_future ( ) . map ( |( opt, _) | opt. expect ( "closes" ) ) ;
148
148
future:: select ( t, close) . await ;
149
149
t1. await . unwrap ( ) ;
@@ -192,8 +192,7 @@ async fn drop_response_future_closes_in_progress_connection() {
192
192
future:: select ( res, rx1) . await ;
193
193
194
194
// res now dropped
195
- let t = tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . map ( |_| panic ! ( "time out" ) ) ;
196
- futures_util:: pin_mut!( t) ;
195
+ let t = pin ! ( tokio:: time:: sleep( Duration :: from_millis( 100 ) ) . map( |_| panic!( "time out" ) ) ) ;
197
196
let close = closes. into_future ( ) . map ( |( opt, _) | opt. expect ( "closes" ) ) ;
198
197
future:: select ( t, close) . await ;
199
198
}
@@ -248,8 +247,7 @@ async fn drop_response_body_closes_in_progress_connection() {
248
247
res. unwrap ( ) ;
249
248
250
249
// and wait a few ticks to see the connection drop
251
- let t = tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . map ( |_| panic ! ( "time out" ) ) ;
252
- futures_util:: pin_mut!( t) ;
250
+ let t = pin ! ( tokio:: time:: sleep( Duration :: from_millis( 100 ) ) . map( |_| panic!( "time out" ) ) ) ;
253
251
let close = closes. into_future ( ) . map ( |( opt, _) | opt. expect ( "closes" ) ) ;
254
252
future:: select ( t, close) . await ;
255
253
}
@@ -301,8 +299,7 @@ async fn no_keep_alive_closes_connection() {
301
299
let ( res, _) = future:: join ( res, rx) . await ;
302
300
res. unwrap ( ) ;
303
301
304
- let t = tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . map ( |_| panic ! ( "time out" ) ) ;
305
- futures_util:: pin_mut!( t) ;
302
+ let t = pin ! ( tokio:: time:: sleep( Duration :: from_millis( 100 ) ) . map( |_| panic!( "time out" ) ) ) ;
306
303
let close = closes. into_future ( ) . map ( |( opt, _) | opt. expect ( "closes" ) ) ;
307
304
future:: select ( close, t) . await ;
308
305
}
@@ -348,8 +345,7 @@ async fn socket_disconnect_closes_idle_conn() {
348
345
let ( res, _) = future:: join ( res, rx) . await ;
349
346
res. unwrap ( ) ;
350
347
351
- let t = tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . map ( |_| panic ! ( "time out" ) ) ;
352
- futures_util:: pin_mut!( t) ;
348
+ let t = pin ! ( tokio:: time:: sleep( Duration :: from_millis( 100 ) ) . map( |_| panic!( "time out" ) ) ) ;
353
349
let close = closes. into_future ( ) . map ( |( opt, _) | opt. expect ( "closes" ) ) ;
354
350
future:: select ( t, close) . await ;
355
351
}
@@ -562,7 +558,7 @@ async fn client_keep_alive_when_response_before_request_body_ends() {
562
558
} ) ;
563
559
564
560
future:: join ( res, rx2) . await . 0 . unwrap ( ) ;
565
- future:: poll_fn ( |ctx| {
561
+ std :: future:: poll_fn ( |ctx| {
566
562
assert ! ( Pin :: new( & mut closes) . poll_next( ctx) . is_pending( ) ) ;
567
563
Poll :: Ready ( ( ) )
568
564
} )
@@ -571,8 +567,7 @@ async fn client_keep_alive_when_response_before_request_body_ends() {
571
567
assert_eq ! ( connects. load( Ordering :: Relaxed ) , 1 ) ;
572
568
573
569
drop ( client) ;
574
- let t = tokio:: time:: sleep ( Duration :: from_millis ( 100 ) ) . map ( |_| panic ! ( "time out" ) ) ;
575
- futures_util:: pin_mut!( t) ;
570
+ let t = pin ! ( tokio:: time:: sleep( Duration :: from_millis( 100 ) ) . map( |_| panic!( "time out" ) ) ) ;
576
571
let close = closes. into_future ( ) . map ( |( opt, _) | opt. expect ( "closes" ) ) ;
577
572
future:: select ( t, close) . await ;
578
573
}
@@ -1255,10 +1250,7 @@ impl tower_service::Service<hyper::Uri> for MockConnector {
1255
1250
type Response = crate :: MockConnection ;
1256
1251
type Error = std:: io:: Error ;
1257
1252
type Future = std:: pin:: Pin <
1258
- Box <
1259
- dyn futures_util:: Future < Output = std:: result:: Result < Self :: Response , Self :: Error > >
1260
- + Send ,
1261
- > ,
1253
+ Box < dyn Future < Output = std:: result:: Result < Self :: Response , Self :: Error > > + Send > ,
1262
1254
> ;
1263
1255
1264
1256
// Polls the connector to check if it’s ready to handle a request.
0 commit comments