diff --git a/CMakeLists.txt b/CMakeLists.txt index 3500ff4c..5f7129d5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -114,6 +114,8 @@ if ("${CMAKE_CXX_COMPILER_ID}" MATCHES "^(AppleClang|Clang|GNU)$") add_flag(-Werror-non-virtual-dtor) # warn the user if a class with virtual functions has a non-virtual destructor. This helps catch hard to track down memory errors add_flag(-Werror-sign-compare) # warn the user if they compare a signed and unsigned numbers add_flag(-Werror-reorder) # field '$1' will be initialized after field '$2' + + add_link_options(-rdynamic) elseif ("${CMAKE_CXX_COMPILER_ID}" STREQUAL "MSVC") # using Visual Studio C++ # TODO(warchant): add flags https://github.com/lefticus/cppbestpractices/blob/master/02-Use_the_Tools_Available.md#msvc diff --git a/include/libp2p/muxer/yamux/hardware_tracker.hpp b/include/libp2p/muxer/yamux/hardware_tracker.hpp new file mode 100644 index 00000000..5653c93d --- /dev/null +++ b/include/libp2p/muxer/yamux/hardware_tracker.hpp @@ -0,0 +1,91 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace libp2p::connection { + +class YamuxedConnection; + +class HardwareSharedPtrTracker { +public: + static HardwareSharedPtrTracker& getInstance() { + static HardwareSharedPtrTracker instance; + return instance; + } + + // Start tracking the reference count of a shared_ptr + void startTracking(const std::shared_ptr& ptr); + + // Stop current tracking + void stopTracking(); + + // Check if tracking is active + bool isTracking() const { return is_tracking_; } + + // Enable/disable tracking + void enable() { enabled_ = true; } + void disable() { enabled_ = false; } + +private: + HardwareSharedPtrTracker(); + ~HardwareSharedPtrTracker(); + + // Get the address of the reference count in a shared_ptr + void* getRefCountAddress(const std::shared_ptr& ptr); + + // Set hardware watchpoint + bool setHardwareWatchpoint(void* address); + + // Remove hardware watchpoint + bool removeHardwareWatchpoint(); + + // Signal handler for memory changes + static void signalHandler(int sig, siginfo_t* info, void* context); + + // Print stack trace + void printStackTrace(); + + // Check counter and switch to the next object if needed + void checkAndSwitchIfNeeded(); + +private: + std::atomic enabled_{false}; + std::atomic is_tracking_{false}; + std::atomic should_stop_{false}; // Flag for signal handler + + void* watched_address_{nullptr}; + int watchpoint_fd_{-1}; // Store perf_event fd + std::weak_ptr current_tracked_ptr_; + + // For debug registers + static constexpr int DR7_L0 = 1; // Local enable for DR0 + static constexpr int DR7_RW0_WRITE = (1 << 16); // Watch writes to DR0 + static constexpr int DR7_LEN0_4BYTES = (3 << 18); // 4-byte length for DR0 + + static HardwareSharedPtrTracker* instance_; + struct sigaction old_sigtrap_action_; +}; + +// Global function for setting in yamux.cpp +void trackNextYamuxedConnection(const std::shared_ptr& ptr); + +// Macros for convenience +#define YAMUX_HARDWARE_TRACK_ENABLE() \ + libp2p::connection::HardwareSharedPtrTracker::getInstance().enable() + +#define YAMUX_HARDWARE_TRACK_DISABLE() \ + libp2p::connection::HardwareSharedPtrTracker::getInstance().disable() + +#define YAMUX_HARDWARE_TRACK_SHARED_PTR(ptr) \ + libp2p::connection::trackNextYamuxedConnection(ptr) + +} // namespace libp2p::connection \ No newline at end of file diff --git a/include/libp2p/muxer/yamux/yamux_stream.hpp b/include/libp2p/muxer/yamux/yamux_stream.hpp index 7ed91e8f..4d72ef84 100644 --- a/include/libp2p/muxer/yamux/yamux_stream.hpp +++ b/include/libp2p/muxer/yamux/yamux_stream.hpp @@ -133,7 +133,7 @@ namespace libp2p::connection { closeCompleted(); /// Underlying connection (secured) - std::shared_ptr connection_; + std::weak_ptr connection_; /// Yamux-specific interface of connection YamuxStreamFeedback &feedback_; diff --git a/include/libp2p/muxer/yamux/yamuxed_connection.hpp b/include/libp2p/muxer/yamux/yamuxed_connection.hpp index 6ac7dd59..651c555e 100644 --- a/include/libp2p/muxer/yamux/yamuxed_connection.hpp +++ b/include/libp2p/muxer/yamux/yamuxed_connection.hpp @@ -78,6 +78,16 @@ namespace libp2p::connection { ReadCallbackFunc cb) override; void deferWriteCallback(std::error_code ec, WriteCallbackFunc cb) override; + void markAsRegistered(); + + size_t getStreamsCount() const { return streams_.size(); } + size_t getPendingStreamsCount() const { return pending_outbound_streams_.size(); } + long getSharedPtrUseCount() const { return shared_from_this().use_count(); } + + void debugPrintActiveStreams() const; + + void debugPrintMemoryLeakSources() const; + private: using Streams = std::unordered_map>; @@ -241,6 +251,8 @@ namespace libp2p::connection { bool close_after_write_ = false; + bool registered_in_manager_ = false; + public: LIBP2P_METRICS_INSTANCE_COUNT_IF_ENABLED( libp2p::connection::YamuxedConnection); diff --git a/include/libp2p/network/impl/connection_manager_impl.hpp b/include/libp2p/network/impl/connection_manager_impl.hpp index 629ce4d4..f207ba1c 100644 --- a/include/libp2p/network/impl/connection_manager_impl.hpp +++ b/include/libp2p/network/impl/connection_manager_impl.hpp @@ -7,6 +7,7 @@ #pragma once #include +#include #include #include @@ -46,6 +47,12 @@ namespace libp2p::network { /// Reentrancy resolver between closeConnectionsToPeer and /// onConnectionClosed boost::optional closing_connections_to_peer_; + + /// Mutex to protect connection_is_closing_ set + std::mutex connection_mutex_; + + /// Set of connections currently being closed to prevent double closing + std::unordered_set connection_is_closing_; }; } // namespace libp2p::network diff --git a/src/muxer/yamux/CMakeLists.txt b/src/muxer/yamux/CMakeLists.txt index 0b3ec6fd..9ea71d1b 100644 --- a/src/muxer/yamux/CMakeLists.txt +++ b/src/muxer/yamux/CMakeLists.txt @@ -16,6 +16,7 @@ libp2p_add_library(p2p_yamuxed_connection yamux_frame.cpp yamux_stream.cpp yamux_reading_state.cpp + hardware_tracker.cpp ) target_link_libraries(p2p_yamuxed_connection Boost::boost diff --git a/src/muxer/yamux/hardware_tracker.cpp b/src/muxer/yamux/hardware_tracker.cpp new file mode 100644 index 00000000..a512e29b --- /dev/null +++ b/src/muxer/yamux/hardware_tracker.cpp @@ -0,0 +1,277 @@ +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace libp2p::connection { + +HardwareSharedPtrTracker* HardwareSharedPtrTracker::instance_ = nullptr; + +HardwareSharedPtrTracker::HardwareSharedPtrTracker() { + instance_ = this; + + struct sigaction sa; + sa.sa_sigaction = signalHandler; + sigemptyset(&sa.sa_mask); + sa.sa_flags = SA_SIGINFO | SA_RESTART; + + if (sigaction(SIGTRAP, &sa, &old_sigtrap_action_) == -1) { + std::cerr << "Failed to install SIGTRAP handler\n"; + } + + std::cout << "HardwareSharedPtrTracker initialized\n"; +} + +HardwareSharedPtrTracker::~HardwareSharedPtrTracker() { + stopTracking(); + sigaction(SIGTRAP, &old_sigtrap_action_, nullptr); + instance_ = nullptr; +} + +void* HardwareSharedPtrTracker::getRefCountAddress(const std::shared_ptr& ptr) { + struct shared_ptr_internal { + void* ptr; + void* control_block; + }; + + auto* internal = reinterpret_cast(&ptr); + void* control_block = internal->control_block; + + if (!control_block) { + return nullptr; + } + + uint32_t* ref_count_addr = (uint32_t*)((uint8_t*)control_block + sizeof(void*)); + + std::cout << "Control block address: " << control_block << "\n"; + std::cout << "Reference count address: " << ref_count_addr << "\n"; + std::cout << "Current use_count: " << ptr.use_count() << "\n"; + assert(*ref_count_addr == ptr.use_count()); + + return ref_count_addr; +} + +bool HardwareSharedPtrTracker::setHardwareWatchpoint(void* address) { + struct perf_event_attr pe; + memset(&pe, 0, sizeof(pe)); + + pe.type = PERF_TYPE_BREAKPOINT; + pe.size = sizeof(pe); + pe.config = 0; + pe.bp_type = HW_BREAKPOINT_W; // Только запись, чтение может генерировать много событий + pe.bp_addr = reinterpret_cast(address); + pe.bp_len = HW_BREAKPOINT_LEN_4; // 4 байта для int + pe.disabled = 0; // Включен сразу + pe.exclude_kernel = 1; + pe.exclude_hv = 1; + pe.exclude_user = 0; // Отслеживаем user space + pe.sample_period = 1; // Генерировать событие на каждое изменение + pe.wakeup_events = 1; // Пробуждать на каждое событие + + std::cout << "🔧 Настройка hardware watchpoint:\n"; + std::cout << " Адрес: " << address << "\n"; + std::cout << " bp_addr: 0x" << std::hex << pe.bp_addr << std::dec << "\n"; + std::cout << " bp_len: " << pe.bp_len << "\n"; + std::cout << " bp_type: " << pe.bp_type << " (только запись)\n"; + + int fd = syscall(__NR_perf_event_open, &pe, 0, -1, -1, 0); + if (fd == -1) { + perror("❌ perf_event_open неудачен"); + std::cout << " Ошибка: " << strerror(errno) << "\n"; + std::cout << " Код ошибки: " << errno << "\n"; + return false; + } + + // Настраиваем доставку сигнала + struct f_owner_ex owner; + owner.type = F_OWNER_TID; + owner.pid = syscall(SYS_gettid); + + if (fcntl(fd, F_SETOWN_EX, &owner) == -1) { + perror("⚠️ fcntl F_SETOWN_EX failed"); + } + + if (fcntl(fd, F_SETFL, O_ASYNC) == -1) { + perror("⚠️ fcntl F_SETFL failed"); + } + + if (fcntl(fd, F_SETSIG, SIGTRAP) == -1) { + perror("⚠️ fcntl F_SETSIG failed"); + } + + watchpoint_fd_ = fd; + + std::cout << "✅ Hardware watchpoint установлен на адрес " << address + << " (fd=" << fd << ")\n"; + + return true; +} + +bool HardwareSharedPtrTracker::removeHardwareWatchpoint() { + if (watchpoint_fd_ != -1) { + close(watchpoint_fd_); + watchpoint_fd_ = -1; + std::cout << "Hardware watchpoint removed\n"; + return true; + } + return false; +} + +void HardwareSharedPtrTracker::signalHandler(int sig, siginfo_t* info, void* context) { + if (!instance_ || sig != SIGTRAP) { + return; + } + + static int call_number = 0; + call_number++; + + const char msg[] = "\n🚨 === HARDWARE BREAKPOINT: REFERENCE COUNT CHANGED === 🚨\n"; + write(STDOUT_FILENO, msg, sizeof(msg) - 1); + + char call_msg[200]; + int len = snprintf(call_msg, sizeof(call_msg), "📍 Изменение #%d - Адрес: %p\n", call_number, info->si_addr); + write(STDOUT_FILENO, call_msg, len); + + const char stack_msg[] = "📚 Стек трассировки (точное место изменения счетчика ссылок):\n"; + write(STDOUT_FILENO, stack_msg, sizeof(stack_msg) - 1); + + // Получаем стек трассировки + const int max_frames = 7; + void* buffer[max_frames]; + int nframes = backtrace(buffer, max_frames); + + // Используем backtrace_symbols для получения символов + char** symbols = backtrace_symbols(buffer, nframes); + + if (symbols) { + for (int i = 0; i < nframes; ++i) { + char frame_msg[500]; + int frame_len = snprintf(frame_msg, sizeof(frame_msg), + " [%2d] %s\n", i, symbols[i]); + write(STDOUT_FILENO, frame_msg, frame_len); + } + free(symbols); + } else { + // Fallback - используем backtrace_symbols_fd если symbols == NULL + backtrace_symbols_fd(buffer, nframes, STDOUT_FILENO); + } + + const char end_msg[] = "🔚 ============================================== 🔚\n\n"; + write(STDOUT_FILENO, end_msg, sizeof(end_msg) - 1); +} + +void HardwareSharedPtrTracker::printStackTrace() { + const int max_frames = 7; + void* buffer[max_frames]; + + int nframes = backtrace(buffer, max_frames); + char** symbols = backtrace_symbols(buffer, nframes); + + std::cout << "Stack trace (reference count change):\n"; + + for (int i = 0; i < nframes; ++i) { + std::cout << " [" << i << "] " << (symbols ? symbols[i] : "???") << "\n"; + } + + if (symbols) { + free(symbols); + } +} + +void HardwareSharedPtrTracker::checkAndSwitchIfNeeded() { + if (should_stop_.exchange(false)) { + std::cout << "\n=== HARDWARE BREAKPOINT TRIGGERED ===\n"; + printStackTrace(); + + if (auto ptr = current_tracked_ptr_.lock()) { + long count = ptr.use_count(); + std::cout << "Current use_count: " << count << "\n"; + + if (count <= 1) { + std::cout << "Object will be deleted soon (use_count=" << count << ")\n"; + std::cout << "Stopping hardware tracking\n"; + stopTracking(); + } + } else { + std::cout << "Tracked object already deleted\n"; + std::cout << "Stopping hardware tracking\n"; + stopTracking(); + } + + std::cout << "====================================\n\n"; + } +} + +void HardwareSharedPtrTracker::startTracking(const std::shared_ptr& ptr) { + if (!enabled_) { + return; + } + + if (!current_tracked_ptr_.expired()) { + std::cout << "Already tracking another YamuxedConnection (address: active" + << "), ignoring new request\n"; + return; + } + + stopTracking(); + std::cout << "\n=== HARDWARE TRACKING STARTED ===\n"; + std::cout << "YamuxedConnection address: " << ptr.get() << "\n"; + std::cout << "shared_ptr use_count: " << ptr.use_count() << "\n"; + + void* ref_count_addr = getRefCountAddress(ptr); + if (!ref_count_addr) { + std::cerr << "Failed to get reference count address\n"; + return; + } + + if (!setHardwareWatchpoint(ref_count_addr)) { + std::cerr << "Failed to set hardware watchpoint\n"; + return; + } + + watched_address_ = ref_count_addr; + current_tracked_ptr_ = ptr; + is_tracking_ = true; + should_stop_ = false; + + std::cout << "Hardware tracking activated\n"; + std::cout << "=================================\n\n"; +} + +void HardwareSharedPtrTracker::stopTracking() { + if (!is_tracking_) { + return; + } + + std::cout << "\n=== HARDWARE TRACKING STOPPED ===\n"; + + removeHardwareWatchpoint(); + + watched_address_ = nullptr; + current_tracked_ptr_.reset(); + is_tracking_ = false; + should_stop_ = false; + + std::cout << "Hardware tracking stopped\n"; + std::cout << "=================================\n\n"; +} + +void trackNextYamuxedConnection(const std::shared_ptr& ptr) { + auto& tracker = HardwareSharedPtrTracker::getInstance(); + tracker.startTracking(ptr); +} + +} // namespace libp2p::connection \ No newline at end of file diff --git a/src/muxer/yamux/yamux.cpp b/src/muxer/yamux/yamux.cpp index 96ee3d85..169f0bd5 100644 --- a/src/muxer/yamux/yamux.cpp +++ b/src/muxer/yamux/yamux.cpp @@ -8,12 +8,14 @@ #include #include +#include namespace libp2p::muxer { Yamux::Yamux(MuxedConnectionConfig config, std::shared_ptr scheduler, std::shared_ptr cmgr) : config_{config}, scheduler_{std::move(scheduler)} { + connection::HardwareSharedPtrTracker::getInstance().enable(); assert(scheduler_); if (cmgr) { std::weak_ptr w(cmgr); @@ -43,7 +45,11 @@ namespace libp2p::muxer { "inactive connection passed to muxer: {}", res.error()); return cb(res.error()); } - cb(std::make_shared( - std::move(conn), scheduler_, close_cb_, config_)); + + auto yamux_connection = std::make_shared( + std::move(conn), scheduler_, close_cb_, config_); + connection::trackNextYamuxedConnection(yamux_connection); + + cb(yamux_connection); } } // namespace libp2p::muxer diff --git a/src/muxer/yamux/yamux_stream.cpp b/src/muxer/yamux/yamux_stream.cpp index 3dbe594e..6ae4a74a 100644 --- a/src/muxer/yamux/yamux_stream.cpp +++ b/src/muxer/yamux/yamux_stream.cpp @@ -30,14 +30,14 @@ namespace libp2p::connection { uint32_t stream_id, size_t maximum_window_size, size_t write_queue_limit) - : connection_(std::move(connection)), + : connection_(connection), feedback_(feedback), stream_id_(stream_id), window_size_(YamuxFrame::kInitialWindowSize), peers_window_size_(YamuxFrame::kInitialWindowSize), maximum_window_size_(maximum_window_size), write_queue_(write_queue_limit) { - assert(connection_); + assert(connection); assert(stream_id_ > 0); assert(window_size_ <= maximum_window_size_); assert(peers_window_size_ <= maximum_window_size_); @@ -151,19 +151,31 @@ namespace libp2p::connection { } outcome::result YamuxStream::remotePeerId() const { - return connection_->remotePeer(); + if (auto conn = connection_.lock()) { + return conn->remotePeer(); + } + return Error::STREAM_RESET_BY_HOST; } outcome::result YamuxStream::isInitiator() const { - return connection_->isInitiator(); + if (auto conn = connection_.lock()) { + return conn->isInitiator(); + } + return Error::STREAM_RESET_BY_HOST; } outcome::result YamuxStream::localMultiaddr() const { - return connection_->localMultiaddr(); + if (auto conn = connection_.lock()) { + return conn->localMultiaddr(); + } + return Error::STREAM_RESET_BY_HOST; } outcome::result YamuxStream::remoteMultiaddr() const { - return connection_->remoteMultiaddr(); + if (auto conn = connection_.lock()) { + return conn->remoteMultiaddr(); + } + return Error::STREAM_RESET_BY_HOST; } void YamuxStream::increaseSendWindow(size_t delta) { diff --git a/src/muxer/yamux/yamuxed_connection.cpp b/src/muxer/yamux/yamuxed_connection.cpp index 198cbd63..172e5587 100644 --- a/src/muxer/yamux/yamuxed_connection.cpp +++ b/src/muxer/yamux/yamuxed_connection.cpp @@ -87,6 +87,11 @@ namespace libp2p::connection { return; } started_ = false; + + ping_handle_.reset(); + cleanup_handle_.reset(); + inactivity_handle_.reset(); + SL_INFO(log(), "=== ALL TIMERS CANCELLED IN stop() ==="); } outcome::result> YamuxedConnection::newStream() { @@ -537,6 +542,11 @@ namespace libp2p::connection { auto self = shared_from_this(); started_ = false; + ping_handle_.reset(); + cleanup_handle_.reset(); + inactivity_handle_.reset(); + SL_INFO(log(), "=== ALL TIMERS CANCELLED IN close() ==="); + SL_DEBUG(log(), "closing connection, reason: {}", notify_streams_code); Streams streams; @@ -553,8 +563,18 @@ namespace libp2p::connection { cb(notify_streams_code); } - if (closed_callback_) { - closed_callback_(remote_peer_, shared_from_this()); + if (closed_callback_ && registered_in_manager_) { + auto self_ptr = shared_from_this(); + SL_INFO(log(), "=== CALLING closed_callback_ ==="); + SL_INFO(log(), "YamuxedConnection address: {}", (void*)this); + SL_INFO(log(), "shared_ptr address: {}", (void*)self_ptr.get()); + SL_INFO(log(), "shared_ptr use_count: {}", self_ptr.use_count()); + SL_INFO(log(), "remote_peer_: {}", remote_peer_.toBase58()); + + closed_callback_(remote_peer_, std::static_pointer_cast(self_ptr)); + + SL_INFO(log(), "=== closed_callback_ FINISHED ==="); + SL_INFO(log(), "shared_ptr use_count after callback: {}", self_ptr.use_count()); } close_after_write_ = true; @@ -744,7 +764,8 @@ namespace libp2p::connection { } std::vector abandoned; for (auto &[id, stream] : self->streams_) { - if (stream.use_count() == 1) { + auto stream_holder = stream; + if (stream_holder.use_count() == 2) { // 2 = our + в streams_ abandoned.push_back(id); self->enqueue(resetStreamMsg(id)); } @@ -779,4 +800,65 @@ namespace libp2p::connection { }, config_.ping_interval); } + + void YamuxedConnection::markAsRegistered() { + registered_in_manager_ = true; + } + + void YamuxedConnection::debugPrintActiveStreams() const { + log()->info("=== DEBUG: YamuxedConnection {} active streams analysis ===", + (void*)this); + log()->info("YamuxedConnection shared_ptr use_count: {}", + shared_from_this().use_count()); + log()->info("Total streams: {}, pending: {}", + streams_.size(), pending_outbound_streams_.size()); + + for (const auto& [id, stream] : streams_) { + log()->info("Stream {}: use_count={}, readable={}, writable={}", + id, stream.use_count(), + !stream->isClosedForRead(), !stream->isClosedForWrite()); + } + + for (const auto& [id, cb] : pending_outbound_streams_) { + log()->info("Pending stream {}: callback set", id); + } + log()->info("=== END DEBUG ==="); + } + + void YamuxedConnection::debugPrintMemoryLeakSources() const { + log()->info("=== MEMORY LEAK DIAGNOSTIC for YamuxedConnection {} ===", (void*)this); + log()->info("shared_ptr use_count: {}", shared_from_this().use_count()); + log()->info("started_: {}", started_); + log()->info("registered_in_manager_: {}", registered_in_manager_); + log()->info("is_writing_: {}", is_writing_); + log()->info("close_after_write_: {}", close_after_write_); + + log()->info("Active streams: {}", streams_.size()); + log()->info("Pending outbound streams: {}", pending_outbound_streams_.size()); + log()->info("Fresh streams (awaiting handlers): {}", fresh_streams_.size()); + log()->info("Write queue size: {}", write_queue_.size()); + + log()->info("Timer handles active:"); + log()->info(" ping_handle_ active: {}", static_cast(ping_handle_)); + log()->info(" cleanup_handle_ active: {}", static_cast(cleanup_handle_)); + log()->info(" inactivity_handle_ active: {}", static_cast(inactivity_handle_)); + + log()->info("Connection state:"); + log()->info(" connection_.use_count(): {}", connection_.use_count()); + log()->info(" connection_->isClosed(): {}", connection_->isClosed()); + + log()->info("Callbacks registered:"); + log()->info(" closed_callback_ set: {}", static_cast(closed_callback_)); + log()->info(" new_stream_handler_ set: {}", static_cast(new_stream_handler_)); + + for (const auto& [id, stream] : streams_) { + log()->info("Stream {} detailed info:", id); + log()->info(" use_count: {}", stream.use_count()); + log()->info(" is closed: {}", stream->isClosed()); + log()->info(" closed for read: {}", stream->isClosedForRead()); + log()->info(" closed for write: {}", stream->isClosedForWrite()); + } + + log()->info("=== END MEMORY LEAK DIAGNOSTIC ==="); + } } // namespace libp2p::connection diff --git a/src/network/impl/CMakeLists.txt b/src/network/impl/CMakeLists.txt index 274ab550..c794e1db 100644 --- a/src/network/impl/CMakeLists.txt +++ b/src/network/impl/CMakeLists.txt @@ -22,6 +22,7 @@ target_link_libraries(p2p_listener_manager p2p_multiaddress p2p_peer_id p2p_logger + p2p_yamuxed_connection ) diff --git a/src/network/impl/connection_manager_impl.cpp b/src/network/impl/connection_manager_impl.cpp index 1b9d7d54..e5bc21b2 100644 --- a/src/network/impl/connection_manager_impl.cpp +++ b/src/network/impl/connection_manager_impl.cpp @@ -7,6 +7,12 @@ #include #include +#include +#include +#include +#include +#include +#include namespace libp2p::network { @@ -51,6 +57,11 @@ namespace libp2p::network { void ConnectionManagerImpl::addConnectionToPeer( const peer::PeerId &p, ConnectionManager::ConnectionSPtr c) { + SL_INFO(log(), "=== addConnectionToPeer CALLED ==="); + SL_INFO(log(), "peer: {}", p.toBase58()); + SL_INFO(log(), "connection address: {}", (void*)c.get()); + SL_INFO(log(), "connection use_count: {}", c.use_count()); + if (c == nullptr) { log()->error("inconsistency: not adding nullptr to active connections"); return; @@ -58,11 +69,22 @@ namespace libp2p::network { auto it = connections_.find(p); if (it == connections_.end()) { + SL_INFO(log(), "Creating new peer entry in connections_"); connections_.insert({p, {c}}); } else { + SL_INFO(log(), "Adding to existing peer entry (current size: {})", it->second.size()); connections_[p].insert(c); } + + SL_INFO(log(), "Connection successfully added. Total peers: {}", connections_.size()); + auto it_check = connections_.find(p); + if (it_check != connections_.end()) { + SL_INFO(log(), "Verification: peer {} now has {} connections", + p.toBase58(), it_check->second.size()); + } + bus_->getChannel().publish(c); + SL_INFO(log(), "=== addConnectionToPeer FINISHED ==="); } std::vector @@ -139,29 +161,58 @@ namespace libp2p::network { } } - void ConnectionManagerImpl::onConnectionClosed( - const peer::PeerId &peer_id, - const std::shared_ptr &conn) { + void ConnectionManagerImpl::onConnectionClosed(const peer::PeerId &peer, + const std::shared_ptr &connection) { + SL_INFO(log(), "=== onConnectionClosed CALLED ==="); + SL_INFO(log(), "peer: {}", peer.toBase58()); + SL_INFO(log(), "connection address: {}", (void*)connection.get()); + SL_INFO(log(), "connection use_count: {}", connection.use_count()); + if (closing_connections_to_peer_.has_value() - && closing_connections_to_peer_.value() == peer_id) { + && closing_connections_to_peer_.value() == peer) { return; } - auto it = connections_.find(peer_id); + + auto it = connections_.find(peer); if (it == connections_.end()) { - log()->error("inconsistency in onConnectionClosed, peer not found"); + SL_WARN(log(), "onConnectionClosed called for peer {} that was not in connection manager (connection may have been closed before being added)", peer.toBase58()); return; } - [[maybe_unused]] auto erased = it->second.erase(conn); + SL_INFO(log(), "Found peer in connections_, set size: {}", it->second.size()); + + // Log all connections in set for comparison + for (const auto& conn : it->second) { + SL_INFO(log(), "Existing connection in set: address={}, use_count={}", + (void*)conn.get(), conn.use_count()); + } + + std::unique_lock lock(connection_mutex_); + if (connection_is_closing_.count(connection) != 0) { + SL_WARN(log(), "Connection {} is already being closed", (void*)connection.get()); + return; + } + connection_is_closing_.insert(connection); + lock.unlock(); + + auto erased = it->second.erase(connection); + SL_INFO(log(), "Erased count: {}", erased); + SL_INFO(log(), "Peer connection set size after erase: {}", it->second.size()); + if (erased == 0) { - log()->error("inconsistency in onConnectionClosed, connection not found"); + SL_WARN(log(), "Connection {} was not found in connections_ for peer {}", + (void*)connection.get(), peer.toBase58()); + } else { + SL_INFO(log(), "Connection {} was successfully removed", (void*)connection.get()); } if (it->second.empty()) { - connections_.erase(peer_id); - bus_->getChannel().publish( - peer_id); + connections_.erase(peer); + SL_INFO(log(), "Peer {} removed from connections_ (no more connections)", peer.toBase58()); + bus_->getChannel().publish(peer); } + + SL_INFO(log(), "=== onConnectionClosed FINISHED ==="); } } // namespace libp2p::network diff --git a/src/network/impl/listener_manager_impl.cpp b/src/network/impl/listener_manager_impl.cpp index b2217826..374cc4fd 100644 --- a/src/network/impl/listener_manager_impl.cpp +++ b/src/network/impl/listener_manager_impl.cpp @@ -7,6 +7,7 @@ #include #include +#include namespace libp2p::network { @@ -240,6 +241,9 @@ namespace libp2p::network { }); // store connection + if (auto yamux_conn = std::dynamic_pointer_cast(conn)) { + yamux_conn->markAsRegistered(); + } this->cmgr_->addConnectionToPeer(id, conn); } diff --git a/test_your_hardware_tracker.cpp b/test_your_hardware_tracker.cpp new file mode 100644 index 00000000..493dc335 --- /dev/null +++ b/test_your_hardware_tracker.cpp @@ -0,0 +1,339 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// Простая заглушка для YamuxedConnection для тестирования hardware tracker'а +class YamuxedConnection { +public: + YamuxedConnection(int id) : id_(id) { + std::cout << "YamuxedConnection(" << id_ << ") создан по адресу " << this << std::endl; + } + + ~YamuxedConnection() { + std::cout << "YamuxedConnection(" << id_ << ") удален с адреса " << this << std::endl; + } + + int getId() const { return id_; } + +private: + int id_; +}; + +class HardwareSharedPtrTracker { +public: + static HardwareSharedPtrTracker& getInstance() { + static HardwareSharedPtrTracker instance; + return instance; + } + + // Start tracking the reference count of a shared_ptr + void startTracking(std::shared_ptr ptr); + + // Stop current tracking + void stopTracking(); + + // Check if tracking is active + bool isTracking() const { return is_tracking_; } + + // Enable/disable tracking + void enable() { enabled_ = true; } + void disable() { enabled_ = false; } + +private: + HardwareSharedPtrTracker(); + ~HardwareSharedPtrTracker(); + + // Get the address of the reference count in a shared_ptr + void* getRefCountAddress(const std::shared_ptr& ptr); + + // Set hardware watchpoint + bool setHardwareWatchpoint(void* address); + + // Remove hardware watchpoint + bool removeHardwareWatchpoint(); + + // Signal handler for memory changes + static void signalHandler(int sig, siginfo_t* info, void* context); + + // Print stack trace + void printStackTrace(); + + // Check counter and switch to the next object if needed + void checkAndSwitchIfNeeded(); + +private: + std::atomic enabled_{false}; + std::atomic is_tracking_{false}; + std::atomic should_stop_{false}; // Flag for signal handler + + void* watched_address_{nullptr}; + int watchpoint_fd_{-1}; // Store perf_event fd + std::weak_ptr current_tracked_ptr_; + + // For debug registers + static constexpr int DR7_L0 = 1; // Local enable for DR0 + static constexpr int DR7_RW0_WRITE = (1 << 16); // Watch writes to DR0 + static constexpr int DR7_LEN0_4BYTES = (3 << 18); // 4-byte length for DR0 + + static HardwareSharedPtrTracker* instance_; + struct sigaction old_sigtrap_action_; +}; + +// Global function for setting in yamux.cpp +void trackNextYamuxedConnection(std::shared_ptr ptr); + +// Macros for convenience +#define YAMUX_HARDWARE_TRACK_ENABLE() \ + libp2p::connection::HardwareSharedPtrTracker::getInstance().enable() + +#define YAMUX_HARDWARE_TRACK_DISABLE() \ + libp2p::connection::HardwareSharedPtrTracker::getInstance().disable() + +#define YAMUX_HARDWARE_TRACK_SHARED_PTR(ptr) \ + libp2p::connection::trackNextYamuxedConnection(ptr) + + +HardwareSharedPtrTracker* HardwareSharedPtrTracker::instance_ = nullptr; + +HardwareSharedPtrTracker::HardwareSharedPtrTracker() { + instance_ = this; + + struct sigaction sa; + sa.sa_sigaction = signalHandler; + sigemptyset(&sa.sa_mask); + sa.sa_flags = SA_SIGINFO | SA_RESTART; + + if (sigaction(SIGTRAP, &sa, &old_sigtrap_action_) == -1) { + std::cerr << "Failed to install SIGTRAP handler\n"; + } + + std::cout << "HardwareSharedPtrTracker initialized\n"; +} + +HardwareSharedPtrTracker::~HardwareSharedPtrTracker() { + stopTracking(); + sigaction(SIGTRAP, &old_sigtrap_action_, nullptr); + instance_ = nullptr; +} + +void* HardwareSharedPtrTracker::getRefCountAddress(const std::shared_ptr& ptr) { + struct shared_ptr_internal { + void* ptr; + void* control_block; + }; + + auto* internal = reinterpret_cast(&ptr); + void* control_block = internal->control_block; + + if (!control_block) { + return nullptr; + } + + void* ref_count_addr = control_block; + + std::cout << "Control block address: " << control_block << "\n"; + std::cout << "Reference count address: " << ref_count_addr << "\n"; + std::cout << "Current use_count: " << ptr.use_count() << "\n"; + + return ref_count_addr; +} + +bool HardwareSharedPtrTracker::setHardwareWatchpoint(void* address) { + struct perf_event_attr pe; + memset(&pe, 0, sizeof(pe)); + + pe.type = PERF_TYPE_BREAKPOINT; + pe.size = sizeof(pe); + pe.config = 0; + pe.bp_type = HW_BREAKPOINT_W | HW_BREAKPOINT_R; + pe.bp_addr = reinterpret_cast(address); + pe.bp_len = sizeof(long); + pe.disabled = 0; + pe.exclude_kernel = 1; + pe.exclude_hv = 1; + + int fd = syscall(__NR_perf_event_open, &pe, 0, -1, -1, 0); + if (fd == -1) { + perror("perf_event_open for hardware watchpoint failed"); + return false; + } + + watchpoint_fd_ = fd; + + std::cout << "Hardware watchpoint set on address " << address + << " (fd=" << fd << ")\n"; + + return true; +} + +bool HardwareSharedPtrTracker::removeHardwareWatchpoint() { + if (watchpoint_fd_ != -1) { + close(watchpoint_fd_); + watchpoint_fd_ = -1; + std::cout << "Hardware watchpoint removed\n"; + return true; + } + return false; +} + +void HardwareSharedPtrTracker::signalHandler(int sig, siginfo_t* info, void* context) { + if (!instance_ || sig != SIGTRAP) { + return; + } + + // printStackTrace(); + + static int call_number = 0; + call_number++; + + const char msg[] = "\n=== HARDWARE BREAKPOINT: REFERENCE COUNT CHANGED ===\n"; + write(STDOUT_FILENO, msg, sizeof(msg) - 1); + + char call_msg[100]; + int len = snprintf(call_msg, sizeof(call_msg), "Call #%d - Address: %p\n", call_number, info->si_addr); + write(STDOUT_FILENO, call_msg, len); + + const int max_frames = 15; + void* buffer[max_frames]; + + const char stack_msg[] = "Stack trace (exact location of reference count change):\n"; + write(STDOUT_FILENO, stack_msg, sizeof(stack_msg) - 1); + + int nframes = backtrace(buffer, max_frames); + + backtrace_symbols_fd(buffer, nframes, STDOUT_FILENO); + + const char end_msg[] = "================================================\n\n"; + write(STDOUT_FILENO, end_msg, sizeof(end_msg) - 1); + + //instance_->signal_count_.fetch_add(1, std::memory_order_relaxed); +} + +void HardwareSharedPtrTracker::printStackTrace() { + const int max_frames = 7; + void* buffer[max_frames]; + + int nframes = backtrace(buffer, max_frames); + char** symbols = backtrace_symbols(buffer, nframes); + + std::cout << "Stack trace (reference count change):\n"; + + for (int i = 0; i < nframes; ++i) { + std::cout << " [" << i << "] " << (symbols ? symbols[i] : "???") << "\n"; + } + + if (symbols) { + free(symbols); + } +} + +void HardwareSharedPtrTracker::checkAndSwitchIfNeeded() { + if (should_stop_.exchange(false)) { + std::cout << "\n=== HARDWARE BREAKPOINT TRIGGERED ===\n"; + printStackTrace(); + + if (auto ptr = current_tracked_ptr_.lock()) { + long count = ptr.use_count(); + std::cout << "Current use_count: " << count << "\n"; + + if (count <= 1) { + std::cout << "Object will be deleted soon (use_count=" << count << ")\n"; + std::cout << "Stopping hardware tracking\n"; + stopTracking(); + } + } else { + std::cout << "Tracked object already deleted\n"; + std::cout << "Stopping hardware tracking\n"; + stopTracking(); + } + + std::cout << "====================================\n\n"; + } +} + +void HardwareSharedPtrTracker::startTracking(std::shared_ptr ptr) { + if (!enabled_) { + return; + } + + if (!current_tracked_ptr_.expired()) { + std::cout << "Already tracking another YamuxedConnection (address: active" + << "), ignoring new request\n"; + return; + } + + stopTracking(); + std::cout << "\n=== HARDWARE TRACKING STARTED ===\n"; + std::cout << "YamuxedConnection address: " << ptr.get() << "\n"; + std::cout << "shared_ptr use_count: " << ptr.use_count() << "\n"; + + void* ref_count_addr = getRefCountAddress(ptr); + if (!ref_count_addr) { + std::cerr << "Failed to get reference count address\n"; + return; + } + + if (!setHardwareWatchpoint(ref_count_addr)) { + std::cerr << "Failed to set hardware watchpoint\n"; + return; + } + + watched_address_ = ref_count_addr; + current_tracked_ptr_ = ptr; + is_tracking_ = true; + should_stop_ = false; + + std::cout << "Hardware tracking activated\n"; + std::cout << "=================================\n\n"; +} + +void HardwareSharedPtrTracker::stopTracking() { + if (!is_tracking_) { + return; + } + + std::cout << "\n=== HARDWARE TRACKING STOPPED ===\n"; + + removeHardwareWatchpoint(); + + watched_address_ = nullptr; + current_tracked_ptr_.reset(); + is_tracking_ = false; + should_stop_ = false; + + std::cout << "Hardware tracking stopped\n"; + std::cout << "=================================\n\n"; +} + +void trackNextYamuxedConnection(std::shared_ptr ptr) { + auto& tracker = HardwareSharedPtrTracker::getInstance(); + tracker.startTracking(std::move(ptr)); +} + + +int main() { + auto& tracker = HardwareSharedPtrTracker::getInstance(); + tracker.enable(); + + std::shared_ptr ptr1 = std::make_shared(1); + trackNextYamuxedConnection(ptr1); + + std::shared_ptr ptr2 = ptr1; + ptr2.reset(); + ptr1.reset(); + + std::shared_ptr ptr = std::make_shared(10); + trackNextYamuxedConnection(ptr); + ptr.reset(); + return 0; +} \ No newline at end of file