Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions msg-socket/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use tokio::io::{AsyncRead, AsyncWrite};

use msg_transport::Address;

pub mod stats;

#[path = "pub/mod.rs"]
mod pubs;
pub use pubs::{PubError, PubOptions, PubSocket};
Expand Down
10 changes: 5 additions & 5 deletions msg-socket/src/pub/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ where
}
Err(e) => {
error!(err = ?e, "Error authenticating client");
this.state.stats.decrement_active_clients();
this.state.stats.specific.decrement_active_clients();
}
}

Expand All @@ -93,15 +93,15 @@ where
Ok(io) => {
if let Err(e) = this.on_incoming(io) {
error!(err = ?e, "Error accepting incoming connection");
this.state.stats.decrement_active_clients();
this.state.stats.specific.decrement_active_clients();
}
}
Err(e) => {
error!(err = ?e, "Error accepting incoming connection");

// Active clients have already been incremented in the initial call to
// `poll_accept`, so we need to decrement them here.
this.state.stats.decrement_active_clients();
this.state.stats.specific.decrement_active_clients();
}
}

Expand All @@ -112,15 +112,15 @@ where
// incoming connection tasks.
if let Poll::Ready(accept) = Pin::new(&mut this.transport).poll_accept(cx) {
if let Some(max) = this.options.max_clients {
if this.state.stats.active_clients() >= max {
if this.state.stats.specific.active_clients() >= max {
warn!("Max connections reached ({}), rejecting incoming connection", max);
continue;
}
}

// Increment the active clients counter. If the authentication fails,
// this counter will be decremented.
this.state.stats.increment_active_clients();
this.state.stats.specific.increment_active_clients();

this.conn_tasks.push(accept);

Expand Down
5 changes: 3 additions & 2 deletions msg-socket/src/pub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ mod socket;
pub use socket::*;

mod stats;
use stats::SocketStats;
use crate::stats::SocketStats;
use stats::PubStats;

mod trie;

Expand Down Expand Up @@ -167,7 +168,7 @@ impl PubMessage {
/// The publisher socket state, shared between the backend task and the socket.
#[derive(Debug, Default)]
pub(crate) struct SocketState {
pub(crate) stats: SocketStats,
pub(crate) stats: SocketStats<PubStats>,
}

#[cfg(test)]
Expand Down
4 changes: 2 additions & 2 deletions msg-socket/src/pub/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl<Io: AsyncRead + AsyncWrite + Unpin> SubscriberSession<Io> {

impl<Io> Drop for SubscriberSession<Io> {
fn drop(&mut self) {
self.state.stats.decrement_active_clients();
self.state.stats.specific.decrement_active_clients();
}
}

Expand Down Expand Up @@ -157,7 +157,7 @@ impl<Io: AsyncRead + AsyncWrite + Unpin> Future for SubscriberSession<Io> {

match this.conn.start_send_unpin(msg) {
Ok(_) => {
this.state.stats.increment_tx(msg_len);
this.state.stats.specific.increment_tx(msg_len);

this.should_flush = true;
// We might be able to send more queued messages
Expand Down
8 changes: 4 additions & 4 deletions msg-socket/src/pub/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ use tokio::{
};
use tracing::{debug, trace, warn};

use super::{driver::PubDriver, stats::SocketStats, PubError, PubMessage, PubOptions, SocketState};
use super::{driver::PubDriver, stats::PubStats, PubError, PubMessage, PubOptions, SocketState};
use crate::Authenticator;

use msg_transport::{Address, Transport};
use msg_wire::compression::Compressor;

/// A publisher socket. This is thread-safe and can be cloned.
#[derive(Clone, Default)]
#[derive(Clone)]
pub struct PubSocket<T: Transport<A>, A: Address> {
/// The reply socket options, shared with the driver.
options: Arc<PubOptions>,
Expand Down Expand Up @@ -190,8 +190,8 @@ where
Ok(())
}

pub fn stats(&self) -> &SocketStats {
&self.state.stats
pub fn stats(&self) -> &PubStats {
&self.state.stats.specific
}

/// Returns the local address this socket is bound to. `None` if the socket is not bound.
Expand Down
26 changes: 12 additions & 14 deletions msg-socket/src/pub/stats.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,36 @@
use std::sync::atomic::{AtomicUsize, Ordering};

/// Statistics for a reply socket. These are shared between the driver task
/// and the socket.
#[derive(Debug, Default)]
pub struct SocketStats {
pub struct PubStats {
/// Total bytes sent
bytes_tx: AtomicUsize,
/// Total number of active request clients
active_clients: AtomicUsize,
}

impl SocketStats {
impl PubStats {
#[inline]
pub(crate) fn increment_tx(&self, bytes: usize) {
self.bytes_tx.fetch_add(bytes, Ordering::Relaxed);
pub fn bytes_tx(&self) -> usize {
self.bytes_tx.load(Ordering::Relaxed)
}

#[inline]
pub(crate) fn increment_active_clients(&self) {
self.active_clients.fetch_add(1, Ordering::Relaxed);
pub fn active_clients(&self) -> usize {
self.active_clients.load(Ordering::Relaxed)
}

#[inline]
pub(crate) fn decrement_active_clients(&self) {
self.active_clients.fetch_sub(1, Ordering::Relaxed);
pub(crate) fn increment_tx(&self, bytes: usize) {
self.bytes_tx.fetch_add(bytes, Ordering::Relaxed);
}

#[inline]
pub fn bytes_tx(&self) -> usize {
self.bytes_tx.load(Ordering::Relaxed)
pub(crate) fn increment_active_clients(&self) {
self.active_clients.fetch_add(1, Ordering::Relaxed);
}

#[inline]
pub fn active_clients(&self) -> usize {
self.active_clients.load(Ordering::Relaxed)
pub(crate) fn decrement_active_clients(&self) {
self.active_clients.fetch_sub(1, Ordering::Relaxed);
}
}
18 changes: 9 additions & 9 deletions msg-socket/src/rep/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,15 @@ where
}
}

this.state.stats.increment_rx(size);
this.state.stats.specific.increment_rx(size);
let _ = this.to_socket.try_send(request);
}
Some(Err(e)) => {
error!(err = ?e, "Error receiving message from peer {:?}", peer);
}
None => {
warn!("Peer {:?} disconnected", peer);
this.state.stats.decrement_active_clients();
this.state.stats.specific.decrement_active_clients();
}
}

Expand Down Expand Up @@ -125,7 +125,7 @@ where
}
Err(e) => {
error!(err = ?e, "Error authenticating client");
this.state.stats.decrement_active_clients();
this.state.stats.specific.decrement_active_clients();
}
}

Expand All @@ -137,15 +137,15 @@ where
Ok(io) => {
if let Err(e) = this.on_incoming(io) {
error!(err = ?e, "Error accepting incoming connection");
this.state.stats.decrement_active_clients();
this.state.stats.specific.decrement_active_clients();
}
}
Err(e) => {
error!(err = ?e, "Error accepting incoming connection");

// Active clients have already been incremented in the initial call to
// `poll_accept`, so we need to decrement them here.
this.state.stats.decrement_active_clients();
this.state.stats.specific.decrement_active_clients();
}
}

Expand All @@ -156,7 +156,7 @@ where
// incoming connection tasks.
if let Poll::Ready(accept) = Pin::new(&mut this.transport).poll_accept(cx) {
if let Some(max) = this.options.max_clients {
if this.state.stats.active_clients() >= max {
if this.state.stats.specific.active_clients() >= max {
warn!(
"Max connections reached ({}), rejecting new incoming connection",
max
Expand All @@ -168,7 +168,7 @@ where

// Increment the active clients counter. If the authentication fails, this counter
// will be decremented.
this.state.stats.increment_active_clients();
this.state.stats.specific.increment_active_clients();

this.conn_tasks.push(accept);

Expand Down Expand Up @@ -271,14 +271,14 @@ impl<T: AsyncRead + AsyncWrite + Unpin, A: Address + Unpin> Stream for PeerState
let msg_len = msg.size();
match this.conn.start_send_unpin(msg) {
Ok(_) => {
this.state.stats.increment_tx(msg_len);
this.state.stats.specific.increment_tx(msg_len);
this.should_flush = true;

// We might be able to send more queued messages
continue;
}
Err(e) => {
this.state.stats.increment_failed_requests();
this.state.stats.specific.increment_failed_requests();
error!(err = ?e, "Failed to send message to socket");
// End this stream as we can't send any more messages
return Poll::Ready(None);
Expand Down
5 changes: 3 additions & 2 deletions msg-socket/src/rep/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use tokio::sync::oneshot;
mod driver;
mod socket;
mod stats;
use crate::stats::SocketStats;
pub use socket::*;
use stats::SocketStats;
use stats::RepStats;

/// Errors that can occur when using a reply socket.
#[derive(Debug, Error)]
Expand Down Expand Up @@ -55,7 +56,7 @@ impl RepOptions {
/// The request socket state, shared between the backend task and the socket.
#[derive(Debug, Default)]
pub(crate) struct SocketState {
pub(crate) stats: SocketStats,
pub(crate) stats: SocketStats<RepStats>,
}

/// A request received by the socket.
Expand Down
9 changes: 5 additions & 4 deletions msg-socket/src/rep/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@ use tokio_stream::StreamMap;
use tracing::{debug, warn};

use crate::{
rep::{driver::RepDriver, RepError, SocketState, SocketStats},
rep::{driver::RepDriver, RepError, SocketState},
Authenticator, RepOptions, Request, DEFAULT_BUFFER_SIZE,
};

use msg_transport::{Address, Transport};
use msg_wire::compression::Compressor;

use super::stats::RepStats;

/// A reply socket. This socket implements [`Stream`] and yields incoming [`Request`]s.
#[derive(Default)]
pub struct RepSocket<T: Transport<A>, A: Address> {
/// The reply socket options, shared with the driver.
options: Arc<RepOptions>,
Expand Down Expand Up @@ -143,8 +144,8 @@ where
}

/// Returns the statistics for this socket.
pub fn stats(&self) -> &SocketStats {
&self.state.stats
pub fn stats(&self) -> &RepStats {
&self.state.stats.specific
}

/// Returns the local address this socket is bound to. `None` if the socket is not bound.
Expand Down
4 changes: 2 additions & 2 deletions msg-socket/src/rep/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
/// Statistics for a reply socket.
/// These are shared between the driver task and the socket.
#[derive(Debug, Default)]
pub struct SocketStats {
pub struct RepStats {
/// Total bytes sent
bytes_tx: AtomicUsize,
/// Total bytes received
Expand All @@ -14,7 +14,7 @@ pub struct SocketStats {
failed_requests: AtomicUsize,
}

impl SocketStats {
impl RepStats {
#[inline]
pub(crate) fn increment_tx(&self, bytes: usize) {
self.bytes_tx.fetch_add(bytes, Ordering::Relaxed);
Expand Down
6 changes: 3 additions & 3 deletions msg-socket/src/req/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ where
let _ = pending.sender.send(Ok(payload));

// Update stats
self.socket_state.stats.update_rtt(rtt);
self.socket_state.stats.increment_rx(size);
self.socket_state.stats.specific.update_rtt(rtt);
self.socket_state.stats.specific.increment_rx(size);
}
}

Expand Down Expand Up @@ -346,7 +346,7 @@ where
debug!("Sending msg {}", msg.id());
match channel.start_send_unpin(msg) {
Ok(_) => {
this.socket_state.stats.increment_tx(size);
this.socket_state.stats.specific.increment_tx(size);
this.should_flush = true;
}
Err(e) => {
Expand Down
6 changes: 3 additions & 3 deletions msg-socket/src/req/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ use msg_wire::{
mod driver;
mod socket;
mod stats;
use driver::*;
pub use socket::*;

use self::stats::SocketStats;
use crate::stats::SocketStats;
use stats::ReqStats;

/// The default buffer size for the socket.
const DEFAULT_BUFFER_SIZE: usize = 1024;
Expand Down Expand Up @@ -182,5 +182,5 @@ impl ReqMessage {
/// The request socket state, shared between the backend task and the socket.
#[derive(Debug, Default)]
pub(crate) struct SocketState {
pub(crate) stats: SocketStats,
pub(crate) stats: SocketStats<ReqStats>,
}
8 changes: 4 additions & 4 deletions msg-socket/src/req/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ use tokio::{
use msg_transport::{Address, Transport};
use msg_wire::compression::Compressor;

use super::{Command, ReqDriver, ReqError, ReqOptions, DEFAULT_BUFFER_SIZE};
use super::{Command, ReqError, ReqOptions, DEFAULT_BUFFER_SIZE};
use crate::{
req::{stats::SocketStats, SocketState},
req::{driver::ReqDriver, stats::ReqStats, SocketState},
ConnectionState, ExponentialBackoff, ReqMessage,
};

Expand Down Expand Up @@ -82,8 +82,8 @@ where
self
}

pub fn stats(&self) -> &SocketStats {
&self.state.stats
pub fn stats(&self) -> &ReqStats {
&self.state.stats.specific
}

pub async fn request(&self, message: Bytes) -> Result<Bytes, ReqError> {
Expand Down
4 changes: 2 additions & 2 deletions msg-socket/src/req/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
/// Statistics for a request socket. These are shared between the backend task
/// and the socket.
#[derive(Debug, Default)]
pub struct SocketStats {
pub struct ReqStats {
/// Total bytes sent
bytes_tx: AtomicUsize,
/// Total bytes received
Expand All @@ -14,7 +14,7 @@ pub struct SocketStats {
rtt_idx: AtomicUsize,
}

impl SocketStats {
impl ReqStats {
#[inline]
/// Atomically updates the RTT according to the CA formula:
/// CA = (rtt + n * prev_ca) / (n + 1)
Expand Down
Loading
Loading