diff --git a/configs/config.toml b/configs/config.toml index d97ff2a44..de88c5463 100644 --- a/configs/config.toml +++ b/configs/config.toml @@ -9,7 +9,7 @@ action_redirections = { "firmware_update" = "install_update", "send_file" = "loa # Script runner allows users to trigger an action that will run an already downloaded # file as described by the download_path field of the JSON payload of a download action. -script_runner = [{ name = "run_script" }] +script_runner = [{ name = "run_script", timeout = 10 }] # Location on disk for persisted streams to write backlogs into, also used to write persistence_path = "/tmp/uplink/" @@ -141,7 +141,7 @@ priority = 255 # - actions: List of actions names that can trigger the downloader, with configurable timeouts # - path: Location in fs where the files are downloaded into [downloader] -actions = [{ name = "update_firmware" }, { name = "send_file" }, { name = "send_script" }] +actions = [{ name = "update_firmware" }, { name = "send_file", timeout = 10 }, { name = "send_script" }] path = "/var/tmp/ota-file" # Configurations associated with the system stats module of uplink, if enabled diff --git a/uplink/src/base/actions.rs b/uplink/src/base/actions.rs index c1767485d..985f24cb8 100644 --- a/uplink/src/base/actions.rs +++ b/uplink/src/base/actions.rs @@ -1,5 +1,4 @@ use serde::{Deserialize, Serialize}; -use tokio::time::Instant; use crate::{Payload, Point}; @@ -17,9 +16,6 @@ pub struct Action { pub name: String, // action payload. json. can be args/payload. depends on the invoked command pub payload: String, - // Instant at which action must be timedout - #[serde(skip)] - pub deadline: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -115,7 +111,7 @@ impl Point for ActionResponse { } } -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Serialize)] pub struct Cancellation { pub action_id: String, pub name: String, diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index 7657c671a..0120e2ec4 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -165,18 +165,49 @@ impl ActionsBridge { self.handle_action(action).await; } + response = self.status_rx.recv_async() => { let response = response?; self.forward_action_response(response).await; } + _ = &mut self.current_action.as_mut().map(|a| &mut a.timeout).unwrap_or(&mut end) => { - let action = self.current_action.take().unwrap(); - error!("Timeout waiting for action response. Action ID = {}", action.id); - self.forward_action_error(action.action, Error::ActionTimeout).await; + let curret_action = self.current_action.as_mut().unwrap(); + let action = curret_action.action.clone(); + + let route = self + .action_routes + .get(&action.name) + .expect("Action shouldn't be in execution if it can't be routed!"); + + if !route.is_cancellable() { + // Directly send timeout failure response if handler doesn't allow action cancellation + error!("Timeout waiting for action response. Action ID = {}", action.action_id); + self.forward_action_error(action, Error::ActionTimeout).await; + + // Remove action because it timedout + self.clear_current_action(); + continue; + } + + let cancellation = Cancellation { action_id: action.action_id.clone(), name: action.name }; + let payload = serde_json::to_string(&cancellation)?; + let cancel_action = Action { + action_id: "timeout".to_owned(), // Describes cause of action cancellation. NOTE: Action handler shouldn't expect an integer. + name: "cancel-action".to_owned(), + payload, + }; + if route.try_send(cancel_action).is_err() { + error!("Couldn't cancel action ({}) on timeout: {}", action.action_id, Error::UnresponsiveReceiver); + // Remove action anyways + self.clear_current_action(); + continue; + } - // Remove action because it timedout - self.clear_current_action() + // set timeout to end of time in wait of cancellation response + curret_action.timeout = Box::pin(time::sleep(Duration::from_secs(u64::MAX))); } + // Flush streams that timeout Some(timedout_stream) = self.streams.stream_timeouts.next(), if self.streams.stream_timeouts.has_pending() => { debug!("Flushing stream = {}", timedout_stream); @@ -184,12 +215,14 @@ impl ActionsBridge { error!("Failed to flush stream = {}. Error = {}", timedout_stream, e); } } + // Flush all metrics when timed out _ = metrics_timeout.tick() => { if let Err(e) = self.streams.check_and_flush_metrics() { debug!("Failed to flush stream metrics. Error = {}", e); } } + // Handle a shutdown signal _ = self.ctrl_rx.recv_async() => { if let Err(e) = self.save_current_action() { @@ -500,9 +533,8 @@ pub struct ActionRouter { impl ActionRouter { #[allow(clippy::result_large_err)] /// Forwards action to the appropriate application and returns the instance in time at which it should be timedout if incomplete - pub fn try_send(&self, mut action: Action) -> Result> { + pub fn try_send(&self, action: Action) -> Result> { let deadline = Instant::now() + self.duration; - action.deadline = Some(deadline); self.actions_tx.try_send(action)?; Ok(deadline) @@ -646,7 +678,6 @@ mod tests { action_id: "1".to_string(), name: "route_1".to_string(), payload: "test".to_string(), - deadline: None, }; actions_tx.send(action_1).unwrap(); @@ -669,7 +700,6 @@ mod tests { action_id: "2".to_string(), name: "route_2".to_string(), payload: "test".to_string(), - deadline: None, }; actions_tx.send(action_2).unwrap(); @@ -716,7 +746,6 @@ mod tests { action_id: "1".to_string(), name: "test".to_string(), payload: "test".to_string(), - deadline: None, }; actions_tx.send(action_1).unwrap(); @@ -730,7 +759,6 @@ mod tests { action_id: "2".to_string(), name: "test".to_string(), payload: "test".to_string(), - deadline: None, }; actions_tx.send(action_2).unwrap(); @@ -774,7 +802,6 @@ mod tests { action_id: "1".to_string(), name: "test".to_string(), payload: "test".to_string(), - deadline: None, }; actions_tx.send(action).unwrap(); @@ -845,7 +872,6 @@ mod tests { action_id: "1".to_string(), name: "test".to_string(), payload: "test".to_string(), - deadline: None, }; actions_tx.send(action).unwrap(); @@ -921,7 +947,6 @@ mod tests { action_id: "1".to_string(), name: "launch_shell".to_string(), payload: "test".to_string(), - deadline: None, }; actions_tx.send(action).unwrap(); @@ -931,7 +956,6 @@ mod tests { action_id: "2".to_string(), name: "test".to_string(), payload: "test".to_string(), - deadline: None, }; actions_tx.send(action).unwrap(); @@ -1017,7 +1041,6 @@ mod tests { action_id: "1".to_string(), name: "test".to_string(), payload: "test".to_string(), - deadline: None, }; actions_tx.send(action).unwrap(); @@ -1027,7 +1050,6 @@ mod tests { action_id: "2".to_string(), name: "launch_shell".to_string(), payload: "test".to_string(), - deadline: None, }; actions_tx.send(action).unwrap(); diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index 95538971d..2ab63bd39 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -56,7 +56,7 @@ use reqwest::{Certificate, Client, ClientBuilder, Error as ReqwestError, Identit use rsa::sha2::{Digest, Sha256}; use serde::{Deserialize, Serialize}; use tokio::select; -use tokio::time::{timeout_at, Instant}; +use tokio::time::Instant; use std::fs::{metadata, read, remove_dir_all, remove_file, write, File}; use std::io; @@ -94,8 +94,8 @@ pub enum Error { BadSave, #[error("Save file doesn't exist")] NoSave, - #[error("Download has been cancelled")] - Cancelled, + #[error("Download has been cancelled by '{0}'")] + Cancelled(String), } /// This struct contains the necessary components to download and store file as notified by a download file @@ -205,18 +205,14 @@ impl FileDownloader { // Accepts `DownloadState`, sets a timeout for the action async fn download(&mut self, state: &mut DownloadState) -> Result<(), Error> { let shutdown_rx = self.shutdown_rx.clone(); - let deadline = match &state.current.action.deadline { - Some(d) => *d, - _ => { - error!("Unconfigured deadline: {}", state.current.action.name); - return Ok(()); - } - }; select! { + // Wait till download completes + o = self.continuous_retry(state) => o?, + // Cancel download on receiving cancel action, e.g. on action timeout Ok(action) = self.actions_rx.recv_async() => { if action.name != "cancel-action" { warn!("Unexpected action: {action:?}"); - unreachable!("Only cancel-action should be sent to downloader while it is handling another downlaod!!"); + unreachable!("Only cancel-action should be sent to downloader while it is handling another download!!"); } let cancellation: Cancellation = serde_json::from_str(&action.payload)?; @@ -228,7 +224,7 @@ impl FileDownloader { trace!("Deleting partially downloaded file: {cancellation:?}"); state.clean()?; - return Err(Error::Cancelled); + return Err(Error::Cancelled(action.action_id)); }, Ok(_) = shutdown_rx.recv_async(), if !shutdown_rx.is_disconnected() => { @@ -238,12 +234,6 @@ impl FileDownloader { return Ok(()); }, - - // NOTE: if download has timedout don't do anything, else ensure errors are forwarded after three retries - o = timeout_at(deadline, self.continuous_retry(state)) => match o { - Ok(r) => r?, - Err(_) => error!("Last download has timedout"), - } } state.current.meta.verify_checksum()?; @@ -386,7 +376,6 @@ impl DownloadFile { struct CurrentDownload { action: Action, meta: DownloadFile, - time_left: Option, } // A temporary structure to help us retry downloads @@ -439,7 +428,7 @@ impl DownloadState { human_bytes(meta.content_length as f64) ); meta.download_path = Some(file_path); - let current = CurrentDownload { action, meta, time_left: None }; + let current = CurrentDownload { action, meta }; Ok(Self { current, @@ -459,9 +448,7 @@ impl DownloadState { } let read = read(&path)?; - let mut current: CurrentDownload = serde_json::from_slice(&read)?; - // Calculate deadline based on written time left - current.action.deadline = current.time_left.map(|t| Instant::now() + t); + let current: CurrentDownload = serde_json::from_slice(&read)?; // Unwrap is ok here as it is expected to be set for actions once received let file = File::open(current.meta.download_path.as_ref().unwrap())?; @@ -483,9 +470,7 @@ impl DownloadState { return Ok(()); } - let mut current = self.current.clone(); - // Calculate time left based on deadline - current.time_left = current.action.deadline.map(|t| t.duration_since(Instant::now())); + let current = self.current.clone(); let json = serde_json::to_vec(¤t)?; let mut path = config.path.clone(); @@ -642,7 +627,6 @@ mod test { action_id: "1".to_string(), name: "firmware_update".to_string(), payload: json!(download_update).to_string(), - deadline: Some(Instant::now() + Duration::from_secs(60)), }; std::thread::sleep(Duration::from_millis(10)); @@ -703,7 +687,6 @@ mod test { action_id: "1".to_string(), name: "firmware_update".to_string(), payload: json!(correct_update).to_string(), - deadline: Some(Instant::now() + Duration::from_secs(100)), }; // Send the correct action to FileDownloader @@ -741,7 +724,6 @@ mod test { action_id: "1".to_string(), name: "firmware_update".to_string(), payload: json!(wrong_update).to_string(), - deadline: Some(Instant::now() + Duration::from_secs(100)), }; // Send the wrong action to FileDownloader diff --git a/uplink/src/collector/process.rs b/uplink/src/collector/process.rs index f1aed754e..c52dc7cf0 100644 --- a/uplink/src/collector/process.rs +++ b/uplink/src/collector/process.rs @@ -1,11 +1,11 @@ use flume::{Receiver, RecvError, SendError}; -use log::{debug, error, info}; +use log::{debug, error, info, trace, warn}; use thiserror::Error; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::{Child, Command}; use tokio::select; -use tokio::time::timeout_at; +use crate::base::actions::Cancellation; use crate::base::bridge::BridgeTx; use crate::{Action, ActionResponse, Package}; @@ -26,6 +26,8 @@ pub enum Error { Busy, #[error("No stdout in spawned action")] NoStdout, + #[error("Process has been cancelled by '{0}'")] + Cancelled(String), } /// Process abstracts functions to spawn process and handle their output @@ -55,25 +57,46 @@ impl ProcessHandler { } /// Capture stdout of the running process in a spawned task - pub async fn spawn_and_capture_stdout(&mut self, mut child: Child) -> Result<(), Error> { + pub async fn spawn_and_capture_stdout( + &mut self, + mut child: Child, + action_id: &str, + ) -> Result<(), Error> { let stdout = child.stdout.take().ok_or(Error::NoStdout)?; let mut stdout = BufReader::new(stdout).lines(); loop { select! { - Ok(Some(line)) = stdout.next_line() => { + Ok(Some(line)) = stdout.next_line() => { let status: ActionResponse = match serde_json::from_str(&line) { Ok(status) => status, - Err(e) => ActionResponse::failure("dummy", e.to_string()), + Err(e) => ActionResponse::failure(action_id, e.to_string()), }; debug!("Action status: {:?}", status); self.bridge_tx.send_action_response(status).await; - } - status = child.wait() => { + } + status = child.wait() => { info!("Action done!! Status = {:?}", status); return Ok(()); }, + // Cancel process on receiving cancel action, e.g. on action timeout + Ok(action) = self.actions_rx.recv_async() => { + if action.name != "cancel-action" { + warn!("Unexpected action: {action:?}"); + unreachable!("Only cancel-actions are acceptable!!"); + } + + let cancellation: Cancellation = serde_json::from_str(&action.payload)?; + if cancellation.action_id != action_id { + warn!("Unexpected action: {action:?}"); + unreachable!("Cancel actions meant for current action only are acceptable!!"); + } + + trace!("Cancelling process: '{}'", cancellation.action_id); + let status = ActionResponse::failure(action_id, Error::Cancelled(action.action_id).to_string()); + self.bridge_tx.send_action_response(status).await; + }, } } } @@ -83,21 +106,10 @@ impl ProcessHandler { loop { let action = self.actions_rx.recv_async().await?; let command = format!("tools/{}", action.name); - let deadline = match &action.deadline { - Some(d) => *d, - _ => { - error!("Unconfigured deadline: {}", action.name); - continue; - } - }; // Spawn the action and capture its stdout, ignore timeouts let child = self.run(&action.action_id, &command, &action.payload).await?; - if let Ok(o) = timeout_at(deadline, self.spawn_and_capture_stdout(child)).await { - o?; - } else { - error!("Process timedout: {command}; action_id = {}", action.action_id); - } + self.spawn_and_capture_stdout(child, &action.action_id).await?; } } } diff --git a/uplink/src/collector/script_runner.rs b/uplink/src/collector/script_runner.rs index db8f74ac1..63079df26 100644 --- a/uplink/src/collector/script_runner.rs +++ b/uplink/src/collector/script_runner.rs @@ -1,12 +1,12 @@ use flume::{Receiver, RecvError, SendError}; -use log::{debug, error, info, warn}; +use log::{debug, error, info, trace, warn}; use thiserror::Error; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::{Child, Command}; use tokio::select; -use tokio::time::timeout_at; use super::downloader::DownloadFile; +use crate::base::actions::Cancellation; use crate::base::bridge::BridgeTx; use crate::{Action, ActionResponse, Package}; @@ -28,6 +28,8 @@ pub enum Error { Busy, #[error("No stdout in spawned action")] NoStdout, + #[error("Script has been cancelled: '{0}'")] + Cancelled(String), } /// Script runner runs a script downloaded with FileDownloader and handles their output over the action_status stream. @@ -86,6 +88,23 @@ impl ScriptRunner { self.forward_status(ActionResponse::success(id)).await; break; }, + // Cancel script run on receiving cancel action, e.g. on action timeout + Ok(action) = self.actions_rx.recv_async() => { + if action.name != "cancel-action" { + warn!("Unexpected action: {action:?}"); + unreachable!("Only cancel-actions are acceptable!!"); + } + + let cancellation: Cancellation = serde_json::from_str(&action.payload)?; + if cancellation.action_id != id { + warn!("Unexpected action: {action:?}"); + unreachable!("Cancel actions meant for current action only are acceptable!!"); + } + + trace!("Cancelling script: '{}'", cancellation.action_id); + let status = ActionResponse::failure(id, Error::Cancelled(action.action_id).to_string()); + self.bridge_tx.send_action_response(status).await; + }, } } @@ -119,20 +138,9 @@ impl ScriptRunner { continue; } }; - let deadline = match &action.deadline { - Some(d) => *d, - _ => { - error!("Unconfigured deadline: {}", action.name); - continue; - } - }; // Spawn the action and capture its stdout let child = self.run(command).await?; - if let Ok(o) = - timeout_at(deadline, self.spawn_and_capture_stdout(child, &action.action_id)).await - { - o? - } + self.spawn_and_capture_stdout(child, &action.action_id).await? } } @@ -176,7 +184,6 @@ mod tests { action_id: "1".to_string(), name: "test".to_string(), payload: "".to_string(), - deadline: None, }) .unwrap(); @@ -200,7 +207,6 @@ mod tests { name: "test".to_string(), payload: "{\"url\": \"...\", \"content_length\": 0,\"file_name\": \"...\"}" .to_string(), - deadline: None, }) .unwrap(); diff --git a/uplink/src/config.rs b/uplink/src/config.rs index 4fdac763c..bef2e116c 100644 --- a/uplink/src/config.rs +++ b/uplink/src/config.rs @@ -215,6 +215,7 @@ pub struct ActionRoute { #[serde_as(as = "DurationSeconds")] pub timeout: Duration, // Can the action handler cancel actions mid execution? + #[serde(default)] pub cancellable: bool, } diff --git a/uplink/src/main.rs b/uplink/src/main.rs index 572fea502..0221110b3 100644 --- a/uplink/src/main.rs +++ b/uplink/src/main.rs @@ -181,6 +181,21 @@ impl CommandLine { config.actions_subscription = format!("/tenants/{tenant_id}/devices/{device_id}/actions"); + // downloader actions are cancellable by default + for route in config.downloader.actions.iter_mut() { + route.cancellable = true; + } + + // process actions are cancellable by default + for route in config.processes.iter_mut() { + route.cancellable = true; + } + + // script runner actions are cancellable by default + for route in config.script_runner.iter_mut() { + route.cancellable = true; + } + Ok(config) }