diff --git a/execution_chain/sync/wire_protocol/broadcast.nim b/execution_chain/sync/wire_protocol/broadcast.nim index 82f63b994f..b1b760594d 100644 --- a/execution_chain/sync/wire_protocol/broadcast.nim +++ b/execution_chain/sync/wire_protocol/broadcast.nim @@ -30,6 +30,7 @@ logScope: const maxOperationQuota = 1000000 fullReplenishTime = chronos.seconds(5) + NUM_PEERS_REBROADCAST_QUOTIENT = 4 POOLED_STORAGE_TIME_LIMIT = initDuration(minutes = 20) cleanupTicker = chronos.minutes(5) # https://github.com/ethereum/devp2p/blob/b0c213de97978053a0f62c3ea4d23c0a3d8784bc/caps/eth.md#blockrangeupdate-0x11 @@ -67,8 +68,30 @@ func allowedOpsPerSecondCost(n: int): float = const txPoolProcessCost = allowedOpsPerSecondCost(1000) hashLookupCost = allowedOpsPerSecondCost(2000) + hashingCost = allowedOpsPerSecondCost(5000) blockRangeUpdateCost = allowedOpsPerSecondCost(20) +func add(seen: SeenObject, peer: Peer) = + seen.peers.incl(peer.id) + +iterator peers(wire: EthWireRef, random: bool = false): Peer = + var peers = newSeqOfCap[Peer](wire.node.numPeers) + for peer in wire.node.peers: + if peer.isNil: + continue + + if peer.supports(eth68) or peer.supports(eth69): + peers.add peer + + if random: + shuffle(peers) + + for peer in peers: + if peer.connectionState != ConnectionState.Connected: + continue + + yield peer + iterator peers69OrLater(wire: EthWireRef, random: bool = false): Peer = var peers = newSeqOfCap[Peer](wire.node.numPeers) for peer in wire.node.peers(eth69): @@ -77,11 +100,112 @@ iterator peers69OrLater(wire: EthWireRef, random: bool = false): Peer = peers.add peer if random: shuffle(peers) + for peer in peers: if peer.connectionState != ConnectionState.Connected: continue yield peer +proc seenByPeer(wire: EthWireRef, + packet: NewPooledTransactionHashesPacket, + peer: Peer) {.async: (raises: [CancelledError]).} = + for hash in packet.txHashes: + var seen = wire.seenTransactions.getOrDefault(hash, nil) + if seen.isNil: + seen = SeenObject( + lastSeen: getTime(), + ) + seen.add peer + wire.seenTransactions[hash] = seen + else: + seen.add peer + + awaitQuota(wire, hashLookupCost, "seen by peer") + +proc broadcastTransactions(wire: EthWireRef, + packet: TransactionsPacket, + hashes: NewPooledTransactionHashesPacket , + peer: Peer) {.async: (raises: [CancelledError]).} = + # This is used to avoid re-sending along newPooledTransactionHashes + # announcements/re-broadcasts + + var msg = newSeqOfCap[Transaction](packet.transactions.len) + for i, hash in hashes.txHashes: + var seen = wire.seenTransactions.getOrDefault(hash, nil) + if seen.isNil: + seen = SeenObject( + lastSeen: getTime(), + ) + seen.add peer + wire.seenTransactions[hash] = seen + msg.add packet.transactions[i] + elif peer.id notin seen.peers: + seen.add peer + msg.add packet.transactions[i] + + awaitQuota(wire, hashLookupCost, "broadcast transactions") + + try: + await peer.transactions(msg) + except EthP2PError as exc: + debug "broadcast transactions failed", + msg=exc.msg + +proc prepareTxHashesAnnouncement(wire: EthWireRef, packet: TransactionsPacket): + Future[NewPooledTransactionHashesPacket] + {.async: (raises: [CancelledError]).} = + let len = packet.transactions.len + var ann = NewPooledTransactionHashesPacket( + txTypes : newSeqOfCap[byte](len), + txSizes : newSeqOfCap[uint64](len), + txHashes: newSeqOfCap[Hash32](len), + ) + for tx in packet.transactions: + let (size, hash) = getEncodedLengthAndHash(tx) + ann.txTypes.add tx.txType.byte + ann.txSizes.add size.uint64 + ann.txHashes.add hash + + awaitQuota(wire, hashingCost, "broadcast transactions") + + ann + +proc broadcastTxHashes(wire: EthWireRef, + hashes: NewPooledTransactionHashesPacket, + peer: Peer) {.async: (raises: [CancelledError]).} = + let len = hashes.txHashes.len + var msg = NewPooledTransactionHashesPacket( + txTypes : newSeqOfCap[byte](len), + txSizes : newSeqOfCap[uint64](len), + txHashes: newSeqOfCap[Hash32](len), + ) + + template copyFrom(msg, hashes, i) = + msg.txTypes.add hashes.txTypes[i] + msg.txSizes.add hashes.txSizes[i] + msg.txHashes.add hashes.txHashes[i] + + for i, hash in hashes.txHashes: + var seen = wire.seenTransactions.getOrDefault(hash, nil) + if seen.isNil: + seen = SeenObject( + lastSeen: getTime(), + ) + seen.add peer + wire.seenTransactions[hash] = seen + msg.copyFrom(hashes, i) + elif peer.id notin seen.peers: + seen.add peer + msg.copyFrom(hashes, i) + + awaitQuota(wire, hashLookupCost, "broadcast transactions hashes") + + try: + await peer.newPooledTransactionHashes(msg.txTypes, msg.txSizes, msg.txHashes) + except EthP2PError as exc: + debug "broadcast tx hashes failed", + msg=exc.msg + proc syncerRunning*(wire: EthWireRef): bool = # Disable transactions gossip and processing when # the syncer is still busy @@ -111,6 +235,11 @@ proc handleTransactionsBroadcast*(wire: EthWireRef, debug "Received new transactions", number = packet.transactions.len + # Don't rebroadcast invalid transactions + let newPacket = TransactionsPacket( + transactions: newSeqOfCap[Transaction](packet.transactions.len) + ) + wire.reqisterAction("TxPool consume incoming transactions"): for tx in packet.transactions: if tx.txType == TxEip4844: @@ -123,8 +252,26 @@ proc handleTransactionsBroadcast*(wire: EthWireRef, wire.txPool.addTx(tx).isOkOr: continue + # Only rebroadcast good transactions + newPacket.transactions.add tx awaitQuota(wire, txPoolProcessCost, "adding into txpool") + wire.reqisterAction("Broadcast transactions or hashes"): + let hashes = await wire.prepareTxHashesAnnouncement(newPacket) + await wire.seenByPeer(hashes, peer) + + let + numPeers = wire.node.numPeers + maxPeers = max(1, numPeers div NUM_PEERS_REBROADCAST_QUOTIENT) + + var i = 0 + for peer in wire.peers(random = true): + if i < maxPeers: + await wire.broadcastTransactions(newPacket, hashes, peer) + else: + await wire.broadcastTxHashes(hashes, peer) + inc i + proc handleTxHashesBroadcast*(wire: EthWireRef, packet: NewPooledTransactionHashesPacket, peer: Peer) {.async: (raises: [CancelledError]).} = @@ -152,12 +299,19 @@ proc handleTxHashesBroadcast*(wire: EthWireRef, size: uint64 txType: byte + await wire.seenByPeer(packet, peer) + let numTx = packet.txHashes.len var i = 0 map: Table[Hash32, SizeType] + hashes = NewPooledTransactionHashesPacket( + txTypes : newSeqOfCap[byte](numTx), + txSizes : newSeqOfCap[uint64](numTx), + txHashes: newSeqOfCap[Hash32](numTx), + ) while i < numTx: var @@ -178,6 +332,10 @@ proc handleTxHashesBroadcast*(wire: EthWireRef, size: size, txType: packet.txTypes[i], ) + else: + hashes.txTypes.add packet.txTypes[i] + hashes.txSizes.add size + hashes.txHashes.add txHash awaitQuota(wire, hashLookupCost, "check transaction exists in pool") inc i @@ -245,8 +403,15 @@ proc handleTxHashesBroadcast*(wire: EthWireRef, wire.txPool.addTx(tx).isOkOr: continue + hashes.txTypes.add tx.tx.txType.byte + hashes.txSizes.add size.uint64 + hashes.txHashes.add hash + awaitQuota(wire, txPoolProcessCost, "broadcast transactions hashes") + for peer in wire.peers: + await wire.broadcastTxHashes(hashes, peer) + proc tickerLoop*(wire: EthWireRef) {.async: (raises: [CancelledError]).} = while true: # Create or replenish timer