diff --git a/Cargo.toml b/Cargo.toml index e5a895f..218481f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,7 @@ bytes = "1" tokio = { version = "1", features = ["full"], optional = true } tokio-util = { version = "0.7", features = ["compat"], optional = true } num-traits = "0.2" -dashmap = "5" +scc = "2.3.4" crossbeam-queue = "0.3" uuid = { version = "1", features = ["v4"] } regex = { version = "1", default-features = false, features = [ diff --git a/src/backend.rs b/src/backend.rs index c1fef3e..49f18db 100644 --- a/src/backend.rs +++ b/src/backend.rs @@ -7,7 +7,6 @@ use crate::{ use async_trait::async_trait; use crossbeam_queue::SegQueue; -use dashmap::DashMap; use futures::channel::mpsc; use futures::SinkExt; use parking_lot::Mutex; @@ -19,7 +18,7 @@ pub(crate) struct Peer { } pub(crate) struct GenericSocketBackend { - pub(crate) peers: DashMap, + pub(crate) peers: scc::HashMap, fair_queue_inner: Option>>>, pub(crate) round_robin: SegQueue, socket_type: SocketType, @@ -34,7 +33,7 @@ impl GenericSocketBackend { options: SocketOptions, ) -> Self { Self { - peers: DashMap::new(), + peers: scc::HashMap::new(), fair_queue_inner, round_robin: SegQueue::new(), socket_type, @@ -63,7 +62,7 @@ impl GenericSocketBackend { } }, }; - let send_result = match self.peers.get_mut(&next_peer_id) { + let send_result = match self.peers.get_async(&next_peer_id).await { Some(mut peer) => peer.send_queue.send(message).await, None => continue, }; @@ -103,7 +102,9 @@ impl SocketBackend for GenericSocketBackend { impl MultiPeerBackend for GenericSocketBackend { async fn peer_connected(self: Arc, peer_id: &PeerIdentity, io: FramedIo) { let (recv_queue, send_queue) = io.into_parts(); - self.peers.insert(peer_id.clone(), Peer { send_queue }); + self.peers + .upsert_async(peer_id.clone(), Peer { send_queue }) + .await; self.round_robin.push(peer_id.clone()); match &self.fair_queue_inner { None => {} diff --git a/src/pub.rs b/src/pub.rs index 0db5d30..62bcbca 100644 --- a/src/pub.rs +++ b/src/pub.rs @@ -10,7 +10,6 @@ use crate::{ }; use async_trait::async_trait; -use dashmap::DashMap; use futures::channel::{mpsc, oneshot}; use futures::{select, FutureExt, StreamExt}; use parking_lot::Mutex; @@ -27,7 +26,7 @@ pub(crate) struct Subscriber { } pub(crate) struct PubSocketBackend { - subscribers: DashMap, + subscribers: scc::HashMap, socket_monitor: Mutex>>, socket_options: SocketOptions, } @@ -53,7 +52,7 @@ impl PubSocketBackend { Some(1) => { // Subscribe self.subscribers - .get_mut(peer_id) + .get(peer_id) .unwrap() .subscriptions .push(Vec::from(&data[1..])); @@ -77,7 +76,7 @@ impl PubSocketBackend { } if let Some(index) = del_index { self.subscribers - .get_mut(peer_id) + .get(peer_id) .unwrap() .subscriptions .remove(index); @@ -115,14 +114,16 @@ impl MultiPeerBackend for PubSocketBackend { let (mut recv_queue, send_queue) = io.into_parts(); // TODO provide handling for recv_queue let (sender, stop_receiver) = oneshot::channel(); - self.subscribers.insert( - peer_id.clone(), - Subscriber { - subscriptions: vec![], - send_queue: Box::pin(send_queue), - _subscription_coro_stop: sender, - }, - ); + self.subscribers + .upsert_async( + peer_id.clone(), + Subscriber { + subscriptions: vec![], + send_queue: Box::pin(send_queue), + _subscription_coro_stop: sender, + }, + ) + .await; let backend = self; let peer_id = peer_id.clone(); async_rt::task::spawn(async move { @@ -173,7 +174,8 @@ impl Drop for PubSocket { impl SocketSend for PubSocket { async fn send(&mut self, message: ZmqMessage) -> ZmqResult<()> { let mut dead_peers = Vec::new(); - for mut subscriber in self.backend.subscribers.iter_mut() { + let mut iter = self.backend.subscribers.first_entry_async().await; + while let Some(mut subscriber) = iter { for sub_filter in &subscriber.subscriptions { if sub_filter.len() <= message.get(0).unwrap().len() && sub_filter.as_slice() == &message.get(0).unwrap()[0..sub_filter.len()] @@ -205,6 +207,7 @@ impl SocketSend for PubSocket { break; } } + iter = subscriber.next_async().await; } for peer in dead_peers { self.backend.peer_disconnected(&peer); @@ -220,7 +223,7 @@ impl Socket for PubSocket { fn with_options(options: SocketOptions) -> Self { Self { backend: Arc::new(PubSocketBackend { - subscribers: DashMap::new(), + subscribers: scc::HashMap::new(), socket_monitor: Mutex::new(None), socket_options: options, }), diff --git a/src/rep.rs b/src/rep.rs index eb2e85a..5d6716f 100644 --- a/src/rep.rs +++ b/src/rep.rs @@ -7,7 +7,6 @@ use crate::*; use crate::{SocketType, ZmqResult}; use async_trait::async_trait; -use dashmap::DashMap; use futures::{SinkExt, StreamExt}; use parking_lot::Mutex; @@ -20,7 +19,7 @@ struct RepPeer { } struct RepSocketBackend { - pub(crate) peers: DashMap, + pub(crate) peers: scc::HashMap, fair_queue_inner: Arc>>, socket_monitor: Mutex>>, socket_options: SocketOptions, @@ -46,7 +45,7 @@ impl Socket for RepSocket { let fair_queue = FairQueue::new(true); Self { backend: Arc::new(RepSocketBackend { - peers: DashMap::new(), + peers: scc::HashMap::new(), fair_queue_inner: fair_queue.inner(), socket_monitor: Mutex::new(None), socket_options: options, @@ -78,13 +77,15 @@ impl MultiPeerBackend for RepSocketBackend { async fn peer_connected(self: Arc, peer_id: &PeerIdentity, io: FramedIo) { let (recv_queue, send_queue) = io.into_parts(); - self.peers.insert( - peer_id.clone(), - RepPeer { - _identity: peer_id.clone(), - send_queue, - }, - ); + self.peers + .upsert_async( + peer_id.clone(), + RepPeer { + _identity: peer_id.clone(), + send_queue, + }, + ) + .await; self.fair_queue_inner .lock() .insert(peer_id.clone(), recv_queue); @@ -121,7 +122,7 @@ impl SocketSend for RepSocket { async fn send(&mut self, mut message: ZmqMessage) -> ZmqResult<()> { match self.current_request.take() { Some(peer_id) => { - if let Some(mut peer) = self.backend.peers.get_mut(&peer_id) { + if let Some(mut peer) = self.backend.peers.get_async(&peer_id).await { if let Some(envelope) = self.envelope.take() { message.prepend(&envelope); } diff --git a/src/req.rs b/src/req.rs index f9da50a..8c9920d 100644 --- a/src/req.rs +++ b/src/req.rs @@ -9,14 +9,13 @@ use crate::{SocketType, ZmqResult}; use async_trait::async_trait; use bytes::Bytes; use crossbeam_queue::SegQueue; -use dashmap::DashMap; use futures::{SinkExt, StreamExt}; use std::collections::HashMap; use std::sync::Arc; struct ReqSocketBackend { - pub(crate) peers: DashMap, + pub(crate) peers: scc::HashMap, pub(crate) round_robin: SegQueue, socket_monitor: Mutex>>, socket_options: SocketOptions, @@ -58,7 +57,7 @@ impl SocketSend for ReqSocket { }) } }; - if let Some(mut peer) = self.backend.peers.get_mut(&next_peer_id) { + if let Some(mut peer) = self.backend.peers.get_async(&next_peer_id).await { self.backend.round_robin.push(next_peer_id.clone()); message.push_front(Bytes::new()); peer.send_queue.send(Message::Message(message)).await?; @@ -74,7 +73,7 @@ impl SocketRecv for ReqSocket { async fn recv(&mut self) -> ZmqResult { match self.current_request.take() { Some(peer_id) => { - if let Some(mut peer) = self.backend.peers.get_mut(&peer_id) { + if let Some(mut peer) = self.backend.peers.get_async(&peer_id).await { match peer.recv_queue.next().await { Some(Ok(Message::Message(mut m))) => { if m.len() < 2 { @@ -110,7 +109,7 @@ impl Socket for ReqSocket { fn with_options(options: SocketOptions) -> Self { Self { backend: Arc::new(ReqSocketBackend { - peers: DashMap::new(), + peers: scc::HashMap::new(), round_robin: SegQueue::new(), socket_monitor: Mutex::new(None), socket_options: options, @@ -139,14 +138,16 @@ impl Socket for ReqSocket { impl MultiPeerBackend for ReqSocketBackend { async fn peer_connected(self: Arc, peer_id: &PeerIdentity, io: FramedIo) { let (recv_queue, send_queue) = io.into_parts(); - self.peers.insert( - peer_id.clone(), - Peer { - _identity: peer_id.clone(), - send_queue, - recv_queue, - }, - ); + self.peers + .upsert_async( + peer_id.clone(), + Peer { + _identity: peer_id.clone(), + send_queue, + recv_queue, + }, + ) + .await; self.round_robin.push(peer_id.clone()); } diff --git a/src/router.rs b/src/router.rs index 235a514..1aa8ddc 100644 --- a/src/router.rs +++ b/src/router.rs @@ -92,7 +92,7 @@ impl SocketSend for RouterSocket { async fn send(&mut self, mut message: ZmqMessage) -> ZmqResult<()> { assert!(message.len() > 1); let peer_id: PeerIdentity = message.pop_front().unwrap().try_into()?; - match self.backend.peers.get_mut(&peer_id) { + match self.backend.peers.get_async(&peer_id).await { Some(mut peer) => { peer.send_queue.send(Message::Message(message)).await?; Ok(()) diff --git a/src/sub.rs b/src/sub.rs index a1a46a7..e3083cc 100644 --- a/src/sub.rs +++ b/src/sub.rs @@ -14,7 +14,6 @@ use crate::{ use async_trait::async_trait; use bytes::{BufMut, BytesMut}; use crossbeam_queue::SegQueue; -use dashmap::DashMap; use futures::channel::mpsc; use futures::{SinkExt, StreamExt}; use parking_lot::Mutex; @@ -28,7 +27,7 @@ pub enum SubBackendMsgType { } pub(crate) struct SubSocketBackend { - pub(crate) peers: DashMap, + pub(crate) peers: scc::HashMap, fair_queue_inner: Option>>>, pub(crate) round_robin: SegQueue, socket_type: SocketType, @@ -44,7 +43,7 @@ impl SubSocketBackend { options: SocketOptions, ) -> Self { Self { - peers: DashMap::new(), + peers: scc::HashMap::new(), fair_queue_inner, round_robin: SegQueue::new(), socket_type, @@ -97,7 +96,9 @@ impl MultiPeerBackend for SubSocketBackend { send_queue.send(Message::Message(message)).await.unwrap(); } - self.peers.insert(peer_id.clone(), Peer { send_queue }); + self.peers + .upsert_async(peer_id.clone(), Peer { send_queue }) + .await; self.round_robin.push(peer_id.clone()); match &self.fair_queue_inner { None => {} @@ -143,11 +144,13 @@ impl SubSocket { msg_type: SubBackendMsgType, ) -> ZmqResult<()> { let message: ZmqMessage = SubSocketBackend::create_subs_message(subscription, msg_type); + let mut iter = self.backend.peers.first_entry_async().await; - for mut peer in self.backend.peers.iter_mut() { + while let Some(mut peer) = iter { peer.send_queue .send(Message::Message(message.clone())) .await?; + iter = peer.next_async().await; } Ok(()) }