Skip to content

Commit

Permalink
Merge pull request #414 from divviup/timg/http-client-retries
Browse files Browse the repository at this point in the history
Retries in HTTP clients
  • Loading branch information
tgeoghegan authored Aug 24, 2022
2 parents 8962bdc + fd7d8d8 commit 89f7fe7
Show file tree
Hide file tree
Showing 7 changed files with 370 additions and 22 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions janus_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ repository = "https://github.com/divviup/janus"
rust-version = "1.63"

[dependencies]
backoff = { version = "0.4.0", features = ["tokio"] }
derivative = "2.2.0"
http = "0.2.8"
janus_core = { version = "0.1", path = "../janus_core" }
Expand Down
60 changes: 46 additions & 14 deletions janus_client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
//! PPM protocol client
use backoff::ExponentialBackoff;
use derivative::Derivative;
use http::{header::CONTENT_TYPE, StatusCode};
use janus_core::{
hpke::associated_data_for_report_share,
hpke::{self, HpkeApplicationInfo, Label},
message::{Duration, HpkeCiphertext, HpkeConfig, Nonce, Report, Role, TaskId},
retries::{http_request_exponential_backoff, retry_http_request},
time::Clock,
};
use prio::{
Expand Down Expand Up @@ -57,14 +59,31 @@ pub struct ClientParameters {
/// The minimum batch duration of the task. This value is shared by all
/// parties in the protocol, and is used to compute report nonces.
min_batch_duration: Duration,
/// Parameters to use when retrying HTTP requests.
http_request_retry_parameters: ExponentialBackoff,
}

impl ClientParameters {
/// Creates a new set of client task parameters.
pub fn new(
task_id: TaskId,
aggregator_endpoints: Vec<Url>,
min_batch_duration: Duration,
) -> Self {
Self::new_with_backoff(
task_id,
aggregator_endpoints,
min_batch_duration,
http_request_exponential_backoff(),
)
}

/// Creates a new set of client task parameters with non-default HTTP request retry parameters.
pub fn new_with_backoff(
task_id: TaskId,
mut aggregator_endpoints: Vec<Url>,
min_batch_duration: Duration,
http_request_retry_parameters: ExponentialBackoff,
) -> Self {
// Ensure provided aggregator endpoints end with a slash, as we will be joining additional
// path segments into these endpoints & the Url::join implementation is persnickety about
Expand All @@ -79,6 +98,7 @@ impl ClientParameters {
task_id,
aggregator_endpoints,
min_batch_duration,
http_request_retry_parameters,
}
}

Expand Down Expand Up @@ -122,7 +142,12 @@ pub async fn aggregator_hpke_config(
) -> Result<HpkeConfig, Error> {
let mut request_url = client_parameters.hpke_config_endpoint(aggregator_role)?;
request_url.set_query(Some(&format!("task_id={}", task_id)));
let hpke_config_response = http_client.get(request_url).send().await?;
let hpke_config_response = retry_http_request(
client_parameters.http_request_retry_parameters.clone(),
|| async { http_client.get(request_url.clone()).send().await },
)
.await
.or_else(|e| e)?;
let status = hpke_config_response.status();
if !status.is_success() {
return Err(Error::Http(status));
Expand Down Expand Up @@ -217,21 +242,26 @@ where
))
}

/// Upload a [`janus_core::message::Report`] to the leader, per §4.3.2 of
/// draft-gpew-priv-ppm. The provided measurement is sharded into one input
/// share plus one proof share for each aggregator and then uploaded to the
/// leader.
/// Upload a [`janus_core::message::Report`] to the leader, per §4.3.2 of draft-gpew-priv-ppm.
/// The provided measurement is sharded into one input share plus one proof share for each
/// aggregator and then uploaded to the leader.
#[tracing::instrument(skip(measurement), err)]
pub async fn upload(&self, measurement: &V::Measurement) -> Result<(), Error> {
let report = self.prepare_report(measurement)?;

let upload_response = self
.http_client
.post(self.parameters.upload_endpoint()?)
.header(CONTENT_TYPE, Report::MEDIA_TYPE)
.body(report.get_encoded())
.send()
.await?;
let upload_endpoint = self.parameters.upload_endpoint()?;
let upload_response = retry_http_request(
self.parameters.http_request_retry_parameters.clone(),
|| async {
self.http_client
.post(upload_endpoint.clone())
.header(CONTENT_TYPE, Report::MEDIA_TYPE)
.body(report.get_encoded())
.send()
.await
},
)
.await
.or_else(|e| e)?;
let status = upload_response.status();
if !status.is_success() {
// TODO(#233): decode an RFC 7807 problem document
Expand All @@ -249,6 +279,7 @@ mod tests {
use janus_core::{
hpke::test_util::generate_test_hpke_config_and_private_key,
message::{TaskId, Time},
retries::test_http_request_exponential_backoff,
test_util::install_test_trace_subscriber,
time::MockClock,
};
Expand All @@ -262,10 +293,11 @@ mod tests {
{
let server_url = Url::parse(&mockito::server_url()).unwrap();
Client::new(
ClientParameters::new(
ClientParameters::new_with_backoff(
TaskId::random(),
Vec::from([server_url.clone(), server_url]),
Duration::from_seconds(1),
test_http_request_exponential_backoff(),
),
vdaf_client,
MockClock::default(),
Expand Down
10 changes: 6 additions & 4 deletions janus_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@ rust-version = "1.63"
database = ["dep:bytes", "dep:postgres-protocol", "dep:postgres-types"]
test-util = [
"dep:assert_matches",
"dep:futures",
"dep:kube",
"dep:k8s-openapi",
"dep:serde_json",
"dep:tempfile",
"dep:tracing",
"dep:tracing-log",
"dep:tracing-subscriber",
"tokio/macros",
Expand All @@ -27,10 +25,12 @@ test-util = [

[dependencies]
anyhow = "1"
backoff = { version = "0.4.0", features = ["tokio"] }
base64 = "0.13.0"
bytes = { version = "1.2.1", optional = true }
chrono = "0.4"
derivative = "2.2.0"
futures = "0.3.23"
hex = "0.4"
hpke-dispatch = "0.3.0"
kube = { version = "0.65", optional = true, default-features = false, features = ["client", "rustls-tls"] }
Expand All @@ -40,17 +40,17 @@ postgres-protocol = { version = "0.6.4", optional = true }
postgres-types = { version = "0.2.4", optional = true }
prio = "0.8.2"
rand = "0.8"
reqwest = { version = "0.11.4", default-features = false, features = ["rustls-tls"] }
ring = "0.16.20"
serde = { version = "1.0.144", features = ["derive"] }
thiserror = "1.0"
tokio = { version = "^1.20", features = ["macros", "net", "rt"] }
tracing = "0.1.36"

# Dependencies required only if feature "test-util" is enabled
assert_matches = { version = "1", optional = true }
serde_json = { version = "1.0.85", optional = true }
futures = { version = "0.3.23", optional = true }
tempfile = { version = "3", optional = true }
tracing = { version = "0.1.36", optional = true }
tracing-log = { version = "0.1.3", optional = true }
tracing-subscriber = { version = "0.3", features = ["std", "env-filter", "fmt"], optional = true }

Expand All @@ -62,3 +62,5 @@ janus_core = { path = ".", features = ["test-util"] }
# lack of support for connecting to servers by IP addresses, which affects many
# Kubernetes clusters.
kube = { version = "0.65", features = ["openssl-tls"] } # ensure this remains compatible with the non-dev dependency
mockito = "0.31.0"
url = "2.2.2"
1 change: 1 addition & 0 deletions janus_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use tokio::task::JoinHandle;

pub mod hpke;
pub mod message;
pub mod retries;
pub mod task;
#[cfg(feature = "test-util")]
pub mod test_util;
Expand Down
Loading

0 comments on commit 89f7fe7

Please sign in to comment.