Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 86 additions & 29 deletions crates/core/protobuf/restate/network.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,28 +44,27 @@ message Header {
optional SpanContext span_context = 7;
}

message SpanContext {
map<string, string> fields = 1;
}
message SpanContext { map<string, string> fields = 1; }

// direction is defined from the lens of the connection initiator
enum ConnectionDirection {
// By default, it's assumed this is a bidirectional connection to maintain
// compatibility with v1.2
ConnectionDirection_UNKNOWN = 0;
// Connection is declared by initiator as bidirectional. Bidirectional
// connections are used to send and receive rpc and unary messages by both ends
// of the connection.
// connections are used to send and receive rpc and unary messages by both
// ends of the connection.
BIDIRECTIONAL = 1;
// Connection is declared by initiator that it's used by initiator to send rpc
// requests and unary messages to the peer (acceptor). The acceptor side should *not*
// use this connection to send requests back to us. Only servicing requests. This is
// a typical case for a server who wants to run a side dedicated connection to a peer
// or for a client session connecting to a server.
// requests and unary messages to the peer (acceptor). The acceptor side
// should *not* use this connection to send requests back to us. Only
// servicing requests. This is a typical case for a server who wants to run a
// side dedicated connection to a peer or for a client session connecting to a
// server.
FORWARD = 2;
// Connection is declared as a reverse connection by the initiator. A reverse connection
// is used to receive requests from peers. The acceptor can use this connection to send
// rpc requests to us.
// Connection is declared as a reverse connection by the initiator. A reverse
// connection is used to receive requests from peers. The acceptor can use
// this connection to send rpc requests to us.
//
// This can be used to initiate connections to a remote node that doesn't have
// network access to initiate connections back to us.
Expand All @@ -86,7 +85,7 @@ message Hello {
// the purpose of this connection. Default is GENERAL.
Swimlane swimlane = 6;

// a unique fingerprint for this cluster. It will be respected by the receiver
// a unique fingerprint for this cluster. It will be respected by the receiver
// if it was a non-zero value.
uint64 cluster_fingerprint = 7;
}
Expand All @@ -96,9 +95,9 @@ message Welcome {
restate.common.ProtocolVersion protocol_version = 2;
// generational node id of sender
restate.common.GenerationalNodeId my_node_id = 3;
// confirmation that this connection respects the direction state in the Hello message.
// if this is unset (UNKNOWN) then it's equivalent to if the Hello message had `BIDIRECTIONAL`
// for backward compatibility.
// confirmation that this connection respects the direction state in the Hello
// message. if this is unset (UNKNOWN) then it's equivalent to if the Hello
// message had `BIDIRECTIONAL` for backward compatibility.
ConnectionDirection direction_ack = 4;
}

Expand Down Expand Up @@ -151,6 +150,8 @@ message Datagram {
WatchUpdate watch_update = 4;
Ping ping = 5;
Pong pong = 6;
StreamInbound stream_inbound = 7;
StreamOutbound stream_outbound = 8;

// one way message
Unary unary = 100;
Expand All @@ -172,9 +173,9 @@ message Watch {
message WatchUpdate {
enum Start {
// Receiver will not respond to this request for unknown reason
// This is often returned when the receiver sent us a status that we don't recognize
// in this protocol version.
// The request *might* have been processed
// This is often returned when the receiver sent us a status that we don't
// recognize in this protocol version. The request *might* have been
// processed
Start_UNKNOWN = 0;
// Service identifier was not registered on this node
// Request has not been processed
Expand All @@ -201,9 +202,9 @@ message WatchUpdate {

enum End {
// Receiver will not respond to this request for unknown reason
// This is often returned when the receiver sent us a status that we don't recognize
// in this protocol version.
// The request *might* have been processed
// This is often returned when the receiver sent us a status that we don't
// recognize in this protocol version. The request *might* have been
// processed
End_UNKNOWN = 0;
// Watch was cleanly stopped
OK = 1;
Expand Down Expand Up @@ -231,9 +232,9 @@ message RpcCall {
message RpcReply {
enum Status {
// Receiver will not respond to this request for unknown reason
// This is often returned when the receiver sent us a status that we don't recognize
// in this protocol version.
// The request *might* have been processed
// This is often returned when the receiver sent us a status that we don't
// recognize in this protocol version. The request *might* have been
// processed
Status_UNKNOWN = 0;
// Service identifier was not registered on this node
// Request has not been processed
Expand Down Expand Up @@ -277,10 +278,66 @@ message Unary {
bytes payload = 100;
}

message Ping {
uint64 timestamp = 1;
message Ping { uint64 timestamp = 1; }

message Pong { uint64 timestamp = 1; }

enum StreamStatus {
// Receiver will not respond to this request for unknown reason
// This is often returned when the receiver sent us a status that we don't
// recognize in this protocol version. The request *might* have been
// processed
StreamStatus_UNKNOWN = 0;
// Service identifier was not registered on this node
// Request has not been processed
SERVICE_NOT_FOUND = 1;
// Service has stopped processing requests
// Request has not been processed
SERVICE_STOPPED = 2;
// Service is known but it has not started yet
// Request has not been processed
SERVICE_NOT_READY = 3;
// Service is known but it didn't recognize the sort code
// Request has not been processed
SORT_CODE_NOT_FOUND = 4;
// The received dropped the response handle
// The request *might* have been processed
STREAM_DROPPED = 5;
// The received dropped the response handle
// Channel has been closed
STREAM_NOT_FOUND = 6;
// Service did not process the request due to backpressure
// Request has not been processed
LOAD_SHEDDING = 7;
// Message type was unrecognized by the receiver
// Request has not been processed
MESSAGE_UNRECOGNIZED = 8;
// Channel with the same id is already open
ALREADY_OPEN = 9;
}

message Pong {
uint64 timestamp = 1;
message StreamInbound {

message Open {
restate.common.ServiceTag service = 1;
// routing code of the target inbox if applicable, otherwise, defaults to 0.
optional uint64 sort_code = 2;
// the type-name of the unary message (variant in the service)
string msg_type = 3;
}

uint64 stream_id = 1;
oneof body {
StreamStatus status = 2;
Open open = 3;
bytes payload = 4;
}
}

message StreamOutbound {
uint64 stream_id = 1;
oneof body {
StreamStatus status = 2;
bytes payload = 4;
}
}
104 changes: 104 additions & 0 deletions crates/core/src/network/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@

mod throttle;

use restate_types::net::StreamRequest;
use restate_types::net::codec::EncodeError;
// re-export
pub use throttle::ConnectThrottle;

use std::marker::PhantomData;
use std::sync::Arc;

use metrics::counter;
Expand All @@ -32,7 +34,9 @@ use crate::Metadata;
use crate::TaskId;
use crate::TaskKind;
use crate::network::PeerMetadataVersion;
use crate::network::TypedStream;
use crate::network::metric_definitions::NETWORK_CONNECTION_CREATED;
use crate::network::protobuf::network::{self, stream_inbound};

use super::ConnectError;
use super::ConnectionClosed;
Expand Down Expand Up @@ -150,6 +154,92 @@ impl OwnedSendPermit {
}
}

pub struct StreamOpenPermit {
connection: Connection,
permit: mpsc::OwnedPermit<EgressMessage>,
}

impl StreamOpenPermit {
/// Opens a stream with the specified receive capacity.
///
/// If the internal buffer fills because the caller stops consuming messages,
/// subsequent messages are dropped and a LoadShedding error is automatically
/// returned to the peer.
pub async fn open<M: StreamRequest>(
self,
capacity: usize,
sort_code: Option<u64>,
) -> Result<(StreamSink<M>, TypedStream<M::Response>), ConnectionClosed> {
let (msg, stream_id_rx, message_rx) = EgressMessage::make_open_stream_message::<M>(
capacity,
sort_code,
self.connection.protocol_version,
);

self.permit.send(msg);

let id = stream_id_rx.await.map_err(|_| ConnectionClosed)?;
let typed_stream = TypedStream::new(message_rx, self.connection.protocol_version);
Ok((
StreamSink {
id,
connection: Some(self.connection),
_phantom: PhantomData,
},
typed_stream,
))
}
}

pub struct StreamSink<M: StreamRequest> {
// stream id
id: u64,
connection: Option<Connection>,
_phantom: PhantomData<M>,
}

impl<M: StreamRequest> StreamSink<M> {
pub async fn send(&self, message: M) -> Result<(), ConnectionClosed> {
let Some(connection) = self.connection.as_ref() else {
// unreachable!
return Err(ConnectionClosed);
};

let permit = connection.reserve().await.ok_or(ConnectionClosed)?;

let bytes = message
.encode_to_bytes(connection.protocol_version)
.expect("stream message to encode");
let msg = EgressMessage::make_stream_message(self.id, stream_inbound::Body::Payload(bytes));

permit.send(msg);
Ok(())
}
}

impl<M: StreamRequest> Drop for StreamSink<M> {
fn drop(&mut self) {
let Some(connection) = self.connection.take() else {
return;
};

let id = self.id;
tokio::spawn(async move {
let Some(permit) = connection.reserve().await else {
//connection closed already
return;
};

let msg = EgressMessage::make_stream_message(
id,
stream_inbound::Body::Status(network::StreamStatus::StreamDropped as i32),
);

permit.send(msg);
});
}
}

/// A single streaming connection with a channel to the peer. A connection can be
/// opened by either ends of the connection and has no direction. Any connection
/// can be used to send or receive from a peer.
Expand Down Expand Up @@ -415,6 +505,20 @@ impl Connection {
})
}

/// Reserves capacity for opening a stream.
///
/// Streams remain bound to their originating connection, so this call consumes
/// the connection. Clone the connection before invoking [`Connection::stream`] if
/// you need to reuse the connection.
#[must_use]
pub async fn stream(self) -> Option<StreamOpenPermit> {
let permit = self.sender.clone().reserve_owned().await.ok()?;
Some(StreamOpenPermit {
connection: self,
permit,
})
}

/// Tries to allocate capacity to send one message on this connection.
/// Returns None if the connection was closed or is at capacity.
#[must_use]
Expand Down
52 changes: 52 additions & 0 deletions crates/core/src/network/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,18 @@ use restate_types::net::{CURRENT_PROTOCOL_VERSION, MIN_SUPPORTED_PROTOCOL_VERSIO
use restate_types::nodes_config::NodesConfigError;

use crate::ShutdownError;
use crate::network::protobuf::network;

use super::protobuf::network::{rpc_reply, watch_update};

#[derive(Debug, thiserror::Error)]
#[error("connection closed")]
pub struct ConnectionClosed;

#[derive(Debug, thiserror::Error)]
#[error("stream closed")]
pub struct StreamClosed;

#[derive(Debug, thiserror::Error)]
pub enum RouterError {
#[error("target service has not started serving requests yet")]
Expand All @@ -51,6 +56,43 @@ impl From<RouterError> for rpc_reply::Status {
}
}

#[derive(Debug, thiserror::Error)]
pub enum BidiStreamError {
#[error("stream already open with the same id")]
AlreadyOpen,
#[error("unknown stream id")]
StreamNotFound,
#[error("stream dropped")]
StreamDropped,
#[error("message dropped")]
LoadShedding,
#[error(transparent)]
Router(#[from] RouterError),
}

impl From<RouterError> for network::StreamStatus {
fn from(value: RouterError) -> Self {
match value {
RouterError::ServiceNotReady => Self::ServiceNotReady,
RouterError::ServiceStopped => Self::ServiceStopped,
RouterError::CapacityExceeded => Self::LoadShedding,
RouterError::ServiceNotFound => Self::ServiceNotFound,
RouterError::MessageUnrecognized => Self::MessageUnrecognized,
}
}
}

impl From<BidiStreamError> for network::StreamStatus {
fn from(value: BidiStreamError) -> Self {
match value {
BidiStreamError::Router(err) => err.into(),
BidiStreamError::AlreadyOpen => Self::AlreadyOpen,
BidiStreamError::StreamNotFound => Self::StreamNotFound,
BidiStreamError::StreamDropped => Self::StreamDropped,
BidiStreamError::LoadShedding => Self::LoadShedding,
}
}
}
/// A type to communicate rpc processing error to the caller
///
/// This is used by service handlers to report back that they are not able to process
Expand Down Expand Up @@ -90,6 +132,16 @@ impl From<Verdict> for watch_update::Start {
}
}

impl From<Verdict> for network::StreamStatus {
fn from(value: Verdict) -> Self {
match value {
Verdict::LoadShedding => Self::LoadShedding,
Verdict::SortCodeNotFound => Self::SortCodeNotFound,
Verdict::MessageUnrecognized => Self::MessageUnrecognized,
}
}
}

#[derive(Debug, thiserror::Error)]
pub enum NetworkError {
#[error("operation aborted, node is shutting down")]
Expand Down
Loading
Loading