Skip to content

Commit 6315152

Browse files
authored
Merge of #7816
2 parents cafb364 + 83a61db commit 6315152

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+633
-1164
lines changed

Cargo.lock

Lines changed: 282 additions & 21 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,9 @@ network = { path = "beacon_node/network" }
193193
node_test_rig = { path = "testing/node_test_rig" }
194194
num_cpus = "1"
195195
once_cell = "1.17.1"
196+
opentelemetry = "0.30.0"
197+
opentelemetry-otlp = { version = "0.30.0", features = ["grpc-tonic"] }
198+
opentelemetry_sdk = "0.30.0"
196199
operation_pool = { path = "beacon_node/operation_pool" }
197200
parking_lot = "0.12"
198201
paste = "1"
@@ -253,6 +256,7 @@ tracing = "0.1.40"
253256
tracing-appender = "0.2"
254257
tracing-core = "0.1"
255258
tracing-log = "0.2"
259+
tracing-opentelemetry = "0.31.0"
256260
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
257261
tree_hash = "0.10.0"
258262
tree_hash_derive = "0.10.0"

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 48 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ use store::{
126126
};
127127
use task_executor::{ShutdownReason, TaskExecutor};
128128
use tokio_stream::Stream;
129-
use tracing::{debug, error, info, trace, warn};
129+
use tracing::{debug, debug_span, error, info, info_span, instrument, trace, warn, Span};
130130
use tree_hash::TreeHash;
131131
use types::blob_sidecar::FixedBlobSidecarList;
132132
use types::data_column_sidecar::ColumnIndex;
@@ -2203,6 +2203,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
22032203
})
22042204
}
22052205

2206+
#[instrument(skip_all, level = "trace")]
22062207
pub fn verify_data_column_sidecar_for_gossip(
22072208
self: &Arc<Self>,
22082209
data_column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
@@ -2215,6 +2216,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
22152216
})
22162217
}
22172218

2219+
#[instrument(skip_all, level = "trace")]
22182220
pub fn verify_blob_sidecar_for_gossip(
22192221
self: &Arc<Self>,
22202222
blob_sidecar: Arc<BlobSidecar<T::EthSpec>>,
@@ -2851,8 +2853,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
28512853

28522854
// Filter uninteresting blocks from the chain segment in a blocking task.
28532855
let chain = self.clone();
2856+
let filter_chain_segment = debug_span!("filter_chain_segment");
28542857
let filtered_chain_segment_future = self.spawn_blocking_handle(
2855-
move || chain.filter_chain_segment(chain_segment),
2858+
move || {
2859+
let _guard = filter_chain_segment.enter();
2860+
chain.filter_chain_segment(chain_segment)
2861+
},
28562862
"filter_chain_segment",
28572863
);
28582864
let mut filtered_chain_segment = match filtered_chain_segment_future.await {
@@ -2883,8 +2889,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
28832889
std::mem::swap(&mut blocks, &mut filtered_chain_segment);
28842890

28852891
let chain = self.clone();
2892+
let current_span = Span::current();
28862893
let signature_verification_future = self.spawn_blocking_handle(
2887-
move || signature_verify_chain_segment(blocks, &chain),
2894+
move || {
2895+
let _guard = current_span.enter();
2896+
signature_verify_chain_segment(blocks, &chain)
2897+
},
28882898
"signature_verify_chain_segment",
28892899
);
28902900

@@ -2974,10 +2984,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
29742984
block: Arc<SignedBeaconBlock<T::EthSpec>>,
29752985
) -> Result<GossipVerifiedBlock<T>, BlockError> {
29762986
let chain = self.clone();
2987+
let span = Span::current();
29772988
self.task_executor
29782989
.clone()
29792990
.spawn_blocking_handle(
29802991
move || {
2992+
let _guard = span.enter();
29812993
let slot = block.slot();
29822994
let graffiti_string = block.message().body().graffiti().as_utf8_lossy();
29832995

@@ -3006,7 +3018,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
30063018
}
30073019
}
30083020
},
3009-
"payload_verification_handle",
3021+
"gossip_block_verification_handle",
30103022
)
30113023
.ok_or(BeaconChainError::RuntimeShutdown)?
30123024
.await
@@ -3015,6 +3027,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
30153027

30163028
/// Cache the blob in the processing cache, process it, then evict it from the cache if it was
30173029
/// imported or errors.
3030+
#[instrument(skip_all, level = "debug")]
30183031
pub async fn process_gossip_blob(
30193032
self: &Arc<Self>,
30203033
blob: GossipVerifiedBlob<T>,
@@ -3088,6 +3101,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
30883101

30893102
/// Cache the blobs in the processing cache, process it, then evict it from the cache if it was
30903103
/// imported or errors.
3104+
#[instrument(skip_all, level = "debug")]
30913105
pub async fn process_rpc_blobs(
30923106
self: &Arc<Self>,
30933107
slot: Slot,
@@ -3383,6 +3397,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
33833397
///
33843398
/// Returns an `Err` if the given block was invalid, or an error was encountered during
33853399
/// verification.
3400+
#[instrument(skip_all, fields(block_root = ?block_root, block_source = %block_source))]
33863401
pub async fn process_block<B: IntoExecutionPendingBlock<T>>(
33873402
self: &Arc<Self>,
33883403
block_root: Hash256,
@@ -3499,6 +3514,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
34993514
/// get a fully `ExecutedBlock`.
35003515
///
35013516
/// An error is returned if the verification handle couldn't be awaited.
3517+
#[instrument(skip_all, level = "debug")]
35023518
pub async fn into_executed_block(
35033519
self: Arc<Self>,
35043520
execution_pending_block: ExecutionPendingBlock<T>,
@@ -3547,6 +3563,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
35473563

35483564
/// Checks if the block is available, and imports immediately if so, otherwise caches the block
35493565
/// in the data availability checker.
3566+
#[instrument(skip_all)]
35503567
async fn check_block_availability_and_import(
35513568
self: &Arc<Self>,
35523569
block: AvailabilityPendingExecutedBlock<T::EthSpec>,
@@ -3747,6 +3764,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
37473764
}
37483765
}
37493766

3767+
#[instrument(skip_all)]
37503768
pub async fn import_available_block(
37513769
self: &Arc<Self>,
37523770
block: Box<AvailableExecutedBlock<T::EthSpec>>,
@@ -3775,11 +3793,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
37753793

37763794
// TODO(das) record custody column available timestamp
37773795

3778-
// import
3779-
let chain = self.clone();
3780-
let block_root = self
3781-
.spawn_blocking_handle(
3796+
let block_root = {
3797+
// Capture the current span before moving into the blocking task
3798+
let current_span = tracing::Span::current();
3799+
let chain = self.clone();
3800+
self.spawn_blocking_handle(
37823801
move || {
3802+
// Enter the captured span in the blocking thread
3803+
let _guard = current_span.enter();
37833804
chain.import_block(
37843805
block,
37853806
block_root,
@@ -3791,7 +3812,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
37913812
},
37923813
"payload_verification_handle",
37933814
)
3794-
.await??;
3815+
.await??
3816+
};
37953817

37963818
// Remove block components from da_checker AFTER completing block import. Then we can assert
37973819
// the following invariant:
@@ -3815,6 +3837,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
38153837
/// An error is returned if the block was unable to be imported. It may be partially imported
38163838
/// (i.e., this function is not atomic).
38173839
#[allow(clippy::too_many_arguments)]
3840+
#[instrument(skip_all)]
38183841
fn import_block(
38193842
&self,
38203843
signed_block: AvailableBlock<T::EthSpec>,
@@ -3852,6 +3875,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
38523875

38533876
// Only take a write lock if there are new keys to import.
38543877
if state.validators().len() > pubkey_cache.len() {
3878+
let _pubkey_span = debug_span!(
3879+
"pubkey_cache_update",
3880+
new_validators = tracing::field::Empty,
3881+
cache_len_before = pubkey_cache.len()
3882+
)
3883+
.entered();
3884+
38553885
parking_lot::RwLockUpgradableReadGuard::upgrade(pubkey_cache)
38563886
.import_new_pubkeys(&state)?
38573887
} else {
@@ -3865,6 +3895,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
38653895
// However, latency between the VC and the BN might cause the VC to produce attestations at
38663896
// a previous slot.
38673897
if state.current_epoch().saturating_add(1_u64) >= current_epoch {
3898+
let _attester_span = debug_span!("attester_cache_update").entered();
38683899
self.attester_cache
38693900
.maybe_cache_state(&state, block_root, &self.spec)
38703901
.map_err(BeaconChainError::from)?;
@@ -4009,6 +4040,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
40094040
ops.push(StoreOp::PutBlock(block_root, signed_block.clone()));
40104041
ops.push(StoreOp::PutState(block.state_root(), &state));
40114042

4043+
let db_span = info_span!("persist_blocks_and_blobs").entered();
4044+
40124045
if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) {
40134046
error!(
40144047
msg = "Restoring fork choice from disk",
@@ -4021,6 +4054,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
40214054
.unwrap_or(e.into()));
40224055
}
40234056

4057+
drop(db_span);
4058+
40244059
// The fork choice write-lock is dropped *after* the on-disk database has been updated.
40254060
// This prevents inconsistency between the two at the expense of concurrency.
40264061
drop(fork_choice);
@@ -4155,6 +4190,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
41554190
}
41564191

41574192
/// Process a block for the validator monitor, including all its constituent messages.
4193+
#[instrument(skip_all, level = "debug")]
41584194
fn import_block_update_validator_monitor(
41594195
&self,
41604196
block: BeaconBlockRef<T::EthSpec>,
@@ -4249,6 +4285,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
42494285
/// Iterate through the attestations in the block and register them as "observed".
42504286
///
42514287
/// This will stop us from propagating them on the gossip network.
4288+
#[instrument(skip_all, level = "debug")]
42524289
fn import_block_observe_attestations(
42534290
&self,
42544291
block: BeaconBlockRef<T::EthSpec>,
@@ -4311,6 +4348,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
43114348
}
43124349

43134350
/// If a slasher is configured, provide the attestations from the block.
4351+
#[instrument(skip_all, level = "debug")]
43144352
fn import_block_update_slasher(
43154353
&self,
43164354
block: BeaconBlockRef<T::EthSpec>,
@@ -4409,6 +4447,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
44094447

44104448
// For the current and next epoch of this state, ensure we have the shuffling from this
44114449
// block in our cache.
4450+
#[instrument(skip_all, level = "debug")]
44124451
fn import_block_update_shuffling_cache(
44134452
&self,
44144453
block_root: Hash256,

beacon_node/beacon_chain/src/blob_verification.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use crate::{metrics, BeaconChainError};
1414
use kzg::{Error as KzgError, Kzg, KzgCommitment};
1515
use ssz_derive::{Decode, Encode};
1616
use std::time::Duration;
17-
use tracing::debug;
17+
use tracing::{debug, instrument};
1818
use tree_hash::TreeHash;
1919
use types::blob_sidecar::BlobIdentifier;
2020
use types::{
@@ -374,6 +374,7 @@ impl<E: EthSpec> IntoIterator for KzgVerifiedBlobList<E> {
374374
///
375375
/// Note: This function should be preferred over calling `verify_kzg_for_blob`
376376
/// in a loop since this function kzg verifies a list of blobs more efficiently.
377+
#[instrument(skip_all, level = "debug")]
377378
pub fn verify_kzg_for_blob_list<'a, E: EthSpec, I>(
378379
blob_iter: I,
379380
kzg: &'a Kzg,

0 commit comments

Comments
 (0)