Skip to content

Commit

Permalink
Implement interface filtering (unix only)
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Jan 30, 2024
1 parent 362e5a5 commit 647c3ab
Show file tree
Hide file tree
Showing 11 changed files with 223 additions and 52 deletions.
67 changes: 67 additions & 0 deletions commons/zenoh-util/src/std_only/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,73 @@ pub fn get_index_of_interface(addr: IpAddr) -> ZResult<u32> {
}
}

pub fn get_interface_by_addr(addr: IpAddr) -> Vec<String> {
#[cfg(unix)]
{
if addr.is_unspecified() {
pnet_datalink::interfaces()
.iter()
.map(|iface| iface.name.clone())
.collect::<Vec<String>>()
} else {
pnet_datalink::interfaces()
.iter()
.filter(|iface| iface.ips.iter().any(|ipnet| ipnet.ip() == addr))
.map(|iface| iface.name.clone())
.collect::<Vec<String>>()
}
}
#[cfg(windows)]
{
// TODO(sashacmc): check and fix
unsafe {
use crate::ffi;
use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH;

let mut ret;
let mut retries = 0;
let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE;
let mut buffer: Vec<u8>;
loop {
buffer = Vec::with_capacity(size as usize);
ret = winapi::um::iphlpapi::GetAdaptersAddresses(
winapi::shared::ws2def::AF_INET.try_into().unwrap(),
0,
std::ptr::null_mut(),
buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH,
&mut size,
);
if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW {
break;
}
if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES {
break;
}
retries += 1;
}

if ret != 0 {
bail!("GetAdaptersAddresses returned {}", ret)
}

let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref();
while let Some(iface) = next_iface {
let mut next_ucast_addr = iface.FirstUnicastAddress.as_ref();
while let Some(ucast_addr) = next_ucast_addr {
if let Ok(ifaddr) = ffi::win::sockaddr_to_addr(ucast_addr.Address) {
if ifaddr.ip() == addr {
return Ok(iface.AdapterName);
}
}
next_ucast_addr = ucast_addr.Next.as_ref();
}
next_iface = iface.Next.as_ref();
}
bail!("No interface found with address {addr}")
}
}
}

pub fn get_ipv4_ipaddrs() -> Vec<IpAddr> {
get_local_addresses()
.unwrap_or_else(|_| vec![])
Expand Down
7 changes: 4 additions & 3 deletions io/zenoh-link-commons/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ extern crate alloc;
mod multicast;
mod unicast;

use alloc::{borrow::ToOwned, boxed::Box, string::String};
use alloc::{borrow::ToOwned, boxed::Box, string::String, vec, vec::Vec};
use async_trait::async_trait;
use core::{cmp::PartialEq, fmt, hash::Hash};
pub use multicast::*;
Expand All @@ -43,8 +43,7 @@ pub struct Link {
pub mtu: u16,
pub is_reliable: bool,
pub is_streamed: bool,
// there no method to check interface
// may be will be better just add interface there in place of method?
pub interfaces: Vec<String>,
}

#[async_trait]
Expand Down Expand Up @@ -73,6 +72,7 @@ impl From<&LinkUnicast> for Link {
mtu: link.get_mtu(),
is_reliable: link.is_reliable(),
is_streamed: link.is_streamed(),
interfaces: link.get_interfaces(),
}
}
}
Expand All @@ -92,6 +92,7 @@ impl From<&LinkMulticast> for Link {
mtu: link.get_mtu(),
is_reliable: link.is_reliable(),
is_streamed: false,
interfaces: vec![],
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions io/zenoh-link-commons/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use alloc::{boxed::Box, sync::Arc, vec::Vec};
use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec};
use async_trait::async_trait;
use core::{
fmt,
Expand Down Expand Up @@ -45,7 +45,7 @@ pub trait LinkUnicastTrait: Send + Sync {
fn get_dst(&self) -> &Locator;
fn is_reliable(&self) -> bool;
fn is_streamed(&self) -> bool;
fn is_matched_to_interface(&self, interface: &str) -> bool;
fn get_interfaces(&self) -> Vec<String>;
async fn write(&self, buffer: &[u8]) -> ZResult<usize>;
async fn write_all(&self, buffer: &[u8]) -> ZResult<()>;
async fn read(&self, buffer: &mut [u8]) -> ZResult<usize>;
Expand Down
11 changes: 6 additions & 5 deletions io/zenoh-links/zenoh-link-quic/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ impl LinkUnicastTrait for LinkUnicastQuic {
&self.dst_locator
}

#[inline(always)]
fn get_interfaces(&self) -> Vec<String> {
// Not supported for now
vec![]
}

#[inline(always)]
fn get_mtu(&self) -> u16 {
*QUIC_DEFAULT_MTU
Expand All @@ -152,11 +158,6 @@ impl LinkUnicastTrait for LinkUnicastQuic {
fn is_streamed(&self) -> bool {
true
}

fn is_matched_to_interface(&self, _interface: &str) -> bool {
// Not supported for now
false
}
}

impl Drop for LinkUnicastQuic {
Expand Down
18 changes: 7 additions & 11 deletions io/zenoh-links/zenoh-link-tcp/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,13 @@ impl LinkUnicastTrait for LinkUnicastTcp {
&self.dst_locator
}

#[inline(always)]
fn get_interfaces(&self) -> Vec<String> {
let res = zenoh_util::net::get_interface_by_addr(self.src_addr.ip());
log::debug!("get_interfaces for {:?}: {:?}", self.src_addr.ip(), res);
res
}

#[inline(always)]
fn get_mtu(&self) -> u16 {
*TCP_DEFAULT_MTU
Expand All @@ -153,17 +160,6 @@ impl LinkUnicastTrait for LinkUnicastTcp {
fn is_streamed(&self) -> bool {
true
}

fn is_matched_to_interface(&self, name: &str) -> bool {
if let Ok(opt_addr) = zenoh_util::net::get_interface(name.trim()) {
if let Some(addr) = opt_addr {
if addr == self.src_addr.ip() {
return true;
}
}
}
false
}
}

impl Drop for LinkUnicastTcp {
Expand Down
11 changes: 6 additions & 5 deletions io/zenoh-links/zenoh-link-tls/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,12 @@ impl LinkUnicastTrait for LinkUnicastTls {
*TLS_DEFAULT_MTU
}

#[inline(always)]
fn get_interfaces(&self) -> Vec<String> {
// Not supported for now
vec![]
}

#[inline(always)]
fn is_reliable(&self) -> bool {
true
Expand All @@ -204,11 +210,6 @@ impl LinkUnicastTrait for LinkUnicastTls {
fn is_streamed(&self) -> bool {
true
}

fn is_matched_to_interface(&self, _interface: &str) -> bool {
// Not supported for now
false
}
}

impl Drop for LinkUnicastTls {
Expand Down
11 changes: 6 additions & 5 deletions io/zenoh-links/zenoh-link-udp/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,12 @@ impl LinkUnicastTrait for LinkUnicastUdp {
&self.dst_locator
}

#[inline(always)]
fn get_interfaces(&self) -> Vec<String> {
// Not supported for now
vec![]
}

#[inline(always)]
fn get_mtu(&self) -> u16 {
*UDP_DEFAULT_MTU
Expand All @@ -217,11 +223,6 @@ impl LinkUnicastTrait for LinkUnicastUdp {
fn is_streamed(&self) -> bool {
false
}

fn is_matched_to_interface(&self, _interface: &str) -> bool {
// Not supported for now
false
}
}

impl fmt::Display for LinkUnicastUdp {
Expand Down
11 changes: 6 additions & 5 deletions io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ impl LinkUnicastTrait for LinkUnicastUnixSocketStream {
*UNIXSOCKSTREAM_DEFAULT_MTU
}

#[inline(always)]
fn get_interfaces(&self) -> Vec<String> {
// Not supported for now
vec![]
}

#[inline(always)]
fn is_reliable(&self) -> bool {
true
Expand All @@ -124,11 +130,6 @@ impl LinkUnicastTrait for LinkUnicastUnixSocketStream {
fn is_streamed(&self) -> bool {
true
}

fn is_matched_to_interface(&self, _interface: &str) -> bool {
// Not supported for now
false
}
}

impl Drop for LinkUnicastUnixSocketStream {
Expand Down
11 changes: 6 additions & 5 deletions io/zenoh-links/zenoh-link-ws/src/unicast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ impl LinkUnicastTrait for LinkUnicastWs {
*WS_DEFAULT_MTU
}

#[inline(always)]
fn get_interfaces(&self) -> Vec<String> {
// Not supported for now
vec![]
}

#[inline(always)]
fn is_reliable(&self) -> bool {
true
Expand All @@ -216,11 +222,6 @@ impl LinkUnicastTrait for LinkUnicastWs {
fn is_streamed(&self) -> bool {
false
}

fn is_matched_to_interface(&self, _interface: &str) -> bool {
// Not supported for now
false
}
}

impl Drop for LinkUnicastWs {
Expand Down
10 changes: 7 additions & 3 deletions zenoh/src/net/routing/interceptor/downsampling.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
use crate::net::routing::interceptor::*;
use std::sync::{Arc, Mutex};
use zenoh_config::DownsamplerConf;
use zenoh_link::LinkUnicast;
use zenoh_protocol::core::key_expr::OwnedKeyExpr;

// TODO(sashacmc): this is ratelimit strategy, we should also add decimation (with "factor" option)
Expand Down Expand Up @@ -108,10 +107,15 @@ impl InterceptorFactoryTrait for DownsamplerInterceptor {
transport: &TransportUnicast,
) -> (Option<IngressInterceptor>, Option<EgressInterceptor>) {
log::debug!("New transport unicast {:?}", transport);
if let Some(interface) = self.conf.interface {
if let Some(interface) = self.conf.interface.clone() {
log::debug!("New downsampler transport unicast interface: {}", interface);
if let Ok(links) = transport.get_links() {
for link in links {
if !(link as LinkUnicast).is_matched_to_interface(interface) {
log::debug!(
"New downsampler transport unicast interfaces: {:?}",
link.interfaces
);
if !link.interfaces.contains(&interface) {
return (None, None);
}
}
Expand Down
Loading

0 comments on commit 647c3ab

Please sign in to comment.