Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 32 additions & 14 deletions cumulus/client/consensus/aura/src/equivocation_import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@ use sc_consensus::{
import_queue::{BasicQueue, Verifier as VerifierT},
BlockImport, BlockImportParams, ForkChoiceStrategy,
};
use sc_consensus_aura::standalone as aura_internal;
use sc_consensus_aura::{standalone as aura_internal, AuthoritiesTracker, CompatibilityMode};
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE};
use schnellru::{ByLength, LruMap};
use sp_api::ProvideRuntimeApi;
use sp_block_builder::BlockBuilder as BlockBuilderApi;
use sp_blockchain::{HeaderBackend, HeaderMetadata};
use sp_consensus::{error::Error as ConsensusError, BlockOrigin};
use sp_consensus_aura::{AuraApi, Slot, SlotDuration};
use sp_core::crypto::Pair;
Expand Down Expand Up @@ -73,12 +74,12 @@ impl<N: std::hash::Hash + PartialEq> NaiveEquivocationDefender<N> {
}

/// A parachain block import verifier that checks for equivocation limits within each slot.
pub struct Verifier<P, Client, Block: BlockT, CIDP> {
pub struct Verifier<P: Pair, Client, Block: BlockT, CIDP> {
client: Arc<Client>,
create_inherent_data_providers: CIDP,
defender: Mutex<NaiveEquivocationDefender<NumberFor<Block>>>,
telemetry: Option<TelemetryHandle>,
_phantom: std::marker::PhantomData<fn() -> (Block, P)>,
authorities_tracker: AuthoritiesTracker<P, Block, Client>,
}

impl<P, Client, Block, CIDP> Verifier<P, Client, Block, CIDP>
Expand All @@ -100,11 +101,11 @@ where
telemetry: Option<TelemetryHandle>,
) -> Self {
Self {
client,
client: client.clone(),
create_inherent_data_providers: inherent_data_provider,
defender: Mutex::new(NaiveEquivocationDefender::default()),
telemetry,
_phantom: std::marker::PhantomData,
authorities_tracker: AuthoritiesTracker::new(client),
}
}
}
Expand All @@ -116,7 +117,11 @@ where
P::Signature: Codec,
P::Public: Codec + Debug,
Block: BlockT,
Client: ProvideRuntimeApi<Block> + Send + Sync,
Client: HeaderBackend<Block>
+ HeaderMetadata<Block, Error = sp_blockchain::Error>
+ ProvideRuntimeApi<Block>
+ Send
+ Sync,
<Client as ProvideRuntimeApi<Block>>::Api: BlockBuilderApi<Block> + AuraApi<Block, P::Public>,

CIDP: CreateInherentDataProviders<Block, ()>,
Expand All @@ -139,10 +144,10 @@ where

// check seal and update pre-hash/post-hash
{
let authorities = aura_internal::fetch_authorities(self.client.as_ref(), parent_hash)
.map_err(|e| {
format!("Could not fetch authorities at {:?}: {}", parent_hash, e)
})?;
let authorities = self
.authorities_tracker
.fetch_or_update(&block_params.header, &CompatibilityMode::None)
.map_err(|e| format!("Could not fetch authorities: {}", e))?;

let slot_duration = self
.client
Expand Down Expand Up @@ -197,6 +202,14 @@ where
post_hash,
))
}

self.authorities_tracker.import(&block_params.header).map_err(|e| {
format!(
"Could not import authorities for block {:?} at number {}: {e}",
block_params.header.hash(),
block_params.header.number(),
)
})?;
},
Err(aura_internal::SealVerificationError::Deferred(hdr, slot)) => {
telemetry!(
Expand Down Expand Up @@ -275,16 +288,21 @@ where
+ Send
+ Sync
+ 'static,
Client: ProvideRuntimeApi<Block> + Send + Sync + 'static,
Client: HeaderBackend<Block>
+ HeaderMetadata<Block, Error = sp_blockchain::Error>
+ ProvideRuntimeApi<Block>
+ Send
+ Sync
+ 'static,
<Client as ProvideRuntimeApi<Block>>::Api: BlockBuilderApi<Block> + AuraApi<Block, P::Public>,
CIDP: CreateInherentDataProviders<Block, ()> + 'static,
{
let verifier = Verifier::<P, _, _, _> {
client,
client: client.clone(),
create_inherent_data_providers,
defender: Mutex::new(NaiveEquivocationDefender::default()),
telemetry,
_phantom: std::marker::PhantomData,
authorities_tracker: AuthoritiesTracker::new(client.clone()),
};

BasicQueue::new(verifier, Box::new(block_import), None, spawner, registry)
Expand Down Expand Up @@ -319,7 +337,7 @@ mod test {
},
defender: Mutex::new(NaiveEquivocationDefender::default()),
telemetry: None,
_phantom: std::marker::PhantomData,
authorities_tracker: AuthoritiesTracker::new(client.clone()),
};

let genesis = client.info().best_hash;
Expand Down
9 changes: 5 additions & 4 deletions cumulus/client/consensus/aura/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use sc_consensus_slots::InherentDataProviderExt;
use sc_telemetry::TelemetryHandle;
use sp_api::{ApiExt, ProvideRuntimeApi};
use sp_block_builder::BlockBuilder as BlockBuilderApi;
use sp_blockchain::HeaderBackend;
use sp_blockchain::{HeaderBackend, HeaderMetadata};
use sp_consensus::Error as ConsensusError;
use sp_consensus_aura::AuraApi;
use sp_core::crypto::Pair;
Expand Down Expand Up @@ -72,7 +72,8 @@ where
+ Sync
+ AuxStore
+ UsageProvider<Block>
+ HeaderBackend<Block>,
+ HeaderBackend<Block>
+ HeaderMetadata<Block, Error = sp_blockchain::Error>,
I: BlockImport<Block, Error = ConsensusError>
+ ParachainBlockImportMarker
+ Send
Expand Down Expand Up @@ -109,12 +110,12 @@ pub struct BuildVerifierParams<C, CIDP> {
}

/// Build the [`AuraVerifier`].
pub fn build_verifier<P, C, CIDP, N>(
pub fn build_verifier<P: Pair, C, CIDP, B: BlockT>(
BuildVerifierParams { client, create_inherent_data_providers, telemetry }: BuildVerifierParams<
C,
CIDP,
>,
) -> AuraVerifier<C, P, CIDP, N> {
) -> AuraVerifier<C, P, CIDP, B> {
sc_consensus_aura::build_verifier(sc_consensus_aura::BuildVerifierParams {
client,
create_inherent_data_providers,
Expand Down
12 changes: 12 additions & 0 deletions prdoc/pr_9272.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
title: track authorities from aura digests
doc:
- audience: Node Operator
description: |-
Closes https://github.com/paritytech/polkadot-sdk/issues/9064.

Tracks AURA authorities in a `ForkTree`. The fork tree is updated whenever there is an authorities change log in the digest. If the fork tree doesn't contain the authorities, they are fetched for the runtime (should only happen at startup, or if something weird is going on with forks maybe).
crates:
- name: cumulus-client-consensus-aura
bump: patch
- name: sc-consensus-aura
bump: patch
3 changes: 2 additions & 1 deletion substrate/client/consensus/aura/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
async-trait = { workspace = true }
codec = { workspace = true, default-features = true }
fork-tree = { workspace = true, default-features = true }
futures = { workspace = true }
log = { workspace = true, default-features = true }
parking_lot = { workspace = true, default-features = true }
prometheus-endpoint = { workspace = true, default-features = true }
sc-block-builder = { workspace = true, default-features = true }
sc-client-api = { workspace = true, default-features = true }
Expand All @@ -40,7 +42,6 @@ sp-runtime = { workspace = true, default-features = true }
thiserror = { workspace = true }

[dev-dependencies]
parking_lot = { workspace = true, default-features = true }
sc-keystore = { workspace = true, default-features = true }
sc-network = { workspace = true, default-features = true }
sc-network-test = { workspace = true }
Expand Down
180 changes: 180 additions & 0 deletions substrate/client/consensus/aura/src/authorities_tracker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
// This file is part of Substrate.

// Copyright (C) Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Module implementing the logic for verifying and importing AuRa blocks.

use std::{fmt::Debug, sync::Arc};

use codec::Codec;
use fork_tree::ForkTree;
use parking_lot::RwLock;
use sp_api::ProvideRuntimeApi;
use sp_blockchain::{HeaderBackend, HeaderMetadata};
use sp_consensus_aura::{AuraApi, ConsensusLog, AURA_ENGINE_ID};
use sp_core::Pair;
use sp_runtime::{
generic::OpaqueDigestItemId,
traits::{Block, Header, NumberFor},
};

use crate::{fetch_authorities_from_runtime, AuthorityId, CompatibilityMode};

const LOG_TARGET: &str = "aura::authorities_tracker";

/// AURA authorities tracker. Updates authorities based on the AURA authorities change
/// digest in the block header.
pub struct AuthoritiesTracker<P: Pair, B: Block, C> {
authorities: RwLock<ForkTree<B::Hash, NumberFor<B>, Vec<AuthorityId<P>>>>,
client: Arc<C>,
}

impl<P: Pair, B: Block, C> AuthoritiesTracker<P, B, C> {
/// Create a new `AuthoritiesTracker`.
pub fn new(client: Arc<C>) -> Self {
Self { authorities: RwLock::new(ForkTree::new()), client }
}
}

impl<P, B, C> AuthoritiesTracker<P, B, C>
where
P: Pair,
B: Block,
C: HeaderBackend<B> + HeaderMetadata<B, Error = sp_blockchain::Error> + ProvideRuntimeApi<B>,
P::Public: Codec + Debug,
C::Api: AuraApi<B, AuthorityId<P>>,
{
/// Fetch authorities from the tracker, if available. If not available, fetch from the client
/// and update the tracker.
pub fn fetch_or_update(
&self,
header: &B::Header,
compatibility_mode: &CompatibilityMode<NumberFor<B>>,
) -> Result<Vec<AuthorityId<P>>, String> {
let hash = header.hash();
let number = *header.number();
let parent_hash = *header.parent_hash();

// Fetch authorities from cache, if available.
let authorities = {
let is_descendent_of =
sc_client_api::utils::is_descendent_of(&*self.client, Some((hash, parent_hash)));
let authorities_cache = self.authorities.read();
authorities_cache
.find_node_where(&hash, &number, &is_descendent_of, &|_| true)
.map_err(|e| {
format!("Could not find authorities for block {hash:?} at number {number}: {e}")
})?
.map(|node| node.data.clone())
};

match authorities {
Some(authorities) => {
log::debug!(
target: LOG_TARGET,
"Authorities for block {:?} at number {} found in cache",
hash,
number,
);
Ok(authorities)
},
None => {
// Authorities are missing from the cache. Fetch them from the runtime and cache
// them.
log::debug!(
target: LOG_TARGET,
"Authorities for block {:?} at number {} not found in cache, fetching from runtime.",
hash,
number
);
let authorities = fetch_authorities_from_runtime(
&*self.client,
parent_hash,
number,
compatibility_mode,
)
.map_err(|e| format!("Could not fetch authorities at {:?}: {}", parent_hash, e))?;
let is_descendent_of = sc_client_api::utils::is_descendent_of(&*self.client, None);
let mut authorities_cache = self.authorities.write();
authorities_cache
.import(
parent_hash,
number - 1u32.into(),
authorities.clone(),
&is_descendent_of,
)
.map_err(|e| {
format!("Could not import authorities for block {parent_hash:?} at number {}: {e}", number - 1u32.into())
})?;
Ok(authorities)
},
}
}

/// If there is an authorities change digest in the header, import it into the tracker.
pub fn import(&self, header: &B::Header) -> Result<(), String> {
if let Some(authorities_change) = find_authorities_change_digest::<B, P>(header) {
let hash = header.hash();
let number = *header.number();
log::debug!(
target: LOG_TARGET,
"Importing authorities change for block {:?} at number {} found in header digest",
hash,
number,
);
self.prune_finalized()?;
let is_descendent_of = sc_client_api::utils::is_descendent_of(&*self.client, None);
let mut authorities_cache = self.authorities.write();
authorities_cache
.import(hash, number, authorities_change, &is_descendent_of)
.map_err(|e| {
format!(
"Could not import authorities for block {hash:?} at number {number}: {e}"
)
})?;
}
Ok(())
}

fn prune_finalized(&self) -> Result<(), String> {
let is_descendent_of = sc_client_api::utils::is_descendent_of(&*self.client, None);
let info = self.client.info();
let mut authorities_cache = self.authorities.write();
let _pruned = authorities_cache
.prune(&info.finalized_hash, &info.finalized_number, &is_descendent_of, &|_| true)
.map_err(|e| e.to_string())?;
Ok(())
}
}

/// Extract the AURA authorities change digest from the given header, if it exists.
fn find_authorities_change_digest<B, P>(header: &B::Header) -> Option<Vec<AuthorityId<P>>>
where
B: Block,
P: Pair,
P::Public: Codec,
{
for log in header.digest().logs() {
log::trace!(target: LOG_TARGET, "Checking log {:?}, looking for authorities change digest.", log);
let log = log
.try_to::<ConsensusLog<AuthorityId<P>>>(OpaqueDigestItemId::Consensus(&AURA_ENGINE_ID));
if let Some(ConsensusLog::AuthoritiesChange(authorities)) = log {
return Some(authorities);
}
}
None
Comment on lines +171 to +179
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for log in header.digest().logs() {
log::trace!(target: LOG_TARGET, "Checking log {:?}, looking for authorities change digest.", log);
let log = log
.try_to::<ConsensusLog<AuthorityId<P>>>(OpaqueDigestItemId::Consensus(&AURA_ENGINE_ID));
if let Some(ConsensusLog::AuthoritiesChange(authorities)) = log {
return Some(authorities);
}
}
None
header.digest().convert_first(|log|
log::trace!(target: LOG_TARGET, "Checking log {:?}, looking for authorities change digest.", log);
let (engine, log) = log.as_consensus()?
if engine != AURA_ENGINE_ID { return None; }
if let Some(ConsensusLog::AuthoritiesChange(authorities)) = ConsensusLog<AuthorityId<P>>>::decode_all(&mut &log[..] {
return Some(authorities);
}
}
None

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as_consensus is very important here. You could also extend

pub trait CompatibleDigestItem<Signature>: Sized {
to support the consensuslog.

}
Loading
Loading