Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove timeout information from apps #293

Merged
merged 7 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions uplink/src/base/actions.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use serde::{Deserialize, Serialize};
use serde_json::json;
use tokio::time::Instant;

use crate::{Payload, Point};

Expand All @@ -19,6 +20,9 @@ pub struct Action {
pub name: String,
// action payload. json. can be args/payload. depends on the invoked command
pub payload: String,
// Instant at which action must be timedout
#[serde(skip)]
pub deadline: Option<Instant>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
25 changes: 19 additions & 6 deletions uplink/src/base/bridge/actions_lane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,14 +264,14 @@ impl ActionsBridge {
.get(&action.name)
.ok_or_else(|| Error::NoRoute(action.name.clone()))?;

let duration = route.try_send(action.clone()).map_err(|_| Error::UnresponsiveReceiver)?;
let deadline = route.try_send(action.clone()).map_err(|_| Error::UnresponsiveReceiver)?;
// current action left unchanged in case of new tunshell action
if action.name == TUNSHELL_ACTION {
self.parallel_actions.insert(action.action_id);
return Ok(());
}

self.current_action = Some(CurrentAction::new(action, duration));
self.current_action = Some(CurrentAction::new(action, deadline));

Ok(())
}
Expand Down Expand Up @@ -378,11 +378,11 @@ struct CurrentAction {
}

impl CurrentAction {
pub fn new(action: Action, duration: Duration) -> CurrentAction {
pub fn new(action: Action, deadline: Instant) -> CurrentAction {
CurrentAction {
id: action.action_id.clone(),
action,
timeout: Box::pin(time::sleep(duration)),
timeout: Box::pin(time::sleep_until(deadline)),
}
}

Expand Down Expand Up @@ -416,10 +416,13 @@ pub struct ActionRouter {

impl ActionRouter {
#[allow(clippy::result_large_err)]
pub fn try_send(&self, action: Action) -> Result<Duration, TrySendError<Action>> {
/// Forwards action to the appropriate application and returns the instance in time at which it should be timedout if incomplete
pub fn try_send(&self, mut action: Action) -> Result<Instant, TrySendError<Action>> {
de-sh marked this conversation as resolved.
Show resolved Hide resolved
let deadline = Instant::now() + self.duration;
action.deadline = Some(deadline);
self.actions_tx.try_send(action)?;

Ok(self.duration)
Ok(deadline)
}
}

Expand Down Expand Up @@ -544,6 +547,7 @@ mod tests {
kind: "test".to_string(),
name: "route_1".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action_1).unwrap();

Expand All @@ -567,6 +571,7 @@ mod tests {
kind: "test".to_string(),
name: "route_2".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action_2).unwrap();

Expand Down Expand Up @@ -610,6 +615,7 @@ mod tests {
kind: "test".to_string(),
name: "test".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action_1).unwrap();

Expand All @@ -624,6 +630,7 @@ mod tests {
kind: "test".to_string(),
name: "test".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action_2).unwrap();

Expand Down Expand Up @@ -664,6 +671,7 @@ mod tests {
kind: "test".to_string(),
name: "test".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action).unwrap();

Expand Down Expand Up @@ -727,6 +735,7 @@ mod tests {
kind: "test".to_string(),
name: "test".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action).unwrap();

Expand Down Expand Up @@ -795,6 +804,7 @@ mod tests {
kind: "tunshell".to_string(),
name: "launch_shell".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action).unwrap();

Expand All @@ -805,6 +815,7 @@ mod tests {
kind: "test".to_string(),
name: "test".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action).unwrap();

Expand Down Expand Up @@ -883,6 +894,7 @@ mod tests {
kind: "test".to_string(),
name: "test".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action).unwrap();

Expand All @@ -893,6 +905,7 @@ mod tests {
kind: "tunshell".to_string(),
name: "launch_shell".to_string(),
payload: "test".to_string(),
deadline: None,
};
actions_tx.send(action).unwrap();

Expand Down
79 changes: 9 additions & 70 deletions uplink/src/collector/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,11 @@ use human_bytes::human_bytes;
use log::{debug, error, info, trace, warn};
use reqwest::{Certificate, Client, ClientBuilder, Identity, Response};
use serde::{Deserialize, Serialize};
use tokio::time::timeout;
use tokio::time::{timeout_at, Instant};

use std::collections::HashMap;
use std::fs::{metadata, remove_dir_all, File};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::time::Duration;
#[cfg(unix)]
use std::{
fs::{create_dir, set_permissions, Permissions},
Expand Down Expand Up @@ -100,7 +99,6 @@ pub struct FileDownloader {
bridge_tx: BridgeTx,
client: Client,
sequence: u32,
timeouts: HashMap<String, Duration>,
}

impl FileDownloader {
Expand All @@ -125,17 +123,9 @@ impl FileDownloader {
}
.build()?;

let timeouts = config
.downloader
.actions
.iter()
.map(|s| (s.name.to_owned(), Duration::from_secs(s.timeout)))
.collect();

Ok(Self {
config: config.downloader.clone(),
actions_rx,
timeouts,
client,
bridge_tx,
sequence: 0,
Expand All @@ -158,17 +148,17 @@ impl FileDownloader {
}
};
self.action_id = action.action_id.clone();

let duration = match self.timeouts.get(&action.name) {
Some(t) => *t,
let deadline = match &action.deadline {
Some(d) => *d,
_ => {
error!("Action: {} unconfigured", action.name);
error!("Unconfigured deadline: {}", action.name);
continue;
}
};

// NOTE: if download has timedout don't do anything, else ensure errors are forwarded after three retries
match timeout(duration, self.run(action)).await {

match timeout_at(deadline, self.run(action)).await {
Ok(Err(e)) => self.forward_error(e).await,
Err(_) => error!("Last download has timedout"),
_ => {}
Expand Down Expand Up @@ -413,7 +403,7 @@ impl DownloadState {

#[cfg(test)]
mod test {
use flume::{bounded, TrySendError};
use flume::bounded;
use serde_json::json;

use std::{collections::HashMap, time::Duration};
Expand Down Expand Up @@ -488,6 +478,7 @@ mod test {
kind: "firmware_update".to_string(),
name: "firmware_update".to_string(),
payload: json!(download_update).to_string(),
deadline: Some(Instant::now() + Duration::from_secs(60)),
};

std::thread::sleep(Duration::from_millis(10));
Expand Down Expand Up @@ -517,56 +508,4 @@ mod test {
}
}
}

#[test]
fn multiple_actions_at_once() {
// Ensure path exists
std::fs::create_dir_all(DOWNLOAD_DIR).unwrap();
// Prepare config
let mut path = PathBuf::from(DOWNLOAD_DIR);
path.push("download");
let downloader_cfg = DownloaderConfig {
actions: vec![ActionRoute { name: "firmware_update".to_owned(), timeout: 10 }],
path,
};
let config = config(downloader_cfg.clone());
let (bridge_tx, _) = create_bridge();

// Create channels to forward and push actions on
let (download_tx, download_rx) = bounded(1);
let downloader = FileDownloader::new(Arc::new(config), download_rx, bridge_tx).unwrap();

// Start FileDownloader in separate thread
std::thread::spawn(|| downloader.start());

// Create a firmware update action
let download_update = DownloadFile {
content_length: 0,
url: "https://github.com/bytebeamio/uplink/raw/main/docs/logo.png".to_string(),
file_name: "1.0".to_string(),
download_path: None,
};
let mut expected_forward = download_update.clone();
let mut path = downloader_cfg.path;
path.push("firmware_update");
path.push("test.txt");
expected_forward.download_path = Some(path);
let download_action = Action {
action_id: "1".to_string(),
kind: "firmware_update".to_string(),
name: "firmware_update".to_string(),
payload: json!(download_update).to_string(),
};

std::thread::sleep(Duration::from_millis(10));

// Send action to FileDownloader with Sender<Action>
download_tx.try_send(download_action.clone()).unwrap();

// Send action to FileDownloader immediately after, this must fail
match download_tx.try_send(download_action).unwrap_err() {
TrySendError::Full(_) => {}
TrySendError::Disconnected(_) => panic!("Unexpected disconnect"),
}
}
}
31 changes: 12 additions & 19 deletions uplink/src/collector/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@ use thiserror::Error;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child, Command};
use tokio::select;
use tokio::time::timeout;
use tokio::time::timeout_at;

use crate::base::bridge::BridgeTx;
use crate::{Action, ActionResponse, ActionRoute, Package};
use crate::{Action, ActionResponse, Package};

use std::collections::HashMap;
use std::io;
use std::process::Stdio;
use std::time::Duration;

#[derive(Error, Debug)]
pub enum Error {
Expand All @@ -39,22 +37,11 @@ pub struct ProcessHandler {
actions_rx: Receiver<Action>,
// to send responses back to bridge
bridge_tx: BridgeTx,
// to timeout actions as per action route configuration
timeouts: HashMap<String, Duration>,
}

impl ProcessHandler {
pub fn new(
actions_rx: Receiver<Action>,
bridge_tx: BridgeTx,
action_routes: &[ActionRoute],
) -> Self {
let timeouts = action_routes
.iter()
.map(|ActionRoute { name, timeout }| (name.to_owned(), Duration::from_secs(*timeout)))
.collect();

Self { actions_rx, bridge_tx, timeouts }
pub fn new(actions_rx: Receiver<Action>, bridge_tx: BridgeTx) -> Self {
Self { actions_rx, bridge_tx }
}

/// Run a process of specified command
Expand Down Expand Up @@ -96,11 +83,17 @@ impl ProcessHandler {
loop {
let action = self.actions_rx.recv_async().await?;
let command = String::from("tools/") + &action.name;
let duration = self.timeouts.get(&action.name).unwrap().to_owned();
let deadline = match &action.deadline {
Some(d) => *d,
_ => {
error!("Unconfigured deadline: {}", action.name);
continue;
}
};

// Spawn the action and capture its stdout, ignore timeouts
let child = self.run(&action.action_id, &command, &action.payload).await?;
if let Ok(o) = timeout(duration, self.spawn_and_capture_stdout(child)).await {
if let Ok(o) = timeout_at(deadline, self.spawn_and_capture_stdout(child)).await {
o?;
} else {
error!("Process timedout: {command}; action_id = {}", action.action_id);
Expand Down
Loading