Skip to content

Commit 5a0533d

Browse files
authored
feat: stop synchronizing interests (#632)
* feat: stop synchronizing interests With this change the interests ring is no longer synchronized between peers. A local node will still use the interest-svc to know its own interests and the beginning of each Recon conversation negotiates shared interests. Therefore it is no longer necessary to synchronize interests. In the future we may decide that we want to use Recon to sync interests instead of a linear sharing of interests before each conversation, however that is only a performance optimization that is not important at this stage. Fixes #610 Fixes #611 * fix: test
1 parent 17e559e commit 5a0533d

File tree

10 files changed

+56
-160
lines changed

10 files changed

+56
-160
lines changed

one/src/daemon.rs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use ceramic_sql::sqlite::SqlitePool;
1919
use clap::Args;
2020
use object_store::aws::AmazonS3Builder;
2121
use object_store::local::LocalFileSystem;
22-
use recon::{FullInterests, Recon, ReconInterestProvider};
22+
use recon::{Recon, ReconInterestProvider};
2323
use signal_hook::consts::signal::*;
2424
use signal_hook_tokio::Signals;
2525
use std::sync::Arc;
@@ -530,13 +530,6 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
530530
// Construct a recon implementation for peers.
531531
let recon_peer = Recon::new(peer_svc.clone(), PeerKeyInterests, recon_metrics.clone());
532532

533-
// Construct a recon implementation for interests.
534-
let recon_interest = Recon::new(
535-
interest_svc.clone(),
536-
FullInterests::default(),
537-
recon_metrics.clone(),
538-
);
539-
540533
// Construct a recon implementation for models.
541534
let recon_model = Recon::new(
542535
model_svc.clone(),
@@ -545,7 +538,7 @@ pub async fn run(opts: DaemonOpts) -> Result<()> {
545538
recon_metrics,
546539
);
547540

548-
let recons = Some((recon_peer, recon_interest, recon_model));
541+
let recons = Some((recon_peer, recon_model));
549542
let ipfs_metrics =
550543
ceramic_metrics::MetricsHandle::register(ceramic_kubo_rpc::IpfsMetrics::register);
551544
let p2p_metrics = MetricsHandle::register(ceramic_p2p::Metrics::register);

one/src/network.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::sync::Arc;
44

55
use anyhow::{anyhow, Result};
66
use async_trait::async_trait;
7-
use ceramic_core::{EventId, Interest, NodeId, NodeKey, PeerKey};
7+
use ceramic_core::{EventId, NodeId, NodeKey, PeerKey};
88
use ceramic_kubo_rpc::{IpfsMetrics, IpfsMetricsMiddleware, IpfsService};
99
use ceramic_p2p::{Config as P2pConfig, Libp2pConfig, Node, PeerService};
1010
use iroh_rpc_client::P2pClient;
@@ -34,18 +34,17 @@ impl BuilderState for WithP2p {}
3434

3535
/// Configure the p2p service
3636
impl Builder<Init> {
37-
pub async fn with_p2p<P, I, M, S>(
37+
pub async fn with_p2p<P, M, S>(
3838
self,
3939
libp2p_config: Libp2pConfig,
4040
node_key: NodeKey,
4141
peer_svc: impl PeerService + 'static,
42-
recons: Option<(P, I, M)>,
42+
recons: Option<(P, M)>,
4343
block_store: Arc<S>,
4444
metrics: ceramic_p2p::Metrics,
4545
) -> anyhow::Result<Builder<WithP2p>>
4646
where
4747
P: Recon<Key = PeerKey, Hash = Sha256a>,
48-
I: Recon<Key = Interest, Hash = Sha256a>,
4948
M: Recon<Key = EventId, Hash = Sha256a>,
5049
S: iroh_bitswap::Store,
5150
{

p2p/src/behaviour.rs

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::time::Duration;
22

33
use anyhow::Result;
4-
use ceramic_core::{EventId, Interest, PeerKey};
4+
use ceramic_core::{EventId, PeerKey};
55
use iroh_bitswap::{Bitswap, Block, Config as BitswapConfig};
66
use libp2p::{
77
autonat,
@@ -36,7 +36,7 @@ pub const AGENT_VERSION: &str = concat!("ceramic-one/", env!("CARGO_PKG_VERSION"
3636
/// Libp2p behaviour for the node.
3737
#[derive(NetworkBehaviour)]
3838
#[behaviour(to_swarm = "Event")]
39-
pub(crate) struct NodeBehaviour<P, I, M, S>
39+
pub(crate) struct NodeBehaviour<P, M, S>
4040
where
4141
S: iroh_bitswap::Store + Send + Sync,
4242
{
@@ -56,21 +56,20 @@ where
5656
relay: Toggle<relay::Behaviour>,
5757
relay_client: Toggle<relay::client::Behaviour>,
5858
dcutr: Toggle<dcutr::Behaviour>,
59-
recon: Toggle<recon::libp2p::Behaviour<P, I, M>>,
59+
recon: Toggle<recon::libp2p::Behaviour<P, M>>,
6060
}
6161

62-
impl<P, I, M, S> NodeBehaviour<P, I, M, S>
62+
impl<P, M, S> NodeBehaviour<P, M, S>
6363
where
6464
P: Recon<Key = PeerKey, Hash = Sha256a> + Send + Sync,
65-
I: Recon<Key = Interest, Hash = Sha256a> + Send + Sync,
6665
M: Recon<Key = EventId, Hash = Sha256a> + Send + Sync,
6766
S: iroh_bitswap::Store + Send + Sync,
6867
{
6968
pub async fn new(
7069
local_key: &Keypair,
7170
config: &Libp2pConfig,
7271
relay_client: Option<relay::client::Behaviour>,
73-
recons: Option<(P, I, M)>,
72+
recons: Option<(P, M)>,
7473
block_store: Arc<S>,
7574
peers_tx: tokio::sync::mpsc::Sender<peers::Message>,
7675
metrics: Metrics,
@@ -186,8 +185,8 @@ where
186185
.with_max_pending_incoming(Some(config.max_conns_pending_in))
187186
.with_max_established_per_peer(Some(config.max_conns_per_peer)),
188187
);
189-
let recon = recons.map(|(peer, interest, model)| {
190-
recon::libp2p::Behaviour::new(peer, interest, model, recon::libp2p::Config::default())
188+
let recon = recons.map(|(peer, model)| {
189+
recon::libp2p::Behaviour::new(peer, model, recon::libp2p::Config::default())
191190
});
192191
Ok(NodeBehaviour {
193192
ping: Ping::default(),

p2p/src/node.rs

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::{sync::atomic::Ordering, time::Duration};
77

88
use ahash::AHashMap;
99
use anyhow::{anyhow, bail, Context, Result};
10-
use ceramic_core::{EventId, Interest, NodeKey, PeerKey};
10+
use ceramic_core::{EventId, NodeKey, PeerKey};
1111
use ceramic_metrics::{libp2p_metrics, Recorder};
1212
use cid::Cid;
1313
use futures_util::stream::StreamExt;
@@ -62,15 +62,14 @@ pub enum NetworkEvent {
6262
/// Node implements a peer to peer node that participates on the Ceramic network.
6363
///
6464
/// Node provides an external API via RpcMessages.
65-
pub struct Node<P, I, M, S>
65+
pub struct Node<P, M, S>
6666
where
6767
P: Recon<Key = PeerKey, Hash = Sha256a>,
68-
I: Recon<Key = Interest, Hash = Sha256a>,
6968
M: Recon<Key = EventId, Hash = Sha256a>,
7069
S: iroh_bitswap::Store,
7170
{
7271
metrics: Metrics,
73-
swarm: Swarm<NodeBehaviour<P, I, M, S>>,
72+
swarm: Swarm<NodeBehaviour<P, M, S>>,
7473
supported_protocols: HashSet<String>,
7574
net_receiver_in: Receiver<RpcMessage>,
7675
dial_queries: AHashMap<PeerId, Vec<OneShotSender<Result<()>>>>,
@@ -92,10 +91,9 @@ where
9291
active_address_probe: Option<Multiaddr>,
9392
}
9493

95-
impl<P, I, M, S> fmt::Debug for Node<P, I, M, S>
94+
impl<P, M, S> fmt::Debug for Node<P, M, S>
9695
where
9796
P: Recon<Key = PeerKey, Hash = Sha256a>,
98-
I: Recon<Key = Interest, Hash = Sha256a>,
9997
M: Recon<Key = EventId, Hash = Sha256a>,
10098
S: iroh_bitswap::Store,
10199
{
@@ -128,10 +126,9 @@ const NICE_INTERVAL: Duration = Duration::from_secs(6);
128126
const BOOTSTRAP_INTERVAL: Duration = Duration::from_secs(5 * 60);
129127
const EXPIRY_INTERVAL: Duration = Duration::from_secs(1);
130128

131-
impl<P, I, M, S> Drop for Node<P, I, M, S>
129+
impl<P, M, S> Drop for Node<P, M, S>
132130
where
133131
P: Recon<Key = PeerKey, Hash = Sha256a>,
134-
I: Recon<Key = Interest, Hash = Sha256a>,
135132
M: Recon<Key = EventId, Hash = Sha256a>,
136133
S: iroh_bitswap::Store,
137134
{
@@ -143,12 +140,10 @@ where
143140

144141
// Allow IntoConnectionHandler deprecated associated type.
145142
// We are not using IntoConnectionHandler directly only referencing the type as part of this event signature.
146-
type NodeSwarmEvent<P, I, M, S> =
147-
SwarmEvent<<NodeBehaviour<P, I, M, S> as NetworkBehaviour>::ToSwarm>;
148-
impl<P, I, M, S> Node<P, I, M, S>
143+
type NodeSwarmEvent<P, M, S> = SwarmEvent<<NodeBehaviour<P, M, S> as NetworkBehaviour>::ToSwarm>;
144+
impl<P, M, S> Node<P, M, S>
149145
where
150146
P: Recon<Key = PeerKey, Hash = Sha256a> + Send + Sync,
151-
I: Recon<Key = Interest, Hash = Sha256a> + Send + Sync,
152147
M: Recon<Key = EventId, Hash = Sha256a> + Send + Sync,
153148
S: iroh_bitswap::Store + Send + Sync,
154149
{
@@ -157,7 +152,7 @@ where
157152
rpc_addr: P2pAddr,
158153
node_key: NodeKey,
159154
peer_svc: impl PeerService + 'static,
160-
recons: Option<(P, I, M)>,
155+
recons: Option<(P, M)>,
161156
block_store: Arc<S>,
162157
metrics: Metrics,
163158
) -> Result<Self> {
@@ -494,7 +489,7 @@ where
494489
#[tracing::instrument(skip_all)]
495490
async fn handle_swarm_event(
496491
&mut self,
497-
event: NodeSwarmEvent<P, I, M, S>,
492+
event: NodeSwarmEvent<P, M, S>,
498493
) -> Result<Option<SwarmEventResult>> {
499494
libp2p_metrics().record(&event);
500495
match event {

p2p/src/swarm.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use anyhow::Result;
2-
use ceramic_core::{EventId, Interest, PeerKey};
2+
use ceramic_core::{EventId, PeerKey};
33
use libp2p::{dns, noise, relay, tcp, tls, yamux, Swarm, SwarmBuilder};
44
use libp2p_identity::Keypair;
55
use recon::{libp2p::Recon, Sha256a};
@@ -28,17 +28,16 @@ fn get_dns_config() -> (dns::ResolverConfig, dns::ResolverOpts) {
2828
}
2929
}
3030

31-
pub(crate) async fn build_swarm<P, I, M, S>(
31+
pub(crate) async fn build_swarm<P, M, S>(
3232
config: &Libp2pConfig,
3333
keypair: Keypair,
34-
recons: Option<(P, I, M)>,
34+
recons: Option<(P, M)>,
3535
block_store: Arc<S>,
3636
peers_tx: tokio::sync::mpsc::Sender<peers::Message>,
3737
metrics: Metrics,
38-
) -> Result<Swarm<NodeBehaviour<P, I, M, S>>>
38+
) -> Result<Swarm<NodeBehaviour<P, M, S>>>
3939
where
4040
P: Recon<Key = PeerKey, Hash = Sha256a>,
41-
I: Recon<Key = Interest, Hash = Sha256a>,
4241
M: Recon<Key = EventId, Hash = Sha256a>,
4342
S: iroh_bitswap::Store,
4443
{
@@ -105,18 +104,17 @@ where
105104
}
106105
}
107106

108-
fn new_behavior<P, I, M, S>(
107+
fn new_behavior<P, M, S>(
109108
config: &Libp2pConfig,
110109
keypair: &Keypair,
111110
relay_client: Option<relay::client::Behaviour>,
112-
recons: Option<(P, I, M)>,
111+
recons: Option<(P, M)>,
113112
block_store: Arc<S>,
114113
peers_tx: tokio::sync::mpsc::Sender<peers::Message>,
115114
metrics: Metrics,
116-
) -> Result<NodeBehaviour<P, I, M, S>>
115+
) -> Result<NodeBehaviour<P, M, S>>
117116
where
118117
P: Recon<Key = PeerKey, Hash = Sha256a> + Send,
119-
I: Recon<Key = Interest, Hash = Sha256a> + Send,
120118
M: Recon<Key = EventId, Hash = Sha256a> + Send,
121119
S: iroh_bitswap::Store,
122120
{

p2p/tests/node.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use ceramic_event_svc::store::SqlitePool;
66
use iroh_rpc_client::P2pClient;
77
use iroh_rpc_types::Addr;
88
use libp2p::{Multiaddr, PeerId};
9-
use recon::{FullInterests, Recon, ReconInterestProvider};
9+
use recon::{Recon, ReconInterestProvider};
1010
use test_log::test;
1111

1212
use ceramic_p2p::{Config, Metrics, NetworkEvent, Node, PeerKeyInterests};
@@ -50,11 +50,6 @@ impl TestRunnerBuilder {
5050
Arc::clone(&peer_svc),
5151
Some((
5252
Recon::new(peer_svc, PeerKeyInterests, recon_metrics.clone()),
53-
Recon::new(
54-
Arc::clone(&interest_svc),
55-
FullInterests::default(),
56-
recon_metrics.clone(),
57-
),
5853
Recon::new(
5954
Arc::clone(&event_svc),
6055
ReconInterestProvider::new(node_key.id(), interest_svc),

recon/src/libp2p.rs

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ mod upgrade;
2020
pub use crate::protocol::Recon;
2121
pub use stream_set::StreamSet;
2222

23-
use ceramic_core::{EventId, Interest, PeerKey};
23+
use ceramic_core::{EventId, PeerKey};
2424
use futures::{future::BoxFuture, FutureExt};
2525
use libp2p::{
2626
core::ConnectedPoint,
@@ -43,8 +43,6 @@ use crate::{
4343

4444
/// Name of the Recon protocol for synchronizing peers
4545
pub const PROTOCOL_NAME_PEER: &str = "/ceramic/recon/0.1.0/peer";
46-
/// Name of the Recon protocol for synchronizing interests
47-
pub const PROTOCOL_NAME_INTEREST: &str = "/ceramic/recon/0.1.0/interest";
4846
/// Name of the Recon protocol for synchronizing models
4947
pub const PROTOCOL_NAME_MODEL: &str = "/ceramic/recon/0.1.0/model";
5048

@@ -76,9 +74,8 @@ impl Default for Config {
7674
/// The Behavior tracks all peers on the network that speak the Recon protocol.
7775
/// It is responsible for starting and stopping syncs with various peers depending on the needs of
7876
/// the application.
79-
pub struct Behaviour<P, I, M> {
77+
pub struct Behaviour<P, M> {
8078
peer: P,
81-
interest: I,
8279
model: M,
8380
config: Config,
8481
peers: BTreeMap<PeerId, PeerInfo>,
@@ -87,15 +84,13 @@ pub struct Behaviour<P, I, M> {
8784
next_sync: Option<BoxFuture<'static, ()>>,
8885
}
8986

90-
impl<P, I, M> std::fmt::Debug for Behaviour<P, I, M>
87+
impl<P, M> std::fmt::Debug for Behaviour<P, M>
9188
where
9289
P: std::fmt::Debug,
93-
I: std::fmt::Debug,
9490
M: std::fmt::Debug,
9591
{
9692
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
9793
f.debug_struct("Behaviour")
98-
.field("interest", &self.interest)
9994
.field("model", &self.model)
10095
.field("config", &self.config)
10196
.field("peers", &self.peers)
@@ -148,18 +143,16 @@ pub enum PeerStatus {
148143
Stopped,
149144
}
150145

151-
impl<P, I, M> Behaviour<P, I, M> {
146+
impl<P, M> Behaviour<P, M> {
152147
/// Create a new Behavior with the provided Recon implementation.
153-
pub fn new(peer: P, interest: I, model: M, config: Config) -> Self
148+
pub fn new(peer: P, model: M, config: Config) -> Self
154149
where
155150
P: Recon<Key = PeerKey, Hash = Sha256a>,
156-
I: Recon<Key = Interest, Hash = Sha256a>,
157151
M: Recon<Key = EventId, Hash = Sha256a>,
158152
{
159153
let (tx, rx) = tokio::sync::mpsc::channel(1000);
160154
Self {
161155
peer,
162-
interest,
163156
model,
164157
config,
165158
peers: BTreeMap::new(),
@@ -178,13 +171,12 @@ impl<P, I, M> Behaviour<P, I, M> {
178171
}
179172
}
180173

181-
impl<P, I, M> NetworkBehaviour for Behaviour<P, I, M>
174+
impl<P, M> NetworkBehaviour for Behaviour<P, M>
182175
where
183176
P: Recon<Key = PeerKey, Hash = Sha256a>,
184-
I: Recon<Key = Interest, Hash = Sha256a>,
185177
M: Recon<Key = EventId, Hash = Sha256a>,
186178
{
187-
type ConnectionHandler = Handler<P, I, M>;
179+
type ConnectionHandler = Handler<P, M>;
188180

189181
type ToSwarm = Event;
190182

@@ -205,13 +197,8 @@ where
205197
next_sync: BTreeMap::from_iter([
206198
// Schedule all stream_sets initially
207199
(StreamSet::Peer, Instant::now()),
208-
// Schedule interests after peers
209-
(
210-
StreamSet::Interest,
211-
Instant::now() + Duration::from_millis(1),
212-
),
213-
// Schedule models after interests
214-
(StreamSet::Model, Instant::now() + Duration::from_millis(2)),
200+
// Schedule models after peers
201+
(StreamSet::Model, Instant::now() + Duration::from_millis(1)),
215202
]),
216203
sync_delay: Default::default(),
217204
});
@@ -395,7 +382,6 @@ where
395382
connection_id,
396383
handler::State::WaitingInbound,
397384
self.peer.clone(),
398-
self.interest.clone(),
399385
self.model.clone(),
400386
))
401387
}
@@ -416,7 +402,6 @@ where
416402
stream_set: StreamSet::Peer,
417403
},
418404
self.peer.clone(),
419-
self.interest.clone(),
420405
self.model.clone(),
421406
))
422407
}

0 commit comments

Comments
 (0)