diff --git a/aggregator/src/binaries/aggregation_job_driver.rs b/aggregator/src/binaries/aggregation_job_driver.rs index fd67ca96b..260e32ed5 100644 --- a/aggregator/src/binaries/aggregation_job_driver.rs +++ b/aggregator/src/binaries/aggregation_job_driver.rs @@ -21,6 +21,14 @@ pub async fn main_callback(ctx: BinaryContext) -> Re let aggregation_job_driver = Arc::new(AggregationJobDriver::new( reqwest::Client::builder() .user_agent(CLIENT_USER_AGENT) + .timeout(Duration::from_secs( + ctx.config.job_driver_config.http_request_timeout_secs, + )) + .connect_timeout(Duration::from_secs( + ctx.config + .job_driver_config + .http_request_connection_timeout_secs, + )) .build() .context("couldn't create HTTP client")?, &ctx.meter, @@ -152,6 +160,8 @@ mod tests { worker_lease_duration_secs: 600, worker_lease_clock_skew_allowance_secs: 60, maximum_attempts_before_failure: 5, + http_request_timeout_secs: 30, + http_request_connection_timeout_secs: 10, }, batch_aggregation_shard_count: 32, taskprov_config: TaskprovConfig::default(), diff --git a/aggregator/src/binaries/collection_job_driver.rs b/aggregator/src/binaries/collection_job_driver.rs index 3f2a41fea..16d8eb92c 100644 --- a/aggregator/src/binaries/collection_job_driver.rs +++ b/aggregator/src/binaries/collection_job_driver.rs @@ -21,6 +21,14 @@ pub async fn main_callback(ctx: BinaryContext) -> Re let collection_job_driver = Arc::new(CollectionJobDriver::new( reqwest::Client::builder() .user_agent(CLIENT_USER_AGENT) + .timeout(Duration::from_secs( + ctx.config.job_driver_config.http_request_timeout_secs, + )) + .connect_timeout(Duration::from_secs( + ctx.config + .job_driver_config + .http_request_connection_timeout_secs, + )) .build() .context("couldn't create HTTP client")?, &ctx.meter, @@ -148,6 +156,8 @@ mod tests { worker_lease_duration_secs: 600, worker_lease_clock_skew_allowance_secs: 60, maximum_attempts_before_failure: 5, + http_request_timeout_secs: 30, + http_request_connection_timeout_secs: 10, }, 
batch_aggregation_shard_count: 32, }) diff --git a/aggregator/src/config.rs b/aggregator/src/config.rs index 85daeee28..781c2d591 100644 --- a/aggregator/src/config.rs +++ b/aggregator/src/config.rs @@ -154,6 +154,24 @@ pub struct JobDriverConfig { /// The number of attempts to drive a work item before it is placed in a permanent failure /// state. pub maximum_attempts_before_failure: usize, + /// Timeout to apply when establishing connections to the helper for HTTP requests. See + /// [`reqwest::ClientBuilder::connect_timeout`] for details. + #[serde(default = "JobDriverConfig::default_http_connection_timeout")] + pub http_request_connection_timeout_secs: u64, + /// Timeout to apply to HTTP requests overall (including connection establishment) when + /// communicating with the helper. See [`reqwest::ClientBuilder::timeout`] for details. + #[serde(default = "JobDriverConfig::default_http_request_timeout")] + pub http_request_timeout_secs: u64, +} + +impl JobDriverConfig { + fn default_http_connection_timeout() -> u64 { + 10 + } + + fn default_http_request_timeout() -> u64 { + 30 + } } #[cfg(feature = "test-util")] @@ -251,6 +269,9 @@ mod tests { worker_lease_duration_secs: 600, worker_lease_clock_skew_allowance_secs: 60, maximum_attempts_before_failure: 5, + http_request_connection_timeout_secs: JobDriverConfig::default_http_connection_timeout( + ), + http_request_timeout_secs: JobDriverConfig::default_http_request_timeout(), }) } diff --git a/aggregator/tests/integration/graceful_shutdown.rs b/aggregator/tests/integration/graceful_shutdown.rs index 76652b2c9..a2ab87a39 100644 --- a/aggregator/tests/integration/graceful_shutdown.rs +++ b/aggregator/tests/integration/graceful_shutdown.rs @@ -317,6 +317,8 @@ async fn aggregation_job_driver_shutdown() { worker_lease_duration_secs: 600, worker_lease_clock_skew_allowance_secs: 60, maximum_attempts_before_failure: 5, + http_request_timeout_secs: 30, + http_request_connection_timeout_secs: 10, }, taskprov_config: 
TaskprovConfig { enabled: false }, batch_aggregation_shard_count: 32, @@ -346,6 +348,8 @@ async fn collection_job_driver_shutdown() { worker_lease_duration_secs: 600, worker_lease_clock_skew_allowance_secs: 60, maximum_attempts_before_failure: 5, + http_request_timeout_secs: 30, + http_request_connection_timeout_secs: 10, }, batch_aggregation_shard_count: 32, }; diff --git a/client/src/lib.rs b/client/src/lib.rs index a066ec3c9..2707684b8 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -86,30 +86,13 @@ impl ClientParameters { leader_aggregator_endpoint: Url, helper_aggregator_endpoint: Url, time_precision: Duration, - ) -> Self { - Self::new_with_backoff( - task_id, - leader_aggregator_endpoint, - helper_aggregator_endpoint, - time_precision, - http_request_exponential_backoff(), - ) - } - - /// Creates a new set of client task parameters with non-default HTTP request retry parameters. - pub fn new_with_backoff( - task_id: TaskId, - leader_aggregator_endpoint: Url, - helper_aggregator_endpoint: Url, - time_precision: Duration, - http_request_retry_parameters: ExponentialBackoff, ) -> Self { Self { task_id, leader_aggregator_endpoint: url_ensure_trailing_slash(leader_aggregator_endpoint), helper_aggregator_endpoint: url_ensure_trailing_slash(helper_aggregator_endpoint), time_precision, - http_request_retry_parameters, + http_request_retry_parameters: http_request_exponential_backoff(), } } @@ -192,6 +175,10 @@ async fn aggregator_hpke_config( /// Construct a [`reqwest::Client`] suitable for use in a DAP [`Client`]. pub fn default_http_client() -> Result { Ok(reqwest::Client::builder() + // Clients wishing to override these timeouts may provide their own + // values using ClientBuilder::with_http_client. + .timeout(std::time::Duration::from_secs(30)) + .connect_timeout(std::time::Duration::from_secs(10)) .user_agent(CLIENT_USER_AGENT) .build()?) 
} @@ -693,13 +680,13 @@ mod tests { let mut server = mockito::Server::new_async().await; let server_url = Url::parse(&server.url()).unwrap(); let http_client = &default_http_client().unwrap(); - let client_parameters = ClientParameters::new_with_backoff( + let mut client_parameters = ClientParameters::new( random(), server_url.clone(), server_url, Duration::from_seconds(1), - test_http_request_exponential_backoff(), ); + client_parameters.http_request_retry_parameters = test_http_request_exponential_backoff(); let keypair = generate_test_hpke_config_and_private_key(); let hpke_config_list = HpkeConfigList::new(Vec::from([keypair.config().clone()])); @@ -730,13 +717,13 @@ mod tests { let mut server = mockito::Server::new_async().await; let server_url = Url::parse(&server.url()).unwrap(); let http_client = &default_http_client().unwrap(); - let client_parameters = ClientParameters::new_with_backoff( + let mut client_parameters = ClientParameters::new( random(), server_url.clone(), server_url, Duration::from_seconds(1), - test_http_request_exponential_backoff(), ); + client_parameters.http_request_retry_parameters = test_http_request_exponential_backoff(); let encoded_bad_hpke_config = hex!( "64" // HpkeConfigId diff --git a/collector/src/lib.rs b/collector/src/lib.rs index 9889f393c..4c4172462 100644 --- a/collector/src/lib.rs +++ b/collector/src/lib.rs @@ -142,6 +142,10 @@ static COLLECTOR_USER_AGENT: &str = concat!( /// Construct a [`reqwest::Client`] suitable for use in a DAP [`Collector`]. pub fn default_http_client() -> Result { Ok(reqwest::Client::builder() + // Clients may override default timeouts using + // CollectorBuilder::with_http_client + .timeout(StdDuration::from_secs(30)) + .connect_timeout(StdDuration::from_secs(10)) .user_agent(COLLECTOR_USER_AGENT) .build()?) 
} diff --git a/docs/samples/advanced_config/aggregation_job_driver.yaml b/docs/samples/advanced_config/aggregation_job_driver.yaml index d5a160444..643d93784 100644 --- a/docs/samples/advanced_config/aggregation_job_driver.yaml +++ b/docs/samples/advanced_config/aggregation_job_driver.yaml @@ -87,6 +87,17 @@ worker_lease_clock_skew_allowance_secs: 60 # (required) maximum_attempts_before_failure: 10 +# Timeout to apply when establishing connections to the helper for HTTP requests, in seconds. See +# https://docs.rs/reqwest/latest/reqwest/struct.ClientBuilder.html#method.connect_timeout for +# details. (optional; defaults to 10 seconds) +http_request_connection_timeout_secs: 10 + +# Timeout to apply to HTTP requests overall (including connection establishment) when communicating +# with the helper. See +# https://docs.rs/reqwest/latest/reqwest/struct.ClientBuilder.html#method.timeout for details. +# (optional; defaults to 30 seconds) +http_request_timeout_secs: 30 + # Number of sharded database records per batch aggregation. Must not be greater # than the equivalent setting in the collection job driver. (required) batch_aggregation_shard_count: 32 diff --git a/docs/samples/advanced_config/collection_job_driver.yaml b/docs/samples/advanced_config/collection_job_driver.yaml index 648e81373..2dd65236b 100644 --- a/docs/samples/advanced_config/collection_job_driver.yaml +++ b/docs/samples/advanced_config/collection_job_driver.yaml @@ -87,6 +87,17 @@ worker_lease_clock_skew_allowance_secs: 60 # (required) maximum_attempts_before_failure: 10 +# Timeout to apply when establishing connections to the helper for HTTP requests, in seconds. See +# https://docs.rs/reqwest/latest/reqwest/struct.ClientBuilder.html#method.connect_timeout for +# details. (optional; defaults to 10 seconds) +http_request_connection_timeout_secs: 10 + +# Timeout to apply to HTTP requests overall (including connection establishment) when communicating +# with the helper. 
See +# https://docs.rs/reqwest/latest/reqwest/struct.ClientBuilder.html#method.timeout for details. +# (optional; defaults to 30 seconds) +http_request_timeout_secs: 30 + # Number of sharded database records per batch aggregation. Must not be less # than the equivalent setting in the aggregator and aggregation job driver. # (required) diff --git a/integration_tests/src/janus.rs b/integration_tests/src/janus.rs index 0d943376f..e3edf3f28 100644 --- a/integration_tests/src/janus.rs +++ b/integration_tests/src/janus.rs @@ -183,6 +183,8 @@ impl JanusInProcess { worker_lease_duration_secs: 10, worker_lease_clock_skew_allowance_secs: 1, maximum_attempts_before_failure: 3, + http_request_timeout_secs: 30, + http_request_connection_timeout_secs: 10, }, taskprov_config: TaskprovConfig::default(), batch_aggregation_shard_count: 32, @@ -198,6 +200,8 @@ impl JanusInProcess { worker_lease_duration_secs: 10, worker_lease_clock_skew_allowance_secs: 1, maximum_attempts_before_failure: 3, + http_request_timeout_secs: 30, + http_request_connection_timeout_secs: 10, }, batch_aggregation_shard_count: 32, };