Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 50 additions & 76 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
"bytes"
"context"
"encoding/hex"
"encoding/json"
"net/http"
"strings"
"sync"
Expand Down Expand Up @@ -42,9 +41,7 @@
"github.com/obolnetwork/charon/app/tracer"
"github.com/obolnetwork/charon/app/version"
"github.com/obolnetwork/charon/app/z"
clusterpkg "github.com/obolnetwork/charon/cluster"
"github.com/obolnetwork/charon/cluster/manifest"
manifestpb "github.com/obolnetwork/charon/cluster/manifestpb/v1"
"github.com/obolnetwork/charon/cluster"
"github.com/obolnetwork/charon/core"
"github.com/obolnetwork/charon/core/aggsigdb"
"github.com/obolnetwork/charon/core/bcast"
Expand Down Expand Up @@ -73,8 +70,8 @@
P2P p2p.Config
Log log.Config
Feature featureset.Config
LockFile string
ManifestFile string
LockFile string
NoVerify bool
PrivKeyFile string
PrivKeyLocking bool
Expand Down Expand Up @@ -118,7 +115,7 @@
p2p.TestPingConfig

// Lock provides the lock explicitly, skips loading from disk.
Lock *clusterpkg.Lock
Lock *cluster.Lock
// P2PKey provides the p2p privkey explicitly, skips loading from keystore on disk.
P2PKey *k1.PrivateKey
// ParSigExFunc provides an in-memory partial signature exchange.
Expand Down Expand Up @@ -177,19 +174,18 @@
eth1Cl := eth1wrap.NewDefaultEthClientRunner(conf.ExecutionEngineAddr)
life.RegisterStart(lifecycle.AsyncAppCtx, lifecycle.StartEth1Client, lifecycle.HookFuncCtx(eth1Cl.Run))

cluster, err := loadClusterManifest(ctx, conf, eth1Cl)
lock, err := loadClusterLock(ctx, conf, eth1Cl)
if err != nil {
return err
}

clusterHash := cluster.GetInitialMutationHash()
core.SetClusterHash(clusterHash)
core.SetClusterHash(lock.LockHash)

if err := wireTracing(life, conf, clusterHash); err != nil {
if err := wireTracing(life, conf, lock.LockHash); err != nil {
return err
}

network, err := eth2util.ForkVersionToNetwork(cluster.GetForkVersion())
network, err := eth2util.ForkVersionToNetwork(lock.ForkVersion)
if err != nil {
network = "unknown"
}
Expand All @@ -212,7 +208,7 @@
}
}

peers, err := manifest.ClusterPeers(cluster)
peers, err := lock.Peers()
if err != nil {
return err
}
Expand All @@ -221,14 +217,14 @@
return err
}

lockHashHex := Hex7(cluster.GetInitialMutationHash())
lockHashHex := Hex7(lock.LockHash)

p2pNode, err := wireP2P(ctx, life, conf, cluster, p2pKey, lockHashHex)
p2pNode, err := wireP2P(ctx, life, conf, lock, p2pKey, lockHashHex, lock.UUID)
if err != nil {
return err
}

nodeIdx, err := manifest.ClusterNodeIdx(cluster, p2pNode.ID())
nodeIdx, err := lock.NodeIdx(p2pNode.ID())
if err != nil {
return errors.Wrap(err, "private key not matching cluster manifest file")
}
Expand All @@ -242,16 +238,16 @@
z.Str("peer_name", p2p.PeerName(p2pNode.ID())),
z.Str("nickname", conf.Nickname),
z.Int("peer_index", nodeIdx.PeerIdx),
z.Str("cluster_name", cluster.GetName()),
z.Str("cluster_name", lock.Name),
z.Str("cluster_hash", lockHashHex),
z.Str("cluster_hash_full", hex.EncodeToString(cluster.GetInitialMutationHash())),
z.Str("cluster_hash_full", hex.EncodeToString(lock.LockHash)),
z.Str("enr", enrRec.String()),
z.Int("peers", len(cluster.GetOperators())))
z.Int("peers", len(lock.Operators)))

// Metric and logging labels.
labels := map[string]string{
"cluster_hash": lockHashHex,
"cluster_name": cluster.GetName(),
"cluster_name": lock.Name,
"cluster_peer": p2p.PeerName(p2pNode.ID()),
"nickname": conf.Nickname,
"cluster_network": network,
Expand All @@ -264,9 +260,9 @@
return err
}

initStartupMetrics(p2p.PeerName(p2pNode.ID()), int(cluster.GetThreshold()), len(cluster.GetOperators()), len(cluster.GetValidators()), network)
initStartupMetrics(p2p.PeerName(p2pNode.ID()), lock.Threshold, len(lock.Operators), len(lock.Validators), network)

eth2Cl, subEth2Cl, err := newETH2Client(ctx, conf, life, cluster, cluster.GetForkVersion(), conf.BeaconNodeTimeout, conf.BeaconNodeSubmitTimeout)
eth2Cl, subEth2Cl, err := newETH2Client(ctx, conf, life, lock, lock.ForkVersion, conf.BeaconNodeTimeout, conf.BeaconNodeSubmitTimeout)
if err != nil {
return err
}
Expand All @@ -276,7 +272,7 @@
return err
}

peerIDs, err := manifest.ClusterPeerIDs(cluster)
peerIDs, err := lock.PeerIDs()
if err != nil {
return err
}
Expand All @@ -292,7 +288,7 @@
return errors.New("nickname cannot exceed 32 characters")
}

wirePeerInfo(life, p2pNode, peerIDs, cluster.GetInitialMutationHash(), sender, conf.BuilderAPI, conf.Nickname)
wirePeerInfo(life, p2pNode, peerIDs, lock.LockHash, sender, conf.BuilderAPI, conf.Nickname)

// seenPubkeys channel to send seen public keys from validatorapi to monitoringapi.
seenPubkeys := make(chan core.PubKey)
Expand All @@ -311,17 +307,17 @@
}
}

pubkeys, err := getDVPubkeys(cluster)
pubkeys, err := getDVPubkeys(lock)
if err != nil {
return err
}

consensusDebugger := consensus.NewDebugger()

wireMonitoringAPI(ctx, life, conf.MonitoringAddr, conf.DebugAddr, p2pNode, eth2Cl, peerIDs,
promRegistry, consensusDebugger, pubkeys, seenPubkeys, vapiCalls, len(cluster.GetValidators()))
promRegistry, consensusDebugger, pubkeys, seenPubkeys, vapiCalls, len(lock.Validators))

err = wireCoreWorkflow(ctx, life, conf, cluster, nodeIdx, p2pNode, p2pKey, eth2Cl, subEth2Cl,
err = wireCoreWorkflow(ctx, life, conf, lock, nodeIdx, p2pNode, p2pKey, eth2Cl, subEth2Cl,
peerIDs, sender, consensusDebugger, pubkeys, seenPubkeysFunc, sseListener, vapiCallsFunc)
if err != nil {
return err
Expand All @@ -340,14 +336,14 @@

// wireP2P constructs the p2p tcp or udp (libp2p) nodes and registers it with the life cycle manager.
func wireP2P(ctx context.Context, life *lifecycle.Manager, conf Config,
cluster *manifestpb.Cluster, p2pKey *k1.PrivateKey, lockHashHex string,
lock *cluster.Lock, p2pKey *k1.PrivateKey, lockHashHex, uuid string,
) (host.Host, error) {
peerIDs, err := manifest.ClusterPeerIDs(cluster)
peerIDs, err := lock.PeerIDs()
if err != nil {
return nil, err
}

relays, err := p2p.NewRelays(ctx, conf.P2P.Relays, lockHashHex)
relays, err := p2p.NewRelays(ctx, conf.P2P.Relays, lockHashHex, uuid)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -398,34 +394,30 @@

// wireCoreWorkflow wires the core workflow components.
func wireCoreWorkflow(ctx context.Context, life *lifecycle.Manager, conf Config,
cluster *manifestpb.Cluster, nodeIdx clusterpkg.NodeIdx, p2pNode host.Host, p2pKey *k1.PrivateKey,
lock *cluster.Lock, nodeIdx cluster.NodeIdx, p2pNode host.Host, p2pKey *k1.PrivateKey,
eth2Cl, submissionEth2Cl eth2wrap.Client, peerIDs []peer.ID, sender *p2p.Sender,
consensusDebugger consensus.Debugger, pubkeys []core.PubKey, seenPubkeys func(core.PubKey),
sseListener sse.Listener, vapiCalls func(),
) error {
// Convert and prep public keys and public shares
var (
builderRegistrations []clusterpkg.BuilderRegistration
builderRegistrations []cluster.BuilderRegistration
eth2Pubkeys []eth2p0.BLSPubKey
pubshares []eth2p0.BLSPubKey
allPubSharesByKey = make(map[core.PubKey]map[int]tbls.PublicKey) // map[pubkey]map[shareIdx]pubshare
feeRecipientAddrByCorePubkey = make(map[core.PubKey]string)
lockFeeRecipientAddresses = lock.FeeRecipientAddresses()
)

for _, val := range cluster.GetValidators() {
pubkey, err := manifest.ValidatorPublicKey(val)
if err != nil {
return err
}

corePubkey, err := core.PubKeyFromBytes(pubkey[:])
for vi, val := range lock.Validators {
corePubkey, err := core.PubKeyFromBytes(val.PubKey)
if err != nil {
return err
}

allPubShares := make(map[int]tbls.PublicKey)

for i, b := range val.GetPubShares() {
for i, b := range val.PubShares {
pubshare, err := tblsconv.PubkeyFromBytes(b)
if err != nil {
return err
Expand All @@ -435,29 +427,24 @@
allPubShares[i+1] = pubshare
}

pubShare, err := manifest.ValidatorPublicShare(val, nodeIdx.PeerIdx)
pubShare, err := val.PublicShare(nodeIdx.PeerIdx)
if err != nil {
return err
}

eth2Share := eth2p0.BLSPubKey(pubShare)

eth2Pubkey := eth2p0.BLSPubKey(pubkey)
eth2Pubkey := eth2p0.BLSPubKey(val.PubKey)

eth2Pubkeys = append(eth2Pubkeys, eth2Pubkey)
pubshares = append(pubshares, eth2Share)
allPubSharesByKey[corePubkey] = allPubShares
feeRecipientAddrByCorePubkey[corePubkey] = val.GetFeeRecipientAddress()

var builderRegistration clusterpkg.BuilderRegistration
if err := json.Unmarshal(val.GetBuilderRegistrationJson(), &builderRegistration); err != nil {
return errors.Wrap(err, "unmarshal builder registration")
}
feeRecipientAddrByCorePubkey[corePubkey] = lockFeeRecipientAddresses[vi]

builderRegistrations = append(builderRegistrations, builderRegistration)
builderRegistrations = append(builderRegistrations, val.BuilderRegistration)
}

peers, err := manifest.ClusterPeers(cluster)
peers, err := lock.Peers()
if err != nil {
return err
}
Expand Down Expand Up @@ -566,7 +553,7 @@

dutyDB := dutydb.NewMemDB(deadlinerFunc("dutydb"))

vapi, err := validatorapi.NewComponent(eth2Cl, allPubSharesByKey, nodeIdx.ShareIdx, feeRecipientFunc, conf.BuilderAPI, uint(cluster.GetTargetGasLimit()), seenPubkeys)
vapi, err := validatorapi.NewComponent(eth2Cl, allPubSharesByKey, nodeIdx.ShareIdx, feeRecipientFunc, conf.BuilderAPI, lock.TargetGasLimit, seenPubkeys)
if err != nil {
return err
}
Expand All @@ -575,7 +562,7 @@
return err
}

parSigDB := parsigdb.NewMemDB(int(cluster.GetThreshold()), deadlinerFunc("parsigdb"))
parSigDB := parsigdb.NewMemDB(lock.Threshold, deadlinerFunc("parsigdb"))

var parSigEx core.ParSigEx
if conf.TestConfig.ParSigExFunc != nil {
Expand All @@ -589,7 +576,7 @@
parSigEx = parsigex.NewParSigEx(p2pNode, sender.SendAsync, nodeIdx.PeerIdx, peerIDs, verifyFunc, gaterFunc)
}

sigAgg, err := sigagg.New(int(cluster.GetThreshold()), sigagg.NewVerifier(eth2Cl))
sigAgg, err := sigagg.New(lock.Threshold, sigagg.NewVerifier(eth2Cl))
if err != nil {
return err
}
Expand Down Expand Up @@ -624,9 +611,9 @@
coreConsensus := consensusController.CurrentConsensus() // initially points to DefaultConsensus()

// Priority protocol always uses QBFTv2.
err = wirePrioritise(ctx, conf, life, p2pNode, peerIDs, int(cluster.GetThreshold()),
err = wirePrioritise(ctx, conf, life, p2pNode, peerIDs, lock.Threshold,
sender.SendReceive, defaultConsensus, sched, p2pKey, deadlineFunc,
consensusController, cluster.GetConsensusProtocol())
consensusController, lock.ConsensusProtocol)
if err != nil {
return err
}
Expand Down Expand Up @@ -812,29 +799,21 @@
}

// eth2PubKeys returns a list of BLS pubkeys of validators in the cluster lock.
func eth2PubKeys(cluster *manifestpb.Cluster) ([]eth2p0.BLSPubKey, error) {
func eth2PubKeys(lock *cluster.Lock) []eth2p0.BLSPubKey {
var pubkeys []eth2p0.BLSPubKey

for _, val := range cluster.GetValidators() {
pubkey, err := manifest.ValidatorPublicKey(val)
if err != nil {
return []eth2p0.BLSPubKey{}, err
}

pk := eth2p0.BLSPubKey(pubkey)
for _, dv := range lock.Validators {
pk := eth2p0.BLSPubKey(dv.PubKey)
pubkeys = append(pubkeys, pk)
}

return pubkeys, nil
return pubkeys
}

// newETH2Client returns a new eth2client for the configured timeouts; it is either a beaconmock for
// simnet or a multi http client to a real beacon node.
func newETH2Client(ctx context.Context, conf Config, life *lifecycle.Manager, cluster *manifestpb.Cluster, forkVersion []byte, bnTimeout time.Duration, submissionBnTimeout time.Duration) (eth2Cl eth2wrap.Client, submissionEth2Cl eth2wrap.Client, err error) {
pubkeys, err := eth2PubKeys(cluster)
if err != nil {
return nil, nil, err
}
func newETH2Client(ctx context.Context, conf Config, life *lifecycle.Manager, lock *cluster.Lock, forkVersion []byte, bnTimeout time.Duration, submissionBnTimeout time.Duration) (eth2Cl eth2wrap.Client, submissionEth2Cl eth2wrap.Client, err error) {

Check failure on line 815 in app/app.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 26 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=ObolNetwork_charon&issues=AZrFv4h2flGFhJuDeQmb&open=AZrFv4h2flGFhJuDeQmb&pullRequest=4130
pubkeys := eth2PubKeys(lock)

// Default to 1s slot duration if not set.
if conf.SimnetSlotDuration == 0 {
Expand Down Expand Up @@ -1124,16 +1103,11 @@
}

// getDVPubkeys returns DV public keys from given cluster.Lock.
func getDVPubkeys(cluster *manifestpb.Cluster) ([]core.PubKey, error) {
func getDVPubkeys(lock *cluster.Lock) ([]core.PubKey, error) {

Check warning on line 1106 in app/app.go

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the 'Get' prefix from this function name.

See more on https://sonarcloud.io/project/issues?id=ObolNetwork_charon&issues=AZrFv4h2flGFhJuDeQma&open=AZrFv4h2flGFhJuDeQma&pullRequest=4130
var pubkeys []core.PubKey

for _, val := range cluster.GetValidators() {
pk, err := manifest.ValidatorPublicKey(val)
if err != nil {
return nil, err
}

pubkey, err := core.PubKeyFromBytes(pk[:])
for _, dv := range lock.Validators {
pubkey, err := core.PubKeyFromBytes(dv.PubKey)
if err != nil {
return nil, err
}
Expand Down
Loading
Loading