From 1624ba084016ae425a3d19ce2440eec013957a26 Mon Sep 17 00:00:00 2001 From: John Starks Date: Thu, 9 Jan 2025 23:44:28 +0000 Subject: [PATCH 1/3] vmbus: delay interrupt creation until open response Don't require the guest-to-host interrupt be provided at channel offer time. Wait until open time. This is more flexible, and in the future will allow us to remove some extra synchronziation in `vmbus_relay`. --- vm/devices/net/netvsp/src/test.rs | 49 +++--- vm/devices/vmbus/vmbus_channel/src/bus.rs | 16 +- vm/devices/vmbus/vmbus_channel/src/channel.rs | 60 +++---- vm/devices/vmbus/vmbus_channel/src/offer.rs | 14 +- vm/devices/vmbus/vmbus_relay/src/lib.rs | 148 ++++++++++-------- vm/devices/vmbus/vmbus_server/src/lib.rs | 142 +++++++++++------ .../vmbus_server/src/proxyintegration.rs | 25 +-- 7 files changed, 270 insertions(+), 184 deletions(-) diff --git a/vm/devices/net/netvsp/src/test.rs b/vm/devices/net/netvsp/src/test.rs index c7297a224..6d6dae911 100644 --- a/vm/devices/net/netvsp/src/test.rs +++ b/vm/devices/net/netvsp/src/test.rs @@ -45,6 +45,7 @@ use vmbus_channel::bus::OfferInput; use vmbus_channel::bus::OfferResources; use vmbus_channel::bus::OpenData; use vmbus_channel::bus::OpenRequest; +use vmbus_channel::bus::OpenResult; use vmbus_channel::bus::ParentBus; use vmbus_channel::channel::offer_channel; use vmbus_channel::channel::ChannelHandle; @@ -74,7 +75,7 @@ use zerocopy::FromZeroes; const VMNIC_CHANNEL_TYPE_GUID: Guid = Guid::from_static_str("f8615163-df3e-46c5-913f-f2d2f965ed0e"); enum ChannelResponse { - Open(bool), + Open(Option), Close, Gpadl(bool), // TeardownGpadl(GpadlId), @@ -432,21 +433,18 @@ impl TestNicDevice { .await .expect("open successful"); - if let ChannelResponse::Open(response) = open_response { - assert_eq!(response, true); - } else { + let ChannelResponse::Open(Some(result)) = open_response else { panic!("Unexpected return value"); - } + }; let mem = self.mock_vmbus.memory.clone(); - let guest_to_host_interrupt = self.offer_input.event.clone(); TestNicChannel::new( self, &mem, gpadl_map, ring_gpadl_id, host_to_guest_event, - guest_to_host_interrupt, + result.guest_to_host_interrupt, ) } @@ -478,7 +476,7 @@ impl TestNicDevice { next_avail_guest_page: usize, next_avail_gpadl_id: u32, host_to_guest_interrupt: Interrupt, - ) -> anyhow::Result<()> { + ) -> anyhow::Result> { // Restore the previous memory settings assert_eq!(self.next_avail_gpadl_id, 1); self.next_avail_gpadl_id = next_avail_gpadl_id; @@ -497,6 +495,7 @@ impl TestNicDevice { }) .collect::>)>>(); + let mut guest_to_host_interrupt = None; mesh::CancelContext::new() .with_timeout(Duration::from_millis(1000)) .until_cancelled(async { @@ -519,7 +518,8 @@ impl TestNicDevice { accepted: true, } }).collect::>(); - rpc.handle_sync(|_open| { + rpc.handle_sync(|open| { + guest_to_host_interrupt = open.map(|open| open.guest_to_host_interrupt); Ok(vmbus_channel::bus::RestoreResult { open_request: Some(OpenRequest { open_data: OpenData { @@ -545,7 +545,9 @@ impl TestNicDevice { } }) .await - .unwrap() + .unwrap()?; + + Ok(guest_to_host_interrupt) } } @@ -976,7 +978,6 @@ impl<'a> TestNicChannel<'a> { buffer: SavedStateBlob, ) -> anyhow::Result> { let mem = self.nic.mock_vmbus.memory.clone(); - let guest_to_host_interrupt = nic.offer_input.event.clone(); let host_to_guest_interrupt = { let event = self.host_to_guest_event.clone(); Interrupt::from_fn(move || event.signal()) @@ -986,27 +987,27 @@ impl<'a> TestNicChannel<'a> { let channel_id = self.channel_id; let next_avail_guest_page = self.nic.next_avail_guest_page; let next_avail_gpadl_id = self.nic.next_avail_gpadl_id; - let restored_channel = TestNicChannel::new( - nic, - &mem, - gpadl_map.clone(), - channel_id, - self.host_to_guest_event, - guest_to_host_interrupt, - ); - restored_channel - .nic + let guest_to_host_interrupt = nic .restore( buffer, - gpadl_map, + gpadl_map.clone(), channel_id, next_avail_guest_page, next_avail_gpadl_id, host_to_guest_interrupt, ) - .await?; - Ok(restored_channel) + .await? + .expect("should be open"); + + Ok(TestNicChannel::new( + nic, + &mem, + gpadl_map, + channel_id, + self.host_to_guest_event, + guest_to_host_interrupt, + )) } } diff --git a/vm/devices/vmbus/vmbus_channel/src/bus.rs b/vm/devices/vmbus/vmbus_channel/src/bus.rs index b7ed9659f..9fc3d0c7d 100644 --- a/vm/devices/vmbus/vmbus_channel/src/bus.rs +++ b/vm/devices/vmbus/vmbus_channel/src/bus.rs @@ -22,8 +22,6 @@ use vmcore::interrupt::Interrupt; pub struct OfferInput { /// Parameters describing the offer. pub params: OfferParams, - /// The event to signal when the guest needs attention. - pub event: Interrupt, /// A mesh channel to send channel-related requests to. pub request_send: mesh::Sender, /// A mesh channel to receive channel-related requests to. @@ -80,7 +78,7 @@ impl OfferResources { #[derive(Debug, MeshPayload)] pub enum ChannelRequest { /// Open the channel. - Open(Rpc), + Open(Rpc>), /// Close the channel. Close(Rpc<(), ()>), /// Create a new GPADL. @@ -91,6 +89,14 @@ pub enum ChannelRequest { Modify(Rpc), } +/// The successful result of an open request. +#[derive(Debug, MeshPayload)] +pub struct OpenResult { + /// The interrupt object vmbus should signal when the guest signals the + /// host. + pub guest_to_host_interrupt: Interrupt, +} + /// GPADL information from the guest. #[derive(Debug, MeshPayload)] pub struct GpadlRequest { @@ -117,8 +123,8 @@ pub enum ModifyRequest { pub enum ChannelServerRequest { /// A request to restore the channel. /// - /// The input parameter is whether the channel was saved open. - Restore(FailableRpc), + /// The input parameter provides the open result if the channel was saved open. + Restore(FailableRpc, RestoreResult>), /// A request to revoke the channel. /// /// A channel can also be revoked by dropping it. This request is only necessary if you need to diff --git a/vm/devices/vmbus/vmbus_channel/src/channel.rs b/vm/devices/vmbus/vmbus_channel/src/channel.rs index a3f67f289..9fd19cf66 100644 --- a/vm/devices/vmbus/vmbus_channel/src/channel.rs +++ b/vm/devices/vmbus/vmbus_channel/src/channel.rs @@ -10,6 +10,7 @@ use crate::bus::OfferInput; use crate::bus::OfferParams; use crate::bus::OfferResources; use crate::bus::OpenRequest; +use crate::bus::OpenResult; use crate::bus::ParentBus; use crate::gpadl::GpadlMap; use crate::gpadl::GpadlMapView; @@ -335,20 +336,19 @@ async fn offer_generic( let (state_req_send, state_req_recv) = mesh::channel(); let use_event = bus.use_event(); - let new_event = || { - if use_event { - Notify::from_event(Event::new()) - } else { - Notify::from_slim_event(Arc::new(SlimEvent::new())) - } - }; - let event = new_event(); - let subchannel_events: Vec<_> = (0..max_subchannels).map(|_| new_event()).collect(); + let events: Vec<_> = (0..max_subchannels + 1) + .map(|_| { + if use_event { + Notify::from_event(Event::new()) + } else { + Notify::from_slim_event(Arc::new(SlimEvent::new())) + } + }) + .collect(); let request = OfferInput { params: offer, - event: event.clone().interrupt(), request_send, server_request_recv, }; @@ -357,12 +357,12 @@ async fn offer_generic( let offer_result = bus.add_child(request).await?; - let mut resources = vec![ChannelResources { event }]; - for idx in 0..max_subchannels { - resources.push(ChannelResources { - event: subchannel_events[idx as usize].clone(), - }); - } + let resources = events + .iter() + .map(|event| ChannelResources { + event: event.clone(), + }) + .collect(); let (subchannel_enable_send, subchannel_enable_recv) = mesh::channel(); channel.install(DeviceResources { @@ -380,7 +380,7 @@ async fn offer_generic( let device = Device::new( request_recv, server_request_send, - subchannel_events, + events, gpadl_map, subchannel_enable_recv, ); @@ -446,7 +446,7 @@ struct Device { open: Vec, subchannel_gpadls: Vec>, requests: SelectAll>>, - subchannel_events: Vec, + events: Vec, gpadl_map: Arc, subchannel_enable_recv: mesh::Receiver, } @@ -455,7 +455,7 @@ impl Device { fn new( request_recv: mesh::Receiver, server_request_send: mesh::Sender, - subchannel_events: Vec, + events: Vec, gpadl_map: Arc, subchannel_enable_recv: mesh::Receiver, ) -> Self { @@ -469,7 +469,7 @@ impl Device { open, subchannel_gpadls, requests, - subchannel_events, + events, gpadl_map, subchannel_enable_recv, } @@ -585,7 +585,7 @@ impl Device { channel: &mut dyn VmbusDevice, channel_idx: usize, open_request: OpenRequest, - ) -> bool { + ) -> Option { assert!(!self.open[channel_idx]); // N.B. Any asynchronous GPADL requests will block while in // open(). This should be fine for all known devices. @@ -594,11 +594,13 @@ impl Device { error = error.as_ref() as &dyn std::error::Error, "failed to open channel" ); - false + None } else { - true + Some(OpenResult { + guest_to_host_interrupt: self.events[channel_idx].clone().interrupt(), + }) }; - self.open[channel_idx] = opened; + self.open[channel_idx] = opened.is_some(); opened } @@ -739,9 +741,6 @@ impl Device { subchannel_index: subchannel_idx as u16, ..offer.clone() }, - event: self.subchannel_events[subchannel_idx - 1] - .clone() - .interrupt(), request_send, server_request_recv, }; @@ -778,9 +777,12 @@ impl Device { .map_err(ChannelRestoreError::EnablingSubchannels)?; let mut results = Vec::with_capacity(states.len()); - for (channel_idx, open) in states.iter().copied().enumerate() { + for (channel_idx, (open, event)) in states.iter().copied().zip(&self.events).enumerate() { + let open_result = open.then(|| OpenResult { + guest_to_host_interrupt: event.clone().interrupt(), + }); let result = self.server_requests[channel_idx] - .call_failable(ChannelServerRequest::Restore, open) + .call_failable(ChannelServerRequest::Restore, open_result) .await .map_err(|err| ChannelRestoreError::RestoreError(err.into()))?; diff --git a/vm/devices/vmbus/vmbus_channel/src/offer.rs b/vm/devices/vmbus/vmbus_channel/src/offer.rs index 7241ccfb0..23aad419a 100644 --- a/vm/devices/vmbus/vmbus_channel/src/offer.rs +++ b/vm/devices/vmbus/vmbus_channel/src/offer.rs @@ -9,6 +9,7 @@ use crate::bus::OfferInput; use crate::bus::OfferParams; use crate::bus::OfferResources; use crate::bus::OpenRequest; +use crate::bus::OpenResult; use crate::bus::ParentBus; use crate::gpadl::GpadlMap; use crate::gpadl::GpadlMapView; @@ -70,7 +71,6 @@ impl Offer { let result = bus .add_child(OfferInput { params: offer_params, - event: Interrupt::from_event(event.clone()), request_send, server_request_recv, }) @@ -181,7 +181,9 @@ impl Offer { channel, gpadl_map: self.gpadl_map.clone(), }; - message.response.respond(true); + message.response.respond(Some(OpenResult { + guest_to_host_interrupt: Interrupt::from_event(self.event.event().unwrap().clone()), + })); Ok(resources) } @@ -220,18 +222,18 @@ struct OpenMessage { response: OpenResponse, } -struct OpenResponse(Option>); +struct OpenResponse(Option>>); impl OpenResponse { - fn respond(mut self, open: bool) { - self.0.take().unwrap().complete(open) + fn respond(mut self, result: Option) { + self.0.take().unwrap().complete(result) } } impl Drop for OpenResponse { fn drop(&mut self) { if let Some(rpc) = self.0.take() { - rpc.complete(false); + rpc.complete(None); } } } diff --git a/vm/devices/vmbus/vmbus_relay/src/lib.rs b/vm/devices/vmbus/vmbus_relay/src/lib.rs index 35258f866..372e6a4d7 100644 --- a/vm/devices/vmbus/vmbus_relay/src/lib.rs +++ b/vm/devices/vmbus/vmbus_relay/src/lib.rs @@ -43,6 +43,7 @@ use vmbus_channel::bus::ChannelServerRequest; use vmbus_channel::bus::GpadlRequest; use vmbus_channel::bus::ModifyRequest; use vmbus_channel::bus::OpenRequest; +use vmbus_channel::bus::OpenResult; use vmbus_client as client; use vmbus_client::VmbusClient; use vmbus_core::protocol; @@ -364,7 +365,10 @@ struct RelayChannelTask { impl RelayChannelTask { /// Relay open channel request from VTL0 to Host, responding with Open Result - async fn handle_open_channel(&mut self, open_request: &OpenRequest) -> Result { + async fn handle_open_channel( + &mut self, + open_request: &OpenRequest, + ) -> Result> { let mut open_data = open_request.open_data; // If the guest uses the channel bitmap, the host can't send interrupts @@ -385,7 +389,21 @@ impl RelayChannelTask { }); } - // Always relay guest-to-host interrupts. These can be generated here: + let flags = protocol::OpenChannelFlags::new().with_redirect_interrupt(redirect_interrupt); + let opened = self + .channel + .request_send + .call( + client::ChannelRequest::Open, + client::OpenRequest { open_data, flags }, + ) + .await?; + + if !opened { + return Ok(None); + } + + // Always relay guest-to-host interrupts. These can be generated when: // // * The guest is using the channel bitmap. // * The guest is using the MNF interface and this is implemented in the @@ -399,18 +417,38 @@ impl RelayChannelTask { .connection_id .store(open_data.connection_id, Ordering::SeqCst); - let flags = protocol::OpenChannelFlags::new().with_redirect_interrupt(redirect_interrupt); - - let opened = self - .channel - .request_send - .call( - client::ChannelRequest::Open, - client::OpenRequest { open_data, flags }, - ) - .await?; + Ok(Some(OpenResult { + guest_to_host_interrupt: self.guest_to_host_event(), + })) + } - Ok(opened) + fn guest_to_host_event(&self) -> Interrupt { + let synic = self.channel.synic.clone(); + let connection_id = self.channel.connection_id.clone(); + + Interrupt::from_fn(move || { + let connection_id = connection_id.load(Ordering::SeqCst); + // If a channel is forcibly closed by the host (during a + // revoke), the host interrupt can be disabled before the guest + // is aware the channel is closed. In this case, relaying the + // interrupt can fail, which is not a problem. For example, this + // is the case for an hvsocket channel when the VM gets paused. + // + // In cases were the channel this happened on is open and + // appears stuck, this could indicate a problem. + if connection_id != 0 { + if let Err(err) = synic.signal_event(connection_id, 0) { + tracelimit::info_ratelimited!( + error = &err as &dyn std::error::Error, + "interrupt relay failure, could be normal during channel close" + ); + } + } else { + // The channel close notification reached here but has not + // yet made it to the guest. This is expected. + tracing::debug!("interrupt relay request after close"); + } + }) } fn handle_close_channel(&mut self) { @@ -481,7 +519,7 @@ impl RelayChannelTask { rpc.handle(|open_request| async move { self.handle_open_channel(&open_request) .await - .unwrap_or(false) + .unwrap_or(None) }) .await; } @@ -836,34 +874,9 @@ impl RelayTask { }; let key = params.key(); - let synic = Arc::clone(&self.synic); let connection_id = Arc::new(AtomicU32::new(0)); - let host_connection_id = Arc::clone(&connection_id); let new_offer = OfferInfo { params, - event: Interrupt::from_fn(move || { - let connection_id = host_connection_id.load(Ordering::SeqCst); - // If a channel is forcibly closed by the host (during a - // revoke), the host interrupt can be disabled before the guest - // is aware the channel is closed. In this case, relaying the - // interrupt can fail, which is not a problem. For example, this - // is the case for an hvsocket channel when the VM gets paused. - // - // In cases were the channel this happened on is open and - // appears stuck, this could indicate a problem. - if connection_id != 0 { - if let Err(err) = synic.signal_event(connection_id, 0) { - tracelimit::info_ratelimited!( - error = &err as &dyn std::error::Error, - "interrupt relay failure, could be normal during channel close" - ); - } - } else { - // The channel close notification reached here but has not - // yet made it to the guest. This is expected. - tracing::debug!("interrupt relay request after close"); - } - }), request_send, server_request_recv, }; @@ -879,10 +892,31 @@ impl RelayTask { .await .with_context(|| format!("failed to offer relay channel {key}"))?; - let mut interrupt_relay = None; + let (relay_request_send, relay_request_recv) = mesh::channel(); + let mut channel_task = RelayChannelTask { + driver: Arc::clone(&self.spawner), + channel: RelayChannel { + channel_id: ChannelId(channel_id), + relay_request_recv, + request_send: offer.request_send, + response_recv: offer.response_recv, + server_request_recv: request_recv, + connection_id, + use_interrupt_relay: Arc::clone(&self.use_interrupt_relay), + synic: Arc::clone(&self.synic), + interrupt_relay: None, + gpadls_tearing_down: HashMap::new(), + }, + // New channels start out running. + running: true, + }; + if restore { + let open_result = open.then(|| OpenResult { + guest_to_host_interrupt: channel_task.guest_to_host_event(), + }); let result = server_request_send - .call(ChannelServerRequest::Restore, open) + .call(ChannelServerRequest::Restore, open_result) .await .context("Failed to send restore request")? .map_err(|err| { @@ -892,7 +926,7 @@ impl RelayTask { if let Some(request) = result.open_request { let use_interrupt_relay = self.use_interrupt_relay.load(Ordering::SeqCst); if use_interrupt_relay { - interrupt_relay = Some(InterruptRelay { + channel_task.channel.interrupt_relay = Some(InterruptRelay { event: RegisteredEvent::new_with_flag( self.spawner.as_ref(), Arc::clone(&self.synic), @@ -902,29 +936,19 @@ impl RelayTask { }); } - connection_id.store(request.open_data.connection_id, Ordering::SeqCst); + // TODO: save/restore this connection ID instead of getting it + // back from `vmbus_server`. This is fundamentally the + // connection ID that was registered with `vmbus_client`--it so + // happens that it matches the one `vmbus_server` assigns, but + // this isn't necessarily always going to be true for redirected + // interrupts. + channel_task + .channel + .connection_id + .store(request.open_data.connection_id, Ordering::SeqCst); } } - let (relay_request_send, relay_request_recv) = mesh::channel(); - let mut channel_task = RelayChannelTask { - driver: Arc::clone(&self.spawner), - channel: RelayChannel { - channel_id: ChannelId(channel_id), - relay_request_recv, - request_send: offer.request_send, - response_recv: offer.response_recv, - server_request_recv: request_recv, - connection_id, - use_interrupt_relay: Arc::clone(&self.use_interrupt_relay), - synic: Arc::clone(&self.synic), - interrupt_relay, - gpadls_tearing_down: HashMap::new(), - }, - // New channels start out running. - running: true, - }; - let _channel_worker = self.spawner.spawn("vmbus hcl channel worker", async move { channel_task.run().await }); diff --git a/vm/devices/vmbus/vmbus_server/src/lib.rs b/vm/devices/vmbus/vmbus_server/src/lib.rs index 74828c496..f71b0f46d 100644 --- a/vm/devices/vmbus/vmbus_server/src/lib.rs +++ b/vm/devices/vmbus/vmbus_server/src/lib.rs @@ -58,6 +58,7 @@ use vmbus_channel::bus::OfferKey; use vmbus_channel::bus::OfferResources; use vmbus_channel::bus::OpenData; use vmbus_channel::bus::OpenRequest; +use vmbus_channel::bus::OpenResult; use vmbus_channel::bus::ParentBus; use vmbus_channel::bus::RestoreResult; use vmbus_channel::gpadl::GpadlMap; @@ -198,7 +199,6 @@ enum VmbusRequest { #[derive(mesh::MeshPayload, Debug)] pub struct OfferInfo { pub params: OfferParamsInternal, - pub event: Interrupt, pub request_send: mesh::Sender, pub server_request_recv: mesh::Receiver, } @@ -633,7 +633,7 @@ struct ServerTaskInner { #[derive(Debug)] enum ChannelResponse { - Open(bool), + Open(Option), Close, Gpadl(GpadlId, bool), TeardownGpadl(GpadlId), @@ -646,18 +646,23 @@ struct Channel { seq: u64, state: ChannelState, gpadls: Arc, - guest_to_host_event: Arc, guest_event_port: Box, flags: protocol::OfferFlags, } enum ChannelState { Closed, + Opening { + open_params: OpenParams, + monitor: Option>, + host_to_guest_interrupt: Interrupt, + }, Open { open_params: OpenParams, _event_port: Box, monitor: Option>, host_to_guest_interrupt: Interrupt, + guest_to_host_event: Arc, }, } @@ -692,7 +697,6 @@ impl ServerTask { send: info.request_send, state: ChannelState::Closed, gpadls: GpadlMap::new(), - guest_to_host_event: Arc::new(ChannelEvent(info.event)), guest_event_port, seq: id, flags, @@ -732,7 +736,7 @@ impl ServerTask { if let Some(channel) = channel { match response { Ok(response) => match response { - ChannelResponse::Open(ok) => self.handle_open(offer_id, ok), + ChannelResponse::Open(result) => self.handle_open(offer_id, result), ChannelResponse::Close => self.handle_close(offer_id), ChannelResponse::Gpadl(gpadl_id, ok) => { self.handle_gpadl_create(offer_id, gpadl_id, ok) @@ -755,18 +759,13 @@ impl ServerTask { } } - fn handle_open(&mut self, offer_id: OfferId, ok: bool) { - let status = if ok { + fn handle_open(&mut self, offer_id: OfferId, result: Option) { + let status = if result.is_some() { 0 } else { - let channel = self - .inner - .channels - .get_mut(&offer_id) - .expect("channel still exists"); - channel.state = ChannelState::Closed; protocol::STATUS_UNSUCCESSFUL }; + self.inner.complete_open(offer_id, result); self.server .with_notifier(&mut self.inner) .open_complete(offer_id, status); @@ -806,13 +805,13 @@ impl ServerTask { fn handle_restore_channel( &mut self, offer_id: OfferId, - open: bool, + open: Option, ) -> anyhow::Result { let gpadls = self.server.channel_gpadls(offer_id); let params = self .server .with_notifier(&mut self.inner) - .restore_channel(offer_id, open)?; + .restore_channel(offer_id, open.is_some())?; let channel = self.inner.channels.get_mut(&offer_id).unwrap(); for gpadl in &gpadls { @@ -823,8 +822,9 @@ impl ServerTask { } } - let open_request = params.map(|params| { - let (channel, interrupt) = self.inner.open_channel(offer_id, ¶ms); + let open_request = params.zip(open).map(|(params, result)| { + let (_, interrupt) = self.inner.open_channel(offer_id, ¶ms); + let channel = self.inner.complete_open(offer_id, Some(result)); OpenRequest::new( params.open_data, interrupt, @@ -1058,12 +1058,13 @@ impl ServerTask { if let ChannelState::Open { open_params, host_to_guest_interrupt, + guest_to_host_event, .. } = &channel.state { if force { tracing::info!(channel = %channel.key, "waking host and guest"); - channel.guest_to_host_event.0.deliver(); + guest_to_host_event.0.deliver(); host_to_guest_interrupt.deliver(); return Ok(()); } @@ -1083,17 +1084,24 @@ impl ServerTask { .ok() .context("couldn't split ring")?; - if let Err(err) = self.unstick_incoming_ring(channel, in_gpadl, host_to_guest_interrupt) - { + if let Err(err) = self.unstick_incoming_ring( + channel, + in_gpadl, + guest_to_host_event, + host_to_guest_interrupt, + ) { tracing::warn!( channel = %channel.key, error = err.as_ref() as &dyn std::error::Error, "could not unstick incoming ring" ); } - if let Err(err) = - self.unstick_outgoing_ring(channel, out_gpadl, host_to_guest_interrupt) - { + if let Err(err) = self.unstick_outgoing_ring( + channel, + out_gpadl, + guest_to_host_event, + host_to_guest_interrupt, + ) { tracing::warn!( channel = %channel.key, error = err.as_ref() as &dyn std::error::Error, @@ -1108,12 +1116,13 @@ impl ServerTask { &self, channel: &Channel, in_gpadl: AlignedGpadlView, + guest_to_host_event: &ChannelEvent, host_to_guest_interrupt: &Interrupt, ) -> Result<(), anyhow::Error> { let incoming_mem = GpadlRingMem::new(in_gpadl, &self.inner.gm)?; if ring::reader_needs_signal(&incoming_mem) { tracing::info!(channel = %channel.key, "waking host for incoming ring"); - channel.guest_to_host_event.0.deliver(); + guest_to_host_event.0.deliver(); } if ring::writer_needs_signal(&incoming_mem) { tracing::info!(channel = %channel.key, "waking guest for incoming ring"); @@ -1126,6 +1135,7 @@ impl ServerTask { &self, channel: &Channel, out_gpadl: AlignedGpadlView, + guest_to_host_event: &ChannelEvent, host_to_guest_interrupt: &Interrupt, ) -> Result<(), anyhow::Error> { let outgoing_mem = GpadlRingMem::new(out_gpadl, &self.inner.gm)?; @@ -1135,7 +1145,7 @@ impl ServerTask { } if ring::writer_needs_signal(&outgoing_mem) { tracing::info!(channel = %channel.key, "waking host for outgoing ring"); - channel.guest_to_host_event.0.deliver(); + guest_to_host_event.0.deliver(); } Ok(()) } @@ -1349,22 +1359,6 @@ impl ServerTaskInner { .get_mut(&offer_id) .expect("channel does not exist"); - // Always register with the channel bitmap; if Win7, this may be unnecessary. - if let Some(channel_bitmap) = self.channel_bitmap.as_ref() { - channel_bitmap.register_channel( - open_params.event_flag, - channel.guest_to_host_event.0.clone(), - ); - } - // Always set up an event port; if V1, this will be unused. - let event_port = self - .synic - .add_event_port( - open_params.connection_id, - self.vtl, - channel.guest_to_host_event.clone(), - ) - .expect("connection ID should not be in use"); // For pre-Win8 guests, the host-to-guest event always targets vp 0 and the channel // bitmap is used instead of the event flag. let (target_vp, event_flag) = if self.channel_bitmap.is_some() { @@ -1392,15 +1386,64 @@ impl ServerTaskInner { .map(|monitor| monitor.register_monitor(monitor_id, open_params.connection_id)) }); - channel.state = ChannelState::Open { + channel.state = ChannelState::Opening { open_params: *open_params, - _event_port: event_port, monitor, host_to_guest_interrupt: interrupt.clone(), }; (channel, interrupt) } + fn complete_open(&mut self, offer_id: OfferId, result: Option) -> &mut Channel { + let channel = self + .channels + .get_mut(&offer_id) + .expect("channel does not exist"); + + channel.state = if let Some(result) = result { + match std::mem::replace(&mut channel.state, ChannelState::Closed) { + ChannelState::Opening { + open_params, + monitor, + host_to_guest_interrupt, + } => { + let guest_to_host_event = + Arc::new(ChannelEvent(result.guest_to_host_interrupt)); + // Always register with the channel bitmap; if Win7, this may be unnecessary. + if let Some(channel_bitmap) = self.channel_bitmap.as_ref() { + channel_bitmap.register_channel( + open_params.event_flag, + guest_to_host_event.0.clone(), + ); + } + // Always set up an event port; if V1, this will be unused. + let event_port = self + .synic + .add_event_port( + open_params.connection_id, + self.vtl, + guest_to_host_event.clone(), + ) + .expect("connection ID should not be in use"); + ChannelState::Open { + open_params, + _event_port: event_port, + monitor, + host_to_guest_interrupt, + guest_to_host_event, + } + } + s => { + tracing::error!("attempting to complete open of open or closed channel"); + s + } + } + } else { + ChannelState::Closed + }; + channel + } + /// If the client specified an interrupt page, map it into host memory and /// set up the shared event port. fn map_interrupt_page( @@ -1455,6 +1498,7 @@ impl ServerTaskInner { }; // Force is used by restore because there may be restored channels in the open state. + // TODO: can this check be moved into channels.rs? if !force && self.channels.iter().any(|(_, c)| { matches!( @@ -1462,6 +1506,9 @@ impl ServerTaskInner { ChannelState::Open { monitor: Some(_), .. + } | ChannelState::Opening { + monitor: Some(_), + .. } ) }) @@ -1516,7 +1563,6 @@ impl VmbusServerControl { async fn offer(&self, request: OfferInput) -> anyhow::Result { let mut offer_info = OfferInfo { params: request.params.into(), - event: request.event, request_send: request.request_send, server_request_recv: request.server_request_recv, }; @@ -1711,7 +1757,9 @@ mod tests { }; f(rpc.input()); - rpc.complete(true); + rpc.complete(Some(OpenResult { + guest_to_host_interrupt: Interrupt::null(), + })); } async fn handle_gpadl_teardown(&mut self) { @@ -1729,7 +1777,7 @@ mod tests { async fn restore(&self) { self.server_request_send - .call(ChannelServerRequest::Restore, false) + .call(ChannelServerRequest::Restore, None) .await .unwrap() .unwrap(); @@ -1768,7 +1816,6 @@ mod tests { let (request_send, request_recv) = mesh::channel(); let (server_request_send, server_request_recv) = mesh::channel(); let offer = OfferInput { - event: Interrupt::from_fn(|| {}), request_send, server_request_recv, params: OfferParams { @@ -1986,7 +2033,6 @@ mod tests { flags: protocol::OfferFlags::new().with_enumerate_device_interface(true), ..Default::default() }, - event: Interrupt::from_fn(|| {}), request_send, server_request_recv, }) diff --git a/vm/devices/vmbus/vmbus_server/src/proxyintegration.rs b/vm/devices/vmbus/vmbus_server/src/proxyintegration.rs index e1c1997a4..d6b602ae7 100644 --- a/vm/devices/vmbus/vmbus_server/src/proxyintegration.rs +++ b/vm/devices/vmbus/vmbus_server/src/proxyintegration.rs @@ -33,6 +33,7 @@ use vmbus_channel::bus::ChannelServerRequest; use vmbus_channel::bus::ChannelType; use vmbus_channel::bus::OfferParams; use vmbus_channel::bus::OpenRequest; +use vmbus_channel::bus::OpenResult; use vmbus_channel::gpadl::GpadlId; use vmbus_proxy::vmbusioctl::VMBUS_CHANNEL_ENUMERATE_DEVICE_INTERFACE; use vmbus_proxy::vmbusioctl::VMBUS_CHANNEL_NAMED_PIPE_MODE; @@ -84,6 +85,7 @@ impl ProxyIntegration { struct Channel { server_request_send: Option>, + incoming_event: Event, worker_result: Option>, } @@ -104,7 +106,11 @@ impl ProxyTask { } } - async fn handle_open(&self, proxy_id: u64, open_request: &OpenRequest) -> anyhow::Result<()> { + async fn handle_open( + &self, + proxy_id: u64, + open_request: &OpenRequest, + ) -> anyhow::Result { let event = open_request .interrupt .event() @@ -136,13 +142,10 @@ impl ProxyTask { }) .unwrap(); - self.channels - .lock() - .get_mut(&proxy_id) - .unwrap() - .worker_result = Some(recv); - - Ok(()) + let mut channels = self.channels.lock(); + let channel = channels.get_mut(&proxy_id).unwrap(); + channel.worker_result = Some(recv); + Ok(channel.incoming_event.clone()) } async fn handle_close(&self, proxy_id: u64) { @@ -241,7 +244,6 @@ impl ProxyTask { OfferRequest::Offer, OfferInfo { params: offer.into(), - event: Interrupt::from_event(incoming_event), request_send, server_request_recv, }, @@ -266,6 +268,7 @@ impl ProxyTask { id, Channel { server_request_send, + incoming_event, worker_result: None, }, ); @@ -327,7 +330,9 @@ impl ProxyTask { ChannelRequest::Open(rpc) => { rpc.handle(|open_request| async move { let result = self.handle_open(proxy_id, &open_request).await; - result.is_ok() + result.ok().map(|event| OpenResult { + guest_to_host_interrupt: Interrupt::from_event(event), + }) }) .await } From 9a923198742cd022be5d1539eab4a1af9115c747 Mon Sep 17 00:00:00 2001 From: John Starks Date: Fri, 10 Jan 2025 20:13:30 +0000 Subject: [PATCH 2/3] doc --- vm/devices/vmbus/vmbus_channel/src/bus.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/vm/devices/vmbus/vmbus_channel/src/bus.rs b/vm/devices/vmbus/vmbus_channel/src/bus.rs index 9fc3d0c7d..868b126e3 100644 --- a/vm/devices/vmbus/vmbus_channel/src/bus.rs +++ b/vm/devices/vmbus/vmbus_channel/src/bus.rs @@ -171,7 +171,8 @@ pub trait ParentBus: Send + Sync { /// time. fn clone_bus(&self) -> Box; - /// Returns whether [`OfferInput::event`] needs to be backed by an OS event. + /// Returns whether [`OpenResult::guest_to_host_interrupt`] needs to be + /// backed by an OS event. /// /// TODO: Remove this and just return the appropriate notify type directly /// once subchannel creation and enable are separated. From 4a9d9a01945a76679a6555b87604db6f02781f00 Mon Sep 17 00:00:00 2001 From: John Starks Date: Fri, 10 Jan 2025 20:57:33 +0000 Subject: [PATCH 3/3] better --- vm/devices/vmbus/vmbus_channel/src/offer.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vm/devices/vmbus/vmbus_channel/src/offer.rs b/vm/devices/vmbus/vmbus_channel/src/offer.rs index 23aad419a..e8f9eb17c 100644 --- a/vm/devices/vmbus/vmbus_channel/src/offer.rs +++ b/vm/devices/vmbus/vmbus_channel/src/offer.rs @@ -182,7 +182,7 @@ impl Offer { gpadl_map: self.gpadl_map.clone(), }; message.response.respond(Some(OpenResult { - guest_to_host_interrupt: Interrupt::from_event(self.event.event().unwrap().clone()), + guest_to_host_interrupt: self.event.clone().interrupt(), })); Ok(resources) }