diff --git a/Cargo.lock b/Cargo.lock
index bb92b6846a2..685e393d125 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2526,6 +2526,7 @@ dependencies = [
  "serde_json",
  "strum 0.25.0",
  "tempfile",
+ "test-strategy 0.4.0",
  "testdir",
  "testresult",
  "thiserror",
@@ -2772,7 +2773,7 @@ dependencies = [
  "serde",
  "strum 0.25.0",
  "tempfile",
- "test-strategy",
+ "test-strategy 0.3.1",
  "thiserror",
  "tokio",
  "tokio-stream",
@@ -3053,7 +3054,7 @@ dependencies = [
  "strum 0.26.3",
  "syncify",
  "tempfile",
- "test-strategy",
+ "test-strategy 0.3.1",
  "thiserror",
  "tokio",
  "tokio-stream",
@@ -5435,7 +5436,19 @@ checksum = "78ad9e09554f0456d67a69c1584c9798ba733a5b50349a6c0d0948710523922d"
 dependencies = [
  "proc-macro2",
  "quote",
- "structmeta-derive",
+ "structmeta-derive 0.2.0",
+ "syn 2.0.72",
+]
+
+[[package]]
+name = "structmeta"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2e1575d8d40908d70f6fd05537266b90ae71b15dbbe7a8b7dffa2b759306d329"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "structmeta-derive 0.3.0",
  "syn 2.0.72",
 ]
 
@@ -5450,6 +5463,17 @@ dependencies = [
  "syn 2.0.72",
 ]
 
+[[package]]
+name = "structmeta-derive"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "152a0b65a590ff6c3da95cabe2353ee04e6167c896b28e3b14478c2636c922fc"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn 2.0.72",
+]
+
 [[package]]
 name = "strum"
 version = "0.25.0"
@@ -5683,7 +5707,19 @@ checksum = "b8361c808554228ad09bfed70f5c823caf8a3450b6881cc3a38eb57e8c08c1d9"
 dependencies = [
  "proc-macro2",
  "quote",
- "structmeta",
+ "structmeta 0.2.0",
+ "syn 2.0.72",
+]
+
+[[package]]
+name = "test-strategy"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2bf41af45e3f54cc184831d629d41d5b2bda8297e29c81add7ae4f362ed5e01b"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "structmeta 0.3.0",
  "syn 2.0.72",
 ]
diff --git a/iroh-willow/src/engine/actor.rs b/iroh-willow/src/engine/actor.rs
index c8a68307d27..b38e664dfc8 100644
--- a/iroh-willow/src/engine/actor.rs
+++ b/iroh-willow/src/engine/actor.rs
@@ -1,7 +1,7 @@
 use std::{sync::Arc, thread::JoinHandle};
 
 use anyhow::Result;
-use futures_lite::stream::Stream;
+use futures_lite::{stream::Stream, StreamExt};
 use iroh_base::key::NodeId;
 use tokio::{
     sync::{mpsc, oneshot},
@@ -16,14 +16,16 @@ use crate::{
     net::ConnHandle,
     proto::{
         data_model::{AuthorisedEntry, Path, SubspaceId},
-        grouping::Range3d,
+        grouping::{Area, Range3d},
         keys::{NamespaceId, NamespaceKind, UserId, UserSecretKey},
         meadowcap::{self, AccessMode},
     },
     session::{intents::Intent, run_session, Error, EventSender, SessionHandle},
     store::{
-        entry::EntryOrigin,
-        traits::{EntryReader, SecretStorage, Storage},
+        traits::{
+            EntryOrigin, EntryReader, EntryStorage, SecretStorage, Storage, StoreEvent,
+            SubscribeParams,
+        },
         Store,
     },
 };
@@ -212,6 +214,42 @@ impl ActorHandle {
         reply_rx.await?;
         Ok(())
     }
+
+    pub async fn subscribe_area(
+        &self,
+        namespace: NamespaceId,
+        area: Area,
+        params: SubscribeParams,
+        sender: mpsc::Sender<StoreEvent>,
+    ) -> Result<()> {
+        self.send(Input::SubscribeArea {
+            namespace,
+            area,
+            params,
+            sender,
+        })
+        .await?;
+        Ok(())
+    }
+
+    pub async fn resume_subscription(
+        &self,
+        progress_id: u64,
+        namespace: NamespaceId,
+        area: Area,
+        params: SubscribeParams,
+        sender: mpsc::Sender<StoreEvent>,
+    ) -> Result<()> {
+        self.send(Input::ResumeSubscription {
+            progress_id,
+            namespace,
+            area,
+            params,
+            sender,
+        })
+        .await?;
+        Ok(())
+    }
 }
 
 impl Drop for ActorHandle {
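For review context, a rough sketch of how the two new `ActorHandle` methods are meant to be driven. Everything here (`handle`, `namespace`, the channel size, `Area::new_full()`) is an assumption for illustration, not part of this diff:

```rust
// Hypothetical consumer of the new subscription API (not part of this diff).
// The actor forwards StoreEvents into the mpsc channel from its own thread;
// dropping the receiver makes the forwarding task stop.
let (tx, mut rx) = tokio::sync::mpsc::channel(64);
handle
    .subscribe_area(namespace, Area::new_full(), SubscribeParams::default(), tx)
    .await?;
while let Some(event) = rx.recv().await {
    // Each event carries a progress id; feed it to `resume_subscription`
    // later to continue where this subscription stopped.
    tracing::debug!(progress_id = event.progress_id(), "store event");
}
```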
@@ -300,6 +338,19 @@ pub enum Input {
         #[debug(skip)]
         reply: Option<oneshot::Sender<Result<bool>>>,
     },
+    SubscribeArea {
+        namespace: NamespaceId,
+        area: Area,
+        params: SubscribeParams,
+        sender: mpsc::Sender<StoreEvent>,
+    },
+    ResumeSubscription {
+        progress_id: u64,
+        namespace: NamespaceId,
+        area: Area,
+        params: SubscribeParams,
+        sender: mpsc::Sender<StoreEvent>,
+    },
 }
 
 #[derive(Debug)]
@@ -436,7 +487,7 @@ impl<S: Storage> Actor<S> {
                 origin,
                 reply,
             } => {
-                let res = self.store.entries().ingest(&authorised_entry, origin);
+                let res = self.store.entries().ingest_entry(&authorised_entry, origin);
                 send_reply(reply, res)
             }
             Input::InsertEntry { entry, auth, reply } => {
@@ -480,6 +531,44 @@
                 let res = self.store.auth().resolve_interests(interests);
                 send_reply(reply, res.map_err(anyhow::Error::from))
             }
+            Input::SubscribeArea {
+                namespace,
+                area,
+                params,
+                sender,
+            } => {
+                let store = self.store.clone();
+                self.tasks.spawn_local(async move {
+                    let mut stream = store.entries().subscribe_area(namespace, area, params);
+                    while let Some(event) = stream.next().await {
+                        if sender.send(event).await.is_err() {
+                            break;
+                        }
+                    }
+                });
+                Ok(())
+            }
+            Input::ResumeSubscription {
+                progress_id,
+                namespace,
+                area,
+                params,
+                sender,
+            } => {
+                let store = self.store.clone();
+                self.tasks.spawn_local(async move {
+                    let mut stream =
+                        store
+                            .entries()
+                            .resume_subscription(progress_id, namespace, area, params);
+                    while let Some(event) = stream.next().await {
+                        if sender.send(event).await.is_err() {
+                            break;
+                        }
+                    }
+                });
+                Ok(())
+            }
         }
     }
 }
diff --git a/iroh-willow/src/proto/grouping.rs b/iroh-willow/src/proto/grouping.rs
index a6e3510c6e5..140acffa313 100644
--- a/iroh-willow/src/proto/grouping.rs
+++ b/iroh-willow/src/proto/grouping.rs
@@ -37,6 +37,7 @@ pub type AreaOfInterest = willow_data_model::grouping::AreaOfInterest<
 >;
 
 /// Extension methods for [`AreaOfInterest`].
+// TODO: Upstream to willow-rs as methods on [`AreaOfInterest`].
 pub trait AreaOfInterestExt {
     /// Creates a new area of interest with the specified area and no other limits.
     fn with_area(area: Area) -> AreaOfInterest;
@@ -53,6 +54,7 @@ impl AreaOfInterestExt for AreaOfInterest {
 }
 
 /// Extension methods for [`Area`].
+// TODO: Upstream to willow-rs as methods on [`Area`].
 pub trait AreaExt {
     /// Returns `true` if the area contains `point`.
     fn includes_point(&self, point: &Point) -> bool;
@@ -93,6 +95,7 @@ impl AreaExt for Area {
 /// A single point in the 3D range space.
 ///
 /// I.e. an entry.
+// TODO: Upstream to willow-rs.
 #[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)]
 pub struct Point {
     #[serde(with = "data_model::serde_encoding::path")]
diff --git a/iroh-willow/src/session.rs b/iroh-willow/src/session.rs
index 2cf059e6c14..d12eae4f9a3 100644
--- a/iroh-willow/src/session.rs
+++ b/iroh-willow/src/session.rs
@@ -42,7 +42,7 @@ pub(crate) type SessionId = u64;
 
 /// To break symmetry, we refer to the peer that initiated the synchronisation session as Alfie,
 /// and the other peer as Betty.
-#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
 pub enum Role {
     /// The peer that initiated the synchronisation session.
     Alfie,
diff --git a/iroh-willow/src/session/data.rs b/iroh-willow/src/session/data.rs
index dc525688953..b58b2dfece6 100644
--- a/iroh-willow/src/session/data.rs
+++ b/iroh-willow/src/session/data.rs
@@ -1,5 +1,4 @@
 use futures_lite::StreamExt;
-use tokio::sync::broadcast;
 
 use crate::{
     proto::{
@@ -8,8 +7,7 @@ use crate::{
     },
     session::{channels::ChannelSenders, static_tokens::StaticTokens, Error, SessionId},
     store::{
-        entry::{EntryChannel, EntryOrigin},
-        traits::Storage,
+        traits::{EntryOrigin, EntryStorage, Storage, StoreEvent, SubscribeParams},
         Store,
     },
     util::stream::CancelableReceiver,
@@ -51,7 +49,7 @@ impl<S: Storage> DataSender<S> {
         }
     }
 
     pub async fn run(mut self) -> Result<(), Error> {
-        let mut entry_stream = self.store.entries().subscribe(self.session_id);
+        let mut entry_stream = futures_concurrency::stream::StreamGroup::new();
         loop {
             tokio::select! {
                 input = self.inbox.next() => {
@@ -59,21 +57,28 @@
                         break;
                     };
                     let Input::AoiIntersection(intersection) = input;
-                    self.store.entries().watch_area(
-                        self.session_id,
-                        intersection.namespace,
-                        intersection.intersection.area.clone(),
-                    );
+                    let params = SubscribeParams::default().ingest_only().ignore_remote(self.session_id);
+                    // TODO: We could start at the progress id at the beginning of the session.
+                    let stream = self
+                        .store
+                        .entries()
+                        .subscribe_area(
+                            intersection.namespace,
+                            intersection.intersection.area.clone(),
+                            params,
+                        )
+                        .filter_map(|event| match event {
+                            StoreEvent::Ingested(_id, entry, _origin) => Some(entry),
+                            // We only get Ingested events because we set the ingest_only() param above.
+                            _ => unreachable!("expected only Ingested event but got another event"),
+                        });
+                    entry_stream.insert(stream);
                 },
-                entry = entry_stream.recv() => {
+                entry = entry_stream.next(), if !entry_stream.is_empty() => {
                     match entry {
-                        Ok(entry) => self.send_entry(entry).await?,
-                        Err(broadcast::error::RecvError::Closed) => break,
-                        Err(broadcast::error::RecvError::Lagged(_count)) => {
-                            // TODO: Queue another reconciliation
-                        }
+                        Some(entry) => self.send_entry(entry).await?,
+                        None => break,
                     }
-                }
                 }
             }
         }
@@ -149,13 +154,9 @@
                 message.dynamic_token,
             )
             .await?;
-        self.store.entries().ingest(
-            &authorised_entry,
-            EntryOrigin::Remote {
-                session: self.session_id,
-                channel: EntryChannel::Data,
-            },
-        )?;
+        self.store
+            .entries()
+            .ingest_entry(&authorised_entry, EntryOrigin::Remote(self.session_id))?;
         let (entry, _token) = authorised_entry.into_parts();
         // TODO: handle offset
         self.current_payload.set(
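The `DataSender` rewrite above swaps the session-wide broadcast channel for one filtered event stream per AoI intersection, merged with `futures_concurrency`'s `StreamGroup`. A minimal, self-contained sketch of that merge pattern (toy integer streams instead of the session types):

```rust
use futures_concurrency::stream::StreamGroup;
use futures_lite::{stream, StreamExt};

// Streams can be inserted while the group is being polled, which is how
// DataSender adds a new subscription whenever an AoI intersection arrives.
async fn merge_demo() {
    let mut group = StreamGroup::new();
    group.insert(stream::iter(vec![1, 2]));
    group.insert(stream::iter(vec![3, 4]));
    let mut seen = Vec::new();
    while let Some(item) = group.next().await {
        seen.push(item);
    }
    assert_eq!(seen.len(), 4);
}
```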
diff --git a/iroh-willow/src/session/reconciler.rs b/iroh-willow/src/session/reconciler.rs
index edd4bf63796..c0ffc994f7b 100644
--- a/iroh-willow/src/session/reconciler.rs
+++ b/iroh-willow/src/session/reconciler.rs
@@ -29,8 +28,7 @@ use crate::{
         Error, Role, SessionId,
     },
     store::{
-        entry::{EntryChannel, EntryOrigin},
-        traits::{EntryReader, EntryStorage, SplitAction, SplitOpts, Storage},
+        traits::{EntryOrigin, EntryReader, EntryStorage, SplitAction, SplitOpts, Storage},
         Store,
     },
     util::{
@@ -164,12 +163,9 @@
                     authorised_entry.entry().payload_length(),
                     message.entry.available,
                 )?;
-                self.shared.store.entries().ingest(
+                self.shared.store.entries().ingest_entry(
                     &authorised_entry,
-                    EntryOrigin::Remote {
-                        session: self.shared.session_id,
-                        channel: EntryChannel::Reconciliation,
-                    },
+                    EntryOrigin::Remote(self.shared.session_id),
                 )?;
             }
             ReconciliationMessage::SendPayload(message) => {
diff --git a/iroh-willow/src/session/run.rs b/iroh-willow/src/session/run.rs
index e5324e3f572..ad452285820 100644
--- a/iroh-willow/src/session/run.rs
+++ b/iroh-willow/src/session/run.rs
@@ -386,9 +386,6 @@ pub(crate) async fn run_session(
         .try_join()
         .await;
 
-    // Unsubscribe from the store.
-    store.entries().unsubscribe(&session_id);
-
     // Track if we closed the session by triggering the cancel token, or if the remote peer closed
     // the session by closing the control channel.
     let we_cancelled = close_session_token.is_cancelled();
diff --git a/iroh-willow/src/store.rs b/iroh-willow/src/store.rs
index ee8efdda41f..e729fd6e825 100644
--- a/iroh-willow/src/store.rs
+++ b/iroh-willow/src/store.rs
@@ -6,6 +6,7 @@ use anyhow::{anyhow, Context, Result};
 
 use rand_core::CryptoRngCore;
+use traits::EntryStorage;
 
 use crate::{
     form::{AuthForm, EntryForm, EntryOrForm, SubspaceForm, TimestampForm},
@@ -22,42 +23,40 @@ use crate::{
 use self::auth::{Auth, AuthError};
 use self::traits::Storage;
 
-pub(crate) use self::entry::{EntryOrigin, WatchableEntryStore};
+pub(crate) use self::traits::EntryOrigin;
 
 pub(crate) mod auth;
-pub(crate) mod entry;
 pub mod memory;
 pub mod traits;
 
 /// Storage for the Willow engine.
+///
+/// Wraps a `Storage` instance and adds the [`Auth`] struct that uses the secret and caps storage to provide
+/// authentication when inserting entries.
 #[derive(Debug, Clone)]
 pub(crate) struct Store<S: Storage> {
-    entries: WatchableEntryStore<S::Entries>,
-    secrets: S::Secrets,
-    payloads: S::Payloads,
+    storage: S,
     auth: Auth<S>,
 }
 
 impl<S: Storage> Store<S> {
     pub fn new(storage: S) -> Self {
         Self {
-            entries: WatchableEntryStore::new(storage.entries().clone()),
-            secrets: storage.secrets().clone(),
-            payloads: storage.payloads().clone(),
             auth: Auth::new(storage.secrets().clone(), storage.caps().clone()),
+            storage,
         }
     }
 
-    pub fn entries(&self) -> &WatchableEntryStore<S::Entries> {
-        &self.entries
+    pub fn entries(&self) -> &S::Entries {
+        self.storage.entries()
     }
 
     pub fn secrets(&self) -> &S::Secrets {
-        &self.secrets
+        self.storage.secrets()
     }
 
     pub fn payloads(&self) -> &S::Payloads {
-        &self.payloads
+        self.storage.payloads()
    }
 
     pub fn auth(&self) -> &Auth<S> {
@@ -97,7 +96,7 @@
         let authorised_entry = AuthorisedEntry::new_unchecked(entry, token);
         let inserted = self
             .entries()
-            .ingest(&authorised_entry, EntryOrigin::Local)?;
+            .ingest_entry(&authorised_entry, EntryOrigin::Local)?;
         Ok((authorised_entry, inserted))
     }
 
@@ -118,7 +117,7 @@
     /// the provided [`Store`].
     ///
     /// `user_id` must be set to the user who is authenticating the entry.
-    pub async fn form_to_entry(
+    async fn form_to_entry(
         &self,
         form: EntryForm,
         user_id: UserId, // auth: AuthForm,
diff --git a/iroh-willow/src/store/entry.rs b/iroh-willow/src/store/entry.rs
deleted file mode 100644
index 499066f1ee2..00000000000
--- a/iroh-willow/src/store/entry.rs
+++ /dev/null
@@ -1,179 +0,0 @@
-use std::{
-    collections::HashMap,
-    sync::{Arc, Mutex},
-};
-
-use tokio::sync::broadcast;
-
-use crate::proto::{
-    data_model::{AuthorisedEntry, NamespaceId},
-    grouping::Area,
-};
-
-pub type SessionId = u64;
-
-use super::traits::EntryStorage;
-
-const BROADCAST_CAP: usize = 1024;
-
-#[derive(Debug, Clone, Copy, Eq, PartialEq)]
-pub enum EntryOrigin {
-    /// The entry is inserted locally.
-    Local,
-    /// The entry was received from a peer.
-    Remote {
-        session: SessionId,
-        channel: EntryChannel,
-    }, // TODO: Add details.
-       // Remote {
-       //     peer: NodeId,
-       //     channel: EntryChannel,
-       // },
-}
-
-impl EntryOrigin {
-    // pub fn peer(&self) -> Option<NodeId> {
-    //     match self {
-    //         EntryOrigin::Local => None,
-    //         EntryOrigin::Remote { peer, ..
-    //         } => Some(peer)
-    //     }
-    // }
-}
-
-#[derive(Debug, Clone, Copy, Eq, PartialEq)]
-pub enum EntryChannel {
-    Reconciliation,
-    Data,
-}
-
-#[derive(Debug, Clone)]
-pub struct WatchableEntryStore<ES> {
-    storage: ES,
-    broadcast: Arc<Mutex<Broadcaster>>,
-}
-
-impl<ES: EntryStorage> WatchableEntryStore<ES> {
-    pub(super) fn new(storage: ES) -> Self {
-        Self {
-            storage,
-            broadcast: Default::default(),
-        }
-    }
-
-    // /// Returns a store reader.
-    // pub fn reader(&self) -> ES::Reader {
-    //     self.storage.reader()
-    // }
-
-    /// Returns a store snapshot.
-    pub fn snapshot(&self) -> anyhow::Result<ES::Snapshot> {
-        self.storage.snapshot()
-    }
-
-    /// Returns a store reader.
-    pub fn reader(&self) -> ES::Reader {
-        self.storage.reader()
-    }
-
-    /// Ingest a new entry.
-    ///
-    /// Returns `true` if the entry was stored, and `false` if the entry already exists or is
-    /// obsoleted by an existing entry.
-    pub fn ingest(&self, entry: &AuthorisedEntry, origin: EntryOrigin) -> anyhow::Result<bool> {
-        if self.storage.ingest_entry(entry)? {
-            self.broadcast.lock().unwrap().broadcast(entry, origin);
-            Ok(true)
-        } else {
-            Ok(false)
-        }
-    }
-
-    /// Setup a new subscription, identified by `session_id`.
-    ///
-    /// The subscription will initially be empty. To actually receive newly ingested entries,
-    /// add areas to watch with [`Self::watch_area`].
-    ///
-    /// Returns a [`broadcast::Receiver`].
-    pub fn subscribe(&self, session_id: SessionId) -> broadcast::Receiver<AuthorisedEntry> {
-        self.broadcast
-            .lock()
-            .unwrap()
-            .subscribe(session_id, BROADCAST_CAP)
-    }
-
-    /// Remove a subscription.
-    pub fn unsubscribe(&self, session_id: &SessionId) {
-        self.broadcast.lock().unwrap().unsubscribe(session_id)
-    }
-
-    /// Add an area to the list of watched areas for a subscription.
-    ///
-    /// The subscription has to be setup with [`Self::subscribe`] to actually receive new entries
-    /// that fall within the area.
-    pub fn watch_area(&self, session: SessionId, namespace: NamespaceId, area: Area) {
-        self.broadcast
-            .lock()
-            .unwrap()
-            .watch_area(session, namespace, area);
-    }
-}
-
-#[derive(Debug, Default)]
-struct Broadcaster {
-    senders: HashMap<SessionId, broadcast::Sender<AuthorisedEntry>>,
-    watched_areas: HashMap<NamespaceId, HashMap<SessionId, Vec<Area>>>,
-}
-
-impl Broadcaster {
-    fn subscribe(
-        &mut self,
-        session: SessionId,
-        cap: usize,
-    ) -> broadcast::Receiver<AuthorisedEntry> {
-        self.senders
-            .entry(session)
-            .or_insert_with(|| broadcast::Sender::new(cap))
-            .subscribe()
-    }
-
-    fn unsubscribe(&mut self, session: &SessionId) {
-        self.senders.remove(session);
-        self.watched_areas.retain(|_namespace, sessions| {
-            sessions.remove(session);
-            !sessions.is_empty()
-        });
-    }
-
-    fn watch_area(&mut self, session: SessionId, namespace: NamespaceId, area: Area) {
-        self.watched_areas
-            .entry(namespace)
-            .or_default()
-            .entry(session)
-            .or_default()
-            .push(area)
-    }
-
-    fn broadcast(&mut self, entry: &AuthorisedEntry, origin: EntryOrigin) {
-        let Some(sessions) = self.watched_areas.get_mut(entry.entry().namespace_id()) else {
-            return;
-        };
-        let mut dropped_receivers = vec![];
-        for (session_id, areas) in sessions {
-            // Do not broadcast back into sessions where the entry came from.
-            if matches!(origin, EntryOrigin::Remote { session, ..} if session == *session_id) {
-                continue;
-            }
-            // Check if the session is watching an area where the entry falls into.
-            if areas.iter().any(|area| area.includes_entry(entry.entry())) {
-                if let Some(sender) = self.senders.get(session_id) {
-                    // Send the entry and mark senders with dropped receivers for removal.
-                    if let Err(_err) = sender.send(entry.clone()) {
-                        dropped_receivers.push(*session_id);
-                    }
-                }
-            }
-        }
-        for session_id in dropped_receivers {
-            self.unsubscribe(&session_id);
-        }
-    }
-}
diff --git a/iroh-willow/src/store/memory.rs b/iroh-willow/src/store/memory.rs
index 4ccc46a8afa..3b17561050f 100644
--- a/iroh-willow/src/store/memory.rs
+++ b/iroh-willow/src/store/memory.rs
@@ -6,11 +6,15 @@
 //! hopefully easily kept correct.
 
 use std::cell::RefCell;
-use std::collections::HashMap;
-use std::rc::Rc;
+use std::collections::{HashMap, VecDeque};
+use std::pin::Pin;
+use std::rc::{Rc, Weak};
+use std::task::{ready, Context, Poll, Waker};
 
 use anyhow::Result;
+use futures_util::Stream;
+
+use crate::proto::grouping::Area;
 use crate::{
     interest::{CapSelector, CapabilityPack},
     proto::{
@@ -23,6 +27,9 @@ use crate::{
     store::traits::{self, RangeSplit, SplitAction, SplitOpts},
 };
 
+use super::traits::{StoreEvent, SubscribeParams};
+use super::EntryOrigin;
+
 #[derive(Debug, Clone, Default)]
 pub struct Store<PS> {
     secrets: Rc<RefCell<SecretStore>>,
@@ -96,7 +103,13 @@ impl traits::SecretStorage for Rc<RefCell<SecretStore>> {
 
 #[derive(Debug, Default)]
 pub struct EntryStore {
-    entries: HashMap<NamespaceId, Vec<AuthorisedEntry>>,
+    stores: HashMap<NamespaceId, NamespaceStore>,
+}
+
+#[derive(Debug, Default)]
+pub struct NamespaceStore {
+    entries: Vec<AuthorisedEntry>,
+    events: EventQueue<StoreEvent>,
 }
 
 // impl + 'static> ReadonlyStore for T {
@@ -191,8 +204,9 @@ impl traits::EntryReader for Rc<RefCell<EntryStore>> {
         range: &Range3d,
     ) -> impl Iterator<Item = Result<AuthorisedEntry>> + 'a {
         let slf = self.borrow();
-        slf.entries
+        slf.stores
             .get(&namespace)
+            .map(|s| &s.entries)
             .into_iter()
             .flatten()
             .filter(|entry| range.includes_entry(entry.entry()))
@@ -208,10 +222,11 @@
         path: &Path,
     ) -> Result<Option<AuthorisedEntry>> {
         let inner = self.borrow();
-        let Some(entries) = inner.entries.get(&namespace) else {
+        let Some(entries) = inner.stores.get(&namespace) else {
             return Ok(None);
         };
         Ok(entries
+            .entries
             .iter()
             .find(|e| {
                 let e = e.entry();
@@ -221,27 +236,15 @@
     }
 }
 
-impl traits::EntryStorage for Rc<RefCell<EntryStore>> {
-    type Snapshot = Self;
-    type Reader = Self;
-
-    fn reader(&self) -> Self::Reader {
-        self.clone()
-    }
-
-    fn snapshot(&self) -> Result<Self::Snapshot> {
-        let entries = self.borrow().entries.clone();
-        Ok(Rc::new(RefCell::new(EntryStore { entries })))
-    }
-
-    fn ingest_entry(&self, entry: &AuthorisedEntry) -> Result<bool> {
-        let mut slf = self.borrow_mut();
-        let entries = slf
-            .entries
+impl EntryStore {
+    fn ingest_entry(&mut self, entry: &AuthorisedEntry, origin: EntryOrigin) -> Result<bool> {
+        let store = self
+            .stores
             .entry(*entry.entry().namespace_id())
             .or_default();
+        let entries = &mut store.entries;
         let new = entry.entry();
-        let mut to_remove = vec![];
+        let mut to_prune = vec![];
         for (i, existing) in entries.iter().enumerate() {
             let existing = existing.entry();
             if existing == new {
@@ -258,17 +261,217 @@
                 && new.path().is_prefix_of(existing.path())
                 && new.is_newer_than(existing)
             {
-                to_remove.push(i);
+                to_prune.push(i);
             }
         }
-        for i in to_remove {
-            entries.remove(i);
+        for i in to_prune {
+            let pruned = entries.remove(i);
+            store.events.insert(move |id| {
+                StoreEvent::Pruned(
+                    id,
+                    traits::PruneEvent {
+                        pruned,
+                        by: entry.clone(),
+                    },
+                )
+            });
         }
         entries.push(entry.clone());
+        store
+            .events
+            .insert(|id| StoreEvent::Ingested(id, entry.clone(), origin));
         Ok(true)
     }
 }
 
+impl traits::EntryStorage for Rc<RefCell<EntryStore>> {
+    type Snapshot = Self;
+    type Reader = Self;
+
+    fn reader(&self) -> Self::Reader {
+        self.clone()
+    }
+
+    fn snapshot(&self) -> Result<Self::Snapshot> {
+        // This is quite ugly, but this is only a quick in-memory impl.
+        // We should probably strive to not expose snapshots at all.
+        let stores = self
+            .borrow()
+            .stores
+            .iter()
+            .map(|(key, value)| {
+                (
+                    *key,
+                    NamespaceStore {
+                        entries: value.entries.clone(),
+                        events: Default::default(),
+                    },
+                )
+            })
+            .collect();
+        Ok(Rc::new(RefCell::new(EntryStore { stores })))
+    }
+
+    fn ingest_entry(&self, entry: &AuthorisedEntry, origin: EntryOrigin) -> Result<bool> {
+        let mut slf = self.borrow_mut();
+        slf.ingest_entry(entry, origin)
+    }
+
+    fn subscribe_area(
+        &self,
+        namespace: NamespaceId,
+        area: Area,
+        params: SubscribeParams,
+    ) -> impl Stream<Item = StoreEvent> + Unpin + 'static {
+        let progress_id = self
+            .borrow_mut()
+            .stores
+            .entry(namespace)
+            .or_default()
+            .events
+            .next_progress_id();
+        EventStream {
+            area,
+            params,
+            namespace,
+            progress_id,
+            store: Rc::downgrade(self),
+        }
+    }
+
+    fn resume_subscription(
+        &self,
+        progress_id: u64,
+        namespace: NamespaceId,
+        area: Area,
+        params: SubscribeParams,
+    ) -> impl Stream<Item = StoreEvent> + Unpin + 'static {
+        EventStream {
+            area,
+            params,
+            progress_id,
+            namespace,
+            store: Rc::downgrade(self),
+        }
+    }
+}
+
+/// Stream of events from a store subscription.
+///
+/// We hold a weak pointer to the entry store, and thus to the event queue.
+/// Once the store is dropped, the event queue wakes all streams one last time in its drop impl,
+/// which then makes the stream return `None` because `Weak::upgrade` returns `None`.
+#[derive(Debug)]
+struct EventStream {
+    progress_id: u64,
+    store: Weak<RefCell<EntryStore>>,
+    namespace: NamespaceId,
+    area: Area,
+    params: SubscribeParams,
+}
+
+impl Stream for EventStream {
+    type Item = StoreEvent;
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let Some(inner) = self.store.upgrade() else {
+            return Poll::Ready(None);
+        };
+        let mut inner_mut = inner.borrow_mut();
+        let store = inner_mut.stores.entry(self.namespace).or_default();
+        let res = ready!(store.events.poll_next(
+            self.progress_id,
+            |e| e.matches(self.namespace, &self.area, &self.params),
+            cx,
+        ));
+        drop(inner_mut);
+        drop(inner);
+        Poll::Ready(match res {
+            None => None,
+            Some((next_id, event)) => {
+                self.progress_id = next_id;
+                Some(event)
+            }
+        })
+    }
+}
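A sketch of the termination behaviour the doc comment above describes, assuming it runs inside an async test in this module (with the `EntryStorage` trait and `futures_lite::StreamExt` in scope, and some `namespace: NamespaceId` at hand):

```rust
// Dropping the store drops the EventQueue, whose Drop impl wakes the stream
// one final time; the next poll then fails to upgrade the Weak and ends.
let store: Rc<RefCell<EntryStore>> = Rc::new(RefCell::new(EntryStore::default()));
let mut events = store.subscribe_area(namespace, Area::new_full(), SubscribeParams::default());
drop(store);
assert!(events.next().await.is_none());
```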
+
+/// A simple in-memory event queue.
+///
+/// Events can be pushed, and get a unique monotonically-increasing *progress id*.
+/// Events can be polled, with a progress id to start at, and an optional filter function.
+///
+/// The current in-memory impl keeps all events forever.
+// TODO: Add max_len constructor, add a way to truncate old entries.
+// TODO: This would be quite a bit more efficient if we filtered the waker with a closure
+// that is set from the last poll, to not wake everyone for each new event.
+#[derive(Debug)]
+struct EventQueue<T> {
+    events: VecDeque<T>,
+    offset: u64,
+    wakers: VecDeque<Waker>,
+}
+
+impl<T> Drop for EventQueue<T> {
+    fn drop(&mut self) {
+        for waker in self.wakers.drain(..) {
+            waker.wake()
+        }
+    }
+}
+
+impl<T> Default for EventQueue<T> {
+    fn default() -> Self {
+        Self {
+            events: Default::default(),
+            offset: 0,
+            wakers: Default::default(),
+        }
+    }
+}
+
+impl<T: Clone> EventQueue<T> {
+    fn insert(&mut self, f: impl FnOnce(u64) -> T) {
+        let progress_id = self.next_progress_id();
+        let event = f(progress_id);
+        self.events.push_back(event);
+        for waker in self.wakers.drain(..) {
+            waker.wake()
+        }
+    }
+
+    fn next_progress_id(&self) -> u64 {
+        self.offset + self.events.len() as u64
+    }
+
+    fn get(&self, progress_id: u64) -> Option<&T> {
+        let index = progress_id.checked_sub(self.offset)?;
+        self.events.get(index as usize)
+    }
+
+    fn poll_next(
+        &mut self,
+        progress_id: u64,
+        filter: impl Fn(&T) -> bool,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<(u64, T)>> {
+        if progress_id < self.offset {
+            return Poll::Ready(None);
+        }
+        let mut i = progress_id;
+        loop {
+            if let Some(event) = self.get(i) {
+                i += 1;
+                if filter(event) {
+                    break Poll::Ready(Some((i, event.clone())));
+                }
+            } else {
+                self.wakers.push_back(cx.waker().clone());
+                break Poll::Pending;
+            }
+        }
+    }
+}
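To make the progress-id bookkeeping concrete, a small sketch of the intended semantics (illustrative only; `EventQueue` is private to this module):

```rust
// Progress ids are assigned contiguously starting at `offset` (0 for a fresh
// queue); a consumer resumes by asking for the first id it has not seen yet.
let mut queue: EventQueue<&'static str> = EventQueue::default();
assert_eq!(queue.next_progress_id(), 0);
queue.insert(|id| {
    assert_eq!(id, 0); // the closure receives the event's own progress id
    "first"
});
queue.insert(|_id| "second");
assert_eq!(queue.get(0), Some(&"first"));
assert_eq!(queue.get(1), Some(&"second"));
assert_eq!(queue.next_progress_id(), 2);
```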
 
 #[derive(Debug, Default)]
 pub struct CapsStore {
     write_caps: HashMap<NamespaceId, Vec<WriteCapability>>,
diff --git a/iroh-willow/src/store/traits.rs b/iroh-willow/src/store/traits.rs
index ceb13dd6713..ebcea1a1b7d 100644
--- a/iroh-willow/src/store/traits.rs
+++ b/iroh-willow/src/store/traits.rs
@@ -3,12 +3,16 @@ use std::fmt::Debug;
 
 use anyhow::Result;
+use futures_lite::Stream;
+use serde::{Deserialize, Serialize};
 
 use crate::{
     interest::{CapSelector, CapabilityPack},
     proto::{
-        data_model::{AuthorisedEntry, Entry, NamespaceId, Path, SubspaceId, WriteCapability},
-        grouping::Range3d,
+        data_model::{
+            self, AuthorisedEntry, Entry, NamespaceId, Path, SubspaceId, WriteCapability,
+        },
+        grouping::{Area, Range3d},
         keys::{NamespaceSecretKey, NamespaceSignature, UserId, UserSecretKey, UserSignature},
         meadowcap::{self, ReadAuthorisation},
         wgps::Fingerprint,
@@ -82,7 +86,33 @@ pub trait EntryStorage: EntryReader + Clone + Debug + 'static {
     fn reader(&self) -> Self::Reader;
     fn snapshot(&self) -> Result<Self::Snapshot>;
-    fn ingest_entry(&self, entry: &AuthorisedEntry) -> Result<bool>;
+
+    /// Ingest a new entry.
+    ///
+    /// Returns `true` if the entry was ingested, and `false` if the entry was not ingested because a newer entry exists.
+    fn ingest_entry(&self, entry: &AuthorisedEntry, origin: EntryOrigin) -> Result<bool>;
+
+    /// Subscribe to events concerning entries [included](https://willowprotocol.org/specs/grouping-entries/index.html#area_include)
+    /// by `area`, returning a producer of [`StoreEvent`]s which occurred since the moment of calling this function.
+    ///
+    /// Use [`SubscribeParams`] to restrict which events are produced, e.g. to skip entries
+    /// with an empty payload or entries that originate from a given session.
+    fn subscribe_area(
+        &self,
+        namespace: NamespaceId,
+        area: Area,
+        params: SubscribeParams,
+    ) -> impl Stream<Item = StoreEvent> + Unpin + 'static;
+
+    /// Attempt to resume a subscription using a *progress ID* obtained from a previous subscription.
+    fn resume_subscription(
+        &self,
+        progress_id: u64,
+        namespace: NamespaceId,
+        area: Area,
+        params: SubscribeParams,
+    ) -> impl Stream<Item = StoreEvent> + Unpin + 'static;
 }
 
 /// Read-only interface to [`EntryStorage`].
@@ -179,3 +209,130 @@ pub trait CapsStorage: Debug + Clone {
 
     fn get_read_cap(&self, selector: &CapSelector) -> Result<Option<ReadAuthorisation>>;
 }
+
+/// An event which took place within an [`EntryStorage`].
+/// Each event includes a *progress ID* which can be used to *resume* a subscription at any point in the future.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum StoreEvent {
+    /// A new entry was ingested.
+    Ingested(
+        u64,
+        #[serde(with = "data_model::serde_encoding::authorised_entry")] AuthorisedEntry,
+        EntryOrigin,
+    ),
+    // PayloadForgotten(u64, PD),
+    /// An entry was pruned via prefix pruning.
+    Pruned(u64, PruneEvent),
+    // /// An existing entry received a portion of its corresponding payload.
+    // Appended(u64, LengthyAuthorisedEntry),
+    // /// An entry was forgotten.
+    // EntryForgotten(u64, (S, Path)),
+    // /// A payload was forgotten.
+}
+
+impl StoreEvent {
+    pub fn progress_id(&self) -> u64 {
+        match self {
+            StoreEvent::Ingested(id, _, _) => *id,
+            StoreEvent::Pruned(id, _) => *id,
+        }
+    }
+}
+
+impl StoreEvent {
+    /// Returns `true` if the event is included in the `area` and not skipped by `params`.
+    pub fn matches(
+        &self,
+        namespace_id: NamespaceId,
+        area: &Area,
+        params: &SubscribeParams,
+    ) -> bool {
+        match self {
+            StoreEvent::Ingested(_, entry, origin) => {
+                *entry.entry().namespace_id() == namespace_id
+                    && area.includes_entry(entry.entry())
+                    && params.includes_entry(entry.entry())
+                    && params.includes_origin(origin)
+            }
+            StoreEvent::Pruned(_, PruneEvent { pruned, by: _ }) => {
+                !params.ingest_only
+                    && *pruned.entry().namespace_id() == namespace_id
+                    && area.includes_entry(pruned.entry())
+            }
+        }
+    }
+}
+
+/// Describes an [`AuthorisedEntry`] which was pruned and the [`AuthorisedEntry`] which triggered the pruning.
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct PruneEvent {
+    #[serde(with = "data_model::serde_encoding::authorised_entry")]
+    pub pruned: AuthorisedEntry,
+    /// The entry which triggered the pruning.
+    #[serde(with = "data_model::serde_encoding::authorised_entry")]
+    pub by: AuthorisedEntry,
+}
+
+/// The origin of an entry ingestion event.
+#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
+pub enum EntryOrigin {
+    /// The entry was probably created on this machine.
+    Local,
+    /// The entry was sourced from another device, e.g. a networked sync session.
+    Remote(u64),
+}
+
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+pub enum EntryChannel {
+    Reconciliation,
+    Data,
+}
+
+/// Describes which entries to ignore during a subscription.
+#[derive(Debug, Default, Serialize, Deserialize)]
+pub struct SubscribeParams {
+    /// Omit entries whose payload is the empty string.
+    pub ignore_empty_payloads: bool,
+    /// Omit entries whose origin is this remote session.
+    pub ignore_remote: Option<u64>,
+    /// Only emit ingestion events.
+    pub ingest_only: bool,
+    // TODO: ignore_incomplete_payloads is harder to support for us because we need to query the blob store each time currently.
+    // /// Omit entries with locally incomplete corresponding payloads.
+    // pub ignore_incomplete_payloads: bool,
+}
+
+impl SubscribeParams {
+    // pub fn ignore_incomplete_payloads(&mut self) {
+    //     self.ignore_incomplete_payloads = true;
+    // }
+
+    pub fn ignore_empty_payloads(mut self) -> Self {
+        self.ignore_empty_payloads = true;
+        self
+    }
+
+    pub fn ignore_remote(mut self, remote: u64) -> Self {
+        self.ignore_remote = Some(remote);
+        self
+    }
+
+    pub fn ingest_only(mut self) -> Self {
+        self.ingest_only = true;
+        self
+    }
+
+    pub fn includes_entry(&self, entry: &Entry) -> bool {
+        !(self.ignore_empty_payloads && entry.payload_length() == 0)
+    }
+
+    pub fn includes_origin(&self, origin: &EntryOrigin) -> bool {
+        match &self.ignore_remote {
+            None => true,
+            Some(ignored_session) => match origin {
+                EntryOrigin::Local => true,
+                EntryOrigin::Remote(session) => session != ignored_session,
+            },
+        }
+    }
+}
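These builder flags compose the way the new `DataSender::run` uses them; a quick check of the filtering logic (the session id `7` is a stand-in value):

```rust
// Only forward freshly ingested entries, and skip entries that were ingested
// from session 7 itself, so they are not echoed straight back to that session.
let params = SubscribeParams::default().ingest_only().ignore_remote(7);
assert!(params.includes_origin(&EntryOrigin::Local));
assert!(!params.includes_origin(&EntryOrigin::Remote(7)));
assert!(params.includes_origin(&EntryOrigin::Remote(8)));
```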
diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml
index 9801b885a06..c70b8afd9eb 100644
--- a/iroh/Cargo.toml
+++ b/iroh/Cargo.toml
@@ -81,6 +81,7 @@ proptest = "1.2.0"
 rand_chacha = "0.3.1"
 regex = { version = "1.7.1", features = ["std"] }
 serde_json = "1.0.107"
+test-strategy = "0.4.0"
 testdir = "0.9.1"
 testresult = "0.4.0"
 tokio = { version = "1", features = ["macros", "io-util", "rt"] }
diff --git a/iroh/src/client/spaces.rs b/iroh/src/client/spaces.rs
index 65be8ee8522..e585cfa7fe2 100644
--- a/iroh/src/client/spaces.rs
+++ b/iroh/src/client/spaces.rs
@@ -36,8 +36,9 @@ use iroh_willow::{
     },
     session::{
         intents::{serde_encoding::Event, Completion, IntentUpdate},
-        SessionInit,
+        SessionInit, SessionMode,
     },
+    store::traits::{StoreEvent, SubscribeParams},
 };
 use ref_cast::RefCast;
 use serde::{Deserialize, Serialize};
@@ -98,7 +99,11 @@ impl Client {
     }
 
     /// Import a ticket and start to synchronize.
-    pub async fn import_and_sync(&self, ticket: SpaceTicket) -> Result<(Space, SyncHandleSet)> {
+    pub async fn import_and_sync(
+        &self,
+        ticket: SpaceTicket,
+        mode: SessionMode,
+    ) -> Result<(Space, SyncHandleSet)> {
         if ticket.caps.is_empty() {
             anyhow::bail!("Invalid ticket: Does not include any capabilities");
         }
@@ -110,7 +115,7 @@ impl Client {
         self.import_caps(ticket.caps).await?;
 
         let interests = Interests::builder().add_full_cap(CapSelector::any(namespace));
-        let init = SessionInit::reconcile_once(interests);
+        let init = SessionInit::new(interests, mode);
         let mut intents = SyncHandleSet::default();
         for addr in ticket.nodes {
             let node_id = addr.node_id;
@@ -353,14 +358,39 @@ impl Space {
         })
     }
 
-    /// TODO
-    pub fn subscribe(&self, _area: Area) {
-        todo!()
+    /// Subscribe to events concerning entries included by an `Area`.
+    pub async fn subscribe_area(
+        &self,
+        area: Area,
+        params: SubscribeParams,
+    ) -> Result<impl Stream<Item = Result<StoreEvent>>> {
+        let req = SubscribeRequest {
+            namespace: self.namespace_id,
+            area,
+            params,
+            initial_progress_id: None,
+        };
+        let stream = self.rpc.try_server_streaming(req).await?;
+        let stream = stream.map(|item| item.map_err(anyhow::Error::from));
+        Ok(stream)
     }
 
-    /// TODO
-    pub fn subscribe_offset(&self, _area: Area, _offset: u64) {
-        todo!()
+    /// Resume a subscription using a progress ID obtained from a previous subscription.
+    pub async fn resume_subscription(
+        &self,
+        progress_id: u64,
+        area: Area,
+        params: SubscribeParams,
+    ) -> Result<impl Stream<Item = Result<StoreEvent>>> {
+        let req = SubscribeRequest {
+            namespace: self.namespace_id,
+            area,
+            params,
+            initial_progress_id: Some(progress_id),
+        };
+        let stream = self.rpc.try_server_streaming(req).await?;
+        let stream = stream.map(|item| item.map_err(anyhow::Error::from));
+        Ok(stream)
     }
 }
diff --git a/iroh/src/node/rpc/spaces.rs b/iroh/src/node/rpc/spaces.rs
index eb164c358ee..3f8de9d8adc 100644
--- a/iroh/src/node/rpc/spaces.rs
+++ b/iroh/src/node/rpc/spaces.rs
@@ -155,6 +155,32 @@ impl Handler {
                 .await
             }
             SyncWithPeerUpdate(_) => Err(RpcServerError::UnexpectedStartMessage),
+            Subscribe(msg) => {
+                chan.try_server_streaming(msg, self, |handler, req| async move {
+                    let (tx, rx) = mpsc::channel(1024);
+                    if let Some(progress_id) = req.initial_progress_id {
+                        handler
+                            .spaces()?
+                            .resume_subscription(
+                                progress_id,
+                                req.namespace,
+                                req.area,
+                                req.params,
+                                tx,
+                            )
+                            .await
+                            .map_err(map_err)?;
+                    } else {
+                        handler
+                            .spaces()?
+                            .subscribe_area(req.namespace, req.area, req.params, tx)
+                            .await
+                            .map_err(map_err)?;
+                    }
+                    Ok(ReceiverStream::new(rx).map(Ok))
+                })
+                .await
+            }
         }
     }
 }
diff --git a/iroh/src/rpc_protocol/spaces.rs b/iroh/src/rpc_protocol/spaces.rs
index 067c44399e6..22aaffa5a29 100644
--- a/iroh/src/rpc_protocol/spaces.rs
+++ b/iroh/src/rpc_protocol/spaces.rs
@@ -9,7 +9,7 @@ use iroh_willow::{
         self, serde_encoding::SerdeAuthorisedEntry, AuthorisedEntry, Entry, NamespaceId, Path,
         SubspaceId,
     },
-    grouping::{self, Range3d},
+    grouping::{self, Area, Range3d},
     keys::{NamespaceKind, UserId},
     meadowcap::{self, AccessMode, SecretKey},
 },
@@ -17,6 +17,7 @@ use iroh_willow::{
         intents::{serde_encoding::Event, IntentUpdate},
         SessionInit,
     },
+    store::traits::{StoreEvent, SubscribeParams},
 };
 use nested_enum_utils::enum_conversions;
 use quic_rpc_derive::rpc_requests;
@@ -47,11 +48,11 @@ pub enum Request {
     DelegateCaps(DelegateCapsRequest),
     #[rpc(response = RpcResult<ImportCapsResponse>)]
     ImportCaps(ImportCapsRequest),
-    // #[rpc(response = RpcResult<ResolveInterestsResponse>)]
-    // ResolveInterests(ResolveInterestsRequest),
     #[bidi_streaming(update = SyncWithPeerUpdate, response = RpcResult<SyncWithPeerResponse>)]
     SyncWithPeer(SyncWithPeerRequest),
     SyncWithPeerUpdate(SyncWithPeerUpdate),
+    #[try_server_streaming(create_error = RpcError, item_error = RpcError, item = StoreEvent)]
+    Subscribe(SubscribeRequest),
 }
 
 #[allow(missing_docs)]
@@ -67,8 +68,8 @@ pub enum Response {
     CreateUser(RpcResult<CreateUserResponse>),
     DelegateCaps(RpcResult<DelegateCapsResponse>),
     ImportCaps(RpcResult<ImportCapsResponse>),
-    // ResolveInterests(RpcResult<ResolveInterestsResponse>),
     SyncWithPeer(RpcResult<SyncWithPeerResponse>),
+    Subscribe(RpcResult<StoreEvent>),
 }
 
 #[derive(Debug, Serialize, Deserialize)]
@@ -139,9 +140,7 @@ pub struct GetEntryRequest {
 }
 
 #[derive(Debug, Serialize, Deserialize)]
-pub struct GetEntryResponse(
-    pub Option<SerdeAuthorisedEntry>, // #[serde(with = "data_model::serde_encoding::authorised_entry")] pub AuthorisedEntry,
-);
+pub struct GetEntryResponse(pub Option<SerdeAuthorisedEntry>);
 
 #[derive(Debug, Serialize, Deserialize)]
 pub struct CreateNamespaceRequest {
@@ -192,6 +191,15 @@ pub enum SyncWithPeerResponse {
     Event(Event),
 }
 
+#[derive(Debug, Serialize, Deserialize)]
+pub struct SubscribeRequest {
+    pub namespace: NamespaceId,
+    #[serde(with = "grouping::serde_encoding::area")]
+    pub area: Area,
+    pub params: SubscribeParams,
+    pub initial_progress_id: Option<u64>,
+}
+
 /// Either a complete [`Entry`] or a [`FullEntryForm`].
#[derive(Debug, Serialize, Deserialize)] pub enum EntryOrForm { diff --git a/iroh/tests/spaces.proptest-regressions b/iroh/tests/spaces.proptest-regressions new file mode 100644 index 00000000000..dc542b3393f --- /dev/null +++ b/iroh/tests/spaces.proptest-regressions @@ -0,0 +1,16 @@ +# Seeds for failure cases proptest has generated in the past. It is +# automatically read and these particular cases re-run before any +# novel cases are generated. +# +# It is recommended to check this file in to source control so that +# everyone who runs the test benefits from these saved cases. +cc b247c5db7888ec8f993852033ea7d612f5a7cc5e51d6dc80cbbf0b370f1bf9df # shrinks to input = _TestGetManyWeirdResultArgs { rounds: [(Alfie, [])] } +cc 10758efcbd4145b23bb48a35a5a93b13f42cc71457b18e8b2f521fb66537e94e # shrinks to input = _TestGetManyWeirdResultArgs { rounds: [(Betty, [Write("alpha", "gamma"), Write("gamma", "beta"), Write("beta", "alpha"), Write("gamma", "alpha"), Write("beta", "gamma"), Write("alpha", "beta"), Write("beta", "alpha"), Write("alpha", "gamma"), Write("gamma", "beta"), Write("beta", "beta"), Write("beta", "beta"), Write("beta", "gamma"), Write("alpha", "gamma"), Write("beta", "beta")]), (Betty, [Write("gamma", "gamma")]), (Alfie, [Write("gamma", "gamma"), Write("alpha", "gamma"), Write("beta", "beta"), Write("alpha", "gamma"), Write("beta", "beta"), Write("beta", "alpha")]), (Alfie, [Write("beta", "gamma")]), (Betty, [Write("beta", "alpha"), Write("alpha", "alpha")]), (Alfie, [Write("alpha", "alpha"), Write("beta", "beta"), Write("gamma", "alpha"), Write("alpha", "alpha"), Write("gamma", "beta"), Write("alpha", "beta"), Write("gamma", "beta"), Write("alpha", "alpha"), Write("beta", "beta"), Write("gamma", "beta"), Write("gamma", "alpha"), Write("beta", "beta")]), (Betty, [Write("gamma", "beta"), Write("gamma", "beta"), Write("gamma", "alpha"), Write("gamma", "gamma"), Write("gamma", "beta"), Write("alpha", "beta")]), (Alfie, [Write("beta", "alpha"), Write("beta", "gamma"), Write("alpha", "beta"), Write("alpha", "alpha"), Write("gamma", "alpha"), Write("beta", "beta")]), (Alfie, [Write("gamma", "beta"), Write("beta", "gamma")]), (Betty, [Write("alpha", "alpha"), Write("gamma", "gamma"), Write("alpha", "beta"), Write("gamma", "beta")]), (Betty, [Write("beta", "alpha"), Write("alpha", "gamma"), Write("gamma", "alpha"), Write("beta", "alpha"), Write("gamma", "gamma"), Write("beta", "beta"), Write("gamma", "beta"), Write("alpha", "beta"), Write("gamma", "gamma")]), (Alfie, [Write("gamma", "beta")]), (Betty, [Write("alpha", "alpha"), Write("beta", "alpha"), Write("beta", "alpha"), Write("alpha", "alpha"), Write("beta", "beta"), Write("alpha", "beta"), Write("alpha", "gamma"), Write("beta", "beta"), Write("beta", "alpha"), Write("gamma", "alpha"), Write("beta", "beta"), Write("beta", "gamma"), Write("gamma", "alpha"), Write("gamma", "gamma"), Write("gamma", "beta"), Write("alpha", "alpha")]), (Betty, [Write("gamma", "beta"), Write("gamma", "gamma"), Write("alpha", "beta"), Write("alpha", "gamma"), Write("alpha", "gamma"), Write("alpha", "beta"), Write("alpha", "gamma"), Write("beta", "beta"), Write("alpha", "gamma"), Write("alpha", "beta"), Write("alpha", "gamma"), Write("alpha", "gamma"), Write("alpha", "beta"), Write("gamma", "beta"), Write("gamma", "gamma"), Write("alpha", "beta")])] } +cc bad55ca9718ab95bc85e0ee4581fcf9ca019f10ae8cd8b1c30acd2ab7fd03a7f # shrinks to input = _TestGetManyWeirdResultArgs { rounds: [(Betty, [Write("alpha", "gamma"), Write("gamma", "beta"), Write("alpha", 
"gamma"), Write("beta", "alpha"), Write("alpha", "alpha"), Write("gamma", "gamma"), Write("gamma", "beta")]), (Betty, [Write("gamma", "gamma"), Write("beta", "gamma"), Write("gamma", "beta"), Write("gamma", "beta"), Write("gamma", "beta"), Write("alpha", "gamma"), Write("alpha", "gamma"), Write("alpha", "alpha"), Write("gamma", "gamma")]), (Betty, [Write("beta", "alpha"), Write("gamma", "alpha"), Write("beta", "alpha"), Write("alpha", "gamma"), Write("gamma", "beta"), Write("gamma", "alpha"), Write("gamma", "beta"), Write("alpha", "beta")]), (Alfie, [Write("gamma", "beta"), Write("alpha", "beta"), Write("gamma", "beta"), Write("alpha", "beta"), Write("alpha", "gamma"), Write("alpha", "alpha"), Write("beta", "alpha"), Write("gamma", "alpha"), Write("beta", "beta"), Write("alpha", "alpha"), Write("gamma", "gamma"), Write("gamma", "beta"), Write("alpha", "gamma"), Write("gamma", "beta"), Write("alpha", "gamma"), Write("beta", "beta")]), (Alfie, [Write("beta", "beta"), Write("gamma", "gamma"), Write("beta", "gamma"), Write("alpha", "gamma"), Write("alpha", "beta")]), (Alfie, [Write("beta", "alpha"), Write("alpha", "gamma"), Write("alpha", "beta"), Write("gamma", "beta"), Write("alpha", "beta"), Write("gamma", "alpha"), Write("gamma", "beta"), Write("beta", "beta"), Write("beta", "gamma"), Write("beta", "alpha"), Write("gamma", "beta"), Write("gamma", "alpha"), Write("beta", "alpha"), Write("gamma", "alpha"), Write("alpha", "beta"), Write("gamma", "gamma"), Write("gamma", "alpha"), Write("alpha", "gamma")]), (Betty, [Write("alpha", "beta")]), (Betty, [Write("alpha", "gamma"), Write("alpha", "alpha")]), (Alfie, [Write("alpha", "gamma"), Write("beta", "gamma"), Write("alpha", "alpha"), Write("gamma", "beta"), Write("beta", "alpha"), Write("gamma", "alpha"), Write("beta", "gamma"), Write("alpha", "beta"), Write("beta", "gamma"), Write("alpha", "alpha")]), (Betty, [Write("alpha", "gamma"), Write("gamma", "beta"), Write("beta", "gamma"), Write("beta", "alpha"), Write("gamma", "beta"), Write("gamma", "beta"), Write("beta", "gamma")]), (Betty, [Write("gamma", "gamma"), Write("gamma", "alpha"), Write("beta", "gamma"), Write("gamma", "beta"), Write("beta", "alpha"), Write("gamma", "gamma"), Write("gamma", "gamma"), Write("alpha", "alpha"), Write("gamma", "gamma"), Write("alpha", "alpha"), Write("beta", "alpha"), Write("gamma", "alpha"), Write("alpha", "alpha"), Write("beta", "alpha"), Write("beta", "alpha"), Write("alpha", "gamma")]), (Alfie, [Write("beta", "alpha"), Write("beta", "gamma"), Write("alpha", "gamma")])] } +cc 9c1851f6773562a9d437743f7033d15df566ef3ee865533a4d197120af731891 # shrinks to input = _TestGetManyWeirdResultArgs { rounds: [(Alfie, [Write("gamma", "beta"), Write("alpha", "gamma"), Write("gamma", "gamma"), Write("gamma", "alpha"), Write("beta", "beta"), Write("gamma", "alpha"), Write("alpha", "beta"), Write("beta", "gamma"), Write("alpha", "beta"), Write("beta", "beta"), Write("gamma", "beta"), Write("alpha", "alpha"), Write("beta", "gamma"), Write("gamma", "gamma"), Write("gamma", "gamma"), Write("gamma", "beta"), Write("alpha", "beta"), Write("beta", "beta"), Write("beta", "alpha")]), (Alfie, [Write("beta", "gamma"), Write("alpha", "alpha"), Write("alpha", "alpha"), Write("beta", "alpha")]), (Betty, [Write("gamma", "alpha"), Write("beta", "gamma"), Write("beta", "gamma"), Write("gamma", "gamma"), Write("alpha", "beta"), Write("alpha", "gamma"), Write("alpha", "beta"), Write("alpha", "alpha"), Write("alpha", "beta")]), (Alfie, [Write("beta", "alpha"), Write("alpha", "beta"), 
Write("alpha", "beta"), Write("beta", "gamma")]), (Betty, [Write("alpha", "beta"), Write("alpha", "alpha"), Write("beta", "gamma")]), (Alfie, [Write("beta", "alpha"), Write("gamma", "gamma")]), (Betty, [Write("beta", "gamma"), Write("alpha", "gamma"), Write("alpha", "gamma"), Write("alpha", "beta"), Write("alpha", "gamma"), Write("alpha", "gamma"), Write("beta", "beta"), Write("gamma", "gamma"), Write("gamma", "alpha"), Write("beta", "beta"), Write("alpha", "alpha")]), (Alfie, [Write("beta", "beta"), Write("alpha", "alpha"), Write("beta", "beta"), Write("alpha", "alpha"), Write("beta", "gamma"), Write("alpha", "alpha"), Write("beta", "beta"), Write("beta", "alpha"), Write("alpha", "alpha"), Write("beta", "beta"), Write("gamma", "alpha"), Write("beta", "gamma"), Write("gamma", "gamma"), Write("gamma", "alpha")]), (Alfie, [])] } +cc 2bd80650f13377a3e39bbbf73c0fcf1f17b056880651abfc68e75f66a7f3c130 # shrinks to input = _TestGetManyWeirdResultArgs { rounds: [(Alfie, [Write("beta", "beta"), Write("beta", "beta"), Write("alpha", "alpha"), Write("alpha", "alpha"), Write("beta", "gamma"), Write("beta", "beta"), Write("gamma", "gamma"), Write("beta", "beta"), Write("beta", "beta"), Write("gamma", "beta"), Write("gamma", "beta"), Write("gamma", "gamma"), Write("gamma", "beta"), Write("beta", "beta")]), (Alfie, [Write("alpha", "beta"), Write("alpha", "beta"), Write("gamma", "alpha"), Write("gamma", "alpha"), Write("alpha", "beta"), Write("alpha", "beta"), Write("alpha", "alpha"), Write("beta", "gamma"), Write("alpha", "gamma"), Write("beta", "gamma"), Write("alpha", "alpha"), Write("gamma", "alpha"), Write("gamma", "alpha"), Write("beta", "beta"), Write("beta", "alpha"), Write("gamma", "gamma"), Write("alpha", "alpha"), Write("alpha", "alpha"), Write("alpha", "gamma")]), (Betty, [Write("gamma", "beta"), Write("gamma", "beta"), Write("alpha", "gamma"), Write("alpha", "gamma"), Write("beta", "gamma"), Write("alpha", "beta"), Write("gamma", "gamma"), Write("beta", "beta"), Write("beta", "alpha"), Write("beta", "alpha"), Write("gamma", "beta"), Write("alpha", "beta"), Write("gamma", "alpha"), Write("gamma", "beta"), Write("gamma", "alpha"), Write("gamma", "beta")]), (Alfie, [Write("alpha", "beta")]), (Alfie, [Write("gamma", "alpha"), Write("gamma", "alpha"), Write("beta", "alpha"), Write("gamma", "alpha"), Write("alpha", "gamma"), Write("beta", "alpha"), Write("gamma", "beta"), Write("alpha", "alpha"), Write("beta", "alpha"), Write("gamma", "gamma")]), (Betty, [Write("alpha", "beta"), Write("alpha", "gamma"), Write("alpha", "beta"), Write("alpha", "beta"), Write("alpha", "alpha"), Write("alpha", "alpha"), Write("beta", "beta"), Write("alpha", "gamma"), Write("alpha", "alpha"), Write("beta", "alpha"), Write("alpha", "beta")]), (Betty, []), (Alfie, [Write("gamma", "alpha")]), (Betty, [Write("alpha", "alpha"), Write("alpha", "beta"), Write("beta", "alpha"), Write("beta", "alpha"), Write("gamma", "beta"), Write("gamma", "gamma"), Write("gamma", "gamma"), Write("alpha", "beta"), Write("gamma", "gamma"), Write("beta", "beta")]), (Betty, [Write("alpha", "alpha"), Write("gamma", "beta"), Write("gamma", "alpha")]), (Alfie, [Write("alpha", "gamma")])] } +cc 48f0a951e77785086a8625b0e5afeb4a25e49fd1923e707eebc42d30c430a144 # shrinks to input = _TestGetManyWeirdResultArgs { rounds: [(Betty, [Write("beta", "beta")]), (Alfie, [Write("gamma", "beta"), Write("gamma", "gamma"), Write("beta", "gamma"), Write("alpha", "alpha"), Write("alpha", "beta"), Write("beta", "beta")]), (Betty, [Write("alpha", "beta"), Write("gamma", 
"alpha"), Write("alpha", "gamma"), Write("beta", "gamma"), Write("gamma", "alpha"), Write("gamma", "beta"), Write("alpha", "beta"), Write("gamma", "gamma"), Write("alpha", "beta"), Write("beta", "beta"), Write("beta", "gamma"), Write("gamma", "beta"), Write("gamma", "gamma"), Write("gamma", "gamma"), Write("beta", "gamma")]), (Alfie, [Write("beta", "gamma"), Write("alpha", "beta"), Write("gamma", "beta"), Write("alpha", "alpha"), Write("gamma", "alpha"), Write("gamma", "beta"), Write("gamma", "beta"), Write("gamma", "gamma")]), (Betty, [Write("alpha", "beta"), Write("beta", "beta")]), (Alfie, [Write("beta", "beta"), Write("gamma", "gamma"), Write("alpha", "alpha"), Write("alpha", "beta"), Write("alpha", "alpha"), Write("alpha", "gamma"), Write("alpha", "beta"), Write("gamma", "gamma"), Write("beta", "alpha"), Write("gamma", "gamma")]), (Alfie, [Write("alpha", "gamma"), Write("beta", "alpha"), Write("beta", "beta"), Write("beta", "gamma"), Write("alpha", "gamma"), Write("alpha", "gamma")]), (Betty, [Write("beta", "gamma"), Write("beta", "beta"), Write("alpha", "gamma"), Write("alpha", "beta"), Write("alpha", "gamma"), Write("beta", "beta"), Write("beta", "alpha"), Write("alpha", "gamma"), Write("gamma", "gamma"), Write("alpha", "gamma"), Write("beta", "beta"), Write("beta", "alpha")]), (Betty, [Write("alpha", "beta"), Write("gamma", "gamma"), Write("beta", "gamma"), Write("gamma", "beta"), Write("beta", "beta"), Write("gamma", "gamma"), Write("alpha", "gamma"), Write("alpha", "alpha"), Write("beta", "beta"), Write("beta", "alpha"), Write("gamma", "gamma")]), (Alfie, [Write("alpha", "alpha"), Write("gamma", "alpha"), Write("gamma", "alpha"), Write("alpha", "alpha"), Write("alpha", "gamma"), Write("gamma", "gamma"), Write("gamma", "gamma"), Write("alpha", "alpha")]), (Alfie, [Write("gamma", "gamma"), Write("alpha", "alpha"), Write("gamma", "gamma"), Write("beta", "beta"), Write("gamma", "beta")]), (Alfie, [Write("beta", "beta"), Write("beta", "alpha"), Write("beta", "gamma"), Write("gamma", "gamma"), Write("gamma", "alpha"), Write("alpha", "beta"), Write("beta", "alpha"), Write("alpha", "gamma"), Write("alpha", "alpha"), Write("beta", "alpha"), Write("alpha", "beta"), Write("beta", "alpha"), Write("gamma", "alpha"), Write("beta", "gamma"), Write("alpha", "alpha"), Write("alpha", "gamma"), Write("alpha", "gamma")]), (Alfie, [Write("beta", "beta"), Write("gamma", "alpha"), Write("gamma", "beta"), Write("alpha", "alpha"), Write("alpha", "alpha"), Write("alpha", "beta"), Write("beta", "alpha"), Write("gamma", "gamma"), Write("beta", "gamma"), Write("beta", "beta"), Write("gamma", "gamma"), Write("beta", "beta"), Write("gamma", "beta"), Write("gamma", "gamma"), Write("beta", "gamma"), Write("gamma", "gamma"), Write("beta", "gamma"), Write("beta", "alpha"), Write("alpha", "beta")]), (Betty, [Write("alpha", "gamma"), Write("beta", "alpha"), Write("gamma", "beta"), Write("gamma", "gamma"), Write("gamma", "beta"), Write("gamma", "alpha"), Write("gamma", "gamma"), Write("beta", "gamma"), Write("gamma", "gamma"), Write("gamma", "alpha"), Write("beta", "alpha"), Write("beta", "alpha"), Write("alpha", "gamma"), Write("beta", "beta"), Write("alpha", "gamma")]), (Betty, [Write("beta", "beta"), Write("gamma", "gamma"), Write("beta", "alpha"), Write("alpha", "gamma"), Write("gamma", "beta"), Write("gamma", "gamma"), Write("gamma", "gamma"), Write("beta", "alpha")]), (Betty, [Write("beta", "gamma"), Write("alpha", "alpha"), Write("gamma", "alpha"), Write("beta", "alpha"), Write("alpha", "alpha"), 
Write("alpha", "alpha"), Write("beta", "gamma"), Write("beta", "alpha"), Write("gamma", "beta"), Write("alpha", "gamma")]), (Alfie, [Write("gamma", "beta"), Write("beta", "gamma"), Write("beta", "alpha"), Write("beta", "gamma"), Write("alpha", "gamma"), Write("gamma", "gamma"), Write("beta", "beta"), Write("gamma", "alpha"), Write("gamma", "beta"), Write("beta", "gamma"), Write("gamma", "beta")])] } +cc fd7a666da43de4a6647fd7a5b7c543c4f11abde19a3d8592d3845226bc964f1c # shrinks to input = _TestGetManyWeirdResultArgs { rounds: [] } +cc 42fc5284840d3b6e58d5650c131a3ab6e7528fc98fb4c4fb77097e29715326f5 # shrinks to input = _TestGetManyWeirdResultArgs { rounds: [] } +cc cef4354611d13f5c0cdecbc409eb54eea1ef59512c272875d01040b187db84ea # shrinks to input = _TestGetManyWeirdResultArgs { rounds: [(Alfie, [Write("beta", "gamma"), Write("beta", "alpha"), Write("gamma", "beta"), Write("alpha", "alpha"), Write("gamma", "beta"), Write("alpha", "beta")]), (Alfie, [Write("beta", "alpha")]), (Alfie, [Write("alpha", "alpha"), Write("beta", "gamma"), Write("alpha", "gamma"), Write("alpha", "beta"), Write("gamma", "gamma"), Write("beta", "beta"), Write("beta", "gamma"), Write("gamma", "gamma"), Write("alpha", "gamma"), Write("beta", "beta"), Write("beta", "gamma"), Write("alpha", "alpha")]), (Betty, [Write("alpha", "gamma"), Write("beta", "gamma"), Write("alpha", "beta"), Write("beta", "beta"), Write("gamma", "alpha"), Write("beta", "gamma"), Write("alpha", "beta"), Write("beta", "beta"), Write("gamma", "gamma"), Write("beta", "alpha"), Write("gamma", "beta"), Write("alpha", "alpha"), Write("alpha", "beta"), Write("gamma", "gamma"), Write("alpha", "beta")]), (Betty, []), (Alfie, [Write("alpha", "alpha"), Write("alpha", "gamma"), Write("gamma", "gamma"), Write("alpha", "alpha"), Write("alpha", "alpha"), Write("beta", "beta"), Write("beta", "beta"), Write("beta", "gamma"), Write("beta", "alpha")]), (Betty, [Write("alpha", "alpha"), Write("beta", "gamma"), Write("gamma", "gamma"), Write("gamma", "gamma"), Write("beta", "beta"), Write("gamma", "alpha"), Write("alpha", "alpha"), Write("gamma", "alpha"), Write("beta", "beta"), Write("gamma", "beta"), Write("beta", "gamma"), Write("alpha", "gamma"), Write("gamma", "beta"), Write("gamma", "beta"), Write("beta", "beta"), Write("beta", "alpha"), Write("gamma", "beta")]), (Alfie, [Write("beta", "beta")]), (Betty, [Write("alpha", "beta"), Write("alpha", "gamma"), Write("alpha", "beta"), Write("alpha", "alpha"), Write("alpha", "gamma"), Write("gamma", "gamma"), Write("gamma", "alpha"), Write("alpha", "alpha"), Write("alpha", "alpha"), Write("alpha", "alpha"), Write("alpha", "alpha"), Write("alpha", "gamma")]), (Alfie, [Write("beta", "alpha"), Write("beta", "alpha"), Write("alpha", "alpha"), Write("alpha", "gamma"), Write("alpha", "beta"), Write("gamma", "gamma"), Write("gamma", "gamma"), Write("gamma", "gamma"), Write("alpha", "alpha"), Write("beta", "gamma"), Write("beta", "alpha"), Write("alpha", "alpha"), Write("beta", "alpha"), Write("gamma", "gamma")]), (Betty, [Write("gamma", "gamma"), Write("alpha", "beta"), Write("beta", "beta"), Write("beta", "gamma"), Write("alpha", "beta"), Write("beta", "beta"), Write("alpha", "beta"), Write("beta", "alpha"), Write("beta", "beta"), Write("alpha", "alpha"), Write("gamma", "gamma"), Write("gamma", "alpha"), Write("beta", "alpha"), Write("gamma", "alpha"), Write("beta", "alpha")]), (Alfie, [Write("beta", "alpha"), Write("alpha", "alpha"), Write("beta", "beta"), Write("alpha", "beta"), Write("beta", "beta"), Write("alpha", 
"alpha"), Write("gamma", "beta"), Write("alpha", "beta"), Write("gamma", "beta"), Write("gamma", "alpha"), Write("beta", "beta"), Write("alpha", "gamma"), Write("alpha", "alpha"), Write("gamma", "gamma"), Write("gamma", "alpha"), Write("alpha", "alpha")]), (Betty, [Write("beta", "beta"), Write("gamma", "gamma"), Write("alpha", "beta"), Write("alpha", "beta"), Write("beta", "beta"), Write("beta", "gamma"), Write("gamma", "beta")]), (Alfie, [Write("alpha", "beta"), Write("gamma", "beta"), Write("alpha", "beta"), Write("gamma", "gamma"), Write("gamma", "beta"), Write("alpha", "beta")])] } +cc cfd6874efc9b42a5cad679512edfb09332852f4919920b2dde117e7039edff5a # shrinks to input = _TestGetManyWeirdResultArgs { rounds: [(Alfie, [Write("alpha", "alpha"), Write("gamma", "gamma"), Write("beta", "gamma"), Write("beta", "alpha"), Write("beta", "gamma"), Write("alpha", "alpha"), Write("beta", "alpha"), Write("alpha", "beta"), Write("beta", "beta")]), (Alfie, [Write("alpha", "alpha"), Write("beta", "gamma"), Write("alpha", "gamma"), Write("alpha", "alpha"), Write("beta", "alpha"), Write("beta", "gamma"), Write("alpha", "gamma"), Write("alpha", "alpha"), Write("alpha", "beta"), Write("beta", "beta"), Write("gamma", "beta"), Write("beta", "alpha"), Write("gamma", "beta"), Write("alpha", "gamma"), Write("alpha", "alpha")]), (Betty, [Write("alpha", "beta"), Write("alpha", "alpha"), Write("beta", "beta"), Write("beta", "gamma"), Write("beta", "alpha"), Write("beta", "gamma"), Write("gamma", "beta"), Write("alpha", "beta"), Write("beta", "alpha")]), (Betty, [Write("alpha", "gamma"), Write("alpha", "beta"), Write("gamma", "alpha"), Write("alpha", "beta"), Write("alpha", "gamma"), Write("alpha", "gamma"), Write("alpha", "gamma")]), (Alfie, [Write("alpha", "alpha"), Write("alpha", "alpha"), Write("alpha", "gamma"), Write("beta", "beta"), Write("beta", "beta"), Write("beta", "alpha"), Write("gamma", "alpha")]), (Alfie, [Write("alpha", "alpha"), Write("gamma", "beta"), Write("gamma", "alpha"), Write("gamma", "alpha"), Write("beta", "alpha"), Write("beta", "beta"), Write("beta", "gamma"), Write("beta", "beta"), Write("beta", "alpha"), Write("alpha", "alpha"), Write("beta", "gamma"), Write("beta", "beta"), Write("gamma", "beta")]), (Alfie, [Write("gamma", "beta"), Write("alpha", "gamma"), Write("gamma", "beta"), Write("gamma", "beta"), Write("beta", "gamma"), Write("gamma", "gamma"), Write("beta", "alpha"), Write("beta", "alpha")]), (Betty, [Write("gamma", "gamma"), Write("alpha", "alpha"), Write("gamma", "beta"), Write("gamma", "alpha"), Write("beta", "beta"), Write("gamma", "gamma"), Write("gamma", "beta"), Write("gamma", "alpha"), Write("beta", "beta"), Write("alpha", "beta"), Write("alpha", "gamma"), Write("beta", "gamma"), Write("gamma", "beta"), Write("gamma", "alpha"), Write("beta", "beta")]), (Betty, [Write("beta", "alpha")]), (Betty, [Write("gamma", "alpha"), Write("beta", "alpha"), Write("alpha", "gamma"), Write("beta", "beta"), Write("gamma", "alpha"), Write("beta", "gamma"), Write("alpha", "alpha"), Write("beta", "gamma"), Write("beta", "beta"), Write("alpha", "alpha"), Write("gamma", "alpha"), Write("gamma", "beta"), Write("gamma", "alpha"), Write("alpha", "beta"), Write("beta", "alpha"), Write("alpha", "gamma"), Write("alpha", "beta"), Write("beta", "beta"), Write("gamma", "gamma")]), (Alfie, [Write("alpha", "alpha"), Write("alpha", "gamma"), Write("beta", "gamma"), Write("beta", "gamma"), Write("gamma", "alpha"), Write("gamma", "beta"), Write("gamma", "beta"), Write("beta", "alpha"), Write("gamma", 
"gamma"), Write("beta", "beta"), Write("alpha", "alpha"), Write("alpha", "beta")]), (Alfie, [Write("gamma", "beta"), Write("gamma", "beta")]), (Alfie, [Write("gamma", "beta"), Write("alpha", "gamma"), Write("beta", "gamma"), Write("gamma", "beta"), Write("beta", "beta"), Write("gamma", "alpha"), Write("alpha", "beta"), Write("gamma", "alpha"), Write("beta", "gamma"), Write("beta", "alpha"), Write("gamma", "alpha"), Write("gamma", "beta"), Write("alpha", "alpha")]), (Betty, [Write("gamma", "beta"), Write("beta", "alpha")]), (Alfie, [Write("beta", "gamma"), Write("beta", "alpha"), Write("alpha", "gamma"), Write("beta", "gamma"), Write("beta", "gamma"), Write("gamma", "gamma")]), (Betty, [Write("gamma", "alpha")])] } diff --git a/iroh/tests/spaces.rs b/iroh/tests/spaces.rs index b927347a780..070b8b8f798 100644 --- a/iroh/tests/spaces.rs +++ b/iroh/tests/spaces.rs @@ -1,17 +1,26 @@ -use anyhow::Result; +use std::collections::BTreeMap; + +use anyhow::ensure; use futures_lite::StreamExt; -use iroh::client::{spaces::EntryForm, Iroh}; +use iroh::client::{ + spaces::{EntryForm, Space}, + Iroh, +}; use iroh_net::{key::SecretKey, NodeAddr}; use iroh_willow::{ - interest::{CapSelector, DelegateTo, RestrictArea}, + interest::{AreaOfInterestSelector, CapSelector, DelegateTo, RestrictArea}, proto::{ data_model::{Path, PathExt}, grouping::{Area, Range3d}, - keys::NamespaceKind, + keys::{NamespaceKind, UserId}, meadowcap::AccessMode, }, - session::intents::Completion, + session::{intents::Completion, Role, SessionMode}, + store::traits::{EntryOrigin, StoreEvent}, }; +use proptest::{collection::vec, prelude::Strategy, sample::select}; +use test_strategy::proptest; +use testresult::TestResult; use tracing::info; /// Spawn an iroh node in a separate thread and tokio runtime, and return @@ -40,8 +49,170 @@ async fn spawn_node() -> (NodeAddr, Iroh) { receiver.await.unwrap() } +#[derive(Debug, Clone)] +enum Operation { + Write(String, String), +} + +fn simple_str() -> impl Strategy { + select(&["alpha", "beta", "gamma"]).prop_map(str::to_string) +} + +fn simple_op() -> impl Strategy { + (simple_str(), simple_str()).prop_map(|(key, value)| Operation::Write(key, value)) +} + +fn role() -> impl Strategy { + select(&[Role::Alfie, Role::Betty]) +} + +#[proptest] +fn test_get_many_weird_result( + #[strategy(vec((role(), vec(simple_op(), 0..20)), 0..20))] rounds: Vec<(Role, Vec)>, +) { + iroh_test::logging::setup_multithreaded(); + + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + let mut simulated_entries: BTreeMap<(Role, String), String> = BTreeMap::new(); + + let (alfie_addr, alfie) = spawn_node().await; + let (betty_addr, betty) = spawn_node().await; + info!("alfie is {}", alfie_addr.node_id.fmt_short()); + info!("betty is {}", betty_addr.node_id.fmt_short()); + alfie.net().add_node_addr(betty_addr.clone()).await?; + betty.net().add_node_addr(alfie_addr.clone()).await?; + let betty_user = betty.spaces().create_user().await?; + let alfie_user = alfie.spaces().create_user().await?; + let alfie_space = alfie + .spaces() + .create(NamespaceKind::Owned, alfie_user) + .await?; + + let ticket = alfie_space + .share(betty_user, AccessMode::Write, RestrictArea::None) + .await?; + + // give betty access + let (betty_space, syncs) = betty + .spaces() + .import_and_sync(ticket, SessionMode::ReconcileOnce) + .await?; + + syncs.complete_all().await; + + for (role, round) in rounds { + let (space, user, other_node_id) = match role { + Role::Alfie => (&alfie_space, 
alfie_user, betty_addr.node_id), + Role::Betty => (&betty_space, betty_user, alfie_addr.node_id), + }; + + for Operation::Write(key, value) in round { + space + .insert_bytes( + EntryForm::new(user, Path::from_bytes(&[key.as_bytes()])?), + value.clone().into_bytes(), + ) + .await?; + simulated_entries.insert((role, key), value); + } + + tokio::time::timeout( + std::time::Duration::from_secs(5), + space + .sync_once(other_node_id, AreaOfInterestSelector::Widest) + .await? + .complete(), + ) + .await??; + } + + let alfie_map = space_to_map(&alfie_space, &alfie, alfie_user, betty_user).await?; + let betty_map = space_to_map(&betty_space, &betty, alfie_user, betty_user).await?; + + ensure!( + alfie_map == betty_map, + "states out of sync:\n{alfie_map:#?}\n !=\n{betty_map:#?}" + ); + ensure!( + simulated_entries == alfie_map, + "alfie in unexpected state:\n{simulated_entries:#?}\n !=\n{alfie_map:#?}" + ); + // follows transitively, but still + ensure!( + simulated_entries == betty_map, + "betty in unexpected state:\n{simulated_entries:#?}\n !=\n{betty_map:#?}" + ); + + println!("Success!"); + + Ok(()) + }) + .map_err(AnyhowStdErr)?; +} + +async fn space_to_map( + space: &Space, + node: &Iroh, + alfie_user: UserId, + betty_user: UserId, +) -> anyhow::Result> { + let role_lookup = BTreeMap::from([(alfie_user, Role::Alfie), (betty_user, Role::Betty)]); + let entries = space + .get_many(Range3d::new_full()) + .await? + .try_collect::<_, _, Vec<_>>() + .await?; + let mut map: BTreeMap<(Role, String), String> = BTreeMap::new(); + for auth_entry in entries { + let (entry, auth) = auth_entry.into_parts(); + let key_component = entry + .path() + .get_component(0) + .ok_or_else(|| anyhow::anyhow!("path component missing"))?; + let key = String::from_utf8(key_component.to_vec())?; + + let value = node.blobs().read_to_bytes(entry.payload_digest().0).await?; + + let user = auth.capability.receiver(); + let role = role_lookup + .get(user) + .ok_or_else(|| anyhow::anyhow!("foreign write?"))?; + + map.insert((*role, key), String::from_utf8_lossy(&value).to_string()); + } + + Ok(map) +} + +#[derive(Debug)] +struct AnyhowStdErr(anyhow::Error); + +impl std::fmt::Display for AnyhowStdErr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + +impl std::error::Error for AnyhowStdErr { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + self.0.source() + } + + fn description(&self) -> &str { + "description() is deprecated; use Display" + } + + fn cause(&self) -> Option<&dyn std::error::Error> { + self.source() + } +} + #[tokio::test] -async fn spaces_smoke() -> Result<()> { +async fn spaces_smoke() -> TestResult { iroh_test::logging::setup_multithreaded(); let (alfie_addr, alfie) = spawn_node().await; let (betty_addr, betty) = spawn_node().await; @@ -75,7 +246,10 @@ async fn spaces_smoke() -> Result<()> { .await?; println!("ticket {ticket:?}"); - let (betty_space, betty_sync_intent) = betty.spaces().import_and_sync(ticket).await?; + let (betty_space, betty_sync_intent) = betty + .spaces() + .import_and_sync(ticket, SessionMode::ReconcileOnce) + .await?; let mut completion = betty_sync_intent.complete_all().await; assert_eq!(completion.len(), 1); @@ -132,3 +306,87 @@ async fn spaces_smoke() -> Result<()> { Ok(()) } + +#[tokio::test] +async fn spaces_subscription() -> TestResult { + iroh_test::logging::setup_multithreaded(); + let (alfie_addr, alfie) = spawn_node().await; + let (betty_addr, betty) = spawn_node().await; + info!("alfie is {}", 
alfie_addr.node_id.fmt_short()); + info!("betty is {}", betty_addr.node_id.fmt_short()); + + let betty_user = betty.spaces().create_user().await?; + let alfie_user = alfie.spaces().create_user().await?; + let alfie_space = alfie + .spaces() + .create(NamespaceKind::Owned, alfie_user) + .await?; + + let _namespace = alfie_space.namespace_id(); + + let mut alfie_sub = alfie_space + .subscribe_area(Area::new_full(), Default::default()) + .await?; + + let ticket = alfie_space + .share(betty_user, AccessMode::Write, RestrictArea::None) + .await?; + + let (betty_space, betty_sync_intent) = betty + .spaces() + .import_and_sync(ticket, SessionMode::Continuous) + .await?; + + let _sync_task = tokio::task::spawn(async move { + // TODO: We should add a "detach" method to a sync intent! + // (leaves the sync running but stop consuming events) + let _ = betty_sync_intent.complete_all().await; + }); + + let mut betty_sub = betty_space + .resume_subscription(0, Area::new_full(), Default::default()) + .await?; + + betty_space + .insert_bytes( + EntryForm::new(betty_user, Path::from_bytes(&[b"foo"])?), + "hi", + ) + .await?; + + let ev = betty_sub.next().await.unwrap().unwrap(); + println!("BETTY 1 {ev:?}"); + assert!(matches!(ev, StoreEvent::Ingested(0, _, EntryOrigin::Local))); + + let ev = alfie_sub.next().await.unwrap().unwrap(); + println!("ALFIE 1 {ev:?}"); + assert!(matches!( + ev, + StoreEvent::Ingested(0, _, EntryOrigin::Remote(_)) + )); + + alfie_space + .insert_bytes( + EntryForm::new(alfie_user, Path::from_bytes(&[b"bar"])?), + "hi!!", + ) + .await?; + + let ev = alfie_sub.next().await.unwrap().unwrap(); + println!("ALFIE 2 {ev:?}"); + assert!(matches!(ev, StoreEvent::Ingested(1, _, EntryOrigin::Local))); + + let ev = betty_sub.next().await.unwrap().unwrap(); + println!("BETTY 2 {ev:?}"); + assert!(matches!( + ev, + StoreEvent::Ingested(1, _, EntryOrigin::Remote(_)) + )); + + // let resume_sub = alfie_space + // .resume_subscription(0, Area::new_full(), Default::default()) + // .await?; + // assert_eq!(resume_sub.count().await, 2); + + Ok(()) +}
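Aside on the `cc` lines at the top of this patch: proptest persists each discovered failure to a `proptest-regressions` file as `cc <hash> # shrinks to <input>` and replays those seeds on subsequent runs, so committing the file pins the regressions. The following is a minimal, self-contained sketch of the same strategy-composition pattern the new test uses; it relies only on the `proptest` and `test-strategy` crates, and the `Op`/`word`/`ops_are_well_formed` names are illustrative, not part of this diff.

use proptest::{collection::vec, prelude::Strategy, sample::select};
use test_strategy::proptest;

#[derive(Debug, Clone)]
enum Op {
    Write(String, String),
}

// Compose small strategies into bigger ones: a string picker, a pair of
// strings mapped into an Op, and later a vector of Ops of varying length.
fn word() -> impl Strategy<Value = String> {
    select(&["alpha", "beta", "gamma"]).prop_map(str::to_string)
}

fn op() -> impl Strategy<Value = Op> {
    (word(), word()).prop_map(|(k, v)| Op::Write(k, v))
}

#[proptest]
fn ops_are_well_formed(#[strategy(vec(op(), 0..20))] ops: Vec<Op>) {
    // On failure, proptest shrinks `ops` toward a minimal counterexample and
    // persists the seed as a `cc` line like the ones above.
    for Op::Write(key, value) in ops {
        assert!(!key.is_empty() && !value.is_empty());
    }
}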
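For context on the subscription surface exercised by `spaces_subscription`: `subscribe_area` yields `StoreEvent`s whose `Ingested` variant carries a progress id as its first field, and `resume_subscription` takes such an id to continue a stream from that point, as the test does with id 0. A rough consumer sketch under two assumptions not confirmed by this diff alone: the stream yields `Result`-wrapped events convertible into `anyhow::Error` (the test's double `unwrap` suggests a `Result` item type), and `tail_space` is a hypothetical helper, not an API in this patch.

use futures_lite::StreamExt;
use iroh::client::spaces::Space;
use iroh_willow::{proto::grouping::Area, store::traits::StoreEvent};

// Hypothetical helper: follow a space and remember how far we got, so a later
// run can resume from the returned progress id instead of replaying events.
async fn tail_space(space: &Space) -> anyhow::Result<u64> {
    let mut last_progress_id = 0;
    let mut sub = space
        .subscribe_area(Area::new_full(), Default::default())
        .await?;
    while let Some(event) = sub.next().await {
        match event? {
            // First field is the progress id, as matched in spaces_subscription.
            StoreEvent::Ingested(progress_id, entry, origin) => {
                println!("ingested {entry:?} (origin: {origin:?})");
                last_progress_id = progress_id;
            }
            other => println!("ignoring {other:?}"),
        }
    }
    Ok(last_progress_id)
}

Resuming after a restart would then mirror the test: `space.resume_subscription(last_progress_id, Area::new_full(), Default::default()).await?`.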