From 914cc4fbbc79a1273ccadd407807bc1b3bad1fc2 Mon Sep 17 00:00:00 2001
From: Will Chandler <10663805+wfchandler@users.noreply.github.com>
Date: Mon, 10 Feb 2025 10:39:10 -0500
Subject: [PATCH] Allow task failure on disk imports (#1002)

* Allow task failure on disk imports

Previously, when `oxide` initiated many concurrent uploads, network
congestion could cause packet loss and timeouts for some worker tasks.
While we added the undocumented `--parallelism` flag as a temporary
workaround, this required users to manually diagnose network issues and
adjust settings themselves.

This commit attempts to make disk imports more reliable by allowing the
job to continue when a subset of upload tasks has failed. When a worker
encounters a network error, it reports the failed chunk's file offset
back to the main task. The main task will retry these failed chunks
after completing the initial upload attempt.

This creates natural backpressure: as network congestion increases and
more tasks fail, the number of concurrent uploads automatically
decreases to a sustainable level.

This requires a switch to a single `mpmc` channel shared between all
worker tasks to avoid losing any buffered jobs when a task errors out
(see the sketches following this change list).

While we're at it, clean up disk imports with:

* Replace synchronous file I/O with tokio fs

* Remove redundant atomic progress variable

The `watch` channel gives us a thread-safe way to communicate changes
between tasks. Use the `send_modify` method to increment the value and
drop the unneeded `AtomicU64`.

* Dedupe error messages

Worker tasks are likely to encounter the same error when uploading
chunks. Rather than listing the same error N times, dedupe them and
show only unique errors.

* Make upload_thread_ct NonZeroUsize

It is invalid to set upload_thread_ct to zero. Update the API to
enforce this constraint.

* Rename upload_thread_ct to upload_task_ct

The `upload_thread_ct` variable name is inaccurate, as we're creating
tokio tasks to perform uploads, not OS threads. Correct the name to
`upload_task_ct`.

* Update copyright year
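For reference, the mechanisms above reduce to a few self-contained
sketches. None of these are code from the patch itself; the `Job` type
and `upload` stub below are placeholders standing in for the real
`Chunk`/`UploadWorker` machinery in sdk/src/extras/disk.rs. First, the
shared mpmc queue with failure reporting and natural backpressure:

```rust
#[derive(Debug)]
struct Job {
    offset: u64,
}

// Stand-in for the disk_bulk_write_import() call in the real code.
async fn upload(_job: &Job) -> Result<(), ()> {
    Ok(())
}

#[tokio::main]
async fn main() {
    let task_ct = 4;
    // One mpmc queue shared by every worker: a bounded channel applies
    // backpressure, since the file reader must wait when workers fall behind.
    let (tx, rx) = flume::bounded::<Job>(64);
    // Failed jobs flow back to the main task on a second channel.
    let (failed_tx, failed_rx) = flume::bounded::<Job>(task_ct);

    let mut handles = Vec::with_capacity(task_ct);
    for _ in 0..task_ct {
        let rx = rx.clone();
        let failed_tx = failed_tx.clone();
        handles.push(tokio::spawn(async move {
            while let Ok(job) = rx.recv_async().await {
                if upload(&job).await.is_err() {
                    // Report the failed job, then retire this worker, shrinking
                    // the pool of concurrent uploads.
                    let _ = failed_tx.send_async(job).await;
                    return;
                }
            }
        }));
    }
    // Only workers hold failed_tx clones from here on, so the failed
    // channel closes once every worker has exited.
    drop(failed_tx);

    for offset in (0..10u64).map(|i| i * 512 * 1024) {
        if tx.send_async(Job { offset }).await.is_err() {
            break; // every worker has already exited
        }
    }
    drop(tx);

    // After the first pass, failed jobs would be resubmitted to the
    // surviving workers; here we just print them.
    while let Ok(job) = failed_rx.recv_async().await {
        println!("would retry {job:?}");
    }

    for h in handles {
        let _ = h.await;
    }
}
```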
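Second, the `watch`-based progress counter that replaces the
`AtomicU64` (sketch; in the patch the sender lives on `DiskImport` and
workers call `send_modify` once per uploaded chunk):

```rust
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (progress_tx, progress_rx) = watch::channel(0u64);

    // `send_modify` updates the value in place under the channel's own
    // synchronization, so no separate AtomicU64 fetch_add/load pair is
    // needed, and every update still notifies watchers.
    let chunk_len = 512 * 1024u64;
    progress_tx.send_modify(|total| *total += chunk_len);
    progress_tx.send_modify(|total| *total += chunk_len);

    assert_eq!(*progress_rx.borrow(), 2 * chunk_len);
}
```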
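Third, the `NonZeroUsize` constraint: the builder's
`V: TryInto<NonZeroUsize>` bound leans on std's
`TryFrom<usize> for NonZeroUsize`, which rejects zero at conversion
time rather than at runtime:

```rust
use std::num::NonZeroUsize;

fn main() {
    // A nonzero task count converts successfully...
    let ok: Result<NonZeroUsize, _> = 8usize.try_into();
    assert_eq!(ok.unwrap().get(), 8);

    // ...while zero fails the conversion, so an invalid upload_task_ct
    // can never be constructed.
    let zero: Result<NonZeroUsize, _> = 0usize.try_into();
    assert!(zero.is_err());
}
```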
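Finally, the error dedup is a plain `HashSet` collection (sketch with
made-up error strings):

```rust
use std::collections::HashSet;

fn main() {
    // Workers hitting the same failure produce identical messages; a
    // HashSet keeps one copy of each before they are joined for display.
    let errors = vec![
        "Error Response: status: 503 Service Unavailable",
        "Error Response: status: 503 Service Unavailable",
        "connection reset by peer",
    ];

    let unique: HashSet<_> = errors.into_iter().collect();
    let mut msg = String::from("Errors while uploading the disk image:");
    for err in &unique {
        msg += &format!("\n * {err}");
    }
    println!("{msg}");
}
```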
---------

Co-authored-by: Adam Leventhal
---
 Cargo.lock                             |  25 +++
 Cargo.toml                             |   1 +
 cli/src/cmd_disk.rs                    |   2 +-
 cli/tests/test_disk_import.rs          |   2 +-
 integration-tests/tests/test_extras.rs |   6 +-
 sdk/Cargo.toml                         |   1 +
 sdk/src/extras/disk.rs                 | 213 +++++++++++++++----------
 sdk/src/extras/mod.rs                  |   6 +-
 8 files changed, 160 insertions(+), 96 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index ed7fb47f..eff73801 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1026,6 +1026,18 @@ dependencies = [
  "num-traits",
 ]
 
+[[package]]
+name = "flume"
+version = "0.11.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "da0e4dd2a88388a1f4ccc7c9ce104604dab68d9f408dc34cd45823d5a9069095"
+dependencies = [
+ "futures-core",
+ "futures-sink",
+ "nanorand",
+ "spin 0.9.8",
+]
+
 [[package]]
 name = "fnv"
 version = "1.0.7"
@@ -2010,6 +2022,15 @@ dependencies = [
  "version_check",
 ]
 
+[[package]]
+name = "nanorand"
+version = "0.7.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3"
+dependencies = [
+ "getrandom 0.2.15",
+]
+
 [[package]]
 name = "native-tls"
 version = "0.2.12"
@@ -2197,6 +2218,7 @@ dependencies = [
  "chrono",
  "clap",
  "dirs",
+ "flume",
  "futures",
  "progenitor-client",
  "rand",
@@ -3248,6 +3270,9 @@ name = "spin"
 version = "0.9.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
+dependencies = [
+ "lock_api",
+]
 
 [[package]]
 name = "stability"
diff --git a/Cargo.toml b/Cargo.toml
index f650e1bb..61c82d9f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -25,6 +25,7 @@ dirs = "4.0.0"
 dropshot = "0.13.0"
 env_logger = "0.10.2"
 expectorate = { version = "1.1.0", features = ["predicates"] }
+flume = "0.11.1"
 futures = "0.3.31"
 httpmock = "0.7.0"
 humantime = "2"
diff --git a/cli/src/cmd_disk.rs b/cli/src/cmd_disk.rs
index aa9b39d7..0eeb0b69 100644
--- a/cli/src/cmd_disk.rs
+++ b/cli/src/cmd_disk.rs
@@ -101,7 +101,7 @@ impl crate::AuthenticatedCmd for CmdDiskImport {
             .disk_import()
             .project(self.project.clone())
             .description(self.description.clone())
-            .upload_thread_ct(self.parallelism)
+            .upload_task_ct(self.parallelism)
             .disk(self.disk.clone())
             .disk_info(disk_info.clone());
 
diff --git a/cli/tests/test_disk_import.rs b/cli/tests/test_disk_import.rs
index 08688349..aedc330a 100644
--- a/cli/tests/test_disk_import.rs
+++ b/cli/tests/test_disk_import.rs
@@ -573,7 +573,7 @@ fn test_disk_write_import_fail() {
     });
 
     let test_file = Testfile::new_random(CHUNK_SIZE * 2).unwrap();
-    let output = r#"(?m)\AErrors while uploading the disk image:\n \* Error Response: status: 503 Service Unavailable;.*$\n \* Error Response: status: 503 Service Unavailable;.*"#;
+    let output = r#"(?m)\AError while uploading the disk image:\n \* Error Response: status: 503 Service Unavailable;.*$"#;
 
     Command::cargo_bin("oxide")
         .unwrap()
diff --git a/integration-tests/tests/test_extras.rs b/integration-tests/tests/test_extras.rs
index 65708672..61376612 100644
--- a/integration-tests/tests/test_extras.rs
+++ b/integration-tests/tests/test_extras.rs
@@ -80,7 +80,7 @@ async fn test_disk_import() {
         .disk_import()
         .project("hi")
         .description("test extra")
-        .upload_thread_ct(1)
+        .upload_task_ct(1)
         .disk("test-import")
         .disk_info(disk_info.clone())
         .execute()
@@ -93,7 +93,7 @@ async fn test_disk_import() {
         .disk_import()
         .project("hi")
         .description("test extra")
-        .upload_thread_ct(8)
+        .upload_task_ct(8)
.disk("test-import") .disk_info(disk_info.clone()) .execute_with_control() @@ -119,7 +119,7 @@ async fn test_disk_import() { .disk_import() .project("hi") .description("test extra") - .upload_thread_ct(8) + .upload_task_ct(8) .disk("test-import") .disk_info(disk_info.clone()) .execute_with_control() diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index 9c5bcfd4..60aeea8c 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -12,6 +12,7 @@ base64 = { workspace = true } chrono = { workspace = true } clap = { workspace = true, optional = true } dirs = { workspace = true } +flume = { workspace = true } futures = { workspace = true } progenitor-client = { workspace = true } rand = { workspace = true } diff --git a/sdk/src/extras/disk.rs b/sdk/src/extras/disk.rs index 1b04f7a3..88f9dab4 100644 --- a/sdk/src/extras/disk.rs +++ b/sdk/src/extras/disk.rs @@ -2,7 +2,7 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -// Copyright 2024 Oxide Computer Company +// Copyright 2025 Oxide Computer Company use super::ClientExtraDiskExt; use crate::Client; @@ -19,8 +19,8 @@ pub mod builder { use crate::{Client, Error}; use std::future::Future; - use std::sync::atomic::{AtomicBool, AtomicU64}; - use std::sync::Arc; + use std::num::NonZeroUsize; + use std::sync::atomic::AtomicBool; use tokio::sync::{oneshot, watch}; /// Builder for [`ClientExtraDiskExt::disk_import`] @@ -30,7 +30,7 @@ pub mod builder { client: &'a Client, project: Result, description: Result, - upload_thread_ct: Result, + upload_task_ct: Result, disk: Result, disk_info: Result, image_info: Result, String>, @@ -42,7 +42,7 @@ pub mod builder { client, project: Err("project was not initialized".to_string()), description: Err("description was not initialized".to_string()), - upload_thread_ct: Err("upload_thread_ct was not initialized".to_string()), + upload_task_ct: Err("upload_task_ct was not initialized".to_string()), disk: Err("disk was not initialized".to_string()), disk_info: Err("disk_info was not initialized".to_string()), image_info: Ok(None), @@ -69,13 +69,13 @@ pub mod builder { self } - pub fn upload_thread_ct(mut self, value: V) -> Self + pub fn upload_task_ct(mut self, value: V) -> Self where - V: std::convert::TryInto, + V: std::convert::TryInto, { - self.upload_thread_ct = value - .try_into() - .map_err(|_| "conversion to `usize` for upload_thread_ct failed".to_string()); + self.upload_task_ct = value.try_into().map_err(|_| { + "conversion to `non-zero usize` for upload_task_ct failed".to_string() + }); self } @@ -144,22 +144,19 @@ pub mod builder { let project = builder.project.map_err(Error::InvalidRequest)?; let description = builder.description.map_err(Error::InvalidRequest)?; - let upload_thread_ct = builder.upload_thread_ct.map_err(Error::InvalidRequest)?; + let upload_task_ct = builder.upload_task_ct.map_err(Error::InvalidRequest)?; let disk = builder.disk.map_err(Error::InvalidRequest)?; let disk_info = builder.disk_info.map_err(Error::InvalidRequest)?; let image_info = builder.image_info.map_err(Error::InvalidRequest)?; - let upload_progress = Arc::new(AtomicU64::new(0)); - Ok(Self { client: builder.client, project, description, - upload_thread_ct, + upload_task_ct, disk, disk_info, image_info, - upload_progress, progress_tx, cleanup_started: AtomicBool::new(false), }) @@ -177,13 +174,14 @@ pub mod types { }; use base64::Engine; - use std::fs::{self, File}; - use std::io::{self, Read}; + use std::collections::HashSet; + use 
std::num::NonZeroUsize; use std::path::{Path, PathBuf}; - use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; - use std::sync::Arc; + use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; - use tokio::sync::{mpsc, oneshot, watch}; + use tokio::fs::File; + use tokio::io::{self, AsyncReadExt}; + use tokio::sync::{oneshot, watch}; // Upload to Nexus in 512k byte chunks const CHUNK_SIZE: u64 = 512 * 1024; @@ -249,11 +247,10 @@ pub mod types { pub client: &'a Client, pub project: NameOrId, pub description: String, - pub upload_thread_ct: usize, + pub upload_task_ct: NonZeroUsize, pub disk: Name, pub disk_info: DiskInfo, pub image_info: Option, - pub upload_progress: Arc, pub progress_tx: watch::Sender, pub cleanup_started: AtomicBool, } @@ -328,56 +325,55 @@ pub mod types { DiskImportError::context("starting the build write process failed", e) })?; - // Create one tokio task for each thread that will upload file chunks let mut handles: Vec>> = - Vec::with_capacity(self.upload_thread_ct); - let mut senders = Vec::with_capacity(self.upload_thread_ct); - - for _ in 0..self.upload_thread_ct { - let (tx, mut rx) = mpsc::channel(100); + Vec::with_capacity(self.upload_task_ct.get()); + let (tx, rx) = flume::bounded(64); + let (failed_tx, failed_rx) = flume::bounded(self.upload_task_ct.get()); + let (resubmit_tx, resubmit_rx) = flume::bounded(self.upload_task_ct.get()); + + for _ in 0..self.upload_task_ct.get() { + let mut worker = UploadWorker { + client: self.client.clone(), + disk: self.disk.clone(), + project: self.project.clone(), + progress_tx: self.progress_tx.clone(), + }; - let client = self.client.clone(); - let project = self.project.clone(); - let disk = self.disk.clone(); - let upload_progress = self.upload_progress.clone(); - let progress_tx = self.progress_tx.clone(); + let rx = rx.clone(); + let failed_tx = failed_tx.clone(); + let resubmit_rx = resubmit_rx.clone(); handles.push(tokio::spawn(async move { - while let Some((offset, base64_encoded_data, data_len)) = rx.recv().await { - client - .disk_bulk_write_import() - .disk(disk.clone()) - .project(project.clone()) - .body(ImportBlocksBulkWrite { - offset, - base64_encoded_data, - }) - .send() - .await?; - - upload_progress.fetch_add(data_len, Ordering::Relaxed); - progress_tx.send_replace(upload_progress.load(Ordering::Relaxed)); + while let Ok(chunk) = rx.recv_async().await { + if let Err(e) = worker.upload_chunk(&chunk).await { + let _ = failed_tx.send_async(chunk).await; + return Err(e); + } + } + + // Signal the main task that the worker has sent all chunks successfully and + // none remain to be sent. + drop(failed_tx); + + while let Ok(chunk) = resubmit_rx.recv_async().await { + worker.upload_chunk(&chunk).await?; } Ok(()) })); - - senders.push(tx); } + // Only worker tasks use this sender. + drop(failed_tx); - // Read chunks from the file in the file system and send them to the - // upload threads. 
-        let mut file = File::open(&self.disk_info.file_path)?;
-        let mut i = 0;
+        let mut buf = Vec::with_capacity(CHUNK_SIZE as usize);
+        let mut file = File::open(&self.disk_info.file_path).await?;
         let mut offset = 0;
 
         let read_result: Result<(), DiskImportError> = loop {
-            let mut chunk = Vec::with_capacity(CHUNK_SIZE as usize);
-
-            let n = match file.by_ref().take(CHUNK_SIZE).read_to_end(&mut chunk) {
+            let n = match (&mut file).take(CHUNK_SIZE).read_to_end(&mut buf).await {
                 Ok(n) => n,
                 Err(e) => {
-                    break Err(DiskImportError::context(
+                    return Err(DiskImportError::context(
                         format!("reading from {} failed", self.disk_info.file_path.display()),
                         e,
                     ));
@@ -388,39 +384,44 @@ pub mod types {
                 break Ok(());
             }
 
-            // If the chunk we just read is all zeroes, don't POST it.
-            if !chunk.iter().all(|x| *x == 0) {
-                let encoded = base64::engine::general_purpose::STANDARD.encode(&chunk[0..n]);
-
-                if senders[i % self.upload_thread_ct]
-                    .send((offset, encoded, n as u64))
+            // If the chunk we just read is all zeroes, we don't need to upload it.
+            let data = &buf[..n];
+            if !data.iter().all(|x| *x == 0) {
+                // Failure to send indicates that all upload tasks exited early
+                // due to errors on their end. We will return those errors below.
+                if tx
+                    .send_async(Chunk {
+                        offset,
+                        data: data.to_vec(),
+                    })
                    .await
                    .is_err()
                {
-                    // Failure to send indicates that the upload task exited early
-                    // due to an error on its end. We will return that error below.
                    break Ok(());
                }
             } else {
-                // Bump the progress bar here to make it consistent
-                self.upload_progress.fetch_add(n as u64, Ordering::Relaxed);
-                self.progress_tx
-                    .send_replace(self.upload_progress.load(Ordering::Relaxed));
+                // Bump the progress bar here. We don't need to send this chunk so we've completed the relevant work.
+                self.progress_tx.send_modify(|offset| *offset += n as u64);
             }
 
-            offset += CHUNK_SIZE;
-            i += 1;
+            offset += n as u64;
+            buf.clear();
         };
+        drop(tx);
 
-        for tx in senders {
-            drop(tx);
+        // Resubmit any failed chunks back to the remaining workers to retry.
+        while let Ok(failed_chunk) = failed_rx.recv_async().await {
+            if resubmit_tx.send_async(failed_chunk).await.is_err() {
+                // All worker tasks have failed.
+                break;
+            }
         }
+        drop(resubmit_tx);
 
         let mut errors = Vec::new();
         if let Err(e) = read_result {
             errors.push(e);
         }
-
         for handle in handles {
             let result = handle.await.map_err(DiskImportError::other)?;
             if let Err(err) = result {
@@ -428,21 +429,24 @@ pub mod types {
             }
         }
 
-        match errors.len() {
-            1 => {
-                return Err(DiskImportError::context(
-                    "Error while uploading the disk image",
-                    errors.remove(0),
-                ))
+        // Only return an error if all worker tasks failed.
+        if errors.len() == self.upload_task_ct.get() {
+            // Dedupe error messages.
+            let mut err_set = HashSet::new();
+            for err in errors {
+                err_set.insert(format!("\n * {err}"));
             }
-            2.. => {
-                let mut msg = String::from("Errors while uploading the disk image:");
-                for err in errors {
-                    msg += &format!("\n * {err}");
-                }
-                return Err(DiskImportError::Other(msg.into()));
+
+            let mut msg = match err_set.len() {
+                1 => String::from("Error while uploading the disk image:"),
+                2.. => String::from("Errors while uploading the disk image:"),
+                0 => unreachable!("error count was zero"),
+            };
+
+            for err in err_set {
+                msg += &err;
             }
-            0 => {}
+            return Err(DiskImportError::Other(msg.into()));
         }
 
         // Stop the bulk write process
@@ -704,7 +708,7 @@ pub mod types {
             )));
         }
 
-        let file_size = fs::metadata(&file_path)?.len();
+        let file_size = std::fs::metadata(&file_path)?.len();
 
         let disk_size = Self::get_disk_size(file_size, requested_disk_size.map(|x| **x)).into();
 
         let disk_block_size = match requested_disk_block_size {
@@ -796,6 +800,39 @@ pub mod types {
         pub image_version: String,
     }
 
+    struct Chunk {
+        offset: u64,
+        data: Vec<u8>,
+    }
+
+    struct UploadWorker {
+        client: Client,
+        disk: Name,
+        project: NameOrId,
+        progress_tx: watch::Sender<u64>,
+    }
+
+    impl UploadWorker {
+        async fn upload_chunk(&mut self, chunk: &Chunk) -> Result<(), DiskImportError> {
+            let base64_encoded_data = base64::engine::general_purpose::STANDARD.encode(&chunk.data);
+            self.client
+                .disk_bulk_write_import()
+                .disk(&*self.disk)
+                .project(&self.project)
+                .body(ImportBlocksBulkWrite {
+                    offset: chunk.offset,
+                    base64_encoded_data,
+                })
+                .send()
+                .await?;
+
+            self.progress_tx
+                .send_modify(|offset| *offset += chunk.data.len() as u64);
+
+            Ok(())
+        }
+    }
+
     #[cfg(test)]
     mod tests {
         use super::*;
diff --git a/sdk/src/extras/mod.rs b/sdk/src/extras/mod.rs
index 5a233a65..1d2b9ba4 100644
--- a/sdk/src/extras/mod.rs
+++ b/sdk/src/extras/mod.rs
@@ -25,7 +25,7 @@ pub trait ClientExtraDiskExt {
     /// Arguments:
     /// - `project`: Name or ID of the project
     /// - `description`: Human-readable free-form text about the disk
-    /// - `upload_thread_ct`: The number of threads used to upload the disk
+    /// - `upload_task_ct`: The number of tokio tasks used to upload the disk
     /// - `disk`: Name of the disk
     /// - `disk_info`: Information needed to construct the disk
     /// - `image_info`: Information needed to construct a snapshot and image, optional
@@ -36,7 +36,7 @@ pub trait ClientExtraDiskExt {
     /// client.disk_import()
     ///     .project(project)
     ///     .description(description)
-    ///     .upload_thread_ct(upload_thread_ct)
+    ///     .upload_task_ct(upload_task_ct)
     ///     .disk(disk)
     ///     .disk_info(disk_info)
     ///     .image_info(image_info)
@@ -53,7 +53,7 @@ pub trait ClientExtraDiskExt {
     /// let (import_future, handle) = client.disk_import()
     ///     .project(project)
     ///     .description(description)
-    ///     .upload_thread_ct(upload_thread_ct)
+    ///     .upload_task_ct(upload_task_ct)
     ///     .disk(disk)
     ///     .disk_info(disk_info)
     ///     .image_info(image_info)