Skip to content

feat: streaming ListTags rpc call #131

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/expiring-tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async fn delete_expired_tags(blobs: &Store, prefix: &str, bulk: bool) -> anyhow:
// find tags to delete one by one and then delete them
//
// this allows us to print the tags before deleting them
let mut tags = blobs.tags().list().await?;
let mut tags = blobs.tags().list().stream();
let mut to_delete = Vec::new();
while let Some(tag) = tags.next().await {
let tag = tag?.name;
Expand Down Expand Up @@ -102,7 +102,7 @@ async fn delete_expired_tags(blobs: &Store, prefix: &str, bulk: bool) -> anyhow:

async fn print_store_info(store: &Store) -> anyhow::Result<()> {
let now = chrono::Utc::now();
let mut tags = store.tags().list().await?;
let mut tags = store.tags().list().stream();
println!(
"Current time: {}",
now.to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
Expand All @@ -112,7 +112,7 @@ async fn print_store_info(store: &Store) -> anyhow::Result<()> {
let tag = tag?;
println!(" {tag:?}");
}
let mut blobs = store.list().stream().await?;
let mut blobs = store.list().stream();
println!("Blobs:");
while let Some(item) = blobs.next().await {
println!(" {}", item?);
Expand Down
51 changes: 34 additions & 17 deletions src/api/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,13 @@ use super::{
ApiClient, RequestResult, Tags,
};
use crate::{
api::proto::{BatchRequest, ImportByteStreamUpdate},
api::proto::{BatchRequest, ImportByteStreamUpdate, ListBlobsItem},
provider::StreamContext,
store::IROH_BLOCK_SIZE,
util::temp_tag::TempTag,
util::{
irpc::{IrpcReceiverFutExt, IrpcStreamItem},
temp_tag::TempTag,
},
BlobFormat, Hash, HashAndFormat,
};

Expand Down Expand Up @@ -835,34 +838,48 @@ impl ImportBaoHandle {

/// A progress handle for a blobs list operation.
pub struct BlobsListProgress {
inner: future::Boxed<irpc::Result<mpsc::Receiver<super::Result<Hash>>>>,
inner: future::Boxed<irpc::Result<mpsc::Receiver<ListBlobsItem>>>,
}

impl BlobsListProgress {
fn new(
fut: impl Future<Output = irpc::Result<mpsc::Receiver<super::Result<Hash>>>> + Send + 'static,
fut: impl Future<Output = irpc::Result<mpsc::Receiver<ListBlobsItem>>> + Send + 'static,
) -> Self {
Self {
inner: Box::pin(fut),
}
}

pub async fn hashes(self) -> RequestResult<Vec<Hash>> {
let mut rx: mpsc::Receiver<Result<Hash, super::Error>> = self.inner.await?;
let mut hashes = Vec::new();
while let Some(item) = rx.recv().await? {
hashes.push(item?);
pub async fn hashes(self) -> super::Result<Vec<Hash>> {
self.inner.try_collect().await
}

pub fn stream(self) -> impl Stream<Item = super::Result<Hash>> {
self.inner.into_stream()
}
}

impl IrpcStreamItem for ListBlobsItem {
type Error = super::Error;
type Item = Hash;

fn into_result_opt(self) -> Option<Result<Hash, super::Error>> {
match self {
Self::Item(hash) => Some(Ok(hash)),
Self::Error(e) => Some(Err(e)),
Self::Done => None,
}
Ok(hashes)
}

pub async fn stream(self) -> irpc::Result<impl Stream<Item = super::Result<Hash>>> {
let mut rx = self.inner.await?;
Ok(Gen::new(|co| async move {
while let Ok(Some(item)) = rx.recv().await {
co.yield_(item).await;
}
}))
fn from_result(item: std::result::Result<Hash, super::Error>) -> Self {
match item {
Ok(hash) => Self::Item(hash),
Err(e) => Self::Error(e),
}
}

fn done() -> Self {
Self::Done
}
}

Expand Down
91 changes: 87 additions & 4 deletions src/api/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//! the much simpler memory store.
use std::{
fmt::{self, Debug},
future::{Future, IntoFuture},
io,
num::NonZeroU64,
ops::{Bound, RangeBounds},
Expand All @@ -32,13 +33,20 @@ use irpc::{
channel::{mpsc, oneshot},
rpc_requests,
};
use n0_future::Stream;
use n0_future::{future, Stream};
use range_collections::RangeSet2;
use serde::{Deserialize, Serialize};
pub(crate) mod bitfield;
pub use bitfield::Bitfield;

use crate::{store::util::Tag, util::temp_tag::TempTag, BlobFormat, Hash, HashAndFormat};
use crate::{
store::util::Tag,
util::{
irpc::{IrpcReceiverFutExt, IrpcStreamItem},
temp_tag::TempTag,
},
BlobFormat, Hash, HashAndFormat,
};

pub(crate) trait HashSpecific {
fn hash(&self) -> Hash;
Expand Down Expand Up @@ -89,7 +97,7 @@ impl HashSpecific for CreateTagMsg {
#[rpc_requests(message = Command, alias = "Msg")]
#[derive(Debug, Serialize, Deserialize)]
pub enum Request {
#[rpc(tx = mpsc::Sender<super::Result<Hash>>)]
#[rpc(tx = mpsc::Sender<ListBlobsItem>)]
ListBlobs(ListRequest),
#[rpc(tx = oneshot::Sender<Scope>, rx = mpsc::Receiver<BatchResponse>)]
Batch(BatchRequest),
Expand All @@ -113,7 +121,7 @@ pub enum Request {
ImportPath(ImportPathRequest),
#[rpc(tx = mpsc::Sender<ExportProgressItem>)]
ExportPath(ExportPathRequest),
#[rpc(tx = oneshot::Sender<Vec<super::Result<TagInfo>>>)]
#[rpc(tx = mpsc::Sender<ListTagsItem>)]
ListTags(ListTagsRequest),
#[rpc(tx = oneshot::Sender<super::Result<()>>)]
SetTag(SetTagRequest),
Expand Down Expand Up @@ -351,6 +359,81 @@ pub struct TagInfo {
pub hash: Hash,
}

#[derive(Debug, Serialize, Deserialize)]
pub enum ListBlobsItem {
Item(Hash),
Error(super::Error),
Done,
}

#[derive(Debug, Serialize, Deserialize)]
pub enum ListTagsItem {
Item(TagInfo),
Error(super::Error),
Done,
}

impl From<std::result::Result<TagInfo, super::Error>> for ListTagsItem {
fn from(item: std::result::Result<TagInfo, super::Error>) -> Self {
match item {
Ok(item) => ListTagsItem::Item(item),
Err(err) => ListTagsItem::Error(err),
}
}
}

impl IrpcStreamItem for ListTagsItem {
type Error = super::Error;
type Item = TagInfo;

fn into_result_opt(self) -> Option<Result<TagInfo, super::Error>> {
match self {
ListTagsItem::Item(item) => Some(Ok(item)),
ListTagsItem::Done => None,
ListTagsItem::Error(err) => Some(Err(err)),
}
}

fn from_result(item: std::result::Result<TagInfo, super::Error>) -> Self {
match item {
Ok(i) => Self::Item(i),
Err(e) => Self::Error(e),
}
}

fn done() -> Self {
Self::Done
}
}

pub struct ListTagsProgress {
inner: future::Boxed<irpc::Result<mpsc::Receiver<ListTagsItem>>>,
}

impl IntoFuture for ListTagsProgress {
fn into_future(self) -> Self::IntoFuture {
Box::pin(self.inner.try_collect())
}

type IntoFuture = future::Boxed<Self::Output>;

type Output = super::Result<Vec<TagInfo>>;
}

impl ListTagsProgress {
pub(super) fn new(
fut: impl Future<Output = irpc::Result<mpsc::Receiver<ListTagsItem>>> + Send + 'static,
) -> Self {
Self {
inner: Box::pin(fut),
}
}

pub fn stream(self) -> impl Stream<Item = super::Result<TagInfo>> {
self.inner.into_stream()
}
}

impl From<TagInfo> for HashAndFormat {
fn from(tag_info: TagInfo) -> Self {
HashAndFormat {
Expand Down
40 changes: 15 additions & 25 deletions src/api/tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use super::{
proto::{CreateTempTagRequest, Scope},
ApiClient, Tag, TempTag,
};
use crate::{api::proto::ListTempTagsRequest, HashAndFormat};
use crate::{
api::proto::{ListTagsProgress, ListTempTagsRequest},
HashAndFormat,
};

/// The API for interacting with tags and temp tags.
#[derive(Debug, Clone, ref_cast::RefCast)]
Expand All @@ -41,21 +44,15 @@ impl Tags {
///
/// This is the most flexible way to list tags. All the other list methods are just convenience
/// methods that call this one with the appropriate options.
pub async fn list_with_opts(
&self,
options: ListOptions,
) -> irpc::Result<impl Stream<Item = super::Result<TagInfo>>> {
pub fn list_with_opts(&self, options: ListOptions) -> ListTagsProgress {
trace!("{:?}", options);
let res = self.client.rpc(options).await?;
Ok(n0_future::stream::iter(res))
ListTagsProgress::new(self.client.server_streaming(options, 32))
}

/// Get the value of a single tag
pub async fn get(&self, name: impl AsRef<[u8]>) -> super::RequestResult<Option<TagInfo>> {
let mut stream = self
.list_with_opts(ListOptions::single(name.as_ref()))
.await?;
Ok(stream.next().await.transpose()?)
let progress = self.list_with_opts(ListOptions::single(name.as_ref()));
Ok(progress.stream().next().await.transpose()?)
}

pub async fn set_with_opts(&self, options: SetOptions) -> super::RequestResult<()> {
Expand All @@ -77,34 +74,27 @@ impl Tags {
}

/// List a range of tags
pub async fn list_range<R, E>(
&self,
range: R,
) -> irpc::Result<impl Stream<Item = super::Result<TagInfo>>>
pub fn list_range<R, E>(&self, range: R) -> ListTagsProgress
where
R: RangeBounds<E>,
E: AsRef<[u8]>,
{
self.list_with_opts(ListOptions::range(range)).await
self.list_with_opts(ListOptions::range(range))
}

/// Lists all tags with the given prefix.
pub async fn list_prefix(
&self,
prefix: impl AsRef<[u8]>,
) -> irpc::Result<impl Stream<Item = super::Result<TagInfo>>> {
pub fn list_prefix(&self, prefix: impl AsRef<[u8]>) -> ListTagsProgress {
self.list_with_opts(ListOptions::prefix(prefix.as_ref()))
.await
}

/// Lists all tags.
pub async fn list(&self) -> irpc::Result<impl Stream<Item = super::Result<TagInfo>>> {
self.list_with_opts(ListOptions::all()).await
pub fn list(&self) -> ListTagsProgress {
self.list_with_opts(ListOptions::all())
}

/// Lists all tags with a hash_seq format.
pub async fn list_hash_seq(&self) -> irpc::Result<impl Stream<Item = super::Result<TagInfo>>> {
self.list_with_opts(ListOptions::hash_seq()).await
pub fn list_hash_seq(&self) -> ListTagsProgress {
self.list_with_opts(ListOptions::hash_seq())
}

/// Deletes a tag.
Expand Down
5 changes: 1 addition & 4 deletions src/store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ use entry_state::{DataLocation, OutboardLocation};
use gc::run_gc;
use import::{ImportEntry, ImportSource};
use irpc::channel::mpsc;
use meta::list_blobs;
use n0_future::{future::yield_now, io};
use nested_enum_utils::enum_conversions;
use range_collections::range_set::RangeSetRange;
Expand Down Expand Up @@ -507,9 +506,7 @@ impl Actor {
}
Command::ListBlobs(cmd) => {
trace!("{cmd:?}");
if let Ok(snapshot) = self.db().snapshot(cmd.span.clone()).await {
self.spawn(list_blobs(snapshot, cmd));
}
self.db().send(cmd.into()).await.ok();
}
Command::Batch(cmd) => {
trace!("{cmd:?}");
Expand Down
4 changes: 2 additions & 2 deletions src/store/fs/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub(super) async fn gc_mark_task(
}
let mut roots = HashSet::new();
trace!("traversing tags");
let mut tags = store.tags().list().await?;
let mut tags = store.tags().list().stream();
while let Some(tag) = tags.next().await {
let info = tag?;
trace!("adding root {:?} {:?}", info.name, info.hash_and_format());
Expand Down Expand Up @@ -85,7 +85,7 @@ async fn gc_sweep_task(
live: &HashSet<Hash>,
co: &Co<GcSweepEvent>,
) -> crate::api::Result<()> {
let mut blobs = store.blobs().list().stream().await?;
let mut blobs = store.blobs().list().stream();
let mut count = 0;
let mut batch = Vec::new();
while let Some(hash) = blobs.next().await {
Expand Down
Loading
Loading