Skip to content
This repository has been archived by the owner on Jun 21, 2024. It is now read-only.

hook-worker: deny traffic to internal IPs and IPv6 #35

Merged
merged 8 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions hook-worker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ pub struct Config {

#[envconfig(default = "1")]
pub dequeue_batch_size: u32,

#[envconfig(default = "false")]
pub allow_internal_ips: bool,
Comment on lines +41 to +42
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this ever be true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

end2end tests against containerized endpoint, local testing, hobby deploy

}

impl Config {
Expand Down
140 changes: 140 additions & 0 deletions hook-worker/src/dns.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
use std::error::Error as StdError;
use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
use std::{fmt, io};

use futures::FutureExt;
use reqwest::dns::{Addrs, Name, Resolve, Resolving};
use tokio::task::spawn_blocking;

pub struct NoPublicIPv4Error;

impl std::error::Error for NoPublicIPv4Error {}
impl fmt::Display for NoPublicIPv4Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "No public IPv4 found for specified host")
}
}
impl fmt::Debug for NoPublicIPv4Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "No public IPv4 found for specified host")
}
}

/// Internal reqwest type, copied here as part of Resolving
pub(crate) type BoxError = Box<dyn StdError + Send + Sync>;

/// Returns [`true`] if the address appears to be a globally reachable IPv4.
///
/// Trimmed down version of the unstable IpAddr::is_global, move to it when it's stable.
fn is_global_ipv4(addr: &SocketAddr) -> bool {
Comment on lines +26 to +29
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As clear as it gets, very nice 👍

match addr.ip() {
IpAddr::V4(ip) => {
!(ip.octets()[0] == 0 // "This network"
|| ip.is_private()
|| ip.is_loopback()
|| ip.is_link_local()
|| ip.is_broadcast())
}
IpAddr::V6(_) => false, // Our network does not currently support ipv6, let's ignore for now
}
}

/// DNS resolver using the stdlib resolver, but filtering results to only pass public IPv4 results.
///
/// Private and broadcast addresses are filtered out, so are IPv6 results for now (as our infra
/// does not currently support IPv6 routing anyway).
/// This is adapted from the GaiResolver in hyper and reqwest.
pub struct PublicIPv4Resolver {}

impl Resolve for PublicIPv4Resolver {
fn resolve(&self, name: Name) -> Resolving {
// Closure to call the system's resolver (blocking call) through the ToSocketAddrs trait.
let resolve_host = move || (name.as_str(), 0).to_socket_addrs();

// Execute the blocking call in a separate worker thread then process its result asynchronously.
// spawn_blocking returns a JoinHandle that implements Future<Result<(closure result), JoinError>>.
let future_result = spawn_blocking(resolve_host).map(|result| match result {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any risk of running out of blocking threads? Should we cache these resolutions in some way?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • spawn_blocking's doc indicates that a thread poll is used, and tasks are queued if no thread is available. The default pool max size of 512 is way higher than our concurrency. That's also what the default resolver does.

  • The glibc already handles caching, so caching again here would be a premature optimization. Also, it makes debugging harder: I spend weeks working around grpc-java's internal DNS caching creating errors when services rolled out.

Copy link
Contributor

@bretthoerner bretthoerner May 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had this skeleton with all but the (sync) IP range blocking logic decided, where I stole from the reqwest GaiResolver and didn't use spawn_blocking: 825a9bc

🤷 If it's helpful, maybe I'm missing something there, I haven't woken up yet.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HyperGaiResolver::call boils down to tokio::task::spawn_blocking(move || { (&*name.host, 0).to_socket_addrs().map(|i| SocketAddrs { iter: i }), which is what I inlined here instead of adding another dependency.

I'm open to erroring out instead of filtering out, which could help with debugging. That would probably require us to implement filtering out private IPV6 too while it's not needed.

Ok(Ok(all_addrs)) => {
// Resolution succeeded, filter the results
let filtered_addr: Vec<SocketAddr> = all_addrs.filter(is_global_ipv4).collect();
if filtered_addr.is_empty() {
// No public IPs found, error out with PermissionDenied
let err: BoxError = Box::new(NoPublicIPv4Error);
Err(err)
} else {
// Pass remaining IPs in a boxed iterator for request to use.
let addrs: Addrs = Box::new(filtered_addr.into_iter());
Ok(addrs)
}
}
Ok(Err(err)) => {
// Resolution failed, pass error through in a Box
let err: BoxError = Box::new(err);
Err(err)
}
Err(join_err) => {
// The tokio task failed, pass as io::Error in a Box
let err: BoxError = Box::new(io::Error::from(join_err));
Err(err)
}
});

// Box the Future to satisfy the Resolving interface.
Box::pin(future_result)
}
}

#[cfg(test)]
mod tests {
use crate::dns::{NoPublicIPv4Error, PublicIPv4Resolver};
use reqwest::dns::{Name, Resolve};
use std::str::FromStr;

#[tokio::test]
async fn it_resolves_google_com() {
let resolver: PublicIPv4Resolver = PublicIPv4Resolver {};
let addrs = resolver
.resolve(Name::from_str("google.com").unwrap())
.await
.expect("lookup has failed");
assert!(addrs.count() > 0, "empty address list")
}

#[tokio::test]
async fn it_denies_ipv6_google_com() {
let resolver: PublicIPv4Resolver = PublicIPv4Resolver {};
match resolver
.resolve(Name::from_str("ipv6.google.com").unwrap())
.await
{
Ok(_) => panic!("should have failed"),
Err(err) => assert!(err.is::<NoPublicIPv4Error>()),
}
}

#[tokio::test]
async fn it_denies_localhost() {
let resolver: PublicIPv4Resolver = PublicIPv4Resolver {};
match resolver.resolve(Name::from_str("localhost").unwrap()).await {
Ok(_) => panic!("should have failed"),
Err(err) => assert!(err.is::<NoPublicIPv4Error>()),
}
}

#[tokio::test]
async fn it_bubbles_up_resolution_error() {
let resolver: PublicIPv4Resolver = PublicIPv4Resolver {};
match resolver
.resolve(Name::from_str("invalid.domain.unknown").unwrap())
.await
{
Ok(_) => panic!("should have failed"),
Err(err) => {
assert!(!err.is::<NoPublicIPv4Error>());
assert!(err
.to_string()
.contains("failed to lookup address information"))
}
}
}
}
20 changes: 19 additions & 1 deletion hook-worker/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::error::Error;
use std::fmt;
use std::time;

use crate::dns::NoPublicIPv4Error;
use hook_common::{pgqueue, webhook::WebhookJobError};
use thiserror::Error;

Expand Down Expand Up @@ -64,7 +66,11 @@ impl fmt::Display for WebhookRequestError {
Some(m) => m.to_string(),
None => "No response from the server".to_string(),
};
writeln!(f, "{}", error)?;
if is_error_source::<NoPublicIPv4Error>(error) {
writeln!(f, "{}: {}", error, NoPublicIPv4Error)?;
} else {
writeln!(f, "{}", error)?;
}
write!(f, "{}", response_message)?;

Ok(())
Expand Down Expand Up @@ -132,3 +138,15 @@ pub enum WorkerError {
#[error("timed out while waiting for jobs to be available")]
TimeoutError,
}

/// Check the error and it's sources (recursively) to return true if an error of the given type is found.
/// TODO: use Error::sources() when stable
pub fn is_error_source<T: Error + 'static>(err: &(dyn std::error::Error + 'static)) -> bool {
if err.is::<NoPublicIPv4Error>() {
return true;
}
match err.source() {
None => false,
Some(source) => is_error_source::<T>(source),
}
}
1 change: 1 addition & 0 deletions hook-worker/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod config;
pub mod dns;
pub mod error;
pub mod util;
pub mod worker;
1 change: 1 addition & 0 deletions hook-worker/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ async fn main() -> Result<(), WorkerError> {
config.request_timeout.0,
config.max_concurrent_jobs,
retry_policy_builder.provide(),
config.allow_internal_ips,
worker_liveness,
);

Expand Down
Loading
Loading