From 5895f50af3438a6a89c977916d2596fdf118295a Mon Sep 17 00:00:00 2001 From: Frando Date: Tue, 30 Sep 2025 10:24:10 +0200 Subject: [PATCH] tests: add test about rejoining a document --- src/actor.rs | 1 - tests/sync.rs | 143 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 143 insertions(+), 1 deletion(-) diff --git a/src/actor.rs b/src/actor.rs index 5fe90123..bcb72300 100644 --- a/src/actor.rs +++ b/src/actor.rs @@ -993,7 +993,6 @@ impl OpenReplicas { } hash_map::Entry::Occupied(mut e) => { let state = e.get_mut(); - tracing::debug!("STATE {state:?}"); state.handles = state.handles.wrapping_sub(1); if state.handles == 0 { let _ = e.remove_entry(); diff --git a/tests/sync.rs b/tests/sync.rs index 72d04da7..1dcb2a6d 100644 --- a/tests/sync.rs +++ b/tests/sync.rs @@ -1373,3 +1373,146 @@ fn match_sync_finished(event: &LiveEvent, peer: PublicKey) -> bool { }; e.peer == peer && e.result.is_ok() } + +/// This tests the simplest scenario: A node connects to another node, and performs sync. +#[tokio::test] +// #[traced_test] +async fn sync_rejoin() -> Result<()> { + tracing_subscriber::fmt::try_init().ok(); + let mut rng = test_rng(b"sync_rejoin"); + let nodes = spawn_nodes(2, &mut rng).await?; + let clients = nodes.iter().map(|node| node.client()).collect::>(); + + // create doc on node0 + let peer0 = nodes[0].node_id(); + let author0 = clients[0].docs().author_create().await?; + let doc0 = clients[0].docs().create().await?; + let blobs0 = clients[0].blobs(); + let hash0 = doc0 + .set_bytes(author0, b"k1".to_vec(), b"v1".to_vec()) + .await?; + assert_latest(blobs0, &doc0, b"k1", b"v1").await; + let ticket = doc0 + .share(ShareMode::Write, AddrInfoOptions::RelayAndAddresses) + .await?; + + let mut events0 = doc0.subscribe().await?; + + info!("node1: join"); + let peer1 = nodes[1].node_id(); + let doc1 = clients[1].docs().import(ticket.clone()).await?; + let blobs1 = clients[1].blobs(); + let mut events1 = doc1.subscribe().await?; + info!("node1: assert 5 events"); + assert_next_unordered( + &mut events1, + TIMEOUT, + vec![ + Box::new(move |e| matches!(e, LiveEvent::NeighborUp(peer) if *peer == peer0)), + Box::new(move |e| matches!(e, LiveEvent::InsertRemote { from, .. } if *from == peer0 )), + Box::new(move |e| match_sync_finished(e, peer0)), + Box::new(move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == hash0)), + match_event!(LiveEvent::PendingContentReady), + ], + ) + .await; + assert_latest(blobs1, &doc1, b"k1", b"v1").await; + + info!("node0: assert 3 events"); + assert_next( + &mut events0, + TIMEOUT, + vec![ + Box::new(move |e| matches!(e, LiveEvent::NeighborUp(peer) if *peer == peer1)), + Box::new(move |e| match_sync_finished(e, peer1)), + match_event!(LiveEvent::PendingContentReady), + ], + ) + .await; + + info!("node0: add blob"); + let hash2 = doc0.set_bytes(author0, "k2", "v2").await?; + assert_next( + &mut events0, + TIMEOUT, + vec![Box::new(move |e| { + matches!(e, LiveEvent::InsertLocal { .. }) + })], + ) + .await; + assert_next( + &mut events1, + TIMEOUT, + vec![ + Box::new(move |e| matches!(e, LiveEvent::InsertRemote { from, entry, .. } if *from == peer0 && entry.key() == b"k2")), + Box::new(move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == hash2)), + ], + ) + .await; + + info!("node1: close doc"); + doc1.leave().await?; + drop(events1); + doc1.close().await?; + tokio::time::sleep(Duration::from_secs(1)).await; + + info!("node0: assert neighbor down"); + assert_next( + &mut events0, + TIMEOUT, + vec![Box::new( + move |e| matches!(e, LiveEvent::NeighborDown(peer) if *peer == peer1), + )], + ) + .await; + + info!("node1: reopen doc"); + let doc1 = clients[1].docs().import(ticket.clone()).await?; + let mut events1 = doc1.subscribe().await?; + assert_next( + &mut events1, + TIMEOUT, + vec![ + Box::new(move |e| matches!(e, LiveEvent::NeighborUp(peer) if *peer == peer0)), + Box::new(move |e| match_sync_finished(e, peer0)), + match_event!(LiveEvent::PendingContentReady), + ], + ) + .await; + assert_next( + &mut events0, + TIMEOUT, + vec![ + Box::new(move |e| matches!(e, LiveEvent::NeighborUp(peer) if *peer == peer1)), + Box::new(move |e| match_sync_finished(e, peer1)), + match_event!(LiveEvent::PendingContentReady), + ], + ) + .await; + + info!("node0: add blob"); + let hash3 = doc0.set_bytes(author0, "k3", "v3").await?; + assert_next( + &mut events0, + TIMEOUT, + vec![Box::new(move |e| { + matches!(e, LiveEvent::InsertLocal { .. }) + })], + ) + .await; + info!("node1: confirm live event"); + assert_next( + &mut events1, + TIMEOUT, + vec![ + Box::new(move |e| matches!(e, LiveEvent::InsertRemote { from, entry, .. } if *from == peer0 && entry.key() == b"k3")), + Box::new(move |e| matches!(e, LiveEvent::ContentReady { hash } if *hash == hash3)), + ], + ) + .await; + + for node in nodes { + node.shutdown().await?; + } + Ok(()) +}