From f5f0a91d73b4e296e6edfec6ed4ddb513ca0826a Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Fri, 3 May 2024 14:52:17 +0200 Subject: [PATCH 1/8] hook-worker: deny traffic to internal IPs --- hook-worker/src/config.rs | 3 ++ hook-worker/src/dns.rs | 68 +++++++++++++++++++++++++++++++++++++++ hook-worker/src/lib.rs | 1 + hook-worker/src/main.rs | 1 + hook-worker/src/worker.rs | 11 +++++-- 5 files changed, 82 insertions(+), 2 deletions(-) diff --git a/hook-worker/src/config.rs b/hook-worker/src/config.rs index ceb690f..51b23b7 100644 --- a/hook-worker/src/config.rs +++ b/hook-worker/src/config.rs @@ -37,6 +37,9 @@ pub struct Config { #[envconfig(default = "1")] pub dequeue_batch_size: u32, + + #[envconfig(default = "false")] + pub allow_internal_ips: bool, } impl Config { diff --git a/hook-worker/src/dns.rs b/hook-worker/src/dns.rs index e69de29..52c55bd 100644 --- a/hook-worker/src/dns.rs +++ b/hook-worker/src/dns.rs @@ -0,0 +1,68 @@ +use std::error::Error as StdError; +use std::io; +use std::net::{IpAddr, SocketAddr, ToSocketAddrs}; + +use futures::FutureExt; +use reqwest::dns::{Addrs, Name, Resolve, Resolving}; +use tokio::task::spawn_blocking; + +/// Internal reqwest type, copied here as part of Resolving +pub(crate) type BoxError = Box; + +/// Returns [`true`] if the address appears to be a globally reachable IPv4. +/// +/// Trimmed down version of the unstable IpAddr::is_global, move to it when it's stable. +fn is_global_ipv4(addr: &SocketAddr) -> bool { + match addr.ip() { + IpAddr::V4(ip) => { + !(ip.octets()[0] == 0 // "This network" + || ip.is_private() + || ip.is_loopback() + || ip.is_link_local() + || ip.is_broadcast()) + } + IpAddr::V6(_) => false, // Our network does not currently support ipv6, let's ignore for now + } +} + +/// DNS resolver using the stdlib resolver, but filtering results to only pass public IPv4 results. +/// +/// Private and broadcast addresses are filtered out, so are IPv6 results for now (as our infra +/// does not currently support IPv6 routing anyway). +/// This is adapted from the GaiResolver in hyper and reqwest. +pub struct PublicIPv4Resolver {} + +impl Resolve for PublicIPv4Resolver { + fn resolve(&self, name: Name) -> Resolving { + // Closure to call the system's resolver (blocking call) through the ToSocketAddrs trait. + let resolve_host = move || (name.as_str(), 0).to_socket_addrs(); + + // Execute the blocking call in a separate worker thread then process its result asynchronously. + // spawn_blocking returns a JoinHandle that implements Future>. + let future_result = spawn_blocking(resolve_host).map(|result| match result { + Ok(Ok(addr)) => { + // Resolution succeeded, pass the IPs in a Box after filtering + let addrs: Addrs = Box::new(addr.filter(is_global_ipv4)); + Ok(addrs) + } + Ok(Err(err)) => { + // Resolution failed, pass error through in a Box + let err: BoxError = Box::new(err); + Err(err) + } + Err(join_err) => { + // The tokio task failed, error handled copied from hyper's GaiResolver + if join_err.is_cancelled() { + let err: BoxError = + Box::new(io::Error::new(io::ErrorKind::Interrupted, join_err)); + Err(err) + } else { + panic!("background task failed: {:?}", join_err) + } + } + }); + + // Box the Future to satisfy the Resolving interface. + Box::pin(future_result) + } +} diff --git a/hook-worker/src/lib.rs b/hook-worker/src/lib.rs index 8488d15..94a0758 100644 --- a/hook-worker/src/lib.rs +++ b/hook-worker/src/lib.rs @@ -1,4 +1,5 @@ pub mod config; +pub mod dns; pub mod error; pub mod util; pub mod worker; diff --git a/hook-worker/src/main.rs b/hook-worker/src/main.rs index 8a6eeb3..050e2b9 100644 --- a/hook-worker/src/main.rs +++ b/hook-worker/src/main.rs @@ -52,6 +52,7 @@ async fn main() -> Result<(), WorkerError> { config.request_timeout.0, config.max_concurrent_jobs, retry_policy_builder.provide(), + config.allow_internal_ips, worker_liveness, ); diff --git a/hook-worker/src/worker.rs b/hook-worker/src/worker.rs index 824f1e2..9a42a90 100644 --- a/hook-worker/src/worker.rs +++ b/hook-worker/src/worker.rs @@ -18,6 +18,7 @@ use reqwest::header; use tokio::sync; use tracing::error; +use crate::dns::PublicIPv4Resolver; use crate::error::{WebhookError, WebhookParseError, WebhookRequestError, WorkerError}; use crate::util::first_n_bytes_of_response; @@ -84,6 +85,7 @@ impl<'p> WebhookWorker<'p> { request_timeout: time::Duration, max_concurrent_jobs: usize, retry_policy: RetryPolicy, + allow_internal_ips: bool, liveness: HealthHandle, ) -> Self { let mut headers = header::HeaderMap::new(); @@ -92,10 +94,14 @@ impl<'p> WebhookWorker<'p> { header::HeaderValue::from_static("application/json"), ); - let client = reqwest::Client::builder() + let mut client_builder = reqwest::Client::builder() .default_headers(headers) .user_agent("PostHog Webhook Worker") - .timeout(request_timeout) + .timeout(request_timeout); + if !allow_internal_ips { + client_builder = client_builder.dns_resolver(Arc::new(PublicIPv4Resolver {})) + } + let client = client_builder .build() .expect("failed to construct reqwest client for webhook worker"); @@ -569,6 +575,7 @@ mod tests { time::Duration::from_millis(5000), 10, RetryPolicy::default(), + false, liveness, ); From e040018aab87ae8807924cfb951b72cd735bee0d Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Mon, 6 May 2024 13:14:07 +0200 Subject: [PATCH 2/8] add PublicIPv4Resolver error type and make sure it is logged, add worker test --- hook-worker/src/dns.rs | 32 ++++++++++--- hook-worker/src/error.rs | 19 +++++++- hook-worker/src/worker.rs | 94 ++++++++++++++++++++++++++------------- 3 files changed, 109 insertions(+), 36 deletions(-) diff --git a/hook-worker/src/dns.rs b/hook-worker/src/dns.rs index 52c55bd..ca444ec 100644 --- a/hook-worker/src/dns.rs +++ b/hook-worker/src/dns.rs @@ -1,11 +1,25 @@ use std::error::Error as StdError; -use std::io; use std::net::{IpAddr, SocketAddr, ToSocketAddrs}; +use std::{fmt, io}; use futures::FutureExt; use reqwest::dns::{Addrs, Name, Resolve, Resolving}; use tokio::task::spawn_blocking; +pub struct NoPublicIPError; + +impl std::error::Error for NoPublicIPError {} +impl fmt::Display for NoPublicIPError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "No public IPv4 found for specified host") + } +} +impl fmt::Debug for NoPublicIPError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "No public IPv4 found for specified host") + } +} + /// Internal reqwest type, copied here as part of Resolving pub(crate) type BoxError = Box; @@ -40,10 +54,18 @@ impl Resolve for PublicIPv4Resolver { // Execute the blocking call in a separate worker thread then process its result asynchronously. // spawn_blocking returns a JoinHandle that implements Future>. let future_result = spawn_blocking(resolve_host).map(|result| match result { - Ok(Ok(addr)) => { - // Resolution succeeded, pass the IPs in a Box after filtering - let addrs: Addrs = Box::new(addr.filter(is_global_ipv4)); - Ok(addrs) + Ok(Ok(all_addrs)) => { + // Resolution succeeded, filter the results + let filtered_addr: Vec = all_addrs.filter(is_global_ipv4).collect(); + if filtered_addr.is_empty() { + // No public IPs found, error out with PermissionDenied + let err: BoxError = Box::new(NoPublicIPError); + Err(err) + } else { + // Pass remaining IPs in a boxed iterator for request to use. + let addrs: Addrs = Box::new(filtered_addr.into_iter()); + Ok(addrs) + } } Ok(Err(err)) => { // Resolution failed, pass error through in a Box diff --git a/hook-worker/src/error.rs b/hook-worker/src/error.rs index 48468bc..207049e 100644 --- a/hook-worker/src/error.rs +++ b/hook-worker/src/error.rs @@ -1,6 +1,8 @@ +use std::error::Error; use std::fmt; use std::time; +use crate::dns::NoPublicIPError; use hook_common::{pgqueue, webhook::WebhookJobError}; use thiserror::Error; @@ -64,7 +66,11 @@ impl fmt::Display for WebhookRequestError { Some(m) => m.to_string(), None => "No response from the server".to_string(), }; - writeln!(f, "{}", error)?; + if is_error_source::(error) { + writeln!(f, "{}: {}", error ,NoPublicIPError)?; + } else { + writeln!(f, "{}", error)?; + } write!(f, "{}", response_message)?; Ok(()) @@ -132,3 +138,14 @@ pub enum WorkerError { #[error("timed out while waiting for jobs to be available")] TimeoutError, } + +/// Check the error and it's sources (recursively) to return true if an error of the given type is found. +pub fn is_error_source(err: &(dyn std::error::Error + 'static)) -> bool { + if err.downcast_ref::().is_some() { + return true; + } + match err.source() { + None => false, + Some(source) => is_error_source::(source), + } +} diff --git a/hook-worker/src/worker.rs b/hook-worker/src/worker.rs index 9a42a90..7fc3d0e 100644 --- a/hook-worker/src/worker.rs +++ b/hook-worker/src/worker.rs @@ -14,7 +14,7 @@ use hook_common::{ webhook::{HttpMethod, WebhookJobError, WebhookJobMetadata, WebhookJobParameters}, }; use http::StatusCode; -use reqwest::header; +use reqwest::{header, Client}; use tokio::sync; use tracing::error; @@ -75,6 +75,25 @@ pub struct WebhookWorker<'p> { liveness: HealthHandle, } +pub fn build_http_client( + request_timeout: time::Duration, + allow_internal_ips: bool, +) -> reqwest::Result { + let mut headers = header::HeaderMap::new(); + headers.insert( + header::CONTENT_TYPE, + header::HeaderValue::from_static("application/json"), + ); + let mut client_builder = reqwest::Client::builder() + .default_headers(headers) + .user_agent("PostHog Webhook Worker") + .timeout(request_timeout); + if !allow_internal_ips { + client_builder = client_builder.dns_resolver(Arc::new(PublicIPv4Resolver {})) + } + client_builder.build() +} + impl<'p> WebhookWorker<'p> { #[allow(clippy::too_many_arguments)] pub fn new( @@ -88,21 +107,7 @@ impl<'p> WebhookWorker<'p> { allow_internal_ips: bool, liveness: HealthHandle, ) -> Self { - let mut headers = header::HeaderMap::new(); - headers.insert( - header::CONTENT_TYPE, - header::HeaderValue::from_static("application/json"), - ); - - let mut client_builder = reqwest::Client::builder() - .default_headers(headers) - .user_agent("PostHog Webhook Worker") - .timeout(request_timeout); - if !allow_internal_ips { - client_builder = client_builder.dns_resolver(Arc::new(PublicIPv4Resolver {})) - } - let client = client_builder - .build() + let client = build_http_client(request_timeout, allow_internal_ips) .expect("failed to construct reqwest client for webhook worker"); Self { @@ -475,6 +480,7 @@ fn parse_retry_after_header(header_map: &reqwest::header::HeaderMap) -> Option Client { + build_http_client(Duration::from_secs(1), true).expect("failed to create client") + } + #[allow(dead_code)] async fn enqueue_job( queue: &PgQueue, @@ -565,8 +577,8 @@ mod tests { webhook_job_parameters.clone(), webhook_job_metadata, ) - .await - .expect("failed to enqueue job"); + .await + .expect("failed to enqueue job"); let worker = WebhookWorker::new( &worker_id, &queue, @@ -601,15 +613,14 @@ mod tests { assert!(registry.get_status().healthy) } - #[sqlx::test(migrations = "../migrations")] - async fn test_send_webhook(_pg: PgPool) { + #[tokio::test] + async fn test_send_webhook() { let method = HttpMethod::POST; let url = "http://localhost:18081/echo"; let headers = collections::HashMap::new(); let body = "a very relevant request body"; - let client = reqwest::Client::new(); - let response = send_webhook(client, &method, url, &headers, body.to_owned()) + let response = send_webhook(localhost_client(), &method, url, &headers, body.to_owned()) .await .expect("send_webhook failed"); @@ -620,15 +631,14 @@ mod tests { ); } - #[sqlx::test(migrations = "../migrations")] - async fn test_error_message_contains_response_body(_pg: PgPool) { + #[tokio::test] + async fn test_error_message_contains_response_body() { let method = HttpMethod::POST; let url = "http://localhost:18081/fail"; let headers = collections::HashMap::new(); let body = "this is an error message"; - let client = reqwest::Client::new(); - let err = send_webhook(client, &method, url, &headers, body.to_owned()) + let err = send_webhook(localhost_client(), &method, url, &headers, body.to_owned()) .await .err() .expect("request didn't fail when it should have failed"); @@ -645,17 +655,16 @@ mod tests { } } - #[sqlx::test(migrations = "../migrations")] - async fn test_error_message_contains_up_to_n_bytes_of_response_body(_pg: PgPool) { + #[tokio::test] + async fn test_error_message_contains_up_to_n_bytes_of_response_body() { let method = HttpMethod::POST; let url = "http://localhost:18081/fail"; let headers = collections::HashMap::new(); // This is double the current hardcoded amount of bytes. // TODO: Make this configurable and change it here too. let body = (0..20 * 1024).map(|_| "a").collect::>().concat(); - let client = reqwest::Client::new(); - let err = send_webhook(client, &method, url, &headers, body.to_owned()) + let err = send_webhook(localhost_client(), &method, url, &headers, body.to_owned()) .await .err() .expect("request didn't fail when it should have failed"); @@ -673,4 +682,29 @@ mod tests { )); } } + + #[tokio::test] + async fn test_private_ips_denied() { + let method = HttpMethod::POST; + let url = "http://localhost:18081/echo"; + let headers = collections::HashMap::new(); + let body = "a very relevant request body"; + let filtering_client = + build_http_client(Duration::from_secs(1), false).expect("failed to create client"); + + let err = send_webhook(filtering_client, &method, url, &headers, body.to_owned()) + .await + .err() + .expect("request didn't fail when it should have failed"); + + assert!(matches!(err, WebhookError::Request(..))); + if let WebhookError::Request(request_error) = err { + assert_eq!(request_error.status(), None); + assert!(request_error + .to_string() + .contains("No public IPv4 found for specified host")); + } else { + panic!("unexpected error type {:?}", err) + } + } } From 2e62a7646116d1b2eb5dedcafe63ad02b5459c46 Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Mon, 6 May 2024 13:40:12 +0200 Subject: [PATCH 3/8] add tests --- hook-worker/src/dns.rs | 50 +++++++++++++++++++++++++++++++++++++++ hook-worker/src/error.rs | 5 ++-- hook-worker/src/worker.rs | 4 ++-- 3 files changed, 55 insertions(+), 4 deletions(-) diff --git a/hook-worker/src/dns.rs b/hook-worker/src/dns.rs index ca444ec..54c574b 100644 --- a/hook-worker/src/dns.rs +++ b/hook-worker/src/dns.rs @@ -88,3 +88,53 @@ impl Resolve for PublicIPv4Resolver { Box::pin(future_result) } } + +mod tests { + use super::*; + use std::str::FromStr; + + #[tokio::test] + async fn it_resolves_google_com() { + let resolver: PublicIPv4Resolver = PublicIPv4Resolver {}; + let addrs = resolver + .resolve(Name::from_str("google.com").unwrap()) + .await + .expect("lookup has failed"); + assert!(addrs.count() > 0, "empty address list") + } + + #[tokio::test] + async fn it_denies_ipv6_google_com() { + let resolver: PublicIPv4Resolver = PublicIPv4Resolver {}; + match resolver + .resolve(Name::from_str("ipv6.google.com").unwrap()) + .await + { + Ok(_) => panic!("should have failed"), + Err(err) => assert!(err.downcast_ref::().is_some()), + } + } + + #[tokio::test] + async fn it_denies_localhost() { + let resolver: PublicIPv4Resolver = PublicIPv4Resolver {}; + match resolver.resolve(Name::from_str("localhost").unwrap()).await { + Ok(_) => panic!("should have failed"), + Err(err) => assert!(err.is::()), + } + } + + #[tokio::test] + async fn it_propagates_unknown_domain() { + let resolver: PublicIPv4Resolver = PublicIPv4Resolver {}; + match resolver + .resolve(Name::from_str("invalid.domain.unknown").unwrap()) + .await + { + Ok(_) => panic!("should have failed"), + Err(err) => assert!(err + .to_string() + .contains("failed to lookup address information")), + } + } +} diff --git a/hook-worker/src/error.rs b/hook-worker/src/error.rs index 207049e..68a077e 100644 --- a/hook-worker/src/error.rs +++ b/hook-worker/src/error.rs @@ -67,7 +67,7 @@ impl fmt::Display for WebhookRequestError { None => "No response from the server".to_string(), }; if is_error_source::(error) { - writeln!(f, "{}: {}", error ,NoPublicIPError)?; + writeln!(f, "{}: {}", error, NoPublicIPError)?; } else { writeln!(f, "{}", error)?; } @@ -140,8 +140,9 @@ pub enum WorkerError { } /// Check the error and it's sources (recursively) to return true if an error of the given type is found. +/// TODO: use Error::sources() when stable pub fn is_error_source(err: &(dyn std::error::Error + 'static)) -> bool { - if err.downcast_ref::().is_some() { + if err.is::() { return true; } match err.source() { diff --git a/hook-worker/src/worker.rs b/hook-worker/src/worker.rs index 7fc3d0e..7e2693e 100644 --- a/hook-worker/src/worker.rs +++ b/hook-worker/src/worker.rs @@ -577,8 +577,8 @@ mod tests { webhook_job_parameters.clone(), webhook_job_metadata, ) - .await - .expect("failed to enqueue job"); + .await + .expect("failed to enqueue job"); let worker = WebhookWorker::new( &worker_id, &queue, From 19611413334cac034286224281b5b4043ce288b7 Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Mon, 6 May 2024 13:42:47 +0200 Subject: [PATCH 4/8] fmt --- hook-worker/src/dns.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hook-worker/src/dns.rs b/hook-worker/src/dns.rs index 54c574b..a581f44 100644 --- a/hook-worker/src/dns.rs +++ b/hook-worker/src/dns.rs @@ -89,8 +89,10 @@ impl Resolve for PublicIPv4Resolver { } } +#[cfg(test)] mod tests { - use super::*; + use crate::dns::{NoPublicIPError, PublicIPv4Resolver}; + use reqwest::dns::{Name, Resolve}; use std::str::FromStr; #[tokio::test] From 915f6248bd64cfa8efb4f9901ec14b0c889c9075 Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Mon, 6 May 2024 14:26:45 +0200 Subject: [PATCH 5/8] rename error type --- hook-worker/src/dns.rs | 16 ++++++++-------- hook-worker/src/error.rs | 8 ++++---- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/hook-worker/src/dns.rs b/hook-worker/src/dns.rs index a581f44..14ba987 100644 --- a/hook-worker/src/dns.rs +++ b/hook-worker/src/dns.rs @@ -6,15 +6,15 @@ use futures::FutureExt; use reqwest::dns::{Addrs, Name, Resolve, Resolving}; use tokio::task::spawn_blocking; -pub struct NoPublicIPError; +pub struct NoPublicIPv4Error; -impl std::error::Error for NoPublicIPError {} -impl fmt::Display for NoPublicIPError { +impl std::error::Error for NoPublicIPv4Error {} +impl fmt::Display for NoPublicIPv4Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "No public IPv4 found for specified host") } } -impl fmt::Debug for NoPublicIPError { +impl fmt::Debug for NoPublicIPv4Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "No public IPv4 found for specified host") } @@ -59,7 +59,7 @@ impl Resolve for PublicIPv4Resolver { let filtered_addr: Vec = all_addrs.filter(is_global_ipv4).collect(); if filtered_addr.is_empty() { // No public IPs found, error out with PermissionDenied - let err: BoxError = Box::new(NoPublicIPError); + let err: BoxError = Box::new(NoPublicIPv4Error); Err(err) } else { // Pass remaining IPs in a boxed iterator for request to use. @@ -91,7 +91,7 @@ impl Resolve for PublicIPv4Resolver { #[cfg(test)] mod tests { - use crate::dns::{NoPublicIPError, PublicIPv4Resolver}; + use crate::dns::{NoPublicIPv4Error, PublicIPv4Resolver}; use reqwest::dns::{Name, Resolve}; use std::str::FromStr; @@ -113,7 +113,7 @@ mod tests { .await { Ok(_) => panic!("should have failed"), - Err(err) => assert!(err.downcast_ref::().is_some()), + Err(err) => assert!(err.downcast_ref::().is_some()), } } @@ -122,7 +122,7 @@ mod tests { let resolver: PublicIPv4Resolver = PublicIPv4Resolver {}; match resolver.resolve(Name::from_str("localhost").unwrap()).await { Ok(_) => panic!("should have failed"), - Err(err) => assert!(err.is::()), + Err(err) => assert!(err.is::()), } } diff --git a/hook-worker/src/error.rs b/hook-worker/src/error.rs index 68a077e..764e8d9 100644 --- a/hook-worker/src/error.rs +++ b/hook-worker/src/error.rs @@ -2,7 +2,7 @@ use std::error::Error; use std::fmt; use std::time; -use crate::dns::NoPublicIPError; +use crate::dns::NoPublicIPv4Error; use hook_common::{pgqueue, webhook::WebhookJobError}; use thiserror::Error; @@ -66,8 +66,8 @@ impl fmt::Display for WebhookRequestError { Some(m) => m.to_string(), None => "No response from the server".to_string(), }; - if is_error_source::(error) { - writeln!(f, "{}: {}", error, NoPublicIPError)?; + if is_error_source::(error) { + writeln!(f, "{}: {}", error, NoPublicIPv4Error)?; } else { writeln!(f, "{}", error)?; } @@ -142,7 +142,7 @@ pub enum WorkerError { /// Check the error and it's sources (recursively) to return true if an error of the given type is found. /// TODO: use Error::sources() when stable pub fn is_error_source(err: &(dyn std::error::Error + 'static)) -> bool { - if err.is::() { + if err.is::() { return true; } match err.source() { From 61c05169c6e92cc21f23fa1d45f5c188accba842 Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Mon, 6 May 2024 14:29:19 +0200 Subject: [PATCH 6/8] simplify join_err handling --- hook-worker/src/dns.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/hook-worker/src/dns.rs b/hook-worker/src/dns.rs index 14ba987..d55a281 100644 --- a/hook-worker/src/dns.rs +++ b/hook-worker/src/dns.rs @@ -73,14 +73,9 @@ impl Resolve for PublicIPv4Resolver { Err(err) } Err(join_err) => { - // The tokio task failed, error handled copied from hyper's GaiResolver - if join_err.is_cancelled() { - let err: BoxError = - Box::new(io::Error::new(io::ErrorKind::Interrupted, join_err)); - Err(err) - } else { - panic!("background task failed: {:?}", join_err) - } + // The tokio task failed, pass as io::Error in a Box + let err: BoxError = Box::new(io::Error::from(join_err)); + Err(err) } }); From 4e0a5469b365a1e0a3e63348c0a08ed46ca8b9f5 Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Mon, 6 May 2024 14:30:55 +0200 Subject: [PATCH 7/8] improve testing --- hook-worker/src/dns.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/hook-worker/src/dns.rs b/hook-worker/src/dns.rs index d55a281..36fd7a0 100644 --- a/hook-worker/src/dns.rs +++ b/hook-worker/src/dns.rs @@ -108,7 +108,7 @@ mod tests { .await { Ok(_) => panic!("should have failed"), - Err(err) => assert!(err.downcast_ref::().is_some()), + Err(err) => assert!(err.is::()), } } @@ -122,16 +122,19 @@ mod tests { } #[tokio::test] - async fn it_propagates_unknown_domain() { + async fn it_bubbles_up_resolution_error() { let resolver: PublicIPv4Resolver = PublicIPv4Resolver {}; match resolver .resolve(Name::from_str("invalid.domain.unknown").unwrap()) .await { Ok(_) => panic!("should have failed"), - Err(err) => assert!(err - .to_string() - .contains("failed to lookup address information")), + Err(err) => { + assert!(!err.is::()); + assert!(err + .to_string() + .contains("failed to lookup address information")) + } } } } From f2df29f4802c9b8bc81126926f92854134355f6e Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Mon, 6 May 2024 16:03:24 +0200 Subject: [PATCH 8/8] don't retry on NoPublicIPv4Error --- hook-worker/src/worker.rs | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/hook-worker/src/worker.rs b/hook-worker/src/worker.rs index 7e2693e..fdb405a 100644 --- a/hook-worker/src/worker.rs +++ b/hook-worker/src/worker.rs @@ -18,8 +18,10 @@ use reqwest::{header, Client}; use tokio::sync; use tracing::error; -use crate::dns::PublicIPv4Resolver; -use crate::error::{WebhookError, WebhookParseError, WebhookRequestError, WorkerError}; +use crate::dns::{NoPublicIPv4Error, PublicIPv4Resolver}; +use crate::error::{ + is_error_source, WebhookError, WebhookParseError, WebhookRequestError, WorkerError, +}; use crate::util::first_n_bytes_of_response; /// A WebhookJob is any `PgQueueJob` with `WebhookJobParameters` and `WebhookJobMetadata`. @@ -401,10 +403,19 @@ async fn send_webhook( .body(body) .send() .await - .map_err(|e| WebhookRequestError::RetryableRequestError { - error: e, - response: None, - retry_after: None, + .map_err(|e| { + if is_error_source::(&e) { + WebhookRequestError::NonRetryableRetryableRequestError { + error: e, + response: None, + } + } else { + WebhookRequestError::RetryableRequestError { + error: e, + response: None, + retry_after: None, + } + } })?; let retry_after = parse_retry_after_header(response.headers()); @@ -703,6 +714,9 @@ mod tests { assert!(request_error .to_string() .contains("No public IPv4 found for specified host")); + if let WebhookRequestError::RetryableRequestError { .. } = request_error { + panic!("error should not be retryable") + } } else { panic!("unexpected error type {:?}", err) }