Skip to content

Commit

Permalink
Merge branch 'main' into stateless
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi authored Oct 7, 2023
2 parents c86f800 + ddb1682 commit 02ed362
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 38 deletions.
61 changes: 27 additions & 34 deletions uplink/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,14 @@ pub use base::{ActionRoute, Config};
pub use collector::{simulator, tcpjson::TcpJson};
pub use storage::Storage;

/// Spawn a named thread to run the function f on
pub fn spawn_named_thread<F>(name: &str, f: F)
where
F: FnOnce() + Send + 'static,
{
thread::Builder::new().name(name.to_string()).spawn(f).unwrap();
}

pub struct Uplink {
config: Arc<Config>,
action_rx: Receiver<Action>,
Expand Down Expand Up @@ -328,12 +336,8 @@ impl Uplink {

// Serializer thread to handle network conditions state machine
// and send data to mqtt thread
thread::spawn(|| {
let rt = tokio::runtime::Builder::new_current_thread()
.thread_name("serializer")
.enable_time()
.build()
.unwrap();
spawn_named_thread("Serializer", || {
let rt = tokio::runtime::Builder::new_current_thread().enable_time().build().unwrap();

rt.block_on(async {
if let Err(e) = serializer.start().await {
Expand All @@ -343,9 +347,8 @@ impl Uplink {
});

// Mqtt thread to receive actions and send data
thread::spawn(|| {
spawn_named_thread("Mqttio", || {
let rt = tokio::runtime::Builder::new_current_thread()
.thread_name("mqttio")
.enable_time()
.enable_io()
.build()
Expand All @@ -365,12 +368,8 @@ impl Uplink {
);

// Metrics monitor thread
thread::spawn(|| {
let rt = tokio::runtime::Builder::new_current_thread()
.thread_name("monitor")
.enable_time()
.build()
.unwrap();
spawn_named_thread("Monitor", || {
let rt = tokio::runtime::Builder::new_current_thread().enable_time().build().unwrap();

rt.block_on(async move {
if let Err(e) = monitor.start().await {
Expand All @@ -382,12 +381,8 @@ impl Uplink {
let Bridge { data: mut data_lane, actions: mut actions_lane } = bridge;

// Bridge thread to direct actions
thread::spawn(|| {
let rt = tokio::runtime::Builder::new_current_thread()
.thread_name("bridge_actions_lane")
.enable_time()
.build()
.unwrap();
spawn_named_thread("Bridge actions_lane", || {
let rt = tokio::runtime::Builder::new_current_thread().enable_time().build().unwrap();

rt.block_on(async move {
if let Err(e) = actions_lane.start().await {
Expand All @@ -397,12 +392,8 @@ impl Uplink {
});

// Bridge thread to batch and forward data
thread::spawn(|| {
let rt = tokio::runtime::Builder::new_current_thread()
.thread_name("bridge_data_lane")
.enable_time()
.build()
.unwrap();
spawn_named_thread("Bridge data_lane", || {
let rt = tokio::runtime::Builder::new_current_thread().enable_time().build().unwrap();

rt.block_on(async move {
if let Err(e) = data_lane.start().await {
Expand All @@ -421,25 +412,25 @@ impl Uplink {
let (actions_tx, actions_rx) = bounded(1);
bridge.register_action_route(route, actions_tx)?;
let tunshell_client = TunshellClient::new(actions_rx, bridge_tx.clone());
thread::spawn(move || tunshell_client.start());
spawn_named_thread("Tunshell Client", move || tunshell_client.start());

if !self.config.downloader.actions.is_empty() {
let (actions_tx, actions_rx) = bounded(1);
bridge.register_action_routes(&self.config.downloader.actions, actions_tx)?;
let file_downloader =
FileDownloader::new(self.config.clone(), actions_rx, bridge_tx.clone())?;
thread::spawn(move || file_downloader.start());
spawn_named_thread("File Downloader", || file_downloader.start());
}

let device_shadow = DeviceShadow::new(self.config.device_shadow.clone(), bridge_tx.clone());
thread::spawn(move || device_shadow.start());
spawn_named_thread("Device Shadow Generator", move || device_shadow.start());

if !self.config.ota_installer.actions.is_empty() {
let (actions_tx, actions_rx) = bounded(1);
bridge.register_action_routes(&self.config.ota_installer.actions, actions_tx)?;
let ota_installer =
OTAInstaller::new(self.config.ota_installer.clone(), actions_rx, bridge_tx.clone());
thread::spawn(move || ota_installer.start());
spawn_named_thread("OTA Installer", move || ota_installer.start());
}

#[cfg(target_os = "linux")]
Expand All @@ -448,7 +439,7 @@ impl Uplink {
let (actions_tx, actions_rx) = bounded(1);
bridge.register_action_route(route, actions_tx)?;
let logger = JournalCtl::new(config, actions_rx, bridge_tx.clone());
thread::spawn(move || {
spawn_named_thread("Logger", || {
if let Err(e) = logger.start() {
error!("Logger stopped!! Error = {:?}", e);
}
Expand All @@ -461,7 +452,7 @@ impl Uplink {
let (actions_tx, actions_rx) = bounded(1);
bridge.register_action_route(route, actions_tx)?;
let logger = Logcat::new(config, actions_rx, bridge_tx.clone());
thread::spawn(move || {
spawn_named_thread("Logger", || {
if let Err(e) = logger.start() {
error!("Logger stopped!! Error = {:?}", e);
}
Expand All @@ -470,14 +461,15 @@ impl Uplink {

if self.config.system_stats.enabled {
let stat_collector = StatCollector::new(self.config.clone(), bridge_tx.clone());
thread::spawn(move || stat_collector.start());
spawn_named_thread("Stat Collector", || stat_collector.start());
};

if !self.config.processes.is_empty() {
let (actions_tx, actions_rx) = bounded(1);
bridge.register_action_routes(&self.config.processes, actions_tx)?;

let process_handler = ProcessHandler::new(actions_rx, bridge_tx.clone());
thread::spawn(move || {
spawn_named_thread("Process Handler", || {
if let Err(e) = process_handler.start() {
error!("Process handler stopped!! Error = {:?}", e);
}
Expand All @@ -489,6 +481,7 @@ impl Uplink {
bridge.register_action_routes(&self.config.script_runner, actions_tx)?;
let script_runner = ScriptRunner::new(actions_rx, bridge_tx);
thread::spawn(move || {
spawn_named_thread("Script Runner", || {
if let Err(e) = script_runner.start() {
error!("Script runner stopped!! Error = {:?}", e);
}
Expand Down
9 changes: 5 additions & 4 deletions uplink/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
mod console;

use std::sync::Arc;
use std::thread;
use std::time::Duration;

use anyhow::Error;
Expand All @@ -19,7 +18,7 @@ pub type ReloadHandle =

use uplink::base::AppConfig;
use uplink::config::{get_configs, initialize, CommandLine};
use uplink::{simulator, Config, TcpJson, Uplink};
use uplink::{simulator, spawn_named_thread, Config, TcpJson, Uplink};

fn initialize_logging(commandline: &CommandLine) -> ReloadHandle {
let level = match commandline.verbose {
Expand Down Expand Up @@ -156,15 +155,17 @@ fn main() -> Result<(), Error> {

if let Some(config) = config.simulator.clone() {
let bridge_tx = bridge_tx.clone();
thread::spawn(move || {
spawn_named_thread("Simulator", || {
simulator::start(config, bridge_tx, simulator_actions).unwrap();
});
}

if config.console.enabled {
let port = config.console.port;
let bridge_tx = bridge_tx.clone();
thread::spawn(move || console::start(port, reload_handle, bridge_tx));
spawn_named_thread("Uplink Console", move || {
console::start(port, reload_handle, bridge_tx)
});
}

let rt = tokio::runtime::Builder::new_current_thread()
Expand Down

0 comments on commit 02ed362

Please sign in to comment.