@@ -68,23 +68,34 @@ impl Client {
68
68
pub async fn request ( & self , req : Request ) -> Result < Response > {
69
69
let timeout_nano = req. timeout_nano ;
70
70
let stream_id = self . next_stream_id . fetch_add ( 2 , Ordering :: Relaxed ) ;
71
-
72
- let msg: GenMessage = Message :: new_request ( stream_id, req) ?
73
- . try_into ( )
74
- . map_err ( |err : std:: io:: Error | Error :: Others ( err. to_string ( ) ) ) ?;
71
+ let msg: GenMessage ;
72
+ #[ cfg( not( feature = "prost" ) ) ]
73
+ {
74
+ msg = Message :: new_request ( stream_id, req) ?
75
+ . try_into ( )
76
+ . map_err ( |err : protobuf:: Error | Error :: Others ( err. to_string ( ) ) ) ?;
77
+ }
78
+
79
+ #[ cfg( feature = "prost" ) ]
80
+ {
81
+ msg = Message :: new_request ( stream_id, req) ?
82
+ . try_into ( )
83
+ . map_err ( |err : std:: io:: Error | Error :: Others ( err. to_string ( ) ) ) ?;
84
+ }
75
85
76
86
let ( tx, mut rx) : ( ResultSender , ResultReceiver ) = mpsc:: channel ( 100 ) ;
77
-
87
+
78
88
self . streams
79
89
. lock ( )
80
90
. map_err ( |_| Error :: Others ( "Failed to acquire lock on streams" . to_string ( ) ) ) ?
81
91
. insert ( stream_id, tx) ;
82
-
92
+
83
93
self . req_tx
84
94
. send ( SendingMessage :: new ( msg) )
85
95
. await
86
96
. map_err ( |_| Error :: LocalClosed ) ?;
87
-
97
+
98
+ #[ allow( clippy:: unnecessary_lazy_evaluations) ]
88
99
let result = if timeout_nano == 0 {
89
100
rx. recv ( )
90
101
. await
@@ -134,20 +145,28 @@ impl Client {
134
145
let stream_id = self . next_stream_id . fetch_add ( 2 , Ordering :: Relaxed ) ;
135
146
let is_req_payload_empty = req. payload . is_empty ( ) ;
136
147
148
+ #[ cfg( not( feature = "prost" ) ) ]
137
149
let mut msg: GenMessage = Message :: new_request ( stream_id, req) ?
138
150
. try_into ( )
139
- . map_err ( |e : protobuf:: Error | Error :: Others ( e. to_string ( ) ) ) ?;
151
+ . map_err ( |err : protobuf:: Error | Error :: Others ( err. to_string ( ) ) ) ?;
152
+
140
153
#[ cfg( feature = "prost" ) ]
141
- let mut msg: GenMessage = Message :: new_request ( stream_id, req)
154
+ let mut msg: GenMessage = Message :: new_request ( stream_id, req) ?
142
155
. try_into ( )
143
156
. map_err ( |err : std:: io:: Error | Error :: Others ( err. to_string ( ) ) ) ?;
144
157
145
158
if streaming_client {
146
159
if !is_req_payload_empty {
160
+ #[ cfg( not( feature = "prost" ) ) ]
147
161
return Err ( get_rpc_status (
148
162
Code :: INVALID_ARGUMENT ,
149
163
"Creating a ClientStream and sending payload at the same time is not allowed" ,
150
164
) ) ;
165
+ #[ cfg( feature = "prost" ) ]
166
+ return Err ( get_rpc_status (
167
+ Code :: Unknown ,
168
+ "Creating a ClientStream and sending payload at the same time is not allowed" ,
169
+ ) ) ;
151
170
}
152
171
msg. header . add_flags ( FLAG_REMOTE_OPEN | FLAG_NO_DATA ) ;
153
172
} else {
0 commit comments