33use std:: { pin:: Pin , task:: Poll } ;
44
55use iroh_base:: NodeId ;
6- use iroh_metrics:: inc;
76use n0_future:: {
87 task:: { self , AbortOnDropHandle } ,
98 MergeUnbounded , Stream , StreamExt ,
@@ -21,9 +20,10 @@ pub(super) struct RttHandle {
2120}
2221
2322impl RttHandle {
24- pub ( super ) fn new ( ) -> Self {
23+ pub ( super ) fn new ( metrics : MagicsockMetrics ) -> Self {
2524 let mut actor = RttActor {
2625 connection_events : Default :: default ( ) ,
26+ metrics,
2727 } ;
2828 let ( msg_tx, msg_rx) = mpsc:: channel ( 16 ) ;
2929 let handle = task:: spawn (
@@ -62,6 +62,7 @@ struct RttActor {
6262 /// Stream of connection type changes.
6363 #[ debug( "MergeUnbounded<WatcherStream<ConnectionType>>" ) ]
6464 connection_events : MergeUnbounded < MappedStream > ,
65+ metrics : MagicsockMetrics ,
6566}
6667
6768#[ derive( Debug ) ]
@@ -75,8 +76,12 @@ struct MappedStream {
7576 was_direct_before : bool ,
7677}
7778
79+ struct ConnectionEvent {
80+ became_direct : bool ,
81+ }
82+
7883impl Stream for MappedStream {
79- type Item = ConnectionType ;
84+ type Item = ConnectionEvent ;
8085
8186 /// Performs the congestion controller reset for a magic socket path change.
8287 ///
@@ -90,6 +95,7 @@ impl Stream for MappedStream {
9095 ) -> Poll < Option < Self :: Item > > {
9196 match Pin :: new ( & mut self . stream ) . poll_next ( cx) {
9297 Poll :: Ready ( Some ( new_conn_type) ) => {
98+ let mut became_direct = false ;
9399 if self . connection . network_path_changed ( ) {
94100 debug ! (
95101 node_id = %self . node_id. fmt_short( ) ,
@@ -99,10 +105,10 @@ impl Stream for MappedStream {
99105 if !self . was_direct_before && matches ! ( new_conn_type, ConnectionType :: Direct ( _) )
100106 {
101107 self . was_direct_before = true ;
102- inc ! ( MagicsockMetrics , connection_became_direct ) ;
108+ became_direct = true
103109 }
104- }
105- Poll :: Ready ( Some ( new_conn_type ) )
110+ } ;
111+ Poll :: Ready ( Some ( ConnectionEvent { became_direct } ) )
106112 }
107113 Poll :: Ready ( None ) => Poll :: Ready ( None ) ,
108114 Poll :: Pending => Poll :: Pending ,
@@ -124,7 +130,11 @@ impl RttActor {
124130 None => break ,
125131 }
126132 }
127- _item = self . connection_events. next( ) , if !self . connection_events. is_empty( ) => { }
133+ event = self . connection_events. next( ) , if !self . connection_events. is_empty( ) => {
134+ if event. map( |e| e. became_direct) . unwrap_or( false ) {
135+ self . metrics. connection_became_direct. inc( ) ;
136+ }
137+ }
128138 }
129139 }
130140 debug ! ( "rtt-actor finished" ) ;
@@ -156,6 +166,6 @@ impl RttActor {
156166 node_id,
157167 was_direct_before : false ,
158168 } ) ;
159- inc ! ( MagicsockMetrics , connection_handshake_success ) ;
169+ self . metrics . connection_handshake_success . inc ( ) ;
160170 }
161171}
0 commit comments