diff --git a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs index 97c95d1b06ad1..4ecd36f69203a 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs @@ -697,10 +697,14 @@ fn has_assigned_cores( false } +/// Get a list of backing validators at all allowed relay parents. +/// If `pending_collation` is true, we will only return the validators +/// that have a collation pending. fn list_of_backing_validators_in_view( implicit_view: &Option, per_relay_parent: &HashMap, para_id: ParaId, + pending_collation: bool, ) -> Vec { let mut backing_validators = HashSet::new(); let Some(implicit_view) = implicit_view else { return vec![] }; @@ -711,7 +715,17 @@ fn list_of_backing_validators_in_view( .unwrap_or_default(); for allowed_relay_parent in allowed_ancestry { - if let Some(relay_parent) = per_relay_parent.get(allowed_relay_parent) { + let Some(relay_parent) = per_relay_parent.get(allowed_relay_parent) else { continue }; + + if pending_collation { + // Check if there is any collation for this relay parent. + for collation_data in relay_parent.collations.values() { + let core_index = collation_data.core_index(); + if let Some(group) = relay_parent.validator_group.get(core_index) { + backing_validators.extend(group.validators.iter().cloned()); + } + } + } else { for group in relay_parent.validator_group.values() { backing_validators.extend(group.validators.iter().cloned()); } @@ -743,7 +757,7 @@ async fn update_validator_connections( // to the network bridge passing an empty list of validator ids. Otherwise, it will keep // connecting to the last requested validators until a new request is issued. let validator_ids = if cores_assigned { - list_of_backing_validators_in_view(implicit_view, per_relay_parent, para_id) + list_of_backing_validators_in_view(implicit_view, per_relay_parent, para_id, false) } else { Vec::new() }; @@ -764,15 +778,18 @@ async fn update_validator_connections( return } + let validator_ids = + list_of_backing_validators_in_view(implicit_view, per_relay_parent, para_id, true); + gum::trace!( target: LOG_TARGET, - "Disconnecting from validators: {:?}", - peer_ids.keys(), + "Keeping connections to validators with pending collations: {:?}", + validator_ids, ); - // Disconnect from all connected validators on the `Collation` protocol. + // Disconnect from all validators with no pending collations. NetworkBridgeTxMessage::ConnectToValidators { - validator_ids: vec![], + validator_ids, peer_set: PeerSet::Collation, failed, } diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs index 6256cd6614f42..4acddbcce100b 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs @@ -415,6 +415,7 @@ async fn distribute_collation( expected_connected: Vec, test_state: &TestState, relay_parent: Hash, + core_index: CoreIndex, ) -> DistributeCollation { // Now we want to distribute a `PoVBlock` let pov_block = PoV { block_data: BlockData(vec![42, 43, 44]) }; @@ -426,6 +427,7 @@ async fn distribute_collation( para_id: test_state.para_id, relay_parent, pov_hash, + core_index, ..Default::default() } .build(); @@ -484,10 +486,9 @@ async fn expect_advertise_collation_msg( virtual_overseer: &mut VirtualOverseer, any_peers: &[PeerId], expected_relay_parent: Hash, - expected_candidate_hashes: Vec, + mut expected_candidate_hashes: Vec, ) { - let mut candidate_hashes: HashSet<_> = expected_candidate_hashes.into_iter().collect(); - let iter_num = candidate_hashes.len(); + let iter_num = expected_candidate_hashes.len(); for _ in 0..iter_num { assert_matches!( @@ -511,10 +512,12 @@ async fn expect_advertise_collation_msg( .. } => { assert_eq!(relay_parent, expected_relay_parent); - assert!(candidate_hashes.contains(&candidate_hash)); + assert!(expected_candidate_hashes.contains(&candidate_hash)); // Drop the hash we've already seen. - candidate_hashes.remove(&candidate_hash); + if let Some(pos) = expected_candidate_hashes.iter().position(|h| h == &candidate_hash) { + expected_candidate_hashes.remove(pos); + } } ); }, @@ -584,6 +587,7 @@ fn v1_protocol_rejected() { test_state.current_group_validator_authority_ids(), &test_state, test_state.relay_parent, + CoreIndex(0), ) .await; @@ -644,6 +648,7 @@ fn advertise_and_send_collation() { test_state.current_group_validator_authority_ids(), &test_state, test_state.relay_parent, + CoreIndex(0), ) .await; @@ -786,6 +791,7 @@ fn advertise_and_send_collation() { test_state.current_group_validator_authority_ids(), &test_state, test_state.relay_parent, + CoreIndex(0), ) .await; @@ -848,6 +854,7 @@ fn delay_reputation_change() { test_state.current_group_validator_authority_ids(), &test_state, test_state.relay_parent, + CoreIndex(0), ) .await; @@ -1066,6 +1073,7 @@ fn collations_are_only_advertised_to_validators_with_correct_view() { test_state.current_group_validator_authority_ids(), &test_state, test_state.relay_parent, + CoreIndex(0), ) .await; @@ -1140,6 +1148,7 @@ fn collate_on_two_different_relay_chain_blocks() { test_state.current_group_validator_authority_ids(), &test_state, test_state.relay_parent, + CoreIndex(0), ) .await; @@ -1162,6 +1171,7 @@ fn collate_on_two_different_relay_chain_blocks() { test_state.current_group_validator_authority_ids(), &test_state, test_state.relay_parent, + CoreIndex(0), ) .await; @@ -1228,6 +1238,7 @@ fn validator_reconnect_does_not_advertise_a_second_time() { test_state.current_group_validator_authority_ids(), &test_state, test_state.relay_parent, + CoreIndex(0), ) .await; @@ -1358,6 +1369,7 @@ where test_state.current_group_validator_authority_ids(), &test_state, test_state.relay_parent, + CoreIndex(0), ) .await; @@ -1519,6 +1531,7 @@ fn connect_to_group_in_view() { test_state.current_group_validator_authority_ids(), &test_state, test_state.relay_parent, + CoreIndex(0), ) .await; @@ -1604,6 +1617,7 @@ fn connect_to_group_in_view() { expected_group, &test_state, test_state.relay_parent, + CoreIndex(0), ) .await; @@ -1652,6 +1666,7 @@ fn connect_with_no_cores_assigned() { test_state.current_group_validator_authority_ids(), &test_state, test_state.relay_parent, + CoreIndex(0), ) .await; @@ -1803,6 +1818,7 @@ fn distribute_collation_forces_connect() { test_state.current_group_validator_authority_ids(), &test_state, test_state.relay_parent, + CoreIndex(0), ) .await; @@ -1911,7 +1927,7 @@ fn connect_advertise_disconnect_three_backing_groups() { let validator_peer_ids: Vec<_> = (0..expected_validators.len()).map(|_| PeerId::random()).sorted().collect(); - for (auth_id, peer_id) in expected_validators.iter().zip(validator_peer_ids.iter()) { + for (peer_id, auth_id) in validator_peer_ids.iter().zip(expected_validators.iter()) { overseer_send( &mut virtual_overseer, CollatorProtocolMessage::NetworkBridgeUpdate( @@ -1926,11 +1942,51 @@ fn connect_advertise_disconnect_three_backing_groups() { .await; } - // Expect collation advertisement for each validator + // Expect declare messages for each validator for peer_id in validator_peer_ids.iter() { expect_declare_msg(&mut virtual_overseer, &test_state, peer_id).await; } + // Distribute collations for first 2 cores + let mut candidate_hashes = HashMap::new(); + for core_idx in [0, 1] { + let DistributeCollation { candidate, .. } = distribute_collation( + &mut virtual_overseer, + expected_validators.clone(), + &test_state, + test_state.relay_parent, + CoreIndex(core_idx), + ) + .await; + + // Add the same candidate hash twice we remove them once per validator. + candidate_hashes.insert(core_idx as usize, vec![candidate.hash(); 2]); + } + + // Send peer view changes for all validators to trigger advertisements + for peer_id in validator_peer_ids.iter() { + send_peer_view_change( + &mut virtual_overseer, + peer_id, + vec![test_state.relay_parent], + ) + .await; + } + + // Expect advertisements for 2 collations to each validator + for (idx, peer_ids) in + validator_peer_ids.iter().take(4).chunks(2).into_iter().enumerate() + { + let peer_ids_vec: Vec = peer_ids.copied().collect(); + expect_advertise_collation_msg( + &mut virtual_overseer, + &peer_ids_vec, + test_state.relay_parent, + candidate_hashes[&idx].clone(), + ) + .await; + } + // Send the disconnect message overseer_send( &mut virtual_overseer, @@ -1938,20 +1994,33 @@ fn connect_advertise_disconnect_three_backing_groups() { ) .await; - // Expect a DisconnectPeers for all connected validators + // We should disconnect from validator of core 2, but keep the other validators + // connected assert_matches!( overseer_recv(&mut virtual_overseer).await, AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToValidators{ - validator_ids, + mut validator_ids, peer_set, failed: _, }) => { - // We should disconnect from all validators we were connected to - assert_eq!(validator_ids, vec![], "Expected to disconnect from all validators"); + let mut expected: Vec<_> = expected_validators.into_iter().take(4).collect(); + validator_ids.sort(); + expected.sort(); + assert_eq!(validator_ids, expected, "Expected to disconnect validator assigned to core 2"); assert_eq!(peer_set, PeerSet::Collation); } ); + // Update view and expect connections to all validators to be dropped. + update_view( + Some(vec![]), + &test_state, + &mut virtual_overseer, + vec![(Hash::random(), 11)], + 1, + ) + .await; + TestHarness { virtual_overseer, req_v2_cfg: req_cfg } }, ); diff --git a/prdoc/pr_10446.prdoc b/prdoc/pr_10446.prdoc new file mode 100644 index 0000000000000..c517067bee399 --- /dev/null +++ b/prdoc/pr_10446.prdoc @@ -0,0 +1,9 @@ +title: 'collator-protocol: pre-connect fix' +doc: +- audience: Node Dev + description: |- + Keep the connections to backers open until the relay parent of the advertised + collation expires. +crates: +- name: polkadot-collator-protocol + bump: patch