@@ -19,22 +19,19 @@ use crate::errors::Error;
1919use crate :: handle:: Handle ;
2020use crate :: packet:: Packet ;
2121use crate :: pcap_util;
22- use crate :: stream:: StreamItem ;
22+ use crate :: stream:: { Interruptable , StreamItem } ;
2323
2424#[ pin_project]
25- struct CallbackFuture < E , T >
25+ struct CallbackFuture < T >
2626where
27- E : Sync + Send ,
28- T : Stream < Item = StreamItem < E > > + Sized + Unpin ,
27+ T : Stream < Item = StreamItem > + Sized + Unpin ,
2928{
3029 idx : usize ,
3130 stream : Option < T > ,
3231}
3332
34- impl < E : Sync + Send , T : Stream < Item = StreamItem < E > > + Sized + Unpin > Future
35- for CallbackFuture < E , T >
36- {
37- type Output = ( usize , Option < ( T , StreamItem < E > ) > ) ;
33+ impl < T : Stream < Item = StreamItem > + Sized + Unpin > Future for CallbackFuture < T > {
34+ type Output = ( usize , Option < ( T , StreamItem ) > ) ;
3835
3936 fn poll ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
4037 let this = self . project ( ) ;
@@ -60,17 +57,22 @@ impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Future
6057 }
6158}
6259
63- struct BridgeStreamState < E , T >
60+ struct BridgeStreamState < T >
6461where
65- E : Sync + Send ,
66- T : Stream < Item = StreamItem < E > > + Sized + Unpin ,
62+ T : Interruptable + Sized + Unpin ,
6763{
6864 stream : Option < T > ,
6965 current : Vec < Vec < Packet > > ,
7066 complete : bool ,
7167}
7268
73- impl < E : Sync + Send , T : Stream < Item = StreamItem < E > > + Sized + Unpin > BridgeStreamState < E , T > {
69+ impl < T : Interruptable + Sized + Unpin > BridgeStreamState < T > {
70+ fn interrupt ( & self ) {
71+ if let Some ( st) = & self . stream {
72+ st. interrupt ( ) ;
73+ }
74+ }
75+
7476 fn is_complete ( & self ) -> bool {
7577 self . complete && self . current . is_empty ( )
7678 }
@@ -100,22 +102,22 @@ impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> BridgeStre
100102// `max_buffer_time` will check the spread of packets, and if it to large it will sort what it has and pass it on.
101103
102104#[ pin_project]
103- pub struct BridgeStream < E : Sync + Send , T >
105+ pub struct BridgeStream < T >
104106where
105- T : Stream < Item = StreamItem < E > > + Sized + Unpin ,
107+ T : Interruptable + Sized + Unpin ,
106108{
107- stream_states : VecDeque < BridgeStreamState < E , T > > ,
109+ stream_states : VecDeque < BridgeStreamState < T > > ,
108110 max_buffer_time : Duration ,
109111 min_states_needed : usize ,
110- poll_queue : FuturesUnordered < CallbackFuture < E , T > > ,
112+ poll_queue : FuturesUnordered < CallbackFuture < T > > ,
111113}
112114
113- impl < E : Sync + Send , T : Stream < Item = StreamItem < E > > + Sized + Unpin > BridgeStream < E , T > {
115+ impl < T : Interruptable + Sized + Unpin > BridgeStream < T > {
114116 pub fn new (
115117 streams : Vec < T > ,
116118 max_buffer_time : Duration ,
117119 min_states_needed : usize ,
118- ) -> Result < BridgeStream < E , T > , Error > {
120+ ) -> Result < BridgeStream < T > , Error > {
119121 let poll_queue = FuturesUnordered :: new ( ) ;
120122 let mut stream_states = VecDeque :: with_capacity ( streams. len ( ) ) ;
121123 for ( idx, stream) in streams. into_iter ( ) . enumerate ( ) {
@@ -139,10 +141,16 @@ impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> BridgeStre
139141 poll_queue,
140142 } )
141143 }
144+
145+ pub fn interrupt ( & self ) {
146+ for st in & self . stream_states {
147+ st. interrupt ( ) ;
148+ }
149+ }
142150}
143151
144- fn gather_packets < E : Sync + Send , T : Stream < Item = StreamItem < E > > + Sized + Unpin > (
145- stream_states : & mut VecDeque < BridgeStreamState < E , T > > ,
152+ fn gather_packets < T : Interruptable + Sized + Unpin > (
153+ stream_states : & mut VecDeque < BridgeStreamState < T > > ,
146154) -> Vec < Packet > {
147155 let mut result = vec ! [ ] ;
148156 let mut gather_to: Option < SystemTime > = None ;
@@ -183,10 +191,11 @@ fn gather_packets<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpi
183191 result
184192}
185193
186- impl < E : Sync + Send , T : Stream < Item = StreamItem < E > > + Sized + Unpin > Stream
187- for BridgeStream < E , T >
194+ impl < T > Stream for BridgeStream < T >
195+ where
196+ T : Interruptable + Sized + Unpin ,
188197{
189- type Item = StreamItem < E > ;
198+ type Item = StreamItem ;
190199
191200 fn poll_next ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
192201 let this = self . project ( ) ;
@@ -195,12 +204,12 @@ impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Stream
195204 this. stream_states. len( ) ,
196205 this. poll_queue. len( )
197206 ) ;
198- let states: & mut VecDeque < BridgeStreamState < E , T > > = this. stream_states ;
207+ let states: & mut VecDeque < BridgeStreamState < T > > = this. stream_states ;
199208 let min_states_needed: usize = * this. min_states_needed ;
200209 let max_buffer_time = this. max_buffer_time ;
201210 let mut max_time_spread: Duration = Duration :: from_millis ( 0 ) ;
202211 let mut not_pending: usize = 0 ;
203- let mut poll_queue: & mut FuturesUnordered < CallbackFuture < E , T > > = this. poll_queue ;
212+ let mut poll_queue: & mut FuturesUnordered < CallbackFuture < T > > = this. poll_queue ;
204213
205214 loop {
206215 match Pin :: new ( & mut poll_queue) . poll_next ( cx) {
@@ -284,6 +293,7 @@ impl<E: Sync + Send, T: Stream<Item = StreamItem<E>> + Sized + Unpin> Stream
284293
285294#[ cfg( test) ]
286295mod tests {
296+ use std:: convert:: TryFrom ;
287297 use std:: io:: Cursor ;
288298 use std:: ops:: Range ;
289299 use std:: path:: PathBuf ;
@@ -293,9 +303,10 @@ mod tests {
293303 use futures:: { Future , Stream } ;
294304 use rand;
295305
296- use crate :: PacketStream ;
306+ use crate :: { Interface , PacketStream } ;
297307
298308 use super :: * ;
309+ use std:: sync:: atomic:: { AtomicBool , Ordering } ;
299310
300311 fn make_packet ( ts : usize ) -> Packet {
301312 Packet {
@@ -316,11 +327,10 @@ mod tests {
316327
317328 info ! ( "Testing against {:?}" , pcap_path) ;
318329
319- let handle = Handle :: file_capture ( pcap_path . to_str ( ) . expect ( "No path found" ) )
320- . expect ( "No handle created" ) ;
330+ let mut cfg = Config :: default ( ) ;
331+ cfg . with_interface ( Interface :: File ( pcap_path ) ) ;
321332
322- let packet_stream =
323- PacketStream :: new ( Config :: default ( ) , Arc :: clone ( & handle) ) . expect ( "Failed to build" ) ;
333+ let packet_stream = PacketStream :: try_from ( cfg) . expect ( "Failed to build" ) ;
324334
325335 let packet_provider = BridgeStream :: new ( vec ! [ packet_stream] , Duration :: from_millis ( 100 ) , 2 )
326336 . expect ( "Failed to build" ) ;
@@ -335,8 +345,6 @@ mod tests {
335345 . filter ( |p| p. data ( ) . len ( ) == p. actual_length ( ) as usize )
336346 . collect ( ) ;
337347
338- handle. interrupt ( ) ;
339-
340348 packets
341349 } ) ;
342350
@@ -373,11 +381,10 @@ mod tests {
373381
374382 info ! ( "Testing against {:?}" , pcap_path) ;
375383
376- let handle = Handle :: file_capture ( pcap_path . to_str ( ) . expect ( "No path found" ) )
377- . expect ( "No handle created" ) ;
384+ let mut cfg = Config :: default ( ) ;
385+ cfg . with_interface ( Interface :: File ( pcap_path ) ) ;
378386
379- let packet_stream =
380- PacketStream :: new ( Config :: default ( ) , Arc :: clone ( & handle) ) . expect ( "Failed to build" ) ;
387+ let packet_stream = PacketStream :: try_from ( cfg) . expect ( "Failed to build" ) ;
381388
382389 let packet_provider = BridgeStream :: new ( vec ! [ packet_stream] , Duration :: from_millis ( 100 ) , 2 )
383390 . expect ( "Failed to build" ) ;
@@ -396,11 +403,9 @@ mod tests {
396403 . await
397404 . into_iter ( )
398405 . flatten ( )
399- . filter ( |p| p. data ( ) . len ( ) == p. actual_length ( ) as _ )
406+ . filter ( |p| p. data ( ) . len ( ) == p. actual_length ( ) as usize )
400407 . count ( ) ;
401408
402- handle. interrupt ( ) ;
403-
404409 packets
405410 } ) ;
406411
@@ -411,9 +416,8 @@ mod tests {
411416 fn packets_from_lookup_bridge ( ) {
412417 let _ = env_logger:: try_init ( ) ;
413418
414- let handle = Handle :: lookup ( ) . expect ( "No handle created" ) ;
415- let packet_stream =
416- PacketStream :: new ( Config :: default ( ) , Arc :: clone ( & handle) ) . expect ( "Failed to build" ) ;
419+ let cfg = Config :: default ( ) ;
420+ let packet_stream = PacketStream :: try_from ( cfg) . expect ( "Failed to build" ) ;
417421
418422 let stream = BridgeStream :: new ( vec ! [ packet_stream] , Duration :: from_millis ( 100 ) , 2 ) ;
419423
@@ -432,9 +436,7 @@ mod tests {
432436 "(not (net 172.16.0.0/16 and port 443)) and (not (host 172.17.76.33 and port 443))"
433437 . to_owned ( ) ,
434438 ) ;
435- let handle = Handle :: lookup ( ) . expect ( "No handle created" ) ;
436- let packet_stream =
437- PacketStream :: new ( Config :: default ( ) , Arc :: clone ( & handle) ) . expect ( "Failed to build" ) ;
439+ let packet_stream = PacketStream :: try_from ( cfg) . expect ( "Failed to build" ) ;
438440
439441 let stream = BridgeStream :: new ( vec ! [ packet_stream] , Duration :: from_millis ( 100 ) , 2 ) ;
440442
@@ -444,6 +446,33 @@ mod tests {
444446 ) ;
445447 }
446448
449+ #[ pin_project]
450+ struct IterStream {
451+ inner : Vec < Packet > ,
452+ interrupted : AtomicBool ,
453+ }
454+
455+ impl Interruptable for IterStream {
456+ fn interrupt ( & self ) {
457+ self . interrupted . store ( true , Ordering :: Relaxed ) ;
458+ }
459+ }
460+
461+ impl Stream for IterStream {
462+ type Item = StreamItem ;
463+
464+ fn poll_next ( self : Pin < & mut Self > , _cx : & mut Context < ' _ > ) -> Poll < Option < Self :: Item > > {
465+ let mut this = self ;
466+ if !this. interrupted . load ( Ordering :: Relaxed ) {
467+ let d = std:: mem:: replace ( & mut this. inner , vec ! [ ] ) ;
468+ this. interrupted . store ( true , Ordering :: Relaxed ) ;
469+ return Poll :: Ready ( Some ( Ok ( d) ) ) ;
470+ } else {
471+ return Poll :: Ready ( None ) ;
472+ }
473+ }
474+ }
475+
447476 #[ test]
448477 fn packets_come_out_time_ordered ( ) {
449478 let mut packets1 = vec ! [ ] ;
@@ -463,18 +492,21 @@ mod tests {
463492 packets2. push ( p)
464493 }
465494
466- let item1: StreamItem < Error > = Ok ( packets1. clone ( ) ) ;
467- let item2: StreamItem < Error > = Ok ( packets2. clone ( ) ) ;
468-
469- let stream1 = futures:: stream:: iter ( vec ! [ item1] ) ;
470- let stream2 = futures:: stream:: iter ( vec ! [ item2] ) ;
495+ let stream1 = IterStream {
496+ interrupted : AtomicBool :: default ( ) ,
497+ inner : packets1. clone ( ) ,
498+ } ;
499+ let stream2 = IterStream {
500+ interrupted : AtomicBool :: default ( ) ,
501+ inner : packets2. clone ( ) ,
502+ } ;
471503
472504 let result = smol:: block_on ( async move {
473505 let bridge = BridgeStream :: new ( vec ! [ stream1, stream2] , Duration :: from_millis ( 100 ) , 0 ) ;
474506
475507 let result = bridge
476508 . expect ( "Unable to create BridgeStream" )
477- . collect :: < Vec < StreamItem < Error > > > ( )
509+ . collect :: < Vec < StreamItem > > ( )
478510 . await ;
479511 result
480512 . into_iter ( )
0 commit comments