Skip to content

Commit 1abbaca

Browse files
authored
Merge pull request #1049 from rocket-pool/refactor-network-state
Refactor createNetworkStateForNode and createNetworkState
2 parents 4a07fa6 + 03d7610 commit 1abbaca

File tree

3 files changed

+555
-196
lines changed

3 files changed

+555
-196
lines changed

shared/services/state/manager.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func (m *NetworkStateManager) GetHeadState() (*NetworkState, error) {
6464
if err != nil {
6565
return nil, fmt.Errorf("error getting latest Beacon slot: %w", err)
6666
}
67-
return m.createNetworkState(targetSlot)
67+
return m.createNetworkState(targetSlot, nil)
6868
}
6969

7070
// Get the state of the network for a single node using the latest Execution layer block, along with the total effective RPL stake for the network
@@ -73,12 +73,12 @@ func (m *NetworkStateManager) GetHeadStateForNode(nodeAddress common.Address) (*
7373
if err != nil {
7474
return nil, fmt.Errorf("error getting latest Beacon slot: %w", err)
7575
}
76-
return m.createNetworkStateForNode(targetSlot, nodeAddress)
76+
return m.createNetworkState(targetSlot, []common.Address{nodeAddress})
7777
}
7878

7979
// Get the state of the network at the provided Beacon slot
8080
func (m *NetworkStateManager) GetStateForSlot(slotNumber uint64) (*NetworkState, error) {
81-
return m.createNetworkState(slotNumber)
81+
return m.createNetworkState(slotNumber, nil)
8282
}
8383

8484
// Gets the latest valid block

shared/services/state/network-state.go

Lines changed: 50 additions & 193 deletions
Original file line numberDiff line numberDiff line change
@@ -166,8 +166,12 @@ func (ns *NetworkState) UnmarshalJSON(data []byte) error {
166166
return nil
167167
}
168168

169-
// Creates a snapshot of the entire Rocket Pool network state, on both the Execution and Consensus layers
170-
func (m *NetworkStateManager) createNetworkState(slotNumber uint64) (*NetworkState, error) {
169+
// Creates a snapshot of the Rocket Pool network state, on both the Execution and Consensus layers.
170+
// If nodeAddresses is nil, all nodes are queried. Otherwise, only the specified nodes are included.
171+
func (m *NetworkStateManager) createNetworkState(slotNumber uint64, nodeAddresses []common.Address) (*NetworkState, error) {
172+
allNodes := len(nodeAddresses) == 0
173+
steps := 9
174+
currentStep := 0
171175

172176
// Get the execution block for the given slot
173177
beaconBlock, exists, err := m.bc.GetBeaconBlock(fmt.Sprintf("%d", slotNumber))
@@ -211,21 +215,46 @@ func (m *NetworkStateManager) createNetworkState(slotNumber uint64) (*NetworkSta
211215
if err != nil {
212216
return nil, fmt.Errorf("error getting network details: %w", err)
213217
}
214-
m.logLine("1/7 - Retrieved network details (%s so far)", time.Since(start))
218+
currentStep++
219+
m.logLine("%d/%d - Retrieved network details (%s so far)", currentStep, steps, time.Since(start))
215220

216221
// Node details
217-
state.NodeDetails, err = rpstate.GetAllNativeNodeDetails(m.rp, contracts)
218-
if err != nil {
219-
return nil, fmt.Errorf("error getting all node details: %w", err)
222+
if allNodes {
223+
state.NodeDetails, err = rpstate.GetAllNativeNodeDetails(m.rp, contracts)
224+
if err != nil {
225+
return nil, fmt.Errorf("error getting all node details: %w", err)
226+
}
227+
} else {
228+
state.NodeDetails = make([]rpstate.NativeNodeDetails, 0, len(nodeAddresses))
229+
for _, addr := range nodeAddresses {
230+
nodeDetails, err := rpstate.GetNativeNodeDetails(m.rp, contracts, addr)
231+
if err != nil {
232+
return nil, fmt.Errorf("error getting node details for %s: %w", addr.Hex(), err)
233+
}
234+
state.NodeDetails = append(state.NodeDetails, nodeDetails)
235+
}
220236
}
221-
m.logLine("2/7 - Retrieved node details (%s so far)", time.Since(start))
237+
currentStep++
238+
m.logLine("%d/%d - Retrieved node details (%s so far)", currentStep, steps, time.Since(start))
222239

223240
// Minipool details
224-
state.MinipoolDetails, err = rpstate.GetAllNativeMinipoolDetails(m.rp, contracts)
225-
if err != nil {
226-
return nil, fmt.Errorf("error getting all minipool details: %w", err)
241+
if allNodes {
242+
state.MinipoolDetails, err = rpstate.GetAllNativeMinipoolDetails(m.rp, contracts)
243+
if err != nil {
244+
return nil, fmt.Errorf("error getting all minipool details: %w", err)
245+
}
246+
} else {
247+
state.MinipoolDetails = []rpstate.NativeMinipoolDetails{}
248+
for _, addr := range nodeAddresses {
249+
nodeMinipools, err := rpstate.GetNodeNativeMinipoolDetails(m.rp, contracts, addr)
250+
if err != nil {
251+
return nil, fmt.Errorf("error getting minipool details for node %s: %w", addr.Hex(), err)
252+
}
253+
state.MinipoolDetails = append(state.MinipoolDetails, nodeMinipools...)
254+
}
227255
}
228-
m.logLine("3/7 - Retrieved minipool details (%s so far)", time.Since(start))
256+
currentStep++
257+
m.logLine("%d/%d - Retrieved minipool details (%s so far)", currentStep, steps, time.Since(start))
229258

230259
// Create the node lookup
231260
for i, details := range state.NodeDetails {
@@ -254,7 +283,8 @@ func (m *NetworkStateManager) createNetworkState(slotNumber uint64) (*NetworkSta
254283
if err != nil {
255284
return nil, fmt.Errorf("error getting all megapool validator details: %w", err)
256285
}
257-
m.logLine("4/7 - Retrieved megapool validator global index (%s so far)", time.Since(start))
286+
currentStep++
287+
m.logLine("%d/%d - Retrieved megapool validator global index (%s so far)", currentStep, steps, time.Since(start))
258288

259289
megapoolValidatorPubkeys := make([]types.ValidatorPubkey, 0, len(state.MegapoolValidatorGlobalIndex))
260290
megapoolAddressMap := make(map[common.Address][]types.ValidatorPubkey)
@@ -296,7 +326,8 @@ func (m *NetworkStateManager) createNetworkState(slotNumber uint64) (*NetworkSta
296326
if err := megapoolWg.Wait(); err != nil {
297327
return nil, fmt.Errorf("error getting megapool details: %w", err)
298328
}
299-
m.logLine("4/7 - Retrieved megapool validator details (%s so far)", time.Since(start))
329+
currentStep++
330+
m.logLine("%d/%d - Retrieved megapool validator details (%s so far)", currentStep, steps, time.Since(start))
300331

301332
// Calculate avg node fees and distributor shares
302333
for _, details := range state.NodeDetails {
@@ -308,133 +339,8 @@ func (m *NetworkStateManager) createNetworkState(slotNumber uint64) (*NetworkSta
308339
if err != nil {
309340
return nil, fmt.Errorf("error getting Oracle DAO details: %w", err)
310341
}
311-
m.logLine("5/7 - Retrieved Oracle DAO details (%s so far)", time.Since(start))
312-
313-
// Get the validator stats from Beacon
314-
statusMap, err := m.bc.GetValidatorStatuses(pubkeys, &beacon.ValidatorStatusOptions{
315-
Slot: &slotNumber,
316-
})
317-
if err != nil {
318-
return nil, err
319-
}
320-
state.MinipoolValidatorDetails = statusMap
321-
m.logLine("6/7 - Retrieved validator details (total time: %s)", time.Since(start))
322-
323-
// Get the complete node and user shares
324-
mpds := make([]*rpstate.NativeMinipoolDetails, len(state.MinipoolDetails))
325-
beaconBalances := make([]*big.Int, len(state.MinipoolDetails))
326-
for i, mpd := range state.MinipoolDetails {
327-
mpds[i] = &state.MinipoolDetails[i]
328-
validator := state.MinipoolValidatorDetails[mpd.Pubkey]
329-
if !validator.Exists {
330-
beaconBalances[i] = big.NewInt(0)
331-
} else {
332-
beaconBalances[i] = eth.GweiToWei(float64(validator.Balance))
333-
}
334-
}
335-
err = rpstate.CalculateCompleteMinipoolShares(m.rp, contracts, mpds, beaconBalances)
336-
if err != nil {
337-
return nil, err
338-
}
339-
state.MinipoolValidatorDetails = statusMap
340-
m.logLine("7/7 - Calculated complete node and user balance shares (total time: %s)", time.Since(start))
341-
342-
return state, nil
343-
}
344-
345-
// Creates a snapshot of the Rocket Pool network, but only for a single node
346-
func (m *NetworkStateManager) createNetworkStateForNode(slotNumber uint64, nodeAddress common.Address) (*NetworkState, error) {
347-
steps := 7
348-
349-
// Get the execution block for the given slot
350-
beaconBlock, exists, err := m.bc.GetBeaconBlock(fmt.Sprintf("%d", slotNumber))
351-
if err != nil {
352-
return nil, fmt.Errorf("error getting Beacon block for slot %d: %w", slotNumber, err)
353-
}
354-
if !exists {
355-
return nil, fmt.Errorf("slot %d did not have a Beacon block", slotNumber)
356-
}
357-
358-
// Get the corresponding block on the EL
359-
elBlockNumber := beaconBlock.ExecutionBlockNumber
360-
opts := &bind.CallOpts{
361-
BlockNumber: big.NewInt(0).SetUint64(elBlockNumber),
362-
}
363-
364-
beaconConfig, err := m.getBeaconConfig()
365-
if err != nil {
366-
return nil, fmt.Errorf("error getting Beacon config: %w", err)
367-
}
368-
369-
// Create the state wrapper
370-
state := &NetworkState{
371-
NodeDetailsByAddress: map[common.Address]*rpstate.NativeNodeDetails{},
372-
MinipoolDetailsByAddress: map[common.Address]*rpstate.NativeMinipoolDetails{},
373-
MinipoolDetailsByNode: map[common.Address][]*rpstate.NativeMinipoolDetails{},
374-
BeaconSlotNumber: slotNumber,
375-
ElBlockNumber: elBlockNumber,
376-
BeaconConfig: *beaconConfig,
377-
}
378-
379-
m.logLine("Getting network state for EL block %d, Beacon slot %d", elBlockNumber, slotNumber)
380-
start := time.Now()
381-
382-
// Network contracts and details
383-
contracts, err := rpstate.NewNetworkContracts(m.rp, m.multicaller, m.balanceBatcher, opts)
384-
if err != nil {
385-
return nil, fmt.Errorf("error getting network contracts: %w", err)
386-
}
387-
state.NetworkDetails, err = rpstate.NewNetworkDetails(m.rp, contracts)
388-
if err != nil {
389-
return nil, fmt.Errorf("error getting network details: %w", err)
390-
}
391-
m.logLine("1/%d - Retrieved network details (%s so far)", steps, time.Since(start))
392-
393-
// Node details
394-
nodeDetails, err := rpstate.GetNativeNodeDetails(m.rp, contracts, nodeAddress)
395-
if err != nil {
396-
return nil, fmt.Errorf("error getting node details: %w", err)
397-
}
398-
state.NodeDetails = []rpstate.NativeNodeDetails{nodeDetails}
399-
m.logLine("2/%d - Retrieved node details (%s so far)", steps, time.Since(start))
400-
401-
// Minipool details
402-
state.MinipoolDetails, err = rpstate.GetNodeNativeMinipoolDetails(m.rp, contracts, nodeAddress)
403-
if err != nil {
404-
return nil, fmt.Errorf("error getting all minipool details: %w", err)
405-
}
406-
m.logLine("3/%d - Retrieved minipool details (%s so far)", steps, time.Since(start))
407-
408-
// Create the node lookup
409-
for i, details := range state.NodeDetails {
410-
state.NodeDetailsByAddress[details.NodeAddress] = &state.NodeDetails[i]
411-
}
412-
413-
// Create the minipool lookups
414-
pubkeys := make([]types.ValidatorPubkey, 0, len(state.MinipoolDetails))
415-
emptyPubkey := types.ValidatorPubkey{}
416-
for i, details := range state.MinipoolDetails {
417-
state.MinipoolDetailsByAddress[details.MinipoolAddress] = &state.MinipoolDetails[i]
418-
if details.Pubkey != emptyPubkey {
419-
pubkeys = append(pubkeys, details.Pubkey)
420-
}
421-
422-
// The map of nodes to minipools
423-
nodeList, exists := state.MinipoolDetailsByNode[details.NodeAddress]
424-
if !exists {
425-
nodeList = []*rpstate.NativeMinipoolDetails{}
426-
}
427-
nodeList = append(nodeList, &state.MinipoolDetails[i])
428-
state.MinipoolDetailsByNode[details.NodeAddress] = nodeList
429-
}
430-
431-
// Calculate avg node fees and distributor shares
432-
for _, details := range state.NodeDetails {
433-
details.CalculateAverageFeeAndDistributorShares(state.MinipoolDetailsByNode[details.NodeAddress])
434-
}
435-
436-
// Get the total network effective RPL stake
437-
currentStep := 4
342+
currentStep++
343+
m.logLine("%d/%d - Retrieved Oracle DAO details (%s so far)", currentStep, steps, time.Since(start))
438344

439345
// Get the validator stats from Beacon
440346
statusMap, err := m.bc.GetValidatorStatuses(pubkeys, &beacon.ValidatorStatusOptions{
@@ -444,8 +350,8 @@ func (m *NetworkStateManager) createNetworkStateForNode(slotNumber uint64, nodeA
444350
return nil, err
445351
}
446352
state.MinipoolValidatorDetails = statusMap
447-
m.logLine("%d/%d - Retrieved validator details (total time: %s)", currentStep, steps, time.Since(start))
448353
currentStep++
354+
m.logLine("%d/%d - Retrieved validator details (%s so far)", currentStep, steps, time.Since(start))
449355

450356
// Get the complete node and user shares
451357
mpds := make([]*rpstate.NativeMinipoolDetails, len(state.MinipoolDetails))
@@ -464,65 +370,16 @@ func (m *NetworkStateManager) createNetworkStateForNode(slotNumber uint64, nodeA
464370
return nil, err
465371
}
466372
state.MinipoolValidatorDetails = statusMap
467-
m.logLine("%d/%d - Calculated complete node and user balance shares (total time: %s)", currentStep, steps, time.Since(start))
468373
currentStep++
374+
m.logLine("%d/%d - Calculated complete node and user balance shares (%s so far)", currentStep, steps, time.Since(start))
469375

470-
// Get the protocol DAO proposals
376+
// Protocol DAO proposals
471377
state.ProtocolDaoProposalDetails, err = rpstate.GetAllProtocolDaoProposalDetails(m.rp, contracts)
472378
if err != nil {
473379
return nil, fmt.Errorf("error getting Protocol DAO proposal details: %w", err)
474380
}
475-
m.logLine("%d/%d - Retrieved Protocol DAO proposals (total time: %s)", currentStep, steps, time.Since(start))
476-
currentStep++
477-
478-
state.MegapoolValidatorGlobalIndex, err = rpstate.GetAllMegapoolValidators(m.rp, contracts)
479-
if err != nil {
480-
return nil, fmt.Errorf("error getting all megapool validator details: %w", err)
481-
}
482-
483-
megapoolValidatorPubkeys := make([]types.ValidatorPubkey, 0, len(state.MegapoolValidatorGlobalIndex))
484-
megapoolAddressMap := make(map[common.Address][]types.ValidatorPubkey)
485-
megapoolValidatorInfo := make(map[types.ValidatorPubkey]*megapool.ValidatorInfoFromGlobalIndex)
486-
for i := range state.MegapoolValidatorGlobalIndex {
487-
validator := &state.MegapoolValidatorGlobalIndex[i]
488-
if len(validator.Pubkey) > 0 {
489-
pubkey := types.ValidatorPubkey(validator.Pubkey)
490-
megapoolAddressMap[validator.MegapoolAddress] = append(megapoolAddressMap[validator.MegapoolAddress], pubkey)
491-
megapoolValidatorPubkeys = append(megapoolValidatorPubkeys, pubkey)
492-
megapoolValidatorInfo[pubkey] = validator
493-
}
494-
}
495-
state.MegapoolToPubkeysMap = megapoolAddressMap
496-
state.MegapoolValidatorInfo = megapoolValidatorInfo
497-
498-
megapoolAddresses := make([]common.Address, 0, len(megapoolAddressMap))
499-
for addr := range megapoolAddressMap {
500-
megapoolAddresses = append(megapoolAddresses, addr)
501-
}
502-
503-
// Fetch beacon validator statuses and EL megapool details in parallel
504-
var megapoolWg errgroup.Group
505-
megapoolWg.Go(func() error {
506-
statusMap, err := m.bc.GetValidatorStatuses(megapoolValidatorPubkeys, &beacon.ValidatorStatusOptions{
507-
Slot: &slotNumber,
508-
})
509-
if err != nil {
510-
return err
511-
}
512-
state.MegapoolValidatorDetails = statusMap
513-
return nil
514-
})
515-
megapoolWg.Go(func() error {
516-
var err error
517-
state.MegapoolDetails, err = rpstate.GetBulkMegapoolDetails(m.rp, contracts, megapoolAddresses)
518-
return err
519-
})
520-
if err := megapoolWg.Wait(); err != nil {
521-
return nil, fmt.Errorf("error getting megapool details: %w", err)
522-
}
523-
m.logLine("%d/%d - Retrieved megapool validator details (total time: %s)", currentStep, steps, time.Since(start))
524-
525381
currentStep++
382+
m.logLine("%d/%d - Retrieved Protocol DAO proposals (total time: %s)", currentStep, steps, time.Since(start))
526383

527384
return state, nil
528385
}

0 commit comments

Comments
 (0)