Skip to content

Commit 93176c7

Browse files
committed
refactor: port to irpc
1 parent 58e5c4e commit 93176c7

19 files changed

+495
-2659
lines changed

src/actor.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ use tokio::{sync::oneshot, task::JoinSet};
1818
use tracing::{debug, error, error_span, trace, warn};
1919

2020
use crate::{
21-
metrics::Metrics,
22-
ranger::Message,
23-
rpc2::{
21+
api::{
2422
protocol::{AuthorListResponse, ListResponse},
2523
RpcError, RpcResult,
2624
},
25+
metrics::Metrics,
26+
ranger::Message,
2727
store::{
2828
fs::{ContentHashesIterator, StoreInstance},
2929
DownloadPolicy, ImportNamespaceOutcome, Query, Store,

src/api.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
//! irpc-based RPC implementation for docs.
2+
3+
#![allow(missing_docs)]
4+
5+
pub(crate) mod actor;
6+
mod api;
7+
pub(crate) mod docs_handle_request;
8+
pub mod protocol;
9+
10+
pub use self::api::*;
11+
12+
pub type RpcError = serde_error::Error;
13+
pub type RpcResult<T> = std::result::Result<T, RpcError>;

src/rpc2.rs renamed to src/api/actor.rs

Lines changed: 19 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,22 @@
1-
//! irpc-based RPC implementation for docs.
2-
3-
#![allow(missing_docs)]
4-
51
use std::sync::Arc;
62

7-
use api::DocsApi;
8-
use irpc::{LocalSender, WithChannels};
3+
use irpc::LocalSender;
4+
use irpc::WithChannels;
95
use n0_future::task::{self};
106
use tokio::sync::mpsc as tokio_mpsc;
117
use tracing::error;
128

139
use crate::engine::Engine;
1410

15-
use self::protocol::{DocsMessage, DocsService};
16-
17-
pub mod api;
18-
pub mod docs_handle_request;
19-
pub mod protocol;
20-
21-
impl std::ops::Deref for RpcActor {
22-
type Target = Engine;
23-
24-
fn deref(&self) -> &Self::Target {
25-
&self.engine
26-
}
27-
}
28-
29-
pub type RpcError = serde_error::Error;
30-
pub type RpcResult<T> = std::result::Result<T, RpcError>;
11+
use super::{
12+
protocol::{DocsMessage, DocsService},
13+
DocsApi,
14+
};
3115

3216
/// The docs RPC actor that handles incoming messages
33-
struct RpcActor {
34-
recv: tokio::sync::mpsc::Receiver<DocsMessage>,
35-
engine: Arc<Engine>,
17+
pub(crate) struct RpcActor {
18+
pub(crate) recv: tokio::sync::mpsc::Receiver<DocsMessage>,
19+
pub(crate) engine: Arc<Engine>,
3620
}
3721

3822
impl RpcActor {
@@ -46,14 +30,14 @@ impl RpcActor {
4630
}
4731
}
4832

49-
async fn run(mut self) {
33+
pub(crate) async fn run(mut self) {
5034
while let Some(msg) = self.recv.recv().await {
5135
tracing::trace!("handle rpc request: {msg:?}");
5236
self.handle(msg).await;
5337
}
5438
}
5539

56-
async fn handle(&mut self, msg: DocsMessage) {
40+
pub(crate) async fn handle(&mut self, msg: DocsMessage) {
5741
match msg {
5842
DocsMessage::Open(open) => {
5943
let WithChannels { tx, inner, .. } = open;
@@ -236,3 +220,11 @@ impl RpcActor {
236220
}
237221
}
238222
}
223+
224+
impl std::ops::Deref for RpcActor {
225+
type Target = Engine;
226+
227+
fn deref(&self) -> &Self::Target {
228+
&self.engine
229+
}
230+
}

src/rpc2/api.rs renamed to src/api/api.rs

Lines changed: 194 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,14 @@ use n0_future::task::{self, AbortOnDropHandle};
1010

1111
use crate::engine::Engine;
1212

13-
use crate::rpc2::protocol::DocsProtocol;
14-
15-
use super::{protocol, RpcActor};
13+
use super::{
14+
actor::RpcActor,
15+
protocol::{DocsMessage, DocsProtocol, DocsService},
16+
};
1617

1718
pub use self::client::*;
1819

19-
type Client = irpc::Client<protocol::DocsMessage, protocol::DocsProtocol, protocol::DocsService>;
20+
type Client = irpc::Client<DocsMessage, DocsProtocol, DocsService>;
2021

2122
/// API wrapper for the docs service
2223
#[derive(Debug, Clone)]
@@ -79,21 +80,29 @@ impl DocsApi {
7980
}
8081

8182
mod client {
82-
use std::sync::{
83-
atomic::{AtomicBool, Ordering},
84-
Arc,
83+
use std::{
84+
future::Future,
85+
path::Path,
86+
pin::Pin,
87+
sync::{
88+
atomic::{AtomicBool, Ordering},
89+
Arc,
90+
},
91+
task::{ready, Poll},
8592
};
8693

8794
use anyhow::Result;
8895
use bytes::Bytes;
8996
use iroh::NodeAddr;
90-
use iroh_blobs::Hash;
91-
use n0_future::{Stream, StreamExt};
97+
use iroh_blobs::{
98+
api::blobs::{AddPathOptions, AddProgressItem, ExportMode, ExportOptions, ExportProgress},
99+
Hash,
100+
};
101+
use n0_future::{FutureExt, Stream, StreamExt};
92102

93103
use crate::{
94104
actor::OpenState,
95-
engine::LiveEvent,
96-
rpc2::protocol::{
105+
api::protocol::{
97106
AddrInfoOptions, AuthorCreateRequest, AuthorDeleteRequest, AuthorExportRequest,
98107
AuthorGetDefaultRequest, AuthorImportRequest, AuthorListRequest,
99108
AuthorSetDefaultRequest, CloseRequest, CreateRequest, DelRequest, DropRequest,
@@ -102,6 +111,7 @@ mod client {
102111
SetHashRequest, SetRequest, ShareMode, ShareRequest, StartSyncRequest, StatusRequest,
103112
SubscribeRequest,
104113
},
114+
engine::LiveEvent,
105115
store::{DownloadPolicy, Query},
106116
Author, AuthorId, Capability, CapabilityKind, DocTicket, Entry, NamespaceId, PeerIdBytes,
107117
};
@@ -317,7 +327,7 @@ mod client {
317327
Ok(response.entry.content_hash())
318328
}
319329

320-
/// Sets an entries on the doc via its key, hash, and size.
330+
/// Sets an entry on the doc via its key, hash, and size.
321331
pub async fn set_hash(
322332
&self,
323333
author_id: AuthorId,
@@ -519,5 +529,177 @@ mod client {
519529
.await??;
520530
Ok(response.peers)
521531
}
532+
533+
/// Adds an entry from an absolute file path
534+
pub async fn import_file(
535+
&self,
536+
blobs: &iroh_blobs::api::Store,
537+
author: AuthorId,
538+
key: Bytes,
539+
path: impl AsRef<Path>,
540+
import_mode: iroh_blobs::api::blobs::ImportMode,
541+
) -> Result<ImportFileProgress> {
542+
self.ensure_open()?;
543+
let progress = blobs.add_path_with_opts(AddPathOptions {
544+
path: path.as_ref().to_owned(),
545+
format: iroh_blobs::BlobFormat::Raw,
546+
mode: import_mode,
547+
});
548+
let stream = progress.stream().await;
549+
let doc = self.clone();
550+
let ctx = EntryContext {
551+
doc,
552+
author,
553+
key,
554+
size: None,
555+
};
556+
Ok(ImportFileProgress(ImportInner::Blobs(
557+
Box::pin(stream),
558+
Some(ctx),
559+
)))
560+
}
561+
562+
/// Exports an entry as a file to a given absolute path.
563+
pub async fn export_file(
564+
&self,
565+
blobs: &iroh_blobs::api::Store,
566+
entry: Entry,
567+
path: impl AsRef<Path>,
568+
mode: ExportMode,
569+
) -> Result<ExportProgress> {
570+
self.ensure_open()?;
571+
let hash = entry.content_hash();
572+
let progress = blobs.export_with_opts(ExportOptions {
573+
hash,
574+
mode,
575+
target: path.as_ref().to_path_buf(),
576+
});
577+
Ok(progress)
578+
}
579+
}
580+
581+
///
582+
#[derive(Debug)]
583+
pub enum ImportFileProgressItem {
584+
Error(anyhow::Error),
585+
Blobs(AddProgressItem),
586+
Done(ImportFileOutcome),
587+
}
588+
589+
///
590+
#[derive(Debug)]
591+
pub struct ImportFileProgress(ImportInner);
592+
593+
#[derive(derive_more::Debug)]
594+
enum ImportInner {
595+
#[debug("Blobs")]
596+
Blobs(
597+
n0_future::boxed::BoxStream<AddProgressItem>,
598+
Option<EntryContext>,
599+
),
600+
#[debug("Entry")]
601+
Entry(n0_future::boxed::BoxFuture<Result<ImportFileOutcome>>),
602+
Done,
603+
}
604+
605+
struct EntryContext {
606+
doc: Doc,
607+
author: AuthorId,
608+
key: Bytes,
609+
size: Option<u64>,
610+
}
611+
612+
impl Stream for ImportFileProgress {
613+
type Item = ImportFileProgressItem;
614+
615+
fn poll_next(
616+
self: Pin<&mut Self>,
617+
cx: &mut std::task::Context<'_>,
618+
) -> Poll<Option<Self::Item>> {
619+
let this = self.get_mut();
620+
match this.0 {
621+
ImportInner::Blobs(ref mut progress, ref mut context) => {
622+
match ready!(progress.poll_next(cx)) {
623+
Some(item) => match item {
624+
AddProgressItem::Size(size) => {
625+
context
626+
.as_mut()
627+
.expect("Size must be emitted before done")
628+
.size = Some(size);
629+
Poll::Ready(Some(ImportFileProgressItem::Blobs(
630+
AddProgressItem::Size(size),
631+
)))
632+
}
633+
AddProgressItem::Error(err) => {
634+
*this = Self(ImportInner::Done);
635+
Poll::Ready(Some(ImportFileProgressItem::Error(err.into())))
636+
}
637+
AddProgressItem::Done(tag) => {
638+
let EntryContext {
639+
doc,
640+
author,
641+
key,
642+
size,
643+
} = context
644+
.take()
645+
.expect("AddProgressItem::Done may be emitted only once");
646+
let size = size.expect("Size must be emitted before done");
647+
let hash = *tag.hash();
648+
*this = Self(ImportInner::Entry(Box::pin(async move {
649+
doc.set_hash(author, key.clone(), hash, size).await?;
650+
Ok(ImportFileOutcome { hash, size, key })
651+
})));
652+
Poll::Ready(Some(ImportFileProgressItem::Blobs(
653+
AddProgressItem::Done(tag),
654+
)))
655+
}
656+
item => Poll::Ready(Some(ImportFileProgressItem::Blobs(item))),
657+
},
658+
None => todo!(),
659+
}
660+
}
661+
ImportInner::Entry(ref mut fut) => {
662+
let res = ready!(fut.poll(cx));
663+
*this = Self(ImportInner::Done);
664+
match res {
665+
Ok(outcome) => Poll::Ready(Some(ImportFileProgressItem::Done(outcome))),
666+
Err(err) => Poll::Ready(Some(ImportFileProgressItem::Error(err))),
667+
}
668+
}
669+
ImportInner::Done => Poll::Ready(None),
670+
}
671+
}
672+
}
673+
674+
impl Future for ImportFileProgress {
675+
type Output = Result<ImportFileOutcome>;
676+
fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
677+
loop {
678+
match self.as_mut().poll_next(cx) {
679+
Poll::Ready(Some(item)) => match item {
680+
ImportFileProgressItem::Error(error) => return Poll::Ready(Err(error)),
681+
ImportFileProgressItem::Blobs(_add_progress_item) => continue,
682+
ImportFileProgressItem::Done(outcome) => return Poll::Ready(Ok(outcome)),
683+
},
684+
Poll::Ready(None) => {
685+
return Poll::Ready(Err(anyhow::anyhow!(
686+
"ImportFileProgress polled after completion"
687+
)))
688+
}
689+
Poll::Pending => return Poll::Pending,
690+
}
691+
}
692+
}
693+
}
694+
695+
/// Outcome of a [`Doc::import_file`] operation
696+
#[derive(Debug, Clone, PartialEq, Eq)]
697+
pub struct ImportFileOutcome {
698+
/// The hash of the entry's content
699+
pub hash: Hash,
700+
/// The size of the entry
701+
pub size: u64,
702+
/// The key of the entry
703+
pub key: Bytes,
522704
}
523705
}

src/rpc2/docs_handle_request.rs renamed to src/api/docs_handle_request.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use iroh::Watcher;
44
use irpc::channel::mpsc;
55

66
use super::{
7+
actor::RpcActor,
78
protocol::{
89
AuthorCreateRequest, AuthorCreateResponse, AuthorDeleteRequest, AuthorDeleteResponse,
910
AuthorExportRequest, AuthorExportResponse, AuthorGetDefaultRequest,
@@ -18,7 +19,7 @@ use super::{
1819
StartSyncRequest, StartSyncResponse, StatusRequest, StatusResponse, SubscribeRequest,
1920
SubscribeResponse,
2021
},
21-
RpcActor, RpcError, RpcResult,
22+
RpcError, RpcResult,
2223
};
2324
use crate::{Author, DocTicket, NamespaceSecret, SignedEntry};
2425

File renamed without changes.

src/lib.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,12 @@ mod ticket;
4545

4646
#[cfg(feature = "engine")]
4747
pub mod engine;
48-
// #[cfg(feature = "rpc")]
49-
// pub mod rpc;
5048

5149
pub mod actor;
50+
pub mod api;
5251
pub mod store;
5352
pub mod sync;
5453

55-
pub mod rpc2;
56-
5754
mod heads;
5855
mod keys;
5956
mod ranger;

0 commit comments

Comments
 (0)