Skip to content

Commit 4b18802

Browse files
committed
Move the utilities for dealing with item/error/done into utils.
1 parent 7c05401 commit 4b18802

File tree

11 files changed

+369
-105
lines changed

11 files changed

+369
-105
lines changed

examples/expiring-tags.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ async fn delete_expired_tags(blobs: &Store, prefix: &str, bulk: bool) -> anyhow:
7373
// find tags to delete one by one and then delete them
7474
//
7575
// this allows us to print the tags before deleting them
76-
let mut tags = blobs.tags().list().await?;
76+
let mut tags = blobs.tags().list().stream();
7777
let mut to_delete = Vec::new();
7878
while let Some(tag) = tags.next().await {
7979
let tag = tag?.name;
@@ -102,7 +102,7 @@ async fn delete_expired_tags(blobs: &Store, prefix: &str, bulk: bool) -> anyhow:
102102

103103
async fn print_store_info(store: &Store) -> anyhow::Result<()> {
104104
let now = chrono::Utc::now();
105-
let mut tags = store.tags().list().await?;
105+
let mut tags = store.tags().list().stream();
106106
println!(
107107
"Current time: {}",
108108
now.to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
@@ -112,7 +112,7 @@ async fn print_store_info(store: &Store) -> anyhow::Result<()> {
112112
let tag = tag?;
113113
println!(" {tag:?}");
114114
}
115-
let mut blobs = store.list().stream().await?;
115+
let mut blobs = store.list().stream();
116116
println!("Blobs:");
117117
while let Some(item) = blobs.next().await {
118118
println!(" {}", item?);

src/api/blobs.rs

Lines changed: 31 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,10 @@ use crate::{
5959
api::proto::{BatchRequest, ImportByteStreamUpdate, ListBlobsItem},
6060
provider::StreamContext,
6161
store::IROH_BLOCK_SIZE,
62-
util::temp_tag::TempTag,
62+
util::{
63+
irpc::{IrpcReceiverFutExt, IrpcStreamItem},
64+
temp_tag::TempTag,
65+
},
6366
BlobFormat, Hash, HashAndFormat,
6467
};
6568

@@ -847,43 +850,36 @@ impl BlobsListProgress {
847850
}
848851
}
849852

850-
pub async fn hashes(self) -> RequestResult<Vec<Hash>> {
851-
let mut rx = self.inner.await?;
852-
let mut hashes = Vec::new();
853-
loop {
854-
match rx.recv().await? {
855-
Some(ListBlobsItem::Item(hash)) => hashes.push(hash),
856-
Some(ListBlobsItem::Error(cause)) => return Err(cause.into()),
857-
Some(ListBlobsItem::Done) => break,
858-
None => return Err(super::Error::other("unexpected end of stream").into()),
859-
}
853+
pub async fn hashes(self) -> super::Result<Vec<Hash>> {
854+
self.inner.try_collect().await
855+
}
856+
857+
pub fn stream(self) -> impl Stream<Item = super::Result<Hash>> {
858+
self.inner.into_stream()
859+
}
860+
}
861+
862+
impl IrpcStreamItem for ListBlobsItem {
863+
type Error = super::Error;
864+
type Item = Hash;
865+
866+
fn into_result_opt(self) -> Option<Result<Hash, super::Error>> {
867+
match self {
868+
Self::Item(hash) => Some(Ok(hash)),
869+
Self::Error(e) => Some(Err(e.into())),
870+
Self::Done => None,
860871
}
861-
Ok(hashes)
862872
}
863873

864-
pub async fn stream(self) -> irpc::Result<impl Stream<Item = super::Result<Hash>>> {
865-
let mut rx = self.inner.await?;
866-
Ok(Gen::new(|co| async move {
867-
loop {
868-
match rx.recv().await {
869-
Ok(Some(ListBlobsItem::Item(hash))) => co.yield_(Ok(hash)).await,
870-
Ok(Some(ListBlobsItem::Error(cause))) => {
871-
co.yield_(Err(cause)).await;
872-
break;
873-
}
874-
Ok(Some(ListBlobsItem::Done)) => break,
875-
Ok(None) => {
876-
co.yield_(Err(super::Error::other("unexpected end of stream").into()))
877-
.await;
878-
break;
879-
}
880-
Err(cause) => {
881-
co.yield_(Err(cause.into())).await;
882-
break;
883-
}
884-
}
885-
}
886-
}))
874+
fn from_result(item: std::result::Result<Hash, super::Error>) -> Self {
875+
match item {
876+
Ok(hash) => Self::Item(hash),
877+
Err(e) => Self::Error(e.into()),
878+
}
879+
}
880+
881+
fn done() -> Self {
882+
Self::Done
887883
}
888884
}
889885

src/api/proto.rs

Lines changed: 114 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
//! the much simpler memory store.
1616
use std::{
1717
fmt::{self, Debug},
18+
future::{Future, IntoFuture},
1819
io,
1920
num::NonZeroU64,
2021
ops::{Bound, RangeBounds},
@@ -28,17 +29,25 @@ use bao_tree::{
2829
ChunkRanges,
2930
};
3031
use bytes::Bytes;
32+
use genawaiter::sync::Gen;
3133
use irpc::{
3234
channel::{mpsc, oneshot},
3335
rpc_requests,
3436
};
35-
use n0_future::Stream;
37+
use n0_future::{future, Stream};
3638
use range_collections::RangeSet2;
3739
use serde::{Deserialize, Serialize};
3840
pub(crate) mod bitfield;
3941
pub use bitfield::Bitfield;
4042

41-
use crate::{store::util::Tag, util::temp_tag::TempTag, BlobFormat, Hash, HashAndFormat};
43+
use crate::{
44+
store::util::Tag,
45+
util::{
46+
irpc::{IrpcReceiverFutExt, IrpcStreamItem},
47+
temp_tag::TempTag,
48+
},
49+
BlobFormat, Hash, HashAndFormat,
50+
};
4251

4352
pub(crate) trait HashSpecific {
4453
fn hash(&self) -> Hash;
@@ -113,7 +122,7 @@ pub enum Request {
113122
ImportPath(ImportPathRequest),
114123
#[rpc(tx = mpsc::Sender<ExportProgressItem>)]
115124
ExportPath(ExportPathRequest),
116-
#[rpc(tx = oneshot::Sender<Vec<super::Result<TagInfo>>>)]
125+
#[rpc(tx = mpsc::Sender<ListTagsItem>)]
117126
ListTags(ListTagsRequest),
118127
#[rpc(tx = oneshot::Sender<super::Result<()>>)]
119128
SetTag(SetTagRequest),
@@ -354,8 +363,110 @@ pub struct TagInfo {
354363
#[derive(Debug, Serialize, Deserialize)]
355364
pub enum ListBlobsItem {
356365
Item(Hash),
366+
Error(super::Error),
357367
Done,
368+
}
369+
370+
#[derive(Debug, Serialize, Deserialize)]
371+
pub enum ListTagsItem {
372+
Item(TagInfo),
358373
Error(super::Error),
374+
Done,
375+
}
376+
377+
impl From<std::result::Result<TagInfo, super::Error>> for ListTagsItem {
378+
fn from(item: std::result::Result<TagInfo, super::Error>) -> Self {
379+
match item {
380+
Ok(item) => ListTagsItem::Item(item),
381+
Err(err) => ListTagsItem::Error(err),
382+
}
383+
}
384+
}
385+
386+
impl IrpcStreamItem for ListTagsItem {
387+
type Error = super::Error;
388+
type Item = TagInfo;
389+
390+
fn into_result_opt(self) -> Option<Result<TagInfo, super::Error>> {
391+
match self {
392+
ListTagsItem::Item(item) => Some(Ok(item)),
393+
ListTagsItem::Done => None,
394+
ListTagsItem::Error(err) => Some(Err(err)),
395+
}
396+
}
397+
398+
fn from_result(item: std::result::Result<TagInfo, super::Error>) -> Self {
399+
match item {
400+
Ok(i) => Self::Item(i),
401+
Err(e) => Self::Error(e.into()),
402+
}
403+
}
404+
405+
fn done() -> Self {
406+
Self::Done
407+
}
408+
}
409+
410+
pub struct ListTagsProgress {
411+
inner: future::Boxed<irpc::Result<mpsc::Receiver<ListTagsItem>>>,
412+
}
413+
414+
impl IntoFuture for ListTagsProgress {
415+
fn into_future(self) -> Self::IntoFuture {
416+
Box::pin(self.inner.try_collect())
417+
}
418+
419+
type IntoFuture = future::Boxed<Self::Output>;
420+
421+
type Output = super::Result<Vec<TagInfo>>;
422+
}
423+
424+
impl ListTagsProgress {
425+
pub(super) fn new(
426+
fut: impl Future<Output = irpc::Result<mpsc::Receiver<ListTagsItem>>> + Send + 'static,
427+
) -> Self {
428+
Self {
429+
inner: Box::pin(fut),
430+
}
431+
}
432+
433+
pub fn stream(self) -> impl Stream<Item = super::Result<TagInfo>> {
434+
Gen::new(|co| async move {
435+
let mut rx = match self.inner.await {
436+
Ok(rx) => rx,
437+
Err(err) => {
438+
co.yield_(Err(super::Error::from(err))).await;
439+
return;
440+
}
441+
};
442+
loop {
443+
match rx.recv().await {
444+
Ok(Some(ListTagsItem::Item(item))) => {
445+
co.yield_(Ok(item)).await;
446+
}
447+
Ok(Some(ListTagsItem::Done)) => {
448+
break;
449+
}
450+
Ok(Some(ListTagsItem::Error(err))) => {
451+
co.yield_(Err(err.into())).await;
452+
break;
453+
}
454+
Ok(None) => {
455+
co.yield_(Err(super::Error::Io(io::Error::new(
456+
io::ErrorKind::UnexpectedEof,
457+
"stream ended",
458+
))))
459+
.await;
460+
break;
461+
}
462+
Err(cause) => {
463+
co.yield_(Err(super::Error::from(cause))).await;
464+
break;
465+
}
466+
}
467+
}
468+
})
469+
}
359470
}
360471

361472
impl From<TagInfo> for HashAndFormat {

src/api/tags.rs

Lines changed: 15 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@ use super::{
1616
proto::{CreateTempTagRequest, Scope},
1717
ApiClient, Tag, TempTag,
1818
};
19-
use crate::{api::proto::ListTempTagsRequest, HashAndFormat};
19+
use crate::{
20+
api::proto::{ListTagsProgress, ListTempTagsRequest},
21+
HashAndFormat,
22+
};
2023

2124
/// The API for interacting with tags and temp tags.
2225
#[derive(Debug, Clone, ref_cast::RefCast)]
@@ -41,21 +44,15 @@ impl Tags {
4144
///
4245
/// This is the most flexible way to list tags. All the other list methods are just convenience
4346
/// methods that call this one with the appropriate options.
44-
pub async fn list_with_opts(
45-
&self,
46-
options: ListOptions,
47-
) -> irpc::Result<impl Stream<Item = super::Result<TagInfo>>> {
47+
pub fn list_with_opts(&self, options: ListOptions) -> ListTagsProgress {
4848
trace!("{:?}", options);
49-
let res = self.client.rpc(options).await?;
50-
Ok(n0_future::stream::iter(res))
49+
ListTagsProgress::new(self.client.server_streaming(options, 32))
5150
}
5251

5352
/// Get the value of a single tag
5453
pub async fn get(&self, name: impl AsRef<[u8]>) -> super::RequestResult<Option<TagInfo>> {
55-
let mut stream = self
56-
.list_with_opts(ListOptions::single(name.as_ref()))
57-
.await?;
58-
Ok(stream.next().await.transpose()?)
54+
let progress = self.list_with_opts(ListOptions::single(name.as_ref()));
55+
Ok(progress.stream().next().await.transpose()?)
5956
}
6057

6158
pub async fn set_with_opts(&self, options: SetOptions) -> super::RequestResult<()> {
@@ -77,34 +74,27 @@ impl Tags {
7774
}
7875

7976
/// List a range of tags
80-
pub async fn list_range<R, E>(
81-
&self,
82-
range: R,
83-
) -> irpc::Result<impl Stream<Item = super::Result<TagInfo>>>
77+
pub fn list_range<R, E>(&self, range: R) -> ListTagsProgress
8478
where
8579
R: RangeBounds<E>,
8680
E: AsRef<[u8]>,
8781
{
88-
self.list_with_opts(ListOptions::range(range)).await
82+
self.list_with_opts(ListOptions::range(range))
8983
}
9084

9185
/// Lists all tags with the given prefix.
92-
pub async fn list_prefix(
93-
&self,
94-
prefix: impl AsRef<[u8]>,
95-
) -> irpc::Result<impl Stream<Item = super::Result<TagInfo>>> {
86+
pub fn list_prefix(&self, prefix: impl AsRef<[u8]>) -> ListTagsProgress {
9687
self.list_with_opts(ListOptions::prefix(prefix.as_ref()))
97-
.await
9888
}
9989

10090
/// Lists all tags.
101-
pub async fn list(&self) -> irpc::Result<impl Stream<Item = super::Result<TagInfo>>> {
102-
self.list_with_opts(ListOptions::all()).await
91+
pub fn list(&self) -> ListTagsProgress {
92+
self.list_with_opts(ListOptions::all())
10393
}
10494

10595
/// Lists all tags with a hash_seq format.
106-
pub async fn list_hash_seq(&self) -> irpc::Result<impl Stream<Item = super::Result<TagInfo>>> {
107-
self.list_with_opts(ListOptions::hash_seq()).await
96+
pub fn list_hash_seq(&self) -> ListTagsProgress {
97+
self.list_with_opts(ListOptions::hash_seq())
10898
}
10999

110100
/// Deletes a tag.

src/store/fs/gc.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ pub(super) async fn gc_mark_task(
4848
}
4949
let mut roots = HashSet::new();
5050
trace!("traversing tags");
51-
let mut tags = store.tags().list().await?;
51+
let mut tags = store.tags().list().stream();
5252
while let Some(tag) = tags.next().await {
5353
let info = tag?;
5454
trace!("adding root {:?} {:?}", info.name, info.hash_and_format());
@@ -85,7 +85,7 @@ async fn gc_sweep_task(
8585
live: &HashSet<Hash>,
8686
co: &Co<GcSweepEvent>,
8787
) -> crate::api::Result<()> {
88-
let mut blobs = store.blobs().list().stream().await?;
88+
let mut blobs = store.blobs().list().stream();
8989
let mut count = 0;
9090
let mut batch = Vec::new();
9191
while let Some(hash) = blobs.next().await {

src/store/fs/meta.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ use crate::{
2424
proto::{
2525
BlobDeleteRequest, BlobStatusMsg, BlobStatusRequest, ClearProtectedMsg,
2626
CreateTagRequest, DeleteBlobsMsg, DeleteTagsRequest, ListBlobsItem, ListBlobsMsg,
27-
ListRequest, ListTagsRequest, RenameTagRequest, SetTagRequest, ShutdownMsg, SyncDbMsg,
27+
ListRequest, ListTagsItem, ListTagsRequest, RenameTagRequest, SetTagRequest,
28+
ShutdownMsg, SyncDbMsg,
2829
},
2930
tags::TagInfo,
3031
},
@@ -319,7 +320,20 @@ async fn handle_list_tags(msg: ListTagsMsg, tables: &impl ReadableTables) -> Act
319320
}
320321
}
321322
}
322-
tx.send(res).await.ok();
323+
for item in res {
324+
match item {
325+
Ok(tag_info) => {
326+
if tx.send(ListTagsItem::Item(tag_info)).await.is_err() {
327+
return Ok(());
328+
}
329+
}
330+
Err(err) => {
331+
tx.send(ListTagsItem::Error(err)).await.ok();
332+
return Ok(());
333+
}
334+
}
335+
}
336+
tx.send(ListTagsItem::Done).await.ok();
323337
Ok(())
324338
}
325339

0 commit comments

Comments
 (0)