From 5d15e597db5c1b22748e6008c139f69a55932d19 Mon Sep 17 00:00:00 2001 From: remalloc Date: Sun, 25 May 2025 19:32:52 +0800 Subject: [PATCH 1/3] Replace dashmap to scc --- Cargo.toml | 2 +- src/backend.rs | 9 ++++----- src/pub.rs | 17 +++++++++-------- src/rep.rs | 11 +++++------ src/req.rs | 13 ++++++------- src/router.rs | 2 +- src/sub.rs | 11 ++++++----- 7 files changed, 32 insertions(+), 33 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e5a895f..218481f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,7 @@ bytes = "1" tokio = { version = "1", features = ["full"], optional = true } tokio-util = { version = "0.7", features = ["compat"], optional = true } num-traits = "0.2" -dashmap = "5" +scc = "2.3.4" crossbeam-queue = "0.3" uuid = { version = "1", features = ["v4"] } regex = { version = "1", default-features = false, features = [ diff --git a/src/backend.rs b/src/backend.rs index c1fef3e..c1ce993 100644 --- a/src/backend.rs +++ b/src/backend.rs @@ -7,7 +7,6 @@ use crate::{ use async_trait::async_trait; use crossbeam_queue::SegQueue; -use dashmap::DashMap; use futures::channel::mpsc; use futures::SinkExt; use parking_lot::Mutex; @@ -19,7 +18,7 @@ pub(crate) struct Peer { } pub(crate) struct GenericSocketBackend { - pub(crate) peers: DashMap, + pub(crate) peers: scc::HashMap, fair_queue_inner: Option>>>, pub(crate) round_robin: SegQueue, socket_type: SocketType, @@ -34,7 +33,7 @@ impl GenericSocketBackend { options: SocketOptions, ) -> Self { Self { - peers: DashMap::new(), + peers: scc::HashMap::new(), fair_queue_inner, round_robin: SegQueue::new(), socket_type, @@ -63,7 +62,7 @@ impl GenericSocketBackend { } }, }; - let send_result = match self.peers.get_mut(&next_peer_id) { + let send_result = match self.peers.get_async(&next_peer_id).await { Some(mut peer) => peer.send_queue.send(message).await, None => continue, }; @@ -103,7 +102,7 @@ impl SocketBackend for GenericSocketBackend { impl MultiPeerBackend for GenericSocketBackend { async fn peer_connected(self: Arc, peer_id: &PeerIdentity, io: FramedIo) { let (recv_queue, send_queue) = io.into_parts(); - self.peers.insert(peer_id.clone(), Peer { send_queue }); + self.peers.upsert_async(peer_id.clone(), Peer { send_queue }).await; self.round_robin.push(peer_id.clone()); match &self.fair_queue_inner { None => {} diff --git a/src/pub.rs b/src/pub.rs index 0db5d30..30f6569 100644 --- a/src/pub.rs +++ b/src/pub.rs @@ -10,7 +10,6 @@ use crate::{ }; use async_trait::async_trait; -use dashmap::DashMap; use futures::channel::{mpsc, oneshot}; use futures::{select, FutureExt, StreamExt}; use parking_lot::Mutex; @@ -27,7 +26,7 @@ pub(crate) struct Subscriber { } pub(crate) struct PubSocketBackend { - subscribers: DashMap, + subscribers: scc::HashMap, socket_monitor: Mutex>>, socket_options: SocketOptions, } @@ -53,7 +52,7 @@ impl PubSocketBackend { Some(1) => { // Subscribe self.subscribers - .get_mut(peer_id) + .get(peer_id) .unwrap() .subscriptions .push(Vec::from(&data[1..])); @@ -77,7 +76,7 @@ impl PubSocketBackend { } if let Some(index) = del_index { self.subscribers - .get_mut(peer_id) + .get(peer_id) .unwrap() .subscriptions .remove(index); @@ -115,14 +114,14 @@ impl MultiPeerBackend for PubSocketBackend { let (mut recv_queue, send_queue) = io.into_parts(); // TODO provide handling for recv_queue let (sender, stop_receiver) = oneshot::channel(); - self.subscribers.insert( + self.subscribers.upsert_async( peer_id.clone(), Subscriber { subscriptions: vec![], send_queue: Box::pin(send_queue), _subscription_coro_stop: sender, }, - ); + ).await; let backend = self; let peer_id = peer_id.clone(); async_rt::task::spawn(async move { @@ -173,7 +172,8 @@ impl Drop for PubSocket { impl SocketSend for PubSocket { async fn send(&mut self, message: ZmqMessage) -> ZmqResult<()> { let mut dead_peers = Vec::new(); - for mut subscriber in self.backend.subscribers.iter_mut() { + let mut iter = self.backend.subscribers.first_entry_async().await; + while let Some(mut subscriber) = iter { for sub_filter in &subscriber.subscriptions { if sub_filter.len() <= message.get(0).unwrap().len() && sub_filter.as_slice() == &message.get(0).unwrap()[0..sub_filter.len()] @@ -205,6 +205,7 @@ impl SocketSend for PubSocket { break; } } + iter = subscriber.next_async().await; } for peer in dead_peers { self.backend.peer_disconnected(&peer); @@ -220,7 +221,7 @@ impl Socket for PubSocket { fn with_options(options: SocketOptions) -> Self { Self { backend: Arc::new(PubSocketBackend { - subscribers: DashMap::new(), + subscribers: scc::HashMap::new(), socket_monitor: Mutex::new(None), socket_options: options, }), diff --git a/src/rep.rs b/src/rep.rs index 4eae8e7..b6717cf 100644 --- a/src/rep.rs +++ b/src/rep.rs @@ -7,7 +7,6 @@ use crate::*; use crate::{SocketType, ZmqResult}; use async_trait::async_trait; -use dashmap::DashMap; use futures::{SinkExt, StreamExt}; use parking_lot::Mutex; @@ -20,7 +19,7 @@ struct RepPeer { } struct RepSocketBackend { - pub(crate) peers: DashMap, + pub(crate) peers: scc::HashMap, fair_queue_inner: Arc>>, socket_monitor: Mutex>>, socket_options: SocketOptions, @@ -46,7 +45,7 @@ impl Socket for RepSocket { let fair_queue = FairQueue::new(true); Self { backend: Arc::new(RepSocketBackend { - peers: DashMap::new(), + peers: scc::HashMap::new(), fair_queue_inner: fair_queue.inner(), socket_monitor: Mutex::new(None), socket_options: options, @@ -78,13 +77,13 @@ impl MultiPeerBackend for RepSocketBackend { async fn peer_connected(self: Arc, peer_id: &PeerIdentity, io: FramedIo) { let (recv_queue, send_queue) = io.into_parts(); - self.peers.insert( + self.peers.upsert_async( peer_id.clone(), RepPeer { _identity: peer_id.clone(), send_queue, }, - ); + ).await; self.fair_queue_inner .lock() .insert(peer_id.clone(), recv_queue); @@ -121,7 +120,7 @@ impl SocketSend for RepSocket { async fn send(&mut self, mut message: ZmqMessage) -> ZmqResult<()> { match self.current_request.take() { Some(peer_id) => { - if let Some(mut peer) = self.backend.peers.get_mut(&peer_id) { + if let Some(mut peer) = self.backend.peers.get_async(&peer_id).await { if let Some(envelope) = self.envelope.take() { message.prepend(&envelope); } diff --git a/src/req.rs b/src/req.rs index ad9f2fe..e62c702 100644 --- a/src/req.rs +++ b/src/req.rs @@ -9,14 +9,13 @@ use crate::{SocketType, ZmqResult}; use async_trait::async_trait; use bytes::Bytes; use crossbeam_queue::SegQueue; -use dashmap::DashMap; use futures::{SinkExt, StreamExt}; use std::collections::HashMap; use std::sync::Arc; struct ReqSocketBackend { - pub(crate) peers: DashMap, + pub(crate) peers: scc::HashMap, pub(crate) round_robin: SegQueue, socket_monitor: Mutex>>, socket_options: SocketOptions, @@ -58,7 +57,7 @@ impl SocketSend for ReqSocket { }) } }; - match self.backend.peers.get_mut(&next_peer_id) { + match self.backend.peers.get_async(&next_peer_id).await { Some(mut peer) => { self.backend.round_robin.push(next_peer_id.clone()); message.push_front(Bytes::new()); @@ -77,7 +76,7 @@ impl SocketRecv for ReqSocket { async fn recv(&mut self) -> ZmqResult { match self.current_request.take() { Some(peer_id) => { - if let Some(mut peer) = self.backend.peers.get_mut(&peer_id) { + if let Some(mut peer) = self.backend.peers.get_async(&peer_id).await { match peer.recv_queue.next().await { Some(Ok(Message::Message(mut m))) => { if m.len() < 2 { @@ -113,7 +112,7 @@ impl Socket for ReqSocket { fn with_options(options: SocketOptions) -> Self { Self { backend: Arc::new(ReqSocketBackend { - peers: DashMap::new(), + peers: scc::HashMap::new(), round_robin: SegQueue::new(), socket_monitor: Mutex::new(None), socket_options: options, @@ -142,14 +141,14 @@ impl Socket for ReqSocket { impl MultiPeerBackend for ReqSocketBackend { async fn peer_connected(self: Arc, peer_id: &PeerIdentity, io: FramedIo) { let (recv_queue, send_queue) = io.into_parts(); - self.peers.insert( + self.peers.upsert_async( peer_id.clone(), Peer { _identity: peer_id.clone(), send_queue, recv_queue, }, - ); + ).await; self.round_robin.push(peer_id.clone()); } diff --git a/src/router.rs b/src/router.rs index fda8a3f..94e1219 100644 --- a/src/router.rs +++ b/src/router.rs @@ -94,7 +94,7 @@ impl SocketSend for RouterSocket { async fn send(&mut self, mut message: ZmqMessage) -> ZmqResult<()> { assert!(message.len() > 1); let peer_id: PeerIdentity = message.pop_front().unwrap().try_into()?; - match self.backend.peers.get_mut(&peer_id) { + match self.backend.peers.get_async(&peer_id).await { Some(mut peer) => { peer.send_queue.send(Message::Message(message)).await?; Ok(()) diff --git a/src/sub.rs b/src/sub.rs index 72217dd..0d14c82 100644 --- a/src/sub.rs +++ b/src/sub.rs @@ -14,7 +14,6 @@ use crate::{ use async_trait::async_trait; use bytes::{BufMut, BytesMut}; use crossbeam_queue::SegQueue; -use dashmap::DashMap; use futures::channel::mpsc; use futures::{SinkExt, StreamExt}; use parking_lot::Mutex; @@ -28,7 +27,7 @@ pub enum SubBackendMsgType { } pub(crate) struct SubSocketBackend { - pub(crate) peers: DashMap, + pub(crate) peers: scc::HashMap, fair_queue_inner: Option>>>, pub(crate) round_robin: SegQueue, socket_type: SocketType, @@ -44,7 +43,7 @@ impl SubSocketBackend { options: SocketOptions, ) -> Self { Self { - peers: DashMap::new(), + peers: scc::HashMap::new(), fair_queue_inner, round_robin: SegQueue::new(), socket_type, @@ -97,7 +96,7 @@ impl MultiPeerBackend for SubSocketBackend { send_queue.send(Message::Message(message)).await.unwrap(); } - self.peers.insert(peer_id.clone(), Peer { send_queue }); + self.peers.upsert_async(peer_id.clone(), Peer { send_queue }).await; self.round_robin.push(peer_id.clone()); match &self.fair_queue_inner { None => {} @@ -143,11 +142,13 @@ impl SubSocket { msg_type: SubBackendMsgType, ) -> ZmqResult<()> { let message: ZmqMessage = SubSocketBackend::create_subs_message(subscription, msg_type); + let mut iter = self.backend.peers.first_entry_async().await; - for mut peer in self.backend.peers.iter_mut() { + while let Some(mut peer) = iter{ peer.send_queue .send(Message::Message(message.clone())) .await?; + iter = peer.next_async().await; } Ok(()) } From 28e7b129b16f931210bada4e361006d56898263e Mon Sep 17 00:00:00 2001 From: John Law Date: Thu, 13 Nov 2025 00:44:26 +0000 Subject: [PATCH 2/3] Run cargo fmt --- src/backend.rs | 4 +++- src/pub.rs | 18 ++++++++++-------- src/rep.rs | 16 +++++++++------- src/req.rs | 18 ++++++++++-------- src/sub.rs | 6 ++++-- 5 files changed, 36 insertions(+), 26 deletions(-) diff --git a/src/backend.rs b/src/backend.rs index c1ce993..49f18db 100644 --- a/src/backend.rs +++ b/src/backend.rs @@ -102,7 +102,9 @@ impl SocketBackend for GenericSocketBackend { impl MultiPeerBackend for GenericSocketBackend { async fn peer_connected(self: Arc, peer_id: &PeerIdentity, io: FramedIo) { let (recv_queue, send_queue) = io.into_parts(); - self.peers.upsert_async(peer_id.clone(), Peer { send_queue }).await; + self.peers + .upsert_async(peer_id.clone(), Peer { send_queue }) + .await; self.round_robin.push(peer_id.clone()); match &self.fair_queue_inner { None => {} diff --git a/src/pub.rs b/src/pub.rs index 30f6569..62bcbca 100644 --- a/src/pub.rs +++ b/src/pub.rs @@ -114,14 +114,16 @@ impl MultiPeerBackend for PubSocketBackend { let (mut recv_queue, send_queue) = io.into_parts(); // TODO provide handling for recv_queue let (sender, stop_receiver) = oneshot::channel(); - self.subscribers.upsert_async( - peer_id.clone(), - Subscriber { - subscriptions: vec![], - send_queue: Box::pin(send_queue), - _subscription_coro_stop: sender, - }, - ).await; + self.subscribers + .upsert_async( + peer_id.clone(), + Subscriber { + subscriptions: vec![], + send_queue: Box::pin(send_queue), + _subscription_coro_stop: sender, + }, + ) + .await; let backend = self; let peer_id = peer_id.clone(); async_rt::task::spawn(async move { diff --git a/src/rep.rs b/src/rep.rs index f79889d..5d6716f 100644 --- a/src/rep.rs +++ b/src/rep.rs @@ -77,13 +77,15 @@ impl MultiPeerBackend for RepSocketBackend { async fn peer_connected(self: Arc, peer_id: &PeerIdentity, io: FramedIo) { let (recv_queue, send_queue) = io.into_parts(); - self.peers.upsert_async( - peer_id.clone(), - RepPeer { - _identity: peer_id.clone(), - send_queue, - }, - ).await; + self.peers + .upsert_async( + peer_id.clone(), + RepPeer { + _identity: peer_id.clone(), + send_queue, + }, + ) + .await; self.fair_queue_inner .lock() .insert(peer_id.clone(), recv_queue); diff --git a/src/req.rs b/src/req.rs index 25c1149..8c9920d 100644 --- a/src/req.rs +++ b/src/req.rs @@ -138,14 +138,16 @@ impl Socket for ReqSocket { impl MultiPeerBackend for ReqSocketBackend { async fn peer_connected(self: Arc, peer_id: &PeerIdentity, io: FramedIo) { let (recv_queue, send_queue) = io.into_parts(); - self.peers.upsert_async( - peer_id.clone(), - Peer { - _identity: peer_id.clone(), - send_queue, - recv_queue, - }, - ).await; + self.peers + .upsert_async( + peer_id.clone(), + Peer { + _identity: peer_id.clone(), + send_queue, + recv_queue, + }, + ) + .await; self.round_robin.push(peer_id.clone()); } diff --git a/src/sub.rs b/src/sub.rs index 0847554..e3083cc 100644 --- a/src/sub.rs +++ b/src/sub.rs @@ -96,7 +96,9 @@ impl MultiPeerBackend for SubSocketBackend { send_queue.send(Message::Message(message)).await.unwrap(); } - self.peers.upsert_async(peer_id.clone(), Peer { send_queue }).await; + self.peers + .upsert_async(peer_id.clone(), Peer { send_queue }) + .await; self.round_robin.push(peer_id.clone()); match &self.fair_queue_inner { None => {} @@ -144,7 +146,7 @@ impl SubSocket { let message: ZmqMessage = SubSocketBackend::create_subs_message(subscription, msg_type); let mut iter = self.backend.peers.first_entry_async().await; - while let Some(mut peer) = iter{ + while let Some(mut peer) = iter { peer.send_queue .send(Message::Message(message.clone())) .await?; From f547b0d5da09dd1deb9e761a97f37acf21e2b4d6 Mon Sep 17 00:00:00 2001 From: John Law Date: Sun, 7 Dec 2025 04:02:05 +0000 Subject: [PATCH 3/3] Upversion scc to 3.4.4 --- Cargo.toml | 2 +- src/backend.rs | 4 ++-- src/pub.rs | 36 +++++++++--------------------------- src/rep.rs | 4 ++-- src/req.rs | 4 ++-- src/sub.rs | 6 +++--- 6 files changed, 19 insertions(+), 37 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 218481f..84f019e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,7 @@ bytes = "1" tokio = { version = "1", features = ["full"], optional = true } tokio-util = { version = "0.7", features = ["compat"], optional = true } num-traits = "0.2" -scc = "2.3.4" +scc = "3.4.4" crossbeam-queue = "0.3" uuid = { version = "1", features = ["v4"] } regex = { version = "1", default-features = false, features = [ diff --git a/src/backend.rs b/src/backend.rs index 49f18db..7b58366 100644 --- a/src/backend.rs +++ b/src/backend.rs @@ -90,7 +90,7 @@ impl SocketBackend for GenericSocketBackend { } fn shutdown(&self) { - self.peers.clear(); + self.peers.clear_sync(); } fn monitor(&self) -> &Mutex>> { @@ -115,7 +115,7 @@ impl MultiPeerBackend for GenericSocketBackend { } fn peer_disconnected(&self, peer_id: &PeerIdentity) { - self.peers.remove(peer_id); + self.peers.remove_sync(peer_id); match &self.fair_queue_inner { None => {} Some(inner) => { diff --git a/src/pub.rs b/src/pub.rs index 62bcbca..6af1956 100644 --- a/src/pub.rs +++ b/src/pub.rs @@ -51,36 +51,18 @@ impl PubSocketBackend { match data.first() { Some(1) => { // Subscribe - self.subscribers - .get(peer_id) - .unwrap() - .subscriptions - .push(Vec::from(&data[1..])); + if let Some(mut entry) = self.subscribers.get_sync(peer_id) { + entry.subscriptions.push(Vec::from(&data[1..])); + } } Some(0) => { // Unsubscribe - let mut del_index = None; let sub = Vec::from(&data[1..]); - for (idx, subscription) in self - .subscribers - .get(peer_id) - .unwrap() - .subscriptions - .iter() - .enumerate() - { - if &sub == subscription { - del_index = Some(idx); - break; + if let Some(mut entry) = self.subscribers.get_sync(peer_id) { + if let Some(index) = entry.subscriptions.iter().position(|s| s == &sub) { + entry.subscriptions.remove(index); } } - if let Some(index) = del_index { - self.subscribers - .get(peer_id) - .unwrap() - .subscriptions - .remove(index); - } } _ => log::warn!( "Received message with unexpected first byte: {:?}", @@ -100,7 +82,7 @@ impl SocketBackend for PubSocketBackend { } fn shutdown(&self) { - self.subscribers.clear(); + self.subscribers.clear_sync(); } fn monitor(&self) -> &Mutex>> { @@ -155,7 +137,7 @@ impl MultiPeerBackend for PubSocketBackend { fn peer_disconnected(&self, peer_id: &PeerIdentity) { log::info!("Client disconnected {:?}", peer_id); - self.subscribers.remove(peer_id); + self.subscribers.remove_sync(peer_id); } } @@ -174,7 +156,7 @@ impl Drop for PubSocket { impl SocketSend for PubSocket { async fn send(&mut self, message: ZmqMessage) -> ZmqResult<()> { let mut dead_peers = Vec::new(); - let mut iter = self.backend.subscribers.first_entry_async().await; + let mut iter = self.backend.subscribers.begin_async().await; while let Some(mut subscriber) = iter { for sub_filter in &subscriber.subscriptions { if sub_filter.len() <= message.get(0).unwrap().len() diff --git a/src/rep.rs b/src/rep.rs index 5d6716f..470ea9c 100644 --- a/src/rep.rs +++ b/src/rep.rs @@ -95,7 +95,7 @@ impl MultiPeerBackend for RepSocketBackend { if let Some(monitor) = self.monitor().lock().as_mut() { let _ = monitor.try_send(SocketEvent::Disconnected(peer_id.clone())); } - self.peers.remove(peer_id); + self.peers.remove_sync(peer_id); } } @@ -109,7 +109,7 @@ impl SocketBackend for RepSocketBackend { } fn shutdown(&self) { - self.peers.clear(); + self.peers.clear_sync(); } fn monitor(&self) -> &Mutex>> { diff --git a/src/req.rs b/src/req.rs index 8c9920d..86a2085 100644 --- a/src/req.rs +++ b/src/req.rs @@ -152,7 +152,7 @@ impl MultiPeerBackend for ReqSocketBackend { } fn peer_disconnected(&self, peer_id: &PeerIdentity) { - self.peers.remove(peer_id); + self.peers.remove_sync(peer_id); } } @@ -166,7 +166,7 @@ impl SocketBackend for ReqSocketBackend { } fn shutdown(&self) { - self.peers.clear(); + self.peers.clear_sync(); } fn monitor(&self) -> &Mutex>> { diff --git a/src/sub.rs b/src/sub.rs index e3083cc..ba8a637 100644 --- a/src/sub.rs +++ b/src/sub.rs @@ -72,7 +72,7 @@ impl SocketBackend for SubSocketBackend { } fn shutdown(&self) { - self.peers.clear(); + self.peers.clear_sync(); } fn monitor(&self) -> &Mutex>> { @@ -109,7 +109,7 @@ impl MultiPeerBackend for SubSocketBackend { } fn peer_disconnected(&self, peer_id: &PeerIdentity) { - self.peers.remove(peer_id); + self.peers.remove_sync(peer_id); } } @@ -144,7 +144,7 @@ impl SubSocket { msg_type: SubBackendMsgType, ) -> ZmqResult<()> { let message: ZmqMessage = SubSocketBackend::create_subs_message(subscription, msg_type); - let mut iter = self.backend.peers.first_entry_async().await; + let mut iter = self.backend.peers.begin_async().await; while let Some(mut peer) = iter { peer.send_queue