Skip to content

Commit

Permalink
refactor: downloader (#338)
Browse files Browse the repository at this point in the history
* stop sequencing responses

* new is main constructor

* refactor: use while loop

* doc: fix explanation
  • Loading branch information
Devdutt Shenoi authored Apr 4, 2024
1 parent 6c723c9 commit 2e984f4
Showing 1 changed file with 57 additions and 76 deletions.
133 changes: 57 additions & 76 deletions uplink/src/collector/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ pub struct FileDownloader {
action_id: String,
bridge_tx: BridgeTx,
client: Client,
sequence: u32,
shutdown_rx: Receiver<DownloaderShutdown>,
}

Expand Down Expand Up @@ -137,7 +136,6 @@ impl FileDownloader {
actions_rx,
client,
bridge_tx,
sequence: 0,
action_id: String::default(),
shutdown_rx,
})
Expand All @@ -150,15 +148,7 @@ impl FileDownloader {
self.reload().await;

info!("Downloader thread is ready to receive download actions");
loop {
self.sequence = 0;
let action = match self.actions_rx.recv_async().await {
Ok(a) => a,
Err(e) => {
error!("Downloader thread had to stop: {e}");
break;
}
};
while let Ok(action) = self.actions_rx.recv_async().await {
self.action_id = action.action_id.clone();
let mut state = match DownloadState::new(action, &self.config) {
Ok(s) => s,
Expand All @@ -169,8 +159,7 @@ impl FileDownloader {
};

// Update action status for process initiated
let status = ActionResponse::progress(&self.action_id, "Downloading", 0)
.set_sequence(self.sequence());
let status = ActionResponse::progress(&self.action_id, "Downloading", 0);
self.bridge_tx.send_action_response(status).await;

if let Err(e) = self.download(&mut state).await {
Expand All @@ -179,13 +168,14 @@ impl FileDownloader {

// Forward updated action as part of response
let DownloadState { current: CurrentDownload { action, .. }, .. } = state;
let status = ActionResponse::done(&self.action_id, "Downloaded", Some(action))
.set_sequence(self.sequence());
let status = ActionResponse::done(&self.action_id, "Downloaded", Some(action));
self.bridge_tx.send_action_response(status).await;
}

error!("Downloader thread stopped");
}

// reloads a download if it wasn't completed during the previous run of uplink
// Loads a download left uncompleted during the previous run of uplink and continues it
async fn reload(&mut self) {
let mut state = match DownloadState::load(&self.config) {
Ok(s) => s,
Expand All @@ -203,8 +193,7 @@ impl FileDownloader {

// Forward updated action as part of response
let DownloadState { current: CurrentDownload { action, .. }, .. } = state;
let status = ActionResponse::done(&self.action_id, "Downloaded", Some(action))
.set_sequence(self.sequence());
let status = ActionResponse::done(&self.action_id, "Downloaded", Some(action));
self.bridge_tx.send_action_response(status).await;
}
// Accepts `DownloadState`, sets a timeout for the action
Expand Down Expand Up @@ -257,7 +246,6 @@ impl FileDownloader {
Err(e) if !e.is_status() => {
let status =
ActionResponse::progress(&self.action_id, "Download Failed", 0)
.set_sequence(self.sequence())
.add_error(e.to_string());
self.bridge_tx.send_action_response(status).await;
error!("Download failed: {e}");
Expand All @@ -270,7 +258,6 @@ impl FileDownloader {
if let Some(percentage) = state.write_bytes(&chunk)? {
let status =
ActionResponse::progress(&self.action_id, "Downloading", percentage);
let status = status.set_sequence(self.sequence());
self.bridge_tx.send_action_response(status).await;
}
}
Expand All @@ -284,15 +271,9 @@ impl FileDownloader {

// Forward errors as action response to bridge
async fn forward_error(&mut self, err: Error) {
let status =
ActionResponse::failure(&self.action_id, err.to_string()).set_sequence(self.sequence());
let status = ActionResponse::failure(&self.action_id, err.to_string());
self.bridge_tx.send_action_response(status).await;
}

fn sequence(&mut self) -> u32 {
self.sequence += 1;
self.sequence
}
}

#[cfg(unix)]
Expand Down Expand Up @@ -395,6 +376,55 @@ struct DownloadState {
}

impl DownloadState {
fn new(action: Action, config: &DownloaderConfig) -> Result<Self, Error> {
// Ensure that directory for downloading file into, exists
let mut path = config.path.clone();
path.push(&action.name);

#[cfg(unix)]
create_dirs_with_perms(
path.as_path(),
std::os::unix::fs::PermissionsExt::from_mode(0o777),
)?;

#[cfg(not(unix))]
std::fs::create_dir_all(&path)?;

// Extract url information from action payload
let mut meta = match serde_json::from_str::<DownloadFile>(&action.payload)? {
DownloadFile { file_name, .. } if file_name.is_empty() => {
return Err(Error::EmptyFileName)
}
DownloadFile { content_length: 0, .. } => return Err(Error::EmptyFile),
u => u,
};

check_disk_size(config, &meta)?;

let url = meta.url.clone();

// Create file to actually download into
let (file, file_path) = create_file(&path, &meta.file_name)?;
// Retry downloading upto 3 times in case of connectivity issues
// TODO: Error out for 1XX/3XX responses
info!(
"Downloading from {} into {}; size = {}",
url,
file_path.display(),
human_bytes(meta.content_length as f64)
);
meta.download_path = Some(file_path);
let current = CurrentDownload { action, meta, time_left: None };

Ok(Self {
current,
file,
bytes_written: 0,
percentage_downloaded: 0,
start: Instant::now(),
})
}

fn load(config: &DownloaderConfig) -> Result<Self, Error> {
let mut path = config.path.clone();
path.push("current_download");
Expand Down Expand Up @@ -447,55 +477,6 @@ impl DownloadState {
Some(format!("bytes={}-{}", self.bytes_written, self.current.meta.content_length))
}

fn new(action: Action, config: &DownloaderConfig) -> Result<Self, Error> {
// Ensure that directory for downloading file into, exists
let mut path = config.path.clone();
path.push(&action.name);

#[cfg(unix)]
create_dirs_with_perms(
path.as_path(),
std::os::unix::fs::PermissionsExt::from_mode(0o777),
)?;

#[cfg(not(unix))]
std::fs::create_dir_all(&path)?;

// Extract url information from action payload
let mut meta = match serde_json::from_str::<DownloadFile>(&action.payload)? {
DownloadFile { file_name, .. } if file_name.is_empty() => {
return Err(Error::EmptyFileName)
}
DownloadFile { content_length: 0, .. } => return Err(Error::EmptyFile),
u => u,
};

check_disk_size(config, &meta)?;

let url = meta.url.clone();

// Create file to actually download into
let (file, file_path) = create_file(&path, &meta.file_name)?;
// Retry downloading upto 3 times in case of connectivity issues
// TODO: Error out for 1XX/3XX responses
info!(
"Downloading from {} into {}; size = {}",
url,
file_path.display(),
human_bytes(meta.content_length as f64)
);
meta.download_path = Some(file_path);
let current = CurrentDownload { action, meta, time_left: None };

Ok(Self {
current,
file,
bytes_written: 0,
percentage_downloaded: 0,
start: Instant::now(),
})
}

fn write_bytes(&mut self, buf: &[u8]) -> Result<Option<u8>, Error> {
let bytes_downloaded = buf.len();
self.file.write_all(buf)?;
Expand Down

0 comments on commit 2e984f4

Please sign in to comment.