@@ -12,20 +12,24 @@ use anyhow::{anyhow, Context, Result};
12
12
use bytes:: Bytes ;
13
13
use futures_util:: FutureExt ;
14
14
use iroh_blobs:: Hash ;
15
+ use irpc:: channel:: mpsc;
15
16
use serde:: { Deserialize , Serialize } ;
16
17
use tokio:: { sync:: oneshot, task:: JoinSet } ;
17
18
use tracing:: { debug, error, error_span, trace, warn} ;
18
19
19
20
use crate :: {
20
21
metrics:: Metrics ,
21
22
ranger:: Message ,
23
+ rpc2:: {
24
+ protocol:: { AuthorListResponse , ListResponse } ,
25
+ RpcError , RpcResult ,
26
+ } ,
22
27
store:: {
23
28
fs:: { ContentHashesIterator , StoreInstance } ,
24
29
DownloadPolicy , ImportNamespaceOutcome , Query , Store ,
25
30
} ,
26
- Author , AuthorHeads , AuthorId , Capability , CapabilityKind , ContentStatus ,
27
- ContentStatusCallback , Event , NamespaceId , NamespaceSecret , PeerIdBytes , Replica , ReplicaInfo ,
28
- SignedEntry , SyncOutcome ,
31
+ Author , AuthorHeads , AuthorId , Capability , ContentStatus , ContentStatusCallback , Event ,
32
+ NamespaceId , NamespaceSecret , PeerIdBytes , Replica , ReplicaInfo , SignedEntry , SyncOutcome ,
29
33
} ;
30
34
31
35
const ACTION_CAP : usize = 1024 ;
@@ -60,12 +64,12 @@ enum Action {
60
64
#[ display( "ListAuthors" ) ]
61
65
ListAuthors {
62
66
#[ debug( "reply" ) ]
63
- reply : async_channel :: Sender < Result < AuthorId > > ,
67
+ reply : mpsc :: Sender < RpcResult < AuthorListResponse > > ,
64
68
} ,
65
69
#[ display( "ListReplicas" ) ]
66
70
ListReplicas {
67
71
#[ debug( "reply" ) ]
68
- reply : async_channel :: Sender < Result < ( NamespaceId , CapabilityKind ) > > ,
72
+ reply : mpsc :: Sender < RpcResult < ListResponse > > ,
69
73
} ,
70
74
#[ display( "ContentHashes" ) ]
71
75
ContentHashes {
@@ -165,7 +169,7 @@ enum ReplicaAction {
165
169
} ,
166
170
GetMany {
167
171
query : Query ,
168
- reply : async_channel :: Sender < Result < SignedEntry > > ,
172
+ reply : mpsc :: Sender < RpcResult < SignedEntry > > ,
169
173
} ,
170
174
DropReplica {
171
175
reply : oneshot:: Sender < Result < ( ) > > ,
@@ -290,6 +294,7 @@ impl SyncHandle {
290
294
}
291
295
292
296
pub async fn open ( & self , namespace : NamespaceId , opts : OpenOpts ) -> Result < ( ) > {
297
+ tracing:: debug!( "SyncHandle::open called" ) ;
293
298
let ( reply, rx) = oneshot:: channel ( ) ;
294
299
let action = ReplicaAction :: Open { reply, opts } ;
295
300
self . send_replica ( namespace, action) . await ?;
@@ -443,7 +448,7 @@ impl SyncHandle {
443
448
& self ,
444
449
namespace : NamespaceId ,
445
450
query : Query ,
446
- reply : async_channel :: Sender < Result < SignedEntry > > ,
451
+ reply : mpsc :: Sender < RpcResult < SignedEntry > > ,
447
452
) -> Result < ( ) > {
448
453
let action = ReplicaAction :: GetMany { query, reply } ;
449
454
self . send_replica ( namespace, action) . await ?;
@@ -497,14 +502,14 @@ impl SyncHandle {
497
502
Ok ( store)
498
503
}
499
504
500
- pub async fn list_authors ( & self , reply : async_channel:: Sender < Result < AuthorId > > ) -> Result < ( ) > {
505
+ pub async fn list_authors (
506
+ & self ,
507
+ reply : mpsc:: Sender < RpcResult < AuthorListResponse > > ,
508
+ ) -> Result < ( ) > {
501
509
self . send ( Action :: ListAuthors { reply } ) . await
502
510
}
503
511
504
- pub async fn list_replicas (
505
- & self ,
506
- reply : async_channel:: Sender < Result < ( NamespaceId , CapabilityKind ) > > ,
507
- ) -> Result < ( ) > {
512
+ pub async fn list_replicas ( & self , reply : mpsc:: Sender < RpcResult < ListResponse > > ) -> Result < ( ) > {
508
513
self . send ( Action :: ListReplicas { reply } ) . await
509
514
}
510
515
@@ -696,15 +701,18 @@ impl Actor {
696
701
let iter = self
697
702
. store
698
703
. list_authors ( )
699
- . map ( |a| a. map ( |a| a. map ( |a| a. id ( ) ) ) ) ;
704
+ . map ( |a| a. map ( |a| a. map ( |a| AuthorListResponse { author_id : a. id ( ) } ) ) ) ;
700
705
self . tasks
701
- . spawn_local ( iter_to_channel_async ( reply, iter) . map ( |_| ( ) ) ) ;
706
+ . spawn_local ( iter_to_irpc ( reply, iter) . map ( |_| ( ) ) ) ;
702
707
Ok ( ( ) )
703
708
}
704
709
Action :: ListReplicas { reply } => {
705
710
let iter = self . store . list_namespaces ( ) ;
711
+ let iter = iter. map ( |inner| {
712
+ inner. map ( |res| res. map ( |( id, capability) | ListResponse { id, capability } ) )
713
+ } ) ;
706
714
self . tasks
707
- . spawn_local ( iter_to_channel_async ( reply, iter) . map ( |_| ( ) ) ) ;
715
+ . spawn_local ( iter_to_irpc ( reply, iter) . map ( |_| ( ) ) ) ;
708
716
Ok ( ( ) )
709
717
}
710
718
Action :: ContentHashes { reply } => {
@@ -838,7 +846,7 @@ impl Actor {
838
846
. ensure_open ( & namespace)
839
847
. and_then ( |_| self . store . get_many ( namespace, query) ) ;
840
848
self . tasks
841
- . spawn_local ( iter_to_channel_async ( reply, iter) . map ( |_| ( ) ) ) ;
849
+ . spawn_local ( iter_to_irpc ( reply, iter) . map ( |_| ( ) ) ) ;
842
850
Ok ( ( ) )
843
851
}
844
852
ReplicaAction :: DropReplica { reply } => send_reply_with ( reply, self , |this| {
@@ -984,6 +992,7 @@ impl OpenReplicas {
984
992
}
985
993
hash_map:: Entry :: Occupied ( mut e) => {
986
994
let state = e. get_mut ( ) ;
995
+ tracing:: debug!( "STATE {state:?}" ) ;
987
996
state. handles = state. handles . wrapping_sub ( 1 ) ;
988
997
if state. handles == 0 {
989
998
let _ = e. remove_entry ( ) ;
@@ -1001,14 +1010,18 @@ impl OpenReplicas {
1001
1010
}
1002
1011
}
1003
1012
1004
- async fn iter_to_channel_async < T : Send + ' static > (
1005
- channel : async_channel :: Sender < Result < T > > ,
1013
+ async fn iter_to_irpc < T : irpc :: RpcMessage > (
1014
+ channel : mpsc :: Sender < RpcResult < T > > ,
1006
1015
iter : Result < impl Iterator < Item = Result < T > > > ,
1007
1016
) -> Result < ( ) , SendReplyError > {
1008
1017
match iter {
1009
- Err ( err) => channel. send ( Err ( err) ) . await . map_err ( send_reply_error) ?,
1018
+ Err ( err) => channel
1019
+ . send ( Err ( RpcError :: new ( & * err) ) )
1020
+ . await
1021
+ . map_err ( send_reply_error) ?,
1010
1022
Ok ( iter) => {
1011
1023
for item in iter {
1024
+ let item = item. map_err ( |err| RpcError :: new ( & * err) ) ;
1012
1025
channel. send ( item) . await . map_err ( send_reply_error) ?;
1013
1026
}
1014
1027
}
0 commit comments