From c5a8de28e7844b3c29d58116d8340967f2e6bcc4 Mon Sep 17 00:00:00 2001 From: Mingwei Samuel Date: Fri, 19 Jul 2024 17:24:28 -0700 Subject: [PATCH] refactor(hydro_deploy)!: make `Host` trait use `&self` interior mutability to remove `RwLock` wrappings #430 (#1347) Depends on #1346 --- hydro_deploy/core/src/azure.rs | 92 +++++++++--------- hydro_deploy/core/src/custom_service.rs | 27 ++---- hydro_deploy/core/src/deployment.rs | 38 +++----- hydro_deploy/core/src/gcp.rs | 94 +++++++++---------- hydro_deploy/core/src/hydroflow_crate/mod.rs | 6 +- .../core/src/hydroflow_crate/ports.rs | 44 ++++----- .../core/src/hydroflow_crate/service.rs | 15 ++- hydro_deploy/core/src/lib.rs | 21 +++-- hydro_deploy/core/src/localhost/mod.rs | 16 +--- hydro_deploy/hydro_cli/src/lib.rs | 50 +++------- .../src/deploy.rs | 6 +- hydroflow_plus_test/examples/compute_pi.rs | 6 +- .../examples/first_ten_distributed.rs | 6 +- hydroflow_plus_test/examples/map_reduce.rs | 6 +- .../examples/perf_compute_pi.rs | 6 +- .../examples/simple_cluster.rs | 6 +- 16 files changed, 184 insertions(+), 255 deletions(-) diff --git a/hydro_deploy/core/src/azure.rs b/hydro_deploy/core/src/azure.rs index 2d3f00ee8eb..40db7280f62 100644 --- a/hydro_deploy/core/src/azure.rs +++ b/hydro_deploy/core/src/azure.rs @@ -1,5 +1,5 @@ use std::collections::HashMap; -use std::sync::Arc; +use std::sync::{Arc, Mutex, OnceLock}; use anyhow::Result; use async_trait::async_trait; @@ -12,6 +12,7 @@ use super::{ ServerStrategy, }; use crate::ssh::LaunchedSshHost; +use crate::HostStrategyGetter; pub struct LaunchedVirtualMachine { resource_result: Arc, @@ -50,8 +51,8 @@ pub struct AzureHost { pub image: Option>, pub region: String, pub user: Option, - pub launched: Option>, - external_ports: Vec, + pub launched: OnceLock>, + external_ports: Mutex>, } impl AzureHost { @@ -72,8 +73,8 @@ impl AzureHost { image, region, user, - launched: None, - external_ports: vec![], + launched: OnceLock::new(), + external_ports: Mutex::new(Vec::new()), } } } @@ -84,17 +85,17 @@ impl Host for AzureHost { HostTargetType::Linux } - fn request_port(&mut self, bind_type: &ServerStrategy) { + fn request_port(&self, bind_type: &ServerStrategy) { match bind_type { ServerStrategy::UnixSocket => {} ServerStrategy::InternalTcpPort => {} ServerStrategy::ExternalTcpPort(port) => { - if !self.external_ports.contains(port) { - if self.launched.is_some() { + let mut external_ports = self.external_ports.lock().unwrap(); + if !external_ports.contains(port) { + if self.launched.get().is_some() { todo!("Cannot adjust firewall after host has been launched"); } - - self.external_ports.push(*port); + external_ports.push(*port); } } ServerStrategy::Demux(demux) => { @@ -114,7 +115,7 @@ impl Host for AzureHost { } } - fn request_custom_binary(&mut self) { + fn request_custom_binary(&self) { self.request_port(&ServerStrategy::ExternalTcpPort(22)); } @@ -126,12 +127,8 @@ impl Host for AzureHost { self } - fn as_any_mut(&mut self) -> &mut dyn std::any::Any { - self - } - fn collect_resources(&self, resource_batch: &mut ResourceBatch) { - if self.launched.is_some() { + if self.launched.get().is_some() { return; } @@ -398,46 +395,43 @@ impl Host for AzureHost { fn launched(&self) -> Option> { self.launched - .as_ref() + .get() .map(|a| a.clone() as Arc) } - fn provision(&mut self, resource_result: &Arc) -> Arc { - if self.launched.is_none() { - let id = self.id; - - let internal_ip = resource_result - .terraform - .outputs - .get(&format!("vm-instance-{id}-internal-ip")) - .unwrap() - .value - .clone(); - - let external_ip = resource_result - .terraform - .outputs - .get(&format!("vm-instance-{id}-public-ip")) - .map(|v| v.value.clone()); - - self.launched = Some(Arc::new(LaunchedVirtualMachine { - resource_result: resource_result.clone(), - user: self.user.as_ref().cloned().unwrap_or("hydro".to_string()), - internal_ip, - external_ip, - })) - } - - self.launched.as_ref().unwrap().clone() + fn provision(&self, resource_result: &Arc) -> Arc { + self.launched + .get_or_init(|| { + let id = self.id; + + let internal_ip = resource_result + .terraform + .outputs + .get(&format!("vm-instance-{id}-internal-ip")) + .unwrap() + .value + .clone(); + + let external_ip = resource_result + .terraform + .outputs + .get(&format!("vm-instance-{id}-public-ip")) + .map(|v| v.value.clone()); + + Arc::new(LaunchedVirtualMachine { + resource_result: resource_result.clone(), + user: self.user.as_ref().cloned().unwrap_or("hydro".to_string()), + internal_ip, + external_ip, + }) + }) + .clone() } fn strategy_as_server<'a>( &'a self, client_host: &dyn Host, - ) -> Result<( - ClientStrategy<'a>, - Box ServerStrategy>, - )> { + ) -> Result<(ClientStrategy<'a>, HostStrategyGetter)> { if client_host.can_connect_to(ClientStrategy::UnixSocket(self.id)) { Ok(( ClientStrategy::UnixSocket(self.id), @@ -452,7 +446,7 @@ impl Host for AzureHost { Ok(( ClientStrategy::ForwardedTcpPort(self), Box::new(|me| { - me.downcast_mut::() + me.downcast_ref::() .unwrap() .request_port(&ServerStrategy::ExternalTcpPort(22)); // needed to forward ServerStrategy::InternalTcpPort diff --git a/hydro_deploy/core/src/custom_service.rs b/hydro_deploy/core/src/custom_service.rs index ca25d986ddc..69c22954bfc 100644 --- a/hydro_deploy/core/src/custom_service.rs +++ b/hydro_deploy/core/src/custom_service.rs @@ -16,7 +16,7 @@ use crate::hydroflow_crate::ports::ReverseSinkInstantiator; /// Represents an unknown, third-party service that is not part of the Hydroflow ecosystem. pub struct CustomService { _id: usize, - on: Arc>, + on: Arc, /// The ports that the service wishes to expose to the public internet. external_ports: Vec, @@ -25,7 +25,7 @@ pub struct CustomService { } impl CustomService { - pub fn new(id: usize, on: Arc>, external_ports: Vec) -> Self { + pub fn new(id: usize, on: Arc, external_ports: Vec) -> Self { Self { _id: id, on, @@ -46,10 +46,7 @@ impl Service for CustomService { return; } - let mut host = self - .on - .try_write() - .expect("No one should be reading/writing the host while resources are collected"); + let host = &self.on; for port in self.external_ports.iter() { host.request_port(&ServerStrategy::ExternalTcpPort(*port)); @@ -61,8 +58,8 @@ impl Service for CustomService { return Ok(()); } - let mut host_write = self.on.write().await; - let launched = host_write.provision(resource_result); + let host = &self.on; + let launched = host.provision(resource_result); self.launched_host = Some(launched); Ok(()) } @@ -118,7 +115,7 @@ impl HydroflowSource for CustomClientPort { SourcePath::Direct(self.on.upgrade().unwrap().try_read().unwrap().on.clone()) } - fn host(&self) -> Arc> { + fn host(&self) -> Arc { panic!("Custom services cannot be used as the server") } @@ -149,28 +146,24 @@ impl HydroflowSink for CustomClientPort { fn instantiate_reverse( &self, - server_host: &Arc>, + server_host: &Arc, server_sink: Arc, wrap_client_port: &dyn Fn(ServerConfig) -> ServerConfig, ) -> Result { let client = self.on.upgrade().unwrap(); let client_read = client.try_read().unwrap(); - let server_host_clone = server_host.clone(); - let server_host = server_host_clone.try_read().unwrap(); + let server_host = server_host.clone(); - let (conn_type, bind_type) = - server_host.strategy_as_server(client_read.on.try_read().unwrap().deref())?; + let (conn_type, bind_type) = server_host.strategy_as_server(client_read.on.deref())?; let client_port = wrap_client_port(ServerConfig::from_strategy(&conn_type, server_sink)); - let server_host_clone = server_host_clone.clone(); Ok(Box::new(move |me| { - let mut server_host = server_host_clone.try_write().unwrap(); me.downcast_ref::() .unwrap() .record_server_config(client_port); - bind_type(server_host.as_any_mut()) + bind_type(server_host.as_any()) })) } } diff --git a/hydro_deploy/core/src/deployment.rs b/hydro_deploy/core/src/deployment.rs index dd1fbaca695..ffdd69ce27e 100644 --- a/hydro_deploy/core/src/deployment.rs +++ b/hydro_deploy/core/src/deployment.rs @@ -13,7 +13,7 @@ use crate::ServiceBuilder; #[derive(Default)] pub struct Deployment { - pub hosts: Vec>>, + pub hosts: Vec>, pub services: Vec>>, pub resource_pool: ResourcePool, last_resource_result: Option>, @@ -27,7 +27,7 @@ impl Deployment { } #[allow(non_snake_case)] - pub fn Localhost(&mut self) -> Arc> { + pub fn Localhost(&mut self) -> Arc { self.add_host(LocalhostHost::new) } @@ -40,7 +40,7 @@ impl Deployment { region: impl Into, network: Arc>, user: Option, - ) -> Arc> { + ) -> Arc { self.add_host(|id| { GcpComputeEngineHost::new(id, project, machine_type, image, region, network, user) }) @@ -49,7 +49,7 @@ impl Deployment { #[allow(non_snake_case)] pub fn CustomService( &mut self, - on: Arc>, + on: Arc, external_ports: Vec, ) -> Arc> { self.add_service(|id| CustomService::new(id, on, external_ports)) @@ -76,10 +76,10 @@ impl Deployment { } for host in self.hosts.iter_mut() { - host.write().await.collect_resources(&mut resource_batch); + host.collect_resources(&mut resource_batch); } - let result = Arc::new( + let resource_result = Arc::new( progress::ProgressTracker::with_group("provision", None, || async { resource_batch .provision(&mut self.resource_pool, self.last_resource_result.clone()) @@ -87,18 +87,11 @@ impl Deployment { }) .await?, ); - self.last_resource_result = Some(result.clone()); - - progress::ProgressTracker::with_group("provision", None, || { - let hosts_provisioned = - self.hosts - .iter_mut() - .map(|host: &mut Arc>| async { - host.write().await.provision(&result); - }); - futures::future::join_all(hosts_provisioned) - }) - .await; + self.last_resource_result = Some(resource_result.clone()); + + for host in self.hosts.iter() { + host.provision(&resource_result); + } progress::ProgressTracker::with_group("deploy", None, || { let services_future = self @@ -110,7 +103,7 @@ impl Deployment { .unwrap() .write() .await - .deploy(&result) + .deploy(&resource_result) .await }) .collect::>(); @@ -163,11 +156,8 @@ impl Deployment { Ok(()) } - pub fn add_host T>( - &mut self, - host: F, - ) -> Arc> { - let arc = Arc::new(RwLock::new(host(self.next_host_id))); + pub fn add_host T>(&mut self, host: F) -> Arc { + let arc = Arc::new(host(self.next_host_id)); self.next_host_id += 1; self.hosts.push(arc.clone()); diff --git a/hydro_deploy/core/src/gcp.rs b/hydro_deploy/core/src/gcp.rs index 4df55f73518..cc59350232a 100644 --- a/hydro_deploy/core/src/gcp.rs +++ b/hydro_deploy/core/src/gcp.rs @@ -1,5 +1,5 @@ use std::collections::HashMap; -use std::sync::Arc; +use std::sync::{Arc, Mutex, OnceLock}; use anyhow::Result; use async_trait::async_trait; @@ -13,6 +13,7 @@ use super::{ ServerStrategy, }; use crate::ssh::LaunchedSshHost; +use crate::HostStrategyGetter; pub struct LaunchedComputeEngine { resource_result: Arc, @@ -175,8 +176,8 @@ pub struct GcpComputeEngineHost { pub region: String, pub network: Arc>, pub user: Option, - pub launched: Option>, - external_ports: Vec, + pub launched: OnceLock>, + external_ports: Mutex>, } impl GcpComputeEngineHost { @@ -197,8 +198,8 @@ impl GcpComputeEngineHost { region: region.into(), network, user, - launched: None, - external_ports: vec![], + launched: OnceLock::new(), + external_ports: Mutex::new(Vec::new()), } } } @@ -209,17 +210,17 @@ impl Host for GcpComputeEngineHost { HostTargetType::Linux } - fn request_port(&mut self, bind_type: &ServerStrategy) { + fn request_port(&self, bind_type: &ServerStrategy) { match bind_type { ServerStrategy::UnixSocket => {} ServerStrategy::InternalTcpPort => {} ServerStrategy::ExternalTcpPort(port) => { - if !self.external_ports.contains(port) { - if self.launched.is_some() { + let mut external_ports = self.external_ports.lock().unwrap(); + if !external_ports.contains(port) { + if self.launched.get().is_some() { todo!("Cannot adjust firewall after host has been launched"); } - - self.external_ports.push(*port); + external_ports.push(*port); } } ServerStrategy::Demux(demux) => { @@ -239,7 +240,7 @@ impl Host for GcpComputeEngineHost { } } - fn request_custom_binary(&mut self) { + fn request_custom_binary(&self) { self.request_port(&ServerStrategy::ExternalTcpPort(22)); } @@ -251,12 +252,8 @@ impl Host for GcpComputeEngineHost { self } - fn as_any_mut(&mut self) -> &mut dyn std::any::Any { - self - } - fn collect_resources(&self, resource_batch: &mut ResourceBatch) { - if self.launched.is_some() { + if self.launched.get().is_some() { return; } @@ -339,7 +336,8 @@ impl Host for GcpComputeEngineHost { let mut tags = vec![]; let mut external_interfaces = vec![]; - if self.external_ports.is_empty() { + let external_ports = self.external_ports.lock().unwrap(); + if external_ports.is_empty() { external_interfaces.push(json!({ "network": format!("${{{vpc_path}.self_link}}") })); } else { external_interfaces.push(json!({ @@ -352,7 +350,7 @@ impl Host for GcpComputeEngineHost { })); // open the external ports that were requested - let my_external_tags = self.external_ports.iter().map(|port| { + let my_external_tags = external_ports.iter().map(|port| { let rule_id = nanoid!(8, &TERRAFORM_ALPHABET); let firewall_rule = resource_batch .terraform @@ -386,6 +384,7 @@ impl Host for GcpComputeEngineHost { } ); } + drop(external_ports); // Drop the lock as soon as possible. let user = self.user.as_ref().cloned().unwrap_or("hydro".to_string()); resource_batch @@ -429,46 +428,43 @@ impl Host for GcpComputeEngineHost { fn launched(&self) -> Option> { self.launched - .as_ref() + .get() .map(|a| a.clone() as Arc) } - fn provision(&mut self, resource_result: &Arc) -> Arc { - if self.launched.is_none() { - let id = self.id; - - let internal_ip = resource_result - .terraform - .outputs - .get(&format!("vm-instance-{id}-internal-ip")) - .unwrap() - .value - .clone(); + fn provision(&self, resource_result: &Arc) -> Arc { + self.launched + .get_or_init(|| { + let id = self.id; - let external_ip = resource_result - .terraform - .outputs - .get(&format!("vm-instance-{id}-public-ip")) - .map(|v| v.value.clone()); - - self.launched = Some(Arc::new(LaunchedComputeEngine { - resource_result: resource_result.clone(), - user: self.user.as_ref().cloned().unwrap_or("hydro".to_string()), - internal_ip, - external_ip, - })) - } + let internal_ip = resource_result + .terraform + .outputs + .get(&format!("vm-instance-{id}-internal-ip")) + .unwrap() + .value + .clone(); - self.launched.as_ref().unwrap().clone() + let external_ip = resource_result + .terraform + .outputs + .get(&format!("vm-instance-{id}-public-ip")) + .map(|v| v.value.clone()); + + Arc::new(LaunchedComputeEngine { + resource_result: resource_result.clone(), + user: self.user.as_ref().cloned().unwrap_or("hydro".to_string()), + internal_ip, + external_ip, + }) + }) + .clone() } fn strategy_as_server<'a>( &'a self, client_host: &dyn Host, - ) -> Result<( - ClientStrategy<'a>, - Box ServerStrategy>, - )> { + ) -> Result<(ClientStrategy<'a>, HostStrategyGetter)> { if client_host.can_connect_to(ClientStrategy::UnixSocket(self.id)) { Ok(( ClientStrategy::UnixSocket(self.id), @@ -483,7 +479,7 @@ impl Host for GcpComputeEngineHost { Ok(( ClientStrategy::ForwardedTcpPort(self), Box::new(|me| { - me.downcast_mut::() + me.downcast_ref::() .unwrap() .request_port(&ServerStrategy::ExternalTcpPort(22)); // needed to forward ServerStrategy::InternalTcpPort diff --git a/hydro_deploy/core/src/hydroflow_crate/mod.rs b/hydro_deploy/core/src/hydroflow_crate/mod.rs index 4e2639e4c4d..125685d2adb 100644 --- a/hydro_deploy/core/src/hydroflow_crate/mod.rs +++ b/hydro_deploy/core/src/hydroflow_crate/mod.rs @@ -1,8 +1,6 @@ use std::path::PathBuf; use std::sync::Arc; -use tokio::sync::RwLock; - use super::Host; use crate::ServiceBuilder; @@ -24,7 +22,7 @@ pub enum CrateTarget { pub struct HydroflowCrate { src: PathBuf, target: CrateTarget, - on: Arc>, + on: Arc, profile: Option, perf: Option, /* If a path is provided, run perf to get CPU time and output to that path.perf.data */ args: Vec, @@ -35,7 +33,7 @@ impl HydroflowCrate { /// Creates a new `HydroflowCrate` that will be deployed on the given host. /// The `src` argument is the path to the crate's directory, and the `on` /// argument is the host that the crate will be deployed on. - pub fn new(src: impl Into, on: Arc>) -> Self { + pub fn new(src: impl Into, on: Arc) -> Self { Self { src: src.into(), target: CrateTarget::Default, diff --git a/hydro_deploy/core/src/hydroflow_crate/ports.rs b/hydro_deploy/core/src/hydroflow_crate/ports.rs index dd7bac51c71..b4fe7248fb1 100644 --- a/hydro_deploy/core/src/hydroflow_crate/ports.rs +++ b/hydro_deploy/core/src/hydroflow_crate/ports.rs @@ -17,7 +17,7 @@ pub trait HydroflowSource: Send + Sync { fn source_path(&self) -> SourcePath; fn record_server_config(&self, config: ServerConfig); - fn host(&self) -> Arc>; + fn host(&self) -> Arc; fn server(&self) -> Arc; fn record_server_strategy(&self, config: ServerStrategy); @@ -41,10 +41,9 @@ pub trait HydroflowSource: Send + Sync { } } -#[async_trait] pub trait HydroflowServer: DynClone + Send + Sync { fn get_port(&self) -> ServerPort; - async fn launched_host(&self) -> Arc; + fn launched_host(&self) -> Arc; } pub type ReverseSinkInstantiator = Box ServerStrategy>; @@ -60,7 +59,7 @@ pub trait HydroflowSink: Send + Sync { /// Returns a thunk that can be called to perform mutations that instantiate the sink, taking a mutable reference to this sink. fn instantiate_reverse( &self, - server_host: &Arc>, + server_host: &Arc, server_sink: Arc, wrap_client_port: &dyn Fn(ServerConfig) -> ServerConfig, ) -> Result; @@ -80,7 +79,7 @@ impl HydroflowSource for TaggedSource { self.source.record_server_config(config); } - fn host(&self) -> Arc> { + fn host(&self) -> Arc { self.source.host() } @@ -104,7 +103,7 @@ impl HydroflowSource for NullSourceSink { SourcePath::Null } - fn host(&self) -> Arc> { + fn host(&self) -> Arc { panic!("null source has no host") } @@ -127,7 +126,7 @@ impl HydroflowSink for NullSourceSink { fn instantiate_reverse( &self, - _server_host: &Arc>, + _server_host: &Arc, _server_sink: Arc, _wrap_client_port: &dyn Fn(ServerConfig) -> ServerConfig, ) -> Result { @@ -162,7 +161,7 @@ impl HydroflowSink for DemuxSink { fn instantiate_reverse( &self, - server_host: &Arc>, + server_host: &Arc, server_sink: Arc, wrap_client_port: &dyn Fn(ServerConfig) -> ServerConfig, ) -> Result { @@ -194,7 +193,7 @@ impl HydroflowSink for DemuxSink { #[derive(Clone)] pub struct HydroflowPortConfig { pub service: Weak>, - pub service_host: Arc>, + pub service_host: Arc, pub service_server_defns: Arc>>, pub port: String, pub merge: bool, @@ -225,7 +224,7 @@ impl HydroflowSource for HydroflowPortConfig { ) } - fn host(&self) -> Arc> { + fn host(&self) -> Arc { self.service_host.clone() } @@ -268,14 +267,14 @@ impl HydroflowServer for HydroflowPortConfig { server_defns.get(&self.port).unwrap().clone() } - async fn launched_host(&self) -> Arc { - self.service_host.read().await.launched().unwrap() + fn launched_host(&self) -> Arc { + self.service_host.launched().unwrap() } } pub enum SourcePath { Null, - Direct(Arc>), + Direct(Arc), Tagged(Box, u32), } @@ -287,7 +286,6 @@ impl SourcePath { ) -> Result<(HostStrategyGetter, ServerConfig)> { match self { SourcePath::Direct(client_host) => { - let client_host = client_host.try_read().unwrap(); let (conn_type, bind_type) = server_host.strategy_as_server(client_host.deref())?; let base_config = ServerConfig::from_strategy(&conn_type, Arc::new(server.clone())); Ok((bind_type, base_config)) @@ -321,8 +319,7 @@ impl HydroflowSink for HydroflowPortConfig { let server = self.service.upgrade().unwrap(); let server_read = server.try_read().unwrap(); - let server_host_clone = server_read.on.clone(); - let server_host = server_host_clone.try_read().unwrap(); + let server_host = server_read.on.clone(); let (bind_type, base_config) = client_path.plan(self, server_host.deref())?; @@ -331,7 +328,7 @@ impl HydroflowSink for HydroflowPortConfig { let port = self.port.clone(); Ok(Box::new(move || { let mut server_write = server.try_write().unwrap(); - let bind_type = bind_type(server_write.on.try_write().unwrap().as_any_mut()); + let bind_type = (bind_type)(server_write.on.as_any()); if merge { let merge_config = server_write @@ -356,18 +353,16 @@ impl HydroflowSink for HydroflowPortConfig { fn instantiate_reverse( &self, - server_host: &Arc>, + server_host: &Arc, server_sink: Arc, wrap_client_port: &dyn Fn(ServerConfig) -> ServerConfig, ) -> Result { let client = self.service.upgrade().unwrap(); let client_read = client.try_read().unwrap(); - let server_host_clone = server_host.clone(); - let server_host = server_host_clone.try_read().unwrap(); + let server_host = server_host.clone(); - let (conn_type, bind_type) = - server_host.strategy_as_server(client_read.on.try_read().unwrap().deref())?; + let (conn_type, bind_type) = server_host.strategy_as_server(client_read.on.deref())?; let client_port = wrap_client_port(ServerConfig::from_strategy(&conn_type, server_sink)); let client = client.clone(); @@ -394,8 +389,7 @@ impl HydroflowSink for HydroflowPortConfig { .insert(port.clone(), client_port); }; - let mut server_host = client_write.on.try_write().unwrap(); - bind_type(server_host.as_any_mut()) + (bind_type)(client_write.on.as_any()) })) } } @@ -468,7 +462,7 @@ impl ServerConfig { ServerConfig::Forwarded(server) => { let selected = select(server.get_port()); - forward_connection(&selected, server.launched_host().await.as_ref()).await + forward_connection(&selected, server.launched_host().as_ref()).await } ServerConfig::Demux(demux) => { diff --git a/hydro_deploy/core/src/hydroflow_crate/service.rs b/hydro_deploy/core/src/hydroflow_crate/service.rs index 9c7b1b9c51d..720fcb20e61 100644 --- a/hydro_deploy/core/src/hydroflow_crate/service.rs +++ b/hydro_deploy/core/src/hydroflow_crate/service.rs @@ -19,7 +19,7 @@ use crate::{ pub struct HydroflowCrateService { id: usize, - pub(super) on: Arc>, + pub(super) on: Arc, build_params: BuildParams, perf: Option, args: Option>, @@ -50,7 +50,7 @@ impl HydroflowCrateService { pub fn new( id: usize, src: PathBuf, - on: Arc>, + on: Arc, bin: Option, example: Option, profile: Option, @@ -60,7 +60,7 @@ impl HydroflowCrateService { display_id: Option, external_ports: Vec, ) -> Self { - let target_type = on.try_read().unwrap().target_type(); + let target_type = on.target_type(); let build_params = BuildParams::new(src, bin, example, profile, target_type, features); @@ -165,10 +165,7 @@ impl Service for HydroflowCrateService { tokio::task::spawn(self.build()); - let mut host = self - .on - .try_write() - .expect("No one should be reading/writing the host while resources are collected"); + let host = &self.on; host.request_custom_binary(); for (_, bind_type) in self.port_to_bind.iter() { @@ -194,8 +191,8 @@ impl Service for HydroflowCrateService { || async { let built = self.build().await?; - let mut host_write = self.on.write().await; - let launched = host_write.provision(resource_result); + let host = &self.on; + let launched = host.provision(resource_result); launched.copy_binary(built).await?; diff --git a/hydro_deploy/core/src/lib.rs b/hydro_deploy/core/src/lib.rs index c399bf9e91d..2af1802c6db 100644 --- a/hydro_deploy/core/src/lib.rs +++ b/hydro_deploy/core/src/lib.rs @@ -143,30 +143,28 @@ pub enum HostTargetType { Linux, } -pub type HostStrategyGetter = Box ServerStrategy>; +pub type HostStrategyGetter = Box ServerStrategy>; pub trait Host: Send + Sync { fn target_type(&self) -> HostTargetType; - fn request_port(&mut self, bind_type: &ServerStrategy); + fn request_port(&self, bind_type: &ServerStrategy); /// An identifier for this host, which is unique within a deployment. fn id(&self) -> usize; - /// Returns a reference to the host as a trait object. - fn as_any(&self) -> &dyn std::any::Any; - - /// Returns a reference to the host as a trait object. - fn as_any_mut(&mut self) -> &mut dyn std::any::Any; - /// Configures the host to support copying and running a custom binary. - fn request_custom_binary(&mut self); + fn request_custom_binary(&self); /// Makes requests for physical resources (servers) that this host needs to run. + /// + /// This should be called before `provision` is called. fn collect_resources(&self, resource_batch: &mut ResourceBatch); /// Connects to the acquired resources and prepares the host to run services. - fn provision(&mut self, resource_result: &Arc) -> Arc; + /// + /// This should be called after `collect_resources` is called. + fn provision(&self, resource_result: &Arc) -> Arc; fn launched(&self) -> Option>; @@ -179,6 +177,9 @@ pub trait Host: Send + Sync { /// Determines whether this host can connect to another host using the given strategy. fn can_connect_to(&self, typ: ClientStrategy) -> bool; + + /// Returns a reference to the host as a trait object. + fn as_any(&self) -> &dyn std::any::Any; } #[async_trait] diff --git a/hydro_deploy/core/src/localhost/mod.rs b/hydro_deploy/core/src/localhost/mod.rs index 4644ba22a5e..e17e18f8c9b 100644 --- a/hydro_deploy/core/src/localhost/mod.rs +++ b/hydro_deploy/core/src/localhost/mod.rs @@ -13,6 +13,7 @@ use super::{ ResourceResult, ServerStrategy, }; use crate::hydroflow_crate::build::BuildOutput; +use crate::HostStrategyGetter; pub mod launched_binary; pub use launched_binary::*; @@ -45,9 +46,9 @@ impl Host for LocalhostHost { HostTargetType::Local } - fn request_port(&mut self, _bind_type: &ServerStrategy) {} + fn request_port(&self, _bind_type: &ServerStrategy) {} fn collect_resources(&self, _resource_batch: &mut ResourceBatch) {} - fn request_custom_binary(&mut self) {} + fn request_custom_binary(&self) {} fn id(&self) -> usize { self.id @@ -57,25 +58,18 @@ impl Host for LocalhostHost { self } - fn as_any_mut(&mut self) -> &mut dyn std::any::Any { - self - } - fn launched(&self) -> Option> { Some(Arc::new(LaunchedLocalhost {})) } - fn provision(&mut self, _resource_result: &Arc) -> Arc { + fn provision(&self, _resource_result: &Arc) -> Arc { Arc::new(LaunchedLocalhost {}) } fn strategy_as_server<'a>( &'a self, connection_from: &dyn Host, - ) -> Result<( - ClientStrategy<'a>, - Box ServerStrategy>, - )> { + ) -> Result<(ClientStrategy<'a>, HostStrategyGetter)> { if self.client_only { anyhow::bail!("Localhost cannot be a server if it is client only") } diff --git a/hydro_deploy/hydro_cli/src/lib.rs b/hydro_deploy/hydro_cli/src/lib.rs index 88ca1991525..7dc9b753b92 100644 --- a/hydro_deploy/hydro_cli/src/lib.rs +++ b/hydro_deploy/hydro_cli/src/lib.rs @@ -328,20 +328,18 @@ impl Deployment { #[pyclass(subclass)] pub struct Host { - underlying: Arc>, + underlying: Arc, } #[pyclass(extends=Host, subclass)] struct LocalhostHost { - underlying: Arc>, + underlying: Arc, } #[pymethods] impl LocalhostHost { fn client_only(&self, py: Python<'_>) -> PyResult> { - let arc = Arc::new(RwLock::new( - self.underlying.try_read().unwrap().client_only(), - )); + let arc = Arc::new(self.underlying.client_only()); Ok(Py::new( py, @@ -372,39 +370,26 @@ impl GcpNetwork { #[pyclass(extends=Host, subclass)] struct GcpComputeEngineHost { - underlying: Arc>, + underlying: Arc, } #[pymethods] impl GcpComputeEngineHost { #[getter] fn internal_ip(&self) -> String { - self.underlying - .blocking_read() - .launched - .as_ref() - .unwrap() - .internal_ip - .clone() + self.underlying.launched.get().unwrap().internal_ip.clone() } #[getter] fn external_ip(&self) -> Option { - self.underlying - .blocking_read() - .launched - .as_ref() - .unwrap() - .external_ip - .clone() + self.underlying.launched.get().unwrap().external_ip.clone() } #[getter] fn ssh_key_path(&self) -> String { self.underlying - .blocking_read() .launched - .as_ref() + .get() .unwrap() .ssh_key_path() .to_str() @@ -415,39 +400,26 @@ impl GcpComputeEngineHost { #[pyclass(extends=Host, subclass)] struct AzureHost { - underlying: Arc>, + underlying: Arc, } #[pymethods] impl AzureHost { #[getter] fn internal_ip(&self) -> String { - self.underlying - .blocking_read() - .launched - .as_ref() - .unwrap() - .internal_ip - .clone() + self.underlying.launched.get().unwrap().internal_ip.clone() } #[getter] fn external_ip(&self) -> Option { - self.underlying - .blocking_read() - .launched - .as_ref() - .unwrap() - .external_ip - .clone() + self.underlying.launched.get().unwrap().external_ip.clone() } #[getter] fn ssh_key_path(&self) -> String { self.underlying - .blocking_read() .launched - .as_ref() + .get() .unwrap() .ssh_key_path() .to_str() diff --git a/hydro_deploy/hydroflow_plus_cli_integration/src/deploy.rs b/hydro_deploy/hydroflow_plus_cli_integration/src/deploy.rs index 4c3382904e3..b1ed029ffc6 100644 --- a/hydro_deploy/hydroflow_plus_cli_integration/src/deploy.rs +++ b/hydro_deploy/hydroflow_plus_cli_integration/src/deploy.rs @@ -39,7 +39,7 @@ pub trait DeployCrateWrapper { &self, port: &str, deployment: &mut Deployment, - on: &Arc>, + on: &Arc, ) -> CustomClientPort { let sender_service = deployment.CustomService(on.clone(), vec![]); let sender_port = sender_service.read().await.declare_client(&sender_service); @@ -86,7 +86,7 @@ impl DeployPort { pub async fn create_sender( &self, deployment: &mut Deployment, - on: &Arc>, + on: &Arc, ) -> CustomClientPort { self.node.create_sender(&self.port, deployment, on).await } @@ -96,7 +96,7 @@ impl DeployPort { pub async fn create_senders( &self, deployment: &mut Deployment, - on: &Arc>, + on: &Arc, ) -> Vec { let mut out = vec![]; for member in &self.node.members { diff --git a/hydroflow_plus_test/examples/compute_pi.rs b/hydroflow_plus_test/examples/compute_pi.rs index b9aeac8cfee..f35adc354c0 100644 --- a/hydroflow_plus_test/examples/compute_pi.rs +++ b/hydroflow_plus_test/examples/compute_pi.rs @@ -7,7 +7,7 @@ use hydroflow_plus_cli_integration::{DeployClusterSpec, DeployProcessSpec}; use stageleft::RuntimeData; use tokio::sync::RwLock; -type HostCreator = Box Arc>>; +type HostCreator = Box Arc>; // run with no args for localhost, with `gcp ` for GCP #[tokio::main] @@ -20,7 +20,7 @@ async fn main() { let network = Arc::new(RwLock::new(GcpNetwork::new(&project, None))); ( - Box::new(move |deployment| -> Arc> { + Box::new(move |deployment| -> Arc { deployment.GcpComputeEngineHost( &project, "e2-micro", @@ -35,7 +35,7 @@ async fn main() { } else { let localhost = deployment.borrow_mut().Localhost(); ( - Box::new(move |_| -> Arc> { localhost.clone() }), + Box::new(move |_| -> Arc { localhost.clone() }), "dev", ) }; diff --git a/hydroflow_plus_test/examples/first_ten_distributed.rs b/hydroflow_plus_test/examples/first_ten_distributed.rs index 264a6a0bb60..c96f0a5f6a6 100644 --- a/hydroflow_plus_test/examples/first_ten_distributed.rs +++ b/hydroflow_plus_test/examples/first_ten_distributed.rs @@ -5,7 +5,7 @@ use hydro_deploy::{Deployment, Host, HydroflowCrate}; use hydroflow_plus_cli_integration::DeployProcessSpec; use tokio::sync::RwLock; -type HostCreator = Box Arc>>; +type HostCreator = Box Arc>; // run with no args for localhost, with `gcp ` for GCP #[tokio::main] @@ -18,7 +18,7 @@ async fn main() { let network = Arc::new(RwLock::new(GcpNetwork::new(&project, None))); ( - Box::new(move |deployment| -> Arc> { + Box::new(move |deployment| -> Arc { deployment.GcpComputeEngineHost( &project, "e2-micro", @@ -33,7 +33,7 @@ async fn main() { } else { let localhost = deployment.Localhost(); ( - Box::new(move |_| -> Arc> { localhost.clone() }), + Box::new(move |_| -> Arc { localhost.clone() }), "dev", ) }; diff --git a/hydroflow_plus_test/examples/map_reduce.rs b/hydroflow_plus_test/examples/map_reduce.rs index 1e20a6f6335..0aeaf1b9e1f 100644 --- a/hydroflow_plus_test/examples/map_reduce.rs +++ b/hydroflow_plus_test/examples/map_reduce.rs @@ -6,7 +6,7 @@ use hydro_deploy::{Deployment, Host, HydroflowCrate}; use hydroflow_plus_cli_integration::{DeployClusterSpec, DeployProcessSpec}; use tokio::sync::RwLock; -type HostCreator = Box Arc>>; +type HostCreator = Box Arc>; // run with no args for localhost, with `gcp ` for GCP #[tokio::main] @@ -19,7 +19,7 @@ async fn main() { let network = Arc::new(RwLock::new(GcpNetwork::new(&project, None))); ( - Box::new(move |deployment| -> Arc> { + Box::new(move |deployment| -> Arc { deployment.GcpComputeEngineHost( &project, "e2-micro", @@ -34,7 +34,7 @@ async fn main() { } else { let localhost = deployment.borrow_mut().Localhost(); ( - Box::new(move |_| -> Arc> { localhost.clone() }), + Box::new(move |_| -> Arc { localhost.clone() }), "dev", ) }; diff --git a/hydroflow_plus_test/examples/perf_compute_pi.rs b/hydroflow_plus_test/examples/perf_compute_pi.rs index ed6ee798c09..58c945fdaf4 100644 --- a/hydroflow_plus_test/examples/perf_compute_pi.rs +++ b/hydroflow_plus_test/examples/perf_compute_pi.rs @@ -7,7 +7,7 @@ use hydroflow_plus_cli_integration::{DeployClusterSpec, DeployProcessSpec}; use stageleft::RuntimeData; use tokio::sync::RwLock; -type HostCreator = Box Arc>>; +type HostCreator = Box Arc>; // run with no args for localhost, with `gcp ` for GCP #[tokio::main] @@ -20,7 +20,7 @@ async fn main() { let network = Arc::new(RwLock::new(GcpNetwork::new(&project, None))); ( - Box::new(move |deployment| -> Arc> { + Box::new(move |deployment| -> Arc { deployment.GcpComputeEngineHost( &project, "e2-micro", @@ -35,7 +35,7 @@ async fn main() { } else { let localhost = deployment.borrow_mut().Localhost(); ( - Box::new(move |_| -> Arc> { localhost.clone() }), + Box::new(move |_| -> Arc { localhost.clone() }), "profile", ) }; diff --git a/hydroflow_plus_test/examples/simple_cluster.rs b/hydroflow_plus_test/examples/simple_cluster.rs index 24383293683..3b6d1058d8c 100644 --- a/hydroflow_plus_test/examples/simple_cluster.rs +++ b/hydroflow_plus_test/examples/simple_cluster.rs @@ -6,7 +6,7 @@ use hydro_deploy::{Deployment, Host, HydroflowCrate}; use hydroflow_plus_cli_integration::{DeployClusterSpec, DeployProcessSpec}; use tokio::sync::RwLock; -type HostCreator = Box Arc>>; +type HostCreator = Box Arc>; // run with no args for localhost, with `gcp ` for GCP #[tokio::main] @@ -19,7 +19,7 @@ async fn main() { let network = Arc::new(RwLock::new(GcpNetwork::new(&project, None))); ( - Box::new(move |deployment| -> Arc> { + Box::new(move |deployment| -> Arc { deployment.GcpComputeEngineHost( &project, "e2-micro", @@ -34,7 +34,7 @@ async fn main() { } else { let localhost = deployment.borrow_mut().Localhost(); ( - Box::new(move |_| -> Arc> { localhost.clone() }), + Box::new(move |_| -> Arc { localhost.clone() }), "dev", ) };