diff --git a/zenoh-ext/examples/examples/z_advanced_pub.rs b/zenoh-ext/examples/examples/z_advanced_pub.rs index 6437515c8..56d05ba7c 100644 --- a/zenoh-ext/examples/examples/z_advanced_pub.rs +++ b/zenoh-ext/examples/examples/z_advanced_pub.rs @@ -16,7 +16,7 @@ use std::time::Duration; use clap::{arg, Parser}; use zenoh::{config::Config, key_expr::KeyExpr}; use zenoh_config::ModeDependentValue; -use zenoh_ext::{AdvancedPublisherBuilderExt, CacheConfig}; +use zenoh_ext::{AdvancedPublisherBuilderExt, CacheConfig, MissDetectionConfig}; use zenoh_ext_examples::CommonArgs; #[tokio::main] @@ -33,7 +33,9 @@ async fn main() { let publisher = session .declare_publisher(&key_expr) .cache(CacheConfig::default().max_samples(history)) - .sample_miss_detection() + .sample_miss_detection( + MissDetectionConfig::default().last_sample_miss_detection(Duration::from_secs(5)), + ) .publisher_detection() .await .unwrap(); diff --git a/zenoh-ext/examples/examples/z_advanced_sub.rs b/zenoh-ext/examples/examples/z_advanced_sub.rs index 5bea70f7d..5d371a407 100644 --- a/zenoh-ext/examples/examples/z_advanced_sub.rs +++ b/zenoh-ext/examples/examples/z_advanced_sub.rs @@ -32,7 +32,11 @@ async fn main() { let subscriber = session .declare_subscriber(key_expr) .history(HistoryConfig::default().detect_late_publishers()) - .recovery(RecoveryConfig::default().periodic_queries(Some(Duration::from_secs(1)))) + .recovery( + RecoveryConfig::default() + .periodic_queries(Some(Duration::from_secs(1))) + .heartbeat_listener(true), + ) .subscriber_detection() .await .unwrap(); diff --git a/zenoh-ext/src/advanced_publisher.rs b/zenoh-ext/src/advanced_publisher.rs index 9afc2fb09..2cab49b83 100644 --- a/zenoh-ext/src/advanced_publisher.rs +++ b/zenoh-ext/src/advanced_publisher.rs @@ -13,16 +13,22 @@ // use std::{ future::{IntoFuture, Ready}, - sync::atomic::{AtomicU32, Ordering}, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, + time::Duration, }; use zenoh::{ bytes::{Encoding, OptionZBytes, ZBytes}, internal::{ bail, + runtime::ZRuntime, traits::{ EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait, }, + TerminatableTask, }, key_expr::{keyexpr, KeyExpr}, liveliness::LivelinessToken, @@ -37,7 +43,10 @@ use zenoh::{ }; use zenoh_macros::ke; -use crate::advanced_cache::{AdvancedCache, AdvancedCacheBuilder, CacheConfig, KE_UHLC}; +use crate::{ + advanced_cache::{AdvancedCache, AdvancedCacheBuilder, CacheConfig, KE_UHLC}, + z_serialize, +}; pub(crate) static KE_PUB: &keyexpr = ke!("pub"); @@ -49,6 +58,21 @@ pub(crate) enum Sequencing { SequenceNumber, } +#[derive(Default)] +#[zenoh_macros::unstable] +pub struct MissDetectionConfig { + pub(crate) state_publisher: Option, +} + +#[zenoh_macros::unstable] +impl MissDetectionConfig { + #[zenoh_macros::unstable] + pub fn last_sample_miss_detection(mut self, period: Duration) -> Self { + self.state_publisher = Some(period); + self + } +} + /// The builder of PublicationCache, allowing to configure it. #[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"] #[zenoh_macros::unstable] @@ -63,6 +87,7 @@ pub struct AdvancedPublisherBuilder<'a, 'b, 'c> { is_express: bool, meta_key_expr: Option>>, sequencing: Sequencing, + miss_config: Option, liveliness: bool, cache: bool, history: CacheConfig, @@ -83,6 +108,7 @@ impl<'a, 'b, 'c> AdvancedPublisherBuilder<'a, 'b, 'c> { is_express: builder.is_express, meta_key_expr: None, sequencing: Sequencing::None, + miss_config: None, liveliness: false, cache: false, history: CacheConfig::default(), @@ -118,8 +144,9 @@ impl<'a, 'b, 'c> AdvancedPublisherBuilder<'a, 'b, 'c> { /// /// Retransmission can only be achieved if [`cache`](crate::AdvancedPublisherBuilder::cache) is enabled. #[zenoh_macros::unstable] - pub fn sample_miss_detection(mut self) -> Self { + pub fn sample_miss_detection(mut self, config: MissDetectionConfig) -> Self { self.sequencing = Sequencing::SequenceNumber; + self.miss_config = Some(config); self } @@ -230,9 +257,10 @@ impl IntoFuture for AdvancedPublisherBuilder<'_, '_, '_> { #[zenoh_macros::unstable] pub struct AdvancedPublisher<'a> { publisher: Publisher<'a>, - seqnum: Option, + seqnum: Option>, cache: Option, _token: Option, + _state_publisher: Option, } #[zenoh_macros::unstable] @@ -270,7 +298,7 @@ impl<'a> AdvancedPublisher<'a> { }; let seqnum = match conf.sequencing { - Sequencing::SequenceNumber => Some(AtomicU32::new(0)), + Sequencing::SequenceNumber => Some(Arc::new(AtomicU32::new(0))), Sequencing::Timestamp => { if conf.session.hlc().is_none() { bail!( @@ -299,18 +327,44 @@ impl<'a> AdvancedPublisher<'a> { Some( conf.session .liveliness() - .declare_token(prefix / &key_expr) + .declare_token(&prefix / &key_expr) .wait()?, ) } else { None }; + let state_publisher = if let Some(period) = conf.miss_config.and_then(|c| c.state_publisher) + { + if let Some(seqnum) = seqnum.as_ref() { + let seqnum = seqnum.clone(); + + let publisher = conf.session.declare_publisher(prefix / &key_expr).wait()?; + Some(TerminatableTask::spawn_abortable( + ZRuntime::Net, + async move { + loop { + tokio::time::sleep(period).await; + let seqnum = seqnum.load(Ordering::Relaxed); + if seqnum > 0 { + let _ = publisher.put(z_serialize(&(seqnum - 1))).await; + } + } + }, + )) + } else { + None + } + } else { + None + }; + Ok(AdvancedPublisher { publisher, seqnum, cache, _token: token, + _state_publisher: state_publisher, }) } diff --git a/zenoh-ext/src/advanced_subscriber.rs b/zenoh-ext/src/advanced_subscriber.rs index e922c4ecb..763ea8633 100644 --- a/zenoh-ext/src/advanced_subscriber.rs +++ b/zenoh-ext/src/advanced_subscriber.rs @@ -46,7 +46,10 @@ use { zenoh::Result as ZResult, }; -use crate::advanced_cache::{ke_liveliness, KE_UHLC}; +use crate::{ + advanced_cache::{ke_liveliness, KE_UHLC}, + z_deserialize, +}; #[derive(Debug, Default, Clone)] /// Configure query for historical data. @@ -85,11 +88,12 @@ impl HistoryConfig { } } -#[derive(Default)] +#[derive(Default, Clone, Copy)] /// Configure retransmission. #[zenoh_macros::unstable] pub struct RecoveryConfig { periodic_queries: Option, + heartbeat_listener: bool, } impl std::fmt::Debug for RecoveryConfig { @@ -116,6 +120,19 @@ impl RecoveryConfig { self.periodic_queries = period; self } + + /// Subscribe to heartbeats of [`AdvancedPublishers`](crate::AdvancedPublisher). + /// + /// This allows to periodically receive the last published Sample's sequence number and check for misses. + /// Heartbeat listener must be paired with [`AdvancedPublishers`](crate::AdvancedPublisher) + /// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and + /// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection). + #[zenoh_macros::unstable] + #[inline] + pub fn heartbeat_listener(mut self, enabled: bool) -> Self { + self.heartbeat_listener = enabled; + self + } } /// The builder of an [`AdvancedSubscriber`], allowing to configure it. @@ -441,6 +458,7 @@ pub struct AdvancedSubscriber { subscriber: Subscriber<()>, receiver: Receiver, liveliness_subscriber: Option>, + _heartbeat_subscriber: Option>, } #[zenoh_macros::unstable] @@ -733,12 +751,13 @@ impl AdvancedSubscriber { .wait(); } - let liveliness_subscriber = if let Some(historyconf) = conf.history { + let liveliness_subscriber = if let Some(historyconf) = conf.history.as_ref() { if historyconf.liveliness { let live_callback = { let session = conf.session.clone(); let statesref = statesref.clone(); let key_expr = key_expr.clone().into_owned(); + let historyconf = historyconf.clone(); move |s: Sample| { if s.kind() == SampleKind::Put { if let Ok(parsed) = ke_liveliness::parse(s.key_expr().as_keyexpr()) { @@ -921,6 +940,102 @@ impl AdvancedSubscriber { None }; + let heartbeat_subscriber = if retransmission.is_some_and(|r| r.heartbeat_listener) { + let ke_heartbeat_sub = KE_ADV_PREFIX / KE_PUB / KE_STARSTAR / KE_AT / &key_expr; + let statesref = statesref.clone(); + let heartbeat_sub = conf + .session + .declare_subscriber(ke_heartbeat_sub) + .callback(move |sample_hb| { + if sample_hb.kind() != SampleKind::Put { + return; + } + + let heartbeat_keyexpr = sample_hb.key_expr().as_keyexpr(); + let Ok(parsed_keyexpr) = ke_liveliness::parse(heartbeat_keyexpr) else { + return; + }; + let source_id = { + let Ok(zid) = ZenohId::from_str(parsed_keyexpr.zid().as_str()) else { + return; + }; + let Ok(eid) = EntityId::from_str(parsed_keyexpr.eid().as_str()) else { + return; + }; + EntityGlobalId::new(zid, eid) + }; + + let Ok(heartbeat_sn) = z_deserialize::(sample_hb.payload()) else { + tracing::debug!( + "Skipping invalid heartbeat payload on '{}'", + heartbeat_keyexpr + ); + return; + }; + + let mut lock = zlock!(statesref); + let states = &mut *lock; + let entry = states.sequenced_states.entry(source_id); + if matches!(&entry, Entry::Vacant(_)) && states.global_pending_queries > 0 { + tracing::debug!("Skipping heartbeat on '{}' from publisher that is currently being pulled by liveliness task", heartbeat_keyexpr); + return; + } + + // FIXME: This breaks vacancy check in handle_sample: spawning periodic queries will not occur if heartbeat sample is received before data sample + let state = entry.or_insert(SourceState:: { + last_delivered: None, + pending_queries: 0, + pending_samples: BTreeMap::new(), + }); + // TODO: add state to avoid sending multiple queries for the same heartbeat if its periodicity is higher than the query response time + + // check that it's not an old sn or a pending sample's sn + if (state.last_delivered.is_none() + || state.last_delivered.is_some_and(|sn| heartbeat_sn > sn)) + && !state.pending_samples.contains_key(&heartbeat_sn) + { + let seq_num_range = seq_num_range( + state.last_delivered.map(|s| s + 1), + Some(heartbeat_sn), + ); + + let session = states.session.clone(); + let key_expr = states.key_expr.clone().into_owned(); + let query_target = states.query_target; + let query_timeout = states.query_timeout; + state.pending_queries += 1; + drop(lock); + + let handler = SequencedRepliesHandler { + source_id, + statesref: statesref.clone(), + }; + let _ = session + .get(Selector::from((heartbeat_keyexpr, seq_num_range))) + .callback({ + move |r: Reply| { + if let Ok(s) = r.into_result() { + if key_expr.intersects(s.key_expr()) { + let states = &mut *zlock!(handler.statesref); + handle_sample(states, s); + } + } + } + }) + .consolidation(ConsolidationMode::None) + .accept_replies(ReplyKeyExpr::Any) + .target(query_target) + .timeout(query_timeout) + .wait(); + } + }) + .allowed_origin(conf.origin) + .wait()?; + Some(heartbeat_sub) + } else { + None + }; + if conf.liveliness { let prefix = KE_ADV_PREFIX / KE_SUB @@ -944,6 +1059,7 @@ impl AdvancedSubscriber { subscriber, receiver, liveliness_subscriber, + _heartbeat_subscriber: heartbeat_subscriber, }; Ok(reliable_subscriber) diff --git a/zenoh-ext/src/lib.rs b/zenoh-ext/src/lib.rs index 2d73bd16c..60d805730 100644 --- a/zenoh-ext/src/lib.rs +++ b/zenoh-ext/src/lib.rs @@ -41,7 +41,7 @@ pub use crate::serialization::{ #[allow(deprecated)] pub use crate::{ advanced_cache::{CacheConfig, RepliesConfig}, - advanced_publisher::{AdvancedPublisher, AdvancedPublisherBuilder}, + advanced_publisher::{AdvancedPublisher, AdvancedPublisherBuilder, MissDetectionConfig}, advanced_subscriber::{ AdvancedSubscriber, AdvancedSubscriberBuilder, HistoryConfig, Miss, RecoveryConfig, SampleMissHandlerUndeclaration, SampleMissListener, SampleMissListenerBuilder, diff --git a/zenoh-ext/src/publisher_ext.rs b/zenoh-ext/src/publisher_ext.rs index de045d0ff..2a4866623 100644 --- a/zenoh-ext/src/publisher_ext.rs +++ b/zenoh-ext/src/publisher_ext.rs @@ -13,7 +13,7 @@ // use zenoh::pubsub::PublisherBuilder; -use crate::{advanced_cache::CacheConfig, AdvancedPublisherBuilder}; +use crate::{advanced_cache::CacheConfig, AdvancedPublisherBuilder, MissDetectionConfig}; /// Some extensions to the [`zenoh::publication::PublisherBuilder`](zenoh::publication::PublisherBuilder) #[zenoh_macros::unstable] @@ -27,7 +27,10 @@ pub trait AdvancedPublisherBuilderExt<'a, 'b, 'c> { /// /// Retransmission can only be achieved if [`cache`](crate::AdvancedPublisherBuilder::cache) is also enabled. #[zenoh_macros::unstable] - fn sample_miss_detection(self) -> AdvancedPublisherBuilder<'a, 'b, 'c>; + fn sample_miss_detection( + self, + config: MissDetectionConfig, + ) -> AdvancedPublisherBuilder<'a, 'b, 'c>; /// Allow this publisher to be detected by [`AdvancedSubscribers`](crate::AdvancedSubscriber). /// @@ -53,8 +56,11 @@ impl<'a, 'b, 'c> AdvancedPublisherBuilderExt<'a, 'b, 'c> for PublisherBuilder<'a /// /// Retransmission can only be achieved if [`cache`](crate::AdvancedPublisherBuilder::cache) is also enabled. #[zenoh_macros::unstable] - fn sample_miss_detection(self) -> AdvancedPublisherBuilder<'a, 'b, 'c> { - AdvancedPublisherBuilder::new(self).sample_miss_detection() + fn sample_miss_detection( + self, + config: MissDetectionConfig, + ) -> AdvancedPublisherBuilder<'a, 'b, 'c> { + AdvancedPublisherBuilder::new(self).sample_miss_detection(config) } /// Allow this publisher to be detected by [`AdvancedSubscribers`](crate::AdvancedSubscriber). diff --git a/zenoh-ext/tests/advanced.rs b/zenoh-ext/tests/advanced.rs index f7d3593c6..6481780b1 100644 --- a/zenoh-ext/tests/advanced.rs +++ b/zenoh-ext/tests/advanced.rs @@ -16,7 +16,7 @@ use zenoh::sample::SampleKind; use zenoh_config::{EndPoint, ModeDependentValue, WhatAmI}; use zenoh_ext::{ AdvancedPublisherBuilderExt, AdvancedSubscriberBuilderExt, CacheConfig, HistoryConfig, - RecoveryConfig, + MissDetectionConfig, RecoveryConfig, }; #[tokio::test(flavor = "multi_thread", worker_threads = 4)] @@ -170,7 +170,7 @@ async fn test_advanced_retransmission() { let publ = ztimeout!(client1 .declare_publisher(ADVANCED_RETRANSMISSION_KEYEXPR) .cache(CacheConfig::default().max_samples(10)) - .sample_miss_detection()) + .sample_miss_detection(MissDetectionConfig::default())) .unwrap(); ztimeout!(publ.put("1")).unwrap(); @@ -299,7 +299,7 @@ async fn test_advanced_retransmission_periodic() { let publ = ztimeout!(client1 .declare_publisher(ADVANCED_RETRANSMISSION_PERIODIC_KEYEXPR) .cache(CacheConfig::default().max_samples(10)) - .sample_miss_detection()) + .sample_miss_detection(MissDetectionConfig::default())) .unwrap(); ztimeout!(publ.put("1")).unwrap(); @@ -421,7 +421,7 @@ async fn test_advanced_sample_miss() { let publ = ztimeout!(client1 .declare_publisher(ADVANCED_SAMPLE_MISS_KEYEXPR) - .sample_miss_detection()) + .sample_miss_detection(MissDetectionConfig::default())) .unwrap(); ztimeout!(publ.put("1")).unwrap(); @@ -544,7 +544,7 @@ async fn test_advanced_retransmission_sample_miss() { let publ = ztimeout!(client1 .declare_publisher(ADVANCED_RETRANSMISSION_SAMPLE_MISS_KEYEXPR) .cache(CacheConfig::default().max_samples(1)) - .sample_miss_detection()) + .sample_miss_detection(MissDetectionConfig::default())) .unwrap(); ztimeout!(publ.put("1")).unwrap(); @@ -713,3 +713,128 @@ async fn test_advanced_late_joiner() { router.close().await.unwrap(); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_advanced_retransmission_heartbeat() { + use std::time::Duration; + + use zenoh::internal::ztimeout; + + const TIMEOUT: Duration = Duration::from_secs(60); + const SLEEP: Duration = Duration::from_secs(1); + const RECONNECT_SLEEP: Duration = Duration::from_secs(5); + const HEARTBEAT_PERIOD: Duration = Duration::from_secs(4); + const ROUTER_ENDPOINT: &str = "tcp/localhost:47456"; + + const ADVANCED_RETRANSMISSION_KEYEXPR: &str = "test/advanced/retransmission"; + + zenoh_util::init_log_from_env_or("error"); + + let router = { + let mut c = zenoh::Config::default(); + c.listen + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Router)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Router ZID: {}", s.zid()); + s + }; + + let client1 = { + let mut c = zenoh::Config::default(); + c.connect + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Client)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Client (1) ZID: {}", s.zid()); + s + }; + + let client2 = { + let mut c = zenoh::Config::default(); + c.connect + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Client)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Client (2) ZID: {}", s.zid()); + s + }; + + let sub = ztimeout!(client2 + .declare_subscriber(ADVANCED_RETRANSMISSION_KEYEXPR) + .recovery(RecoveryConfig::default().heartbeat_listener(true))) + .unwrap(); + tokio::time::sleep(SLEEP).await; + + let publ = ztimeout!(client1 + .declare_publisher(ADVANCED_RETRANSMISSION_KEYEXPR) + .cache(CacheConfig::default().max_samples(10)) + .sample_miss_detection( + MissDetectionConfig::default().last_sample_miss_detection(HEARTBEAT_PERIOD) + )) + .unwrap(); + ztimeout!(publ.put("1")).unwrap(); + + tokio::time::sleep(SLEEP).await; + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "1"); + + assert!(sub.try_recv().unwrap().is_none()); + + router.close().await.unwrap(); + tokio::time::sleep(SLEEP).await; + + ztimeout!(publ.put("2")).unwrap(); + ztimeout!(publ.put("3")).unwrap(); + ztimeout!(publ.put("4")).unwrap(); + tokio::time::sleep(SLEEP).await; + + assert!(sub.try_recv().unwrap().is_none()); + + let router = { + let mut c = zenoh::Config::default(); + c.listen + .endpoints + .set(vec![ROUTER_ENDPOINT.parse::().unwrap()]) + .unwrap(); + c.scouting.multicast.set_enabled(Some(false)).unwrap(); + let _ = c.set_mode(Some(WhatAmI::Router)); + let s = ztimeout!(zenoh::open(c)).unwrap(); + tracing::info!("Router ZID: {}", s.zid()); + s + }; + tokio::time::sleep(RECONNECT_SLEEP).await; + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "2"); + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "3"); + + let sample = ztimeout!(sub.recv_async()).unwrap(); + assert_eq!(sample.kind(), SampleKind::Put); + assert_eq!(sample.payload().try_to_string().unwrap().as_ref(), "4"); + + assert!(sub.try_recv().unwrap().is_none()); + + publ.undeclare().await.unwrap(); + // sub.undeclare().await.unwrap(); + + client1.close().await.unwrap(); + client2.close().await.unwrap(); + + router.close().await.unwrap(); +}