diff --git a/graft/coreth/plugin/evm/eth_gossiper.go b/graft/coreth/plugin/evm/eth_gossiper.go index b6129670e012..2bfec4151ec8 100644 --- a/graft/coreth/plugin/evm/eth_gossiper.go +++ b/graft/coreth/plugin/evm/eth_gossiper.go @@ -7,11 +7,12 @@ package evm import ( "context" + "errors" "fmt" "sync" - "sync/atomic" "github.com/ava-labs/libevm/core/types" + "github.com/ava-labs/libevm/event" "github.com/ava-labs/libevm/log" "github.com/prometheus/client_golang/prometheus" @@ -33,8 +34,13 @@ var ( _ gossip.Set[*GossipEthTx] = (*GossipEthTxPool)(nil) _ eth.PushGossiper = (*EthPushGossiper)(nil) + + errSubscribing = errors.New("subscribing to the mempool failed") ) +// NewGossipEthTxPool creates a new GossipEthTxPool. +// +// If a nil error is returned, UpdateBloomFilter must be called. func NewGossipEthTxPool(mempool *txpool.TxPool, registerer prometheus.Registerer) (*GossipEthTxPool, error) { bloom, err := gossip.NewBloomFilter( registerer, @@ -47,9 +53,16 @@ func NewGossipEthTxPool(mempool *txpool.TxPool, registerer prometheus.Registerer return nil, fmt.Errorf("failed to initialize bloom filter: %w", err) } + pendingTxs := make(chan core.NewTxsEvent, pendingTxsBuffer) + sub := mempool.SubscribeTransactions(pendingTxs, true) + if sub == nil { + return nil, errSubscribing + } + return &GossipEthTxPool{ mempool: mempool, - pendingTxs: make(chan core.NewTxsEvent, pendingTxsBuffer), + pendingTxs: pendingTxs, + sub: sub, bloom: bloom, }, nil } @@ -57,38 +70,29 @@ func NewGossipEthTxPool(mempool *txpool.TxPool, registerer prometheus.Registerer type GossipEthTxPool struct { mempool *txpool.TxPool pendingTxs chan core.NewTxsEvent + sub event.Subscription - bloom *gossip.BloomFilter lock sync.RWMutex - - // subscribed is set to true when the gossip subscription is active - // mostly used for testing - subscribed atomic.Bool -} - -// IsSubscribed returns whether or not the gossip subscription is active. -func (g *GossipEthTxPool) IsSubscribed() bool { - return g.subscribed.Load() + bloom *gossip.BloomFilter } -func (g *GossipEthTxPool) Subscribe(ctx context.Context) { - sub := g.mempool.SubscribeTransactions(g.pendingTxs, false) - if sub == nil { - log.Warn("failed to subscribe to new txs event") - return - } - g.subscribed.CompareAndSwap(false, true) - defer func() { - sub.Unsubscribe() - g.subscribed.CompareAndSwap(true, false) - }() +// UpdateBloomFilter continuously listens for new pending transactions from the +// mempool and adds them to the bloom filter. If the bloom filter reaches its +// capacity, it is reset and all pending transactions are re-added. +func (g *GossipEthTxPool) UpdateBloomFilter(ctx context.Context) { + defer g.sub.Unsubscribe() for { select { case <-ctx.Done(): log.Debug("shutting down subscription") return - case pendingTxs := <-g.pendingTxs: + case pendingTxs, ok := <-g.pendingTxs: + if !ok { + log.Debug("pending txs channel closed, shutting down subscription") + return + } + g.lock.Lock() optimalElements := (g.mempool.PendingSize(txpool.PendingFilter{}) + len(pendingTxs.Txs)) * config.TxGossipBloomChurnMultiplier for _, pendingTx := range pendingTxs.Txs { diff --git a/graft/coreth/plugin/evm/gossip_test.go b/graft/coreth/plugin/evm/gossip_test.go index 15523adc24b6..feec8c6a52ea 100644 --- a/graft/coreth/plugin/evm/gossip_test.go +++ b/graft/coreth/plugin/evm/gossip_test.go @@ -63,11 +63,7 @@ func TestGossipSubscribe(t *testing.T) { require.NoError(err) ctx, cancel := context.WithCancel(t.Context()) defer cancel() - go gossipTxPool.Subscribe(ctx) - - require.Eventually(func() bool { - return gossipTxPool.IsSubscribed() - }, 10*time.Second, 500*time.Millisecond, "expected gossipTxPool to be subscribed") + go gossipTxPool.UpdateBloomFilter(ctx) // create eth txs ethTxs := getValidEthTxs(key, 10, big.NewInt(226*utils.GWei)) diff --git a/graft/coreth/plugin/evm/vm.go b/graft/coreth/plugin/evm/vm.go index 44fac1fa0d31..528d7fb39717 100644 --- a/graft/coreth/plugin/evm/vm.go +++ b/graft/coreth/plugin/evm/vm.go @@ -775,7 +775,7 @@ func (vm *VM) initBlockBuilding() error { } vm.shutdownWg.Add(1) go func() { - ethTxPool.Subscribe(ctx) + ethTxPool.UpdateBloomFilter(ctx) vm.shutdownWg.Done() }() pushGossipParams := avalanchegossip.BranchingFactor{