From 7506ef40791430baa16ec1c0e7e7a84e0a25d9f3 Mon Sep 17 00:00:00 2001 From: jangko Date: Thu, 13 Mar 2025 19:38:14 +0700 Subject: [PATCH 1/8] Implement transactions broadcast Filter and accepts potentially good transactions. Removes bad transactions from circulation. Disconnect bad peer if they messing around with blob transactions with no or invalid sidecar. --- execution_chain/networking/p2p.nim | 7 + execution_chain/networking/p2p_types.nim | 3 + execution_chain/networking/peer_pool.nim | 14 - execution_chain/networking/rlpx.nim | 23 +- execution_chain/nimbus_desc.nim | 29 +- execution_chain/nimbus_execution_client.nim | 2 +- execution_chain/sync/wire_protocol.nim | 5 +- .../sync/wire_protocol/broadcast.nim | 330 ++++++++++++++++++ .../sync/wire_protocol/handler.nim | 29 +- .../sync/wire_protocol/requester.nim | 37 +- .../sync/wire_protocol/responder.nim | 9 +- execution_chain/sync/wire_protocol/setup.nim | 6 +- execution_chain/sync/wire_protocol/types.nim | 62 +++- .../nodocker/engine/engine_env.nim | 8 +- 14 files changed, 468 insertions(+), 96 deletions(-) create mode 100644 execution_chain/sync/wire_protocol/broadcast.nim diff --git a/execution_chain/networking/p2p.nim b/execution_chain/networking/p2p.nim index 414a2108b2..4cc9b3ef90 100644 --- a/execution_chain/networking/p2p.nim +++ b/execution_chain/networking/p2p.nim @@ -214,6 +214,13 @@ proc randomPeerWith*(node: EthereumNode, Protocol: type): Peer = if candidates.len > 0: return candidates.rand() +iterator randomPeersWith*(node: EthereumNode, Protocol: type): Peer = + var peers = newSeqOfCap[Peer](node.peerPool.connectedNodes.len) + for peer in node.peers(Protocol): + peers.add(peer) + shuffle(peers) + for p in peers: yield p + proc getPeer*(node: EthereumNode, peerId: NodeId, Protocol: type): Opt[Peer] = for peer in node.peers(Protocol): if peer.remote.id == peerId: diff --git a/execution_chain/networking/p2p_types.nim b/execution_chain/networking/p2p_types.nim index 2cac17f9bf..a6a56f7522 100644 --- a/execution_chain/networking/p2p_types.nim +++ b/execution_chain/networking/p2p_types.nim @@ -219,3 +219,6 @@ proc `$`*(v: Capability): string = v.name & "/" & $v.version proc toENode*(v: EthereumNode): ENode = ENode(pubkey: v.keys.pubkey, address: v.address) + +func id*(peer: Peer): NodeId = + peer.remote.id diff --git a/execution_chain/networking/peer_pool.nim b/execution_chain/networking/peer_pool.nim index e818d108e5..49787adb03 100644 --- a/execution_chain/networking/peer_pool.nim +++ b/execution_chain/networking/peer_pool.nim @@ -270,20 +270,6 @@ proc start*(p: PeerPool) = asyncSpawn p.run() proc len*(p: PeerPool): int = p.connectedNodes.len -# @property -# def peers(self) -> List[BasePeer]: -# peers = list(self.connected_nodes.values()) -# # Shuffle the list of peers so that dumb callsites are less likely to send -# # all requests to -# # a single peer even if they always pick the first one from the list. -# random.shuffle(peers) -# return peers - -# async def get_random_peer(self) -> BasePeer: -# while not self.peers: -# self.logger.debug("No connected peers, sleeping a bit") -# await asyncio.sleep(0.5) -# return random.choice(self.peers) iterator peers*(p: PeerPool): Peer = for remote, peer in p.connectedNodes: diff --git a/execution_chain/networking/rlpx.nim b/execution_chain/networking/rlpx.nim index 24d7ae011a..3056283bb5 100644 --- a/execution_chain/networking/rlpx.nim +++ b/execution_chain/networking/rlpx.nim @@ -309,9 +309,18 @@ proc nextMsgResolver[MsgType]( msgData: Rlp, future: FutureBase ) {.gcsafe, raises: [RlpError].} = var reader = msgData - Future[MsgType](future).complete reader.readRecordType( - MsgType, MsgType.rlpFieldsCount > 1 - ) + when MsgType is ref: + # TODO: rlp support ref types + type T = typeof(MsgType()[]) + var msg = MsgType() + msg[] = reader.readRecordType( + T, T.rlpFieldsCount > 1 + ) + Future[MsgType](future).complete msg + else: + Future[MsgType](future).complete reader.readRecordType( + MsgType, MsgType.rlpFieldsCount > 1 + ) proc failResolver[MsgType](reason: DisconnectionReason, future: FutureBase) = Future[MsgType](future).fail( @@ -1348,11 +1357,11 @@ proc rlpxAccept*( rlpx_accept_failure.inc(labelValues = ["timeout"]) return nil except PeerDisconnected as exc: - debug "Accped handshake disconnection", err = exc.msg, reason = exc.reason + debug "Accept handshake disconnection", err = exc.msg, reason = exc.reason rlpx_accept_failure.inc(labelValues = [$exc.reason]) return nil except EthP2PError as exc: - debug "Accped handshake error", err = exc.msg + debug "Accept handshake error", err = exc.msg rlpx_accept_failure.inc(labelValues = ["error"]) return nil @@ -1516,7 +1525,7 @@ template rlpxWithPacketHandler*(PROTO: distinct type; wrapRlpxWithPacketException(MSGTYPE, peer): var rlp = data - packet {.inject.}: MSGTYPE + packet {.inject.} = MSGTYPE() when numFields > 1: tryEnterList(rlp) @@ -1547,7 +1556,7 @@ template rlpxWithFutureHandler*(PROTO: distinct type; wrapRlpxWithPacketException(MSGTYPE, peer): var rlp = data - packet: MSGTYPE + packet = MSGTYPE() tryEnterList(rlp) let diff --git a/execution_chain/nimbus_desc.nim b/execution_chain/nimbus_desc.nim index 9be0392e42..d6599b16c6 100644 --- a/execution_chain/nimbus_desc.nim +++ b/execution_chain/nimbus_desc.nim @@ -8,7 +8,9 @@ # those terms. import + std/sequtils, chronos, + chronicles, ./networking/p2p, metrics/chronos_httpserver, ./rpc/rpc_server, @@ -16,6 +18,7 @@ import ./core/tx_pool, ./sync/peers, ./sync/beacon as beacon_sync, + ./sync/wire_protocol, ./beacon/beacon_engine, ./common, ./config @@ -50,22 +53,34 @@ type beaconSyncRef*: BeaconSyncRef beaconEngine*: BeaconEngineRef metricsServer*: MetricsHttpServerRef + wire*: EthWireRef {.push gcsafe, raises: [].} proc stop*(nimbus: NimbusNode, conf: NimbusConf) {.async, gcsafe.} = trace "Graceful shutdown" + var waitedFutures: seq[Future[void]] if nimbus.httpServer.isNil.not: - await nimbus.httpServer.stop() + waitedFutures.add nimbus.httpServer.stop() if nimbus.engineApiServer.isNil.not: - await nimbus.engineApiServer.stop() - if nimbus.beaconSyncRef.isNil.not: - await nimbus.beaconSyncRef.stop() + waitedFutures.add nimbus.engineApiServer.stop() if conf.maxPeers > 0: - await nimbus.networkLoop.cancelAndWait() + waitedFutures.add nimbus.networkLoop.cancelAndWait() if nimbus.peerManager.isNil.not: - await nimbus.peerManager.stop() + waitedFutures.add nimbus.peerManager.stop() + if nimbus.beaconSyncRef.isNil.not: + waitedFutures.add nimbus.beaconSyncRef.stop() if nimbus.metricsServer.isNil.not: - await nimbus.metricsServer.stop() + waitedFutures.add nimbus.metricsServer.stop() + if nimbus.wire.isNil.not: + waitedFutures.add nimbus.wire.stop() + + if waitedFutures.len > 0: + let + timeout = chronos.seconds(5) + completed = await withTimeout(allFutures(waitedFutures), timeout) + if not completed: + trace "Nimbus.stop(): timeout reached", timeout, + futureErrors = waitedFutures.filterIt(it.error != nil).mapIt(it.error.msg) {.pop.} diff --git a/execution_chain/nimbus_execution_client.nim b/execution_chain/nimbus_execution_client.nim index 9d2a8c3603..827e985369 100644 --- a/execution_chain/nimbus_execution_client.nim +++ b/execution_chain/nimbus_execution_client.nim @@ -103,7 +103,7 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf, rng = nimbus.ctx.rng) # Add protocol capabilities - nimbus.ethNode.addEthHandlerCapability(nimbus.txPool) + nimbus.wire = nimbus.ethNode.addEthHandlerCapability(nimbus.txPool) # Always initialise beacon syncer nimbus.beaconSyncRef = BeaconSyncRef.init( diff --git a/execution_chain/sync/wire_protocol.nim b/execution_chain/sync/wire_protocol.nim index 254bf8be20..8d062cb280 100644 --- a/execution_chain/sync/wire_protocol.nim +++ b/execution_chain/sync/wire_protocol.nim @@ -10,14 +10,13 @@ import ./wire_protocol/requester, ./wire_protocol/responder, + ./wire_protocol/broadcast, ./wire_protocol/types, ./wire_protocol/setup export requester, responder, + broadcast, types, setup - -type - eth* = eth68 diff --git a/execution_chain/sync/wire_protocol/broadcast.nim b/execution_chain/sync/wire_protocol/broadcast.nim new file mode 100644 index 0000000000..970bcf8659 --- /dev/null +++ b/execution_chain/sync/wire_protocol/broadcast.nim @@ -0,0 +1,330 @@ +# nimbus-execution-client +# Copyright (c) 2025 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. + +import + std/[tables, sets, times, sequtils], + chronos, + chronos/ratelimit, + chronicles, + eth/common/hashes, + stew/assign2, + results, + ./types, + ./requester, + ../../networking/p2p, + ../../core/tx_pool, + ../../core/eip4844 + +logScope: + topics = "tx-broadcast" + +const + maxOperationQuota = 1000000 + fullReplenishTime = chronos.seconds(5) + NUM_PEERS_REBROADCAST_QUOTIENT = 4 + POOLED_STORAGE_TIME_LIMIT = initDuration(minutes = 20) + cleanupTicker = chronos.minutes(5) + +template awaitQuota(bcParam: EthWireRef, costParam: float, protocolIdParam: string) = + let + wire = bcParam + cost = int(costParam) + protocolId = protocolIdParam + + try: + if not wire.quota.tryConsume(cost): + debug "Awaiting broadcast quota", cost = cost, protocolId = protocolId + await wire.quota.consume(cost) + except CancelledError as exc: + raise exc + except CatchableError as exc: + debug "Error while waiting broadcast quota", + cost = cost, protocolId = protocolId, msg = exc.msg + +template reqisterAction(wire: EthWireRef, actionDesc: string, body) = + block: + proc actionHandler(): Future[void] {.async: (raises: [CancelledError]).} = + debug "Invoking broadcast action", desc=actionDesc + body + + await wire.actionQueue.addLast(actionHandler) + +func allowedOpsPerSecondCost(n: int): float = + const replenishRate = (maxOperationQuota / fullReplenishTime.nanoseconds.float) + (replenishRate * 1000000000'f / n.float) + +const + txPoolProcessCost = allowedOpsPerSecondCost(1000) + hashLookupCost = allowedOpsPerSecondCost(2000) + hashingCost = allowedOpsPerSecondCost(5000) + +func add(seen: SeenObject, peer: Peer) = + seen.peers.incl(peer.id) + +proc seenByPeer(wire: EthWireRef, + packet: NewPooledTransactionHashesPacket, + peer: Peer) {.async: (raises: [CancelledError]).} = + for hash in packet.txHashes: + var seen = wire.seenTransactions.getOrDefault(hash, nil) + if seen.isNil: + seen = SeenObject( + lastSeen: getTime(), + ) + seen.add peer + wire.seenTransactions[hash] = seen + else: + seen.add peer + + awaitQuota(wire, hashLookupCost, "seen by peer") + +proc broadcastTransactions(wire: EthWireRef, + packet: TransactionsPacket, + hashes: NewPooledTransactionHashesPacket , + peer: Peer) {.async: (raises: [CancelledError]).} = + # This is used to avoid re-sending along newPooledTransactionHashes + # announcements/re-broadcasts + + var msg = newSeqOfCap[Transaction](packet.transactions.len) + for i, hash in hashes.txHashes: + var seen = wire.seenTransactions.getOrDefault(hash, nil) + if seen.isNil: + seen = SeenObject( + lastSeen: getTime(), + ) + seen.add peer + wire.seenTransactions[hash] = seen + msg.add packet.transactions[i] + elif peer.id notin seen.peers: + seen.add peer + msg.add packet.transactions[i] + + awaitQuota(wire, hashLookupCost, "broadcast transactions") + + try: + await peer.transactions(msg) + except EthP2PError as exc: + debug "broadcast transactions failed", + msg=exc.msg + +proc prepareTxHashesAnnouncement(wire: EthWireRef, packet: TransactionsPacket): + Future[NewPooledTransactionHashesPacket] + {.async: (raises: [CancelledError]).} = + let len = packet.transactions.len + var ann = NewPooledTransactionHashesPacket( + txTypes : newSeqOfCap[byte](len), + txSizes : newSeqOfCap[uint64](len), + txHashes: newSeqOfCap[Hash32](len), + ) + for tx in packet.transactions: + let (size, hash) = getEncodedLengthAndHash(tx) + ann.txTypes.add tx.txType.byte + ann.txSizes.add size.uint64 + ann.txHashes.add hash + + awaitQuota(wire, hashingCost, "broadcast transactions") + + ann + +proc broadcastTxHashes(wire: EthWireRef, + hashes: NewPooledTransactionHashesPacket, + peer: Peer) {.async: (raises: [CancelledError]).} = + let len = hashes.txHashes.len + var msg = NewPooledTransactionHashesPacket( + txTypes : newSeqOfCap[byte](len), + txSizes : newSeqOfCap[uint64](len), + txHashes: newSeqOfCap[Hash32](len), + ) + + template copyFrom(msg, hashes, i) = + msg.txTypes.add hashes.txTypes[i] + msg.txSizes.add hashes.txSizes[i] + msg.txHashes.add hashes.txHashes[i] + + for i, hash in hashes.txHashes: + var seen = wire.seenTransactions.getOrDefault(hash, nil) + if seen.isNil: + seen = SeenObject( + lastSeen: getTime(), + ) + seen.add peer + wire.seenTransactions[hash] = seen + msg.copyFrom(hashes, i) + elif peer.id notin seen.peers: + seen.add peer + msg.copyFrom(hashes, i) + + awaitQuota(wire, hashLookupCost, "broadcast transactions hashes") + + try: + await peer.newPooledTransactionHashes(msg.txTypes, msg.txSizes, msg.txHashes) + except EthP2PError as exc: + debug "broadcast tx hashes failed", + msg=exc.msg + +proc handleTransactionsBroadcast*(wire: EthWireRef, + packet: TransactionsPacket, + peer: Peer) {.async: (raises: [CancelledError]).} = + if packet.transactions.len == 0: + return + + debug "received new transactions", + number = packet.transactions.len + + # Don't rebroadcast invalid transactions + let newPacket = TransactionsPacket( + transactions: newSeqOfCap[Transaction](packet.transactions.len) + ) + + wire.reqisterAction("TxPool consume incoming transactions"): + for tx in packet.transactions: + if tx.txType == TxEip4844: + # Disallow blob transaction broadcast + await peer.disconnect(ClientQuitting) + return + + wire.txPool.addTx(tx).isOkOr: + continue + + # Only rebroadcast good transactions + newPacket.transactions.add tx + awaitQuota(wire, txPoolProcessCost, "adding into txpool") + + wire.reqisterAction("Broadcast transactions or hashes"): + let hashes = await wire.prepareTxHashesAnnouncement(newPacket) + await wire.seenByPeer(hashes, peer) + + let + numPeers = wire.node.numPeers + maxPeers = max(1, numPeers div NUM_PEERS_REBROADCAST_QUOTIENT) + + var i = 0 + for peer in wire.node.randomPeersWith(eth68): + if peer.isNil: + continue + + if peer.connectionState != ConnectionState.Connected: + continue + + if i < maxPeers: + await wire.broadcastTransactions(newPacket, hashes, peer) + else: + await wire.broadcastTxHashes(hashes, peer) + inc i + +proc handleTxHashesBroadcast*(wire: EthWireRef, + packet: NewPooledTransactionHashesPacket, + peer: Peer) {.async: (raises: [CancelledError]).} = + if packet.txHashes.len == 0: + return + + debug "received new pooled tx hashes", + hashes = packet.txHashes.len + + if packet.txHashes.len != packet.txSizes.len or + packet.txHashes.len != packet.txTypes.len: + debug "new pooled tx hashes invalid params", + hashes = packet.txHashes.len, + sizes = packet.txSizes.len, + types = packet.txTypes.len + return + + wire.reqisterAction("Broadcast transactions hashes"): + await wire.seenByPeer(packet, peer) + var + msg: PooledTransactionsRequest + res: Opt[PooledTransactionsPacket] + + assign(msg.txHashes, packet.txHashes) + + try: + res = await peer.getPooledTransactions(msg) + except EthP2PError as exc: + debug "request pooled transactions failed", + msg=exc.msg + + if res.isNone: + debug "request pooled transactions get nothing" + return + + let + ptx = res.get() + len = ptx.transactions.len + + var hashes = NewPooledTransactionHashesPacket( + txTypes : newSeqOfCap[byte](len), + txSizes : newSeqOfCap[uint64](len), + txHashes: newSeqOfCap[Hash32](len), + ) + + for i, tx in ptx.transactions: + # If we receive any blob transactions missing sidecars, or with + # sidecars that don't correspond to the versioned hashes reported + # in the header, disconnect from the sending peer. + if tx.tx.txType == TxEip4844: + if tx.networkPayload.isNil: + debug "Received sidecar-less blob transaction", peer + await peer.disconnect(ClientQuitting) + return + validateBlobTransactionWrapper(tx).isOkOr: + debug "Sidecar validation error", msg=error + await peer.disconnect(ClientQuitting) + return + + wire.txPool.addTx(tx).isOkOr: + continue + + # TODO: What if peer give us scrambled order of transactions? + # maybe need some hash map? + hashes.txTypes.add packet.txTypes[i] + hashes.txSizes.add packet.txSizes[i] + hashes.txHashes.add packet.txHashes[i] + + awaitQuota(wire, txPoolProcessCost, "broadcast transactions hashes") + + for peer in wire.node.peers(eth68): + if peer.connectionState != ConnectionState.Connected: + continue + + await wire.broadcastTxHashes(hashes, peer) + +proc setupCleanup*(wire: EthWireRef) {.async: (raises: [CancelledError]).} = + while true: + await sleepAsync(cleanupTicker) + + wire.reqisterAction("Periodical cleanup"): + var expireds: seq[Hash32] + for key, seen in wire.seenTransactions: + if getTime() - seen.lastSeen > POOLED_STORAGE_TIME_LIMIT: + expireds.add key + awaitQuota(wire, hashLookupCost, "broadcast transactions hashes") + + for expire in expireds: + wire.seenTransactions.del(expire) + awaitQuota(wire, hashLookupCost, "broadcast transactions hashes") + +proc setupTokenBucket*(): TokenBucket = + TokenBucket.new(maxOperationQuota.int, fullReplenishTime) + +proc setupAction*(wire: EthWireRef) {.async: (raises: [CancelledError]).} = + while true: + let action = await wire.actionQueue.popFirst() + await action() + +proc stop*(wire: EthWireRef) {.async: (raises: [CancelledError]).} = + var waitedFutures = @[ + wire.cleanupHeartbeat.cancelAndWait(), + wire.actionHeartbeat.cancelAndWait(), + ] + + let + timeout = chronos.seconds(5) + completed = await withTimeout(allFutures(waitedFutures), timeout) + if not completed: + trace "Broadcast.stop(): timeout reached", timeout, + futureErrors = waitedFutures.filterIt(it.error != nil).mapIt(it.error.msg) diff --git a/execution_chain/sync/wire_protocol/handler.nim b/execution_chain/sync/wire_protocol/handler.nim index ea8efaa575..ccb14bf305 100644 --- a/execution_chain/sync/wire_protocol/handler.nim +++ b/execution_chain/sync/wire_protocol/handler.nim @@ -15,6 +15,7 @@ import stew/endians2, ./types, ./requester, + ./broadcast, ../../core/[chain, tx_pool], ../../networking/p2p @@ -28,17 +29,25 @@ const # https://github.com/ethereum/devp2p/blob/master/caps/eth.md#getpooledtransactions-0x09 MAX_TXS_SERVE = 256 SOFT_RESPONSE_LIMIT = 2 * 1024 * 1024 + MAX_ACTION_HANDLER = 128 # ------------------------------------------------------------------------------ # Public constructor/destructor # ------------------------------------------------------------------------------ proc new*(_: type EthWireRef, - txPool: TxPoolRef): EthWireRef = - EthWireRef( - chain: txPool.chain, - txPool: txPool + txPool: TxPoolRef, + node: EthereumNode): EthWireRef = + let wire = EthWireRef( + chain : txPool.chain, + txPool: txPool, + node : node, + quota : setupTokenBucket(), + actionQueue : newAsyncQueue[ActionHandler](maxsize = MAX_ACTION_HANDLER), ) + wire.cleanupHeartbeat = setupCleanup(wire) + wire.actionHeartbeat = setupAction(wire) + wire # ------------------------------------------------------------------------------ # Public functions: eth wire protocol handlers @@ -156,18 +165,6 @@ proc getBlockHeaders*(ctx: EthWireRef, move(list) -proc handleAnnouncedTxs*(ctx: EthWireRef, - packet: TransactionsPacket) = - if packet.transactions.len == 0: - return - - debug "received new transactions", - number = packet.transactions.len - - for tx in packet.transactions: - ctx.txPool.addTx(tx).isOkOr: - continue - # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/execution_chain/sync/wire_protocol/requester.nim b/execution_chain/sync/wire_protocol/requester.nim index 26c06eeab1..1b0baca728 100644 --- a/execution_chain/sync/wire_protocol/requester.nim +++ b/execution_chain/sync/wire_protocol/requester.nim @@ -31,41 +31,8 @@ defineProtocol(PROTO = eth68, networkState = EthWireRef) type - StatusPacket* = object - ethVersion*: uint64 - networkId*: NetworkId - totalDifficulty*: DifficultyInt - bestHash*: Hash32 - genesisHash*: Hash32 - forkId*: ChainForkId - - BlockHeadersPacket* = object - headers*: seq[Header] - - BlockBodiesPacket* = object - bodies*: seq[BlockBody] - - PooledTransactionsPacket* = object - transactions*: seq[PooledTransaction] - - ReceiptsPacket* = object - receipts*: seq[seq[Receipt]] - - NewBlockHashesPacket* = object - hashes*: seq[NewBlockHashesAnnounce] - - NewBlockPacket* = object - blk*: EthBlock - totalDifficulty*: DifficultyInt - - TransactionsPacket* = object - transactions*: seq[Transaction] - - NewPooledTransactionHashesPacket* = object - txTypes*: seq[byte] - txSizes*: seq[uint64] - txHashes*: seq[Hash32] - + eth* = eth68 + const StatusMsg* = 0'u64 NewBlockHashesMsg* = 1'u64 diff --git a/execution_chain/sync/wire_protocol/responder.nim b/execution_chain/sync/wire_protocol/responder.nim index cc9583ce96..dae9cd5d5e 100644 --- a/execution_chain/sync/wire_protocol/responder.nim +++ b/execution_chain/sync/wire_protocol/responder.nim @@ -11,9 +11,12 @@ import stint, chronicles, stew/byteutils, + eth/common/transactions_rlp, + ./types, ./handler, ./requester, - ./trace_config, + ./broadcast, + ./trace_config, ../../utils/utils, ../../common/logging, ../../networking/p2p_protocol_dsl, @@ -104,7 +107,7 @@ proc transactionsUserHandler(peer: Peer; packet: TransactionsPacket) {. trace trEthRecvReceived & "Transactions (0x02)", peer, transactions = packet.transactions.len let ctx = peer.networkState(eth68) - ctx.handleAnnouncedTxs(packet) + await ctx.handleTransactionsBroadcast(packet, peer) proc transactionsThunk(peer: Peer; data: Rlp) {. async: (raises: [CancelledError, EthP2PError]).} = @@ -190,6 +193,8 @@ proc newPooledTransactionHashesUserHandler(peer: Peer; packet: NewPooledTransact trace trEthRecvReceived & "NewPooledTransactionHashes (0x08)", peer, txTypes = packet.txTypes.toHex, txSizes = packet.txSizes.toStr, hashes = packet.txHashes.len + let ctx = peer.networkState(eth68) + await ctx.handleTxHashesBroadcast(packet, peer) proc newPooledTransactionHashesThunk(peer: Peer; data: Rlp) {. async: (raises: [CancelledError, EthP2PError]).} = diff --git a/execution_chain/sync/wire_protocol/setup.nim b/execution_chain/sync/wire_protocol/setup.nim index 348d9685f8..9ed6063928 100644 --- a/execution_chain/sync/wire_protocol/setup.nim +++ b/execution_chain/sync/wire_protocol/setup.nim @@ -22,11 +22,13 @@ import proc addEthHandlerCapability*( node: EthereumNode; txPool: TxPoolRef; - ) = + ): EthWireRef = ## Install `eth` handlers. + let wire = EthWireRef.new(txPool, node) node.addCapability( requester.eth68, - EthWireRef.new(txPool)) + wire) + wire # ------------------------------------------------------------------------------ # End diff --git a/execution_chain/sync/wire_protocol/types.nim b/execution_chain/sync/wire_protocol/types.nim index 8886739f73..c92cbbd660 100644 --- a/execution_chain/sync/wire_protocol/types.nim +++ b/execution_chain/sync/wire_protocol/types.nim @@ -11,10 +11,49 @@ {.push raises: [].} import + std/[sets, tables, times], eth/common, - ../../core/[chain, tx_pool] - + chronos, + chronos/ratelimit, + ../../core/[chain, tx_pool], + ../../networking/p2p_types + type + StatusPacket* = object + ethVersion*: uint64 + networkId*: NetworkId + totalDifficulty*: DifficultyInt + bestHash*: Hash32 + genesisHash*: Hash32 + forkId*: ChainForkId + + BlockHeadersPacket* = object + headers*: seq[Header] + + BlockBodiesPacket* = object + bodies*: seq[BlockBody] + + PooledTransactionsPacket* = object + transactions*: seq[PooledTransaction] + + ReceiptsPacket* = object + receipts*: seq[seq[Receipt]] + + NewBlockHashesPacket* = object + hashes*: seq[NewBlockHashesAnnounce] + + NewBlockPacket* = object + blk*: EthBlock + totalDifficulty*: DifficultyInt + + TransactionsPacket* = ref object + transactions*: seq[Transaction] + + NewPooledTransactionHashesPacket* = ref object + txTypes*: seq[byte] + txSizes*: seq[uint64] + txHashes*: seq[Hash32] + NewBlockHashesAnnounce* = object hash*: Hash32 number*: BlockNumber @@ -39,16 +78,27 @@ type maxResults*, skip*: uint reverse*: bool - BlockBodiesRequest* =object + BlockBodiesRequest* = object blockHashes*: seq[Hash32] - PooledTransactionsRequest* =object + PooledTransactionsRequest* = object txHashes*: seq[Hash32] - ReceiptsRequest* =object + ReceiptsRequest* = object blockHashes*: seq[Hash32] + SeenObject* = ref object + lastSeen*: Time + peers*: HashSet[NodeId] + + ActionHandler* = proc(): Future[void] {.async: (raises: [CancelledError]).} + EthWireRef* = ref object of RootRef chain* : ForkedChainRef txPool*: TxPoolRef - \ No newline at end of file + node* : EthereumNode + quota* : TokenBucket + seenTransactions*: Table[Hash32, SeenObject] + cleanupHeartbeat*: Future[void].Raising([CancelledError]) + actionQueue*: AsyncQueue[ActionHandler] + actionHeartbeat*: Future[void].Raising([CancelledError]) diff --git a/hive_integration/nodocker/engine/engine_env.nim b/hive_integration/nodocker/engine/engine_env.nim index ae5b4323e0..33339d161f 100644 --- a/hive_integration/nodocker/engine/engine_env.nim +++ b/hive_integration/nodocker/engine/engine_env.nim @@ -47,6 +47,7 @@ type client : RpcHttpClient txPool : TxPoolRef chain : ForkedChainRef + wire : EthWireRef const baseFolder = "hive_integration/nodocker/engine" @@ -84,8 +85,7 @@ proc newEngineEnv*(conf: var NimbusConf, chainFile: string, enableAuth: bool): E com = makeCom(conf) chain = ForkedChainRef.init(com) txPool = TxPoolRef.new(chain) - - node.addEthHandlerCapability(txPool) + wire = node.addEthHandlerCapability(txPool) var key: JwtSharedKey key.fromHex(jwtSecret).isOkOr: @@ -126,13 +126,15 @@ proc newEngineEnv*(conf: var NimbusConf, chainFile: string, enableAuth: bool): E server : server, client : client, txPool : txPool, - chain : chain + chain : chain, + wire : wire, ) proc close*(env: EngineEnv) = waitFor env.node.closeWait() waitFor env.client.close() waitFor env.server.closeWait() + waitFor env.wire.stop() proc setRealTTD*(env: EngineEnv) = let genesis = env.com.genesisHeader From a6db0a76d1c5a37492da10081d446dbedae33143 Mon Sep 17 00:00:00 2001 From: jangko Date: Thu, 15 May 2025 15:36:02 +0700 Subject: [PATCH 2/8] Resolve merge conflict --- .../sync/wire_protocol/requester.nim | 54 ------------------- execution_chain/sync/wire_protocol/types.nim | 22 +++++++- 2 files changed, 20 insertions(+), 56 deletions(-) diff --git a/execution_chain/sync/wire_protocol/requester.nim b/execution_chain/sync/wire_protocol/requester.nim index 970572571a..17835e47e5 100644 --- a/execution_chain/sync/wire_protocol/requester.nim +++ b/execution_chain/sync/wire_protocol/requester.nim @@ -33,60 +33,6 @@ defineProtocol(PROTO = eth69, peerState = Eth69PeerState, networkState = EthWireRef) -type - Status68Packet* = object - version*: uint64 - networkId*: NetworkId - totalDifficulty*: DifficultyInt - bestHash*: Hash32 - genesisHash*: Hash32 - forkId*: ChainForkId - - # https://github.com/ethereum/devp2p/blob/b0c213de97978053a0f62c3ea4d23c0a3d8784bc/caps/eth.md#status-0x00 - Status69Packet* = object - version*: uint64 - networkId*: NetworkId - genesisHash*: Hash32 - forkId*: ChainForkId - earliest*: uint64 # earliest available full block - latest*: uint64 # latest available full block - latestHash*: Hash32 # hash of latest available full block - - BlockHeadersPacket* = object - headers*: seq[Header] - - BlockBodiesPacket* = object - bodies*: seq[BlockBody] - - PooledTransactionsPacket* = object - transactions*: seq[PooledTransaction] - - ReceiptsPacket* = object - receipts*: seq[seq[Receipt]] - - StoredReceiptsPacket* = object - receipts*: seq[seq[StoredReceipt]] - - NewBlockHashesPacket* = object - hashes*: seq[NewBlockHashesAnnounce] - - NewBlockPacket* = object - blk*: EthBlock - totalDifficulty*: DifficultyInt - - TransactionsPacket* = object - transactions*: seq[Transaction] - - NewPooledTransactionHashesPacket* = object - txTypes*: seq[byte] - txSizes*: seq[uint64] - txHashes*: seq[Hash32] - - BlockRangeUpdatePacket* = object - earliest*: uint64 - latest*: uint64 - latestHash*: Hash32 - const StatusMsg* = 0'u64 NewBlockHashesMsg* = 1'u64 diff --git a/execution_chain/sync/wire_protocol/types.nim b/execution_chain/sync/wire_protocol/types.nim index 55858cbbb7..513cab2823 100644 --- a/execution_chain/sync/wire_protocol/types.nim +++ b/execution_chain/sync/wire_protocol/types.nim @@ -19,14 +19,24 @@ import ../../networking/p2p_types type - StatusPacket* = object - ethVersion*: uint64 + Status68Packet* = object + version*: uint64 networkId*: NetworkId totalDifficulty*: DifficultyInt bestHash*: Hash32 genesisHash*: Hash32 forkId*: ChainForkId + # https://github.com/ethereum/devp2p/blob/b0c213de97978053a0f62c3ea4d23c0a3d8784bc/caps/eth.md#status-0x00 + Status69Packet* = object + version*: uint64 + networkId*: NetworkId + genesisHash*: Hash32 + forkId*: ChainForkId + earliest*: uint64 # earliest available full block + latest*: uint64 # latest available full block + latestHash*: Hash32 # hash of latest available full block + BlockHeadersPacket* = object headers*: seq[Header] @@ -97,6 +107,14 @@ type ReceiptsRequest* = object blockHashes*: seq[Hash32] + StoredReceiptsPacket* = object + receipts*: seq[seq[StoredReceipt]] + + BlockRangeUpdatePacket* = object + earliest*: uint64 + latest*: uint64 + latestHash*: Hash32 + SeenObject* = ref object lastSeen*: Time peers*: HashSet[NodeId] From b46e0537150a816609322d4005f579394185d7bb Mon Sep 17 00:00:00 2001 From: jangko Date: Tue, 20 May 2025 10:34:24 +0700 Subject: [PATCH 3/8] Avoid recompute block hash in getStatus68/69 --- execution_chain/sync/wire_protocol/handler.nim | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/execution_chain/sync/wire_protocol/handler.nim b/execution_chain/sync/wire_protocol/handler.nim index 96bab6fac7..e7e48c5d1b 100644 --- a/execution_chain/sync/wire_protocol/handler.nim +++ b/execution_chain/sync/wire_protocol/handler.nim @@ -56,14 +56,14 @@ proc new*(_: type EthWireRef, proc getStatus68*(ctx: EthWireRef): Eth68State = let com = ctx.chain.com - bestBlock = ctx.chain.latestHeader + bestBlock = ctx.chain.latestHeader txFrame = ctx.chain.baseTxFrame forkId = com.forkId(bestBlock.number, bestBlock.timestamp) Eth68State( totalDifficulty: txFrame.headTotalDifficulty, genesisHash: com.genesisHash, - bestBlockHash: bestBlock.computeBlockHash, + bestBlockHash: ctx.chain.latestHash, forkId: ChainForkId( forkHash: forkId.crc.toBytesBE, forkNext: forkId.nextFork @@ -83,7 +83,7 @@ proc getStatus69*(ctx: EthWireRef): Eth69State = ), earliest: 0, latest: bestBlock.number, - latestHash: bestBlock.computeBlockHash, + latestHash: ctx.chain.latestHash, ) proc getReceipts*(ctx: EthWireRef, From 4469e130bc34e267b0eb1998ab2a9c34ca01d9a6 Mon Sep 17 00:00:00 2001 From: jangko Date: Tue, 20 May 2025 10:34:57 +0700 Subject: [PATCH 4/8] Broadcast tx to both eth/68 or eth/69 --- .../sync/wire_protocol/broadcast.nim | 25 ++++++++++++++++--- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/execution_chain/sync/wire_protocol/broadcast.nim b/execution_chain/sync/wire_protocol/broadcast.nim index 970bcf8659..49e670f80a 100644 --- a/execution_chain/sync/wire_protocol/broadcast.nim +++ b/execution_chain/sync/wire_protocol/broadcast.nim @@ -8,7 +8,7 @@ # those terms. import - std/[tables, sets, times, sequtils], + std/[tables, sets, times, sequtils, random], chronos, chronos/ratelimit, chronicles, @@ -203,11 +203,20 @@ proc handleTransactionsBroadcast*(wire: EthWireRef, numPeers = wire.node.numPeers maxPeers = max(1, numPeers div NUM_PEERS_REBROADCAST_QUOTIENT) - var i = 0 - for peer in wire.node.randomPeersWith(eth68): + var + i = 0 + peers = newSeqOfCap[Peer](numPeers) + + for peer in wire.node.peers: if peer.isNil: continue + if peer.supports(eth68) or peer.supports(eth69): + peers.add peer + + shuffle(peers) + + for peer in peers: if peer.connectionState != ConnectionState.Connected: continue @@ -287,7 +296,15 @@ proc handleTxHashesBroadcast*(wire: EthWireRef, awaitQuota(wire, txPoolProcessCost, "broadcast transactions hashes") - for peer in wire.node.peers(eth68): + var peers = newSeqOfCap[Peer](wire.node.numPeers) + for peer in wire.node.peers: + if peer.isNil: + continue + + if peer.supports(eth68) or peer.supports(eth69): + peers.add peer + + for peer in peers: if peer.connectionState != ConnectionState.Connected: continue From 7b5b3e57712c29001c3781f2af5c620618b85375 Mon Sep 17 00:00:00 2001 From: jangko Date: Tue, 20 May 2025 11:18:00 +0700 Subject: [PATCH 5/8] Broadcast blockRangeUpdate --- .../sync/wire_protocol/broadcast.nim | 119 +++++++++++------- 1 file changed, 77 insertions(+), 42 deletions(-) diff --git a/execution_chain/sync/wire_protocol/broadcast.nim b/execution_chain/sync/wire_protocol/broadcast.nim index 49e670f80a..97c7e76cba 100644 --- a/execution_chain/sync/wire_protocol/broadcast.nim +++ b/execution_chain/sync/wire_protocol/broadcast.nim @@ -19,7 +19,8 @@ import ./requester, ../../networking/p2p, ../../core/tx_pool, - ../../core/eip4844 + ../../core/eip4844, + ../../core/chain/forked_chain logScope: topics = "tx-broadcast" @@ -30,6 +31,8 @@ const NUM_PEERS_REBROADCAST_QUOTIENT = 4 POOLED_STORAGE_TIME_LIMIT = initDuration(minutes = 20) cleanupTicker = chronos.minutes(5) + # https://github.com/ethereum/devp2p/blob/b0c213de97978053a0f62c3ea4d23c0a3d8784bc/caps/eth.md#blockrangeupdate-0x11 + blockRangeUpdateTicker = chronos.minutes(2) template awaitQuota(bcParam: EthWireRef, costParam: float, protocolIdParam: string) = let @@ -63,10 +66,47 @@ const txPoolProcessCost = allowedOpsPerSecondCost(1000) hashLookupCost = allowedOpsPerSecondCost(2000) hashingCost = allowedOpsPerSecondCost(5000) + blockRangeUpdateCost = allowedOpsPerSecondCost(20) func add(seen: SeenObject, peer: Peer) = seen.peers.incl(peer.id) +iterator peers(wire: EthWireRef, random: bool = false): Peer = + var peers = newSeqOfCap[Peer](wire.node.numPeers) + for peer in wire.node.peers: + if peer.isNil: + continue + + if peer.supports(eth68) or peer.supports(eth69): + peers.add peer + + if random: + shuffle(peers) + + for peer in peers: + if peer.connectionState != ConnectionState.Connected: + continue + + yield peer + +iterator peers69OrLater(wire: EthWireRef, random: bool = false): Peer = + var peers = newSeqOfCap[Peer](wire.node.numPeers) + for peer in wire.node.peers: + if peer.isNil: + continue + + if peer.supports(eth69): + peers.add peer + + if random: + shuffle(peers) + + for peer in peers: + if peer.connectionState != ConnectionState.Connected: + continue + + yield peer + proc seenByPeer(wire: EthWireRef, packet: NewPooledTransactionHashesPacket, peer: Peer) {.async: (raises: [CancelledError]).} = @@ -203,23 +243,8 @@ proc handleTransactionsBroadcast*(wire: EthWireRef, numPeers = wire.node.numPeers maxPeers = max(1, numPeers div NUM_PEERS_REBROADCAST_QUOTIENT) - var - i = 0 - peers = newSeqOfCap[Peer](numPeers) - - for peer in wire.node.peers: - if peer.isNil: - continue - - if peer.supports(eth68) or peer.supports(eth69): - peers.add peer - - shuffle(peers) - - for peer in peers: - if peer.connectionState != ConnectionState.Connected: - continue - + var i = 0 + for peer in wire.peers(random = true): if i < maxPeers: await wire.broadcastTransactions(newPacket, hashes, peer) else: @@ -296,34 +321,44 @@ proc handleTxHashesBroadcast*(wire: EthWireRef, awaitQuota(wire, txPoolProcessCost, "broadcast transactions hashes") - var peers = newSeqOfCap[Peer](wire.node.numPeers) - for peer in wire.node.peers: - if peer.isNil: - continue - - if peer.supports(eth68) or peer.supports(eth69): - peers.add peer - - for peer in peers: - if peer.connectionState != ConnectionState.Connected: - continue - + for peer in wire.peers: await wire.broadcastTxHashes(hashes, peer) proc setupCleanup*(wire: EthWireRef) {.async: (raises: [CancelledError]).} = while true: - await sleepAsync(cleanupTicker) - - wire.reqisterAction("Periodical cleanup"): - var expireds: seq[Hash32] - for key, seen in wire.seenTransactions: - if getTime() - seen.lastSeen > POOLED_STORAGE_TIME_LIMIT: - expireds.add key - awaitQuota(wire, hashLookupCost, "broadcast transactions hashes") - - for expire in expireds: - wire.seenTransactions.del(expire) - awaitQuota(wire, hashLookupCost, "broadcast transactions hashes") + let + cleanup = sleepAsync(cleanupTicker) + update = sleepAsync(blockRangeUpdateTicker) + res = await one(cleanup, update) + + if res == cleanup: + wire.reqisterAction("Periodical cleanup"): + var expireds: seq[Hash32] + for key, seen in wire.seenTransactions: + if getTime() - seen.lastSeen > POOLED_STORAGE_TIME_LIMIT: + expireds.add key + awaitQuota(wire, hashLookupCost, "broadcast transactions hashes") + + for expire in expireds: + wire.seenTransactions.del(expire) + awaitQuota(wire, hashLookupCost, "broadcast transactions hashes") + + if res == update: + wire.reqisterAction("Periodical blockRangeUpdate"): + let + packet = BlockRangeUpdatePacket( + earliest: 0, + latest: wire.chain.latestNumber, + latestHash: wire.chain.latestHash, + ) + + for peer in wire.peers69OrLater: + try: + await peer.blockRangeUpdate(packet) + except EthP2PError as exc: + debug "broadcast block range update failed", + msg=exc.msg + awaitQuota(wire, blockRangeUpdateCost, "broadcast blockRangeUpdate") proc setupTokenBucket*(): TokenBucket = TokenBucket.new(maxOperationQuota.int, fullReplenishTime) From 670f3774471bbf48d64d9fbec4a4e41adbc0bd98 Mon Sep 17 00:00:00 2001 From: jangko Date: Tue, 20 May 2025 12:16:09 +0700 Subject: [PATCH 6/8] Disconnect peers with breach of protocol reason --- execution_chain/sync/wire_protocol/broadcast.nim | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/execution_chain/sync/wire_protocol/broadcast.nim b/execution_chain/sync/wire_protocol/broadcast.nim index 97c7e76cba..50649503c7 100644 --- a/execution_chain/sync/wire_protocol/broadcast.nim +++ b/execution_chain/sync/wire_protocol/broadcast.nim @@ -225,7 +225,9 @@ proc handleTransactionsBroadcast*(wire: EthWireRef, for tx in packet.transactions: if tx.txType == TxEip4844: # Disallow blob transaction broadcast - await peer.disconnect(ClientQuitting) + debug "Protocol Breach: Peer broadcast blob transaction", + remote=peer.remote, clientId=peer.clientId + await peer.disconnect(BreachOfProtocol) return wire.txPool.addTx(tx).isOkOr: @@ -302,12 +304,14 @@ proc handleTxHashesBroadcast*(wire: EthWireRef, # in the header, disconnect from the sending peer. if tx.tx.txType == TxEip4844: if tx.networkPayload.isNil: - debug "Received sidecar-less blob transaction", peer - await peer.disconnect(ClientQuitting) + debug "Protocol Breach: Received sidecar-less blob transaction", + remote=peer.remote, clientId=peer.clientId + await peer.disconnect(BreachOfProtocol) return validateBlobTransactionWrapper(tx).isOkOr: - debug "Sidecar validation error", msg=error - await peer.disconnect(ClientQuitting) + debug "Protocol Breach: Sidecar validation error", msg=error, + remote=peer.remote, clientId=peer.clientId + await peer.disconnect(BreachOfProtocol) return wire.txPool.addTx(tx).isOkOr: From e31f07de4f71185b1524921aaaa666919bf23575 Mon Sep 17 00:00:00 2001 From: jangko Date: Tue, 10 Jun 2025 18:33:10 +0700 Subject: [PATCH 7/8] Update blockRangeUpdateThunk and newPooledTransactionHashesThunk --- execution_chain/sync/wire_protocol/responder.nim | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/execution_chain/sync/wire_protocol/responder.nim b/execution_chain/sync/wire_protocol/responder.nim index 3093f50c53..9172df6b75 100644 --- a/execution_chain/sync/wire_protocol/responder.nim +++ b/execution_chain/sync/wire_protocol/responder.nim @@ -204,21 +204,23 @@ proc newBlockThunk[PROTO](peer: Peer; data: Rlp) {. await newBlockUserHandler(peer, packet) -proc newPooledTransactionHashesUserHandler(peer: Peer; packet: NewPooledTransactionHashesPacket) {. +proc newPooledTransactionHashesUserHandler(peer: Peer; + wire: EthWireRef; + packet: NewPooledTransactionHashesPacket) {. async: (raises: [CancelledError, EthP2PError]).} = when trEthTraceGossipOk: trace trEthRecvReceived & "NewPooledTransactionHashes (0x08)", peer, txTypes = packet.txTypes.toHex, txSizes = packet.txSizes.toStr, hashes = packet.txHashes.len - let ctx = peer.networkState(eth68) - await ctx.handleTxHashesBroadcast(packet, peer) + await wire.handleTxHashesBroadcast(packet, peer) proc newPooledTransactionHashesThunk[PROTO](peer: Peer; data: Rlp) {. async: (raises: [CancelledError, EthP2PError]).} = PROTO.rlpxWithPacketHandler(NewPooledTransactionHashesPacket, peer, data, [txTypes, txSizes, txHashes]): - await newPooledTransactionHashesUserHandler(peer, packet) + let wire = peer.networkState(PROTO) + await newPooledTransactionHashesUserHandler(peer, wire, packet) proc getPooledTransactionsUserHandler[PROTO](response: Responder; @@ -300,9 +302,9 @@ proc blockRangeUpdateUserHandler(peer: Peer; packet: BlockRangeUpdatePacket) {. peer.state(eth69).latest = packet.latest peer.state(eth69).latestHash = packet.latestHash -proc blockRangeUpdateThunk(peer: Peer; data: Rlp) {. +proc blockRangeUpdateThunk[PROTO](peer: Peer; data: Rlp) {. async: (raises: [CancelledError, EthP2PError]).} = - eth68.rlpxWithPacketHandler(BlockRangeUpdatePacket, + PROTO.rlpxWithPacketHandler(BlockRangeUpdatePacket, peer, data, [earliest, latest, latestHash]): await blockRangeUpdateUserHandler(peer, packet) @@ -447,7 +449,7 @@ proc eth69Registration() = status69Thunk, Status69Packet) registerCommonThunk(protocol, eth69) registerMsg(protocol, BlockRangeUpdateMsg, "blockRangeUpdate", - blockRangeUpdateThunk, BlockRangeUpdatePacket) + blockRangeUpdateThunk[eth69], BlockRangeUpdatePacket) registerProtocol(protocol) eth68Registration() From 26bfaef4145efe97da0c6ab9a9596c1d9bd1a7e8 Mon Sep 17 00:00:00 2001 From: jangko Date: Wed, 11 Jun 2025 13:55:34 +0700 Subject: [PATCH 8/8] Remove unused spaces --- execution_chain/sync/wire_protocol/broadcast.nim | 2 -- 1 file changed, 2 deletions(-) diff --git a/execution_chain/sync/wire_protocol/broadcast.nim b/execution_chain/sync/wire_protocol/broadcast.nim index 5c4565deb6..60e9e10d2f 100644 --- a/execution_chain/sync/wire_protocol/broadcast.nim +++ b/execution_chain/sync/wire_protocol/broadcast.nim @@ -97,7 +97,6 @@ iterator peers69OrLater(wire: EthWireRef, random: bool = false): Peer = for peer in wire.node.peers: if peer.isNil: continue - if peer.supports(eth69): peers.add peer @@ -107,7 +106,6 @@ iterator peers69OrLater(wire: EthWireRef, random: bool = false): Peer = for peer in peers: if peer.connectionState != ConnectionState.Connected: continue - yield peer proc seenByPeer(wire: EthWireRef,