diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 9df51eb9edc5a..0a6797347f85e 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -244,6 +244,7 @@ add_library(bitcoin_node STATIC EXCLUDE_FROM_ALL policy/rbf.cpp policy/settings.cpp policy/truc_policy.cpp + private_broadcast.cpp rest.cpp rpc/blockchain.cpp rpc/external_signer.cpp diff --git a/src/bitcoin-cli.cpp b/src/bitcoin-cli.cpp index 279aa89e88e3b..5cfa2ec4becff 100644 --- a/src/bitcoin-cli.cpp +++ b/src/bitcoin-cli.cpp @@ -452,6 +452,7 @@ class NetinfoRequestHandler : public BaseRequestHandler if (conn_type == "block-relay-only") return "block"; if (conn_type == "manual" || conn_type == "feeler") return conn_type; if (conn_type == "addr-fetch") return "addr"; + if (conn_type == "private-broadcast") return "priv"; return ""; } std::string FormatServices(const UniValue& services) @@ -703,6 +704,7 @@ class NetinfoRequestHandler : public BaseRequestHandler " \"manual\" - peer we manually added using RPC addnode or the -addnode/-connect config options\n" " \"feeler\" - short-lived connection for testing addresses\n" " \"addr\" - address fetch; short-lived connection for requesting addresses\n" + " \"priv\" - private broadcast; short-lived connection for broadcasting our transactions\n" " net Network the peer connected through (\"ipv4\", \"ipv6\", \"onion\", \"i2p\", \"cjdns\", or \"npr\" (not publicly routable))\n" " serv Services offered by the peer\n" " \"n\" - NETWORK: peer can serve the full block chain\n" diff --git a/src/init.cpp b/src/init.cpp index 1a33226b7c800..48ca6dd2ba953 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -543,7 +543,7 @@ void SetupServerArgs(ArgsManager& argsman, bool can_listen_ipc) argsman.AddArg("-forcednsseed", strprintf("Always query for peer addresses via DNS lookup (default: %u)", DEFAULT_FORCEDNSSEED), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-listen", strprintf("Accept connections from outside (default: %u if no -proxy, -connect or -maxconnections=0)", DEFAULT_LISTEN), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-listenonion", strprintf("Automatically create Tor onion service (default: %d)", DEFAULT_LISTEN_ONION), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); - argsman.AddArg("-maxconnections=", strprintf("Maintain at most automatic connections to peers (default: %u). This limit does not apply to connections manually added via -addnode or the addnode RPC, which have a separate limit of %u.", DEFAULT_MAX_PEER_CONNECTIONS, MAX_ADDNODE_CONNECTIONS), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); + argsman.AddArg("-maxconnections=", strprintf("Maintain at most automatic connections to peers (default: %u). This limit does not apply to connections manually added via -addnode or the addnode RPC, which have a separate limit of %u. It does not apply to short-lived private broadcast connections either, which have a separate limit of %u.", DEFAULT_MAX_PEER_CONNECTIONS, MAX_ADDNODE_CONNECTIONS, MAX_PRIVATE_BROADCAST_CONNECTIONS), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-maxreceivebuffer=", strprintf("Maximum per-connection receive buffer, *1000 bytes (default: %u)", DEFAULT_MAXRECEIVEBUFFER), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-maxsendbuffer=", strprintf("Maximum per-connection memory usage for the send buffer, *1000 bytes (default: %u)", DEFAULT_MAXSENDBUFFER), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); argsman.AddArg("-maxuploadtarget=", strprintf("Tries to keep outbound traffic under the given target per 24h. Limit does not apply to peers with 'download' permission or blocks created within past week. 0 = no limit (default: %s). Optional suffix units [k|K|m|M|g|G|t|T] (default: M). Lowercase is 1000 base while uppercase is 1024 base", DEFAULT_MAX_UPLOAD_TARGET), ArgsManager::ALLOW_ANY, OptionsCategory::CONNECTION); @@ -666,6 +666,15 @@ void SetupServerArgs(ArgsManager& argsman, bool can_listen_ipc) OptionsCategory::NODE_RELAY); argsman.AddArg("-minrelaytxfee=", strprintf("Fees (in %s/kvB) smaller than this are considered zero fee for relaying, mining and transaction creation (default: %s)", CURRENCY_UNIT, FormatMoney(DEFAULT_MIN_RELAY_TX_FEE)), ArgsManager::ALLOW_ANY, OptionsCategory::NODE_RELAY); + argsman.AddArg("-privatebroadcast", + strprintf( + "Broadcast transactions submitted via sendrawtransaction RPC using short-lived " + "connections through the Tor or I2P networks, without putting them in the mempool first. " + "Transactions submitted through the wallet are not affected by this option " + "(default: %u)", + DEFAULT_PRIVATE_BROADCAST), + ArgsManager::ALLOW_ANY, + OptionsCategory::NODE_RELAY); argsman.AddArg("-whitelistforcerelay", strprintf("Add 'forcerelay' permission to whitelisted peers with default permissions. This will relay transactions even if the transactions were already in the mempool. (default: %d)", DEFAULT_WHITELISTFORCERELAY), ArgsManager::ALLOW_ANY, OptionsCategory::NODE_RELAY); argsman.AddArg("-whitelistrelay", strprintf("Add 'relay' permission to whitelisted peers with default permissions. This will accept relayed transactions even when not relaying transactions (default: %d)", DEFAULT_WHITELISTRELAY), ArgsManager::ALLOW_ANY, OptionsCategory::NODE_RELAY); @@ -999,11 +1008,14 @@ bool AppInitParameterInteraction(const ArgsManager& args) if (user_max_connection < 0) { return InitError(Untranslated("-maxconnections must be greater or equal than zero")); } + const size_t max_private{args.GetBoolArg("-privatebroadcast", DEFAULT_PRIVATE_BROADCAST) + ? MAX_PRIVATE_BROADCAST_CONNECTIONS + : 0}; // Reserve enough FDs to account for the bare minimum, plus any manual connections, plus the bound interfaces int min_required_fds = MIN_CORE_FDS + MAX_ADDNODE_CONNECTIONS + nBind; // Try raising the FD limit to what we need (available_fds may be smaller than the requested amount if this fails) - available_fds = RaiseFileDescriptorLimit(user_max_connection + min_required_fds); + available_fds = RaiseFileDescriptorLimit(user_max_connection + max_private + min_required_fds); // If we are using select instead of poll, our actual limit may be even smaller #ifndef USE_POLL available_fds = std::min(FD_SETSIZE, available_fds); @@ -1718,13 +1730,13 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) } } + const bool listenonion{args.GetBoolArg("-listenonion", DEFAULT_LISTEN_ONION)}; if (onion_proxy.IsValid()) { SetProxy(NET_ONION, onion_proxy); } else { // If -listenonion is set, then we will (try to) connect to the Tor control port // later from the torcontrol thread and may retrieve the onion proxy from there. - const bool listenonion_disabled{!args.GetBoolArg("-listenonion", DEFAULT_LISTEN_ONION)}; - if (onlynet_used_with_onion && listenonion_disabled) { + if (onlynet_used_with_onion && !listenonion) { return InitError( _("Outbound connections restricted to Tor (-onlynet=onion) but the proxy for " "reaching the Tor network is not provided: none of -proxy, -onion or " @@ -2100,7 +2112,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) connOptions.onion_binds.push_back(onion_service_target); } - if (args.GetBoolArg("-listenonion", DEFAULT_LISTEN_ONION)) { + if (listenonion) { if (connOptions.onion_binds.size() > 1) { InitWarning(strprintf(_("More than one onion bind address is provided. Using %s " "for the automatically created Tor onion service."), @@ -2173,6 +2185,32 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) conflict->ToStringAddrPort())); } + if (args.GetBoolArg("-privatebroadcast", DEFAULT_PRIVATE_BROADCAST)) { + // If -listenonion is set, then NET_ONION may not be reachable now + // but may become reachable later, thus only error here if it is not + // reachable and will not become reachable for sure. + const bool onion_may_become_reachable{listenonion && (!args.IsArgSet("-onlynet") || onlynet_used_with_onion)}; + if (!g_reachable_nets.Contains(NET_I2P) && + !g_reachable_nets.Contains(NET_ONION) && + !onion_may_become_reachable) { + return InitError(_("Private broadcast of own transactions requested (-privatebroadcast), " + "but none of Tor or I2P networks is reachable")); + } + if (!connOptions.m_use_addrman_outgoing) { + return InitError(_("Private broadcast of own transactions requested (-privatebroadcast), " + "but -connect is also configured. They are incompatible because the " + "private broadcast needs to open new connections to randomly " + "chosen Tor or I2P peers. Consider using -maxconnections=0 -addnode=... " + "instead")); + } + if (!proxyRandomize && (g_reachable_nets.Contains(NET_ONION) || onion_may_become_reachable)) { + InitWarning(_("Private broadcast of own transactions requested (-privatebroadcast) and " + "-proxyrandomize is disabled. Tor circuits for private broadcast connections " + "may be correlated to other connections over Tor. For maximum privacy set " + "-proxyrandomize=1.")); + } + } + if (!node.connman->Start(scheduler, connOptions)) { return false; } diff --git a/src/logging.cpp b/src/logging.cpp index f9d8f98048819..47b83d1a45bbb 100644 --- a/src/logging.cpp +++ b/src/logging.cpp @@ -202,6 +202,7 @@ static const std::map> LOG_CATEGORIES_ {"scan", BCLog::SCAN}, {"txpackages", BCLog::TXPACKAGES}, {"kernel", BCLog::KERNEL}, + {"privatebroadcast", BCLog::PRIVATE_BROADCAST}, }; static const std::unordered_map LOG_CATEGORIES_BY_FLAG{ diff --git a/src/logging.h b/src/logging.h index defff61d3008a..2061950ddf27c 100644 --- a/src/logging.h +++ b/src/logging.h @@ -95,6 +95,7 @@ namespace BCLog { SCAN = (CategoryMask{1} << 27), TXPACKAGES = (CategoryMask{1} << 28), KERNEL = (CategoryMask{1} << 29), + PRIVATE_BROADCAST = (CategoryMask{1} << 30), ALL = ~NONE, }; enum class Level { diff --git a/src/net.cpp b/src/net.cpp index d335f2dc526ec..c258dcf94e9b1 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -354,7 +354,16 @@ bool CConnman::CheckIncomingNonce(uint64_t nonce) { LOCK(m_nodes_mutex); for (const CNode* pnode : m_nodes) { - if (!pnode->fSuccessfullyConnected && !pnode->IsInboundConn() && pnode->GetLocalNonce() == nonce) + // Omit private broadcast connections from this check to prevent this privacy attack: + // - We connect to a peer in an attempt to privately broadcast a transaction. From our + // VERSION message the peer deducts that this is a short-lived connection for + // broadcasting a transaction, takes our nonce and delays their VERACK. + // - The peer starts connecting to (clearnet) nodes and sends them a VERSION message + // which contains our nonce. If the peer manages to connect to us we would disconnect. + // - Upon a disconnect, the peer knows our clearnet address. They go back to the short + // lived privacy broadcast connection and continue with VERACK. + if (!pnode->fSuccessfullyConnected && !pnode->IsInboundConn() && !pnode->IsPrivateBroadcastConn() && + pnode->GetLocalNonce() == nonce) return false; } return true; @@ -458,7 +467,10 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, i2p::Connection conn; bool connected{false}; - if (m_i2p_sam_session) { + // If an I2P SAM session already exists, normally we would re-use it. But in the case of + // private broadcast we force a new transient session. A Connect() using m_i2p_sam_session + // would use our permanent I2P address as a source address. + if (m_i2p_sam_session && conn_type != ConnectionType::PRIVATE_BROADCAST) { connected = m_i2p_sam_session->Connect(target_addr, conn, proxyConnectionFailed); } else { { @@ -1876,6 +1888,7 @@ bool CConnman::AddConnection(const std::string& address, ConnectionType conn_typ switch (conn_type) { case ConnectionType::INBOUND: case ConnectionType::MANUAL: + case ConnectionType::PRIVATE_BROADCAST: return false; case ConnectionType::OUTBOUND_FULL_RELAY: max_connections = m_max_outbound_full_relay; @@ -2665,6 +2678,7 @@ void CConnman::ThreadOpenConnections(const std::vector connect, std // peers from addrman. case ConnectionType::ADDR_FETCH: case ConnectionType::FEELER: + case ConnectionType::PRIVATE_BROADCAST: break; case ConnectionType::MANUAL: case ConnectionType::OUTBOUND_FULL_RELAY: @@ -3049,6 +3063,73 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, return true; } +std::optional CConnman::PrivateBroadcast::PickNetwork(std::optional& proxy) const +{ + prevector<4, Network> nets; + std::optional clearnet_proxy; + if (g_reachable_nets.Contains(NET_ONION)) { + nets.push_back(NET_ONION); + + clearnet_proxy = ProxyForIPv4or6(); + if (clearnet_proxy.has_value()) { + if (g_reachable_nets.Contains(NET_IPV4)) { + nets.push_back(NET_IPV4); + } + if (g_reachable_nets.Contains(NET_IPV6)) { + nets.push_back(NET_IPV6); + } + } + } + if (g_reachable_nets.Contains(NET_I2P)) { + nets.push_back(NET_I2P); + } + + if (nets.empty()) { + return std::nullopt; + } + + const Network net{nets[FastRandomContext{}.randrange(nets.size())]}; + if (net == NET_IPV4 || net == NET_IPV6) { + proxy = clearnet_proxy; + } + return net; +} + +size_t CConnman::PrivateBroadcast::NumToOpen() const +{ + return m_num_to_open; +} + +void CConnman::PrivateBroadcast::NumToOpenAdd(size_t n) +{ + m_num_to_open += n; + m_num_to_open.notify_all(); +} + +size_t CConnman::PrivateBroadcast::NumToOpenSub(size_t n) +{ + size_t current_value{m_num_to_open.load()}; + size_t new_value; + do { + new_value = current_value > n ? current_value - n : 0; + } while (!m_num_to_open.compare_exchange_weak(current_value, new_value)); + return new_value; +} + +void CConnman::PrivateBroadcast::NumToOpenWait() const +{ + m_num_to_open.wait(0); +} + +std::optional CConnman::PrivateBroadcast::ProxyForIPv4or6() const +{ + Proxy tor_proxy; + if (m_outbound_tor_ok_at_least_once.load() && GetProxy(NET_ONION, tor_proxy)) { + return tor_proxy; + } + return std::nullopt; +} + Mutex NetEventsInterface::g_msgproc_mutex; void CConnman::ThreadMessageHandler() @@ -3133,6 +3214,85 @@ void CConnman::ThreadI2PAcceptIncoming() } } +void CConnman::ThreadPrivateBroadcast() +{ + AssertLockNotHeld(m_unused_i2p_sessions_mutex); + + size_t addrman_num_bad_addresses{0}; + while (!m_interrupt_net->interrupted()) { + + if (!fNetworkActive) { + m_interrupt_net->sleep_for(5s); + continue; + } + + CountingSemaphoreGrant<> conn_max_grant{m_private_broadcast.m_sem_conn_max}; // Would block if too many are opened. + + m_private_broadcast.NumToOpenWait(); + + if (m_interrupt_net->interrupted()) { + break; + } + + std::optional proxy; + const std::optional net{m_private_broadcast.PickNetwork(proxy)}; + if (!net.has_value()) { + LogPrintLevel(BCLog::PRIVATE_BROADCAST, + BCLog::Level::Warning, + "Connections needed but none of the Tor or I2P networks is reachable"); + m_interrupt_net->sleep_for(5s); + continue; + } + + const auto [addr, _] = addrman.Select(/*new_only=*/false, {net.value()}); + + if (!addr.IsValid() || IsLocal(addr)) { + ++addrman_num_bad_addresses; + if (addrman_num_bad_addresses > 100) { + LogDebug(BCLog::PRIVATE_BROADCAST, + "Connections needed but addrman keeps returning bad addresses, will retry"); + m_interrupt_net->sleep_for(500ms); + } + continue; + } + addrman_num_bad_addresses = 0; + + auto target_str{addr.ToStringAddrPort()}; + if (proxy.has_value()) { + target_str += " through the proxy at " + proxy->ToString(); + } + + const bool use_v2transport(addr.nServices & GetLocalServices() & NODE_P2P_V2); + + if (OpenNetworkConnection(addr, + /*fCountFailure=*/true, + std::move(conn_max_grant), + /*pszDest=*/nullptr, + ConnectionType::PRIVATE_BROADCAST, + use_v2transport, + proxy)) { + const size_t remaining{m_private_broadcast.NumToOpenSub(1)}; + LogDebug(BCLog::PRIVATE_BROADCAST, + "Socket connected to %s; remaining connections to open: %d", + target_str, + remaining); + } else { + const size_t remaining{m_private_broadcast.NumToOpen()}; + if (remaining == 0) { + LogDebug(BCLog::PRIVATE_BROADCAST, + "Failed to connect to %s, will not retry, no more connections needed", + target_str); + } else { + LogDebug(BCLog::PRIVATE_BROADCAST, + "Failed to connect to %s, will retry to a different address; remaining connections " + "to open: %d", + target_str, + remaining); + } + } + } +} + bool CConnman::BindListenPort(const CService& addrBind, bilingual_str& strError, NetPermissionFlags permissions) { int nOne = 1; @@ -3413,6 +3573,11 @@ bool CConnman::Start(CScheduler& scheduler, const Options& connOptions) std::thread(&util::TraceThread, "i2paccept", [this] { ThreadI2PAcceptIncoming(); }); } + if (gArgs.GetBoolArg("-privatebroadcast", DEFAULT_PRIVATE_BROADCAST)) { + threadPrivateBroadcast = + std::thread(&util::TraceThread, "privbcast", [this] { ThreadPrivateBroadcast(); }); + } + // Dump network addresses scheduler.scheduleEvery([this] { DumpAddresses(); }, DUMP_PEERS_INTERVAL); @@ -3462,10 +3627,16 @@ void CConnman::Interrupt() semAddnode->release(); } } + + m_private_broadcast.m_sem_conn_max.release(); + m_private_broadcast.NumToOpenAdd(1); // Just unblock NumToOpenWait() to be able to continue with shutdown. } void CConnman::StopThreads() { + if (threadPrivateBroadcast.joinable()) { + threadPrivateBroadcast.join(); + } if (threadI2PAcceptIncoming.joinable()) { threadI2PAcceptIncoming.join(); } @@ -3895,6 +4066,30 @@ bool CConnman::NodeFullyConnected(const CNode* pnode) void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) { AssertLockNotHeld(m_total_bytes_sent_mutex); + + if (pnode->IsPrivateBroadcastConn() && + msg.m_type != NetMsgType::VERSION && + msg.m_type != NetMsgType::VERACK && + msg.m_type != NetMsgType::INV && + msg.m_type != NetMsgType::TX && + msg.m_type != NetMsgType::PING) { + // Ensure private broadcast connections only send the above message types. + // Others are not needed and may degrade privacy. + LogDebug(BCLog::PRIVATE_BROADCAST, + "Omitting send of message '%s', peer=%d%s", + msg.m_type, + pnode->GetId(), + pnode->LogIP(fLogIPs)); + return; + } + + if (!m_private_broadcast.m_outbound_tor_ok_at_least_once.load() && !pnode->IsInboundConn() && + pnode->addr.IsTor() && msg.m_type == NetMsgType::VERACK) { + // If we are sending the peer VERACK that means we successfully sent + // and received another message to/from that peer (VERSION). + m_private_broadcast.m_outbound_tor_ok_at_least_once.store(true); + } + size_t nMessageSize = msg.data.size(); LogDebug(BCLog::NET, "sending %s (%d bytes) peer=%d\n", msg.m_type, nMessageSize, pnode->GetId()); if (gArgs.GetBoolArg("-capturemessages", false)) { diff --git a/src/net.h b/src/net.h index 25cb8236a3c26..dbc01d8a031be 100644 --- a/src/net.h +++ b/src/net.h @@ -73,6 +73,8 @@ static const int MAX_ADDNODE_CONNECTIONS = 8; static const int MAX_BLOCK_RELAY_ONLY_CONNECTIONS = 2; /** Maximum number of feeler connections */ static const int MAX_FEELER_CONNECTIONS = 1; +/** Maximum number of private broadcast connections */ +static constexpr size_t MAX_PRIVATE_BROADCAST_CONNECTIONS{64}; /** -listen default */ static const bool DEFAULT_LISTEN = true; /** The maximum number of peer connections to maintain. */ @@ -83,6 +85,8 @@ static const std::string DEFAULT_MAX_UPLOAD_TARGET{"0M"}; static const bool DEFAULT_BLOCKSONLY = false; /** -peertimeout default */ static const int64_t DEFAULT_PEER_CONNECT_TIMEOUT = 60; +/** Default for -privatebroadcast. */ +static constexpr bool DEFAULT_PRIVATE_BROADCAST{false}; /** Number of file descriptors required for message capture **/ static const int NUM_FDS_MESSAGE_CAPTURE = 1; /** Interval for ASMap Health Check **/ @@ -773,6 +777,7 @@ class CNode case ConnectionType::MANUAL: case ConnectionType::ADDR_FETCH: case ConnectionType::FEELER: + case ConnectionType::PRIVATE_BROADCAST: return false; } // no default case, so the compiler can warn about missing cases @@ -794,6 +799,7 @@ class CNode case ConnectionType::FEELER: case ConnectionType::BLOCK_RELAY: case ConnectionType::ADDR_FETCH: + case ConnectionType::PRIVATE_BROADCAST: return false; case ConnectionType::OUTBOUND_FULL_RELAY: case ConnectionType::MANUAL: @@ -815,6 +821,10 @@ class CNode return m_conn_type == ConnectionType::ADDR_FETCH; } + bool IsPrivateBroadcastConn() const { + return m_conn_type == ConnectionType::PRIVATE_BROADCAST; + } + bool IsInboundConn() const { return m_conn_type == ConnectionType::INBOUND; } @@ -828,6 +838,7 @@ class CNode case ConnectionType::OUTBOUND_FULL_RELAY: case ConnectionType::BLOCK_RELAY: case ConnectionType::ADDR_FETCH: + case ConnectionType::PRIVATE_BROADCAST: return true; } // no default case, so the compiler can warn about missing cases @@ -1171,6 +1182,70 @@ class CConnman const std::optional& proxy_override = std::nullopt) EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex); + /// Group of private broadcast related members. + class PrivateBroadcast + { + public: + /** + * Remember if we ever established at least one outbound connection to a + * Tor peer, including sending and receiving P2P messages. If this is + * true then the Tor proxy indeed works and is a proxy to the Tor network, + * not a misconfigured ordinary SOCKS5 proxy as -proxy or -onion. If that + * is the case, then we assume that connecting to an IPv4 or IPv6 address + * via that proxy will be done through the Tor network and a Tor exit node. + */ + std::atomic_bool m_outbound_tor_ok_at_least_once{false}; + + /** + * Semaphore used to guard against opening too many connections. + * Opening private broadcast connections will be paused if this is equal to 0. + */ + std::counting_semaphore<> m_sem_conn_max{MAX_PRIVATE_BROADCAST_CONNECTIONS}; + + /** + * Choose a network to open a connection to. + * @param[out] proxy Optional proxy to override the normal proxy selection. + * Will be set if !std::nullopt is returned. Could be set to `std::nullopt` + * if there is no need to override the proxy that would be used for connecting + * to the returned network. + * @retval std::nullopt No network could be selected. + * @retval !std::nullopt The network was selected and `proxy` is set (maybe to `std::nullopt`). + */ + std::optional PickNetwork(std::optional& proxy) const; + + /// Get the pending number of connections to open. + size_t NumToOpen() const; + + /** + * Increment the number of new connections of type `ConnectionType::PRIVATE_BROADCAST` + * to be opened by `CConnman::ThreadPrivateBroadcast()`. + * @param[in] n Increment by this number. + */ + void NumToOpenAdd(size_t n); + + /** + * Decrement the number of new connections of type `ConnectionType::PRIVATE_BROADCAST` + * to be opened by `CConnman::ThreadPrivateBroadcast()`. + * @param[in] n Decrement by this number. + * @return The number of connections that remain to be opened after the operation. + */ + size_t NumToOpenSub(size_t n); + + /// Wait for the number of needed connections to become greater than 0. + void NumToOpenWait() const; + + private: + /** + * Check if private broadcast can be done to IPv4 or IPv6 peers and if so via which proxy. + * If private broadcast connections should not be opened to IPv4 or IPv6, then this will + * return an empty optional. + */ + std::optional ProxyForIPv4or6() const; + + /// Number of `ConnectionType::PRIVATE_BROADCAST` connections to open. + std::atomic_size_t m_num_to_open{0}; + } m_private_broadcast; + bool CheckIncomingNonce(uint64_t nonce); void ASMapHealthCheck(); @@ -1345,6 +1420,7 @@ class CConnman void ThreadOpenConnections(std::vector connect, std::span seed_nodes) EXCLUSIVE_LOCKS_REQUIRED(!m_addr_fetches_mutex, !m_added_nodes_mutex, !m_nodes_mutex, !m_unused_i2p_sessions_mutex, !m_reconnections_mutex); void ThreadMessageHandler() EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); void ThreadI2PAcceptIncoming(); + void ThreadPrivateBroadcast() EXCLUSIVE_LOCKS_REQUIRED(!m_unused_i2p_sessions_mutex); void AcceptConnection(const ListenSocket& hListenSocket); /** @@ -1635,6 +1711,7 @@ class CConnman std::thread threadOpenConnections; std::thread threadMessageHandler; std::thread threadI2PAcceptIncoming; + std::thread threadPrivateBroadcast; /** flag for deciding to connect to an extra outbound peer, * in excess of m_max_outbound_full_relay diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 2ef43b8dc9afd..af5e1c7b39a5e 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -44,6 +44,7 @@ #include #include #include +#include #include #include #include @@ -195,6 +196,10 @@ static constexpr double MAX_ADDR_RATE_PER_SECOND{0.1}; static constexpr size_t MAX_ADDR_PROCESSING_TOKEN_BUCKET{MAX_ADDR_TO_SEND}; /** The compactblocks version we support. See BIP 152. */ static constexpr uint64_t CMPCTBLOCKS_VERSION{2}; +/** For private broadcast, send a transaction to this many peers. */ +static constexpr size_t NUM_PRIVATE_BROADCAST_PER_TX{3}; +/** Private broadcast connections must complete within this time. Disconnect the peer if it takes longer. */ +static constexpr auto PRIVATE_BROADCAST_MAX_CONNECTION_LIFETIME{3min}; // Internal stuff namespace { @@ -536,7 +541,8 @@ class PeerManagerImpl final : public PeerManager std::vector GetOrphanTransactions() override EXCLUSIVE_LOCKS_REQUIRED(!m_tx_download_mutex); PeerManagerInfo GetInfo() const override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void SendPings() override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); - void RelayTransaction(const Txid& txid, const Wtxid& wtxid) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + void InitiateTxBroadcastToAll(const Txid& txid, const Wtxid& wtxid) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + void InitiateTxBroadcastPrivate(const CTransactionRef& tx) override EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); void SetBestBlock(int height, std::chrono::seconds time) override { m_best_height = height; @@ -559,6 +565,9 @@ class PeerManagerImpl final : public PeerManager /** Retrieve unbroadcast transactions from the mempool and reattempt sending to peers */ void ReattemptInitialBroadcast(CScheduler& scheduler) EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); + /** Rebroadcast stale private transactions (already broadcast but not received back from the network). */ + void ReattemptPrivateBroadcast(CScheduler& scheduler); + /** Get a shared pointer to the Peer object. * May return an empty shared_ptr if the Peer object can't be found. */ PeerRef GetPeerRef(NodeId id) const EXCLUSIVE_LOCKS_REQUIRED(!m_peer_mutex); @@ -718,8 +727,8 @@ class PeerManagerImpl final : public PeerManager /** Send a version message to a peer */ void PushNodeVersion(CNode& pnode, const Peer& peer); - /** Send a ping message every PING_INTERVAL or if requested via RPC. May - * mark the peer to be disconnected if a ping has timed out. + /** Send a ping message every PING_INTERVAL or if requested via RPC (peer.m_ping_queued is true). + * May mark the peer to be disconnected if a ping has timed out. * We use mockable time for ping timeouts, so setmocktime may cause pings * to time out. */ void MaybeSendPing(CNode& node_to, Peer& peer, std::chrono::microseconds now); @@ -957,6 +966,14 @@ class PeerManagerImpl final : public PeerManager void ProcessCompactBlockTxns(CNode& pfrom, Peer& peer, const BlockTransactions& block_transactions) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex, !m_most_recent_block_mutex); + /** + * Schedule an INV for a transaction to be sent to the given peer (via `PushMessage()`). + * The transaction is picked from the list of transactions for private broadcast. + * It is assumed that the connection to the peer is `ConnectionType::PRIVATE_BROADCAST`. + * Calling this for other peers will degrade privacy. Don't do that. + */ + void PushPrivateBroadcastTx(CNode& node) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex, !m_most_recent_block_mutex); + /** * When a peer sends us a valid block, instruct it to announce blocks to us * using CMPCTBLOCK if possible by adding its nodeid to the end of @@ -1069,6 +1086,9 @@ class PeerManagerImpl final : public PeerManager void PushAddress(Peer& peer, const CAddress& addr) EXCLUSIVE_LOCKS_REQUIRED(g_msgproc_mutex); void LogBlockHeader(const CBlockIndex& index, const CNode& peer, bool via_compact_block); + + /// A list of transactions to be broadcast privately. + PrivateBroadcast m_tx_for_private_broadcast; }; const CNodeState* PeerManagerImpl::State(NodeId pnode) const @@ -1520,26 +1540,64 @@ void PeerManagerImpl::FindNextBlocks(std::vector& vBlocks, c void PeerManagerImpl::PushNodeVersion(CNode& pnode, const Peer& peer) { - uint64_t my_services{peer.m_our_services}; - const int64_t nTime{count_seconds(GetTime())}; - uint64_t nonce = pnode.GetLocalNonce(); - const int nNodeStartingHeight{m_best_height}; - NodeId nodeid = pnode.GetId(); - CAddress addr = pnode.addr; - - CService addr_you = addr.IsRoutable() && !IsProxy(addr) && addr.IsAddrV1Compatible() ? addr : CService(); - uint64_t your_services{addr.nServices}; - - const bool tx_relay{!RejectIncomingTxs(pnode)}; - MakeAndPushMessage(pnode, NetMsgType::VERSION, PROTOCOL_VERSION, my_services, nTime, - your_services, CNetAddr::V1(addr_you), // Together the pre-version-31402 serialization of CAddress "addrYou" (without nTime) - my_services, CNetAddr::V1(CService{}), // Together the pre-version-31402 serialization of CAddress "addrMe" (without nTime) - nonce, strSubVersion, nNodeStartingHeight, tx_relay); - + uint64_t my_services; + int64_t my_time; + uint64_t your_services; + CService your_addr; + std::string my_user_agent; + int my_height; + bool my_tx_relay; + if (pnode.IsPrivateBroadcastConn()) { + my_services = NODE_NONE; + my_time = 0; + your_services = NODE_NONE; + your_addr = CService{}; + my_user_agent = "/pynode:0.0.1/"; // Use a constant other than the default (or user-configured). See https://github.com/bitcoin/bitcoin/pull/27509#discussion_r1214671917 + my_height = 0; + my_tx_relay = false; + } else { + CAddress addr{pnode.addr}; + + my_services = peer.m_our_services; + my_time = count_seconds(GetTime()); + your_services = addr.nServices; + your_addr = addr.IsRoutable() && !IsProxy(addr) && addr.IsAddrV1Compatible() ? CService{addr} : CService{}; + my_user_agent = strSubVersion; + my_height = m_best_height; + my_tx_relay = !RejectIncomingTxs(pnode); + } + + MakeAndPushMessage( + pnode, + NetMsgType::VERSION, + PROTOCOL_VERSION, + my_services, + my_time, + // your_services + CNetAddr::V1(your_addr) is the pre-version-31402 serialization of your_addr (without nTime) + your_services, CNetAddr::V1(your_addr), + // same, for a dummy address + my_services, CNetAddr::V1(CService{}), + pnode.GetLocalNonce(), + my_user_agent, + my_height, + my_tx_relay); + + const NodeId nodeid{pnode.GetId()}; if (fLogIPs) { - LogDebug(BCLog::NET, "send version message: version %d, blocks=%d, them=%s, txrelay=%d, peer=%d\n", PROTOCOL_VERSION, nNodeStartingHeight, addr_you.ToStringAddrPort(), tx_relay, nodeid); + LogDebug(BCLog::NET, + "send version message: version %d, blocks=%d, them=%s, txrelay=%d, peer=%d", + PROTOCOL_VERSION, + my_height, + your_addr.ToStringAddrPort(), + my_tx_relay, + nodeid); } else { - LogDebug(BCLog::NET, "send version message: version %d, blocks=%d, txrelay=%d, peer=%d\n", PROTOCOL_VERSION, nNodeStartingHeight, tx_relay, nodeid); + LogDebug(BCLog::NET, + "send version message: version %d, blocks=%d, txrelay=%d, peer=%d", + PROTOCOL_VERSION, + my_height, + my_tx_relay, + nodeid); } } @@ -1570,6 +1628,13 @@ void PeerManagerImpl::InitializeNode(const CNode& node, ServiceFlags our_service } } +/** Calculate the delta time after which to run the next transactions broadcast. */ +static std::chrono::milliseconds NextTxBroadcast() +{ + // We add randomness on every cycle to avoid the possibility of P2P fingerprinting. + return 10min + FastRandomContext().randrange(5min); +} + void PeerManagerImpl::ReattemptInitialBroadcast(CScheduler& scheduler) { std::set unbroadcast_txids = m_mempool.GetUnbroadcastTxs(); @@ -1578,16 +1643,61 @@ void PeerManagerImpl::ReattemptInitialBroadcast(CScheduler& scheduler) CTransactionRef tx = m_mempool.get(txid); if (tx != nullptr) { - RelayTransaction(txid, tx->GetWitnessHash()); + InitiateTxBroadcastToAll(txid, tx->GetWitnessHash()); } else { m_mempool.RemoveUnbroadcastTx(txid, true); } } - // Schedule next run for 10-15 minutes in the future. - // We add randomness on every cycle to avoid the possibility of P2P fingerprinting. - const auto delta = 10min + FastRandomContext().randrange(5min); - scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, delta); + scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, NextTxBroadcast()); +} + +void PeerManagerImpl::ReattemptPrivateBroadcast(CScheduler& scheduler) +{ + // The following heuristic is subject to races, but that is ok: if it overshoots, + // we will open some private connections in vain, if it undershoots, the stale + // transactions will be picked on the next run. + + size_t active_connections{0}; + m_connman.ForEachNode([&active_connections](const CNode* node) { + if (node->IsPrivateBroadcastConn()) { + ++active_connections; + } + }); + + const size_t to_open_connections{m_connman.m_private_broadcast.NumToOpen()}; + + // Remove stale transactions that are no longer relevant (e.g. already in + // the mempool or mined) and count the remaining ones. + size_t num_for_rebroadcast{0}; + const auto stale_txs = m_tx_for_private_broadcast.GetStale(); + { + LOCK(cs_main); + for (const auto& stale_tx : stale_txs) { + auto mempool_acceptable = m_chainman.ProcessTransaction(stale_tx, /*test_accept=*/true); + if (mempool_acceptable.m_result_type == MempoolAcceptResult::ResultType::VALID) { + LogDebug(BCLog::PRIVATE_BROADCAST, + "Reattempting broadcast of stale txid=%s wtxid=%s", + stale_tx->GetHash().ToString(), + stale_tx->GetWitnessHash().ToString()); + ++num_for_rebroadcast; + } else { + LogPrintLevel(BCLog::PRIVATE_BROADCAST, + BCLog::Level::Info, + "Giving up broadcast attempts for txid=%s wtxid=%s: %s", + stale_tx->GetHash().ToString(), + stale_tx->GetWitnessHash().ToString(), + mempool_acceptable.m_state.ToString()); + m_tx_for_private_broadcast.Remove(stale_tx); + } + } + } + + if (num_for_rebroadcast > active_connections + to_open_connections) { + m_connman.m_private_broadcast.NumToOpenAdd(num_for_rebroadcast - active_connections - to_open_connections); + } + + scheduler.scheduleFromNow([&] { ReattemptPrivateBroadcast(scheduler); }, NextTxBroadcast()); } void PeerManagerImpl::FinalizeNode(const CNode& node) @@ -1647,16 +1757,25 @@ void PeerManagerImpl::FinalizeNode(const CNode& node) } } // cs_main if (node.fSuccessfullyConnected && - !node.IsBlockOnlyConn() && !node.IsInboundConn()) { + !node.IsBlockOnlyConn() && !node.IsPrivateBroadcastConn() && !node.IsInboundConn()) { // Only change visible addrman state for full outbound peers. We don't // call Connected() for feeler connections since they don't have - // fSuccessfullyConnected set. + // fSuccessfullyConnected set. Also don't call Connected() for private broadcast + // connections since they could leak information in addrman. m_addrman.Connected(node.addr); } { LOCK(m_headers_presync_mutex); m_headers_presync_stats.erase(nodeid); } + if (node.IsPrivateBroadcastConn()) { + // FinishBroadcast() is called when we get a PONG from the peer which means that the send + // has concluded successfully. Call FinishBroadcast() here as well in case we did not call + // it before (unsuccessful, never concluded with the reception of a PONG). + if (m_tx_for_private_broadcast.FinishBroadcast(nodeid, /*confirmed_by_node=*/false)) { + m_connman.m_private_broadcast.NumToOpenAdd(1); + } + } LogDebug(BCLog::NET, "Cleared nodestate for peer=%d\n", nodeid); } @@ -1918,9 +2037,9 @@ void PeerManagerImpl::StartScheduledTasks(CScheduler& scheduler) static_assert(EXTRA_PEER_CHECK_INTERVAL < STALE_CHECK_INTERVAL, "peer eviction timer should be less than stale tip check timer"); scheduler.scheduleEvery([this] { this->CheckForStaleTipAndEvictPeers(); }, std::chrono::seconds{EXTRA_PEER_CHECK_INTERVAL}); - // schedule next run for 10-15 minutes in the future - const auto delta = 10min + FastRandomContext().randrange(5min); - scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, delta); + scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, NextTxBroadcast()); + + scheduler.scheduleFromNow([&] { ReattemptPrivateBroadcast(scheduler); }, NextTxBroadcast()); } void PeerManagerImpl::ActiveTipChange(const CBlockIndex& new_tip, bool is_ibd) @@ -2123,7 +2242,7 @@ void PeerManagerImpl::SendPings() for(auto& it : m_peer_map) it.second->m_ping_queued = true; } -void PeerManagerImpl::RelayTransaction(const Txid& txid, const Wtxid& wtxid) +void PeerManagerImpl::InitiateTxBroadcastToAll(const Txid& txid, const Wtxid& wtxid) { LOCK(m_peer_mutex); for(auto& it : m_peer_map) { @@ -2146,6 +2265,25 @@ void PeerManagerImpl::RelayTransaction(const Txid& txid, const Wtxid& wtxid) } } +void PeerManagerImpl::InitiateTxBroadcastPrivate(const CTransactionRef& tx) +{ + if (m_tx_for_private_broadcast.Add(tx)) { + LogDebug(BCLog::PRIVATE_BROADCAST, + "Requesting %d new connections due to txid=%s, wtxid=%s", + NUM_PRIVATE_BROADCAST_PER_TX, + tx->GetHash().ToString(), + tx->GetWitnessHash().ToString()); + + m_connman.m_private_broadcast.NumToOpenAdd(NUM_PRIVATE_BROADCAST_PER_TX); + } else { + LogDebug( + BCLog::PRIVATE_BROADCAST, + "Ignoring unnecessary request to schedule an already scheduled transaction: txid=%s, wtxid=%s", + tx->GetHash().ToString(), + tx->GetWitnessHash().ToString()); + } +} + void PeerManagerImpl::RelayAddress(NodeId originator, const CAddress& addr, bool fReachable) @@ -3023,7 +3161,7 @@ void PeerManagerImpl::ProcessValidTx(NodeId nodeid, const CTransactionRef& tx, c tx->GetWitnessHash().ToString(), m_mempool.size(), m_mempool.DynamicMemoryUsage() / 1000); - RelayTransaction(tx->GetHash(), tx->GetWitnessHash()); + InitiateTxBroadcastToAll(tx->GetHash(), tx->GetWitnessHash()); for (const CTransactionRef& removedTx : replaced_transactions) { AddToCompactExtraTransactions(removedTx); @@ -3417,6 +3555,34 @@ void PeerManagerImpl::LogBlockHeader(const CBlockIndex& index, const CNode& peer } } +void PeerManagerImpl::PushPrivateBroadcastTx(CNode& node) +{ + Assume(node.IsPrivateBroadcastConn()); + + auto opt_tx = m_tx_for_private_broadcast.GetTxForBroadcast(); + if (!opt_tx) { + LogDebug(BCLog::PRIVATE_BROADCAST, + "Disconnecting: no more transactions for private broadcast (connected in vain), peer=%d%s", + node.GetId(), + node.LogIP(fLogIPs)); + node.fDisconnect = true; + return; + } + const CTransactionRef& tx{*opt_tx}; + + LogPrintLevel(BCLog::PRIVATE_BROADCAST, + BCLog::Level::Info, + "P2P handshake completed, sending INV for txid=%s%s, peer=%d%s", + tx->GetHash().ToString(), + tx->HasWitness() ? strprintf(", wtxid=%s", tx->GetWitnessHash().ToString()) : "", + node.GetId(), + node.LogIP(fLogIPs)); + + MakeAndPushMessage(node, NetMsgType::INV, std::vector{{CInv{MSG_TX, tx->GetHash().ToUint256()}}}); + + m_tx_for_private_broadcast.PushedToNode(node.GetId(), tx->GetHash()); +} + void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, DataStream& vRecv, const std::chrono::microseconds time_received, const std::atomic& interruptMsgProc) @@ -3515,19 +3681,6 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, pfrom.SetCommonVersion(greatest_common_version); pfrom.nVersion = nVersion; - if (greatest_common_version >= WTXID_RELAY_VERSION) { - MakeAndPushMessage(pfrom, NetMsgType::WTXIDRELAY); - } - - // Signal ADDRv2 support (BIP155). - if (greatest_common_version >= 70016) { - // BIP155 defines addrv2 and sendaddrv2 for all protocol versions, but some - // implementations reject messages they don't know. As a courtesy, don't send - // it to nodes with a version before 70016, as no software is known to support - // BIP155 that doesn't announce at least that protocol version number. - MakeAndPushMessage(pfrom, NetMsgType::SENDADDRV2); - } - pfrom.m_has_all_wanted_services = HasAllDesirableServiceFlags(nServices); peer->m_their_services = nServices; pfrom.SetAddrLocal(addrMe); @@ -3554,6 +3707,39 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, if (fRelay) pfrom.m_relays_txs = true; } + const auto mapped_as{m_connman.GetMappedAS(pfrom.addr)}; + LogDebug(BCLog::NET, "receive version message: %s: version %d, blocks=%d, us=%s, txrelay=%d, peer=%d%s%s\n", + cleanSubVer, pfrom.nVersion, + peer->m_starting_height, addrMe.ToStringAddrPort(), fRelay, pfrom.GetId(), + pfrom.LogIP(fLogIPs), (mapped_as ? strprintf(", mapped_as=%d", mapped_as) : "")); + + if (pfrom.IsPrivateBroadcastConn()) { + if (fRelay) { + MakeAndPushMessage(pfrom, NetMsgType::VERACK); + } else { + LogPrintLevel(BCLog::PRIVATE_BROADCAST, + BCLog::Level::Info, + "Disconnecting: does not support transactions relay (connected in vain), peer=%d%s", + pfrom.GetId(), + pfrom.LogIP(fLogIPs)); + pfrom.fDisconnect = true; + } + return; + } + + if (greatest_common_version >= WTXID_RELAY_VERSION) { + MakeAndPushMessage(pfrom, NetMsgType::WTXIDRELAY); + } + + // Signal ADDRv2 support (BIP155). + if (greatest_common_version >= 70016) { + // BIP155 defines addrv2 and sendaddrv2 for all protocol versions, but some + // implementations reject messages they don't know. As a courtesy, don't send + // it to nodes with a version before 70016, as no software is known to support + // BIP155 that doesn't announce at least that protocol version number. + MakeAndPushMessage(pfrom, NetMsgType::SENDADDRV2); + } + if (greatest_common_version >= WTXID_RELAY_VERSION && m_txreconciliation) { // Per BIP-330, we announce txreconciliation support if: // - protocol version per the peer's VERSION message supports WTXID_RELAY; @@ -3619,12 +3805,6 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, m_addrman.Good(pfrom.addr); } - const auto mapped_as{m_connman.GetMappedAS(pfrom.addr)}; - LogDebug(BCLog::NET, "receive version message: %s: version %d, blocks=%d, us=%s, txrelay=%d, peer=%d%s%s\n", - cleanSubVer, pfrom.nVersion, - peer->m_starting_height, addrMe.ToStringAddrPort(), fRelay, pfrom.GetId(), - pfrom.LogIP(fLogIPs), (mapped_as ? strprintf(", mapped_as=%d", mapped_as) : "")); - peer->m_time_offset = NodeSeconds{std::chrono::seconds{nTime}} - Now(); if (!pfrom.IsInboundConn()) { // Don't use timedata samples from inbound peers to make it @@ -3671,6 +3851,31 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, (mapped_as ? strprintf(", mapped_as=%d", mapped_as) : "")); } + if (auto tx_relay = peer->GetTxRelay()) { + // `TxRelay::m_tx_inventory_to_send` must be empty before the + // version handshake is completed as + // `TxRelay::m_next_inv_send_time` is first initialised in + // `SendMessages` after the verack is received. Any transactions + // received during the version handshake would otherwise + // immediately be advertised without random delay, potentially + // leaking the time of arrival to a spy. + Assume(WITH_LOCK( + tx_relay->m_tx_inventory_mutex, + return tx_relay->m_tx_inventory_to_send.empty() && + tx_relay->m_next_inv_send_time == 0s)); + } + + if (pfrom.IsPrivateBroadcastConn()) { + pfrom.fSuccessfullyConnected = true; + // The peer may intend to later send us NetMsgType::FEEFILTER limiting + // cheap transactions, but we don't wait for that and thus we may send + // them a transaction below their threshold. This is ok because this + // relay logic is designed to work even in cases when the peer drops + // the transaction (due to it being too cheap, or for other reasons). + PushPrivateBroadcastTx(pfrom); + return; + } + if (pfrom.GetCommonVersion() >= SHORT_IDS_BLOCKS_VERSION) { // Tell our peer we are willing to provide version 2 cmpctblocks. // However, we do not request new block announcements using @@ -3689,20 +3894,6 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, } } - if (auto tx_relay = peer->GetTxRelay()) { - // `TxRelay::m_tx_inventory_to_send` must be empty before the - // version handshake is completed as - // `TxRelay::m_next_inv_send_time` is first initialised in - // `SendMessages` after the verack is received. Any transactions - // received during the version handshake would otherwise - // immediately be advertised without random delay, potentially - // leaking the time of arrival to a spy. - Assume(WITH_LOCK( - tx_relay->m_tx_inventory_mutex, - return tx_relay->m_tx_inventory_to_send.empty() && - tx_relay->m_next_inv_send_time == 0s)); - } - { LOCK2(::cs_main, m_tx_download_mutex); const CNodeState* state = State(pfrom.GetId()); @@ -3836,6 +4027,17 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, return; } + if (pfrom.IsPrivateBroadcastConn()) { + if (msg_type != NetMsgType::PONG && msg_type != NetMsgType::GETDATA) { + LogDebug(BCLog::PRIVATE_BROADCAST, + "Ignoring incoming message '%s', peer=%d%s", + msg_type, + pfrom.GetId(), + pfrom.LogIP(fLogIPs)); + return; + } + } + if (msg_type == NetMsgType::ADDR || msg_type == NetMsgType::ADDRV2) { const auto ser_params{ msg_type == NetMsgType::ADDRV2 ? @@ -4039,6 +4241,39 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, LogDebug(BCLog::NET, "received getdata for: %s peer=%d\n", vInv[0].ToString(), pfrom.GetId()); } + if (pfrom.IsPrivateBroadcastConn()) { + const auto pushed_tx_opt = m_tx_for_private_broadcast.GetTxPushedToNode(pfrom.GetId()); + if (!pushed_tx_opt) { + LogPrintLevel(BCLog::PRIVATE_BROADCAST, + BCLog::Level::Info, + "Disconnecting: got GETDATA without sending an INV, peer=%d%s", + pfrom.GetId(), + fLogIPs ? strprintf(", peeraddr=%s", pfrom.addr.ToStringAddrPort()) : ""); + pfrom.fDisconnect = true; + return; + } + + const CTransactionRef& pushed_tx{*pushed_tx_opt}; + + // The GETDATA request must contain exactly one inv and it must be for the transaction + // that we INVed to the peer earlier. + if (vInv.size() == 1 && vInv[0].IsMsgTx() && vInv[0].hash == pushed_tx->GetHash().ToUint256()) { + + MakeAndPushMessage(pfrom, NetMsgType::TX, TX_WITH_WITNESS(*pushed_tx)); + + peer->m_ping_queued = true; // Ensure a ping will be sent: mimic a request via RPC. + MaybeSendPing(pfrom, *peer, GetTime()); + } else { + LogPrintLevel(BCLog::PRIVATE_BROADCAST, + BCLog::Level::Info, + "Disconnecting: got an unexpected GETDATA message, peer=%d%s", + pfrom.GetId(), + fLogIPs ? strprintf(", peeraddr=%s", pfrom.addr.ToStringAddrPort()) : ""); + pfrom.fDisconnect = true; + } + return; + } + { LOCK(peer->m_getdata_requests_mutex); peer->m_getdata_requests.insert(peer->m_getdata_requests.end(), vInv.begin(), vInv.end()); @@ -4276,6 +4511,21 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, const uint256& hash = peer->m_wtxid_relay ? wtxid.ToUint256() : txid.ToUint256(); AddKnownTx(*peer, hash); + if (auto num_broadcasted = m_tx_for_private_broadcast.Remove(ptx)) { + LogPrintLevel(BCLog::PRIVATE_BROADCAST, + BCLog::Level::Info, + "Received our privately broadcast transaction (txid=%s) from the " + "network from peer=%d%s; stopping private broadcast attempts", + txid.ToString(), + pfrom.GetId(), + pfrom.LogIP(fLogIPs)); + if (NUM_PRIVATE_BROADCAST_PER_TX > num_broadcasted.value()) { + // Not all of the initial NUM_PRIVATE_BROADCAST_PER_TX connections were needed. + // Tell CConnman it does not need to start the remaining ones. + m_connman.m_private_broadcast.NumToOpenSub(NUM_PRIVATE_BROADCAST_PER_TX - num_broadcasted.value()); + } + } + LOCK2(cs_main, m_tx_download_mutex); const auto& [should_validate, package_to_validate] = m_txdownloadman.ReceivedTx(pfrom.GetId(), ptx); @@ -4290,7 +4540,7 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, } else { LogPrintf("Force relaying tx %s (wtxid=%s) from peer=%d\n", txid.ToString(), wtxid.ToString(), pfrom.GetId()); - RelayTransaction(txid, wtxid); + InitiateTxBroadcastToAll(txid, wtxid); } } @@ -4780,6 +5030,17 @@ void PeerManagerImpl::ProcessMessage(CNode& pfrom, const std::string& msg_type, if (ping_time.count() >= 0) { // Let connman know about this successful ping-pong pfrom.PongReceived(ping_time); + if (pfrom.IsPrivateBroadcastConn()) { + m_tx_for_private_broadcast.FinishBroadcast(pfrom.GetId(), /*confirmed_by_node=*/true); + LogPrintLevel( + BCLog::PRIVATE_BROADCAST, + BCLog::Level::Info, + "Got a PONG (the transaction will probably reach the network), " + "marking for disconnect, peer=%d%s", + pfrom.GetId(), + pfrom.LogIP(fLogIPs)); + pfrom.fDisconnect = true; + } } else { // This should never happen sProblem = "Timing mishap"; @@ -5487,6 +5748,23 @@ bool PeerManagerImpl::SendMessages(CNode* pto) const auto current_time{GetTime()}; + // The logic below does not apply to private broadcast peers, so skip it. + // Also in CConnman::PushMessage() we make sure that unwanted messages are + // not sent. This here is just an optimization. + if (pto->IsPrivateBroadcastConn()) { + if (pto->m_connected + PRIVATE_BROADCAST_MAX_CONNECTION_LIFETIME < current_time) { + LogPrintLevel( + BCLog::PRIVATE_BROADCAST, + BCLog::Level::Info, + "Disconnecting: did not complete the transaction send within %d seconds, peer=%d%s", + std::chrono::duration_cast(PRIVATE_BROADCAST_MAX_CONNECTION_LIFETIME).count(), + pto->GetId(), + pto->LogIP(fLogIPs)); + pto->fDisconnect = true; + } + return true; + } + if (pto->IsAddrFetchConn() && current_time - pto->m_connected > 10 * AVG_ADDRESS_BROADCAST_INTERVAL) { LogDebug(BCLog::NET, "addrfetch connection timeout, %s\n", pto->DisconnectMsg(fLogIPs)); pto->fDisconnect = true; diff --git a/src/net_processing.h b/src/net_processing.h index 6eb4a5e16a2c0..4e2784013cf71 100644 --- a/src/net_processing.h +++ b/src/net_processing.h @@ -116,8 +116,19 @@ class PeerManager : public CValidationInterface, public NetEventsInterface /** Get peer manager info. */ virtual PeerManagerInfo GetInfo() const = 0; - /** Relay transaction to all peers. */ - virtual void RelayTransaction(const Txid& txid, const Wtxid& wtxid) = 0; + /** + * Initiate a transaction broadcast to eligible peers. + * Queue the witness transaction id to `Peer::TxRelay::m_tx_inventory_to_send` + * for each peer. Later, depending on `Peer::TxRelay::m_next_inv_send_time` and if + * the transaction is in the mempool, an `INV` about it may be sent to the peer. + */ + virtual void InitiateTxBroadcastToAll(const Txid& txid, const Wtxid& wtxid) = 0; + + /** + * Initiate a private transaction broadcast. This is done + * asynchronously via short-lived connections to peers on privacy networks. + */ + virtual void InitiateTxBroadcastPrivate(const CTransactionRef& tx) = 0; /** Send ping message to all peers */ virtual void SendPings() = 0; diff --git a/src/node/connection_types.cpp b/src/node/connection_types.cpp index 5e4dc5bf2ef94..4cf98047cf193 100644 --- a/src/node/connection_types.cpp +++ b/src/node/connection_types.cpp @@ -20,6 +20,8 @@ std::string ConnectionTypeAsString(ConnectionType conn_type) return "block-relay-only"; case ConnectionType::ADDR_FETCH: return "addr-fetch"; + case ConnectionType::PRIVATE_BROADCAST: + return "private-broadcast"; } // no default case, so the compiler can warn about missing cases assert(false); diff --git a/src/node/connection_types.h b/src/node/connection_types.h index a00895e2a8a55..eeb106b616d76 100644 --- a/src/node/connection_types.h +++ b/src/node/connection_types.h @@ -75,6 +75,13 @@ enum class ConnectionType { * AddrMan is empty. */ ADDR_FETCH, + + /** + * Private broadcast connections are short-lived and only opened to + * privacy networks (Tor, I2P) for relaying privacy-sensitive data (like + * our own transactions) and closed afterwards. + */ + PRIVATE_BROADCAST, }; /** Convert ConnectionType enum to a string value */ diff --git a/src/node/transaction.cpp b/src/node/transaction.cpp index f5bd0efe744b6..8b9eb80dbc085 100644 --- a/src/node/transaction.cpp +++ b/src/node/transaction.cpp @@ -74,13 +74,14 @@ TransactionError BroadcastTransaction(NodeContext& node, wtxid = mempool_tx->GetWitnessHash(); } else { // Transaction is not already in the mempool. - if (max_tx_fee > 0) { + const bool check_max_fee{max_tx_fee > 0}; + if (check_max_fee || broadcast_method == TxBroadcast::NO_MEMPOOL_PRIVATE_BROADCAST) { // First, call ATMP with test_accept and check the fee. If ATMP // fails here, return error immediately. const MempoolAcceptResult result = node.chainman->ProcessTransaction(tx, /*test_accept=*/ true); if (result.m_result_type != MempoolAcceptResult::ResultType::VALID) { return HandleATMPError(result.m_state, err_string); - } else if (result.m_base_fees.value() > max_tx_fee) { + } else if (check_max_fee && result.m_base_fees.value() > max_tx_fee) { return TransactionError::MAX_FEE_EXCEEDED; } } @@ -104,6 +105,8 @@ TransactionError BroadcastTransaction(NodeContext& node, node.mempool->AddUnbroadcastTx(txid); } break; + case TxBroadcast::NO_MEMPOOL_PRIVATE_BROADCAST: + break; } if (wait_callback && node.validation_signals) { @@ -133,7 +136,10 @@ TransactionError BroadcastTransaction(NodeContext& node, case TxBroadcast::MEMPOOL_NO_BROADCAST: break; case TxBroadcast::MEMPOOL_AND_BROADCAST_TO_ALL: - node.peerman->RelayTransaction(txid, wtxid); + node.peerman->InitiateTxBroadcastToAll(txid, wtxid); + break; + case TxBroadcast::NO_MEMPOOL_PRIVATE_BROADCAST: + node.peerman->InitiateTxBroadcastPrivate(tx); break; } diff --git a/src/node/types.h b/src/node/types.h index 6c2687626c98c..bf11c2cba5541 100644 --- a/src/node/types.h +++ b/src/node/types.h @@ -108,6 +108,9 @@ enum class TxBroadcast : uint8_t { MEMPOOL_AND_BROADCAST_TO_ALL, /// Add the transaction to the mempool, but don't broadcast to anybody. MEMPOOL_NO_BROADCAST, + /// Omit the mempool and directly send the transaction via a few dedicated connections to + /// peers on privacy networks. + NO_MEMPOOL_PRIVATE_BROADCAST, }; } // namespace node diff --git a/src/private_broadcast.cpp b/src/private_broadcast.cpp new file mode 100644 index 0000000000000..2f4397d6d811e --- /dev/null +++ b/src/private_broadcast.cpp @@ -0,0 +1,133 @@ +// Copyright (c) 2023-present The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or https://opensource.org/license/mit/. + +#include +#include + +/// If a transaction is not received back from the network for this duration +/// after it is broadcast, then we consider it stale / for rebroadcasting. +static constexpr auto STALE_DURATION{1min}; + +bool PrivateBroadcast::Add(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) +{ + const Txid& txid = tx->GetHash(); + LOCK(m_mutex); + auto [pos, inserted] = m_by_txid.emplace(txid, TxWithPriority{.tx = tx, .priority = Priority{}}); + if (inserted) { + m_by_priority.emplace(Priority{}, txid); + } + return inserted; +} + +std::optional PrivateBroadcast::Remove(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) +{ + LOCK(m_mutex); + auto iters = Find(tx->GetHash()); + if (!iters || iters->by_txid->second.tx->GetWitnessHash() != tx->GetWitnessHash()) { + return std::nullopt; + } + const size_t num_broadcasted{iters->by_priority->first.num_broadcasted}; + m_by_priority.erase(iters->by_priority); + m_by_txid.erase(iters->by_txid); + return num_broadcasted; +} + +std::optional PrivateBroadcast::GetTxForBroadcast() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) +{ + LOCK(m_mutex); + if (m_by_priority.empty()) { + return std::nullopt; + } + const Txid& txid = m_by_priority.begin()->second; + auto it = m_by_txid.find(txid); + if (Assume(it != m_by_txid.end())) { + return it->second.tx; + } + m_by_priority.erase(m_by_priority.begin()); + return std::nullopt; +} + +void PrivateBroadcast::PushedToNode(const NodeId& nodeid, const Txid& txid) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) +{ + LOCK(m_mutex); + m_by_nodeid.emplace(nodeid, txid); +} + +std::optional PrivateBroadcast::GetTxPushedToNode(const NodeId& nodeid) const + EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) +{ + LOCK(m_mutex); + + auto it_by_node = m_by_nodeid.find(nodeid); + if (it_by_node == m_by_nodeid.end()) { + return std::nullopt; + } + const Txid txid{it_by_node->second}; + + auto it_by_txid = m_by_txid.find(txid); + if (it_by_txid == m_by_txid.end()) { + return std::nullopt; + } + return it_by_txid->second.tx; +} + +bool PrivateBroadcast::FinishBroadcast(const NodeId& nodeid, bool confirmed_by_node) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) +{ + LOCK(m_mutex); + const auto handle{m_by_nodeid.extract(nodeid)}; + if (!handle) { + return false; + } + + const Txid& txid{handle.mapped()}; + + auto iters{Find(txid)}; + + if (!iters.has_value()) { + return false; + } + + if (confirmed_by_node) { + // Update broadcast stats, since txid was found and its reception is confirmed by the node. + Priority& priority = iters->by_txid->second.priority; + + ++priority.num_broadcasted; + priority.last_broadcasted = NodeClock::now(); + + // Remove and re-add the entry in the m_by_priority map because we have changed the key. + m_by_priority.erase(iters->by_priority); + m_by_priority.emplace(priority, txid); + } + + return true; +} + +std::vector PrivateBroadcast::GetStale() const EXCLUSIVE_LOCKS_REQUIRED(!m_mutex) +{ + LOCK(m_mutex); + const auto stale_time = NodeClock::now() - STALE_DURATION; + std::vector stale; + for (const auto& [txid, tx_with_priority] : m_by_txid) { + if (tx_with_priority.priority.last_broadcasted < stale_time) { + stale.push_back(tx_with_priority.tx); + } + } + return stale; +} + +std::optional PrivateBroadcast::Find(const Txid& txid) EXCLUSIVE_LOCKS_REQUIRED(m_mutex) +{ + AssertLockHeld(m_mutex); + auto i = m_by_txid.find(txid); + if (i == m_by_txid.end()) { + return std::nullopt; + } + const Priority& priority = i->second.priority; + for (auto j = m_by_priority.lower_bound(priority); j != m_by_priority.end(); ++j) { + if (j->second == txid) { + return Iterators{.by_txid = i, .by_priority = j}; + } + } + return std::nullopt; +} diff --git a/src/private_broadcast.h b/src/private_broadcast.h new file mode 100644 index 0000000000000..9702214421b51 --- /dev/null +++ b/src/private_broadcast.h @@ -0,0 +1,117 @@ +// Copyright (c) 2023-present The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or https://opensource.org/license/mit/. + +#ifndef BITCOIN_PRIVATE_BROADCAST_H +#define BITCOIN_PRIVATE_BROADCAST_H + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +/** + * Store a list of transactions to be broadcast privately. Supports the following operations: + * - Add a new transaction + * - Remove a transaction, after it has been seen by the network + * - Mark a broadcast of a transaction (remember when and how many times) + * - Get a transaction for broadcast, the one that has been broadcast fewer times and least recently + */ +class PrivateBroadcast +{ +public: + /** + * Add a transaction to the storage. + * @param[in] tx The transaction to add. + * @retval true The transaction was added. + * @retval false The transaction was already present. + */ + bool Add(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + /** + * Forget a transaction. + * @return the number of times the transaction was broadcast if the transaction existed and was removed, + * otherwise empty optional (the transaction was not in the storage). + */ + std::optional Remove(const CTransactionRef& tx) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + /** + * Get the transaction that has been broadcast fewest times and least recently. + */ + std::optional GetTxForBroadcast() EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + /** + * Mark a transaction as pushed to a given node. This is an intermediate state before + * we get a PONG from the node which would confirm that the transaction has been received. + * At the time we get the PONG we need to know which transaction we sent to that node, + * so that we can account how many times we broadcast each transaction. + */ + void PushedToNode(const NodeId& nodeid, const Txid& txid) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + /** + * Get the transaction that was pushed to a given node by PushedToNode(). + */ + std::optional GetTxPushedToNode(const NodeId& nodeid) const + EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + /** + * Mark the end of a broadcast of a transaction. Either successful by receiving a PONG, + * or unsuccessful by closing the connection to the node without getting PONG. + * @return true if the reference by the given node id was removed and the transaction + * we tried to send to this node is still in the private broadcast pool. + */ + bool FinishBroadcast(const NodeId& nodeid, bool confirmed_by_node) EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + + /** + * Get the transactions that have not been broadcast recently. + */ + std::vector GetStale() const EXCLUSIVE_LOCKS_REQUIRED(!m_mutex); + +private: + struct Priority { + // Note: operator<=>() depends on the declaration order. + size_t num_broadcasted{0}; + NodeClock::time_point last_broadcasted{}; + + auto operator<=>(const Priority&) const = default; + }; + + struct TxWithPriority { + CTransactionRef tx; + Priority priority; + }; + + using ByTxid = std::unordered_map; + using ByPriority = std::multimap; + using ByNodeId = std::unordered_map; + + struct Iterators { + ByTxid::iterator by_txid; + ByPriority::iterator by_priority; + }; + + /** + * Get iterators in `m_by_txid` and `m_by_priority` for a given transaction. + */ + std::optional Find(const Txid& txid) EXCLUSIVE_LOCKS_REQUIRED(m_mutex); + + mutable Mutex m_mutex; + ByTxid m_by_txid GUARDED_BY(m_mutex); + ByPriority m_by_priority GUARDED_BY(m_mutex); + + /** + * Remember which transaction was sent to which node, so that when we get the PONG + * from that node we can mark the transaction as broadcast. + */ + ByNodeId m_by_nodeid GUARDED_BY(m_mutex); +}; + +#endif // BITCOIN_PRIVATE_BROADCAST_H diff --git a/src/qt/guiutil.cpp b/src/qt/guiutil.cpp index 28610db451279..f7b6c15c1486b 100644 --- a/src/qt/guiutil.cpp +++ b/src/qt/guiutil.cpp @@ -729,6 +729,8 @@ QString ConnectionTypeToQString(ConnectionType conn_type, bool prepend_direction case ConnectionType::FEELER: return prefix + QObject::tr("Feeler"); //: Short-lived peer connection type that solicits known addresses from a peer. case ConnectionType::ADDR_FETCH: return prefix + QObject::tr("Address Fetch"); + //: Short-lived peer connection type that is used for broadcasting privacy-sensitive data. + case ConnectionType::PRIVATE_BROADCAST: return prefix + QObject::tr("Private Broadcast"); } // no default case, so the compiler can warn about missing cases assert(false); } diff --git a/src/qt/rpcconsole.cpp b/src/qt/rpcconsole.cpp index d6d2be7b3934b..8723a52a08785 100644 --- a/src/qt/rpcconsole.cpp +++ b/src/qt/rpcconsole.cpp @@ -484,7 +484,10 @@ RPCConsole::RPCConsole(interfaces::Node& node, const PlatformStyle *_platformSty tr("Outbound Feeler: short-lived, for testing addresses"), /*: Explanatory text for a short-lived outbound peer connection that is used to request addresses from a peer. */ - tr("Outbound Address Fetch: short-lived, for soliciting addresses")}; + tr("Outbound Address Fetch: short-lived, for soliciting addresses"), + /*: Explanatory text for a short-lived outbound peer connection that is used + to broadcast privacy-sensitive data (like our transactions). */ + tr("Private broadcast: short-lived, for broadcasting privacy-sensitive transactions")}; const QString connection_types_list{"
  • " + Join(CONNECTION_TYPE_DOC, QString("
  • ")) + "
"}; ui->peerConnectionTypeLabel->setToolTip(ui->peerConnectionTypeLabel->toolTip().arg(connection_types_list)); const std::vector TRANSPORT_TYPE_DOC{ diff --git a/src/rpc/mempool.cpp b/src/rpc/mempool.cpp index 62a2e29dac909..50f8036f4f50a 100644 --- a/src/rpc/mempool.cpp +++ b/src/rpc/mempool.cpp @@ -8,10 +8,12 @@ #include #include +#include #include #include #include #include +#include // for g_reachable_nets #include #include #include @@ -44,11 +46,21 @@ static RPCHelpMan sendrawtransaction() { return RPCHelpMan{ "sendrawtransaction", - "Submit a raw transaction (serialized, hex-encoded) to local node and network.\n" - "\nThe transaction will be sent unconditionally to all peers, so using sendrawtransaction\n" - "for manual rebroadcast may degrade privacy by leaking the transaction's origin, as\n" - "nodes will normally not rebroadcast non-wallet transactions already in their mempool.\n" + "Submit a raw transaction (serialized, hex-encoded) to the network.\n" + + "\nIf -privatebroadcast is disabled, then the transaction will be put into the\n" + "local mempool of the node and will be sent unconditionally to all currently\n" + "connected peers, so using sendrawtransaction for manual rebroadcast will degrade\n" + "privacy by leaking the transaction's origin, as nodes will normally not\n" + "rebroadcast non-wallet transactions already in their mempool.\n" + + "\nIf -privatebroadcast is enabled, then the transaction will be sent only via\n" + "dedicated, short-lived connections to Tor or I2P peers or IPv4/IPv6 peers\n" + "through the Tor network. This conceals the transaction origin. The transaction\n" + "will only enter the local mempool when it is received back from the network.\n" + "\nA specific exception, RPC_TRANSACTION_ALREADY_IN_UTXO_SET, may throw if the transaction cannot be added to the mempool.\n" + "\nRelated RPCs: createrawtransaction, signrawtransactionwithkey\n", { {"hexstring", RPCArg::Type::STR_HEX, RPCArg::Optional::NO, "The hex string of the raw transaction"}, @@ -98,11 +110,23 @@ static RPCHelpMan sendrawtransaction() std::string err_string; AssertLockNotHeld(cs_main); NodeContext& node = EnsureAnyNodeContext(request.context); + const bool private_broadcast_enabled{gArgs.GetBoolArg("-privatebroadcast", DEFAULT_PRIVATE_BROADCAST)}; + if (private_broadcast_enabled && + !g_reachable_nets.Contains(NET_ONION) && + !g_reachable_nets.Contains(NET_I2P)) { + throw JSONRPCError(RPC_MISC_ERROR, + "-privatebroadcast is enabled, but none of the Tor or I2P networks is " + "reachable. Maybe the location of the Tor proxy couldn't be retrieved " + "from the Tor daemon at startup. Check whether the Tor daemon is running " + "and that -torcontrol, -torpassword and -i2psam are configured properly."); + } + const auto method = private_broadcast_enabled ? node::TxBroadcast::NO_MEMPOOL_PRIVATE_BROADCAST + : node::TxBroadcast::MEMPOOL_AND_BROADCAST_TO_ALL; const TransactionError err = BroadcastTransaction(node, tx, err_string, max_raw_tx_fee, - node::TxBroadcast::MEMPOOL_AND_BROADCAST_TO_ALL, + method, /*wait_callback=*/true); if (TransactionError::OK != err) { throw JSONRPCTransactionError(err, err_string); diff --git a/src/rpc/net.cpp b/src/rpc/net.cpp index c97d4c75af0e9..e48ca1a51389b 100644 --- a/src/rpc/net.cpp +++ b/src/rpc/net.cpp @@ -48,7 +48,8 @@ const std::vector CONNECTION_TYPE_DOC{ "inbound (initiated by the peer)", "manual (added via addnode RPC or -addnode/-connect configuration options)", "addr-fetch (short-lived automatic connection for soliciting addresses)", - "feeler (short-lived automatic connection for testing addresses)" + "feeler (short-lived automatic connection for testing addresses)", + "private-broadcast (short-lived automatic connection for broadcasting privacy-sensitive transactions)" }; const std::vector TRANSPORT_TYPE_DOC{ diff --git a/src/test/util/net.h b/src/test/util/net.h index 77954d92a4867..605b2fa81a07c 100644 --- a/src/test/util/net.h +++ b/src/test/util/net.h @@ -143,6 +143,7 @@ constexpr ConnectionType ALL_CONNECTION_TYPES[]{ ConnectionType::FEELER, ConnectionType::BLOCK_RELAY, ConnectionType::ADDR_FETCH, + ConnectionType::PRIVATE_BROADCAST, }; constexpr auto ALL_NETWORKS = std::array{ diff --git a/src/wallet/wallet.cpp b/src/wallet/wallet.cpp index 86474a456d7b9..7f32c8368adf5 100644 --- a/src/wallet/wallet.cpp +++ b/src/wallet/wallet.cpp @@ -1983,6 +1983,9 @@ bool CWallet::SubmitTxMemoryPoolAndRelay(CWalletTx& wtx, case node::TxBroadcast::MEMPOOL_NO_BROADCAST: what = "to mempool without broadcast"; break; + case node::TxBroadcast::NO_MEMPOOL_PRIVATE_BROADCAST: + what = "for private broadcast without adding to the mempool"; + break; } WalletLogPrintf("Submitting wtx %s %s\n", wtx.GetHash().ToString(), what); // We must set TxStateInMempool here. Even though it will also be set later by the diff --git a/test/functional/feature_config_args.py b/test/functional/feature_config_args.py index 441c21f03a34e..1ead02f9e1ead 100755 --- a/test/functional/feature_config_args.py +++ b/test/functional/feature_config_args.py @@ -411,6 +411,24 @@ def test_connect_with_seednode(self): self.restart_node(0, extra_args=[connect_arg, '-dnsseed', '-proxy=localhost:1080']) self.stop_node(0) + def test_privatebroadcast(self): + self.log.info("Test that an invalid usage of -privatebroadcast throws an init error") + self.stop_node(0) + args_errors = { + "Private broadcast of own transactions requested (-privatebroadcast), " + "but none of Tor or I2P networks is reachable": + ["-privatebroadcast"], + + "Private broadcast of own transactions requested (-privatebroadcast), " + "but -connect is also configured. They are incompatible because the " + "private broadcast needs to open new connections to randomly chosen " + "Tor or I2P peers. Consider using -maxconnections=0 -addnode=... instead" : + # -onion= makes the Tor network reachable + ["-privatebroadcast", "-connect=127.0.0.1:8333", "-onion=127.0.0.1:9050"] + } + for msg, args in args_errors.items(): + self.nodes[0].assert_start_raises_init_error(extra_args=args, expected_msg=f"Error: {msg}") + def test_ignored_conf(self): self.log.info('Test error is triggered when the datadir in use contains a bitcoin.conf file that would be ignored ' 'because a conflicting -conf file argument is passed.') @@ -496,6 +514,7 @@ def run_test(self): self.test_seed_peers() self.test_networkactive() self.test_connect_with_seednode() + self.test_privatebroadcast() self.test_dir_config() self.test_negated_config() diff --git a/test/functional/p2p_private_broadcast.py b/test/functional/p2p_private_broadcast.py new file mode 100755 index 0000000000000..9ee8dce0faa87 --- /dev/null +++ b/test/functional/p2p_private_broadcast.py @@ -0,0 +1,419 @@ +#!/usr/bin/env python3 +# Copyright (c) 2017-present The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or http://www.opensource.org/licenses/mit-license.php. +""" +Test how locally submitted transactions are sent to the network when private broadcast is used. +""" + +import time +import threading + +from test_framework.p2p import ( + P2PDataStore, + P2PInterface, + P2P_SERVICES, + P2P_VERSION, +) +from test_framework.messages import ( + CAddress, + CInv, + COIN, + MSG_WTX, + malleate_tx_to_invalid_witness, + msg_inv, + msg_tx, +) +from test_framework.netutil import ( + format_addr_port +) +from test_framework.script_util import ValidWitnessMalleatedTx +from test_framework.socks5 import ( + Socks5Configuration, + Socks5Server, +) +from test_framework.test_framework import ( + BitcoinTestFramework, +) +from test_framework.util import ( + MAX_NODES, + assert_equal, + assert_not_equal, + assert_raises_rpc_error, + p2p_port, + tor_port, +) +from test_framework.wallet import ( + MiniWallet, +) + +MAX_OUTBOUND_FULL_RELAY_CONNECTIONS = 8 +MAX_BLOCK_RELAY_ONLY_CONNECTIONS = 2 +NUM_INITIAL_CONNECTIONS = MAX_OUTBOUND_FULL_RELAY_CONNECTIONS + MAX_BLOCK_RELAY_ONLY_CONNECTIONS +NUM_PRIVATE_BROADCAST_PER_TX = 3 + +# Fill addrman with these addresses. Must have enough Tor addresses, so that even +# if all 10 default connections are opened to a Tor address (!?) there must be more +# for private broadcast. +ADDRMAN_ADDRESSES = [ + "1.65.195.98", + "2.59.236.56", + "2.83.114.20", + "2.248.194.16", + "5.2.154.6", + "5.101.140.30", + "5.128.87.126", + "5.144.21.49", + "5.172.132.104", + "5.188.62.18", + "5.200.2.180", + "8.129.184.255", + "8.209.105.138", + "12.34.98.148", + "14.199.102.151", + "18.27.79.17", + "18.27.124.231", + "18.216.249.151", + "23.88.155.58", + "23.93.101.158", + "[2001:19f0:1000:1db3:5400:4ff:fe56:5a8d]", + "[2001:19f0:5:24da:3eec:efff:feb9:f36e]", + "[2001:19f0:5:24da::]", + "[2001:19f0:5:4535:3eec:efff:feb9:87e4]", + "[2001:19f0:5:4535::]", + "[2001:1bc0:c1::2000]", + "[2001:1c04:4008:6300:8a5f:2678:114b:a660]", + "[2001:41d0:203:3739::]", + "[2001:41d0:203:8f49::]", + "[2001:41d0:203:bb0a::]", + "[2001:41d0:2:bf8f::]", + "[2001:41d0:303:de8b::]", + "[2001:41d0:403:3d61::]", + "[2001:41d0:405:9600::]", + "[2001:41d0:8:ed7f::1]", + "[2001:41d0:a:69a2::1]", + "[2001:41f0::62:6974:636f:696e]", + "[2001:470:1b62::]", + "[2001:470:1f05:43b:2831:8530:7179:5864]", + "[2001:470:1f09:b14::11]", + "2bqghnldu6mcug4pikzprwhtjjnsyederctvci6klcwzepnjd46ikjyd.onion", + "4lr3w2iyyl5u5l6tosizclykf5v3smqroqdn2i4h3kq6pfbbjb2xytad.onion", + "5g72ppm3krkorsfopcm2bi7wlv4ohhs4u4mlseymasn7g7zhdcyjpfid.onion", + "5sbmcl4m5api5tqafi4gcckrn3y52sz5mskxf3t6iw4bp7erwiptrgqd.onion", + "776aegl7tfhg6oiqqy76jnwrwbvcytsx2qegcgh2mjqujll4376ohlid.onion", + "77mdte42srl42shdh2mhtjr7nf7dmedqrw6bkcdekhdvmnld6ojyyiad.onion", + "azbpsh4arqlm6442wfimy7qr65bmha2zhgjg7wbaji6vvaug53hur2qd.onion", + "b64xcbleqmwgq2u46bh4hegnlrzzvxntyzbmucn3zt7cssm7y4ubv3id.onion", + "bsqbtcparrfihlwolt4xgjbf4cgqckvrvsfyvy6vhiqrnh4w6ghixoid.onion", + "bsqbtctulf2g4jtjsdfgl2ed7qs6zz5wqx27qnyiik7laockryvszqqd.onion", + "cwi3ekrwhig47dhhzfenr5hbvckj7fzaojygvazi2lucsenwbzwoyiqd.onion", + "devinbtcmwkuitvxl3tfi5of4zau46ymeannkjv6fpnylkgf3q5fa3id.onion", + "devinbtcyk643iruzfpaxw3on2jket7rbjmwygm42dmdyub3ietrbmid.onion", + "dtql5vci4iaml4anmueftqr7bfgzqlauzfy4rc2tfgulldd3ekyijjyd.onion", + "emzybtc25oddoa2prol2znpz2axnrg6k77xwgirmhv7igoiucddsxiad.onion", + "emzybtc3ewh7zihpkdvuwlgxrhzcxy2p5fvjggp7ngjbxcytxvt4rjid.onion", + "emzybtc454ewbviqnmgtgx3rgublsgkk23r4onbhidcv36wremue4kqd.onion", + "emzybtc5bnpb2o6gh54oquiox54o4r7yn4a2wiiwzrjonlouaibm2zid.onion", + "fpz6r5ppsakkwypjcglz6gcnwt7ytfhxskkfhzu62tnylcknh3eq6pad.onion", + "255fhcp6ajvftnyo7bwz3an3t4a4brhopm3bamyh2iu5r3gnr2rq.b32.i2p", + "27yrtht5b5bzom2w5ajb27najuqvuydtzb7bavlak25wkufec5mq.b32.i2p", + "3gocb7wc4zvbmmebktet7gujccuux4ifk3kqilnxnj5wpdpqx2hq.b32.i2p", + "4fcc23wt3hyjk3csfzcdyjz5pcwg5dzhdqgma6bch2qyiakcbboa.b32.i2p", + "4osyqeknhx5qf3a73jeimexwclmt42cju6xdp7icja4ixxguu2hq.b32.i2p", + "4umsi4nlmgyp4rckosg4vegd2ysljvid47zu7pqsollkaszcbpqq.b32.i2p", + "6j2ezegd3e2e2x3o3pox335f5vxfthrrigkdrbgfbdjchm5h4awa.b32.i2p", + "6n36ljyr55szci5ygidmxqer64qr24f4qmnymnbvgehz7qinxnla.b32.i2p", + "72yjs6mvlby3ky6mgpvvlemmwq5pfcznrzd34jkhclgrishqdxva.b32.i2p", + "a5qsnv3maw77mlmmzlcglu6twje6ttctd3fhpbfwcbpmewx6fczq.b32.i2p", + "aovep2pco7v2k4rheofrgytbgk23eg22dczpsjqgqtxcqqvmxk6a.b32.i2p", + "bitcoi656nll5hu6u7ddzrmzysdtwtnzcnrjd4rfdqbeey7dmn5a.b32.i2p", + "brifkruhlkgrj65hffybrjrjqcgdgqs2r7siizb5b2232nruik3a.b32.i2p", + "c4gfnttsuwqomiygupdqqqyy5y5emnk5c73hrfvatri67prd7vyq.b32.i2p", + "day3hgxyrtwjslt54sikevbhxxs4qzo7d6vi72ipmscqtq3qmijq.b32.i2p", + "du5kydummi23bjfp6bd7owsvrijgt7zhvxmz5h5f5spcioeoetwq.b32.i2p", + "e55k6wu46rzp4pg5pk5npgbr3zz45bc3ihtzu2xcye5vwnzdy7pq.b32.i2p", + "eciohu5nq7vsvwjjc52epskuk75d24iccgzmhbzrwonw6lx4gdva.b32.i2p", + "ejlnngarmhqvune74ko7kk55xtgbz5i5ncs4vmnvjpy3l7y63xaa.b32.i2p", + "fhzlp3xroabohnmjonu5iqazwhlbbwh5cpujvw2azcu3srqdceja.b32.i2p", + "[fc32:17ea:e415:c3bf:9808:149d:b5a2:c9aa]", + "[fcc7:be49:ccd1:dc91:3125:f0da:457d:8ce]", + "[fcdc:73ae:b1a9:1bf8:d4c2:811:a4c7:c34e]", +] + + +class P2PPrivateBroadcast(BitcoinTestFramework): + def set_test_params(self): + self.disable_autoconnect = False + self.num_nodes = 2 + + def setup_nodes(self): + # Start a SOCKS5 proxy server. + socks5_server_config = Socks5Configuration() + # self.nodes[0] listens on p2p_port(0), + # self.nodes[1] listens on p2p_port(1), + # thus we tell the SOCKS5 server to listen on p2p_port(self.num_nodes) (self.num_nodes is 2) + socks5_server_config.addr = ("127.0.0.1", p2p_port(self.num_nodes)) + socks5_server_config.unauth = True + socks5_server_config.auth = True + + self.socks5_server = Socks5Server(socks5_server_config) + self.socks5_server.start() + + # Tor ports are the highest among p2p/rpc/tor, so this should be the first available port. + ports_base = tor_port(MAX_NODES) + 1 + + self.destinations = [] + + self.destinations_lock = threading.Lock() + + def destinations_factory(requested_to_addr, requested_to_port): + with self.destinations_lock: + i = len(self.destinations) + actual_to_addr = "" + actual_to_port = 0 + listener = None + if i == NUM_INITIAL_CONNECTIONS: + # Instruct the SOCKS5 server to redirect the first private + # broadcast connection from nodes[0] to nodes[1] + actual_to_addr = "127.0.0.1" # nodes[1] listen address + actual_to_port = tor_port(1) # nodes[1] listen port for Tor + self.log.debug(f"Instructing the SOCKS5 proxy to redirect connection i={i} to " + f"{format_addr_port(actual_to_addr, actual_to_port)} (nodes[1])") + else: + # Create a Python P2P listening node and instruct the SOCKS5 proxy to + # redirect the connection to it. The first outbound connection is used + # later to serve GETDATA, thus make it P2PDataStore(). + listener = P2PDataStore() if i == 0 else P2PInterface() + listener.peer_connect_helper(dstaddr="0.0.0.0", dstport=0, net=self.chain, timeout_factor=self.options.timeout_factor) + listener.peer_connect_send_version(services=P2P_SERVICES) + + def on_listen_done(addr, port): + nonlocal actual_to_addr + nonlocal actual_to_port + actual_to_addr = addr + actual_to_port = port + + self.network_thread.listen( + addr="127.0.0.1", + port=ports_base + i, + p2p=listener, + callback=on_listen_done) + # Wait until the callback has been called. + self.wait_until(lambda: actual_to_port != 0) + self.log.debug(f"Instructing the SOCKS5 proxy to redirect connection i={i} to " + f"{format_addr_port(actual_to_addr, actual_to_port)} (a Python node)") + + self.destinations.append({ + "requested_to": format_addr_port(requested_to_addr, requested_to_port), + "node": listener, + }) + assert_equal(len(self.destinations), i + 1) + + return { + "actual_to_addr": actual_to_addr, + "actual_to_port": actual_to_port, + } + + self.socks5_server.conf.destinations_factory = destinations_factory + + self.extra_args = [ + [ + # Needed to be able to add CJDNS addresses to addrman (otherwise they are unroutable). + "-cjdnsreachable", + # Connecting, sending garbage, being disconnected messes up with this test's + # check_broadcasts() which waits for a particular Python node to receive a connection. + "-v2transport=0", + "-test=addrman", + "-privatebroadcast", + f"-proxy={socks5_server_config.addr[0]}:{socks5_server_config.addr[1]}", + ], + [ + "-connect=0", + f"-bind=127.0.0.1:{tor_port(1)}=onion", + ], + ] + super().setup_nodes() + + def setup_network(self): + self.setup_nodes() + + def check_broadcasts(self, label, tx, broadcasts_to_expect, skip_destinations): + broadcasts_done = 0 + i = skip_destinations - 1 + while broadcasts_done < broadcasts_to_expect: + i += 1 + self.log.debug(f"{label}: waiting for outbound connection i={i}") + # At this point the connection may not yet have been established (A), + # may be active (B), or may have already been closed (C). + self.wait_until(lambda: len(self.destinations) > i) + dest = self.destinations[i] + peer = dest["node"] + peer.wait_until(lambda: peer.message_count["version"] == 1, check_connected=False) + # Now it is either (B) or (C). + if peer.last_message["version"].nServices != 0: + self.log.debug(f"{label}: outbound connection i={i} to {dest['requested_to']} not a private broadcast, ignoring it (maybe feeler or extra block only)") + continue + self.log.debug(f"{label}: outbound connection i={i} to {dest['requested_to']} must be a private broadcast, checking it") + peer.wait_for_disconnect() + # Now it is (C). + assert_equal(peer.message_count, { + "version": 1, + "verack": 1, + "inv": 1, + "tx": 1, + "ping": 1 + }) + dummy_address = CAddress() + dummy_address.nServices = 0 + assert_equal(peer.last_message["version"].nVersion, P2P_VERSION) + assert_equal(peer.last_message["version"].nServices, 0) + assert_equal(peer.last_message["version"].nTime, 0) + assert_equal(peer.last_message["version"].addrTo, dummy_address) + assert_equal(peer.last_message["version"].addrFrom, dummy_address) + assert_equal(peer.last_message["version"].strSubVer, "/pynode:0.0.1/") + assert_equal(peer.last_message["version"].nStartingHeight, 0) + assert_equal(peer.last_message["version"].relay, 0) + assert_equal(peer.last_message["tx"].tx.txid_hex, tx["txid"]) + self.log.info(f"{label}: ok: outbound connection i={i} is private broadcast of txid={tx['txid']}") + broadcasts_done += 1 + + def run_test(self): + tx_originator = self.nodes[0] + tx_receiver = self.nodes[1] + far_observer = tx_receiver.add_p2p_connection(P2PInterface()) + + wallet = MiniWallet(tx_originator) + + # Fill tx_originator's addrman. + for addr in ADDRMAN_ADDRESSES: + res = tx_originator.addpeeraddress(address=addr, port=8333, tried=False) + if not res["success"]: + self.log.debug(f"Could not add {addr} to tx_originator's addrman (collision?)") + + self.wait_until(lambda: len(self.destinations) == NUM_INITIAL_CONNECTIONS) + + # The next opened connection by tx_originator should be "private broadcast" + # for sending the transaction. The SOCKS5 proxy should redirect it to tx_receiver. + + txs = wallet.create_self_transfer_chain(chain_length=3) + self.log.info(f"Created txid={txs[0]['txid']}: for basic test") + self.log.info(f"Created txid={txs[1]['txid']}: for broadcast with dependency in mempool + rebroadcast") + self.log.info(f"Created txid={txs[2]['txid']}: for broadcast with dependency not in mempool") + tx_originator.sendrawtransaction(hexstring=txs[0]["hex"], maxfeerate=0.1) + + self.log.debug(f"Waiting for outbound connection i={NUM_INITIAL_CONNECTIONS}, " + "must be the first private broadcast connection") + self.wait_until(lambda: len(tx_receiver.getrawmempool()) > 0) + far_observer.wait_for_tx(txs[0]["txid"]) + self.log.info(f"Outbound connection i={NUM_INITIAL_CONNECTIONS}: " + "the private broadcast target received and further relayed the transaction") + + # One already checked above, check the other NUM_PRIVATE_BROADCAST_PER_TX - 1 broadcasts. + self.check_broadcasts("Basic", txs[0], NUM_PRIVATE_BROADCAST_PER_TX - 1, NUM_INITIAL_CONNECTIONS + 1) + + self.log.info("Resending the same transaction via RPC again (it is not in the mempool yet)") + ignoring_msg = f"Ignoring unnecessary request to schedule an already scheduled transaction: txid={txs[0]['txid']}, wtxid={txs[0]['wtxid']}" + with tx_originator.busy_wait_for_debug_log(expected_msgs=[ignoring_msg.encode()]): + tx_originator.sendrawtransaction(hexstring=txs[0]["hex"], maxfeerate=0) + + self.log.info("Sending a malleated transaction with an invalid witness via RPC") + malleated_invalid = malleate_tx_to_invalid_witness(txs[0]) + assert_raises_rpc_error(-26, "mempool-script-verify-flag-failed", + tx_originator.sendrawtransaction, + hexstring=malleated_invalid.serialize_with_witness().hex(), + maxfeerate=0.1) + + self.log.info("Checking that the transaction is not in the originator node's mempool") + assert_equal(len(tx_originator.getrawmempool()), 0) + + wtxid_int = int(txs[0]["wtxid"], 16) + inv = CInv(MSG_WTX, wtxid_int) + + self.log.info("Sending INV and waiting for GETDATA from node") + tx_returner = self.destinations[0]["node"] # Will return the transaction back to the originator. + tx_returner.tx_store[wtxid_int] = txs[0]["tx"] + assert "getdata" not in tx_returner.last_message + received_back_msg = f"Received our privately broadcast transaction (txid={txs[0]['txid']}) from the network" + with tx_originator.assert_debug_log(expected_msgs=[received_back_msg]): + tx_returner.send_without_ping(msg_inv([inv])) + tx_returner.wait_until(lambda: "getdata" in tx_returner.last_message) + self.wait_until(lambda: len(tx_originator.getrawmempool()) > 0) + + self.log.info("Waiting for normal broadcast to another peer") + self.destinations[1]["node"].wait_for_inv([inv]) + + self.log.info("Sending a transaction that is already in the mempool") + skip_destinations = len(self.destinations) + tx_originator.sendrawtransaction(hexstring=txs[0]["hex"], maxfeerate=0) + self.check_broadcasts("Broadcast of mempool transaction", txs[0], NUM_PRIVATE_BROADCAST_PER_TX, skip_destinations) + + self.log.info("Sending a transaction with a dependency in the mempool") + skip_destinations = len(self.destinations) + tx_originator.sendrawtransaction(hexstring=txs[1]["hex"], maxfeerate=0.1) + self.check_broadcasts("Dependency in mempool", txs[1], NUM_PRIVATE_BROADCAST_PER_TX, skip_destinations) + + self.log.info("Sending a transaction with a dependency not in the mempool (should be rejected)") + assert_equal(len(tx_originator.getrawmempool()), 1) + assert_raises_rpc_error(-25, "bad-txns-inputs-missingorspent", + tx_originator.sendrawtransaction, hexstring=txs[2]["hex"], maxfeerate=0.1) + assert_raises_rpc_error(-25, "bad-txns-inputs-missingorspent", + tx_originator.sendrawtransaction, hexstring=txs[2]["hex"], maxfeerate=0) + + # Since txs[1] has not been received back by tx_originator, + # it should be re-broadcast after a while. Advance tx_originator's clock + # to trigger a re-broadcast. Should be more than the maximum returned by + # NextTxBroadcast() in net_processing.cpp. + self.log.info("Checking that rebroadcast works") + delta = 20 * 60 # 20min + skip_destinations = len(self.destinations) + rebroadcast_msg = f"Reattempting broadcast of stale txid={txs[1]['txid']}" + with tx_originator.busy_wait_for_debug_log(expected_msgs=[rebroadcast_msg.encode()]): + tx_originator.setmocktime(int(time.time()) + delta) + tx_originator.mockscheduler(delta) + self.check_broadcasts("Rebroadcast", txs[1], 1, skip_destinations) + tx_originator.setmocktime(0) # Let the clock tick again (it will go backwards due to this). + + self.log.info("Sending a pair of transactions with the same txid but different valid wtxids via RPC") + txgen = ValidWitnessMalleatedTx() + funding = wallet.get_utxo() + fee_sat = 1000 + siblings_parent = txgen.build_parent_tx(funding["txid"], amount=funding["value"] * COIN - fee_sat) + sibling1, sibling2 = txgen.build_malleated_children(siblings_parent.txid_hex, amount=siblings_parent.vout[0].nValue - fee_sat) + self.log.info(f" - sibling1: txid={sibling1.txid_hex}, wtxid={sibling1.wtxid_hex}") + self.log.info(f" - sibling2: txid={sibling2.txid_hex}, wtxid={sibling2.wtxid_hex}") + assert_equal(sibling1.txid_hex, sibling2.txid_hex) + assert_not_equal(sibling1.wtxid_hex, sibling2.wtxid_hex) + wallet.sign_tx(siblings_parent) + assert_equal(len(tx_originator.getrawmempool()), 1) + tx_returner.send_without_ping(msg_tx(siblings_parent)) + self.wait_until(lambda: len(tx_originator.getrawmempool()) > 1) + self.log.info(" - siblings' parent added to the mempool") + tx_originator.sendrawtransaction(hexstring=sibling1.serialize_with_witness().hex(), maxfeerate=0.1) + self.log.info(" - sent sibling1: ok") + ignoring_msg = f"Ignoring unnecessary request to schedule an already scheduled transaction: txid={sibling2.txid_hex}, wtxid={sibling2.wtxid_hex}" + with tx_originator.busy_wait_for_debug_log(expected_msgs=[ignoring_msg.encode()]): + tx_originator.sendrawtransaction(hexstring=sibling2.serialize_with_witness().hex(), maxfeerate=0.1) + self.log.info(" - sibling2 rejected because it has the same txid: ok") + + # Stop the SOCKS5 proxy server to avoid it being upset by the bitcoin + # node disconnecting in the middle of the SOCKS5 handshake when we + # restart below. + self.socks5_server.stop() + + self.log.info("Trying to send a transaction when none of Tor or I2P is reachable") + self.restart_node(0, extra_args=[ + "-privatebroadcast", + "-v2transport=0", + # A location where definitely a Tor control is not listening. This would allow + # Bitcoin Core to start, hoping/assuming that the location of the Tor proxy + # may be retrieved after startup from the Tor control, but it will not be, so + # the RPC should throw. + "-torcontrol=127.0.0.1:1", + "-listenonion", + ]) + assert_raises_rpc_error(-1, "none of the Tor or I2P networks is reachable", + tx_originator.sendrawtransaction, hexstring=txs[0]["hex"], maxfeerate=0.1) + + +if __name__ == "__main__": + P2PPrivateBroadcast(__file__).main() diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py index a4c19dd11d5f2..ad588096bb526 100755 --- a/test/functional/test_runner.py +++ b/test/functional/test_runner.py @@ -301,6 +301,7 @@ 'rpc_dumptxoutset.py', 'feature_minchainwork.py', 'rpc_estimatefee.py', + 'p2p_private_broadcast.py', 'rpc_getblockstats.py', 'feature_port.py', 'feature_bind_port_externalip.py',