@@ -392,46 +392,50 @@ where
392
392
& mut self ,
393
393
sender : PI ,
394
394
message : ForwardJoin < PI > ,
395
- now : Instant ,
395
+ _now : Instant ,
396
396
io : & mut impl IO < PI > ,
397
397
) {
398
+ let peer_id = message. peer . id ;
398
399
// If the peer is already in our active view, we renew our neighbor relationship.
399
- if self . active_view . contains ( & message. peer . id ) {
400
- self . send_neighbor ( message. peer . id , Priority :: High , io) ;
400
+ if self . active_view . contains ( & peer_id) {
401
+ self . insert_peer_info ( message. peer , io) ;
402
+ self . send_neighbor ( peer_id, Priority :: High , io) ;
401
403
}
402
404
// "i) If the time to live is equal to zero or if the number of nodes in p’s active view is equal to one,
403
405
// it will add the new node to its active view (7)"
404
406
else if message. ttl . expired ( ) || self . active_view . len ( ) <= 1 {
405
- self . add_active (
406
- message. peer . id ,
407
- message. peer . data . clone ( ) ,
408
- Priority :: High ,
409
- now,
410
- io,
411
- ) ;
412
- }
413
- // "ii) If the time to live is equal to PRWL, p will insert the new node into its passive view"
414
- else if message. ttl == self . config . passive_random_walk_length {
415
- self . add_passive ( message. peer . id , message. peer . data . clone ( ) , io) ;
416
- }
417
- // "iii) The time to live field is decremented."
418
- // "iv) If, at this point, n has not been inserted
419
- // in p’s active view, p will forward the request to a random node in its active view
420
- // (different from the one from which the request was received)."
421
- if !self . active_view . contains ( & message. peer . id ) {
422
- match self
423
- . active_view
424
- . pick_random_without ( & [ & sender] , & mut self . rng )
407
+ self . insert_peer_info ( message. peer , io) ;
408
+ // Modification from paper: Instead of adding the peer directly to our active view,
409
+ // we only send the Neighbor message. We will add the peer to our active view once we receive a
410
+ // reply from our neighbor.
411
+ // This prevents us adding unreachable peers to our active view.
412
+ self . send_neighbor ( peer_id, Priority :: High , io) ;
413
+ } else {
414
+ // "ii) If the time to live is equal to PRWL, p will insert the new node into its passive view"
415
+ if message. ttl == self . config . passive_random_walk_length {
416
+ self . add_passive ( peer_id, message. peer . data . clone ( ) , io) ;
417
+ }
418
+ // "iii) The time to live field is decremented."
419
+ // "iv) If, at this point, n has not been inserted
420
+ // in p’s active view, p will forward the request to a random node in its active view
421
+ // (different from the one from which the request was received)."
422
+ if !self . active_view . contains ( & peer_id)
423
+ && !self . pending_neighbor_requests . contains ( & peer_id)
425
424
{
426
- None => {
427
- unreachable ! ( "if the peer was not added, there are at least two peers in our active view." ) ;
428
- }
429
- Some ( next) => {
430
- let message = Message :: ForwardJoin ( ForwardJoin {
431
- peer : message. peer ,
432
- ttl : message. ttl . next ( ) ,
433
- } ) ;
434
- io. push ( OutEvent :: SendMessage ( * next, message) ) ;
425
+ match self
426
+ . active_view
427
+ . pick_random_without ( & [ & sender] , & mut self . rng )
428
+ {
429
+ None => {
430
+ unreachable ! ( "if the peer was not added, there are at least two peers in our active view." ) ;
431
+ }
432
+ Some ( next) => {
433
+ let message = Message :: ForwardJoin ( ForwardJoin {
434
+ peer : message. peer ,
435
+ ttl : message. ttl . next ( ) ,
436
+ } ) ;
437
+ io. push ( OutEvent :: SendMessage ( * next, message) ) ;
438
+ }
435
439
}
436
440
}
437
441
}
@@ -612,23 +616,19 @@ where
612
616
if let Some ( node) = self
613
617
. passive_view
614
618
. pick_random_without ( & skip_peers, & mut self . rng )
619
+ . copied ( )
615
620
{
616
621
let priority = match self . active_view . is_empty ( ) {
617
622
true => Priority :: High ,
618
623
false => Priority :: Low ,
619
624
} ;
620
- let message = Message :: Neighbor ( Neighbor {
621
- priority,
622
- data : self . me_data . clone ( ) ,
623
- } ) ;
624
- io. push ( OutEvent :: SendMessage ( * node, message) ) ;
625
+ self . send_neighbor ( node, priority, io) ;
625
626
// schedule a timer that checks if the node replied with a neighbor message,
626
627
// otherwise try again with another passive node.
627
628
io. push ( OutEvent :: ScheduleTimer (
628
629
self . config . neighbor_request_timeout ,
629
- Timer :: PendingNeighborRequest ( * node) ,
630
+ Timer :: PendingNeighborRequest ( node) ,
630
631
) ) ;
631
- self . pending_neighbor_requests . insert ( * node) ;
632
632
} ;
633
633
}
634
634
@@ -716,11 +716,13 @@ where
716
716
}
717
717
718
718
fn send_neighbor ( & mut self , peer : PI , priority : Priority , io : & mut impl IO < PI > ) {
719
- let message = Message :: Neighbor ( Neighbor {
720
- priority,
721
- data : self . me_data . clone ( ) ,
722
- } ) ;
723
- io. push ( OutEvent :: SendMessage ( peer, message) ) ;
719
+ if self . pending_neighbor_requests . insert ( peer) {
720
+ let message = Message :: Neighbor ( Neighbor {
721
+ priority,
722
+ data : self . me_data . clone ( ) ,
723
+ } ) ;
724
+ io. push ( OutEvent :: SendMessage ( peer, message) ) ;
725
+ }
724
726
}
725
727
}
726
728
0 commit comments