From 80892c5c5d84401856965280cfb1a2fbd58ddd3d Mon Sep 17 00:00:00 2001 From: nardoor <102725206+nardoor@users.noreply.github.com> Date: Mon, 18 Nov 2024 12:29:36 +0100 Subject: [PATCH] refactor(backend): migrate from `backoff` to `backon` (#356) Hello, Removed the unmaintained `backoff` crate in the `rustic_backend` crate. Replaced it with the relatively similar `backon` crate. If `backon` as simpler API, it also lacks a few features that `backoff` had. For instance: `no_max_delay` or `backoff::retry_notify`. To make up for it: - I opened https://github.com/Xuanwo/backon/pull/160 on the `backon` github. - I implemented an internal module `backon_extension` inside `backend::rest`. Let me know what you think of the code! Fixes https://github.com/rustic-rs/rustic_core/issues/351 --------- Co-authored-by: Alexander Weiss Co-authored-by: simonsan <14062932+simonsan@users.noreply.github.com> --- Cargo.lock | 22 +-- crates/backend/Cargo.toml | 4 +- crates/backend/src/rest.rs | 285 ++++++++++++++----------------------- 3 files changed, 106 insertions(+), 205 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e0e9370f..9145600f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -349,17 +349,6 @@ dependencies = [ "paste", ] -[[package]] -name = "backoff" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" -dependencies = [ - "getrandom", - "instant", - "rand", -] - [[package]] name = "backon" version = "1.2.0" @@ -2083,15 +2072,6 @@ dependencies = [ "similar", ] -[[package]] -name = "instant" -version = "0.1.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" -dependencies = [ - "cfg-if", -] - [[package]] name = "integer-sqrt" version = "0.1.5" @@ -3585,7 +3565,7 @@ dependencies = [ "aho-corasick", "anyhow", "aws-lc-sys", - "backoff", + "backon", "bytes", "bytesize", "clap", diff --git a/crates/backend/Cargo.toml b/crates/backend/Cargo.toml index 335c9d27..fbe420c1 100644 --- a/crates/backend/Cargo.toml +++ b/crates/backend/Cargo.toml @@ -42,7 +42,7 @@ opendal = [ "tokio/rt-multi-thread", "dep:typed-path", ] -rest = ["dep:reqwest", "dep:backoff"] +rest = ["dep:reqwest", "dep:backon"] rclone = ["rest", "dep:rand", "dep:semver"] [dependencies] @@ -78,7 +78,7 @@ aho-corasick = { workspace = true } walkdir = "2.5.0" # rest backend -backoff = { version = "0.4.0", optional = true } +backon = { version = "1.2.0", optional = true } reqwest = { version = "0.12.8", default-features = false, features = ["json", "rustls-tls-native-roots", "stream", "blocking"], optional = true } # rclone backend diff --git a/crates/backend/src/rest.rs b/crates/backend/src/rest.rs index 2293afa6..5d5841e9 100644 --- a/crates/backend/src/rest.rs +++ b/crates/backend/src/rest.rs @@ -1,11 +1,11 @@ use std::str::FromStr; use std::time::Duration; -use backoff::{backoff::Backoff, ExponentialBackoff, ExponentialBackoffBuilder}; +use backon::{BlockingRetryable, ExponentialBuilder}; use bytes::Bytes; use log::{trace, warn}; use reqwest::{ - blocking::{Client, ClientBuilder, Response}, + blocking::{Client, ClientBuilder}, header::{HeaderMap, HeaderValue}, Url, }; @@ -28,80 +28,12 @@ pub(super) mod constants { pub(super) const DEFAULT_TIMEOUT: Duration = Duration::from_secs(600); } -// trait CheckError to add user-defined method check_error on Response -pub(crate) trait CheckError { - /// Check reqwest Response for error and treat errors as permanent or transient - fn check_error(self) -> Result>; -} - -impl CheckError for Response { - /// Check reqwest Response for error and treat errors as permanent or transient - /// - /// # Errors - /// - /// If the response is an error, it will return an error of type Error - /// - /// # Returns - /// - /// The response if it is not an error - fn check_error(self) -> Result> { - match self.error_for_status() { - Ok(t) => Ok(t), - // Note: status() always give Some(_) as it is called from a Response - Err(err) if err.status().unwrap().is_client_error() => { - Err(backoff::Error::Permanent(err)) - } - Err(err) => Err(backoff::Error::Transient { - err, - retry_after: None, - }), - } - } -} - -/// A backoff implementation that limits the number of retries -#[derive(Clone, Debug)] -struct LimitRetryBackoff { - /// The maximum number of retries - max_retries: usize, - /// The current number of retries - retries: usize, - /// The exponential backoff - exp: ExponentialBackoff, -} - -impl Default for LimitRetryBackoff { - fn default() -> Self { - Self { - max_retries: constants::DEFAULT_RETRY, - retries: 0, - exp: ExponentialBackoffBuilder::new() - .with_max_elapsed_time(None) // no maximum elapsed time; we count number of retires - .build(), - } - } -} - -impl Backoff for LimitRetryBackoff { - /// Returns the next backoff duration. - /// - /// # Notes - /// - /// If the number of retries exceeds the maximum number of retries, it returns None. - fn next_backoff(&mut self) -> Option { - self.retries += 1; - if self.retries > self.max_retries { - None - } else { - self.exp.next_backoff() - } - } - - /// Resets the backoff to the initial state. - fn reset(&mut self) { - self.retries = 0; - self.exp.reset(); - } +fn construct_backoff_error(err: reqwest::Error) -> Box { + RusticError::with_source( + ErrorKind::Backend, + "Backoff failed, please check the logs for more information.", + err, + ) } /// A backend implementation that uses REST to access the backend. @@ -111,24 +43,37 @@ pub struct RestBackend { url: Url, /// The client to use. client: Client, - /// The backoff implementation to use. - backoff: LimitRetryBackoff, -} - -/// Notify function for backoff in case of error -/// -/// # Arguments -/// -/// * `err` - The error that occurred -/// * `duration` - The duration of the backoff -// We need to pass the error by value to satisfy the signature of the notify function -// for handling errors in backoff -#[allow(clippy::needless_pass_by_value)] -fn notify(err: reqwest::Error, duration: Duration) { - warn!("Error {err} at {duration:?}, retrying"); + /// The ``BackoffBuilder`` we use + backoff: ExponentialBuilder, } impl RestBackend { + /// Call the given operation retrying non-permanent errors and giving warnings for failed operations + /// + /// ## Permanent/non-permanent errors + /// + /// - `client_error` are considered permanent + /// - others are not, and are subject to retry + /// + /// ## Returns + /// + /// The operation result + /// or the last error (permanent or not) that occurred. + fn retry_notify(&self, op: F) -> Result + where + F: FnMut() -> Result, + { + op.retry(self.backoff) + .when(|err| { + err.status().map_or( + true, // retry + |status_code| !status_code.is_client_error(), // do not retry if `is_client_error` + ) + }) + .notify(|err, duration| warn!("Error {err} at {duration:?}, retrying")) + .call() + } + /// Create a new [`RestBackend`] from a given url. /// /// # Arguments @@ -169,7 +114,12 @@ impl RestBackend { .map_err(|err| { RusticError::with_source(ErrorKind::Backend, "Failed to build HTTP client", err) })?; - let mut backoff = LimitRetryBackoff::default(); + + // backon doesn't allow us to specify `None` for `max_delay` + // see + let mut backoff = ExponentialBuilder::default() + .with_max_delay(Duration::MAX) // no maximum elapsed time; we count number of retries + .with_max_times(constants::DEFAULT_RETRY); // FIXME: If we have multiple times the same option, this could lead to unexpected behavior for (option, value) in options { @@ -187,7 +137,7 @@ impl RestBackend { .attach_context("option", "retry") })?, }; - backoff.max_retries = max_retries; + backoff = backoff.with_max_times(max_retries); } else if option == "timeout" { let timeout = humantime::Duration::from_str(&value).map_err(|err| { RusticError::with_source( @@ -299,37 +249,34 @@ impl ReadBackend for RestBackend { .attach_context("tpe_dir", tpe.dirname().to_string()) })?; - backoff::retry_notify( - self.backoff.clone(), - || { - if tpe == FileType::Config { - return Ok( - if self.client.head(url.clone()).send()?.status().is_success() { - vec![(Id::default(), 0)] - } else { - Vec::new() - }, - ); - } - - let list = self - .client - .get(url.clone()) - .header("Accept", "application/vnd.x.restic.rest.v2") - .send()? - .check_error()? - .json::>>()? // use Option to be handle null json value - .unwrap_or_default(); - Ok(list - .into_iter() - .filter_map(|i| match i.name.parse::() { - Ok(id) => Some((id, i.size)), - Err(_) => None, - }) - .collect()) - }, - notify, - ) + self.retry_notify(|| { + if tpe == FileType::Config { + return Ok( + if self.client.head(url.clone()).send()?.status().is_success() { + vec![(Id::default(), 0)] + } else { + Vec::new() + }, + ); + } + + let list = self + .client + .get(url.clone()) + .header("Accept", "application/vnd.x.restic.rest.v2") + .send()? + .error_for_status()? + .json::>>()? // use Option to be handle null json value + .unwrap_or_default(); + + Ok(list + .into_iter() + .filter_map(|i| match i.name.parse::() { + Ok(id) => Some((id, i.size)), + Err(_) => None, + }) + .collect()) + }) .map_err(construct_backoff_error) } @@ -351,18 +298,13 @@ impl ReadBackend for RestBackend { .url(tpe, id) .map_err(|err| construct_join_url_error(err, tpe, id, &self.url))?; - backoff::retry_notify( - self.backoff.clone(), - || { - Ok(self - .client - .get(url.clone()) - .send()? - .check_error()? - .bytes()?) - }, - notify, - ) + self.retry_notify(|| { + self.client + .get(url.clone()) + .send()? + .error_for_status()? + .bytes() + }) .map_err(construct_backoff_error) } @@ -398,31 +340,18 @@ impl ReadBackend for RestBackend { .attach_context("id", id.to_string()) })?; - backoff::retry_notify( - self.backoff.clone(), - || { - Ok(self - .client - .get(url.clone()) - .header("Range", header_value.clone()) - .send()? - .check_error()? - .bytes()?) - }, - notify, - ) + self.retry_notify(|| { + self.client + .get(url.clone()) + .header("Range", header_value.clone()) + .send()? + .error_for_status()? + .bytes() + }) .map_err(construct_backoff_error) } } -fn construct_backoff_error(err: backoff::Error) -> Box { - RusticError::with_source( - ErrorKind::Backend, - "Backoff failed, please check the logs for more information.", - err, - ) -} - fn construct_join_url_error( err: JoiningUrlFailedError, tpe: FileType, @@ -453,14 +382,10 @@ impl WriteBackend for RestBackend { .attach_context("join_input", "?create=true") })?; - backoff::retry_notify( - self.backoff.clone(), - || { - _ = self.client.post(url.clone()).send()?.check_error()?; - Ok(()) - }, - notify, - ) + self.retry_notify(|| { + _ = self.client.post(url.clone()).send()?.error_for_status()?; + Ok(()) + }) .map_err(construct_backoff_error) } @@ -492,15 +417,15 @@ impl WriteBackend for RestBackend { ) .body(buf); - backoff::retry_notify( - self.backoff.clone(), - || { - // Note: try_clone() always gives Some(_) as the body is Bytes which is cloneable - _ = req_builder.try_clone().unwrap().send()?.check_error()?; - Ok(()) - }, - notify, - ) + self.retry_notify(|| { + // Note: try_clone() always gives Some(_) as the body is Bytes which is cloneable + _ = req_builder + .try_clone() + .unwrap() + .send()? + .error_for_status()?; + Ok(()) + }) .map_err(construct_backoff_error) } @@ -521,14 +446,10 @@ impl WriteBackend for RestBackend { .url(tpe, id) .map_err(|err| construct_join_url_error(err, tpe, id, &self.url))?; - backoff::retry_notify( - self.backoff.clone(), - || { - _ = self.client.delete(url.clone()).send()?.check_error()?; - Ok(()) - }, - notify, - ) + self.retry_notify(|| { + _ = self.client.delete(url.clone()).send()?.error_for_status()?; + Ok(()) + }) .map_err(construct_backoff_error) } }