12
12
// See the License for the specific language governing permissions and
13
13
// limitations under the License.
14
14
15
- use bytes:: BytesMut ;
16
- use hyper:: service:: { make_service_fn , service_fn} ;
17
- use hyper:: { upgrade:: Upgraded , Body , Request , Response , Server } ;
15
+ use bytes:: { Bytes , BytesMut } ;
16
+ use hyper:: service:: service_fn;
17
+ use hyper:: { upgrade:: Upgraded , Request , Response } ;
18
18
use std:: {
19
19
error:: Error ,
20
- net:: { Ipv4Addr , SocketAddr , TcpListener } ,
20
+ net:: { Ipv4Addr , SocketAddr } ,
21
21
pin:: pin,
22
22
sync:: Arc ,
23
23
time:: Duration ,
@@ -29,57 +29,63 @@ use futures::{
29
29
future:: { join, select, Either } ,
30
30
Future ,
31
31
} ;
32
+ use http_body_util:: Full ;
33
+ use hyper:: body:: Incoming ;
34
+ use hyper_util:: rt:: { TokioExecutor , TokioIo } ;
35
+ use hyper_util:: server:: conn:: auto:: Builder ;
36
+ use hyper_util:: server:: graceful:: GracefulShutdown ;
32
37
use ratchet:: { CloseCode , CloseReason , Message , NoExt , NoExtProvider , PayloadType , WebSocket } ;
33
38
use thiserror:: Error ;
39
+ use tokio:: net:: TcpListener ;
34
40
use tokio:: { net:: TcpSocket , sync:: Notify } ;
35
41
36
42
async fn run_server (
37
43
bound_to : oneshot:: Sender < SocketAddr > ,
38
44
done : Arc < Notify > ,
39
45
) -> Result < ( ) , Box < dyn Error > > {
40
- let listener = TcpListener :: bind ( ( Ipv4Addr :: LOCALHOST , 0 ) ) ?;
46
+ let listener = TcpListener :: bind ( ( Ipv4Addr :: LOCALHOST , 0 ) ) . await ?;
41
47
let bound = listener. local_addr ( ) ?;
42
48
let _ = bound_to. send ( bound) ;
43
49
44
- let service = make_service_fn ( |_| async { Ok :: < _ , hyper:: Error > ( service_fn ( upgrade_server) ) } ) ;
45
-
46
- let shutdown = Arc :: new ( Notify :: new ( ) ) ;
47
- let shutdown_cpy = shutdown. clone ( ) ;
48
-
49
- let server = pin ! ( Server :: from_tcp( listener) ?
50
- . serve( service)
51
- . with_graceful_shutdown( async move {
52
- shutdown_cpy. notified( ) . await ;
53
- } ) ) ;
50
+ let ( io, _) = listener. accept ( ) . await ?;
51
+ let builder = Builder :: new ( TokioExecutor :: new ( ) ) ;
52
+ let connection =
53
+ builder. serve_connection_with_upgrades ( TokioIo :: new ( io) , service_fn ( upgrade_server) ) ;
54
+ let shutdown = GracefulShutdown :: new ( ) ;
54
55
56
+ let server = pin ! ( shutdown. watch( connection) ) ;
55
57
let stop = pin ! ( done. notified( ) ) ;
58
+
56
59
match select ( server, stop) . await {
57
- Either :: Left ( ( result, _) ) => result?,
58
- Either :: Right ( ( _, server) ) => {
59
- shutdown. notify_one ( ) ;
60
- tokio:: time:: timeout ( Duration :: from_secs ( 2 ) , server) . await ??;
60
+ Either :: Left ( ( result, _) ) => match result {
61
+ Ok ( ( ) ) => Ok ( ( ) ) ,
62
+ Err ( e) => Err ( e) ,
63
+ } ,
64
+ Either :: Right ( ( _, _server) ) => {
65
+ tokio:: time:: timeout ( Duration :: from_secs ( 2 ) , shutdown. shutdown ( ) ) . await ?;
66
+ Ok ( ( ) )
61
67
}
62
68
}
63
-
64
- Ok ( ( ) )
65
69
}
66
70
67
- async fn upgrade_server ( request : Request < Body > ) -> Result < Response < Body > , hyper:: http:: Error > {
71
+ async fn upgrade_server (
72
+ request : Request < Incoming > ,
73
+ ) -> Result < Response < Full < Bytes > > , hyper:: http:: Error > {
68
74
let protocols = [ "warp0" ] . into_iter ( ) . collect ( ) ;
69
75
match swimos_http:: negotiate_upgrade ( & request, & protocols, & NoExtProvider ) {
70
76
Ok ( Some ( negotiated) ) => {
71
77
let ( response, upgraded) = swimos_http:: upgrade ( request, negotiated, None , NoUnwrap ) ;
72
78
tokio:: spawn ( run_websocket ( upgraded) ) ;
73
79
Ok ( response)
74
80
}
75
- Ok ( None ) => Response :: builder ( ) . body ( Body :: from ( "Success" ) ) ,
81
+ Ok ( None ) => Response :: builder ( ) . body ( Full :: from ( "Success" ) ) ,
76
82
Err ( err) => Ok ( swimos_http:: fail_upgrade ( err) ) ,
77
83
}
78
84
}
79
85
80
86
async fn run_websocket < F > ( upgrade_fut : F )
81
87
where
82
- F : Future < Output = Result < WebSocket < Upgraded , NoExt > , hyper:: Error > > + Send ,
88
+ F : Future < Output = Result < WebSocket < TokioIo < Upgraded > , NoExt > , hyper:: Error > > + Send ,
83
89
{
84
90
match upgrade_fut. await {
85
91
Ok ( mut websocket) => {
0 commit comments