Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,17 @@ var (
blockPrefetchTxsInvalidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/invalid", nil)
blockPrefetchTxsValidMeter = metrics.NewRegisteredMeter("chain/prefetch/txs/valid", nil)

// Witness and write-path metrics for block production observability.
// These track the time spent in each phase of writeBlockWithState, which runs
// on the critical path between block sealing and broadcasting. Delays here
// (e.g. from large witness encoding, DB compaction stalls, or pathdb diff layer
// flushes) can cause blocks to be broadcast late, triggering span rotations.
witnessEncodeTimer = metrics.NewRegisteredTimer("chain/witness/encode", nil) // time to RLP-encode the witness (EncodeRLP)
witnessDbWriteTimer = metrics.NewRegisteredTimer("chain/witness/dbwrite", nil) // time to write encoded witness into the DB batch (WriteWitness)
witnessCollectionTimer = metrics.NewRegisteredTimer("chain/witness/collection", nil) // time spent collecting trie nodes into the witness during IntermediateRoot
blockBatchWriteTimer = metrics.NewRegisteredTimer("chain/batch/write", nil) // time to flush the block batch to disk (blockBatch.Write) — spikes indicate DB compaction stalls
stateCommitTimer = metrics.NewRegisteredTimer("chain/state/commit", nil) // time for statedb.CommitWithUpdate — in pathdb mode, spikes indicate diff layer flushes

errInsertionInterrupted = errors.New("insertion is interrupted")
errChainStopped = errors.New("blockchain is stopped")
errInvalidOldChain = errors.New("invalid old chain")
Expand Down Expand Up @@ -2254,24 +2265,52 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
rawdb.WritePreimages(blockBatch, statedb.Preimages())

if statedb.Witness() != nil {
encStart := time.Now()

var witBuf bytes.Buffer
if err := statedb.Witness().EncodeRLP(&witBuf); err != nil {
log.Error("error in witness encoding", "caughterr", err)
}

log.Debug("Writing witness", "block", block.NumberU64(), "hash", block.Hash(), "header", statedb.Witness().Header())
encodeDuration := time.Since(encStart)
witnessEncodeTimer.Update(encodeDuration)

witnessBytes := witBuf.Bytes()

writeStart := time.Now()
log.Debug("Writing witness", "block", block.NumberU64(), "hash", block.Hash(), "header", statedb.Witness().Header())
[Review comment — Contributor]: One idea is that we can add warning logs if the time of any operation is greater than a threshold, e.g. 100ms.
[Reply — Member, Author]: Done here. Added for witness encode, witness write, batch flush and state commit.
bc.WriteWitness(blockBatch, block.Hash(), witnessBytes)
dbWriteDuration := time.Since(writeStart)
witnessDbWriteTimer.Update(dbWriteDuration)

if encodeDuration > 100*time.Millisecond {
log.Warn("Slow witness encoding", "block", block.NumberU64(), "elapsed", common.PrettyDuration(encodeDuration), "size", common.StorageSize(len(witnessBytes)))
}
if dbWriteDuration > 100*time.Millisecond {
log.Warn("Slow witness DB write", "block", block.NumberU64(), "elapsed", common.PrettyDuration(dbWriteDuration), "size", common.StorageSize(len(witnessBytes)))
}
} else {
log.Debug("No witness to write", "block", block.NumberU64())
}

batchStart := time.Now()
if err := blockBatch.Write(); err != nil {
log.Crit("Failed to write block into disk", "err", err)
}
batchFlushDuration := time.Since(batchStart)
blockBatchWriteTimer.Update(batchFlushDuration)
if batchFlushDuration > 100*time.Millisecond {
log.Warn("Slow block batch flush", "block", block.NumberU64(), "elapsed", common.PrettyDuration(batchFlushDuration))
}

// Commit all cached state changes into underlying memory database.
commitStart := time.Now()
root, stateUpdate, err := statedb.CommitWithUpdate(block.NumberU64(), bc.chainConfig.IsEIP158(block.Number()), bc.chainConfig.IsCancun(block.Number()))
commitDuration := time.Since(commitStart)
stateCommitTimer.Update(commitDuration)
if commitDuration > 100*time.Millisecond {
log.Warn("Slow state commit", "block", block.NumberU64(), "elapsed", common.PrettyDuration(commitDuration))
}
if err != nil {
return []*types.Log{}, err
}
Expand Down Expand Up @@ -3237,6 +3276,7 @@ func (bc *BlockChain) insertChainWithWitnesses(chain types.Blocks, setHead bool,
storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them
snapshotCommitTimer.Update(statedb.SnapshotCommits) // Snapshot commits are complete, we can mark them
triedbCommitTimer.Update(statedb.TrieDBCommits) // Trie database commits are complete, we can mark them
witnessCollectionTimer.Update(statedb.WitnessCollection)

blockWriteTimer.Update(time.Since(wstart) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits - statedb.TrieDBCommits)
blockInsertTimer.UpdateSince(start)
Expand Down Expand Up @@ -3441,6 +3481,7 @@ func (bc *BlockChain) processBlock(block *types.Block, statedb *state.StateDB, s
storageCommitTimer.Update(statedb.StorageCommits) // Storage commits are complete, we can mark them
snapshotCommitTimer.Update(statedb.SnapshotCommits) // Snapshot commits are complete, we can mark them
triedbCommitTimer.Update(statedb.TrieDBCommits) // Trie database commits are complete, we can mark them
witnessCollectionTimer.Update(statedb.WitnessCollection)

blockWriteTimer.Update(time.Since(wstart) - max(statedb.AccountCommits, statedb.StorageCommits) /* concurrent */ - statedb.SnapshotCommits - statedb.TrieDBCommits)
elapsed := time.Since(startTime) + 1 // prevent zero division
Expand Down
61 changes: 61 additions & 0 deletions core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"github.com/ethereum/go-ethereum/eth/tracers/logger"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/ethdb/pebble"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/trie"
Expand Down Expand Up @@ -6186,3 +6187,63 @@ func TestStateAtWithReaders(t *testing.T) {
t.Logf("Got expected error for invalid root: %v", err)
})
}

// TestWriteBlockMetrics verifies that the block write path metrics
// (batch write, state commit, witness collection) are updated after
// inserting blocks into the chain, and that the slow-operation warning
// log code paths execute without errors.
// TestWriteBlockMetrics verifies that the block write path metrics
// (batch write, state commit, witness collection) are updated after
// inserting blocks into the chain, and that the slow-operation warning
// log code paths execute without errors.
func TestWriteBlockMetrics(t *testing.T) {
	metrics.Enable()

	var (
		key, _  = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
		address = crypto.PubkeyToAddress(key.PublicKey)
		funds   = big.NewInt(1000000000000000)
		gspec   = &Genesis{
			Config:  params.TestChainConfig,
			Alloc:   types.GenesisAlloc{address: {Balance: funds}},
			BaseFee: big.NewInt(params.InitialBaseFee),
		}
		signer = types.LatestSigner(gspec.Config)
	)

	_, blocks, _ := GenerateChainWithGenesis(gspec, ethash.NewFaker(), 5, func(i int, block *BlockGen) {
		block.SetCoinbase(common.Address{0x01})
		tx, err := types.SignTx(types.NewTransaction(block.TxNonce(address), common.Address{0x02}, big.NewInt(1000), params.TxGas, block.header.BaseFee, nil), signer, key)
		if err != nil {
			panic(err)
		}
		block.AddTx(tx)
	})

	// Check the constructor error before deferring Stop: the original code
	// ignored it, so a failed construction would panic inside the deferred
	// chain.Stop() on a nil chain and mask the real failure.
	chain, err := NewBlockChain(rawdb.NewMemoryDatabase(), gspec, ethash.NewFaker(), DefaultConfig().WithStateScheme(rawdb.HashScheme))
	if err != nil {
		t.Fatalf("failed to create blockchain: %v", err)
	}
	defer chain.Stop()

	// Capture metric counts before insertion so the assertions below are
	// robust even if other tests in the package have bumped these timers.
	batchCountBefore := blockBatchWriteTimer.Snapshot().Count()
	commitCountBefore := stateCommitTimer.Snapshot().Count()

	if _, err := chain.InsertChain(blocks, false); err != nil {
		t.Fatalf("failed to insert chain: %v", err)
	}

	// Verify timers were updated by the write path.
	batchSnap := blockBatchWriteTimer.Snapshot()
	commitSnap := stateCommitTimer.Snapshot()

	if batchSnap.Count() <= batchCountBefore {
		t.Error("blockBatchWriteTimer should have been updated after block insertion")
	}
	if commitSnap.Count() <= commitCountBefore {
		t.Error("stateCommitTimer should have been updated after block insertion")
	}

	// Verify durations are non-negative (the duration variables that feed
	// both the metrics and the >100ms warning log checks are valid).
	if batchSnap.Mean() < 0 {
		t.Error("blockBatchWriteTimer mean duration should be non-negative")
	}
	if commitSnap.Mean() < 0 {
		t.Error("stateCommitTimer mean duration should be non-negative")
	}
}
7 changes: 5 additions & 2 deletions core/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package core

import (
"time"

"github.com/ethereum/go-ethereum/core/stateless"
"github.com/ethereum/go-ethereum/core/types"
)
Expand All @@ -29,8 +31,9 @@ type StuckTxsEvent struct{ Txs []*types.Transaction }

// NewMinedBlockEvent is posted when a block has been imported.
type NewMinedBlockEvent struct {
Block *types.Block
Witness *stateless.Witness
Block *types.Block
Witness *stateless.Witness
SealedAt time.Time // time when WriteBlockAndSetHead completed, used to measure broadcast latency
}

// RemovedLogsEvent is posted when a reorg happens
Expand Down
5 changes: 5 additions & 0 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ type StateDB struct {
SnapshotStorageReads time.Duration
SnapshotCommits time.Duration
TrieDBCommits time.Duration
WitnessCollection time.Duration // time spent collecting trie nodes into witness during IntermediateRoot (sequential portion only)

// Bor metrics
BorConsensusTime time.Duration
Expand Down Expand Up @@ -1330,6 +1331,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
// Skip witness collection in Verkle mode, they will be gathered
// together at the end.
if s.witness != nil && !s.db.TrieDB().IsVerkle() {
witStart := time.Now()
// Pull in anything that has been accessed before destruction
for _, obj := range s.stateObjectsDestruct {
// Skip any objects that haven't touched their storage
Expand Down Expand Up @@ -1374,6 +1376,7 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {
}
}
}
s.WitnessCollection += time.Since(witStart)
}
workers.Wait()
s.StorageUpdates += time.Since(start)
Expand Down Expand Up @@ -1437,11 +1440,13 @@ func (s *StateDB) IntermediateRoot(deleteEmptyObjects bool) common.Hash {

// If witness building is enabled, gather the account trie witness
if s.witness != nil {
witStart := time.Now()
witness := s.trie.Witness()
s.witness.AddState(witness)
if s.witnessStats != nil {
s.witnessStats.Add(witness, common.Hash{})
}
s.WitnessCollection += time.Since(witStart)
}
return hash
}
Expand Down
46 changes: 46 additions & 0 deletions core/state/statedb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/ethereum/go-ethereum/core/blockstm"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/state/snapshot"
"github.com/ethereum/go-ethereum/core/stateless"
"github.com/ethereum/go-ethereum/core/tracing"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
Expand Down Expand Up @@ -2077,3 +2078,48 @@ func TestRevertWriteSelfDestruct(t *testing.T) {
assert.True(t, containsKey(s.MVFullWriteList(), keySuicide))
assert.False(t, containsKey(s.MVWriteList(), keySuicide))
}

// TestWitnessCollectionTiming verifies that IntermediateRoot populates
// the WitnessCollection duration field when a witness is attached.
// TestWitnessCollectionTiming verifies that IntermediateRoot populates
// the WitnessCollection duration field when a witness is attached, and
// leaves it at zero when no witness is set.
func TestWitnessCollectionTiming(t *testing.T) {
	db := rawdb.NewMemoryDatabase()
	tdb := triedb.NewDatabase(db, nil)
	sdb := NewDatabase(tdb, nil)

	// seed writes a handful of accounts and storage slots so that
	// IntermediateRoot has trie work (and thus witness collection) to do.
	// Shared between both halves of the test to avoid the duplicated loop.
	seed := func(s *StateDB) {
		for i := byte(0); i < 20; i++ {
			addr := common.BytesToAddress([]byte{i})
			s.AddBalance(addr, uint256.NewInt(uint64(100*i+1)), tracing.BalanceChangeUnspecified)
			s.SetState(addr, common.BytesToHash([]byte{i}), common.BytesToHash([]byte{i, i}))
		}
	}

	// Test with witness: WitnessCollection should be populated.
	// The original ignored the error from New; fail fast instead so a broken
	// database setup doesn't surface as a confusing nil-pointer panic later.
	state, err := New(types.EmptyRootHash, sdb)
	if err != nil {
		t.Fatalf("failed to create state: %v", err)
	}
	state.SetWitness(&stateless.Witness{
		Headers: []*types.Header{},
		Codes:   make(map[string]struct{}),
		State:   make(map[string]struct{}),
	})
	seed(state)
	state.IntermediateRoot(true)

	if state.WitnessCollection == 0 {
		t.Error("WitnessCollection should be > 0 when witness is attached")
	}

	// Test without witness: WitnessCollection should remain zero.
	state2, err := New(types.EmptyRootHash, sdb)
	if err != nil {
		t.Fatalf("failed to create state: %v", err)
	}
	seed(state2)
	state2.IntermediateRoot(true)

	if state2.WitnessCollection != 0 {
		t.Errorf("WitnessCollection should be 0 without witness, got %v", state2.WitnessCollection)
	}
}
17 changes: 14 additions & 3 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,12 @@ const (
txMaxBroadcastSize = 4096
)

var syncChallengeTimeout = 15 * time.Second // Time allowance for a node to reply to the sync progress challenge
var (
syncChallengeTimeout = 15 * time.Second // Time allowance for a node to reply to the sync progress challenge
// sealToBroadcastTimer measures latency from seal+write completion to broadcast start.
// This captures event delivery delay through the TypeMux subscription channel.
sealToBroadcastTimer = metrics.NewRegisteredTimer("eth/seal2broadcast", nil)
)

// txPool defines the methods needed from a transaction pool implementation to
// support all the operations needed by the Ethereum chain protocols.
Expand Down Expand Up @@ -786,10 +791,16 @@ func (h *handler) minedBroadcastLoop() {

for obj := range h.minedBlockSub.Chan() {
if ev, ok := obj.Data.(core.NewMinedBlockEvent); ok {
now := time.Now()
var sealToBcast time.Duration
if !ev.SealedAt.IsZero() {
sealToBcast = now.Sub(ev.SealedAt)
sealToBroadcastTimer.Update(sealToBcast)
}
if h.enableBlockTracking {
delayInMs := time.Now().UnixMilli() - int64(ev.Block.Time())*1000
delayInMs := now.UnixMilli() - int64(ev.Block.Time())*1000
delay := common.PrettyDuration(time.Millisecond * time.Duration(delayInMs))
log.Info("[block tracker] Broadcasting mined block", "number", ev.Block.NumberU64(), "hash", ev.Block.Hash(), "blockTime", ev.Block.Time(), "now", time.Now().Unix(), "delay", delay, "delayInMs", delayInMs)
log.Info("[block tracker] Broadcasting mined block", "number", ev.Block.NumberU64(), "hash", ev.Block.Hash(), "blockTime", ev.Block.Time(), "now", now.Unix(), "delay", delay, "delayInMs", delayInMs, "sealToBroadcast", common.PrettyDuration(sealToBcast))
}
h.BroadcastBlock(ev.Block, ev.Witness, true) // First propagate block to peers
h.BroadcastBlock(ev.Block, ev.Witness, false) // Only then announce to the rest
Expand Down
46 changes: 46 additions & 0 deletions eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/ethereum/go-ethereum/eth/protocols/eth"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/params"
Expand Down Expand Up @@ -511,3 +512,48 @@ func closePeers(peers []*ethPeer) {
p.Close()
}
}

// TestSealToBroadcastTimer verifies that the sealToBroadcastTimer metric is
// updated when minedBroadcastLoop processes a NewMinedBlockEvent with SealedAt set.
// TestSealToBroadcastTimer verifies that the sealToBroadcastTimer metric is
// updated when minedBroadcastLoop processes a NewMinedBlockEvent with SealedAt
// set, and is left untouched when SealedAt is the zero time.
func TestSealToBroadcastTimer(t *testing.T) {
	t.Parallel()
	metrics.Enable()

	handler := newTestHandlerWithBlocks(1)
	defer handler.close()

	countBefore := sealToBroadcastTimer.Snapshot().Count()

	// Use a real block from the chain so the broadcast path does not reject it.
	block := handler.chain.GetBlockByNumber(1)
	if block == nil {
		t.Fatal("failed to get block 1")
	}

	// waitForCount polls until the timer's sample count satisfies cond or the
	// deadline passes. Polling replaces the original fixed 200ms sleep, which
	// was both flaky on slow CI machines and needlessly slow on fast ones.
	waitForCount := func(cond func(int64) bool) bool {
		deadline := time.Now().Add(2 * time.Second)
		for time.Now().Before(deadline) {
			if cond(sealToBroadcastTimer.Snapshot().Count()) {
				return true
			}
			time.Sleep(10 * time.Millisecond)
		}
		return cond(sealToBroadcastTimer.Snapshot().Count())
	}

	// Post a NewMinedBlockEvent with SealedAt set; it must bump the timer.
	if err := handler.handler.eventMux.Post(core.NewMinedBlockEvent{
		Block:    block,
		SealedAt: time.Now(),
	}); err != nil {
		t.Fatalf("failed to post event: %v", err)
	}
	if !waitForCount(func(c int64) bool { return c > countBefore }) {
		t.Error("sealToBroadcastTimer should have been updated after NewMinedBlockEvent with SealedAt")
	}

	// A zero SealedAt must NOT update the timer. A negative condition cannot
	// be polled to success, so a bounded fixed wait is still needed here.
	countAfterFirst := sealToBroadcastTimer.Snapshot().Count()
	if err := handler.handler.eventMux.Post(core.NewMinedBlockEvent{
		Block: block,
		// SealedAt is intentionally left as the zero value.
	}); err != nil {
		t.Fatalf("failed to post event: %v", err)
	}
	time.Sleep(200 * time.Millisecond)

	if sealToBroadcastTimer.Snapshot().Count() != countAfterFirst {
		t.Error("sealToBroadcastTimer should NOT be updated when SealedAt is zero")
	}
}
8 changes: 7 additions & 1 deletion miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ var (
intermediateRootTimer = metrics.NewRegisteredTimer("worker/intermediateRoot", nil)
// commitTimer measures total time for complete block building (tx execution + finalization + state root)
commitTimer = metrics.NewRegisteredTimer("worker/commit", nil)
// writeBlockAndSetHeadTimer measures total time for WriteBlockAndSetHead in the seal result loop.
// This covers the entire gap between block sealing and event posting: witness encoding, batch write,
// state commit, and (in hashdb mode) trie GC. Spikes here directly delay block broadcasting.
writeBlockAndSetHeadTimer = metrics.NewRegisteredTimer("worker/writeBlockAndSetHead", nil)

// Cache hit/miss metrics for block production (miner path)
// These are the same meters used by the import path in blockchain.go
Expand Down Expand Up @@ -965,7 +969,9 @@ func (w *worker) resultLoop() {
}

// Commit block and state to database.
writeStart := time.Now()
_, err = w.chain.WriteBlockAndSetHead(block, receipts, logs, task.state, true)
writeBlockAndSetHeadTimer.Update(time.Since(writeStart))

if err != nil {
log.Error("Failed writing block to chain", "err", err)
Expand All @@ -980,7 +986,7 @@ func (w *worker) resultLoop() {
"elapsed", common.PrettyDuration(time.Since(task.createdAt)))

// Broadcast the block and announce chain insertion event
w.mux.Post(core.NewMinedBlockEvent{Block: block, Witness: witness})
w.mux.Post(core.NewMinedBlockEvent{Block: block, Witness: witness, SealedAt: time.Now()})

sealedBlocksCounter.Inc(1)

Expand Down
Loading
Loading