Skip to content

Commit 373a847

Browse files
committed
consensus, core, eth, miner: use the same insert behavior among validators
1 parent b74d4df commit 373a847

File tree

10 files changed

+251
-36
lines changed

10 files changed

+251
-36
lines changed

consensus/consensus.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,11 @@ type Istanbul interface {
113113
NewChainHead(block *types.Block) error
114114

115115
// Start starts the engine
116-
Start(chain ChainReader, inserter func(types.Blocks) (int, error)) error
116+
Start(chain ChainReader) error
117117

118118
// Stop stops the engine
119119
Stop() error
120+
121+
// Set commit and write proposed work functions
122+
SetWorkerFns(func(*types.Block) error, func(*types.Block) bool)
120123
}

consensus/istanbul/backend/backend.go

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"github.com/ethereum/go-ethereum/consensus/istanbul"
2828
istanbulCore "github.com/ethereum/go-ethereum/consensus/istanbul/core"
2929
"github.com/ethereum/go-ethereum/consensus/istanbul/validator"
30+
"github.com/ethereum/go-ethereum/core"
3031
"github.com/ethereum/go-ethereum/core/types"
3132
"github.com/ethereum/go-ethereum/crypto"
3233
"github.com/ethereum/go-ethereum/ethdb"
@@ -69,7 +70,10 @@ type backend struct {
6970
logger log.Logger
7071
db ethdb.Database
7172
chain consensus.ChainReader
72-
inserter func(types.Blocks) (int, error)
73+
// commitProposedWork tries to commit the transactions in proposed block into chain DB
74+
commitProposedWork func(*types.Block) error
75+
// writeProposedWork tries to write the proposed block into chain DB
76+
writeProposedWork func(*types.Block) bool
7377

7478
// the channels for istanbul engine notifications
7579
commitCh chan *types.Block
@@ -151,12 +155,13 @@ func (sb *backend) Commit(proposal istanbul.Proposal, seals [][]byte) error {
151155
if sb.proposedBlockHash == block.Hash() {
152156
// feed block hash to Seal() and wait the Seal() result
153157
sb.commitCh <- block
154-
// TODO: how do we check the block is inserted correctly?
158+
go sb.eventMux.Post(core.NewMinedBlockEvent{Block: block})
155159
return nil
156160
}
157-
// if I'm not a proposer, insert the block directly and broadcast NewCommittedEvent
158-
if _, err := sb.inserter(types.Blocks{block}); err != nil {
159-
return err
161+
if result := sb.writeProposedWork(block); !result {
162+
// if I'm not a proposer, insert the block directly and broadcast NewCommittedEvent
163+
sb.logger.Warn("Write work failed")
164+
return nil
160165
}
161166
msg := istanbul.NewCommittedEvent{
162167
Block: block,
@@ -190,12 +195,12 @@ func (sb *backend) Verify(proposal istanbul.Proposal) (time.Duration, error) {
190195
// verify the header of proposed block
191196
err := sb.VerifyHeader(sb.chain, block.Header(), false)
192197
// ignore errEmptyCommittedSeals error because we don't have the committed seals yet
193-
if err == nil || err == errEmptyCommittedSeals {
194-
return 0, nil
195-
} else if err == consensus.ErrFutureBlock {
198+
if err == consensus.ErrFutureBlock {
196199
return time.Unix(block.Header().Time.Int64(), 0).Sub(now()), consensus.ErrFutureBlock
200+
} else if err != nil && err != errEmptyCommittedSeals {
201+
return 0, err
197202
}
198-
return 0, err
203+
return 0, sb.commitProposedWork(block)
199204
}
200205

201206
// Sign implements istanbul.Backend.Sign

consensus/istanbul/backend/backend_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ import (
2929
"github.com/ethereum/go-ethereum/consensus/istanbul/validator"
3030
"github.com/ethereum/go-ethereum/core/types"
3131
"github.com/ethereum/go-ethereum/crypto"
32-
"github.com/ethereum/go-ethereum/log"
32+
"github.com/ethereum/go-ethereum/ethdb"
33+
"github.com/ethereum/go-ethereum/event"
3334
)
3435

3536
func TestSign(t *testing.T) {
@@ -265,10 +266,10 @@ func (slice Keys) Swap(i, j int) {
265266
func newBackend() (b *backend, validatorKeys Keys, validatorSet istanbul.ValidatorSet) {
266267
key, _ := generatePrivateKey()
267268
validatorSet, validatorKeys = newTestValidatorSet(5)
268-
b = &backend{
269-
privateKey: key,
270-
logger: log.New("backend", "simple"),
271-
commitCh: make(chan *types.Block, 1),
272-
}
269+
eventMux := new(event.TypeMux)
270+
memDB, _ := ethdb.NewMemDatabase()
271+
config := istanbul.DefaultConfig
272+
// Use the first key as private key
273+
b, _ = New(config, eventMux, key, memDB).(*backend)
273274
return
274275
}

consensus/istanbul/backend/engine.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -531,7 +531,7 @@ func (sb *backend) NewChainHead(block *types.Block) error {
531531
}
532532

533533
// Start implements consensus.Istanbul.Start
534-
func (sb *backend) Start(chain consensus.ChainReader, inserter func(types.Blocks) (int, error)) error {
534+
func (sb *backend) Start(chain consensus.ChainReader) error {
535535
sb.coreMu.Lock()
536536
defer sb.coreMu.Unlock()
537537
if sb.coreStarted {
@@ -546,7 +546,6 @@ func (sb *backend) Start(chain consensus.ChainReader, inserter func(types.Blocks
546546
sb.commitCh = make(chan *types.Block, 1)
547547

548548
sb.chain = chain
549-
sb.inserter = inserter
550549

551550
curHeader := chain.CurrentHeader()
552551
lastSequence := new(big.Int).Set(curHeader.Number)
@@ -583,6 +582,11 @@ func (sb *backend) Stop() error {
583582
return nil
584583
}
585584

585+
func (sb *backend) SetWorkerFns(commitFn func(*types.Block) error, writeFn func(*types.Block) bool) {
586+
sb.commitProposedWork = commitFn
587+
sb.writeProposedWork = writeFn
588+
}
589+
586590
// snapshot retrieves the authorization snapshot at a given point in time.
587591
func (sb *backend) snapshot(chain consensus.ChainReader, number uint64, hash common.Hash, parents []*types.Header) (*Snapshot, error) {
588592
// Search for a snapshot in memory or on disk for checkpoints

consensus/istanbul/backend/engine_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func newBlockChain(n int) (*core.BlockChain, *backend) {
5353
if err != nil {
5454
panic(err)
5555
}
56-
b.Start(blockchain, blockchain.InsertChain)
56+
b.Start(blockchain)
5757
snap, err := b.snapshot(blockchain, 0, common.Hash{}, nil)
5858
if err != nil {
5959
panic(err)

consensus/istanbul/core/preprepare.go

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -68,21 +68,23 @@ func (c *core) handlePreprepare(msg *message, src istanbul.Validator) error {
6868
}
6969

7070
// Verify the proposal we received
71-
if duration, err := c.backend.Verify(preprepare.Proposal); err != nil {
72-
logger.Warn("Failed to verify proposal", "err", err, "duration", duration)
73-
// if it's a future block, we will handle it again after the duration
74-
if err == consensus.ErrFutureBlock {
75-
c.stopFuturePreprepareTimer()
76-
c.futurePreprepareTimer = time.AfterFunc(duration, func() {
77-
c.sendEvent(backlogEvent{
78-
src: src,
79-
msg: msg,
71+
if !c.isProposer() {
72+
if duration, err := c.backend.Verify(preprepare.Proposal); err != nil {
73+
logger.Warn("Failed to verify proposal", "err", err, "duration", duration)
74+
// if it's a future block, we will handle it again after the duration
75+
if err == consensus.ErrFutureBlock {
76+
c.stopFuturePreprepareTimer()
77+
c.futurePreprepareTimer = time.AfterFunc(duration, func() {
78+
c.sendEvent(backlogEvent{
79+
src: src,
80+
msg: msg,
81+
})
8082
})
81-
})
82-
} else {
83-
c.sendNextRoundChange()
83+
} else {
84+
c.sendNextRoundChange()
85+
}
86+
return err
8487
}
85-
return err
8688
}
8789

8890
// Here is about to accept the preprepare

consensus/istanbul/events.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ type ConsensusDataEvent struct {
3131

3232
// NewCommittedEvent is posted when I'm not a proposer but
3333
// a block has been committed from Istanbul consensus.
34-
type NewCommittedEvent struct{ Block *types.Block }
34+
type NewCommittedEvent struct {
35+
Block *types.Block
36+
}
3537

3638
// RequestEvent is posted to propose a proposal
3739
type RequestEvent struct {

core/blockchain.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -827,6 +827,11 @@ func (bc *BlockChain) WriteBlock(block *types.Block) (status WriteStatus, err er
827827
bc.mu.Lock()
828828
defer bc.mu.Unlock()
829829

830+
if bc.HasBlock(block.Hash()) {
831+
log.Debug("Block exists", "hash", block.Hash())
832+
return
833+
}
834+
830835
localTd := bc.GetTd(bc.currentBlock.Hash(), bc.currentBlock.NumberU64())
831836
externTd := new(big.Int).Add(block.Difficulty(), ptd)
832837

eth/backend.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ func (s *Ethereum) StartMining(local bool) error {
355355
}
356356
clique.Authorize(eb, wallet.SignHash)
357357
} else if istanbul, ok := s.engine.(consensus.Istanbul); ok {
358-
istanbul.Start(s.blockchain, s.blockchain.InsertChain)
358+
istanbul.Start(s.blockchain)
359359
}
360360
if local {
361361
// If local (CPU) mining is started, we can disable the transaction rejection

0 commit comments

Comments
 (0)