diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs
index c9fba679..eca78814 100644
--- a/uplink/src/collector/downloader.rs
+++ b/uplink/src/collector/downloader.rs
@@ -49,10 +49,10 @@ use bytes::BytesMut;
 use flume::Receiver;
-use futures_util::{Future, StreamExt};
+use futures_util::StreamExt;
 use human_bytes::human_bytes;
 use log::{debug, error, info, trace, warn};
-use reqwest::{Certificate, Client, ClientBuilder, Error as ReqwestError, Identity, Response};
+use reqwest::{Certificate, Client, ClientBuilder, Error as ReqwestError, Identity};
 use rsa::sha2::{Digest, Sha256};
 use serde::{Deserialize, Serialize};
 use tokio::time::{timeout_at, Instant};
@@ -150,208 +150,155 @@ impl FileDownloader {
                 }
             };
             self.action_id = action.action_id.clone();
-            let deadline = match &action.deadline {
-                Some(d) => *d,
-                _ => {
-                    error!("Unconfigured deadline: {}", action.name);
+            let mut state = match DownloadState::new(action, &self.config) {
+                Ok(s) => s,
+                Err(e) => {
+                    self.forward_error(e).await;
                     continue;
                 }
             };
 
-            // NOTE: if download has timedout don't do anything, else ensure errors are forwarded after three retries
+            // Update action status for process initiated
+            let status = ActionResponse::progress(&self.action_id, "Downloading", 0)
+                .set_sequence(self.sequence());
+            self.bridge_tx.send_action_response(status).await;
-            match timeout_at(deadline, self.run(action)).await {
-                Ok(Err(e)) => self.forward_error(e).await,
-                Err(_) => error!("Last download has timedout"),
-                _ => {}
+            if let Err(e) = self.download(&mut state).await {
+                self.forward_error(e).await
             }
+
+            // Forward updated action as part of response
+            let DownloadState { action, .. } = state;
+            let status = ActionResponse::done(&self.action_id, "Downloaded", Some(action))
+                .set_sequence(self.sequence());
+            self.bridge_tx.send_action_response(status).await;
         }
     }
 
-    // Forward errors as action response to bridge
-    async fn forward_error(&mut self, err: Error) {
-        let status =
-            ActionResponse::failure(&self.action_id, err.to_string()).set_sequence(self.sequence());
-        self.bridge_tx.send_action_response(status).await;
+    // Accepts `DownloadState`, sets a timeout for the action
+    async fn download(&mut self, state: &mut DownloadState) -> Result<(), Error> {
+        let deadline = match &state.action.deadline {
+            Some(d) => *d,
+            _ => {
+                error!("Unconfigured deadline: {}", state.action.name);
+                return Ok(());
+            }
+        };
+        // NOTE: if the download has timed out don't do anything, else ensure errors are forwarded after three retries
+        match timeout_at(deadline, self.continuous_retry(state)).await {
+            Ok(r) => r?,
+            Err(_) => error!("Last download has timedout"),
+        }
+
+        state.meta.verify_checksum()?;
+        // Update Action payload with `download_path`, i.e. downloaded file's location in fs
+        state.action.payload = serde_json::to_string(&state.meta)?;
+
+        Ok(())
     }
 
     // A download must be retried with Range header when HTTP/reqwest errors are faced
-    async fn continuous_retry(
-        &mut self,
-        url: &str,
-        mut download: DownloadState,
-    ) -> Result<(), Error> {
-        let mut req = self.client.get(url).send();
-        loop {
-            match self.download(req, &mut download).await {
-                Ok(_) => break,
-                Err(Error::Reqwest(e)) if !e.is_status() => {
-                    let status = ActionResponse::progress(&self.action_id, "Download Failed", 0)
-                        .set_sequence(self.sequence())
-                        .add_error(e.to_string());
+    async fn continuous_retry(&mut self, state: &mut DownloadState) -> Result<(), Error> {
+        'outer: loop {
+            let mut req = self.client.get(&state.meta.url);
+            if let Some(range) = state.retry_range() {
+                warn!("Retrying download; Continuing to download file from: {range}");
+                req = req.header("Range", range);
+            }
+            let mut stream = req.send().await?.error_for_status()?.bytes_stream();
+
+            // Download and store to disk by streaming as chunks
+            while let Some(item) = stream.next().await {
+                let chunk = match item {
+                    Ok(c) => c,
+                    // Retry non-status errors
+                    Err(e) if !e.is_status() => {
+                        let status =
+                            ActionResponse::progress(&self.action_id, "Download Failed", 0)
+                                .set_sequence(self.sequence())
+                                .add_error(e.to_string());
+                        self.bridge_tx.send_action_response(status).await;
+                        error!("Download failed: {e}");
+                        // Retry after wait
+                        tokio::time::sleep(Duration::from_secs(1)).await;
+                        continue 'outer;
+                    }
+                    Err(e) => return Err(e.into()),
+                };
+                if let Some(percentage) = state.write_bytes(&chunk)? {
+                    let status =
+                        ActionResponse::progress(&self.action_id, "Downloading", percentage);
+                    let status = status.set_sequence(self.sequence());
                     self.bridge_tx.send_action_response(status).await;
-                    error!("Download failed: {e}");
                 }
-                Err(e) => return Err(e),
             }
-            tokio::time::sleep(Duration::from_secs(1)).await;
-            let range = download.retry_range();
-            warn!("Retrying download; Continuing to download file from: {range}");
-            req = self.client.get(url).header("Range", range).send();
+            info!("Firmware downloaded successfully");
+            break;
        }
 
         Ok(())
     }
 
-    // Accepts a download `Action` and performs necessary data extraction to actually download the file
-    async fn run(&mut self, mut action: Action) -> Result<(), Error> {
-        // Update action status for process initiated
-        let status = ActionResponse::progress(&self.action_id, "Downloading", 0);
-        let status = status.set_sequence(self.sequence());
-        self.bridge_tx.send_action_response(status).await;
-
-        // Ensure that directory for downloading file into, exists
-        let mut download_path = self.config.path.clone();
-        download_path.push(&action.name);
-
-        #[cfg(unix)]
-        self.create_dirs_with_perms(
-            download_path.as_path(),
-            std::os::unix::fs::PermissionsExt::from_mode(0o777),
-        )?;
-
-        #[cfg(not(unix))]
-        std::fs::create_dir_all(&download_path)?;
-
-        // Extract url information from action payload
-        let mut update = match serde_json::from_str::<DownloadFile>(&action.payload)? {
-            DownloadFile { file_name, .. } if file_name.is_empty() => {
-                return Err(Error::EmptyFileName)
-            }
-            DownloadFile { content_length: 0, .. } => return Err(Error::EmptyFile),
-            u => u,
-        };
-
-        self.check_disk_size(&update)?;
-
-        let url = update.url.clone();
-
-        // Create file to actually download into
-        let (file, file_path) = self.create_file(&download_path, &update.file_name)?;
-
-        // Retry downloading upto 3 times in case of connectivity issues
-        // TODO: Error out for 1XX/3XX responses
-        info!(
-            "Downloading from {} into {}; size = {}",
-            url,
-            file_path.display(),
-            human_bytes(update.content_length as f64)
-        );
-        let download = DownloadState {
-            file,
-            bytes_written: 0,
-            bytes_downloaded: 0,
-            percentage_downloaded: 0,
-            content_length: update.content_length,
-            start_instant: Instant::now(),
-        };
-        self.continuous_retry(&url, download).await?;
-
-        // Update Action payload with `download_path`, i.e. downloaded file's location in fs
-        update.insert_path(file_path.clone());
-        update.verify_checksum()?;
-
-        action.payload = serde_json::to_string(&update)?;
-        let status = ActionResponse::done(&self.action_id, "Downloaded", Some(action));
-
-        let status = status.set_sequence(self.sequence());
+    // Forward errors as action response to bridge
+    async fn forward_error(&mut self, err: Error) {
+        let status =
+            ActionResponse::failure(&self.action_id, err.to_string()).set_sequence(self.sequence());
         self.bridge_tx.send_action_response(status).await;
+    }
 
-        Ok(())
+    fn sequence(&mut self) -> u32 {
+        self.sequence += 1;
+        self.sequence
     }
+}
 
-    fn check_disk_size(&mut self, download: &DownloadFile) -> Result<(), Error> {
-        let disk_free_space = fs2::free_space(&self.config.path)? as usize;
+#[cfg(unix)]
+/// Custom create_dir_all which sets permissions on each created directory, only works on unix
+fn create_dirs_with_perms(path: &Path, perms: Permissions) -> std::io::Result<()> {
+    let mut current_path = PathBuf::new();
 
-        let req_size = human_bytes(download.content_length as f64);
-        let free_size = human_bytes(disk_free_space as f64);
-        debug!("Download requires {req_size}; Disk free space is {free_size}");
+    for component in path.components() {
+        current_path.push(component);
 
-        if download.content_length > disk_free_space {
-            return Err(Error::InsufficientDisk(free_size));
+        if !current_path.exists() {
+            create_dir(&current_path)?;
+            set_permissions(&current_path, perms.clone())?;
         }
-
-        Ok(())
     }
 
-    #[cfg(unix)]
-    /// Custom create_dir_all which sets permissions on each created directory, only works on unix
-    fn create_dirs_with_perms(&self, path: &Path, perms: Permissions) -> std::io::Result<()> {
-        let mut current_path = PathBuf::new();
-
-        for component in path.components() {
-            current_path.push(component);
+    Ok(())
+}
 
-            if !current_path.exists() {
-                create_dir(&current_path)?;
-                set_permissions(&current_path, perms.clone())?;
-            }
+/// Creates file to download into
+fn create_file(download_path: &PathBuf, file_name: &str) -> Result<(File, PathBuf), Error> {
+    let mut file_path = download_path.to_owned();
+    file_path.push(file_name);
+    // NOTE: if file_path is occupied by a directory due to a previous run of uplink, remove it
+    if let Ok(f) = metadata(&file_path) {
+        if f.is_dir() {
+            remove_dir_all(&file_path)?;
         }
-
-        Ok(())
     }
+    let file = File::create(&file_path)?;
+    #[cfg(unix)]
+    file.set_permissions(std::os::unix::fs::PermissionsExt::from_mode(0o666))?;
 
-    /// Creates file to download into
-    fn create_file(
-        &self,
-        download_path: &PathBuf,
-        file_name: &str,
-    ) -> Result<(File, PathBuf), Error> {
-        let mut file_path = download_path.to_owned();
-        file_path.push(file_name);
-        // NOTE: if file_path is occupied by a directory due to previous working of uplink, remove it
-        if let Ok(f) = metadata(&file_path) {
-            if f.is_dir() {
-                remove_dir_all(&file_path)?;
-            }
-        }
-        let file = File::create(&file_path)?;
-        #[cfg(unix)]
-        file.set_permissions(std::os::unix::fs::PermissionsExt::from_mode(0o666))?;
-
-        Ok((file, file_path))
-    }
+    Ok((file, file_path))
+}
 
-    /// Downloads from server and stores into file
-    async fn download(
-        &mut self,
-        req: impl Future<Output = Result<Response, ReqwestError>>,
-        download: &mut DownloadState,
-    ) -> Result<(), Error> {
-        let mut stream = req.await?.error_for_status()?.bytes_stream();
-
-        // Download and store to disk by streaming as chunks
-        while let Some(item) = stream.next().await {
-            let chunk = item?;
-            if let Some(percentage) = download.write_bytes(&chunk)? {
-                //TODO: Simplify progress by reusing action_id and state
-                //TODO: let response = self.response.progress(percentage);??
-                let status = ActionResponse::progress(&self.action_id, "Downloading", percentage);
-                let status = status.set_sequence(self.sequence());
-                self.bridge_tx.send_action_response(status).await;
-            }
-        }
+fn check_disk_size(config: &DownloaderConfig, download: &DownloadFile) -> Result<(), Error> {
+    let disk_free_space = fs2::free_space(&config.path)? as usize;
 
-        info!("Firmware downloaded successfully");
+    let req_size = human_bytes(download.content_length as f64);
+    let free_size = human_bytes(disk_free_space as f64);
+    debug!("Download requires {req_size}; Disk free space is {free_size}");
 
-        Ok(())
+    if download.content_length > disk_free_space {
+        return Err(Error::InsufficientDisk(free_size));
     }
 
-    fn sequence(&mut self) -> u32 {
-        self.sequence += 1;
-        self.sequence
-    }
+    Ok(())
 }
 
 /// Expected JSON format of data contained in the [`payload`] of a download file [`Action`]
@@ -371,10 +318,6 @@ pub struct DownloadFile {
 }
 
 impl DownloadFile {
-    fn insert_path(&mut self, download_path: PathBuf) {
-        self.download_path = Some(download_path);
-    }
-
     fn verify_checksum(&self) -> Result<(), Error> {
         let Some(checksum) = &self.checksum else { return Ok(()) };
         let path = self.download_path.as_ref().expect("Downloader didn't set \"download_path\"");
@@ -394,23 +337,74 @@
 // A temporary structure to help us retry downloads
 // that failed after partial completion.
 struct DownloadState {
+    action: Action,
+    meta: DownloadFile,
     file: File,
     bytes_written: usize,
     bytes_downloaded: usize,
     percentage_downloaded: u8,
-    content_length: usize,
     start_instant: Instant,
 }
 
 impl DownloadState {
+    fn new(action: Action, config: &DownloaderConfig) -> Result<Self, Error> {
+        // Ensure that the directory to download the file into exists
+        let mut path = config.path.clone();
+        path.push(&action.name);
+
+        #[cfg(unix)]
+        create_dirs_with_perms(
+            path.as_path(),
+            std::os::unix::fs::PermissionsExt::from_mode(0o777),
+        )?;
+
+        #[cfg(not(unix))]
+        std::fs::create_dir_all(&path)?;
+
+        // Extract url information from action payload
+        let mut meta = match serde_json::from_str::<DownloadFile>(&action.payload)? {
+            DownloadFile { file_name, .. } if file_name.is_empty() => {
+                return Err(Error::EmptyFileName)
+            }
+            DownloadFile { content_length: 0, .. } => return Err(Error::EmptyFile),
+            u => u,
+        };
+
+        check_disk_size(config, &meta)?;
+
+        let url = meta.url.clone();
+
+        // Create file to actually download into
+        let (file, file_path) = create_file(&path, &meta.file_name)?;
+        // Retry downloading upto 3 times in case of connectivity issues
+        // TODO: Error out for 1XX/3XX responses
+        info!(
+            "Downloading from {} into {}; size = {}",
+            url,
+            file_path.display(),
+            human_bytes(meta.content_length as f64)
+        );
+        meta.download_path = Some(file_path);
+
+        Ok(DownloadState {
+            action,
+            meta,
+            file,
+            bytes_written: 0,
+            bytes_downloaded: 0,
+            percentage_downloaded: 0,
+            start_instant: Instant::now(),
+        })
+    }
+
     fn write_bytes(&mut self, buf: &[u8]) -> Result<Option<u8>, Error> {
         self.bytes_downloaded += buf.len();
         self.file.write_all(buf)?;
         self.bytes_written = self.bytes_downloaded;
-        let size = human_bytes(self.content_length as f64);
+        let size = human_bytes(self.meta.content_length as f64);
 
         // Calculate percentage on the basis of content_length
-        let factor = self.bytes_downloaded as f32 / self.content_length as f32;
+        let factor = self.bytes_downloaded as f32 / self.meta.content_length as f32;
         let percentage = (99.99 * factor) as u8;
 
         // NOTE: ensure lesser frequency of action responses, once every percentage points
@@ -433,8 +427,12 @@ impl DownloadState {
         }
     }
 
-    fn retry_range(&self) -> String {
-        format!("bytes={}-{}", self.bytes_written, self.content_length)
+    fn retry_range(&self) -> Option<String> {
+        if self.bytes_written == self.meta.content_length {
+            return None;
+        }
+
+        Some(format!("bytes={}-{}", self.bytes_written, self.meta.content_length))
     }
 }