From c344fc6c28fb44b8fbbbc3ec466ff1ce9406cf56 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Thu, 25 Jan 2024 00:00:07 +0100 Subject: [PATCH 01/24] Add downsampling interceptor POC --- commons/zenoh-config/src/lib.rs | 11 ++ zenoh/src/net/routing/dispatcher/tables.rs | 16 +++ .../net/routing/interceptor/downsampling.rs | 126 ++++++++++++++++++ zenoh/src/net/routing/interceptor/mod.rs | 17 ++- zenoh/tests/interceptors.rs | 79 +++++++++++ 5 files changed, 244 insertions(+), 5 deletions(-) create mode 100644 zenoh/src/net/routing/interceptor/downsampling.rs create mode 100644 zenoh/tests/interceptors.rs diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 29a87a43ee..de1ff69cfc 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -70,6 +70,12 @@ impl Zeroize for SecretString { pub type SecretValue = Secret; +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct DownsamplerConf { + pub keyexpr: OwnedKeyExpr, + pub threshold_ms: u64, +} + pub trait ConfigValidator: Send + Sync { fn check_config( &self, @@ -405,6 +411,11 @@ validated_struct::validator! { }, }, + /// Configuration of the downsampling. + pub downsampling: #[derive(Default)] + DownsamplingConf { + downsamples: Vec, + }, /// A list of directories where plugins may be searched for if no `__path__` was specified for them. /// The executable's current directory will be added to the search paths. plugins_search_dirs: Vec, // TODO (low-prio): Switch this String to a PathBuf? (applies to other paths in the config as well) diff --git a/zenoh/src/net/routing/dispatcher/tables.rs b/zenoh/src/net/routing/dispatcher/tables.rs index 274b600024..510ea7e29b 100644 --- a/zenoh/src/net/routing/dispatcher/tables.rs +++ b/zenoh/src/net/routing/dispatcher/tables.rs @@ -84,6 +84,22 @@ impl Tables { // let queries_default_timeout = // Duration::from_millis(unwrap_or_default!(config.queries_default_timeout())); let hat_code = hat::new_hat(whatami, config); + + //TODO(sashacmc): add interceptors config reloading there or incapsulate in the interceptors, but it + //will require interface changes + // + //// config reloading sample: + //let cfg_rx = config.subscribe(); + //task::spawn({ + // async move { + // while let Ok(change) = cfg_rx.recv_async().await { + // let change = change.strip_prefix('/').unwrap_or(&change); + // if !change.starts_with("plugins") { + // continue; + // } + // } + //}); + Tables { zid, whatami, diff --git a/zenoh/src/net/routing/interceptor/downsampling.rs b/zenoh/src/net/routing/interceptor/downsampling.rs new file mode 100644 index 0000000000..b6a0de67c1 --- /dev/null +++ b/zenoh/src/net/routing/interceptor/downsampling.rs @@ -0,0 +1,126 @@ +// +// Copyright (c) 2024 ZettaScale Technology +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License 2.0 which is available at +// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// +// Contributors: +// ZettaScale Zenoh Team, +// + +//! ⚠️ WARNING ⚠️ +//! +//! This module is intended for Zenoh's internal use. +//! +//! [Click here for Zenoh's documentation](../zenoh/index.html) + +use crate::net::routing::interceptor::*; +use crate::KeyExpr; +use std::sync::{Arc, Mutex}; +use zenoh_config::DownsamplerConf; + +pub(crate) struct IngressMsgDownsampler { + conf: DownsamplerConf, + latest_message_timestamp: Arc>, +} + +impl InterceptorTrait for IngressMsgDownsampler { + fn intercept( + &self, + ctx: RoutingContext, + ) -> Option> { + if let Some(full_expr) = ctx.full_expr() { + match KeyExpr::new(full_expr) { + Ok(keyexpr) => { + if !self.conf.keyexpr.intersects(&keyexpr) { + return Some(ctx); + } + + let timestamp = std::time::Instant::now(); + let mut latest_message_timestamp = + self.latest_message_timestamp.lock().unwrap(); + + if timestamp - *latest_message_timestamp + >= std::time::Duration::from_millis(self.conf.threshold_ms) + { + *latest_message_timestamp = timestamp; + log::trace!("Interceptor: Passed threshold, passing."); + Some(ctx) + } else { + log::trace!("Interceptor: Skipped due to threshold."); + None + } + } + Err(_) => { + log::warn!("Interceptor: Wrong KeyExpr, passing."); + Some(ctx) + } + } + } else { + // message has no key expr + Some(ctx) + } + } +} + +impl IngressMsgDownsampler { + pub fn new(conf: DownsamplerConf) -> Self { + // TODO (sashacmc): I need just := 0, but how??? + let zero_ts = + std::time::Instant::now() - std::time::Duration::from_micros(conf.threshold_ms); + Self { + conf, + latest_message_timestamp: Arc::new(Mutex::new(zero_ts)), + } + } +} + +pub(crate) struct EgressMsgDownsampler {} + +impl InterceptorTrait for EgressMsgDownsampler { + fn intercept( + &self, + ctx: RoutingContext, + ) -> Option> { + // TODO(sashacmc): Do we need Ergress Downsampler? + Some(ctx) + } +} + +pub struct DownsamplerInterceptor { + conf: DownsamplerConf, +} + +impl DownsamplerInterceptor { + pub fn new(conf: DownsamplerConf) -> Self { + log::debug!("DownsamplerInterceptor enabled: {:?}", conf); + Self { conf } + } +} + +impl InterceptorFactoryTrait for DownsamplerInterceptor { + fn new_transport_unicast( + &self, + transport: &TransportUnicast, + ) -> (Option, Option) { + log::debug!("New transport unicast {:?}", transport); + ( + Some(Box::new(IngressMsgDownsampler::new(self.conf.clone()))), + Some(Box::new(EgressMsgDownsampler {})), + ) + } + + fn new_transport_multicast(&self, transport: &TransportMulticast) -> Option { + log::debug!("New transport multicast {:?}", transport); + Some(Box::new(EgressMsgDownsampler {})) + } + + fn new_peer_multicast(&self, transport: &TransportMulticast) -> Option { + log::debug!("New peer multicast {:?}", transport); + Some(Box::new(IngressMsgDownsampler::new(self.conf.clone()))) + } +} diff --git a/zenoh/src/net/routing/interceptor/mod.rs b/zenoh/src/net/routing/interceptor/mod.rs index 7503580405..e5c044116b 100644 --- a/zenoh/src/net/routing/interceptor/mod.rs +++ b/zenoh/src/net/routing/interceptor/mod.rs @@ -17,11 +17,15 @@ //! This module is intended for Zenoh's internal use. //! //! [Click here for Zenoh's documentation](../zenoh/index.html) + use super::RoutingContext; use zenoh_config::Config; use zenoh_protocol::network::NetworkMessage; use zenoh_transport::{multicast::TransportMulticast, unicast::TransportUnicast}; +pub mod downsampling; +use crate::net::routing::interceptor::downsampling::DownsamplerInterceptor; + pub(crate) trait InterceptorTrait { fn intercept( &self, @@ -44,11 +48,14 @@ pub(crate) trait InterceptorFactoryTrait { pub(crate) type InterceptorFactory = Box; -pub(crate) fn interceptor_factories(_config: &Config) -> Vec { - // Add interceptors here - // @TODO build the list of intercetors with the correct order from the config - // vec![Box::new(LoggerInterceptor {})] - vec![] +pub(crate) fn interceptor_factories(config: &Config) -> Vec { + let mut res: Vec = vec![]; + + for ds in config.downsampling().downsamples() { + res.push(Box::new(DownsamplerInterceptor::new(ds.clone()))) + } + res.push(Box::new(LoggerInterceptor {})); + res } pub(crate) struct InterceptorsChain { diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs new file mode 100644 index 0000000000..9ab78747ed --- /dev/null +++ b/zenoh/tests/interceptors.rs @@ -0,0 +1,79 @@ +use std::sync::{Arc, Mutex}; + +#[test] +fn downsampling() { + let _ = env_logger::builder().is_test(true).try_init(); + + use zenoh::prelude::sync::*; + + // declare publisher + let mut config = Config::default(); + config + .insert_json5( + "downsampling/downsamples", + r#" + [ + { + keyexpr: "test/downsamples/r100", + threshold_ms: 100, + }, + { + keyexpr: "test/downsamples/r50", + threshold_ms: 50, + }, + ] + "#, + ) + .unwrap(); + + // declare subscriber + let zenoh_sub = zenoh::open(config).res().unwrap(); + + let last_time_r100 = Arc::new(Mutex::new( + std::time::Instant::now() - std::time::Duration::from_millis(100), + )); + let last_time_r50 = Arc::new(Mutex::new( + std::time::Instant::now() - std::time::Duration::from_millis(50), + )); + + let _sub = zenoh_sub + .declare_subscriber("test/downsamples/*") + .callback(move |sample| { + let curr_time = std::time::Instant::now(); + if sample.key_expr.as_str() == "test/downsamples/r100" { + let mut last_time = last_time_r100.lock().unwrap(); + let interval = (curr_time - *last_time).as_millis() + 1; + *last_time = curr_time; + println!("interval 100: {}", interval); + assert!(interval >= 100); + } else if sample.key_expr.as_str() == "test/downsamples/r50" { + let mut last_time = last_time_r50.lock().unwrap(); + let interval = (curr_time - *last_time).as_millis() + 1; + *last_time = curr_time; + println!("interval 50: {}", interval); + assert!(interval >= 50); + } + }) + .res() + .unwrap(); + + let zenoh_pub = zenoh::open(Config::default()).res().unwrap(); + let publisher_r100 = zenoh_pub + .declare_publisher("test/downsamples/r100") + .res() + .unwrap(); + + let publisher_r50 = zenoh_pub + .declare_publisher("test/downsamples/r50") + .res() + .unwrap(); + + let interval = std::time::Duration::from_millis(1); + for i in 0..1000 { + println!("message {}", i); + publisher_r100.put(format!("message {}", i)).res().unwrap(); + publisher_r50.put(format!("message {}", i)).res().unwrap(); + + std::thread::sleep(interval); + } +} From 959af2675a6f349f4839833fd40f8f7f38ed90a2 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 26 Jan 2024 18:14:26 +0100 Subject: [PATCH 02/24] Rework downsampling interceptors for egress + refactoring --- commons/zenoh-config/src/lib.rs | 6 +- zenoh/src/net/routing/dispatcher/tables.rs | 2 +- .../net/routing/interceptor/downsampling.rs | 105 +++++++++--------- zenoh/src/net/routing/mod.rs | 16 +++ zenoh/src/net/runtime/adminspace.rs | 13 +++ zenoh/tests/interceptors.rs | 43 ++++--- 6 files changed, 107 insertions(+), 78 deletions(-) diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index de1ff69cfc..5eb0aa1af2 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -72,8 +72,10 @@ pub type SecretValue = Secret; #[derive(Debug, Deserialize, Serialize, Clone)] pub struct DownsamplerConf { - pub keyexpr: OwnedKeyExpr, - pub threshold_ms: u64, + pub keyexpr: Option, + pub interface: Option, + pub strategy: Option, + pub threshold_ms: Option, } pub trait ConfigValidator: Send + Sync { diff --git a/zenoh/src/net/routing/dispatcher/tables.rs b/zenoh/src/net/routing/dispatcher/tables.rs index 510ea7e29b..6d9b4f50d5 100644 --- a/zenoh/src/net/routing/dispatcher/tables.rs +++ b/zenoh/src/net/routing/dispatcher/tables.rs @@ -85,7 +85,7 @@ impl Tables { // Duration::from_millis(unwrap_or_default!(config.queries_default_timeout())); let hat_code = hat::new_hat(whatami, config); - //TODO(sashacmc): add interceptors config reloading there or incapsulate in the interceptors, but it + //TODO(sashacmc): add interceptors config reloading there or encapsulate in the interceptors, but it //will require interface changes // //// config reloading sample: diff --git a/zenoh/src/net/routing/interceptor/downsampling.rs b/zenoh/src/net/routing/interceptor/downsampling.rs index b6a0de67c1..6f433f428c 100644 --- a/zenoh/src/net/routing/interceptor/downsampling.rs +++ b/zenoh/src/net/routing/interceptor/downsampling.rs @@ -19,78 +19,77 @@ //! [Click here for Zenoh's documentation](../zenoh/index.html) use crate::net::routing::interceptor::*; -use crate::KeyExpr; use std::sync::{Arc, Mutex}; use zenoh_config::DownsamplerConf; +use zenoh_protocol::core::key_expr::OwnedKeyExpr; -pub(crate) struct IngressMsgDownsampler { - conf: DownsamplerConf, - latest_message_timestamp: Arc>, -} +// TODO(sashacmc): this is ratelimit strategy, we should also add decimation (with "factor" option) + +pub(crate) struct IngressMsgDownsampler {} impl InterceptorTrait for IngressMsgDownsampler { fn intercept( &self, ctx: RoutingContext, ) -> Option> { - if let Some(full_expr) = ctx.full_expr() { - match KeyExpr::new(full_expr) { - Ok(keyexpr) => { - if !self.conf.keyexpr.intersects(&keyexpr) { - return Some(ctx); - } + Some(ctx) + } +} - let timestamp = std::time::Instant::now(); - let mut latest_message_timestamp = - self.latest_message_timestamp.lock().unwrap(); +pub(crate) struct EgressMsgDownsampler { + keyexpr: Option, + threshold: std::time::Duration, + latest_message_timestamp: Arc>, +} - if timestamp - *latest_message_timestamp - >= std::time::Duration::from_millis(self.conf.threshold_ms) - { - *latest_message_timestamp = timestamp; - log::trace!("Interceptor: Passed threshold, passing."); - Some(ctx) - } else { - log::trace!("Interceptor: Skipped due to threshold."); - None - } - } - Err(_) => { - log::warn!("Interceptor: Wrong KeyExpr, passing."); - Some(ctx) +impl InterceptorTrait for EgressMsgDownsampler { + fn intercept( + &self, + ctx: RoutingContext, + ) -> Option> { + if let Some(cfg_keyexpr) = self.keyexpr.as_ref() { + if let Some(keyexpr) = ctx.full_key_expr() { + if !cfg_keyexpr.intersects(&keyexpr) { + return Some(ctx); } + } else { + return Some(ctx); } - } else { - // message has no key expr + } + + let timestamp = std::time::Instant::now(); + let mut latest_message_timestamp = self.latest_message_timestamp.lock().unwrap(); + + if timestamp - *latest_message_timestamp >= self.threshold { + *latest_message_timestamp = timestamp; + log::debug!("Interceptor: Passed threshold, passing."); Some(ctx) + } else { + log::debug!("Interceptor: Skipped due to threshold."); + None } } } -impl IngressMsgDownsampler { +impl EgressMsgDownsampler { pub fn new(conf: DownsamplerConf) -> Self { - // TODO (sashacmc): I need just := 0, but how??? - let zero_ts = - std::time::Instant::now() - std::time::Duration::from_micros(conf.threshold_ms); - Self { - conf, - latest_message_timestamp: Arc::new(Mutex::new(zero_ts)), + if let Some(threshold_ms) = conf.threshold_ms { + let threshold = std::time::Duration::from_millis(threshold_ms); + Self { + keyexpr: conf.keyexpr, + threshold, + // TODO (sashacmc): I need just := 0, but how??? + latest_message_timestamp: Arc::new(Mutex::new( + std::time::Instant::now() - threshold, + )), + } + } else { + // TODO (sashacmc): how correctly process an error? + panic!("Rate limit downsampler shoud have a threshold_ms parameter"); } } } -pub(crate) struct EgressMsgDownsampler {} - -impl InterceptorTrait for EgressMsgDownsampler { - fn intercept( - &self, - ctx: RoutingContext, - ) -> Option> { - // TODO(sashacmc): Do we need Ergress Downsampler? - Some(ctx) - } -} - pub struct DownsamplerInterceptor { conf: DownsamplerConf, } @@ -109,18 +108,18 @@ impl InterceptorFactoryTrait for DownsamplerInterceptor { ) -> (Option, Option) { log::debug!("New transport unicast {:?}", transport); ( - Some(Box::new(IngressMsgDownsampler::new(self.conf.clone()))), - Some(Box::new(EgressMsgDownsampler {})), + Some(Box::new(IngressMsgDownsampler {})), + Some(Box::new(EgressMsgDownsampler::new(self.conf.clone()))), ) } fn new_transport_multicast(&self, transport: &TransportMulticast) -> Option { log::debug!("New transport multicast {:?}", transport); - Some(Box::new(EgressMsgDownsampler {})) + Some(Box::new(EgressMsgDownsampler::new(self.conf.clone()))) } fn new_peer_multicast(&self, transport: &TransportMulticast) -> Option { log::debug!("New peer multicast {:?}", transport); - Some(Box::new(IngressMsgDownsampler::new(self.conf.clone()))) + Some(Box::new(IngressMsgDownsampler {})) } } diff --git a/zenoh/src/net/routing/mod.rs b/zenoh/src/net/routing/mod.rs index 0b069c1337..37a698263b 100644 --- a/zenoh/src/net/routing/mod.rs +++ b/zenoh/src/net/routing/mod.rs @@ -24,6 +24,7 @@ pub mod router; use std::{cell::OnceCell, sync::Arc}; +use zenoh_protocol::core::key_expr::OwnedKeyExpr; use zenoh_protocol::{core::WireExpr, network::NetworkMessage}; use self::{dispatcher::face::Face, router::Resource}; @@ -168,4 +169,19 @@ impl RoutingContext { } None } + + //TODO (sashacmc): maybe rewrite full_expr to don't convert back? + #[inline] + pub(crate) fn full_key_expr(&self) -> Option { + match self.full_expr() { + Some(full_expr) => { + if let Ok(keyexpr) = OwnedKeyExpr::new(full_expr) { + Some(keyexpr) + } else { + None + } + } + None => None, + } + } } diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index f6fb13e76e..284004f684 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -170,6 +170,19 @@ impl AdminSpace { .unwrap(), Arc::new(plugins_status), ); + //TODO(sashacmc): Adminspace configuration + //handlers.insert( + // format!("@/router/{zid_str}/status/downsampling/**") + // .try_into() + // .unwrap(), + // Arc::new(downsampling_status), + //); + //handlers.insert( + // format!("@/router/{zid_str}/status/prefixes/**") + // .try_into() + // .unwrap(), + // Arc::new(downsampling_status), + //); let mut active_plugins = plugins_mgr .started_plugins_iter() diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs index 9ab78747ed..8765c9b6b7 100644 --- a/zenoh/tests/interceptors.rs +++ b/zenoh/tests/interceptors.rs @@ -6,28 +6,8 @@ fn downsampling() { use zenoh::prelude::sync::*; - // declare publisher - let mut config = Config::default(); - config - .insert_json5( - "downsampling/downsamples", - r#" - [ - { - keyexpr: "test/downsamples/r100", - threshold_ms: 100, - }, - { - keyexpr: "test/downsamples/r50", - threshold_ms: 50, - }, - ] - "#, - ) - .unwrap(); - // declare subscriber - let zenoh_sub = zenoh::open(config).res().unwrap(); + let zenoh_sub = zenoh::open(Config::default()).res().unwrap(); let last_time_r100 = Arc::new(Mutex::new( std::time::Instant::now() - std::time::Duration::from_millis(100), @@ -57,7 +37,26 @@ fn downsampling() { .res() .unwrap(); - let zenoh_pub = zenoh::open(Config::default()).res().unwrap(); + // declare publisher + let mut config = Config::default(); + config + .insert_json5( + "downsampling/downsamples", + r#" + [ + { + keyexpr: "test/downsamples/r100", + threshold_ms: 100, + }, + { + keyexpr: "test/downsamples/r50", + threshold_ms: 50, + }, + ] + "#, + ) + .unwrap(); + let zenoh_pub = zenoh::open(config).res().unwrap(); let publisher_r100 = zenoh_pub .declare_publisher("test/downsamples/r100") .res() From 93d689ca41ae78245f7e270e3332d84bccd228d6 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Mon, 29 Jan 2024 18:51:25 +0100 Subject: [PATCH 03/24] [skip ci] Add interface check (don't work yet) --- io/zenoh-link-commons/src/lib.rs | 2 ++ io/zenoh-link-commons/src/unicast.rs | 1 + io/zenoh-links/zenoh-link-quic/src/unicast.rs | 5 +++++ io/zenoh-links/zenoh-link-tcp/src/unicast.rs | 11 +++++++++++ io/zenoh-links/zenoh-link-tls/src/unicast.rs | 5 +++++ io/zenoh-links/zenoh-link-udp/src/unicast.rs | 5 +++++ .../zenoh-link-unixsock_stream/src/unicast.rs | 5 +++++ io/zenoh-links/zenoh-link-ws/src/unicast.rs | 5 +++++ zenoh/src/net/routing/interceptor/downsampling.rs | 10 ++++++++++ 9 files changed, 49 insertions(+) diff --git a/io/zenoh-link-commons/src/lib.rs b/io/zenoh-link-commons/src/lib.rs index 790f4792a4..8191a65dca 100644 --- a/io/zenoh-link-commons/src/lib.rs +++ b/io/zenoh-link-commons/src/lib.rs @@ -43,6 +43,8 @@ pub struct Link { pub mtu: u16, pub is_reliable: bool, pub is_streamed: bool, + // there no method to check interface + // may be will be better just add interface there in place of method? } #[async_trait] diff --git a/io/zenoh-link-commons/src/unicast.rs b/io/zenoh-link-commons/src/unicast.rs index d44686ff50..13858e88d5 100644 --- a/io/zenoh-link-commons/src/unicast.rs +++ b/io/zenoh-link-commons/src/unicast.rs @@ -45,6 +45,7 @@ pub trait LinkUnicastTrait: Send + Sync { fn get_dst(&self) -> &Locator; fn is_reliable(&self) -> bool; fn is_streamed(&self) -> bool; + fn is_matched_to_interface(&self, interface: &str) -> bool; async fn write(&self, buffer: &[u8]) -> ZResult; async fn write_all(&self, buffer: &[u8]) -> ZResult<()>; async fn read(&self, buffer: &mut [u8]) -> ZResult; diff --git a/io/zenoh-links/zenoh-link-quic/src/unicast.rs b/io/zenoh-links/zenoh-link-quic/src/unicast.rs index 2b1c59ad23..f023a2f976 100644 --- a/io/zenoh-links/zenoh-link-quic/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-quic/src/unicast.rs @@ -152,6 +152,11 @@ impl LinkUnicastTrait for LinkUnicastQuic { fn is_streamed(&self) -> bool { true } + + fn is_matched_to_interface(&self, _interface: &str) -> bool { + // Not supported for now + false + } } impl Drop for LinkUnicastQuic { diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index 3960b91228..4626172756 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -153,6 +153,17 @@ impl LinkUnicastTrait for LinkUnicastTcp { fn is_streamed(&self) -> bool { true } + + fn is_matched_to_interface(&self, name: &str) -> bool { + if let Ok(opt_addr) = zenoh_util::net::get_interface(name.trim()) { + if let Some(addr) = opt_addr { + if addr == self.src_addr.ip() { + return true; + } + } + } + false + } } impl Drop for LinkUnicastTcp { diff --git a/io/zenoh-links/zenoh-link-tls/src/unicast.rs b/io/zenoh-links/zenoh-link-tls/src/unicast.rs index 7761195e4b..b8c2378cd1 100644 --- a/io/zenoh-links/zenoh-link-tls/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tls/src/unicast.rs @@ -204,6 +204,11 @@ impl LinkUnicastTrait for LinkUnicastTls { fn is_streamed(&self) -> bool { true } + + fn is_matched_to_interface(&self, _interface: &str) -> bool { + // Not supported for now + false + } } impl Drop for LinkUnicastTls { diff --git a/io/zenoh-links/zenoh-link-udp/src/unicast.rs b/io/zenoh-links/zenoh-link-udp/src/unicast.rs index 585442ed71..2d61fc2dcb 100644 --- a/io/zenoh-links/zenoh-link-udp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/unicast.rs @@ -217,6 +217,11 @@ impl LinkUnicastTrait for LinkUnicastUdp { fn is_streamed(&self) -> bool { false } + + fn is_matched_to_interface(&self, _interface: &str) -> bool { + // Not supported for now + false + } } impl fmt::Display for LinkUnicastUdp { diff --git a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs index e4d751344f..57328d9460 100644 --- a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs @@ -124,6 +124,11 @@ impl LinkUnicastTrait for LinkUnicastUnixSocketStream { fn is_streamed(&self) -> bool { true } + + fn is_matched_to_interface(&self, _interface: &str) -> bool { + // Not supported for now + false + } } impl Drop for LinkUnicastUnixSocketStream { diff --git a/io/zenoh-links/zenoh-link-ws/src/unicast.rs b/io/zenoh-links/zenoh-link-ws/src/unicast.rs index 2238dcb4a6..ad7b31b342 100644 --- a/io/zenoh-links/zenoh-link-ws/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-ws/src/unicast.rs @@ -216,6 +216,11 @@ impl LinkUnicastTrait for LinkUnicastWs { fn is_streamed(&self) -> bool { false } + + fn is_matched_to_interface(&self, _interface: &str) -> bool { + // Not supported for now + false + } } impl Drop for LinkUnicastWs { diff --git a/zenoh/src/net/routing/interceptor/downsampling.rs b/zenoh/src/net/routing/interceptor/downsampling.rs index 6f433f428c..ed4b2ac81a 100644 --- a/zenoh/src/net/routing/interceptor/downsampling.rs +++ b/zenoh/src/net/routing/interceptor/downsampling.rs @@ -21,6 +21,7 @@ use crate::net::routing::interceptor::*; use std::sync::{Arc, Mutex}; use zenoh_config::DownsamplerConf; +use zenoh_link::LinkUnicast; use zenoh_protocol::core::key_expr::OwnedKeyExpr; // TODO(sashacmc): this is ratelimit strategy, we should also add decimation (with "factor" option) @@ -107,6 +108,15 @@ impl InterceptorFactoryTrait for DownsamplerInterceptor { transport: &TransportUnicast, ) -> (Option, Option) { log::debug!("New transport unicast {:?}", transport); + if let Some(interface) = self.conf.interface { + if let Ok(links) = transport.get_links() { + for link in links { + if !(link as LinkUnicast).is_matched_to_interface(interface) { + return (None, None); + } + } + } + }; ( Some(Box::new(IngressMsgDownsampler {})), Some(Box::new(EgressMsgDownsampler::new(self.conf.clone()))), From a0d489ce3ae8b1fc87fb92f6bb0b2bd18c9a9cb5 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Tue, 30 Jan 2024 18:58:19 +0100 Subject: [PATCH 04/24] Implement interface filtering (unix only) --- commons/zenoh-util/src/std_only/net/mod.rs | 67 ++++++++++ io/zenoh-link-commons/src/lib.rs | 7 +- io/zenoh-link-commons/src/unicast.rs | 4 +- io/zenoh-links/zenoh-link-quic/src/unicast.rs | 11 +- io/zenoh-links/zenoh-link-tcp/src/unicast.rs | 18 ++- io/zenoh-links/zenoh-link-tls/src/unicast.rs | 11 +- io/zenoh-links/zenoh-link-udp/src/unicast.rs | 11 +- .../zenoh-link-unixsock_stream/src/unicast.rs | 11 +- io/zenoh-links/zenoh-link-ws/src/unicast.rs | 11 +- .../net/routing/interceptor/downsampling.rs | 10 +- zenoh/tests/interceptors.rs | 114 ++++++++++++++++-- 11 files changed, 223 insertions(+), 52 deletions(-) diff --git a/commons/zenoh-util/src/std_only/net/mod.rs b/commons/zenoh-util/src/std_only/net/mod.rs index fd5a215952..9562c97ded 100644 --- a/commons/zenoh-util/src/std_only/net/mod.rs +++ b/commons/zenoh-util/src/std_only/net/mod.rs @@ -424,6 +424,73 @@ pub fn get_index_of_interface(addr: IpAddr) -> ZResult { } } +pub fn get_interface_by_addr(addr: IpAddr) -> Vec { + #[cfg(unix)] + { + if addr.is_unspecified() { + pnet_datalink::interfaces() + .iter() + .map(|iface| iface.name.clone()) + .collect::>() + } else { + pnet_datalink::interfaces() + .iter() + .filter(|iface| iface.ips.iter().any(|ipnet| ipnet.ip() == addr)) + .map(|iface| iface.name.clone()) + .collect::>() + } + } + #[cfg(windows)] + { + // TODO(sashacmc): check and fix + unsafe { + use crate::ffi; + use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; + + let mut ret; + let mut retries = 0; + let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE; + let mut buffer: Vec; + loop { + buffer = Vec::with_capacity(size as usize); + ret = winapi::um::iphlpapi::GetAdaptersAddresses( + winapi::shared::ws2def::AF_INET.try_into().unwrap(), + 0, + std::ptr::null_mut(), + buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH, + &mut size, + ); + if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW { + break; + } + if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES { + break; + } + retries += 1; + } + + if ret != 0 { + bail!("GetAdaptersAddresses returned {}", ret) + } + + let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); + while let Some(iface) = next_iface { + let mut next_ucast_addr = iface.FirstUnicastAddress.as_ref(); + while let Some(ucast_addr) = next_ucast_addr { + if let Ok(ifaddr) = ffi::win::sockaddr_to_addr(ucast_addr.Address) { + if ifaddr.ip() == addr { + return Ok(iface.AdapterName); + } + } + next_ucast_addr = ucast_addr.Next.as_ref(); + } + next_iface = iface.Next.as_ref(); + } + bail!("No interface found with address {addr}") + } + } +} + pub fn get_ipv4_ipaddrs() -> Vec { get_local_addresses() .unwrap_or_else(|_| vec![]) diff --git a/io/zenoh-link-commons/src/lib.rs b/io/zenoh-link-commons/src/lib.rs index 8191a65dca..2d5bf243f1 100644 --- a/io/zenoh-link-commons/src/lib.rs +++ b/io/zenoh-link-commons/src/lib.rs @@ -23,7 +23,7 @@ extern crate alloc; mod multicast; mod unicast; -use alloc::{borrow::ToOwned, boxed::Box, string::String}; +use alloc::{borrow::ToOwned, boxed::Box, string::String, vec, vec::Vec}; use async_trait::async_trait; use core::{cmp::PartialEq, fmt, hash::Hash}; pub use multicast::*; @@ -43,8 +43,7 @@ pub struct Link { pub mtu: u16, pub is_reliable: bool, pub is_streamed: bool, - // there no method to check interface - // may be will be better just add interface there in place of method? + pub interfaces: Vec, } #[async_trait] @@ -73,6 +72,7 @@ impl From<&LinkUnicast> for Link { mtu: link.get_mtu(), is_reliable: link.is_reliable(), is_streamed: link.is_streamed(), + interfaces: link.get_interfaces(), } } } @@ -92,6 +92,7 @@ impl From<&LinkMulticast> for Link { mtu: link.get_mtu(), is_reliable: link.is_reliable(), is_streamed: false, + interfaces: vec![], } } } diff --git a/io/zenoh-link-commons/src/unicast.rs b/io/zenoh-link-commons/src/unicast.rs index 13858e88d5..3b4a69b707 100644 --- a/io/zenoh-link-commons/src/unicast.rs +++ b/io/zenoh-link-commons/src/unicast.rs @@ -11,7 +11,7 @@ // Contributors: // ZettaScale Zenoh Team, // -use alloc::{boxed::Box, sync::Arc, vec::Vec}; +use alloc::{boxed::Box, string::String, sync::Arc, vec::Vec}; use async_trait::async_trait; use core::{ fmt, @@ -45,7 +45,7 @@ pub trait LinkUnicastTrait: Send + Sync { fn get_dst(&self) -> &Locator; fn is_reliable(&self) -> bool; fn is_streamed(&self) -> bool; - fn is_matched_to_interface(&self, interface: &str) -> bool; + fn get_interfaces(&self) -> Vec; async fn write(&self, buffer: &[u8]) -> ZResult; async fn write_all(&self, buffer: &[u8]) -> ZResult<()>; async fn read(&self, buffer: &mut [u8]) -> ZResult; diff --git a/io/zenoh-links/zenoh-link-quic/src/unicast.rs b/io/zenoh-links/zenoh-link-quic/src/unicast.rs index f023a2f976..916252edfd 100644 --- a/io/zenoh-links/zenoh-link-quic/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-quic/src/unicast.rs @@ -138,6 +138,12 @@ impl LinkUnicastTrait for LinkUnicastQuic { &self.dst_locator } + #[inline(always)] + fn get_interfaces(&self) -> Vec { + // Not supported for now + vec![] + } + #[inline(always)] fn get_mtu(&self) -> u16 { *QUIC_DEFAULT_MTU @@ -152,11 +158,6 @@ impl LinkUnicastTrait for LinkUnicastQuic { fn is_streamed(&self) -> bool { true } - - fn is_matched_to_interface(&self, _interface: &str) -> bool { - // Not supported for now - false - } } impl Drop for LinkUnicastQuic { diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index 4626172756..c65bb6e979 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -139,6 +139,13 @@ impl LinkUnicastTrait for LinkUnicastTcp { &self.dst_locator } + #[inline(always)] + fn get_interfaces(&self) -> Vec { + let res = zenoh_util::net::get_interface_by_addr(self.src_addr.ip()); + log::debug!("get_interfaces for {:?}: {:?}", self.src_addr.ip(), res); + res + } + #[inline(always)] fn get_mtu(&self) -> u16 { *TCP_DEFAULT_MTU @@ -153,17 +160,6 @@ impl LinkUnicastTrait for LinkUnicastTcp { fn is_streamed(&self) -> bool { true } - - fn is_matched_to_interface(&self, name: &str) -> bool { - if let Ok(opt_addr) = zenoh_util::net::get_interface(name.trim()) { - if let Some(addr) = opt_addr { - if addr == self.src_addr.ip() { - return true; - } - } - } - false - } } impl Drop for LinkUnicastTcp { diff --git a/io/zenoh-links/zenoh-link-tls/src/unicast.rs b/io/zenoh-links/zenoh-link-tls/src/unicast.rs index b8c2378cd1..de3b616d05 100644 --- a/io/zenoh-links/zenoh-link-tls/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tls/src/unicast.rs @@ -195,6 +195,12 @@ impl LinkUnicastTrait for LinkUnicastTls { *TLS_DEFAULT_MTU } + #[inline(always)] + fn get_interfaces(&self) -> Vec { + // Not supported for now + vec![] + } + #[inline(always)] fn is_reliable(&self) -> bool { true @@ -204,11 +210,6 @@ impl LinkUnicastTrait for LinkUnicastTls { fn is_streamed(&self) -> bool { true } - - fn is_matched_to_interface(&self, _interface: &str) -> bool { - // Not supported for now - false - } } impl Drop for LinkUnicastTls { diff --git a/io/zenoh-links/zenoh-link-udp/src/unicast.rs b/io/zenoh-links/zenoh-link-udp/src/unicast.rs index 2d61fc2dcb..17b49d7088 100644 --- a/io/zenoh-links/zenoh-link-udp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/unicast.rs @@ -203,6 +203,12 @@ impl LinkUnicastTrait for LinkUnicastUdp { &self.dst_locator } + #[inline(always)] + fn get_interfaces(&self) -> Vec { + // Not supported for now + vec![] + } + #[inline(always)] fn get_mtu(&self) -> u16 { *UDP_DEFAULT_MTU @@ -217,11 +223,6 @@ impl LinkUnicastTrait for LinkUnicastUdp { fn is_streamed(&self) -> bool { false } - - fn is_matched_to_interface(&self, _interface: &str) -> bool { - // Not supported for now - false - } } impl fmt::Display for LinkUnicastUdp { diff --git a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs index 57328d9460..5e2e4b0d43 100644 --- a/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixsock_stream/src/unicast.rs @@ -115,6 +115,12 @@ impl LinkUnicastTrait for LinkUnicastUnixSocketStream { *UNIXSOCKSTREAM_DEFAULT_MTU } + #[inline(always)] + fn get_interfaces(&self) -> Vec { + // Not supported for now + vec![] + } + #[inline(always)] fn is_reliable(&self) -> bool { true @@ -124,11 +130,6 @@ impl LinkUnicastTrait for LinkUnicastUnixSocketStream { fn is_streamed(&self) -> bool { true } - - fn is_matched_to_interface(&self, _interface: &str) -> bool { - // Not supported for now - false - } } impl Drop for LinkUnicastUnixSocketStream { diff --git a/io/zenoh-links/zenoh-link-ws/src/unicast.rs b/io/zenoh-links/zenoh-link-ws/src/unicast.rs index ad7b31b342..9252f9e326 100644 --- a/io/zenoh-links/zenoh-link-ws/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-ws/src/unicast.rs @@ -207,6 +207,12 @@ impl LinkUnicastTrait for LinkUnicastWs { *WS_DEFAULT_MTU } + #[inline(always)] + fn get_interfaces(&self) -> Vec { + // Not supported for now + vec![] + } + #[inline(always)] fn is_reliable(&self) -> bool { true @@ -216,11 +222,6 @@ impl LinkUnicastTrait for LinkUnicastWs { fn is_streamed(&self) -> bool { false } - - fn is_matched_to_interface(&self, _interface: &str) -> bool { - // Not supported for now - false - } } impl Drop for LinkUnicastWs { diff --git a/zenoh/src/net/routing/interceptor/downsampling.rs b/zenoh/src/net/routing/interceptor/downsampling.rs index ed4b2ac81a..522b1a9f93 100644 --- a/zenoh/src/net/routing/interceptor/downsampling.rs +++ b/zenoh/src/net/routing/interceptor/downsampling.rs @@ -21,7 +21,6 @@ use crate::net::routing::interceptor::*; use std::sync::{Arc, Mutex}; use zenoh_config::DownsamplerConf; -use zenoh_link::LinkUnicast; use zenoh_protocol::core::key_expr::OwnedKeyExpr; // TODO(sashacmc): this is ratelimit strategy, we should also add decimation (with "factor" option) @@ -108,10 +107,15 @@ impl InterceptorFactoryTrait for DownsamplerInterceptor { transport: &TransportUnicast, ) -> (Option, Option) { log::debug!("New transport unicast {:?}", transport); - if let Some(interface) = self.conf.interface { + if let Some(interface) = self.conf.interface.clone() { + log::debug!("New downsampler transport unicast interface: {}", interface); if let Ok(links) = transport.get_links() { for link in links { - if !(link as LinkUnicast).is_matched_to_interface(interface) { + log::debug!( + "New downsampler transport unicast interfaces: {:?}", + link.interfaces + ); + if !link.interfaces.contains(&interface) { return (None, None); } } diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs index 8765c9b6b7..4838b8f587 100644 --- a/zenoh/tests/interceptors.rs +++ b/zenoh/tests/interceptors.rs @@ -1,7 +1,7 @@ use std::sync::{Arc, Mutex}; #[test] -fn downsampling() { +fn downsampling_by_keyexpr() { let _ = env_logger::builder().is_test(true).try_init(); use zenoh::prelude::sync::*; @@ -16,17 +16,22 @@ fn downsampling() { std::time::Instant::now() - std::time::Duration::from_millis(50), )); + let total_count = Arc::new(Mutex::new(0)); + let total_count_clone = total_count.clone(); + let _sub = zenoh_sub - .declare_subscriber("test/downsamples/*") + .declare_subscriber("test/downsamples_by_keyexp/*") .callback(move |sample| { + let mut count = total_count_clone.lock().unwrap(); + *count += 1; let curr_time = std::time::Instant::now(); - if sample.key_expr.as_str() == "test/downsamples/r100" { + if sample.key_expr.as_str() == "test/downsamples_by_keyexp/r100" { let mut last_time = last_time_r100.lock().unwrap(); let interval = (curr_time - *last_time).as_millis() + 1; *last_time = curr_time; println!("interval 100: {}", interval); assert!(interval >= 100); - } else if sample.key_expr.as_str() == "test/downsamples/r50" { + } else if sample.key_expr.as_str() == "test/downsamples_by_keyexp/r50" { let mut last_time = last_time_r50.lock().unwrap(); let interval = (curr_time - *last_time).as_millis() + 1; *last_time = curr_time; @@ -45,11 +50,11 @@ fn downsampling() { r#" [ { - keyexpr: "test/downsamples/r100", + keyexpr: "test/downsamples_by_keyexp/r100", threshold_ms: 100, }, { - keyexpr: "test/downsamples/r50", + keyexpr: "test/downsamples_by_keyexp/r50", threshold_ms: 50, }, ] @@ -58,12 +63,17 @@ fn downsampling() { .unwrap(); let zenoh_pub = zenoh::open(config).res().unwrap(); let publisher_r100 = zenoh_pub - .declare_publisher("test/downsamples/r100") + .declare_publisher("test/downsamples_by_keyexp/r100") .res() .unwrap(); let publisher_r50 = zenoh_pub - .declare_publisher("test/downsamples/r50") + .declare_publisher("test/downsamples_by_keyexp/r50") + .res() + .unwrap(); + + let publisher_all = zenoh_pub + .declare_publisher("test/downsamples_by_keyexp/all") .res() .unwrap(); @@ -72,7 +82,95 @@ fn downsampling() { println!("message {}", i); publisher_r100.put(format!("message {}", i)).res().unwrap(); publisher_r50.put(format!("message {}", i)).res().unwrap(); + publisher_all.put(format!("message {}", i)).res().unwrap(); + + std::thread::sleep(interval); + } + + assert!(*(total_count.lock().unwrap()) >= 1000); +} + +#[cfg(unix)] +#[test] +fn downsampling_by_interface() { + let _ = env_logger::builder().is_test(true).try_init(); + + use zenoh::prelude::sync::*; + + // declare subscriber + let mut config_sub = Config::default(); + config_sub + .insert_json5("listen/endpoints", r#"["tcp/127.0.0.1:7447"]"#) + .unwrap(); + let zenoh_sub = zenoh::open(config_sub).res().unwrap(); + + let last_time_r100 = Arc::new(Mutex::new( + std::time::Instant::now() - std::time::Duration::from_millis(100), + )); + let total_count = Arc::new(Mutex::new(0)); + let total_count_clone = total_count.clone(); + + let _sub = zenoh_sub + .declare_subscriber("test/downsamples_by_interface/*") + .callback(move |sample| { + let mut count = total_count_clone.lock().unwrap(); + *count += 1; + let curr_time = std::time::Instant::now(); + if sample.key_expr.as_str() == "test/downsamples_by_interface/r100" { + let mut last_time = last_time_r100.lock().unwrap(); + let interval = (curr_time - *last_time).as_millis() + 1; + *last_time = curr_time; + println!("interval 100: {}", interval); + assert!(interval >= 100); + } + }) + .res() + .unwrap(); + + // declare publisher + let mut config_pub = Config::default(); + config_pub + .insert_json5("connect/endpoints", r#"["tcp/127.0.0.1:7447"]"#) + .unwrap(); + config_pub + .insert_json5( + "downsampling/downsamples", + r#" + [ + { + keyexpr: "test/downsamples_by_interface/r100", + interface: "lo", + threshold_ms: 100, + }, + { + keyexpr: "test/downsamples_by_interface/all", + interface: "some_unknown_interface", + threshold_ms: 100, + }, + ] + "#, + ) + .unwrap(); + + let zenoh_pub = zenoh::open(config_pub).res().unwrap(); + let publisher_r100 = zenoh_pub + .declare_publisher("test/downsamples_by_interface/r100") + .res() + .unwrap(); + + let publisher_all = zenoh_pub + .declare_publisher("test/downsamples_by_interface/all") + .res() + .unwrap(); + + let interval = std::time::Duration::from_millis(1); + for i in 0..1000 { + println!("message {}", i); + publisher_r100.put(format!("message {}", i)).res().unwrap(); + publisher_all.put(format!("message {}", i)).res().unwrap(); std::thread::sleep(interval); } + + assert!(*(total_count.lock().unwrap()) >= 1000); } From 00473a9f328bb70ccf794ef382988bd3f744c698 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Tue, 30 Jan 2024 19:21:23 +0100 Subject: [PATCH 05/24] Commit missed files --- io/zenoh-links/zenoh-link-quic/src/unicast.rs | 10 +++++----- io/zenoh-links/zenoh-link-serial/src/unicast.rs | 6 ++++++ io/zenoh-links/zenoh-link-tcp/src/unicast.rs | 10 +++++----- io/zenoh-links/zenoh-link-udp/src/unicast.rs | 10 +++++----- io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs | 6 ++++++ 5 files changed, 27 insertions(+), 15 deletions(-) diff --git a/io/zenoh-links/zenoh-link-quic/src/unicast.rs b/io/zenoh-links/zenoh-link-quic/src/unicast.rs index 916252edfd..856c0f37d8 100644 --- a/io/zenoh-links/zenoh-link-quic/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-quic/src/unicast.rs @@ -139,14 +139,14 @@ impl LinkUnicastTrait for LinkUnicastQuic { } #[inline(always)] - fn get_interfaces(&self) -> Vec { - // Not supported for now - vec![] + fn get_mtu(&self) -> u16 { + *QUIC_DEFAULT_MTU } #[inline(always)] - fn get_mtu(&self) -> u16 { - *QUIC_DEFAULT_MTU + fn get_interfaces(&self) -> Vec { + // Not supported for now + vec![] } #[inline(always)] diff --git a/io/zenoh-links/zenoh-link-serial/src/unicast.rs b/io/zenoh-links/zenoh-link-serial/src/unicast.rs index 6ec8f8f279..25d0f6306a 100644 --- a/io/zenoh-links/zenoh-link-serial/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-serial/src/unicast.rs @@ -181,6 +181,12 @@ impl LinkUnicastTrait for LinkUnicastSerial { *SERIAL_DEFAULT_MTU } + #[inline(always)] + fn get_interfaces(&self) -> Vec { + // Not supported for now + vec![] + } + #[inline(always)] fn is_reliable(&self) -> bool { false diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index c65bb6e979..a441ba5ab9 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -139,6 +139,11 @@ impl LinkUnicastTrait for LinkUnicastTcp { &self.dst_locator } + #[inline(always)] + fn get_mtu(&self) -> u16 { + *TCP_DEFAULT_MTU + } + #[inline(always)] fn get_interfaces(&self) -> Vec { let res = zenoh_util::net::get_interface_by_addr(self.src_addr.ip()); @@ -146,11 +151,6 @@ impl LinkUnicastTrait for LinkUnicastTcp { res } - #[inline(always)] - fn get_mtu(&self) -> u16 { - *TCP_DEFAULT_MTU - } - #[inline(always)] fn is_reliable(&self) -> bool { true diff --git a/io/zenoh-links/zenoh-link-udp/src/unicast.rs b/io/zenoh-links/zenoh-link-udp/src/unicast.rs index 17b49d7088..fb14580043 100644 --- a/io/zenoh-links/zenoh-link-udp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/unicast.rs @@ -204,14 +204,14 @@ impl LinkUnicastTrait for LinkUnicastUdp { } #[inline(always)] - fn get_interfaces(&self) -> Vec { - // Not supported for now - vec![] + fn get_mtu(&self) -> u16 { + *UDP_DEFAULT_MTU } #[inline(always)] - fn get_mtu(&self) -> u16 { - *UDP_DEFAULT_MTU + fn get_interfaces(&self) -> Vec { + // Not supported for now + vec![] } #[inline(always)] diff --git a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs index 156698d195..2add468eef 100644 --- a/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs +++ b/io/zenoh-links/zenoh-link-unixpipe/src/unix/unicast.rs @@ -492,6 +492,12 @@ impl LinkUnicastTrait for UnicastPipe { LINUX_PIPE_MAX_MTU } + #[inline(always)] + fn get_interfaces(&self) -> Vec { + // Not supported for now + vec![] + } + #[inline(always)] fn is_reliable(&self) -> bool { true From bc3947eeb23ca2d501d196684b2ccc13a4b73fde Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Wed, 31 Jan 2024 17:04:40 +0100 Subject: [PATCH 06/24] Enable test run logging --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 51fbb62cf2..73af857728 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -93,7 +93,7 @@ jobs: uses: taiki-e/install-action@nextest - name: Run tests - run: cargo nextest run --exclude zenoh-examples --exclude zenoh-plugin-example --workspace + run: cargo nextest run --exclude zenoh-examples --exclude zenoh-plugin-example --workspace -- --nocapture env: CARGO_REGISTRIES_CRATES_IO_PROTOCOL: sparse ASYNC_STD_THREAD_COUNT: 4 From 2f608da4f55d2165cbf4befed96a5656186f38d8 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Wed, 31 Jan 2024 17:17:01 +0100 Subject: [PATCH 07/24] Add some debug output --- .github/workflows/ci.yml | 2 +- zenoh/src/net/routing/interceptor/downsampling.rs | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 73af857728..51fbb62cf2 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -93,7 +93,7 @@ jobs: uses: taiki-e/install-action@nextest - name: Run tests - run: cargo nextest run --exclude zenoh-examples --exclude zenoh-plugin-example --workspace -- --nocapture + run: cargo nextest run --exclude zenoh-examples --exclude zenoh-plugin-example --workspace env: CARGO_REGISTRIES_CRATES_IO_PROTOCOL: sparse ASYNC_STD_THREAD_COUNT: 4 diff --git a/zenoh/src/net/routing/interceptor/downsampling.rs b/zenoh/src/net/routing/interceptor/downsampling.rs index 522b1a9f93..94b11a3033 100644 --- a/zenoh/src/net/routing/interceptor/downsampling.rs +++ b/zenoh/src/net/routing/interceptor/downsampling.rs @@ -109,12 +109,17 @@ impl InterceptorFactoryTrait for DownsamplerInterceptor { log::debug!("New transport unicast {:?}", transport); if let Some(interface) = self.conf.interface.clone() { log::debug!("New downsampler transport unicast interface: {}", interface); + print!("New downsampler transport unicast interface: {}", interface); if let Ok(links) = transport.get_links() { for link in links { log::debug!( "New downsampler transport unicast interfaces: {:?}", link.interfaces ); + print!( + "New downsampler transport unicast interfaces: {:?}", + link.interfaces + ); if !link.interfaces.contains(&interface) { return (None, None); } From 62ff262d393b639ae89767efadeafe8e0abf1aa8 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Wed, 31 Jan 2024 18:29:32 +0100 Subject: [PATCH 08/24] Add multiply keyexprs/interfaces support --- commons/zenoh-config/src/lib.rs | 7 +++- .../net/routing/interceptor/downsampling.rs | 38 +++++++++++-------- zenoh/tests/interceptors.rs | 12 +++--- 3 files changed, 33 insertions(+), 24 deletions(-) diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 5eb0aa1af2..753abdb8e6 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -72,8 +72,11 @@ pub type SecretValue = Secret; #[derive(Debug, Deserialize, Serialize, Clone)] pub struct DownsamplerConf { - pub keyexpr: Option, - pub interface: Option, + /// A list of key-expressions to which the downsampling will be applied. + pub keyexprs: Option>, + /// A list of interfaces to which the downsampling will be applied. + pub interfaces: Option>, + /// Downsampling strategy. pub strategy: Option, pub threshold_ms: Option, } diff --git a/zenoh/src/net/routing/interceptor/downsampling.rs b/zenoh/src/net/routing/interceptor/downsampling.rs index 94b11a3033..31e5544970 100644 --- a/zenoh/src/net/routing/interceptor/downsampling.rs +++ b/zenoh/src/net/routing/interceptor/downsampling.rs @@ -37,7 +37,7 @@ impl InterceptorTrait for IngressMsgDownsampler { } pub(crate) struct EgressMsgDownsampler { - keyexpr: Option, + keyexprs: Option>, threshold: std::time::Duration, latest_message_timestamp: Arc>, } @@ -47,12 +47,16 @@ impl InterceptorTrait for EgressMsgDownsampler { &self, ctx: RoutingContext, ) -> Option> { - if let Some(cfg_keyexpr) = self.keyexpr.as_ref() { + if let Some(cfg_keyexprs) = self.keyexprs.as_ref() { + let mut matched = false; if let Some(keyexpr) = ctx.full_key_expr() { - if !cfg_keyexpr.intersects(&keyexpr) { - return Some(ctx); + for cfg_keyexpr in cfg_keyexprs { + if cfg_keyexpr.intersects(&keyexpr) { + matched = true; + } } - } else { + } + if !matched { return Some(ctx); } } @@ -76,7 +80,7 @@ impl EgressMsgDownsampler { if let Some(threshold_ms) = conf.threshold_ms { let threshold = std::time::Duration::from_millis(threshold_ms); Self { - keyexpr: conf.keyexpr, + keyexprs: conf.keyexprs, threshold, // TODO (sashacmc): I need just := 0, but how??? latest_message_timestamp: Arc::new(Mutex::new( @@ -106,21 +110,23 @@ impl InterceptorFactoryTrait for DownsamplerInterceptor { &self, transport: &TransportUnicast, ) -> (Option, Option) { - log::debug!("New transport unicast {:?}", transport); - if let Some(interface) = self.conf.interface.clone() { - log::debug!("New downsampler transport unicast interface: {}", interface); - print!("New downsampler transport unicast interface: {}", interface); + log::debug!("New downsampler transport unicast {:?}", transport); + if let Some(interfaces) = &self.conf.interfaces { + log::debug!( + "New downsampler transport unicast config interfaces: {:?}", + interfaces + ); if let Ok(links) = transport.get_links() { for link in links { log::debug!( - "New downsampler transport unicast interfaces: {:?}", - link.interfaces - ); - print!( - "New downsampler transport unicast interfaces: {:?}", + "New downsampler transport unicast link interfaces: {:?}", link.interfaces ); - if !link.interfaces.contains(&interface) { + if !link + .interfaces + .iter() + .any(|interface| interfaces.contains(&interface)) + { return (None, None); } } diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs index 4838b8f587..502940ab41 100644 --- a/zenoh/tests/interceptors.rs +++ b/zenoh/tests/interceptors.rs @@ -50,11 +50,11 @@ fn downsampling_by_keyexpr() { r#" [ { - keyexpr: "test/downsamples_by_keyexp/r100", + keyexprs: ["test/downsamples_by_keyexp/r100"], threshold_ms: 100, }, { - keyexpr: "test/downsamples_by_keyexp/r50", + keyexprs: ["test/downsamples_by_keyexp/r50"], threshold_ms: 50, }, ] @@ -138,13 +138,13 @@ fn downsampling_by_interface() { r#" [ { - keyexpr: "test/downsamples_by_interface/r100", - interface: "lo", + keyexprs: ["test/downsamples_by_interface/r100"], + interfaces: ["lo", "lo0"], threshold_ms: 100, }, { - keyexpr: "test/downsamples_by_interface/all", - interface: "some_unknown_interface", + keyexprs: ["test/downsamples_by_interface/all"], + interfaces: ["some_unknown_interface"], threshold_ms: 100, }, ] From 5349da38a6a1b03f952c85c075e43f44fb38b7e8 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Wed, 31 Jan 2024 18:58:23 +0100 Subject: [PATCH 09/24] Syntax fixes --- commons/zenoh-util/src/std_only/net/mod.rs | 33 +++++++++++-------- .../net/routing/interceptor/downsampling.rs | 6 +--- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/commons/zenoh-util/src/std_only/net/mod.rs b/commons/zenoh-util/src/std_only/net/mod.rs index 9562c97ded..4d18476109 100644 --- a/commons/zenoh-util/src/std_only/net/mod.rs +++ b/commons/zenoh-util/src/std_only/net/mod.rs @@ -443,6 +443,7 @@ pub fn get_interface_by_addr(addr: IpAddr) -> Vec { #[cfg(windows)] { // TODO(sashacmc): check and fix + let mut result = vec![]; unsafe { use crate::ffi; use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; @@ -469,25 +470,29 @@ pub fn get_interface_by_addr(addr: IpAddr) -> Vec { retries += 1; } - if ret != 0 { - bail!("GetAdaptersAddresses returned {}", ret) - } - - let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); - while let Some(iface) = next_iface { - let mut next_ucast_addr = iface.FirstUnicastAddress.as_ref(); - while let Some(ucast_addr) = next_ucast_addr { - if let Ok(ifaddr) = ffi::win::sockaddr_to_addr(ucast_addr.Address) { - if ifaddr.ip() == addr { - return Ok(iface.AdapterName); + if addr.is_unspecified() { + let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); + while let Some(iface) = next_iface { + result.push(iface.AdapterName); + next_iface = iface.Next.as_ref(); + } + } else { + let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); + while let Some(iface) = next_iface { + let mut next_ucast_addr = iface.FirstUnicastAddress.as_ref(); + while let Some(ucast_addr) = next_ucast_addr { + if let Ok(ifaddr) = ffi::win::sockaddr_to_addr(ucast_addr.Address) { + if ifaddr.ip() == addr { + result.push(iface.AdapterName); + } } + next_ucast_addr = ucast_addr.Next.as_ref(); } - next_ucast_addr = ucast_addr.Next.as_ref(); + next_iface = iface.Next.as_ref(); } - next_iface = iface.Next.as_ref(); } - bail!("No interface found with address {addr}") } + result } } diff --git a/zenoh/src/net/routing/interceptor/downsampling.rs b/zenoh/src/net/routing/interceptor/downsampling.rs index 31e5544970..4492f1338a 100644 --- a/zenoh/src/net/routing/interceptor/downsampling.rs +++ b/zenoh/src/net/routing/interceptor/downsampling.rs @@ -122,11 +122,7 @@ impl InterceptorFactoryTrait for DownsamplerInterceptor { "New downsampler transport unicast link interfaces: {:?}", link.interfaces ); - if !link - .interfaces - .iter() - .any(|interface| interfaces.contains(&interface)) - { + if !link.interfaces.iter().any(|x| interfaces.contains(x)) { return (None, None); } } From 060ce097f2da8afc3b1d580926a44bd8ab324309 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Thu, 1 Feb 2024 13:47:38 +0000 Subject: [PATCH 10/24] Fix windows build --- commons/zenoh-util/src/std_only/net/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/commons/zenoh-util/src/std_only/net/mod.rs b/commons/zenoh-util/src/std_only/net/mod.rs index 4d18476109..0113133a13 100644 --- a/commons/zenoh-util/src/std_only/net/mod.rs +++ b/commons/zenoh-util/src/std_only/net/mod.rs @@ -473,7 +473,7 @@ pub fn get_interface_by_addr(addr: IpAddr) -> Vec { if addr.is_unspecified() { let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); while let Some(iface) = next_iface { - result.push(iface.AdapterName); + result.push(ffi::pstr_to_string(iface.AdapterName)); next_iface = iface.Next.as_ref(); } } else { @@ -483,7 +483,7 @@ pub fn get_interface_by_addr(addr: IpAddr) -> Vec { while let Some(ucast_addr) = next_ucast_addr { if let Ok(ifaddr) = ffi::win::sockaddr_to_addr(ucast_addr.Address) { if ifaddr.ip() == addr { - result.push(iface.AdapterName); + result.push(ffi::pstr_to_string(iface.AdapterName)); } } next_ucast_addr = ucast_addr.Next.as_ref(); From 2d6ac394c0c61d9470d2834b37f5e21d35c587df Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Thu, 1 Feb 2024 15:41:10 +0100 Subject: [PATCH 11/24] Optimisation and refactoring --- commons/zenoh-config/src/lib.rs | 4 +- .../net/routing/interceptor/downsampling.rs | 53 ++++++++++--------- zenoh/tests/interceptors.rs | 4 +- 3 files changed, 34 insertions(+), 27 deletions(-) diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 753abdb8e6..8645317e92 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -76,8 +76,10 @@ pub struct DownsamplerConf { pub keyexprs: Option>, /// A list of interfaces to which the downsampling will be applied. pub interfaces: Option>, - /// Downsampling strategy. + /// Downsampling strategy (default: ratelimit). + // TODO(sashacmc): how specify default value and generate DEFAULT_CONFIG? pub strategy: Option, + /// Minimim timeout between two messages for ratelimit starategy pub threshold_ms: Option, } diff --git a/zenoh/src/net/routing/interceptor/downsampling.rs b/zenoh/src/net/routing/interceptor/downsampling.rs index 4492f1338a..d648e3da0b 100644 --- a/zenoh/src/net/routing/interceptor/downsampling.rs +++ b/zenoh/src/net/routing/interceptor/downsampling.rs @@ -23,26 +23,17 @@ use std::sync::{Arc, Mutex}; use zenoh_config::DownsamplerConf; use zenoh_protocol::core::key_expr::OwnedKeyExpr; -// TODO(sashacmc): this is ratelimit strategy, we should also add decimation (with "factor" option) +static RATELIMIT_STRATEGY: &'static str = "ratelimit"; -pub(crate) struct IngressMsgDownsampler {} +// TODO(sashacmc): this is ratelimit strategy, we can also add decimation (with "factor" option) -impl InterceptorTrait for IngressMsgDownsampler { - fn intercept( - &self, - ctx: RoutingContext, - ) -> Option> { - Some(ctx) - } -} - -pub(crate) struct EgressMsgDownsampler { +pub(crate) struct EgressMsgDownsamplerRatelimit { keyexprs: Option>, threshold: std::time::Duration, latest_message_timestamp: Arc>, } -impl InterceptorTrait for EgressMsgDownsampler { +impl InterceptorTrait for EgressMsgDownsamplerRatelimit { fn intercept( &self, ctx: RoutingContext, @@ -75,7 +66,7 @@ impl InterceptorTrait for EgressMsgDownsampler { } } -impl EgressMsgDownsampler { +impl EgressMsgDownsamplerRatelimit { pub fn new(conf: DownsamplerConf) -> Self { if let Some(threshold_ms) = conf.threshold_ms { let threshold = std::time::Duration::from_millis(threshold_ms); @@ -128,19 +119,33 @@ impl InterceptorFactoryTrait for DownsamplerInterceptor { } } }; - ( - Some(Box::new(IngressMsgDownsampler {})), - Some(Box::new(EgressMsgDownsampler::new(self.conf.clone()))), - ) + + let strategy = self + .conf + .strategy + .as_ref() + .map_or_else(|| RATELIMIT_STRATEGY.to_string(), |s| s.clone()); + + if strategy == RATELIMIT_STRATEGY { + ( + None, + Some(Box::new(EgressMsgDownsamplerRatelimit::new( + self.conf.clone(), + ))), + ) + } else { + panic!("Unsupported downsampling strategy: {}", strategy) + } } - fn new_transport_multicast(&self, transport: &TransportMulticast) -> Option { - log::debug!("New transport multicast {:?}", transport); - Some(Box::new(EgressMsgDownsampler::new(self.conf.clone()))) + fn new_transport_multicast( + &self, + _transport: &TransportMulticast, + ) -> Option { + None } - fn new_peer_multicast(&self, transport: &TransportMulticast) -> Option { - log::debug!("New peer multicast {:?}", transport); - Some(Box::new(IngressMsgDownsampler {})) + fn new_peer_multicast(&self, _transport: &TransportMulticast) -> Option { + None } } diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs index 502940ab41..0c1beb455a 100644 --- a/zenoh/tests/interceptors.rs +++ b/zenoh/tests/interceptors.rs @@ -27,13 +27,13 @@ fn downsampling_by_keyexpr() { let curr_time = std::time::Instant::now(); if sample.key_expr.as_str() == "test/downsamples_by_keyexp/r100" { let mut last_time = last_time_r100.lock().unwrap(); - let interval = (curr_time - *last_time).as_millis() + 1; + let interval = (curr_time - *last_time).as_millis() + 3; *last_time = curr_time; println!("interval 100: {}", interval); assert!(interval >= 100); } else if sample.key_expr.as_str() == "test/downsamples_by_keyexp/r50" { let mut last_time = last_time_r50.lock().unwrap(); - let interval = (curr_time - *last_time).as_millis() + 1; + let interval = (curr_time - *last_time).as_millis() + 2; *last_time = curr_time; println!("interval 50: {}", interval); assert!(interval >= 50); From 98e06e5f5b9a42a231f26ce7f5a53a6204e23c8a Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Thu, 1 Feb 2024 21:07:59 +0000 Subject: [PATCH 12/24] Windows net code refactoring --- commons/zenoh-util/src/std_only/net/mod.rs | 176 +++++------------- io/zenoh-links/zenoh-link-tcp/src/unicast.rs | 13 +- .../net/routing/interceptor/downsampling.rs | 2 +- 3 files changed, 57 insertions(+), 134 deletions(-) diff --git a/commons/zenoh-util/src/std_only/net/mod.rs b/commons/zenoh-util/src/std_only/net/mod.rs index 0113133a13..ffeb97b593 100644 --- a/commons/zenoh-util/src/std_only/net/mod.rs +++ b/commons/zenoh-util/src/std_only/net/mod.rs @@ -91,6 +91,39 @@ pub fn set_linger(socket: &TcpStream, dur: Option) -> ZResult<()> { } } +#[cfg(windows)] +unsafe fn get_adapters_adresses() -> ZResult> { + use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; + + let mut ret; + let mut retries = 0; + let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE; + let mut buffer: Vec; + loop { + buffer = Vec::with_capacity(size as usize); + ret = winapi::um::iphlpapi::GetAdaptersAddresses( + winapi::shared::ws2def::AF_INET.try_into().unwrap(), + 0, + std::ptr::null_mut(), + buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH, + &mut size, + ); + if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW { + break; + } + if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES { + break; + } + retries += 1; + } + + if ret != 0 { + bail!("GetAdaptersAddresses returned {}", ret) + } + + return Ok(buffer); +} + pub fn get_interface(name: &str) -> ZResult> { #[cfg(unix)] { @@ -117,31 +150,7 @@ pub fn get_interface(name: &str) -> ZResult> { use crate::ffi; use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; - let mut ret; - let mut retries = 0; - let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE; - let mut buffer: Vec; - loop { - buffer = Vec::with_capacity(size as usize); - ret = winapi::um::iphlpapi::GetAdaptersAddresses( - winapi::shared::ws2def::AF_INET.try_into().unwrap(), - 0, - std::ptr::null_mut(), - buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH, - &mut size, - ); - if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW { - break; - } - if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES { - break; - } - retries += 1; - } - - if ret != 0 { - bail!("GetAdaptersAddresses returned {}", ret) - } + let buffer = get_adapters_adresses()?; let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); while let Some(iface) = next_iface { @@ -218,33 +227,9 @@ pub fn get_local_addresses() -> ZResult> { use crate::ffi; use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; + let buffer = get_adapters_adresses()?; + let mut result = vec![]; - let mut ret; - let mut retries = 0; - let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE; - let mut buffer: Vec; - loop { - buffer = Vec::with_capacity(size as usize); - ret = winapi::um::iphlpapi::GetAdaptersAddresses( - winapi::shared::ws2def::AF_UNSPEC.try_into().unwrap(), - 0, - std::ptr::null_mut(), - buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH, - &mut size, - ); - if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW { - break; - } - if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES { - break; - } - retries += 1; - } - - if ret != 0 { - bail!("GetAdaptersAddresses returned {}", ret) - } - let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); while let Some(iface) = next_iface { let mut next_ucast_addr = iface.FirstUnicastAddress.as_ref(); @@ -317,33 +302,9 @@ pub fn get_unicast_addresses_of_interface(name: &str) -> ZResult> { use crate::ffi; use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; - let mut addrs = vec![]; - let mut ret; - let mut retries = 0; - let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE; - let mut buffer: Vec; - loop { - buffer = Vec::with_capacity(size as usize); - ret = winapi::um::iphlpapi::GetAdaptersAddresses( - winapi::shared::ws2def::AF_INET.try_into().unwrap(), - 0, - std::ptr::null_mut(), - buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH, - &mut size, - ); - if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW { - break; - } - if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES { - break; - } - retries += 1; - } - - if ret != 0 { - bail!("GetAdaptersAddresses returned {}", ret); - } + let buffer = get_adapters_adresses()?; + let mut addrs = vec![]; let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); while let Some(iface) = next_iface { if name == ffi::pstr_to_string(iface.AdapterName) @@ -380,31 +341,7 @@ pub fn get_index_of_interface(addr: IpAddr) -> ZResult { use crate::ffi; use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; - let mut ret; - let mut retries = 0; - let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE; - let mut buffer: Vec; - loop { - buffer = Vec::with_capacity(size as usize); - ret = winapi::um::iphlpapi::GetAdaptersAddresses( - winapi::shared::ws2def::AF_INET.try_into().unwrap(), - 0, - std::ptr::null_mut(), - buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH, - &mut size, - ); - if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW { - break; - } - if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES { - break; - } - retries += 1; - } - - if ret != 0 { - bail!("GetAdaptersAddresses returned {}", ret) - } + let buffer = get_adapters_adresses()?; let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); while let Some(iface) = next_iface { @@ -424,51 +361,30 @@ pub fn get_index_of_interface(addr: IpAddr) -> ZResult { } } -pub fn get_interface_by_addr(addr: IpAddr) -> Vec { +pub fn get_interfaces_by_addr(addr: IpAddr) -> ZResult> { #[cfg(unix)] { if addr.is_unspecified() { - pnet_datalink::interfaces() + Ok(pnet_datalink::interfaces() .iter() .map(|iface| iface.name.clone()) - .collect::>() + .collect::>()) } else { - pnet_datalink::interfaces() + Ok(pnet_datalink::interfaces() .iter() .filter(|iface| iface.ips.iter().any(|ipnet| ipnet.ip() == addr)) .map(|iface| iface.name.clone()) - .collect::>() + .collect::>()) } } #[cfg(windows)] { - // TODO(sashacmc): check and fix let mut result = vec![]; unsafe { use crate::ffi; use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; - let mut ret; - let mut retries = 0; - let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE; - let mut buffer: Vec; - loop { - buffer = Vec::with_capacity(size as usize); - ret = winapi::um::iphlpapi::GetAdaptersAddresses( - winapi::shared::ws2def::AF_INET.try_into().unwrap(), - 0, - std::ptr::null_mut(), - buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH, - &mut size, - ); - if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW { - break; - } - if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES { - break; - } - retries += 1; - } + let buffer = get_adapters_adresses()?; if addr.is_unspecified() { let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); @@ -492,7 +408,7 @@ pub fn get_interface_by_addr(addr: IpAddr) -> Vec { } } } - result + Ok(result) } } diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index a441ba5ab9..d5c0a0c55d 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -146,9 +146,16 @@ impl LinkUnicastTrait for LinkUnicastTcp { #[inline(always)] fn get_interfaces(&self) -> Vec { - let res = zenoh_util::net::get_interface_by_addr(self.src_addr.ip()); - log::debug!("get_interfaces for {:?}: {:?}", self.src_addr.ip(), res); - res + match zenoh_util::net::get_interfaces_by_addr(self.src_addr.ip()) { + Ok(interfaces) => { + log::debug!("get_interfaces for {:?}: {:?}", self.src_addr.ip(), interfaces); + interfaces + }, + Err(e) => { + log::error!("get_interfaces for {:?} failed: {:?}", self.src_addr.ip(), e); + vec![] + }, + } } #[inline(always)] diff --git a/zenoh/src/net/routing/interceptor/downsampling.rs b/zenoh/src/net/routing/interceptor/downsampling.rs index d648e3da0b..4c0b1cf341 100644 --- a/zenoh/src/net/routing/interceptor/downsampling.rs +++ b/zenoh/src/net/routing/interceptor/downsampling.rs @@ -23,7 +23,7 @@ use std::sync::{Arc, Mutex}; use zenoh_config::DownsamplerConf; use zenoh_protocol::core::key_expr::OwnedKeyExpr; -static RATELIMIT_STRATEGY: &'static str = "ratelimit"; +const RATELIMIT_STRATEGY: &str = "ratelimit"; // TODO(sashacmc): this is ratelimit strategy, we can also add decimation (with "factor" option) From e03d5ea345edf5d74ba984f9e074a38e5d3b0205 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Thu, 1 Feb 2024 22:11:05 +0100 Subject: [PATCH 13/24] Fix formatting --- commons/zenoh-util/src/std_only/net/mod.rs | 4 ++-- io/zenoh-links/zenoh-link-tcp/src/unicast.rs | 16 ++++++++++++---- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/commons/zenoh-util/src/std_only/net/mod.rs b/commons/zenoh-util/src/std_only/net/mod.rs index ffeb97b593..7b30a6861b 100644 --- a/commons/zenoh-util/src/std_only/net/mod.rs +++ b/commons/zenoh-util/src/std_only/net/mod.rs @@ -116,7 +116,7 @@ unsafe fn get_adapters_adresses() -> ZResult> { } retries += 1; } - + if ret != 0 { bail!("GetAdaptersAddresses returned {}", ret) } @@ -228,7 +228,7 @@ pub fn get_local_addresses() -> ZResult> { use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; let buffer = get_adapters_adresses()?; - + let mut result = vec![]; let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); while let Some(iface) = next_iface { diff --git a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs index d5c0a0c55d..71a87df605 100644 --- a/io/zenoh-links/zenoh-link-tcp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-tcp/src/unicast.rs @@ -148,13 +148,21 @@ impl LinkUnicastTrait for LinkUnicastTcp { fn get_interfaces(&self) -> Vec { match zenoh_util::net::get_interfaces_by_addr(self.src_addr.ip()) { Ok(interfaces) => { - log::debug!("get_interfaces for {:?}: {:?}", self.src_addr.ip(), interfaces); + log::debug!( + "get_interfaces for {:?}: {:?}", + self.src_addr.ip(), + interfaces + ); interfaces - }, + } Err(e) => { - log::error!("get_interfaces for {:?} failed: {:?}", self.src_addr.ip(), e); + log::error!( + "get_interfaces for {:?} failed: {:?}", + self.src_addr.ip(), + e + ); vec![] - }, + } } } From 167652e52f34a11af39c534901348922266e3d18 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Thu, 1 Feb 2024 22:17:53 +0100 Subject: [PATCH 14/24] Clippy fix --- commons/zenoh-util/src/std_only/net/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/commons/zenoh-util/src/std_only/net/mod.rs b/commons/zenoh-util/src/std_only/net/mod.rs index 7b30a6861b..581eb2534e 100644 --- a/commons/zenoh-util/src/std_only/net/mod.rs +++ b/commons/zenoh-util/src/std_only/net/mod.rs @@ -121,7 +121,7 @@ unsafe fn get_adapters_adresses() -> ZResult> { bail!("GetAdaptersAddresses returned {}", ret) } - return Ok(buffer); + Ok(buffer) } pub fn get_interface(name: &str) -> ZResult> { From 8a5ca562ad9860f46670eef37da9ff0cbfa753d2 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Thu, 1 Feb 2024 22:51:34 +0100 Subject: [PATCH 15/24] Add udp interface support --- io/zenoh-links/zenoh-link-udp/src/unicast.rs | 20 ++++++++++++++++++-- zenoh/src/net/routing/mod.rs | 1 - zenoh/tests/interceptors.rs | 4 ++-- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/io/zenoh-links/zenoh-link-udp/src/unicast.rs b/io/zenoh-links/zenoh-link-udp/src/unicast.rs index fb14580043..016a557740 100644 --- a/io/zenoh-links/zenoh-link-udp/src/unicast.rs +++ b/io/zenoh-links/zenoh-link-udp/src/unicast.rs @@ -210,8 +210,24 @@ impl LinkUnicastTrait for LinkUnicastUdp { #[inline(always)] fn get_interfaces(&self) -> Vec { - // Not supported for now - vec![] + match zenoh_util::net::get_interfaces_by_addr(self.src_addr.ip()) { + Ok(interfaces) => { + log::debug!( + "get_interfaces for {:?}: {:?}", + self.src_addr.ip(), + interfaces + ); + interfaces + } + Err(e) => { + log::error!( + "get_interfaces for {:?} failed: {:?}", + self.src_addr.ip(), + e + ); + vec![] + } + } } #[inline(always)] diff --git a/zenoh/src/net/routing/mod.rs b/zenoh/src/net/routing/mod.rs index 37a698263b..c96a6302a3 100644 --- a/zenoh/src/net/routing/mod.rs +++ b/zenoh/src/net/routing/mod.rs @@ -170,7 +170,6 @@ impl RoutingContext { None } - //TODO (sashacmc): maybe rewrite full_expr to don't convert back? #[inline] pub(crate) fn full_key_expr(&self) -> Option { match self.full_expr() { diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs index 0c1beb455a..938c1d1dd4 100644 --- a/zenoh/tests/interceptors.rs +++ b/zenoh/tests/interceptors.rs @@ -27,13 +27,13 @@ fn downsampling_by_keyexpr() { let curr_time = std::time::Instant::now(); if sample.key_expr.as_str() == "test/downsamples_by_keyexp/r100" { let mut last_time = last_time_r100.lock().unwrap(); - let interval = (curr_time - *last_time).as_millis() + 3; + let interval = (curr_time - *last_time).as_millis() + 5; *last_time = curr_time; println!("interval 100: {}", interval); assert!(interval >= 100); } else if sample.key_expr.as_str() == "test/downsamples_by_keyexp/r50" { let mut last_time = last_time_r50.lock().unwrap(); - let interval = (curr_time - *last_time).as_millis() + 2; + let interval = (curr_time - *last_time).as_millis() + 5; *last_time = curr_time; println!("interval 50: {}", interval); assert!(interval >= 50); From 36b542be0086bfb8341782e118c0c78e726aef94 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Thu, 1 Feb 2024 23:25:35 +0100 Subject: [PATCH 16/24] Improve test stability --- zenoh/tests/interceptors.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs index 938c1d1dd4..64d3c0ec49 100644 --- a/zenoh/tests/interceptors.rs +++ b/zenoh/tests/interceptors.rs @@ -87,6 +87,7 @@ fn downsampling_by_keyexpr() { std::thread::sleep(interval); } + std::thread::sleep(std::time::Duration::from_secs(1)); assert!(*(total_count.lock().unwrap()) >= 1000); } @@ -172,5 +173,6 @@ fn downsampling_by_interface() { std::thread::sleep(interval); } + std::thread::sleep(std::time::Duration::from_secs(1)); assert!(*(total_count.lock().unwrap()) >= 1000); } From 5774b61456a90098c16189d550ebdb8b503b94fe Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 2 Feb 2024 09:46:04 +0100 Subject: [PATCH 17/24] Speedup tests --- zenoh/tests/interceptors.rs | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs index 64d3c0ec49..aa326eace5 100644 --- a/zenoh/tests/interceptors.rs +++ b/zenoh/tests/interceptors.rs @@ -78,7 +78,8 @@ fn downsampling_by_keyexpr() { .unwrap(); let interval = std::time::Duration::from_millis(1); - for i in 0..1000 { + let messages_count = 1000; + for i in 0..messages_count { println!("message {}", i); publisher_r100.put(format!("message {}", i)).res().unwrap(); publisher_r50.put(format!("message {}", i)).res().unwrap(); @@ -87,8 +88,13 @@ fn downsampling_by_keyexpr() { std::thread::sleep(interval); } - std::thread::sleep(std::time::Duration::from_secs(1)); - assert!(*(total_count.lock().unwrap()) >= 1000); + for _ in 0..100 { + if *(total_count.lock().unwrap()) >= messages_count { + break; + } + std::thread::sleep(std::time::Duration::from_millis(100)); + } + assert!(*(total_count.lock().unwrap()) >= messages_count); } #[cfg(unix)] @@ -165,7 +171,8 @@ fn downsampling_by_interface() { .unwrap(); let interval = std::time::Duration::from_millis(1); - for i in 0..1000 { + let messages_count = 1000; + for i in 0..messages_count { println!("message {}", i); publisher_r100.put(format!("message {}", i)).res().unwrap(); publisher_all.put(format!("message {}", i)).res().unwrap(); @@ -173,6 +180,11 @@ fn downsampling_by_interface() { std::thread::sleep(interval); } - std::thread::sleep(std::time::Duration::from_secs(1)); - assert!(*(total_count.lock().unwrap()) >= 1000); + for _ in 0..100 { + if *(total_count.lock().unwrap()) >= messages_count { + break; + } + std::thread::sleep(std::time::Duration::from_millis(100)); + } + assert!(*(total_count.lock().unwrap()) >= messages_count); } From d44cf3744654c252f405190a98319f3fb1346021 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 2 Feb 2024 10:58:41 +0100 Subject: [PATCH 18/24] Temprorary disable interceptor test for windows --- zenoh/tests/interceptors.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs index aa326eace5..56db101d0d 100644 --- a/zenoh/tests/interceptors.rs +++ b/zenoh/tests/interceptors.rs @@ -1,5 +1,6 @@ use std::sync::{Arc, Mutex}; +#[cfg(unix)] #[test] fn downsampling_by_keyexpr() { let _ = env_logger::builder().is_test(true).try_init(); From b9ed46c22162dcbf5011e64c84498800aaefec42 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 2 Feb 2024 12:43:48 +0100 Subject: [PATCH 19/24] Fix clippy warning --- zenoh/tests/interceptors.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs index 56db101d0d..095ccf9bc1 100644 --- a/zenoh/tests/interceptors.rs +++ b/zenoh/tests/interceptors.rs @@ -1,3 +1,4 @@ +#[cfg(unix)] use std::sync::{Arc, Mutex}; #[cfg(unix)] From 9fd92e074373147dc9fca2a5fc9074b3ee509c91 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 2 Feb 2024 13:34:16 +0100 Subject: [PATCH 20/24] Rework test to count middle interval size --- zenoh/tests/interceptors.rs | 82 ++++++++++++++++++++++++------------- 1 file changed, 54 insertions(+), 28 deletions(-) diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs index 095ccf9bc1..ab40c4a0a5 100644 --- a/zenoh/tests/interceptors.rs +++ b/zenoh/tests/interceptors.rs @@ -1,7 +1,44 @@ -#[cfg(unix)] use std::sync::{Arc, Mutex}; -#[cfg(unix)] +struct IntervalCounter { + first_tick: bool, + last_time: std::time::Instant, + count: u32, + total_time: std::time::Duration, +} + +impl IntervalCounter { + fn new() -> IntervalCounter { + IntervalCounter { + first_tick: true, + last_time: std::time::Instant::now(), + count: 0, + total_time: std::time::Duration::from_secs(0), + } + } + + fn tick(&mut self) { + let curr_time = std::time::Instant::now(); + if self.first_tick { + self.first_tick = false; + } else { + self.total_time += curr_time - self.last_time; + self.count += 1; + } + self.last_time = curr_time; + } + + fn get_middle(&self) -> u32 { + self.total_time.as_millis() as u32 / self.count + } + + fn check_middle(&self, ms: u32) { + let middle = self.get_middle(); + println!("Interval {}, count: {}, middle: {}", ms, self.count, middle); + assert!(middle + 1 >= ms); + } +} + #[test] fn downsampling_by_keyexpr() { let _ = env_logger::builder().is_test(true).try_init(); @@ -11,12 +48,10 @@ fn downsampling_by_keyexpr() { // declare subscriber let zenoh_sub = zenoh::open(Config::default()).res().unwrap(); - let last_time_r100 = Arc::new(Mutex::new( - std::time::Instant::now() - std::time::Duration::from_millis(100), - )); - let last_time_r50 = Arc::new(Mutex::new( - std::time::Instant::now() - std::time::Duration::from_millis(50), - )); + let counter_r100 = Arc::new(Mutex::new(IntervalCounter::new())); + let counter_r100_clone = counter_r100.clone(); + let counter_r50 = Arc::new(Mutex::new(IntervalCounter::new())); + let counter_r50_clone = counter_r50.clone(); let total_count = Arc::new(Mutex::new(0)); let total_count_clone = total_count.clone(); @@ -26,19 +61,10 @@ fn downsampling_by_keyexpr() { .callback(move |sample| { let mut count = total_count_clone.lock().unwrap(); *count += 1; - let curr_time = std::time::Instant::now(); if sample.key_expr.as_str() == "test/downsamples_by_keyexp/r100" { - let mut last_time = last_time_r100.lock().unwrap(); - let interval = (curr_time - *last_time).as_millis() + 5; - *last_time = curr_time; - println!("interval 100: {}", interval); - assert!(interval >= 100); + counter_r100.lock().unwrap().tick(); } else if sample.key_expr.as_str() == "test/downsamples_by_keyexp/r50" { - let mut last_time = last_time_r50.lock().unwrap(); - let interval = (curr_time - *last_time).as_millis() + 5; - *last_time = curr_time; - println!("interval 50: {}", interval); - assert!(interval >= 50); + counter_r50.lock().unwrap().tick(); } }) .res() @@ -97,6 +123,9 @@ fn downsampling_by_keyexpr() { std::thread::sleep(std::time::Duration::from_millis(100)); } assert!(*(total_count.lock().unwrap()) >= messages_count); + + counter_r50_clone.lock().unwrap().check_middle(50); + counter_r100_clone.lock().unwrap().check_middle(100); } #[cfg(unix)] @@ -113,9 +142,9 @@ fn downsampling_by_interface() { .unwrap(); let zenoh_sub = zenoh::open(config_sub).res().unwrap(); - let last_time_r100 = Arc::new(Mutex::new( - std::time::Instant::now() - std::time::Duration::from_millis(100), - )); + let counter_r100 = Arc::new(Mutex::new(IntervalCounter::new())); + let counter_r100_clone = counter_r100.clone(); + let total_count = Arc::new(Mutex::new(0)); let total_count_clone = total_count.clone(); @@ -124,13 +153,8 @@ fn downsampling_by_interface() { .callback(move |sample| { let mut count = total_count_clone.lock().unwrap(); *count += 1; - let curr_time = std::time::Instant::now(); if sample.key_expr.as_str() == "test/downsamples_by_interface/r100" { - let mut last_time = last_time_r100.lock().unwrap(); - let interval = (curr_time - *last_time).as_millis() + 1; - *last_time = curr_time; - println!("interval 100: {}", interval); - assert!(interval >= 100); + counter_r100.lock().unwrap().tick(); } }) .res() @@ -189,4 +213,6 @@ fn downsampling_by_interface() { std::thread::sleep(std::time::Duration::from_millis(100)); } assert!(*(total_count.lock().unwrap()) >= messages_count); + + counter_r100_clone.lock().unwrap().check_middle(100); } From f484a9b70bf9eac5d9222124ab396ab394a66a63 Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 2 Feb 2024 14:38:41 +0100 Subject: [PATCH 21/24] Revert windows net refactoring --- commons/zenoh-util/src/std_only/net/mod.rs | 108 +++++++++++++++++++-- 1 file changed, 102 insertions(+), 6 deletions(-) diff --git a/commons/zenoh-util/src/std_only/net/mod.rs b/commons/zenoh-util/src/std_only/net/mod.rs index 581eb2534e..d0eae43d8f 100644 --- a/commons/zenoh-util/src/std_only/net/mod.rs +++ b/commons/zenoh-util/src/std_only/net/mod.rs @@ -150,7 +150,31 @@ pub fn get_interface(name: &str) -> ZResult> { use crate::ffi; use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; - let buffer = get_adapters_adresses()?; + let mut ret; + let mut retries = 0; + let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE; + let mut buffer: Vec; + loop { + buffer = Vec::with_capacity(size as usize); + ret = winapi::um::iphlpapi::GetAdaptersAddresses( + winapi::shared::ws2def::AF_INET.try_into().unwrap(), + 0, + std::ptr::null_mut(), + buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH, + &mut size, + ); + if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW { + break; + } + if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES { + break; + } + retries += 1; + } + + if ret != 0 { + bail!("GetAdaptersAddresses returned {}", ret) + } let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); while let Some(iface) = next_iface { @@ -227,9 +251,33 @@ pub fn get_local_addresses() -> ZResult> { use crate::ffi; use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; - let buffer = get_adapters_adresses()?; - let mut result = vec![]; + let mut ret; + let mut retries = 0; + let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE; + let mut buffer: Vec; + loop { + buffer = Vec::with_capacity(size as usize); + ret = winapi::um::iphlpapi::GetAdaptersAddresses( + winapi::shared::ws2def::AF_UNSPEC.try_into().unwrap(), + 0, + std::ptr::null_mut(), + buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH, + &mut size, + ); + if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW { + break; + } + if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES { + break; + } + retries += 1; + } + + if ret != 0 { + bail!("GetAdaptersAddresses returned {}", ret) + } + let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); while let Some(iface) = next_iface { let mut next_ucast_addr = iface.FirstUnicastAddress.as_ref(); @@ -302,9 +350,33 @@ pub fn get_unicast_addresses_of_interface(name: &str) -> ZResult> { use crate::ffi; use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; - let buffer = get_adapters_adresses()?; - let mut addrs = vec![]; + let mut ret; + let mut retries = 0; + let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE; + let mut buffer: Vec; + loop { + buffer = Vec::with_capacity(size as usize); + ret = winapi::um::iphlpapi::GetAdaptersAddresses( + winapi::shared::ws2def::AF_INET.try_into().unwrap(), + 0, + std::ptr::null_mut(), + buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH, + &mut size, + ); + if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW { + break; + } + if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES { + break; + } + retries += 1; + } + + if ret != 0 { + bail!("GetAdaptersAddresses returned {}", ret); + } + let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); while let Some(iface) = next_iface { if name == ffi::pstr_to_string(iface.AdapterName) @@ -341,7 +413,31 @@ pub fn get_index_of_interface(addr: IpAddr) -> ZResult { use crate::ffi; use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; - let buffer = get_adapters_adresses()?; + let mut ret; + let mut retries = 0; + let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE; + let mut buffer: Vec; + loop { + buffer = Vec::with_capacity(size as usize); + ret = winapi::um::iphlpapi::GetAdaptersAddresses( + winapi::shared::ws2def::AF_INET.try_into().unwrap(), + 0, + std::ptr::null_mut(), + buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH, + &mut size, + ); + if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW { + break; + } + if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES { + break; + } + retries += 1; + } + + if ret != 0 { + bail!("GetAdaptersAddresses returned {}", ret) + } let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); while let Some(iface) = next_iface { From eaa35b7550b7154b1b7f082d14657730a2d3867e Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 2 Feb 2024 15:26:54 +0000 Subject: [PATCH 22/24] Fix windows net refactoring --- commons/zenoh-util/src/std_only/net/mod.rs | 114 ++------------------- 1 file changed, 9 insertions(+), 105 deletions(-) diff --git a/commons/zenoh-util/src/std_only/net/mod.rs b/commons/zenoh-util/src/std_only/net/mod.rs index d0eae43d8f..4fad541d1e 100644 --- a/commons/zenoh-util/src/std_only/net/mod.rs +++ b/commons/zenoh-util/src/std_only/net/mod.rs @@ -92,7 +92,7 @@ pub fn set_linger(socket: &TcpStream, dur: Option) -> ZResult<()> { } #[cfg(windows)] -unsafe fn get_adapters_adresses() -> ZResult> { +unsafe fn get_adapters_adresses(af_spec: i32) -> ZResult> { use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; let mut ret; @@ -102,7 +102,7 @@ unsafe fn get_adapters_adresses() -> ZResult> { loop { buffer = Vec::with_capacity(size as usize); ret = winapi::um::iphlpapi::GetAdaptersAddresses( - winapi::shared::ws2def::AF_INET.try_into().unwrap(), + af_spec.try_into().unwrap(), 0, std::ptr::null_mut(), buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH, @@ -150,31 +150,7 @@ pub fn get_interface(name: &str) -> ZResult> { use crate::ffi; use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; - let mut ret; - let mut retries = 0; - let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE; - let mut buffer: Vec; - loop { - buffer = Vec::with_capacity(size as usize); - ret = winapi::um::iphlpapi::GetAdaptersAddresses( - winapi::shared::ws2def::AF_INET.try_into().unwrap(), - 0, - std::ptr::null_mut(), - buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH, - &mut size, - ); - if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW { - break; - } - if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES { - break; - } - retries += 1; - } - - if ret != 0 { - bail!("GetAdaptersAddresses returned {}", ret) - } + let buffer = get_adapters_adresses(winapi::shared::ws2def::AF_INET)?; let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); while let Some(iface) = next_iface { @@ -251,33 +227,9 @@ pub fn get_local_addresses() -> ZResult> { use crate::ffi; use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; - let mut result = vec![]; - let mut ret; - let mut retries = 0; - let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE; - let mut buffer: Vec; - loop { - buffer = Vec::with_capacity(size as usize); - ret = winapi::um::iphlpapi::GetAdaptersAddresses( - winapi::shared::ws2def::AF_UNSPEC.try_into().unwrap(), - 0, - std::ptr::null_mut(), - buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH, - &mut size, - ); - if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW { - break; - } - if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES { - break; - } - retries += 1; - } - - if ret != 0 { - bail!("GetAdaptersAddresses returned {}", ret) - } + let buffer = get_adapters_adresses(winapi::shared::ws2def::AF_UNSPEC)?; + let mut result = vec![]; let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); while let Some(iface) = next_iface { let mut next_ucast_addr = iface.FirstUnicastAddress.as_ref(); @@ -350,33 +302,9 @@ pub fn get_unicast_addresses_of_interface(name: &str) -> ZResult> { use crate::ffi; use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; - let mut addrs = vec![]; - let mut ret; - let mut retries = 0; - let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE; - let mut buffer: Vec; - loop { - buffer = Vec::with_capacity(size as usize); - ret = winapi::um::iphlpapi::GetAdaptersAddresses( - winapi::shared::ws2def::AF_INET.try_into().unwrap(), - 0, - std::ptr::null_mut(), - buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH, - &mut size, - ); - if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW { - break; - } - if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES { - break; - } - retries += 1; - } - - if ret != 0 { - bail!("GetAdaptersAddresses returned {}", ret); - } + let buffer = get_adapters_adresses(winapi::shared::ws2def::AF_INET)?; + let mut addrs = vec![]; let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); while let Some(iface) = next_iface { if name == ffi::pstr_to_string(iface.AdapterName) @@ -413,31 +341,7 @@ pub fn get_index_of_interface(addr: IpAddr) -> ZResult { use crate::ffi; use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; - let mut ret; - let mut retries = 0; - let mut size: u32 = *WINDOWS_GET_ADAPTERS_ADDRESSES_BUF_SIZE; - let mut buffer: Vec; - loop { - buffer = Vec::with_capacity(size as usize); - ret = winapi::um::iphlpapi::GetAdaptersAddresses( - winapi::shared::ws2def::AF_INET.try_into().unwrap(), - 0, - std::ptr::null_mut(), - buffer.as_mut_ptr() as *mut IP_ADAPTER_ADDRESSES_LH, - &mut size, - ); - if ret != winapi::shared::winerror::ERROR_BUFFER_OVERFLOW { - break; - } - if retries >= *WINDOWS_GET_ADAPTERS_ADDRESSES_MAX_RETRIES { - break; - } - retries += 1; - } - - if ret != 0 { - bail!("GetAdaptersAddresses returned {}", ret) - } + let buffer = get_adapters_adresses(winapi::shared::ws2def::AF_INET)?; let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); while let Some(iface) = next_iface { @@ -480,7 +384,7 @@ pub fn get_interfaces_by_addr(addr: IpAddr) -> ZResult> { use crate::ffi; use winapi::um::iptypes::IP_ADAPTER_ADDRESSES_LH; - let buffer = get_adapters_adresses()?; + let buffer = get_adapters_adresses(winapi::shared::ws2def::AF_UNSPEC)?; if addr.is_unspecified() { let mut next_iface = (buffer.as_ptr() as *mut IP_ADAPTER_ADDRESSES_LH).as_ref(); From 437f1d1ed728230e143c935624b9973d53a3b99e Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Fri, 2 Feb 2024 20:34:52 +0100 Subject: [PATCH 23/24] Add comment regarding links update --- zenoh/src/net/runtime/mod.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/zenoh/src/net/runtime/mod.rs b/zenoh/src/net/runtime/mod.rs index ac125421f6..d7c7633a96 100644 --- a/zenoh/src/net/runtime/mod.rs +++ b/zenoh/src/net/runtime/mod.rs @@ -215,6 +215,10 @@ impl TransportEventHandler for RuntimeTransportEventHandler { ) -> ZResult> { match zread!(self.runtime).as_ref() { Some(runtime) => { + // TODO(sashacmc): + // for detect link update and reload interceptors we can add new slave_handler + // or add some call to RuntimeSession/RuntimeMuticastSession processing + let slave_handlers: Vec> = zread!(runtime.state.transport_handlers) .iter() From ab406d83198b8b63a40588777980ae300281b65a Mon Sep 17 00:00:00 2001 From: Alexander Bushnev Date: Mon, 5 Feb 2024 13:10:15 +0100 Subject: [PATCH 24/24] Config refactoring --- commons/zenoh-config/src/defaults.rs | 11 +++++++++++ commons/zenoh-config/src/lib.rs | 5 ++--- zenoh/src/net/routing/interceptor/downsampling.rs | 8 ++++---- zenoh/src/net/routing/interceptor/mod.rs | 2 +- zenoh/tests/interceptors.rs | 6 ++---- 5 files changed, 20 insertions(+), 12 deletions(-) diff --git a/commons/zenoh-config/src/defaults.rs b/commons/zenoh-config/src/defaults.rs index 8d1a5dbc0f..f887c15864 100644 --- a/commons/zenoh-config/src/defaults.rs +++ b/commons/zenoh-config/src/defaults.rs @@ -211,3 +211,14 @@ impl Default for SharedMemoryConf { Self { enabled: false } } } + +impl Default for DownsamplingItemConf { + fn default() -> Self { + Self { + keyexprs: None, + interfaces: None, + strategy: Some("ratelimit".to_string()), + threshold_ms: None, + } + } +} diff --git a/commons/zenoh-config/src/lib.rs b/commons/zenoh-config/src/lib.rs index 8645317e92..df5f1ff89c 100644 --- a/commons/zenoh-config/src/lib.rs +++ b/commons/zenoh-config/src/lib.rs @@ -71,13 +71,12 @@ impl Zeroize for SecretString { pub type SecretValue = Secret; #[derive(Debug, Deserialize, Serialize, Clone)] -pub struct DownsamplerConf { +pub struct DownsamplingItemConf { /// A list of key-expressions to which the downsampling will be applied. pub keyexprs: Option>, /// A list of interfaces to which the downsampling will be applied. pub interfaces: Option>, /// Downsampling strategy (default: ratelimit). - // TODO(sashacmc): how specify default value and generate DEFAULT_CONFIG? pub strategy: Option, /// Minimim timeout between two messages for ratelimit starategy pub threshold_ms: Option, @@ -421,7 +420,7 @@ validated_struct::validator! { /// Configuration of the downsampling. pub downsampling: #[derive(Default)] DownsamplingConf { - downsamples: Vec, + items: Vec, }, /// A list of directories where plugins may be searched for if no `__path__` was specified for them. /// The executable's current directory will be added to the search paths. diff --git a/zenoh/src/net/routing/interceptor/downsampling.rs b/zenoh/src/net/routing/interceptor/downsampling.rs index 4c0b1cf341..352b79a6d8 100644 --- a/zenoh/src/net/routing/interceptor/downsampling.rs +++ b/zenoh/src/net/routing/interceptor/downsampling.rs @@ -20,7 +20,7 @@ use crate::net::routing::interceptor::*; use std::sync::{Arc, Mutex}; -use zenoh_config::DownsamplerConf; +use zenoh_config::DownsamplingItemConf; use zenoh_protocol::core::key_expr::OwnedKeyExpr; const RATELIMIT_STRATEGY: &str = "ratelimit"; @@ -67,7 +67,7 @@ impl InterceptorTrait for EgressMsgDownsamplerRatelimit { } impl EgressMsgDownsamplerRatelimit { - pub fn new(conf: DownsamplerConf) -> Self { + pub fn new(conf: DownsamplingItemConf) -> Self { if let Some(threshold_ms) = conf.threshold_ms { let threshold = std::time::Duration::from_millis(threshold_ms); Self { @@ -86,11 +86,11 @@ impl EgressMsgDownsamplerRatelimit { } pub struct DownsamplerInterceptor { - conf: DownsamplerConf, + conf: DownsamplingItemConf, } impl DownsamplerInterceptor { - pub fn new(conf: DownsamplerConf) -> Self { + pub fn new(conf: DownsamplingItemConf) -> Self { log::debug!("DownsamplerInterceptor enabled: {:?}", conf); Self { conf } } diff --git a/zenoh/src/net/routing/interceptor/mod.rs b/zenoh/src/net/routing/interceptor/mod.rs index e5c044116b..6af14e8341 100644 --- a/zenoh/src/net/routing/interceptor/mod.rs +++ b/zenoh/src/net/routing/interceptor/mod.rs @@ -51,7 +51,7 @@ pub(crate) type InterceptorFactory = Box Vec { let mut res: Vec = vec![]; - for ds in config.downsampling().downsamples() { + for ds in config.downsampling().items() { res.push(Box::new(DownsamplerInterceptor::new(ds.clone()))) } res.push(Box::new(LoggerInterceptor {})); diff --git a/zenoh/tests/interceptors.rs b/zenoh/tests/interceptors.rs index ab40c4a0a5..1cd07c2f09 100644 --- a/zenoh/tests/interceptors.rs +++ b/zenoh/tests/interceptors.rs @@ -74,7 +74,7 @@ fn downsampling_by_keyexpr() { let mut config = Config::default(); config .insert_json5( - "downsampling/downsamples", + "downsampling/items", r#" [ { @@ -108,7 +108,6 @@ fn downsampling_by_keyexpr() { let interval = std::time::Duration::from_millis(1); let messages_count = 1000; for i in 0..messages_count { - println!("message {}", i); publisher_r100.put(format!("message {}", i)).res().unwrap(); publisher_r50.put(format!("message {}", i)).res().unwrap(); publisher_all.put(format!("message {}", i)).res().unwrap(); @@ -167,7 +166,7 @@ fn downsampling_by_interface() { .unwrap(); config_pub .insert_json5( - "downsampling/downsamples", + "downsampling/items", r#" [ { @@ -199,7 +198,6 @@ fn downsampling_by_interface() { let interval = std::time::Duration::from_millis(1); let messages_count = 1000; for i in 0..messages_count { - println!("message {}", i); publisher_r100.put(format!("message {}", i)).res().unwrap(); publisher_all.put(format!("message {}", i)).res().unwrap();