Skip to content

feat: Optimise HDiffBuffer by using Arc instead of Vec #7675

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: unstable
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 37 additions & 24 deletions beacon_node/store/src/hdiff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::cmp::Ordering;
use std::io::{Read, Write};
use std::ops::RangeInclusive;
use std::str::FromStr;
use std::sync::LazyLock;
use std::sync::{Arc, LazyLock};
use superstruct::superstruct;
use types::historical_summary::HistoricalSummary;
use types::{BeaconState, ChainSpec, Epoch, EthSpec, Hash256, List, Slot, Validator};
Expand Down Expand Up @@ -100,7 +100,7 @@ pub struct HDiffBuffer {
state: Vec<u8>,
balances: Vec<u64>,
inactivity_scores: Vec<u64>,
validators: Vec<Validator>,
validators: Vec<Arc<Validator>>,
historical_roots: Vec<Hash256>,
historical_summaries: Vec<HistoricalSummary>,
}
Expand Down Expand Up @@ -183,7 +183,11 @@ impl HDiffBuffer {
// is post altair, all its items will show up in the diff as is.
vec![]
};
let validators = std::mem::take(beacon_state.validators_mut()).to_vec();
let validators = std::mem::take(beacon_state.validators_mut())
.iter()
.cloned()
.map(Arc::new)
Comment on lines +188 to +189
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is maybe OK, but we could make it even better if we added an iter_arc method to List. In the case of validators, they are not packed in leaves, and are stored behind an Arc:

https://github.com/sigp/milhouse/blob/6da903ce8f783295714a68de4a2dcfc8b7c6ee01/src/leaf.rs#L17

Would you be interested in making a PR to milhouse to add this? I think the API would have to consider the case where T is packed, and return an error in this case. Something like:

pub fn iter_arc(&self) -> Result<impl Iterator<Item = &Arc<T>>, Error> {
   if T::tree_hash_type() == TreeHashType::Basic {
       // Can't return `Arc`s for packed leaves.
       return Err(Error::PackedLeavesNoArc);
   }
   // TODO
   Ok(iterator)
}

I'm happy to help advise on the implementation. Take a look at the other iterators in:

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure! I will look into it and try to implement the iter_arc method to List. Leaving a comment on the issue in Milhouse

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

.collect::<Vec<_>>();
let historical_roots = std::mem::take(beacon_state.historical_roots_mut()).to_vec();
let historical_summaries =
if let Ok(historical_summaries) = beacon_state.historical_summaries_mut() {
Expand Down Expand Up @@ -221,8 +225,9 @@ impl HDiffBuffer {
.map_err(|_| Error::InvalidBalancesLength)?;
}

*state.validators_mut() = List::try_from_iter(self.validators.iter().cloned())
.map_err(|_| Error::InvalidBalancesLength)?;
*state.validators_mut() =
List::try_from_iter(self.validators.iter().map(|arc| (**arc).clone()))
.map_err(|_| Error::InvalidBalancesLength)?;

*state.historical_roots_mut() = List::try_from_iter(self.historical_roots.iter().copied())
.map_err(|_| Error::InvalidBalancesLength)?;
Expand Down Expand Up @@ -284,6 +289,7 @@ impl HDiff {
.apply(&mut source.inactivity_scores, config)?;
self.validators_diff()
.apply(&mut source.validators, config)?;

self.historical_roots().apply(&mut source.historical_roots);
self.historical_summaries()
.apply(&mut source.historical_summaries);
Expand Down Expand Up @@ -446,8 +452,8 @@ fn uncompress_bytes(input: &[u8], config: &StoreConfig) -> Result<Vec<u8>, Error

impl ValidatorsDiff {
pub fn compute(
xs: &[Validator],
ys: &[Validator],
xs: &[Arc<Validator>],
ys: &[Arc<Validator>],
config: &StoreConfig,
) -> Result<Self, Error> {
if xs.len() > ys.len() {
Expand All @@ -457,8 +463,10 @@ impl ValidatorsDiff {
let uncompressed_bytes = ys
.iter()
.enumerate()
.filter_map(|(i, y)| {
let validator_diff = if let Some(x) = xs.get(i) {
.filter_map(|(i, y_arc)| {
let y = &**y_arc;
let validator_diff = if let Some(x_arc) = xs.get(i) {
let x = &**x_arc;
if y == x {
return None;
} else {
Expand Down Expand Up @@ -538,7 +546,7 @@ impl ValidatorsDiff {
})
}

pub fn apply(&self, xs: &mut Vec<Validator>, config: &StoreConfig) -> Result<(), Error> {
pub fn apply(&self, xs: &mut Vec<Arc<Validator>>, config: &StoreConfig) -> Result<(), Error> {
let validator_diff_bytes = uncompress_bytes(&self.bytes, config)?;

for diff_bytes in
Expand All @@ -550,37 +558,39 @@ impl ValidatorsDiff {
} = ValidatorDiffEntry::from_ssz_bytes(diff_bytes)
.map_err(|_| Error::BalancesIncompleteChunk)?;

if let Some(x) = xs.get_mut(index as usize) {
if let Some(x_arc) = xs.get_mut(index as usize) {
let mut v = (**x_arc).clone();
// Note: a pubkey change implies index re-use. In that case over-write
// withdrawal_credentials and slashed inconditionally as their default values
// are valid values.
let pubkey_changed = diff.pubkey != *EMPTY_PUBKEY;
if pubkey_changed {
x.pubkey = diff.pubkey;
v.pubkey = diff.pubkey;
}
if pubkey_changed || diff.withdrawal_credentials != Hash256::ZERO {
x.withdrawal_credentials = diff.withdrawal_credentials;
v.withdrawal_credentials = diff.withdrawal_credentials;
}
if diff.effective_balance != 0 {
x.effective_balance = x.effective_balance.wrapping_add(diff.effective_balance);
v.effective_balance = v.effective_balance.wrapping_add(diff.effective_balance);
}
if pubkey_changed || diff.slashed {
x.slashed = diff.slashed;
v.slashed = diff.slashed;
}
if diff.activation_eligibility_epoch != Epoch::new(0) {
x.activation_eligibility_epoch = diff.activation_eligibility_epoch;
v.activation_eligibility_epoch = diff.activation_eligibility_epoch;
}
if diff.activation_epoch != Epoch::new(0) {
x.activation_epoch = diff.activation_epoch;
v.activation_epoch = diff.activation_epoch;
}
if diff.exit_epoch != Epoch::new(0) {
x.exit_epoch = diff.exit_epoch;
v.exit_epoch = diff.exit_epoch;
}
if diff.withdrawable_epoch != Epoch::new(0) {
x.withdrawable_epoch = diff.withdrawable_epoch;
v.withdrawable_epoch = diff.withdrawable_epoch;
}
*x_arc = Arc::new(v);
} else {
xs.push(diff)
xs.push(Arc::new(diff))
}
}

Expand Down Expand Up @@ -918,12 +928,12 @@ mod tests {
let config = &StoreConfig::default();
let xs = (0..10)
.map(|_| rand_validator(&mut rng))
.map(Arc::new)
.collect::<Vec<_>>();
let mut ys = xs.clone();
ys[5] = rand_validator(&mut rng);
ys.push(rand_validator(&mut rng));
ys[5] = Arc::new(rand_validator(&mut rng));
ys.push(Arc::new(rand_validator(&mut rng)));
let diff = ValidatorsDiff::compute(&xs, &ys, config).unwrap();

let mut xs_out = xs.clone();
diff.apply(&mut xs_out, config).unwrap();
assert_eq!(xs_out, ys);
Expand Down Expand Up @@ -959,7 +969,10 @@ mod tests {
let pre_inactivity_scores = vec![1, 1, 1];
let post_inactivity_scores = vec![0, 0, 0, 1];

let pre_validators = (0..3).map(|_| rand_validator(&mut rng)).collect::<Vec<_>>();
let pre_validators = (0..3)
.map(|_| rand_validator(&mut rng))
.map(Arc::new)
.collect::<Vec<_>>();
let post_validators = pre_validators.clone();

let pre_historical_roots = vec![Hash256::repeat_byte(0xff)];
Expand Down