Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion openhcl/underhill_core/src/dispatch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ pub(crate) struct LoadedVm {
pub host_vmbus_relay: Option<VmbusRelayHandle>,
// channels are revoked when dropped, so make sure to keep them alive
pub _vmbus_devices: Vec<SpawnedUnit<ChannelUnit<dyn VmbusDevice>>>,
pub _vmbus_intercept_devices: Vec<mesh::OneshotSender<()>>,
pub _ide_accel_devices: Vec<SpawnedUnit<ChannelUnit<storvsp::StorageDevice>>>,
pub network_settings: Option<Box<dyn LoadedVmNetworkSettings>>,
pub shutdown_relay: Option<(
Expand Down
6 changes: 1 addition & 5 deletions openhcl/underhill_core/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3062,8 +3062,6 @@ async fn new_underhill_vm(
);
}

let mut vmbus_intercept_devices = Vec::new();

let shutdown_relay = if let Some(recv) = intercepted_shutdown_ic {
let mut shutdown_guest = ShutdownGuestIc::new();
let recv_host_shutdown = shutdown_guest.get_shutdown_notifier();
Expand All @@ -3089,8 +3087,7 @@ async fn new_underhill_vm(
.context("shutdown relay dma client")?,
shutdown_guest,
)?;
vmbus_intercept_devices.push(shutdown_guest.detach(driver_source.simple(), recv)?);

shutdown_guest.detach(driver_source.simple(), recv)?;
Some((recv_host_shutdown, send_guest_shutdown))
} else {
None
Expand Down Expand Up @@ -3218,7 +3215,6 @@ async fn new_underhill_vm(
vmbus_server,
host_vmbus_relay,
_vmbus_devices: vmbus_devices,
_vmbus_intercept_devices: vmbus_intercept_devices,
_ide_accel_devices: ide_accel_devices,
network_settings,
shutdown_relay,
Expand Down
8 changes: 4 additions & 4 deletions vm/devices/vmbus/vmbus_relay/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -647,10 +647,6 @@ impl RelayTask {
async fn handle_offer(&mut self, offer: client::OfferInfo) -> Result<()> {
let channel_id = offer.offer.channel_id.0;

if self.channels.contains_key(&ChannelId(channel_id)) {
anyhow::bail!("channel {channel_id} already exists");
}

if let Some(intercept) = self.intercept_channels.get(&offer.offer.instance_id) {
self.channels.insert(
ChannelId(channel_id),
Expand All @@ -660,6 +656,10 @@ impl RelayTask {
return Ok(());
}

if self.channels.contains_key(&ChannelId(channel_id)) {
anyhow::bail!("channel {channel_id} already exists");
}

// Used to Recv requests from the server.
let (request_send, request_recv) = mesh::channel();
// Used to Send responses from the server
Expand Down
83 changes: 52 additions & 31 deletions vm/devices/vmbus/vmbus_relay_intercept_device/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,28 +167,36 @@ impl<T: SimpleVmbusClientDeviceAsync> SimpleVmbusClientDeviceWrapper<T> {
mut self,
driver: impl SpawnDriver,
recv_relay: mesh::Receiver<InterceptChannelRequest>,
) -> Result<mesh::OneshotSender<()>> {
) -> Result<()> {
let (send_disconnected, recv_disconnected) = mesh::oneshot();
self.vmbus_listener.insert(
&self.spawner,
format!("{}", self.instance_id),
SimpleVmbusClientDeviceTaskState {
offer: None,
recv_relay,
send_disconnected: Some(send_disconnected),
vtl_pages: None,
},
);
let (driver_send, driver_recv) = mesh::oneshot();
driver
.spawn(
format!("vmbus_relay_device {}", self.instance_id),
async move {
self.vmbus_listener.start();
let _ = driver_recv.await;
self.vmbus_listener.stop().await;
let _ = recv_disconnected.await;
assert!(!self.vmbus_listener.stop().await);
if self.vmbus_listener.state().unwrap().vtl_pages.is_some() {
// The VTL pages were not freed. This can occur if an
// error is hit that drops the vmbus parent tasks. Just
// pend here and let the outer error cause the VM to
// exit.
pending::<()>().await;
Comment on lines +190 to +194
Copy link

Copilot AI Oct 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using pending::<()>().await to intentionally hang the task is an unusual pattern. Consider adding a more explicit comment explaining why this infinite wait is the desired behavior, or using a more conventional approach like returning an error that would cause the VM to exit.

Suggested change
// The VTL pages were not freed. This can occur if an
// error is hit that drops the vmbus parent tasks. Just
// pend here and let the outer error cause the VM to
// exit.
pending::<()>().await;
// The VTL pages were not freed. This is an unrecoverable error.
// Return an error to trigger VM exit in a controlled manner.
anyhow::bail!("VTL pages were not freed; unrecoverable error in vmbus relay device");

Copilot uses AI. Check for mistakes.

}
},
)
.detach();
Ok(driver_send)
Ok(())
}
}

Expand All @@ -215,6 +223,8 @@ struct SimpleVmbusClientDeviceTaskState {
offer: Option<OfferInfo>,
#[inspect(skip)]
recv_relay: mesh::Receiver<InterceptChannelRequest>,
#[inspect(skip)]
send_disconnected: Option<mesh::OneshotSender<()>>,
#[inspect(hex, with = "|x| x.as_ref().map(|x| inspect::iter_by_index(x.pfns()))")]
vtl_pages: Option<MemoryBlock>,
}
Expand All @@ -234,7 +244,13 @@ impl<T: SimpleVmbusClientDeviceAsync> AsyncRun<SimpleVmbusClientDeviceTaskState>
stop: &mut StopTask<'_>,
state: &mut SimpleVmbusClientDeviceTaskState,
) -> Result<(), Cancelled> {
stop.until_stopped(self.process_messages(state)).await
stop.until_stopped(self.process_messages(state)).await?;
state
.send_disconnected
.take()
.expect("task should not be restarted")
.send(());
Ok(())
}
}

Expand Down Expand Up @@ -351,21 +367,27 @@ impl<T: SimpleVmbusClientDeviceAsync> SimpleVmbusClientDeviceTask<T> {
};

if state.vtl_pages.is_some() {
if let Err(err) = offer
match offer
.request_send
.call(
ChannelRequest::TeardownGpadl,
GpadlId(state.vtl_pages.as_ref().unwrap().pfns()[1] as u32),
)
.await
{
tracing::error!(
error = &err as &dyn std::error::Error,
"failed to teardown gpadl"
);
Ok(()) => {
state.vtl_pages = None;
}
Err(err) => {
// If the ring buffer pages are still in use by the host, which
// has to be assumed, the memory pages cannot be used again as
// they have been marked as visible to VTL0.
tracing::error!(
error = &err as &dyn std::error::Error,
"Failed to teardown gpadl -- leaking memory."
);
}
}

state.vtl_pages = None;
}
}

Expand Down Expand Up @@ -504,7 +526,7 @@ impl<T: SimpleVmbusClientDeviceAsync> SimpleVmbusClientDeviceTask<T> {

/// Responds to the channel being revoked by the host.
async fn handle_revoke(&mut self, state: &mut SimpleVmbusClientDeviceTaskState) {
let Some(offer) = state.offer.take() else {
let Some(offer) = state.offer.as_ref() else {
Copy link

Copilot AI Oct 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

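The change from state.offer.take() to state.offer.as_ref() means the offer is no longer consumed at the top of handle_revoke; it is now taken only at line 538, after cleanup_device_resources(state) has been awaited. If execution does not reach that line (for example, if the task is stopped while the cleanup is still pending), the offer remains in state, and a subsequent call to handle_revoke would still find an offer present, which could lead to unexpected behavior.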

Copilot uses AI. Check for mistakes.

return;
};
tracing::info!("device revoked");
Expand All @@ -513,6 +535,7 @@ impl<T: SimpleVmbusClientDeviceAsync> SimpleVmbusClientDeviceTask<T> {
self.device.task_mut().0.close(offer.offer.subchannel_index);
}
self.cleanup_device_resources(state).await;
drop(state.offer.take());
}

fn handle_save(&mut self) -> SavedStateBlob {
Expand Down Expand Up @@ -545,27 +568,25 @@ impl<T: SimpleVmbusClientDeviceAsync> SimpleVmbusClientDeviceTask<T> {
loop {
enum Event {
Request(InterceptChannelRequest),
Revoke(()),
Revoke,
}
let revoke = pin!(async {
if let Some(offer) = &mut state.offer {
(&mut offer.revoke_recv).await.ok();
} else {
pending().await
}
});
let Some(r) = (
(&mut state.recv_relay).map(Event::Request),
futures::stream::once(revoke).map(Event::Revoke),
)
.merge()
.next()
.await
else {
let r = if let Some(offer) = &mut state.offer {
(
(&mut state.recv_relay).map(Event::Request),
futures::stream::once(&mut offer.revoke_recv).map(|_| Event::Revoke),
)
.merge()
.next()
.await
} else {
let mut recv_relay = pin!(&mut state.recv_relay);
recv_relay.next().await.map(Event::Request)
};
let Some(r) = r else {
break;
};
match r {
Event::Revoke(()) => {
Event::Revoke => {
self.handle_revoke(state).await;
}
Event::Request(InterceptChannelRequest::Offer(offer)) => {
Expand Down