Skip to content

Commit df8aefa

Browse files
mpolitzervfusco
authored andcommitted
fix: respect max_retries and subscription_timeout
- fixed evm-reader `Run`. It was not respecting CARTESI_BLOCKCHAIN_SUBSCRIPTION_TIMEOUT and CARTESI_BLOCKCHAIN_HTTP_MAX_RETRIES options. - Propagate `node` Context and Cancel function to services.
1 parent fe00c63 commit df8aefa

File tree

8 files changed

+86
-88
lines changed

8 files changed

+86
-88
lines changed

internal/config/generate/Config.toml

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -150,13 +150,6 @@ The default block to be used by EVM Reader and Claimer when requesting new block
150150
One of 'latest', 'pending', 'safe', 'finalized'"""
151151
used-by = ["evmreader", "claimer", "node", "prt"]
152152

153-
[blockchain.CARTESI_BLOCKCHAIN_SUBSCRIPTION_TIMEOUT]
154-
default = "60"
155-
go-type = "Duration"
156-
description = """
157-
Block subscription timeout in seconds."""
158-
used-by = ["evmreader", "node"]
159-
160153
[rollups.CARTESI_BLOCKCHAIN_HTTP_MAX_RETRIES]
161154
default = "4"
162155
go-type = "uint64"

internal/config/generated.go

Lines changed: 14 additions & 51 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/evmreader/evmreader.go

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,15 +72,28 @@ type appContracts struct {
7272
}
7373

7474
func (r *Service) Run(ctx context.Context, ready chan struct{}) error {
75-
for {
75+
for attempt := uint64(1); ; attempt++ {
7676
err := r.watchForNewBlocks(ctx, ready)
77-
// If the error is a SubscriptionError, re run watchForNewBlocks
78-
// that it will restart the websocket subscription
79-
if _, ok := err.(*SubscriptionError); !ok {
77+
r.Logger.Error(err.Error())
78+
79+
if attempt > r.blockchainMaxRetries {
80+
r.Logger.Error("Max attempts reached for subscription restart. Exititng",
81+
"max_retries", r.blockchainMaxRetries,
82+
)
8083
return err
8184
}
82-
r.Logger.Error(err.Error())
83-
r.Logger.Info("Restarting subscription")
85+
86+
r.Logger.Info("Restarting subscription",
87+
"attempt", attempt,
88+
"remaining", r.blockchainMaxRetries - attempt,
89+
"time_between_attempts", r.blockchainSubscriptionRetryInterval,
90+
)
91+
92+
// sleep or cancel
93+
select {
94+
case <-ctx.Done():
95+
case <-time.After(r.blockchainSubscriptionRetryInterval):
96+
}
8497
}
8598
}
8699

internal/evmreader/evmreader_test.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -145,13 +145,15 @@ func (s *EvmReaderSuite) SetupTest() {
145145
s.contractFactory = newMockAdapterFactory().SetupDefaultBehavior(s.applicationContract1, s.applicationContract2, s.inputBox)
146146

147147
s.evmReader = &Service{
148-
client: s.client,
149-
wsClient: s.wsClient,
150-
repository: s.repository,
151-
defaultBlock: DefaultBlock_Latest,
152-
adapterFactory: s.contractFactory,
153-
hasEnabledApps: true,
154-
inputReaderEnabled: true,
148+
client: s.client,
149+
wsClient: s.wsClient,
150+
repository: s.repository,
151+
defaultBlock: DefaultBlock_Latest,
152+
adapterFactory: s.contractFactory,
153+
hasEnabledApps: true,
154+
inputReaderEnabled: true,
155+
blockchainMaxRetries: 0,
156+
blockchainSubscriptionRetryInterval: time.Second,
155157
}
156158

157159
logLevel, err := config.GetLogLevel()

internal/evmreader/output_test.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -527,13 +527,15 @@ func (s *EvmReaderSuite) setupOutputMismatchTest() {
527527
s.contractFactory = newMockAdapterFactory()
528528

529529
s.evmReader = &Service{
530-
client: s.client,
531-
wsClient: s.wsClient,
532-
repository: s.repository,
533-
defaultBlock: DefaultBlock_Latest,
534-
adapterFactory: s.contractFactory,
535-
hasEnabledApps: true,
536-
inputReaderEnabled: true,
530+
client: s.client,
531+
wsClient: s.wsClient,
532+
repository: s.repository,
533+
defaultBlock: DefaultBlock_Latest,
534+
adapterFactory: s.contractFactory,
535+
hasEnabledApps: true,
536+
inputReaderEnabled: true,
537+
blockchainMaxRetries: 0,
538+
blockchainSubscriptionRetryInterval: time.Second,
537539
}
538540

539541
logLevel, err := config.GetLogLevel()

internal/evmreader/service.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"errors"
99
"fmt"
1010
"math/big"
11+
"time"
1112

1213
"github.com/cartesi/rollups-node/internal/config"
1314
. "github.com/cartesi/rollups-node/internal/model"
@@ -30,14 +31,16 @@ type CreateInfo struct {
3031
type Service struct {
3132
service.Service
3233

33-
client EthClientInterface
34-
wsClient EthClientInterface
35-
adapterFactory AdapterFactory
36-
repository EvmReaderRepository
37-
chainId uint64
38-
defaultBlock DefaultBlock
39-
hasEnabledApps bool
40-
inputReaderEnabled bool
34+
client EthClientInterface
35+
wsClient EthClientInterface
36+
adapterFactory AdapterFactory
37+
repository EvmReaderRepository
38+
chainId uint64
39+
defaultBlock DefaultBlock
40+
hasEnabledApps bool
41+
inputReaderEnabled bool
42+
blockchainMaxRetries uint64
43+
blockchainSubscriptionRetryInterval time.Duration
4144
}
4245

4346
const EvmReaderConfigKey = "evm-reader"
@@ -99,6 +102,8 @@ func Create(ctx context.Context, c *CreateInfo) (*Service, error) {
99102
return nil, fmt.Errorf("NodeConfig chainId mismatch: network %d != config %d",
100103
chainId.Uint64(), nodeConfig.ChainID)
101104
}
105+
s.blockchainMaxRetries = c.Config.BlockchainHttpMaxRetries
106+
s.blockchainSubscriptionRetryInterval = c.Config.BlockchainHttpRetryMinWait
102107

103108
s.client = c.EthClient
104109
s.wsClient = c.EthWsClient
@@ -140,7 +145,10 @@ func (s *Service) Tick() []error {
140145

141146
func (s *Service) Serve() error {
142147
ready := make(chan struct{}, 1)
143-
go s.Run(s.Context, ready)
148+
go func() {
149+
s.Run(s.Context, ready)
150+
s.Service.Stop(false)
151+
}()
144152
return s.Service.Serve()
145153
}
146154

internal/node/node.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ func newEVMReader(ctx context.Context, c *CreateInfo, s *Service) service.IServi
155155
readerArgs := evmreader.CreateInfo{
156156
CreateInfo: service.CreateInfo{
157157
Name: "evm-reader",
158+
Context: s.Context,
159+
Cancel: s.Cancel,
158160
LogLevel: c.Config.LogLevel,
159161
LogColor: c.Config.LogColor,
160162
EnableSignalHandling: false,
@@ -179,6 +181,8 @@ func newAdvancer(ctx context.Context, c *CreateInfo, s *Service) service.IServic
179181
advancerArgs := advancer.CreateInfo{
180182
CreateInfo: service.CreateInfo{
181183
Name: "advancer",
184+
Context: s.Context,
185+
Cancel: s.Cancel,
182186
LogLevel: c.Config.LogLevel,
183187
LogColor: c.Config.LogColor,
184188
EnableSignalHandling: false,
@@ -202,6 +206,8 @@ func newValidator(ctx context.Context, c *CreateInfo, s *Service) service.IServi
202206
validatorArgs := validator.CreateInfo{
203207
CreateInfo: service.CreateInfo{
204208
Name: "validator",
209+
Context: s.Context,
210+
Cancel: s.Cancel,
205211
LogLevel: c.Config.LogLevel,
206212
LogColor: c.Config.LogColor,
207213
EnableSignalHandling: false,
@@ -225,6 +231,8 @@ func newClaimer(ctx context.Context, c *CreateInfo, s *Service) service.IService
225231
claimerArgs := claimer.CreateInfo{
226232
CreateInfo: service.CreateInfo{
227233
Name: "claimer",
234+
Context: s.Context,
235+
Cancel: s.Cancel,
228236
LogLevel: c.Config.LogLevel,
229237
LogColor: c.Config.LogColor,
230238
EnableSignalHandling: false,
@@ -249,6 +257,8 @@ func newJsonrpc(ctx context.Context, c *CreateInfo, s *Service) service.IService
249257
jsonrpcArgs := jsonrpc.CreateInfo{
250258
CreateInfo: service.CreateInfo{
251259
Name: "jsonrpc",
260+
Context: s.Context,
261+
Cancel: s.Cancel,
252262
LogLevel: c.Config.LogLevel,
253263
LogColor: c.Config.LogColor,
254264
EnableSignalHandling: false,
@@ -271,6 +281,8 @@ func newPrt(ctx context.Context, c *CreateInfo, s *Service) service.IService {
271281
prtArgs := prt.CreateInfo{
272282
CreateInfo: service.CreateInfo{
273283
Name: "prt",
284+
Context: s.Context,
285+
Cancel: s.Cancel,
274286
LogLevel: c.Config.LogLevel,
275287
LogColor: c.Config.LogColor,
276288
EnableSignalHandling: false,

pkg/service/service.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ type CreateInfo struct {
103103
Impl ServiceImpl
104104
ServeMux *http.ServeMux
105105
Context context.Context
106+
Cancel context.CancelFunc
106107
}
107108

108109
// Service stores runtime information.
@@ -151,7 +152,10 @@ func Create(ctx context.Context, c *CreateInfo, s *Service) error {
151152
s.Context = c.Context
152153
}
153154
if s.Cancel == nil {
154-
s.Context, s.Cancel = context.WithCancel(c.Context)
155+
if c.Cancel == nil {
156+
s.Context, c.Cancel = context.WithCancel(c.Context)
157+
}
158+
s.Cancel = c.Cancel
155159
}
156160

157161
// ticker
@@ -246,6 +250,7 @@ func (s *Service) Stop(force bool) []error {
246250
elapsed := time.Since(start)
247251

248252
s.Running.Store(false)
253+
s.Cancel()
249254
if len(errs) > 0 {
250255
s.Logger.Error("Stop",
251256
"force", force,

0 commit comments

Comments
 (0)