From cf05049005df469c9a01fe6f45a020f8c0dfd514 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 2 Apr 2024 21:36:17 +0530 Subject: [PATCH 1/5] refactor: pull out methods --- uplink/src/collector/downloader.rs | 76 ++++++++++++++---------------- 1 file changed, 36 insertions(+), 40 deletions(-) diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index c9fba6790..851101b78 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -216,7 +216,7 @@ impl FileDownloader { download_path.push(&action.name); #[cfg(unix)] - self.create_dirs_with_perms( + create_dirs_with_perms( download_path.as_path(), std::os::unix::fs::PermissionsExt::from_mode(0o777), )?; @@ -238,7 +238,7 @@ impl FileDownloader { let url = update.url.clone(); // Create file to actually download into - let (file, file_path) = self.create_file(&download_path, &update.file_name)?; + let (file, file_path) = create_file(&download_path, &update.file_name)?; // Retry downloading upto 3 times in case of connectivity issues // TODO: Error out for 1XX/3XX responses @@ -285,44 +285,6 @@ impl FileDownloader { Ok(()) } - #[cfg(unix)] - /// Custom create_dir_all which sets permissions on each created directory, only works on unix - fn create_dirs_with_perms(&self, path: &Path, perms: Permissions) -> std::io::Result<()> { - let mut current_path = PathBuf::new(); - - for component in path.components() { - current_path.push(component); - - if !current_path.exists() { - create_dir(¤t_path)?; - set_permissions(¤t_path, perms.clone())?; - } - } - - Ok(()) - } - - /// Creates file to download into - fn create_file( - &self, - download_path: &PathBuf, - file_name: &str, - ) -> Result<(File, PathBuf), Error> { - let mut file_path = download_path.to_owned(); - file_path.push(file_name); - // NOTE: if file_path is occupied by a directory due to previous working of uplink, remove it - if let Ok(f) = metadata(&file_path) { - if f.is_dir() { - remove_dir_all(&file_path)?; - } - } - let file = File::create(&file_path)?; - #[cfg(unix)] - file.set_permissions(std::os::unix::fs::PermissionsExt::from_mode(0o666))?; - - Ok((file, file_path)) - } - /// Downloads from server and stores into file async fn download( &mut self, @@ -354,6 +316,40 @@ impl FileDownloader { } } +#[cfg(unix)] +/// Custom create_dir_all which sets permissions on each created directory, only works on unix +fn create_dirs_with_perms(path: &Path, perms: Permissions) -> std::io::Result<()> { + let mut current_path = PathBuf::new(); + + for component in path.components() { + current_path.push(component); + + if !current_path.exists() { + create_dir(¤t_path)?; + set_permissions(¤t_path, perms.clone())?; + } + } + + Ok(()) +} + +/// Creates file to download into +fn create_file(download_path: &PathBuf, file_name: &str) -> Result<(File, PathBuf), Error> { + let mut file_path = download_path.to_owned(); + file_path.push(file_name); + // NOTE: if file_path is occupied by a directory due to previous working of uplink, remove it + if let Ok(f) = metadata(&file_path) { + if f.is_dir() { + remove_dir_all(&file_path)?; + } + } + let file = File::create(&file_path)?; + #[cfg(unix)] + file.set_permissions(std::os::unix::fs::PermissionsExt::from_mode(0o666))?; + + Ok((file, file_path)) +} + /// Expected JSON format of data contained in the [`payload`] of a download file [`Action`] /// /// [`payload`]: Action#structfield.payload From f1ea32d29fd0d592314c362697ee3a75c59faf6b Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 2 Apr 2024 22:01:53 +0530 Subject: [PATCH 2/5] refactor: move out method --- uplink/src/collector/downloader.rs | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index 851101b78..e8640b3ef 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -233,7 +233,7 @@ impl FileDownloader { u => u, }; - self.check_disk_size(&update)?; + check_disk_size(&self.config, &update)?; let url = update.url.clone(); @@ -271,20 +271,6 @@ impl FileDownloader { Ok(()) } - fn check_disk_size(&mut self, download: &DownloadFile) -> Result<(), Error> { - let disk_free_space = fs2::free_space(&self.config.path)? as usize; - - let req_size = human_bytes(download.content_length as f64); - let free_size = human_bytes(disk_free_space as f64); - debug!("Download requires {req_size}; Disk free space is {free_size}"); - - if download.content_length > disk_free_space { - return Err(Error::InsufficientDisk(free_size)); - } - - Ok(()) - } - /// Downloads from server and stores into file async fn download( &mut self, @@ -350,6 +336,20 @@ fn create_file(download_path: &PathBuf, file_name: &str) -> Result<(File, PathBu Ok((file, file_path)) } +fn check_disk_size(config: &DownloaderConfig, download: &DownloadFile) -> Result<(), Error> { + let disk_free_space = fs2::free_space(&config.path)? as usize; + + let req_size = human_bytes(download.content_length as f64); + let free_size = human_bytes(disk_free_space as f64); + debug!("Download requires {req_size}; Disk free space is {free_size}"); + + if download.content_length > disk_free_space { + return Err(Error::InsufficientDisk(free_size)); + } + + Ok(()) +} + /// Expected JSON format of data contained in the [`payload`] of a download file [`Action`] /// /// [`payload`]: Action#structfield.payload From 01ad94d7fdb593f7e0f5d285715cd36aa4e0f356 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 2 Apr 2024 22:03:25 +0530 Subject: [PATCH 3/5] refactor: constructor for `DownloadState` --- uplink/src/collector/downloader.rs | 138 ++++++++++++++--------------- 1 file changed, 68 insertions(+), 70 deletions(-) diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index e8640b3ef..118b15ef5 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -176,14 +176,10 @@ impl FileDownloader { } // A download must be retried with Range header when HTTP/reqwest errors are faced - async fn continuous_retry( - &mut self, - url: &str, - mut download: DownloadState, - ) -> Result<(), Error> { - let mut req = self.client.get(url).send(); + async fn continuous_retry(&mut self, state: &mut DownloadState) -> Result<(), Error> { + let mut req = self.client.get(&state.meta.url).send(); loop { - match self.download(req, &mut download).await { + match self.download(req, state).await { Ok(_) => break, Err(Error::Reqwest(e)) if !e.is_status() => { let status = ActionResponse::progress(&self.action_id, "Download Failed", 0) @@ -196,76 +192,31 @@ impl FileDownloader { } tokio::time::sleep(Duration::from_secs(1)).await; - let range = download.retry_range(); + let range = state.retry_range(); warn!("Retrying download; Continuing to download file from: {range}"); - req = self.client.get(url).header("Range", range).send(); + req = self.client.get(&state.meta.url).header("Range", range).send(); } Ok(()) } // Accepts a download `Action` and performs necessary data extraction to actually download the file - async fn run(&mut self, mut action: Action) -> Result<(), Error> { + async fn run(&mut self, action: Action) -> Result<(), Error> { // Update action status for process initiated let status = ActionResponse::progress(&self.action_id, "Downloading", 0); let status = status.set_sequence(self.sequence()); self.bridge_tx.send_action_response(status).await; - // Ensure that directory for downloading file into, exists - let mut download_path = self.config.path.clone(); - download_path.push(&action.name); - - #[cfg(unix)] - create_dirs_with_perms( - download_path.as_path(), - std::os::unix::fs::PermissionsExt::from_mode(0o777), - )?; - - #[cfg(not(unix))] - std::fs::create_dir_all(&download_path)?; - - // Extract url information from action payload - let mut update = match serde_json::from_str::(&action.payload)? { - DownloadFile { file_name, .. } if file_name.is_empty() => { - return Err(Error::EmptyFileName) - } - DownloadFile { content_length: 0, .. } => return Err(Error::EmptyFile), - u => u, - }; - - check_disk_size(&self.config, &update)?; - - let url = update.url.clone(); - - // Create file to actually download into - let (file, file_path) = create_file(&download_path, &update.file_name)?; - - // Retry downloading upto 3 times in case of connectivity issues - // TODO: Error out for 1XX/3XX responses - info!( - "Downloading from {} into {}; size = {}", - url, - file_path.display(), - human_bytes(update.content_length as f64) - ); - let download = DownloadState { - file, - bytes_written: 0, - bytes_downloaded: 0, - percentage_downloaded: 0, - content_length: update.content_length, - start_instant: Instant::now(), - }; - self.continuous_retry(&url, download).await?; + let mut state = DownloadState::new(action, &self.config)?; + self.continuous_retry(&mut state).await?; // Update Action payload with `download_path`, i.e. downloaded file's location in fs - update.insert_path(file_path.clone()); - update.verify_checksum()?; - - action.payload = serde_json::to_string(&update)?; - let status = ActionResponse::done(&self.action_id, "Downloaded", Some(action)); + let DownloadState { meta, mut action, .. } = state; + meta.verify_checksum()?; + action.payload = serde_json::to_string(&meta)?; - let status = status.set_sequence(self.sequence()); + let status = ActionResponse::done(&self.action_id, "Downloaded", Some(action)) + .set_sequence(self.sequence()); self.bridge_tx.send_action_response(status).await; Ok(()) @@ -367,10 +318,6 @@ pub struct DownloadFile { } impl DownloadFile { - fn insert_path(&mut self, download_path: PathBuf) { - self.download_path = Some(download_path); - } - fn verify_checksum(&self) -> Result<(), Error> { let Some(checksum) = &self.checksum else { return Ok(()) }; let path = self.download_path.as_ref().expect("Downloader didn't set \"download_path\""); @@ -390,23 +337,74 @@ impl DownloadFile { // A temporary structure to help us retry downloads // that failed after partial completion. struct DownloadState { + action: Action, + meta: DownloadFile, file: File, bytes_written: usize, bytes_downloaded: usize, percentage_downloaded: u8, - content_length: usize, start_instant: Instant, } impl DownloadState { + fn new(action: Action, config: &DownloaderConfig) -> Result { + // Ensure that directory for downloading file into, exists + let mut path = config.path.clone(); + path.push(&action.name); + + #[cfg(unix)] + create_dirs_with_perms( + path.as_path(), + std::os::unix::fs::PermissionsExt::from_mode(0o777), + )?; + + #[cfg(not(unix))] + std::fs::create_dir_all(&path)?; + + // Extract url information from action payload + let mut meta = match serde_json::from_str::(&action.payload)? { + DownloadFile { file_name, .. } if file_name.is_empty() => { + return Err(Error::EmptyFileName) + } + DownloadFile { content_length: 0, .. } => return Err(Error::EmptyFile), + u => u, + }; + + check_disk_size(config, &meta)?; + + let url = meta.url.clone(); + + // Create file to actually download into + let (file, file_path) = create_file(&path, &meta.file_name)?; + // Retry downloading upto 3 times in case of connectivity issues + // TODO: Error out for 1XX/3XX responses + info!( + "Downloading from {} into {}; size = {}", + url, + file_path.display(), + human_bytes(meta.content_length as f64) + ); + meta.download_path = Some(file_path); + + Ok(DownloadState { + action, + meta, + file, + bytes_written: 0, + bytes_downloaded: 0, + percentage_downloaded: 0, + start_instant: Instant::now(), + }) + } + fn write_bytes(&mut self, buf: &[u8]) -> Result, Error> { self.bytes_downloaded += buf.len(); self.file.write_all(buf)?; self.bytes_written = self.bytes_downloaded; - let size = human_bytes(self.content_length as f64); + let size = human_bytes(self.meta.content_length as f64); // Calculate percentage on the basis of content_length - let factor = self.bytes_downloaded as f32 / self.content_length as f32; + let factor = self.bytes_downloaded as f32 / self.meta.content_length as f32; let percentage = (99.99 * factor) as u8; // NOTE: ensure lesser frequency of action responses, once every percentage points @@ -430,7 +428,7 @@ impl DownloadState { } fn retry_range(&self) -> String { - format!("bytes={}-{}", self.bytes_written, self.content_length) + format!("bytes={}-{}", self.bytes_written, self.meta.content_length) } } From 6f8821dfad45332fbd6270de1f39890cf4cc28c9 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 2 Apr 2024 22:46:11 +0530 Subject: [PATCH 4/5] refactor: `continuous_retry` --- uplink/src/collector/downloader.rs | 84 +++++++++++++++--------------- 1 file changed, 41 insertions(+), 43 deletions(-) diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index 118b15ef5..79e262821 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -49,10 +49,10 @@ use bytes::BytesMut; use flume::Receiver; -use futures_util::{Future, StreamExt}; +use futures_util::StreamExt; use human_bytes::human_bytes; use log::{debug, error, info, trace, warn}; -use reqwest::{Certificate, Client, ClientBuilder, Error as ReqwestError, Identity, Response}; +use reqwest::{Certificate, Client, ClientBuilder, Error as ReqwestError, Identity}; use rsa::sha2::{Digest, Sha256}; use serde::{Deserialize, Serialize}; use tokio::time::{timeout_at, Instant}; @@ -177,24 +177,43 @@ impl FileDownloader { // A download must be retried with Range header when HTTP/reqwest errors are faced async fn continuous_retry(&mut self, state: &mut DownloadState) -> Result<(), Error> { - let mut req = self.client.get(&state.meta.url).send(); - loop { - match self.download(req, state).await { - Ok(_) => break, - Err(Error::Reqwest(e)) if !e.is_status() => { - let status = ActionResponse::progress(&self.action_id, "Download Failed", 0) - .set_sequence(self.sequence()) - .add_error(e.to_string()); + 'outer: loop { + let mut req = self.client.get(&state.meta.url); + if let Some(range) = state.retry_range() { + warn!("Retrying download; Continuing to download file from: {range}"); + req = self.client.get(&state.meta.url).header("Range", range); + } + let mut stream = req.send().await?.error_for_status()?.bytes_stream(); + + // Download and store to disk by streaming as chunks + while let Some(item) = stream.next().await { + let chunk = match item { + Ok(c) => c, + Err(e) if !e.is_status() => { + let status = + ActionResponse::progress(&self.action_id, "Download Failed", 0) + .set_sequence(self.sequence()) + .add_error(e.to_string()); + self.bridge_tx.send_action_response(status).await; + error!("Download failed: {e}"); + // Retry after wait + tokio::time::sleep(Duration::from_secs(1)).await; + continue 'outer; + } + Err(e) => return Err(e.into()), + }; + if let Some(percentage) = state.write_bytes(&chunk)? { + //TODO: Simplify progress by reusing action_id and state + //TODO: let response = self.response.progress(percentage);?? + let status = + ActionResponse::progress(&self.action_id, "Downloading", percentage); + let status = status.set_sequence(self.sequence()); self.bridge_tx.send_action_response(status).await; - error!("Download failed: {e}"); } - Err(e) => return Err(e), } - tokio::time::sleep(Duration::from_secs(1)).await; - let range = state.retry_range(); - warn!("Retrying download; Continuing to download file from: {range}"); - req = self.client.get(&state.meta.url).header("Range", range).send(); + info!("Firmware downloaded successfully"); + break; } Ok(()) @@ -222,31 +241,6 @@ impl FileDownloader { Ok(()) } - /// Downloads from server and stores into file - async fn download( - &mut self, - req: impl Future>, - download: &mut DownloadState, - ) -> Result<(), Error> { - let mut stream = req.await?.error_for_status()?.bytes_stream(); - - // Download and store to disk by streaming as chunks - while let Some(item) = stream.next().await { - let chunk = item?; - if let Some(percentage) = download.write_bytes(&chunk)? { - //TODO: Simplify progress by reusing action_id and state - //TODO: let response = self.response.progress(percentage);?? - let status = ActionResponse::progress(&self.action_id, "Downloading", percentage); - let status = status.set_sequence(self.sequence()); - self.bridge_tx.send_action_response(status).await; - } - } - - info!("Firmware downloaded successfully"); - - Ok(()) - } - fn sequence(&mut self) -> u32 { self.sequence += 1; self.sequence @@ -427,8 +421,12 @@ impl DownloadState { } } - fn retry_range(&self) -> String { - format!("bytes={}-{}", self.bytes_written, self.meta.content_length) + fn retry_range(&self) -> Option { + if self.bytes_written == self.meta.content_length { + return None; + } + + Some(format!("bytes={}-{}", self.bytes_written, self.meta.content_length)) } } From be31594e01656d70b734e65756fa37b42133a7c8 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 2 Apr 2024 23:46:27 +0530 Subject: [PATCH 5/5] refactor: final reorganize --- uplink/src/collector/downloader.rs | 78 ++++++++++++++++-------------- 1 file changed, 42 insertions(+), 36 deletions(-) diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index 79e262821..eca788148 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -150,29 +150,51 @@ impl FileDownloader { } }; self.action_id = action.action_id.clone(); - let deadline = match &action.deadline { - Some(d) => *d, - _ => { - error!("Unconfigured deadline: {}", action.name); + let mut state = match DownloadState::new(action, &self.config) { + Ok(s) => s, + Err(e) => { + self.forward_error(e).await; continue; } }; - // NOTE: if download has timedout don't do anything, else ensure errors are forwarded after three retries + // Update action status for process initiated + let status = ActionResponse::progress(&self.action_id, "Downloading", 0) + .set_sequence(self.sequence()); + self.bridge_tx.send_action_response(status).await; - match timeout_at(deadline, self.run(action)).await { - Ok(Err(e)) => self.forward_error(e).await, - Err(_) => error!("Last download has timedout"), - _ => {} + if let Err(e) = self.download(&mut state).await { + self.forward_error(e).await } + + // Forward updated action as part of response + let DownloadState { action, .. } = state; + let status = ActionResponse::done(&self.action_id, "Downloaded", Some(action)) + .set_sequence(self.sequence()); + self.bridge_tx.send_action_response(status).await; } } - // Forward errors as action response to bridge - async fn forward_error(&mut self, err: Error) { - let status = - ActionResponse::failure(&self.action_id, err.to_string()).set_sequence(self.sequence()); - self.bridge_tx.send_action_response(status).await; + // Accepts `DownloadState`, sets a timeout for the action + async fn download(&mut self, state: &mut DownloadState) -> Result<(), Error> { + let deadline = match &state.action.deadline { + Some(d) => *d, + _ => { + error!("Unconfigured deadline: {}", state.action.name); + return Ok(()); + } + }; + // NOTE: if download has timedout don't do anything, else ensure errors are forwarded after three retries + match timeout_at(deadline, self.continuous_retry(state)).await { + Ok(r) => r?, + Err(_) => error!("Last download has timedout"), + } + + state.meta.verify_checksum()?; + // Update Action payload with `download_path`, i.e. downloaded file's location in fs + state.action.payload = serde_json::to_string(&state.meta)?; + + Ok(()) } // A download must be retried with Range header when HTTP/reqwest errors are faced @@ -181,7 +203,7 @@ impl FileDownloader { let mut req = self.client.get(&state.meta.url); if let Some(range) = state.retry_range() { warn!("Retrying download; Continuing to download file from: {range}"); - req = self.client.get(&state.meta.url).header("Range", range); + req = req.header("Range", range); } let mut stream = req.send().await?.error_for_status()?.bytes_stream(); @@ -189,6 +211,7 @@ impl FileDownloader { while let Some(item) = stream.next().await { let chunk = match item { Ok(c) => c, + // Retry non-status errors Err(e) if !e.is_status() => { let status = ActionResponse::progress(&self.action_id, "Download Failed", 0) @@ -203,8 +226,6 @@ impl FileDownloader { Err(e) => return Err(e.into()), }; if let Some(percentage) = state.write_bytes(&chunk)? { - //TODO: Simplify progress by reusing action_id and state - //TODO: let response = self.response.progress(percentage);?? let status = ActionResponse::progress(&self.action_id, "Downloading", percentage); let status = status.set_sequence(self.sequence()); @@ -219,26 +240,11 @@ impl FileDownloader { Ok(()) } - // Accepts a download `Action` and performs necessary data extraction to actually download the file - async fn run(&mut self, action: Action) -> Result<(), Error> { - // Update action status for process initiated - let status = ActionResponse::progress(&self.action_id, "Downloading", 0); - let status = status.set_sequence(self.sequence()); - self.bridge_tx.send_action_response(status).await; - - let mut state = DownloadState::new(action, &self.config)?; - self.continuous_retry(&mut state).await?; - - // Update Action payload with `download_path`, i.e. downloaded file's location in fs - let DownloadState { meta, mut action, .. } = state; - meta.verify_checksum()?; - action.payload = serde_json::to_string(&meta)?; - - let status = ActionResponse::done(&self.action_id, "Downloaded", Some(action)) - .set_sequence(self.sequence()); + // Forward errors as action response to bridge + async fn forward_error(&mut self, err: Error) { + let status = + ActionResponse::failure(&self.action_id, err.to_string()).set_sequence(self.sequence()); self.bridge_tx.send_action_response(status).await; - - Ok(()) } fn sequence(&mut self) -> u32 {