/// Returns `true` if `addr` is an IPv4 address that appears to be globally reachable.
///
/// Port of the unstable `Ipv4Addr::is_global` restricted to stable APIs; move to the
/// std method once it is stabilised. Every IPv4 block that std treats as not globally
/// reachable is rejected here, so a hostname cannot be pointed at internal, carrier-grade
/// NAT, documentation, benchmarking or reserved space.
///
/// All IPv6 addresses are rejected: our network does not currently support IPv6, so they
/// are ignored for now.
///
/// The port of `addr` is not inspected.
///
/// NOTE(review): this predicate only sees addresses that are handed to it. A URL whose
/// host is an IP literal may never go through DNS resolution at all — confirm that such
/// hosts are validated separately by the caller.
fn is_global_ipv4(addr: &SocketAddr) -> bool {
    match addr.ip() {
        IpAddr::V4(ip) => {
            let [a, b, c, d] = ip.octets();
            !(a == 0 // "This network" (0.0.0.0/8)
                || ip.is_private() // 10/8, 172.16/12, 192.168/16
                || (a == 100 && (b & 0xc0) == 64) // Shared address space / CGNAT (100.64.0.0/10)
                || ip.is_loopback() // 127.0.0.0/8
                || ip.is_link_local() // 169.254.0.0/16, includes cloud metadata endpoints
                // IETF protocol assignments (192.0.0.0/24); .9 and .10 are documented as
                // globally reachable, so they are let through.
                || (a == 192 && b == 0 && c == 0 && d != 9 && d != 10)
                || ip.is_documentation() // 192.0.2.0/24, 198.51.100.0/24, 203.0.113.0/24
                || (a == 198 && (b & 0xfe) == 18) // Benchmarking (198.18.0.0/15)
                || a >= 240) // Reserved (240.0.0.0/4), which also covers the broadcast address
        }
        IpAddr::V6(_) => false, // IPv6 is not routed by our infrastructure, reject for now
    }
}
+ let resolve_host = move || (name.as_str(), 0).to_socket_addrs(); + + let future_result = spawn_blocking(resolve_host).map(|result| match result { + Ok(Ok(addr)) => { + // Resolution succeeded, pass the IPs in a Box after filtering + let addrs: Addrs = Box::new(addr.filter(is_global_ipv4)); + Ok(addrs) + } + Ok(Err(err)) => { + // Resolution failed, pass error through in a Box + let err: BoxError = Box::new(err); + Err(err) + } + Err(join_err) => { + // The tokio task failed, error handled copied from hyper's GaiResolver + if join_err.is_cancelled() { + let err: BoxError = + Box::new(io::Error::new(io::ErrorKind::Interrupted, join_err)); + Err(err) + } else { + panic!("background task failed: {:?}", join_err) + } + } + }); + + // Box the Future to satisfy the Resolving interface. + Box::pin(future_result) + } +} diff --git a/hook-worker/src/lib.rs b/hook-worker/src/lib.rs index 8488d15..94a0758 100644 --- a/hook-worker/src/lib.rs +++ b/hook-worker/src/lib.rs @@ -1,4 +1,5 @@ pub mod config; +pub mod dns; pub mod error; pub mod util; pub mod worker; diff --git a/hook-worker/src/main.rs b/hook-worker/src/main.rs index 8a6eeb3..050e2b9 100644 --- a/hook-worker/src/main.rs +++ b/hook-worker/src/main.rs @@ -52,6 +52,7 @@ async fn main() -> Result<(), WorkerError> { config.request_timeout.0, config.max_concurrent_jobs, retry_policy_builder.provide(), + config.allow_internal_ips, worker_liveness, ); diff --git a/hook-worker/src/worker.rs b/hook-worker/src/worker.rs index 824f1e2..9a42a90 100644 --- a/hook-worker/src/worker.rs +++ b/hook-worker/src/worker.rs @@ -18,6 +18,7 @@ use reqwest::header; use tokio::sync; use tracing::error; +use crate::dns::PublicIPv4Resolver; use crate::error::{WebhookError, WebhookParseError, WebhookRequestError, WorkerError}; use crate::util::first_n_bytes_of_response; @@ -84,6 +85,7 @@ impl<'p> WebhookWorker<'p> { request_timeout: time::Duration, max_concurrent_jobs: usize, retry_policy: RetryPolicy, + allow_internal_ips: bool, liveness: 
HealthHandle, ) -> Self { let mut headers = header::HeaderMap::new(); @@ -92,10 +94,14 @@ impl<'p> WebhookWorker<'p> { header::HeaderValue::from_static("application/json"), ); - let client = reqwest::Client::builder() + let mut client_builder = reqwest::Client::builder() .default_headers(headers) .user_agent("PostHog Webhook Worker") - .timeout(request_timeout) + .timeout(request_timeout); + if !allow_internal_ips { + client_builder = client_builder.dns_resolver(Arc::new(PublicIPv4Resolver {})) + } + let client = client_builder .build() .expect("failed to construct reqwest client for webhook worker"); @@ -569,6 +575,7 @@ mod tests { time::Duration::from_millis(5000), 10, RetryPolicy::default(), + false, liveness, );