Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion libopflex/comms/CommunicationPeer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ void CommunicationPeer::bumpLastHeard() const {
}

void CommunicationPeer::onConnect() {
disconnectPending_ = false;
connected_ = true;
status_ = internal::Peer::kPS_ONLINE;

Expand All @@ -145,6 +146,14 @@ void CommunicationPeer::onConnect() {
}

void CommunicationPeer::onDisconnect() {
if (disconnectPending_.exchange(true)) {
LOG(TRACE) << this << " disconnect already pending";
return;
}
getLoopData()->enqueueDisconnect(this);
}

void CommunicationPeer::onDisconnectInternal() {
LOG(DEBUG) << this << " connected_ = " << connected_;
if (!uv_is_closing(getHandle())) {
uv_close(getHandle(), on_close);
Expand Down Expand Up @@ -188,6 +197,8 @@ void CommunicationPeer::onDisconnect() {
insert(internal::Peer::LoopData::PENDING_DELETE);
status_ = kPS_PENDING_DELETE;
}

disconnectPending_ = false;
}

void CommunicationPeer::destroy(bool now) {
Expand Down Expand Up @@ -570,4 +581,3 @@ yajr::rpc::InboundMessage * comms::internal::CommunicationPeer::parseFrame() {
} // namespace internal
} // namespace comms
} // namespace yajr

26 changes: 25 additions & 1 deletion libopflex/comms/loopdata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,31 @@ void internal::Peer::LoopData::onPrepareLoop(uv_prepare_t * h) {
->onPrepareLoop();
}

void internal::Peer::LoopData::enqueueDisconnect(CommunicationPeer* peer) {
if (!peer) {
return;
}
{
const std::lock_guard<std::mutex> lock(disconnectMutex_);
disconnectQueue_.push_back(peer);
}
uv_async_send(&disconnect_async_);
}

void internal::Peer::LoopData::onDisconnectAsync(uv_async_t* handle) {
auto* loopData = static_cast< ::yajr::comms::internal::Peer::LoopData * >(handle->data);
std::vector<CommunicationPeer*> pending;
{
const std::lock_guard<std::mutex> lock(loopData->disconnectMutex_);
pending.swap(loopData->disconnectQueue_);
}
for (CommunicationPeer* peer : pending) {
if (peer) {
peer->onDisconnectInternal();
}
}
}

void internal::Peer::LoopData::fini(uv_handle_t * h) {
LOG(INFO);
delete static_cast< ::yajr::comms::internal::Peer::LoopData *>(h->data);
Expand Down Expand Up @@ -212,4 +237,3 @@ Peer::LoopData::PeerDisposer::PeerDisposer(bool now) : now_(now) {}
} /* yajr::comms::internal namespace */
} /* yajr::comms namespace */
} /* yajr namespace */

19 changes: 18 additions & 1 deletion libopflex/include/opflex/yajr/internal/comms.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <iostream>
#include <atomic>
#include <mutex>
#include <vector>

#define uv_close(h, cb) \
do { \
Expand Down Expand Up @@ -169,6 +170,8 @@ class Peer : public SafeListBaseHook {
uv_timer_init(loop, &prepareAgain_);
prepareAgain_.data = this;
uv_async_init(loop, &kickLibuv_, NULL);
uv_async_init(loop, &disconnect_async_, &onDisconnectAsync);
disconnect_async_.data = this;
}

/**
Expand Down Expand Up @@ -227,6 +230,9 @@ class Peer : public SafeListBaseHook {
uv_async_send(&kickLibuv_);
}

/** Schedule a peer disconnect on the libuv loop thread */
void enqueueDisconnect(CommunicationPeer* peer);

/** Iterate over handles and close each one
*
* @param handle UV handle
Expand All @@ -241,6 +247,9 @@ class Peer : public SafeListBaseHook {
*/
static void walkAndCountHandlesCb(uv_handle_t* handle, void* countHandles);

/** Process queued disconnects */
static void onDisconnectAsync(uv_async_t* handle);

private:
friend std::ostream& operator<< (std::ostream&, Peer::LoopData const *);

Expand All @@ -259,11 +268,15 @@ class Peer : public SafeListBaseHook {
static std::recursive_mutex peerMutex;
uv_prepare_t prepare_;
uv_async_t kickLibuv_;
uv_async_t disconnect_async_;
uv_timer_t prepareAgain_;
uint64_t lastRun_;
std::atomic<bool> destroying_;
std::atomic<uint64_t> refCount_;

std::mutex disconnectMutex_;
std::vector<CommunicationPeer*> disconnectQueue_;

friend class Peer;
};

Expand Down Expand Up @@ -511,6 +524,7 @@ class CommunicationPeer : public Peer, virtual public ::yajr::Peer {
keepAliveInterval_(0),
lastHeard_(0),
connectTimeout_(connectTimeout),
disconnectPending_(false),
transport_(transport::PlainText::getPlainTextTransport()),
asyncDocParser_([this](Document& d) -> int { return asyncDocParserCb(d); })
{
Expand Down Expand Up @@ -822,6 +836,7 @@ class CommunicationPeer : public Peer, virtual public ::yajr::Peer {
virtual ~CommunicationPeer() {}

private:
void onDisconnectInternal();

mutable ::yajr::internal::StringQueue s_;

Expand All @@ -837,6 +852,7 @@ class CommunicationPeer : public Peer, virtual public ::yajr::Peer {
std::atomic<uint64_t> keepAliveInterval_;
mutable uint64_t lastHeard_;
const uint32_t connectTimeout_;
std::atomic<bool> disconnectPending_;


::yajr::transport::Transport transport_;
Expand Down Expand Up @@ -868,6 +884,8 @@ class CommunicationPeer : public Peer, virtual public ::yajr::Peer {
int asyncDocParserCb(rapidjson::Document &d);

mutable AsyncDocumentParser<> asyncDocParser_;

friend class ::yajr::comms::internal::Peer::LoopData;
};
static_assert (sizeof(CommunicationPeer) <= 4096, "CommunicationPeer won't fit on one page");

Expand Down Expand Up @@ -1335,4 +1353,3 @@ char const * getUvHandleField(uv_handle_t * h, internal::Peer * peer);
} // namespace yajr

#endif /* _INCLUDE__OPFLEX__COMMS_INTERNAL_HPP */

Loading