Skip to content

Commit

Permalink
refactor: remove timeout information from apps (#293)
Browse files Browse the repository at this point in the history
* refactor: remove timeout information from apps

* doc: comment describing `ActionRoute.try_send`

* refactor: don't use optional

* test: remove unnecessary test

* test: set deadline to minute later

* Revert "refactor: don't use optional"

This reverts commit e9aac6d.
  • Loading branch information
Devdutt Shenoi authored Oct 13, 2023
1 parent adc6695 commit a0fb251
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 120 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions uplink/src/base/actions.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use serde::{Deserialize, Serialize};
use serde_json::json;
use tokio::time::Instant;

use crate::{Payload, Point};

Expand All @@ -19,6 +20,9 @@ pub struct Action {
pub name: String,
// action payload. json. can be args/payload. depends on the invoked command
pub payload: String,
// Instant at which action must be timedout
#[serde(skip)]
pub deadline: Option<Instant>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
25 changes: 19 additions & 6 deletions uplink/src/base/bridge/actions_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,14 +264,14 @@ impl ActionsBridge {
.get(&action.name)
.ok_or_else(|| Error::NoRoute(action.name.clone()))?;

let duration = route.try_send(action.clone()).map_err(|_| Error::UnresponsiveReceiver)?;
let deadline = route.try_send(action.clone()).map_err(|_| Error::UnresponsiveReceiver)?;
// current action left unchanged in case of new tunshell action
if action.name == TUNSHELL_ACTION {
self.parallel_actions.insert(action.action_id);
return Ok(());
}

self.current_action = Some(CurrentAction::new(action, duration));
self.current_action = Some(CurrentAction::new(action, deadline));

Ok(())
}
Expand Down Expand Up @@ -378,11 +378,11 @@ struct CurrentAction {
}

impl CurrentAction {
pub fn new(action: Action, duration: Duration) -> CurrentAction {
pub fn new(action: Action, deadline: Instant) -> CurrentAction {
CurrentAction {
id: action.action_id.clone(),
action,
timeout: Box::pin(time::sleep(duration)),
timeout: Box::pin(time::sleep_until(deadline)),
}
}

Expand Down Expand Up @@ -416,10 +416,13 @@ pub struct ActionRouter {

impl ActionRouter {
#[allow(clippy::result_large_err)]
pub fn try_send(&self, action: Action) -> Result<Duration, TrySendError<Action>> {
/// Forwards action to the appropriate application and returns the instance in time at which it should be timedout if incomplete
pub fn try_send(&self, mut action: Action) -> Result<Instant, TrySendError<Action>> {
let deadline = Instant::now() + self.duration;
action.deadline = Some(deadline);
self.actions_tx.try_send(action)?;

Ok(self.duration)
Ok(deadline)
}
}

Expand Down Expand Up @@ -544,6 +547,7 @@ mod tests {
kind: "test".to_string(),
name: "route_1".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action_1).unwrap();

Expand All @@ -567,6 +571,7 @@ mod tests {
kind: "test".to_string(),
name: "route_2".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action_2).unwrap();

Expand Down Expand Up @@ -610,6 +615,7 @@ mod tests {
kind: "test".to_string(),
name: "test".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action_1).unwrap();

Expand All @@ -624,6 +630,7 @@ mod tests {
kind: "test".to_string(),
name: "test".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action_2).unwrap();

Expand Down Expand Up @@ -664,6 +671,7 @@ mod tests {
kind: "test".to_string(),
name: "test".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action).unwrap();

Expand Down Expand Up @@ -727,6 +735,7 @@ mod tests {
kind: "test".to_string(),
name: "test".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action).unwrap();

Expand Down Expand Up @@ -795,6 +804,7 @@ mod tests {
kind: "tunshell".to_string(),
name: "launch_shell".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action).unwrap();

Expand All @@ -805,6 +815,7 @@ mod tests {
kind: "test".to_string(),
name: "test".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action).unwrap();

Expand Down Expand Up @@ -883,6 +894,7 @@ mod tests {
kind: "test".to_string(),
name: "test".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action).unwrap();

Expand All @@ -893,6 +905,7 @@ mod tests {
kind: "tunshell".to_string(),
name: "launch_shell".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action).unwrap();

Expand Down
79 changes: 9 additions & 70 deletions uplink/src/collector/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,11 @@ use human_bytes::human_bytes;
use log::{debug, error, info, trace, warn};
use reqwest::{Certificate, Client, ClientBuilder, Error as ReqwestError, Identity, Response};
use serde::{Deserialize, Serialize};
use tokio::time::timeout;
use tokio::time::{timeout_at, Instant};

use std::collections::HashMap;
use std::fs::{metadata, remove_dir_all, File};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Duration;
#[cfg(unix)]
use std::{
fs::{create_dir, set_permissions, Permissions},
Expand Down Expand Up @@ -100,7 +99,6 @@ pub struct FileDownloader {
bridge_tx: BridgeTx,
client: Client,
sequence: u32,
timeouts: HashMap<String, Duration>,
}

impl FileDownloader {
Expand All @@ -125,17 +123,9 @@ impl FileDownloader {
}
.build()?;

let timeouts = config
.downloader
.actions
.iter()
.map(|s| (s.name.to_owned(), Duration::from_secs(s.timeout)))
.collect();

Ok(Self {
config: config.downloader.clone(),
actions_rx,
timeouts,
client,
bridge_tx,
sequence: 0,
Expand All @@ -158,17 +148,17 @@ impl FileDownloader {
}
};
self.action_id = action.action_id.clone();

let duration = match self.timeouts.get(&action.name) {
Some(t) => *t,
let deadline = match &action.deadline {
Some(d) => *d,
_ => {
error!("Action: {} unconfigured", action.name);
error!("Unconfigured deadline: {}", action.name);
continue;
}
};

// NOTE: if download has timedout don't do anything, else ensure errors are forwarded after three retries
match timeout(duration, self.run(action)).await {

match timeout_at(deadline, self.run(action)).await {
Ok(Err(e)) => self.forward_error(e).await,
Err(_) => error!("Last download has timedout"),
_ => {}
Expand Down Expand Up @@ -417,7 +407,7 @@ impl DownloadState {

#[cfg(test)]
mod test {
use flume::{bounded, TrySendError};
use flume::bounded;
use serde_json::json;

use std::{collections::HashMap, time::Duration};
Expand Down Expand Up @@ -492,6 +482,7 @@ mod test {
kind: "firmware_update".to_string(),
name: "firmware_update".to_string(),
payload: json!(download_update).to_string(),
deadline: Some(Instant::now() + Duration::from_secs(60)),
};

std::thread::sleep(Duration::from_millis(10));
Expand Down Expand Up @@ -521,56 +512,4 @@ mod test {
}
}
}

#[test]
fn multiple_actions_at_once() {
// Ensure path exists
std::fs::create_dir_all(DOWNLOAD_DIR).unwrap();
// Prepare config
let mut path = PathBuf::from(DOWNLOAD_DIR);
path.push("download");
let downloader_cfg = DownloaderConfig {
actions: vec![ActionRoute { name: "firmware_update".to_owned(), timeout: 10 }],
path,
};
let config = config(downloader_cfg.clone());
let (bridge_tx, _) = create_bridge();

// Create channels to forward and push actions on
let (download_tx, download_rx) = bounded(1);
let downloader = FileDownloader::new(Arc::new(config), download_rx, bridge_tx).unwrap();

// Start FileDownloader in separate thread
std::thread::spawn(|| downloader.start());

// Create a firmware update action
let download_update = DownloadFile {
content_length: 0,
url: "https://github.com/bytebeamio/uplink/raw/main/docs/logo.png".to_string(),
file_name: "1.0".to_string(),
download_path: None,
};
let mut expected_forward = download_update.clone();
let mut path = downloader_cfg.path;
path.push("firmware_update");
path.push("test.txt");
expected_forward.download_path = Some(path);
let download_action = Action {
action_id: "1".to_string(),
kind: "firmware_update".to_string(),
name: "firmware_update".to_string(),
payload: json!(download_update).to_string(),
};

std::thread::sleep(Duration::from_millis(10));

// Send action to FileDownloader with Sender<Action>
download_tx.try_send(download_action.clone()).unwrap();

// Send action to FileDownloader immediately after, this must fail
match download_tx.try_send(download_action).unwrap_err() {
TrySendError::Full(_) => {}
TrySendError::Disconnected(_) => panic!("Unexpected disconnect"),
}
}
}
31 changes: 12 additions & 19 deletions uplink/src/collector/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@ use thiserror::Error;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child, Command};
use tokio::select;
use tokio::time::timeout;
use tokio::time::timeout_at;

use crate::base::bridge::BridgeTx;
use crate::{Action, ActionResponse, ActionRoute, Package};
use crate::{Action, ActionResponse, Package};

use std::collections::HashMap;
use std::io;
use std::process::Stdio;
use std::time::Duration;

#[derive(Error, Debug)]
pub enum Error {
Expand All @@ -39,22 +37,11 @@ pub struct ProcessHandler {
actions_rx: Receiver<Action>,
// to send responses back to bridge
bridge_tx: BridgeTx,
// to timeout actions as per action route configuration
timeouts: HashMap<String, Duration>,
}

impl ProcessHandler {
pub fn new(
actions_rx: Receiver<Action>,
bridge_tx: BridgeTx,
action_routes: &[ActionRoute],
) -> Self {
let timeouts = action_routes
.iter()
.map(|ActionRoute { name, timeout }| (name.to_owned(), Duration::from_secs(*timeout)))
.collect();

Self { actions_rx, bridge_tx, timeouts }
pub fn new(actions_rx: Receiver<Action>, bridge_tx: BridgeTx) -> Self {
Self { actions_rx, bridge_tx }
}

/// Run a process of specified command
Expand Down Expand Up @@ -96,11 +83,17 @@ impl ProcessHandler {
loop {
let action = self.actions_rx.recv_async().await?;
let command = String::from("tools/") + &action.name;
let duration = self.timeouts.get(&action.name).unwrap().to_owned();
let deadline = match &action.deadline {
Some(d) => *d,
_ => {
error!("Unconfigured deadline: {}", action.name);
continue;
}
};

// Spawn the action and capture its stdout, ignore timeouts
let child = self.run(&action.action_id, &command, &action.payload).await?;
if let Ok(o) = timeout(duration, self.spawn_and_capture_stdout(child)).await {
if let Ok(o) = timeout_at(deadline, self.spawn_and_capture_stdout(child)).await {
o?;
} else {
error!("Process timedout: {command}; action_id = {}", action.action_id);
Expand Down
Loading

0 comments on commit a0fb251

Please sign in to comment.