Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 114 additions & 18 deletions vm/devices/net/netvsp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,19 @@ const LINK_DELAY_DURATION: Duration = Duration::from_secs(5);
#[cfg(test)]
const LINK_DELAY_DURATION: Duration = Duration::from_millis(333);

#[derive(PartialEq)]
enum CoordinatorMessage {
#[derive(Default, PartialEq)]
struct CoordinatorMessageUpdateType {
/// Update guest VF state based on current availability and the guest VF state tracked by the primary channel.
/// This includes adding the guest VF device and switching the data path.
UpdateGuestVfState,
guest_vf_state: bool,
/// Update the receive filter for all channels.
filter_state: bool,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i.e., maybe this can be Option<u32> so that we can store the packet filter in the coordinator itself.

}

#[derive(PartialEq)]
enum CoordinatorMessage {
/// Update network state.
Update(CoordinatorMessageUpdateType),
/// Restart endpoints and resume processing. This will also attempt to set VF and data path state to match current
/// expectations.
Restart,
Expand Down Expand Up @@ -382,6 +390,7 @@ struct NetChannel<T: RingMem> {
pending_send_size: usize,
restart: Option<CoordinatorMessage>,
can_use_ring_size_opt: bool,
packet_filter: u32,
}

/// Buffers used during packet processing.
Expand Down Expand Up @@ -1364,6 +1373,7 @@ impl Nic {
pending_send_size: 0,
restart: None,
can_use_ring_size_opt,
packet_filter: rndisprot::NDIS_PACKET_TYPE_NONE,
},
state,
coordinator_send: self.coordinator_send.clone().unwrap(),
Expand Down Expand Up @@ -1453,6 +1463,7 @@ impl Nic {
mut control: RestoreControl<'_>,
state: saved_state::SavedState,
) -> Result<(), NetRestoreError> {
let mut saved_packet_filter = 0u32;
if let Some(state) = state.open {
let open = match &state.primary {
saved_state::Primary::Version => vec![true],
Expand Down Expand Up @@ -1537,8 +1548,12 @@ impl Nic {
tx_spread_sent,
guest_link_down,
pending_link_action,
packet_filter,
} = ready;

// If saved state does not have a packet filter set, default to directed, multicast, and broadcast.
saved_packet_filter = packet_filter.unwrap_or(rndisprot::NPROTO_PACKET_FILTER);

let version = check_version(version)
.ok_or(NetRestoreError::UnsupportedVersion(version))?;

Expand Down Expand Up @@ -1621,6 +1636,11 @@ impl Nic {
self.insert_worker(channel_idx as u16, &request.unwrap(), state, false)?;
}
}
for worker in self.coordinator.state_mut().unwrap().workers.iter_mut() {
if let Some(worker_state) = worker.state_mut() {
worker_state.channel.packet_filter = saved_packet_filter;
}
}
} else {
control
.restore(&[false])
Expand Down Expand Up @@ -1781,6 +1801,11 @@ impl Nic {
PrimaryChannelGuestVfState::Restoring(saved_state) => saved_state,
};

let worker_0_packet_filter = coordinator.workers[0]
.state()
.unwrap()
.channel
.packet_filter;
saved_state::Primary::Ready(saved_state::ReadyPrimary {
version: ready.buffers.version as u32,
receive_buffer: ready.buffers.recv_buffer.saved_state(),
Expand Down Expand Up @@ -1810,6 +1835,7 @@ impl Nic {
tx_spread_sent: primary.tx_spread_sent,
guest_link_down: !primary.guest_link_up,
pending_link_action,
packet_filter: Some(worker_0_packet_filter),
})
}
};
Expand Down Expand Up @@ -2593,7 +2619,12 @@ impl<T: RingMem> NetChannel<T> {
if primary.rndis_state == RndisState::Operational {
if self.guest_vf_is_available(Some(vfid), buffers.version, buffers.ndis_config)? {
primary.guest_vf_state = PrimaryChannelGuestVfState::AvailableAdvertised;
return Ok(Some(CoordinatorMessage::UpdateGuestVfState));
return Ok(Some(CoordinatorMessage::Update(
CoordinatorMessageUpdateType {
guest_vf_state: true,
..Default::default()
},
)));
} else if let Some(true) = primary.is_data_path_switched {
tracing::error!(
"Data path switched, but current guest negotiation does not support VTL0 VF"
Expand Down Expand Up @@ -2733,10 +2764,7 @@ impl<T: RingMem> NetChannel<T> {
// flag on inband packets and won't send a completion
// packet.
primary.guest_vf_state = PrimaryChannelGuestVfState::AvailableAdvertised;
// restart will also add the VF based on the guest_vf_state
if self.restart.is_none() {
self.restart = Some(CoordinatorMessage::UpdateGuestVfState);
}
self.send_coordinator_update_vf();
} else if let Some(true) = primary.is_data_path_switched {
tracing::error!(
"Data path switched, but current guest negotiation does not support VTL0 VF"
Expand Down Expand Up @@ -2784,12 +2812,18 @@ impl<T: RingMem> NetChannel<T> {
tracing::trace!(?request, "handling control message MESSAGE_TYPE_SET_MSG");

let status = match self.adapter.handle_oid_set(primary, request.oid, reader) {
Ok(restart_endpoint) => {
Ok((restart_endpoint, packet_filter)) => {
// Restart the endpoint if the OID changed some critical
// endpoint property.
if restart_endpoint {
self.restart = Some(CoordinatorMessage::Restart);
}
if let Some(filter) = packet_filter {
if self.packet_filter != filter {
self.packet_filter = filter;
self.send_coordinator_update_filter();
}
}
rndisprot::STATUS_SUCCESS
}
Err(err) => {
Expand Down Expand Up @@ -2973,6 +3007,31 @@ impl<T: RingMem> NetChannel<T> {
}
Ok(())
}

fn send_coordinator_update_message(&mut self, guest_vf: bool, packet_filter: bool) {
if self.restart.is_none() {
self.restart = Some(CoordinatorMessage::Update(CoordinatorMessageUpdateType {
guest_vf_state: guest_vf,
filter_state: packet_filter,
}));
} else if let Some(CoordinatorMessage::Restart) = self.restart {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this would have to be updated, too, to store the new packet filter.

// If a restart message is pending, do nothing.
// A restart will try to switch the data path based on primary.guest_vf_state.
// A restart will apply packet filter changes.
} else if let Some(CoordinatorMessage::Update(ref mut update)) = self.restart {
// Add the new update to the existing message.
update.guest_vf_state |= guest_vf;
update.filter_state |= packet_filter;
}
}

fn send_coordinator_update_vf(&mut self) {
self.send_coordinator_update_message(true, false);
}

fn send_coordinator_update_filter(&mut self) {
self.send_coordinator_update_message(false, true);
}
}

/// Writes an RNDIS message to `writer`.
Expand Down Expand Up @@ -3290,13 +3349,14 @@ impl Adapter {
primary: &mut PrimaryChannelState,
oid: rndisprot::Oid,
reader: impl MemoryRead + Clone,
) -> Result<bool, OidError> {
) -> Result<(bool, Option<u32>), OidError> {
tracing::debug!(?oid, "oid set");

let mut restart_endpoint = false;
let mut packet_filter = None;
match oid {
rndisprot::Oid::OID_GEN_CURRENT_PACKET_FILTER => {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we support reading the filter value back?

// TODO
packet_filter = self.oid_set_packet_filter(reader)?;
}
rndisprot::Oid::OID_TCP_OFFLOAD_PARAMETERS => {
self.oid_set_offload_parameters(reader, primary)?;
Expand All @@ -3323,7 +3383,7 @@ impl Adapter {
return Err(OidError::UnknownOid);
}
}
Ok(restart_endpoint)
Ok((restart_endpoint, packet_filter))
}

fn oid_set_rss_parameters(
Expand Down Expand Up @@ -3381,6 +3441,15 @@ impl Adapter {
Ok(())
}

fn oid_set_packet_filter(
&self,
reader: impl MemoryRead + Clone,
) -> Result<Option<u32>, OidError> {
let filter: rndisprot::RndisPacketFilterOidValue = reader.clone().read_plain()?;
tracing::debug!(filter, "set packet filter");
Ok(Some(filter))
}

fn oid_set_offload_parameters(
&self,
reader: impl MemoryRead + Clone,
Expand Down Expand Up @@ -3871,8 +3940,26 @@ impl Coordinator {
}
sleep_duration = None;
}
Message::Internal(CoordinatorMessage::UpdateGuestVfState) => {
self.update_guest_vf_state(state).await;
Message::Internal(CoordinatorMessage::Update(update_type)) => {
if update_type.filter_state {
self.stop_workers().await;
let worker_0_packet_filter =
self.workers[0].state().unwrap().channel.packet_filter;
self.workers.iter_mut().skip(1).for_each(|worker| {
if let Some(state) = worker.state_mut() {
state.channel.packet_filter = worker_0_packet_filter;
tracing::debug!(
packet_filter = ?worker_0_packet_filter,
channel_idx = state.channel_idx,
"update packet filter"
);
}
});
}

if update_type.guest_vf_state {
self.update_guest_vf_state(state).await;
}
}
Message::UpdateFromEndpoint(EndpointAction::RestartRequired) => self.restart = true,
Message::UpdateFromEndpoint(EndpointAction::LinkStatusNotify(connect)) => {
Expand Down Expand Up @@ -4315,13 +4402,18 @@ impl Coordinator {
self.num_queues = num_queues;
}

let worker_0_packet_filter = self.workers[0].state().unwrap().channel.packet_filter;
// Provide the queue and receive buffer ranges for each worker.
for ((worker, queue), rx_buffer) in self.workers.iter_mut().zip(queues).zip(rx_buffers) {
worker.task_mut().queue_state = Some(QueueState {
queue,
target_vp_set: false,
rx_buffer_range: rx_buffer,
});
// Update the receive packet filter for the subchannel worker.
if let Some(worker) = worker.state_mut() {
worker.channel.packet_filter = worker_0_packet_filter;
}
}

Ok(())
Expand Down Expand Up @@ -4929,6 +5021,13 @@ impl<T: 'static + RingMem> NetChannel<T> {
data: &mut ProcessingData,
epqueue: &mut dyn net_backend::Queue,
) -> Result<bool, WorkerError> {
if self.packet_filter == rndisprot::NDIS_PACKET_TYPE_NONE {
tracing::trace!(
packet_filter = self.packet_filter,
"rx packet not processed"
);
return Ok(false);
}
let n = epqueue
.rx_poll(&mut data.rx_ready)
.map_err(WorkerError::Endpoint)?;
Expand Down Expand Up @@ -5071,10 +5170,7 @@ impl<T: 'static + RingMem> NetChannel<T> {
_ => (),
};
if queue_switch_operation {
// A restart will also try to switch the data path based on primary.guest_vf_state.
if self.restart.is_none() {
self.restart = Some(CoordinatorMessage::UpdateGuestVfState)
};
self.send_coordinator_update_vf();
} else {
self.send_completion(transaction_id, &[])?;
}
Expand Down
12 changes: 12 additions & 0 deletions vm/devices/net/netvsp/src/rndisprot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,7 @@ open_enum! {
DEFAULT = 0x80,
RSS_CAPABILITIES = 0x88,
RSS_PARAMETERS = 0x89,
OID_REQUEST = 0x96,
OFFLOAD = 0xA7,
OFFLOAD_ENCAPSULATION = 0xA8,
}
Expand Down Expand Up @@ -1082,3 +1083,14 @@ open_enum! {
BINARY = 4,
}
}

pub type RndisPacketFilterOidValue = u32;

// Rndis Packet Filter Flags (OID_GEN_CURRENT_PACKET_FILTER)
pub const NDIS_PACKET_TYPE_NONE: u32 = 0x00000000;
pub const NDIS_PACKET_TYPE_DIRECTED: u32 = 0x00000001;
pub const NDIS_PACKET_TYPE_MULTICAST: u32 = 0x00000002;
pub const NDIS_PACKET_TYPE_ALL_MULTICAST: u32 = 0x00000004;
pub const NDIS_PACKET_TYPE_BROADCAST: u32 = 0x00000008;
pub const NPROTO_PACKET_FILTER: u32 =
NDIS_PACKET_TYPE_DIRECTED | NDIS_PACKET_TYPE_ALL_MULTICAST | NDIS_PACKET_TYPE_BROADCAST;
2 changes: 2 additions & 0 deletions vm/devices/net/netvsp/src/saved_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ pub struct ReadyPrimary {
pub guest_link_down: bool,
#[mesh(15)]
pub pending_link_action: Option<bool>,
#[mesh(16)]
pub packet_filter: Option<u32>,
}

#[derive(Debug, Protobuf)]
Expand Down
Loading
Loading