Skip to content
This repository was archived by the owner on Dec 4, 2024. It is now read-only.

Commit ae408db

Browse files
Comments fix
1 parent 8e5ec7f commit ae408db

File tree

4 files changed

+50
-37
lines changed

4 files changed

+50
-37
lines changed

consensus/polybft/eventtracker/event_tracker.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,6 @@ type EventTrackerConfig struct {
6262
// LogFilter defines which events are tracked and from which contracts on the tracked chain
6363
LogFilter map[ethgo.Address][]ethgo.Hash
6464

65-
// Store is the store implementation for data that tracker saves (lastProcessedBlock and logs)
66-
Store EventTrackerStore
67-
6865
// BlockProvider is the implementation of a provider that returns blocks and logs from tracked chain
6966
BlockProvider BlockProvider
7067

@@ -81,6 +78,9 @@ type EventTracker struct {
8178

8279
blockTracker blocktracker.BlockTrackerInterface
8380
blockContainer *TrackerBlockContainer
81+
82+
// store is the store implementation for data that tracker saves (lastProcessedBlock and logs)
83+
store EventTrackerStore
8484
}
8585

8686
// NewEventTracker is a constructor function that creates a new instance of the EventTracker struct.
@@ -89,10 +89,9 @@ type EventTracker struct {
8989
//
9090
// config := &EventTrackerConfig{
9191
// RpcEndpoint: "http://some-json-rpc-url.com",
92-
// StartBlockFromConfig: 100_000,
9392
// NumBlockConfirmations: 10,
9493
// SyncBatchSize: 20,
95-
// MaxBacklogSize: 10_000,
94+
// NumOfBlocksToReconcile:10_000,
9695
// PollInterval: 2 * time.Second,
9796
// Logger: logger,
9897
// Store: store,
@@ -103,15 +102,16 @@ type EventTracker struct {
103102
// IDs: []ethgo.Hash{idHashOfSomeEvent},
104103
// },
105104
// }
106-
// t := NewEventTracker(config)
105+
// t := NewEventTracker(config, store)
107106
//
108107
// Inputs:
109108
// - config (TrackerConfig): configuration of EventTracker.
110109
//
111110
// Outputs:
112111
// - A new instance of the EventTracker struct.
113-
func NewEventTracker(config *EventTrackerConfig, startBlockFromGenesis uint64) (*EventTracker, error) {
114-
lastProcessedBlock, err := config.Store.GetLastProcessedBlock()
112+
func NewEventTracker(config *EventTrackerConfig, store EventTrackerStore,
113+
startBlockFromGenesis uint64) (*EventTracker, error) {
114+
lastProcessedBlock, err := store.GetLastProcessedBlock()
115115
if err != nil {
116116
return nil, err
117117
}
@@ -137,6 +137,7 @@ func NewEventTracker(config *EventTrackerConfig, startBlockFromGenesis uint64) (
137137

138138
return &EventTracker{
139139
config: config,
140+
store: store,
140141
closeCh: make(chan struct{}),
141142
blockTracker: blocktracker.NewJSONBlockTracker(config.BlockProvider),
142143
blockContainer: NewTrackerBlockContainer(lastProcessedBlock),
@@ -166,7 +167,7 @@ func (e *EventTracker) Close() {
166167
// Start is a method in the EventTracker struct that starts the tracking of blocks
167168
// and retrieval of logs from given blocks from the tracked chain.
168169
// If the tracker was turned off (node was down) for some time, it will sync up all the missed
169-
// blocks and logs from the last start (in regards to MaxBacklogSize field in config).
170+
// blocks and logs from the last start (in regards to NumOfBlocksToReconcile field in config).
170171
//
171172
// Returns:
172173
// - nil if start passes successfully.
@@ -416,7 +417,7 @@ func (e *EventTracker) processLogs() error {
416417
}
417418
}
418419

419-
if err := e.config.Store.InsertLastProcessedBlock(toBlock); err != nil {
420+
if err := e.store.InsertLastProcessedBlock(toBlock); err != nil {
420421
e.config.Logger.Error("Process logs failed on saving last processed block",
421422
"fromBlock", fromBlock,
422423
"toBlock", toBlock,
@@ -425,7 +426,7 @@ func (e *EventTracker) processLogs() error {
425426
return err
426427
}
427428

428-
if err := e.config.Store.InsertLogs(filteredLogs); err != nil {
429+
if err := e.store.InsertLogs(filteredLogs); err != nil {
429430
e.config.Logger.Error("Process logs failed on saving logs to store",
430431
"fromBlock", fromBlock,
431432
"toBlock", toBlock,

consensus/polybft/eventtracker/event_tracker_fuzz_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ type getNewStateF struct {
1616
LastProcessed uint64
1717
BatchSize uint64
1818
NumBlockConfirmations uint64
19-
MaxBackLogSize uint64
19+
NumBlocksToReconcile uint64
2020
}
2121

2222
func FuzzGetNewState(f *testing.F) {
@@ -27,23 +27,23 @@ func FuzzGetNewState(f *testing.F) {
2727
LastProcessed: 9,
2828
BatchSize: 5,
2929
NumBlockConfirmations: 3,
30-
MaxBackLogSize: 1000,
30+
NumBlocksToReconcile: 1000,
3131
},
3232
{
3333
Address: types.Address(types.StringToAddress("1").Bytes()),
3434
Number: 30,
3535
LastProcessed: 29,
3636
BatchSize: 5,
3737
NumBlockConfirmations: 3,
38-
MaxBackLogSize: 1000,
38+
NumBlocksToReconcile: 1000,
3939
},
4040
{
4141
Address: types.Address(types.StringToAddress("2").Bytes()),
4242
Number: 100,
4343
LastProcessed: 10,
4444
BatchSize: 10,
4545
NumBlockConfirmations: 3,
46-
MaxBackLogSize: 15,
46+
NumBlocksToReconcile: 15,
4747
},
4848
}
4949

@@ -74,12 +74,13 @@ func FuzzGetNewState(f *testing.F) {
7474
}
7575
providerMock.On("GetLogs", mock.Anything).Return(logs, nil)
7676

77-
testConfig := createTestTrackerConfig(t, data.NumBlockConfirmations, data.BatchSize, data.MaxBackLogSize)
77+
testConfig := createTestTrackerConfig(t, data.NumBlockConfirmations, data.BatchSize, data.NumBlocksToReconcile)
7878
testConfig.BlockProvider = providerMock
7979

8080
eventTracker := &EventTracker{
8181
config: testConfig,
8282
blockContainer: NewTrackerBlockContainer(data.LastProcessed),
83+
store: newTestTrackerStore(t),
8384
}
8485

8586
require.NoError(t, eventTracker.getNewState(&ethgo.Block{Number: data.Number}))

consensus/polybft/eventtracker/event_tracker_test.go

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func TestEventTracker_TrackBlock(t *testing.T) {
8989
t.Run("Add block by block - no confirmed blocks", func(t *testing.T) {
9090
t.Parallel()
9191

92-
tracker, err := NewEventTracker(createTestTrackerConfig(t, 10, 10, 0), 0)
92+
tracker, err := NewEventTracker(createTestTrackerConfig(t, 10, 10, 0), newTestTrackerStore(t), 0)
9393

9494
require.NoError(t, err)
9595

@@ -112,7 +112,7 @@ func TestEventTracker_TrackBlock(t *testing.T) {
112112

113113
// check that the last processed block is 0, since we did not have any confirmed blocks
114114
require.Equal(t, uint64(0), tracker.blockContainer.LastProcessedBlockLocked())
115-
lastProcessedBlockInStore, err := tracker.config.Store.GetLastProcessedBlock()
115+
lastProcessedBlockInStore, err := tracker.store.GetLastProcessedBlock()
116116
require.NoError(t, err)
117117
require.Equal(t, uint64(0), lastProcessedBlockInStore)
118118

@@ -131,7 +131,8 @@ func TestEventTracker_TrackBlock(t *testing.T) {
131131
blockProviderMock := new(mockProvider)
132132
blockProviderMock.On("GetLogs", mock.Anything).Return([]*ethgo.Log{}, nil).Once()
133133

134-
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0), 0)
134+
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0),
135+
newTestTrackerStore(t), 0)
135136
require.NoError(t, err)
136137

137138
tracker.config.BlockProvider = blockProviderMock
@@ -164,7 +165,7 @@ func TestEventTracker_TrackBlock(t *testing.T) {
164165
// check if the last confirmed block processed is as expected
165166
require.Equal(t, numOfConfirmedBlocks, tracker.blockContainer.LastProcessedBlock())
166167
// check if the last confirmed block is saved in db as well
167-
lastProcessedConfirmedBlock, err := tracker.config.Store.GetLastProcessedBlock()
168+
lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock()
168169
require.NoError(t, err)
169170
require.Equal(t, numOfConfirmedBlocks, lastProcessedConfirmedBlock)
170171
// check that in memory cache removed processed confirmed logs
@@ -197,7 +198,8 @@ func TestEventTracker_TrackBlock(t *testing.T) {
197198
blockProviderMock := new(mockProvider)
198199
blockProviderMock.On("GetLogs", mock.Anything).Return(logs, nil).Once()
199200

200-
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0), 0)
201+
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0),
202+
newTestTrackerStore(t), 0)
201203
require.NoError(t, err)
202204

203205
tracker.config.BlockProvider = blockProviderMock
@@ -230,12 +232,12 @@ func TestEventTracker_TrackBlock(t *testing.T) {
230232
// check if the last confirmed block processed is as expected
231233
require.Equal(t, numOfConfirmedBlocks, tracker.blockContainer.LastProcessedBlock())
232234
// check if the last confirmed block is saved in db as well
233-
lastProcessedConfirmedBlock, err := tracker.config.Store.GetLastProcessedBlock()
235+
lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock()
234236
require.NoError(t, err)
235237
require.Equal(t, numOfConfirmedBlocks, lastProcessedConfirmedBlock)
236238
// check if we have logs in store
237239
for _, log := range logs {
238-
logFromDB, err := tracker.config.Store.GetLog(log.BlockNumber, log.LogIndex)
240+
logFromDB, err := tracker.store.GetLog(log.BlockNumber, log.LogIndex)
239241
require.NoError(t, err)
240242
require.Equal(t, log.Address, logFromDB.Address)
241243
require.Equal(t, log.BlockNumber, log.BlockNumber)
@@ -265,7 +267,8 @@ func TestEventTracker_TrackBlock(t *testing.T) {
265267
blockProviderMock := new(mockProvider)
266268
blockProviderMock.On("GetLogs", mock.Anything).Return(nil, errors.New("some error occurred")).Once()
267269

268-
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0), 0)
270+
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, 10, 0),
271+
newTestTrackerStore(t), 0)
269272
require.NoError(t, err)
270273

271274
tracker.config.BlockProvider = blockProviderMock
@@ -298,7 +301,7 @@ func TestEventTracker_TrackBlock(t *testing.T) {
298301
// check if the last confirmed block processed is as expected, in this case 0, because an error occurred
299302
require.Equal(t, uint64(0), tracker.blockContainer.LastProcessedBlock())
300303
// check if the last confirmed block is saved in db as well
301-
lastProcessedConfirmedBlock, err := tracker.config.Store.GetLastProcessedBlock()
304+
lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock()
302305
require.NoError(t, err)
303306
require.Equal(t, uint64(0), lastProcessedConfirmedBlock)
304307
// check that in memory cache nothing got removed, and that we have the latest block as well
@@ -335,7 +338,8 @@ func TestEventTracker_TrackBlock(t *testing.T) {
335338
// just mock the call, it will use the provider.blocks map to handle proper returns
336339
blockProviderMock.On("GetBlockByNumber", mock.Anything, mock.Anything).Return(nil, nil).Times(int(numOfMissedBlocks))
337340

338-
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0), 0)
341+
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0),
342+
newTestTrackerStore(t), 0)
339343
require.NoError(t, err)
340344

341345
tracker.config.BlockProvider = blockProviderMock
@@ -369,11 +373,11 @@ func TestEventTracker_TrackBlock(t *testing.T) {
369373
expectedLastProcessed := numOfMissedBlocks + 1 - numBlockConfirmations
370374
require.Equal(t, expectedLastProcessed, tracker.blockContainer.LastProcessedBlock())
371375
// check if the last confirmed block is saved in db as well
372-
lastProcessedConfirmedBlock, err := tracker.config.Store.GetLastProcessedBlock()
376+
lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock()
373377
require.NoError(t, err)
374378
require.Equal(t, expectedLastProcessed, lastProcessedConfirmedBlock)
375379
// check if we have logs in store
376-
logsFromDB, err := tracker.config.Store.GetAllLogs()
380+
logsFromDB, err := tracker.store.GetAllLogs()
377381
require.NoError(t, err)
378382
require.Len(t, logsFromDB, len(logs))
379383

@@ -420,7 +424,8 @@ func TestEventTracker_TrackBlock(t *testing.T) {
420424
// just mock the call, it will use the provider.blocks map to handle proper returns
421425
blockProviderMock.On("GetBlockByNumber", mock.Anything, mock.Anything).Return(nil, nil).Times(int(numOfMissedBlocks + numOfCachedBlocks))
422426

423-
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0), 0)
427+
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0),
428+
newTestTrackerStore(t), 0)
424429
require.NoError(t, err)
425430

426431
tracker.config.BlockProvider = blockProviderMock
@@ -467,11 +472,11 @@ func TestEventTracker_TrackBlock(t *testing.T) {
467472
expectedLastProcessed := numOfMissedBlocks + numOfCachedBlocks + 1 - numBlockConfirmations
468473
require.Equal(t, expectedLastProcessed, tracker.blockContainer.LastProcessedBlock())
469474
// check if the last confirmed block is saved in db as well
470-
lastProcessedConfirmedBlock, err := tracker.config.Store.GetLastProcessedBlock()
475+
lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock()
471476
require.NoError(t, err)
472477
require.Equal(t, expectedLastProcessed, lastProcessedConfirmedBlock)
473478
// check if we have logs in store
474-
logsFromDB, err := tracker.config.Store.GetAllLogs()
479+
logsFromDB, err := tracker.store.GetAllLogs()
475480
require.NoError(t, err)
476481
require.Len(t, logsFromDB, len(logs))
477482

@@ -515,7 +520,8 @@ func TestEventTracker_TrackBlock(t *testing.T) {
515520
// just mock the call, it will use the provider.blocks map to handle proper returns
516521
blockProviderMock.On("GetBlockByNumber", mock.Anything, mock.Anything).Return(nil, nil).Times(int(numOfCachedBlocks))
517522

518-
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0), 0)
523+
tracker, err := NewEventTracker(createTestTrackerConfig(t, numBlockConfirmations, batchSize, 0),
524+
newTestTrackerStore(t), 0)
519525
require.NoError(t, err)
520526

521527
tracker.config.BlockProvider = blockProviderMock
@@ -560,11 +566,11 @@ func TestEventTracker_TrackBlock(t *testing.T) {
560566
expectedLastProcessed := numOfCachedBlocks + 1 - numBlockConfirmations
561567
require.Equal(t, expectedLastProcessed, tracker.blockContainer.LastProcessedBlock())
562568
// check if the last confirmed block is saved in db as well
563-
lastProcessedConfirmedBlock, err := tracker.config.Store.GetLastProcessedBlock()
569+
lastProcessedConfirmedBlock, err := tracker.store.GetLastProcessedBlock()
564570
require.NoError(t, err)
565571
require.Equal(t, expectedLastProcessed, lastProcessedConfirmedBlock)
566572
// check if we have logs in store
567-
logsFromDB, err := tracker.config.Store.GetAllLogs()
573+
logsFromDB, err := tracker.store.GetAllLogs()
568574
require.NoError(t, err)
569575
require.Len(t, logsFromDB, len(logs))
570576

@@ -598,7 +604,6 @@ func createTestTrackerConfig(t *testing.T, numBlockConfirmations, batchSize,
598604
LogFilter: map[ethgo.Address][]ethgo.Hash{
599605
ethgo.ZeroAddress: {stateSyncEvent.Sig()},
600606
},
601-
Store: newTestTrackerStore(t),
602607
EventSubscriber: new(mockEventSubscriber),
603608
BlockProvider: new(mockProvider),
604609
}

consensus/polybft/state_sync_manager.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ type stateSyncManager struct {
9494
nextCommittedIndex uint64
9595

9696
runtime Runtime
97+
tracker *eventtracker.EventTracker
9798
}
9899

99100
// topic is an interface for p2p message gossiping
@@ -129,6 +130,10 @@ func (s *stateSyncManager) Init() error {
129130

130131
func (s *stateSyncManager) Close() {
131132
close(s.closeCh)
133+
134+
if s.tracker != nil {
135+
s.tracker.Close()
136+
}
132137
}
133138

134139
// initTracker sets up and starts the event tracker implementation
@@ -152,17 +157,18 @@ func (s *stateSyncManager) initTracker() error {
152157
NumOfBlocksToReconcile: s.config.trackerBlocksToReconcile,
153158
PollInterval: s.config.blockTrackerPollInterval,
154159
Logger: s.logger,
155-
Store: store,
156160
EventSubscriber: s,
157161
BlockProvider: clt.Eth(),
158162
LogFilter: map[ethgo.Address][]ethgo.Hash{
159163
ethgo.Address(s.config.stateSenderAddr): {stateSyncEvent.Sig()},
160164
},
161-
}, s.config.stateSenderStartBlock)
165+
}, store, s.config.stateSenderStartBlock)
162166
if err != nil {
163167
return err
164168
}
165169

170+
s.tracker = tracker
171+
166172
return tracker.Start()
167173
}
168174

0 commit comments

Comments
 (0)