Allow task failure on disk imports (#1002)
* Allow task failure on disk imports

Previously, when `oxide` initiated many concurrent uploads, network
congestion could cause packet loss and timeouts for some worker tasks.
While we added the undocumented `--parallelism` flag as a temporary
workaround, this required users to manually diagnose network issues
and adjust settings themselves.

This commit attempts to make disk imports more reliable by allowing the
job to continue when a subset of upload tasks fails. When a worker
encounters a network error, it reports the failed chunk's file offset
back to the main task. The main task will retry these failed chunks
after completing the initial upload attempt. This creates natural
backpressure - as network congestion increases and more tasks fail, the
number of concurrent uploads automatically decreases to a sustainable
level.
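
As a rough illustration of the retry flow described above, here is a minimal, synchronous sketch. It is not the code from this commit: `upload_with_retries`, the `max_rounds` limit, and the `upload` callback are hypothetical names, and the real implementation runs concurrent tokio tasks rather than a sequential loop.

```rust
/// Try every chunk once, then retry only the offsets that failed.
///
/// `upload` is a hypothetical stand-in for the network call that sends the
/// chunk at a given file offset. `max_rounds` is an assumed limit; the commit
/// message does not say how many retry rounds the real code performs.
fn upload_with_retries<F>(
    offsets: Vec<u64>,
    max_rounds: usize,
    mut upload: F,
) -> Result<(), Vec<u64>>
where
    F: FnMut(u64) -> Result<(), String>,
{
    let mut pending = offsets;
    for _ in 0..max_rounds {
        // Offsets reported back as failed during this round.
        let mut failed = Vec::new();
        for offset in pending {
            if upload(offset).is_err() {
                failed.push(offset);
            }
        }
        if failed.is_empty() {
            return Ok(());
        }
        // Only the failed chunks are attempted again.
        pending = failed;
    }
    // Offsets that still had not been uploaded when the rounds ran out.
    Err(pending)
}
```

Collecting failed offsets rather than aborting on the first error is what lets the job continue when only some uploads fail.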

This requires a switch to a single `mpmc` channel shared between all
worker tasks to avoid losing any buffered jobs when a task errors out.
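
The sketch below shows why a shared queue keeps buffered jobs safe when a worker exits. It makes several assumptions: it uses OS threads and an `Arc<Mutex<mpsc::Receiver>>` from the standard library as a stand-in for the `flume` channel and tokio tasks used in the commit, and `run_workers` and its return shape are hypothetical.

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

/// Returns (uploaded, failed, unclaimed) chunk offsets.
/// `upload` is a hypothetical stand-in for the real network call.
fn run_workers(
    offsets: Vec<u64>,
    workers: usize,
    upload: fn(u64) -> Result<(), String>,
) -> (Vec<u64>, Vec<u64>, Vec<u64>) {
    // One job queue shared by every worker (std-only stand-in for mpmc).
    let (job_tx, job_rx) = mpsc::channel::<u64>();
    let job_rx = Arc::new(Mutex::new(job_rx));
    // Workers report each chunk's outcome back to the main thread.
    let (result_tx, result_rx) = mpsc::channel::<Result<u64, u64>>();

    for offset in offsets {
        job_tx.send(offset).unwrap();
    }
    drop(job_tx); // closing the queue lets workers exit once it drains

    let handles: Vec<_> = (0..workers)
        .map(|_| {
            let job_rx = Arc::clone(&job_rx);
            let result_tx = result_tx.clone();
            thread::spawn(move || loop {
                // Hold the lock only long enough to take one job.
                let job = job_rx.lock().unwrap().recv();
                let Ok(offset) = job else { break };
                let failed = upload(offset).is_err();
                let report = if failed { Err(offset) } else { Ok(offset) };
                result_tx.send(report).unwrap();
                if failed {
                    // This worker stops; jobs still queued stay available
                    // to the remaining workers.
                    break;
                }
            })
        })
        .collect();
    drop(result_tx);
    for handle in handles {
        handle.join().unwrap();
    }

    let (mut uploaded, mut failed) = (Vec::new(), Vec::new());
    for report in result_rx {
        match report {
            Ok(offset) => uploaded.push(offset),
            Err(offset) => failed.push(offset),
        }
    }
    // Sort so the result does not depend on thread scheduling.
    uploaded.sort_unstable();
    failed.sort_unstable();
    // Jobs nobody picked up (only possible if every worker stopped early).
    let unclaimed: Vec<u64> = job_rx.lock().unwrap().try_iter().collect();
    (uploaded, failed, unclaimed)
}
```

With a per-worker queue, any jobs already buffered for the worker that stopped would be stranded. With a single shared queue they are simply picked up by whichever workers remain.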

While we're at it, clean up disk imports:

* Replace synchronous file i/o with tokio fs

* Remove redundant atomic progress variable

The `watch` channel gives us a thread-safe way to communicate changes
between tasks. Just use the `send_modify` method to increment the value
and get rid of the unneeded `AtomicU64`.

* Dedupe error messages

Worker tasks are likely to encounter the same error when uploading
chunks. Rather than listing the same error N times, dedupe them and show
only unique errors.
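
A minimal sketch of this kind of deduplication, assuming errors are compared by their rendered message. The function names are hypothetical. The header strings and the ` * ` bullet layout are taken from the expected output in `cli/tests/test_disk_import.rs`; switching between "Error" and "Errors" based on the unique count is an assumption.

```rust
use std::collections::HashSet;

/// Keep the first occurrence of each message, preserving order.
fn unique_errors(errors: &[String]) -> Vec<String> {
    let mut seen = HashSet::new();
    let mut unique = Vec::new();
    for error in errors {
        // `insert` returns false if the message was already seen.
        if seen.insert(error.as_str()) {
            unique.push(error.clone());
        }
    }
    unique
}

/// Render the deduplicated errors as a bulleted report.
fn format_upload_errors(errors: &[String]) -> String {
    let unique = unique_errors(errors);
    // Assumption: singular header when only one unique error remains.
    let header = if unique.len() == 1 { "Error" } else { "Errors" };
    let mut out = format!("{header} while uploading the disk image:");
    for error in &unique {
        out.push_str(&format!("\n * {error}"));
    }
    out
}
```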

* Make upload_thread_ct NonZeroUsize

It is invalid to set upload_thread_ct to zero. Update the API to enforce
this constraint.
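
To illustrate how `NonZeroUsize` moves the check into the type system, here is a small sketch. `ImportConfig` and `parse_parallelism` are hypothetical names; how the SDK builder actually accepts and converts the value is not shown in this diff.

```rust
use std::num::NonZeroUsize;

/// Hypothetical configuration type; the field type makes zero unrepresentable.
struct ImportConfig {
    upload_task_ct: NonZeroUsize,
}

impl ImportConfig {
    fn new(upload_task_ct: NonZeroUsize) -> Self {
        Self { upload_task_ct }
    }

    /// Number of upload tasks to spawn; guaranteed to be at least 1.
    fn task_count(&self) -> usize {
        self.upload_task_ct.get()
    }
}

/// Validate a raw count once, at the boundary (for example a CLI flag).
fn parse_parallelism(raw: usize) -> Result<NonZeroUsize, String> {
    NonZeroUsize::new(raw).ok_or_else(|| "parallelism must be at least 1".to_string())
}
```

Once the value has been validated at the boundary, code that spawns tasks no longer needs its own zero check.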

* Rename upload_thread_ct to upload_task_ct

The `upload_thread_ct` variable name is inaccurate, as we're creating
tokio tasks to perform uploads, not OS threads.

Correct the name to `upload_task_ct`.

* Update copyright year

---------

Co-authored-by: Adam Leventhal <[email protected]>
wfchandler and ahl authored Feb 10, 2025
1 parent e633564 commit 914cc4f
Showing 8 changed files with 160 additions and 96 deletions.
25 changes: 25 additions & 0 deletions Cargo.lock

1 change: 1 addition & 0 deletions Cargo.toml
@@ -25,6 +25,7 @@ dirs = "4.0.0"
dropshot = "0.13.0"
env_logger = "0.10.2"
expectorate = { version = "1.1.0", features = ["predicates"] }
+flume = "0.11.1"
futures = "0.3.31"
httpmock = "0.7.0"
humantime = "2"
2 changes: 1 addition & 1 deletion cli/src/cmd_disk.rs
@@ -101,7 +101,7 @@ impl crate::AuthenticatedCmd for CmdDiskImport {
.disk_import()
.project(self.project.clone())
.description(self.description.clone())
-.upload_thread_ct(self.parallelism)
+.upload_task_ct(self.parallelism)
.disk(self.disk.clone())
.disk_info(disk_info.clone());

2 changes: 1 addition & 1 deletion cli/tests/test_disk_import.rs
@@ -573,7 +573,7 @@ fn test_disk_write_import_fail() {
});

let test_file = Testfile::new_random(CHUNK_SIZE * 2).unwrap();
-let output = r#"(?m)\AErrors while uploading the disk image:\n \* Error Response: status: 503 Service Unavailable;.*$\n \* Error Response: status: 503 Service Unavailable;.*"#;
+let output = r#"(?m)\AError while uploading the disk image:\n \* Error Response: status: 503 Service Unavailable;.*$"#;

Command::cargo_bin("oxide")
.unwrap()
6 changes: 3 additions & 3 deletions integration-tests/tests/test_extras.rs
@@ -80,7 +80,7 @@ async fn test_disk_import() {
.disk_import()
.project("hi")
.description("test extra")
-.upload_thread_ct(1)
+.upload_task_ct(1)
.disk("test-import")
.disk_info(disk_info.clone())
.execute()
@@ -93,7 +93,7 @@ async fn test_disk_import() {
.disk_import()
.project("hi")
.description("test extra")
-.upload_thread_ct(8)
+.upload_task_ct(8)
.disk("test-import")
.disk_info(disk_info.clone())
.execute_with_control()
@@ -119,7 +119,7 @@ async fn test_disk_import() {
.disk_import()
.project("hi")
.description("test extra")
-.upload_thread_ct(8)
+.upload_task_ct(8)
.disk("test-import")
.disk_info(disk_info.clone())
.execute_with_control()
1 change: 1 addition & 0 deletions sdk/Cargo.toml
@@ -12,6 +12,7 @@ base64 = { workspace = true }
chrono = { workspace = true }
clap = { workspace = true, optional = true }
dirs = { workspace = true }
+flume = { workspace = true }
futures = { workspace = true }
progenitor-client = { workspace = true }
rand = { workspace = true }