From 2e984f41ddd565ae7d4185856fa9aac7dde7fef2 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Thu, 4 Apr 2024 16:47:02 +0530 Subject: [PATCH] refactor: downloader (#338) * stop sequencing responses * new is main constructor * refactor: use while loop * doc: fix explanation --- uplink/src/collector/downloader.rs | 133 +++++++++++++---------------- 1 file changed, 57 insertions(+), 76 deletions(-) diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index 47b2856e..92188537 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -105,7 +105,6 @@ pub struct FileDownloader { action_id: String, bridge_tx: BridgeTx, client: Client, - sequence: u32, shutdown_rx: Receiver, } @@ -137,7 +136,6 @@ impl FileDownloader { actions_rx, client, bridge_tx, - sequence: 0, action_id: String::default(), shutdown_rx, }) @@ -150,15 +148,7 @@ impl FileDownloader { self.reload().await; info!("Downloader thread is ready to receive download actions"); - loop { - self.sequence = 0; - let action = match self.actions_rx.recv_async().await { - Ok(a) => a, - Err(e) => { - error!("Downloader thread had to stop: {e}"); - break; - } - }; + while let Ok(action) = self.actions_rx.recv_async().await { self.action_id = action.action_id.clone(); let mut state = match DownloadState::new(action, &self.config) { Ok(s) => s, @@ -169,8 +159,7 @@ impl FileDownloader { }; // Update action status for process initiated - let status = ActionResponse::progress(&self.action_id, "Downloading", 0) - .set_sequence(self.sequence()); + let status = ActionResponse::progress(&self.action_id, "Downloading", 0); self.bridge_tx.send_action_response(status).await; if let Err(e) = self.download(&mut state).await { @@ -179,13 +168,14 @@ impl FileDownloader { // Forward updated action as part of response let DownloadState { current: CurrentDownload { action, .. }, .. } = state; - let status = ActionResponse::done(&self.action_id, "Downloaded", Some(action)) - .set_sequence(self.sequence()); + let status = ActionResponse::done(&self.action_id, "Downloaded", Some(action)); self.bridge_tx.send_action_response(status).await; } + + error!("Downloader thread stopped"); } - // reloads a download if it wasn't completed during the previous run of uplink + // Loads a download left uncompleted during the previous run of uplink and continues it async fn reload(&mut self) { let mut state = match DownloadState::load(&self.config) { Ok(s) => s, @@ -203,8 +193,7 @@ impl FileDownloader { // Forward updated action as part of response let DownloadState { current: CurrentDownload { action, .. }, .. } = state; - let status = ActionResponse::done(&self.action_id, "Downloaded", Some(action)) - .set_sequence(self.sequence()); + let status = ActionResponse::done(&self.action_id, "Downloaded", Some(action)); self.bridge_tx.send_action_response(status).await; } // Accepts `DownloadState`, sets a timeout for the action @@ -257,7 +246,6 @@ impl FileDownloader { Err(e) if !e.is_status() => { let status = ActionResponse::progress(&self.action_id, "Download Failed", 0) - .set_sequence(self.sequence()) .add_error(e.to_string()); self.bridge_tx.send_action_response(status).await; error!("Download failed: {e}"); @@ -270,7 +258,6 @@ impl FileDownloader { if let Some(percentage) = state.write_bytes(&chunk)? { let status = ActionResponse::progress(&self.action_id, "Downloading", percentage); - let status = status.set_sequence(self.sequence()); self.bridge_tx.send_action_response(status).await; } } @@ -284,15 +271,9 @@ impl FileDownloader { // Forward errors as action response to bridge async fn forward_error(&mut self, err: Error) { - let status = - ActionResponse::failure(&self.action_id, err.to_string()).set_sequence(self.sequence()); + let status = ActionResponse::failure(&self.action_id, err.to_string()); self.bridge_tx.send_action_response(status).await; } - - fn sequence(&mut self) -> u32 { - self.sequence += 1; - self.sequence - } } #[cfg(unix)] @@ -395,6 +376,55 @@ struct DownloadState { } impl DownloadState { + fn new(action: Action, config: &DownloaderConfig) -> Result { + // Ensure that directory for downloading file into, exists + let mut path = config.path.clone(); + path.push(&action.name); + + #[cfg(unix)] + create_dirs_with_perms( + path.as_path(), + std::os::unix::fs::PermissionsExt::from_mode(0o777), + )?; + + #[cfg(not(unix))] + std::fs::create_dir_all(&path)?; + + // Extract url information from action payload + let mut meta = match serde_json::from_str::(&action.payload)? { + DownloadFile { file_name, .. } if file_name.is_empty() => { + return Err(Error::EmptyFileName) + } + DownloadFile { content_length: 0, .. } => return Err(Error::EmptyFile), + u => u, + }; + + check_disk_size(config, &meta)?; + + let url = meta.url.clone(); + + // Create file to actually download into + let (file, file_path) = create_file(&path, &meta.file_name)?; + // Retry downloading upto 3 times in case of connectivity issues + // TODO: Error out for 1XX/3XX responses + info!( + "Downloading from {} into {}; size = {}", + url, + file_path.display(), + human_bytes(meta.content_length as f64) + ); + meta.download_path = Some(file_path); + let current = CurrentDownload { action, meta, time_left: None }; + + Ok(Self { + current, + file, + bytes_written: 0, + percentage_downloaded: 0, + start: Instant::now(), + }) + } + fn load(config: &DownloaderConfig) -> Result { let mut path = config.path.clone(); path.push("current_download"); @@ -447,55 +477,6 @@ impl DownloadState { Some(format!("bytes={}-{}", self.bytes_written, self.current.meta.content_length)) } - fn new(action: Action, config: &DownloaderConfig) -> Result { - // Ensure that directory for downloading file into, exists - let mut path = config.path.clone(); - path.push(&action.name); - - #[cfg(unix)] - create_dirs_with_perms( - path.as_path(), - std::os::unix::fs::PermissionsExt::from_mode(0o777), - )?; - - #[cfg(not(unix))] - std::fs::create_dir_all(&path)?; - - // Extract url information from action payload - let mut meta = match serde_json::from_str::(&action.payload)? { - DownloadFile { file_name, .. } if file_name.is_empty() => { - return Err(Error::EmptyFileName) - } - DownloadFile { content_length: 0, .. } => return Err(Error::EmptyFile), - u => u, - }; - - check_disk_size(config, &meta)?; - - let url = meta.url.clone(); - - // Create file to actually download into - let (file, file_path) = create_file(&path, &meta.file_name)?; - // Retry downloading upto 3 times in case of connectivity issues - // TODO: Error out for 1XX/3XX responses - info!( - "Downloading from {} into {}; size = {}", - url, - file_path.display(), - human_bytes(meta.content_length as f64) - ); - meta.download_path = Some(file_path); - let current = CurrentDownload { action, meta, time_left: None }; - - Ok(Self { - current, - file, - bytes_written: 0, - percentage_downloaded: 0, - start: Instant::now(), - }) - } - fn write_bytes(&mut self, buf: &[u8]) -> Result, Error> { let bytes_downloaded = buf.len(); self.file.write_all(buf)?;