Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mesh_channel: hide RPC response receiver from callers #449

Merged
merged 4 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions openhcl/profiler_worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ impl Worker for ProfilerWorker {
WorkerRpc::Stop => {
break;
}
WorkerRpc::Restart(response) => {
response.send(Err(RemoteError::new(anyhow::anyhow!("not supported"))));
WorkerRpc::Restart(rpc) => {
rpc.complete(Err(RemoteError::new(anyhow::anyhow!("not supported"))));
}
WorkerRpc::Inspect(_deferred) => {}
},
Expand Down
4 changes: 2 additions & 2 deletions openhcl/underhill_core/src/diag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ impl Worker for DiagWorker {
};
match msg {
WorkerRpc::Stop => break Ok(()),
WorkerRpc::Restart(response) => {
response.send(Err(RemoteError::new(anyhow::anyhow!("not supported"))));
WorkerRpc::Restart(rpc) => {
rpc.complete(Err(RemoteError::new(anyhow::anyhow!("not supported"))));
}
WorkerRpc::Inspect(_) => {}
}
Expand Down
13 changes: 6 additions & 7 deletions openhcl/underhill_core/src/dispatch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use hyperv_ic_resources::shutdown::ShutdownType;
use igvm_defs::MemoryMapEntryType;
use inspect::Inspect;
use mesh::error::RemoteError;
use mesh::error::RemoteResult;
use mesh::rpc::FailableRpc;
use mesh::rpc::Rpc;
use mesh::rpc::RpcSend;
Expand Down Expand Up @@ -183,7 +182,7 @@ pub(crate) struct LoadedVm {
}

pub struct LoadedVmState<T> {
pub restart_response: mesh::OneshotSender<RemoteResult<T>>,
pub restart_rpc: FailableRpc<(), T>,
pub servicing_state: ServicingState,
pub vm_rpc: mesh::Receiver<UhVmRpc>,
pub control_send: mesh::Sender<ControlRequest>,
Expand Down Expand Up @@ -262,26 +261,26 @@ impl LoadedVm {
Event::WorkerRpcGone => break None,
Event::WorkerRpc(message) => match message {
WorkerRpc::Stop => break None,
WorkerRpc::Restart(response) => {
WorkerRpc::Restart(rpc) => {
let state = async {
let running = self.stop().await;
match self.save(None, false).await {
Ok(servicing_state) => Some((response, servicing_state)),
Ok(servicing_state) => Some((rpc, servicing_state)),
Err(err) => {
if running {
self.start(None).await;
}
response.send(Err(RemoteError::new(err)));
rpc.complete(Err(RemoteError::new(err)));
None
}
}
}
.instrument(tracing::info_span!("restart"))
.await;

if let Some((response, servicing_state)) = state {
if let Some((rpc, servicing_state)) = state {
break Some(LoadedVmState {
restart_response: response,
restart_rpc: rpc,
servicing_state,
vm_rpc,
control_send: self.control_send.lock().take().unwrap(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use ide_resources::IdeControllerConfig;
use ide_resources::IdeDeviceConfig;
use ide_resources::IdePath;
use mesh::rpc::Rpc;
use mesh::rpc::RpcError;
use mesh::rpc::RpcSend;
use mesh::CancelContext;
use nvme_resources::NamespaceDefinition;
Expand Down Expand Up @@ -61,7 +62,7 @@ use vm_resource::ResourceResolver;
#[derive(Error, Debug)]
enum Error<'a> {
#[error("RPC error")]
Rpc(#[source] mesh::RecvError),
Rpc(#[source] RpcError),
#[error("cannot add/remove storage controllers at runtime")]
StorageCannotAddRemoveControllerAtRuntime,
#[error("Striping devices don't support runtime change")]
Expand Down
8 changes: 4 additions & 4 deletions openhcl/underhill_core/src/get_tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,13 @@ impl GetTracingBackend {
)
.merge();

let flush_response = loop {
let flush_rpc = loop {
let trace_type = streams.next().await.unwrap();
match trace_type {
Event::Trace(data) => {
write.send(&data).await.ok();
}
Event::Flush(Rpc((), response)) => break Some(response),
Event::Flush(rpc) => break Some(rpc),
Event::Done => break None,
}
};
Expand All @@ -174,8 +174,8 @@ impl GetTracingBackend {
// Wait for the host to read everything.
write.wait_empty().await.ok();

if let Some(resp) = flush_response {
resp.send(());
if let Some(rpc) = flush_rpc {
rpc.complete(());
} else {
break;
}
Expand Down
20 changes: 10 additions & 10 deletions openhcl/underhill_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ async fn run_control(
Control(ControlRequest),
}

let mut restart_response = None;
let mut restart_rpc = None;
loop {
let event = {
let mut stream = (
Expand Down Expand Up @@ -551,7 +551,7 @@ async fn run_control(
};

let r = async {
if restart_response.is_some() {
if restart_rpc.is_some() {
anyhow::bail!("previous restart still in progress");
}

Expand All @@ -568,7 +568,7 @@ async fn run_control(
rpc.complete(r.map_err(RemoteError::new));
} else {
state = ControlState::Restarting;
restart_response = Some(rpc.1);
restart_rpc = Some(rpc);
}
}
diag_server::DiagRequest::Pause(rpc) => {
Expand Down Expand Up @@ -647,7 +647,7 @@ async fn run_control(
}
#[cfg(feature = "profiler")]
diag_server::DiagRequest::Profile(rpc) => {
let Rpc(rpc_params, rpc_sender) = rpc;
let (rpc_params, rpc_sender) = rpc.split();
// Create profiler host if there is none created before
if profiler_host.is_none() {
match launch_mesh_host(mesh, "profiler", Some(tracing.tracer()))
Expand All @@ -658,7 +658,7 @@ async fn run_control(
profiler_host = Some(host);
}
Err(e) => {
rpc_sender.send(Err(RemoteError::new(e)));
rpc_sender.complete(Err(RemoteError::new(e)));
continue;
}
}
Expand All @@ -680,7 +680,7 @@ async fn run_control(
profiler_worker = worker;
}
Err(e) => {
rpc_sender.send(Err(RemoteError::new(e)));
rpc_sender.complete(Err(RemoteError::new(e)));
continue;
}
}
Expand All @@ -695,17 +695,17 @@ async fn run_control(
.and_then(|result| result.context("profiler worker failed"))
.map_err(RemoteError::new);

rpc_sender.send(result);
rpc_sender.complete(result);
})
.detach();
}
}
}
Event::Worker(event) => match event {
WorkerEvent::Started => {
if let Some(response) = restart_response.take() {
if let Some(response) = restart_rpc.take() {
tracing::info!("restart complete");
response.send(Ok(()));
response.complete(Ok(()));
} else {
tracing::info!("vm worker started");
}
Expand All @@ -719,7 +719,7 @@ async fn run_control(
}
WorkerEvent::RestartFailed(err) => {
tracing::error!(error = &err as &dyn std::error::Error, "restart failed");
restart_response.take().unwrap().send(Err(err));
restart_rpc.take().unwrap().complete(Err(err));
state = ControlState::Started;
}
},
Expand Down
2 changes: 1 addition & 1 deletion openhcl/underhill_core/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ impl Worker for UnderhillVmWorker {
};

tracing::info!("sending worker restart state");
state.restart_response.send(Ok(RestartState {
state.restart_rpc.complete(Ok(RestartState {
params,
servicing_state: state.servicing_state,
}))
Expand Down
20 changes: 10 additions & 10 deletions openvmm/hvlite_core/src/worker/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ use memory_range::MemoryRange;
use mesh::error::RemoteError;
use mesh::payload::message::ProtobufMessage;
use mesh::payload::Protobuf;
use mesh::rpc::Rpc;
use mesh::MeshPayload;
use mesh_worker::Worker;
use mesh_worker::WorkerId;
Expand Down Expand Up @@ -2477,7 +2476,7 @@ impl LoadedVm {
pub async fn run(
mut self,
driver: &impl Spawn,
mut rpc: mesh::Receiver<VmRpc>,
mut rpc_recv: mesh::Receiver<VmRpc>,
mut worker_rpc: mesh::Receiver<WorkerRpc<RestartState>>,
) {
enum Event {
Expand Down Expand Up @@ -2508,7 +2507,7 @@ impl LoadedVm {

loop {
let event: Event = {
let a = rpc.recv().map(Event::VmRpc);
let a = rpc_recv.recv().map(Event::VmRpc);
let b = worker_rpc.recv().map(Event::WorkerRpc);
(a, b).race().await
};
Expand All @@ -2517,7 +2516,7 @@ impl LoadedVm {
Event::WorkerRpc(Err(_)) => break,
Event::WorkerRpc(Ok(message)) => match message {
WorkerRpc::Stop => break,
WorkerRpc::Restart(response) => {
WorkerRpc::Restart(rpc) => {
let mut stopped = false;
// First run the non-destructive operations.
let r = async {
Expand All @@ -2532,8 +2531,8 @@ impl LoadedVm {
.await;
match r {
Ok((shared_memory, saved_state)) => {
response.send(Ok(self
.serialize(rpc, shared_memory, saved_state)
rpc.complete(Ok(self
.serialize(rpc_recv, shared_memory, saved_state)
.await));

return;
Expand All @@ -2542,7 +2541,7 @@ impl LoadedVm {
if stopped {
self.state_units.start().await;
}
response.send(Err(RemoteError::new(err)));
rpc.complete(Err(RemoteError::new(err)));
}
}
}
Expand Down Expand Up @@ -2618,16 +2617,17 @@ impl LoadedVm {
})
.await
}
VmRpc::ConnectHvsock(Rpc((mut ctx, service_id, vtl), response)) => {
VmRpc::ConnectHvsock(rpc) => {
let ((mut ctx, service_id, vtl), response) = rpc.split();
if let Some(relay) = self.hvsock_relay(vtl) {
let fut = relay.connect(&mut ctx, service_id);
driver
.spawn("vmrpc-hvsock-connect", async move {
response.send(fut.await.map_err(RemoteError::new))
response.complete(fut.await.map_err(RemoteError::new))
})
.detach();
} else {
response.send(Err(RemoteError::new(anyhow::anyhow!(
response.complete(Err(RemoteError::new(anyhow::anyhow!(
"hvsock is not available"
))));
}
Expand Down
7 changes: 2 additions & 5 deletions openvmm/hvlite_helpers/src/underhill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
use anyhow::Context;
use get_resources::ged::GuestEmulationRequest;
use hvlite_defs::rpc::VmRpc;
use mesh::error::RemoteResultExt;
use mesh::rpc::RpcSend;

/// Replace the running version of Underhill.
Expand All @@ -28,9 +27,8 @@ pub async fn service_underhill(
// blocked while waiting for the guest.
tracing::debug!("waiting for guest to send saved state");
let r = send
.call(GuestEmulationRequest::SaveGuestVtl2State, ())
.call_failable(GuestEmulationRequest::SaveGuestVtl2State, ())
.await
.flatten()
.context("failed to save VTL2 state");

if r.is_err() {
Expand All @@ -51,9 +49,8 @@ pub async fn service_underhill(
//
// TODO: event driven, cancellable.
tracing::debug!("waiting for VTL0 to start");
send.call(GuestEmulationRequest::WaitForVtl0Start, ())
send.call_failable(GuestEmulationRequest::WaitForVtl0Start, ())
.await
.flatten()
.context("vtl0 start failed")?;

Ok(())
Expand Down
3 changes: 2 additions & 1 deletion openvmm/membacking/src/mapping_manager/va_mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use futures::executor::block_on;
use guestmem::GuestMemoryAccess;
use guestmem::PageFaultAction;
use memory_range::MemoryRange;
use mesh::rpc::RpcError;
use mesh::rpc::RpcSend;
use parking_lot::Mutex;
use sparse_mmap::SparseMapping;
Expand Down Expand Up @@ -156,7 +157,7 @@ impl MapperTask {
#[derive(Debug, Error)]
pub enum VaMapperError {
#[error("failed to communicate with the memory manager")]
MemoryManagerGone(#[source] mesh::RecvError),
MemoryManagerGone(#[source] RpcError),
#[error("failed to reserve address space")]
Reserve(#[source] std::io::Error),
}
Expand Down
4 changes: 1 addition & 3 deletions openvmm/membacking/src/region_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,9 +628,7 @@ impl RegionHandle {
impl Drop for RegionHandle {
fn drop(&mut self) {
if let Some(id) = self.id {
let (send, _recv) = mesh::oneshot();
self.req_send
.send(RegionRequest::RemoveRegion(Rpc(id, send)));
let _recv = self.req_send.call(RegionRequest::RemoveRegion, id);
// Don't wait for the response.
}
}
Expand Down
10 changes: 5 additions & 5 deletions openvmm/openvmm_entry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ use inspect::InspectionBuilder;
use io::Read;
use mesh::error::RemoteError;
use mesh::rpc::Rpc;
use mesh::rpc::RpcError;
use mesh::rpc::RpcSend;
use mesh::CancelContext;
use mesh::RecvError;
use mesh_worker::launch_local_worker;
use mesh_worker::WorkerEvent;
use mesh_worker::WorkerHandle;
Expand Down Expand Up @@ -2092,7 +2092,7 @@ async fn run_control(driver: &DefaultDriver, mesh: &VmmMesh, opt: Options) -> an
})
.unwrap();

let mut state_change_task = None::<Task<Result<StateChange, RecvError>>>;
let mut state_change_task = None::<Task<Result<StateChange, RpcError>>>;
let mut pulse_save_restore_interval: Option<Duration> = None;
let mut pending_shutdown = None;

Expand All @@ -2113,8 +2113,8 @@ async fn run_control(driver: &DefaultDriver, mesh: &VmmMesh, opt: Options) -> an
PulseSaveRestore,
Worker(WorkerEvent),
VncWorker(WorkerEvent),
StateChange(Result<StateChange, RecvError>),
ShutdownResult(Result<hyperv_ic_resources::shutdown::ShutdownResult, RecvError>),
StateChange(Result<StateChange, RpcError>),
ShutdownResult(Result<hyperv_ic_resources::shutdown::ShutdownResult, RpcError>),
}

let mut console_command_recv = console_command_recv
Expand Down Expand Up @@ -2360,7 +2360,7 @@ async fn run_control(driver: &DefaultDriver, mesh: &VmmMesh, opt: Options) -> an
fn state_change<U: 'static + Send>(
driver: impl Spawn,
vm_rpc: &mesh::Sender<VmRpc>,
state_change_task: &mut Option<Task<Result<StateChange, RecvError>>>,
state_change_task: &mut Option<Task<Result<StateChange, RpcError>>>,
f: impl FnOnce(Rpc<(), U>) -> VmRpc,
g: impl FnOnce(U) -> StateChange + 'static + Send,
) {
Expand Down
Loading
Loading