Skip to content

feat: streaming ListTags rpc call #131

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/expiring-tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async fn delete_expired_tags(blobs: &Store, prefix: &str, bulk: bool) -> anyhow:
// find tags to delete one by one and then delete them
//
// this allows us to print the tags before deleting them
let mut tags = blobs.tags().list().await?;
let mut tags = blobs.tags().list().stream();
let mut to_delete = Vec::new();
while let Some(tag) = tags.next().await {
let tag = tag?.name;
Expand Down Expand Up @@ -102,7 +102,7 @@ async fn delete_expired_tags(blobs: &Store, prefix: &str, bulk: bool) -> anyhow:

async fn print_store_info(store: &Store) -> anyhow::Result<()> {
let now = chrono::Utc::now();
let mut tags = store.tags().list().await?;
let mut tags = store.tags().list().stream();
println!(
"Current time: {}",
now.to_rfc3339_opts(chrono::SecondsFormat::Secs, true)
Expand All @@ -112,7 +112,7 @@ async fn print_store_info(store: &Store) -> anyhow::Result<()> {
let tag = tag?;
println!(" {tag:?}");
}
let mut blobs = store.list().stream().await?;
let mut blobs = store.list().stream();
println!("Blobs:");
while let Some(item) = blobs.next().await {
println!(" {}", item?);
Expand Down
51 changes: 34 additions & 17 deletions src/api/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,13 @@ use super::{
ApiClient, RequestResult, Tags,
};
use crate::{
api::proto::{BatchRequest, ImportByteStreamUpdate},
api::proto::{BatchRequest, ImportByteStreamUpdate, ListBlobsItem},
provider::StreamContext,
store::IROH_BLOCK_SIZE,
util::temp_tag::TempTag,
util::{
irpc::{IrpcReceiverFutExt, IrpcStreamItem},
temp_tag::TempTag,
},
BlobFormat, Hash, HashAndFormat,
};

Expand Down Expand Up @@ -835,34 +838,48 @@ impl ImportBaoHandle {

/// A progress handle for a blobs list operation.
pub struct BlobsListProgress {
inner: future::Boxed<irpc::Result<mpsc::Receiver<super::Result<Hash>>>>,
inner: future::Boxed<irpc::Result<mpsc::Receiver<ListBlobsItem>>>,
}

impl BlobsListProgress {
fn new(
fut: impl Future<Output = irpc::Result<mpsc::Receiver<super::Result<Hash>>>> + Send + 'static,
fut: impl Future<Output = irpc::Result<mpsc::Receiver<ListBlobsItem>>> + Send + 'static,
) -> Self {
Self {
inner: Box::pin(fut),
}
}

pub async fn hashes(self) -> RequestResult<Vec<Hash>> {
let mut rx: mpsc::Receiver<Result<Hash, super::Error>> = self.inner.await?;
let mut hashes = Vec::new();
while let Some(item) = rx.recv().await? {
hashes.push(item?);
pub async fn hashes(self) -> super::Result<Vec<Hash>> {
self.inner.try_collect().await
}

pub fn stream(self) -> impl Stream<Item = super::Result<Hash>> {
self.inner.into_stream()
}
}

impl IrpcStreamItem for ListBlobsItem {
type Error = super::Error;
type Item = Hash;

fn into_result_opt(self) -> Option<Result<Hash, super::Error>> {
match self {
Self::Item(hash) => Some(Ok(hash)),
Self::Error(e) => Some(Err(e)),
Self::Done => None,
}
Ok(hashes)
}

pub async fn stream(self) -> irpc::Result<impl Stream<Item = super::Result<Hash>>> {
let mut rx = self.inner.await?;
Ok(Gen::new(|co| async move {
while let Ok(Some(item)) = rx.recv().await {
co.yield_(item).await;
}
}))
fn from_result(item: std::result::Result<Hash, super::Error>) -> Self {
match item {
Ok(hash) => Self::Item(hash),
Err(e) => Self::Error(e),
}
}

fn done() -> Self {
Self::Done
}
}

Expand Down
153 changes: 148 additions & 5 deletions src/api/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
//! The file system store is quite complex and optimized, so to get started take a look at
//! the much simpler memory store.
use std::{
collections::HashSet,
fmt::{self, Debug},
future::{Future, IntoFuture},
io,
num::NonZeroU64,
ops::{Bound, RangeBounds},
Expand All @@ -32,13 +34,20 @@ use irpc::{
channel::{mpsc, oneshot},
rpc_requests,
};
use n0_future::Stream;
use n0_future::{future, Stream};
use range_collections::RangeSet2;
use serde::{Deserialize, Serialize};
pub(crate) mod bitfield;
pub use bitfield::Bitfield;

use crate::{store::util::Tag, util::temp_tag::TempTag, BlobFormat, Hash, HashAndFormat};
use crate::{
store::util::Tag,
util::{
irpc::{IrpcReceiverFutExt, IrpcStreamItem},
temp_tag::TempTag,
},
BlobFormat, Hash, HashAndFormat,
};

pub(crate) trait HashSpecific {
fn hash(&self) -> Hash;
Expand Down Expand Up @@ -89,7 +98,7 @@ impl HashSpecific for CreateTagMsg {
#[rpc_requests(message = Command, alias = "Msg")]
#[derive(Debug, Serialize, Deserialize)]
pub enum Request {
#[rpc(tx = mpsc::Sender<super::Result<Hash>>)]
#[rpc(tx = mpsc::Sender<ListBlobsItem>)]
ListBlobs(ListRequest),
#[rpc(tx = oneshot::Sender<Scope>, rx = mpsc::Receiver<BatchResponse>)]
Batch(BatchRequest),
Expand All @@ -113,7 +122,7 @@ pub enum Request {
ImportPath(ImportPathRequest),
#[rpc(tx = mpsc::Sender<ExportProgressItem>)]
ExportPath(ExportPathRequest),
#[rpc(tx = oneshot::Sender<Vec<super::Result<TagInfo>>>)]
#[rpc(tx = mpsc::Sender<ListTagsItem>)]
ListTags(ListTagsRequest),
#[rpc(tx = oneshot::Sender<super::Result<()>>)]
SetTag(SetTagRequest),
Expand All @@ -123,7 +132,7 @@ pub enum Request {
RenameTag(RenameTagRequest),
#[rpc(tx = oneshot::Sender<super::Result<Tag>>)]
CreateTag(CreateTagRequest),
#[rpc(tx = oneshot::Sender<Vec<HashAndFormat>>)]
#[rpc(tx = mpsc::Sender<ListTempTagsItem>)]
ListTempTags(ListTempTagsRequest),
#[rpc(tx = oneshot::Sender<TempTag>)]
CreateTempTag(CreateTempTagRequest),
Expand Down Expand Up @@ -351,6 +360,109 @@ pub struct TagInfo {
pub hash: Hash,
}

#[derive(Debug, Serialize, Deserialize)]
pub enum ListBlobsItem {
Item(Hash),
Error(super::Error),
Done,
}

#[derive(Debug, Serialize, Deserialize)]
pub enum ListTagsItem {
Item(TagInfo),
Error(super::Error),
Done,
}

impl From<std::result::Result<TagInfo, super::Error>> for ListTagsItem {
fn from(item: std::result::Result<TagInfo, super::Error>) -> Self {
match item {
Ok(item) => ListTagsItem::Item(item),
Err(err) => ListTagsItem::Error(err),
}
}
}

impl IrpcStreamItem for ListTagsItem {
type Error = super::Error;
type Item = TagInfo;

fn into_result_opt(self) -> Option<Result<TagInfo, super::Error>> {
match self {
ListTagsItem::Item(item) => Some(Ok(item)),
ListTagsItem::Done => None,
ListTagsItem::Error(err) => Some(Err(err)),
}
}

fn from_result(item: std::result::Result<TagInfo, super::Error>) -> Self {
match item {
Ok(i) => Self::Item(i),
Err(e) => Self::Error(e),
}
}

fn done() -> Self {
Self::Done
}
}

pub struct ListTempTagsProgress {
inner: future::Boxed<irpc::Result<mpsc::Receiver<ListTempTagsItem>>>,
}

impl IntoFuture for ListTempTagsProgress {
fn into_future(self) -> Self::IntoFuture {
Box::pin(self.inner.try_collect())
}

type IntoFuture = future::Boxed<Self::Output>;

type Output = super::Result<HashSet<HashAndFormat>>;
}

impl ListTempTagsProgress {
pub(super) fn new(
fut: impl Future<Output = irpc::Result<mpsc::Receiver<ListTempTagsItem>>> + Send + 'static,
) -> Self {
Self {
inner: Box::pin(fut),
}
}

pub fn stream(self) -> impl Stream<Item = super::Result<HashAndFormat>> {
self.inner.into_stream()
}
}

pub struct ListTagsProgress {
inner: future::Boxed<irpc::Result<mpsc::Receiver<ListTagsItem>>>,
}

impl IntoFuture for ListTagsProgress {
fn into_future(self) -> Self::IntoFuture {
Box::pin(self.inner.try_collect())
}

type IntoFuture = future::Boxed<Self::Output>;

type Output = super::Result<Vec<TagInfo>>;
}

impl ListTagsProgress {
pub(super) fn new(
fut: impl Future<Output = irpc::Result<mpsc::Receiver<ListTagsItem>>> + Send + 'static,
) -> Self {
Self {
inner: Box::pin(fut),
}
}

pub fn stream(self) -> impl Stream<Item = super::Result<TagInfo>> {
self.inner.into_stream()
}
}

impl From<TagInfo> for HashAndFormat {
fn from(tag_info: TagInfo) -> Self {
HashAndFormat {
Expand Down Expand Up @@ -410,6 +522,37 @@ pub struct CreateTempTagRequest {
#[derive(Debug, Serialize, Deserialize)]
pub struct ListTempTagsRequest;

#[derive(Debug, Serialize, Deserialize)]
pub enum ListTempTagsItem {
Item(HashAndFormat),
Error(super::Error),
Done,
}

impl IrpcStreamItem for ListTempTagsItem {
type Error = super::Error;
type Item = HashAndFormat;

fn into_result_opt(self) -> Option<Result<HashAndFormat, super::Error>> {
match self {
ListTempTagsItem::Item(item) => Some(Ok(item)),
ListTempTagsItem::Done => None,
ListTempTagsItem::Error(err) => Some(Err(err)),
}
}

fn from_result(item: std::result::Result<HashAndFormat, super::Error>) -> Self {
match item {
Ok(i) => Self::Item(i),
Err(e) => Self::Error(e),
}
}

fn done() -> Self {
Self::Done
}
}

/// Rename a tag atomically
#[derive(Debug, Serialize, Deserialize)]
pub struct RenameTagRequest {
Expand Down
Loading
Loading