Instrument tracing spans for block processing and import #7816

Merged
merged 39 commits into from
Aug 8, 2025
39 commits
c1a50bb
Add opentelemetry OTLP exporter for spans.
jimmygchen Jul 30, 2025
6dba431
Add `workspace_filter` to telemetry layer to exclude libp2p traces.
jimmygchen Jul 30, 2025
4147599
Squashed commit of the following:
jimmygchen Jul 30, 2025
04c23d9
Remove all spans.
jimmygchen Jul 30, 2025
03c647a
load_parent test.
jimmygchen Jul 30, 2025
c0804dc
Add gossip block instrumentation.
jimmygchen Jul 30, 2025
0a8d22f
more instrument.
jimmygchen Jul 30, 2025
172d859
Instrument `process_block` and `import_block`.
jimmygchen Jul 30, 2025
386b425
Add more detailed instruments to `import_block`
jimmygchen Jul 30, 2025
4268ae5
Add more detailed instruments to `import_block`
jimmygchen Jul 30, 2025
3f55de0
Add more detailed instruments to `import_block`
jimmygchen Jul 30, 2025
26e30c3
More instruments.
jimmygchen Jul 30, 2025
db4d342
More instruments.
jimmygchen Jul 30, 2025
62940aa
More instruments.
jimmygchen Jul 30, 2025
e3f6273
Add service name
jimmygchen Jul 30, 2025
2f6c32b
More instruments
jimmygchen Jul 30, 2025
e7fcb4c
Attempt to fix orphan spans
jimmygchen Jul 30, 2025
868320a
A few more
jimmygchen Jul 31, 2025
a8055ae
disable resource to test root span not yet received issue
jimmygchen Jul 31, 2025
e4e9746
Clean ups.
jimmygchen Jul 31, 2025
533ecc0
Revert potential bad change from 62940aa1 (resulted in missing root s…
jimmygchen Jul 31, 2025
29d65bc
Add instrument back in.
jimmygchen Jul 31, 2025
033861b
Merge branch 'tracing-spans-test' of github.com:jimmygchen/lighthouse…
jimmygchen Jul 31, 2025
3ac075f
Make `process_block` a root span.
jimmygchen Jul 31, 2025
fb3bf31
More fixes
jimmygchen Jul 31, 2025
333773f
More fixes
jimmygchen Jul 31, 2025
e1f584e
Fix span levels and add tracing to gossip/rpc block and blob paths.
jimmygchen Jul 31, 2025
c0991f5
Add process chain segment
jimmygchen Jul 31, 2025
d340231
Set parent to None on network beacon processors to prevent inheriting…
jimmygchen Jul 31, 2025
91a2400
Add RPC method instruments.
jimmygchen Jul 31, 2025
4a3689d
Remove unnecessary enter()
jimmygchen Aug 1, 2025
d1751f1
Fill more instrument gaps in `process_chain_segment`
jimmygchen Aug 1, 2025
61a772f
Fill more instrument gaps in `process_chain_segment`
jimmygchen Aug 1, 2025
b358a1a
Instrument `PendingComponents`.
jimmygchen Aug 1, 2025
4af2855
Instrument `PendingComponents` set parent to `None` to avoid incorrec…
jimmygchen Aug 1, 2025
1cd9683
Remove unnecessary `allow` macro usage
jimmygchen Aug 6, 2025
ec37ad9
Merge branch 'unstable' into tracing-spans-test
jimmygchen Aug 6, 2025
83a61db
Merge remote-tracking branch 'origin/unstable' into tracing-spans-test
jimmygchen Aug 7, 2025
5d8a353
Merge branch 'unstable' into tracing-spans-test
jimmygchen Aug 8, 2025
303 changes: 282 additions & 21 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -193,6 +193,9 @@ network = { path = "beacon_node/network" }
node_test_rig = { path = "testing/node_test_rig" }
num_cpus = "1"
once_cell = "1.17.1"
opentelemetry = "0.30.0"
opentelemetry-otlp = { version = "0.30.0", features = ["grpc-tonic"] }
opentelemetry_sdk = "0.30.0"
operation_pool = { path = "beacon_node/operation_pool" }
parking_lot = "0.12"
paste = "1"
@@ -253,6 +256,7 @@ tracing = "0.1.40"
tracing-appender = "0.2"
tracing-core = "0.1"
tracing-log = "0.2"
tracing-opentelemetry = "0.31.0"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
tree_hash = "0.10.0"
tree_hash_derive = "0.10.0"
57 changes: 48 additions & 9 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -126,7 +126,7 @@ use store::{
};
use task_executor::{ShutdownReason, TaskExecutor};
use tokio_stream::Stream;
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, debug_span, error, info, info_span, instrument, trace, warn, Span};
use tree_hash::TreeHash;
use types::blob_sidecar::FixedBlobSidecarList;
use types::data_column_sidecar::ColumnIndex;
@@ -2203,6 +2203,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
})
}

#[instrument(skip_all, level = "trace")]
pub fn verify_data_column_sidecar_for_gossip(
self: &Arc<Self>,
data_column_sidecar: Arc<DataColumnSidecar<T::EthSpec>>,
@@ -2215,6 +2216,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
})
}

#[instrument(skip_all, level = "trace")]
pub fn verify_blob_sidecar_for_gossip(
self: &Arc<Self>,
blob_sidecar: Arc<BlobSidecar<T::EthSpec>>,
@@ -2851,8 +2853,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

// Filter uninteresting blocks from the chain segment in a blocking task.
let chain = self.clone();
let filter_chain_segment = debug_span!("filter_chain_segment");
let filtered_chain_segment_future = self.spawn_blocking_handle(
move || chain.filter_chain_segment(chain_segment),
move || {
let _guard = filter_chain_segment.enter();
chain.filter_chain_segment(chain_segment)
},
"filter_chain_segment",
);
let mut filtered_chain_segment = match filtered_chain_segment_future.await {
@@ -2883,8 +2889,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
std::mem::swap(&mut blocks, &mut filtered_chain_segment);

let chain = self.clone();
let current_span = Span::current();
let signature_verification_future = self.spawn_blocking_handle(
move || signature_verify_chain_segment(blocks, &chain),
move || {
let _guard = current_span.enter();
signature_verify_chain_segment(blocks, &chain)
},
"signature_verify_chain_segment",
);
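
The two hunks above bind a span on the calling side and enter it inside the closure handed to `spawn_blocking_handle`. With the usual `tracing-subscriber` registry the "current span" is tracked per thread, so a closure running on a blocking worker thread does not see the caller's span unless it is carried across explicitly. The sketch below models that behaviour with the standard library only; it is not the `tracing` API. `SPAN_STACK`, `enter`, `current` and the two `spawn_*` functions are hypothetical names invented for illustration, and a plain `std::thread::spawn` stands in for the task executor.

```rust
use std::cell::RefCell;
use std::thread;

thread_local! {
    // Per-thread stack of active span names. This is a stand-in for the
    // thread-local "current span" state kept by a tracing subscriber.
    static SPAN_STACK: RefCell<Vec<String>> = RefCell::new(Vec::new());
}

/// Guard that pops the innermost span from this thread's stack when dropped.
struct Entered;

impl Drop for Entered {
    fn drop(&mut self) {
        SPAN_STACK.with(|s| {
            s.borrow_mut().pop();
        });
    }
}

/// Hypothetical helper: push `name` onto the current thread's span stack.
fn enter(name: &str) -> Entered {
    SPAN_STACK.with(|s| s.borrow_mut().push(name.to_string()));
    Entered
}

/// Hypothetical helper: name of the innermost active span on this thread.
fn current() -> Option<String> {
    SPAN_STACK.with(|s| s.borrow().last().cloned())
}

/// The worker thread starts with an empty stack, so the caller's span is lost.
fn spawn_without_capture() -> Option<String> {
    let _guard = enter("process_chain_segment");
    thread::spawn(current).join().unwrap()
}

/// Capture the caller's span first, then re-enter it on the worker thread.
fn spawn_with_capture() -> Option<String> {
    let _guard = enter("process_chain_segment");
    let captured = current(); // analogous to binding `Span::current()`
    thread::spawn(move || {
        // analogous to `let _guard = span.enter();` inside the closure
        let _guard = captured.as_deref().map(enter);
        current()
    })
    .join()
    .unwrap()
}
```

In this model `spawn_without_capture()` observes no span on the worker thread, while `spawn_with_capture()` observes the caller's span name, which mirrors why the diff binds the span before the `move` closure rather than calling `Span::current()` inside it.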

@@ -2974,10 +2984,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
block: Arc<SignedBeaconBlock<T::EthSpec>>,
) -> Result<GossipVerifiedBlock<T>, BlockError> {
let chain = self.clone();
let span = Span::current();
self.task_executor
.clone()
.spawn_blocking_handle(
move || {
let _guard = span.enter();
let slot = block.slot();
let graffiti_string = block.message().body().graffiti().as_utf8_lossy();

@@ -3006,7 +3018,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}
},
"payload_verification_handle",
"gossip_block_verification_handle",
)
.ok_or(BeaconChainError::RuntimeShutdown)?
.await
@@ -3015,6 +3027,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

/// Cache the blob in the processing cache, process it, then evict it from the cache if it was
/// imported or errors.
#[instrument(skip_all, level = "debug")]
pub async fn process_gossip_blob(
self: &Arc<Self>,
blob: GossipVerifiedBlob<T>,
@@ -3088,6 +3101,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

/// Cache the blobs in the processing cache, process it, then evict it from the cache if it was
/// imported or errors.
#[instrument(skip_all, level = "debug")]
pub async fn process_rpc_blobs(
self: &Arc<Self>,
slot: Slot,
@@ -3383,6 +3397,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
///
/// Returns an `Err` if the given block was invalid, or an error was encountered during
/// verification.
#[instrument(skip_all, fields(block_root = ?block_root, block_source = %block_source))]
pub async fn process_block<B: IntoExecutionPendingBlock<T>>(
self: &Arc<Self>,
block_root: Hash256,
@@ -3499,6 +3514,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// get a fully `ExecutedBlock`.
///
/// An error is returned if the verification handle couldn't be awaited.
#[instrument(skip_all, level = "debug")]
pub async fn into_executed_block(
self: Arc<Self>,
execution_pending_block: ExecutionPendingBlock<T>,
@@ -3547,6 +3563,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

/// Checks if the block is available, and imports immediately if so, otherwise caches the block
/// in the data availability checker.
#[instrument(skip_all)]
async fn check_block_availability_and_import(
self: &Arc<Self>,
block: AvailabilityPendingExecutedBlock<T::EthSpec>,
@@ -3747,6 +3764,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}
}

#[instrument(skip_all)]
pub async fn import_available_block(
self: &Arc<Self>,
block: Box<AvailableExecutedBlock<T::EthSpec>>,
@@ -3775,11 +3793,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

// TODO(das) record custody column available timestamp

// import
let chain = self.clone();
let block_root = self
.spawn_blocking_handle(
let block_root = {
// Capture the current span before moving into the blocking task
let current_span = tracing::Span::current();
let chain = self.clone();
self.spawn_blocking_handle(
move || {
// Enter the captured span in the blocking thread
let _guard = current_span.enter();
chain.import_block(
block,
block_root,
@@ -3791,7 +3812,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
},
"payload_verification_handle",
)
.await??;
.await??
};

// Remove block components from da_checker AFTER completing block import. Then we can assert
// the following invariant:
@@ -3815,6 +3837,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// An error is returned if the block was unable to be imported. It may be partially imported
/// (i.e., this function is not atomic).
#[allow(clippy::too_many_arguments)]
#[instrument(skip_all)]
fn import_block(
&self,
signed_block: AvailableBlock<T::EthSpec>,
@@ -3852,6 +3875,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

// Only take a write lock if there are new keys to import.
if state.validators().len() > pubkey_cache.len() {
let _pubkey_span = debug_span!(
"pubkey_cache_update",
new_validators = tracing::field::Empty,
cache_len_before = pubkey_cache.len()
)
.entered();

parking_lot::RwLockUpgradableReadGuard::upgrade(pubkey_cache)
.import_new_pubkeys(&state)?
} else {
@@ -3865,6 +3895,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// However, latency between the VC and the BN might cause the VC to produce attestations at
// a previous slot.
if state.current_epoch().saturating_add(1_u64) >= current_epoch {
let _attester_span = debug_span!("attester_cache_update").entered();
self.attester_cache
.maybe_cache_state(&state, block_root, &self.spec)
.map_err(BeaconChainError::from)?;
@@ -4009,6 +4040,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
ops.push(StoreOp::PutBlock(block_root, signed_block.clone()));
ops.push(StoreOp::PutState(block.state_root(), &state));

let db_span = info_span!("persist_blocks_and_blobs").entered();

if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) {
error!(
msg = "Restoring fork choice from disk",
@@ -4021,6 +4054,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.unwrap_or(e.into()));
}

drop(db_span);

// The fork choice write-lock is dropped *after* the on-disk database has been updated.
// This prevents inconsistency between the two at the expense of concurrency.
drop(fork_choice);
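
`db_span` in the hunks above is an entered guard that is dropped explicitly once the database write returns, so the span covers the write but not the work that follows it. A rough model of that scoping, using only the standard library rather than `tracing` itself: `SpanGuard`, its `entered` constructor and `import_block_model` are hypothetical names, and the in-memory event log stands in for a real subscriber.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Shared event log that records when a span is entered and exited.
type Log = Rc<RefCell<Vec<String>>>;

/// Hypothetical guard: logs "enter" on creation and "exit" on drop.
struct SpanGuard {
    name: &'static str,
    log: Log,
}

impl SpanGuard {
    fn entered(name: &'static str, log: &Log) -> Self {
        log.borrow_mut().push(format!("enter {}", name));
        SpanGuard { name, log: Rc::clone(log) }
    }
}

impl Drop for SpanGuard {
    fn drop(&mut self) {
        self.log.borrow_mut().push(format!("exit {}", self.name));
    }
}

/// Models the ordering in `import_block`: the span closes before later work.
fn import_block_model() -> Vec<String> {
    let log: Log = Rc::new(RefCell::new(Vec::new()));

    let db_span = SpanGuard::entered("persist_blocks_and_blobs", &log);
    log.borrow_mut().push("write to database".to_string());
    // Explicit drop ends the span here instead of at the end of the function.
    drop(db_span);

    log.borrow_mut().push("drop fork choice lock".to_string());

    let events = log.borrow().clone();
    events
}
```

Without the explicit `drop`, the guard in this model would live until the end of the function and the "exit" event would be logged after the later work, which would inflate the measured duration of the database step.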
@@ -4155,6 +4190,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

/// Process a block for the validator monitor, including all its constituent messages.
#[instrument(skip_all, level = "debug")]
fn import_block_update_validator_monitor(
&self,
block: BeaconBlockRef<T::EthSpec>,
@@ -4249,6 +4285,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// Iterate through the attestations in the block and register them as "observed".
///
/// This will stop us from propagating them on the gossip network.
#[instrument(skip_all, level = "debug")]
fn import_block_observe_attestations(
&self,
block: BeaconBlockRef<T::EthSpec>,
@@ -4311,6 +4348,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

/// If a slasher is configured, provide the attestations from the block.
#[instrument(skip_all, level = "debug")]
fn import_block_update_slasher(
&self,
block: BeaconBlockRef<T::EthSpec>,
@@ -4409,6 +4447,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

// For the current and next epoch of this state, ensure we have the shuffling from this
// block in our cache.
#[instrument(skip_all, level = "debug")]
fn import_block_update_shuffling_cache(
&self,
block_root: Hash256,
3 changes: 2 additions & 1 deletion beacon_node/beacon_chain/src/blob_verification.rs
@@ -14,7 +14,7 @@ use crate::{metrics, BeaconChainError};
use kzg::{Error as KzgError, Kzg, KzgCommitment};
use ssz_derive::{Decode, Encode};
use std::time::Duration;
use tracing::debug;
use tracing::{debug, instrument};
use tree_hash::TreeHash;
use types::blob_sidecar::BlobIdentifier;
use types::{
@@ -374,6 +374,7 @@ impl<E: EthSpec> IntoIterator for KzgVerifiedBlobList<E> {
///
/// Note: This function should be preferred over calling `verify_kzg_for_blob`
/// in a loop since this function kzg verifies a list of blobs more efficiently.
#[instrument(skip_all, level = "debug")]
pub fn verify_kzg_for_blob_list<'a, E: EthSpec, I>(
blob_iter: I,
kzg: &'a Kzg,