Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 41 additions & 46 deletions iroh/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,17 @@ use n0_future::{
stream::StreamExt,
task::{self, AbortOnDropHandle},
time::{self, Duration},
Stream, TryStreamExt,
Stream,
};
use nested_enum_utils::common_fields;
use snafu::{ensure, IntoError, Snafu};
use tokio::sync::oneshot;
use tracing::{debug, error_span, warn, Instrument};

pub use crate::node_info::{NodeData, NodeInfo, ParseError, UserData};
use crate::Endpoint;
use crate::{
watcher::{self, Watcher},
Endpoint,
};

#[cfg(not(wasm_browser))]
pub mod dns;
Expand Down Expand Up @@ -258,7 +260,7 @@ impl<T: Discovery> Discovery for Arc<T> {}
///
/// This struct derefs to [`NodeData`], so you can access the methods from [`NodeData`]
/// directly from [`DiscoveryItem`].
#[derive(Debug, Clone)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DiscoveryItem {
/// The node info for the node, as discovered by the the discovery service.
node_info: NodeInfo,
Expand Down Expand Up @@ -412,24 +414,31 @@ const MAX_AGE: Duration = Duration::from_secs(10);

/// A wrapper around a tokio task which runs a node discovery.
pub(super) struct DiscoveryTask {
on_first_rx: oneshot::Receiver<Result<(), DiscoveryError>>,
watcher: watcher::Direct<Option<DiscoveryItem>>,
_task: AbortOnDropHandle<()>,
}

impl DiscoveryTask {
/// Starts a discovery task.
pub(super) fn start(ep: Endpoint, node_id: NodeId) -> Result<Self, DiscoveryError> {
ensure!(ep.discovery().is_some(), NoServiceConfiguredSnafu);
let (on_first_tx, on_first_rx) = oneshot::channel();
let watchable = watcher::Watchable::default();
let watcher = watchable.watch();
let me = ep.node_id();
let task = task::spawn(
async move { Self::run(ep, node_id, on_first_tx).await }.instrument(
async move {
if let Err(err) = Self::run(ep, node_id, watchable).await {
// TODO: propagate err
warn!("discovery failed: {:?}", err);
}
}
.instrument(
error_span!("discovery", me = %me.fmt_short(), node = %node_id.fmt_short()),
),
);
Ok(Self {
_task: AbortOnDropHandle::new(task),
on_first_rx,
watcher,
})
}

Expand All @@ -451,36 +460,39 @@ impl DiscoveryTask {
return Ok(None);
}
ensure!(ep.discovery().is_some(), NoServiceConfiguredSnafu);
let (on_first_tx, on_first_rx) = oneshot::channel();
let watchable = watcher::Watchable::default();
let watcher = watchable.watch();
let ep = ep.clone();
let me = ep.node_id();

let task = task::spawn(
async move {
// If delay is set, wait and recheck if discovery is needed. If not, early-exit.
if let Some(delay) = delay {
time::sleep(delay).await;
if !Self::needs_discovery(&ep, node_id) {
debug!("no discovery needed, abort");
on_first_tx.send(Ok(())).ok();
return;
}
}
Self::run(ep, node_id, on_first_tx).await
if let Err(err) = Self::run(ep, node_id, watchable).await {
// TODO: propagate error
warn!("failed discovery: {:?}", err);
}
}
.instrument(
error_span!("discovery", me = %me.fmt_short(), node = %node_id.fmt_short()),
),
);
Ok(Some(Self {
_task: AbortOnDropHandle::new(task),
on_first_rx,
watcher,
}))
}

/// Waits until the discovery task produced at least one result.
pub(super) async fn first_arrived(&mut self) -> Result<(), DiscoveryError> {
let fut = &mut self.on_first_rx;
fut.await.expect("sender dropped")?;
self.watcher.initialized().await.ok();
Ok(())
}

Expand Down Expand Up @@ -522,16 +534,9 @@ impl DiscoveryTask {
async fn run(
ep: Endpoint,
node_id: NodeId,
on_first_tx: oneshot::Sender<Result<(), DiscoveryError>>,
) {
let mut stream = match Self::create_stream(&ep, node_id) {
Ok(stream) => stream,
Err(err) => {
on_first_tx.send(Err(err)).ok();
return;
}
};
let mut on_first_tx = Some(on_first_tx);
watchable: watcher::Watchable<Option<DiscoveryItem>>,
) -> Result<(), DiscoveryError> {
let mut stream = Self::create_stream(&ep, node_id)?;
debug!("starting");
loop {
match stream.next().await {
Expand All @@ -543,12 +548,12 @@ impl DiscoveryTask {
continue;
}
debug!(%provenance, addr = ?node_addr, "new address found");
ep.add_node_addr_with_source(node_addr, provenance).ok();
if let Some(tx) = on_first_tx.take() {
tx.send(Ok(())).ok();
ep.discovery_subscribers().send(r.clone());

if watchable.set(Some(r)).is_err() {
// abort if the watchable is disconnected
return Ok(());
}
// Send the discovery item to the subscribers of the discovery broadcast stream.
ep.discovery_subscribers().send(r);
}
Some(Err(err)) => {
warn!(?err, "discovery service produced error");
Expand All @@ -557,9 +562,7 @@ impl DiscoveryTask {
None => break,
}
}
if let Some(tx) = on_first_tx.take() {
tx.send(Err(NoResultsSnafu { node_id }.build())).ok();
}
Ok(())
}
}

Expand All @@ -580,30 +583,22 @@ pub struct Lagged {

#[derive(Clone, Debug)]
pub(super) struct DiscoverySubscribers {
inner: tokio::sync::broadcast::Sender<DiscoveryItem>,
inner: watcher::Watchable<Option<DiscoveryItem>>,
}

impl DiscoverySubscribers {
pub(crate) fn new() -> Self {
// TODO: Make capacity configurable from the endpoint builder?
// This is the maximum number of [`DiscoveryItem`]s held by the channel if
// subscribers are stalled.
const CAPACITY: usize = 128;
Self {
inner: tokio::sync::broadcast::Sender::new(CAPACITY),
inner: Default::default(),
}
}

pub(crate) fn subscribe(&self) -> impl Stream<Item = Result<DiscoveryItem, Lagged>> {
use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream};
let recv = self.inner.subscribe();
BroadcastStream::new(recv).map_err(|BroadcastStreamRecvError::Lagged(n)| Lagged { val: n })
pub(crate) fn subscribe(&self) -> impl Stream<Item = Option<DiscoveryItem>> {
self.inner.watch().stream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC this is a quite big behaviour change: previously you'd have a reasonable chance of seeing all discovered items, now you only get a single one. Especially the ConcurrentDiscovery this could be an issue if multiple discovery services find something at the same time you'll miss one address.

}

pub(crate) fn send(&self, item: DiscoveryItem) {
// `broadcast::Sender::send` returns an error if the channel has no subscribers,
// which we don't care about.
self.inner.send(item).ok();
self.inner.set(Some(item)).ok();
}
}

Expand All @@ -624,7 +619,7 @@ mod tests {
use tracing_test::traced_test;

use super::*;
use crate::{endpoint::ConnectOptions, watcher::Watcher as _, RelayMode};
use crate::{endpoint::ConnectOptions, RelayMode};

type InfoStore = HashMap<NodeId, (NodeData, u64)>;

Expand Down
Loading
Loading