diff --git a/tools/utils/src/push_to_uplink.rs b/tools/utils/src/push_to_uplink.rs index 4d292286..4e882de7 100644 --- a/tools/utils/src/push_to_uplink.rs +++ b/tools/utils/src/push_to_uplink.rs @@ -17,7 +17,7 @@ async fn main() { fn argv_to_payload(pairs: &[String]) -> Value { // nlici - let stream = pairs.get(0).unwrap(); + let stream = pairs.first().unwrap(); let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64; let kv_count = pairs.len() - 1; assert_eq!(kv_count % 2, 0); diff --git a/uplink/src/base/bridge/mod.rs b/uplink/src/base/bridge/mod.rs index d6e261e9..9f3a2a91 100644 --- a/uplink/src/base/bridge/mod.rs +++ b/uplink/src/base/bridge/mod.rs @@ -1,4 +1,4 @@ -use flume::{Receiver, Sender}; +use flume::{bounded, Receiver, Sender}; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -101,20 +101,21 @@ impl Bridge { (self.actions.ctrl_tx(), self.data.ctrl_tx()) } - pub fn register_action_route( - &mut self, - route: ActionRoute, - actions_tx: Sender, - ) -> Result<(), Error> { - self.actions.register_action_route(route, actions_tx) + pub fn register_action_route(&mut self, route: ActionRoute) -> Result, Error> { + let (actions_tx, actions_rx) = bounded(1); + self.actions.register_action_route(route, actions_tx)?; + + Ok(actions_rx) } pub fn register_action_routes, V: IntoIterator>( &mut self, routes: V, - actions_tx: Sender, - ) -> Result<(), Error> { - self.actions.register_action_routes(routes, actions_tx) + ) -> Result, Error> { + let (actions_tx, actions_rx) = bounded(1); + self.actions.register_action_routes(routes, actions_tx)?; + + Ok(actions_rx) } } diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index 0866294a..5ed189df 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -794,7 +794,7 @@ fn check_and_flush_metrics( metrics.set_disk_utilized(disk_utilized); // Send pending metrics. This signifies state change - while let Some(metrics) = pending.get(0) { + while let Some(metrics) = pending.front() { match metrics { SerializerMetrics::Main(metrics) => { // Always send pending metrics. They represent state changes diff --git a/uplink/src/collector/systemstats.rs b/uplink/src/collector/systemstats.rs index cf40fb56..d795515d 100644 --- a/uplink/src/collector/systemstats.rs +++ b/uplink/src/collector/systemstats.rs @@ -613,7 +613,7 @@ impl StatCollector { self.sys.refresh_processes(); let timestamp = clock() as u64; for (&id, p) in self.sys.processes() { - let name = p.cmd().get(0).map(|s| s.to_string()).unwrap_or(p.name().to_string()); + let name = p.cmd().first().map(|s| s.to_string()).unwrap_or(p.name().to_string()); if self.config.system_stats.process_names.contains(&name) { let payload = self.processes.push(id.as_u32(), p, name, timestamp); diff --git a/uplink/src/lib.rs b/uplink/src/lib.rs index e6f62a03..0d3c966d 100644 --- a/uplink/src/lib.rs +++ b/uplink/src/lib.rs @@ -428,14 +428,12 @@ impl Uplink { let route = ActionRoute { name: "launch_shell".to_owned(), timeout: Duration::from_secs(10) }; - let (actions_tx, actions_rx) = bounded(1); - bridge.register_action_route(route, actions_tx)?; + let actions_rx = bridge.register_action_route(route)?; let tunshell_client = TunshellClient::new(actions_rx, bridge_tx.clone()); spawn_named_thread("Tunshell Client", move || tunshell_client.start()); if !self.config.downloader.actions.is_empty() { - let (actions_tx, actions_rx) = bounded(1); - bridge.register_action_routes(&self.config.downloader.actions, actions_tx)?; + let actions_rx = bridge.register_action_routes(&self.config.downloader.actions)?; let file_downloader = FileDownloader::new(self.config.clone(), actions_rx, bridge_tx.clone())?; spawn_named_thread("File Downloader", || file_downloader.start()); @@ -445,8 +443,7 @@ impl Uplink { spawn_named_thread("Device Shadow Generator", move || device_shadow.start()); if !self.config.ota_installer.actions.is_empty() { - let (actions_tx, actions_rx) = bounded(1); - bridge.register_action_routes(&self.config.ota_installer.actions, actions_tx)?; + let actions_rx = bridge.register_action_routes(&self.config.ota_installer.actions)?; let ota_installer = OTAInstaller::new(self.config.ota_installer.clone(), actions_rx, bridge_tx.clone()); spawn_named_thread("OTA Installer", move || ota_installer.start()); @@ -458,8 +455,7 @@ impl Uplink { name: "journalctl_config".to_string(), timeout: Duration::from_secs(10), }; - let (actions_tx, actions_rx) = bounded(1); - bridge.register_action_route(route, actions_tx)?; + let actions_rx = bridge.register_action_route(route)?; let logger = JournalCtl::new(config, actions_rx, bridge_tx.clone()); spawn_named_thread("Logger", || { if let Err(e) = logger.start() { @@ -474,8 +470,7 @@ impl Uplink { name: "journalctl_config".to_string(), timeout: Duration::from_secs(10), }; - let (actions_tx, actions_rx) = bounded(1); - bridge.register_action_route(route, actions_tx)?; + let actions_rx = bridge.register_action_route(route)?; let logger = Logcat::new(config, actions_rx, bridge_tx.clone()); spawn_named_thread("Logger", || { if let Err(e) = logger.start() { @@ -490,8 +485,7 @@ impl Uplink { }; if !self.config.processes.is_empty() { - let (actions_tx, actions_rx) = bounded(1); - bridge.register_action_routes(&self.config.processes, actions_tx)?; + let actions_rx = bridge.register_action_routes(&self.config.processes)?; let process_handler = ProcessHandler::new(actions_rx, bridge_tx.clone()); spawn_named_thread("Process Handler", || { if let Err(e) = process_handler.start() { @@ -501,8 +495,7 @@ impl Uplink { } if !self.config.script_runner.is_empty() { - let (actions_tx, actions_rx) = bounded(1); - bridge.register_action_routes(&self.config.script_runner, actions_tx)?; + let actions_rx = bridge.register_action_routes(&self.config.script_runner)?; let script_runner = ScriptRunner::new(actions_rx, bridge_tx.clone()); spawn_named_thread("Script Runner", || { if let Err(e) = script_runner.start() { @@ -512,9 +505,8 @@ impl Uplink { } if let Some(checker_config) = &self.config.precondition_checks { - let (actions_tx, actions_rx) = bounded(1); + let actions_rx = bridge.register_action_routes(&checker_config.actions)?; let checker = PreconditionChecker::new(self.config.clone(), actions_rx, bridge_tx); - bridge.register_action_routes(&checker_config.actions, actions_tx)?; spawn_named_thread("Logger", || checker.start()); } diff --git a/uplink/src/main.rs b/uplink/src/main.rs index 4b42ae21..e06aff13 100644 --- a/uplink/src/main.rs +++ b/uplink/src/main.rs @@ -4,7 +4,6 @@ use std::sync::Arc; use std::time::Duration; use anyhow::Error; -use flume::bounded; use log::info; use structopt::StructOpt; use tokio::time::sleep; @@ -131,25 +130,22 @@ fn main() -> Result<(), Error> { let mut tcpapps = vec![]; for (app, cfg) in config.tcpapps.clone() { - let mut route_rx = None; - if !cfg.actions.is_empty() { - let (actions_tx, actions_rx) = bounded(1); - bridge.register_action_routes(&cfg.actions, actions_tx)?; - route_rx = Some(actions_rx) - } + let route_rx = if !cfg.actions.is_empty() { + let actions_rx = bridge.register_action_routes(&cfg.actions)?; + Some(actions_rx) + } else { + None + }; tcpapps.push(TcpJson::new(app, cfg, route_rx, bridge.bridge_tx())); } - let simulator_actions = config.simulator.as_ref().and_then(|cfg| { - let mut route_rx = None; - if !cfg.actions.is_empty() { - let (actions_tx, actions_rx) = bounded(1); - bridge.register_action_routes(&cfg.actions, actions_tx).unwrap(); - route_rx = Some(actions_rx) + let simulator_actions = match &config.simulator { + Some(cfg) if !cfg.actions.is_empty() => { + let actions_rx = bridge.register_action_routes(&cfg.actions)?; + Some(actions_rx) } - - route_rx - }); + _ => None, + }; let ctrl_tx = uplink.spawn(bridge)?;