From 6126b4e9212375c1bb8f01ec93f1dc2cb51b3a42 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 4 Apr 2024 20:23:47 +0530 Subject: [PATCH 01/24] feat: enable cancellations in downloader --- uplink/src/base/actions.rs | 6 ++++++ uplink/src/collector/downloader.rs | 28 +++++++++++++++++++++++++++- 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/uplink/src/base/actions.rs b/uplink/src/base/actions.rs index 1433a1eab..c1767485d 100644 --- a/uplink/src/base/actions.rs +++ b/uplink/src/base/actions.rs @@ -114,3 +114,9 @@ impl Point for ActionResponse { self.timestamp } } + +#[derive(Debug, Deserialize)] +pub struct Cancellation { + pub action_id: String, + pub name: String, +} diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index 921885378..b53adc23c 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -69,6 +69,7 @@ use std::{ }; use std::{io::Write, path::PathBuf}; +use crate::base::actions::Cancellation; use crate::{base::bridge::BridgeTx, config::DownloaderConfig, Action, ActionResponse, Config}; #[derive(thiserror::Error, Debug)] @@ -207,6 +208,22 @@ impl FileDownloader { } }; select! { + Ok(action) = self.actions_rx.recv_async() => { + if action.name != "cancel-action" { + warn!("Unexpected action: {action:?}"); + unreachable!("Only cancel-action should be sent to downloader while it is handling another downlaod!!"); + } + + let cancellation: Cancellation = serde_json::from_str(&action.payload)?; + if cancellation.action_id != self.action_id { + warn!("Unexpected action: {action:?}"); + unreachable!("Cancel actions meant for current action only should be sent to downloader!!"); + } + + trace!("Deleting partially downloaded file: {cancellation:?}"); + state.clean()?; + }, + // NOTE: if download has timedout don't do anything, else ensure errors are forwarded after three retries o = timeout_at(deadline, self.continuous_retry(state)) => match o { Ok(r) => r?, @@ -229,7 +246,7 @@ impl FileDownloader { } // A download must be retried with Range header when HTTP/reqwest errors are faced - async fn continuous_retry(&mut self, state: &mut DownloadState) -> Result<(), Error> { + async fn continuous_retry(&self, state: &mut DownloadState) -> Result<(), Error> { 'outer: loop { let mut req = self.client.get(&state.current.meta.url); if let Some(range) = state.retry_range() { @@ -438,6 +455,7 @@ impl DownloadState { // Calculate deadline based on written time left current.action.deadline = current.time_left.map(|t| Instant::now() + t); + // Unwrap is ok here as it is expected to be set for actions once received let file = File::open(current.meta.download_path.as_ref().unwrap())?; let bytes_written = file.metadata()?.len() as usize; @@ -469,6 +487,14 @@ impl DownloadState { Ok(()) } + /// Deletes contents of file + fn clean(&self) -> Result<(), Error> { + // Unwrap is ok here as it is expected to be set for actions once received + remove_file(self.current.meta.download_path.as_ref().unwrap())?; + + Ok(()) + } + fn retry_range(&self) -> Option { if self.bytes_written == 0 { return None; From 3f795dd58c5a44de71695941f1289a4ac98d4652 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 4 Apr 2024 22:33:49 +0530 Subject: [PATCH 02/24] feat: handle `cancel-action` in bridge --- uplink/src/base/bridge/actions_lane.rs | 181 ++++++++++++++++++++----- uplink/src/collector/downloader.rs | 1 + uplink/src/config.rs | 2 + uplink/src/lib.rs | 9 +- 4 files changed, 158 insertions(+), 35 deletions(-) diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index 5fc0d5eea..c8d2cfd19 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -11,6 +11,7 @@ use std::{collections::HashMap, fmt::Debug, pin::Pin, sync::Arc, time::Duration} use super::streams::Streams; use super::{ActionBridgeShutdown, Package, StreamMetrics}; +use crate::base::actions::Cancellation; use crate::config::ActionRoute; use crate::{Action, ActionResponse, Config}; @@ -34,10 +35,16 @@ pub enum Error { Busy, #[error("Action Route clash: \"{0}\"")] ActionRouteClash(String), + #[error("Cancellation request received for action currently not in execution!")] + UnexpectedCancellation, + #[error("Cancellation request for action in execution, but names don't match!")] + CorruptedCancellation, + #[error("Cancellation request failed as action completed execution!")] + FailedCancellation, + #[error("Action cancelled by action_id: {0}")] + Cancelled(String), } -struct RedirectionError(Action); - pub struct ActionsBridge { /// All configuration config: Arc, @@ -103,10 +110,10 @@ impl ActionsBridge { pub fn register_action_route( &mut self, - ActionRoute { name, timeout: duration }: ActionRoute, + ActionRoute { name, timeout: duration, cancellable }: ActionRoute, actions_tx: Sender, ) -> Result<(), Error> { - let action_router = ActionRouter { actions_tx, duration }; + let action_router = ActionRouter { actions_tx, duration, cancellable }; if self.action_routes.insert(name.clone(), action_router).is_some() { return Err(Error::ActionRouteClash(name)); } @@ -149,7 +156,14 @@ impl ActionsBridge { select! { action = self.actions_rx.recv_async() => { let action = action?; + + if action.name == "cancel-action" && self.current_action.is_some() { + self.handle_cancellation(action).await?; + continue + } + self.handle_action(action).await; + } response = self.status_rx.recv_async() => { let response = response?; @@ -234,6 +248,42 @@ impl ActionsBridge { self.forward_action_error(action, error).await; } + /// Forwards cancellation request to the handler if it can handle the same, + /// else marks the current action as cancelled and avoids further redirections + async fn handle_cancellation(&mut self, action: Action) -> Result<(), Error> { + let cancellation: Cancellation = serde_json::from_str(&action.payload)?; + if cancellation.action_id != self.current_action.as_ref().unwrap().id { + warn!("Unexpected cancellation: {cancellation:?}"); + self.forward_action_error(action, Error::UnexpectedCancellation).await; + return Ok(()); + } + + if cancellation.name != self.current_action.as_ref().unwrap().action.name { + warn!("Unexpected cancellation: {cancellation:?}"); + self.forward_action_error(action, Error::CorruptedCancellation).await; + return Ok(()); + } + + let route = self + .action_routes + .get(&action.name) + .expect("Action shouldn't be in execution if it can't be routed!"); + + if !route.is_cancellable() { + // Ensure that action redirections for the action are turned off, + // action will be cancelled on next attempt to redirect + self.current_action.as_mut().unwrap().cancelled_by = Some(action); + + return Ok(()); + } + + if let Err(e) = route.try_send(action.clone()).map_err(|_| Error::UnresponsiveReceiver) { + self.forward_action_error(action, e).await; + } + + Ok(()) + } + /// Save current action information in persistence fn save_current_action(&mut self) -> Result<(), Error> { let current_action = match self.current_action.take() { @@ -318,22 +368,48 @@ impl ActionsBridge { action = a; } - if let Err(RedirectionError(action)) = self.redirect_action(action).await { - // NOTE: send success reponse for actions that don't have redirections configured - warn!("Action redirection is not configured for: {:?}", action); - let response = ActionResponse::success(&action.action_id); - self.streams.forward(response).await; + match self.redirect_action(&mut action).await { + Ok(_) => return, + Err(Error::NoRoute(_)) => { + // NOTE: send success reponse for actions that don't have redirections configured + warn!("Action redirection is not configured for: {:?}", action); + let response = ActionResponse::success(&action.action_id); + self.streams.forward(response).await; + + if let Some(CurrentAction { cancelled_by: Some(cancel_action), .. }) = + self.current_action.take() + { + // Marks the cancellation as a failure as action has reached completion without being cancelled + self.forward_action_error(cancel_action, Error::FailedCancellation).await + } + } + Err(Error::Cancelled(cancel_action)) => { + let response = ActionResponse::success(&cancel_action); + self.streams.forward(response).await; - self.clear_current_action(); + self.forward_action_error(action, Error::Cancelled(cancel_action)).await; + } + Err(e) => { + self.forward_action_error(action, e).await; + } } + + self.clear_current_action(); } } - async fn redirect_action(&mut self, mut action: Action) -> Result<(), RedirectionError> { + async fn redirect_action(&mut self, action: &mut Action) -> Result<(), Error> { let fwd_name = self .action_redirections .get(&action.name) - .ok_or_else(|| RedirectionError(action.clone()))?; + .ok_or_else(|| Error::NoRoute(action.name.clone()))?; + + // Cancelled action should not be redirected + if let Some(CurrentAction { cancelled_by: Some(cancel_action), .. }) = + self.current_action.as_ref() + { + return Err(Error::Cancelled(cancel_action.action_id.clone())); + } debug!( "Redirecting action: {} ~> {}; action_id = {}", @@ -341,14 +417,7 @@ impl ActionsBridge { ); action.name = fwd_name.to_owned(); - - if let Err(e) = self.try_route_action(action.clone()) { - error!("Failed to route action to app. Error = {:?}", e); - self.forward_action_error(action, e).await; - - // Remove action because it couldn't be forwarded - self.clear_current_action() - } + self.try_route_action(action.clone())?; Ok(()) } @@ -380,6 +449,8 @@ struct CurrentAction { pub id: String, pub action: Action, pub timeout: Pin>, + // cancel-action request + pub cancelled_by: Option, } impl CurrentAction { @@ -388,6 +459,7 @@ impl CurrentAction { id: action.action_id.clone(), action, timeout: Box::pin(time::sleep_until(deadline)), + cancelled_by: None, } } @@ -409,6 +481,7 @@ impl CurrentAction { id: json.id, action: json.action, timeout: Box::pin(time::sleep(json.timeout)), + cancelled_by: None, }) } } @@ -417,6 +490,7 @@ impl CurrentAction { pub struct ActionRouter { pub(crate) actions_tx: Sender, duration: Duration, + cancellable: bool, } impl ActionRouter { @@ -429,6 +503,10 @@ impl ActionRouter { Ok(deadline) } + + pub fn is_cancellable(&self) -> bool { + self.cancellable + } } /// Handle for apps to send action status to bridge @@ -526,13 +604,21 @@ mod tests { std::env::set_current_dir(&tmpdir).unwrap(); let config = Arc::new(default_config()); let (mut bridge, actions_tx, data_rx) = create_bridge(config); - let route_1 = ActionRoute { name: "route_1".to_string(), timeout: Duration::from_secs(10) }; + let route_1 = ActionRoute { + name: "route_1".to_string(), + timeout: Duration::from_secs(10), + cancellable: false, + }; let (route_tx, route_1_rx) = bounded(1); bridge.register_action_route(route_1, route_tx).unwrap(); let (route_tx, route_2_rx) = bounded(1); - let route_2 = ActionRoute { name: "route_2".to_string(), timeout: Duration::from_secs(30) }; + let route_2 = ActionRoute { + name: "route_2".to_string(), + timeout: Duration::from_secs(30), + cancellable: false, + }; bridge.register_action_route(route_2, route_tx).unwrap(); spawn_bridge(bridge); @@ -610,7 +696,11 @@ mod tests { let config = Arc::new(default_config()); let (mut bridge, actions_tx, data_rx) = create_bridge(config); - let test_route = ActionRoute { name: "test".to_string(), timeout: Duration::from_secs(30) }; + let test_route = ActionRoute { + name: "test".to_string(), + timeout: Duration::from_secs(30), + cancellable: false, + }; let (route_tx, action_rx) = bounded(1); bridge.register_action_route(test_route, route_tx).unwrap(); @@ -660,7 +750,11 @@ mod tests { let config = Arc::new(default_config()); let (mut bridge, actions_tx, data_rx) = create_bridge(config); - let test_route = ActionRoute { name: "test".to_string(), timeout: Duration::from_secs(30) }; + let test_route = ActionRoute { + name: "test".to_string(), + timeout: Duration::from_secs(30), + cancellable: false, + }; let (route_tx, action_rx) = bounded(1); bridge.register_action_route(test_route, route_tx).unwrap(); @@ -710,12 +804,19 @@ mod tests { let bridge_tx_2 = bridge.status_tx(); let (route_tx, action_rx_1) = bounded(1); - let test_route = ActionRoute { name: "test".to_string(), timeout: Duration::from_secs(30) }; + let test_route = ActionRoute { + name: "test".to_string(), + timeout: Duration::from_secs(30), + cancellable: false, + }; bridge.register_action_route(test_route, route_tx).unwrap(); let (route_tx, action_rx_2) = bounded(1); - let redirect_route = - ActionRoute { name: "redirect".to_string(), timeout: Duration::from_secs(30) }; + let redirect_route = ActionRoute { + name: "redirect".to_string(), + timeout: Duration::from_secs(30), + cancellable: false, + }; bridge.register_action_route(redirect_route, route_tx).unwrap(); spawn_bridge(bridge); @@ -777,12 +878,19 @@ mod tests { let bridge_tx_2 = bridge.status_tx(); let (route_tx, action_rx_1) = bounded(1); - let tunshell_route = - ActionRoute { name: TUNSHELL_ACTION.to_string(), timeout: Duration::from_secs(30) }; + let tunshell_route = ActionRoute { + name: TUNSHELL_ACTION.to_string(), + timeout: Duration::from_secs(30), + cancellable: false, + }; bridge.register_action_route(tunshell_route, route_tx).unwrap(); let (route_tx, action_rx_2) = bounded(1); - let test_route = ActionRoute { name: "test".to_string(), timeout: Duration::from_secs(30) }; + let test_route = ActionRoute { + name: "test".to_string(), + timeout: Duration::from_secs(30), + cancellable: false, + }; bridge.register_action_route(test_route, route_tx).unwrap(); spawn_bridge(bridge); @@ -866,12 +974,19 @@ mod tests { let bridge_tx_2 = bridge.status_tx(); let (route_tx, action_rx_1) = bounded(1); - let test_route = ActionRoute { name: "test".to_string(), timeout: Duration::from_secs(30) }; + let test_route = ActionRoute { + name: "test".to_string(), + timeout: Duration::from_secs(30), + cancellable: false, + }; bridge.register_action_route(test_route, route_tx).unwrap(); let (route_tx, action_rx_2) = bounded(1); - let tunshell_route = - ActionRoute { name: TUNSHELL_ACTION.to_string(), timeout: Duration::from_secs(30) }; + let tunshell_route = ActionRoute { + name: TUNSHELL_ACTION.to_string(), + timeout: Duration::from_secs(30), + cancellable: false, + }; bridge.register_action_route(tunshell_route, route_tx).unwrap(); spawn_bridge(bridge); diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index b53adc23c..4120a045e 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -594,6 +594,7 @@ mod test { actions: vec![ActionRoute { name: "firmware_update".to_owned(), timeout: Duration::from_secs(10), + cancellable: true, }], path, }; diff --git a/uplink/src/config.rs b/uplink/src/config.rs index d7718a97d..4fdac763c 100644 --- a/uplink/src/config.rs +++ b/uplink/src/config.rs @@ -214,6 +214,8 @@ pub struct ActionRoute { #[serde(default = "default_timeout")] #[serde_as(as = "DurationSeconds")] pub timeout: Duration, + // Can the action handler cancel actions mid execution? + pub cancellable: bool, } impl From<&ActionRoute> for ActionRoute { diff --git a/uplink/src/lib.rs b/uplink/src/lib.rs index c536eb4a9..9fecf7bc5 100644 --- a/uplink/src/lib.rs +++ b/uplink/src/lib.rs @@ -237,8 +237,11 @@ impl Uplink { pub fn spawn_builtins(&mut self, bridge: &mut Bridge) -> Result<(), Error> { let bridge_tx = bridge.bridge_tx(); - let route = - ActionRoute { name: "launch_shell".to_owned(), timeout: Duration::from_secs(10) }; + let route = ActionRoute { + name: "launch_shell".to_owned(), + timeout: Duration::from_secs(10), + cancellable: false, + }; let actions_rx = bridge.register_action_route(route)?; let tunshell_client = TunshellClient::new(actions_rx, bridge_tx.clone()); spawn_named_thread("Tunshell Client", move || tunshell_client.start()); @@ -258,6 +261,7 @@ impl Uplink { let route = ActionRoute { name: "journalctl_config".to_string(), timeout: Duration::from_secs(10), + cancellable: false, }; let actions_rx = bridge.register_action_route(route)?; let logger = JournalCtl::new(config, actions_rx, bridge_tx.clone()); @@ -273,6 +277,7 @@ impl Uplink { let route = ActionRoute { name: "journalctl_config".to_string(), timeout: Duration::from_secs(10), + cancellable: false, }; let actions_rx = bridge.register_action_route(route)?; let logger = Logcat::new(config, actions_rx, bridge_tx.clone()); From 08d8cab2b769cd7333aa8ffb2a2b7dffad6c885b Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 4 Apr 2024 22:49:13 +0530 Subject: [PATCH 03/24] set `cancelled_by` even for actions in cancellable stages --- uplink/src/base/bridge/actions_lane.rs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index c8d2cfd19..b142e130e 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -269,16 +269,15 @@ impl ActionsBridge { .get(&action.name) .expect("Action shouldn't be in execution if it can't be routed!"); - if !route.is_cancellable() { - // Ensure that action redirections for the action are turned off, - // action will be cancelled on next attempt to redirect - self.current_action.as_mut().unwrap().cancelled_by = Some(action); - - return Ok(()); - } - - if let Err(e) = route.try_send(action.clone()).map_err(|_| Error::UnresponsiveReceiver) { - self.forward_action_error(action, e).await; + // Ensure that action redirections for the action are turned off, + // action will be cancelled on next attempt to redirect + self.current_action.as_mut().unwrap().cancelled_by = Some(action.clone()); + + if route.is_cancellable() { + if let Err(e) = route.try_send(action.clone()).map_err(|_| Error::UnresponsiveReceiver) + { + self.forward_action_error(action, e).await; + } } Ok(()) From 41c574fb08bce92bf128fb912269a0ca13def51c Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 4 Apr 2024 22:57:13 +0530 Subject: [PATCH 04/24] cancelled download must be updated to bridge --- uplink/src/base/bridge/actions_lane.rs | 7 ++++++- uplink/src/collector/downloader.rs | 4 ++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index b142e130e..eafae3c76 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -354,7 +354,12 @@ impl ActionsBridge { self.streams.forward(response.clone()).await; if response.is_completed() || response.is_failed() { - self.clear_current_action(); + if let Some(CurrentAction { cancelled_by: Some(cancel_action), .. }) = + self.current_action.take() + { + let response = ActionResponse::success(&cancel_action.action_id); + self.streams.forward(response).await; + } return; } diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index 4120a045e..cdb353847 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -94,6 +94,8 @@ pub enum Error { BadSave, #[error("Save file doesn't exist")] NoSave, + #[error("Download has been cancelled")] + Cancelled, } /// This struct contains the necessary components to download and store file as notified by a download file @@ -222,6 +224,8 @@ impl FileDownloader { trace!("Deleting partially downloaded file: {cancellation:?}"); state.clean()?; + + return Err(Error::Cancelled); }, // NOTE: if download has timedout don't do anything, else ensure errors are forwarded after three retries From 33a870644dbb9bf350538bcbef619d87dc95e5d4 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 24 Apr 2024 16:55:06 +0530 Subject: [PATCH 05/24] chore: cleanup imports --- uplink/src/base/bridge/actions_lane.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index 7c8e06cf8..7657c671a 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -540,7 +540,7 @@ impl CtrlTx { #[cfg(test)] mod tests { - use tokio::runtime::Runtime; + use tokio::{runtime::Runtime, select}; use crate::config::{StreamConfig, StreamMetricsConfig}; From 2cf547ab9734e4fa2f0e59e5dc9524a7208a6c06 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 25 Apr 2024 00:42:59 +0530 Subject: [PATCH 06/24] refactor: action timeouts with cancel-action triggered by bridge --- configs/config.toml | 4 +- uplink/src/base/actions.rs | 6 +-- uplink/src/base/bridge/actions_lane.rs | 56 ++++++++++++++++++-------- uplink/src/collector/downloader.rs | 40 +++++------------- uplink/src/collector/process.rs | 50 ++++++++++++++--------- uplink/src/collector/script_runner.rs | 38 +++++++++-------- uplink/src/config.rs | 1 + uplink/src/main.rs | 15 +++++++ 8 files changed, 122 insertions(+), 88 deletions(-) diff --git a/configs/config.toml b/configs/config.toml index d97ff2a44..de88c5463 100644 --- a/configs/config.toml +++ b/configs/config.toml @@ -9,7 +9,7 @@ action_redirections = { "firmware_update" = "install_update", "send_file" = "loa # Script runner allows users to trigger an action that will run an already downloaded # file as described by the download_path field of the JSON payload of a download action. -script_runner = [{ name = "run_script" }] +script_runner = [{ name = "run_script", timeout = 10 }] # Location on disk for persisted streams to write backlogs into, also used to write persistence_path = "/tmp/uplink/" @@ -141,7 +141,7 @@ priority = 255 # - actions: List of actions names that can trigger the downloader, with configurable timeouts # - path: Location in fs where the files are downloaded into [downloader] -actions = [{ name = "update_firmware" }, { name = "send_file" }, { name = "send_script" }] +actions = [{ name = "update_firmware" }, { name = "send_file", timeout = 10 }, { name = "send_script" }] path = "/var/tmp/ota-file" # Configurations associated with the system stats module of uplink, if enabled diff --git a/uplink/src/base/actions.rs b/uplink/src/base/actions.rs index c1767485d..985f24cb8 100644 --- a/uplink/src/base/actions.rs +++ b/uplink/src/base/actions.rs @@ -1,5 +1,4 @@ use serde::{Deserialize, Serialize}; -use tokio::time::Instant; use crate::{Payload, Point}; @@ -17,9 +16,6 @@ pub struct Action { pub name: String, // action payload. json. can be args/payload. depends on the invoked command pub payload: String, - // Instant at which action must be timedout - #[serde(skip)] - pub deadline: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -115,7 +111,7 @@ impl Point for ActionResponse { } } -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Serialize)] pub struct Cancellation { pub action_id: String, pub name: String, diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index 7657c671a..0120e2ec4 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -165,18 +165,49 @@ impl ActionsBridge { self.handle_action(action).await; } + response = self.status_rx.recv_async() => { let response = response?; self.forward_action_response(response).await; } + _ = &mut self.current_action.as_mut().map(|a| &mut a.timeout).unwrap_or(&mut end) => { - let action = self.current_action.take().unwrap(); - error!("Timeout waiting for action response. Action ID = {}", action.id); - self.forward_action_error(action.action, Error::ActionTimeout).await; + let curret_action = self.current_action.as_mut().unwrap(); + let action = curret_action.action.clone(); + + let route = self + .action_routes + .get(&action.name) + .expect("Action shouldn't be in execution if it can't be routed!"); + + if !route.is_cancellable() { + // Directly send timeout failure response if handler doesn't allow action cancellation + error!("Timeout waiting for action response. Action ID = {}", action.action_id); + self.forward_action_error(action, Error::ActionTimeout).await; + + // Remove action because it timedout + self.clear_current_action(); + continue; + } + + let cancellation = Cancellation { action_id: action.action_id.clone(), name: action.name }; + let payload = serde_json::to_string(&cancellation)?; + let cancel_action = Action { + action_id: "timeout".to_owned(), // Describes cause of action cancellation. NOTE: Action handler shouldn't expect an integer. + name: "cancel-action".to_owned(), + payload, + }; + if route.try_send(cancel_action).is_err() { + error!("Couldn't cancel action ({}) on timeout: {}", action.action_id, Error::UnresponsiveReceiver); + // Remove action anyways + self.clear_current_action(); + continue; + } - // Remove action because it timedout - self.clear_current_action() + // set timeout to end of time in wait of cancellation response + curret_action.timeout = Box::pin(time::sleep(Duration::from_secs(u64::MAX))); } + // Flush streams that timeout Some(timedout_stream) = self.streams.stream_timeouts.next(), if self.streams.stream_timeouts.has_pending() => { debug!("Flushing stream = {}", timedout_stream); @@ -184,12 +215,14 @@ impl ActionsBridge { error!("Failed to flush stream = {}. Error = {}", timedout_stream, e); } } + // Flush all metrics when timed out _ = metrics_timeout.tick() => { if let Err(e) = self.streams.check_and_flush_metrics() { debug!("Failed to flush stream metrics. Error = {}", e); } } + // Handle a shutdown signal _ = self.ctrl_rx.recv_async() => { if let Err(e) = self.save_current_action() { @@ -500,9 +533,8 @@ pub struct ActionRouter { impl ActionRouter { #[allow(clippy::result_large_err)] /// Forwards action to the appropriate application and returns the instance in time at which it should be timedout if incomplete - pub fn try_send(&self, mut action: Action) -> Result> { + pub fn try_send(&self, action: Action) -> Result> { let deadline = Instant::now() + self.duration; - action.deadline = Some(deadline); self.actions_tx.try_send(action)?; Ok(deadline) @@ -646,7 +678,6 @@ mod tests { action_id: "1".to_string(), name: "route_1".to_string(), payload: "test".to_string(), - deadline: None, }; actions_tx.send(action_1).unwrap(); @@ -669,7 +700,6 @@ mod tests { action_id: "2".to_string(), name: "route_2".to_string(), payload: "test".to_string(), - deadline: None, }; actions_tx.send(action_2).unwrap(); @@ -716,7 +746,6 @@ mod tests { action_id: "1".to_string(), name: "test".to_string(), payload: "test".to_string(), - deadline: None, }; actions_tx.send(action_1).unwrap(); @@ -730,7 +759,6 @@ mod tests { action_id: "2".to_string(), name: "test".to_string(), payload: "test".to_string(), - deadline: None, }; actions_tx.send(action_2).unwrap(); @@ -774,7 +802,6 @@ mod tests { action_id: "1".to_string(), name: "test".to_string(), payload: "test".to_string(), - deadline: None, }; actions_tx.send(action).unwrap(); @@ -845,7 +872,6 @@ mod tests { action_id: "1".to_string(), name: "test".to_string(), payload: "test".to_string(), - deadline: None, }; actions_tx.send(action).unwrap(); @@ -921,7 +947,6 @@ mod tests { action_id: "1".to_string(), name: "launch_shell".to_string(), payload: "test".to_string(), - deadline: None, }; actions_tx.send(action).unwrap(); @@ -931,7 +956,6 @@ mod tests { action_id: "2".to_string(), name: "test".to_string(), payload: "test".to_string(), - deadline: None, }; actions_tx.send(action).unwrap(); @@ -1017,7 +1041,6 @@ mod tests { action_id: "1".to_string(), name: "test".to_string(), payload: "test".to_string(), - deadline: None, }; actions_tx.send(action).unwrap(); @@ -1027,7 +1050,6 @@ mod tests { action_id: "2".to_string(), name: "launch_shell".to_string(), payload: "test".to_string(), - deadline: None, }; actions_tx.send(action).unwrap(); diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index 95538971d..2ab63bd39 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -56,7 +56,7 @@ use reqwest::{Certificate, Client, ClientBuilder, Error as ReqwestError, Identit use rsa::sha2::{Digest, Sha256}; use serde::{Deserialize, Serialize}; use tokio::select; -use tokio::time::{timeout_at, Instant}; +use tokio::time::Instant; use std::fs::{metadata, read, remove_dir_all, remove_file, write, File}; use std::io; @@ -94,8 +94,8 @@ pub enum Error { BadSave, #[error("Save file doesn't exist")] NoSave, - #[error("Download has been cancelled")] - Cancelled, + #[error("Download has been cancelled by '{0}'")] + Cancelled(String), } /// This struct contains the necessary components to download and store file as notified by a download file @@ -205,18 +205,14 @@ impl FileDownloader { // Accepts `DownloadState`, sets a timeout for the action async fn download(&mut self, state: &mut DownloadState) -> Result<(), Error> { let shutdown_rx = self.shutdown_rx.clone(); - let deadline = match &state.current.action.deadline { - Some(d) => *d, - _ => { - error!("Unconfigured deadline: {}", state.current.action.name); - return Ok(()); - } - }; select! { + // Wait till download completes + o = self.continuous_retry(state) => o?, + // Cancel download on receiving cancel action, e.g. on action timeout Ok(action) = self.actions_rx.recv_async() => { if action.name != "cancel-action" { warn!("Unexpected action: {action:?}"); - unreachable!("Only cancel-action should be sent to downloader while it is handling another downlaod!!"); + unreachable!("Only cancel-action should be sent to downloader while it is handling another download!!"); } let cancellation: Cancellation = serde_json::from_str(&action.payload)?; @@ -228,7 +224,7 @@ impl FileDownloader { trace!("Deleting partially downloaded file: {cancellation:?}"); state.clean()?; - return Err(Error::Cancelled); + return Err(Error::Cancelled(action.action_id)); }, Ok(_) = shutdown_rx.recv_async(), if !shutdown_rx.is_disconnected() => { @@ -238,12 +234,6 @@ impl FileDownloader { return Ok(()); }, - - // NOTE: if download has timedout don't do anything, else ensure errors are forwarded after three retries - o = timeout_at(deadline, self.continuous_retry(state)) => match o { - Ok(r) => r?, - Err(_) => error!("Last download has timedout"), - } } state.current.meta.verify_checksum()?; @@ -386,7 +376,6 @@ impl DownloadFile { struct CurrentDownload { action: Action, meta: DownloadFile, - time_left: Option, } // A temporary structure to help us retry downloads @@ -439,7 +428,7 @@ impl DownloadState { human_bytes(meta.content_length as f64) ); meta.download_path = Some(file_path); - let current = CurrentDownload { action, meta, time_left: None }; + let current = CurrentDownload { action, meta }; Ok(Self { current, @@ -459,9 +448,7 @@ impl DownloadState { } let read = read(&path)?; - let mut current: CurrentDownload = serde_json::from_slice(&read)?; - // Calculate deadline based on written time left - current.action.deadline = current.time_left.map(|t| Instant::now() + t); + let current: CurrentDownload = serde_json::from_slice(&read)?; // Unwrap is ok here as it is expected to be set for actions once received let file = File::open(current.meta.download_path.as_ref().unwrap())?; @@ -483,9 +470,7 @@ impl DownloadState { return Ok(()); } - let mut current = self.current.clone(); - // Calculate time left based on deadline - current.time_left = current.action.deadline.map(|t| t.duration_since(Instant::now())); + let current = self.current.clone(); let json = serde_json::to_vec(¤t)?; let mut path = config.path.clone(); @@ -642,7 +627,6 @@ mod test { action_id: "1".to_string(), name: "firmware_update".to_string(), payload: json!(download_update).to_string(), - deadline: Some(Instant::now() + Duration::from_secs(60)), }; std::thread::sleep(Duration::from_millis(10)); @@ -703,7 +687,6 @@ mod test { action_id: "1".to_string(), name: "firmware_update".to_string(), payload: json!(correct_update).to_string(), - deadline: Some(Instant::now() + Duration::from_secs(100)), }; // Send the correct action to FileDownloader @@ -741,7 +724,6 @@ mod test { action_id: "1".to_string(), name: "firmware_update".to_string(), payload: json!(wrong_update).to_string(), - deadline: Some(Instant::now() + Duration::from_secs(100)), }; // Send the wrong action to FileDownloader diff --git a/uplink/src/collector/process.rs b/uplink/src/collector/process.rs index f1aed754e..c52dc7cf0 100644 --- a/uplink/src/collector/process.rs +++ b/uplink/src/collector/process.rs @@ -1,11 +1,11 @@ use flume::{Receiver, RecvError, SendError}; -use log::{debug, error, info}; +use log::{debug, error, info, trace, warn}; use thiserror::Error; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::{Child, Command}; use tokio::select; -use tokio::time::timeout_at; +use crate::base::actions::Cancellation; use crate::base::bridge::BridgeTx; use crate::{Action, ActionResponse, Package}; @@ -26,6 +26,8 @@ pub enum Error { Busy, #[error("No stdout in spawned action")] NoStdout, + #[error("Process has been cancelled by '{0}'")] + Cancelled(String), } /// Process abstracts functions to spawn process and handle their output @@ -55,25 +57,46 @@ impl ProcessHandler { } /// Capture stdout of the running process in a spawned task - pub async fn spawn_and_capture_stdout(&mut self, mut child: Child) -> Result<(), Error> { + pub async fn spawn_and_capture_stdout( + &mut self, + mut child: Child, + action_id: &str, + ) -> Result<(), Error> { let stdout = child.stdout.take().ok_or(Error::NoStdout)?; let mut stdout = BufReader::new(stdout).lines(); loop { select! { - Ok(Some(line)) = stdout.next_line() => { + Ok(Some(line)) = stdout.next_line() => { let status: ActionResponse = match serde_json::from_str(&line) { Ok(status) => status, - Err(e) => ActionResponse::failure("dummy", e.to_string()), + Err(e) => ActionResponse::failure(action_id, e.to_string()), }; debug!("Action status: {:?}", status); self.bridge_tx.send_action_response(status).await; - } - status = child.wait() => { + } + status = child.wait() => { info!("Action done!! Status = {:?}", status); return Ok(()); }, + // Cancel process on receiving cancel action, e.g. on action timeout + Ok(action) = self.actions_rx.recv_async() => { + if action.name != "cancel-action" { + warn!("Unexpected action: {action:?}"); + unreachable!("Only cancel-actions are acceptable!!"); + } + + let cancellation: Cancellation = serde_json::from_str(&action.payload)?; + if cancellation.action_id != action_id { + warn!("Unexpected action: {action:?}"); + unreachable!("Cancel actions meant for current action only are acceptable!!"); + } + + trace!("Cancelling process: '{}'", cancellation.action_id); + let status = ActionResponse::failure(action_id, Error::Cancelled(action.action_id).to_string()); + self.bridge_tx.send_action_response(status).await; + }, } } } @@ -83,21 +106,10 @@ impl ProcessHandler { loop { let action = self.actions_rx.recv_async().await?; let command = format!("tools/{}", action.name); - let deadline = match &action.deadline { - Some(d) => *d, - _ => { - error!("Unconfigured deadline: {}", action.name); - continue; - } - }; // Spawn the action and capture its stdout, ignore timeouts let child = self.run(&action.action_id, &command, &action.payload).await?; - if let Ok(o) = timeout_at(deadline, self.spawn_and_capture_stdout(child)).await { - o?; - } else { - error!("Process timedout: {command}; action_id = {}", action.action_id); - } + self.spawn_and_capture_stdout(child, &action.action_id).await?; } } } diff --git a/uplink/src/collector/script_runner.rs b/uplink/src/collector/script_runner.rs index db8f74ac1..63079df26 100644 --- a/uplink/src/collector/script_runner.rs +++ b/uplink/src/collector/script_runner.rs @@ -1,12 +1,12 @@ use flume::{Receiver, RecvError, SendError}; -use log::{debug, error, info, warn}; +use log::{debug, error, info, trace, warn}; use thiserror::Error; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::{Child, Command}; use tokio::select; -use tokio::time::timeout_at; use super::downloader::DownloadFile; +use crate::base::actions::Cancellation; use crate::base::bridge::BridgeTx; use crate::{Action, ActionResponse, Package}; @@ -28,6 +28,8 @@ pub enum Error { Busy, #[error("No stdout in spawned action")] NoStdout, + #[error("Script has been cancelled: '{0}'")] + Cancelled(String), } /// Script runner runs a script downloaded with FileDownloader and handles their output over the action_status stream. @@ -86,6 +88,23 @@ impl ScriptRunner { self.forward_status(ActionResponse::success(id)).await; break; }, + // Cancel script run on receiving cancel action, e.g. on action timeout + Ok(action) = self.actions_rx.recv_async() => { + if action.name != "cancel-action" { + warn!("Unexpected action: {action:?}"); + unreachable!("Only cancel-actions are acceptable!!"); + } + + let cancellation: Cancellation = serde_json::from_str(&action.payload)?; + if cancellation.action_id != id { + warn!("Unexpected action: {action:?}"); + unreachable!("Cancel actions meant for current action only are acceptable!!"); + } + + trace!("Cancelling script: '{}'", cancellation.action_id); + let status = ActionResponse::failure(id, Error::Cancelled(action.action_id).to_string()); + self.bridge_tx.send_action_response(status).await; + }, } } @@ -119,20 +138,9 @@ impl ScriptRunner { continue; } }; - let deadline = match &action.deadline { - Some(d) => *d, - _ => { - error!("Unconfigured deadline: {}", action.name); - continue; - } - }; // Spawn the action and capture its stdout let child = self.run(command).await?; - if let Ok(o) = - timeout_at(deadline, self.spawn_and_capture_stdout(child, &action.action_id)).await - { - o? - } + self.spawn_and_capture_stdout(child, &action.action_id).await? } } @@ -176,7 +184,6 @@ mod tests { action_id: "1".to_string(), name: "test".to_string(), payload: "".to_string(), - deadline: None, }) .unwrap(); @@ -200,7 +207,6 @@ mod tests { name: "test".to_string(), payload: "{\"url\": \"...\", \"content_length\": 0,\"file_name\": \"...\"}" .to_string(), - deadline: None, }) .unwrap(); diff --git a/uplink/src/config.rs b/uplink/src/config.rs index 4fdac763c..bef2e116c 100644 --- a/uplink/src/config.rs +++ b/uplink/src/config.rs @@ -215,6 +215,7 @@ pub struct ActionRoute { #[serde_as(as = "DurationSeconds")] pub timeout: Duration, // Can the action handler cancel actions mid execution? + #[serde(default)] pub cancellable: bool, } diff --git a/uplink/src/main.rs b/uplink/src/main.rs index 572fea502..0221110b3 100644 --- a/uplink/src/main.rs +++ b/uplink/src/main.rs @@ -181,6 +181,21 @@ impl CommandLine { config.actions_subscription = format!("/tenants/{tenant_id}/devices/{device_id}/actions"); + // downloader actions are cancellable by default + for route in config.downloader.actions.iter_mut() { + route.cancellable = true; + } + + // process actions are cancellable by default + for route in config.processes.iter_mut() { + route.cancellable = true; + } + + // script runner actions are cancellable by default + for route in config.script_runner.iter_mut() { + route.cancellable = true; + } + Ok(config) } From b6d480eb2db4fbea2152e7bc628f54a93d47822e Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 15 May 2024 16:57:28 +0530 Subject: [PATCH 07/24] refactor: remove extra checks --- uplink/src/collector/downloader.rs | 9 --------- uplink/src/collector/process.rs | 11 +---------- uplink/src/collector/script_runner.rs | 9 --------- 3 files changed, 1 insertion(+), 28 deletions(-) diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index 2ab63bd39..2c06cb65b 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -210,16 +210,7 @@ impl FileDownloader { o = self.continuous_retry(state) => o?, // Cancel download on receiving cancel action, e.g. on action timeout Ok(action) = self.actions_rx.recv_async() => { - if action.name != "cancel-action" { - warn!("Unexpected action: {action:?}"); - unreachable!("Only cancel-action should be sent to downloader while it is handling another download!!"); - } - let cancellation: Cancellation = serde_json::from_str(&action.payload)?; - if cancellation.action_id != self.action_id { - warn!("Unexpected action: {action:?}"); - unreachable!("Cancel actions meant for current action only should be sent to downloader!!"); - } trace!("Deleting partially downloaded file: {cancellation:?}"); state.clean()?; diff --git a/uplink/src/collector/process.rs b/uplink/src/collector/process.rs index c52dc7cf0..79eb578eb 100644 --- a/uplink/src/collector/process.rs +++ b/uplink/src/collector/process.rs @@ -1,5 +1,5 @@ use flume::{Receiver, RecvError, SendError}; -use log::{debug, error, info, trace, warn}; +use log::{debug, error, info, trace}; use thiserror::Error; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::{Child, Command}; @@ -82,16 +82,7 @@ impl ProcessHandler { }, // Cancel process on receiving cancel action, e.g. on action timeout Ok(action) = self.actions_rx.recv_async() => { - if action.name != "cancel-action" { - warn!("Unexpected action: {action:?}"); - unreachable!("Only cancel-actions are acceptable!!"); - } - let cancellation: Cancellation = serde_json::from_str(&action.payload)?; - if cancellation.action_id != action_id { - warn!("Unexpected action: {action:?}"); - unreachable!("Cancel actions meant for current action only are acceptable!!"); - } trace!("Cancelling process: '{}'", cancellation.action_id); let status = ActionResponse::failure(action_id, Error::Cancelled(action.action_id).to_string()); diff --git a/uplink/src/collector/script_runner.rs b/uplink/src/collector/script_runner.rs index 63079df26..8e3b06ceb 100644 --- a/uplink/src/collector/script_runner.rs +++ b/uplink/src/collector/script_runner.rs @@ -90,16 +90,7 @@ impl ScriptRunner { }, // Cancel script run on receiving cancel action, e.g. on action timeout Ok(action) = self.actions_rx.recv_async() => { - if action.name != "cancel-action" { - warn!("Unexpected action: {action:?}"); - unreachable!("Only cancel-actions are acceptable!!"); - } - let cancellation: Cancellation = serde_json::from_str(&action.payload)?; - if cancellation.action_id != id { - warn!("Unexpected action: {action:?}"); - unreachable!("Cancel actions meant for current action only are acceptable!!"); - } trace!("Cancelling script: '{}'", cancellation.action_id); let status = ActionResponse::failure(id, Error::Cancelled(action.action_id).to_string()); From f9666e905b563be4e4b929c107726f830053c765 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 15 May 2024 16:59:35 +0530 Subject: [PATCH 08/24] fix: use cancellation to figure out route --- uplink/src/base/bridge/actions_lane.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index 0120e2ec4..656c3c4cc 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -299,7 +299,7 @@ impl ActionsBridge { let route = self .action_routes - .get(&action.name) + .get(&cancellation.name) .expect("Action shouldn't be in execution if it can't be routed!"); // Ensure that action redirections for the action are turned off, From c2116ce0bddbdf59f87c750a638f4e0cf8371829 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 15 May 2024 17:01:04 +0530 Subject: [PATCH 09/24] fix: `cancel-action` ~> `cancel_action` --- uplink/src/base/bridge/actions_lane.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index 656c3c4cc..ad807f9a4 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -157,7 +157,7 @@ impl ActionsBridge { action = self.actions_rx.recv_async() => { let action = action?; - if action.name == "cancel-action" && self.current_action.is_some() { + if action.name == "cancel_action" && self.current_action.is_some() { self.handle_cancellation(action).await?; continue } @@ -194,7 +194,7 @@ impl ActionsBridge { let payload = serde_json::to_string(&cancellation)?; let cancel_action = Action { action_id: "timeout".to_owned(), // Describes cause of action cancellation. NOTE: Action handler shouldn't expect an integer. - name: "cancel-action".to_owned(), + name: "cancel_action".to_owned(), payload, }; if route.try_send(cancel_action).is_err() { @@ -486,7 +486,7 @@ struct CurrentAction { pub id: String, pub action: Action, pub timeout: Pin>, - // cancel-action request + // cancel_action request pub cancelled_by: Option, } From b86742711428c51091d63f29bea3010c6f2389b2 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 15 May 2024 17:01:38 +0530 Subject: [PATCH 10/24] panic with message --- uplink/src/base/bridge/actions_lane.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index ad807f9a4..b25831fbc 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -285,13 +285,17 @@ impl ActionsBridge { /// else marks the current action as cancelled and avoids further redirections async fn handle_cancellation(&mut self, action: Action) -> Result<(), Error> { let cancellation: Cancellation = serde_json::from_str(&action.payload)?; - if cancellation.action_id != self.current_action.as_ref().unwrap().id { + let current_action = self + .current_action + .as_ref() + .expect("Actions that are not executing can't be cancelled"); + if cancellation.action_id != current_action.id { warn!("Unexpected cancellation: {cancellation:?}"); self.forward_action_error(action, Error::UnexpectedCancellation).await; return Ok(()); } - if cancellation.name != self.current_action.as_ref().unwrap().action.name { + if cancellation.name != current_action.action.name { warn!("Unexpected cancellation: {cancellation:?}"); self.forward_action_error(action, Error::CorruptedCancellation).await; return Ok(()); From ca2d83559520409435c906adadaaf4d62ce8c7ca Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 16 May 2024 10:36:19 +0530 Subject: [PATCH 11/24] chore: cargo fmt --- uplink/src/collector/downloader.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index 722f5d8d5..4257bfb22 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -56,7 +56,7 @@ use reqwest::{Certificate, Client, ClientBuilder, Error as ReqwestError, Identit use rsa::sha2::{Digest, Sha256}; use serde::{Deserialize, Serialize}; use tokio::select; -use tokio::time::{Instant, sleep}; +use tokio::time::{sleep, Instant}; use std::fs::{metadata, read, remove_dir_all, remove_file, write, File}; use std::io; @@ -460,7 +460,8 @@ impl DownloadState { let current: CurrentDownload = serde_json::from_slice(&read)?; // Unwrap is ok here as it is expected to be set for actions once received - let file = File::options().append(true).open(current.meta.download_path.as_ref().unwrap())?; + let file = + File::options().append(true).open(current.meta.download_path.as_ref().unwrap())?; let bytes_written = file.metadata()?.len() as usize; remove_file(path)?; From 0e7132501fdf752aaaed3c2a9a3f6d84d5d69ef0 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 16 May 2024 10:36:41 +0530 Subject: [PATCH 12/24] refactor: action existence check --- uplink/src/base/bridge/actions_lane.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index b25831fbc..407f4ee3d 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -157,7 +157,7 @@ impl ActionsBridge { action = self.actions_rx.recv_async() => { let action = action?; - if action.name == "cancel_action" && self.current_action.is_some() { + if action.name == "cancel_action" { self.handle_cancellation(action).await?; continue } @@ -284,11 +284,11 @@ impl ActionsBridge { /// Forwards cancellation request to the handler if it can handle the same, /// else marks the current action as cancelled and avoids further redirections async fn handle_cancellation(&mut self, action: Action) -> Result<(), Error> { + let Some(current_action) = self.current_action.as_ref() else { + self.forward_action_error(action, Error::UnexpectedCancellation).await; + return Ok(()); + }; let cancellation: Cancellation = serde_json::from_str(&action.payload)?; - let current_action = self - .current_action - .as_ref() - .expect("Actions that are not executing can't be cancelled"); if cancellation.action_id != current_action.id { warn!("Unexpected cancellation: {cancellation:?}"); self.forward_action_error(action, Error::UnexpectedCancellation).await; From b7a810448db0e88d5021b102e434e2bc1334f0ba Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 16 May 2024 15:04:21 +0530 Subject: [PATCH 13/24] ignore responses to timeout actions as they are internal only --- uplink/src/base/bridge/actions_lane.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index 407f4ee3d..d00946f1f 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -368,6 +368,11 @@ impl ActionsBridge { } async fn forward_action_response(&mut self, mut response: ActionResponse) { + // Ignore responses to timeout action + if response.action_id == "timeout" { + return; + } + if self.parallel_actions.contains(&response.action_id) { self.forward_parallel_action_response(response).await; From b7075d3f31b4e94da514c6e646c801848a80a943 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 21 May 2024 13:24:36 +0530 Subject: [PATCH 14/24] fix: action might have been redirected --- uplink/src/base/bridge/actions_lane.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index d00946f1f..c5aa8766a 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -288,7 +288,7 @@ impl ActionsBridge { self.forward_action_error(action, Error::UnexpectedCancellation).await; return Ok(()); }; - let cancellation: Cancellation = serde_json::from_str(&action.payload)?; + let mut cancellation: Cancellation = serde_json::from_str(&action.payload)?; if cancellation.action_id != current_action.id { warn!("Unexpected cancellation: {cancellation:?}"); self.forward_action_error(action, Error::UnexpectedCancellation).await; @@ -296,9 +296,11 @@ impl ActionsBridge { } if cancellation.name != current_action.action.name { - warn!("Unexpected cancellation: {cancellation:?}"); - self.forward_action_error(action, Error::CorruptedCancellation).await; - return Ok(()); + debug!( + "Action was redirected: {} ~> {}", + cancellation.name, current_action.action.name + ); + current_action.action.name.clone_into(&mut cancellation.name); } let route = self From 9d08d0b1c9375a7eabf97ca5f34a52a6bdee5180 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 27 May 2024 13:11:56 +0530 Subject: [PATCH 15/24] cancellation `received` response --- uplink/src/base/bridge/actions_lane.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index c5aa8766a..bfbb764c4 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -295,6 +295,7 @@ impl ActionsBridge { return Ok(()); } + info!("Received action cancellation: {:?}", cancellation); if cancellation.name != current_action.action.name { debug!( "Action was redirected: {} ~> {}", @@ -311,13 +312,16 @@ impl ActionsBridge { // Ensure that action redirections for the action are turned off, // action will be cancelled on next attempt to redirect self.current_action.as_mut().unwrap().cancelled_by = Some(action.clone()); + let response = ActionResponse::progress(&action.action_id, "Received", 0); if route.is_cancellable() { if let Err(e) = route.try_send(action.clone()).map_err(|_| Error::UnresponsiveReceiver) { self.forward_action_error(action, e).await; + return Ok(()); } } + self.forward_action_response(response).await; Ok(()) } From 6f4df44cb0a78b5dc813eb1ba0fe9b544168da3d Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 27 May 2024 13:15:04 +0530 Subject: [PATCH 16/24] refactor: error only needs `action_id` --- uplink/src/base/bridge/actions_lane.rs | 27 +++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index bfbb764c4..c9e801b96 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -183,7 +183,7 @@ impl ActionsBridge { if !route.is_cancellable() { // Directly send timeout failure response if handler doesn't allow action cancellation error!("Timeout waiting for action response. Action ID = {}", action.action_id); - self.forward_action_error(action, Error::ActionTimeout).await; + self.forward_action_error(&action.action_id, Error::ActionTimeout).await; // Remove action because it timedout self.clear_current_action(); @@ -248,7 +248,7 @@ impl ActionsBridge { "Another action is currently occupying uplink; action_id = {}", current_action.id ); - self.forward_action_error(action, Error::Busy).await; + self.forward_action_error(&action.action_id, Error::Busy).await; return; } } @@ -278,20 +278,20 @@ impl ActionsBridge { } error!("Failed to route action to app. Error = {:?}", error); - self.forward_action_error(action, error).await; + self.forward_action_error(&action.action_id, error).await; } /// Forwards cancellation request to the handler if it can handle the same, /// else marks the current action as cancelled and avoids further redirections async fn handle_cancellation(&mut self, action: Action) -> Result<(), Error> { let Some(current_action) = self.current_action.as_ref() else { - self.forward_action_error(action, Error::UnexpectedCancellation).await; + self.forward_action_error(&action.action_id, Error::UnexpectedCancellation).await; return Ok(()); }; let mut cancellation: Cancellation = serde_json::from_str(&action.payload)?; if cancellation.action_id != current_action.id { warn!("Unexpected cancellation: {cancellation:?}"); - self.forward_action_error(action, Error::UnexpectedCancellation).await; + self.forward_action_error(&action.action_id, Error::UnexpectedCancellation).await; return Ok(()); } @@ -317,7 +317,7 @@ impl ActionsBridge { if route.is_cancellable() { if let Err(e) = route.try_send(action.clone()).map_err(|_| Error::UnresponsiveReceiver) { - self.forward_action_error(action, e).await; + self.forward_action_error(&action.action_id, e).await; return Ok(()); } } @@ -432,17 +432,22 @@ impl ActionsBridge { self.current_action.take() { // Marks the cancellation as a failure as action has reached completion without being cancelled - self.forward_action_error(cancel_action, Error::FailedCancellation).await + self.forward_action_error( + &cancel_action.action_id, + Error::FailedCancellation, + ) + .await } } Err(Error::Cancelled(cancel_action)) => { let response = ActionResponse::success(&cancel_action); self.streams.forward(response).await; - self.forward_action_error(action, Error::Cancelled(cancel_action)).await; + self.forward_action_error(&action.action_id, Error::Cancelled(cancel_action)) + .await; } Err(e) => { - self.forward_action_error(action, e).await; + self.forward_action_error(&action.action_id, e).await; } } @@ -483,8 +488,8 @@ impl ActionsBridge { self.streams.forward(response).await; } - async fn forward_action_error(&mut self, action: Action, error: Error) { - let response = ActionResponse::failure(&action.action_id, error.to_string()); + async fn forward_action_error(&mut self, action_id: &str, error: Error) { + let response = ActionResponse::failure(action_id, error.to_string()); self.streams.forward(response).await; } From a52d1c3b5962ba9ee39788d52679e2db2c971a5f Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 27 May 2024 13:18:18 +0530 Subject: [PATCH 17/24] refactor: store only `action_id` for cancellations --- uplink/src/base/bridge/actions_lane.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index c9e801b96..7f82c224b 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -311,7 +311,7 @@ impl ActionsBridge { // Ensure that action redirections for the action are turned off, // action will be cancelled on next attempt to redirect - self.current_action.as_mut().unwrap().cancelled_by = Some(action.clone()); + self.current_action.as_mut().unwrap().cancelled_by = Some(action.action_id.clone()); let response = ActionResponse::progress(&action.action_id, "Received", 0); if route.is_cancellable() { @@ -405,7 +405,7 @@ impl ActionsBridge { if let Some(CurrentAction { cancelled_by: Some(cancel_action), .. }) = self.current_action.take() { - let response = ActionResponse::success(&cancel_action.action_id); + let response = ActionResponse::success(&cancel_action); self.streams.forward(response).await; } return; @@ -432,11 +432,7 @@ impl ActionsBridge { self.current_action.take() { // Marks the cancellation as a failure as action has reached completion without being cancelled - self.forward_action_error( - &cancel_action.action_id, - Error::FailedCancellation, - ) - .await + self.forward_action_error(&cancel_action, Error::FailedCancellation).await } } Err(Error::Cancelled(cancel_action)) => { @@ -465,7 +461,7 @@ impl ActionsBridge { if let Some(CurrentAction { cancelled_by: Some(cancel_action), .. }) = self.current_action.as_ref() { - return Err(Error::Cancelled(cancel_action.action_id.clone())); + return Err(Error::Cancelled(cancel_action.clone())); } debug!( @@ -507,7 +503,7 @@ struct CurrentAction { pub action: Action, pub timeout: Pin>, // cancel_action request - pub cancelled_by: Option, + pub cancelled_by: Option, } impl CurrentAction { From 34ed57119e2e5a9905f8c3d53b1e5e37070369ea Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 27 May 2024 13:33:37 +0530 Subject: [PATCH 18/24] refactor: don't copy action where only id is required --- uplink/src/base/bridge/actions_lane.rs | 27 +++++++++++++------------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index 7f82c224b..82cd70c73 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -173,24 +173,25 @@ impl ActionsBridge { _ = &mut self.current_action.as_mut().map(|a| &mut a.timeout).unwrap_or(&mut end) => { let curret_action = self.current_action.as_mut().unwrap(); - let action = curret_action.action.clone(); + let action_id = curret_action.action.action_id.clone(); + let action_name = curret_action.action.name.clone(); let route = self .action_routes - .get(&action.name) + .get(&action_name) .expect("Action shouldn't be in execution if it can't be routed!"); if !route.is_cancellable() { // Directly send timeout failure response if handler doesn't allow action cancellation - error!("Timeout waiting for action response. Action ID = {}", action.action_id); - self.forward_action_error(&action.action_id, Error::ActionTimeout).await; + error!("Timeout waiting for action response. Action ID = {}", action_id); + self.forward_action_error(&action_id, Error::ActionTimeout).await; // Remove action because it timedout self.clear_current_action(); continue; } - let cancellation = Cancellation { action_id: action.action_id.clone(), name: action.name }; + let cancellation = Cancellation { action_id: action_id.clone(), name: action_name }; let payload = serde_json::to_string(&cancellation)?; let cancel_action = Action { action_id: "timeout".to_owned(), // Describes cause of action cancellation. NOTE: Action handler shouldn't expect an integer. @@ -198,7 +199,7 @@ impl ActionsBridge { payload, }; if route.try_send(cancel_action).is_err() { - error!("Couldn't cancel action ({}) on timeout: {}", action.action_id, Error::UnresponsiveReceiver); + error!("Couldn't cancel action ({}) on timeout: {}", action_id, Error::UnresponsiveReceiver); // Remove action anyways self.clear_current_action(); continue; @@ -284,14 +285,15 @@ impl ActionsBridge { /// Forwards cancellation request to the handler if it can handle the same, /// else marks the current action as cancelled and avoids further redirections async fn handle_cancellation(&mut self, action: Action) -> Result<(), Error> { + let action_id = action.action_id.clone(); let Some(current_action) = self.current_action.as_ref() else { - self.forward_action_error(&action.action_id, Error::UnexpectedCancellation).await; + self.forward_action_error(&action_id, Error::UnexpectedCancellation).await; return Ok(()); }; let mut cancellation: Cancellation = serde_json::from_str(&action.payload)?; if cancellation.action_id != current_action.id { warn!("Unexpected cancellation: {cancellation:?}"); - self.forward_action_error(&action.action_id, Error::UnexpectedCancellation).await; + self.forward_action_error(&action_id, Error::UnexpectedCancellation).await; return Ok(()); } @@ -311,16 +313,15 @@ impl ActionsBridge { // Ensure that action redirections for the action are turned off, // action will be cancelled on next attempt to redirect - self.current_action.as_mut().unwrap().cancelled_by = Some(action.action_id.clone()); - let response = ActionResponse::progress(&action.action_id, "Received", 0); + self.current_action.as_mut().unwrap().cancelled_by = Some(action_id.clone()); if route.is_cancellable() { - if let Err(e) = route.try_send(action.clone()).map_err(|_| Error::UnresponsiveReceiver) - { - self.forward_action_error(&action.action_id, e).await; + if let Err(e) = route.try_send(action).map_err(|_| Error::UnresponsiveReceiver) { + self.forward_action_error(&action_id, e).await; return Ok(()); } } + let response = ActionResponse::progress(&action_id, "Received", 0); self.forward_action_response(response).await; Ok(()) From 8a7d311c62e2d9017412fc5f7d52f68723108443 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 27 May 2024 14:33:15 +0530 Subject: [PATCH 19/24] drop the `CurrentAction.id` field --- uplink/src/base/bridge/actions_lane.rs | 28 ++++++++++++-------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index 82cd70c73..e593bbfe2 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -247,7 +247,7 @@ impl ActionsBridge { if action.name != TUNSHELL_ACTION { warn!( "Another action is currently occupying uplink; action_id = {}", - current_action.id + current_action.action.action_id ); self.forward_action_error(&action.action_id, Error::Busy).await; return; @@ -291,7 +291,7 @@ impl ActionsBridge { return Ok(()); }; let mut cancellation: Cancellation = serde_json::from_str(&action.payload)?; - if cancellation.action_id != current_action.id { + if cancellation.action_id != current_action.action.action_id { warn!("Unexpected cancellation: {cancellation:?}"); self.forward_action_error(&action_id, Error::UnexpectedCancellation).await; return Ok(()); @@ -348,7 +348,10 @@ impl ActionsBridge { if path.is_file() { let current_action = CurrentAction::read_from_disk(path)?; - info!("Loading saved action from persistence; action_id: {}", current_action.id); + info!( + "Loading saved action from persistence; action_id: {}", + current_action.action.action_id + ); self.current_action = Some(current_action) } @@ -394,8 +397,11 @@ impl ActionsBridge { } }; - if *inflight_action.id != response.action_id { - error!("response id({}) != active action({})", response.action_id, inflight_action.id); + if *inflight_action.action.action_id != response.action_id { + error!( + "response id({}) != active action({})", + response.action_id, inflight_action.action.action_id + ); return; } @@ -494,13 +500,11 @@ impl ActionsBridge { #[derive(Debug, Deserialize, Serialize)] struct SaveAction { - pub id: String, pub action: Action, pub timeout: Duration, } struct CurrentAction { - pub id: String, pub action: Action, pub timeout: Pin>, // cancel_action request @@ -509,17 +513,12 @@ struct CurrentAction { impl CurrentAction { pub fn new(action: Action, deadline: Instant) -> CurrentAction { - CurrentAction { - id: action.action_id.clone(), - action, - timeout: Box::pin(time::sleep_until(deadline)), - cancelled_by: None, - } + CurrentAction { action, timeout: Box::pin(time::sleep_until(deadline)), cancelled_by: None } } pub fn write_to_disk(self, path: PathBuf) -> Result<(), Error> { let timeout = self.timeout.as_ref().deadline() - Instant::now(); - let save_action = SaveAction { id: self.id, action: self.action, timeout }; + let save_action = SaveAction { action: self.action, timeout }; let json = serde_json::to_string(&save_action)?; fs::write(path, json)?; @@ -532,7 +531,6 @@ impl CurrentAction { fs::remove_file(path)?; Ok(CurrentAction { - id: json.id, action: json.action, timeout: Box::pin(time::sleep(json.timeout)), cancelled_by: None, From 5d7e4dab8685ec53d7a30eeb5edbd1db932438cc Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 27 May 2024 14:41:22 +0530 Subject: [PATCH 20/24] refactor: rm current action clone --- uplink/src/base/bridge/actions_lane.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index e593bbfe2..bf64e7324 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -421,7 +421,7 @@ impl ActionsBridge { // Forward actions included in the config to the appropriate forward route, when // they have reached 100% progress but haven't been marked as "Completed"/"Finished". if response.is_done() { - let mut action = inflight_action.action.clone(); + let mut action = self.current_action.take().unwrap().action; if let Some(a) = response.done_response.take() { action = a; @@ -449,12 +449,8 @@ impl ActionsBridge { self.forward_action_error(&action.action_id, Error::Cancelled(cancel_action)) .await; } - Err(e) => { - self.forward_action_error(&action.action_id, e).await; - } + Err(e) => self.forward_action_error(&action.action_id, e).await, } - - self.clear_current_action(); } } From 15edaa4e741aeac3998361faf226812fe8a5a91e Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 27 May 2024 14:45:11 +0530 Subject: [PATCH 21/24] `name` ~> `action_name` --- uplink/src/base/actions.rs | 3 ++- uplink/src/base/bridge/actions_lane.rs | 12 ++++++------ 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/uplink/src/base/actions.rs b/uplink/src/base/actions.rs index 985f24cb8..d9c07e4a3 100644 --- a/uplink/src/base/actions.rs +++ b/uplink/src/base/actions.rs @@ -114,5 +114,6 @@ impl Point for ActionResponse { #[derive(Debug, Deserialize, Serialize)] pub struct Cancellation { pub action_id: String, - pub name: String, + #[serde(rename = "name")] + pub action_name: String, } diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index bf64e7324..e154302f5 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -191,7 +191,7 @@ impl ActionsBridge { continue; } - let cancellation = Cancellation { action_id: action_id.clone(), name: action_name }; + let cancellation = Cancellation { action_id, action_name }; let payload = serde_json::to_string(&cancellation)?; let cancel_action = Action { action_id: "timeout".to_owned(), // Describes cause of action cancellation. NOTE: Action handler shouldn't expect an integer. @@ -199,7 +199,7 @@ impl ActionsBridge { payload, }; if route.try_send(cancel_action).is_err() { - error!("Couldn't cancel action ({}) on timeout: {}", action_id, Error::UnresponsiveReceiver); + error!("Couldn't cancel action ({}) on timeout: {}", cancellation.action_id, Error::UnresponsiveReceiver); // Remove action anyways self.clear_current_action(); continue; @@ -298,17 +298,17 @@ impl ActionsBridge { } info!("Received action cancellation: {:?}", cancellation); - if cancellation.name != current_action.action.name { + if cancellation.action_name != current_action.action.name { debug!( "Action was redirected: {} ~> {}", - cancellation.name, current_action.action.name + cancellation.action_name, current_action.action.name ); - current_action.action.name.clone_into(&mut cancellation.name); + current_action.action.name.clone_into(&mut cancellation.action_name); } let route = self .action_routes - .get(&cancellation.name) + .get(&cancellation.action_name) .expect("Action shouldn't be in execution if it can't be routed!"); // Ensure that action redirections for the action are turned off, From e9fa4feffe748fa1dcc19f6396e587b57f21012e Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 27 May 2024 14:58:35 +0530 Subject: [PATCH 22/24] ci: clippy suggestion --- uplink/src/base/bridge/actions_lane.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index e154302f5..38c2d3949 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -428,7 +428,7 @@ impl ActionsBridge { } match self.redirect_action(&mut action).await { - Ok(_) => return, + Ok(_) => (), Err(Error::NoRoute(_)) => { // NOTE: send success reponse for actions that don't have redirections configured warn!("Action redirection is not configured for: {:?}", action); From a8496e9189a8c00a15aa6b4e58caa8311942b9f4 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 27 May 2024 15:21:48 +0530 Subject: [PATCH 23/24] fix: cancel actions can be responded to when another action is in execution --- uplink/src/base/bridge/actions_lane.rs | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index 38c2d3949..57fea217d 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -397,10 +397,12 @@ impl ActionsBridge { } }; - if *inflight_action.action.action_id != response.action_id { + if !inflight_action.is_executing(&response.action_id) + && !inflight_action.is_cancelled_by(&response.action_id) + { error!( - "response id({}) != active action({})", - response.action_id, inflight_action.action.action_id + "response id({}) != active action({}); response = {:?}", + response.action_id, inflight_action.action.action_id, response ); return; } @@ -532,6 +534,14 @@ impl CurrentAction { cancelled_by: None, }) } + + fn is_executing(&self, action_id: &str) -> bool { + self.action.action_id == action_id + } + + fn is_cancelled_by(&self, action_id: &str) -> bool { + self.cancelled_by.as_ref().is_some_and(|id| id == action_id) + } } #[derive(Debug)] From 74aec4581f0576fba4a433ef035fd3673cada03c Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Mon, 27 May 2024 15:24:38 +0530 Subject: [PATCH 24/24] refactor: shortcircuit forwards --- uplink/src/base/bridge/actions_lane.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/uplink/src/base/bridge/actions_lane.rs b/uplink/src/base/bridge/actions_lane.rs index 57fea217d..62926e417 100644 --- a/uplink/src/base/bridge/actions_lane.rs +++ b/uplink/src/base/bridge/actions_lane.rs @@ -259,7 +259,7 @@ impl ActionsBridge { let error = match self.try_route_action(action.clone()) { Ok(_) => { let response = ActionResponse::progress(&action_id, "Received", 0); - self.forward_action_response(response).await; + self.streams.forward(response).await; return; } Err(e) => e, @@ -322,7 +322,7 @@ impl ActionsBridge { } } let response = ActionResponse::progress(&action_id, "Received", 0); - self.forward_action_response(response).await; + self.streams.forward(response).await; Ok(()) }