Skip to content

Commit f84d742

Browse files
committed
change: only add peers to active view after receiving neighbor messages
1 parent de7b148 commit f84d742

File tree

4 files changed

+56
-52
lines changed

4 files changed

+56
-52
lines changed

src/proto.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ mod test {
178178
// Now let node 3 join node 0.
179179
// Node 0 is full, so it will disconnect from either node 1 or node 2.
180180
network.command(3, t, Command::Join(vec![0]));
181-
network.ticks(8);
181+
network.ticks(10);
182182

183183
// Confirm emitted events. There's two options because whether node 0 disconnects from
184184
// node 1 or node 2 is random.
@@ -213,7 +213,7 @@ mod test {
213213
let config = Config::default();
214214
let mut network = Network::new(Instant::now());
215215
let broadcast_ticks = 12;
216-
let join_ticks = 12;
216+
let join_ticks = 13;
217217
// build a network with 6 nodes
218218
let rng = rand_chacha::ChaCha12Rng::seed_from_u64(99);
219219
for i in 0..6 {

src/proto/hyparview.rs

Lines changed: 46 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -392,46 +392,50 @@ where
392392
&mut self,
393393
sender: PI,
394394
message: ForwardJoin<PI>,
395-
now: Instant,
395+
_now: Instant,
396396
io: &mut impl IO<PI>,
397397
) {
398+
let peer_id = message.peer.id;
398399
// If the peer is already in our active view, we renew our neighbor relationship.
399-
if self.active_view.contains(&message.peer.id) {
400-
self.send_neighbor(message.peer.id, Priority::High, io);
400+
if self.active_view.contains(&peer_id) {
401+
self.insert_peer_info(message.peer, io);
402+
self.send_neighbor(peer_id, Priority::High, io);
401403
}
402404
// "i) If the time to live is equal to zero or if the number of nodes in p’s active view is equal to one,
403405
// it will add the new node to its active view (7)"
404406
else if message.ttl.expired() || self.active_view.len() <= 1 {
405-
self.add_active(
406-
message.peer.id,
407-
message.peer.data.clone(),
408-
Priority::High,
409-
now,
410-
io,
411-
);
412-
}
413-
// "ii) If the time to live is equal to PRWL, p will insert the new node into its passive view"
414-
else if message.ttl == self.config.passive_random_walk_length {
415-
self.add_passive(message.peer.id, message.peer.data.clone(), io);
416-
}
417-
// "iii) The time to live field is decremented."
418-
// "iv) If, at this point, n has not been inserted
419-
// in p’s active view, p will forward the request to a random node in its active view
420-
// (different from the one from which the request was received)."
421-
if !self.active_view.contains(&message.peer.id) {
422-
match self
423-
.active_view
424-
.pick_random_without(&[&sender], &mut self.rng)
407+
self.insert_peer_info(message.peer, io);
408+
// Modification from paper: Instead of adding the peer directly to our active view,
409+
// we only send the Neighbor message. We will add the peer to our active view once we receive a
410+
// reply from our neighbor.
411+
// This prevents us adding unreachable peers to our active view.
412+
self.send_neighbor(peer_id, Priority::High, io);
413+
} else {
414+
// "ii) If the time to live is equal to PRWL, p will insert the new node into its passive view"
415+
if message.ttl == self.config.passive_random_walk_length {
416+
self.add_passive(peer_id, message.peer.data.clone(), io);
417+
}
418+
// "iii) The time to live field is decremented."
419+
// "iv) If, at this point, n has not been inserted
420+
// in p’s active view, p will forward the request to a random node in its active view
421+
// (different from the one from which the request was received)."
422+
if !self.active_view.contains(&peer_id)
423+
&& !self.pending_neighbor_requests.contains(&peer_id)
425424
{
426-
None => {
427-
unreachable!("if the peer was not added, there are at least two peers in our active view.");
428-
}
429-
Some(next) => {
430-
let message = Message::ForwardJoin(ForwardJoin {
431-
peer: message.peer,
432-
ttl: message.ttl.next(),
433-
});
434-
io.push(OutEvent::SendMessage(*next, message));
425+
match self
426+
.active_view
427+
.pick_random_without(&[&sender], &mut self.rng)
428+
{
429+
None => {
430+
unreachable!("if the peer was not added, there are at least two peers in our active view.");
431+
}
432+
Some(next) => {
433+
let message = Message::ForwardJoin(ForwardJoin {
434+
peer: message.peer,
435+
ttl: message.ttl.next(),
436+
});
437+
io.push(OutEvent::SendMessage(*next, message));
438+
}
435439
}
436440
}
437441
}
@@ -612,23 +616,19 @@ where
612616
if let Some(node) = self
613617
.passive_view
614618
.pick_random_without(&skip_peers, &mut self.rng)
619+
.copied()
615620
{
616621
let priority = match self.active_view.is_empty() {
617622
true => Priority::High,
618623
false => Priority::Low,
619624
};
620-
let message = Message::Neighbor(Neighbor {
621-
priority,
622-
data: self.me_data.clone(),
623-
});
624-
io.push(OutEvent::SendMessage(*node, message));
625+
self.send_neighbor(node, priority, io);
625626
// schedule a timer that checks if the node replied with a neighbor message,
626627
// otherwise try again with another passive node.
627628
io.push(OutEvent::ScheduleTimer(
628629
self.config.neighbor_request_timeout,
629-
Timer::PendingNeighborRequest(*node),
630+
Timer::PendingNeighborRequest(node),
630631
));
631-
self.pending_neighbor_requests.insert(*node);
632632
};
633633
}
634634

@@ -716,11 +716,13 @@ where
716716
}
717717

718718
fn send_neighbor(&mut self, peer: PI, priority: Priority, io: &mut impl IO<PI>) {
719-
let message = Message::Neighbor(Neighbor {
720-
priority,
721-
data: self.me_data.clone(),
722-
});
723-
io.push(OutEvent::SendMessage(peer, message));
719+
if self.pending_neighbor_requests.insert(peer) {
720+
let message = Message::Neighbor(Neighbor {
721+
priority,
722+
data: self.me_data.clone(),
723+
});
724+
io.push(OutEvent::SendMessage(peer, message));
725+
}
724726
}
725727
}
726728

src/proto/state.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ impl<PI: PeerIdentity, R: Rng + Clone> State<PI, R> {
207207
event: InEvent<PI>,
208208
now: Instant,
209209
) -> impl Iterator<Item = OutEvent<PI>> + '_ {
210-
trace!("in_event: {event:?}");
210+
trace!("in : {event:?}");
211211
track_in_event(&event);
212212

213213
let event: InEventMapped<PI> = event.into();
@@ -274,7 +274,7 @@ fn handle_out_event<PI: PeerIdentity>(
274274
conns: &mut ConnsMap<PI>,
275275
outbox: &mut Outbox<PI>,
276276
) {
277-
trace!("out_event: {event:?}");
277+
trace!("out: {event:?}");
278278
match event {
279279
topic::OutEvent::SendMessage(to, message) => {
280280
outbox.push(OutEvent::SendMessage(to, Message { topic, message }))

src/proto/tests.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use bytes::Bytes;
66
use n0_future::time::{Duration, Instant};
77
use rand::Rng;
88
use rand_core::SeedableRng;
9-
use tracing::{debug, warn};
9+
use tracing::{debug, trace_span, warn};
1010

1111
use super::{
1212
util::TimerMap, Command, Config, Event, InEvent, OutEvent, PeerIdentity, State, Timer, TopicId,
@@ -62,7 +62,7 @@ fn push_back<PI: Eq + std::hash::Hash>(
6262
inqueues.get_mut(peer_pos).unwrap().push_back(event);
6363
}
6464

65-
impl<PI: PeerIdentity + Ord, R: Rng + Clone> Network<PI, R> {
65+
impl<PI: PeerIdentity + Ord + std::fmt::Display, R: Rng + Clone> Network<PI, R> {
6666
pub fn push(&mut self, peer: State<PI, R>) {
6767
let idx = self.inqueues.len();
6868
self.inqueues.push(VecDeque::new());
@@ -98,6 +98,8 @@ impl<PI: PeerIdentity + Ord, R: Rng + Clone> Network<PI, R> {
9898

9999
pub fn tick(&mut self) {
100100
self.time += self.tick_duration;
101+
let tick =
102+
self.time.duration_since(self.start).as_millis() / self.tick_duration.as_millis();
101103

102104
// process timers
103105
for (_time, (idx, timer)) in self.timers.drain_until(&self.time) {
@@ -114,14 +116,14 @@ impl<PI: PeerIdentity + Ord, R: Rng + Clone> Network<PI, R> {
114116
for (idx, queue) in self.inqueues.iter_mut().enumerate() {
115117
let state = self.peers.get_mut(idx).unwrap();
116118
let peer = *state.me();
119+
let span = trace_span!("tick", node = %peer, %tick);
120+
let _guard = span.enter();
117121
while let Some(event) = queue.pop_front() {
118122
if let InEvent::RecvMessage(from, _message) = &event {
119123
self.conns.insert((*from, peer).into());
120124
}
121-
debug!(peer = ?peer, "IN {event:?}");
122125
let out = state.handle(event, self.time);
123126
for event in out {
124-
debug!(peer = ?peer, "OUT {event:?}");
125127
match event {
126128
OutEvent::SendMessage(to, message) => {
127129
let to_idx = *self.peers_by_address.get(&to).unwrap();

0 commit comments

Comments
 (0)