Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 40 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

99 changes: 94 additions & 5 deletions iroh-willow/src/engine/actor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{sync::Arc, thread::JoinHandle};

use anyhow::Result;
use futures_lite::stream::Stream;
use futures_lite::{stream::Stream, StreamExt};
use iroh_base::key::NodeId;
use tokio::{
sync::{mpsc, oneshot},
Expand All @@ -16,14 +16,16 @@ use crate::{
net::ConnHandle,
proto::{
data_model::{AuthorisedEntry, Path, SubspaceId},
grouping::Range3d,
grouping::{Area, Range3d},
keys::{NamespaceId, NamespaceKind, UserId, UserSecretKey},
meadowcap::{self, AccessMode},
},
session::{intents::Intent, run_session, Error, EventSender, SessionHandle},
store::{
entry::EntryOrigin,
traits::{EntryReader, SecretStorage, Storage},
traits::{
EntryOrigin, EntryReader, EntryStorage, SecretStorage, Storage, StoreEvent,
SubscribeParams,
},
Store,
},
};
Expand Down Expand Up @@ -212,6 +214,42 @@ impl ActorHandle {
reply_rx.await?;
Ok(())
}

pub async fn subscribe_area(
&self,
namespace: NamespaceId,
area: Area,
params: SubscribeParams,
sender: mpsc::Sender<StoreEvent>,
) -> Result<()> {
self.send(Input::SubscribeArea {
namespace,
area,
params,
sender,
})
.await?;
Ok(())
}

pub async fn resume_subscription(
&self,
progress_id: u64,
namespace: NamespaceId,
area: Area,
params: SubscribeParams,
sender: mpsc::Sender<StoreEvent>,
) -> Result<()> {
self.send(Input::ResumeSubscription {
progress_id,
namespace,
area,
params,
sender,
})
.await?;
Ok(())
}
}

impl Drop for ActorHandle {
Expand Down Expand Up @@ -300,6 +338,19 @@ pub enum Input {
#[debug(skip)]
reply: Option<oneshot::Sender<()>>,
},
SubscribeArea {
namespace: NamespaceId,
area: Area,
params: SubscribeParams,
sender: mpsc::Sender<StoreEvent>,
},
ResumeSubscription {
progress_id: u64,
namespace: NamespaceId,
area: Area,
params: SubscribeParams,
sender: mpsc::Sender<StoreEvent>,
},
}

#[derive(Debug)]
Expand Down Expand Up @@ -436,7 +487,7 @@ impl<S: Storage> Actor<S> {
origin,
reply,
} => {
let res = self.store.entries().ingest(&authorised_entry, origin);
let res = self.store.entries().ingest_entry(&authorised_entry, origin);
send_reply(reply, res)
}
Input::InsertEntry { entry, auth, reply } => {
Expand Down Expand Up @@ -480,6 +531,44 @@ impl<S: Storage> Actor<S> {
let res = self.store.auth().resolve_interests(interests);
send_reply(reply, res.map_err(anyhow::Error::from))
}
Input::SubscribeArea {
namespace,
area,
params,
sender,
} => {
let store = self.store.clone();
self.tasks.spawn_local(async move {
let mut stream = store.entries().subscribe_area(namespace, area, params);
while let Some(event) = stream.next().await {
if sender.send(event).await.is_err() {
break;
}
}
});
Ok(())
}
Input::ResumeSubscription {
progress_id,
namespace,
area,
params,
sender,
} => {
let store = self.store.clone();
self.tasks.spawn_local(async move {
let mut stream =
store
.entries()
.resume_subscription(progress_id, namespace, area, params);
while let Some(event) = stream.next().await {
if sender.send(event).await.is_err() {
break;
}
}
});
Ok(())
}
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions iroh-willow/src/proto/grouping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pub type AreaOfInterest = willow_data_model::grouping::AreaOfInterest<
>;

/// Extension methods for [`AreaOfInterest`].
// TODO: Upstream to willow-rs as methods on [`AreaOfInterest].
pub trait AreaOfInterestExt {
/// Creates a new area of interest with the specified area and no other limits.
fn with_area(area: Area) -> AreaOfInterest;
Expand All @@ -53,6 +54,7 @@ impl AreaOfInterestExt for AreaOfInterest {
}

/// Extension methods for [`Area`].
// TODO: Upstream to willow-rs as methods on [`Area`].
pub trait AreaExt {
/// Returns `true` if the area contains `point`.
fn includes_point(&self, point: &Point) -> bool;
Expand Down Expand Up @@ -93,6 +95,7 @@ impl AreaExt for Area {
/// A single point in the 3D range space.
///
/// I.e. an entry.
// TODO: Upstream to willow-rs.
#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
pub struct Point {
#[serde(with = "data_model::serde_encoding::path")]
Expand Down
2 changes: 1 addition & 1 deletion iroh-willow/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub(crate) type SessionId = u64;

/// To break symmetry, we refer to the peer that initiated the synchronisation session as Alfie,
/// and the other peer as Betty.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
pub enum Role {
/// The peer that initiated the synchronisation session.
Alfie,
Expand Down
47 changes: 24 additions & 23 deletions iroh-willow/src/session/data.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use futures_lite::StreamExt;
use tokio::sync::broadcast;

use crate::{
proto::{
Expand All @@ -8,8 +7,7 @@ use crate::{
},
session::{channels::ChannelSenders, static_tokens::StaticTokens, Error, SessionId},
store::{
entry::{EntryChannel, EntryOrigin},
traits::Storage,
traits::{EntryOrigin, EntryStorage, Storage, StoreEvent, SubscribeParams},
Store,
},
util::stream::CancelableReceiver,
Expand Down Expand Up @@ -51,29 +49,36 @@ impl<S: Storage> DataSender<S> {
}
}
pub async fn run(mut self) -> Result<(), Error> {
let mut entry_stream = self.store.entries().subscribe(self.session_id);
let mut entry_stream = futures_concurrency::stream::StreamGroup::new();
loop {
tokio::select! {
input = self.inbox.next() => {
let Some(input) = input else {
break;
};
let Input::AoiIntersection(intersection) = input;
self.store.entries().watch_area(
self.session_id,
intersection.namespace,
intersection.intersection.area.clone(),
);
let params = SubscribeParams::default().ingest_only().ignore_remote(self.session_id);
// TODO: We could start at the progress id at the beginning of the session.
let stream = self
.store
.entries()
.subscribe_area(
intersection.namespace,
intersection.intersection.area.clone(),
params,
)
.filter_map(|event| match event {
StoreEvent::Ingested(_id, entry, _origin) => Some(entry),
// We get only Ingested events because we set ingest_only() param above.
_ => unreachable!("expected only Ingested event but got another event"),
});
entry_stream.insert(stream);
},
entry = entry_stream.recv() => {
entry = entry_stream.next(), if !entry_stream.is_empty() => {
match entry {
Ok(entry) => self.send_entry(entry).await?,
Err(broadcast::error::RecvError::Closed) => break,
Err(broadcast::error::RecvError::Lagged(_count)) => {
// TODO: Queue another reconciliation
}
Some(entry) => self.send_entry(entry).await?,
None => break,
}

}
}
}
Expand Down Expand Up @@ -149,13 +154,9 @@ impl<S: Storage> DataReceiver<S> {
message.dynamic_token,
)
.await?;
self.store.entries().ingest(
&authorised_entry,
EntryOrigin::Remote {
session: self.session_id,
channel: EntryChannel::Data,
},
)?;
self.store
.entries()
.ingest_entry(&authorised_entry, EntryOrigin::Remote(self.session_id))?;
let (entry, _token) = authorised_entry.into_parts();
// TODO: handle offset
self.current_payload.set(
Expand Down
10 changes: 3 additions & 7 deletions iroh-willow/src/session/reconciler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ use crate::{
Error, Role, SessionId,
},
store::{
entry::{EntryChannel, EntryOrigin},
traits::{EntryReader, EntryStorage, SplitAction, SplitOpts, Storage},
traits::{EntryOrigin, EntryReader, EntryStorage, SplitAction, SplitOpts, Storage},
Store,
},
util::{
Expand Down Expand Up @@ -164,12 +163,9 @@ impl<S: Storage> Reconciler<S> {
authorised_entry.entry().payload_length(),
message.entry.available,
)?;
self.shared.store.entries().ingest(
self.shared.store.entries().ingest_entry(
&authorised_entry,
EntryOrigin::Remote {
session: self.shared.session_id,
channel: EntryChannel::Reconciliation,
},
EntryOrigin::Remote(self.shared.session_id),
)?;
}
ReconciliationMessage::SendPayload(message) => {
Expand Down
3 changes: 0 additions & 3 deletions iroh-willow/src/session/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,9 +386,6 @@ pub(crate) async fn run_session<S: Storage>(
.try_join()
.await;

// Unsubscribe from the store.
store.entries().unsubscribe(&session_id);

// Track if we closed the session by triggering the cancel token, or if the remote peer closed
// the session by closing the control channel.
let we_cancelled = close_session_token.is_cancelled();
Expand Down
Loading
Loading