Skip to content

feat(val): Reconstructed Validator Attestation Service to construct SingleAttestation directly #7649

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions consensus/types/src/attestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,28 @@ impl SingleAttestation {
})
}
}

pub fn empty_for_signing(
committee_index: u64,
attester_index: u64,
slot: Slot,
beacon_block_root: Hash256,
source: Checkpoint,
target: Checkpoint,
) -> Self {
Self {
committee_index,
attester_index,
data: AttestationData {
slot,
index: committee_index,
beacon_block_root,
source,
target,
},
signature: AggregateSignature::infinity(),
}
}
}

#[cfg(test)]
Expand Down
94 changes: 94 additions & 0 deletions validator_client/lighthouse_validator_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use types::{
SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData,
VoluntaryExit,
};
use types::{AggregateSignature, SingleAttestation};
use validator_store::{
DoppelgangerStatus, Error as ValidatorStoreError, ProposalData, SignedBlock, UnsignedBlock,
ValidatorStore,
Expand Down Expand Up @@ -840,6 +841,99 @@ impl<T: SlotClock + 'static, E: EthSpec> ValidatorStore for LighthouseValidatorS
}
}

async fn sign_single_attestation(
&self,
validator_pubkey: PublicKeyBytes,
single_attestation: &mut SingleAttestation,
current_epoch: Epoch,
) -> Result<(), Error> {
// Make sure the target epoch is not higher than the current epoch to avoid potential attacks.
if single_attestation.data.target.epoch > current_epoch {
return Err(Error::GreaterThanCurrentEpoch {
epoch: single_attestation.data.target.epoch,
current_epoch,
});
}

// Get the signing method and check doppelganger protection.
let signing_method = self.doppelganger_checked_signing_method(validator_pubkey)?;

// Checking for slashing conditions.
let signing_epoch = single_attestation.data.target.epoch;
let signing_context = self.signing_context(Domain::BeaconAttester, signing_epoch);
let domain_hash = signing_context.domain_hash(&self.spec);

let slashing_status = if signing_method
.requires_local_slashing_protection(self.enable_web3signer_slashing_protection)
{
self.slashing_protection.check_and_insert_attestation(
&validator_pubkey,
&single_attestation.data,
domain_hash,
)
} else {
Ok(Safe::Valid)
};

match slashing_status {
// We can safely sign this attestation.
Ok(Safe::Valid) => {
let signature = signing_method
.get_signature::<E, BlindedPayload<E>>(
SignableMessage::AttestationData(&single_attestation.data),
signing_context,
&self.spec,
&self.task_executor,
)
.await?;

// Create an aggregate signature from the individual signature
let mut aggregate_signature = AggregateSignature::empty();
aggregate_signature.add_assign(&signature);
single_attestation.signature = aggregate_signature;

validator_metrics::inc_counter_vec(
&validator_metrics::SIGNED_ATTESTATIONS_TOTAL,
&[validator_metrics::SUCCESS],
);

Ok(())
}
Ok(Safe::SameData) => {
warn!("Skipping signing of previously signed attestation");
validator_metrics::inc_counter_vec(
&validator_metrics::SIGNED_ATTESTATIONS_TOTAL,
&[validator_metrics::SAME_DATA],
);
Err(Error::SameData)
}
Err(NotSafe::UnregisteredValidator(pk)) => {
warn!(
msg = "Carefully consider running with --init-slashing-protection (see --help)",
public_key = format!("{:?}", pk),
"Not signing attestation for unregistered validator"
);
validator_metrics::inc_counter_vec(
&validator_metrics::SIGNED_ATTESTATIONS_TOTAL,
&[validator_metrics::UNREGISTERED],
);
Err(Error::Slashable(NotSafe::UnregisteredValidator(pk)))
}
Err(e) => {
crit!(
attestation = format!("{:?}", single_attestation.data),
error = format!("{:?}", e),
"Not signing slashable attestation"
);
validator_metrics::inc_counter_vec(
&validator_metrics::SIGNED_ATTESTATIONS_TOTAL,
&[validator_metrics::SLASHABLE],
);
Err(Error::Slashable(e))
}
}
}

async fn sign_validator_registration_data(
&self,
validator_registration_data: ValidatorRegistrationData,
Expand Down
91 changes: 32 additions & 59 deletions validator_client/validator_services/src/attestation_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use task_executor::TaskExecutor;
use tokio::time::{sleep, sleep_until, Duration, Instant};
use tracing::{debug, error, info, trace, warn};
use tree_hash::TreeHash;
use types::{Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, Slot};
use types::{AttestationData, ChainSpec, CommitteeIndex, EthSpec, SingleAttestation, Slot};
use validator_store::{Error as ValidatorStoreError, ValidatorStore};

/// Builds an `AttestationService`.
Expand Down Expand Up @@ -379,41 +379,23 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
return None;
}

let mut attestation = match Attestation::empty_for_signing(
let mut single_attestation = SingleAttestation::empty_for_signing(
duty.committee_index,
duty.committee_length as usize,
duty.validator_index,
attestation_data.slot,
attestation_data.beacon_block_root,
attestation_data.source,
attestation_data.target,
&self.chain_spec,
) {
Ok(attestation) => attestation,
Err(err) => {
crit!(
validator = ?duty.pubkey,
?duty,
?err,
"Invalid validator duties during signing"
);
return None;
}
};
);

// Sign the SingleAttestation
match self
.validator_store
.sign_attestation(
duty.pubkey,
duty.validator_committee_index as usize,
&mut attestation,
current_epoch,
)
.sign_single_attestation(duty.pubkey, &mut single_attestation, current_epoch)
.await
{
Ok(()) => Some((attestation, duty.validator_index)),
Ok(()) => Some(single_attestation),
Err(ValidatorStoreError::UnknownPubkey(pubkey)) => {
// A pubkey can be missing when a validator was recently
// removed via the API.
warn!(
info = "a validator may have recently been removed from this VC",
pubkey = ?pubkey,
Expand All @@ -438,59 +420,50 @@ impl<S: ValidatorStore + 'static, T: SlotClock + 'static> AttestationService<S,
});

// Execute all the futures in parallel, collecting any successful results.
let (ref attestations, ref validator_indices): (Vec<_>, Vec<_>) = join_all(signing_futures)
let single_attestations: Vec<SingleAttestation> = join_all(signing_futures)
.await
.into_iter()
.flatten()
.unzip();
.collect();

if attestations.is_empty() {
if single_attestations.is_empty() {
warn!("No attestations were published");
return Ok(None);
}

// Extract validator indices BEFORE moving single_attestations into the closure
let validator_indices: Vec<u64> = single_attestations
.iter()
.map(|a| a.attester_index)
.collect();

let fork_name = self
.chain_spec
.fork_name_at_slot::<S::E>(attestation_data.slot);

// Clone single_attestations before using it in the closure
let attestations_to_send = single_attestations.clone();

// Post the attestations to the BN.
match self
.beacon_nodes
.request(ApiTopic::Attestations, |beacon_node| async move {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::ATTESTATIONS_HTTP_POST],
);

let single_attestations = attestations
.iter()
.zip(validator_indices)
.filter_map(|(a, i)| {
match a.to_single_attestation_with_attester_index(*i) {
Ok(a) => Some(a),
Err(e) => {
// This shouldn't happen unless BN and VC are out of sync with
// respect to the Electra fork.
error!(
error = ?e,
committee_index = attestation_data.index,
slot = slot.as_u64(),
"type" = "unaggregated",
"Unable to convert to SingleAttestation"
);
None
}
}
})
.collect::<Vec<_>>();
.request(ApiTopic::Attestations, |beacon_node| {
let attestations = attestations_to_send.clone();
async move {
let _timer = validator_metrics::start_timer_vec(
&validator_metrics::ATTESTATION_SERVICE_TIMES,
&[validator_metrics::ATTESTATIONS_HTTP_POST],
);

beacon_node
.post_beacon_pool_attestations_v2::<S::E>(single_attestations, fork_name)
.await
beacon_node
.post_beacon_pool_attestations_v2::<S::E>(attestations, fork_name)
.await
}
})
.await
{
Ok(()) => info!(
count = attestations.len(),
count = single_attestations.len(),
validator_indices = ?validator_indices,
head_block = ?attestation_data.beacon_block_root,
committee_index = attestation_data.index,
Expand Down
14 changes: 12 additions & 2 deletions validator_client/validator_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use std::sync::Arc;
use types::{
Address, Attestation, AttestationError, BlindedBeaconBlock, Epoch, EthSpec, Graffiti, Hash256,
PublicKeyBytes, SelectionProof, Signature, SignedAggregateAndProof, SignedBlindedBeaconBlock,
SignedContributionAndProof, SignedValidatorRegistrationData, Slot, SyncCommitteeContribution,
SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId, ValidatorRegistrationData,
SignedContributionAndProof, SignedValidatorRegistrationData, SingleAttestation, Slot,
SyncCommitteeContribution, SyncCommitteeMessage, SyncSelectionProof, SyncSubnetId,
ValidatorRegistrationData,
};

#[derive(Debug, PartialEq, Clone)]
Expand All @@ -20,6 +21,7 @@ pub enum Error<T> {
GreaterThanCurrentSlot { slot: Slot, current_slot: Slot },
GreaterThanCurrentEpoch { epoch: Epoch, current_epoch: Epoch },
UnableToSignAttestation(AttestationError),
ValidatorNotEnabled(PublicKeyBytes),
SpecificError(T),
}

Expand Down Expand Up @@ -169,6 +171,14 @@ pub trait ValidatorStore: Send + Sync {
/// `ProposalData` fields include defaulting logic described in `get_fee_recipient_defaulting`,
/// `get_gas_limit_defaulting`, and `get_builder_proposals_defaulting`.
fn proposal_data(&self, pubkey: &PublicKeyBytes) -> Option<ProposalData>;
/// Signs a SingleAttestation for a given validator.
///
fn sign_single_attestation(
&self,
validator_pubkey: PublicKeyBytes,
single_attestation: &mut SingleAttestation,
current_epoch: Epoch,
) -> impl Future<Output = Result<(), Error<Self::Error>>> + Send;
}

#[derive(Debug)]
Expand Down