diff --git a/Cargo.lock b/Cargo.lock index ba9311e42..bffc0c59a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3008,7 +3008,7 @@ checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" [[package]] name = "uplink" -version = "2.8.0" +version = "2.8.1" dependencies = [ "anyhow", "async-trait", diff --git a/uplink/src/base/actions.rs b/uplink/src/base/actions.rs index 46bf6c791..df6e8a7ec 100644 --- a/uplink/src/base/actions.rs +++ b/uplink/src/base/actions.rs @@ -1,5 +1,6 @@ use serde::{Deserialize, Serialize}; use serde_json::json; +use tokio::time::Instant; use crate::{Payload, Point}; @@ -19,6 +20,9 @@ pub struct Action { pub name: String, // action payload. json. can be args/payload. depends on the invoked command pub payload: String, + // Instant at which action must be timedout + #[serde(skip)] + pub deadline: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index 107060ce1..9b17ea909 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -264,14 +264,14 @@ impl ActionsBridge { .get(&action.name) .ok_or_else(|| Error::NoRoute(action.name.clone()))?; - let duration = route.try_send(action.clone()).map_err(|_| Error::UnresponsiveReceiver)?; + let deadline = route.try_send(action.clone()).map_err(|_| Error::UnresponsiveReceiver)?; // current action left unchanged in case of new tunshell action if action.name == TUNSHELL_ACTION { self.parallel_actions.insert(action.action_id); return Ok(()); } - self.current_action = Some(CurrentAction::new(action, duration)); + self.current_action = Some(CurrentAction::new(action, deadline)); Ok(()) } @@ -378,11 +378,11 @@ struct CurrentAction { } impl CurrentAction { - pub fn new(action: Action, duration: Duration) -> CurrentAction { + pub fn new(action: Action, deadline: Instant) -> CurrentAction { CurrentAction { id: action.action_id.clone(), action, - timeout: Box::pin(time::sleep(duration)), + timeout: Box::pin(time::sleep_until(deadline)), } } @@ -416,10 +416,13 @@ pub struct ActionRouter { impl ActionRouter { #[allow(clippy::result_large_err)] - pub fn try_send(&self, action: Action) -> Result> { + /// Forwards action to the appropriate application and returns the instance in time at which it should be timedout if incomplete + pub fn try_send(&self, mut action: Action) -> Result> { + let deadline = Instant::now() + self.duration; + action.deadline = Some(deadline); self.actions_tx.try_send(action)?; - Ok(self.duration) + Ok(deadline) } } @@ -544,6 +547,7 @@ mod tests { kind: "test".to_string(), name: "route_1".to_string(), payload: "test".to_string(), + deadline: None, }; actions_tx.send(action_1).unwrap(); @@ -567,6 +571,7 @@ mod tests { kind: "test".to_string(), name: "route_2".to_string(), payload: "test".to_string(), + deadline: None, }; actions_tx.send(action_2).unwrap(); @@ -610,6 +615,7 @@ mod tests { kind: "test".to_string(), name: "test".to_string(), payload: "test".to_string(), + deadline: None, }; actions_tx.send(action_1).unwrap(); @@ -624,6 +630,7 @@ mod tests { kind: "test".to_string(), name: "test".to_string(), payload: "test".to_string(), + deadline: None, }; actions_tx.send(action_2).unwrap(); @@ -664,6 +671,7 @@ mod tests { kind: "test".to_string(), name: "test".to_string(), payload: "test".to_string(), + deadline: None, }; actions_tx.send(action).unwrap(); @@ -727,6 +735,7 @@ mod tests { kind: "test".to_string(), name: "test".to_string(), payload: "test".to_string(), + deadline: None, }; actions_tx.send(action).unwrap(); @@ -795,6 +804,7 @@ mod tests { kind: "tunshell".to_string(), name: "launch_shell".to_string(), payload: "test".to_string(), + deadline: None, }; actions_tx.send(action).unwrap(); @@ -805,6 +815,7 @@ mod tests { kind: "test".to_string(), name: "test".to_string(), payload: "test".to_string(), + deadline: None, }; actions_tx.send(action).unwrap(); @@ -883,6 +894,7 @@ mod tests { kind: "test".to_string(), name: "test".to_string(), payload: "test".to_string(), + deadline: None, }; actions_tx.send(action).unwrap(); @@ -893,6 +905,7 @@ mod tests { kind: "tunshell".to_string(), name: "launch_shell".to_string(), payload: "test".to_string(), + deadline: None, }; actions_tx.send(action).unwrap(); diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index 27d73f1ba..831d2e5d4 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -54,12 +54,11 @@ use human_bytes::human_bytes; use log::{debug, error, info, trace, warn}; use reqwest::{Certificate, Client, ClientBuilder, Identity, Response}; use serde::{Deserialize, Serialize}; -use tokio::time::timeout; +use tokio::time::{timeout_at, Instant}; -use std::collections::HashMap; use std::fs::{metadata, remove_dir_all, File}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; #[cfg(unix)] use std::{ fs::{create_dir, set_permissions, Permissions}, @@ -100,7 +99,6 @@ pub struct FileDownloader { bridge_tx: BridgeTx, client: Client, sequence: u32, - timeouts: HashMap, } impl FileDownloader { @@ -125,17 +123,9 @@ impl FileDownloader { } .build()?; - let timeouts = config - .downloader - .actions - .iter() - .map(|s| (s.name.to_owned(), Duration::from_secs(s.timeout))) - .collect(); - Ok(Self { config: config.downloader.clone(), actions_rx, - timeouts, client, bridge_tx, sequence: 0, @@ -158,17 +148,17 @@ impl FileDownloader { } }; self.action_id = action.action_id.clone(); - - let duration = match self.timeouts.get(&action.name) { - Some(t) => *t, + let deadline = match &action.deadline { + Some(d) => *d, _ => { - error!("Action: {} unconfigured", action.name); + error!("Unconfigured deadline: {}", action.name); continue; } }; // NOTE: if download has timedout don't do anything, else ensure errors are forwarded after three retries - match timeout(duration, self.run(action)).await { + + match timeout_at(deadline, self.run(action)).await { Ok(Err(e)) => self.forward_error(e).await, Err(_) => error!("Last download has timedout"), _ => {} @@ -413,7 +403,7 @@ impl DownloadState { #[cfg(test)] mod test { - use flume::{bounded, TrySendError}; + use flume::bounded; use serde_json::json; use std::{collections::HashMap, time::Duration}; @@ -488,6 +478,7 @@ mod test { kind: "firmware_update".to_string(), name: "firmware_update".to_string(), payload: json!(download_update).to_string(), + deadline: Some(Instant::now() + Duration::from_secs(60)), }; std::thread::sleep(Duration::from_millis(10)); @@ -517,56 +508,4 @@ mod test { } } } - - #[test] - fn multiple_actions_at_once() { - // Ensure path exists - std::fs::create_dir_all(DOWNLOAD_DIR).unwrap(); - // Prepare config - let mut path = PathBuf::from(DOWNLOAD_DIR); - path.push("download"); - let downloader_cfg = DownloaderConfig { - actions: vec![ActionRoute { name: "firmware_update".to_owned(), timeout: 10 }], - path, - }; - let config = config(downloader_cfg.clone()); - let (bridge_tx, _) = create_bridge(); - - // Create channels to forward and push actions on - let (download_tx, download_rx) = bounded(1); - let downloader = FileDownloader::new(Arc::new(config), download_rx, bridge_tx).unwrap(); - - // Start FileDownloader in separate thread - std::thread::spawn(|| downloader.start()); - - // Create a firmware update action - let download_update = DownloadFile { - content_length: 0, - url: "https://github.com/bytebeamio/uplink/raw/main/docs/logo.png".to_string(), - file_name: "1.0".to_string(), - download_path: None, - }; - let mut expected_forward = download_update.clone(); - let mut path = downloader_cfg.path; - path.push("firmware_update"); - path.push("test.txt"); - expected_forward.download_path = Some(path); - let download_action = Action { - action_id: "1".to_string(), - kind: "firmware_update".to_string(), - name: "firmware_update".to_string(), - payload: json!(download_update).to_string(), - }; - - std::thread::sleep(Duration::from_millis(10)); - - // Send action to FileDownloader with Sender - download_tx.try_send(download_action.clone()).unwrap(); - - // Send action to FileDownloader immediately after, this must fail - match download_tx.try_send(download_action).unwrap_err() { - TrySendError::Full(_) => {} - TrySendError::Disconnected(_) => panic!("Unexpected disconnect"), - } - } } diff --git a/uplink/src/collector/process.rs b/uplink/src/collector/process.rs index 3f1a97153..d2250bfde 100644 --- a/uplink/src/collector/process.rs +++ b/uplink/src/collector/process.rs @@ -4,15 +4,13 @@ use thiserror::Error; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::{Child, Command}; use tokio::select; -use tokio::time::timeout; +use tokio::time::timeout_at; use crate::base::bridge::BridgeTx; -use crate::{Action, ActionResponse, ActionRoute, Package}; +use crate::{Action, ActionResponse, Package}; -use std::collections::HashMap; use std::io; use std::process::Stdio; -use std::time::Duration; #[derive(Error, Debug)] pub enum Error { @@ -39,22 +37,11 @@ pub struct ProcessHandler { actions_rx: Receiver, // to send responses back to bridge bridge_tx: BridgeTx, - // to timeout actions as per action route configuration - timeouts: HashMap, } impl ProcessHandler { - pub fn new( - actions_rx: Receiver, - bridge_tx: BridgeTx, - action_routes: &[ActionRoute], - ) -> Self { - let timeouts = action_routes - .iter() - .map(|ActionRoute { name, timeout }| (name.to_owned(), Duration::from_secs(*timeout))) - .collect(); - - Self { actions_rx, bridge_tx, timeouts } + pub fn new(actions_rx: Receiver, bridge_tx: BridgeTx) -> Self { + Self { actions_rx, bridge_tx } } /// Run a process of specified command @@ -96,11 +83,17 @@ impl ProcessHandler { loop { let action = self.actions_rx.recv_async().await?; let command = String::from("tools/") + &action.name; - let duration = self.timeouts.get(&action.name).unwrap().to_owned(); + let deadline = match &action.deadline { + Some(d) => *d, + _ => { + error!("Unconfigured deadline: {}", action.name); + continue; + } + }; // Spawn the action and capture its stdout, ignore timeouts let child = self.run(&action.action_id, &command, &action.payload).await?; - if let Ok(o) = timeout(duration, self.spawn_and_capture_stdout(child)).await { + if let Ok(o) = timeout_at(deadline, self.spawn_and_capture_stdout(child)).await { o?; } else { error!("Process timedout: {command}; action_id = {}", action.action_id); diff --git a/uplink/src/collector/script_runner.rs b/uplink/src/collector/script_runner.rs index 9b248ee55..dd921f041 100644 --- a/uplink/src/collector/script_runner.rs +++ b/uplink/src/collector/script_runner.rs @@ -4,17 +4,15 @@ use thiserror::Error; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::{Child, Command}; use tokio::select; -use tokio::time::timeout; +use tokio::time::timeout_at; use super::downloader::DownloadFile; -use crate::base::{bridge::BridgeTx, ActionRoute}; +use crate::base::bridge::BridgeTx; use crate::{Action, ActionResponse, Package}; -use std::collections::HashMap; use std::io; use std::path::PathBuf; use std::process::Stdio; -use std::time::Duration; #[derive(Error, Debug)] pub enum Error { @@ -40,19 +38,12 @@ pub struct ScriptRunner { actions_rx: Receiver, // to send responses back to bridge bridge_tx: BridgeTx, - timeouts: HashMap, sequence: u32, } impl ScriptRunner { - pub fn new( - routes: Vec, - actions_rx: Receiver, - bridge_tx: BridgeTx, - ) -> Self { - let timeouts = - routes.iter().map(|s| (s.name.to_owned(), Duration::from_secs(s.timeout))).collect(); - Self { actions_rx, bridge_tx, timeouts, sequence: 0 } + pub fn new(actions_rx: Receiver, bridge_tx: BridgeTx) -> Self { + Self { actions_rx, bridge_tx, sequence: 0 } } /// Spawn a child process to run the script with sh @@ -128,17 +119,17 @@ impl ScriptRunner { continue; } }; - let duration = match self.timeouts.get(&action.name) { + let deadline = match &action.deadline { Some(d) => *d, _ => { - error!("Unconfigured action: {}", action.name); + error!("Unconfigured deadline: {}", action.name); continue; } }; // Spawn the action and capture its stdout let child = self.run(command).await?; if let Ok(o) = - timeout(duration, self.spawn_and_capture_stdout(child, &action.action_id)).await + timeout_at(deadline, self.spawn_and_capture_stdout(child, &action.action_id)).await { o? } @@ -181,8 +172,7 @@ mod tests { let (bridge_tx, status_rx) = create_bridge(); let (actions_tx, actions_rx) = bounded(1); - let routes = vec![ActionRoute { name: "test".to_string(), timeout: 100 }]; - let script_runner = ScriptRunner::new(routes, actions_rx, bridge_tx); + let script_runner = ScriptRunner::new(actions_rx, bridge_tx); thread::spawn(move || script_runner.start().unwrap()); actions_tx @@ -191,6 +181,7 @@ mod tests { kind: "1".to_string(), name: "test".to_string(), payload: "".to_string(), + deadline: None, }) .unwrap(); @@ -204,8 +195,7 @@ mod tests { let (bridge_tx, status_rx) = create_bridge(); let (actions_tx, actions_rx) = bounded(1); - let routes = vec![ActionRoute { name: "test".to_string(), timeout: 100 }]; - let script_runner = ScriptRunner::new(routes, actions_rx, bridge_tx); + let script_runner = ScriptRunner::new(actions_rx, bridge_tx); thread::spawn(move || script_runner.start().unwrap()); @@ -216,6 +206,7 @@ mod tests { name: "test".to_string(), payload: "{\"url\": \"...\", \"content_length\": 0,\"file_name\": \"...\"}" .to_string(), + deadline: None, }) .unwrap(); diff --git a/uplink/src/lib.rs b/uplink/src/lib.rs index c3fcaafe5..dc72a793a 100644 --- a/uplink/src/lib.rs +++ b/uplink/src/lib.rs @@ -467,8 +467,7 @@ impl Uplink { if !self.config.processes.is_empty() { let (actions_tx, actions_rx) = bounded(1); bridge.register_action_routes(&self.config.processes, actions_tx)?; - let process_handler = - ProcessHandler::new(actions_rx, bridge_tx.clone(), &self.config.processes); + let process_handler = ProcessHandler::new(actions_rx, bridge_tx.clone()); spawn_named_thread("Process Handler", || { if let Err(e) = process_handler.start() { error!("Process handler stopped!! Error = {:?}", e); @@ -479,8 +478,7 @@ impl Uplink { if !self.config.script_runner.is_empty() { let (actions_tx, actions_rx) = bounded(1); bridge.register_action_routes(&self.config.script_runner, actions_tx)?; - let script_runner = - ScriptRunner::new(self.config.script_runner.clone(), actions_rx, bridge_tx); + let script_runner = ScriptRunner::new(actions_rx, bridge_tx); spawn_named_thread("Script Runner", || { if let Err(e) = script_runner.start() { error!("Script runner stopped!! Error = {:?}", e);