Commit 3e5c5aa

Merge branch 'main' into expiring-tags-example
2 parents: 8b31cdd + 9991168

13 files changed: +320 −324 lines

Cargo.lock

Lines changed: 229 additions & 229 deletions
Some generated files are not rendered by default.

Cargo.toml

Lines changed: 5 additions & 11 deletions
```diff
@@ -1,6 +1,6 @@
 [package]
 name = "iroh-blobs"
-version = "0.91.0"
+version = "0.93.0"
 edition = "2021"
 description = "content-addressed blobs for iroh"
 license = "MIT OR Apache-2.0"
@@ -37,12 +37,12 @@ chrono = "0.4.39"
 nested_enum_utils = "0.2.1"
 ref-cast = "1.0.24"
 arrayvec = "0.7.6"
-iroh = "0.90"
+iroh = "0.91"
 self_cell = "1.1.0"
 genawaiter = { version = "0.99.1", features = ["futures03"] }
-iroh-base = "0.90"
+iroh-base = "0.91"
 reflink-copy = "0.1.24"
-irpc = { version = "0.5.0", features = ["rpc", "quinn_endpoint_setup", "message_spans", "stream", "derive"], default-features = false }
+irpc = { version = "0.7.0", features = ["rpc", "quinn_endpoint_setup", "spans", "stream", "derive"], default-features = false }
 iroh-metrics = { version = "0.35" }

 [dev-dependencies]
@@ -58,15 +58,9 @@ testresult = "0.4.1"
 tracing-subscriber = { version = "0.3.19", features = ["fmt"] }
 tracing-test = "0.2.5"
 walkdir = "2.5.0"
-iroh = { version = "0.90", features = ["discovery-local-network"]}
+iroh = { version = "0.91", features = ["discovery-local-network"]}

 [features]
 hide-proto-docs = []
 metrics = []
 default = ["hide-proto-docs"]
-
-[patch.crates-io]
-iroh = { git = "https://github.com/n0-computer/iroh.git", branch = "main" }
-iroh-base = { git = "https://github.com/n0-computer/iroh.git", branch = "main" }
-irpc = { git = "https://github.com/n0-computer/irpc.git", branch = "main" }
-irpc-derive = { git = "https://github.com/n0-computer/irpc.git", branch = "main" }
```

deny.toml

Lines changed: 0 additions & 6 deletions
```diff
@@ -39,9 +39,3 @@ name = "ring"
 [[licenses.clarify.license-files]]
 hash = 3171872035
 path = "LICENSE"
-
-[sources]
-allow-git = [
-    "https://github.com/n0-computer/irpc.git",
-    "https://github.com/n0-computer/iroh.git",
-]
```

examples/random_store.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -241,7 +241,7 @@ async fn provide(args: ProvideArgs) -> anyhow::Result<()> {
     let router = iroh::protocol::Router::builder(endpoint.clone())
         .accept(iroh_blobs::ALPN, blobs)
         .spawn();
-    let addr = router.endpoint().node_addr().initialized().await?;
+    let addr = router.endpoint().node_addr().initialized().await;
     let ticket = NodeTicket::from(addr.clone());
     println!("Node address: {addr:?}");
     println!("ticket:\n{ticket}");
```

src/api.rs

Lines changed: 5 additions & 39 deletions
```diff
@@ -12,11 +12,11 @@
 //!
 //! You can also [`connect`](Store::connect) to a remote store that is listening
 //! to rpc requests.
-use std::{io, net::SocketAddr, ops::Deref, sync::Arc};
+use std::{io, net::SocketAddr, ops::Deref};

 use bao_tree::io::EncodeError;
 use iroh::Endpoint;
-use irpc::rpc::{listen, Handler};
+use irpc::rpc::{listen, RemoteService};
 use n0_snafu::SpanTrace;
 use nested_enum_utils::common_fields;
 use proto::{Request, ShutdownRequest, SyncDbRequest};
@@ -32,7 +32,7 @@ pub mod remote
 pub mod tags;
 pub use crate::{store::util::Tag, util::temp_tag::TempTag};

-pub(crate) type ApiClient = irpc::Client<proto::Command, proto::Request, proto::StoreService>;
+pub(crate) type ApiClient = irpc::Client<proto::Request>;

 #[common_fields({
     backtrace: Option<Backtrace>,
@@ -281,42 +281,8 @@ impl Store {

     /// Listen on a quinn endpoint for incoming rpc connections.
     pub async fn listen(self, endpoint: quinn::Endpoint) {
-        let local = self.client.local().unwrap().clone();
-        let handler: Handler<Request> = Arc::new(move |req, rx, tx| {
-            let local = local.clone();
-            Box::pin({
-                match req {
-                    Request::SetTag(msg) => local.send((msg, tx)),
-                    Request::CreateTag(msg) => local.send((msg, tx)),
-                    Request::DeleteTags(msg) => local.send((msg, tx)),
-                    Request::RenameTag(msg) => local.send((msg, tx)),
-                    Request::ListTags(msg) => local.send((msg, tx)),
-
-                    Request::ListTempTags(msg) => local.send((msg, tx)),
-                    Request::CreateTempTag(msg) => local.send((msg, tx)),
-
-                    Request::BlobStatus(msg) => local.send((msg, tx)),
-
-                    Request::ImportBytes(msg) => local.send((msg, tx)),
-                    Request::ImportByteStream(msg) => local.send((msg, tx, rx)),
-                    Request::ImportBao(msg) => local.send((msg, tx, rx)),
-                    Request::ImportPath(msg) => local.send((msg, tx)),
-                    Request::ListBlobs(msg) => local.send((msg, tx)),
-                    Request::DeleteBlobs(msg) => local.send((msg, tx)),
-                    Request::Batch(msg) => local.send((msg, tx, rx)),
-
-                    Request::ExportBao(msg) => local.send((msg, tx)),
-                    Request::ExportRanges(msg) => local.send((msg, tx)),
-                    Request::ExportPath(msg) => local.send((msg, tx)),
-
-                    Request::Observe(msg) => local.send((msg, tx)),
-
-                    Request::ClearProtected(msg) => local.send((msg, tx)),
-                    Request::SyncDb(msg) => local.send((msg, tx)),
-                    Request::Shutdown(msg) => local.send((msg, tx)),
-                }
-            })
-        });
+        let local = self.client.as_local().unwrap().clone();
+        let handler = Request::remote_handler(local);
         listen::<Request>(endpoint, handler).await
     }
```
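With irpc 0.7 the hand-rolled dispatch closure is replaced by the `RemoteService::remote_handler` implementation that the `#[rpc_requests]` macro derives for `Request`, so the listen side collapses to two lines. A hedged usage sketch; only `Store::listen` is taken from this diff, and obtaining the store and quinn endpoint is assumed:

```rust
use iroh_blobs::api::Store;

// Expose a local store to remote irpc clients over an existing quinn
// endpoint. `listen` consumes this handle and serves requests until
// the endpoint shuts down.
async fn serve(store: Store, endpoint: quinn::Endpoint) {
    store.listen(endpoint).await;
}
```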

src/api/downloader.rs

Lines changed: 8 additions & 13 deletions
```diff
@@ -29,15 +29,10 @@ use crate::{

 #[derive(Debug, Clone)]
 pub struct Downloader {
-    client: irpc::Client<SwarmMsg, SwarmProtocol, DownloaderService>,
+    client: irpc::Client<SwarmProtocol>,
 }

-#[derive(Debug, Clone)]
-pub struct DownloaderService;
-
-impl irpc::Service for DownloaderService {}
-
-#[rpc_requests(DownloaderService, message = SwarmMsg, alias = "Msg")]
+#[rpc_requests(message = SwarmMsg, alias = "Msg")]
 #[derive(Debug, Serialize, Deserialize)]
 enum SwarmProtocol {
     #[rpc(tx = mpsc::Sender<DownloadProgessItem>)]
@@ -711,9 +706,9 @@ mod tests {
         let (r3, store3, _) = node_test_setup_fs(testdir.path().join("c")).await?;
         let tt1 = store1.add_slice("hello world").await?;
         let tt2 = store2.add_slice("hello world 2").await?;
-        let node1_addr = r1.endpoint().node_addr().initialized().await?;
+        let node1_addr = r1.endpoint().node_addr().initialized().await;
         let node1_id = node1_addr.node_id;
-        let node2_addr = r2.endpoint().node_addr().initialized().await?;
+        let node2_addr = r2.endpoint().node_addr().initialized().await;
         let node2_id = node2_addr.node_id;
         let swarm = Downloader::new(&store3, r3.endpoint());
         r3.endpoint().add_node_addr(node1_addr.clone())?;
@@ -750,9 +745,9 @@ mod tests {
                 format: crate::BlobFormat::HashSeq,
             })
             .await?;
-        let node1_addr = r1.endpoint().node_addr().initialized().await?;
+        let node1_addr = r1.endpoint().node_addr().initialized().await;
         let node1_id = node1_addr.node_id;
-        let node2_addr = r2.endpoint().node_addr().initialized().await?;
+        let node2_addr = r2.endpoint().node_addr().initialized().await;
         let node2_id = node2_addr.node_id;
         let swarm = Downloader::new(&store3, r3.endpoint());
         r3.endpoint().add_node_addr(node1_addr.clone())?;
@@ -819,9 +814,9 @@ mod tests {
                 format: crate::BlobFormat::HashSeq,
             })
             .await?;
-        let node1_addr = r1.endpoint().node_addr().initialized().await?;
+        let node1_addr = r1.endpoint().node_addr().initialized().await;
         let node1_id = node1_addr.node_id;
-        let node2_addr = r2.endpoint().node_addr().initialized().await?;
+        let node2_addr = r2.endpoint().node_addr().initialized().await;
         let node2_id = node2_addr.node_id;
         let swarm = Downloader::new(&store3, r3.endpoint());
         r3.endpoint().add_node_addr(node1_addr.clone())?;
```
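The substantive change in this file (mirrored in `src/api/proto.rs` below) is the irpc 0.7 migration: the explicit service marker type is gone, and the protocol enum annotated with `#[rpc_requests]` is now the only type parameter of `irpc::Client`. Distilled for contrast, with the old shape kept in comments; `DownloadRequest` and `DownloadProgessItem` are the crate's own types, and the import paths are assumptions:

```rust
use irpc::{channel::mpsc, rpc_requests};
use serde::{Deserialize, Serialize};

// Before (irpc 0.5): a marker type tied message, protocol, and service together.
//   pub struct DownloaderService;
//   impl irpc::Service for DownloaderService {}
//   #[rpc_requests(DownloaderService, message = SwarmMsg, alias = "Msg")]
//   ... client: irpc::Client<SwarmMsg, SwarmProtocol, DownloaderService>

// After (irpc 0.7): the request enum alone identifies the service.
#[rpc_requests(message = SwarmMsg, alias = "Msg")]
#[derive(Debug, Serialize, Deserialize)]
enum SwarmProtocol {
    #[rpc(tx = mpsc::Sender<DownloadProgessItem>)]
    Download(DownloadRequest),
}

#[derive(Debug, Clone)]
pub struct Downloader {
    client: irpc::Client<SwarmProtocol>,
}
```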

src/api/proto.rs

Lines changed: 1 addition & 5 deletions
```diff
@@ -86,11 +86,7 @@ impl HashSpecific for CreateTagMsg {
     }
 }

-#[derive(Debug, Clone)]
-pub struct StoreService;
-impl irpc::Service for StoreService {}
-
-#[rpc_requests(StoreService, message = Command, alias = "Msg")]
+#[rpc_requests(message = Command, alias = "Msg")]
 #[derive(Debug, Serialize, Deserialize)]
 pub enum Request {
     #[rpc(tx = mpsc::Sender<super::Result<Hash>>)]
```

src/net_protocol.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -99,7 +99,7 @@ impl BlobsProtocol {
     /// just a convenience method to create a ticket from content and the address of this node.
     pub async fn ticket(&self, content: impl Into<HashAndFormat>) -> anyhow::Result<BlobTicket> {
         let content = content.into();
-        let addr = self.inner.endpoint.node_addr().initialized().await?;
+        let addr = self.inner.endpoint.node_addr().initialized().await;
         let ticket = BlobTicket::new(addr, content.hash, content.format);
         Ok(ticket)
     }
```
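For orientation, a hedged sketch of calling this convenience method, assuming `blobs` is a running `BlobsProtocol`, `hash` refers to content already in the store, and a bare `Hash` converts to a raw-format `HashAndFormat`:

```rust
use iroh_blobs::{net_protocol::BlobsProtocol, Hash};

// Mint a shareable ticket for content this node already provides; the
// node address is awaited without `?`, as per the change above.
async fn share(blobs: &BlobsProtocol, hash: Hash) -> anyhow::Result<()> {
    let ticket = blobs.ticket(hash).await?;
    println!("share this: {ticket}");
    Ok(())
}
```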

src/store/fs.rs

Lines changed: 1 addition & 1 deletion
```diff
@@ -1259,7 +1259,7 @@ impl AsRef<Store> for FsStore {

 impl FsStore {
     fn new(
-        sender: irpc::LocalSender<proto::Command, proto::StoreService>,
+        sender: irpc::LocalSender<proto::Request>,
         db: tokio::sync::mpsc::Sender<InternalCommand>,
     ) -> Self {
         Self {
```

src/store/fs/gc.rs

Lines changed: 56 additions & 4 deletions
```diff
@@ -1,9 +1,9 @@
-use std::collections::HashSet;
+use std::{collections::HashSet, pin::Pin, sync::Arc};

 use bao_tree::ChunkRanges;
 use genawaiter::sync::{Co, Gen};
 use n0_future::{Stream, StreamExt};
-use tracing::{debug, error, warn};
+use tracing::{debug, error, info, warn};

 use crate::{api::Store, Hash, HashAndFormat};

@@ -130,14 +130,52 @@ fn gc_sweep<'a>(
     })
 }

-#[derive(Debug, Clone)]
+/// Configuration for garbage collection.
+#[derive(derive_more::Debug, Clone)]
 pub struct GcConfig {
+    /// Interval in which to run garbage collection.
     pub interval: std::time::Duration,
+    /// Optional callback to manually add protected blobs.
+    ///
+    /// The callback is called before each garbage collection run. It gets a `&mut HashSet<Hash>`
+    /// and returns a future that returns [`ProtectOutcome`]. All hashes that are added to the
+    /// [`HashSet`] will be protected from garbage collection during this run.
+    ///
+    /// In normal operation, return [`ProtectOutcome::Continue`] from the callback. If you return
+    /// [`ProtectOutcome::Abort`], the garbage collection run will be aborted. Use this if your
+    /// source of hashes to protect returned an error, and thus garbage collection should be skipped
+    /// completely to not unintentionally delete blobs that should be protected.
+    #[debug("ProtectCallback")]
+    pub add_protected: Option<ProtectCb>,
 }

+/// Returned from [`ProtectCb`].
+///
+/// See [`GcConfig::add_protected`] for details.
+#[derive(Debug)]
+pub enum ProtectOutcome {
+    /// Continue with the garbage collection run.
+    Continue,
+    /// Abort the garbage collection run.
+    Abort,
+}
+
+/// The type of the garbage collection callback.
+///
+/// See [`GcConfig::add_protected`] for details.
+pub type ProtectCb = Arc<
+    dyn for<'a> Fn(
+            &'a mut HashSet<Hash>,
+        )
+            -> Pin<Box<dyn std::future::Future<Output = ProtectOutcome> + Send + Sync + 'a>>
+        + Send
+        + Sync
+        + 'static,
+>;
+
 pub async fn gc_run_once(store: &Store, live: &mut HashSet<Hash>) -> crate::api::Result<()> {
+    debug!(externally_protected = live.len(), "gc: start");
     {
-        live.clear();
         store.clear_protected().await?;
         let mut stream = gc_mark(store, live);
         while let Some(ev) = stream.next().await {
@@ -155,6 +193,7 @@ pub async fn gc_run_once(store: &Store, live: &mut HashSet<Hash>) -> crate::api:
             }
         }
     }
+    debug!(total_protected = live.len(), "gc: sweep");
     {
         let mut stream = gc_sweep(store, live);
         while let Some(ev) = stream.next().await {
@@ -172,14 +211,26 @@ pub async fn gc_run_once(store: &Store, live: &mut HashSet<Hash>) -> crate::api:
             }
         }
     }
+    debug!("gc: done");

     Ok(())
 }

 pub async fn run_gc(store: Store, config: GcConfig) {
+    debug!("gc enabled with interval {:?}", config.interval);
     let mut live = HashSet::new();
     loop {
+        live.clear();
         tokio::time::sleep(config.interval).await;
+        if let Some(ref cb) = config.add_protected {
+            match (cb)(&mut live).await {
+                ProtectOutcome::Continue => {}
+                ProtectOutcome::Abort => {
+                    info!("abort gc run: protect callback indicated abort");
+                    continue;
+                }
+            }
+        }
         if let Err(e) = gc_run_once(&store, &mut live).await {
             error!("error during gc run: {e}");
             break;
@@ -288,6 +339,7 @@ mod tests {
             assert!(!data_path.exists());
             assert!(!outboard_path.exists());
         }
+        live.clear();
         // create a large partial file and check that the data and outboard file as well as
         // the sizes and bitfield files are deleted by gc
         {
```
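The new `add_protected` hook is what the expiring-tags example on this branch builds on: it lets an application pin hashes that are referenced outside the store's own tags, and abort a run entirely when the pin source fails. A construction sketch; the import path for `GcConfig`, `ProtectCb`, and `ProtectOutcome` and the `Mutex`-backed pin set are illustrative assumptions:

```rust
use std::{
    collections::HashSet,
    sync::{Arc, Mutex},
    time::Duration,
};

// Illustrative path; adjust to wherever your iroh-blobs version
// exports these items.
use iroh_blobs::{
    store::fs::gc::{GcConfig, ProtectCb, ProtectOutcome},
    Hash,
};

/// Build a GcConfig that protects an application-managed pin set on
/// every run, and skips the run if the pin source is unavailable.
fn gc_config(pins: Arc<Mutex<HashSet<Hash>>>) -> GcConfig {
    let add_protected: ProtectCb = Arc::new(move |live| {
        let pins = pins.clone();
        Box::pin(async move {
            match pins.lock() {
                Ok(guard) => {
                    // Everything in the pin set survives this gc run.
                    live.extend(guard.iter().cloned());
                    ProtectOutcome::Continue
                }
                // Poisoned lock: abort the whole run rather than risk
                // deleting blobs we failed to protect.
                Err(_) => ProtectOutcome::Abort,
            }
        })
    });
    GcConfig {
        interval: Duration::from_secs(300),
        add_protected: Some(add_protected),
    }
}
```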
