Skip to content

Commit 8cc32f0

Browse files
committed
[Fabric] bidirectional stream
Summary: This PR introduces a new network primitive "stream" which represents a bidirectional stream between peers. Once opened, the stream can be used to send any number of messages in both directions until one of the ends drops the stream, and the ther peer will get notified. ## Life Cycle - The stream initiator uses a Connection to open the stream - The initiator receives a "sink" to write messages over to remote peer, and a "stream" to receive incoming messages over. - When a stream is open, the receiver service will receive an "incoming" stream and "response" reciprocal. - Both peers can use their corresponding peers to write or receive messages from the remote peer until one of them drop the sending ends
1 parent c29a614 commit 8cc32f0

File tree

11 files changed

+1103
-41
lines changed

11 files changed

+1103
-41
lines changed

crates/core/protobuf/restate/network.proto

Lines changed: 86 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -44,28 +44,27 @@ message Header {
4444
optional SpanContext span_context = 7;
4545
}
4646

47-
message SpanContext {
48-
map<string, string> fields = 1;
49-
}
47+
message SpanContext { map<string, string> fields = 1; }
5048

5149
// direction is defined from the lens of the connection initiator
5250
enum ConnectionDirection {
5351
// By default, it's assumed this is a bidirectional connection to maintain
5452
// compatibility with v1.2
5553
ConnectionDirection_UNKNOWN = 0;
5654
// Connection is declared by initiator as bidirectional. Bidirectional
57-
// connections are used to send and receive rpc and unary messages by both ends
58-
// of the connection.
55+
// connections are used to send and receive rpc and unary messages by both
56+
// ends of the connection.
5957
BIDIRECTIONAL = 1;
6058
// Connection is declared by initiator that it's used by initiator to send rpc
61-
// requests and unary messages to the peer (acceptor). The acceptor side should *not*
62-
// use this connection to send requests back to us. Only servicing requests. This is
63-
// a typical case for a server who wants to run a side dedicated connection to a peer
64-
// or for a client session connecting to a server.
59+
// requests and unary messages to the peer (acceptor). The acceptor side
60+
// should *not* use this connection to send requests back to us. Only
61+
// servicing requests. This is a typical case for a server who wants to run a
62+
// side dedicated connection to a peer or for a client session connecting to a
63+
// server.
6564
FORWARD = 2;
66-
// Connection is declared as a reverse connection by the initiator. A reverse connection
67-
// is used to receive requests from peers. The acceptor can use this connection to send
68-
// rpc requests to us.
65+
// Connection is declared as a reverse connection by the initiator. A reverse
66+
// connection is used to receive requests from peers. The acceptor can use
67+
// this connection to send rpc requests to us.
6968
//
7069
// This can be used to initiate connections to a remote node that doesn't have
7170
// network access to initiate connections back to us.
@@ -86,7 +85,7 @@ message Hello {
8685
// the purpose of this connection. Default is GENERAL.
8786
Swimlane swimlane = 6;
8887

89-
// a unique fingerprint for this cluster. It will be respected by the receiver
88+
// a unique fingerprint for this cluster. It will be respected by the receiver
9089
// if it was a non-zero value.
9190
uint64 cluster_fingerprint = 7;
9291
}
@@ -96,9 +95,9 @@ message Welcome {
9695
restate.common.ProtocolVersion protocol_version = 2;
9796
// generational node id of sender
9897
restate.common.GenerationalNodeId my_node_id = 3;
99-
// confirmation that this connection respects the direction state in the Hello message.
100-
// if this is unset (UNKNOWN) then it's equivalent to if the Hello message had `BIDIRECTIONAL`
101-
// for backward compatibility.
98+
// confirmation that this connection respects the direction state in the Hello
99+
// message. if this is unset (UNKNOWN) then it's equivalent to if the Hello
100+
// message had `BIDIRECTIONAL` for backward compatibility.
102101
ConnectionDirection direction_ack = 4;
103102
}
104103

@@ -151,6 +150,8 @@ message Datagram {
151150
WatchUpdate watch_update = 4;
152151
Ping ping = 5;
153152
Pong pong = 6;
153+
StreamInbound stream_inbound = 7;
154+
StreamOutbound stream_outbound = 8;
154155

155156
// one way message
156157
Unary unary = 100;
@@ -172,9 +173,9 @@ message Watch {
172173
message WatchUpdate {
173174
enum Start {
174175
// Receiver will not respond to this request for unknown reason
175-
// This is often returned when the receiver sent us a status that we don't recognize
176-
// in this protocol version.
177-
// The request *might* have been processed
176+
// This is often returned when the receiver sent us a status that we don't
177+
// recognize in this protocol version. The request *might* have been
178+
// processed
178179
Start_UNKNOWN = 0;
179180
// Service identifier was not registered on this node
180181
// Request has not been processed
@@ -201,9 +202,9 @@ message WatchUpdate {
201202

202203
enum End {
203204
// Receiver will not respond to this request for unknown reason
204-
// This is often returned when the receiver sent us a status that we don't recognize
205-
// in this protocol version.
206-
// The request *might* have been processed
205+
// This is often returned when the receiver sent us a status that we don't
206+
// recognize in this protocol version. The request *might* have been
207+
// processed
207208
End_UNKNOWN = 0;
208209
// Watch was cleanly stopped
209210
OK = 1;
@@ -231,9 +232,9 @@ message RpcCall {
231232
message RpcReply {
232233
enum Status {
233234
// Receiver will not respond to this request for unknown reason
234-
// This is often returned when the receiver sent us a status that we don't recognize
235-
// in this protocol version.
236-
// The request *might* have been processed
235+
// This is often returned when the receiver sent us a status that we don't
236+
// recognize in this protocol version. The request *might* have been
237+
// processed
237238
Status_UNKNOWN = 0;
238239
// Service identifier was not registered on this node
239240
// Request has not been processed
@@ -277,10 +278,66 @@ message Unary {
277278
bytes payload = 100;
278279
}
279280

280-
message Ping {
281-
uint64 timestamp = 1;
281+
message Ping { uint64 timestamp = 1; }
282+
283+
message Pong { uint64 timestamp = 1; }
284+
285+
enum StreamStatus {
286+
// Receiver will not respond to this request for unknown reason
287+
// This is often returned when the receiver sent us a status that we don't
288+
// recognize in this protocol version. The request *might* have been
289+
// processed
290+
StreamStatus_UNKNOWN = 0;
291+
// Service identifier was not registered on this node
292+
// Request has not been processed
293+
SERVICE_NOT_FOUND = 1;
294+
// Service has stopped processing requests
295+
// Request has not been processed
296+
SERVICE_STOPPED = 2;
297+
// Service is known but it has not started yet
298+
// Request has not been processed
299+
SERVICE_NOT_READY = 3;
300+
// Service is known but it didn't recognize the sort code
301+
// Request has not been processed
302+
SORT_CODE_NOT_FOUND = 4;
303+
// The received dropped the response handle
304+
// The request *might* have been processed
305+
STREAM_DROPPED = 5;
306+
// The received dropped the response handle
307+
// Channel has been closed
308+
STREAM_NOT_FOUND = 6;
309+
// Service did not process the request due to backpressure
310+
// Request has not been processed
311+
LOAD_SHEDDING = 7;
312+
// Message type was unrecognized by the receiver
313+
// Request has not been processed
314+
MESSAGE_UNRECOGNIZED = 8;
315+
// Channel with the same id is already open
316+
ALREADY_OPEN = 9;
282317
}
283318

284-
message Pong {
285-
uint64 timestamp = 1;
319+
message StreamInbound {
320+
321+
message Open {
322+
restate.common.ServiceTag service = 1;
323+
// routing code of the target inbox if applicable, otherwise, defaults to 0.
324+
optional uint64 sort_code = 2;
325+
// the type-name of the unary message (variant in the service)
326+
string msg_type = 3;
327+
}
328+
329+
uint64 stream_id = 1;
330+
oneof body {
331+
StreamStatus status = 2;
332+
Open open = 3;
333+
bytes payload = 4;
334+
}
335+
}
336+
337+
message StreamOutbound {
338+
uint64 stream_id = 1;
339+
oneof body {
340+
StreamStatus status = 2;
341+
bytes payload = 4;
342+
}
286343
}

crates/core/src/network/connection.rs

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010

1111
mod throttle;
1212

13+
use restate_types::net::StreamRequest;
1314
use restate_types::net::codec::EncodeError;
1415
// re-export
1516
pub use throttle::ConnectThrottle;
1617

18+
use std::marker::PhantomData;
1719
use std::sync::Arc;
1820

1921
use metrics::counter;
@@ -32,7 +34,9 @@ use crate::Metadata;
3234
use crate::TaskId;
3335
use crate::TaskKind;
3436
use crate::network::PeerMetadataVersion;
37+
use crate::network::TypedStream;
3538
use crate::network::metric_definitions::NETWORK_CONNECTION_CREATED;
39+
use crate::network::protobuf::network::{self, stream_inbound};
3640

3741
use super::ConnectError;
3842
use super::ConnectionClosed;
@@ -150,6 +154,92 @@ impl OwnedSendPermit {
150154
}
151155
}
152156

157+
pub struct StreamOpenPermit {
158+
connection: Connection,
159+
permit: mpsc::OwnedPermit<EgressMessage>,
160+
}
161+
162+
impl StreamOpenPermit {
163+
/// Opens a stream with the specified receive capacity.
164+
///
165+
/// If the internal buffer fills because the caller stops consuming messages,
166+
/// subsequent messages are dropped and a LoadShedding error is automatically
167+
/// returned to the peer.
168+
pub async fn open<M: StreamRequest>(
169+
self,
170+
capacity: usize,
171+
sort_code: Option<u64>,
172+
) -> Result<(StreamSink<M>, TypedStream<M::Response>), ConnectionClosed> {
173+
let (msg, stream_id_rx, message_rx) = EgressMessage::make_open_stream_message::<M>(
174+
capacity,
175+
sort_code,
176+
self.connection.protocol_version,
177+
);
178+
179+
self.permit.send(msg);
180+
181+
let id = stream_id_rx.await.map_err(|_| ConnectionClosed)?;
182+
let typed_stream = TypedStream::new(message_rx, self.connection.protocol_version);
183+
Ok((
184+
StreamSink {
185+
id,
186+
connection: Some(self.connection),
187+
_phantom: PhantomData,
188+
},
189+
typed_stream,
190+
))
191+
}
192+
}
193+
194+
pub struct StreamSink<M: StreamRequest> {
195+
// stream id
196+
id: u64,
197+
connection: Option<Connection>,
198+
_phantom: PhantomData<M>,
199+
}
200+
201+
impl<M: StreamRequest> StreamSink<M> {
202+
pub async fn send(&self, message: M) -> Result<(), ConnectionClosed> {
203+
let Some(connection) = self.connection.as_ref() else {
204+
// unreachable!
205+
return Err(ConnectionClosed);
206+
};
207+
208+
let permit = connection.reserve().await.ok_or(ConnectionClosed)?;
209+
210+
let bytes = message
211+
.encode_to_bytes(connection.protocol_version)
212+
.expect("stream message to encode");
213+
let msg = EgressMessage::make_stream_message(self.id, stream_inbound::Body::Payload(bytes));
214+
215+
permit.send(msg);
216+
Ok(())
217+
}
218+
}
219+
220+
impl<M: StreamRequest> Drop for StreamSink<M> {
221+
fn drop(&mut self) {
222+
let Some(connection) = self.connection.take() else {
223+
return;
224+
};
225+
226+
let id = self.id;
227+
tokio::spawn(async move {
228+
let Some(permit) = connection.reserve().await else {
229+
//connection closed already
230+
return;
231+
};
232+
233+
let msg = EgressMessage::make_stream_message(
234+
id,
235+
stream_inbound::Body::Status(network::StreamStatus::StreamDropped as i32),
236+
);
237+
238+
permit.send(msg);
239+
});
240+
}
241+
}
242+
153243
/// A single streaming connection with a channel to the peer. A connection can be
154244
/// opened by either ends of the connection and has no direction. Any connection
155245
/// can be used to send or receive from a peer.
@@ -415,6 +505,20 @@ impl Connection {
415505
})
416506
}
417507

508+
/// Reserves capacity for opening a stream.
509+
///
510+
/// Streams remain bound to their originating connection, so this call consumes
511+
/// the connection. Clone the connection before invoking [`Connection::stream`] if
512+
/// you need to reuse the connection.
513+
#[must_use]
514+
pub async fn stream(self) -> Option<StreamOpenPermit> {
515+
let permit = self.sender.clone().reserve_owned().await.ok()?;
516+
Some(StreamOpenPermit {
517+
connection: self,
518+
permit,
519+
})
520+
}
521+
418522
/// Tries to allocate capacity to send one message on this connection.
419523
/// Returns None if the connection was closed or is at capacity.
420524
#[must_use]

crates/core/src/network/error.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,18 @@ use restate_types::net::{CURRENT_PROTOCOL_VERSION, MIN_SUPPORTED_PROTOCOL_VERSIO
1818
use restate_types::nodes_config::NodesConfigError;
1919

2020
use crate::ShutdownError;
21+
use crate::network::protobuf::network;
2122

2223
use super::protobuf::network::{rpc_reply, watch_update};
2324

2425
#[derive(Debug, thiserror::Error)]
2526
#[error("connection closed")]
2627
pub struct ConnectionClosed;
2728

29+
#[derive(Debug, thiserror::Error)]
30+
#[error("stream closed")]
31+
pub struct StreamClosed;
32+
2833
#[derive(Debug, thiserror::Error)]
2934
pub enum RouterError {
3035
#[error("target service has not started serving requests yet")]
@@ -51,6 +56,43 @@ impl From<RouterError> for rpc_reply::Status {
5156
}
5257
}
5358

59+
#[derive(Debug, thiserror::Error)]
60+
pub enum BidiStreamError {
61+
#[error("stream already open with the same id")]
62+
AlreadyOpen,
63+
#[error("unknown stream id")]
64+
StreamNotFound,
65+
#[error("stream dropped")]
66+
StreamDropped,
67+
#[error("message dropped")]
68+
LoadShedding,
69+
#[error(transparent)]
70+
Router(#[from] RouterError),
71+
}
72+
73+
impl From<RouterError> for network::StreamStatus {
74+
fn from(value: RouterError) -> Self {
75+
match value {
76+
RouterError::ServiceNotReady => Self::ServiceNotReady,
77+
RouterError::ServiceStopped => Self::ServiceStopped,
78+
RouterError::CapacityExceeded => Self::LoadShedding,
79+
RouterError::ServiceNotFound => Self::ServiceNotFound,
80+
RouterError::MessageUnrecognized => Self::MessageUnrecognized,
81+
}
82+
}
83+
}
84+
85+
impl From<BidiStreamError> for network::StreamStatus {
86+
fn from(value: BidiStreamError) -> Self {
87+
match value {
88+
BidiStreamError::Router(err) => err.into(),
89+
BidiStreamError::AlreadyOpen => Self::AlreadyOpen,
90+
BidiStreamError::StreamNotFound => Self::StreamNotFound,
91+
BidiStreamError::StreamDropped => Self::StreamDropped,
92+
BidiStreamError::LoadShedding => Self::LoadShedding,
93+
}
94+
}
95+
}
5496
/// A type to communicate rpc processing error to the caller
5597
///
5698
/// This is used by service handlers to report back that they are not able to process
@@ -90,6 +132,16 @@ impl From<Verdict> for watch_update::Start {
90132
}
91133
}
92134

135+
impl From<Verdict> for network::StreamStatus {
136+
fn from(value: Verdict) -> Self {
137+
match value {
138+
Verdict::LoadShedding => Self::LoadShedding,
139+
Verdict::SortCodeNotFound => Self::SortCodeNotFound,
140+
Verdict::MessageUnrecognized => Self::MessageUnrecognized,
141+
}
142+
}
143+
}
144+
93145
#[derive(Debug, thiserror::Error)]
94146
pub enum NetworkError {
95147
#[error("operation aborted, node is shutting down")]

0 commit comments

Comments
 (0)