From ddb16825f101cb107bb71242e9fa51fff10abf65 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sat, 7 Oct 2023 13:39:28 +0530 Subject: [PATCH] feat: name threads when spawning (#292) * feat: name threads when spawning * refactor: DRY naming threads --- uplink/src/lib.rs | 61 ++++++++++++++++++++-------------------------- uplink/src/main.rs | 9 ++++--- 2 files changed, 31 insertions(+), 39 deletions(-) diff --git a/uplink/src/lib.rs b/uplink/src/lib.rs index 28a6fa6ec..c3fcaafe5 100644 --- a/uplink/src/lib.rs +++ b/uplink/src/lib.rs @@ -266,6 +266,14 @@ pub use base::{ActionRoute, Config}; pub use collector::{simulator, tcpjson::TcpJson}; pub use storage::Storage; +/// Spawn a named thread to run the function f on +pub fn spawn_named_thread(name: &str, f: F) +where + F: FnOnce() + Send + 'static, +{ + thread::Builder::new().name(name.to_string()).spawn(f).unwrap(); +} + pub struct Uplink { config: Arc, action_rx: Receiver, @@ -328,12 +336,8 @@ impl Uplink { // Serializer thread to handle network conditions state machine // and send data to mqtt thread - thread::spawn(|| { - let rt = tokio::runtime::Builder::new_current_thread() - .thread_name("serializer") - .enable_time() - .build() - .unwrap(); + spawn_named_thread("Serializer", || { + let rt = tokio::runtime::Builder::new_current_thread().enable_time().build().unwrap(); rt.block_on(async { if let Err(e) = serializer.start().await { @@ -343,9 +347,8 @@ impl Uplink { }); // Mqtt thread to receive actions and send data - thread::spawn(|| { + spawn_named_thread("Mqttio", || { let rt = tokio::runtime::Builder::new_current_thread() - .thread_name("mqttio") .enable_time() .enable_io() .build() @@ -365,12 +368,8 @@ impl Uplink { ); // Metrics monitor thread - thread::spawn(|| { - let rt = tokio::runtime::Builder::new_current_thread() - .thread_name("monitor") - .enable_time() - .build() - .unwrap(); + spawn_named_thread("Monitor", || { + let rt = tokio::runtime::Builder::new_current_thread().enable_time().build().unwrap(); rt.block_on(async move { if let Err(e) = monitor.start().await { @@ -382,12 +381,8 @@ impl Uplink { let Bridge { data: mut data_lane, actions: mut actions_lane } = bridge; // Bridge thread to direct actions - thread::spawn(|| { - let rt = tokio::runtime::Builder::new_current_thread() - .thread_name("bridge_actions_lane") - .enable_time() - .build() - .unwrap(); + spawn_named_thread("Bridge actions_lane", || { + let rt = tokio::runtime::Builder::new_current_thread().enable_time().build().unwrap(); rt.block_on(async move { if let Err(e) = actions_lane.start().await { @@ -397,12 +392,8 @@ impl Uplink { }); // Bridge thread to batch and forward data - thread::spawn(|| { - let rt = tokio::runtime::Builder::new_current_thread() - .thread_name("bridge_data_lane") - .enable_time() - .build() - .unwrap(); + spawn_named_thread("Bridge data_lane", || { + let rt = tokio::runtime::Builder::new_current_thread().enable_time().build().unwrap(); rt.block_on(async move { if let Err(e) = data_lane.start().await { @@ -421,25 +412,25 @@ impl Uplink { let (actions_tx, actions_rx) = bounded(1); bridge.register_action_route(route, actions_tx)?; let tunshell_client = TunshellClient::new(actions_rx, bridge_tx.clone()); - thread::spawn(move || tunshell_client.start()); + spawn_named_thread("Tunshell Client", move || tunshell_client.start()); if !self.config.downloader.actions.is_empty() { let (actions_tx, actions_rx) = bounded(1); bridge.register_action_routes(&self.config.downloader.actions, actions_tx)?; let file_downloader = FileDownloader::new(self.config.clone(), actions_rx, bridge_tx.clone())?; - thread::spawn(move || file_downloader.start()); + spawn_named_thread("File Downloader", || file_downloader.start()); } let device_shadow = DeviceShadow::new(self.config.device_shadow.clone(), bridge_tx.clone()); - thread::spawn(move || device_shadow.start()); + spawn_named_thread("Device Shadow Generator", move || device_shadow.start()); if !self.config.ota_installer.actions.is_empty() { let (actions_tx, actions_rx) = bounded(1); bridge.register_action_routes(&self.config.ota_installer.actions, actions_tx)?; let ota_installer = OTAInstaller::new(self.config.ota_installer.clone(), actions_rx, bridge_tx.clone()); - thread::spawn(move || ota_installer.start()); + spawn_named_thread("OTA Installer", move || ota_installer.start()); } #[cfg(target_os = "linux")] @@ -448,7 +439,7 @@ impl Uplink { let (actions_tx, actions_rx) = bounded(1); bridge.register_action_route(route, actions_tx)?; let logger = JournalCtl::new(config, actions_rx, bridge_tx.clone()); - thread::spawn(move || { + spawn_named_thread("Logger", || { if let Err(e) = logger.start() { error!("Logger stopped!! Error = {:?}", e); } @@ -461,7 +452,7 @@ impl Uplink { let (actions_tx, actions_rx) = bounded(1); bridge.register_action_route(route, actions_tx)?; let logger = Logcat::new(config, actions_rx, bridge_tx.clone()); - thread::spawn(move || { + spawn_named_thread("Logger", || { if let Err(e) = logger.start() { error!("Logger stopped!! Error = {:?}", e); } @@ -470,7 +461,7 @@ impl Uplink { if self.config.system_stats.enabled { let stat_collector = StatCollector::new(self.config.clone(), bridge_tx.clone()); - thread::spawn(move || stat_collector.start()); + spawn_named_thread("Stat Collector", || stat_collector.start()); }; if !self.config.processes.is_empty() { @@ -478,7 +469,7 @@ impl Uplink { bridge.register_action_routes(&self.config.processes, actions_tx)?; let process_handler = ProcessHandler::new(actions_rx, bridge_tx.clone(), &self.config.processes); - thread::spawn(move || { + spawn_named_thread("Process Handler", || { if let Err(e) = process_handler.start() { error!("Process handler stopped!! Error = {:?}", e); } @@ -490,7 +481,7 @@ impl Uplink { bridge.register_action_routes(&self.config.script_runner, actions_tx)?; let script_runner = ScriptRunner::new(self.config.script_runner.clone(), actions_rx, bridge_tx); - thread::spawn(move || { + spawn_named_thread("Script Runner", || { if let Err(e) = script_runner.start() { error!("Script runner stopped!! Error = {:?}", e); } diff --git a/uplink/src/main.rs b/uplink/src/main.rs index 92bcb0e7c..06b400f4e 100644 --- a/uplink/src/main.rs +++ b/uplink/src/main.rs @@ -1,7 +1,6 @@ mod console; use std::sync::Arc; -use std::thread; use std::time::Duration; use anyhow::Error; @@ -19,7 +18,7 @@ pub type ReloadHandle = use uplink::base::AppConfig; use uplink::config::{get_configs, initialize, CommandLine}; -use uplink::{simulator, Config, TcpJson, Uplink}; +use uplink::{simulator, spawn_named_thread, Config, TcpJson, Uplink}; fn initialize_logging(commandline: &CommandLine) -> ReloadHandle { let level = match commandline.verbose { @@ -156,7 +155,7 @@ fn main() -> Result<(), Error> { if let Some(config) = config.simulator.clone() { let bridge_tx = bridge_tx.clone(); - thread::spawn(move || { + spawn_named_thread("Simulator", || { simulator::start(config, bridge_tx, simulator_actions).unwrap(); }); } @@ -164,7 +163,9 @@ fn main() -> Result<(), Error> { if config.console.enabled { let port = config.console.port; let bridge_tx = bridge_tx.clone(); - thread::spawn(move || console::start(port, reload_handle, bridge_tx)); + spawn_named_thread("Uplink Console", move || { + console::start(port, reload_handle, bridge_tx) + }); } let rt = tokio::runtime::Builder::new_current_thread()