Skip to content

Commit f891633

Browse files
committed
Merge branch 'entity-manager' into lightweight-watcher
2 parents 591769f + 91af79e commit f891633

File tree

14 files changed

+497
-327
lines changed

14 files changed

+497
-327
lines changed

Cargo.lock

Lines changed: 219 additions & 232 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "iroh-blobs"
3-
version = "0.91.0"
3+
version = "0.93.0"
44
edition = "2021"
55
description = "content-addressed blobs for iroh"
66
license = "MIT OR Apache-2.0"
@@ -37,12 +37,12 @@ chrono = "0.4.39"
3737
nested_enum_utils = "0.2.1"
3838
ref-cast = "1.0.24"
3939
arrayvec = "0.7.6"
40-
iroh = "0.90"
40+
iroh = "0.91"
4141
self_cell = "1.1.0"
4242
genawaiter = { version = "0.99.1", features = ["futures03"] }
43-
iroh-base = "0.90"
43+
iroh-base = "0.91"
4444
reflink-copy = "0.1.24"
45-
irpc = { version = "0.5.0", features = ["rpc", "quinn_endpoint_setup", "message_spans", "stream", "derive"], default-features = false }
45+
irpc = { version = "0.7.0", features = ["rpc", "quinn_endpoint_setup", "spans", "stream", "derive"], default-features = false }
4646
iroh-metrics = { version = "0.35" }
4747
atomic_refcell = "0.1.13"
4848

@@ -60,15 +60,9 @@ tracing-subscriber = { version = "0.3.19", features = ["fmt"] }
6060
tracing-test = "0.2.5"
6161
walkdir = "2.5.0"
6262
atomic_refcell = "0.1.13"
63-
iroh = { version = "0.90", features = ["discovery-local-network"]}
63+
iroh = { version = "0.91", features = ["discovery-local-network"]}
6464

6565
[features]
6666
hide-proto-docs = []
6767
metrics = []
6868
default = ["hide-proto-docs"]
69-
70-
[patch.crates-io]
71-
iroh = { git = "https://github.com/n0-computer/iroh.git", branch = "main" }
72-
iroh-base = { git = "https://github.com/n0-computer/iroh.git", branch = "main" }
73-
irpc = { git = "https://github.com/n0-computer/irpc.git", branch = "main" }
74-
irpc-derive = { git = "https://github.com/n0-computer/irpc.git", branch = "main" }

deny.toml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,3 @@ name = "ring"
3939
[[licenses.clarify.license-files]]
4040
hash = 3171872035
4141
path = "LICENSE"
42-
43-
[sources]
44-
allow-git = [
45-
"https://github.com/n0-computer/irpc.git",
46-
"https://github.com/n0-computer/iroh.git",
47-
]

examples/expiring-tags.rs

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
//! This example shows how to create tags that expire after a certain time.
2+
//!
3+
//! We use a prefix so we can distinguish between expiring and normal tags, and
4+
//! then encode the expiry date in the tag name after the prefix, in a format
5+
//! that sorts in the same order as the expiry date.
6+
//!
7+
//! The example creates a number of blobs and protects them directly or indirectly
8+
//! with expiring tags. Watch as the expired tags are deleted and the blobs
9+
//! are removed from the store.
10+
use std::{
11+
ops::Deref,
12+
time::{Duration, SystemTime},
13+
};
14+
15+
use chrono::Utc;
16+
use futures_lite::StreamExt;
17+
use iroh_blobs::{
18+
api::{blobs::AddBytesOptions, Store, Tag},
19+
hashseq::HashSeq,
20+
store::fs::options::{BatchOptions, GcConfig, InlineOptions, Options, PathOptions},
21+
BlobFormat, Hash,
22+
};
23+
use tokio::signal::ctrl_c;
24+
25+
/// Using an iroh rpc client, create a tag that is marked to expire at `expiry` for all the given hashes.
26+
///
27+
/// The tag name will be `prefix`- followed by the expiry date in iso8601 format (e.g. `expiry-2025-01-01T12:00:00Z`).
28+
async fn create_expiring_tag(
29+
store: &Store,
30+
hashes: &[Hash],
31+
prefix: &str,
32+
expiry: SystemTime,
33+
) -> anyhow::Result<()> {
34+
let expiry = chrono::DateTime::<chrono::Utc>::from(expiry);
35+
let expiry = expiry.to_rfc3339_opts(chrono::SecondsFormat::Secs, true);
36+
let tagname = format!("{prefix}-{expiry}");
37+
if hashes.is_empty() {
38+
return Ok(());
39+
} else if hashes.len() == 1 {
40+
let hash = hashes[0];
41+
store.tags().set(&tagname, hash).await?;
42+
} else {
43+
let hs = hashes.iter().copied().collect::<HashSeq>();
44+
store
45+
.add_bytes_with_opts(AddBytesOptions {
46+
data: hs.into(),
47+
format: BlobFormat::HashSeq,
48+
})
49+
.with_named_tag(&tagname)
50+
.await?;
51+
};
52+
println!("Created tag {tagname}");
53+
Ok(())
54+
}
55+
56+
async fn delete_expired_tags(blobs: &Store, prefix: &str, bulk: bool) -> anyhow::Result<()> {
57+
let prefix = format!("{prefix}-");
58+
let now = chrono::Utc::now();
59+
let end = format!(
60+
"{}-{}",
61+
prefix,
62+
now.to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
63+
);
64+
if bulk {
65+
// delete all tags with the prefix and an expiry date before now
66+
//
67+
// this should be very efficient, since it is just a single database operation
68+
blobs
69+
.tags()
70+
.delete_range(Tag::from(prefix.clone())..Tag::from(end))
71+
.await?;
72+
} else {
73+
// find tags to delete one by one and then delete them
74+
//
75+
// this allows us to print the tags before deleting them
76+
let mut tags = blobs.tags().list().await?;
77+
let mut to_delete = Vec::new();
78+
while let Some(tag) = tags.next().await {
79+
let tag = tag?.name;
80+
if let Some(rest) = tag.0.strip_prefix(prefix.as_bytes()) {
81+
let Ok(expiry) = std::str::from_utf8(rest) else {
82+
tracing::warn!("Tag {} does have non utf8 expiry", tag);
83+
continue;
84+
};
85+
let Ok(expiry) = chrono::DateTime::parse_from_rfc3339(expiry) else {
86+
tracing::warn!("Tag {} does have invalid expiry date", tag);
87+
continue;
88+
};
89+
let expiry = expiry.with_timezone(&Utc);
90+
if expiry < now {
91+
to_delete.push(tag);
92+
}
93+
}
94+
}
95+
for tag in to_delete {
96+
println!("Deleting expired tag {tag}\n");
97+
blobs.tags().delete(tag).await?;
98+
}
99+
}
100+
Ok(())
101+
}
102+
103+
async fn print_store_info(store: &Store) -> anyhow::Result<()> {
104+
let now = chrono::Utc::now();
105+
let mut tags = store.tags().list().await?;
106+
println!(
107+
"Current time: {}",
108+
now.to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
109+
);
110+
println!("Tags:");
111+
while let Some(tag) = tags.next().await {
112+
let tag = tag?;
113+
println!(" {tag:?}");
114+
}
115+
let mut blobs = store.list().stream().await?;
116+
println!("Blobs:");
117+
while let Some(item) = blobs.next().await {
118+
println!(" {}", item?);
119+
}
120+
println!();
121+
Ok(())
122+
}
123+
124+
async fn info_task(store: Store) -> anyhow::Result<()> {
125+
tokio::time::sleep(Duration::from_secs(1)).await;
126+
loop {
127+
print_store_info(&store).await?;
128+
tokio::time::sleep(Duration::from_secs(5)).await;
129+
}
130+
}
131+
132+
async fn delete_expired_tags_task(store: Store, prefix: &str) -> anyhow::Result<()> {
133+
loop {
134+
delete_expired_tags(&store, prefix, false).await?;
135+
tokio::time::sleep(Duration::from_secs(5)).await;
136+
}
137+
}
138+
139+
#[tokio::main]
140+
async fn main() -> anyhow::Result<()> {
141+
tracing_subscriber::fmt::init();
142+
let path = std::env::current_dir()?.join("blobs");
143+
let options = Options {
144+
path: PathOptions::new(&path),
145+
gc: Some(GcConfig {
146+
add_protected: None,
147+
interval: Duration::from_secs(10),
148+
}),
149+
inline: InlineOptions::default(),
150+
batch: BatchOptions::default(),
151+
};
152+
let store =
153+
iroh_blobs::store::fs::FsStore::load_with_opts(path.join("blobs.db"), options).await?;
154+
155+
// setup: add some data and tag it
156+
{
157+
// add several blobs and tag them with an expiry date 10 seconds in the future
158+
let batch = store.batch().await?;
159+
let a = batch.add_bytes("blob 1".as_bytes()).await?;
160+
let b = batch.add_bytes("blob 2".as_bytes()).await?;
161+
162+
let expires_at = SystemTime::now()
163+
.checked_add(Duration::from_secs(10))
164+
.unwrap();
165+
create_expiring_tag(&store, &[*a.hash(), *b.hash()], "expiring", expires_at).await?;
166+
167+
// add a single blob and tag it with an expiry date 60 seconds in the future
168+
let c = batch.add_bytes("blob 3".as_bytes()).await?;
169+
let expires_at = SystemTime::now()
170+
.checked_add(Duration::from_secs(60))
171+
.unwrap();
172+
create_expiring_tag(&store, &[*c.hash()], "expiring", expires_at).await?;
173+
// batch goes out of scope, so data is only protected by the tags we created
174+
}
175+
176+
// delete expired tags every 5 seconds
177+
let delete_task = tokio::spawn(delete_expired_tags_task(store.deref().clone(), "expiring"));
178+
// print all tags and blobs every 5 seconds
179+
let info_task = tokio::spawn(info_task(store.deref().clone()));
180+
181+
ctrl_c().await?;
182+
delete_task.abort();
183+
info_task.abort();
184+
store.shutdown().await?;
185+
Ok(())
186+
}

examples/random_store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ async fn provide(args: ProvideArgs) -> anyhow::Result<()> {
241241
let router = iroh::protocol::Router::builder(endpoint.clone())
242242
.accept(iroh_blobs::ALPN, blobs)
243243
.spawn();
244-
let addr = router.endpoint().node_addr().initialized().await?;
244+
let addr = router.endpoint().node_addr().initialized().await;
245245
let ticket = NodeTicket::from(addr.clone());
246246
println!("Node address: {addr:?}");
247247
println!("ticket:\n{ticket}");

src/api.rs

Lines changed: 5 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@
1212
//!
1313
//! You can also [`connect`](Store::connect) to a remote store that is listening
1414
//! to rpc requests.
15-
use std::{io, net::SocketAddr, ops::Deref, sync::Arc};
15+
use std::{io, net::SocketAddr, ops::Deref};
1616

1717
use bao_tree::io::EncodeError;
1818
use iroh::Endpoint;
19-
use irpc::rpc::{listen, Handler};
19+
use irpc::rpc::{listen, RemoteService};
2020
use n0_snafu::SpanTrace;
2121
use nested_enum_utils::common_fields;
2222
use proto::{Request, ShutdownRequest, SyncDbRequest};
@@ -32,7 +32,7 @@ pub mod remote;
3232
pub mod tags;
3333
pub use crate::{store::util::Tag, util::temp_tag::TempTag};
3434

35-
pub(crate) type ApiClient = irpc::Client<proto::Command, proto::Request, proto::StoreService>;
35+
pub(crate) type ApiClient = irpc::Client<proto::Request>;
3636

3737
#[common_fields({
3838
backtrace: Option<Backtrace>,
@@ -281,42 +281,8 @@ impl Store {
281281

282282
/// Listen on a quinn endpoint for incoming rpc connections.
283283
pub async fn listen(self, endpoint: quinn::Endpoint) {
284-
let local = self.client.local().unwrap().clone();
285-
let handler: Handler<Request> = Arc::new(move |req, rx, tx| {
286-
let local = local.clone();
287-
Box::pin({
288-
match req {
289-
Request::SetTag(msg) => local.send((msg, tx)),
290-
Request::CreateTag(msg) => local.send((msg, tx)),
291-
Request::DeleteTags(msg) => local.send((msg, tx)),
292-
Request::RenameTag(msg) => local.send((msg, tx)),
293-
Request::ListTags(msg) => local.send((msg, tx)),
294-
295-
Request::ListTempTags(msg) => local.send((msg, tx)),
296-
Request::CreateTempTag(msg) => local.send((msg, tx)),
297-
298-
Request::BlobStatus(msg) => local.send((msg, tx)),
299-
300-
Request::ImportBytes(msg) => local.send((msg, tx)),
301-
Request::ImportByteStream(msg) => local.send((msg, tx, rx)),
302-
Request::ImportBao(msg) => local.send((msg, tx, rx)),
303-
Request::ImportPath(msg) => local.send((msg, tx)),
304-
Request::ListBlobs(msg) => local.send((msg, tx)),
305-
Request::DeleteBlobs(msg) => local.send((msg, tx)),
306-
Request::Batch(msg) => local.send((msg, tx, rx)),
307-
308-
Request::ExportBao(msg) => local.send((msg, tx)),
309-
Request::ExportRanges(msg) => local.send((msg, tx)),
310-
Request::ExportPath(msg) => local.send((msg, tx)),
311-
312-
Request::Observe(msg) => local.send((msg, tx)),
313-
314-
Request::ClearProtected(msg) => local.send((msg, tx)),
315-
Request::SyncDb(msg) => local.send((msg, tx)),
316-
Request::Shutdown(msg) => local.send((msg, tx)),
317-
}
318-
})
319-
});
284+
let local = self.client.as_local().unwrap().clone();
285+
let handler = Request::remote_handler(local);
320286
listen::<Request>(endpoint, handler).await
321287
}
322288

src/api/downloader.rs

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,10 @@ use crate::{
2929

3030
#[derive(Debug, Clone)]
3131
pub struct Downloader {
32-
client: irpc::Client<SwarmMsg, SwarmProtocol, DownloaderService>,
32+
client: irpc::Client<SwarmProtocol>,
3333
}
3434

35-
#[derive(Debug, Clone)]
36-
pub struct DownloaderService;
37-
38-
impl irpc::Service for DownloaderService {}
39-
40-
#[rpc_requests(DownloaderService, message = SwarmMsg, alias = "Msg")]
35+
#[rpc_requests(message = SwarmMsg, alias = "Msg")]
4136
#[derive(Debug, Serialize, Deserialize)]
4237
enum SwarmProtocol {
4338
#[rpc(tx = mpsc::Sender<DownloadProgessItem>)]
@@ -711,9 +706,9 @@ mod tests {
711706
let (r3, store3, _) = node_test_setup_fs(testdir.path().join("c")).await?;
712707
let tt1 = store1.add_slice("hello world").await?;
713708
let tt2 = store2.add_slice("hello world 2").await?;
714-
let node1_addr = r1.endpoint().node_addr().initialized().await?;
709+
let node1_addr = r1.endpoint().node_addr().initialized().await;
715710
let node1_id = node1_addr.node_id;
716-
let node2_addr = r2.endpoint().node_addr().initialized().await?;
711+
let node2_addr = r2.endpoint().node_addr().initialized().await;
717712
let node2_id = node2_addr.node_id;
718713
let swarm = Downloader::new(&store3, r3.endpoint());
719714
r3.endpoint().add_node_addr(node1_addr.clone())?;
@@ -750,9 +745,9 @@ mod tests {
750745
format: crate::BlobFormat::HashSeq,
751746
})
752747
.await?;
753-
let node1_addr = r1.endpoint().node_addr().initialized().await?;
748+
let node1_addr = r1.endpoint().node_addr().initialized().await;
754749
let node1_id = node1_addr.node_id;
755-
let node2_addr = r2.endpoint().node_addr().initialized().await?;
750+
let node2_addr = r2.endpoint().node_addr().initialized().await;
756751
let node2_id = node2_addr.node_id;
757752
let swarm = Downloader::new(&store3, r3.endpoint());
758753
r3.endpoint().add_node_addr(node1_addr.clone())?;
@@ -819,9 +814,9 @@ mod tests {
819814
format: crate::BlobFormat::HashSeq,
820815
})
821816
.await?;
822-
let node1_addr = r1.endpoint().node_addr().initialized().await?;
817+
let node1_addr = r1.endpoint().node_addr().initialized().await;
823818
let node1_id = node1_addr.node_id;
824-
let node2_addr = r2.endpoint().node_addr().initialized().await?;
819+
let node2_addr = r2.endpoint().node_addr().initialized().await;
825820
let node2_id = node2_addr.node_id;
826821
let swarm = Downloader::new(&store3, r3.endpoint());
827822
r3.endpoint().add_node_addr(node1_addr.clone())?;

src/api/proto.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,7 @@ impl HashSpecific for CreateTagMsg {
8686
}
8787
}
8888

89-
#[derive(Debug, Clone)]
90-
pub struct StoreService;
91-
impl irpc::Service for StoreService {}
92-
93-
#[rpc_requests(StoreService, message = Command, alias = "Msg")]
89+
#[rpc_requests(message = Command, alias = "Msg")]
9490
#[derive(Debug, Serialize, Deserialize)]
9591
pub enum Request {
9692
#[rpc(tx = mpsc::Sender<super::Result<Hash>>)]

src/net_protocol.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ impl BlobsProtocol {
9999
/// just a convenience method to create a ticket from content and the address of this node.
100100
pub async fn ticket(&self, content: impl Into<HashAndFormat>) -> anyhow::Result<BlobTicket> {
101101
let content = content.into();
102-
let addr = self.inner.endpoint.node_addr().initialized().await?;
102+
let addr = self.inner.endpoint.node_addr().initialized().await;
103103
let ticket = BlobTicket::new(addr, content.hash, content.format);
104104
Ok(ticket)
105105
}

src/store/fs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1266,7 +1266,7 @@ impl AsRef<Store> for FsStore {
12661266

12671267
impl FsStore {
12681268
fn new(
1269-
sender: irpc::LocalSender<proto::Command, proto::StoreService>,
1269+
sender: irpc::LocalSender<proto::Request>,
12701270
db: tokio::sync::mpsc::Sender<InternalCommand>,
12711271
) -> Self {
12721272
Self {

0 commit comments

Comments
 (0)