Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add advanced pub/sub last sample miss detection #1701

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions zenoh-ext/examples/examples/z_advanced_pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::time::Duration;
use clap::{arg, Parser};
use zenoh::{config::Config, key_expr::KeyExpr};
use zenoh_config::ModeDependentValue;
use zenoh_ext::{AdvancedPublisherBuilderExt, CacheConfig};
use zenoh_ext::{AdvancedPublisherBuilderExt, CacheConfig, MissDetectionConfig};
use zenoh_ext_examples::CommonArgs;

#[tokio::main]
Expand All @@ -33,7 +33,9 @@ async fn main() {
let publisher = session
.declare_publisher(&key_expr)
.cache(CacheConfig::default().max_samples(history))
.sample_miss_detection()
.sample_miss_detection(
MissDetectionConfig::default().last_sample_miss_detection(Duration::from_secs(5)),
)
.publisher_detection()
.await
.unwrap();
Expand Down
6 changes: 5 additions & 1 deletion zenoh-ext/examples/examples/z_advanced_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,11 @@ async fn main() {
let subscriber = session
.declare_subscriber(key_expr)
.history(HistoryConfig::default().detect_late_publishers())
.recovery(RecoveryConfig::default().periodic_queries(Some(Duration::from_secs(1))))
.recovery(
RecoveryConfig::default()
.periodic_queries(Some(Duration::from_secs(1)))
.heartbeat_listener(true),
)
.subscriber_detection()
.await
.unwrap();
Expand Down
66 changes: 60 additions & 6 deletions zenoh-ext/src/advanced_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,22 @@
//
use std::{
future::{IntoFuture, Ready},
sync::atomic::{AtomicU32, Ordering},
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
time::Duration,
};

use zenoh::{
bytes::{Encoding, OptionZBytes, ZBytes},
internal::{
bail,
runtime::ZRuntime,
traits::{
EncodingBuilderTrait, QoSBuilderTrait, SampleBuilderTrait, TimestampBuilderTrait,
},
TerminatableTask,
},
key_expr::{keyexpr, KeyExpr},
liveliness::LivelinessToken,
Expand All @@ -37,7 +43,10 @@ use zenoh::{
};
use zenoh_macros::ke;

use crate::advanced_cache::{AdvancedCache, AdvancedCacheBuilder, CacheConfig, KE_UHLC};
use crate::{
advanced_cache::{AdvancedCache, AdvancedCacheBuilder, CacheConfig, KE_UHLC},
z_serialize,
};

pub(crate) static KE_PUB: &keyexpr = ke!("pub");

Expand All @@ -49,6 +58,21 @@ pub(crate) enum Sequencing {
SequenceNumber,
}

#[derive(Default)]
#[zenoh_macros::unstable]
pub struct MissDetectionConfig {
pub(crate) state_publisher: Option<Duration>,
}

#[zenoh_macros::unstable]
impl MissDetectionConfig {
#[zenoh_macros::unstable]
pub fn last_sample_miss_detection(mut self, period: Duration) -> Self {
self.state_publisher = Some(period);
self
}
}

/// The builder of PublicationCache, allowing to configure it.
#[must_use = "Resolvables do nothing unless you resolve them using the `res` method from either `SyncResolve` or `AsyncResolve`"]
#[zenoh_macros::unstable]
Expand All @@ -63,6 +87,7 @@ pub struct AdvancedPublisherBuilder<'a, 'b, 'c> {
is_express: bool,
meta_key_expr: Option<ZResult<KeyExpr<'c>>>,
sequencing: Sequencing,
miss_config: Option<MissDetectionConfig>,
liveliness: bool,
cache: bool,
history: CacheConfig,
Expand All @@ -83,6 +108,7 @@ impl<'a, 'b, 'c> AdvancedPublisherBuilder<'a, 'b, 'c> {
is_express: builder.is_express,
meta_key_expr: None,
sequencing: Sequencing::None,
miss_config: None,
liveliness: false,
cache: false,
history: CacheConfig::default(),
Expand Down Expand Up @@ -118,8 +144,9 @@ impl<'a, 'b, 'c> AdvancedPublisherBuilder<'a, 'b, 'c> {
///
/// Retransmission can only be achieved if [`cache`](crate::AdvancedPublisherBuilder::cache) is enabled.
#[zenoh_macros::unstable]
pub fn sample_miss_detection(mut self) -> Self {
pub fn sample_miss_detection(mut self, config: MissDetectionConfig) -> Self {
self.sequencing = Sequencing::SequenceNumber;
self.miss_config = Some(config);
self
}

Expand Down Expand Up @@ -230,9 +257,10 @@ impl IntoFuture for AdvancedPublisherBuilder<'_, '_, '_> {
#[zenoh_macros::unstable]
pub struct AdvancedPublisher<'a> {
publisher: Publisher<'a>,
seqnum: Option<AtomicU32>,
seqnum: Option<Arc<AtomicU32>>,
cache: Option<AdvancedCache>,
_token: Option<LivelinessToken>,
_state_publisher: Option<TerminatableTask>,
}

#[zenoh_macros::unstable]
Expand Down Expand Up @@ -270,7 +298,7 @@ impl<'a> AdvancedPublisher<'a> {
};

let seqnum = match conf.sequencing {
Sequencing::SequenceNumber => Some(AtomicU32::new(0)),
Sequencing::SequenceNumber => Some(Arc::new(AtomicU32::new(0))),
Sequencing::Timestamp => {
if conf.session.hlc().is_none() {
bail!(
Expand Down Expand Up @@ -299,18 +327,44 @@ impl<'a> AdvancedPublisher<'a> {
Some(
conf.session
.liveliness()
.declare_token(prefix / &key_expr)
.declare_token(&prefix / &key_expr)
.wait()?,
)
} else {
None
};

let state_publisher = if let Some(period) = conf.miss_config.and_then(|c| c.state_publisher)
{
if let Some(seqnum) = seqnum.as_ref() {
let seqnum = seqnum.clone();

let publisher = conf.session.declare_publisher(prefix / &key_expr).wait()?;
Some(TerminatableTask::spawn_abortable(
ZRuntime::Net,
async move {
loop {
tokio::time::sleep(period).await;
let seqnum = seqnum.load(Ordering::Relaxed);
if seqnum > 0 {
let _ = publisher.put(z_serialize(&(seqnum - 1))).await;
}
}
},
))
} else {
None
}
} else {
None
};

Ok(AdvancedPublisher {
publisher,
seqnum,
cache,
_token: token,
_state_publisher: state_publisher,
})
}

Expand Down
122 changes: 119 additions & 3 deletions zenoh-ext/src/advanced_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ use {
zenoh::Result as ZResult,
};

use crate::advanced_cache::{ke_liveliness, KE_UHLC};
use crate::{
advanced_cache::{ke_liveliness, KE_UHLC},
z_deserialize,
};

#[derive(Debug, Default, Clone)]
/// Configure query for historical data.
Expand Down Expand Up @@ -85,11 +88,12 @@ impl HistoryConfig {
}
}

#[derive(Default)]
#[derive(Default, Clone, Copy)]
/// Configure retransmission.
#[zenoh_macros::unstable]
pub struct RecoveryConfig {
periodic_queries: Option<Duration>,
heartbeat_listener: bool,
}

impl std::fmt::Debug for RecoveryConfig {
Expand All @@ -116,6 +120,19 @@ impl RecoveryConfig {
self.periodic_queries = period;
self
}

/// Subscribe to heartbeats of [`AdvancedPublishers`](crate::AdvancedPublisher).
///
/// This allows to periodically receive the last published Sample's sequence number and check for misses.
/// Heartbeat listener must be paired with [`AdvancedPublishers`](crate::AdvancedPublisher)
/// that enable [`cache`](crate::AdvancedPublisherBuilder::cache) and
/// [`sample_miss_detection`](crate::AdvancedPublisherBuilder::sample_miss_detection).
#[zenoh_macros::unstable]
#[inline]
pub fn heartbeat_listener(mut self, enabled: bool) -> Self {
self.heartbeat_listener = enabled;
self
}
}

/// The builder of an [`AdvancedSubscriber`], allowing to configure it.
Expand Down Expand Up @@ -441,6 +458,7 @@ pub struct AdvancedSubscriber<Receiver> {
subscriber: Subscriber<()>,
receiver: Receiver,
liveliness_subscriber: Option<Subscriber<()>>,
_heartbeat_subscriber: Option<Subscriber<()>>,
}

#[zenoh_macros::unstable]
Expand Down Expand Up @@ -733,12 +751,13 @@ impl<Handler> AdvancedSubscriber<Handler> {
.wait();
}

let liveliness_subscriber = if let Some(historyconf) = conf.history {
let liveliness_subscriber = if let Some(historyconf) = conf.history.as_ref() {
if historyconf.liveliness {
let live_callback = {
let session = conf.session.clone();
let statesref = statesref.clone();
let key_expr = key_expr.clone().into_owned();
let historyconf = historyconf.clone();
move |s: Sample| {
if s.kind() == SampleKind::Put {
if let Ok(parsed) = ke_liveliness::parse(s.key_expr().as_keyexpr()) {
Expand Down Expand Up @@ -921,6 +940,102 @@ impl<Handler> AdvancedSubscriber<Handler> {
None
};

let heartbeat_subscriber = if retransmission.is_some_and(|r| r.heartbeat_listener) {
let ke_heartbeat_sub = KE_ADV_PREFIX / KE_PUB / KE_STARSTAR / KE_AT / &key_expr;
let statesref = statesref.clone();
let heartbeat_sub = conf
.session
.declare_subscriber(ke_heartbeat_sub)
.callback(move |sample_hb| {
if sample_hb.kind() != SampleKind::Put {
return;
}

let heartbeat_keyexpr = sample_hb.key_expr().as_keyexpr();
let Ok(parsed_keyexpr) = ke_liveliness::parse(heartbeat_keyexpr) else {
return;
};
let source_id = {
let Ok(zid) = ZenohId::from_str(parsed_keyexpr.zid().as_str()) else {
return;
};
let Ok(eid) = EntityId::from_str(parsed_keyexpr.eid().as_str()) else {
return;
};
EntityGlobalId::new(zid, eid)
};

let Ok(heartbeat_sn) = z_deserialize::<u32>(sample_hb.payload()) else {
tracing::debug!(
"Skipping invalid heartbeat payload on '{}'",
heartbeat_keyexpr
);
return;
};

let mut lock = zlock!(statesref);
let states = &mut *lock;
let entry = states.sequenced_states.entry(source_id);
if matches!(&entry, Entry::Vacant(_)) && states.global_pending_queries > 0 {
tracing::debug!("Skipping heartbeat on '{}' from publisher that is currently being pulled by liveliness task", heartbeat_keyexpr);
return;
}

// FIXME: This breaks vacancy check in handle_sample: spawning periodic queries will not occur if heartbeat sample is received before data sample
let state = entry.or_insert(SourceState::<u32> {
last_delivered: None,
pending_queries: 0,
pending_samples: BTreeMap::new(),
});
// TODO: add state to avoid sending multiple queries for the same heartbeat if its periodicity is higher than the query response time

// check that it's not an old sn or a pending sample's sn
if (state.last_delivered.is_none()
|| state.last_delivered.is_some_and(|sn| heartbeat_sn > sn))
&& !state.pending_samples.contains_key(&heartbeat_sn)
{
Comment on lines +990 to +996
Copy link
Contributor Author

@oteffahi oteffahi Jan 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This TODO could be covered by not sending a query unless there are no pending queries (i.e no pending queries from heartbeat, periodic queries, or miss detection queries). Does that sound right @OlivierHecart ?

Copy link
Contributor Author

@oteffahi oteffahi Jan 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hypothetically speaking, doing so might risk that (in some potential high-load cases) heartbeats would get denied by other querying mechanisms and never result in a heartbeat query...

let seq_num_range = seq_num_range(
state.last_delivered.map(|s| s + 1),
Some(heartbeat_sn),
);

let session = states.session.clone();
let key_expr = states.key_expr.clone().into_owned();
let query_target = states.query_target;
let query_timeout = states.query_timeout;
state.pending_queries += 1;
drop(lock);

let handler = SequencedRepliesHandler {
source_id,
statesref: statesref.clone(),
};
let _ = session
.get(Selector::from((heartbeat_keyexpr, seq_num_range)))
.callback({
move |r: Reply| {
if let Ok(s) = r.into_result() {
if key_expr.intersects(s.key_expr()) {
let states = &mut *zlock!(handler.statesref);
handle_sample(states, s);
}
}
}
})
.consolidation(ConsolidationMode::None)
.accept_replies(ReplyKeyExpr::Any)
.target(query_target)
.timeout(query_timeout)
.wait();
}
})
.allowed_origin(conf.origin)
.wait()?;
Some(heartbeat_sub)
} else {
None
};

if conf.liveliness {
let prefix = KE_ADV_PREFIX
/ KE_SUB
Expand All @@ -944,6 +1059,7 @@ impl<Handler> AdvancedSubscriber<Handler> {
subscriber,
receiver,
liveliness_subscriber,
_heartbeat_subscriber: heartbeat_subscriber,
};

Ok(reliable_subscriber)
Expand Down
2 changes: 1 addition & 1 deletion zenoh-ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub use crate::serialization::{
#[allow(deprecated)]
pub use crate::{
advanced_cache::{CacheConfig, RepliesConfig},
advanced_publisher::{AdvancedPublisher, AdvancedPublisherBuilder},
advanced_publisher::{AdvancedPublisher, AdvancedPublisherBuilder, MissDetectionConfig},
advanced_subscriber::{
AdvancedSubscriber, AdvancedSubscriberBuilder, HistoryConfig, Miss, RecoveryConfig,
SampleMissHandlerUndeclaration, SampleMissListener, SampleMissListenerBuilder,
Expand Down
Loading
Loading