From aabf0f078ec9fad1d97d02e37b2b1c8baab6d3f4 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Wed, 26 Nov 2025 11:14:08 -0500 Subject: [PATCH 1/3] Cleanup GossipEthTxPool --- graft/coreth/plugin/evm/eth_gossiper.go | 49 +++++++++++++------------ graft/coreth/plugin/evm/gossip_test.go | 6 +-- graft/coreth/plugin/evm/vm.go | 2 +- 3 files changed, 28 insertions(+), 29 deletions(-) diff --git a/graft/coreth/plugin/evm/eth_gossiper.go b/graft/coreth/plugin/evm/eth_gossiper.go index b6129670e012..91641b17833f 100644 --- a/graft/coreth/plugin/evm/eth_gossiper.go +++ b/graft/coreth/plugin/evm/eth_gossiper.go @@ -9,9 +9,9 @@ import ( "context" "fmt" "sync" - "sync/atomic" "github.com/ava-labs/libevm/core/types" + "github.com/ava-labs/libevm/event" "github.com/ava-labs/libevm/log" "github.com/prometheus/client_golang/prometheus" @@ -33,8 +33,13 @@ var ( _ gossip.Set[*GossipEthTx] = (*GossipEthTxPool)(nil) _ eth.PushGossiper = (*EthPushGossiper)(nil) + + errSubscribing = fmt.Errorf("subscribing to the mempool failed") ) +// NewGossipEthTxPool creates a new GossipEthTxPool. +// +// If a nil error is returned, UpdateBloomFilter must be called. func NewGossipEthTxPool(mempool *txpool.TxPool, registerer prometheus.Registerer) (*GossipEthTxPool, error) { bloom, err := gossip.NewBloomFilter( registerer, @@ -47,9 +52,16 @@ func NewGossipEthTxPool(mempool *txpool.TxPool, registerer prometheus.Registerer return nil, fmt.Errorf("failed to initialize bloom filter: %w", err) } + pendingTxs := make(chan core.NewTxsEvent, pendingTxsBuffer) + sub := mempool.SubscribeTransactions(pendingTxs, true) + if sub == nil { + return nil, errSubscribing + } + return &GossipEthTxPool{ mempool: mempool, pendingTxs: make(chan core.NewTxsEvent, pendingTxsBuffer), + sub: sub, bloom: bloom, }, nil } @@ -57,38 +69,29 @@ func NewGossipEthTxPool(mempool *txpool.TxPool, registerer prometheus.Registerer type GossipEthTxPool struct { mempool *txpool.TxPool pendingTxs chan core.NewTxsEvent + sub event.Subscription - bloom *gossip.BloomFilter lock sync.RWMutex - - // subscribed is set to true when the gossip subscription is active - // mostly used for testing - subscribed atomic.Bool -} - -// IsSubscribed returns whether or not the gossip subscription is active. -func (g *GossipEthTxPool) IsSubscribed() bool { - return g.subscribed.Load() + bloom *gossip.BloomFilter } -func (g *GossipEthTxPool) Subscribe(ctx context.Context) { - sub := g.mempool.SubscribeTransactions(g.pendingTxs, false) - if sub == nil { - log.Warn("failed to subscribe to new txs event") - return - } - g.subscribed.CompareAndSwap(false, true) - defer func() { - sub.Unsubscribe() - g.subscribed.CompareAndSwap(true, false) - }() +// UpdateBloomFilter continuously listens for new pending transactions from the +// mempool and adds them to the bloom filter. If the bloom filter reaches its +// capacity, it is reset and all pending transactions are re-added. +func (g *GossipEthTxPool) UpdateBloomFilter(ctx context.Context) { + defer g.sub.Unsubscribe() for { select { case <-ctx.Done(): log.Debug("shutting down subscription") return - case pendingTxs := <-g.pendingTxs: + case pendingTxs, ok := <-g.pendingTxs: + if !ok { + log.Debug("pending txs channel closed, shutting down subscription") + return + } + g.lock.Lock() optimalElements := (g.mempool.PendingSize(txpool.PendingFilter{}) + len(pendingTxs.Txs)) * config.TxGossipBloomChurnMultiplier for _, pendingTx := range pendingTxs.Txs { diff --git a/graft/coreth/plugin/evm/gossip_test.go b/graft/coreth/plugin/evm/gossip_test.go index 15523adc24b6..feec8c6a52ea 100644 --- a/graft/coreth/plugin/evm/gossip_test.go +++ b/graft/coreth/plugin/evm/gossip_test.go @@ -63,11 +63,7 @@ func TestGossipSubscribe(t *testing.T) { require.NoError(err) ctx, cancel := context.WithCancel(t.Context()) defer cancel() - go gossipTxPool.Subscribe(ctx) - - require.Eventually(func() bool { - return gossipTxPool.IsSubscribed() - }, 10*time.Second, 500*time.Millisecond, "expected gossipTxPool to be subscribed") + go gossipTxPool.UpdateBloomFilter(ctx) // create eth txs ethTxs := getValidEthTxs(key, 10, big.NewInt(226*utils.GWei)) diff --git a/graft/coreth/plugin/evm/vm.go b/graft/coreth/plugin/evm/vm.go index 44fac1fa0d31..528d7fb39717 100644 --- a/graft/coreth/plugin/evm/vm.go +++ b/graft/coreth/plugin/evm/vm.go @@ -775,7 +775,7 @@ func (vm *VM) initBlockBuilding() error { } vm.shutdownWg.Add(1) go func() { - ethTxPool.Subscribe(ctx) + ethTxPool.UpdateBloomFilter(ctx) vm.shutdownWg.Done() }() pushGossipParams := avalanchegossip.BranchingFactor{ From 558e73827d107251c2257062b61f3cca346b9306 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Wed, 26 Nov 2025 11:24:47 -0500 Subject: [PATCH 2/3] fix --- graft/coreth/plugin/evm/eth_gossiper.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/graft/coreth/plugin/evm/eth_gossiper.go b/graft/coreth/plugin/evm/eth_gossiper.go index 91641b17833f..0192b2aca31b 100644 --- a/graft/coreth/plugin/evm/eth_gossiper.go +++ b/graft/coreth/plugin/evm/eth_gossiper.go @@ -60,7 +60,7 @@ func NewGossipEthTxPool(mempool *txpool.TxPool, registerer prometheus.Registerer return &GossipEthTxPool{ mempool: mempool, - pendingTxs: make(chan core.NewTxsEvent, pendingTxsBuffer), + pendingTxs: pendingTxs, sub: sub, bloom: bloom, }, nil From d480b0aa9268913b14730cf81618e5d083b8c5eb Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Wed, 26 Nov 2025 11:25:49 -0500 Subject: [PATCH 3/3] fix lint --- graft/coreth/plugin/evm/eth_gossiper.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/graft/coreth/plugin/evm/eth_gossiper.go b/graft/coreth/plugin/evm/eth_gossiper.go index 0192b2aca31b..2bfec4151ec8 100644 --- a/graft/coreth/plugin/evm/eth_gossiper.go +++ b/graft/coreth/plugin/evm/eth_gossiper.go @@ -7,6 +7,7 @@ package evm import ( "context" + "errors" "fmt" "sync" @@ -34,7 +35,7 @@ var ( _ eth.PushGossiper = (*EthPushGossiper)(nil) - errSubscribing = fmt.Errorf("subscribing to the mempool failed") + errSubscribing = errors.New("subscribing to the mempool failed") ) // NewGossipEthTxPool creates a new GossipEthTxPool.