Skip to content

Commit 83a61db

Browse files
committed
Merge remote-tracking branch 'origin/unstable' into tracing-spans-test
# Conflicts: # beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs # beacon_node/beacon_chain/src/fetch_blobs/mod.rs # beacon_node/lighthouse_network/src/service/mod.rs
2 parents ec37ad9 + 3a02bdd commit 83a61db

File tree

29 files changed

+613
-301
lines changed

29 files changed

+613
-301
lines changed

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ lint:
279279

280280
# Lints the code using Clippy and automatically fix some simple compiler warnings.
281281
lint-fix:
282-
EXTRA_CLIPPY_OPTS="--fix --allow-staged --allow-dirty" $(MAKE) lint
282+
EXTRA_CLIPPY_OPTS="--fix --allow-staged --allow-dirty" $(MAKE) lint-full
283283

284284
# Also run the lints on the optimized-only tests
285285
lint-full:

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2207,7 +2207,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
22072207
pub fn verify_data_column_sidecar_for_gossip(
22082208
self: &Arc<Self>,
22092209
data_column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
2210-
subnet_id: u64,
2210+
subnet_id: DataColumnSubnetId,
22112211
) -> Result<GossipVerifiedDataColumn<T>, GossipDataColumnError> {
22122212
metrics::inc_counter(&metrics::DATA_COLUMN_SIDECAR_PROCESSING_REQUESTS);
22132213
let _timer = metrics::start_timer(&metrics::DATA_COLUMN_SIDECAR_GOSSIP_VERIFICATION_TIMES);
@@ -3611,7 +3611,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
36113611

36123612
let availability = self
36133613
.data_availability_checker
3614-
.put_gossip_verified_data_columns(block_root, data_columns)?;
3614+
.put_gossip_verified_data_columns(block_root, slot, data_columns)?;
36153615

36163616
self.process_availability(slot, availability, publish_fn)
36173617
.await
@@ -3702,9 +3702,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
37023702

37033703
// This slot value is purely informative for the consumers of
37043704
// `AvailabilityProcessingStatus::MissingComponents` to log an error with a slot.
3705-
let availability = self
3706-
.data_availability_checker
3707-
.put_rpc_custody_columns(block_root, custody_columns)?;
3705+
let availability = self.data_availability_checker.put_rpc_custody_columns(
3706+
block_root,
3707+
slot,
3708+
custody_columns,
3709+
)?;
37083710

37093711
self.process_availability(slot, availability, || Ok(()))
37103712
.await
@@ -7149,6 +7151,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
71497151
roots.reverse();
71507152
roots
71517153
}
7154+
7155+
/// Returns a list of column indices that should be sampled for a given epoch.
7156+
/// Used for data availability sampling in PeerDAS.
7157+
pub fn sampling_columns_for_epoch(&self, epoch: Epoch) -> &[ColumnIndex] {
7158+
self.data_availability_checker
7159+
.custody_context()
7160+
.sampling_columns_for_epoch(epoch, &self.spec)
7161+
}
71527162
}
71537163

71547164
impl<T: BeaconChainTypes> Drop for BeaconChain<T> {

beacon_node/beacon_chain/src/data_availability_checker.rs

Lines changed: 239 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use tracing::{debug, error, instrument};
2020
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
2121
use types::{
2222
BlobSidecarList, ChainSpec, DataColumnSidecar, DataColumnSidecarList, Epoch, EthSpec, Hash256,
23-
RuntimeVariableList, SignedBeaconBlock,
23+
RuntimeVariableList, SignedBeaconBlock, Slot,
2424
};
2525

2626
mod error;
@@ -38,14 +38,20 @@ use crate::observed_data_sidecars::ObservationStrategy;
3838
pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory};
3939
use types::non_zero_usize::new_non_zero_usize;
4040

41-
/// The LRU Cache stores `PendingComponents` which can store up to
42-
/// `MAX_BLOBS_PER_BLOCK = 6` blobs each. A `BlobSidecar` is 0.131256 MB. So
43-
/// the maximum size of a `PendingComponents` is ~ 0.787536 MB. Setting this
44-
/// to 1024 means the maximum size of the cache is ~ 0.8 GB. But the cache
45-
/// will target a size of less than 75% of capacity.
46-
pub const OVERFLOW_LRU_CAPACITY: NonZeroUsize = new_non_zero_usize(1024);
47-
/// Until tree-states is implemented, we can't store very many states in memory :(
48-
pub const STATE_LRU_CAPACITY_NON_ZERO: NonZeroUsize = new_non_zero_usize(2);
41+
/// The LRU Cache stores `PendingComponents`, which can store up to `MAX_BLOBS_PER_BLOCK` blobs each.
42+
///
43+
/// * Deneb blobs are 128 kb each and are stored in the form of `BlobSidecar`.
44+
/// * From Fulu (PeerDAS), blobs are erasure-coded and are 256 kb each, stored in the form of 128 `DataColumnSidecar`s.
45+
///
46+
/// With `MAX_BLOBS_PER_BLOCK` = 48 (expected in the next year), the maximum size of data columns
47+
/// in `PendingComponents` is ~12.29 MB. Setting this to 64 means the maximum size of the cache is
48+
/// approximately 0.8 GB.
49+
///
50+
/// Under normal conditions, the cache should only store the current pending block, but could
51+
/// occasionally spike to 2-4 for various reasons e.g. components arriving late, but would very
52+
/// rarely go above this, unless there are many concurrent forks.
53+
pub const OVERFLOW_LRU_CAPACITY: NonZeroUsize = new_non_zero_usize(64);
54+
pub const STATE_LRU_CAPACITY_NON_ZERO: NonZeroUsize = new_non_zero_usize(32);
4955
pub const STATE_LRU_CAPACITY: usize = STATE_LRU_CAPACITY_NON_ZERO.get();
5056

5157
/// Cache to hold fully valid data that can't be imported to fork-choice yet. After Dencun hard-fork
@@ -76,7 +82,7 @@ pub struct DataAvailabilityChecker<T: BeaconChainTypes> {
7682
availability_cache: Arc<DataAvailabilityCheckerInner<T>>,
7783
slot_clock: T::SlotClock,
7884
kzg: Arc<Kzg>,
79-
custody_context: Arc<CustodyContext>,
85+
custody_context: Arc<CustodyContext<T::EthSpec>>,
8086
spec: Arc<ChainSpec>,
8187
}
8288

@@ -114,7 +120,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
114120
slot_clock: T::SlotClock,
115121
kzg: Arc<Kzg>,
116122
store: BeaconStore<T>,
117-
custody_context: Arc<CustodyContext>,
123+
custody_context: Arc<CustodyContext<T::EthSpec>>,
118124
spec: Arc<ChainSpec>,
119125
) -> Result<Self, AvailabilityCheckError> {
120126
let inner = DataAvailabilityCheckerInner::new(
@@ -132,8 +138,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
132138
})
133139
}
134140

135-
pub fn custody_context(&self) -> Arc<CustodyContext> {
136-
self.custody_context.clone()
141+
pub fn custody_context(&self) -> &Arc<CustodyContext<T::EthSpec>> {
142+
&self.custody_context
137143
}
138144

139145
/// Checks if the block root is currently in the availability cache awaiting import because
@@ -235,15 +241,25 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
235241
pub fn put_rpc_custody_columns(
236242
&self,
237243
block_root: Hash256,
244+
slot: Slot,
238245
custody_columns: DataColumnSidecarList<T::EthSpec>,
239246
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
240247
// Attributes fault to the specific peer that sent an invalid column
241248
let kzg_verified_columns =
242249
KzgVerifiedDataColumn::from_batch_with_scoring(custody_columns, &self.kzg)
243250
.map_err(AvailabilityCheckError::InvalidColumn)?;
244251

252+
// Filter out columns that aren't required for custody for this slot
253+
// This is required because `data_columns_by_root` requests the **latest** CGC that _may_
254+
// not yet be effective for the data availability check, as CGC changes are only effective from
255+
// a new epoch.
256+
let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
257+
let sampling_columns = self
258+
.custody_context
259+
.sampling_columns_for_epoch(epoch, &self.spec);
245260
let verified_custody_columns = kzg_verified_columns
246261
.into_iter()
262+
.filter(|col| sampling_columns.contains(&col.index()))
247263
.map(KzgVerifiedCustodyDataColumn::from_asserted_custody)
248264
.collect::<Vec<_>>();
249265

@@ -291,10 +307,16 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
291307
>(
292308
&self,
293309
block_root: Hash256,
310+
slot: Slot,
294311
data_columns: I,
295312
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
313+
let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
314+
let sampling_columns = self
315+
.custody_context
316+
.sampling_columns_for_epoch(epoch, &self.spec);
296317
let custody_columns = data_columns
297318
.into_iter()
319+
.filter(|col| sampling_columns.contains(&col.index()))
298320
.map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner()))
299321
.collect::<Vec<_>>();
300322

@@ -811,3 +833,207 @@ impl<E: EthSpec> MaybeAvailableBlock<E> {
811833
}
812834
}
813835
}
836+
837+
#[cfg(test)]
838+
mod test {
839+
use super::*;
840+
use crate::test_utils::{
841+
generate_rand_block_and_data_columns, get_kzg, EphemeralHarnessType, NumBlobs,
842+
};
843+
use crate::CustodyContext;
844+
use rand::prelude::StdRng;
845+
use rand::seq::SliceRandom;
846+
use rand::SeedableRng;
847+
use slot_clock::{SlotClock, TestingSlotClock};
848+
use std::collections::HashSet;
849+
use std::sync::Arc;
850+
use std::time::Duration;
851+
use store::HotColdDB;
852+
use types::{ChainSpec, ColumnIndex, EthSpec, ForkName, MainnetEthSpec, Slot};
853+
854+
type E = MainnetEthSpec;
855+
type T = EphemeralHarnessType<E>;
856+
857+
/// Test to verify any extra RPC columns received that are not part of the "effective" CGC for
858+
/// the slot are excluded from import.
859+
#[test]
860+
fn should_exclude_rpc_columns_not_required_for_sampling() {
861+
// SETUP
862+
let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec()));
863+
let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64);
864+
865+
let da_checker = new_da_checker(spec.clone());
866+
let custody_context = &da_checker.custody_context;
867+
let all_column_indices_ordered =
868+
init_custody_context_with_ordered_columns(custody_context, &mut rng, &spec);
869+
870+
// GIVEN a single 32 ETH validator is attached at slot 0
871+
let epoch = Epoch::new(0);
872+
let validator_0 = 0;
873+
custody_context.register_validators(
874+
vec![(validator_0, 32_000_000_000)],
875+
epoch.start_slot(E::slots_per_epoch()),
876+
&spec,
877+
);
878+
assert_eq!(
879+
custody_context.num_of_data_columns_to_sample(epoch, &spec),
880+
spec.validator_custody_requirement as usize,
881+
"sampling size should be the minimal custody requirement == 8"
882+
);
883+
884+
// WHEN additional attached validators result in a CGC increase to 10 at the end slot of the same epoch
885+
let validator_1 = 1;
886+
let cgc_change_slot = epoch.end_slot(E::slots_per_epoch());
887+
custody_context.register_validators(
888+
vec![(validator_1, 32_000_000_000 * 9)],
889+
cgc_change_slot,
890+
&spec,
891+
);
892+
// AND custody columns (8) and any new extra columns (2) are received via RPC responses.
893+
// NOTE: block lookup uses the **latest** CGC (10) instead of the effective CGC (8) as the slot is unknown.
894+
let (_, data_columns) = generate_rand_block_and_data_columns::<E>(
895+
ForkName::Fulu,
896+
NumBlobs::Number(1),
897+
&mut rng,
898+
&spec,
899+
);
900+
let block_root = Hash256::random();
901+
let requested_columns = &all_column_indices_ordered[..10];
902+
da_checker
903+
.put_rpc_custody_columns(
904+
block_root,
905+
cgc_change_slot,
906+
data_columns
907+
.into_iter()
908+
.filter(|d| requested_columns.contains(&d.index))
909+
.collect(),
910+
)
911+
.expect("should put rpc custody columns");
912+
913+
// THEN the sampling size for the end slot of the same epoch remains unchanged
914+
let sampling_columns = custody_context.sampling_columns_for_epoch(epoch, &spec);
915+
assert_eq!(
916+
sampling_columns.len(),
917+
spec.validator_custody_requirement as usize // 8
918+
);
919+
// AND any extra columns received via RPC responses are excluded from import.
920+
let actual_cached: HashSet<ColumnIndex> = da_checker
921+
.cached_data_column_indexes(&block_root)
922+
.expect("should have cached data columns")
923+
.into_iter()
924+
.collect();
925+
let expected_sampling_columns = sampling_columns.iter().copied().collect::<HashSet<_>>();
926+
assert_eq!(
927+
actual_cached, expected_sampling_columns,
928+
"should cache only the effective sampling columns"
929+
);
930+
assert!(
931+
actual_cached.len() < requested_columns.len(),
932+
"extra columns should be excluded"
933+
)
934+
}
935+
936+
/// Test to verify any extra gossip columns received that are not part of the "effective" CGC for
937+
/// the slot are excluded from import.
938+
#[test]
939+
fn should_exclude_gossip_columns_not_required_for_sampling() {
940+
// SETUP
941+
let spec = Arc::new(ForkName::Fulu.make_genesis_spec(E::default_spec()));
942+
let mut rng = StdRng::seed_from_u64(0xDEADBEEF0BAD5EEDu64);
943+
944+
let da_checker = new_da_checker(spec.clone());
945+
let custody_context = &da_checker.custody_context;
946+
let all_column_indices_ordered =
947+
init_custody_context_with_ordered_columns(custody_context, &mut rng, &spec);
948+
949+
// GIVEN a single 32 ETH validator is attached slot 0
950+
let epoch = Epoch::new(0);
951+
let validator_0 = 0;
952+
custody_context.register_validators(
953+
vec![(validator_0, 32_000_000_000)],
954+
epoch.start_slot(E::slots_per_epoch()),
955+
&spec,
956+
);
957+
assert_eq!(
958+
custody_context.num_of_data_columns_to_sample(epoch, &spec),
959+
spec.validator_custody_requirement as usize,
960+
"sampling size should be the minimal custody requirement == 8"
961+
);
962+
963+
// WHEN additional attached validators result in a CGC increase to 10 at the end slot of the same epoch
964+
let validator_1 = 1;
965+
let cgc_change_slot = epoch.end_slot(E::slots_per_epoch());
966+
custody_context.register_validators(
967+
vec![(validator_1, 32_000_000_000 * 9)],
968+
cgc_change_slot,
969+
&spec,
970+
);
971+
// AND custody columns (8) and any new extra columns (2) are received via gossip.
972+
// NOTE: CGC updates results in new topics subscriptions immediately, and extra columns may start to
973+
// arrive via gossip.
974+
let (_, data_columns) = generate_rand_block_and_data_columns::<E>(
975+
ForkName::Fulu,
976+
NumBlobs::Number(1),
977+
&mut rng,
978+
&spec,
979+
);
980+
let block_root = Hash256::random();
981+
let requested_columns = &all_column_indices_ordered[..10];
982+
let gossip_columns = data_columns
983+
.into_iter()
984+
.filter(|d| requested_columns.contains(&d.index))
985+
.map(GossipVerifiedDataColumn::<T>::__new_for_testing)
986+
.collect::<Vec<_>>();
987+
da_checker
988+
.put_gossip_verified_data_columns(block_root, cgc_change_slot, gossip_columns)
989+
.expect("should put gossip custody columns");
990+
991+
// THEN the sampling size for the end slot of the same epoch remains unchanged
992+
let sampling_columns = custody_context.sampling_columns_for_epoch(epoch, &spec);
993+
assert_eq!(
994+
sampling_columns.len(),
995+
spec.validator_custody_requirement as usize // 8
996+
);
997+
// AND any extra columns received via gossip responses are excluded from import.
998+
let actual_cached: HashSet<ColumnIndex> = da_checker
999+
.cached_data_column_indexes(&block_root)
1000+
.expect("should have cached data columns")
1001+
.into_iter()
1002+
.collect();
1003+
let expected_sampling_columns = sampling_columns.iter().copied().collect::<HashSet<_>>();
1004+
assert_eq!(
1005+
actual_cached, expected_sampling_columns,
1006+
"should cache only the effective sampling columns"
1007+
);
1008+
assert!(
1009+
actual_cached.len() < requested_columns.len(),
1010+
"extra columns should be excluded"
1011+
)
1012+
}
1013+
1014+
fn init_custody_context_with_ordered_columns(
1015+
custody_context: &Arc<CustodyContext<E>>,
1016+
mut rng: &mut StdRng,
1017+
spec: &ChainSpec,
1018+
) -> Vec<u64> {
1019+
let mut all_data_columns = (0..spec.number_of_custody_groups).collect::<Vec<_>>();
1020+
all_data_columns.shuffle(&mut rng);
1021+
custody_context
1022+
.init_ordered_data_columns_from_custody_groups(all_data_columns.clone(), spec)
1023+
.expect("should initialise ordered custody columns");
1024+
all_data_columns
1025+
}
1026+
1027+
fn new_da_checker(spec: Arc<ChainSpec>) -> DataAvailabilityChecker<T> {
1028+
let slot_clock = TestingSlotClock::new(
1029+
Slot::new(0),
1030+
Duration::from_secs(0),
1031+
Duration::from_secs(spec.seconds_per_slot),
1032+
);
1033+
let kzg = get_kzg(&spec);
1034+
let store = Arc::new(HotColdDB::open_ephemeral(<_>::default(), spec.clone()).unwrap());
1035+
let custody_context = Arc::new(CustodyContext::new(false));
1036+
DataAvailabilityChecker::new(slot_clock, kzg, store, custody_context, spec)
1037+
.expect("should initialise data availability checker")
1038+
}
1039+
}

0 commit comments

Comments
 (0)