diff --git a/uplink/src/collector/downloader.rs b/uplink/src/collector/downloader.rs index 79e262821..eca788148 100644 --- a/uplink/src/collector/downloader.rs +++ b/uplink/src/collector/downloader.rs @@ -150,29 +150,51 @@ impl FileDownloader { } }; self.action_id = action.action_id.clone(); - let deadline = match &action.deadline { - Some(d) => *d, - _ => { - error!("Unconfigured deadline: {}", action.name); + let mut state = match DownloadState::new(action, &self.config) { + Ok(s) => s, + Err(e) => { + self.forward_error(e).await; continue; } }; - // NOTE: if download has timedout don't do anything, else ensure errors are forwarded after three retries + // Update action status for process initiated + let status = ActionResponse::progress(&self.action_id, "Downloading", 0) + .set_sequence(self.sequence()); + self.bridge_tx.send_action_response(status).await; - match timeout_at(deadline, self.run(action)).await { - Ok(Err(e)) => self.forward_error(e).await, - Err(_) => error!("Last download has timedout"), - _ => {} + if let Err(e) = self.download(&mut state).await { + self.forward_error(e).await } + + // Forward updated action as part of response + let DownloadState { action, .. } = state; + let status = ActionResponse::done(&self.action_id, "Downloaded", Some(action)) + .set_sequence(self.sequence()); + self.bridge_tx.send_action_response(status).await; } } - // Forward errors as action response to bridge - async fn forward_error(&mut self, err: Error) { - let status = - ActionResponse::failure(&self.action_id, err.to_string()).set_sequence(self.sequence()); - self.bridge_tx.send_action_response(status).await; + // Accepts `DownloadState`, sets a timeout for the action + async fn download(&mut self, state: &mut DownloadState) -> Result<(), Error> { + let deadline = match &state.action.deadline { + Some(d) => *d, + _ => { + error!("Unconfigured deadline: {}", state.action.name); + return Ok(()); + } + }; + // NOTE: if download has timedout don't do anything, else ensure errors are forwarded after three retries + match timeout_at(deadline, self.continuous_retry(state)).await { + Ok(r) => r?, + Err(_) => error!("Last download has timedout"), + } + + state.meta.verify_checksum()?; + // Update Action payload with `download_path`, i.e. downloaded file's location in fs + state.action.payload = serde_json::to_string(&state.meta)?; + + Ok(()) } // A download must be retried with Range header when HTTP/reqwest errors are faced @@ -181,7 +203,7 @@ impl FileDownloader { let mut req = self.client.get(&state.meta.url); if let Some(range) = state.retry_range() { warn!("Retrying download; Continuing to download file from: {range}"); - req = self.client.get(&state.meta.url).header("Range", range); + req = req.header("Range", range); } let mut stream = req.send().await?.error_for_status()?.bytes_stream(); @@ -189,6 +211,7 @@ impl FileDownloader { while let Some(item) = stream.next().await { let chunk = match item { Ok(c) => c, + // Retry non-status errors Err(e) if !e.is_status() => { let status = ActionResponse::progress(&self.action_id, "Download Failed", 0) @@ -203,8 +226,6 @@ impl FileDownloader { Err(e) => return Err(e.into()), }; if let Some(percentage) = state.write_bytes(&chunk)? { - //TODO: Simplify progress by reusing action_id and state - //TODO: let response = self.response.progress(percentage);?? let status = ActionResponse::progress(&self.action_id, "Downloading", percentage); let status = status.set_sequence(self.sequence()); @@ -219,26 +240,11 @@ impl FileDownloader { Ok(()) } - // Accepts a download `Action` and performs necessary data extraction to actually download the file - async fn run(&mut self, action: Action) -> Result<(), Error> { - // Update action status for process initiated - let status = ActionResponse::progress(&self.action_id, "Downloading", 0); - let status = status.set_sequence(self.sequence()); - self.bridge_tx.send_action_response(status).await; - - let mut state = DownloadState::new(action, &self.config)?; - self.continuous_retry(&mut state).await?; - - // Update Action payload with `download_path`, i.e. downloaded file's location in fs - let DownloadState { meta, mut action, .. } = state; - meta.verify_checksum()?; - action.payload = serde_json::to_string(&meta)?; - - let status = ActionResponse::done(&self.action_id, "Downloaded", Some(action)) - .set_sequence(self.sequence()); + // Forward errors as action response to bridge + async fn forward_error(&mut self, err: Error) { + let status = + ActionResponse::failure(&self.action_id, err.to_string()).set_sequence(self.sequence()); self.bridge_tx.send_action_response(status).await; - - Ok(()) } fn sequence(&mut self) -> u32 {