Skip to content

Rough prototype sending fcU earlier #7752

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/attestation_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1143,6 +1143,7 @@ fn verify_head_block_is_known<T: BeaconChainTypes>(
.fork_choice_read_lock()
.get_block(&attestation_data.beacon_block_root)
.or_else(|| {
// FIXME(sproul): abolish?
chain
.early_attester_cache
.get_proto_block(attestation_data.beacon_block_root)
Expand Down
72 changes: 23 additions & 49 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3824,7 +3824,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// (i.e., this function is not atomic).
#[allow(clippy::too_many_arguments)]
fn import_block(
&self,
self: &Arc<Self>,
signed_block: AvailableBlock<T::EthSpec>,
block_root: Hash256,
mut state: BeaconState<T::EthSpec>,
Expand Down Expand Up @@ -3907,57 +3907,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.map_err(|e| BlockError::BeaconChainError(Box::new(e.into())))?;
}

// If the block is recent enough and it was not optimistically imported, check to see if it
// becomes the head block. If so, apply it to the early attester cache. This will allow
// attestations to the block without waiting for the block and state to be inserted to the
// database.
//
// Only performing this check on recent blocks avoids slowing down sync with lots of calls
// to fork choice `get_head`.
// Run a fork choice update immediately in the common case where we are in sync.
//
// Optimistically imported blocks are not added to the cache since the cache is only useful
// for a small window of time and the complexity of keeping track of the optimistic status
// is not worth it.
// This gets the fcU to the EL ASAP, prior to disk writes and other slow operations.
if !payload_verification_status.is_optimistic()
&& block.slot() + EARLY_ATTESTER_CACHE_HISTORIC_SLOTS >= current_slot
{
let fork_choice_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_FORK_CHOICE);
match fork_choice.get_head(current_slot, &self.spec) {
// This block became the head, add it to the early attester cache.
Ok(new_head_root) if new_head_root == block_root => {
if let Some(proto_block) = fork_choice.get_block(&block_root) {
if let Err(e) = self.early_attester_cache.add_head_block(
block_root,
&signed_block,
proto_block,
&state,
&self.spec,
) {
warn!(
error = ?e,
"Early attester cache insert failed"
);
} else {
let attestable_timestamp =
self.slot_clock.now_duration().unwrap_or_default();
self.block_times_cache.write().set_time_attestable(
block_root,
signed_block.slot(),
attestable_timestamp,
)
}
} else {
warn!(?block_root, "Early attester block missing");
}
}
// This block did not become the head, nothing to do.
Ok(_) => (),
Err(e) => error!(
error = ?e,
"Failed to compute head during block import"
),
}
drop(fork_choice_timer);
let _fork_choice_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_FORK_CHOICE);
let pending_block_snapshot = BeaconSnapshot {
beacon_block: signed_block.block_cloned(),
beacon_block_root: block_root,
beacon_state: state.clone(),
};
self.recompute_head_at_slot_blocking(
current_slot,
fork_choice,
Some(pending_block_snapshot),
)?;
}
drop(post_exec_timer);

Expand Down Expand Up @@ -4005,10 +3971,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
?block_root,
"Failed to store data columns into the database"
);
return Err(BlockError::InternalError(e));
/*
return Err(self
.handle_import_block_db_write_error(fork_choice)
.err()
.unwrap_or(BlockError::InternalError(e)));
*/
}
}

Expand All @@ -4023,15 +3992,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
error = ?e,
"Database write failed!"
);
/* FIXME(sproul): hmmm
return Err(self
.handle_import_block_db_write_error(fork_choice)
.err()
.unwrap_or(e.into()));
*/
return Err(e.into());
}

// The fork choice write-lock is dropped *after* the on-disk database has been updated.
// This prevents inconsistency between the two at the expense of concurrency.
drop(fork_choice);
// FIXME(sproul): consider how to restore this invariant
// drop(fork_choice);

// We're declaring the block "imported" at this point, since fork choice and the DB know
// about it.
Expand Down Expand Up @@ -4070,6 +4043,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(block_root)
}

#[allow(dead_code)]
fn handle_import_block_db_write_error(
&self,
// We don't actually need this value, however it's always present when we call this function
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/src/block_times_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub struct BlockDelays {
/// We need to use `available` again rather than `attestable` to handle the case where the block
/// does not get added to the early-attester cache.
pub imported: Option<Duration>,
/// Time after `imported`.
/// Time after `available`.
pub set_as_head: Option<Duration>,
}

Expand Down Expand Up @@ -83,7 +83,7 @@ impl BlockDelays {
.and_then(|imported_time| imported_time.checked_sub(available_time?));
let set_as_head = times
.set_as_head
.and_then(|set_as_head_time| set_as_head_time.checked_sub(times.imported?));
.and_then(|set_as_head_time| set_as_head_time.checked_sub(available_time?));
BlockDelays {
observed,
all_blobs_observed,
Expand Down
86 changes: 66 additions & 20 deletions beacon_node/beacon_chain/src/canonical_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,8 +500,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let chain = self.clone();
match self
.spawn_blocking_handle(
move || chain.recompute_head_at_slot_internal(current_slot),
"recompute_head_internal",
move || {
let fork_choice_write_lock = chain.canonical_head.fork_choice_write_lock();
chain.recompute_head_at_slot_blocking(
current_slot,
fork_choice_write_lock,
None,
)
},
"recompute_head_blocking",
)
.await
{
Expand Down Expand Up @@ -546,10 +553,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// This function performs long-running, heavy-lifting tasks which should not be performed on
/// the core `tokio` executor.
fn recompute_head_at_slot_internal(
pub fn recompute_head_at_slot_blocking(
self: &Arc<Self>,
current_slot: Slot,
mut fork_choice_write_lock: RwLockWriteGuard<'_, BeaconForkChoice<T>>,
pending_block_snapshot: Option<BeaconSnapshot<T::EthSpec>>,
) -> Result<Option<JoinHandle<Option<()>>>, Error> {
// FIXME(sproul): consider getting rid of this lock
let recompute_head_lock = self.canonical_head.recompute_head_lock.lock();

// Take a clone of the current ("old") head.
Expand All @@ -567,8 +577,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
finalized_checkpoint: old_cached_head.finalized_checkpoint(),
};

let mut fork_choice_write_lock = self.canonical_head.fork_choice_write_lock();

// Recompute the current head via the fork choice algorithm.
fork_choice_write_lock.get_head(current_slot, &self.spec)?;

Expand Down Expand Up @@ -636,34 +644,77 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// will just cause lock contention.
drop(fork_choice_read_lock);

// Update the early attester cache as soon as the fork choice lock is dropped.
if let Some(ref snapshot) = pending_block_snapshot {
if snapshot.beacon_block_root == new_view.head_block_root {
// FIXME(sproul): update add_head_block to take a snapshot
if let Err(e) = self.early_attester_cache.add_head_block(
snapshot.beacon_block_root,
snapshot.beacon_block.clone(),
new_head_proto_block.clone(),
&snapshot.beacon_state,
&self.spec,
) {
warn!(
error = ?e,
"Early attester cache insert failed"
);
} else {
let attestable_timestamp = self.slot_clock.now_duration().unwrap_or_default();
self.block_times_cache.write().set_time_attestable(
snapshot.beacon_block_root,
snapshot.beacon_block.slot(),
attestable_timestamp,
)
}
}
}

// The execution layer updates might attempt to take a write-lock on fork choice, so it's
// important to ensure the fork-choice lock isn't being held (dropped a few lines earlier).
//
// We want to shoot this update off ASAP so that the EL can update its view of the head and
// start work like block building, etc. We used to unnecessarily do a bunch of work before
// this: https://github.com/sigp/lighthouse/issues/7745
let el_update_handle =
spawn_execution_layer_updates(self.clone(), new_forkchoice_update_parameters)?;

// If the head has changed, update `self.canonical_head`.
let new_cached_head = if new_view.head_block_root != old_view.head_block_root {
metrics::inc_counter(&metrics::FORK_CHOICE_CHANGED_HEAD);

let mut new_snapshot = {
let beacon_block = self
.store
.get_full_block(&new_view.head_block_root)?
.ok_or(Error::MissingBeaconBlock(new_view.head_block_root))?;
// FIXME(sproul): this would be nicer with a let chain.
let head_block_root = new_view.head_block_root;
let store = self.store.clone();
let get_snapshot = move || -> Result<BeaconSnapshot<_>, Error> {
if let Some(snapshot) = pending_block_snapshot {
if snapshot.beacon_block_root == head_block_root {
return Ok(snapshot);
}
}
let beacon_block = store
.get_full_block(&head_block_root)?
.ok_or(Error::MissingBeaconBlock(head_block_root))?;

let (_, beacon_state) = self
.store
let (_, beacon_state) = store
.get_advanced_hot_state(
new_view.head_block_root,
head_block_root,
current_slot,
beacon_block.state_root(),
)?
.ok_or(Error::MissingBeaconState(beacon_block.state_root()))?;

BeaconSnapshot {
Ok(BeaconSnapshot {
beacon_block: Arc::new(beacon_block),
beacon_block_root: new_view.head_block_root,
beacon_state,
}
})
};

// Regardless of where we got the state from, attempt to build all the
// caches except the tree hash cache.
let mut new_snapshot = get_snapshot()?;

new_snapshot.beacon_state.build_all_caches(&self.spec)?;

let new_cached_head = CachedHead {
Expand Down Expand Up @@ -747,11 +798,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

// The execution layer updates might attempt to take a write-lock on fork choice, so it's
// important to ensure the fork-choice lock isn't being held.
let el_update_handle =
spawn_execution_layer_updates(self.clone(), new_forkchoice_update_parameters)?;

// We have completed recomputing the head and it's now valid for another process to do the
// same.
drop(recompute_head_lock);
Expand Down
25 changes: 10 additions & 15 deletions beacon_node/beacon_chain/src/early_attester_cache.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::data_availability_checker::{AvailableBlock, AvailableBlockData};
use crate::{
attester_cache::{CommitteeLengths, Error},
metrics,
Expand All @@ -21,8 +20,6 @@ pub struct CacheItem<E: EthSpec> {
* Values used to make the block available.
*/
block: Arc<SignedBeaconBlock<E>>,
blobs: Option<BlobSidecarList<E>>,
data_columns: Option<DataColumnSidecarList<E>>,
proto_block: ProtoBlock,
}

Expand Down Expand Up @@ -52,7 +49,7 @@ impl<E: EthSpec> EarlyAttesterCache<E> {
pub fn add_head_block(
&self,
beacon_block_root: Hash256,
block: &AvailableBlock<E>,
block: Arc<SignedBeaconBlock<E>>,
proto_block: ProtoBlock,
state: &BeaconState<E>,
spec: &ChainSpec,
Expand All @@ -70,21 +67,13 @@ impl<E: EthSpec> EarlyAttesterCache<E> {
},
};

let (blobs, data_columns) = match block.data() {
AvailableBlockData::NoData => (None, None),
AvailableBlockData::Blobs(blobs) => (Some(blobs.clone()), None),
AvailableBlockData::DataColumns(data_columns) => (None, Some(data_columns.clone())),
};

let item = CacheItem {
epoch,
committee_lengths,
beacon_block_root,
source,
target,
block: block.block_cloned(),
blobs,
data_columns,
block,
proto_block,
};

Expand Down Expand Up @@ -163,21 +152,27 @@ impl<E: EthSpec> EarlyAttesterCache<E> {
}

/// Returns the blobs, if `block_root` matches the cached item.
pub fn get_blobs(&self, block_root: Hash256) -> Option<BlobSidecarList<E>> {
pub fn get_blobs(&self, _block_root: Hash256) -> Option<BlobSidecarList<E>> {
/* FIXME(sproul): nah bruv
self.item
.read()
.as_ref()
.filter(|item| item.beacon_block_root == block_root)
.and_then(|item| item.blobs.clone())
*/
None
}

/// Returns the data columns, if `block_root` matches the cached item.
pub fn get_data_columns(&self, block_root: Hash256) -> Option<DataColumnSidecarList<E>> {
pub fn get_data_columns(&self, _block_root: Hash256) -> Option<DataColumnSidecarList<E>> {
/* FIXME(sproul): nah bruv
self.item
.read()
.as_ref()
.filter(|item| item.beacon_block_root == block_root)
.and_then(|item| item.data_columns.clone())
*/
None
}

/// Returns the proto-array block, if `block_root` matches the cached item.
Expand Down
4 changes: 2 additions & 2 deletions beacon_node/beacon_chain/tests/attestation_production.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ async fn produces_attestations() {
.early_attester_cache
.add_head_block(
block_root,
&available_block,
available_block.block_cloned(),
proto_block,
&state,
&chain.spec,
Expand Down Expand Up @@ -310,7 +310,7 @@ async fn early_attester_cache_old_request() {
.early_attester_cache
.add_head_block(
head.beacon_block_root,
&available_block,
available_block.block_cloned(),
head_proto_block,
&head.beacon_state,
&harness.chain.spec,
Expand Down
Loading