Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions polkadot/node/network/collator-protocol/src/collator_side/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -697,10 +697,14 @@ fn has_assigned_cores(
false
}

/// Get a list of backing validators at all allowed relay parents.
/// If `pending_collation` is true, we will only return the validators
/// that have a collation pending.
fn list_of_backing_validators_in_view(
implicit_view: &Option<ImplicitView>,
per_relay_parent: &HashMap<Hash, PerRelayParent>,
para_id: ParaId,
pending_collation: bool,
) -> Vec<AuthorityDiscoveryId> {
let mut backing_validators = HashSet::new();
let Some(implicit_view) = implicit_view else { return vec![] };
Expand All @@ -711,7 +715,17 @@ fn list_of_backing_validators_in_view(
.unwrap_or_default();

for allowed_relay_parent in allowed_ancestry {
if let Some(relay_parent) = per_relay_parent.get(allowed_relay_parent) {
let Some(relay_parent) = per_relay_parent.get(allowed_relay_parent) else { continue };

if pending_collation {
// Check if there is any collation for this relay parent.
for collation_data in relay_parent.collations.values() {
let core_index = collation_data.core_index();
if let Some(group) = relay_parent.validator_group.get(core_index) {
backing_validators.extend(group.validators.iter().cloned());
}
}
} else {
for group in relay_parent.validator_group.values() {
backing_validators.extend(group.validators.iter().cloned());
}
Expand Down Expand Up @@ -743,7 +757,7 @@ async fn update_validator_connections<Context>(
// to the network bridge passing an empty list of validator ids. Otherwise, it will keep
// connecting to the last requested validators until a new request is issued.
let validator_ids = if cores_assigned {
list_of_backing_validators_in_view(implicit_view, per_relay_parent, para_id)
list_of_backing_validators_in_view(implicit_view, per_relay_parent, para_id, false)
} else {
Vec::new()
};
Expand All @@ -764,15 +778,18 @@ async fn update_validator_connections<Context>(
return
}

let validator_ids =
list_of_backing_validators_in_view(implicit_view, per_relay_parent, para_id, true);

gum::trace!(
target: LOG_TARGET,
"Disconnecting from validators: {:?}",
peer_ids.keys(),
"Keeping connections to validators with pending collations: {:?}",
validator_ids,
);

// Disconnect from all connected validators on the `Collation` protocol.
// Disconnect from all validators with no pending collations.
NetworkBridgeTxMessage::ConnectToValidators {
validator_ids: vec![],
validator_ids,
peer_set: PeerSet::Collation,
failed,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ async fn distribute_collation(
expected_connected: Vec<AuthorityDiscoveryId>,
test_state: &TestState,
relay_parent: Hash,
core_index: CoreIndex,
) -> DistributeCollation {
// Now we want to distribute a `PoVBlock`
let pov_block = PoV { block_data: BlockData(vec![42, 43, 44]) };
Expand All @@ -426,6 +427,7 @@ async fn distribute_collation(
para_id: test_state.para_id,
relay_parent,
pov_hash,
core_index,
..Default::default()
}
.build();
Expand Down Expand Up @@ -484,10 +486,9 @@ async fn expect_advertise_collation_msg(
virtual_overseer: &mut VirtualOverseer,
any_peers: &[PeerId],
expected_relay_parent: Hash,
expected_candidate_hashes: Vec<CandidateHash>,
mut expected_candidate_hashes: Vec<CandidateHash>,
) {
let mut candidate_hashes: HashSet<_> = expected_candidate_hashes.into_iter().collect();
let iter_num = candidate_hashes.len();
let iter_num = expected_candidate_hashes.len();

for _ in 0..iter_num {
assert_matches!(
Expand All @@ -511,10 +512,12 @@ async fn expect_advertise_collation_msg(
..
} => {
assert_eq!(relay_parent, expected_relay_parent);
assert!(candidate_hashes.contains(&candidate_hash));
assert!(expected_candidate_hashes.contains(&candidate_hash));

// Drop the hash we've already seen.
candidate_hashes.remove(&candidate_hash);
if let Some(pos) = expected_candidate_hashes.iter().position(|h| h == &candidate_hash) {
expected_candidate_hashes.remove(pos);
}
}
);
},
Expand Down Expand Up @@ -584,6 +587,7 @@ fn v1_protocol_rejected() {
test_state.current_group_validator_authority_ids(),
&test_state,
test_state.relay_parent,
CoreIndex(0),
)
.await;

Expand Down Expand Up @@ -644,6 +648,7 @@ fn advertise_and_send_collation() {
test_state.current_group_validator_authority_ids(),
&test_state,
test_state.relay_parent,
CoreIndex(0),
)
.await;

Expand Down Expand Up @@ -786,6 +791,7 @@ fn advertise_and_send_collation() {
test_state.current_group_validator_authority_ids(),
&test_state,
test_state.relay_parent,
CoreIndex(0),
)
.await;

Expand Down Expand Up @@ -848,6 +854,7 @@ fn delay_reputation_change() {
test_state.current_group_validator_authority_ids(),
&test_state,
test_state.relay_parent,
CoreIndex(0),
)
.await;

Expand Down Expand Up @@ -1066,6 +1073,7 @@ fn collations_are_only_advertised_to_validators_with_correct_view() {
test_state.current_group_validator_authority_ids(),
&test_state,
test_state.relay_parent,
CoreIndex(0),
)
.await;

Expand Down Expand Up @@ -1140,6 +1148,7 @@ fn collate_on_two_different_relay_chain_blocks() {
test_state.current_group_validator_authority_ids(),
&test_state,
test_state.relay_parent,
CoreIndex(0),
)
.await;

Expand All @@ -1162,6 +1171,7 @@ fn collate_on_two_different_relay_chain_blocks() {
test_state.current_group_validator_authority_ids(),
&test_state,
test_state.relay_parent,
CoreIndex(0),
)
.await;

Expand Down Expand Up @@ -1228,6 +1238,7 @@ fn validator_reconnect_does_not_advertise_a_second_time() {
test_state.current_group_validator_authority_ids(),
&test_state,
test_state.relay_parent,
CoreIndex(0),
)
.await;

Expand Down Expand Up @@ -1358,6 +1369,7 @@ where
test_state.current_group_validator_authority_ids(),
&test_state,
test_state.relay_parent,
CoreIndex(0),
)
.await;

Expand Down Expand Up @@ -1519,6 +1531,7 @@ fn connect_to_group_in_view() {
test_state.current_group_validator_authority_ids(),
&test_state,
test_state.relay_parent,
CoreIndex(0),
)
.await;

Expand Down Expand Up @@ -1604,6 +1617,7 @@ fn connect_to_group_in_view() {
expected_group,
&test_state,
test_state.relay_parent,
CoreIndex(0),
)
.await;

Expand Down Expand Up @@ -1652,6 +1666,7 @@ fn connect_with_no_cores_assigned() {
test_state.current_group_validator_authority_ids(),
&test_state,
test_state.relay_parent,
CoreIndex(0),
)
.await;

Expand Down Expand Up @@ -1803,6 +1818,7 @@ fn distribute_collation_forces_connect() {
test_state.current_group_validator_authority_ids(),
&test_state,
test_state.relay_parent,
CoreIndex(0),
)
.await;

Expand Down Expand Up @@ -1911,7 +1927,7 @@ fn connect_advertise_disconnect_three_backing_groups() {
let validator_peer_ids: Vec<_> =
(0..expected_validators.len()).map(|_| PeerId::random()).sorted().collect();

for (auth_id, peer_id) in expected_validators.iter().zip(validator_peer_ids.iter()) {
for (peer_id, auth_id) in validator_peer_ids.iter().zip(expected_validators.iter()) {
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::NetworkBridgeUpdate(
Expand All @@ -1926,32 +1942,85 @@ fn connect_advertise_disconnect_three_backing_groups() {
.await;
}

// Expect collation advertisement for each validator
// Expect declare messages for each validator
for peer_id in validator_peer_ids.iter() {
expect_declare_msg(&mut virtual_overseer, &test_state, peer_id).await;
}

// Distribute collations for first 2 cores
let mut candidate_hashes = HashMap::new();
for core_idx in [0, 1] {
let DistributeCollation { candidate, .. } = distribute_collation(
&mut virtual_overseer,
expected_validators.clone(),
&test_state,
test_state.relay_parent,
CoreIndex(core_idx),
)
.await;

// Add the same candidate hash twice, since we remove it once per validator.
candidate_hashes.insert(core_idx as usize, vec![candidate.hash(); 2]);
}

// Send peer view changes for all validators to trigger advertisements
for peer_id in validator_peer_ids.iter() {
send_peer_view_change(
&mut virtual_overseer,
peer_id,
vec![test_state.relay_parent],
)
.await;
}

// Expect advertisements for 2 collations to each validator
for (idx, peer_ids) in
validator_peer_ids.iter().take(4).chunks(2).into_iter().enumerate()
{
let peer_ids_vec: Vec<PeerId> = peer_ids.copied().collect();
expect_advertise_collation_msg(
&mut virtual_overseer,
&peer_ids_vec,
test_state.relay_parent,
candidate_hashes[&idx].clone(),
)
.await;
}

// Send the disconnect message
overseer_send(
&mut virtual_overseer,
CollatorProtocolMessage::DisconnectFromBackingGroups,
)
.await;

// Expect a DisconnectPeers for all connected validators
// We should disconnect from the validator assigned to core 2, but keep the other
// validators connected
assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToValidators{
validator_ids,
mut validator_ids,
peer_set,
failed: _,
}) => {
// We should disconnect from all validators we were connected to
assert_eq!(validator_ids, vec![], "Expected to disconnect from all validators");
let mut expected: Vec<_> = expected_validators.into_iter().take(4).collect();
validator_ids.sort();
expected.sort();
assert_eq!(validator_ids, expected, "Expected to disconnect validator assigned to core 2");
assert_eq!(peer_set, PeerSet::Collation);
}
);

// Update view and expect connections to all validators to be dropped.
update_view(
Some(vec![]),
&test_state,
&mut virtual_overseer,
vec![(Hash::random(), 11)],
1,
)
.await;

TestHarness { virtual_overseer, req_v2_cfg: req_cfg }
},
);
Expand Down
9 changes: 9 additions & 0 deletions prdoc/pr_10446.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
title: 'collator-protocol: pre-connect fix'
doc:
- audience: Node Dev
description: |-
Keep the connections to backers open until the relay parent of the advertised
collation expires.
crates:
- name: polkadot-collator-protocol
bump: patch
Loading