Skip to content

Commit

Permalink
refactor: action timeouts with cancel-action triggered by bridge
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Apr 24, 2024
1 parent 33a8706 commit 2cf547a
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 88 deletions.
4 changes: 2 additions & 2 deletions configs/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ action_redirections = { "firmware_update" = "install_update", "send_file" = "loa

# Script runner allows users to trigger an action that will run an already downloaded
# file as described by the download_path field of the JSON payload of a download action.
script_runner = [{ name = "run_script" }]
script_runner = [{ name = "run_script", timeout = 10 }]

# Location on disk for persisted streams to write backlogs into, also used to write
persistence_path = "/tmp/uplink/"
Expand Down Expand Up @@ -141,7 +141,7 @@ priority = 255
# - actions: List of actions names that can trigger the downloader, with configurable timeouts
# - path: Location in fs where the files are downloaded into
[downloader]
actions = [{ name = "update_firmware" }, { name = "send_file" }, { name = "send_script" }]
actions = [{ name = "update_firmware" }, { name = "send_file", timeout = 10 }, { name = "send_script" }]
path = "/var/tmp/ota-file"

# Configurations associated with the system stats module of uplink, if enabled
Expand Down
6 changes: 1 addition & 5 deletions uplink/src/base/actions.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use serde::{Deserialize, Serialize};
use tokio::time::Instant;

use crate::{Payload, Point};

Expand All @@ -17,9 +16,6 @@ pub struct Action {
pub name: String,
// action payload. json. can be args/payload. depends on the invoked command
pub payload: String,
// Instant at which action must be timedout
#[serde(skip)]
pub deadline: Option<Instant>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down Expand Up @@ -115,7 +111,7 @@ impl Point for ActionResponse {
}
}

#[derive(Debug, Deserialize)]
#[derive(Debug, Deserialize, Serialize)]
pub struct Cancellation {
pub action_id: String,
pub name: String,
Expand Down
56 changes: 39 additions & 17 deletions uplink/src/base/bridge/actions_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,31 +165,64 @@ impl ActionsBridge {
self.handle_action(action).await;

}

response = self.status_rx.recv_async() => {
let response = response?;
self.forward_action_response(response).await;
}

_ = &mut self.current_action.as_mut().map(|a| &mut a.timeout).unwrap_or(&mut end) => {
let action = self.current_action.take().unwrap();
error!("Timeout waiting for action response. Action ID = {}", action.id);
self.forward_action_error(action.action, Error::ActionTimeout).await;
let curret_action = self.current_action.as_mut().unwrap();
let action = curret_action.action.clone();

let route = self
.action_routes
.get(&action.name)
.expect("Action shouldn't be in execution if it can't be routed!");

if !route.is_cancellable() {
// Directly send timeout failure response if handler doesn't allow action cancellation
error!("Timeout waiting for action response. Action ID = {}", action.action_id);
self.forward_action_error(action, Error::ActionTimeout).await;

// Remove action because it timedout
self.clear_current_action();
continue;
}

let cancellation = Cancellation { action_id: action.action_id.clone(), name: action.name };
let payload = serde_json::to_string(&cancellation)?;
let cancel_action = Action {
action_id: "timeout".to_owned(), // Describes cause of action cancellation. NOTE: Action handler shouldn't expect an integer.
name: "cancel-action".to_owned(),
payload,
};
if route.try_send(cancel_action).is_err() {
error!("Couldn't cancel action ({}) on timeout: {}", action.action_id, Error::UnresponsiveReceiver);
// Remove action anyways
self.clear_current_action();
continue;
}

// Remove action because it timedout
self.clear_current_action()
// set timeout to end of time in wait of cancellation response
curret_action.timeout = Box::pin(time::sleep(Duration::from_secs(u64::MAX)));
}

// Flush streams that timeout
Some(timedout_stream) = self.streams.stream_timeouts.next(), if self.streams.stream_timeouts.has_pending() => {
debug!("Flushing stream = {}", timedout_stream);
if let Err(e) = self.streams.flush_stream(&timedout_stream).await {
error!("Failed to flush stream = {}. Error = {}", timedout_stream, e);
}
}

// Flush all metrics when timed out
_ = metrics_timeout.tick() => {
if let Err(e) = self.streams.check_and_flush_metrics() {
debug!("Failed to flush stream metrics. Error = {}", e);
}
}

// Handle a shutdown signal
_ = self.ctrl_rx.recv_async() => {
if let Err(e) = self.save_current_action() {
Expand Down Expand Up @@ -500,9 +533,8 @@ pub struct ActionRouter {
impl ActionRouter {
#[allow(clippy::result_large_err)]
/// Forwards action to the appropriate application and returns the instance in time at which it should be timedout if incomplete
pub fn try_send(&self, mut action: Action) -> Result<Instant, TrySendError<Action>> {
pub fn try_send(&self, action: Action) -> Result<Instant, TrySendError<Action>> {
let deadline = Instant::now() + self.duration;
action.deadline = Some(deadline);
self.actions_tx.try_send(action)?;

Ok(deadline)
Expand Down Expand Up @@ -646,7 +678,6 @@ mod tests {
action_id: "1".to_string(),
name: "route_1".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action_1).unwrap();

Expand All @@ -669,7 +700,6 @@ mod tests {
action_id: "2".to_string(),
name: "route_2".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action_2).unwrap();

Expand Down Expand Up @@ -716,7 +746,6 @@ mod tests {
action_id: "1".to_string(),
name: "test".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action_1).unwrap();

Expand All @@ -730,7 +759,6 @@ mod tests {
action_id: "2".to_string(),
name: "test".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action_2).unwrap();

Expand Down Expand Up @@ -774,7 +802,6 @@ mod tests {
action_id: "1".to_string(),
name: "test".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action).unwrap();

Expand Down Expand Up @@ -845,7 +872,6 @@ mod tests {
action_id: "1".to_string(),
name: "test".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action).unwrap();

Expand Down Expand Up @@ -921,7 +947,6 @@ mod tests {
action_id: "1".to_string(),
name: "launch_shell".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action).unwrap();

Expand All @@ -931,7 +956,6 @@ mod tests {
action_id: "2".to_string(),
name: "test".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action).unwrap();

Expand Down Expand Up @@ -1017,7 +1041,6 @@ mod tests {
action_id: "1".to_string(),
name: "test".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action).unwrap();

Expand All @@ -1027,7 +1050,6 @@ mod tests {
action_id: "2".to_string(),
name: "launch_shell".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action).unwrap();

Expand Down
40 changes: 11 additions & 29 deletions uplink/src/collector/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use reqwest::{Certificate, Client, ClientBuilder, Error as ReqwestError, Identit
use rsa::sha2::{Digest, Sha256};
use serde::{Deserialize, Serialize};
use tokio::select;
use tokio::time::{timeout_at, Instant};
use tokio::time::Instant;

use std::fs::{metadata, read, remove_dir_all, remove_file, write, File};
use std::io;
Expand Down Expand Up @@ -94,8 +94,8 @@ pub enum Error {
BadSave,
#[error("Save file doesn't exist")]
NoSave,
#[error("Download has been cancelled")]
Cancelled,
#[error("Download has been cancelled by '{0}'")]
Cancelled(String),
}

/// This struct contains the necessary components to download and store file as notified by a download file
Expand Down Expand Up @@ -205,18 +205,14 @@ impl FileDownloader {
// Accepts `DownloadState`, sets a timeout for the action
async fn download(&mut self, state: &mut DownloadState) -> Result<(), Error> {
let shutdown_rx = self.shutdown_rx.clone();
let deadline = match &state.current.action.deadline {
Some(d) => *d,
_ => {
error!("Unconfigured deadline: {}", state.current.action.name);
return Ok(());
}
};
select! {
// Wait till download completes
o = self.continuous_retry(state) => o?,
// Cancel download on receiving cancel action, e.g. on action timeout
Ok(action) = self.actions_rx.recv_async() => {
if action.name != "cancel-action" {
warn!("Unexpected action: {action:?}");
unreachable!("Only cancel-action should be sent to downloader while it is handling another downlaod!!");
unreachable!("Only cancel-action should be sent to downloader while it is handling another download!!");
}

let cancellation: Cancellation = serde_json::from_str(&action.payload)?;
Expand All @@ -228,7 +224,7 @@ impl FileDownloader {
trace!("Deleting partially downloaded file: {cancellation:?}");
state.clean()?;

return Err(Error::Cancelled);
return Err(Error::Cancelled(action.action_id));
},

Ok(_) = shutdown_rx.recv_async(), if !shutdown_rx.is_disconnected() => {
Expand All @@ -238,12 +234,6 @@ impl FileDownloader {

return Ok(());
},

// NOTE: if download has timedout don't do anything, else ensure errors are forwarded after three retries
o = timeout_at(deadline, self.continuous_retry(state)) => match o {
Ok(r) => r?,
Err(_) => error!("Last download has timedout"),
}
}

state.current.meta.verify_checksum()?;
Expand Down Expand Up @@ -386,7 +376,6 @@ impl DownloadFile {
struct CurrentDownload {
action: Action,
meta: DownloadFile,
time_left: Option<Duration>,
}

// A temporary structure to help us retry downloads
Expand Down Expand Up @@ -439,7 +428,7 @@ impl DownloadState {
human_bytes(meta.content_length as f64)
);
meta.download_path = Some(file_path);
let current = CurrentDownload { action, meta, time_left: None };
let current = CurrentDownload { action, meta };

Ok(Self {
current,
Expand All @@ -459,9 +448,7 @@ impl DownloadState {
}

let read = read(&path)?;
let mut current: CurrentDownload = serde_json::from_slice(&read)?;
// Calculate deadline based on written time left
current.action.deadline = current.time_left.map(|t| Instant::now() + t);
let current: CurrentDownload = serde_json::from_slice(&read)?;

// Unwrap is ok here as it is expected to be set for actions once received
let file = File::open(current.meta.download_path.as_ref().unwrap())?;
Expand All @@ -483,9 +470,7 @@ impl DownloadState {
return Ok(());
}

let mut current = self.current.clone();
// Calculate time left based on deadline
current.time_left = current.action.deadline.map(|t| t.duration_since(Instant::now()));
let current = self.current.clone();
let json = serde_json::to_vec(&current)?;

let mut path = config.path.clone();
Expand Down Expand Up @@ -642,7 +627,6 @@ mod test {
action_id: "1".to_string(),
name: "firmware_update".to_string(),
payload: json!(download_update).to_string(),
deadline: Some(Instant::now() + Duration::from_secs(60)),
};

std::thread::sleep(Duration::from_millis(10));
Expand Down Expand Up @@ -703,7 +687,6 @@ mod test {
action_id: "1".to_string(),
name: "firmware_update".to_string(),
payload: json!(correct_update).to_string(),
deadline: Some(Instant::now() + Duration::from_secs(100)),
};

// Send the correct action to FileDownloader
Expand Down Expand Up @@ -741,7 +724,6 @@ mod test {
action_id: "1".to_string(),
name: "firmware_update".to_string(),
payload: json!(wrong_update).to_string(),
deadline: Some(Instant::now() + Duration::from_secs(100)),
};

// Send the wrong action to FileDownloader
Expand Down
Loading

0 comments on commit 2cf547a

Please sign in to comment.