From c86f80046655ee73b34a4f1218a670906a97d630 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 5 Oct 2023 19:28:32 +0530 Subject: [PATCH 1/6] refactor: remove timeout information from apps --- Cargo.lock | 2 +- uplink/src/base/actions.rs | 4 ++++ uplink/src/base/bridge/actions_lane.rs | 24 +++++++++++++++----- uplink/src/collector/downloader.rs | 24 +++++++------------- uplink/src/collector/process.rs | 31 ++++++++++---------------- uplink/src/collector/script_runner.rs | 31 +++++++++----------------- uplink/src/lib.rs | 6 ++--- 7 files changed, 56 insertions(+), 66 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ba9311e42..bffc0c59a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3008,7 +3008,7 @@ checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" [[package]] name = "uplink" -version = "2.8.0" +version = "2.8.1" dependencies = [ "anyhow", "async-trait", diff --git a/uplink/src/base/actions.rs b/uplink/src/base/actions.rs index 46bf6c791..df6e8a7ec 100644 --- a/uplink/src/base/actions.rs +++ b/uplink/src/base/actions.rs @@ -1,5 +1,6 @@ use serde::{Deserialize, Serialize}; use serde_json::json; +use tokio::time::Instant; use crate::{Payload, Point}; @@ -19,6 +20,9 @@ pub struct Action { pub name: String, // action payload. json. can be args/payload. depends on the invoked command pub payload: String, + // Instant at which action must be timedout + #[serde(skip)] + pub deadline: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index 107060ce1..d0a59d21e 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -264,14 +264,14 @@ impl ActionsBridge { .get(&action.name) .ok_or_else(|| Error::NoRoute(action.name.clone()))?; - let duration = route.try_send(action.clone()).map_err(|_| Error::UnresponsiveReceiver)?; + let deadline = route.try_send(action.clone()).map_err(|_| Error::UnresponsiveReceiver)?; // current action left unchanged in case of new tunshell action if action.name == TUNSHELL_ACTION { self.parallel_actions.insert(action.action_id); return Ok(()); } - self.current_action = Some(CurrentAction::new(action, duration)); + self.current_action = Some(CurrentAction::new(action, deadline)); Ok(()) } @@ -378,11 +378,11 @@ struct CurrentAction { } impl CurrentAction { - pub fn new(action: Action, duration: Duration) -> CurrentAction { + pub fn new(action: Action, deadline: Instant) -> CurrentAction { CurrentAction { id: action.action_id.clone(), action, - timeout: Box::pin(time::sleep(duration)), + timeout: Box::pin(time::sleep_until(deadline)), } } @@ -416,10 +416,12 @@ pub struct ActionRouter { impl ActionRouter { #[allow(clippy::result_large_err)] - pub fn try_send(&self, action: Action) -> Result> { + pub fn try_send(&self, mut action: Action) -> Result> { + let deadline = Instant::now() + self.duration; + action.deadline = Some(deadline); self.actions_tx.try_send(action)?; - Ok(self.duration) + Ok(deadline) } } @@ -544,6 +546,7 @@ mod tests { kind: "test".to_string(), name: "route_1".to_string(), payload: "test".to_string(), + deadline: None, }; actions_tx.send(action_1).unwrap(); @@ -567,6 +570,7 @@ mod tests { kind: "test".to_string(), name: "route_2".to_string(), payload: "test".to_string(), + deadline: None, }; actions_tx.send(action_2).unwrap(); @@ -610,6 +614,7 @@ mod tests { kind: "test".to_string(), name: "test".to_string(), payload: "test".to_string(), + deadline: None, }; actions_tx.send(action_1).unwrap(); @@ -624,6 +629,7 @@ mod tests { kind: "test".to_string(), name: "test".to_string(), payload: "test".to_string(), + deadline: None, }; actions_tx.send(action_2).unwrap(); @@ -664,6 +670,7 @@ mod tests { kind: "test".to_string(), name: "test".to_string(), payload: "test".to_string(), + deadline: None, }; actions_tx.send(action).unwrap(); @@ -727,6 +734,7 @@ mod tests { kind: "test".to_string(), name: "test".to_string(), payload: "test".to_string(), + deadline: None, }; actions_tx.send(action).unwrap(); @@ -795,6 +803,7 @@ mod tests { kind: "tunshell".to_string(), name: "launch_shell".to_string(), payload: "test".to_string(), + deadline: None, }; actions_tx.send(action).unwrap(); @@ -805,6 +814,7 @@ mod tests { kind: "test".to_string(), name: "test".to_string(), payload: "test".to_string(), + deadline: None, }; actions_tx.send(action).unwrap(); @@ -883,6 +893,7 @@ mod tests { kind: "test".to_string(), name: "test".to_string(), payload: "test".to_string(), + deadline: None, }; actions_tx.send(action).unwrap(); @@ -893,6 +904,7 @@ mod tests { kind: "tunshell".to_string(), name: "launch_shell".to_string(), payload: "test".to_string(), + deadline: None, }; actions_tx.send(action).unwrap(); diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index 27d73f1ba..9df59ebaf 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -54,9 +54,8 @@ use human_bytes::human_bytes; use log::{debug, error, info, trace, warn}; use reqwest::{Certificate, Client, ClientBuilder, Identity, Response}; use serde::{Deserialize, Serialize}; -use tokio::time::timeout; +use tokio::time::timeout_at; -use std::collections::HashMap; use std::fs::{metadata, remove_dir_all, File}; use std::sync::Arc; use std::time::{Duration, Instant}; @@ -100,7 +99,6 @@ pub struct FileDownloader { bridge_tx: BridgeTx, client: Client, sequence: u32, - timeouts: HashMap, } impl FileDownloader { @@ -125,17 +123,9 @@ impl FileDownloader { } .build()?; - let timeouts = config - .downloader - .actions - .iter() - .map(|s| (s.name.to_owned(), Duration::from_secs(s.timeout))) - .collect(); - Ok(Self { config: config.downloader.clone(), actions_rx, - timeouts, client, bridge_tx, sequence: 0, @@ -158,17 +148,17 @@ impl FileDownloader { } }; self.action_id = action.action_id.clone(); - - let duration = match self.timeouts.get(&action.name) { - Some(t) => *t, + let deadline = match &action.deadline { + Some(d) => *d, _ => { - error!("Action: {} unconfigured", action.name); + error!("Unconfigured deadline: {}", action.name); continue; } }; // NOTE: if download has timedout don't do anything, else ensure errors are forwarded after three retries - match timeout(duration, self.run(action)).await { + + match timeout_at(deadline, self.run(action)).await { Ok(Err(e)) => self.forward_error(e).await, Err(_) => error!("Last download has timedout"), _ => {} @@ -488,6 +478,7 @@ mod test { kind: "firmware_update".to_string(), name: "firmware_update".to_string(), payload: json!(download_update).to_string(), + deadline: None, }; std::thread::sleep(Duration::from_millis(10)); @@ -556,6 +547,7 @@ mod test { kind: "firmware_update".to_string(), name: "firmware_update".to_string(), payload: json!(download_update).to_string(), + deadline: None, }; std::thread::sleep(Duration::from_millis(10)); diff --git a/uplink/src/collector/process.rs b/uplink/src/collector/process.rs index 3f1a97153..d2250bfde 100644 --- a/uplink/src/collector/process.rs +++ b/uplink/src/collector/process.rs @@ -4,15 +4,13 @@ use thiserror::Error; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::{Child, Command}; use tokio::select; -use tokio::time::timeout; +use tokio::time::timeout_at; use crate::base::bridge::BridgeTx; -use crate::{Action, ActionResponse, ActionRoute, Package}; +use crate::{Action, ActionResponse, Package}; -use std::collections::HashMap; use std::io; use std::process::Stdio; -use std::time::Duration; #[derive(Error, Debug)] pub enum Error { @@ -39,22 +37,11 @@ pub struct ProcessHandler { actions_rx: Receiver, // to send responses back to bridge bridge_tx: BridgeTx, - // to timeout actions as per action route configuration - timeouts: HashMap, } impl ProcessHandler { - pub fn new( - actions_rx: Receiver, - bridge_tx: BridgeTx, - action_routes: &[ActionRoute], - ) -> Self { - let timeouts = action_routes - .iter() - .map(|ActionRoute { name, timeout }| (name.to_owned(), Duration::from_secs(*timeout))) - .collect(); - - Self { actions_rx, bridge_tx, timeouts } + pub fn new(actions_rx: Receiver, bridge_tx: BridgeTx) -> Self { + Self { actions_rx, bridge_tx } } /// Run a process of specified command @@ -96,11 +83,17 @@ impl ProcessHandler { loop { let action = self.actions_rx.recv_async().await?; let command = String::from("tools/") + &action.name; - let duration = self.timeouts.get(&action.name).unwrap().to_owned(); + let deadline = match &action.deadline { + Some(d) => *d, + _ => { + error!("Unconfigured deadline: {}", action.name); + continue; + } + }; // Spawn the action and capture its stdout, ignore timeouts let child = self.run(&action.action_id, &command, &action.payload).await?; - if let Ok(o) = timeout(duration, self.spawn_and_capture_stdout(child)).await { + if let Ok(o) = timeout_at(deadline, self.spawn_and_capture_stdout(child)).await { o?; } else { error!("Process timedout: {command}; action_id = {}", action.action_id); diff --git a/uplink/src/collector/script_runner.rs b/uplink/src/collector/script_runner.rs index 9b248ee55..dd921f041 100644 --- a/uplink/src/collector/script_runner.rs +++ b/uplink/src/collector/script_runner.rs @@ -4,17 +4,15 @@ use thiserror::Error; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::{Child, Command}; use tokio::select; -use tokio::time::timeout; +use tokio::time::timeout_at; use super::downloader::DownloadFile; -use crate::base::{bridge::BridgeTx, ActionRoute}; +use crate::base::bridge::BridgeTx; use crate::{Action, ActionResponse, Package}; -use std::collections::HashMap; use std::io; use std::path::PathBuf; use std::process::Stdio; -use std::time::Duration; #[derive(Error, Debug)] pub enum Error { @@ -40,19 +38,12 @@ pub struct ScriptRunner { actions_rx: Receiver, // to send responses back to bridge bridge_tx: BridgeTx, - timeouts: HashMap, sequence: u32, } impl ScriptRunner { - pub fn new( - routes: Vec, - actions_rx: Receiver, - bridge_tx: BridgeTx, - ) -> Self { - let timeouts = - routes.iter().map(|s| (s.name.to_owned(), Duration::from_secs(s.timeout))).collect(); - Self { actions_rx, bridge_tx, timeouts, sequence: 0 } + pub fn new(actions_rx: Receiver, bridge_tx: BridgeTx) -> Self { + Self { actions_rx, bridge_tx, sequence: 0 } } /// Spawn a child process to run the script with sh @@ -128,17 +119,17 @@ impl ScriptRunner { continue; } }; - let duration = match self.timeouts.get(&action.name) { + let deadline = match &action.deadline { Some(d) => *d, _ => { - error!("Unconfigured action: {}", action.name); + error!("Unconfigured deadline: {}", action.name); continue; } }; // Spawn the action and capture its stdout let child = self.run(command).await?; if let Ok(o) = - timeout(duration, self.spawn_and_capture_stdout(child, &action.action_id)).await + timeout_at(deadline, self.spawn_and_capture_stdout(child, &action.action_id)).await { o? } @@ -181,8 +172,7 @@ mod tests { let (bridge_tx, status_rx) = create_bridge(); let (actions_tx, actions_rx) = bounded(1); - let routes = vec![ActionRoute { name: "test".to_string(), timeout: 100 }]; - let script_runner = ScriptRunner::new(routes, actions_rx, bridge_tx); + let script_runner = ScriptRunner::new(actions_rx, bridge_tx); thread::spawn(move || script_runner.start().unwrap()); actions_tx @@ -191,6 +181,7 @@ mod tests { kind: "1".to_string(), name: "test".to_string(), payload: "".to_string(), + deadline: None, }) .unwrap(); @@ -204,8 +195,7 @@ mod tests { let (bridge_tx, status_rx) = create_bridge(); let (actions_tx, actions_rx) = bounded(1); - let routes = vec![ActionRoute { name: "test".to_string(), timeout: 100 }]; - let script_runner = ScriptRunner::new(routes, actions_rx, bridge_tx); + let script_runner = ScriptRunner::new(actions_rx, bridge_tx); thread::spawn(move || script_runner.start().unwrap()); @@ -216,6 +206,7 @@ mod tests { name: "test".to_string(), payload: "{\"url\": \"...\", \"content_length\": 0,\"file_name\": \"...\"}" .to_string(), + deadline: None, }) .unwrap(); diff --git a/uplink/src/lib.rs b/uplink/src/lib.rs index 28a6fa6ec..6fe6bd14b 100644 --- a/uplink/src/lib.rs +++ b/uplink/src/lib.rs @@ -476,8 +476,7 @@ impl Uplink { if !self.config.processes.is_empty() { let (actions_tx, actions_rx) = bounded(1); bridge.register_action_routes(&self.config.processes, actions_tx)?; - let process_handler = - ProcessHandler::new(actions_rx, bridge_tx.clone(), &self.config.processes); + let process_handler = ProcessHandler::new(actions_rx, bridge_tx.clone()); thread::spawn(move || { if let Err(e) = process_handler.start() { error!("Process handler stopped!! Error = {:?}", e); @@ -488,8 +487,7 @@ impl Uplink { if !self.config.script_runner.is_empty() { let (actions_tx, actions_rx) = bounded(1); bridge.register_action_routes(&self.config.script_runner, actions_tx)?; - let script_runner = - ScriptRunner::new(self.config.script_runner.clone(), actions_rx, bridge_tx); + let script_runner = ScriptRunner::new(actions_rx, bridge_tx); thread::spawn(move || { if let Err(e) = script_runner.start() { error!("Script runner stopped!! Error = {:?}", e); From c7677b0f633f13b0bd5922fb31ffc032bfafe32e Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sat, 7 Oct 2023 22:07:22 +0530 Subject: [PATCH 2/6] doc: comment describing `ActionRoute.try_send` --- uplink/src/base/bridge/actions_lane.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index d0a59d21e..9b17ea909 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -416,6 +416,7 @@ pub struct ActionRouter { impl ActionRouter { #[allow(clippy::result_large_err)] + /// Forwards action to the appropriate application and returns the instance in time at which it should be timedout if incomplete pub fn try_send(&self, mut action: Action) -> Result> { let deadline = Instant::now() + self.duration; action.deadline = Some(deadline); From e9aac6dee0b4e7a7cc0ef9faa51dea54ad90fd7c Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sat, 7 Oct 2023 22:19:10 +0530 Subject: [PATCH 3/6] refactor: don't use optional --- uplink/src/base/actions.rs | 8 ++++++-- uplink/src/base/bridge/actions_lane.rs | 22 +++++++++++----------- uplink/src/collector/downloader.rs | 17 +++++------------ uplink/src/collector/process.rs | 9 +-------- uplink/src/collector/script_runner.rs | 15 +++++---------- 5 files changed, 28 insertions(+), 43 deletions(-) diff --git a/uplink/src/base/actions.rs b/uplink/src/base/actions.rs index df6e8a7ec..e0d9dbb14 100644 --- a/uplink/src/base/actions.rs +++ b/uplink/src/base/actions.rs @@ -6,6 +6,10 @@ use crate::{Payload, Point}; use super::clock; +fn now() -> Instant { + Instant::now() +} + /// On the Bytebeam platform, an Action is how beamd and through it, /// the end-user, can communicate the tasks they want to perform on /// said device, in this case, uplink. @@ -21,8 +25,8 @@ pub struct Action { // action payload. json. can be args/payload. depends on the invoked command pub payload: String, // Instant at which action must be timedout - #[serde(skip)] - pub deadline: Option, + #[serde(skip, default = "now")] + pub deadline: Instant, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index 9b17ea909..daea83a60 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -419,7 +419,7 @@ impl ActionRouter { /// Forwards action to the appropriate application and returns the instance in time at which it should be timedout if incomplete pub fn try_send(&self, mut action: Action) -> Result> { let deadline = Instant::now() + self.duration; - action.deadline = Some(deadline); + action.deadline = deadline; self.actions_tx.try_send(action)?; Ok(deadline) @@ -547,7 +547,7 @@ mod tests { kind: "test".to_string(), name: "route_1".to_string(), payload: "test".to_string(), - deadline: None, + deadline: Instant::now(), }; actions_tx.send(action_1).unwrap(); @@ -571,7 +571,7 @@ mod tests { kind: "test".to_string(), name: "route_2".to_string(), payload: "test".to_string(), - deadline: None, + deadline: Instant::now(), }; actions_tx.send(action_2).unwrap(); @@ -615,7 +615,7 @@ mod tests { kind: "test".to_string(), name: "test".to_string(), payload: "test".to_string(), - deadline: None, + deadline: Instant::now(), }; actions_tx.send(action_1).unwrap(); @@ -630,7 +630,7 @@ mod tests { kind: "test".to_string(), name: "test".to_string(), payload: "test".to_string(), - deadline: None, + deadline: Instant::now(), }; actions_tx.send(action_2).unwrap(); @@ -671,7 +671,7 @@ mod tests { kind: "test".to_string(), name: "test".to_string(), payload: "test".to_string(), - deadline: None, + deadline: Instant::now(), }; actions_tx.send(action).unwrap(); @@ -735,7 +735,7 @@ mod tests { kind: "test".to_string(), name: "test".to_string(), payload: "test".to_string(), - deadline: None, + deadline: Instant::now(), }; actions_tx.send(action).unwrap(); @@ -804,7 +804,7 @@ mod tests { kind: "tunshell".to_string(), name: "launch_shell".to_string(), payload: "test".to_string(), - deadline: None, + deadline: Instant::now(), }; actions_tx.send(action).unwrap(); @@ -815,7 +815,7 @@ mod tests { kind: "test".to_string(), name: "test".to_string(), payload: "test".to_string(), - deadline: None, + deadline: Instant::now(), }; actions_tx.send(action).unwrap(); @@ -894,7 +894,7 @@ mod tests { kind: "test".to_string(), name: "test".to_string(), payload: "test".to_string(), - deadline: None, + deadline: Instant::now(), }; actions_tx.send(action).unwrap(); @@ -905,7 +905,7 @@ mod tests { kind: "tunshell".to_string(), name: "launch_shell".to_string(), payload: "test".to_string(), - deadline: None, + deadline: Instant::now(), }; actions_tx.send(action).unwrap(); diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index 9df59ebaf..cad0155f8 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -54,11 +54,11 @@ use human_bytes::human_bytes; use log::{debug, error, info, trace, warn}; use reqwest::{Certificate, Client, ClientBuilder, Identity, Response}; use serde::{Deserialize, Serialize}; -use tokio::time::timeout_at; +use tokio::time::{timeout_at, Instant}; use std::fs::{metadata, remove_dir_all, File}; use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::time::Duration; #[cfg(unix)] use std::{ fs::{create_dir, set_permissions, Permissions}, @@ -148,17 +148,10 @@ impl FileDownloader { } }; self.action_id = action.action_id.clone(); - let deadline = match &action.deadline { - Some(d) => *d, - _ => { - error!("Unconfigured deadline: {}", action.name); - continue; - } - }; // NOTE: if download has timedout don't do anything, else ensure errors are forwarded after three retries - match timeout_at(deadline, self.run(action)).await { + match timeout_at(action.deadline, self.run(action)).await { Ok(Err(e)) => self.forward_error(e).await, Err(_) => error!("Last download has timedout"), _ => {} @@ -478,7 +471,7 @@ mod test { kind: "firmware_update".to_string(), name: "firmware_update".to_string(), payload: json!(download_update).to_string(), - deadline: None, + deadline: Instant::now(), }; std::thread::sleep(Duration::from_millis(10)); @@ -547,7 +540,7 @@ mod test { kind: "firmware_update".to_string(), name: "firmware_update".to_string(), payload: json!(download_update).to_string(), - deadline: None, + deadline: Instant::now(), }; std::thread::sleep(Duration::from_millis(10)); diff --git a/uplink/src/collector/process.rs b/uplink/src/collector/process.rs index d2250bfde..c06560b17 100644 --- a/uplink/src/collector/process.rs +++ b/uplink/src/collector/process.rs @@ -83,17 +83,10 @@ impl ProcessHandler { loop { let action = self.actions_rx.recv_async().await?; let command = String::from("tools/") + &action.name; - let deadline = match &action.deadline { - Some(d) => *d, - _ => { - error!("Unconfigured deadline: {}", action.name); - continue; - } - }; // Spawn the action and capture its stdout, ignore timeouts let child = self.run(&action.action_id, &command, &action.payload).await?; - if let Ok(o) = timeout_at(deadline, self.spawn_and_capture_stdout(child)).await { + if let Ok(o) = timeout_at(action.deadline, self.spawn_and_capture_stdout(child)).await { o?; } else { error!("Process timedout: {command}; action_id = {}", action.action_id); diff --git a/uplink/src/collector/script_runner.rs b/uplink/src/collector/script_runner.rs index dd921f041..31fbe34a3 100644 --- a/uplink/src/collector/script_runner.rs +++ b/uplink/src/collector/script_runner.rs @@ -119,17 +119,11 @@ impl ScriptRunner { continue; } }; - let deadline = match &action.deadline { - Some(d) => *d, - _ => { - error!("Unconfigured deadline: {}", action.name); - continue; - } - }; // Spawn the action and capture its stdout let child = self.run(command).await?; if let Ok(o) = - timeout_at(deadline, self.spawn_and_capture_stdout(child, &action.action_id)).await + timeout_at(action.deadline, self.spawn_and_capture_stdout(child, &action.action_id)) + .await { o? } @@ -155,6 +149,7 @@ mod tests { }; use flume::bounded; + use tokio::time::Instant; fn create_bridge() -> (BridgeTx, Receiver) { let (data_tx, _) = flume::bounded(2); @@ -181,7 +176,7 @@ mod tests { kind: "1".to_string(), name: "test".to_string(), payload: "".to_string(), - deadline: None, + deadline: Instant::now(), }) .unwrap(); @@ -206,7 +201,7 @@ mod tests { name: "test".to_string(), payload: "{\"url\": \"...\", \"content_length\": 0,\"file_name\": \"...\"}" .to_string(), - deadline: None, + deadline: Instant::now(), }) .unwrap(); From a39ec2a97818e3c6a951d24cc80cc6778fe93fab Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 9 Oct 2023 21:20:39 +0530 Subject: [PATCH 4/6] test: remove unnecessary test --- uplink/src/collector/downloader.rs | 55 +----------------------------- 1 file changed, 1 insertion(+), 54 deletions(-) diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index cad0155f8..973060983 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -396,7 +396,7 @@ impl DownloadState { #[cfg(test)] mod test { - use flume::{bounded, TrySendError}; + use flume::bounded; use serde_json::json; use std::{collections::HashMap, time::Duration}; @@ -501,57 +501,4 @@ mod test { } } } - - #[test] - fn multiple_actions_at_once() { - // Ensure path exists - std::fs::create_dir_all(DOWNLOAD_DIR).unwrap(); - // Prepare config - let mut path = PathBuf::from(DOWNLOAD_DIR); - path.push("download"); - let downloader_cfg = DownloaderConfig { - actions: vec![ActionRoute { name: "firmware_update".to_owned(), timeout: 10 }], - path, - }; - let config = config(downloader_cfg.clone()); - let (bridge_tx, _) = create_bridge(); - - // Create channels to forward and push actions on - let (download_tx, download_rx) = bounded(1); - let downloader = FileDownloader::new(Arc::new(config), download_rx, bridge_tx).unwrap(); - - // Start FileDownloader in separate thread - std::thread::spawn(|| downloader.start()); - - // Create a firmware update action - let download_update = DownloadFile { - content_length: 0, - url: "https://github.com/bytebeamio/uplink/raw/main/docs/logo.png".to_string(), - file_name: "1.0".to_string(), - download_path: None, - }; - let mut expected_forward = download_update.clone(); - let mut path = downloader_cfg.path; - path.push("firmware_update"); - path.push("test.txt"); - expected_forward.download_path = Some(path); - let download_action = Action { - action_id: "1".to_string(), - kind: "firmware_update".to_string(), - name: "firmware_update".to_string(), - payload: json!(download_update).to_string(), - deadline: Instant::now(), - }; - - std::thread::sleep(Duration::from_millis(10)); - - // Send action to FileDownloader with Sender - download_tx.try_send(download_action.clone()).unwrap(); - - // Send action to FileDownloader immediately after, this must fail - match download_tx.try_send(download_action).unwrap_err() { - TrySendError::Full(_) => {} - TrySendError::Disconnected(_) => panic!("Unexpected disconnect"), - } - } } From 42245a13847d8062c2a7859c36713a8ab810b24f Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 10 Oct 2023 22:51:17 +0530 Subject: [PATCH 5/6] test: set deadline to minute later --- uplink/src/collector/downloader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index 973060983..2437da2af 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -471,7 +471,7 @@ mod test { kind: "firmware_update".to_string(), name: "firmware_update".to_string(), payload: json!(download_update).to_string(), - deadline: Instant::now(), + deadline: Instant::now() + Duration::from_secs(60), }; std::thread::sleep(Duration::from_millis(10)); From 5f9f6b54ddc83b9af736afcc3c9f10ce59603240 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 13 Oct 2023 16:44:17 +0530 Subject: [PATCH 6/6] Revert "refactor: don't use optional" This reverts commit e9aac6dee0b4e7a7cc0ef9faa51dea54ad90fd7c. --- uplink/src/base/actions.rs | 8 ++------ uplink/src/base/bridge/actions_lane.rs | 22 +++++++++++----------- uplink/src/collector/downloader.rs | 11 +++++++++-- uplink/src/collector/process.rs | 9 ++++++++- uplink/src/collector/script_runner.rs | 15 ++++++++++----- 5 files changed, 40 insertions(+), 25 deletions(-) diff --git a/uplink/src/base/actions.rs b/uplink/src/base/actions.rs index e0d9dbb14..df6e8a7ec 100644 --- a/uplink/src/base/actions.rs +++ b/uplink/src/base/actions.rs @@ -6,10 +6,6 @@ use crate::{Payload, Point}; use super::clock; -fn now() -> Instant { - Instant::now() -} - /// On the Bytebeam platform, an Action is how beamd and through it, /// the end-user, can communicate the tasks they want to perform on /// said device, in this case, uplink. @@ -25,8 +21,8 @@ pub struct Action { // action payload. json. can be args/payload. depends on the invoked command pub payload: String, // Instant at which action must be timedout - #[serde(skip, default = "now")] - pub deadline: Instant, + #[serde(skip)] + pub deadline: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index daea83a60..9b17ea909 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -419,7 +419,7 @@ impl ActionRouter { /// Forwards action to the appropriate application and returns the instance in time at which it should be timedout if incomplete pub fn try_send(&self, mut action: Action) -> Result> { let deadline = Instant::now() + self.duration; - action.deadline = deadline; + action.deadline = Some(deadline); self.actions_tx.try_send(action)?; Ok(deadline) @@ -547,7 +547,7 @@ mod tests { kind: "test".to_string(), name: "route_1".to_string(), payload: "test".to_string(), - deadline: Instant::now(), + deadline: None, }; actions_tx.send(action_1).unwrap(); @@ -571,7 +571,7 @@ mod tests { kind: "test".to_string(), name: "route_2".to_string(), payload: "test".to_string(), - deadline: Instant::now(), + deadline: None, }; actions_tx.send(action_2).unwrap(); @@ -615,7 +615,7 @@ mod tests { kind: "test".to_string(), name: "test".to_string(), payload: "test".to_string(), - deadline: Instant::now(), + deadline: None, }; actions_tx.send(action_1).unwrap(); @@ -630,7 +630,7 @@ mod tests { kind: "test".to_string(), name: "test".to_string(), payload: "test".to_string(), - deadline: Instant::now(), + deadline: None, }; actions_tx.send(action_2).unwrap(); @@ -671,7 +671,7 @@ mod tests { kind: "test".to_string(), name: "test".to_string(), payload: "test".to_string(), - deadline: Instant::now(), + deadline: None, }; actions_tx.send(action).unwrap(); @@ -735,7 +735,7 @@ mod tests { kind: "test".to_string(), name: "test".to_string(), payload: "test".to_string(), - deadline: Instant::now(), + deadline: None, }; actions_tx.send(action).unwrap(); @@ -804,7 +804,7 @@ mod tests { kind: "tunshell".to_string(), name: "launch_shell".to_string(), payload: "test".to_string(), - deadline: Instant::now(), + deadline: None, }; actions_tx.send(action).unwrap(); @@ -815,7 +815,7 @@ mod tests { kind: "test".to_string(), name: "test".to_string(), payload: "test".to_string(), - deadline: Instant::now(), + deadline: None, }; actions_tx.send(action).unwrap(); @@ -894,7 +894,7 @@ mod tests { kind: "test".to_string(), name: "test".to_string(), payload: "test".to_string(), - deadline: Instant::now(), + deadline: None, }; actions_tx.send(action).unwrap(); @@ -905,7 +905,7 @@ mod tests { kind: "tunshell".to_string(), name: "launch_shell".to_string(), payload: "test".to_string(), - deadline: Instant::now(), + deadline: None, }; actions_tx.send(action).unwrap(); diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index 2437da2af..831d2e5d4 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -148,10 +148,17 @@ impl FileDownloader { } }; self.action_id = action.action_id.clone(); + let deadline = match &action.deadline { + Some(d) => *d, + _ => { + error!("Unconfigured deadline: {}", action.name); + continue; + } + }; // NOTE: if download has timedout don't do anything, else ensure errors are forwarded after three retries - match timeout_at(action.deadline, self.run(action)).await { + match timeout_at(deadline, self.run(action)).await { Ok(Err(e)) => self.forward_error(e).await, Err(_) => error!("Last download has timedout"), _ => {} @@ -471,7 +478,7 @@ mod test { kind: "firmware_update".to_string(), name: "firmware_update".to_string(), payload: json!(download_update).to_string(), - deadline: Instant::now() + Duration::from_secs(60), + deadline: Some(Instant::now() + Duration::from_secs(60)), }; std::thread::sleep(Duration::from_millis(10)); diff --git a/uplink/src/collector/process.rs b/uplink/src/collector/process.rs index c06560b17..d2250bfde 100644 --- a/uplink/src/collector/process.rs +++ b/uplink/src/collector/process.rs @@ -83,10 +83,17 @@ impl ProcessHandler { loop { let action = self.actions_rx.recv_async().await?; let command = String::from("tools/") + &action.name; + let deadline = match &action.deadline { + Some(d) => *d, + _ => { + error!("Unconfigured deadline: {}", action.name); + continue; + } + }; // Spawn the action and capture its stdout, ignore timeouts let child = self.run(&action.action_id, &command, &action.payload).await?; - if let Ok(o) = timeout_at(action.deadline, self.spawn_and_capture_stdout(child)).await { + if let Ok(o) = timeout_at(deadline, self.spawn_and_capture_stdout(child)).await { o?; } else { error!("Process timedout: {command}; action_id = {}", action.action_id); diff --git a/uplink/src/collector/script_runner.rs b/uplink/src/collector/script_runner.rs index 31fbe34a3..dd921f041 100644 --- a/uplink/src/collector/script_runner.rs +++ b/uplink/src/collector/script_runner.rs @@ -119,11 +119,17 @@ impl ScriptRunner { continue; } }; + let deadline = match &action.deadline { + Some(d) => *d, + _ => { + error!("Unconfigured deadline: {}", action.name); + continue; + } + }; // Spawn the action and capture its stdout let child = self.run(command).await?; if let Ok(o) = - timeout_at(action.deadline, self.spawn_and_capture_stdout(child, &action.action_id)) - .await + timeout_at(deadline, self.spawn_and_capture_stdout(child, &action.action_id)).await { o? } @@ -149,7 +155,6 @@ mod tests { }; use flume::bounded; - use tokio::time::Instant; fn create_bridge() -> (BridgeTx, Receiver) { let (data_tx, _) = flume::bounded(2); @@ -176,7 +181,7 @@ mod tests { kind: "1".to_string(), name: "test".to_string(), payload: "".to_string(), - deadline: Instant::now(), + deadline: None, }) .unwrap(); @@ -201,7 +206,7 @@ mod tests { name: "test".to_string(), payload: "{\"url\": \"...\", \"content_length\": 0,\"file_name\": \"...\"}" .to_string(), - deadline: Instant::now(), + deadline: None, }) .unwrap();