Skip to content

Implement transactions broadcast #3214

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 24 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
7506ef4
Implement transactions broadcast
jangko Mar 13, 2025
f006c20
Merge branch 'master' into broadcast
jangko Apr 21, 2025
1ba79ae
Merge branch 'master' into broadcast
jangko Apr 22, 2025
c9311bc
Merge branch 'master' into broadcast
jangko Apr 23, 2025
dcf4c70
Merge branch 'master' into broadcast
jangko May 8, 2025
6a70b44
Merge branch 'master' into broadcast
jangko May 15, 2025
a6db0a7
Resolve merge conflict
jangko May 15, 2025
2be1675
Merge branch 'master' into broadcast
jangko May 20, 2025
b46e053
Avoid recompute block hash in getStatus68/69
jangko May 20, 2025
4469e13
Broadcast tx to both eth/68 or eth/69
jangko May 20, 2025
7b5b3e5
Broadcast blockRangeUpdate
jangko May 20, 2025
670f377
Disconnect peers with breach of protocol reason
jangko May 20, 2025
89c300d
Merge branch 'master' into broadcast
jangko May 28, 2025
0c37d38
Merge branch 'master' into broadcast
jangko Jun 4, 2025
8736fb7
Merge branch 'master' into broadcast
jangko Jun 10, 2025
e31f07d
Update blockRangeUpdateThunk and newPooledTransactionHashesThunk
jangko Jun 10, 2025
91014b5
Merge branch 'master' into broadcast
jangko Jun 11, 2025
26bfaef
Remove unused spaces
jangko Jun 11, 2025
dc3f463
Merge branch 'master' into broadcast
jangko Jun 17, 2025
d7c17f3
Merge branch 'master' into broadcast
jangko Jun 17, 2025
25274b3
Merge branch 'master' into broadcast
jangko Jul 3, 2025
f2c63b0
Merge branch 'master' into broadcast
jangko Jul 10, 2025
6643d1a
Merge branch 'master' into broadcast
jangko Jul 24, 2025
31790fb
Merge branch 'master' into broadcast
jangko Aug 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 165 additions & 0 deletions execution_chain/sync/wire_protocol/broadcast.nim
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ logScope:
const
maxOperationQuota = 1000000
fullReplenishTime = chronos.seconds(5)
NUM_PEERS_REBROADCAST_QUOTIENT = 4
POOLED_STORAGE_TIME_LIMIT = initDuration(minutes = 20)
cleanupTicker = chronos.minutes(5)
# https://github.com/ethereum/devp2p/blob/b0c213de97978053a0f62c3ea4d23c0a3d8784bc/caps/eth.md#blockrangeupdate-0x11
Expand Down Expand Up @@ -67,8 +68,30 @@ func allowedOpsPerSecondCost(n: int): float =
const
txPoolProcessCost = allowedOpsPerSecondCost(1000)
hashLookupCost = allowedOpsPerSecondCost(2000)
hashingCost = allowedOpsPerSecondCost(5000)
blockRangeUpdateCost = allowedOpsPerSecondCost(20)

func add(seen: SeenObject, peer: Peer) =
seen.peers.incl(peer.id)

iterator peers(wire: EthWireRef, random: bool = false): Peer =
var peers = newSeqOfCap[Peer](wire.node.numPeers)
for peer in wire.node.peers:
if peer.isNil:
continue

if peer.supports(eth68) or peer.supports(eth69):
peers.add peer

if random:
shuffle(peers)

for peer in peers:
if peer.connectionState != ConnectionState.Connected:
continue

yield peer

iterator peers69OrLater(wire: EthWireRef, random: bool = false): Peer =
var peers = newSeqOfCap[Peer](wire.node.numPeers)
for peer in wire.node.peers(eth69):
Expand All @@ -77,11 +100,112 @@ iterator peers69OrLater(wire: EthWireRef, random: bool = false): Peer =
peers.add peer
if random:
shuffle(peers)

for peer in peers:
if peer.connectionState != ConnectionState.Connected:
continue
yield peer

proc seenByPeer(wire: EthWireRef,
packet: NewPooledTransactionHashesPacket,
peer: Peer) {.async: (raises: [CancelledError]).} =
for hash in packet.txHashes:
var seen = wire.seenTransactions.getOrDefault(hash, nil)
if seen.isNil:
seen = SeenObject(
lastSeen: getTime(),
)
seen.add peer
wire.seenTransactions[hash] = seen
else:
seen.add peer

awaitQuota(wire, hashLookupCost, "seen by peer")

proc broadcastTransactions(wire: EthWireRef,
packet: TransactionsPacket,
hashes: NewPooledTransactionHashesPacket ,
peer: Peer) {.async: (raises: [CancelledError]).} =
# This is used to avoid re-sending along newPooledTransactionHashes
# announcements/re-broadcasts

var msg = newSeqOfCap[Transaction](packet.transactions.len)
for i, hash in hashes.txHashes:
var seen = wire.seenTransactions.getOrDefault(hash, nil)
if seen.isNil:
seen = SeenObject(
lastSeen: getTime(),
)
seen.add peer
wire.seenTransactions[hash] = seen
msg.add packet.transactions[i]
elif peer.id notin seen.peers:
seen.add peer
msg.add packet.transactions[i]

awaitQuota(wire, hashLookupCost, "broadcast transactions")

try:
await peer.transactions(msg)
except EthP2PError as exc:
debug "broadcast transactions failed",
msg=exc.msg

proc prepareTxHashesAnnouncement(wire: EthWireRef, packet: TransactionsPacket):
Future[NewPooledTransactionHashesPacket]
{.async: (raises: [CancelledError]).} =
let len = packet.transactions.len
var ann = NewPooledTransactionHashesPacket(
txTypes : newSeqOfCap[byte](len),
txSizes : newSeqOfCap[uint64](len),
txHashes: newSeqOfCap[Hash32](len),
)
for tx in packet.transactions:
let (size, hash) = getEncodedLengthAndHash(tx)
ann.txTypes.add tx.txType.byte
ann.txSizes.add size.uint64
ann.txHashes.add hash

awaitQuota(wire, hashingCost, "broadcast transactions")

ann

proc broadcastTxHashes(wire: EthWireRef,
hashes: NewPooledTransactionHashesPacket,
peer: Peer) {.async: (raises: [CancelledError]).} =
let len = hashes.txHashes.len
var msg = NewPooledTransactionHashesPacket(
txTypes : newSeqOfCap[byte](len),
txSizes : newSeqOfCap[uint64](len),
txHashes: newSeqOfCap[Hash32](len),
)

template copyFrom(msg, hashes, i) =
msg.txTypes.add hashes.txTypes[i]
msg.txSizes.add hashes.txSizes[i]
msg.txHashes.add hashes.txHashes[i]

for i, hash in hashes.txHashes:
var seen = wire.seenTransactions.getOrDefault(hash, nil)
if seen.isNil:
seen = SeenObject(
lastSeen: getTime(),
)
seen.add peer
wire.seenTransactions[hash] = seen
msg.copyFrom(hashes, i)
elif peer.id notin seen.peers:
seen.add peer
msg.copyFrom(hashes, i)

awaitQuota(wire, hashLookupCost, "broadcast transactions hashes")

try:
await peer.newPooledTransactionHashes(msg.txTypes, msg.txSizes, msg.txHashes)
except EthP2PError as exc:
debug "broadcast tx hashes failed",
msg=exc.msg

proc syncerRunning*(wire: EthWireRef): bool =
# Disable transactions gossip and processing when
# the syncer is still busy
Expand Down Expand Up @@ -111,6 +235,11 @@ proc handleTransactionsBroadcast*(wire: EthWireRef,
debug "Received new transactions",
number = packet.transactions.len

# Don't rebroadcast invalid transactions
let newPacket = TransactionsPacket(
transactions: newSeqOfCap[Transaction](packet.transactions.len)
)

wire.reqisterAction("TxPool consume incoming transactions"):
for tx in packet.transactions:
if tx.txType == TxEip4844:
Expand All @@ -123,8 +252,26 @@ proc handleTransactionsBroadcast*(wire: EthWireRef,
wire.txPool.addTx(tx).isOkOr:
continue

# Only rebroadcast good transactions
newPacket.transactions.add tx
awaitQuota(wire, txPoolProcessCost, "adding into txpool")

wire.reqisterAction("Broadcast transactions or hashes"):
let hashes = await wire.prepareTxHashesAnnouncement(newPacket)
await wire.seenByPeer(hashes, peer)

let
numPeers = wire.node.numPeers
maxPeers = max(1, numPeers div NUM_PEERS_REBROADCAST_QUOTIENT)

var i = 0
for peer in wire.peers(random = true):
if i < maxPeers:
await wire.broadcastTransactions(newPacket, hashes, peer)
else:
await wire.broadcastTxHashes(hashes, peer)
inc i

proc handleTxHashesBroadcast*(wire: EthWireRef,
packet: NewPooledTransactionHashesPacket,
peer: Peer) {.async: (raises: [CancelledError]).} =
Expand Down Expand Up @@ -152,12 +299,19 @@ proc handleTxHashesBroadcast*(wire: EthWireRef,
size: uint64
txType: byte

await wire.seenByPeer(packet, peer)

let
numTx = packet.txHashes.len

var
i = 0
map: Table[Hash32, SizeType]
hashes = NewPooledTransactionHashesPacket(
txTypes : newSeqOfCap[byte](numTx),
txSizes : newSeqOfCap[uint64](numTx),
txHashes: newSeqOfCap[Hash32](numTx),
)

while i < numTx:
var
Expand All @@ -178,6 +332,10 @@ proc handleTxHashesBroadcast*(wire: EthWireRef,
size: size,
txType: packet.txTypes[i],
)
else:
hashes.txTypes.add packet.txTypes[i]
hashes.txSizes.add size
hashes.txHashes.add txHash

awaitQuota(wire, hashLookupCost, "check transaction exists in pool")
inc i
Expand Down Expand Up @@ -245,8 +403,15 @@ proc handleTxHashesBroadcast*(wire: EthWireRef,
wire.txPool.addTx(tx).isOkOr:
continue

hashes.txTypes.add tx.tx.txType.byte
hashes.txSizes.add size.uint64
hashes.txHashes.add hash

awaitQuota(wire, txPoolProcessCost, "broadcast transactions hashes")

for peer in wire.peers:
await wire.broadcastTxHashes(hashes, peer)

proc tickerLoop*(wire: EthWireRef) {.async: (raises: [CancelledError]).} =
while true:
# Create or replenish timer
Expand Down
Loading