Skip to content

Commit a39d825

Browse files
authored
fix(syncer) - Metadata syncer bug fixes/improvements (#2374)
* Debug logs * Remove sleeping when no metadata was refreshed * Update comment * Add condition not to stream when no validators fetched * Syncer - change batch fetch condition * Share Storage - update 'BeaconMetadataLastUpdated' for unchanged shares * Implement unit test, fix one of the conditions * SyncAll - unconditionally sync all shares * Add comment and condition for early return * Update comment * Populate metadata also for keys without CL provided metadata * fix/add more tests, linter * Minor refactor, added comment * Change some log levels to debug * Do not persist shares with unchanged metadata * fmt, add comments
1 parent 600fa69 commit a39d825

File tree

6 files changed

+119
-100
lines changed

6 files changed

+119
-100
lines changed

cli/operator/node.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -582,7 +582,7 @@ var StartNodeCmd = &cobra.Command{
582582
nodeProber.AddNode("event syncer", eventSyncer)
583583
}
584584

585-
if _, err := metadataSyncer.SyncOnStartup(cmd.Context()); err != nil {
585+
if _, err := metadataSyncer.SyncAll(cmd.Context()); err != nil {
586586
logger.Fatal("failed to sync metadata on startup", zap.Error(err))
587587
}
588588

logging/names.go

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
package logging
22

33
const (
4-
NameBootNode = "BootNode"
5-
NameController = "Controller"
6-
NameDiscoveryService = "DiscoveryService"
7-
NameDutyScheduler = "DutyScheduler"
8-
NameEthClient = "EthClient"
9-
NameMetricsHandler = "MetricsHandler"
10-
NameOperator = "Operator"
11-
NameP2PNetwork = "P2PNetwork"
12-
NameSignerStorage = "SignerStorage"
13-
NameValidator = "Validator"
14-
NameWSServer = "WSServer"
15-
NameConnHandler = "ConnHandler"
4+
NameBootNode = "BootNode"
5+
NameController = "Controller"
6+
NameDiscoveryService = "DiscoveryService"
7+
NameDutyScheduler = "DutyScheduler"
8+
NameEthClient = "EthClient"
9+
NameMetricsHandler = "MetricsHandler"
10+
NameOperator = "Operator"
11+
NameP2PNetwork = "P2PNetwork"
12+
NameSignerStorage = "SignerStorage"
13+
NameValidator = "Validator"
14+
NameWSServer = "WSServer"
15+
NameConnHandler = "ConnHandler"
16+
NameShareMetadataSyncer = "ShareMetadataSyncer"
1617

1718
NameBadgerDBLog = "BadgerDBLog"
1819
NameBadgerDBReporting = "BadgerDBReporting"

operator/validator/metadata/syncer.go

Lines changed: 47 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
spectypes "github.com/ssvlabs/ssv-spec/types"
1515

16+
"github.com/ssvlabs/ssv/logging"
1617
"github.com/ssvlabs/ssv/logging/fields"
1718
networkcommons "github.com/ssvlabs/ssv/network/commons"
1819
"github.com/ssvlabs/ssv/protocol/v2/blockchain/beacon"
@@ -27,7 +28,19 @@ const (
2728
defaultSyncInterval = 12 * time.Minute
2829
defaultStreamInterval = 2 * time.Second
2930
defaultUpdateSendTimeout = 30 * time.Second
30-
batchSize = 512
31+
// NOTE:
32+
// A higher value of `batchSize` results in fewer HTTP calls to the Consensus Node,
33+
// but each call will have larger payloads and responses. While this speeds up
34+
// metadata synchronization, it also increases the risk of timeouts.
35+
//
36+
// The value of `batchSize` should depend on the number of validators the node handles
37+
// (especially relevant when comparing Exporter vs. Non-Exporter nodes)
38+
// and the sync interval (how often metadata should be refreshed).
39+
//
40+
// ⚠️ Caution: Since there is no prioritization implemented, if the node cannot
41+
// sync all validator shares within the given sync interval, there is a high risk
42+
// that some validators will not be refreshed for an unpredictable amount of time.
43+
batchSize = 512
3144
)
3245

3346
type Syncer struct {
@@ -60,7 +73,7 @@ func NewSyncer(
6073
opts ...Option,
6174
) *Syncer {
6275
u := &Syncer{
63-
logger: logger,
76+
logger: logger.Named(logging.NameShareMetadataSyncer),
6477
shareStorage: shareStorage,
6578
validatorStore: validatorStore,
6679
beaconNode: beaconNode,
@@ -85,7 +98,10 @@ func WithSyncInterval(interval time.Duration) Option {
8598
}
8699
}
87100

88-
func (s *Syncer) SyncOnStartup(ctx context.Context) (beacon.ValidatorMetadataMap, error) {
101+
// SyncAll loads all non-liquidated validator shares that belong to operator's subnets,
102+
// and triggers a full metadata synchronization for them.
103+
// It returns a mapping of validator public keys to their updated metadata.
104+
func (s *Syncer) SyncAll(ctx context.Context) (beacon.ValidatorMetadataMap, error) {
89105
subnetsBuf := new(big.Int)
90106
ownSubnets := s.selfSubnets(subnetsBuf)
91107

@@ -100,19 +116,9 @@ func (s *Syncer) SyncOnStartup(ctx context.Context) (beacon.ValidatorMetadataMap
100116
return nil, nil
101117
}
102118

103-
// Skip syncing if metadata was already fetched before
104-
// to prevent blocking startup after first sync.
105-
needToSync := false
106119
pubKeysToFetch := make([]spectypes.ValidatorPK, 0, len(shares))
107120
for _, share := range shares {
108121
pubKeysToFetch = append(pubKeysToFetch, share.ValidatorPubKey)
109-
if !share.HasBeaconMetadata() {
110-
needToSync = true
111-
}
112-
}
113-
if !needToSync {
114-
// Stream should take it over from here.
115-
return nil, nil
116122
}
117123

118124
// Sync all pubkeys that belong to own subnets. We don't need to batch them because we need to wait here until all of them are synced.
@@ -122,8 +128,12 @@ func (s *Syncer) SyncOnStartup(ctx context.Context) (beacon.ValidatorMetadataMap
122128
// Sync retrieves metadata for the provided public keys and updates storage accordingly.
123129
// Returns updated metadata for keys that had changes. Returns nil if no keys were provided or no updates occurred.
124130
func (s *Syncer) Sync(ctx context.Context, pubKeys []spectypes.ValidatorPK) (beacon.ValidatorMetadataMap, error) {
131+
if len(pubKeys) == 0 {
132+
return nil, nil
133+
}
134+
125135
fetchStart := time.Now()
126-
metadata, err := s.Fetch(ctx, pubKeys)
136+
metadata, err := s.fetchMetadata(ctx, pubKeys)
127137
if err != nil {
128138
return nil, fmt.Errorf("fetch metadata: %w", err)
129139
}
@@ -150,11 +160,10 @@ func (s *Syncer) Sync(ctx context.Context, pubKeys []spectypes.ValidatorPK) (bea
150160
return updatedValidators, nil
151161
}
152162

153-
func (s *Syncer) Fetch(ctx context.Context, pubKeys []spectypes.ValidatorPK) (beacon.ValidatorMetadataMap, error) {
154-
if len(pubKeys) == 0 {
155-
return nil, nil
156-
}
157-
163+
// fetchMetadata is responsible for fetching validator metadata from the beacon node for the provided public keys.
164+
// The beacon node response is sometimes empty for certain public keys — for such validators,
165+
// the ValidatorMetadataMap will contain empty metadata objects.
166+
func (s *Syncer) fetchMetadata(ctx context.Context, pubKeys []spectypes.ValidatorPK) (beacon.ValidatorMetadataMap, error) {
158167
blsPubKeys := make([]phase0.BLSPubKey, len(pubKeys))
159168
for i, pk := range pubKeys {
160169
blsPubKeys[i] = phase0.BLSPubKey(pk)
@@ -165,7 +174,12 @@ func (s *Syncer) Fetch(ctx context.Context, pubKeys []spectypes.ValidatorPK) (be
165174
return nil, fmt.Errorf("get validator data from beacon node: %w", err)
166175
}
167176

168-
results := make(beacon.ValidatorMetadataMap, len(validatorsIndexMap))
177+
results := make(beacon.ValidatorMetadataMap, len(pubKeys))
178+
179+
for _, key := range pubKeys {
180+
results[key] = &beacon.ValidatorMetadata{}
181+
}
182+
169183
for _, v := range validatorsIndexMap {
170184
meta := &beacon.ValidatorMetadata{
171185
Status: v.Status,
@@ -179,6 +193,9 @@ func (s *Syncer) Fetch(ctx context.Context, pubKeys []spectypes.ValidatorPK) (be
179193
return results, nil
180194
}
181195

196+
// Stream continuously fetches and streams batches of validator metadata updates as they become available.
197+
// It yields updates through a channel (`SyncBatch`) and handles retries, sleeping between sync attempts
198+
// when all metadata is up to date. The loop respects the provided context and stops gracefully when canceled.
182199
func (s *Syncer) Stream(ctx context.Context) <-chan SyncBatch {
183200
metadataUpdates := make(chan SyncBatch)
184201

@@ -199,33 +216,33 @@ func (s *Syncer) Stream(ctx context.Context) <-chan SyncBatch {
199216
continue
200217
}
201218

202-
if len(batch.After) == 0 {
219+
if len(batch.Before) == 0 {
220+
s.logger.Debug("sleeping because all validators’ metadata has been refreshed.",
221+
zap.Duration("sleep_for", s.streamInterval),
222+
zap.Duration("refresh_interval", s.syncInterval))
203223
if slept := s.sleep(ctx, s.streamInterval); !slept {
204224
return
205225
}
206226
continue
207227
}
208228

209-
// TODO: use time.After when Go is updated to 1.23
210-
timer := time.NewTimer(s.updateSendTimeout)
211229
select {
212230
case metadataUpdates <- batch:
213231
// Only sleep if there aren't more validators to fetch metadata for.
214232
// It's done to wait for some data to appear. Without sleep, the next batch would likely be empty.
215233
if done {
234+
s.logger.Debug("sleeping after batch was streamed because all validators’ metadata has been refreshed.",
235+
zap.Duration("sleep_for", s.streamInterval),
236+
zap.Duration("refresh_interval", s.syncInterval))
216237
if slept := s.sleep(ctx, s.streamInterval); !slept {
217-
// canceled context
218-
timer.Stop()
219238
return
220239
}
221240
}
222241
case <-ctx.Done():
223-
timer.Stop()
224242
return
225-
case <-timer.C:
243+
case <-time.After(s.updateSendTimeout):
226244
s.logger.Warn("timed out waiting for sending update")
227245
}
228-
timer.Stop()
229246
}
230247
}()
231248

@@ -289,7 +306,7 @@ func (s *Syncer) nextBatchFromDB(_ context.Context, subnetsBuf *big.Int) beacon.
289306
staleShares = append(staleShares, share)
290307
}
291308

292-
return len(newShares) < batchSize
309+
return len(newShares)+len(staleShares) < batchSize
293310
})
294311

295312
// Combine validators up to batchSize, prioritizing the new ones.
@@ -307,14 +324,10 @@ func (s *Syncer) nextBatchFromDB(_ context.Context, subnetsBuf *big.Int) beacon.
307324
}
308325

309326
func (s *Syncer) sleep(ctx context.Context, d time.Duration) (slept bool) {
310-
// TODO: use time.After when Go is updated to 1.23
311-
timer := time.NewTimer(d)
312-
defer timer.Stop()
313-
314327
select {
315328
case <-ctx.Done():
316329
return false
317-
case <-timer.C:
330+
case <-time.After(d):
318331
return true
319332
}
320333
}

0 commit comments

Comments
 (0)