@@ -184,8 +184,13 @@ where
184184
185185        // Execute the replay 
186186        crate :: shortcircuit!( 
187-             self . replay_loop( & mut  http_stream_tx,  & mut  decoder_stream,  journal_stream) 
188-                 . await 
187+             self . replay_loop( 
188+                 & mut  http_stream_tx, 
189+                 & mut  decoder_stream, 
190+                 journal_stream, 
191+                 journal_metadata. length
192+             ) 
193+             . await 
189194        ) ; 
190195
191196        // If we have the invoker_rx and the protocol type is bidi stream, 
@@ -305,13 +310,15 @@ where
305310        http_stream_tx :  & mut  InvokerRequestStreamSender , 
306311        http_stream_rx :  & mut  S , 
307312        journal_stream :  JournalStream , 
313+         expected_entries_count :  u32 , 
308314    )  -> TerminalLoopState < ( ) > 
309315    where 
310316        JournalStream :  Stream < Item  = JournalEntry >  + Unpin , 
311317        S :  Stream < Item  = Result < DecoderStreamItem ,  InvokerError > >  + Unpin , 
312318    { 
313319        let  mut  journal_stream = journal_stream. fuse ( ) ; 
314320        let  mut  got_headers = false ; 
321+         let  mut  sent_entries = 0 ; 
315322
316323        loop  { 
317324            tokio:: select! { 
@@ -334,10 +341,11 @@ where
334341                opt_je = journal_stream. next( )  => { 
335342                    match  opt_je { 
336343                        Some ( JournalEntry :: JournalV2 ( entry) )  => { 
344+                             sent_entries += 1 ; 
337345                            crate :: shortcircuit!( self . write_entry( http_stream_tx,  entry. inner) . await ) ; 
338- 
339346                        } 
340347                        Some ( JournalEntry :: JournalV1 ( old_entry) )  => { 
348+                             sent_entries += 1 ; 
341349                            if  let  journal:: Entry :: Input ( input_entry)  = crate :: shortcircuit!( old_entry. deserialize_entry:: <ProtobufRawEntryCodec >( ) )  { 
342350                                crate :: shortcircuit!( self . write_entry( 
343351                                    http_stream_tx, 
@@ -352,6 +360,14 @@ where
352360                            } 
353361                        } , 
354362                        None  => { 
363+                             // Let's verify if we sent all the entries we promised, otherwise the stream will hang in a bad way! 
364+                             if  sent_entries < expected_entries_count { 
365+                                 return  TerminalLoopState :: Failed ( InvokerError :: UnexpectedEntryCount  { 
366+                                     actual:  sent_entries, 
367+                                     expected:  expected_entries_count, 
368+                                 } ) 
369+                             } 
370+ 
355371                            // No need to wait for the headers to continue 
356372                            trace!( "Finished to replay the journal" ) ; 
357373                            return  TerminalLoopState :: Continue ( ( ) ) 
0 commit comments