From c23eab687ed247d210fea3fdb39f4b0d02a7488a Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 29 Jan 2024 23:20:21 +0530 Subject: [PATCH 1/2] ci: clippy suggestion --- tools/utils/src/push_to_uplink.rs | 2 +- uplink/src/base/serializer/mod.rs | 2 +- uplink/src/collector/systemstats.rs | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tools/utils/src/push_to_uplink.rs b/tools/utils/src/push_to_uplink.rs index 4d292286..4e882de7 100644 --- a/tools/utils/src/push_to_uplink.rs +++ b/tools/utils/src/push_to_uplink.rs @@ -17,7 +17,7 @@ async fn main() { fn argv_to_payload(pairs: &[String]) -> Value { // nlici - let stream = pairs.get(0).unwrap(); + let stream = pairs.first().unwrap(); let timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() as u64; let kv_count = pairs.len() - 1; assert_eq!(kv_count % 2, 0); diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index 0866294a..5ed189df 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -794,7 +794,7 @@ fn check_and_flush_metrics( metrics.set_disk_utilized(disk_utilized); // Send pending metrics. This signifies state change - while let Some(metrics) = pending.get(0) { + while let Some(metrics) = pending.front() { match metrics { SerializerMetrics::Main(metrics) => { // Always send pending metrics. They represent state changes diff --git a/uplink/src/collector/systemstats.rs b/uplink/src/collector/systemstats.rs index cf40fb56..d795515d 100644 --- a/uplink/src/collector/systemstats.rs +++ b/uplink/src/collector/systemstats.rs @@ -613,7 +613,7 @@ impl StatCollector { self.sys.refresh_processes(); let timestamp = clock() as u64; for (&id, p) in self.sys.processes() { - let name = p.cmd().get(0).map(|s| s.to_string()).unwrap_or(p.name().to_string()); + let name = p.cmd().first().map(|s| s.to_string()).unwrap_or(p.name().to_string()); if self.config.system_stats.process_names.contains(&name) { let payload = self.processes.push(id.as_u32(), p, name, timestamp); From f601a9bf5baf8cd71fba74b48767afbac45571ed Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 6 Feb 2024 22:08:32 +0530 Subject: [PATCH 2/2] refactor: action registration (#325) * refactor: action registration * refactor: make it readable, rm `mut` --- uplink/src/base/bridge/mod.rs | 21 +++++++++++---------- uplink/src/lib.rs | 21 +++++++-------------- uplink/src/main.rs | 28 ++++++++++++---------------- 3 files changed, 30 insertions(+), 40 deletions(-) diff --git a/uplink/src/base/bridge/mod.rs b/uplink/src/base/bridge/mod.rs index d6e261e9..9f3a2a91 100644 --- a/uplink/src/base/bridge/mod.rs +++ b/uplink/src/base/bridge/mod.rs @@ -1,4 +1,4 @@ -use flume::{Receiver, Sender}; +use flume::{bounded, Receiver, Sender}; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -101,20 +101,21 @@ impl Bridge { (self.actions.ctrl_tx(), self.data.ctrl_tx()) } - pub fn register_action_route( - &mut self, - route: ActionRoute, - actions_tx: Sender, - ) -> Result<(), Error> { - self.actions.register_action_route(route, actions_tx) + pub fn register_action_route(&mut self, route: ActionRoute) -> Result, Error> { + let (actions_tx, actions_rx) = bounded(1); + self.actions.register_action_route(route, actions_tx)?; + + Ok(actions_rx) } pub fn register_action_routes, V: IntoIterator>( &mut self, routes: V, - actions_tx: Sender, - ) -> Result<(), Error> { - self.actions.register_action_routes(routes, actions_tx) + ) -> Result, Error> { + let (actions_tx, actions_rx) = bounded(1); + self.actions.register_action_routes(routes, actions_tx)?; + + Ok(actions_rx) } } diff --git a/uplink/src/lib.rs b/uplink/src/lib.rs index a72727a8..6bfff3f5 100644 --- a/uplink/src/lib.rs +++ b/uplink/src/lib.rs @@ -427,14 +427,12 @@ impl Uplink { let route = ActionRoute { name: "launch_shell".to_owned(), timeout: Duration::from_secs(10) }; - let (actions_tx, actions_rx) = bounded(1); - bridge.register_action_route(route, actions_tx)?; + let actions_rx = bridge.register_action_route(route)?; let tunshell_client = TunshellClient::new(actions_rx, bridge_tx.clone()); spawn_named_thread("Tunshell Client", move || tunshell_client.start()); if !self.config.downloader.actions.is_empty() { - let (actions_tx, actions_rx) = bounded(1); - bridge.register_action_routes(&self.config.downloader.actions, actions_tx)?; + let actions_rx = bridge.register_action_routes(&self.config.downloader.actions)?; let file_downloader = FileDownloader::new(self.config.clone(), actions_rx, bridge_tx.clone())?; spawn_named_thread("File Downloader", || file_downloader.start()); @@ -444,8 +442,7 @@ impl Uplink { spawn_named_thread("Device Shadow Generator", move || device_shadow.start()); if !self.config.ota_installer.actions.is_empty() { - let (actions_tx, actions_rx) = bounded(1); - bridge.register_action_routes(&self.config.ota_installer.actions, actions_tx)?; + let actions_rx = bridge.register_action_routes(&self.config.ota_installer.actions)?; let ota_installer = OTAInstaller::new(self.config.ota_installer.clone(), actions_rx, bridge_tx.clone()); spawn_named_thread("OTA Installer", move || ota_installer.start()); @@ -457,8 +454,7 @@ impl Uplink { name: "journalctl_config".to_string(), timeout: Duration::from_secs(10), }; - let (actions_tx, actions_rx) = bounded(1); - bridge.register_action_route(route, actions_tx)?; + let actions_rx = bridge.register_action_route(route)?; let logger = JournalCtl::new(config, actions_rx, bridge_tx.clone()); spawn_named_thread("Logger", || { if let Err(e) = logger.start() { @@ -473,8 +469,7 @@ impl Uplink { name: "journalctl_config".to_string(), timeout: Duration::from_secs(10), }; - let (actions_tx, actions_rx) = bounded(1); - bridge.register_action_route(route, actions_tx)?; + let actions_rx = bridge.register_action_route(route)?; let logger = Logcat::new(config, actions_rx, bridge_tx.clone()); spawn_named_thread("Logger", || { if let Err(e) = logger.start() { @@ -489,8 +484,7 @@ impl Uplink { }; if !self.config.processes.is_empty() { - let (actions_tx, actions_rx) = bounded(1); - bridge.register_action_routes(&self.config.processes, actions_tx)?; + let actions_rx = bridge.register_action_routes(&self.config.processes)?; let process_handler = ProcessHandler::new(actions_rx, bridge_tx.clone()); spawn_named_thread("Process Handler", || { if let Err(e) = process_handler.start() { @@ -500,8 +494,7 @@ impl Uplink { } if !self.config.script_runner.is_empty() { - let (actions_tx, actions_rx) = bounded(1); - bridge.register_action_routes(&self.config.script_runner, actions_tx)?; + let actions_rx = bridge.register_action_routes(&self.config.script_runner)?; let script_runner = ScriptRunner::new(actions_rx, bridge_tx); spawn_named_thread("Script Runner", || { if let Err(e) = script_runner.start() { diff --git a/uplink/src/main.rs b/uplink/src/main.rs index 4b42ae21..e06aff13 100644 --- a/uplink/src/main.rs +++ b/uplink/src/main.rs @@ -4,7 +4,6 @@ use std::sync::Arc; use std::time::Duration; use anyhow::Error; -use flume::bounded; use log::info; use structopt::StructOpt; use tokio::time::sleep; @@ -131,25 +130,22 @@ fn main() -> Result<(), Error> { let mut tcpapps = vec![]; for (app, cfg) in config.tcpapps.clone() { - let mut route_rx = None; - if !cfg.actions.is_empty() { - let (actions_tx, actions_rx) = bounded(1); - bridge.register_action_routes(&cfg.actions, actions_tx)?; - route_rx = Some(actions_rx) - } + let route_rx = if !cfg.actions.is_empty() { + let actions_rx = bridge.register_action_routes(&cfg.actions)?; + Some(actions_rx) + } else { + None + }; tcpapps.push(TcpJson::new(app, cfg, route_rx, bridge.bridge_tx())); } - let simulator_actions = config.simulator.as_ref().and_then(|cfg| { - let mut route_rx = None; - if !cfg.actions.is_empty() { - let (actions_tx, actions_rx) = bounded(1); - bridge.register_action_routes(&cfg.actions, actions_tx).unwrap(); - route_rx = Some(actions_rx) + let simulator_actions = match &config.simulator { + Some(cfg) if !cfg.actions.is_empty() => { + let actions_rx = bridge.register_action_routes(&cfg.actions)?; + Some(actions_rx) } - - route_rx - }); + _ => None, + }; let ctrl_tx = uplink.spawn(bridge)?;