Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 21 additions & 86 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package fetcher

import (
"errors"
"fmt"
"math"
mrand "math/rand"
"sort"
Expand Down Expand Up @@ -104,7 +103,6 @@ var (
txFetcherWaitingPeers = metrics.NewRegisteredGauge("eth/fetcher/transaction/waiting/peers", nil)
txFetcherWaitingHashes = metrics.NewRegisteredGauge("eth/fetcher/transaction/waiting/hashes", nil)
txFetcherQueueingPeers = metrics.NewRegisteredGauge("eth/fetcher/transaction/queueing/peers", nil)
txFetcherQueueingHashes = metrics.NewRegisteredGauge("eth/fetcher/transaction/queueing/hashes", nil)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to keep this metric somehow

Copy link
Contributor Author

@healthykim healthykim Oct 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought the current change resolves this comment but it was not. I made this unresolved again. I'll take a look

txFetcherFetchingPeers = metrics.NewRegisteredGauge("eth/fetcher/transaction/fetching/peers", nil)
txFetcherFetchingHashes = metrics.NewRegisteredGauge("eth/fetcher/transaction/fetching/hashes", nil)
)
Expand Down Expand Up @@ -187,17 +185,15 @@ type TxFetcher struct {
waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist
waitslots map[string]map[common.Hash]*txMetadataWithSeq // Waiting announcements grouped by peer (DoS protection)

// Stage 2: Queue of transactions that waiting to be allocated to some peer
// to be retrieved directly.
// Stage 2: Transactions that are either waiting to be allocated
// to a peer or are already being fetched.
announces map[string]map[common.Hash]*txMetadataWithSeq // Set of announced transactions, grouped by origin peer
announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash

// Stage 3: Set of transactions currently being retrieved, some which may be
// fulfilled and some rescheduled. Note, this step shares 'announces' from the
// previous stage to avoid having to duplicate (need it for DoS checks).
fetching map[common.Hash]string // Transaction set currently being retrieved
requests map[string]*txRequest // In-flight transaction retrievals
alternates map[common.Hash]map[string]struct{} // In-flight transaction alternate origins if retrieval fails
fetching map[common.Hash]string // Transaction set currently being retrieved
requests map[string]*txRequest // In-flight transaction retrievals

// Callbacks
hasTx func(common.Hash) bool // Retrieves a tx from the local txpool
Expand Down Expand Up @@ -231,10 +227,8 @@ func NewTxFetcherForTests(
waittime: make(map[common.Hash]mclock.AbsTime),
waitslots: make(map[string]map[common.Hash]*txMetadataWithSeq),
announces: make(map[string]map[common.Hash]*txMetadataWithSeq),
announced: make(map[common.Hash]map[string]struct{}),
fetching: make(map[common.Hash]string),
requests: make(map[string]*txRequest),
alternates: make(map[common.Hash]map[string]struct{}),
underpriced: lru.NewCache[common.Hash, time.Time](maxTxUnderpricedSetSize),
hasTx: hasTx,
addTxs: addTxs,
Expand Down Expand Up @@ -457,33 +451,9 @@ func (f *TxFetcher) loop() {
}
)
for i, hash := range ann.hashes {
// If the transaction is already downloading, add it to the list
// of possible alternates (in case the current retrieval fails) and
// also account it for the peer.
if f.alternates[hash] != nil {
f.alternates[hash][ann.origin] = struct{}{}

// Stage 2 and 3 share the set of origins per tx
if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = &txMetadataWithSeq{
txMetadata: ann.metas[i],
seq: nextSeq(),
}
} else {
f.announces[ann.origin] = map[common.Hash]*txMetadataWithSeq{
hash: {
txMetadata: ann.metas[i],
seq: nextSeq(),
},
}
}
continue
}
// If the transaction is not downloading, but is already queued
// from a different peer, track it for the new peer too.
if f.announced[hash] != nil {
f.announced[hash][ann.origin] = struct{}{}

// If the transaction is already downloading or queued from a different peer,
// track it for the new peer
if f.announced(hash) {
// Stage 2 and 3 share the set of origins per tx
if announces := f.announces[ann.origin]; announces != nil {
announces[hash] = &txMetadataWithSeq{
Expand Down Expand Up @@ -564,15 +534,14 @@ func (f *TxFetcher) loop() {

case <-waitTrigger:
// At least one transaction's waiting time ran out, push all expired
// ones into the retrieval queues
// ones into the announces
actives := make(map[string]struct{})
for hash, instance := range f.waittime {
if time.Duration(f.clock.Now()-instance)+txGatherSlack > txArriveTimeout {
// Transaction expired without propagation, schedule for retrieval
if f.announced[hash] != nil {
panic("announce tracker already contains waitlist item")
if f.announced(hash) {
panic("announced tracker already contains waitlist item")
}
f.announced[hash] = f.waitlist[hash]
for peer := range f.waitlist[hash] {
if announces := f.announces[peer]; announces != nil {
announces[hash] = f.waitslots[peer][hash]
Expand Down Expand Up @@ -616,18 +585,7 @@ func (f *TxFetcher) loop() {
}
}
// Move the delivery back from fetching to queued
if _, ok := f.announced[hash]; ok {
panic("announced tracker already contains alternate item")
}
if f.alternates[hash] != nil { // nil if tx was broadcast during fetch
f.announced[hash] = f.alternates[hash]
}
delete(f.announced[hash], peer)
if len(f.announced[hash]) == 0 {
delete(f.announced, hash)
}
delete(f.announces[peer], hash)
delete(f.alternates, hash)
delete(f.fetching, hash)
}
if len(f.announces[peer]) == 0 {
Expand Down Expand Up @@ -699,8 +657,6 @@ func (f *TxFetcher) loop() {
delete(f.announces, peer)
}
}
delete(f.announced, hash)
delete(f.alternates, hash)

// If a transaction currently being fetched from a different
// origin was delivered (delivery stolen), mark it so the
Expand Down Expand Up @@ -752,20 +708,12 @@ func (f *TxFetcher) loop() {
}
if _, ok := delivered[hash]; !ok {
if i < cutoff {
delete(f.alternates[hash], delivery.origin)
delete(f.announces[delivery.origin], hash)
if len(f.announces[delivery.origin]) == 0 {
delete(f.announces, delivery.origin)
}
}
if len(f.alternates[hash]) > 0 {
if _, ok := f.announced[hash]; ok {
panic(fmt.Sprintf("announced tracker already contains alternate item: %v", f.announced[hash]))
}
f.announced[hash] = f.alternates[hash]
}
}
delete(f.alternates, hash)
delete(f.fetching, hash)
}
// Something was delivered, try to reschedule requests
Expand Down Expand Up @@ -797,28 +745,13 @@ func (f *TxFetcher) loop() {
continue
}
}
// Undelivered hash, reschedule if there's an alternative origin available
delete(f.alternates[hash], drop.peer)
if len(f.alternates[hash]) == 0 {
delete(f.alternates, hash)
} else {
f.announced[hash] = f.alternates[hash]
delete(f.alternates, hash)
}
delete(f.fetching, hash)
}
delete(f.requests, drop.peer)
}
// Clean up general announcement tracking
if _, ok := f.announces[drop.peer]; ok {
for hash := range f.announces[drop.peer] {
delete(f.announced[hash], drop.peer)
if len(f.announced[hash]) == 0 {
delete(f.announced, hash)
}
}
delete(f.announces, drop.peer)
}
delete(f.announces, drop.peer)

// If a request was cancelled, check if anything needs to be rescheduled
if request != nil {
f.scheduleFetches(timeoutTimer, timeoutTrigger, nil)
Expand All @@ -832,7 +765,6 @@ func (f *TxFetcher) loop() {
txFetcherWaitingPeers.Update(int64(len(f.waitslots)))
txFetcherWaitingHashes.Update(int64(len(f.waitlist)))
txFetcherQueueingPeers.Update(int64(len(f.announces) - len(f.requests)))
txFetcherQueueingHashes.Update(int64(len(f.announced)))
txFetcherFetchingPeers.Update(int64(len(f.requests)))
txFetcherFetchingHashes.Update(int64(len(f.fetching)))

Expand Down Expand Up @@ -942,12 +874,6 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{},
// Mark the hash as fetching and stash away possible alternates
f.fetching[hash] = peer

if _, ok := f.alternates[hash]; ok {
panic(fmt.Sprintf("alternate tracker already contains fetching item: %v", f.alternates[hash]))
}
f.alternates[hash] = f.announced[hash]
delete(f.announced, hash)

// Accumulate the hash and stop if the limit was reached
hashes = append(hashes, hash)
if len(hashes) >= maxTxRetrievals {
Expand Down Expand Up @@ -1034,3 +960,12 @@ func rotateStrings(slice []string, n int) {
slice[i] = orig[(i+n)%len(orig)]
}
}

func (f *TxFetcher) announced(hash common.Hash) bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We rely on the announces map[string]map[common.Hash]*txMetadataWithSeq for determining whether the tx is announced or not.

You are right that the map iteration should be fast enough, especially the size of peer set is very limited. But we should also consider that this function will be invoked for every tx announcement.

However, another downside is we lose the ability to know how many transactions are in the announced status.

An alternative approach will be maintaining the map like

announced map[common.Hash]struct{}.

for _, hashes := range f.announces {
if hashes[hash] != nil {
return true
}
}
return false
}
57 changes: 3 additions & 54 deletions eth/fetcher/tx_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ func TestTransactionFetcherSingletonRequesting(t *testing.T) {
{common.Hash{0x06}, types.LegacyTxType, 666},
},
}),
// Step 14
isScheduled{
tracking: map[string][]announce{
"A": {
Expand Down Expand Up @@ -2017,6 +2018,7 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
t.Errorf("step %d: peer %s extra in announces", i, peer)
}
}

// Check that all announces required to be fetching are in the
// appropriate sets
for peer, hashes := range step.fetching {
Expand Down Expand Up @@ -2062,31 +2064,6 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
t.Errorf("step %d: hash %x extra in fetching", i, hash)
}
}
for _, hashes := range step.fetching {
for _, hash := range hashes {
alternates := fetcher.alternates[hash]
if alternates == nil {
t.Errorf("step %d: hash %x missing from alternates", i, hash)
continue
}
for peer := range alternates {
if _, ok := fetcher.announces[peer]; !ok {
t.Errorf("step %d: peer %s extra in alternates", i, peer)
continue
}
if _, ok := fetcher.announces[peer][hash]; !ok {
t.Errorf("step %d, peer %s: hash %x extra in alternates", i, hash, peer)
continue
}
}
for p := range fetcher.announced[hash] {
if _, ok := alternates[p]; !ok {
t.Errorf("step %d, hash %x: peer %s missing from alternates", i, hash, p)
continue
}
}
}
}
for peer, hashes := range step.dangling {
request := fetcher.requests[peer]
if request == nil {
Expand All @@ -2104,34 +2081,6 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
}
}
}
// Check that all transaction announces that are scheduled for
// retrieval but not actively being downloaded are tracked only
// in the stage 2 `announced` map.
var queued []common.Hash
for _, announces := range step.tracking {
for _, ann := range announces {
var found bool
for _, hs := range step.fetching {
if slices.Contains(hs, ann.hash) {
found = true
break
}
}
if !found {
queued = append(queued, ann.hash)
}
}
}
for _, hash := range queued {
if _, ok := fetcher.announced[hash]; !ok {
t.Errorf("step %d: hash %x missing from announced", i, hash)
}
}
for hash := range fetcher.announced {
if !slices.Contains(queued, hash) {
t.Errorf("step %d: hash %x extra in announced", i, hash)
}
}

case isUnderpriced:
if fetcher.underpriced.Len() != int(step) {
Expand All @@ -2144,7 +2093,7 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) {
// After every step, cross validate the internal uniqueness invariants
// between stage one and stage two.
for hash := range fetcher.waittime {
if _, ok := fetcher.announced[hash]; ok {
if fetcher.announced(hash) {
t.Errorf("step %d: hash %s present in both stage 1 and 2", i, hash)
}
}
Expand Down