Skip to content

Commit

Permalink
refactor: final reorganize
Browse files Browse the repository at this point in the history
  • Loading branch information
Devdutt Shenoi committed Apr 2, 2024
1 parent 6f8821d commit be31594
Showing 1 changed file with 42 additions and 36 deletions.
78 changes: 42 additions & 36 deletions uplink/src/collector/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,29 +150,51 @@ impl FileDownloader {
}
};
self.action_id = action.action_id.clone();
let deadline = match &action.deadline {
Some(d) => *d,
_ => {
error!("Unconfigured deadline: {}", action.name);
let mut state = match DownloadState::new(action, &self.config) {
Ok(s) => s,
Err(e) => {
self.forward_error(e).await;
continue;
}
};

// NOTE: if download has timedout don't do anything, else ensure errors are forwarded after three retries
// Update action status for process initiated
let status = ActionResponse::progress(&self.action_id, "Downloading", 0)
.set_sequence(self.sequence());
self.bridge_tx.send_action_response(status).await;

match timeout_at(deadline, self.run(action)).await {
Ok(Err(e)) => self.forward_error(e).await,
Err(_) => error!("Last download has timedout"),
_ => {}
if let Err(e) = self.download(&mut state).await {
self.forward_error(e).await
}

// Forward updated action as part of response
let DownloadState { action, .. } = state;
let status = ActionResponse::done(&self.action_id, "Downloaded", Some(action))
.set_sequence(self.sequence());
self.bridge_tx.send_action_response(status).await;
}
}

// Forward errors as action response to bridge
async fn forward_error(&mut self, err: Error) {
let status =
ActionResponse::failure(&self.action_id, err.to_string()).set_sequence(self.sequence());
self.bridge_tx.send_action_response(status).await;
// Accepts `DownloadState`, sets a timeout for the action
async fn download(&mut self, state: &mut DownloadState) -> Result<(), Error> {
let deadline = match &state.action.deadline {
Some(d) => *d,
_ => {
error!("Unconfigured deadline: {}", state.action.name);
return Ok(());
}
};
// NOTE: if download has timedout don't do anything, else ensure errors are forwarded after three retries
match timeout_at(deadline, self.continuous_retry(state)).await {
Ok(r) => r?,
Err(_) => error!("Last download has timedout"),
}

state.meta.verify_checksum()?;
// Update Action payload with `download_path`, i.e. downloaded file's location in fs
state.action.payload = serde_json::to_string(&state.meta)?;

Ok(())
}

// A download must be retried with Range header when HTTP/reqwest errors are faced
Expand All @@ -181,14 +203,15 @@ impl FileDownloader {
let mut req = self.client.get(&state.meta.url);
if let Some(range) = state.retry_range() {
warn!("Retrying download; Continuing to download file from: {range}");
req = self.client.get(&state.meta.url).header("Range", range);
req = req.header("Range", range);
}
let mut stream = req.send().await?.error_for_status()?.bytes_stream();

// Download and store to disk by streaming as chunks
while let Some(item) = stream.next().await {
let chunk = match item {
Ok(c) => c,
// Retry non-status errors
Err(e) if !e.is_status() => {
let status =
ActionResponse::progress(&self.action_id, "Download Failed", 0)
Expand All @@ -203,8 +226,6 @@ impl FileDownloader {
Err(e) => return Err(e.into()),
};
if let Some(percentage) = state.write_bytes(&chunk)? {
//TODO: Simplify progress by reusing action_id and state
//TODO: let response = self.response.progress(percentage);??
let status =
ActionResponse::progress(&self.action_id, "Downloading", percentage);
let status = status.set_sequence(self.sequence());
Expand All @@ -219,26 +240,11 @@ impl FileDownloader {
Ok(())
}

// Accepts a download `Action` and performs necessary data extraction to actually download the file
async fn run(&mut self, action: Action) -> Result<(), Error> {
// Update action status for process initiated
let status = ActionResponse::progress(&self.action_id, "Downloading", 0);
let status = status.set_sequence(self.sequence());
self.bridge_tx.send_action_response(status).await;

let mut state = DownloadState::new(action, &self.config)?;
self.continuous_retry(&mut state).await?;

// Update Action payload with `download_path`, i.e. downloaded file's location in fs
let DownloadState { meta, mut action, .. } = state;
meta.verify_checksum()?;
action.payload = serde_json::to_string(&meta)?;

let status = ActionResponse::done(&self.action_id, "Downloaded", Some(action))
.set_sequence(self.sequence());
// Forward errors as action response to bridge
async fn forward_error(&mut self, err: Error) {
let status =
ActionResponse::failure(&self.action_id, err.to_string()).set_sequence(self.sequence());
self.bridge_tx.send_action_response(status).await;

Ok(())
}

fn sequence(&mut self) -> u32 {
Expand Down

0 comments on commit be31594

Please sign in to comment.