Skip to content

Commit 2c68093

Browse files
authored
feat(transport): start working on transport metering & metrics (#102)
2 parents cbdbcce + 54eba05 commit 2c68093

File tree

19 files changed

+430
-53
lines changed

19 files changed

+430
-53
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,5 @@
44
*.svg
55
*.log
66
.DS_Store
7+
8+
todo.md

Cargo.lock

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
[workspace]
22
members = [
3-
"msg",
4-
"msg-socket",
5-
"msg-wire",
6-
"msg-transport",
7-
"msg-common",
8-
"msg-sim",
3+
"msg",
4+
"msg-socket",
5+
"msg-wire",
6+
"msg-transport",
7+
"msg-common",
8+
"msg-sim",
99
]
1010
resolver = "2"
1111

@@ -19,19 +19,19 @@ authors = ["Chainbound Developers <dev@chainbound.io>"]
1919
homepage = "https://github.com/chainbound/msg-rs"
2020
repository = "https://github.com/chainbound/msg-rs"
2121
keywords = [
22-
"messaging",
23-
"distributed",
24-
"systems",
25-
"networking",
26-
"quic",
27-
"quinn",
28-
"tokio",
29-
"async",
30-
"simulation",
31-
"pnet",
32-
"udp",
33-
"tcp",
34-
"socket",
22+
"messaging",
23+
"distributed",
24+
"systems",
25+
"networking",
26+
"quic",
27+
"quinn",
28+
"tokio",
29+
"async",
30+
"simulation",
31+
"pnet",
32+
"udp",
33+
"tcp",
34+
"socket",
3535
]
3636

3737
[workspace.dependencies]
@@ -56,12 +56,14 @@ thiserror = "1"
5656
tracing = "0.1"
5757
rustc-hash = "1"
5858
rand = "0.8"
59+
libc = "0.2"
5960
derive_more = { version = "2.0.1", features = [
60-
"from",
61-
"into",
62-
"deref",
63-
"deref_mut",
61+
"from",
62+
"into",
63+
"deref",
64+
"deref_mut",
6465
] }
66+
arc-swap = "1.7.1"
6567

6668
# networking
6769
quinn = "0.11.9"

msg-socket/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,9 @@ rustc-hash.workspace = true
2525
tracing.workspace = true
2626
tokio-stream.workspace = true
2727
parking_lot.workspace = true
28+
arc-swap.workspace = true
29+
30+
derive_more = { workspace = true, features = ["deref"] }
2831

2932
[dev-dependencies]
3033
rand.workspace = true

msg-socket/src/req/driver.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use tracing::{debug, error, trace};
2020
use super::{Command, ReqError, ReqOptions};
2121
use crate::{ConnectionState, ExponentialBackoff, req::SocketState};
2222

23-
use msg_transport::{Address, Transport};
23+
use msg_transport::{Address, MeteredIo, PeerAddress, Transport};
2424
use msg_wire::{
2525
auth,
2626
compression::{Compressor, try_decompress_payload},
@@ -31,15 +31,16 @@ use msg_wire::{
3131
type ConnectionTask<Io, Err> = Pin<Box<dyn Future<Output = Result<Io, Err>> + Send>>;
3232

3333
/// A connection controller that manages the connection to a server with an exponential backoff.
34-
type ConnectionCtl<Io, Addr> = ConnectionState<Framed<Io, reqrep::Codec>, ExponentialBackoff, Addr>;
34+
type ConnectionCtl<Io, S, A> =
35+
ConnectionState<Framed<MeteredIo<Io, S, A>, reqrep::Codec>, ExponentialBackoff, A>;
3536

3637
/// The request socket driver. Endless future that drives
37-
/// the the socket forward.
38+
/// the socket forward.
3839
pub(crate) struct ReqDriver<T: Transport<A>, A: Address> {
3940
/// Options shared with the socket.
4041
pub(crate) options: Arc<ReqOptions>,
4142
/// State shared with the socket.
42-
pub(crate) socket_state: Arc<SocketState>,
43+
pub(crate) socket_state: SocketState<T::Stats>,
4344
/// ID counter for outgoing requests.
4445
pub(crate) id_counter: u32,
4546
/// Commands from the socket.
@@ -52,7 +53,7 @@ pub(crate) struct ReqDriver<T: Transport<A>, A: Address> {
5253
pub(crate) conn_task: Option<ConnectionTask<T::Io, T::Error>>,
5354
/// The transport controller, wrapped in a [`ConnectionState`] for backoff.
5455
/// The [`Framed`] object can send and receive messages from the socket.
55-
pub(crate) conn_state: ConnectionCtl<T::Io, A>,
56+
pub(crate) conn_state: ConnectionCtl<T::Io, T::Stats, A>,
5657
/// The outgoing message queue.
5758
pub(crate) egress_queue: VecDeque<reqrep::Message>,
5859
/// The currently pending requests, if any. Uses [`FxHashMap`] for performance.
@@ -276,7 +277,12 @@ where
276277
this.conn_task = None;
277278

278279
if let Ok(io) = result {
279-
let mut framed = Framed::new(io, reqrep::Codec::new());
280+
tracing::debug!(target = ?io.peer_addr(), "new connection");
281+
282+
let metered =
283+
MeteredIo::new(io, Arc::clone(&this.socket_state.transport_stats));
284+
285+
let mut framed = Framed::new(metered, reqrep::Codec::new());
280286
framed.set_backpressure_boundary(this.options.backpressure_boundary);
281287
this.conn_state = ConnectionState::Active { channel: framed };
282288
}

msg-socket/src/req/mod.rs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1+
use arc_swap::ArcSwap;
12
use bytes::Bytes;
2-
use std::time::Duration;
3+
use std::{sync::Arc, time::Duration};
34
use thiserror::Error;
45
use tokio::sync::oneshot;
56

@@ -57,7 +58,8 @@ pub struct ReqOptions {
5758
/// Default is `None`, and the connection is flushed after every send.
5859
flush_interval: Option<std::time::Duration>,
5960
/// The maximum number of bytes that can be buffered in the session before being flushed.
60-
/// This internally sets [`Framed::set_backpressure_boundary`](tokio_util::codec::Framed).
61+
/// This internally sets
62+
/// [`Framed::set_backpressure_boundary`](tokio_util::codec::Framed::set_backpressure_boundary).
6163
backpressure_boundary: usize,
6264
/// The maximum number of retry attempts. If `None`, the connection will retry indefinitely.
6365
retry_attempts: Option<usize>,
@@ -180,7 +182,19 @@ impl ReqMessage {
180182
}
181183

182184
/// The request socket state, shared between the backend task and the socket.
185+
/// Generic over the transport-level stats type.
183186
#[derive(Debug, Default)]
184-
pub(crate) struct SocketState {
185-
pub(crate) stats: SocketStats<ReqStats>,
187+
pub(crate) struct SocketState<S: Default> {
188+
/// The socket stats.
189+
pub(crate) stats: Arc<SocketStats<ReqStats>>,
190+
/// The transport-level stats. We wrap the inner stats in an `Arc`
191+
/// for cheap clone on read.
192+
pub(crate) transport_stats: Arc<ArcSwap<S>>,
193+
}
194+
195+
// Manual clone implementation needed here because `S` is not `Clone`.
196+
impl<S: Default> Clone for SocketState<S> {
197+
fn clone(&self) -> Self {
198+
Self { stats: Arc::clone(&self.stats), transport_stats: self.transport_stats.clone() }
199+
}
186200
}

msg-socket/src/req/socket.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use arc_swap::Guard;
12
use bytes::Bytes;
23
use rustc_hash::FxHashMap;
34
use std::{marker::PhantomData, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
@@ -13,6 +14,7 @@ use super::{Command, DEFAULT_BUFFER_SIZE, ReqError, ReqOptions};
1314
use crate::{
1415
ConnectionState, ExponentialBackoff, ReqMessage,
1516
req::{SocketState, driver::ReqDriver, stats::ReqStats},
17+
stats::SocketStats,
1618
};
1719

1820
/// The request socket.
@@ -24,7 +26,7 @@ pub struct ReqSocket<T: Transport<A>, A: Address> {
2426
/// Options for the socket. These are shared with the backend task.
2527
options: Arc<ReqOptions>,
2628
/// Socket state. This is shared with the backend task.
27-
state: Arc<SocketState>,
29+
state: SocketState<T::Stats>,
2830
/// Optional message compressor. This is shared with the backend task.
2931
// NOTE: for now we're using dynamic dispatch, since using generics here
3032
// complicates the API a lot. We can always change this later for perf reasons.
@@ -70,7 +72,7 @@ where
7072
to_driver: None,
7173
transport: Some(transport),
7274
options: Arc::new(options),
73-
state: Arc::new(SocketState::default()),
75+
state: SocketState::default(),
7476
compressor: None,
7577
_marker: PhantomData,
7678
}
@@ -82,8 +84,14 @@ where
8284
self
8385
}
8486

85-
pub fn stats(&self) -> &ReqStats {
86-
&self.state.stats.specific
87+
/// Returns the socket stats.
88+
pub fn stats(&self) -> &SocketStats<ReqStats> {
89+
&self.state.stats
90+
}
91+
92+
/// Get the latest transport-level stats snapshot.
93+
pub fn transport_stats(&self) -> Guard<Arc<T::Stats>> {
94+
self.state.transport_stats.load()
8795
}
8896

8997
pub async fn request(&self, message: Bytes) -> Result<Bytes, ReqError> {
@@ -128,7 +136,7 @@ where
128136
let driver: ReqDriver<T, A> = ReqDriver {
129137
addr: endpoint,
130138
options: Arc::clone(&self.options),
131-
socket_state: Arc::clone(&self.state),
139+
socket_state: self.state.clone(),
132140
id_counter: 0,
133141
from_socket,
134142
transport,

msg-socket/src/stats.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
use std::fmt::Debug;
22

3-
#[derive(Debug)]
3+
use derive_more::Deref;
4+
5+
/// Statistics for a socket.
6+
#[derive(Debug, Deref)]
47
pub struct SocketStats<S> {
8+
/// Socket-specific stats.
59
pub(crate) specific: S,
610
}
711

msg-socket/src/sub/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ impl<A: Address> PubMessage<A> {
158158
}
159159

160160
/// The subscriber socket state, shared between the backend task and the socket frontend.
161-
#[derive(Debug)] // Should derive default fine now
161+
#[derive(Debug)]
162162
pub(crate) struct SocketState<A: Address> {
163163
pub(crate) stats: SocketStats<SubStats<A>>,
164164
}
@@ -184,7 +184,7 @@ mod tests {
184184
use super::*;
185185

186186
async fn spawn_listener() -> SocketAddr {
187-
let listener = TcpListener::bind("0.0.0.0:0").await.unwrap();
187+
let listener = TcpListener::bind("[::]:0").await.unwrap();
188188

189189
let addr = listener.local_addr().unwrap();
190190

msg-transport/Cargo.toml

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,22 +16,36 @@ msg-common.workspace = true
1616
async-trait.workspace = true
1717
futures.workspace = true
1818
tokio.workspace = true
19-
tokio-openssl.workspace = true
2019
tracing.workspace = true
21-
thiserror.workspace = true
22-
derive_more.workspace = true
20+
arc-swap.workspace = true
21+
22+
libc.workspace = true
23+
24+
thiserror = { workspace = true, optional = true }
25+
derive_more = { workspace = true, features = [
26+
"from",
27+
"into",
28+
"deref",
29+
"deref_mut",
30+
], optional = true }
2331

2432
# QUIC
2533
quinn = { workspace = true, optional = true }
2634
rcgen = { workspace = true, optional = true }
2735

2836
# TLS
2937
openssl = { workspace = true, optional = true }
38+
tokio-openssl = { workspace = true, optional = true }
3039

3140
[dev-dependencies]
3241
tracing-subscriber = "0.3"
3342

3443
[features]
3544
default = []
36-
quic = ["dep:quinn", "dep:rcgen"]
37-
tcp-tls = ["dep:openssl"]
45+
quic = ["dep:quinn", "dep:rcgen", "dep:thiserror"]
46+
tcp-tls = [
47+
"dep:openssl",
48+
"dep:derive_more",
49+
"dep:thiserror",
50+
"dep:tokio-openssl",
51+
]

0 commit comments

Comments
 (0)