@@ -3,7 +3,9 @@ use sozu_command::ready::Ready;
33use crate :: {
44 println_,
55 protocol:: mux:: {
6- debug_kawa, forcefully_terminate_answer, parser:: H2Error , set_default_answer, update_readiness_after_read, update_readiness_after_write, BackendStatus , Context , Endpoint , GlobalStreamId , MuxResult , Position , StreamState
6+ debug_kawa, forcefully_terminate_answer, parser:: H2Error , set_default_answer,
7+ update_readiness_after_read, update_readiness_after_write, BackendStatus , Context ,
8+ Endpoint , GlobalStreamId , MuxResult , Position , StreamState ,
79 } ,
810 socket:: SocketHandler ,
911 timer:: TimeoutContainer ,
@@ -44,6 +46,16 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
4446 let kawa = parts. rbuffer ;
4547 let ( size, status) = self . socket . socket_read ( kawa. storage . space ( ) ) ;
4648 kawa. storage . fill ( size) ;
49+ match self . position {
50+ Position :: Client ( ..) => {
51+ count ! ( "back_bytes_in" , size as i64 ) ;
52+ parts. metrics . backend_bin += size;
53+ }
54+ Position :: Server => {
55+ count ! ( "bytes_in" , size as i64 ) ;
56+ parts. metrics . bin += size;
57+ }
58+ }
4759 if update_readiness_after_read ( size, status, & mut self . readiness ) {
4860 return MuxResult :: Continue ;
4961 }
@@ -53,7 +65,7 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
5365 debug_kawa ( kawa) ;
5466 if kawa. is_error ( ) {
5567 match self . position {
56- Position :: Client ( _ ) => {
68+ Position :: Client ( .. ) => {
5769 let StreamState :: Linked ( token) = stream. state else {
5870 unreachable ! ( )
5971 } ;
@@ -79,15 +91,11 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
7991 . interest
8092 . insert ( Ready :: WRITABLE )
8193 }
82- match self . position {
83- Position :: Server => {
84- if !was_main_phase {
85- self . requests += 1 ;
86- println_ ! ( "REQUESTS: {}" , self . requests) ;
87- stream. state = StreamState :: Link
88- }
89- }
90- Position :: Client ( _) => { }
94+ if !was_main_phase && self . position . is_server ( ) {
95+ self . requests += 1 ;
96+ println_ ! ( "REQUESTS: {}" , self . requests) ;
97+ gauge_add ! ( "http.active_requests" , 1 ) ;
98+ stream. state = StreamState :: Link
9199 }
92100 } ;
93101 MuxResult :: Continue
@@ -101,7 +109,8 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
101109 println_ ! ( "======= MUX H1 WRITABLE {:?}" , self . position) ;
102110 self . timeout_container . reset ( ) ;
103111 let stream = & mut context. streams [ self . stream ] ;
104- let kawa = stream. wbuffer ( & self . position ) ;
112+ let parts = stream. split ( & self . position ) ;
113+ let kawa = parts. wbuffer ;
105114 kawa. prepare ( & mut kawa:: h1:: BlockConverter ) ;
106115 debug_kawa ( kawa) ;
107116 let bufs = kawa. as_io_slice ( ) ;
@@ -111,13 +120,23 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
111120 }
112121 let ( size, status) = self . socket . socket_write_vectored ( & bufs) ;
113122 kawa. consume ( size) ;
123+ match self . position {
124+ Position :: Client ( ..) => {
125+ count ! ( "back_bytes_out" , size as i64 ) ;
126+ parts. metrics . backend_bout += size;
127+ }
128+ Position :: Server => {
129+ count ! ( "bytes_out" , size as i64 ) ;
130+ parts. metrics . bout += size;
131+ }
132+ }
114133 if update_readiness_after_write ( size, status, & mut self . readiness ) {
115134 return MuxResult :: Continue ;
116135 }
117136
118137 if kawa. is_terminated ( ) && kawa. is_completed ( ) {
119138 match self . position {
120- Position :: Client ( _ ) => self . readiness . interest . insert ( Ready :: READABLE ) ,
139+ Position :: Client ( .. ) => self . readiness . interest . insert ( Ready :: READABLE ) ,
121140 Position :: Server => {
122141 if stream. context . closing {
123142 return MuxResult :: CloseSession ;
@@ -153,7 +172,12 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
153172 _ => { }
154173 }
155174 // ACCESS LOG
156- stream. generate_access_log ( false , Some ( String :: from ( "H1" ) ) , context. listener . clone ( ) ) ;
175+ stream. generate_access_log (
176+ false ,
177+ Some ( String :: from ( "H1" ) ) ,
178+ context. listener . clone ( ) ,
179+ ) ;
180+ stream. metrics . reset ( ) ;
157181 let old_state = std:: mem:: replace ( & mut stream. state , StreamState :: Unlinked ) ;
158182 if stream. context . keep_alive_frontend {
159183 self . timeout_container . reset ( ) ;
@@ -180,9 +204,9 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
180204 }
181205
182206 pub fn force_disconnect ( & mut self ) -> MuxResult {
183- match self . position {
184- Position :: Client ( _) => {
185- self . position = Position :: Client ( BackendStatus :: Disconnecting ) ;
207+ match & mut self . position {
208+ Position :: Client ( _, _ , status ) => {
209+ * status = BackendStatus :: Disconnecting ;
186210 self . readiness . event = Ready :: HUP ;
187211 MuxResult :: Continue
188212 }
@@ -196,13 +220,13 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
196220 L : ListenerHandler + L7ListenerHandler ,
197221 {
198222 match self . position {
199- Position :: Client ( BackendStatus :: KeepAlive ( _ ) )
200- | Position :: Client ( BackendStatus :: Disconnecting ) => {
223+ Position :: Client ( _ , _ , BackendStatus :: KeepAlive )
224+ | Position :: Client ( _ , _ , BackendStatus :: Disconnecting ) => {
201225 println_ ! ( "close detached client ConnectionH1" ) ;
202226 return ;
203227 }
204- Position :: Client ( BackendStatus :: Connecting ( _) )
205- | Position :: Client ( BackendStatus :: Connected ( _ ) ) => { }
228+ Position :: Client ( _ , _ , BackendStatus :: Connecting ( _) )
229+ | Position :: Client ( _ , _ , BackendStatus :: Connected ) => { }
206230 Position :: Server => unreachable ! ( ) ,
207231 }
208232 // reconnection is handled by the server
@@ -221,28 +245,34 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
221245 let stream_context = & mut stream. context ;
222246 println_ ! ( "end H1 stream {}: {stream_context:#?}" , self . stream) ;
223247 match & mut self . position {
224- Position :: Client ( BackendStatus :: Connected ( cluster_id) )
225- | Position :: Client ( BackendStatus :: Connecting ( cluster_id) ) => {
248+ Position :: Client ( _, _, BackendStatus :: Connecting ( _) ) => {
249+ self . stream = usize:: MAX ;
250+ self . force_disconnect ( ) ;
251+ }
252+ Position :: Client ( _, _, status @ BackendStatus :: Connected ) => {
226253 self . stream = usize:: MAX ;
227254 // keep alive should probably be used only if the http context is fully reset
228255 // in case end_stream occurs due to an error the connection state is probably
229256 // unrecoverable and should be terminated
230257 if stream_context. keep_alive_backend {
231- self . position =
232- Position :: Client ( BackendStatus :: KeepAlive ( std:: mem:: take ( cluster_id) ) )
258+ * status = BackendStatus :: KeepAlive ;
233259 } else {
234260 self . force_disconnect ( ) ;
235261 }
236262 }
237- Position :: Client ( BackendStatus :: KeepAlive ( _ ) )
238- | Position :: Client ( BackendStatus :: Disconnecting ) => unreachable ! ( ) ,
263+ Position :: Client ( _ , _ , BackendStatus :: KeepAlive )
264+ | Position :: Client ( _ , _ , BackendStatus :: Disconnecting ) => unreachable ! ( ) ,
239265 Position :: Server => match ( stream. front . consumed , stream. back . is_main_phase ( ) ) {
240266 ( true , true ) => {
241267 // we have a "forwardable" answer from the back
242268 // if the answer is not terminated we send an RstStream to properly clean the stream
243269 // if it is terminated, we finish the transfer, the backend is not necessary anymore
244270 if !stream. back . is_terminated ( ) {
245- forcefully_terminate_answer ( stream, & mut self . readiness , H2Error :: InternalError ) ;
271+ forcefully_terminate_answer (
272+ stream,
273+ & mut self . readiness ,
274+ H2Error :: InternalError ,
275+ ) ;
246276 } else {
247277 stream. state = StreamState :: Unlinked ;
248278 self . readiness . interest . insert ( Ready :: WRITABLE ) ;
@@ -271,11 +301,11 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
271301 self . readiness . interest . insert ( Ready :: ALL ) ;
272302 self . stream = stream;
273303 match & mut self . position {
274- Position :: Client ( BackendStatus :: KeepAlive ( cluster_id) ) => {
275- self . position =
276- Position :: Client ( BackendStatus :: Connecting ( std:: mem:: take ( cluster_id) ) )
304+ Position :: Client ( _, _, status @ BackendStatus :: KeepAlive ) => {
305+ * status = BackendStatus :: Connected ;
277306 }
278- Position :: Client ( _) => { }
307+ Position :: Client ( _, _, BackendStatus :: Disconnecting ) => unreachable ! ( ) ,
308+ Position :: Client ( _, _, _) => { }
279309 Position :: Server => unreachable ! ( ) ,
280310 }
281311 }
0 commit comments