@@ -3,8 +3,9 @@ use sozu_command::ready::Ready;
33use crate :: {
44 println_,
55 protocol:: mux:: {
6- debug_kawa, set_default_answer, update_readiness_after_read, update_readiness_after_write,
7- BackendStatus , Context , Endpoint , GlobalStreamId , MuxResult , Position , StreamState , forcefully_terminate_answer,
6+ debug_kawa, forcefully_terminate_answer, set_default_answer, update_readiness_after_read,
7+ update_readiness_after_write, BackendStatus , Context , Endpoint , GlobalStreamId , MuxResult ,
8+ Position , StreamState ,
89 } ,
910 socket:: SocketHandler ,
1011 Readiness ,
@@ -58,16 +59,15 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
5859 endpoint. end_stream ( token, global_stream_id, context) ;
5960 }
6061 Position :: Server => {
61- set_default_answer ( & mut stream. back , & mut self . readiness , 400 ) ;
62- stream. state = StreamState :: Unlinked ;
62+ set_default_answer ( stream, & mut self . readiness , 400 ) ;
6363 }
6464 }
6565 return MuxResult :: Continue ;
6666 }
6767 if kawa. is_terminated ( ) {
6868 self . readiness . interest . remove ( Ready :: READABLE ) ;
6969 }
70- if was_initial && kawa. is_main_phase ( ) {
70+ if kawa. is_main_phase ( ) {
7171 match self . position {
7272 Position :: Client ( _) => {
7373 let StreamState :: Linked ( token) = stream. state else { unreachable ! ( ) } ;
@@ -77,12 +77,14 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
7777 . insert ( Ready :: WRITABLE )
7878 }
7979 Position :: Server => {
80- self . requests += 1 ;
81- println_ ! ( "REQUESTS: {}" , self . requests) ;
82- stream. state = StreamState :: Link
80+ if was_initial {
81+ self . requests += 1 ;
82+ println_ ! ( "REQUESTS: {}" , self . requests) ;
83+ stream. state = StreamState :: Link
84+ }
8385 }
84- } ;
85- }
86+ }
87+ } ;
8688 MuxResult :: Continue
8789 }
8890
@@ -115,7 +117,9 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
115117 stream. back . storage . clear ( ) ;
116118 stream. front . clear ( ) ;
117119 // do not clear stream.front.storage because of H1 pipelining
118- if let StreamState :: Linked ( token) = stream. state {
120+ stream. attempts = 0 ;
121+ let old_state = std:: mem:: replace ( & mut stream. state , StreamState :: Unlinked ) ;
122+ if let StreamState :: Linked ( token) = old_state {
119123 endpoint. end_stream ( token, self . stream , context) ;
120124 }
121125 }
@@ -148,14 +152,12 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
148152 let stream = & mut context. streams [ stream] ;
149153 let stream_context = & mut stream. context ;
150154 println_ ! ( "end H1 stream {}: {stream_context:#?}" , self . stream) ;
151- self . stream = usize:: MAX ;
152- let mut owned_position = Position :: Server ;
153- std:: mem:: swap ( & mut owned_position, & mut self . position ) ;
154- match owned_position {
155+ match & mut self . position {
155156 Position :: Client ( BackendStatus :: Connected ( cluster_id) )
156157 | Position :: Client ( BackendStatus :: Connecting ( cluster_id) ) => {
158+ self . stream = usize:: MAX ;
157159 self . position = if stream_context. keep_alive_backend {
158- Position :: Client ( BackendStatus :: KeepAlive ( cluster_id) )
160+ Position :: Client ( BackendStatus :: KeepAlive ( std :: mem :: take ( cluster_id) ) )
159161 } else {
160162 Position :: Client ( BackendStatus :: Disconnecting )
161163 }
@@ -168,13 +170,14 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
168170 // if the answer is not terminated we send an RstStream to properly clean the stream
169171 // if it is terminated, we finish the transfer, the backend is not necessary anymore
170172 if !stream. back . is_terminated ( ) {
171- forcefully_terminate_answer ( & mut stream. back , & mut self . readiness ) ;
173+ forcefully_terminate_answer ( stream, & mut self . readiness ) ;
174+ } else {
175+ stream. state = StreamState :: Unlinked ;
176+ self . readiness . interest . insert ( Ready :: WRITABLE ) ;
172177 }
173- stream. state = StreamState :: Unlinked ;
174178 }
175179 ( true , false ) => {
176- set_default_answer ( & mut stream. back , & mut self . readiness , 502 ) ;
177- stream. state = StreamState :: Unlinked ;
180+ set_default_answer ( stream, & mut self . readiness , 502 ) ;
178181 }
179182 ( false , false ) => {
180183 // we do not have an answer, but the request is untouched so we can retry
@@ -189,16 +192,13 @@ impl<Front: SocketHandler> ConnectionH1<Front> {
189192 pub fn start_stream ( & mut self , stream : GlobalStreamId , context : & mut Context ) {
190193 println_ ! ( "start H1 stream {stream} {:?}" , self . readiness) ;
191194 self . stream = stream;
192- let mut owned_position = Position :: Server ;
193- std:: mem:: swap ( & mut owned_position, & mut self . position ) ;
194- match owned_position {
195+ match & mut self . position {
195196 Position :: Client ( BackendStatus :: KeepAlive ( cluster_id) ) => {
196- self . position = Position :: Client ( BackendStatus :: Connecting ( cluster_id) )
197+ self . position =
198+ Position :: Client ( BackendStatus :: Connecting ( std:: mem:: take ( cluster_id) ) )
197199 }
200+ Position :: Client ( _) => { }
198201 Position :: Server => unreachable ! ( ) ,
199- _ => {
200- self . position = owned_position;
201- }
202202 }
203203 }
204204}
0 commit comments