From cc985aad1b98a3953b70855f16da4204ea85cb2d Mon Sep 17 00:00:00 2001 From: healthykim Date: Mon, 29 Sep 2025 15:30:04 +0900 Subject: [PATCH 1/3] refactor: remove alternates map --- eth/fetcher/tx_fetcher.go | 75 +----- eth/fetcher/tx_fetcher_test.go | 433 +++++++++++++++++---------------- 2 files changed, 238 insertions(+), 270 deletions(-) diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 3e050320e90..34fddcccbd4 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -18,7 +18,6 @@ package fetcher import ( "errors" - "fmt" "math" mrand "math/rand" "sort" @@ -195,9 +194,8 @@ type TxFetcher struct { // Stage 3: Set of transactions currently being retrieved, some which may be // fulfilled and some rescheduled. Note, this step shares 'announces' from the // previous stage to avoid having to duplicate (need it for DoS checks). - fetching map[common.Hash]string // Transaction set currently being retrieved - requests map[string]*txRequest // In-flight transaction retrievals - alternates map[common.Hash]map[string]struct{} // In-flight transaction alternate origins if retrieval fails + fetching map[common.Hash]string // Transaction set currently being retrieved + requests map[string]*txRequest // In-flight transaction retrievals // Callbacks hasTx func(common.Hash) bool // Retrieves a tx from the local txpool @@ -234,7 +232,6 @@ func NewTxFetcherForTests( announced: make(map[common.Hash]map[string]struct{}), fetching: make(map[common.Hash]string), requests: make(map[string]*txRequest), - alternates: make(map[common.Hash]map[string]struct{}), underpriced: lru.NewCache[common.Hash, time.Time](maxTxUnderpricedSetSize), hasTx: hasTx, addTxs: addTxs, @@ -457,12 +454,9 @@ func (f *TxFetcher) loop() { } ) for i, hash := range ann.hashes { - // If the transaction is already downloading, add it to the list - // of possible alternates (in case the current retrieval fails) and - // also account it for the peer. - if f.alternates[hash] != nil { - f.alternates[hash][ann.origin] = struct{}{} - + // If the transaction is already downloading or queued from a different peer, + // track it for the new peer + if _, ok := f.announced[hash]; ok { // Stage 2 and 3 share the set of origins per tx if announces := f.announces[ann.origin]; announces != nil { announces[hash] = &txMetadataWithSeq{ @@ -477,27 +471,8 @@ func (f *TxFetcher) loop() { }, } } - continue - } - // If the transaction is not downloading, but is already queued - // from a different peer, track it for the new peer too. - if f.announced[hash] != nil { f.announced[hash][ann.origin] = struct{}{} - // Stage 2 and 3 share the set of origins per tx - if announces := f.announces[ann.origin]; announces != nil { - announces[hash] = &txMetadataWithSeq{ - txMetadata: ann.metas[i], - seq: nextSeq(), - } - } else { - f.announces[ann.origin] = map[common.Hash]*txMetadataWithSeq{ - hash: { - txMetadata: ann.metas[i], - seq: nextSeq(), - }, - } - } continue } // If the transaction is already known to the fetcher, but not @@ -564,13 +539,13 @@ func (f *TxFetcher) loop() { case <-waitTrigger: // At least one transaction's waiting time ran out, push all expired - // ones into the retrieval queues + // ones into the announces actives := make(map[string]struct{}) for hash, instance := range f.waittime { if time.Duration(f.clock.Now()-instance)+txGatherSlack > txArriveTimeout { // Transaction expired without propagation, schedule for retrieval - if f.announced[hash] != nil { - panic("announce tracker already contains waitlist item") + if _, ok := f.announced[hash]; ok { + panic("announced tracker already contains waitlist item") } f.announced[hash] = f.waitlist[hash] for peer := range f.waitlist[hash] { @@ -616,18 +591,11 @@ func (f *TxFetcher) loop() { } } // Move the delivery back from fetching to queued - if _, ok := f.announced[hash]; ok { - panic("announced tracker already contains alternate item") - } - if f.alternates[hash] != nil { // nil if tx was broadcast during fetch - f.announced[hash] = f.alternates[hash] - } delete(f.announced[hash], peer) if len(f.announced[hash]) == 0 { delete(f.announced, hash) } delete(f.announces[peer], hash) - delete(f.alternates, hash) delete(f.fetching, hash) } if len(f.announces[peer]) == 0 { @@ -700,7 +668,6 @@ func (f *TxFetcher) loop() { } } delete(f.announced, hash) - delete(f.alternates, hash) // If a transaction currently being fetched from a different // origin was delivered (delivery stolen), mark it so the @@ -752,20 +719,16 @@ func (f *TxFetcher) loop() { } if _, ok := delivered[hash]; !ok { if i < cutoff { - delete(f.alternates[hash], delivery.origin) + delete(f.announced[hash], delivery.origin) + if len(f.announced[hash]) == 0 { + delete(f.announced, hash) + } delete(f.announces[delivery.origin], hash) if len(f.announces[delivery.origin]) == 0 { delete(f.announces, delivery.origin) } } - if len(f.alternates[hash]) > 0 { - if _, ok := f.announced[hash]; ok { - panic(fmt.Sprintf("announced tracker already contains alternate item: %v", f.announced[hash])) - } - f.announced[hash] = f.alternates[hash] - } } - delete(f.alternates, hash) delete(f.fetching, hash) } // Something was delivered, try to reschedule requests @@ -797,14 +760,6 @@ func (f *TxFetcher) loop() { continue } } - // Undelivered hash, reschedule if there's an alternative origin available - delete(f.alternates[hash], drop.peer) - if len(f.alternates[hash]) == 0 { - delete(f.alternates, hash) - } else { - f.announced[hash] = f.alternates[hash] - delete(f.alternates, hash) - } delete(f.fetching, hash) } delete(f.requests, drop.peer) @@ -942,12 +897,6 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{}, // Mark the hash as fetching and stash away possible alternates f.fetching[hash] = peer - if _, ok := f.alternates[hash]; ok { - panic(fmt.Sprintf("alternate tracker already contains fetching item: %v", f.alternates[hash])) - } - f.alternates[hash] = f.announced[hash] - delete(f.announced, hash) - // Accumulate the hash and stop if the limit was reached hashes = append(hashes, hash) if len(hashes) >= maxTxRetrievals { diff --git a/eth/fetcher/tx_fetcher_test.go b/eth/fetcher/tx_fetcher_test.go index 0f05a1c995c..0ed6006eb5a 100644 --- a/eth/fetcher/tx_fetcher_test.go +++ b/eth/fetcher/tx_fetcher_test.go @@ -108,183 +108,183 @@ func TestTransactionFetcherWaiting(t *testing.T) { {common.Hash{0x02}, types.LegacyTxType, 222}, }, }), - // Announce from a new peer to check that no overwrite happens - doTxNotify{peer: "B", hashes: []common.Hash{{0x03}, {0x04}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{333, 444}}, - isWaiting(map[string][]announce{ - "A": { - {common.Hash{0x01}, types.LegacyTxType, 111}, - {common.Hash{0x02}, types.LegacyTxType, 222}, - }, - "B": { - {common.Hash{0x03}, types.LegacyTxType, 333}, - {common.Hash{0x04}, types.LegacyTxType, 444}, - }, - }), - // Announce clashing hashes but unique new peer - doTxNotify{peer: "C", hashes: []common.Hash{{0x01}, {0x04}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{111, 444}}, - isWaiting(map[string][]announce{ - "A": { - {common.Hash{0x01}, types.LegacyTxType, 111}, - {common.Hash{0x02}, types.LegacyTxType, 222}, - }, - "B": { - {common.Hash{0x03}, types.LegacyTxType, 333}, - {common.Hash{0x04}, types.LegacyTxType, 444}, - }, - "C": { - {common.Hash{0x01}, types.LegacyTxType, 111}, - {common.Hash{0x04}, types.LegacyTxType, 444}, - }, - }), - // Announce existing and clashing hashes from existing peer. Clashes - // should not overwrite previous announcements. - doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x03}, {0x05}}, types: []byte{types.LegacyTxType, types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{999, 333, 555}}, - isWaiting(map[string][]announce{ - "A": { - {common.Hash{0x01}, types.LegacyTxType, 111}, - {common.Hash{0x02}, types.LegacyTxType, 222}, - {common.Hash{0x03}, types.LegacyTxType, 333}, - {common.Hash{0x05}, types.LegacyTxType, 555}, - }, - "B": { - {common.Hash{0x03}, types.LegacyTxType, 333}, - {common.Hash{0x04}, types.LegacyTxType, 444}, - }, - "C": { - {common.Hash{0x01}, types.LegacyTxType, 111}, - {common.Hash{0x04}, types.LegacyTxType, 444}, - }, - }), - // Announce clashing hashes with conflicting metadata. Somebody will - // be in the wrong, but we don't know yet who. - doTxNotify{peer: "D", hashes: []common.Hash{{0x01}, {0x02}}, types: []byte{types.LegacyTxType, types.BlobTxType}, sizes: []uint32{999, 222}}, - isWaiting(map[string][]announce{ - "A": { - {common.Hash{0x01}, types.LegacyTxType, 111}, - {common.Hash{0x02}, types.LegacyTxType, 222}, - {common.Hash{0x03}, types.LegacyTxType, 333}, - {common.Hash{0x05}, types.LegacyTxType, 555}, - }, - "B": { - {common.Hash{0x03}, types.LegacyTxType, 333}, - {common.Hash{0x04}, types.LegacyTxType, 444}, - }, - "C": { - {common.Hash{0x01}, types.LegacyTxType, 111}, - {common.Hash{0x04}, types.LegacyTxType, 444}, - }, - "D": { - {common.Hash{0x01}, types.LegacyTxType, 999}, - {common.Hash{0x02}, types.BlobTxType, 222}, - }, - }), - isScheduled{tracking: nil, fetching: nil}, - - // Wait for the arrival timeout which should move all expired items - // from the wait list to the scheduler - doWait{time: txArriveTimeout, step: true}, - isWaiting(nil), - isScheduled{ - tracking: map[string][]announce{ - "A": { - {common.Hash{0x01}, types.LegacyTxType, 111}, - {common.Hash{0x02}, types.LegacyTxType, 222}, - {common.Hash{0x03}, types.LegacyTxType, 333}, - {common.Hash{0x05}, types.LegacyTxType, 555}, - }, - "B": { - {common.Hash{0x03}, types.LegacyTxType, 333}, - {common.Hash{0x04}, types.LegacyTxType, 444}, - }, - "C": { - {common.Hash{0x01}, types.LegacyTxType, 111}, - {common.Hash{0x04}, types.LegacyTxType, 444}, - }, - "D": { - {common.Hash{0x01}, types.LegacyTxType, 999}, - {common.Hash{0x02}, types.BlobTxType, 222}, - }, - }, - fetching: map[string][]common.Hash{ // Depends on deterministic test randomizer - "A": {{0x03}, {0x05}}, - "C": {{0x01}, {0x04}}, - "D": {{0x02}}, - }, - }, - // Queue up a non-fetchable transaction and then trigger it with a new - // peer (weird case to test 1 line in the fetcher) - doTxNotify{peer: "C", hashes: []common.Hash{{0x06}, {0x07}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{666, 777}}, - isWaiting(map[string][]announce{ - "C": { - {common.Hash{0x06}, types.LegacyTxType, 666}, - {common.Hash{0x07}, types.LegacyTxType, 777}, - }, - }), - doWait{time: txArriveTimeout, step: true}, - isScheduled{ - tracking: map[string][]announce{ - "A": { - {common.Hash{0x01}, types.LegacyTxType, 111}, - {common.Hash{0x02}, types.LegacyTxType, 222}, - {common.Hash{0x03}, types.LegacyTxType, 333}, - {common.Hash{0x05}, types.LegacyTxType, 555}, - }, - "B": { - {common.Hash{0x03}, types.LegacyTxType, 333}, - {common.Hash{0x04}, types.LegacyTxType, 444}, - }, - "C": { - {common.Hash{0x01}, types.LegacyTxType, 111}, - {common.Hash{0x04}, types.LegacyTxType, 444}, - {common.Hash{0x06}, types.LegacyTxType, 666}, - {common.Hash{0x07}, types.LegacyTxType, 777}, - }, - "D": { - {common.Hash{0x01}, types.LegacyTxType, 999}, - {common.Hash{0x02}, types.BlobTxType, 222}, - }, - }, - fetching: map[string][]common.Hash{ - "A": {{0x03}, {0x05}}, - "C": {{0x01}, {0x04}}, - "D": {{0x02}}, - }, - }, - doTxNotify{peer: "E", hashes: []common.Hash{{0x06}, {0x07}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{666, 777}}, - isScheduled{ - tracking: map[string][]announce{ - "A": { - {common.Hash{0x01}, types.LegacyTxType, 111}, - {common.Hash{0x02}, types.LegacyTxType, 222}, - {common.Hash{0x03}, types.LegacyTxType, 333}, - {common.Hash{0x05}, types.LegacyTxType, 555}, - }, - "B": { - {common.Hash{0x03}, types.LegacyTxType, 333}, - {common.Hash{0x04}, types.LegacyTxType, 444}, - }, - "C": { - {common.Hash{0x01}, types.LegacyTxType, 111}, - {common.Hash{0x04}, types.LegacyTxType, 444}, - {common.Hash{0x06}, types.LegacyTxType, 666}, - {common.Hash{0x07}, types.LegacyTxType, 777}, - }, - "D": { - {common.Hash{0x01}, types.LegacyTxType, 999}, - {common.Hash{0x02}, types.BlobTxType, 222}, - }, - "E": { - {common.Hash{0x06}, types.LegacyTxType, 666}, - {common.Hash{0x07}, types.LegacyTxType, 777}, - }, - }, - fetching: map[string][]common.Hash{ - "A": {{0x03}, {0x05}}, - "C": {{0x01}, {0x04}}, - "D": {{0x02}}, - "E": {{0x06}, {0x07}}, - }, - }, + // // Announce from a new peer to check that no overwrite happens + // doTxNotify{peer: "B", hashes: []common.Hash{{0x03}, {0x04}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{333, 444}}, + // isWaiting(map[string][]announce{ + // "A": { + // {common.Hash{0x01}, types.LegacyTxType, 111}, + // {common.Hash{0x02}, types.LegacyTxType, 222}, + // }, + // "B": { + // {common.Hash{0x03}, types.LegacyTxType, 333}, + // {common.Hash{0x04}, types.LegacyTxType, 444}, + // }, + // }), + // // Announce clashing hashes but unique new peer + // doTxNotify{peer: "C", hashes: []common.Hash{{0x01}, {0x04}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{111, 444}}, + // isWaiting(map[string][]announce{ + // "A": { + // {common.Hash{0x01}, types.LegacyTxType, 111}, + // {common.Hash{0x02}, types.LegacyTxType, 222}, + // }, + // "B": { + // {common.Hash{0x03}, types.LegacyTxType, 333}, + // {common.Hash{0x04}, types.LegacyTxType, 444}, + // }, + // "C": { + // {common.Hash{0x01}, types.LegacyTxType, 111}, + // {common.Hash{0x04}, types.LegacyTxType, 444}, + // }, + // }), + // // Announce existing and clashing hashes from existing peer. Clashes + // // should not overwrite previous announcements. + // doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x03}, {0x05}}, types: []byte{types.LegacyTxType, types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{999, 333, 555}}, + // isWaiting(map[string][]announce{ + // "A": { + // {common.Hash{0x01}, types.LegacyTxType, 111}, + // {common.Hash{0x02}, types.LegacyTxType, 222}, + // {common.Hash{0x03}, types.LegacyTxType, 333}, + // {common.Hash{0x05}, types.LegacyTxType, 555}, + // }, + // "B": { + // {common.Hash{0x03}, types.LegacyTxType, 333}, + // {common.Hash{0x04}, types.LegacyTxType, 444}, + // }, + // "C": { + // {common.Hash{0x01}, types.LegacyTxType, 111}, + // {common.Hash{0x04}, types.LegacyTxType, 444}, + // }, + // }), + // // Announce clashing hashes with conflicting metadata. Somebody will + // // be in the wrong, but we don't know yet who. + // doTxNotify{peer: "D", hashes: []common.Hash{{0x01}, {0x02}}, types: []byte{types.LegacyTxType, types.BlobTxType}, sizes: []uint32{999, 222}}, + // isWaiting(map[string][]announce{ + // "A": { + // {common.Hash{0x01}, types.LegacyTxType, 111}, + // {common.Hash{0x02}, types.LegacyTxType, 222}, + // {common.Hash{0x03}, types.LegacyTxType, 333}, + // {common.Hash{0x05}, types.LegacyTxType, 555}, + // }, + // "B": { + // {common.Hash{0x03}, types.LegacyTxType, 333}, + // {common.Hash{0x04}, types.LegacyTxType, 444}, + // }, + // "C": { + // {common.Hash{0x01}, types.LegacyTxType, 111}, + // {common.Hash{0x04}, types.LegacyTxType, 444}, + // }, + // "D": { + // {common.Hash{0x01}, types.LegacyTxType, 999}, + // {common.Hash{0x02}, types.BlobTxType, 222}, + // }, + // }), + // isScheduled{tracking: nil, fetching: nil}, + + // // Wait for the arrival timeout which should move all expired items + // // from the wait list to the scheduler + // doWait{time: txArriveTimeout, step: true}, + // isWaiting(nil), + // isScheduled{ + // tracking: map[string][]announce{ + // "A": { + // {common.Hash{0x01}, types.LegacyTxType, 111}, + // {common.Hash{0x02}, types.LegacyTxType, 222}, + // {common.Hash{0x03}, types.LegacyTxType, 333}, + // {common.Hash{0x05}, types.LegacyTxType, 555}, + // }, + // "B": { + // {common.Hash{0x03}, types.LegacyTxType, 333}, + // {common.Hash{0x04}, types.LegacyTxType, 444}, + // }, + // "C": { + // {common.Hash{0x01}, types.LegacyTxType, 111}, + // {common.Hash{0x04}, types.LegacyTxType, 444}, + // }, + // "D": { + // {common.Hash{0x01}, types.LegacyTxType, 999}, + // {common.Hash{0x02}, types.BlobTxType, 222}, + // }, + // }, + // fetching: map[string][]common.Hash{ // Depends on deterministic test randomizer + // "A": {{0x03}, {0x05}}, + // "C": {{0x01}, {0x04}}, + // "D": {{0x02}}, + // }, + // }, + // // Queue up a non-fetchable transaction and then trigger it with a new + // // peer (weird case to test 1 line in the fetcher) + // doTxNotify{peer: "C", hashes: []common.Hash{{0x06}, {0x07}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{666, 777}}, + // isWaiting(map[string][]announce{ + // "C": { + // {common.Hash{0x06}, types.LegacyTxType, 666}, + // {common.Hash{0x07}, types.LegacyTxType, 777}, + // }, + // }), + // doWait{time: txArriveTimeout, step: true}, + // isScheduled{ + // tracking: map[string][]announce{ + // "A": { + // {common.Hash{0x01}, types.LegacyTxType, 111}, + // {common.Hash{0x02}, types.LegacyTxType, 222}, + // {common.Hash{0x03}, types.LegacyTxType, 333}, + // {common.Hash{0x05}, types.LegacyTxType, 555}, + // }, + // "B": { + // {common.Hash{0x03}, types.LegacyTxType, 333}, + // {common.Hash{0x04}, types.LegacyTxType, 444}, + // }, + // "C": { + // {common.Hash{0x01}, types.LegacyTxType, 111}, + // {common.Hash{0x04}, types.LegacyTxType, 444}, + // {common.Hash{0x06}, types.LegacyTxType, 666}, + // {common.Hash{0x07}, types.LegacyTxType, 777}, + // }, + // "D": { + // {common.Hash{0x01}, types.LegacyTxType, 999}, + // {common.Hash{0x02}, types.BlobTxType, 222}, + // }, + // }, + // fetching: map[string][]common.Hash{ + // "A": {{0x03}, {0x05}}, + // "C": {{0x01}, {0x04}}, + // "D": {{0x02}}, + // }, + // }, + // doTxNotify{peer: "E", hashes: []common.Hash{{0x06}, {0x07}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{666, 777}}, + // isScheduled{ + // tracking: map[string][]announce{ + // "A": { + // {common.Hash{0x01}, types.LegacyTxType, 111}, + // {common.Hash{0x02}, types.LegacyTxType, 222}, + // {common.Hash{0x03}, types.LegacyTxType, 333}, + // {common.Hash{0x05}, types.LegacyTxType, 555}, + // }, + // "B": { + // {common.Hash{0x03}, types.LegacyTxType, 333}, + // {common.Hash{0x04}, types.LegacyTxType, 444}, + // }, + // "C": { + // {common.Hash{0x01}, types.LegacyTxType, 111}, + // {common.Hash{0x04}, types.LegacyTxType, 444}, + // {common.Hash{0x06}, types.LegacyTxType, 666}, + // {common.Hash{0x07}, types.LegacyTxType, 777}, + // }, + // "D": { + // {common.Hash{0x01}, types.LegacyTxType, 999}, + // {common.Hash{0x02}, types.BlobTxType, 222}, + // }, + // "E": { + // {common.Hash{0x06}, types.LegacyTxType, 666}, + // {common.Hash{0x07}, types.LegacyTxType, 777}, + // }, + // }, + // fetching: map[string][]common.Hash{ + // "A": {{0x03}, {0x05}}, + // "C": {{0x01}, {0x04}}, + // "D": {{0x02}}, + // "E": {{0x06}, {0x07}}, + // }, + // }, }, }) } @@ -459,6 +459,7 @@ func TestTransactionFetcherSingletonRequesting(t *testing.T) { {common.Hash{0x06}, types.LegacyTxType, 666}, }, }), + // Step 14 isScheduled{ tracking: map[string][]announce{ "A": { @@ -2004,6 +2005,10 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) { t.Errorf("step %d, peer %s, hash %x: announce metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, ann.kind, ann.size) } } + // Check that all sheduled announces are recorded on announced map + if _, ok := fetcher.announced[ann.hash][peer]; !ok { + t.Errorf("step %d, peer %s: hash %x missing from announced", i, peer, ann.hash) + } } for hash, meta := range scheduled { ann := announce{hash: hash, kind: meta.kind, size: meta.size} @@ -2017,6 +2022,20 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) { t.Errorf("step %d: peer %s extra in announces", i, peer) } } + for hash, peers := range fetcher.announced { + if len(peers) == 0 { + t.Errorf("step %d, hash %x: empty peerset in announced", i, hash) + } + for peer := range peers { + if _, ok := step.tracking[peer]; !ok { + t.Errorf("step %d: peer %s extra in announced", i, peer) + } + if !containsHashInAnnounces(step.tracking[peer], hash) { + t.Errorf("step %d, peer %s: hash %x extra in announced", i, peer, hash) + } + } + } + // Check that all announces required to be fetching are in the // appropriate sets for peer, hashes := range step.fetching { @@ -2062,31 +2081,31 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) { t.Errorf("step %d: hash %x extra in fetching", i, hash) } } - for _, hashes := range step.fetching { - for _, hash := range hashes { - alternates := fetcher.alternates[hash] - if alternates == nil { - t.Errorf("step %d: hash %x missing from alternates", i, hash) - continue - } - for peer := range alternates { - if _, ok := fetcher.announces[peer]; !ok { - t.Errorf("step %d: peer %s extra in alternates", i, peer) - continue - } - if _, ok := fetcher.announces[peer][hash]; !ok { - t.Errorf("step %d, peer %s: hash %x extra in alternates", i, hash, peer) - continue - } - } - for p := range fetcher.announced[hash] { - if _, ok := alternates[p]; !ok { - t.Errorf("step %d, hash %x: peer %s missing from alternates", i, hash, p) - continue - } - } - } - } + // for _, hashes := range step.fetching { + // for _, hash := range hashes { + // alternates := fetcher.alternates[hash] + // if alternates == nil { + // t.Errorf("step %d: hash %x missing from alternates", i, hash) + // continue + // } + // for peer := range alternates { + // if _, ok := fetcher.announces[peer]; !ok { + // t.Errorf("step %d: peer %s extra in alternates", i, peer) + // continue + // } + // if _, ok := fetcher.announces[peer][hash]; !ok { + // t.Errorf("step %d, peer %s: hash %x extra in alternates", i, hash, peer) + // continue + // } + // } + // for p := range fetcher.announced[hash] { + // if _, ok := alternates[p]; !ok { + // t.Errorf("step %d, hash %x: peer %s missing from alternates", i, hash, p) + // continue + // } + // } + // } + // } for peer, hashes := range step.dangling { request := fetcher.requests[peer] if request == nil { @@ -2127,11 +2146,11 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) { t.Errorf("step %d: hash %x missing from announced", i, hash) } } - for hash := range fetcher.announced { - if !slices.Contains(queued, hash) { - t.Errorf("step %d: hash %x extra in announced", i, hash) - } - } + // for hash := range fetcher.announced { + // if !slices.Contains(queued, hash) { + // t.Errorf("step %d: hash %x extra in announced", i, hash) + // } + // } case isUnderpriced: if fetcher.underpriced.Len() != int(step) { From 1c813099d414ba7cc6bb7d155d0602185b366513 Mon Sep 17 00:00:00 2001 From: healthykim Date: Mon, 29 Sep 2025 15:42:23 +0900 Subject: [PATCH 2/3] refactor: remove announced map --- eth/fetcher/tx_fetcher.go | 44 +++++++-------------- eth/fetcher/tx_fetcher_test.go | 72 +--------------------------------- 2 files changed, 16 insertions(+), 100 deletions(-) diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 34fddcccbd4..2622412eb62 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -103,7 +103,6 @@ var ( txFetcherWaitingPeers = metrics.NewRegisteredGauge("eth/fetcher/transaction/waiting/peers", nil) txFetcherWaitingHashes = metrics.NewRegisteredGauge("eth/fetcher/transaction/waiting/hashes", nil) txFetcherQueueingPeers = metrics.NewRegisteredGauge("eth/fetcher/transaction/queueing/peers", nil) - txFetcherQueueingHashes = metrics.NewRegisteredGauge("eth/fetcher/transaction/queueing/hashes", nil) txFetcherFetchingPeers = metrics.NewRegisteredGauge("eth/fetcher/transaction/fetching/peers", nil) txFetcherFetchingHashes = metrics.NewRegisteredGauge("eth/fetcher/transaction/fetching/hashes", nil) ) @@ -186,10 +185,9 @@ type TxFetcher struct { waittime map[common.Hash]mclock.AbsTime // Timestamps when transactions were added to the waitlist waitslots map[string]map[common.Hash]*txMetadataWithSeq // Waiting announcements grouped by peer (DoS protection) - // Stage 2: Queue of transactions that waiting to be allocated to some peer - // to be retrieved directly. + // Stage 2: Transactions that are either waiting to be allocated + // to a peer or are already being fetched. announces map[string]map[common.Hash]*txMetadataWithSeq // Set of announced transactions, grouped by origin peer - announced map[common.Hash]map[string]struct{} // Set of download locations, grouped by transaction hash // Stage 3: Set of transactions currently being retrieved, some which may be // fulfilled and some rescheduled. Note, this step shares 'announces' from the @@ -229,7 +227,6 @@ func NewTxFetcherForTests( waittime: make(map[common.Hash]mclock.AbsTime), waitslots: make(map[string]map[common.Hash]*txMetadataWithSeq), announces: make(map[string]map[common.Hash]*txMetadataWithSeq), - announced: make(map[common.Hash]map[string]struct{}), fetching: make(map[common.Hash]string), requests: make(map[string]*txRequest), underpriced: lru.NewCache[common.Hash, time.Time](maxTxUnderpricedSetSize), @@ -456,7 +453,7 @@ func (f *TxFetcher) loop() { for i, hash := range ann.hashes { // If the transaction is already downloading or queued from a different peer, // track it for the new peer - if _, ok := f.announced[hash]; ok { + if f.announced(hash) { // Stage 2 and 3 share the set of origins per tx if announces := f.announces[ann.origin]; announces != nil { announces[hash] = &txMetadataWithSeq{ @@ -471,8 +468,6 @@ func (f *TxFetcher) loop() { }, } } - f.announced[hash][ann.origin] = struct{}{} - continue } // If the transaction is already known to the fetcher, but not @@ -544,10 +539,9 @@ func (f *TxFetcher) loop() { for hash, instance := range f.waittime { if time.Duration(f.clock.Now()-instance)+txGatherSlack > txArriveTimeout { // Transaction expired without propagation, schedule for retrieval - if _, ok := f.announced[hash]; ok { + if f.announced(hash) { panic("announced tracker already contains waitlist item") } - f.announced[hash] = f.waitlist[hash] for peer := range f.waitlist[hash] { if announces := f.announces[peer]; announces != nil { announces[hash] = f.waitslots[peer][hash] @@ -591,10 +585,6 @@ func (f *TxFetcher) loop() { } } // Move the delivery back from fetching to queued - delete(f.announced[hash], peer) - if len(f.announced[hash]) == 0 { - delete(f.announced, hash) - } delete(f.announces[peer], hash) delete(f.fetching, hash) } @@ -667,7 +657,6 @@ func (f *TxFetcher) loop() { delete(f.announces, peer) } } - delete(f.announced, hash) // If a transaction currently being fetched from a different // origin was delivered (delivery stolen), mark it so the @@ -719,10 +708,6 @@ func (f *TxFetcher) loop() { } if _, ok := delivered[hash]; !ok { if i < cutoff { - delete(f.announced[hash], delivery.origin) - if len(f.announced[hash]) == 0 { - delete(f.announced, hash) - } delete(f.announces[delivery.origin], hash) if len(f.announces[delivery.origin]) == 0 { delete(f.announces, delivery.origin) @@ -765,15 +750,8 @@ func (f *TxFetcher) loop() { delete(f.requests, drop.peer) } // Clean up general announcement tracking - if _, ok := f.announces[drop.peer]; ok { - for hash := range f.announces[drop.peer] { - delete(f.announced[hash], drop.peer) - if len(f.announced[hash]) == 0 { - delete(f.announced, hash) - } - } - delete(f.announces, drop.peer) - } + delete(f.announces, drop.peer) + // If a request was cancelled, check if anything needs to be rescheduled if request != nil { f.scheduleFetches(timeoutTimer, timeoutTrigger, nil) @@ -787,7 +765,6 @@ func (f *TxFetcher) loop() { txFetcherWaitingPeers.Update(int64(len(f.waitslots))) txFetcherWaitingHashes.Update(int64(len(f.waitlist))) txFetcherQueueingPeers.Update(int64(len(f.announces) - len(f.requests))) - txFetcherQueueingHashes.Update(int64(len(f.announced))) txFetcherFetchingPeers.Update(int64(len(f.requests))) txFetcherFetchingHashes.Update(int64(len(f.fetching))) @@ -983,3 +960,12 @@ func rotateStrings(slice []string, n int) { slice[i] = orig[(i+n)%len(orig)] } } + +func (f *TxFetcher) announced(hash common.Hash) bool { + for _, hashes := range f.announces { + if hashes[hash] != nil { + return true + } + } + return false +} diff --git a/eth/fetcher/tx_fetcher_test.go b/eth/fetcher/tx_fetcher_test.go index 0ed6006eb5a..38947130d13 100644 --- a/eth/fetcher/tx_fetcher_test.go +++ b/eth/fetcher/tx_fetcher_test.go @@ -2005,10 +2005,6 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) { t.Errorf("step %d, peer %s, hash %x: announce metadata mismatch: want %v, have %v/%v", i, peer, ann.hash, meta, ann.kind, ann.size) } } - // Check that all sheduled announces are recorded on announced map - if _, ok := fetcher.announced[ann.hash][peer]; !ok { - t.Errorf("step %d, peer %s: hash %x missing from announced", i, peer, ann.hash) - } } for hash, meta := range scheduled { ann := announce{hash: hash, kind: meta.kind, size: meta.size} @@ -2022,19 +2018,6 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) { t.Errorf("step %d: peer %s extra in announces", i, peer) } } - for hash, peers := range fetcher.announced { - if len(peers) == 0 { - t.Errorf("step %d, hash %x: empty peerset in announced", i, hash) - } - for peer := range peers { - if _, ok := step.tracking[peer]; !ok { - t.Errorf("step %d: peer %s extra in announced", i, peer) - } - if !containsHashInAnnounces(step.tracking[peer], hash) { - t.Errorf("step %d, peer %s: hash %x extra in announced", i, peer, hash) - } - } - } // Check that all announces required to be fetching are in the // appropriate sets @@ -2081,31 +2064,6 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) { t.Errorf("step %d: hash %x extra in fetching", i, hash) } } - // for _, hashes := range step.fetching { - // for _, hash := range hashes { - // alternates := fetcher.alternates[hash] - // if alternates == nil { - // t.Errorf("step %d: hash %x missing from alternates", i, hash) - // continue - // } - // for peer := range alternates { - // if _, ok := fetcher.announces[peer]; !ok { - // t.Errorf("step %d: peer %s extra in alternates", i, peer) - // continue - // } - // if _, ok := fetcher.announces[peer][hash]; !ok { - // t.Errorf("step %d, peer %s: hash %x extra in alternates", i, hash, peer) - // continue - // } - // } - // for p := range fetcher.announced[hash] { - // if _, ok := alternates[p]; !ok { - // t.Errorf("step %d, hash %x: peer %s missing from alternates", i, hash, p) - // continue - // } - // } - // } - // } for peer, hashes := range step.dangling { request := fetcher.requests[peer] if request == nil { @@ -2123,34 +2081,6 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) { } } } - // Check that all transaction announces that are scheduled for - // retrieval but not actively being downloaded are tracked only - // in the stage 2 `announced` map. - var queued []common.Hash - for _, announces := range step.tracking { - for _, ann := range announces { - var found bool - for _, hs := range step.fetching { - if slices.Contains(hs, ann.hash) { - found = true - break - } - } - if !found { - queued = append(queued, ann.hash) - } - } - } - for _, hash := range queued { - if _, ok := fetcher.announced[hash]; !ok { - t.Errorf("step %d: hash %x missing from announced", i, hash) - } - } - // for hash := range fetcher.announced { - // if !slices.Contains(queued, hash) { - // t.Errorf("step %d: hash %x extra in announced", i, hash) - // } - // } case isUnderpriced: if fetcher.underpriced.Len() != int(step) { @@ -2163,7 +2093,7 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) { // After every step, cross validate the internal uniqueness invariants // between stage one and stage two. for hash := range fetcher.waittime { - if _, ok := fetcher.announced[hash]; ok { + if fetcher.announced(hash) { t.Errorf("step %d: hash %s present in both stage 1 and 2", i, hash) } } From e1920bfb2a010bc85de28245ea14f77fbb21f9fb Mon Sep 17 00:00:00 2001 From: healthykim Date: Mon, 29 Sep 2025 15:52:35 +0900 Subject: [PATCH 3/3] chore: uncomment test code --- eth/fetcher/tx_fetcher_test.go | 354 ++++++++++++++++----------------- 1 file changed, 177 insertions(+), 177 deletions(-) diff --git a/eth/fetcher/tx_fetcher_test.go b/eth/fetcher/tx_fetcher_test.go index 38947130d13..abea46302fd 100644 --- a/eth/fetcher/tx_fetcher_test.go +++ b/eth/fetcher/tx_fetcher_test.go @@ -108,183 +108,183 @@ func TestTransactionFetcherWaiting(t *testing.T) { {common.Hash{0x02}, types.LegacyTxType, 222}, }, }), - // // Announce from a new peer to check that no overwrite happens - // doTxNotify{peer: "B", hashes: []common.Hash{{0x03}, {0x04}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{333, 444}}, - // isWaiting(map[string][]announce{ - // "A": { - // {common.Hash{0x01}, types.LegacyTxType, 111}, - // {common.Hash{0x02}, types.LegacyTxType, 222}, - // }, - // "B": { - // {common.Hash{0x03}, types.LegacyTxType, 333}, - // {common.Hash{0x04}, types.LegacyTxType, 444}, - // }, - // }), - // // Announce clashing hashes but unique new peer - // doTxNotify{peer: "C", hashes: []common.Hash{{0x01}, {0x04}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{111, 444}}, - // isWaiting(map[string][]announce{ - // "A": { - // {common.Hash{0x01}, types.LegacyTxType, 111}, - // {common.Hash{0x02}, types.LegacyTxType, 222}, - // }, - // "B": { - // {common.Hash{0x03}, types.LegacyTxType, 333}, - // {common.Hash{0x04}, types.LegacyTxType, 444}, - // }, - // "C": { - // {common.Hash{0x01}, types.LegacyTxType, 111}, - // {common.Hash{0x04}, types.LegacyTxType, 444}, - // }, - // }), - // // Announce existing and clashing hashes from existing peer. Clashes - // // should not overwrite previous announcements. - // doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x03}, {0x05}}, types: []byte{types.LegacyTxType, types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{999, 333, 555}}, - // isWaiting(map[string][]announce{ - // "A": { - // {common.Hash{0x01}, types.LegacyTxType, 111}, - // {common.Hash{0x02}, types.LegacyTxType, 222}, - // {common.Hash{0x03}, types.LegacyTxType, 333}, - // {common.Hash{0x05}, types.LegacyTxType, 555}, - // }, - // "B": { - // {common.Hash{0x03}, types.LegacyTxType, 333}, - // {common.Hash{0x04}, types.LegacyTxType, 444}, - // }, - // "C": { - // {common.Hash{0x01}, types.LegacyTxType, 111}, - // {common.Hash{0x04}, types.LegacyTxType, 444}, - // }, - // }), - // // Announce clashing hashes with conflicting metadata. Somebody will - // // be in the wrong, but we don't know yet who. - // doTxNotify{peer: "D", hashes: []common.Hash{{0x01}, {0x02}}, types: []byte{types.LegacyTxType, types.BlobTxType}, sizes: []uint32{999, 222}}, - // isWaiting(map[string][]announce{ - // "A": { - // {common.Hash{0x01}, types.LegacyTxType, 111}, - // {common.Hash{0x02}, types.LegacyTxType, 222}, - // {common.Hash{0x03}, types.LegacyTxType, 333}, - // {common.Hash{0x05}, types.LegacyTxType, 555}, - // }, - // "B": { - // {common.Hash{0x03}, types.LegacyTxType, 333}, - // {common.Hash{0x04}, types.LegacyTxType, 444}, - // }, - // "C": { - // {common.Hash{0x01}, types.LegacyTxType, 111}, - // {common.Hash{0x04}, types.LegacyTxType, 444}, - // }, - // "D": { - // {common.Hash{0x01}, types.LegacyTxType, 999}, - // {common.Hash{0x02}, types.BlobTxType, 222}, - // }, - // }), - // isScheduled{tracking: nil, fetching: nil}, - - // // Wait for the arrival timeout which should move all expired items - // // from the wait list to the scheduler - // doWait{time: txArriveTimeout, step: true}, - // isWaiting(nil), - // isScheduled{ - // tracking: map[string][]announce{ - // "A": { - // {common.Hash{0x01}, types.LegacyTxType, 111}, - // {common.Hash{0x02}, types.LegacyTxType, 222}, - // {common.Hash{0x03}, types.LegacyTxType, 333}, - // {common.Hash{0x05}, types.LegacyTxType, 555}, - // }, - // "B": { - // {common.Hash{0x03}, types.LegacyTxType, 333}, - // {common.Hash{0x04}, types.LegacyTxType, 444}, - // }, - // "C": { - // {common.Hash{0x01}, types.LegacyTxType, 111}, - // {common.Hash{0x04}, types.LegacyTxType, 444}, - // }, - // "D": { - // {common.Hash{0x01}, types.LegacyTxType, 999}, - // {common.Hash{0x02}, types.BlobTxType, 222}, - // }, - // }, - // fetching: map[string][]common.Hash{ // Depends on deterministic test randomizer - // "A": {{0x03}, {0x05}}, - // "C": {{0x01}, {0x04}}, - // "D": {{0x02}}, - // }, - // }, - // // Queue up a non-fetchable transaction and then trigger it with a new - // // peer (weird case to test 1 line in the fetcher) - // doTxNotify{peer: "C", hashes: []common.Hash{{0x06}, {0x07}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{666, 777}}, - // isWaiting(map[string][]announce{ - // "C": { - // {common.Hash{0x06}, types.LegacyTxType, 666}, - // {common.Hash{0x07}, types.LegacyTxType, 777}, - // }, - // }), - // doWait{time: txArriveTimeout, step: true}, - // isScheduled{ - // tracking: map[string][]announce{ - // "A": { - // {common.Hash{0x01}, types.LegacyTxType, 111}, - // {common.Hash{0x02}, types.LegacyTxType, 222}, - // {common.Hash{0x03}, types.LegacyTxType, 333}, - // {common.Hash{0x05}, types.LegacyTxType, 555}, - // }, - // "B": { - // {common.Hash{0x03}, types.LegacyTxType, 333}, - // {common.Hash{0x04}, types.LegacyTxType, 444}, - // }, - // "C": { - // {common.Hash{0x01}, types.LegacyTxType, 111}, - // {common.Hash{0x04}, types.LegacyTxType, 444}, - // {common.Hash{0x06}, types.LegacyTxType, 666}, - // {common.Hash{0x07}, types.LegacyTxType, 777}, - // }, - // "D": { - // {common.Hash{0x01}, types.LegacyTxType, 999}, - // {common.Hash{0x02}, types.BlobTxType, 222}, - // }, - // }, - // fetching: map[string][]common.Hash{ - // "A": {{0x03}, {0x05}}, - // "C": {{0x01}, {0x04}}, - // "D": {{0x02}}, - // }, - // }, - // doTxNotify{peer: "E", hashes: []common.Hash{{0x06}, {0x07}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{666, 777}}, - // isScheduled{ - // tracking: map[string][]announce{ - // "A": { - // {common.Hash{0x01}, types.LegacyTxType, 111}, - // {common.Hash{0x02}, types.LegacyTxType, 222}, - // {common.Hash{0x03}, types.LegacyTxType, 333}, - // {common.Hash{0x05}, types.LegacyTxType, 555}, - // }, - // "B": { - // {common.Hash{0x03}, types.LegacyTxType, 333}, - // {common.Hash{0x04}, types.LegacyTxType, 444}, - // }, - // "C": { - // {common.Hash{0x01}, types.LegacyTxType, 111}, - // {common.Hash{0x04}, types.LegacyTxType, 444}, - // {common.Hash{0x06}, types.LegacyTxType, 666}, - // {common.Hash{0x07}, types.LegacyTxType, 777}, - // }, - // "D": { - // {common.Hash{0x01}, types.LegacyTxType, 999}, - // {common.Hash{0x02}, types.BlobTxType, 222}, - // }, - // "E": { - // {common.Hash{0x06}, types.LegacyTxType, 666}, - // {common.Hash{0x07}, types.LegacyTxType, 777}, - // }, - // }, - // fetching: map[string][]common.Hash{ - // "A": {{0x03}, {0x05}}, - // "C": {{0x01}, {0x04}}, - // "D": {{0x02}}, - // "E": {{0x06}, {0x07}}, - // }, - // }, + // Announce from a new peer to check that no overwrite happens + doTxNotify{peer: "B", hashes: []common.Hash{{0x03}, {0x04}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{333, 444}}, + isWaiting(map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + }, + "B": { + {common.Hash{0x03}, types.LegacyTxType, 333}, + {common.Hash{0x04}, types.LegacyTxType, 444}, + }, + }), + // Announce clashing hashes but unique new peer + doTxNotify{peer: "C", hashes: []common.Hash{{0x01}, {0x04}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{111, 444}}, + isWaiting(map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + }, + "B": { + {common.Hash{0x03}, types.LegacyTxType, 333}, + {common.Hash{0x04}, types.LegacyTxType, 444}, + }, + "C": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x04}, types.LegacyTxType, 444}, + }, + }), + // Announce existing and clashing hashes from existing peer. Clashes + // should not overwrite previous announcements. + doTxNotify{peer: "A", hashes: []common.Hash{{0x01}, {0x03}, {0x05}}, types: []byte{types.LegacyTxType, types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{999, 333, 555}}, + isWaiting(map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + {common.Hash{0x03}, types.LegacyTxType, 333}, + {common.Hash{0x05}, types.LegacyTxType, 555}, + }, + "B": { + {common.Hash{0x03}, types.LegacyTxType, 333}, + {common.Hash{0x04}, types.LegacyTxType, 444}, + }, + "C": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x04}, types.LegacyTxType, 444}, + }, + }), + // Announce clashing hashes with conflicting metadata. Somebody will + // be in the wrong, but we don't know yet who. + doTxNotify{peer: "D", hashes: []common.Hash{{0x01}, {0x02}}, types: []byte{types.LegacyTxType, types.BlobTxType}, sizes: []uint32{999, 222}}, + isWaiting(map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + {common.Hash{0x03}, types.LegacyTxType, 333}, + {common.Hash{0x05}, types.LegacyTxType, 555}, + }, + "B": { + {common.Hash{0x03}, types.LegacyTxType, 333}, + {common.Hash{0x04}, types.LegacyTxType, 444}, + }, + "C": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x04}, types.LegacyTxType, 444}, + }, + "D": { + {common.Hash{0x01}, types.LegacyTxType, 999}, + {common.Hash{0x02}, types.BlobTxType, 222}, + }, + }), + isScheduled{tracking: nil, fetching: nil}, + + // Wait for the arrival timeout which should move all expired items + // from the wait list to the scheduler + doWait{time: txArriveTimeout, step: true}, + isWaiting(nil), + isScheduled{ + tracking: map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + {common.Hash{0x03}, types.LegacyTxType, 333}, + {common.Hash{0x05}, types.LegacyTxType, 555}, + }, + "B": { + {common.Hash{0x03}, types.LegacyTxType, 333}, + {common.Hash{0x04}, types.LegacyTxType, 444}, + }, + "C": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x04}, types.LegacyTxType, 444}, + }, + "D": { + {common.Hash{0x01}, types.LegacyTxType, 999}, + {common.Hash{0x02}, types.BlobTxType, 222}, + }, + }, + fetching: map[string][]common.Hash{ // Depends on deterministic test randomizer + "A": {{0x03}, {0x05}}, + "C": {{0x01}, {0x04}}, + "D": {{0x02}}, + }, + }, + // Queue up a non-fetchable transaction and then trigger it with a new + // peer (weird case to test 1 line in the fetcher) + doTxNotify{peer: "C", hashes: []common.Hash{{0x06}, {0x07}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{666, 777}}, + isWaiting(map[string][]announce{ + "C": { + {common.Hash{0x06}, types.LegacyTxType, 666}, + {common.Hash{0x07}, types.LegacyTxType, 777}, + }, + }), + doWait{time: txArriveTimeout, step: true}, + isScheduled{ + tracking: map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + {common.Hash{0x03}, types.LegacyTxType, 333}, + {common.Hash{0x05}, types.LegacyTxType, 555}, + }, + "B": { + {common.Hash{0x03}, types.LegacyTxType, 333}, + {common.Hash{0x04}, types.LegacyTxType, 444}, + }, + "C": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x04}, types.LegacyTxType, 444}, + {common.Hash{0x06}, types.LegacyTxType, 666}, + {common.Hash{0x07}, types.LegacyTxType, 777}, + }, + "D": { + {common.Hash{0x01}, types.LegacyTxType, 999}, + {common.Hash{0x02}, types.BlobTxType, 222}, + }, + }, + fetching: map[string][]common.Hash{ + "A": {{0x03}, {0x05}}, + "C": {{0x01}, {0x04}}, + "D": {{0x02}}, + }, + }, + doTxNotify{peer: "E", hashes: []common.Hash{{0x06}, {0x07}}, types: []byte{types.LegacyTxType, types.LegacyTxType}, sizes: []uint32{666, 777}}, + isScheduled{ + tracking: map[string][]announce{ + "A": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x02}, types.LegacyTxType, 222}, + {common.Hash{0x03}, types.LegacyTxType, 333}, + {common.Hash{0x05}, types.LegacyTxType, 555}, + }, + "B": { + {common.Hash{0x03}, types.LegacyTxType, 333}, + {common.Hash{0x04}, types.LegacyTxType, 444}, + }, + "C": { + {common.Hash{0x01}, types.LegacyTxType, 111}, + {common.Hash{0x04}, types.LegacyTxType, 444}, + {common.Hash{0x06}, types.LegacyTxType, 666}, + {common.Hash{0x07}, types.LegacyTxType, 777}, + }, + "D": { + {common.Hash{0x01}, types.LegacyTxType, 999}, + {common.Hash{0x02}, types.BlobTxType, 222}, + }, + "E": { + {common.Hash{0x06}, types.LegacyTxType, 666}, + {common.Hash{0x07}, types.LegacyTxType, 777}, + }, + }, + fetching: map[string][]common.Hash{ + "A": {{0x03}, {0x05}}, + "C": {{0x01}, {0x04}}, + "D": {{0x02}}, + "E": {{0x06}, {0x07}}, + }, + }, }, }) }