Skip to content

Commit b5e0bfc

Browse files
Merge branch 'develop' of github.com:0xPolygon/bor into avalkov/Sync-metrics
2 parents 3c7e888 + 6b8fde2 commit b5e0bfc

File tree

18 files changed

+3454
-132
lines changed

18 files changed

+3454
-132
lines changed

cmd/utils/bor_flags.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ var (
1616
// Bor Specific flags
1717
//
1818

19-
// HeimdallURLFlag flag for heimdall url
19+
// HeimdallURLFlag flag for heimdall url (comma-separated for failover)
2020
HeimdallURLFlag = &cli.StringFlag{
2121
Name: "bor.heimdall",
22-
Usage: "URL of Heimdall service",
22+
Usage: "URL of Heimdall service (comma-separated for failover: \"url1,url2\")",
2323
Value: "http://localhost:1317",
2424
}
2525

@@ -36,17 +36,17 @@ var (
3636
Usage: "Run without Heimdall service (for testing purpose)",
3737
}
3838

39-
// HeimdallgRPCAddressFlag flag for heimdall gRPC address
39+
// HeimdallgRPCAddressFlag flag for heimdall gRPC address (comma-separated for failover)
4040
HeimdallgRPCAddressFlag = &cli.StringFlag{
4141
Name: "bor.heimdallgRPC",
42-
Usage: "Address of Heimdall gRPC service",
42+
Usage: "Address of Heimdall gRPC service (comma-separated for failover: \"addr1,addr2\")",
4343
Value: "",
4444
}
4545

46-
// HeimdallWSAddressFlag flag for heimdall websocket subscription service
46+
// HeimdallWSAddressFlag flag for heimdall websocket subscription service (comma-separated for failover)
4747
HeimdallWSAddressFlag = &cli.StringFlag{
4848
Name: "bor.heimdallWS",
49-
Usage: "Address of Heimdall WS Subscription service",
49+
Usage: "Address of Heimdall WS Subscription service (comma-separated for failover: \"addr1,addr2\")",
5050
Value: "",
5151
}
5252

consensus/bor/heimdall/client.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,20 @@ var (
3939
ErrServiceUnavailable = errors.New("service unavailable")
4040
)
4141

42+
// HTTPStatusError is returned when Heimdall responds with a non-2xx, non-503 status code.
43+
// It wraps ErrNotSuccessfulResponse for backwards-compatibility with errors.Is checks.
44+
type HTTPStatusError struct {
45+
StatusCode int
46+
}
47+
48+
func (e *HTTPStatusError) Error() string {
49+
return fmt.Sprintf("%s: response code %d", ErrNotSuccessfulResponse.Error(), e.StatusCode)
50+
}
51+
52+
func (e *HTTPStatusError) Unwrap() error {
53+
return ErrNotSuccessfulResponse
54+
}
55+
4256
const (
4357
heimdallAPIBodyLimit = 128 * 1024 * 1024 // 128 MB
4458
stateFetchLimit = 50
@@ -455,7 +469,7 @@ func internalFetch(ctx context.Context, client http.Client, u *url.URL) ([]byte,
455469

456470
// check status code
457471
if res.StatusCode != 200 && res.StatusCode != 204 {
458-
return nil, fmt.Errorf("%w: response code %d", ErrNotSuccessfulResponse, res.StatusCode)
472+
return nil, &HTTPStatusError{StatusCode: res.StatusCode}
459473
}
460474

461475
// unmarshall data from buffer
Lines changed: 312 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,312 @@
1+
package heimdall
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"net"
8+
"time"
9+
10+
"github.com/0xPolygon/heimdall-v2/x/bor/types"
11+
ctypes "github.com/cometbft/cometbft/rpc/core/types"
12+
13+
"github.com/ethereum/go-ethereum/consensus/bor/clerk"
14+
"github.com/ethereum/go-ethereum/consensus/bor/heimdall/checkpoint"
15+
"github.com/ethereum/go-ethereum/consensus/bor/heimdall/milestone"
16+
"github.com/ethereum/go-ethereum/log"
17+
)
18+
19+
const (
20+
defaultAttemptTimeout = 30 * time.Second
21+
defaultProbeTimeout = 5 * time.Second
22+
defaultHealthCheckInterval = 10 * time.Second
23+
defaultConsecutiveThreshold = 3
24+
defaultPromotionCooldown = 60 * time.Second
25+
)
26+
27+
// Endpoint matches bor.IHeimdallClient. It is exported so that external
28+
// packages can build []Endpoint slices for NewMultiHeimdallClient without
29+
// running into Go's covariant-slice restriction.
30+
type Endpoint interface {
31+
StateSyncEvents(ctx context.Context, fromID uint64, to int64) ([]*clerk.EventRecordWithTime, error)
32+
GetSpan(ctx context.Context, spanID uint64) (*types.Span, error)
33+
GetLatestSpan(ctx context.Context) (*types.Span, error)
34+
FetchCheckpoint(ctx context.Context, number int64) (*checkpoint.Checkpoint, error)
35+
FetchCheckpointCount(ctx context.Context) (int64, error)
36+
FetchMilestone(ctx context.Context) (*milestone.Milestone, error)
37+
FetchMilestoneCount(ctx context.Context) (int64, error)
38+
FetchStatus(ctx context.Context) (*ctypes.SyncInfo, error)
39+
Close()
40+
}
41+
42+
// MultiHeimdallClient wraps N heimdall clients (primary at index 0, failovers
43+
// at 1..N-1) and transparently cascades through them when the active client is
44+
// unreachable. A background health registry continuously probes ALL endpoints,
45+
// requires consecutive successes + cooldown before promotion, and gives cascade
46+
// full visibility into endpoint health.
47+
type MultiHeimdallClient struct {
48+
clients []Endpoint
49+
registry *HealthRegistry
50+
attemptTimeout time.Duration
51+
probeTimeout time.Duration
52+
probeCtx context.Context // cancelled on Close to abort in-flight probes
53+
probeCancel context.CancelFunc
54+
}
55+
56+
func NewMultiHeimdallClient(clients ...Endpoint) (*MultiHeimdallClient, error) {
57+
if len(clients) == 0 {
58+
return nil, fmt.Errorf("NewMultiHeimdallClient requires at least one client")
59+
}
60+
61+
probeCtx, probeCancel := context.WithCancel(context.Background())
62+
63+
f := &MultiHeimdallClient{
64+
clients: clients,
65+
attemptTimeout: defaultAttemptTimeout,
66+
probeTimeout: defaultProbeTimeout,
67+
probeCtx: probeCtx,
68+
probeCancel: probeCancel,
69+
}
70+
71+
f.registry = NewHealthRegistry(
72+
len(clients),
73+
f.probeEndpoint,
74+
nil, // HTTP client doesn't need onSwitch callback
75+
RegistryMetrics{
76+
ProbeAttempts: failoverProbeAttempts,
77+
ProbeSuccesses: failoverProbeSuccesses,
78+
ProactiveSwitches: failoverProactiveSwitches,
79+
ActiveGauge: failoverActiveGauge,
80+
HealthyEndpoints: failoverHealthyEndpoints,
81+
},
82+
)
83+
84+
return f, nil
85+
}
86+
87+
// probeEndpoint probes a single endpoint via FetchStatus.
88+
func (f *MultiHeimdallClient) probeEndpoint(i int) error {
89+
ctx, cancel := context.WithTimeout(f.probeCtx, f.probeTimeout)
90+
defer cancel()
91+
92+
_, err := f.clients[i].FetchStatus(ctx)
93+
94+
return err
95+
}
96+
97+
// ensureHealthRegistry lazily starts the health registry goroutine on the first
98+
// API call. This allows tests to configure fields (thresholds, intervals) after
99+
// construction but before the goroutine reads them.
100+
func (f *MultiHeimdallClient) ensureHealthRegistry() {
101+
if len(f.clients) > 1 {
102+
f.registry.Start()
103+
}
104+
}
105+
106+
func (f *MultiHeimdallClient) StateSyncEvents(ctx context.Context, fromID uint64, to int64) ([]*clerk.EventRecordWithTime, error) {
107+
return callWithFailover(f, ctx, func(ctx context.Context, c Endpoint) ([]*clerk.EventRecordWithTime, error) {
108+
return c.StateSyncEvents(ctx, fromID, to)
109+
})
110+
}
111+
112+
func (f *MultiHeimdallClient) GetSpan(ctx context.Context, spanID uint64) (*types.Span, error) {
113+
return callWithFailover(f, ctx, func(ctx context.Context, c Endpoint) (*types.Span, error) {
114+
return c.GetSpan(ctx, spanID)
115+
})
116+
}
117+
118+
func (f *MultiHeimdallClient) GetLatestSpan(ctx context.Context) (*types.Span, error) {
119+
return callWithFailover(f, ctx, func(ctx context.Context, c Endpoint) (*types.Span, error) {
120+
return c.GetLatestSpan(ctx)
121+
})
122+
}
123+
124+
func (f *MultiHeimdallClient) FetchCheckpoint(ctx context.Context, number int64) (*checkpoint.Checkpoint, error) {
125+
return callWithFailover(f, ctx, func(ctx context.Context, c Endpoint) (*checkpoint.Checkpoint, error) {
126+
return c.FetchCheckpoint(ctx, number)
127+
})
128+
}
129+
130+
func (f *MultiHeimdallClient) FetchCheckpointCount(ctx context.Context) (int64, error) {
131+
return callWithFailover(f, ctx, func(ctx context.Context, c Endpoint) (int64, error) {
132+
return c.FetchCheckpointCount(ctx)
133+
})
134+
}
135+
136+
func (f *MultiHeimdallClient) FetchMilestone(ctx context.Context) (*milestone.Milestone, error) {
137+
return callWithFailover(f, ctx, func(ctx context.Context, c Endpoint) (*milestone.Milestone, error) {
138+
return c.FetchMilestone(ctx)
139+
})
140+
}
141+
142+
func (f *MultiHeimdallClient) FetchMilestoneCount(ctx context.Context) (int64, error) {
143+
return callWithFailover(f, ctx, func(ctx context.Context, c Endpoint) (int64, error) {
144+
return c.FetchMilestoneCount(ctx)
145+
})
146+
}
147+
148+
func (f *MultiHeimdallClient) FetchStatus(ctx context.Context) (*ctypes.SyncInfo, error) {
149+
return callWithFailover(f, ctx, func(ctx context.Context, c Endpoint) (*ctypes.SyncInfo, error) {
150+
return c.FetchStatus(ctx)
151+
})
152+
}
153+
154+
func (f *MultiHeimdallClient) Close() {
155+
f.probeCancel() // cancel in-flight probes first
156+
f.registry.Stop()
157+
158+
for _, c := range f.clients {
159+
c.Close()
160+
}
161+
}
162+
163+
// callWithFailover executes fn against the active client. If the active client
164+
// fails with a failover-eligible error, it marks it unhealthy and cascades
165+
// through remaining clients using health registry information.
166+
func callWithFailover[T any](f *MultiHeimdallClient, ctx context.Context, fn func(context.Context, Endpoint) (T, error)) (T, error) {
167+
f.ensureHealthRegistry()
168+
169+
active := f.registry.Active()
170+
171+
subCtx, cancel := context.WithTimeout(ctx, f.attemptTimeout)
172+
result, err := fn(subCtx, f.clients[active])
173+
cancel()
174+
175+
if err == nil {
176+
return result, nil
177+
}
178+
179+
if !isFailoverError(err, ctx) {
180+
var zero T
181+
return zero, err
182+
}
183+
184+
// Mark the active endpoint unhealthy in the registry.
185+
f.registry.MarkUnhealthy(active, err)
186+
187+
if active == 0 {
188+
log.Warn("Heimdall failover: primary failed, cascading", "err", err)
189+
}
190+
191+
return cascadeClients(f, ctx, fn, active, err)
192+
}
193+
194+
// cascadeClients tries all endpoints in priority order using health registry
195+
// information. It uses a three-pass approach:
196+
// 1. Healthy + cooled endpoints in priority order (skipping failed active)
197+
// 2. Healthy but NOT cooled endpoints in priority order
198+
// 3. Unhealthy endpoints in priority order (last resort)
199+
func cascadeClients[T any](f *MultiHeimdallClient, ctx context.Context, fn func(context.Context, Endpoint) (T, error), failed int, lastErr error) (T, error) {
200+
n := len(f.clients)
201+
202+
// Build candidate lists based on health state.
203+
snap := f.registry.HealthSnapshot()
204+
cooldown := f.registry.PromotionCooldown
205+
206+
var cooled, uncooled, unhealthy []int
207+
208+
for i := 0; i < n; i++ {
209+
if i == failed {
210+
continue
211+
}
212+
213+
if snap[i].Healthy {
214+
if time.Since(snap[i].HealthySince) >= cooldown {
215+
cooled = append(cooled, i)
216+
} else {
217+
uncooled = append(uncooled, i)
218+
}
219+
} else {
220+
unhealthy = append(unhealthy, i)
221+
}
222+
}
223+
224+
// Try each pass in order.
225+
passes := [][]int{cooled, uncooled, unhealthy}
226+
227+
for _, candidates := range passes {
228+
for _, i := range candidates {
229+
subCtx, cancel := context.WithTimeout(ctx, f.attemptTimeout)
230+
result, err := fn(subCtx, f.clients[i])
231+
cancel()
232+
233+
if err == nil {
234+
f.registry.SetActive(i)
235+
f.registry.MarkSuccess(i)
236+
237+
failoverSwitchCounter.Inc(1)
238+
239+
log.Warn("Heimdall failover: switched to client", "index", i)
240+
241+
return result, nil
242+
}
243+
244+
lastErr = err
245+
246+
if !isFailoverError(err, ctx) {
247+
var zero T
248+
return zero, err
249+
}
250+
251+
// Mark this endpoint unhealthy too.
252+
f.registry.MarkUnhealthy(i, err)
253+
}
254+
}
255+
256+
var zero T
257+
return zero, lastErr
258+
}
259+
260+
// isFailoverError returns true if the error warrants trying the secondary.
261+
// It distinguishes between sub-context timeouts (failover-eligible) and
262+
// caller context cancellation (not eligible).
263+
func isFailoverError(err error, callerCtx context.Context) bool {
264+
if err == nil {
265+
return false
266+
}
267+
268+
// If the caller's context is done, this is not a failover scenario
269+
if callerCtx.Err() != nil {
270+
return false
271+
}
272+
273+
// Shutdown detected - not a transport error
274+
if errors.Is(err, ErrShutdownDetected) {
275+
return false
276+
}
277+
278+
// 503 is a Heimdall feature-gate, not a transport issue
279+
if errors.Is(err, ErrServiceUnavailable) {
280+
return false
281+
}
282+
283+
// Transport errors
284+
var netErr net.Error
285+
if errors.As(err, &netErr) {
286+
return true
287+
}
288+
289+
// No response from Heimdall
290+
if errors.Is(err, ErrNoResponse) {
291+
return true
292+
}
293+
294+
// Server-side HTTP error (5xx, excluding 503 which is already handled above).
295+
// Client errors (4xx) are logical errors; the secondary would return the same response.
296+
var httpErr *HTTPStatusError
297+
if errors.As(err, &httpErr) {
298+
return httpErr.StatusCode >= 500
299+
}
300+
301+
// Sub-context deadline exceeded (the caller's context is still alive at this point)
302+
if errors.Is(err, context.DeadlineExceeded) {
303+
return true
304+
}
305+
306+
// Context canceled from sub-context (caller ctx is still alive)
307+
if errors.Is(err, context.Canceled) {
308+
return true
309+
}
310+
311+
return false
312+
}

0 commit comments

Comments
 (0)