Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ bytes = "1"
tokio = { version = "1", features = ["full"], optional = true }
tokio-util = { version = "0.7", features = ["compat"], optional = true }
num-traits = "0.2"
dashmap = "5"
scc = "2.3.4"
crossbeam-queue = "0.3"
uuid = { version = "1", features = ["v4"] }
regex = { version = "1", default-features = false, features = [
Expand Down
11 changes: 6 additions & 5 deletions src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::{

use async_trait::async_trait;
use crossbeam_queue::SegQueue;
use dashmap::DashMap;
use futures::channel::mpsc;
use futures::SinkExt;
use parking_lot::Mutex;
Expand All @@ -19,7 +18,7 @@ pub(crate) struct Peer {
}

pub(crate) struct GenericSocketBackend {
pub(crate) peers: DashMap<PeerIdentity, Peer>,
pub(crate) peers: scc::HashMap<PeerIdentity, Peer>,
fair_queue_inner: Option<Arc<Mutex<QueueInner<ZmqFramedRead, PeerIdentity>>>>,
pub(crate) round_robin: SegQueue<PeerIdentity>,
socket_type: SocketType,
Expand All @@ -34,7 +33,7 @@ impl GenericSocketBackend {
options: SocketOptions,
) -> Self {
Self {
peers: DashMap::new(),
peers: scc::HashMap::new(),
fair_queue_inner,
round_robin: SegQueue::new(),
socket_type,
Expand Down Expand Up @@ -63,7 +62,7 @@ impl GenericSocketBackend {
}
},
};
let send_result = match self.peers.get_mut(&next_peer_id) {
let send_result = match self.peers.get_async(&next_peer_id).await {
Some(mut peer) => peer.send_queue.send(message).await,
None => continue,
};
Expand Down Expand Up @@ -103,7 +102,9 @@ impl SocketBackend for GenericSocketBackend {
impl MultiPeerBackend for GenericSocketBackend {
async fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo) {
let (recv_queue, send_queue) = io.into_parts();
self.peers.insert(peer_id.clone(), Peer { send_queue });
self.peers
.upsert_async(peer_id.clone(), Peer { send_queue })
.await;
self.round_robin.push(peer_id.clone());
match &self.fair_queue_inner {
None => {}
Expand Down
31 changes: 17 additions & 14 deletions src/pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use crate::{
};

use async_trait::async_trait;
use dashmap::DashMap;
use futures::channel::{mpsc, oneshot};
use futures::{select, FutureExt, StreamExt};
use parking_lot::Mutex;
Expand All @@ -27,7 +26,7 @@ pub(crate) struct Subscriber {
}

pub(crate) struct PubSocketBackend {
subscribers: DashMap<PeerIdentity, Subscriber>,
subscribers: scc::HashMap<PeerIdentity, Subscriber>,
socket_monitor: Mutex<Option<mpsc::Sender<SocketEvent>>>,
socket_options: SocketOptions,
}
Expand All @@ -53,7 +52,7 @@ impl PubSocketBackend {
Some(1) => {
// Subscribe
self.subscribers
.get_mut(peer_id)
.get(peer_id)
.unwrap()
.subscriptions
.push(Vec::from(&data[1..]));
Expand All @@ -77,7 +76,7 @@ impl PubSocketBackend {
}
if let Some(index) = del_index {
self.subscribers
.get_mut(peer_id)
.get(peer_id)
.unwrap()
.subscriptions
.remove(index);
Expand Down Expand Up @@ -115,14 +114,16 @@ impl MultiPeerBackend for PubSocketBackend {
let (mut recv_queue, send_queue) = io.into_parts();
// TODO provide handling for recv_queue
let (sender, stop_receiver) = oneshot::channel();
self.subscribers.insert(
peer_id.clone(),
Subscriber {
subscriptions: vec![],
send_queue: Box::pin(send_queue),
_subscription_coro_stop: sender,
},
);
self.subscribers
.upsert_async(
peer_id.clone(),
Subscriber {
subscriptions: vec![],
send_queue: Box::pin(send_queue),
_subscription_coro_stop: sender,
},
)
.await;
let backend = self;
let peer_id = peer_id.clone();
async_rt::task::spawn(async move {
Expand Down Expand Up @@ -173,7 +174,8 @@ impl Drop for PubSocket {
impl SocketSend for PubSocket {
async fn send(&mut self, message: ZmqMessage) -> ZmqResult<()> {
let mut dead_peers = Vec::new();
for mut subscriber in self.backend.subscribers.iter_mut() {
let mut iter = self.backend.subscribers.first_entry_async().await;
while let Some(mut subscriber) = iter {
for sub_filter in &subscriber.subscriptions {
if sub_filter.len() <= message.get(0).unwrap().len()
&& sub_filter.as_slice() == &message.get(0).unwrap()[0..sub_filter.len()]
Expand Down Expand Up @@ -205,6 +207,7 @@ impl SocketSend for PubSocket {
break;
}
}
iter = subscriber.next_async().await;
}
for peer in dead_peers {
self.backend.peer_disconnected(&peer);
Expand All @@ -220,7 +223,7 @@ impl Socket for PubSocket {
fn with_options(options: SocketOptions) -> Self {
Self {
backend: Arc::new(PubSocketBackend {
subscribers: DashMap::new(),
subscribers: scc::HashMap::new(),
socket_monitor: Mutex::new(None),
socket_options: options,
}),
Expand Down
23 changes: 12 additions & 11 deletions src/rep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::*;
use crate::{SocketType, ZmqResult};

use async_trait::async_trait;
use dashmap::DashMap;
use futures::{SinkExt, StreamExt};
use parking_lot::Mutex;

Expand All @@ -20,7 +19,7 @@ struct RepPeer {
}

struct RepSocketBackend {
pub(crate) peers: DashMap<PeerIdentity, RepPeer>,
pub(crate) peers: scc::HashMap<PeerIdentity, RepPeer>,
fair_queue_inner: Arc<Mutex<QueueInner<ZmqFramedRead, PeerIdentity>>>,
socket_monitor: Mutex<Option<mpsc::Sender<SocketEvent>>>,
socket_options: SocketOptions,
Expand All @@ -46,7 +45,7 @@ impl Socket for RepSocket {
let fair_queue = FairQueue::new(true);
Self {
backend: Arc::new(RepSocketBackend {
peers: DashMap::new(),
peers: scc::HashMap::new(),
fair_queue_inner: fair_queue.inner(),
socket_monitor: Mutex::new(None),
socket_options: options,
Expand Down Expand Up @@ -78,13 +77,15 @@ impl MultiPeerBackend for RepSocketBackend {
async fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo) {
let (recv_queue, send_queue) = io.into_parts();

self.peers.insert(
peer_id.clone(),
RepPeer {
_identity: peer_id.clone(),
send_queue,
},
);
self.peers
.upsert_async(
peer_id.clone(),
RepPeer {
_identity: peer_id.clone(),
send_queue,
},
)
.await;
self.fair_queue_inner
.lock()
.insert(peer_id.clone(), recv_queue);
Expand Down Expand Up @@ -121,7 +122,7 @@ impl SocketSend for RepSocket {
async fn send(&mut self, mut message: ZmqMessage) -> ZmqResult<()> {
match self.current_request.take() {
Some(peer_id) => {
if let Some(mut peer) = self.backend.peers.get_mut(&peer_id) {
if let Some(mut peer) = self.backend.peers.get_async(&peer_id).await {
if let Some(envelope) = self.envelope.take() {
message.prepend(&envelope);
}
Expand Down
27 changes: 14 additions & 13 deletions src/req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,13 @@ use crate::{SocketType, ZmqResult};
use async_trait::async_trait;
use bytes::Bytes;
use crossbeam_queue::SegQueue;
use dashmap::DashMap;
use futures::{SinkExt, StreamExt};

use std::collections::HashMap;
use std::sync::Arc;

struct ReqSocketBackend {
pub(crate) peers: DashMap<PeerIdentity, Peer>,
pub(crate) peers: scc::HashMap<PeerIdentity, Peer>,
pub(crate) round_robin: SegQueue<PeerIdentity>,
socket_monitor: Mutex<Option<mpsc::Sender<SocketEvent>>>,
socket_options: SocketOptions,
Expand Down Expand Up @@ -58,7 +57,7 @@ impl SocketSend for ReqSocket {
})
}
};
if let Some(mut peer) = self.backend.peers.get_mut(&next_peer_id) {
if let Some(mut peer) = self.backend.peers.get_async(&next_peer_id).await {
self.backend.round_robin.push(next_peer_id.clone());
message.push_front(Bytes::new());
peer.send_queue.send(Message::Message(message)).await?;
Expand All @@ -74,7 +73,7 @@ impl SocketRecv for ReqSocket {
async fn recv(&mut self) -> ZmqResult<ZmqMessage> {
match self.current_request.take() {
Some(peer_id) => {
if let Some(mut peer) = self.backend.peers.get_mut(&peer_id) {
if let Some(mut peer) = self.backend.peers.get_async(&peer_id).await {
match peer.recv_queue.next().await {
Some(Ok(Message::Message(mut m))) => {
if m.len() < 2 {
Expand Down Expand Up @@ -110,7 +109,7 @@ impl Socket for ReqSocket {
fn with_options(options: SocketOptions) -> Self {
Self {
backend: Arc::new(ReqSocketBackend {
peers: DashMap::new(),
peers: scc::HashMap::new(),
round_robin: SegQueue::new(),
socket_monitor: Mutex::new(None),
socket_options: options,
Expand Down Expand Up @@ -139,14 +138,16 @@ impl Socket for ReqSocket {
impl MultiPeerBackend for ReqSocketBackend {
async fn peer_connected(self: Arc<Self>, peer_id: &PeerIdentity, io: FramedIo) {
let (recv_queue, send_queue) = io.into_parts();
self.peers.insert(
peer_id.clone(),
Peer {
_identity: peer_id.clone(),
send_queue,
recv_queue,
},
);
self.peers
.upsert_async(
peer_id.clone(),
Peer {
_identity: peer_id.clone(),
send_queue,
recv_queue,
},
)
.await;
self.round_robin.push(peer_id.clone());
}

Expand Down
2 changes: 1 addition & 1 deletion src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl SocketSend for RouterSocket {
async fn send(&mut self, mut message: ZmqMessage) -> ZmqResult<()> {
assert!(message.len() > 1);
let peer_id: PeerIdentity = message.pop_front().unwrap().try_into()?;
match self.backend.peers.get_mut(&peer_id) {
match self.backend.peers.get_async(&peer_id).await {
Some(mut peer) => {
peer.send_queue.send(Message::Message(message)).await?;
Ok(())
Expand Down
13 changes: 8 additions & 5 deletions src/sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use crate::{
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use crossbeam_queue::SegQueue;
use dashmap::DashMap;
use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};
use parking_lot::Mutex;
Expand All @@ -28,7 +27,7 @@ pub enum SubBackendMsgType {
}

pub(crate) struct SubSocketBackend {
pub(crate) peers: DashMap<PeerIdentity, Peer>,
pub(crate) peers: scc::HashMap<PeerIdentity, Peer>,
fair_queue_inner: Option<Arc<Mutex<QueueInner<ZmqFramedRead, PeerIdentity>>>>,
pub(crate) round_robin: SegQueue<PeerIdentity>,
socket_type: SocketType,
Expand All @@ -44,7 +43,7 @@ impl SubSocketBackend {
options: SocketOptions,
) -> Self {
Self {
peers: DashMap::new(),
peers: scc::HashMap::new(),
fair_queue_inner,
round_robin: SegQueue::new(),
socket_type,
Expand Down Expand Up @@ -97,7 +96,9 @@ impl MultiPeerBackend for SubSocketBackend {
send_queue.send(Message::Message(message)).await.unwrap();
}

self.peers.insert(peer_id.clone(), Peer { send_queue });
self.peers
.upsert_async(peer_id.clone(), Peer { send_queue })
.await;
self.round_robin.push(peer_id.clone());
match &self.fair_queue_inner {
None => {}
Expand Down Expand Up @@ -143,11 +144,13 @@ impl SubSocket {
msg_type: SubBackendMsgType,
) -> ZmqResult<()> {
let message: ZmqMessage = SubSocketBackend::create_subs_message(subscription, msg_type);
let mut iter = self.backend.peers.first_entry_async().await;

for mut peer in self.backend.peers.iter_mut() {
while let Some(mut peer) = iter {
peer.send_queue
.send(Message::Message(message.clone()))
.await?;
iter = peer.next_async().await;
}
Ok(())
}
Expand Down
Loading