Skip to content

Commit fdbc857

Browse files
Clean up lastRebroadcast on transaction rejection (#2038)
* Clean up lastRebroadcast on transaction rejection * Add test for lastRebroadcast cleanup on promotion rejection
1 parent 09b0330 commit fdbc857

File tree

2 files changed

+44
-66
lines changed

2 files changed

+44
-66
lines changed

core/txpool/legacypool/legacypool.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1232,6 +1232,7 @@ func (pool *LegacyPool) promoteTx(addr common.Address, hash common.Hash, tx *typ
12321232
pool.all.Remove(hash)
12331233
pool.priced.Removed(1)
12341234
pendingDiscardMeter.Mark(1)
1235+
delete(pool.lastRebroadcast, hash)
12351236
return false
12361237
}
12371238
// Otherwise discard any previous transaction and mark this

core/txpool/legacypool/legacypool_test.go

Lines changed: 43 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -5197,91 +5197,76 @@ func TestRebroadcastCleanup(t *testing.T) {
51975197
func TestRebroadcastCleanupAllPaths(t *testing.T) {
51985198
t.Parallel()
51995199

5200+
// Helper to verify lastRebroadcast cleanup
5201+
verifyCleanup := func(t *testing.T, pool *LegacyPool, txHash common.Hash, sizeBefore int, expectRemoved bool) {
5202+
t.Helper()
5203+
pool.mu.RLock()
5204+
_, stillTracked := pool.lastRebroadcast[txHash]
5205+
sizeAfter := len(pool.lastRebroadcast)
5206+
pool.mu.RUnlock()
5207+
5208+
if expectRemoved && stillTracked {
5209+
t.Error("transaction should be removed from lastRebroadcast")
5210+
}
5211+
if expectRemoved && sizeAfter >= sizeBefore {
5212+
t.Errorf("lastRebroadcast map size should decrease: before=%d, after=%d", sizeBefore, sizeAfter)
5213+
}
5214+
}
5215+
52005216
t.Run("PendingReplacement", func(t *testing.T) {
52015217
t.Parallel()
5202-
// Test that replacing a pending transaction cleans up lastRebroadcast
52035218
pool, key := setupPoolWithConfig(params.TestChainConfig)
52045219
defer pool.Close()
52055220

52065221
from := crypto.PubkeyToAddress(key.PublicKey)
52075222
testAddBalance(pool, from, big.NewInt(1000000000000000000))
52085223

5209-
// Add original transaction
52105224
tx1 := pricedTransaction(0, 100000, big.NewInt(1), key)
52115225
if err := pool.addRemoteSync(tx1); err != nil {
52125226
t.Fatalf("failed to add original transaction: %v", err)
52135227
}
52145228

5215-
// Manually mark it as rebroadcast and record size
52165229
pool.mu.Lock()
52175230
pool.lastRebroadcast[tx1.Hash()] = time.Now()
52185231
sizeBefore := len(pool.lastRebroadcast)
52195232
pool.mu.Unlock()
52205233

5221-
// Replace with higher price transaction
52225234
tx2 := pricedTransaction(0, 100000, big.NewInt(2), key)
52235235
if err := pool.addRemoteSync(tx2); err != nil {
52245236
t.Fatalf("failed to replace transaction: %v", err)
52255237
}
52265238

5227-
// Verify old tx is cleaned up and map size decreased
5228-
pool.mu.RLock()
5229-
_, stillTracked := pool.lastRebroadcast[tx1.Hash()]
5230-
sizeAfter := len(pool.lastRebroadcast)
5231-
pool.mu.RUnlock()
5232-
5233-
if stillTracked {
5234-
t.Error("replaced pending transaction should be removed from lastRebroadcast")
5235-
}
5236-
if sizeAfter >= sizeBefore {
5237-
t.Errorf("lastRebroadcast map size should decrease: before=%d, after=%d", sizeBefore, sizeAfter)
5238-
}
5239+
verifyCleanup(t, pool, tx1.Hash(), sizeBefore, true)
52395240
})
52405241

52415242
t.Run("QueuedReplacement", func(t *testing.T) {
52425243
t.Parallel()
5243-
// Test that replacing a queued transaction cleans up lastRebroadcast
52445244
pool, key := setupPoolWithConfig(params.TestChainConfig)
52455245
defer pool.Close()
52465246

52475247
from := crypto.PubkeyToAddress(key.PublicKey)
52485248
testAddBalance(pool, from, big.NewInt(1000000000000000000))
52495249

5250-
// Add a queued transaction (nonce gap)
52515250
tx1 := pricedTransaction(5, 100000, big.NewInt(1), key)
52525251
if err := pool.addRemoteSync(tx1); err != nil {
52535252
t.Fatalf("failed to add original queued transaction: %v", err)
52545253
}
52555254

5256-
// Manually mark it as rebroadcast and record size
52575255
pool.mu.Lock()
52585256
pool.lastRebroadcast[tx1.Hash()] = time.Now()
52595257
sizeBefore := len(pool.lastRebroadcast)
52605258
pool.mu.Unlock()
52615259

5262-
// Replace with higher price transaction
52635260
tx2 := pricedTransaction(5, 100000, big.NewInt(2), key)
52645261
if err := pool.addRemoteSync(tx2); err != nil {
52655262
t.Fatalf("failed to replace queued transaction: %v", err)
52665263
}
52675264

5268-
// Verify old tx is cleaned up and map size decreased
5269-
pool.mu.RLock()
5270-
_, stillTracked := pool.lastRebroadcast[tx1.Hash()]
5271-
sizeAfter := len(pool.lastRebroadcast)
5272-
pool.mu.RUnlock()
5273-
5274-
if stillTracked {
5275-
t.Error("replaced queued transaction should be removed from lastRebroadcast")
5276-
}
5277-
if sizeAfter >= sizeBefore {
5278-
t.Errorf("lastRebroadcast map size should decrease: before=%d, after=%d", sizeBefore, sizeAfter)
5279-
}
5265+
verifyCleanup(t, pool, tx1.Hash(), sizeBefore, true)
52805266
})
52815267

52825268
t.Run("DemoteUnexecutables", func(t *testing.T) {
52835269
t.Parallel()
5284-
// Test that demoting transactions (when nonce advances externally) cleans up lastRebroadcast
52855270
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
52865271
blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
52875272

@@ -5293,41 +5278,24 @@ func TestRebroadcastCleanupAllPaths(t *testing.T) {
52935278
from := crypto.PubkeyToAddress(key.PublicKey)
52945279
testAddBalance(pool, from, big.NewInt(1000000000000000000))
52955280

5296-
// Add a pending transaction at nonce 0
52975281
tx := pricedTransaction(0, 100000, big.NewInt(1), key)
52985282
if err := pool.addRemoteSync(tx); err != nil {
52995283
t.Fatalf("failed to add transaction: %v", err)
53005284
}
53015285

5302-
// Manually mark it as rebroadcast and record size
53035286
pool.mu.Lock()
53045287
pool.lastRebroadcast[tx.Hash()] = time.Now()
53055288
sizeBefore := len(pool.lastRebroadcast)
53065289
pool.mu.Unlock()
53075290

5308-
// Simulate external nonce advancement (e.g., tx included in block)
53095291
statedb.SetNonce(from, 1, tracing.NonceChangeUnspecified)
5310-
5311-
// Trigger reset which calls demoteUnexecutables
53125292
<-pool.requestReset(nil, nil)
53135293

5314-
// Verify old tx is cleaned up and map size decreased
5315-
pool.mu.RLock()
5316-
_, stillTracked := pool.lastRebroadcast[tx.Hash()]
5317-
sizeAfter := len(pool.lastRebroadcast)
5318-
pool.mu.RUnlock()
5319-
5320-
if stillTracked {
5321-
t.Error("demoted transaction should be removed from lastRebroadcast")
5322-
}
5323-
if sizeAfter >= sizeBefore {
5324-
t.Errorf("lastRebroadcast map size should decrease: before=%d, after=%d", sizeBefore, sizeAfter)
5325-
}
5294+
verifyCleanup(t, pool, tx.Hash(), sizeBefore, true)
53265295
})
53275296

53285297
t.Run("PromoteExecutablesDropsOld", func(t *testing.T) {
53295298
t.Parallel()
5330-
// Test that promoting executables cleans up old queued txs from lastRebroadcast
53315299
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabaseForTesting())
53325300
blockchain := newTestBlockChain(params.TestChainConfig, 1000000, statedb, new(event.Feed))
53335301

@@ -5339,36 +5307,45 @@ func TestRebroadcastCleanupAllPaths(t *testing.T) {
53395307
from := crypto.PubkeyToAddress(key.PublicKey)
53405308
testAddBalance(pool, from, big.NewInt(1000000000000000000))
53415309

5342-
// Add a queued transaction at nonce 5 (future nonce)
53435310
tx := pricedTransaction(5, 100000, big.NewInt(1), key)
53445311
if err := pool.addRemoteSync(tx); err != nil {
53455312
t.Fatalf("failed to add queued transaction: %v", err)
53465313
}
53475314

5348-
// Manually mark it as rebroadcast and record size
53495315
pool.mu.Lock()
53505316
pool.lastRebroadcast[tx.Hash()] = time.Now()
53515317
sizeBefore := len(pool.lastRebroadcast)
53525318
pool.mu.Unlock()
53535319

5354-
// Advance the nonce past the queued tx (simulating txs 0-5 being included)
53555320
statedb.SetNonce(from, 6, tracing.NonceChangeUnspecified)
5356-
5357-
// Trigger reset which calls promoteExecutables and drops old queued txs
53585321
<-pool.requestReset(nil, nil)
53595322

5360-
// Verify old tx is cleaned up and map size decreased
5361-
pool.mu.RLock()
5362-
_, stillTracked := pool.lastRebroadcast[tx.Hash()]
5363-
sizeAfter := len(pool.lastRebroadcast)
5364-
pool.mu.RUnlock()
5323+
verifyCleanup(t, pool, tx.Hash(), sizeBefore, true)
5324+
})
53655325

5366-
if stillTracked {
5367-
t.Error("dropped queued transaction should be removed from lastRebroadcast")
5368-
}
5369-
if sizeAfter >= sizeBefore {
5370-
t.Errorf("lastRebroadcast map size should decrease: before=%d, after=%d", sizeBefore, sizeAfter)
5326+
t.Run("PromotionRejection", func(t *testing.T) {
5327+
t.Parallel()
5328+
pool, key := setupPoolWithConfig(params.TestChainConfig)
5329+
defer pool.Close()
5330+
5331+
from := crypto.PubkeyToAddress(key.PublicKey)
5332+
testAddBalance(pool, from, big.NewInt(1000000000000000000))
5333+
5334+
tx1 := pricedTransaction(0, 100000, big.NewInt(100), key)
5335+
if err := pool.addRemoteSync(tx1); err != nil {
5336+
t.Fatalf("failed to add original transaction: %v", err)
53715337
}
5338+
5339+
tx2 := pricedTransaction(0, 100000, big.NewInt(50), key)
5340+
pool.mu.Lock()
5341+
pool.all.Add(tx2)
5342+
pool.priced.Put(tx2)
5343+
pool.lastRebroadcast[tx2.Hash()] = time.Now()
5344+
sizeBefore := len(pool.lastRebroadcast)
5345+
pool.promoteTx(from, tx2.Hash(), tx2)
5346+
pool.mu.Unlock()
5347+
5348+
verifyCleanup(t, pool, tx2.Hash(), sizeBefore, true)
53725349
})
53735350
}
53745351

0 commit comments

Comments
 (0)