Skip to content

Commit

Permalink
Set HTTP client timeouts (#2632)
Browse files Browse the repository at this point in the history
* client, collector: default timeouts for requests

Apply default timeouts (10 s to connect, 30 s overall) to HTTP requests
made by the client and collector crates. Additionally, we remove
`ClientParameters::new_with_backoff`: since that struct was private to
the crate anyway, we can just write directly to its
`http_request_retry_parameters` field, and `ClientBuilder::with_backoff`
provides a public API affordance for achieving this.

Resolves #426

* Apply timeouts to leader->helper HTTP requests

Set default timeouts for HTTP requests made from leader to helper, and
wire up config options for them.
  • Loading branch information
tgeoghegan authored Feb 7, 2024
1 parent 1e5cc1b commit 8baa237
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 22 deletions.
10 changes: 10 additions & 0 deletions aggregator/src/binaries/aggregation_job_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ pub async fn main_callback(ctx: BinaryContext<RealClock, Options, Config>) -> Re
let aggregation_job_driver = Arc::new(AggregationJobDriver::new(
reqwest::Client::builder()
.user_agent(CLIENT_USER_AGENT)
.timeout(Duration::from_secs(
ctx.config.job_driver_config.http_request_timeout_secs,
))
.connect_timeout(Duration::from_secs(
ctx.config
.job_driver_config
.http_request_connection_timeout_secs,
))
.build()
.context("couldn't create HTTP client")?,
&ctx.meter,
Expand Down Expand Up @@ -152,6 +160,8 @@ mod tests {
worker_lease_duration_secs: 600,
worker_lease_clock_skew_allowance_secs: 60,
maximum_attempts_before_failure: 5,
http_request_timeout_secs: 10,
http_request_connection_timeout_secs: 30,
},
batch_aggregation_shard_count: 32,
taskprov_config: TaskprovConfig::default(),
Expand Down
10 changes: 10 additions & 0 deletions aggregator/src/binaries/collection_job_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ pub async fn main_callback(ctx: BinaryContext<RealClock, Options, Config>) -> Re
let collection_job_driver = Arc::new(CollectionJobDriver::new(
reqwest::Client::builder()
.user_agent(CLIENT_USER_AGENT)
.timeout(Duration::from_secs(
ctx.config.job_driver_config.http_request_timeout_secs,
))
.connect_timeout(Duration::from_secs(
ctx.config
.job_driver_config
.http_request_connection_timeout_secs,
))
.build()
.context("couldn't create HTTP client")?,
&ctx.meter,
Expand Down Expand Up @@ -148,6 +156,8 @@ mod tests {
worker_lease_duration_secs: 600,
worker_lease_clock_skew_allowance_secs: 60,
maximum_attempts_before_failure: 5,
http_request_timeout_secs: 10,
http_request_connection_timeout_secs: 30,
},
batch_aggregation_shard_count: 32,
})
Expand Down
21 changes: 21 additions & 0 deletions aggregator/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,24 @@ pub struct JobDriverConfig {
/// The number of attempts to drive a work item before it is placed in a permanent failure
/// state.
pub maximum_attempts_before_failure: usize,
/// Timeout to apply when establishing connections to the helper for HTTP requests. See
/// [`reqwest::ClientBuilder::connect_timeout`] for details.
#[serde(default = "JobDriverConfig::default_http_connection_timeout")]
pub http_request_connection_timeout_secs: u64,
/// Timeout to apply to HTTP requests overall (including connection establishment) when
/// communicating with the helper. See [`reqwest::ClientBuilder::timeout`] for details.
#[serde(default = "JobDriverConfig::default_http_request_timeout")]
pub http_request_timeout_secs: u64,
}

impl JobDriverConfig {
fn default_http_connection_timeout() -> u64 {
10
}

fn default_http_request_timeout() -> u64 {
30
}
}

#[cfg(feature = "test-util")]
Expand Down Expand Up @@ -251,6 +269,9 @@ mod tests {
worker_lease_duration_secs: 600,
worker_lease_clock_skew_allowance_secs: 60,
maximum_attempts_before_failure: 5,
http_request_connection_timeout_secs: JobDriverConfig::default_http_connection_timeout(
),
http_request_timeout_secs: JobDriverConfig::default_http_request_timeout(),
})
}

Expand Down
4 changes: 4 additions & 0 deletions aggregator/tests/integration/graceful_shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,8 @@ async fn aggregation_job_driver_shutdown() {
worker_lease_duration_secs: 600,
worker_lease_clock_skew_allowance_secs: 60,
maximum_attempts_before_failure: 5,
http_request_timeout_secs: 10,
http_request_connection_timeout_secs: 30,
},
taskprov_config: TaskprovConfig { enabled: false },
batch_aggregation_shard_count: 32,
Expand Down Expand Up @@ -346,6 +348,8 @@ async fn collection_job_driver_shutdown() {
worker_lease_duration_secs: 600,
worker_lease_clock_skew_allowance_secs: 60,
maximum_attempts_before_failure: 5,
http_request_timeout_secs: 10,
http_request_connection_timeout_secs: 30,
},
batch_aggregation_shard_count: 32,
};
Expand Down
31 changes: 9 additions & 22 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,30 +86,13 @@ impl ClientParameters {
leader_aggregator_endpoint: Url,
helper_aggregator_endpoint: Url,
time_precision: Duration,
) -> Self {
Self::new_with_backoff(
task_id,
leader_aggregator_endpoint,
helper_aggregator_endpoint,
time_precision,
http_request_exponential_backoff(),
)
}

/// Creates a new set of client task parameters with non-default HTTP request retry parameters.
pub fn new_with_backoff(
task_id: TaskId,
leader_aggregator_endpoint: Url,
helper_aggregator_endpoint: Url,
time_precision: Duration,
http_request_retry_parameters: ExponentialBackoff,
) -> Self {
Self {
task_id,
leader_aggregator_endpoint: url_ensure_trailing_slash(leader_aggregator_endpoint),
helper_aggregator_endpoint: url_ensure_trailing_slash(helper_aggregator_endpoint),
time_precision,
http_request_retry_parameters,
http_request_retry_parameters: http_request_exponential_backoff(),
}
}

Expand Down Expand Up @@ -192,6 +175,10 @@ async fn aggregator_hpke_config(
/// Construct a [`reqwest::Client`] suitable for use in a DAP [`Client`].
pub fn default_http_client() -> Result<reqwest::Client, Error> {
Ok(reqwest::Client::builder()
// Clients wishing to override these timeouts may provide their own
// values using ClientBuilder::with_http_client.
.timeout(std::time::Duration::from_secs(30))
.connect_timeout(std::time::Duration::from_secs(10))
.user_agent(CLIENT_USER_AGENT)
.build()?)
}
Expand Down Expand Up @@ -693,13 +680,13 @@ mod tests {
let mut server = mockito::Server::new_async().await;
let server_url = Url::parse(&server.url()).unwrap();
let http_client = &default_http_client().unwrap();
let client_parameters = ClientParameters::new_with_backoff(
let mut client_parameters = ClientParameters::new(
random(),
server_url.clone(),
server_url,
Duration::from_seconds(1),
test_http_request_exponential_backoff(),
);
client_parameters.http_request_retry_parameters = test_http_request_exponential_backoff();

let keypair = generate_test_hpke_config_and_private_key();
let hpke_config_list = HpkeConfigList::new(Vec::from([keypair.config().clone()]));
Expand Down Expand Up @@ -730,13 +717,13 @@ mod tests {
let mut server = mockito::Server::new_async().await;
let server_url = Url::parse(&server.url()).unwrap();
let http_client = &default_http_client().unwrap();
let client_parameters = ClientParameters::new_with_backoff(
let mut client_parameters = ClientParameters::new(
random(),
server_url.clone(),
server_url,
Duration::from_seconds(1),
test_http_request_exponential_backoff(),
);
client_parameters.http_request_retry_parameters = test_http_request_exponential_backoff();

let encoded_bad_hpke_config = hex!(
"64" // HpkeConfigId
Expand Down
4 changes: 4 additions & 0 deletions collector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ static COLLECTOR_USER_AGENT: &str = concat!(
/// Construct a [`reqwest::Client`] suitable for use in a DAP [`Collector`].
pub fn default_http_client() -> Result<reqwest::Client, Error> {
Ok(reqwest::Client::builder()
// Clients may override default timeouts using
// CollectorBuilder::with_http_client
.timeout(StdDuration::from_secs(30))
.connect_timeout(StdDuration::from_secs(10))
.user_agent(COLLECTOR_USER_AGENT)
.build()?)
}
Expand Down
11 changes: 11 additions & 0 deletions docs/samples/advanced_config/aggregation_job_driver.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,17 @@ worker_lease_clock_skew_allowance_secs: 60
# (required)
maximum_attempts_before_failure: 10

# Timeout to apply when establishing connections to the helper for HTTP requests, in seconds. See
# https://docs.rs/reqwest/latest/reqwest/struct.ClientBuilder.html#method.connect_timeout for
# details. (optional; defaults to 10 seconds)
http_request_connection_timeout_secs: 10

# Timeout to apply to HTTP requests overall (including connection establishment) when communicating
# with the helper. See
# https://docs.rs/reqwest/latest/reqwest/struct.ClientBuilder.html#method.timeout for details.
# (optional; defaults to 30 seconds)
http_request_timeout_secs: 30

# Number of sharded database records per batch aggregation. Must not be greater
# than the equivalent setting in the collection job driver. (required)
batch_aggregation_shard_count: 32
Expand Down
11 changes: 11 additions & 0 deletions docs/samples/advanced_config/collection_job_driver.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,17 @@ worker_lease_clock_skew_allowance_secs: 60
# (required)
maximum_attempts_before_failure: 10

# Timeout to apply when establishing connections to the helper for HTTP requests, in seconds. See
# https://docs.rs/reqwest/latest/reqwest/struct.ClientBuilder.html#method.connect_timeout for
# details. (optional; defaults to 10 seconds)
http_request_connection_timeout_secs: 10

# Timeout to apply to HTTP requests overall (including connection establishment) when communicating
# with the helper. See
# https://docs.rs/reqwest/latest/reqwest/struct.ClientBuilder.html#method.timeout for details.
# (optional; defaults to 30 seconds)
http_request_timeout_secs: 30

# Number of sharded database records per batch aggregation. Must not be less
# than the equivalent setting in the aggregator and aggregation job driver.
# (required)
Expand Down
4 changes: 4 additions & 0 deletions integration_tests/src/janus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,8 @@ impl JanusInProcess {
worker_lease_duration_secs: 10,
worker_lease_clock_skew_allowance_secs: 1,
maximum_attempts_before_failure: 3,
http_request_timeout_secs: 30,
http_request_connection_timeout_secs: 10,
},
taskprov_config: TaskprovConfig::default(),
batch_aggregation_shard_count: 32,
Expand All @@ -198,6 +200,8 @@ impl JanusInProcess {
worker_lease_duration_secs: 10,
worker_lease_clock_skew_allowance_secs: 1,
maximum_attempts_before_failure: 3,
http_request_timeout_secs: 30,
http_request_connection_timeout_secs: 10,
},
batch_aggregation_shard_count: 32,
};
Expand Down

0 comments on commit 8baa237

Please sign in to comment.