From 6c16b498cf2200dd8f8bbee848a1a7339249ef7d Mon Sep 17 00:00:00 2001 From: Joseph Perez Date: Thu, 30 Jan 2025 09:58:53 +0100 Subject: [PATCH] refactor: remove precomputed matches in routing tree Before, everytime a node was inserted in the tree, all the matching nodes were computed and the list was added to it and maintained each time the tree is modified. This PR remove this matches precomputation to only compute them when it is needed. --- zenoh/src/net/routing/dispatcher/interests.rs | 8 +- zenoh/src/net/routing/dispatcher/pubsub.rs | 28 +-- zenoh/src/net/routing/dispatcher/queries.rs | 27 +-- zenoh/src/net/routing/dispatcher/resource.rs | 188 ++++++++---------- zenoh/src/net/routing/dispatcher/token.rs | 15 +- zenoh/src/net/routing/hat/client/mod.rs | 28 +-- zenoh/src/net/routing/hat/client/pubsub.rs | 23 +-- zenoh/src/net/routing/hat/client/queries.rs | 21 +- .../src/net/routing/hat/linkstate_peer/mod.rs | 28 +-- .../net/routing/hat/linkstate_peer/pubsub.rs | 87 ++++---- .../net/routing/hat/linkstate_peer/queries.rs | 121 +++++------ .../net/routing/hat/linkstate_peer/token.rs | 141 ++++++------- zenoh/src/net/routing/hat/p2p_peer/mod.rs | 28 +-- zenoh/src/net/routing/hat/p2p_peer/pubsub.rs | 76 +++---- zenoh/src/net/routing/hat/p2p_peer/queries.rs | 76 +++---- zenoh/src/net/routing/hat/p2p_peer/token.rs | 165 +++++++-------- zenoh/src/net/routing/hat/router/mod.rs | 28 +-- zenoh/src/net/routing/hat/router/pubsub.rs | 80 +++----- zenoh/src/net/routing/hat/router/queries.rs | 117 +++++------ zenoh/src/net/routing/hat/router/token.rs | 171 +++++++--------- zenoh/src/net/tests/tables.rs | 7 +- 21 files changed, 578 insertions(+), 885 deletions(-) diff --git a/zenoh/src/net/routing/dispatcher/interests.rs b/zenoh/src/net/routing/dispatcher/interests.rs index a2cbcfeaf2..119f499a76 100644 --- a/zenoh/src/net/routing/dispatcher/interests.rs +++ b/zenoh/src/net/routing/dispatcher/interests.rs @@ -20,7 +20,6 @@ use std::{ use async_trait::async_trait; use tokio_util::sync::CancellationToken; -use zenoh_keyexpr::keyexpr; use zenoh_protocol::{ core::WireExpr, network::{ @@ -220,15 +219,10 @@ pub(crate) fn declare_interest( } else { let mut fullexpr = prefix.expr().to_string(); fullexpr.push_str(expr.suffix.as_ref()); - let mut matches = keyexpr::new(fullexpr.as_str()) - .map(|ke| Resource::get_matches(&rtables, ke)) - .unwrap_or_default(); drop(rtables); let mut wtables = zwrite!(tables_ref.tables); - let mut res = + let res = Resource::make_resource(&mut wtables, &mut prefix, expr.suffix.as_ref()); - matches.push(Arc::downgrade(&res)); - Resource::match_resource(&wtables, &mut res, matches); (res, wtables) }; diff --git a/zenoh/src/net/routing/dispatcher/pubsub.rs b/zenoh/src/net/routing/dispatcher/pubsub.rs index d19373528b..f00db6aa50 100644 --- a/zenoh/src/net/routing/dispatcher/pubsub.rs +++ b/zenoh/src/net/routing/dispatcher/pubsub.rs @@ -18,11 +18,10 @@ use std::sync::Arc; use zenoh_core::zread; use zenoh_protocol::{ - core::{key_expr::keyexpr, Reliability, WireExpr}, + core::{Reliability, WireExpr}, network::{declare::SubscriberId, push::ext, Push}, zenoh::PushBody, }; -use zenoh_sync::get_mut_unchecked; use super::{ face::FaceState, @@ -72,15 +71,10 @@ pub(crate) fn declare_subscription( } else { let mut fullexpr = prefix.expr().to_string(); fullexpr.push_str(expr.suffix.as_ref()); - let mut matches = keyexpr::new(fullexpr.as_str()) - .map(|ke| Resource::get_matches(&rtables, ke)) - .unwrap_or_default(); drop(rtables); let mut wtables = zwrite!(tables.tables); - let mut res = + let res = Resource::make_resource(&mut wtables, &mut prefix, expr.suffix.as_ref()); - matches.push(Arc::downgrade(&res)); - Resource::match_resource(&wtables, &mut res, matches); (res, wtables) }; @@ -156,16 +150,16 @@ pub(crate) fn undeclare_subscription( } } -pub(crate) fn disable_matches_data_routes(_tables: &mut Tables, res: &mut Arc) { +pub(crate) fn disable_matches_data_routes(tables: &mut Tables, res: &mut Arc) { if res.context.is_some() { - get_mut_unchecked(res).context_mut().disable_data_routes(); - for match_ in &res.context().matches { - let mut match_ = match_.upgrade().unwrap(); - if !Arc::ptr_eq(&match_, res) { - get_mut_unchecked(&mut match_) - .context_mut() - .disable_data_routes(); - } + if let Some(key_expr) = res.key_expr() { + Resource::iter_matches(&tables.root_res, key_expr, |m| { + if !Arc::ptr_eq(res, m) { + unsafe { &mut *Arc::as_ptr(m).cast_mut() } + .context_mut() + .disable_data_routes() + } + }) } } } diff --git a/zenoh/src/net/routing/dispatcher/queries.rs b/zenoh/src/net/routing/dispatcher/queries.rs index 17b30bef31..e52eccd80e 100644 --- a/zenoh/src/net/routing/dispatcher/queries.rs +++ b/zenoh/src/net/routing/dispatcher/queries.rs @@ -23,7 +23,7 @@ use zenoh_buffers::ZBuf; #[cfg(feature = "stats")] use zenoh_protocol::zenoh::reply::ReplyBody; use zenoh_protocol::{ - core::{key_expr::keyexpr, Encoding, WireExpr}, + core::{Encoding, WireExpr}, network::{ declare::{ext, queryable::ext::QueryableInfoType, QueryableId}, request::{ @@ -99,15 +99,10 @@ pub(crate) fn declare_queryable( } else { let mut fullexpr = prefix.expr().to_string(); fullexpr.push_str(expr.suffix.as_ref()); - let mut matches = keyexpr::new(fullexpr.as_str()) - .map(|ke| Resource::get_matches(&rtables, ke)) - .unwrap_or_default(); drop(rtables); let mut wtables = zwrite!(tables.tables); - let mut res = + let res = Resource::make_resource(&mut wtables, &mut prefix, expr.suffix.as_ref()); - matches.push(Arc::downgrade(&res)); - Resource::match_resource(&wtables, &mut res, matches); (res, wtables) }; @@ -339,16 +334,16 @@ impl Timed for QueryCleanup { } } -pub(crate) fn disable_matches_query_routes(_tables: &mut Tables, res: &mut Arc) { +pub(crate) fn disable_matches_query_routes(tables: &mut Tables, res: &mut Arc) { if res.context.is_some() { - get_mut_unchecked(res).context_mut().disable_query_routes(); - for match_ in &res.context().matches { - let mut match_ = match_.upgrade().unwrap(); - if !Arc::ptr_eq(&match_, res) { - get_mut_unchecked(&mut match_) - .context_mut() - .disable_query_routes(); - } + if let Some(key_expr) = res.key_expr() { + Resource::iter_matches(&tables.root_res, key_expr, |m| { + if !Arc::ptr_eq(res, m) { + unsafe { &mut *Arc::as_ptr(m).cast_mut() } + .context_mut() + .disable_query_routes() + } + }) } } } diff --git a/zenoh/src/net/routing/dispatcher/resource.rs b/zenoh/src/net/routing/dispatcher/resource.rs index 010495f841..50ede33677 100644 --- a/zenoh/src/net/routing/dispatcher/resource.rs +++ b/zenoh/src/net/routing/dispatcher/resource.rs @@ -17,7 +17,7 @@ use std::{ collections::HashMap, convert::TryInto, hash::{Hash, Hasher}, - sync::{Arc, RwLock, Weak}, + sync::{Arc, RwLock}, }; use zenoh_config::WhatAmI; @@ -172,7 +172,6 @@ pub(crate) type DataRoutes = Routes>; pub(crate) type QueryRoutes = Routes>; pub(crate) struct ResourceContext { - pub(crate) matches: Vec>, pub(crate) hat: Box, pub(crate) data_routes: RwLock, pub(crate) query_routes: RwLock, @@ -181,7 +180,6 @@ pub(crate) struct ResourceContext { impl ResourceContext { fn new(hat: Box) -> ResourceContext { ResourceContext { - matches: Vec::new(), hat, data_routes: Default::default(), query_routes: Default::default(), @@ -252,6 +250,15 @@ impl Resource { &self.expr } + pub(crate) fn key_expr(&self) -> Option<&keyexpr> { + if !self.expr.is_empty() { + // SAFETY: resource full expr should be a valid keyexpr + Some(unsafe { keyexpr::from_str_unchecked(&self.expr) }) + } else { + None + } + } + #[inline(always)] pub(crate) fn context(&self) -> &ResourceContext { self.context.as_ref().unwrap() @@ -264,12 +271,12 @@ impl Resource { #[inline(always)] pub(crate) fn matches(&self, other: &Arc) -> bool { - self.context - .as_ref() - .unwrap() - .matches - .iter() - .any(|m| m.upgrade().is_some_and(|m| &m == other)) + // SAFETY: resource expr is supposed to be a valid keyexpr + (!self.expr.is_empty() && !other.expr.is_empty()) + && unsafe { + keyexpr::from_str_unchecked(&self.expr) + .intersects(keyexpr::from_str_unchecked(&other.expr)) + } } pub fn nonwild_prefix(res: &Arc) -> (Option>, String) { @@ -304,18 +311,6 @@ impl Resource { if Arc::strong_count(res) <= 3 && res.children.is_empty() { // consider only childless resource held by only one external object (+ 1 strong count for resclone, + 1 strong count for res.parent to a total of 3 ) tracing::debug!("Unregister resource {}", res.expr()); - if let Some(context) = mutres.context.as_mut() { - for match_ in &mut context.matches { - let mut match_ = match_.upgrade().unwrap(); - if !Arc::ptr_eq(&match_, res) { - let mutmatch = get_mut_unchecked(&mut match_); - if let Some(ctx) = mutmatch.context.as_mut() { - ctx.matches - .retain(|x| !Arc::ptr_eq(&x.upgrade().unwrap(), res)); - } - } - } - } mutres.nonwild_prefix.take(); { get_mut_unchecked(parent).children.remove(&res.suffix); @@ -571,104 +566,101 @@ impl Resource { .unwrap_or_else(|| [&self.expr, suffix].concat().into()) } - pub fn get_matches(tables: &Tables, key_expr: &keyexpr) -> Vec> { - fn recursive_push(from: &Arc, matches: &mut Vec>) { - if from.context.is_some() { - matches.push(Arc::downgrade(from)); - } - for child in from.children.values() { - recursive_push(child, matches) + pub(crate) fn any_matches( + root: &Arc, + key_expr: &keyexpr, + mut f: impl FnMut(&Arc) -> bool, + ) -> bool { + macro_rules! return_if_true { + ($expr:expr) => { + if $expr { + return true; + } + }; + } + fn any_matches_trailing_wildcard( + res: &Arc, + f: &mut impl FnMut(&Arc) -> bool, + ) -> bool { + return_if_true!(res.context.is_some() && f(res)); + for child in res.children.values() { + return_if_true!(any_matches_trailing_wildcard(child, f)); } + false } - fn get_matches_from( + fn any_matches_rec( key_expr: &keyexpr, - from: &Arc, - matches: &mut Vec>, - ) { - if from.parent.is_none() || from.suffix == "/" { - for child in from.children.values() { - get_matches_from(key_expr, child, matches); + res: &Arc, + f: &mut impl FnMut(&Arc) -> bool, + ) -> bool { + if res.parent.is_none() || res.suffix == "/" { + for child in res.children.values() { + return_if_true!(any_matches_rec(key_expr, child, f)); } - return; + return false; } - let suffix: &keyexpr = from + let suffix: &keyexpr = res .suffix .strip_prefix('/') - .unwrap_or(&from.suffix) + .unwrap_or(&res.suffix) .try_into() .unwrap(); let (chunk, rest) = Resource::fst_chunk(key_expr); if chunk.intersects(suffix) { match rest { + None if chunk == "**" => return any_matches_trailing_wildcard(res, f), None => { - if chunk.as_bytes() == b"**" { - recursive_push(from, matches) - } else { - if from.context.is_some() { - matches.push(Arc::downgrade(from)); - } - if suffix.as_bytes() == b"**" { - for child in from.children.values() { - get_matches_from(key_expr, child, matches) - } - } - if let Some(child) = - from.children.get("/**").or_else(|| from.children.get("**")) - { - if child.context.is_some() { - matches.push(Arc::downgrade(child)) - } + return_if_true!(res.context.is_some() && f(res)); + if suffix == "**" { + for child in res.children.values() { + return_if_true!(any_matches_rec(key_expr, child, f)); } } + if let Some(child) = + res.children.get("/**").or_else(|| res.children.get("**")) + { + return_if_true!(child.context.is_some() && f(child)); + } + } + Some(rest) if rest == "**" => { + return_if_true!(any_matches_trailing_wildcard(res, f)) } - Some(rest) if rest.as_bytes() == b"**" => recursive_push(from, matches), Some(rest) => { - let recheck_keyexpr_one_level_lower = - chunk.as_bytes() == b"**" || suffix.as_bytes() == b"**"; - for child in from.children.values() { - get_matches_from(rest, child, matches); + let recheck_keyexpr_one_level_lower = chunk == "**" || suffix == "**"; + for child in res.children.values() { + return_if_true!(any_matches_rec(rest, child, f)); if recheck_keyexpr_one_level_lower { - get_matches_from(key_expr, child, matches) + return_if_true!(any_matches_rec(key_expr, child, f)); } } if recheck_keyexpr_one_level_lower { - get_matches_from(rest, from, matches) + return_if_true!(any_matches_rec(rest, res, f)); } } }; } + false } - let mut matches = Vec::new(); - get_matches_from(key_expr, &tables.root_res, &mut matches); - let mut i = 0; - while i < matches.len() { - let current = matches[i].as_ptr(); - let mut j = i + 1; - while j < matches.len() { - if std::ptr::eq(current, matches[j].as_ptr()) { - matches.swap_remove(j); - } else { - j += 1 - } - } - i += 1 - } - matches + any_matches_rec(key_expr, root, &mut f) } - pub fn match_resource(_tables: &Tables, res: &mut Arc, matches: Vec>) { - if res.context.is_some() { - for match_ in &matches { - let mut match_ = match_.upgrade().unwrap(); - get_mut_unchecked(&mut match_) - .context_mut() - .matches - .push(Arc::downgrade(res)); - } - get_mut_unchecked(res).context_mut().matches = matches; - } else { - tracing::error!("Call match_resource() on context less res {}", res.expr()); - } + pub(crate) fn iter_matches( + root: &Arc, + key_expr: &keyexpr, + mut f: impl FnMut(&Arc), + ) { + Self::any_matches(root, key_expr, |res| { + f(res); + false + }); + } + + pub(crate) fn get_matches(root: &Arc, key_expr: &keyexpr) -> Vec> { + let mut vec = Vec::new(); + Self::iter_matches(root, key_expr, |res| vec.push(res.clone())); + vec.sort_unstable_by_key(|res| Arc::as_ptr(res)); + vec.dedup_by_key(|res| Arc::as_ptr(res)); + vec } pub fn upgrade_resource(res: &mut Arc, hat: Box) { @@ -726,15 +718,10 @@ pub(crate) fn register_expr( } else { let mut fullexpr = prefix.expr().to_string(); fullexpr.push_str(expr.suffix.as_ref()); - let mut matches = keyexpr::new(fullexpr.as_str()) - .map(|ke| Resource::get_matches(&rtables, ke)) - .unwrap_or_default(); drop(rtables); let mut wtables = zwrite!(tables.tables); - let mut res = + let res = Resource::make_resource(&mut wtables, &mut prefix, expr.suffix.as_ref()); - matches.push(Arc::downgrade(&res)); - Resource::match_resource(&wtables, &mut res, matches); (res, wtables) }; let ctx = get_mut_unchecked(&mut res) @@ -791,15 +778,10 @@ pub(crate) fn register_expr_interest( } else { let mut fullexpr = prefix.expr().to_string(); fullexpr.push_str(expr.suffix.as_ref()); - let mut matches = keyexpr::new(fullexpr.as_str()) - .map(|ke| Resource::get_matches(&rtables, ke)) - .unwrap_or_default(); drop(rtables); let mut wtables = zwrite!(tables.tables); - let mut res = + let res = Resource::make_resource(&mut wtables, &mut prefix, expr.suffix.as_ref()); - matches.push(Arc::downgrade(&res)); - Resource::match_resource(&wtables, &mut res, matches); (res, wtables) }; get_mut_unchecked(face) diff --git a/zenoh/src/net/routing/dispatcher/token.rs b/zenoh/src/net/routing/dispatcher/token.rs index 23625ff8ef..2d6b384f40 100644 --- a/zenoh/src/net/routing/dispatcher/token.rs +++ b/zenoh/src/net/routing/dispatcher/token.rs @@ -14,7 +14,6 @@ use std::sync::Arc; -use zenoh_keyexpr::keyexpr; use zenoh_protocol::{ core::WireExpr, network::{ @@ -65,15 +64,10 @@ pub(crate) fn declare_token( } else { let mut fullexpr = prefix.expr().to_string(); fullexpr.push_str(expr.suffix.as_ref()); - let mut matches = keyexpr::new(fullexpr.as_str()) - .map(|ke| Resource::get_matches(&rtables, ke)) - .unwrap_or_default(); drop(rtables); let mut wtables = zwrite!(tables.tables); - let mut res = + let res = Resource::make_resource(&mut wtables, &mut prefix, expr.suffix.as_ref()); - matches.push(Arc::downgrade(&res)); - Resource::match_resource(&wtables, &mut res, matches); (res, wtables) }; @@ -125,18 +119,13 @@ pub(crate) fn undeclare_token( // TODO this could be improved let mut fullexpr = prefix.expr().to_string(); fullexpr.push_str(expr.wire_expr.suffix.as_ref()); - let mut matches = keyexpr::new(fullexpr.as_str()) - .map(|ke| Resource::get_matches(&rtables, ke)) - .unwrap_or_default(); drop(rtables); let mut wtables = zwrite!(tables.tables); - let mut res = Resource::make_resource( + let res = Resource::make_resource( &mut wtables, &mut prefix, expr.wire_expr.suffix.as_ref(), ); - matches.push(Arc::downgrade(&res)); - Resource::match_resource(&wtables, &mut res, matches); (Some(res), wtables) } } diff --git a/zenoh/src/net/routing/hat/client/mod.rs b/zenoh/src/net/routing/hat/client/mod.rs index cf25f6a682..db2fc9c860 100644 --- a/zenoh/src/net/routing/hat/client/mod.rs +++ b/zenoh/src/net/routing/hat/client/mod.rs @@ -168,19 +168,9 @@ impl HatBaseTrait for HatCode { undeclare_simple_subscription(&mut wtables, &mut face_clone, &mut res, send_declare); if res.context.is_some() { - for match_ in &res.context().matches { - let mut match_ = match_.upgrade().unwrap(); - if !Arc::ptr_eq(&match_, &res) { - get_mut_unchecked(&mut match_) - .context_mut() - .disable_data_routes(); - subs_matches.push(match_); - } + if let Some(key_expr) = res.key_expr() { + subs_matches.extend(Resource::get_matches(&wtables.root_res, &key_expr)); } - get_mut_unchecked(&mut res) - .context_mut() - .disable_data_routes(); - subs_matches.push(res); } } @@ -190,19 +180,9 @@ impl HatBaseTrait for HatCode { undeclare_simple_queryable(&mut wtables, &mut face_clone, &mut res, send_declare); if res.context.is_some() { - for match_ in &res.context().matches { - let mut match_ = match_.upgrade().unwrap(); - if !Arc::ptr_eq(&match_, &res) { - get_mut_unchecked(&mut match_) - .context_mut() - .disable_query_routes(); - qabls_matches.push(match_); - } + if let Some(key_expr) = res.key_expr() { + qabls_matches.extend(Resource::get_matches(&wtables.root_res, &key_expr)); } - get_mut_unchecked(&mut res) - .context_mut() - .disable_query_routes(); - qabls_matches.push(res); } } diff --git a/zenoh/src/net/routing/hat/client/pubsub.rs b/zenoh/src/net/routing/hat/client/pubsub.rs index 8e1a6a43a7..37884abcbd 100644 --- a/zenoh/src/net/routing/hat/client/pubsub.rs +++ b/zenoh/src/net/routing/hat/client/pubsub.rs @@ -12,7 +12,6 @@ // ZettaScale Zenoh Team, // use std::{ - borrow::Cow, collections::HashMap, sync::{atomic::Ordering, Arc}, }; @@ -377,16 +376,7 @@ impl HatPubSubTrait for HatCode { } } - let res = Resource::get_resource(expr.prefix, expr.suffix); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, &key_expr))); - - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); - + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { for (sid, context) in &mres.session_ctxs { if context.subs.is_some() && context.face.whatami == WhatAmI::Client { route.entry(*sid).or_insert_with(|| { @@ -433,16 +423,7 @@ impl HatPubSubTrait for HatCode { } } - let res = Resource::get_resource(&tables.root_res, key_expr); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, key_expr))); - - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); - + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { for (sid, context) in &mres.session_ctxs { if context.subs.is_some() && context.face.whatami == WhatAmI::Client { matching_subscriptions diff --git a/zenoh/src/net/routing/hat/client/queries.rs b/zenoh/src/net/routing/hat/client/queries.rs index 855207cd53..2d7afecb41 100644 --- a/zenoh/src/net/routing/hat/client/queries.rs +++ b/zenoh/src/net/routing/hat/client/queries.rs @@ -12,7 +12,6 @@ // ZettaScale Zenoh Team, // use std::{ - borrow::Cow, collections::HashMap, sync::{atomic::Ordering, Arc}, }; @@ -397,15 +396,7 @@ impl HatQueriesTrait for HatCode { } } - let res = Resource::get_resource(expr.prefix, expr.suffix); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, &key_expr))); - - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { let complete = DEFAULT_INCLUDER.includes(mres.expr().as_bytes(), key_expr.as_bytes()); for (sid, context) in &mres.session_ctxs { let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, *sid); @@ -471,15 +462,7 @@ impl HatQueriesTrait for HatCode { } } - let res = Resource::get_resource(&tables.root_res, key_expr); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, key_expr))); - - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { if complete && !KeyExpr::keyexpr_include(mres.expr(), key_expr) { continue; } diff --git a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs index fc9640c73b..e88e41d5cf 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs @@ -304,19 +304,9 @@ impl HatBaseTrait for HatCode { undeclare_simple_subscription(&mut wtables, &mut face_clone, &mut res, send_declare); if res.context.is_some() { - for match_ in &res.context().matches { - let mut match_ = match_.upgrade().unwrap(); - if !Arc::ptr_eq(&match_, &res) { - get_mut_unchecked(&mut match_) - .context_mut() - .disable_data_routes(); - subs_matches.push(match_); - } + if let Some(key_expr) = res.key_expr() { + subs_matches.extend(Resource::get_matches(&wtables.root_res, &key_expr)); } - get_mut_unchecked(&mut res) - .context_mut() - .disable_data_routes(); - subs_matches.push(res); } } @@ -326,19 +316,9 @@ impl HatBaseTrait for HatCode { undeclare_simple_queryable(&mut wtables, &mut face_clone, &mut res, send_declare); if res.context.is_some() { - for match_ in &res.context().matches { - let mut match_ = match_.upgrade().unwrap(); - if !Arc::ptr_eq(&match_, &res) { - get_mut_unchecked(&mut match_) - .context_mut() - .disable_query_routes(); - qabls_matches.push(match_); - } + if let Some(key_expr) = res.key_expr() { + qabls_matches.extend(Resource::get_matches(&wtables.root_res, &key_expr)); } - get_mut_unchecked(&mut res) - .context_mut() - .disable_query_routes(); - qabls_matches.push(res); } } diff --git a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs index ce73ac3407..912008e472 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs @@ -12,7 +12,6 @@ // ZettaScale Zenoh Team, // use std::{ - borrow::Cow, collections::{HashMap, HashSet}, sync::{atomic::Ordering, Arc}, }; @@ -332,10 +331,10 @@ fn simple_subs(res: &Arc) -> Vec> { } #[inline] -fn remote_simple_subs(res: &Arc, face: &Arc) -> bool { +fn remote_simple_subs(res: &Arc, face_id: usize) -> bool { res.session_ctxs .values() - .any(|ctx| ctx.face.id != face.id && ctx.subs.is_some()) + .any(|ctx| ctx.face.id != face_id && ctx.subs.is_some()) } #[inline] @@ -405,21 +404,18 @@ fn propagate_forget_simple_subscription( ), ); } - for res in face_hat!(face) - .local_subs - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade().is_some_and(|m| { + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + face_hat_mut!(&mut face).local_subs.retain(|res, &mut id| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { m.context.is_some() - && (remote_simple_subs(&m, &face) || remote_linkstatepeer_subs(tables, &m)) - }) - }) { - if let Some(id) = face_hat_mut!(&mut face).local_subs.remove(&res) { + && (remote_simple_subs(&m, face_id) + || remote_linkstatepeer_subs(tables, &m)) + }) { send_declare( - &face.primitives, + &primitives, RoutingContext::with_expr( Declare { interest_id: None, @@ -434,9 +430,11 @@ fn propagate_forget_simple_subscription( res.expr().to_string(), ), ); + return false; } } - } + true + }); } } @@ -561,22 +559,18 @@ pub(super) fn undeclare_simple_subscription( ), ); } - for res in face_hat!(face) - .local_subs - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade().is_some_and(|m| { + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + face_hat_mut!(&mut face).local_subs.retain(|res, &mut id| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { m.context.is_some() - && (remote_simple_subs(&m, face) + && (remote_simple_subs(&m, face_id) || remote_linkstatepeer_subs(tables, &m)) - }) - }) { - if let Some(id) = face_hat_mut!(&mut face).local_subs.remove(&res) { + }) { send_declare( - &face.primitives, + &primitives, RoutingContext::with_expr( Declare { interest_id: None, @@ -593,9 +587,11 @@ pub(super) fn undeclare_simple_subscription( res.expr().to_string(), ), ); + return false; } } - } + true + }); } } } @@ -704,7 +700,8 @@ pub(super) fn declare_sub_interest( if hat!(tables).linkstatepeer_subs.iter().any(|sub| { sub.context.is_some() && sub.matches(res) - && (remote_simple_subs(sub, face) || remote_linkstatepeer_subs(tables, sub)) + && (remote_simple_subs(sub, face.id) + || remote_linkstatepeer_subs(tables, sub)) }) { let id = make_sub_id(res, face, mode); let wire_expr = Resource::decl_key(res, face, face.whatami != WhatAmI::Client); @@ -729,7 +726,8 @@ pub(super) fn declare_sub_interest( for sub in &hat!(tables).linkstatepeer_subs { if sub.context.is_some() && sub.matches(res) - && (remote_simple_subs(sub, face) || remote_linkstatepeer_subs(tables, sub)) + && (remote_simple_subs(sub, face.id) + || remote_linkstatepeer_subs(tables, sub)) { let id = make_sub_id(sub, face, mode); let wire_expr = @@ -756,7 +754,7 @@ pub(super) fn declare_sub_interest( } else { for sub in &hat!(tables).linkstatepeer_subs { if sub.context.is_some() - && (remote_simple_subs(sub, face) || remote_linkstatepeer_subs(tables, sub)) + && (remote_simple_subs(sub, face.id) || remote_linkstatepeer_subs(tables, sub)) { let id = make_sub_id(sub, face, mode); let wire_expr = Resource::decl_key(sub, face, face.whatami != WhatAmI::Client); @@ -935,16 +933,8 @@ impl HatPubSubTrait for HatCode { return Arc::new(route); } }; - let res = Resource::get_resource(expr.prefix, expr.suffix); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, &key_expr))); - - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { let net = hat!(tables).linkstatepeers_net.as_ref().unwrap(); let peer_source = match source_type { WhatAmI::Router | WhatAmI::Peer => source, @@ -1022,16 +1012,7 @@ impl HatPubSubTrait for HatCode { } tracing::trace!("get_matching_subscriptions({})", key_expr,); - let res = Resource::get_resource(&tables.root_res, key_expr); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, key_expr))); - - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); - + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { let net = hat!(tables).linkstatepeers_net.as_ref().unwrap(); insert_faces_for_subs( &mut matching_subscriptions, diff --git a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs index e4c3cfca78..2fa1157368 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs @@ -12,7 +12,6 @@ // ZettaScale Zenoh Team, // use std::{ - borrow::Cow, collections::HashMap, sync::{atomic::Ordering, Arc}, }; @@ -348,10 +347,10 @@ fn simple_qabls(res: &Arc) -> Vec> { } #[inline] -fn remote_simple_qabls(res: &Arc, face: &Arc) -> bool { +fn remote_simple_qabls(res: &Arc, face_id: usize) -> bool { res.session_ctxs .values() - .any(|ctx| ctx.face.id != face.id && ctx.qabl.is_some()) + .any(|ctx| ctx.face.id != face_id && ctx.qabl.is_some()) } #[inline] @@ -421,39 +420,39 @@ fn propagate_forget_simple_queryable( ), ); } - for res in face_hat!(&mut face) + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + face_hat_mut!(&mut face) .local_qabls - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade().is_some_and(|m| { - m.context.is_some() - && (remote_simple_qabls(&m, &face) - || remote_linkstatepeer_qabls(tables, &m)) - }) - }) { - if let Some((id, _)) = face_hat_mut!(&mut face).local_qabls.remove(&res) { - send_declare( - &face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::UndeclareQueryable(UndeclareQueryable { - id, - ext_wire_expr: WireExprType::null(), - }), - }, - res.expr().to_string(), - ), - ); + .retain(|res, &mut (id, _)| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { + m.context.is_some() + && (remote_simple_qabls(&m, face_id) + || remote_linkstatepeer_qabls(tables, &m)) + }) { + send_declare( + &primitives, + RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareQueryable(UndeclareQueryable { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr().to_string(), + ), + ); + return false; + } } - } - } + true + }); } } @@ -565,7 +564,7 @@ pub(super) fn undeclare_simple_queryable( } if simple_qabls.len() == 1 && !linkstatepeer_qabls { - let mut face = &mut simple_qabls[0]; + let face = &mut simple_qabls[0]; if let Some((id, _)) = face_hat_mut!(face).local_qabls.remove(res) { send_declare( &face.primitives, @@ -584,22 +583,18 @@ pub(super) fn undeclare_simple_queryable( ), ); } - for res in face_hat!(face) - .local_qabls - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade().is_some_and(|m| { + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + face_hat_mut!(face).local_qabls.retain(|res, &mut (id, _)| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { m.context.is_some() - && (remote_simple_qabls(&m, face) + && (remote_simple_qabls(&m, face_id) || remote_linkstatepeer_qabls(tables, &m)) - }) - }) { - if let Some((id, _)) = face_hat_mut!(&mut face).local_qabls.remove(&res) { + }) { send_declare( - &face.primitives, + &primitives, RoutingContext::with_expr( Declare { interest_id: None, @@ -614,9 +609,11 @@ pub(super) fn undeclare_simple_queryable( res.expr().to_string(), ), ); + return false; } } - } + true + }); } } } @@ -774,7 +771,7 @@ pub(super) fn declare_qabl_interest( if hat!(tables).linkstatepeer_qabls.iter().any(|qabl| { qabl.context.is_some() && qabl.matches(res) - && (remote_simple_qabls(qabl, face) + && (remote_simple_qabls(qabl, face.id) || remote_linkstatepeer_qabls(tables, qabl)) }) { let info = local_qabl_info(tables, res, face); @@ -802,7 +799,7 @@ pub(super) fn declare_qabl_interest( for qabl in hat!(tables).linkstatepeer_qabls.iter() { if qabl.context.is_some() && qabl.matches(res) - && (remote_simple_qabls(qabl, face) + && (remote_simple_qabls(qabl, face.id) || remote_linkstatepeer_qabls(tables, qabl)) { let info = local_qabl_info(tables, qabl, face); @@ -832,7 +829,8 @@ pub(super) fn declare_qabl_interest( } else { for qabl in hat!(tables).linkstatepeer_qabls.iter() { if qabl.context.is_some() - && (remote_simple_qabls(qabl, face) || remote_linkstatepeer_qabls(tables, qabl)) + && (remote_simple_qabls(qabl, face.id) + || remote_linkstatepeer_qabls(tables, qabl)) { let info = local_qabl_info(tables, qabl, face); let id = make_qabl_id(qabl, face, mode, info); @@ -976,15 +974,8 @@ impl HatQueriesTrait for HatCode { return EMPTY_ROUTE.clone(); } }; - let res = Resource::get_resource(expr.prefix, expr.suffix); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, &key_expr))); - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { let complete = DEFAULT_INCLUDER.includes(mres.expr().as_bytes(), key_expr.as_bytes()); let net = hat!(tables).linkstatepeers_net.as_ref().unwrap(); @@ -1042,15 +1033,7 @@ impl HatQueriesTrait for HatCode { complete ); - let res = Resource::get_resource(&tables.root_res, key_expr); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, key_expr))); - - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { if complete && !KeyExpr::keyexpr_include(mres.expr(), key_expr) { continue; } diff --git a/zenoh/src/net/routing/hat/linkstate_peer/token.rs b/zenoh/src/net/routing/hat/linkstate_peer/token.rs index 193d383603..bb46b7fe38 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/token.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/token.rs @@ -304,10 +304,15 @@ fn simple_tokens(res: &Arc) -> Vec> { } #[inline] -fn remote_simple_tokens(tables: &Tables, res: &Arc, face: &Arc) -> bool { +fn remote_simple_tokens( + tables: &Tables, + res: &Arc, + face_id: usize, + face_zid: ZenohIdProto, +) -> bool { res.session_ctxs .values() - .any(|ctx| (ctx.face.id != face.id || face.zid == tables.zid) && ctx.token) + .any(|ctx| (ctx.face.id != face_id || face_zid == tables.zid) && ctx.token) } #[inline] @@ -377,39 +382,40 @@ fn propagate_forget_simple_token( ), ); } - for res in face_hat!(face) + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + let face_zid = face.zid; + face_hat_mut!(&mut face) .local_tokens - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade().is_some_and(|m| { - m.context.is_some() - && (remote_simple_tokens(tables, &m, &face) - || remote_linkstatepeer_tokens(tables, &m)) - }) - }) { - if let Some(id) = face_hat_mut!(&mut face).local_tokens.remove(&res) { - send_declare( - &face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::UndeclareToken(UndeclareToken { - id, - ext_wire_expr: WireExprType::null(), - }), - }, - res.expr().to_string(), - ), - ); + .retain(|res, &mut id| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { + m.context.is_some() + && (remote_simple_tokens(tables, &m, face_id, face_zid) + || remote_linkstatepeer_tokens(tables, &m)) + }) { + send_declare( + &primitives, + RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareToken(UndeclareToken { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr().to_string(), + ), + ); + return false; + } } - } - } + true + }); } } @@ -532,39 +538,40 @@ pub(super) fn undeclare_simple_token( ), ); } - for res in face_hat!(face) + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + let face_zid = face.zid; + face_hat_mut!(&mut face) .local_tokens - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade().is_some_and(|m| { - m.context.is_some() - && (remote_simple_tokens(tables, &m, face) - || remote_linkstatepeer_tokens(tables, &m)) - }) - }) { - if let Some(id) = face_hat_mut!(&mut face).local_tokens.remove(&res) { - send_declare( - &face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::UndeclareToken(UndeclareToken { - id, - ext_wire_expr: WireExprType::null(), - }), - }, - res.expr().to_string(), - ), - ); + .retain(|res, &mut id| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { + m.context.is_some() + && (remote_simple_tokens(tables, &m, face_id, face_zid) + || remote_linkstatepeer_tokens(tables, &m)) + }) { + send_declare( + &primitives, + RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareToken(UndeclareToken { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr().to_string(), + ), + ); + return false; + } } - } - } + true + }); } } } @@ -669,7 +676,7 @@ pub(crate) fn declare_token_interest( if hat!(tables).linkstatepeer_tokens.iter().any(|token| { token.context.is_some() && token.matches(res) - && (remote_simple_tokens(tables, token, face) + && (remote_simple_tokens(tables, token, face.id, face.zid) || remote_linkstatepeer_tokens(tables, token)) }) { let id = make_token_id(res, face, mode); @@ -692,7 +699,7 @@ pub(crate) fn declare_token_interest( for token in &hat!(tables).linkstatepeer_tokens { if token.context.is_some() && token.matches(res) - && (remote_simple_tokens(tables, token, face) + && (remote_simple_tokens(tables, token, face.id, face.zid) || remote_linkstatepeer_tokens(tables, token)) { let id = make_token_id(token, face, mode); @@ -717,7 +724,7 @@ pub(crate) fn declare_token_interest( } else { for token in &hat!(tables).linkstatepeer_tokens { if token.context.is_some() - && (remote_simple_tokens(tables, token, face) + && (remote_simple_tokens(tables, token, face.id, face.zid) || remote_linkstatepeer_tokens(tables, token)) { let id = make_token_id(token, face, mode); diff --git a/zenoh/src/net/routing/hat/p2p_peer/mod.rs b/zenoh/src/net/routing/hat/p2p_peer/mod.rs index 4986d3edbe..f0ae212b00 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/mod.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/mod.rs @@ -256,19 +256,9 @@ impl HatBaseTrait for HatCode { undeclare_simple_subscription(&mut wtables, &mut face_clone, &mut res, send_declare); if res.context.is_some() { - for match_ in &res.context().matches { - let mut match_ = match_.upgrade().unwrap(); - if !Arc::ptr_eq(&match_, &res) { - get_mut_unchecked(&mut match_) - .context_mut() - .disable_data_routes(); - subs_matches.push(match_); - } + if let Some(key_expr) = res.key_expr() { + subs_matches.extend(Resource::get_matches(&wtables.root_res, &key_expr)); } - get_mut_unchecked(&mut res) - .context_mut() - .disable_data_routes(); - subs_matches.push(res); } } @@ -278,19 +268,9 @@ impl HatBaseTrait for HatCode { undeclare_simple_queryable(&mut wtables, &mut face_clone, &mut res, send_declare); if res.context.is_some() { - for match_ in &res.context().matches { - let mut match_ = match_.upgrade().unwrap(); - if !Arc::ptr_eq(&match_, &res) { - get_mut_unchecked(&mut match_) - .context_mut() - .disable_query_routes(); - qabls_matches.push(match_); - } + if let Some(key_expr) = res.key_expr() { + qabls_matches.extend(Resource::get_matches(&wtables.root_res, &key_expr)); } - get_mut_unchecked(&mut res) - .context_mut() - .disable_query_routes(); - qabls_matches.push(res); } } diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs index c799476395..85a72f3624 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs @@ -12,7 +12,6 @@ // ZettaScale Zenoh Team, // use std::{ - borrow::Cow, collections::HashMap, sync::{atomic::Ordering, Arc}, }; @@ -230,10 +229,10 @@ fn simple_subs(res: &Arc) -> Vec> { } #[inline] -fn remote_simple_subs(res: &Arc, face: &Arc) -> bool { +fn remote_simple_subs(res: &Arc, face_id: usize) -> bool { res.session_ctxs .values() - .any(|ctx| ctx.face.id != face.id && ctx.subs.is_some()) + .any(|ctx| ctx.face.id != face_id && ctx.subs.is_some()) } fn propagate_forget_simple_subscription( @@ -260,19 +259,16 @@ fn propagate_forget_simple_subscription( ), ); } - for res in face_hat!(face) - .local_subs - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade() - .is_some_and(|m| m.context.is_some() && remote_simple_subs(&m, &face)) - }) { - if let Some(id) = face_hat_mut!(&mut face).local_subs.remove(&res) { + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + face_hat_mut!(&mut face).local_subs.retain(|res, &mut id| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { + m.context.is_some() && remote_simple_subs(m, face_id) + }) { send_declare( - &face.primitives, + &primitives, RoutingContext::with_expr( Declare { interest_id: None, @@ -287,9 +283,11 @@ fn propagate_forget_simple_subscription( res.expr().to_string(), ), ); + return false; } } - } + true + }); } } @@ -329,19 +327,16 @@ pub(super) fn undeclare_simple_subscription( ), ); } - for res in face_hat!(face) - .local_subs - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade() - .is_some_and(|m| m.context.is_some() && remote_simple_subs(&m, face)) - }) { - if let Some(id) = face_hat_mut!(&mut face).local_subs.remove(&res) { + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + face_hat_mut!(&mut face).local_subs.retain(|res, &mut id| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { + m.context.is_some() && remote_simple_subs(m, face_id) + }) { send_declare( - &face.primitives, + &primitives, RoutingContext::with_expr( Declare { interest_id: None, @@ -356,9 +351,11 @@ pub(super) fn undeclare_simple_subscription( res.expr().to_string(), ), ); + return false; } } - } + true + }); } } } @@ -653,16 +650,7 @@ impl HatPubSubTrait for HatCode { } } - let res = Resource::get_resource(expr.prefix, expr.suffix); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, &key_expr))); - - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); - + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { for (sid, context) in &mres.session_ctxs { if context.subs.is_some() && (source_type == WhatAmI::Client || context.face.whatami == WhatAmI::Client) @@ -698,16 +686,8 @@ impl HatPubSubTrait for HatCode { return matching_subscriptions; } tracing::trace!("get_matching_subscriptions({})", key_expr,); - let res = Resource::get_resource(&tables.root_res, key_expr); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, key_expr))); - - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { for (sid, context) in &mres.session_ctxs { if context.subs.is_some() { matching_subscriptions diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index d211ee9b9a..0b250ced89 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -12,7 +12,6 @@ // ZettaScale Zenoh Team, // use std::{ - borrow::Cow, collections::HashMap, sync::{atomic::Ordering, Arc}, }; @@ -199,10 +198,10 @@ fn simple_qabls(res: &Arc) -> Vec> { } #[inline] -fn remote_simple_qabls(res: &Arc, face: &Arc) -> bool { +fn remote_simple_qabls(res: &Arc, face_id: usize) -> bool { res.session_ctxs .values() - .any(|ctx| ctx.face.id != face.id && ctx.qabl.is_some()) + .any(|ctx| ctx.face.id != face_id && ctx.qabl.is_some()) } fn propagate_forget_simple_queryable( @@ -229,19 +228,16 @@ fn propagate_forget_simple_queryable( ), ); } - for res in face_hat!(face) - .local_qabls - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade() - .is_some_and(|m| m.context.is_some() && remote_simple_qabls(&m, face)) - }) { - if let Some((id, _)) = face_hat_mut!(face).local_qabls.remove(&res) { + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + face_hat_mut!(face).local_qabls.retain(|res, &mut (id, _)| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { + m.context.is_some() && remote_simple_qabls(m, face_id) + }) { send_declare( - &face.primitives, + &primitives, RoutingContext::with_expr( Declare { interest_id: None, @@ -256,9 +252,11 @@ fn propagate_forget_simple_queryable( res.expr().to_string(), ), ); + return false; } } - } + true + }); } } @@ -284,7 +282,7 @@ pub(super) fn undeclare_simple_queryable( propagate_simple_queryable(tables, res, None, send_declare); } if simple_qabls.len() == 1 { - let mut face = &mut simple_qabls[0]; + let face = &mut simple_qabls[0]; if let Some((id, _)) = face_hat_mut!(face).local_qabls.remove(res) { send_declare( &face.primitives, @@ -303,19 +301,16 @@ pub(super) fn undeclare_simple_queryable( ), ); } - for res in face_hat!(face) - .local_qabls - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade() - .is_some_and(|m| m.context.is_some() && (remote_simple_qabls(&m, face))) - }) { - if let Some((id, _)) = face_hat_mut!(&mut face).local_qabls.remove(&res) { + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + face_hat_mut!(face).local_qabls.retain(|res, &mut (id, _)| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { + m.context.is_some() && remote_simple_qabls(m, face_id) + }) { send_declare( - &face.primitives, + &primitives, RoutingContext::with_expr( Declare { interest_id: None, @@ -330,9 +325,11 @@ pub(super) fn undeclare_simple_queryable( res.expr().to_string(), ), ); + return false; } } - } + true + }); } } } @@ -647,15 +644,7 @@ impl HatQueriesTrait for HatCode { } } - let res = Resource::get_resource(expr.prefix, expr.suffix); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, &key_expr))); - - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { let complete = DEFAULT_INCLUDER.includes(mres.expr().as_bytes(), key_expr.as_bytes()); for (sid, context) in &mres.session_ctxs { if source_type == WhatAmI::Client || context.face.whatami == WhatAmI::Client { @@ -696,15 +685,8 @@ impl HatQueriesTrait for HatCode { key_expr, complete ); - let res = Resource::get_resource(&tables.root_res, key_expr); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, key_expr))); - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { if complete && !KeyExpr::keyexpr_include(mres.expr(), key_expr) { continue; } diff --git a/zenoh/src/net/routing/hat/p2p_peer/token.rs b/zenoh/src/net/routing/hat/p2p_peer/token.rs index d729907619..fa1d7adb39 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/token.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/token.rs @@ -15,11 +15,14 @@ use std::sync::{atomic::Ordering, Arc}; use zenoh_config::WhatAmI; -use zenoh_protocol::network::{ - declare::{common::ext::WireExprType, TokenId}, - ext, - interest::{InterestId, InterestMode}, - Declare, DeclareBody, DeclareToken, UndeclareToken, +use zenoh_protocol::{ + core::ZenohIdProto, + network::{ + declare::{common::ext::WireExprType, TokenId}, + ext, + interest::{InterestId, InterestMode}, + Declare, DeclareBody, DeclareToken, UndeclareToken, + }, }; use zenoh_sync::get_mut_unchecked; @@ -242,10 +245,15 @@ fn simple_tokens(res: &Arc) -> Vec> { } #[inline] -fn remote_simple_tokens(tables: &Tables, res: &Arc, face: &Arc) -> bool { +fn remote_simple_tokens( + tables: &Tables, + res: &Arc, + face_id: usize, + face_zid: ZenohIdProto, +) -> bool { res.session_ctxs .values() - .any(|ctx| (ctx.face.id != face.id || face.zid == tables.zid) && ctx.token) + .any(|ctx| (ctx.face.id != face_id || face_zid == tables.zid) && ctx.token) } fn propagate_forget_simple_token( @@ -299,61 +307,38 @@ fn propagate_forget_simple_token( ), ); } - for res in face_hat!(face) + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + let face_zid = face.zid; + face_hat_mut!(&mut face) .local_tokens - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade() - .is_some_and(|m| m.context.is_some() && remote_simple_tokens(tables, &m, &face)) - }) { - if let Some(id) = face_hat_mut!(&mut face).local_tokens.remove(&res) { - send_declare( - &face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::UndeclareToken(UndeclareToken { - id, - ext_wire_expr: WireExprType::null(), - }), - }, - res.expr().to_string(), - ), - ); - } else if face_hat!(face) - .remote_interests - .values() - .any(|i| i.options.tokens() && i.matches(&res) && !i.options.aggregate()) - { - // Token has never been declared on this face. - // Send an Undeclare with a one shot generated id and a WireExpr ext. - send_declare( - &face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::UndeclareToken(UndeclareToken { - id: face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst), - ext_wire_expr: WireExprType { - wire_expr: Resource::get_best_key(&res, "", face.id), - }, - }), - }, - res.expr().to_string(), - ), - ); + .retain(|res, &mut id| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { + m.context.is_some() && remote_simple_tokens(tables, &m, face_id, face_zid) + }) { + send_declare( + &primitives, + RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareToken(UndeclareToken { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr().to_string(), + ), + ); + return false; + } } - } - } + true + }); } } @@ -398,37 +383,39 @@ pub(super) fn undeclare_simple_token( ), ); } - for res in face_hat!(face) + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + let face_zid = face.zid; + face_hat_mut!(&mut face) .local_tokens - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade().is_some_and(|m| { - m.context.is_some() && remote_simple_tokens(tables, &m, face) - }) - }) { - if let Some(id) = face_hat_mut!(&mut face).local_tokens.remove(&res) { - send_declare( - &face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::UndeclareToken(UndeclareToken { - id, - ext_wire_expr: WireExprType::null(), - }), - }, - res.expr().to_string(), - ), - ); + .retain(|res, &mut id| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { + m.context.is_some() + && remote_simple_tokens(tables, &m, face_id, face_zid) + }) { + send_declare( + &primitives, + RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareToken(UndeclareToken { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr().to_string(), + ), + ); + return false; + } } - } - } + true + }); } } } diff --git a/zenoh/src/net/routing/hat/router/mod.rs b/zenoh/src/net/routing/hat/router/mod.rs index d108f1d532..d912252050 100644 --- a/zenoh/src/net/routing/hat/router/mod.rs +++ b/zenoh/src/net/routing/hat/router/mod.rs @@ -474,19 +474,9 @@ impl HatBaseTrait for HatCode { undeclare_simple_subscription(&mut wtables, &mut face_clone, &mut res, send_declare); if res.context.is_some() { - for match_ in &res.context().matches { - let mut match_ = match_.upgrade().unwrap(); - if !Arc::ptr_eq(&match_, &res) { - get_mut_unchecked(&mut match_) - .context_mut() - .disable_data_routes(); - subs_matches.push(match_); - } + if let Some(key_expr) = res.key_expr() { + subs_matches.extend(Resource::get_matches(&wtables.root_res, &key_expr)); } - get_mut_unchecked(&mut res) - .context_mut() - .disable_data_routes(); - subs_matches.push(res); } } @@ -496,19 +486,9 @@ impl HatBaseTrait for HatCode { undeclare_simple_queryable(&mut wtables, &mut face_clone, &mut res, send_declare); if res.context.is_some() { - for match_ in &res.context().matches { - let mut match_ = match_.upgrade().unwrap(); - if !Arc::ptr_eq(&match_, &res) { - get_mut_unchecked(&mut match_) - .context_mut() - .disable_query_routes(); - qabls_matches.push(match_); - } + if let Some(key_expr) = res.key_expr() { + qabls_matches.extend(Resource::get_matches(&wtables.root_res, &key_expr)); } - get_mut_unchecked(&mut res) - .context_mut() - .disable_query_routes(); - qabls_matches.push(res); } } diff --git a/zenoh/src/net/routing/hat/router/pubsub.rs b/zenoh/src/net/routing/hat/router/pubsub.rs index ee3f2132e3..677a9ba1a5 100644 --- a/zenoh/src/net/routing/hat/router/pubsub.rs +++ b/zenoh/src/net/routing/hat/router/pubsub.rs @@ -12,7 +12,6 @@ // ZettaScale Zenoh Team, // use std::{ - borrow::Cow, collections::{HashMap, HashSet}, sync::{atomic::Ordering, Arc}, }; @@ -368,10 +367,10 @@ fn simple_subs(res: &Arc) -> Vec> { } #[inline] -fn remote_simple_subs(res: &Arc, face: &Arc) -> bool { +fn remote_simple_subs(res: &Arc, face_id: usize) -> bool { res.session_ctxs .values() - .any(|ctx| ctx.face.id != face.id && ctx.subs.is_some()) + .any(|ctx| ctx.face.id != face_id && ctx.subs.is_some()) } #[inline] @@ -441,23 +440,19 @@ fn propagate_forget_simple_subscription( ), ); } - for res in face_hat!(&mut face) - .local_subs - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade().is_some_and(|m| { + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + face_hat_mut!(&mut face).local_subs.retain(|res, &mut id| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { m.context.is_some() - && (remote_simple_subs(&m, &face) + && (remote_simple_subs(&m, face_id) || remote_linkstatepeer_subs(tables, &m) || remote_router_subs(tables, &m)) - }) - }) { - if let Some(id) = face_hat_mut!(&mut face).local_subs.remove(&res) { + }) { send_declare( - &face.primitives, + &primitives, RoutingContext::with_expr( Declare { interest_id: None, @@ -472,9 +467,11 @@ fn propagate_forget_simple_subscription( res.expr().to_string(), ), ); + return false; } } - } + true + }); } } @@ -687,23 +684,19 @@ pub(super) fn undeclare_simple_subscription( ), ); } - for res in face_hat!(face) - .local_subs - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade().is_some_and(|m| { + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + face_hat_mut!(&mut face).local_subs.retain(|res, &mut id| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { m.context.is_some() - && (remote_simple_subs(&m, face) + && (remote_simple_subs(&m, face_id) || remote_linkstatepeer_subs(tables, &m) || remote_router_subs(tables, &m)) - }) - }) { - if let Some(id) = face_hat_mut!(&mut face).local_subs.remove(&res) { + }) { send_declare( - &face.primitives, + &primitives, RoutingContext::with_expr( Declare { interest_id: None, @@ -718,9 +711,11 @@ pub(super) fn undeclare_simple_subscription( res.expr().to_string(), ), ); + return false; } } - } + true + }); } } } @@ -953,7 +948,7 @@ pub(crate) fn declare_sub_interest( if hat!(tables).router_subs.iter().any(|sub| { sub.context.is_some() && sub.matches(res) - && (remote_simple_subs(sub, face) + && (remote_simple_subs(sub, face.id) || remote_linkstatepeer_subs(tables, sub) || remote_router_subs(tables, sub)) }) { @@ -1261,20 +1256,12 @@ impl HatPubSubTrait for HatCode { return Arc::new(route); } }; - let res = Resource::get_resource(expr.prefix, expr.suffix); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, &key_expr))); let master = !hat!(tables).full_net(WhatAmI::Peer) || *hat!(tables).elect_router(&tables.zid, &key_expr, hat!(tables).shared_nodes.iter()) == tables.zid; - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); - + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { if master || source_type == WhatAmI::Router { let net = hat!(tables).routers_net.as_ref().unwrap(); let router_source = match source_type { @@ -1370,20 +1357,11 @@ impl HatPubSubTrait for HatCode { } tracing::trace!("get_matching_subscriptions({})", key_expr,); - let res = Resource::get_resource(&tables.root_res, key_expr); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, key_expr))); - let master = !hat!(tables).full_net(WhatAmI::Peer) || *hat!(tables).elect_router(&tables.zid, key_expr, hat!(tables).shared_nodes.iter()) == tables.zid; - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); - + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { if master { let net = hat!(tables).routers_net.as_ref().unwrap(); insert_faces_for_subs( diff --git a/zenoh/src/net/routing/hat/router/queries.rs b/zenoh/src/net/routing/hat/router/queries.rs index 2021750f5b..f0a67931ba 100644 --- a/zenoh/src/net/routing/hat/router/queries.rs +++ b/zenoh/src/net/routing/hat/router/queries.rs @@ -12,7 +12,6 @@ // ZettaScale Zenoh Team, // use std::{ - borrow::Cow, collections::HashMap, sync::{atomic::Ordering, Arc}, }; @@ -495,10 +494,10 @@ fn simple_qabls(res: &Arc) -> Vec> { } #[inline] -fn remote_simple_qabls(res: &Arc, face: &Arc) -> bool { +fn remote_simple_qabls(res: &Arc, face_id: usize) -> bool { res.session_ctxs .values() - .any(|ctx| ctx.face.id != face.id && ctx.qabl.is_some()) + .any(|ctx| ctx.face.id != face_id && ctx.qabl.is_some()) } #[inline] @@ -568,40 +567,40 @@ fn propagate_forget_simple_queryable( ), ); } - for res in face_hat!(&mut face) + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + face_hat_mut!(&mut face) .local_qabls - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade().is_some_and(|m| { - m.context.is_some() - && (remote_simple_qabls(&m, &face) - || remote_linkstatepeer_qabls(tables, &m) - || remote_router_qabls(tables, &m)) - }) - }) { - if let Some((id, _)) = face_hat_mut!(&mut face).local_qabls.remove(&res) { - send_declare( - &face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::UndeclareQueryable(UndeclareQueryable { - id, - ext_wire_expr: WireExprType::null(), - }), - }, - res.expr().to_string(), - ), - ); + .retain(|res, &mut (id, _)| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { + m.context.is_some() + && (remote_simple_qabls(&m, face_id) + || remote_linkstatepeer_qabls(tables, &m) + || remote_router_qabls(tables, &m)) + }) { + send_declare( + &primitives, + RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareQueryable(UndeclareQueryable { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr().to_string(), + ), + ); + return false; + } } - } - } + true + }); } } @@ -808,7 +807,7 @@ pub(super) fn undeclare_simple_queryable( } if simple_qabls.len() == 1 && !router_qabls && !linkstatepeer_qabls { - let mut face = &mut simple_qabls[0]; + let face = &mut simple_qabls[0]; if let Some((id, _)) = face_hat_mut!(face).local_qabls.remove(res) { send_declare( &face.primitives, @@ -827,23 +826,19 @@ pub(super) fn undeclare_simple_queryable( ), ); } - for res in face_hat!(face) - .local_qabls - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade().is_some_and(|m| { + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + face_hat_mut!(face).local_qabls.retain(|res, &mut (id, _)| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { m.context.is_some() - && (remote_simple_qabls(&m, face) + && (remote_simple_qabls(&m, face_id) || remote_linkstatepeer_qabls(tables, &m) || remote_router_qabls(tables, &m)) - }) - }) { - if let Some((id, _)) = face_hat_mut!(&mut face).local_qabls.remove(&res) { + }) { send_declare( - &face.primitives, + &primitives, RoutingContext::with_expr( Declare { interest_id: None, @@ -858,9 +853,11 @@ pub(super) fn undeclare_simple_queryable( res.expr().to_string(), ), ); + return false; } } - } + true + }) } } } @@ -1238,7 +1235,7 @@ pub(crate) fn declare_qabl_interest( } else { for qabl in hat!(tables).router_qabls.iter() { if qabl.context.is_some() - && (remote_simple_qabls(qabl, face) + && (remote_simple_qabls(qabl, face.id) || remote_linkstatepeer_qabls(tables, qabl) || remote_router_qabls(tables, qabl)) { @@ -1435,19 +1432,12 @@ impl HatQueriesTrait for HatCode { return EMPTY_ROUTE.clone(); } }; - let res = Resource::get_resource(expr.prefix, expr.suffix); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, &key_expr))); let master = !hat!(tables).full_net(WhatAmI::Peer) || *hat!(tables).elect_router(&tables.zid, &key_expr, hat!(tables).shared_nodes.iter()) == tables.zid; - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { let complete = DEFAULT_INCLUDER.includes(mres.expr().as_bytes(), key_expr.as_bytes()); if master || source_type == WhatAmI::Router { let net = hat!(tables).routers_net.as_ref().unwrap(); @@ -1525,19 +1515,12 @@ impl HatQueriesTrait for HatCode { complete ); crate::net::routing::dispatcher::pubsub::get_matching_subscriptions(tables, key_expr); - let res = Resource::get_resource(&tables.root_res, key_expr); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, key_expr))); let master = !hat!(tables).full_net(WhatAmI::Peer) || *hat!(tables).elect_router(&tables.zid, key_expr, hat!(tables).shared_nodes.iter()) == tables.zid; - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { if complete && !KeyExpr::keyexpr_include(mres.expr(), key_expr) { continue; } diff --git a/zenoh/src/net/routing/hat/router/token.rs b/zenoh/src/net/routing/hat/router/token.rs index 0c6f432215..0c5618a80b 100644 --- a/zenoh/src/net/routing/hat/router/token.rs +++ b/zenoh/src/net/routing/hat/router/token.rs @@ -343,10 +343,15 @@ fn simple_tokens(res: &Arc) -> Vec> { } #[inline] -fn remote_simple_tokens(tables: &Tables, res: &Arc, face: &Arc) -> bool { +fn remote_simple_tokens( + tables: &Tables, + res: &Arc, + face_id: usize, + face_zid: ZenohIdProto, +) -> bool { res.session_ctxs .values() - .any(|ctx| (ctx.face.id != face.id || face.zid == tables.zid) && ctx.token) + .any(|ctx| (ctx.face.id != face_id || face_zid == tables.zid) && ctx.token) } #[inline] @@ -451,70 +456,41 @@ fn propagate_forget_simple_token( ), ); } - for res in face_hat!(&mut face) + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + let face_zid = face.zid; + face_hat_mut!(&mut face) .local_tokens - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade().is_some_and(|m| { - m.context.is_some() - && (remote_simple_tokens(tables, &m, &face) - || remote_linkstatepeer_tokens(tables, &m) - || remote_router_tokens(tables, &m)) - }) - }) { - if let Some(id) = face_hat_mut!(&mut face).local_tokens.remove(&res) { - send_declare( - &face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::UndeclareToken(UndeclareToken { - id, - ext_wire_expr: WireExprType::null(), - }), - }, - res.expr().to_string(), - ), - ); - } else if face_hat!(face) - .remote_interests - .values() - .any(|i| i.options.tokens() && i.matches(&res) && !i.options.aggregate()) - && src_face.map_or(true, |src_face| { - src_face.whatami != WhatAmI::Peer - || face.whatami != WhatAmI::Peer - || hat!(tables).failover_brokering(src_face.zid, face.zid) - }) - { - // Token has never been declared on this face. - // Send an Undeclare with a one shot generated id and a WireExpr ext. - send_declare( - &face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::UndeclareToken(UndeclareToken { - id: face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst), - ext_wire_expr: WireExprType { - wire_expr: Resource::get_best_key(&res, "", face.id), - }, - }), - }, - res.expr().to_string(), - ), - ); + .retain(|res, &mut id| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { + m.context.is_some() + && (remote_simple_tokens(tables, &m, face_id, face_zid) + || remote_linkstatepeer_tokens(tables, &m) + || remote_router_tokens(tables, &m)) + }) { + send_declare( + &primitives, + RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareToken(UndeclareToken { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr().to_string(), + ), + ); + return false; + } } - } - } + true + }); } } @@ -739,40 +715,41 @@ pub(super) fn undeclare_simple_token( ), ); } - for res in face_hat!(face) + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + let face_zid = face.zid; + face_hat_mut!(&mut face) .local_tokens - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade().is_some_and(|m| { - m.context.is_some() - && (remote_simple_tokens(tables, &m, face) - || remote_linkstatepeer_tokens(tables, &m) - || remote_router_tokens(tables, &m)) - }) - }) { - if let Some(id) = face_hat_mut!(&mut face).local_tokens.remove(&res) { - send_declare( - &face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::UndeclareToken(UndeclareToken { - id, - ext_wire_expr: WireExprType::null(), - }), - }, - res.expr().to_string(), - ), - ); + .retain(|res, &mut id| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { + m.context.is_some() + && (remote_simple_tokens(tables, &m, face_id, face_zid) + || remote_linkstatepeer_tokens(tables, &m) + || remote_router_tokens(tables, &m)) + }) { + send_declare( + &primitives, + RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareToken(UndeclareToken { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr().to_string(), + ), + ); + return false; + } } - } - } + true + }); } } } @@ -1003,7 +980,7 @@ pub(crate) fn declare_token_interest( if hat!(tables).router_tokens.iter().any(|token| { token.context.is_some() && token.matches(res) - && (remote_simple_tokens(tables, token, face) + && (remote_simple_tokens(tables, token, face.id, face.zid) || remote_linkstatepeer_tokens(tables, token) || remote_router_tokens(tables, token)) }) { diff --git a/zenoh/src/net/tests/tables.rs b/zenoh/src/net/tests/tables.rs index b2df34f9d0..75da400fff 100644 --- a/zenoh/src/net/tests/tables.rs +++ b/zenoh/src/net/tests/tables.rs @@ -157,13 +157,10 @@ fn match_test() { } for key_expr1 in key_exprs.iter() { - let res_matches = Resource::get_matches(&zread!(tables.tables), key_expr1); + let res_matches = Resource::get_matches(&zread!(tables.tables).root_res, key_expr1); dbg!(res_matches.len()); for key_expr2 in key_exprs.iter() { - if res_matches - .iter() - .any(|m| m.upgrade().unwrap().expr() == key_expr2.as_str()) - { + if res_matches.iter().any(|m| m.expr() == key_expr2.as_str()) { assert!(dbg!(dbg!(key_expr1).intersects(dbg!(key_expr2)))); } else { assert!(!dbg!(dbg!(key_expr1).intersects(dbg!(key_expr2))));