diff --git a/zenoh/src/net/routing/dispatcher/interests.rs b/zenoh/src/net/routing/dispatcher/interests.rs index a2cbcfeaf..119f499a7 100644 --- a/zenoh/src/net/routing/dispatcher/interests.rs +++ b/zenoh/src/net/routing/dispatcher/interests.rs @@ -20,7 +20,6 @@ use std::{ use async_trait::async_trait; use tokio_util::sync::CancellationToken; -use zenoh_keyexpr::keyexpr; use zenoh_protocol::{ core::WireExpr, network::{ @@ -220,15 +219,10 @@ pub(crate) fn declare_interest( } else { let mut fullexpr = prefix.expr().to_string(); fullexpr.push_str(expr.suffix.as_ref()); - let mut matches = keyexpr::new(fullexpr.as_str()) - .map(|ke| Resource::get_matches(&rtables, ke)) - .unwrap_or_default(); drop(rtables); let mut wtables = zwrite!(tables_ref.tables); - let mut res = + let res = Resource::make_resource(&mut wtables, &mut prefix, expr.suffix.as_ref()); - matches.push(Arc::downgrade(&res)); - Resource::match_resource(&wtables, &mut res, matches); (res, wtables) }; diff --git a/zenoh/src/net/routing/dispatcher/pubsub.rs b/zenoh/src/net/routing/dispatcher/pubsub.rs index d19373528..f00db6aa5 100644 --- a/zenoh/src/net/routing/dispatcher/pubsub.rs +++ b/zenoh/src/net/routing/dispatcher/pubsub.rs @@ -18,11 +18,10 @@ use std::sync::Arc; use zenoh_core::zread; use zenoh_protocol::{ - core::{key_expr::keyexpr, Reliability, WireExpr}, + core::{Reliability, WireExpr}, network::{declare::SubscriberId, push::ext, Push}, zenoh::PushBody, }; -use zenoh_sync::get_mut_unchecked; use super::{ face::FaceState, @@ -72,15 +71,10 @@ pub(crate) fn declare_subscription( } else { let mut fullexpr = prefix.expr().to_string(); fullexpr.push_str(expr.suffix.as_ref()); - let mut matches = keyexpr::new(fullexpr.as_str()) - .map(|ke| Resource::get_matches(&rtables, ke)) - .unwrap_or_default(); drop(rtables); let mut wtables = zwrite!(tables.tables); - let mut res = + let res = Resource::make_resource(&mut wtables, &mut prefix, expr.suffix.as_ref()); - matches.push(Arc::downgrade(&res)); - Resource::match_resource(&wtables, &mut res, matches); (res, wtables) }; @@ -156,16 +150,16 @@ pub(crate) fn undeclare_subscription( } } -pub(crate) fn disable_matches_data_routes(_tables: &mut Tables, res: &mut Arc) { +pub(crate) fn disable_matches_data_routes(tables: &mut Tables, res: &mut Arc) { if res.context.is_some() { - get_mut_unchecked(res).context_mut().disable_data_routes(); - for match_ in &res.context().matches { - let mut match_ = match_.upgrade().unwrap(); - if !Arc::ptr_eq(&match_, res) { - get_mut_unchecked(&mut match_) - .context_mut() - .disable_data_routes(); - } + if let Some(key_expr) = res.key_expr() { + Resource::iter_matches(&tables.root_res, key_expr, |m| { + if !Arc::ptr_eq(res, m) { + unsafe { &mut *Arc::as_ptr(m).cast_mut() } + .context_mut() + .disable_data_routes() + } + }) } } } diff --git a/zenoh/src/net/routing/dispatcher/queries.rs b/zenoh/src/net/routing/dispatcher/queries.rs index 17b30bef3..e52eccd80 100644 --- a/zenoh/src/net/routing/dispatcher/queries.rs +++ b/zenoh/src/net/routing/dispatcher/queries.rs @@ -23,7 +23,7 @@ use zenoh_buffers::ZBuf; #[cfg(feature = "stats")] use zenoh_protocol::zenoh::reply::ReplyBody; use zenoh_protocol::{ - core::{key_expr::keyexpr, Encoding, WireExpr}, + core::{Encoding, WireExpr}, network::{ declare::{ext, queryable::ext::QueryableInfoType, QueryableId}, request::{ @@ -99,15 +99,10 @@ pub(crate) fn declare_queryable( } else { let mut fullexpr = prefix.expr().to_string(); fullexpr.push_str(expr.suffix.as_ref()); - let mut matches = keyexpr::new(fullexpr.as_str()) - .map(|ke| Resource::get_matches(&rtables, ke)) - .unwrap_or_default(); drop(rtables); let mut wtables = zwrite!(tables.tables); - let mut res = + let res = Resource::make_resource(&mut wtables, &mut prefix, expr.suffix.as_ref()); - matches.push(Arc::downgrade(&res)); - Resource::match_resource(&wtables, &mut res, matches); (res, wtables) }; @@ -339,16 +334,16 @@ impl Timed for QueryCleanup { } } -pub(crate) fn disable_matches_query_routes(_tables: &mut Tables, res: &mut Arc) { +pub(crate) fn disable_matches_query_routes(tables: &mut Tables, res: &mut Arc) { if res.context.is_some() { - get_mut_unchecked(res).context_mut().disable_query_routes(); - for match_ in &res.context().matches { - let mut match_ = match_.upgrade().unwrap(); - if !Arc::ptr_eq(&match_, res) { - get_mut_unchecked(&mut match_) - .context_mut() - .disable_query_routes(); - } + if let Some(key_expr) = res.key_expr() { + Resource::iter_matches(&tables.root_res, key_expr, |m| { + if !Arc::ptr_eq(res, m) { + unsafe { &mut *Arc::as_ptr(m).cast_mut() } + .context_mut() + .disable_query_routes() + } + }) } } } diff --git a/zenoh/src/net/routing/dispatcher/resource.rs b/zenoh/src/net/routing/dispatcher/resource.rs index 010495f84..50ede3367 100644 --- a/zenoh/src/net/routing/dispatcher/resource.rs +++ b/zenoh/src/net/routing/dispatcher/resource.rs @@ -17,7 +17,7 @@ use std::{ collections::HashMap, convert::TryInto, hash::{Hash, Hasher}, - sync::{Arc, RwLock, Weak}, + sync::{Arc, RwLock}, }; use zenoh_config::WhatAmI; @@ -172,7 +172,6 @@ pub(crate) type DataRoutes = Routes>; pub(crate) type QueryRoutes = Routes>; pub(crate) struct ResourceContext { - pub(crate) matches: Vec>, pub(crate) hat: Box, pub(crate) data_routes: RwLock, pub(crate) query_routes: RwLock, @@ -181,7 +180,6 @@ pub(crate) struct ResourceContext { impl ResourceContext { fn new(hat: Box) -> ResourceContext { ResourceContext { - matches: Vec::new(), hat, data_routes: Default::default(), query_routes: Default::default(), @@ -252,6 +250,15 @@ impl Resource { &self.expr } + pub(crate) fn key_expr(&self) -> Option<&keyexpr> { + if !self.expr.is_empty() { + // SAFETY: resource full expr should be a valid keyexpr + Some(unsafe { keyexpr::from_str_unchecked(&self.expr) }) + } else { + None + } + } + #[inline(always)] pub(crate) fn context(&self) -> &ResourceContext { self.context.as_ref().unwrap() @@ -264,12 +271,12 @@ impl Resource { #[inline(always)] pub(crate) fn matches(&self, other: &Arc) -> bool { - self.context - .as_ref() - .unwrap() - .matches - .iter() - .any(|m| m.upgrade().is_some_and(|m| &m == other)) + // SAFETY: resource expr is supposed to be a valid keyexpr + (!self.expr.is_empty() && !other.expr.is_empty()) + && unsafe { + keyexpr::from_str_unchecked(&self.expr) + .intersects(keyexpr::from_str_unchecked(&other.expr)) + } } pub fn nonwild_prefix(res: &Arc) -> (Option>, String) { @@ -304,18 +311,6 @@ impl Resource { if Arc::strong_count(res) <= 3 && res.children.is_empty() { // consider only childless resource held by only one external object (+ 1 strong count for resclone, + 1 strong count for res.parent to a total of 3 ) tracing::debug!("Unregister resource {}", res.expr()); - if let Some(context) = mutres.context.as_mut() { - for match_ in &mut context.matches { - let mut match_ = match_.upgrade().unwrap(); - if !Arc::ptr_eq(&match_, res) { - let mutmatch = get_mut_unchecked(&mut match_); - if let Some(ctx) = mutmatch.context.as_mut() { - ctx.matches - .retain(|x| !Arc::ptr_eq(&x.upgrade().unwrap(), res)); - } - } - } - } mutres.nonwild_prefix.take(); { get_mut_unchecked(parent).children.remove(&res.suffix); @@ -571,104 +566,101 @@ impl Resource { .unwrap_or_else(|| [&self.expr, suffix].concat().into()) } - pub fn get_matches(tables: &Tables, key_expr: &keyexpr) -> Vec> { - fn recursive_push(from: &Arc, matches: &mut Vec>) { - if from.context.is_some() { - matches.push(Arc::downgrade(from)); - } - for child in from.children.values() { - recursive_push(child, matches) + pub(crate) fn any_matches( + root: &Arc, + key_expr: &keyexpr, + mut f: impl FnMut(&Arc) -> bool, + ) -> bool { + macro_rules! return_if_true { + ($expr:expr) => { + if $expr { + return true; + } + }; + } + fn any_matches_trailing_wildcard( + res: &Arc, + f: &mut impl FnMut(&Arc) -> bool, + ) -> bool { + return_if_true!(res.context.is_some() && f(res)); + for child in res.children.values() { + return_if_true!(any_matches_trailing_wildcard(child, f)); } + false } - fn get_matches_from( + fn any_matches_rec( key_expr: &keyexpr, - from: &Arc, - matches: &mut Vec>, - ) { - if from.parent.is_none() || from.suffix == "/" { - for child in from.children.values() { - get_matches_from(key_expr, child, matches); + res: &Arc, + f: &mut impl FnMut(&Arc) -> bool, + ) -> bool { + if res.parent.is_none() || res.suffix == "/" { + for child in res.children.values() { + return_if_true!(any_matches_rec(key_expr, child, f)); } - return; + return false; } - let suffix: &keyexpr = from + let suffix: &keyexpr = res .suffix .strip_prefix('/') - .unwrap_or(&from.suffix) + .unwrap_or(&res.suffix) .try_into() .unwrap(); let (chunk, rest) = Resource::fst_chunk(key_expr); if chunk.intersects(suffix) { match rest { + None if chunk == "**" => return any_matches_trailing_wildcard(res, f), None => { - if chunk.as_bytes() == b"**" { - recursive_push(from, matches) - } else { - if from.context.is_some() { - matches.push(Arc::downgrade(from)); - } - if suffix.as_bytes() == b"**" { - for child in from.children.values() { - get_matches_from(key_expr, child, matches) - } - } - if let Some(child) = - from.children.get("/**").or_else(|| from.children.get("**")) - { - if child.context.is_some() { - matches.push(Arc::downgrade(child)) - } + return_if_true!(res.context.is_some() && f(res)); + if suffix == "**" { + for child in res.children.values() { + return_if_true!(any_matches_rec(key_expr, child, f)); } } + if let Some(child) = + res.children.get("/**").or_else(|| res.children.get("**")) + { + return_if_true!(child.context.is_some() && f(child)); + } + } + Some(rest) if rest == "**" => { + return_if_true!(any_matches_trailing_wildcard(res, f)) } - Some(rest) if rest.as_bytes() == b"**" => recursive_push(from, matches), Some(rest) => { - let recheck_keyexpr_one_level_lower = - chunk.as_bytes() == b"**" || suffix.as_bytes() == b"**"; - for child in from.children.values() { - get_matches_from(rest, child, matches); + let recheck_keyexpr_one_level_lower = chunk == "**" || suffix == "**"; + for child in res.children.values() { + return_if_true!(any_matches_rec(rest, child, f)); if recheck_keyexpr_one_level_lower { - get_matches_from(key_expr, child, matches) + return_if_true!(any_matches_rec(key_expr, child, f)); } } if recheck_keyexpr_one_level_lower { - get_matches_from(rest, from, matches) + return_if_true!(any_matches_rec(rest, res, f)); } } }; } + false } - let mut matches = Vec::new(); - get_matches_from(key_expr, &tables.root_res, &mut matches); - let mut i = 0; - while i < matches.len() { - let current = matches[i].as_ptr(); - let mut j = i + 1; - while j < matches.len() { - if std::ptr::eq(current, matches[j].as_ptr()) { - matches.swap_remove(j); - } else { - j += 1 - } - } - i += 1 - } - matches + any_matches_rec(key_expr, root, &mut f) } - pub fn match_resource(_tables: &Tables, res: &mut Arc, matches: Vec>) { - if res.context.is_some() { - for match_ in &matches { - let mut match_ = match_.upgrade().unwrap(); - get_mut_unchecked(&mut match_) - .context_mut() - .matches - .push(Arc::downgrade(res)); - } - get_mut_unchecked(res).context_mut().matches = matches; - } else { - tracing::error!("Call match_resource() on context less res {}", res.expr()); - } + pub(crate) fn iter_matches( + root: &Arc, + key_expr: &keyexpr, + mut f: impl FnMut(&Arc), + ) { + Self::any_matches(root, key_expr, |res| { + f(res); + false + }); + } + + pub(crate) fn get_matches(root: &Arc, key_expr: &keyexpr) -> Vec> { + let mut vec = Vec::new(); + Self::iter_matches(root, key_expr, |res| vec.push(res.clone())); + vec.sort_unstable_by_key(|res| Arc::as_ptr(res)); + vec.dedup_by_key(|res| Arc::as_ptr(res)); + vec } pub fn upgrade_resource(res: &mut Arc, hat: Box) { @@ -726,15 +718,10 @@ pub(crate) fn register_expr( } else { let mut fullexpr = prefix.expr().to_string(); fullexpr.push_str(expr.suffix.as_ref()); - let mut matches = keyexpr::new(fullexpr.as_str()) - .map(|ke| Resource::get_matches(&rtables, ke)) - .unwrap_or_default(); drop(rtables); let mut wtables = zwrite!(tables.tables); - let mut res = + let res = Resource::make_resource(&mut wtables, &mut prefix, expr.suffix.as_ref()); - matches.push(Arc::downgrade(&res)); - Resource::match_resource(&wtables, &mut res, matches); (res, wtables) }; let ctx = get_mut_unchecked(&mut res) @@ -791,15 +778,10 @@ pub(crate) fn register_expr_interest( } else { let mut fullexpr = prefix.expr().to_string(); fullexpr.push_str(expr.suffix.as_ref()); - let mut matches = keyexpr::new(fullexpr.as_str()) - .map(|ke| Resource::get_matches(&rtables, ke)) - .unwrap_or_default(); drop(rtables); let mut wtables = zwrite!(tables.tables); - let mut res = + let res = Resource::make_resource(&mut wtables, &mut prefix, expr.suffix.as_ref()); - matches.push(Arc::downgrade(&res)); - Resource::match_resource(&wtables, &mut res, matches); (res, wtables) }; get_mut_unchecked(face) diff --git a/zenoh/src/net/routing/dispatcher/token.rs b/zenoh/src/net/routing/dispatcher/token.rs index 23625ff8e..2d6b384f4 100644 --- a/zenoh/src/net/routing/dispatcher/token.rs +++ b/zenoh/src/net/routing/dispatcher/token.rs @@ -14,7 +14,6 @@ use std::sync::Arc; -use zenoh_keyexpr::keyexpr; use zenoh_protocol::{ core::WireExpr, network::{ @@ -65,15 +64,10 @@ pub(crate) fn declare_token( } else { let mut fullexpr = prefix.expr().to_string(); fullexpr.push_str(expr.suffix.as_ref()); - let mut matches = keyexpr::new(fullexpr.as_str()) - .map(|ke| Resource::get_matches(&rtables, ke)) - .unwrap_or_default(); drop(rtables); let mut wtables = zwrite!(tables.tables); - let mut res = + let res = Resource::make_resource(&mut wtables, &mut prefix, expr.suffix.as_ref()); - matches.push(Arc::downgrade(&res)); - Resource::match_resource(&wtables, &mut res, matches); (res, wtables) }; @@ -125,18 +119,13 @@ pub(crate) fn undeclare_token( // TODO this could be improved let mut fullexpr = prefix.expr().to_string(); fullexpr.push_str(expr.wire_expr.suffix.as_ref()); - let mut matches = keyexpr::new(fullexpr.as_str()) - .map(|ke| Resource::get_matches(&rtables, ke)) - .unwrap_or_default(); drop(rtables); let mut wtables = zwrite!(tables.tables); - let mut res = Resource::make_resource( + let res = Resource::make_resource( &mut wtables, &mut prefix, expr.wire_expr.suffix.as_ref(), ); - matches.push(Arc::downgrade(&res)); - Resource::match_resource(&wtables, &mut res, matches); (Some(res), wtables) } } diff --git a/zenoh/src/net/routing/hat/client/mod.rs b/zenoh/src/net/routing/hat/client/mod.rs index cf25f6a68..db2fc9c86 100644 --- a/zenoh/src/net/routing/hat/client/mod.rs +++ b/zenoh/src/net/routing/hat/client/mod.rs @@ -168,19 +168,9 @@ impl HatBaseTrait for HatCode { undeclare_simple_subscription(&mut wtables, &mut face_clone, &mut res, send_declare); if res.context.is_some() { - for match_ in &res.context().matches { - let mut match_ = match_.upgrade().unwrap(); - if !Arc::ptr_eq(&match_, &res) { - get_mut_unchecked(&mut match_) - .context_mut() - .disable_data_routes(); - subs_matches.push(match_); - } + if let Some(key_expr) = res.key_expr() { + subs_matches.extend(Resource::get_matches(&wtables.root_res, &key_expr)); } - get_mut_unchecked(&mut res) - .context_mut() - .disable_data_routes(); - subs_matches.push(res); } } @@ -190,19 +180,9 @@ impl HatBaseTrait for HatCode { undeclare_simple_queryable(&mut wtables, &mut face_clone, &mut res, send_declare); if res.context.is_some() { - for match_ in &res.context().matches { - let mut match_ = match_.upgrade().unwrap(); - if !Arc::ptr_eq(&match_, &res) { - get_mut_unchecked(&mut match_) - .context_mut() - .disable_query_routes(); - qabls_matches.push(match_); - } + if let Some(key_expr) = res.key_expr() { + qabls_matches.extend(Resource::get_matches(&wtables.root_res, &key_expr)); } - get_mut_unchecked(&mut res) - .context_mut() - .disable_query_routes(); - qabls_matches.push(res); } } diff --git a/zenoh/src/net/routing/hat/client/pubsub.rs b/zenoh/src/net/routing/hat/client/pubsub.rs index 8e1a6a43a..37884abcb 100644 --- a/zenoh/src/net/routing/hat/client/pubsub.rs +++ b/zenoh/src/net/routing/hat/client/pubsub.rs @@ -12,7 +12,6 @@ // ZettaScale Zenoh Team, // use std::{ - borrow::Cow, collections::HashMap, sync::{atomic::Ordering, Arc}, }; @@ -377,16 +376,7 @@ impl HatPubSubTrait for HatCode { } } - let res = Resource::get_resource(expr.prefix, expr.suffix); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, &key_expr))); - - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); - + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { for (sid, context) in &mres.session_ctxs { if context.subs.is_some() && context.face.whatami == WhatAmI::Client { route.entry(*sid).or_insert_with(|| { @@ -433,16 +423,7 @@ impl HatPubSubTrait for HatCode { } } - let res = Resource::get_resource(&tables.root_res, key_expr); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, key_expr))); - - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); - + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { for (sid, context) in &mres.session_ctxs { if context.subs.is_some() && context.face.whatami == WhatAmI::Client { matching_subscriptions diff --git a/zenoh/src/net/routing/hat/client/queries.rs b/zenoh/src/net/routing/hat/client/queries.rs index 855207cd5..2d7afecb4 100644 --- a/zenoh/src/net/routing/hat/client/queries.rs +++ b/zenoh/src/net/routing/hat/client/queries.rs @@ -12,7 +12,6 @@ // ZettaScale Zenoh Team, // use std::{ - borrow::Cow, collections::HashMap, sync::{atomic::Ordering, Arc}, }; @@ -397,15 +396,7 @@ impl HatQueriesTrait for HatCode { } } - let res = Resource::get_resource(expr.prefix, expr.suffix); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, &key_expr))); - - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { let complete = DEFAULT_INCLUDER.includes(mres.expr().as_bytes(), key_expr.as_bytes()); for (sid, context) in &mres.session_ctxs { let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, *sid); @@ -471,15 +462,7 @@ impl HatQueriesTrait for HatCode { } } - let res = Resource::get_resource(&tables.root_res, key_expr); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, key_expr))); - - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { if complete && !KeyExpr::keyexpr_include(mres.expr(), key_expr) { continue; } diff --git a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs index fc9640c73..e88e41d5c 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/mod.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/mod.rs @@ -304,19 +304,9 @@ impl HatBaseTrait for HatCode { undeclare_simple_subscription(&mut wtables, &mut face_clone, &mut res, send_declare); if res.context.is_some() { - for match_ in &res.context().matches { - let mut match_ = match_.upgrade().unwrap(); - if !Arc::ptr_eq(&match_, &res) { - get_mut_unchecked(&mut match_) - .context_mut() - .disable_data_routes(); - subs_matches.push(match_); - } + if let Some(key_expr) = res.key_expr() { + subs_matches.extend(Resource::get_matches(&wtables.root_res, &key_expr)); } - get_mut_unchecked(&mut res) - .context_mut() - .disable_data_routes(); - subs_matches.push(res); } } @@ -326,19 +316,9 @@ impl HatBaseTrait for HatCode { undeclare_simple_queryable(&mut wtables, &mut face_clone, &mut res, send_declare); if res.context.is_some() { - for match_ in &res.context().matches { - let mut match_ = match_.upgrade().unwrap(); - if !Arc::ptr_eq(&match_, &res) { - get_mut_unchecked(&mut match_) - .context_mut() - .disable_query_routes(); - qabls_matches.push(match_); - } + if let Some(key_expr) = res.key_expr() { + qabls_matches.extend(Resource::get_matches(&wtables.root_res, &key_expr)); } - get_mut_unchecked(&mut res) - .context_mut() - .disable_query_routes(); - qabls_matches.push(res); } } diff --git a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs index ce73ac340..912008e47 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs @@ -12,7 +12,6 @@ // ZettaScale Zenoh Team, // use std::{ - borrow::Cow, collections::{HashMap, HashSet}, sync::{atomic::Ordering, Arc}, }; @@ -332,10 +331,10 @@ fn simple_subs(res: &Arc) -> Vec> { } #[inline] -fn remote_simple_subs(res: &Arc, face: &Arc) -> bool { +fn remote_simple_subs(res: &Arc, face_id: usize) -> bool { res.session_ctxs .values() - .any(|ctx| ctx.face.id != face.id && ctx.subs.is_some()) + .any(|ctx| ctx.face.id != face_id && ctx.subs.is_some()) } #[inline] @@ -405,21 +404,18 @@ fn propagate_forget_simple_subscription( ), ); } - for res in face_hat!(face) - .local_subs - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade().is_some_and(|m| { + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + face_hat_mut!(&mut face).local_subs.retain(|res, &mut id| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { m.context.is_some() - && (remote_simple_subs(&m, &face) || remote_linkstatepeer_subs(tables, &m)) - }) - }) { - if let Some(id) = face_hat_mut!(&mut face).local_subs.remove(&res) { + && (remote_simple_subs(&m, face_id) + || remote_linkstatepeer_subs(tables, &m)) + }) { send_declare( - &face.primitives, + &primitives, RoutingContext::with_expr( Declare { interest_id: None, @@ -434,9 +430,11 @@ fn propagate_forget_simple_subscription( res.expr().to_string(), ), ); + return false; } } - } + true + }); } } @@ -561,22 +559,18 @@ pub(super) fn undeclare_simple_subscription( ), ); } - for res in face_hat!(face) - .local_subs - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade().is_some_and(|m| { + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + face_hat_mut!(&mut face).local_subs.retain(|res, &mut id| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { m.context.is_some() - && (remote_simple_subs(&m, face) + && (remote_simple_subs(&m, face_id) || remote_linkstatepeer_subs(tables, &m)) - }) - }) { - if let Some(id) = face_hat_mut!(&mut face).local_subs.remove(&res) { + }) { send_declare( - &face.primitives, + &primitives, RoutingContext::with_expr( Declare { interest_id: None, @@ -593,9 +587,11 @@ pub(super) fn undeclare_simple_subscription( res.expr().to_string(), ), ); + return false; } } - } + true + }); } } } @@ -704,7 +700,8 @@ pub(super) fn declare_sub_interest( if hat!(tables).linkstatepeer_subs.iter().any(|sub| { sub.context.is_some() && sub.matches(res) - && (remote_simple_subs(sub, face) || remote_linkstatepeer_subs(tables, sub)) + && (remote_simple_subs(sub, face.id) + || remote_linkstatepeer_subs(tables, sub)) }) { let id = make_sub_id(res, face, mode); let wire_expr = Resource::decl_key(res, face, face.whatami != WhatAmI::Client); @@ -729,7 +726,8 @@ pub(super) fn declare_sub_interest( for sub in &hat!(tables).linkstatepeer_subs { if sub.context.is_some() && sub.matches(res) - && (remote_simple_subs(sub, face) || remote_linkstatepeer_subs(tables, sub)) + && (remote_simple_subs(sub, face.id) + || remote_linkstatepeer_subs(tables, sub)) { let id = make_sub_id(sub, face, mode); let wire_expr = @@ -756,7 +754,7 @@ pub(super) fn declare_sub_interest( } else { for sub in &hat!(tables).linkstatepeer_subs { if sub.context.is_some() - && (remote_simple_subs(sub, face) || remote_linkstatepeer_subs(tables, sub)) + && (remote_simple_subs(sub, face.id) || remote_linkstatepeer_subs(tables, sub)) { let id = make_sub_id(sub, face, mode); let wire_expr = Resource::decl_key(sub, face, face.whatami != WhatAmI::Client); @@ -935,16 +933,8 @@ impl HatPubSubTrait for HatCode { return Arc::new(route); } }; - let res = Resource::get_resource(expr.prefix, expr.suffix); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, &key_expr))); - - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { let net = hat!(tables).linkstatepeers_net.as_ref().unwrap(); let peer_source = match source_type { WhatAmI::Router | WhatAmI::Peer => source, @@ -1022,16 +1012,7 @@ impl HatPubSubTrait for HatCode { } tracing::trace!("get_matching_subscriptions({})", key_expr,); - let res = Resource::get_resource(&tables.root_res, key_expr); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, key_expr))); - - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); - + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { let net = hat!(tables).linkstatepeers_net.as_ref().unwrap(); insert_faces_for_subs( &mut matching_subscriptions, diff --git a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs index e4c3cfca7..2fa115736 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs @@ -12,7 +12,6 @@ // ZettaScale Zenoh Team, // use std::{ - borrow::Cow, collections::HashMap, sync::{atomic::Ordering, Arc}, }; @@ -348,10 +347,10 @@ fn simple_qabls(res: &Arc) -> Vec> { } #[inline] -fn remote_simple_qabls(res: &Arc, face: &Arc) -> bool { +fn remote_simple_qabls(res: &Arc, face_id: usize) -> bool { res.session_ctxs .values() - .any(|ctx| ctx.face.id != face.id && ctx.qabl.is_some()) + .any(|ctx| ctx.face.id != face_id && ctx.qabl.is_some()) } #[inline] @@ -421,39 +420,39 @@ fn propagate_forget_simple_queryable( ), ); } - for res in face_hat!(&mut face) + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + face_hat_mut!(&mut face) .local_qabls - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade().is_some_and(|m| { - m.context.is_some() - && (remote_simple_qabls(&m, &face) - || remote_linkstatepeer_qabls(tables, &m)) - }) - }) { - if let Some((id, _)) = face_hat_mut!(&mut face).local_qabls.remove(&res) { - send_declare( - &face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::UndeclareQueryable(UndeclareQueryable { - id, - ext_wire_expr: WireExprType::null(), - }), - }, - res.expr().to_string(), - ), - ); + .retain(|res, &mut (id, _)| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { + m.context.is_some() + && (remote_simple_qabls(&m, face_id) + || remote_linkstatepeer_qabls(tables, &m)) + }) { + send_declare( + &primitives, + RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareQueryable(UndeclareQueryable { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr().to_string(), + ), + ); + return false; + } } - } - } + true + }); } } @@ -565,7 +564,7 @@ pub(super) fn undeclare_simple_queryable( } if simple_qabls.len() == 1 && !linkstatepeer_qabls { - let mut face = &mut simple_qabls[0]; + let face = &mut simple_qabls[0]; if let Some((id, _)) = face_hat_mut!(face).local_qabls.remove(res) { send_declare( &face.primitives, @@ -584,22 +583,18 @@ pub(super) fn undeclare_simple_queryable( ), ); } - for res in face_hat!(face) - .local_qabls - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade().is_some_and(|m| { + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + face_hat_mut!(face).local_qabls.retain(|res, &mut (id, _)| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { m.context.is_some() - && (remote_simple_qabls(&m, face) + && (remote_simple_qabls(&m, face_id) || remote_linkstatepeer_qabls(tables, &m)) - }) - }) { - if let Some((id, _)) = face_hat_mut!(&mut face).local_qabls.remove(&res) { + }) { send_declare( - &face.primitives, + &primitives, RoutingContext::with_expr( Declare { interest_id: None, @@ -614,9 +609,11 @@ pub(super) fn undeclare_simple_queryable( res.expr().to_string(), ), ); + return false; } } - } + true + }); } } } @@ -774,7 +771,7 @@ pub(super) fn declare_qabl_interest( if hat!(tables).linkstatepeer_qabls.iter().any(|qabl| { qabl.context.is_some() && qabl.matches(res) - && (remote_simple_qabls(qabl, face) + && (remote_simple_qabls(qabl, face.id) || remote_linkstatepeer_qabls(tables, qabl)) }) { let info = local_qabl_info(tables, res, face); @@ -802,7 +799,7 @@ pub(super) fn declare_qabl_interest( for qabl in hat!(tables).linkstatepeer_qabls.iter() { if qabl.context.is_some() && qabl.matches(res) - && (remote_simple_qabls(qabl, face) + && (remote_simple_qabls(qabl, face.id) || remote_linkstatepeer_qabls(tables, qabl)) { let info = local_qabl_info(tables, qabl, face); @@ -832,7 +829,8 @@ pub(super) fn declare_qabl_interest( } else { for qabl in hat!(tables).linkstatepeer_qabls.iter() { if qabl.context.is_some() - && (remote_simple_qabls(qabl, face) || remote_linkstatepeer_qabls(tables, qabl)) + && (remote_simple_qabls(qabl, face.id) + || remote_linkstatepeer_qabls(tables, qabl)) { let info = local_qabl_info(tables, qabl, face); let id = make_qabl_id(qabl, face, mode, info); @@ -976,15 +974,8 @@ impl HatQueriesTrait for HatCode { return EMPTY_ROUTE.clone(); } }; - let res = Resource::get_resource(expr.prefix, expr.suffix); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, &key_expr))); - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { let complete = DEFAULT_INCLUDER.includes(mres.expr().as_bytes(), key_expr.as_bytes()); let net = hat!(tables).linkstatepeers_net.as_ref().unwrap(); @@ -1042,15 +1033,7 @@ impl HatQueriesTrait for HatCode { complete ); - let res = Resource::get_resource(&tables.root_res, key_expr); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, key_expr))); - - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { if complete && !KeyExpr::keyexpr_include(mres.expr(), key_expr) { continue; } diff --git a/zenoh/src/net/routing/hat/linkstate_peer/token.rs b/zenoh/src/net/routing/hat/linkstate_peer/token.rs index 193d38360..bb46b7fe3 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/token.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/token.rs @@ -304,10 +304,15 @@ fn simple_tokens(res: &Arc) -> Vec> { } #[inline] -fn remote_simple_tokens(tables: &Tables, res: &Arc, face: &Arc) -> bool { +fn remote_simple_tokens( + tables: &Tables, + res: &Arc, + face_id: usize, + face_zid: ZenohIdProto, +) -> bool { res.session_ctxs .values() - .any(|ctx| (ctx.face.id != face.id || face.zid == tables.zid) && ctx.token) + .any(|ctx| (ctx.face.id != face_id || face_zid == tables.zid) && ctx.token) } #[inline] @@ -377,39 +382,40 @@ fn propagate_forget_simple_token( ), ); } - for res in face_hat!(face) + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + let face_zid = face.zid; + face_hat_mut!(&mut face) .local_tokens - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade().is_some_and(|m| { - m.context.is_some() - && (remote_simple_tokens(tables, &m, &face) - || remote_linkstatepeer_tokens(tables, &m)) - }) - }) { - if let Some(id) = face_hat_mut!(&mut face).local_tokens.remove(&res) { - send_declare( - &face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::UndeclareToken(UndeclareToken { - id, - ext_wire_expr: WireExprType::null(), - }), - }, - res.expr().to_string(), - ), - ); + .retain(|res, &mut id| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { + m.context.is_some() + && (remote_simple_tokens(tables, &m, face_id, face_zid) + || remote_linkstatepeer_tokens(tables, &m)) + }) { + send_declare( + &primitives, + RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareToken(UndeclareToken { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr().to_string(), + ), + ); + return false; + } } - } - } + true + }); } } @@ -532,39 +538,40 @@ pub(super) fn undeclare_simple_token( ), ); } - for res in face_hat!(face) + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + let face_zid = face.zid; + face_hat_mut!(&mut face) .local_tokens - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade().is_some_and(|m| { - m.context.is_some() - && (remote_simple_tokens(tables, &m, face) - || remote_linkstatepeer_tokens(tables, &m)) - }) - }) { - if let Some(id) = face_hat_mut!(&mut face).local_tokens.remove(&res) { - send_declare( - &face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::UndeclareToken(UndeclareToken { - id, - ext_wire_expr: WireExprType::null(), - }), - }, - res.expr().to_string(), - ), - ); + .retain(|res, &mut id| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { + m.context.is_some() + && (remote_simple_tokens(tables, &m, face_id, face_zid) + || remote_linkstatepeer_tokens(tables, &m)) + }) { + send_declare( + &primitives, + RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareToken(UndeclareToken { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr().to_string(), + ), + ); + return false; + } } - } - } + true + }); } } } @@ -669,7 +676,7 @@ pub(crate) fn declare_token_interest( if hat!(tables).linkstatepeer_tokens.iter().any(|token| { token.context.is_some() && token.matches(res) - && (remote_simple_tokens(tables, token, face) + && (remote_simple_tokens(tables, token, face.id, face.zid) || remote_linkstatepeer_tokens(tables, token)) }) { let id = make_token_id(res, face, mode); @@ -692,7 +699,7 @@ pub(crate) fn declare_token_interest( for token in &hat!(tables).linkstatepeer_tokens { if token.context.is_some() && token.matches(res) - && (remote_simple_tokens(tables, token, face) + && (remote_simple_tokens(tables, token, face.id, face.zid) || remote_linkstatepeer_tokens(tables, token)) { let id = make_token_id(token, face, mode); @@ -717,7 +724,7 @@ pub(crate) fn declare_token_interest( } else { for token in &hat!(tables).linkstatepeer_tokens { if token.context.is_some() - && (remote_simple_tokens(tables, token, face) + && (remote_simple_tokens(tables, token, face.id, face.zid) || remote_linkstatepeer_tokens(tables, token)) { let id = make_token_id(token, face, mode); diff --git a/zenoh/src/net/routing/hat/p2p_peer/mod.rs b/zenoh/src/net/routing/hat/p2p_peer/mod.rs index 4986d3edb..f0ae212b0 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/mod.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/mod.rs @@ -256,19 +256,9 @@ impl HatBaseTrait for HatCode { undeclare_simple_subscription(&mut wtables, &mut face_clone, &mut res, send_declare); if res.context.is_some() { - for match_ in &res.context().matches { - let mut match_ = match_.upgrade().unwrap(); - if !Arc::ptr_eq(&match_, &res) { - get_mut_unchecked(&mut match_) - .context_mut() - .disable_data_routes(); - subs_matches.push(match_); - } + if let Some(key_expr) = res.key_expr() { + subs_matches.extend(Resource::get_matches(&wtables.root_res, &key_expr)); } - get_mut_unchecked(&mut res) - .context_mut() - .disable_data_routes(); - subs_matches.push(res); } } @@ -278,19 +268,9 @@ impl HatBaseTrait for HatCode { undeclare_simple_queryable(&mut wtables, &mut face_clone, &mut res, send_declare); if res.context.is_some() { - for match_ in &res.context().matches { - let mut match_ = match_.upgrade().unwrap(); - if !Arc::ptr_eq(&match_, &res) { - get_mut_unchecked(&mut match_) - .context_mut() - .disable_query_routes(); - qabls_matches.push(match_); - } + if let Some(key_expr) = res.key_expr() { + qabls_matches.extend(Resource::get_matches(&wtables.root_res, &key_expr)); } - get_mut_unchecked(&mut res) - .context_mut() - .disable_query_routes(); - qabls_matches.push(res); } } diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs index c79947639..85a72f362 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs @@ -12,7 +12,6 @@ // ZettaScale Zenoh Team, // use std::{ - borrow::Cow, collections::HashMap, sync::{atomic::Ordering, Arc}, }; @@ -230,10 +229,10 @@ fn simple_subs(res: &Arc) -> Vec> { } #[inline] -fn remote_simple_subs(res: &Arc, face: &Arc) -> bool { +fn remote_simple_subs(res: &Arc, face_id: usize) -> bool { res.session_ctxs .values() - .any(|ctx| ctx.face.id != face.id && ctx.subs.is_some()) + .any(|ctx| ctx.face.id != face_id && ctx.subs.is_some()) } fn propagate_forget_simple_subscription( @@ -260,19 +259,16 @@ fn propagate_forget_simple_subscription( ), ); } - for res in face_hat!(face) - .local_subs - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade() - .is_some_and(|m| m.context.is_some() && remote_simple_subs(&m, &face)) - }) { - if let Some(id) = face_hat_mut!(&mut face).local_subs.remove(&res) { + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + face_hat_mut!(&mut face).local_subs.retain(|res, &mut id| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { + m.context.is_some() && remote_simple_subs(m, face_id) + }) { send_declare( - &face.primitives, + &primitives, RoutingContext::with_expr( Declare { interest_id: None, @@ -287,9 +283,11 @@ fn propagate_forget_simple_subscription( res.expr().to_string(), ), ); + return false; } } - } + true + }); } } @@ -329,19 +327,16 @@ pub(super) fn undeclare_simple_subscription( ), ); } - for res in face_hat!(face) - .local_subs - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade() - .is_some_and(|m| m.context.is_some() && remote_simple_subs(&m, face)) - }) { - if let Some(id) = face_hat_mut!(&mut face).local_subs.remove(&res) { + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + face_hat_mut!(&mut face).local_subs.retain(|res, &mut id| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { + m.context.is_some() && remote_simple_subs(m, face_id) + }) { send_declare( - &face.primitives, + &primitives, RoutingContext::with_expr( Declare { interest_id: None, @@ -356,9 +351,11 @@ pub(super) fn undeclare_simple_subscription( res.expr().to_string(), ), ); + return false; } } - } + true + }); } } } @@ -653,16 +650,7 @@ impl HatPubSubTrait for HatCode { } } - let res = Resource::get_resource(expr.prefix, expr.suffix); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, &key_expr))); - - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); - + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { for (sid, context) in &mres.session_ctxs { if context.subs.is_some() && (source_type == WhatAmI::Client || context.face.whatami == WhatAmI::Client) @@ -698,16 +686,8 @@ impl HatPubSubTrait for HatCode { return matching_subscriptions; } tracing::trace!("get_matching_subscriptions({})", key_expr,); - let res = Resource::get_resource(&tables.root_res, key_expr); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, key_expr))); - - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { for (sid, context) in &mres.session_ctxs { if context.subs.is_some() { matching_subscriptions diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index d211ee9b9..0b250ced8 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -12,7 +12,6 @@ // ZettaScale Zenoh Team, // use std::{ - borrow::Cow, collections::HashMap, sync::{atomic::Ordering, Arc}, }; @@ -199,10 +198,10 @@ fn simple_qabls(res: &Arc) -> Vec> { } #[inline] -fn remote_simple_qabls(res: &Arc, face: &Arc) -> bool { +fn remote_simple_qabls(res: &Arc, face_id: usize) -> bool { res.session_ctxs .values() - .any(|ctx| ctx.face.id != face.id && ctx.qabl.is_some()) + .any(|ctx| ctx.face.id != face_id && ctx.qabl.is_some()) } fn propagate_forget_simple_queryable( @@ -229,19 +228,16 @@ fn propagate_forget_simple_queryable( ), ); } - for res in face_hat!(face) - .local_qabls - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade() - .is_some_and(|m| m.context.is_some() && remote_simple_qabls(&m, face)) - }) { - if let Some((id, _)) = face_hat_mut!(face).local_qabls.remove(&res) { + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + face_hat_mut!(face).local_qabls.retain(|res, &mut (id, _)| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { + m.context.is_some() && remote_simple_qabls(m, face_id) + }) { send_declare( - &face.primitives, + &primitives, RoutingContext::with_expr( Declare { interest_id: None, @@ -256,9 +252,11 @@ fn propagate_forget_simple_queryable( res.expr().to_string(), ), ); + return false; } } - } + true + }); } } @@ -284,7 +282,7 @@ pub(super) fn undeclare_simple_queryable( propagate_simple_queryable(tables, res, None, send_declare); } if simple_qabls.len() == 1 { - let mut face = &mut simple_qabls[0]; + let face = &mut simple_qabls[0]; if let Some((id, _)) = face_hat_mut!(face).local_qabls.remove(res) { send_declare( &face.primitives, @@ -303,19 +301,16 @@ pub(super) fn undeclare_simple_queryable( ), ); } - for res in face_hat!(face) - .local_qabls - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade() - .is_some_and(|m| m.context.is_some() && (remote_simple_qabls(&m, face))) - }) { - if let Some((id, _)) = face_hat_mut!(&mut face).local_qabls.remove(&res) { + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + face_hat_mut!(face).local_qabls.retain(|res, &mut (id, _)| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { + m.context.is_some() && remote_simple_qabls(m, face_id) + }) { send_declare( - &face.primitives, + &primitives, RoutingContext::with_expr( Declare { interest_id: None, @@ -330,9 +325,11 @@ pub(super) fn undeclare_simple_queryable( res.expr().to_string(), ), ); + return false; } } - } + true + }); } } } @@ -647,15 +644,7 @@ impl HatQueriesTrait for HatCode { } } - let res = Resource::get_resource(expr.prefix, expr.suffix); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, &key_expr))); - - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { let complete = DEFAULT_INCLUDER.includes(mres.expr().as_bytes(), key_expr.as_bytes()); for (sid, context) in &mres.session_ctxs { if source_type == WhatAmI::Client || context.face.whatami == WhatAmI::Client { @@ -696,15 +685,8 @@ impl HatQueriesTrait for HatCode { key_expr, complete ); - let res = Resource::get_resource(&tables.root_res, key_expr); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, key_expr))); - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { if complete && !KeyExpr::keyexpr_include(mres.expr(), key_expr) { continue; } diff --git a/zenoh/src/net/routing/hat/p2p_peer/token.rs b/zenoh/src/net/routing/hat/p2p_peer/token.rs index d72990761..fa1d7adb3 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/token.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/token.rs @@ -15,11 +15,14 @@ use std::sync::{atomic::Ordering, Arc}; use zenoh_config::WhatAmI; -use zenoh_protocol::network::{ - declare::{common::ext::WireExprType, TokenId}, - ext, - interest::{InterestId, InterestMode}, - Declare, DeclareBody, DeclareToken, UndeclareToken, +use zenoh_protocol::{ + core::ZenohIdProto, + network::{ + declare::{common::ext::WireExprType, TokenId}, + ext, + interest::{InterestId, InterestMode}, + Declare, DeclareBody, DeclareToken, UndeclareToken, + }, }; use zenoh_sync::get_mut_unchecked; @@ -242,10 +245,15 @@ fn simple_tokens(res: &Arc) -> Vec> { } #[inline] -fn remote_simple_tokens(tables: &Tables, res: &Arc, face: &Arc) -> bool { +fn remote_simple_tokens( + tables: &Tables, + res: &Arc, + face_id: usize, + face_zid: ZenohIdProto, +) -> bool { res.session_ctxs .values() - .any(|ctx| (ctx.face.id != face.id || face.zid == tables.zid) && ctx.token) + .any(|ctx| (ctx.face.id != face_id || face_zid == tables.zid) && ctx.token) } fn propagate_forget_simple_token( @@ -299,61 +307,38 @@ fn propagate_forget_simple_token( ), ); } - for res in face_hat!(face) + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + let face_zid = face.zid; + face_hat_mut!(&mut face) .local_tokens - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade() - .is_some_and(|m| m.context.is_some() && remote_simple_tokens(tables, &m, &face)) - }) { - if let Some(id) = face_hat_mut!(&mut face).local_tokens.remove(&res) { - send_declare( - &face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::UndeclareToken(UndeclareToken { - id, - ext_wire_expr: WireExprType::null(), - }), - }, - res.expr().to_string(), - ), - ); - } else if face_hat!(face) - .remote_interests - .values() - .any(|i| i.options.tokens() && i.matches(&res) && !i.options.aggregate()) - { - // Token has never been declared on this face. - // Send an Undeclare with a one shot generated id and a WireExpr ext. - send_declare( - &face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::UndeclareToken(UndeclareToken { - id: face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst), - ext_wire_expr: WireExprType { - wire_expr: Resource::get_best_key(&res, "", face.id), - }, - }), - }, - res.expr().to_string(), - ), - ); + .retain(|res, &mut id| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { + m.context.is_some() && remote_simple_tokens(tables, &m, face_id, face_zid) + }) { + send_declare( + &primitives, + RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareToken(UndeclareToken { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr().to_string(), + ), + ); + return false; + } } - } - } + true + }); } } @@ -398,37 +383,39 @@ pub(super) fn undeclare_simple_token( ), ); } - for res in face_hat!(face) + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + let face_zid = face.zid; + face_hat_mut!(&mut face) .local_tokens - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade().is_some_and(|m| { - m.context.is_some() && remote_simple_tokens(tables, &m, face) - }) - }) { - if let Some(id) = face_hat_mut!(&mut face).local_tokens.remove(&res) { - send_declare( - &face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::UndeclareToken(UndeclareToken { - id, - ext_wire_expr: WireExprType::null(), - }), - }, - res.expr().to_string(), - ), - ); + .retain(|res, &mut id| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { + m.context.is_some() + && remote_simple_tokens(tables, &m, face_id, face_zid) + }) { + send_declare( + &primitives, + RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareToken(UndeclareToken { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr().to_string(), + ), + ); + return false; + } } - } - } + true + }); } } } diff --git a/zenoh/src/net/routing/hat/router/mod.rs b/zenoh/src/net/routing/hat/router/mod.rs index d108f1d53..d91225205 100644 --- a/zenoh/src/net/routing/hat/router/mod.rs +++ b/zenoh/src/net/routing/hat/router/mod.rs @@ -474,19 +474,9 @@ impl HatBaseTrait for HatCode { undeclare_simple_subscription(&mut wtables, &mut face_clone, &mut res, send_declare); if res.context.is_some() { - for match_ in &res.context().matches { - let mut match_ = match_.upgrade().unwrap(); - if !Arc::ptr_eq(&match_, &res) { - get_mut_unchecked(&mut match_) - .context_mut() - .disable_data_routes(); - subs_matches.push(match_); - } + if let Some(key_expr) = res.key_expr() { + subs_matches.extend(Resource::get_matches(&wtables.root_res, &key_expr)); } - get_mut_unchecked(&mut res) - .context_mut() - .disable_data_routes(); - subs_matches.push(res); } } @@ -496,19 +486,9 @@ impl HatBaseTrait for HatCode { undeclare_simple_queryable(&mut wtables, &mut face_clone, &mut res, send_declare); if res.context.is_some() { - for match_ in &res.context().matches { - let mut match_ = match_.upgrade().unwrap(); - if !Arc::ptr_eq(&match_, &res) { - get_mut_unchecked(&mut match_) - .context_mut() - .disable_query_routes(); - qabls_matches.push(match_); - } + if let Some(key_expr) = res.key_expr() { + qabls_matches.extend(Resource::get_matches(&wtables.root_res, &key_expr)); } - get_mut_unchecked(&mut res) - .context_mut() - .disable_query_routes(); - qabls_matches.push(res); } } diff --git a/zenoh/src/net/routing/hat/router/pubsub.rs b/zenoh/src/net/routing/hat/router/pubsub.rs index ee3f2132e..677a9ba1a 100644 --- a/zenoh/src/net/routing/hat/router/pubsub.rs +++ b/zenoh/src/net/routing/hat/router/pubsub.rs @@ -12,7 +12,6 @@ // ZettaScale Zenoh Team, // use std::{ - borrow::Cow, collections::{HashMap, HashSet}, sync::{atomic::Ordering, Arc}, }; @@ -368,10 +367,10 @@ fn simple_subs(res: &Arc) -> Vec> { } #[inline] -fn remote_simple_subs(res: &Arc, face: &Arc) -> bool { +fn remote_simple_subs(res: &Arc, face_id: usize) -> bool { res.session_ctxs .values() - .any(|ctx| ctx.face.id != face.id && ctx.subs.is_some()) + .any(|ctx| ctx.face.id != face_id && ctx.subs.is_some()) } #[inline] @@ -441,23 +440,19 @@ fn propagate_forget_simple_subscription( ), ); } - for res in face_hat!(&mut face) - .local_subs - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade().is_some_and(|m| { + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + face_hat_mut!(&mut face).local_subs.retain(|res, &mut id| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { m.context.is_some() - && (remote_simple_subs(&m, &face) + && (remote_simple_subs(&m, face_id) || remote_linkstatepeer_subs(tables, &m) || remote_router_subs(tables, &m)) - }) - }) { - if let Some(id) = face_hat_mut!(&mut face).local_subs.remove(&res) { + }) { send_declare( - &face.primitives, + &primitives, RoutingContext::with_expr( Declare { interest_id: None, @@ -472,9 +467,11 @@ fn propagate_forget_simple_subscription( res.expr().to_string(), ), ); + return false; } } - } + true + }); } } @@ -687,23 +684,19 @@ pub(super) fn undeclare_simple_subscription( ), ); } - for res in face_hat!(face) - .local_subs - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade().is_some_and(|m| { + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + face_hat_mut!(&mut face).local_subs.retain(|res, &mut id| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { m.context.is_some() - && (remote_simple_subs(&m, face) + && (remote_simple_subs(&m, face_id) || remote_linkstatepeer_subs(tables, &m) || remote_router_subs(tables, &m)) - }) - }) { - if let Some(id) = face_hat_mut!(&mut face).local_subs.remove(&res) { + }) { send_declare( - &face.primitives, + &primitives, RoutingContext::with_expr( Declare { interest_id: None, @@ -718,9 +711,11 @@ pub(super) fn undeclare_simple_subscription( res.expr().to_string(), ), ); + return false; } } - } + true + }); } } } @@ -953,7 +948,7 @@ pub(crate) fn declare_sub_interest( if hat!(tables).router_subs.iter().any(|sub| { sub.context.is_some() && sub.matches(res) - && (remote_simple_subs(sub, face) + && (remote_simple_subs(sub, face.id) || remote_linkstatepeer_subs(tables, sub) || remote_router_subs(tables, sub)) }) { @@ -1261,20 +1256,12 @@ impl HatPubSubTrait for HatCode { return Arc::new(route); } }; - let res = Resource::get_resource(expr.prefix, expr.suffix); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, &key_expr))); let master = !hat!(tables).full_net(WhatAmI::Peer) || *hat!(tables).elect_router(&tables.zid, &key_expr, hat!(tables).shared_nodes.iter()) == tables.zid; - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); - + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { if master || source_type == WhatAmI::Router { let net = hat!(tables).routers_net.as_ref().unwrap(); let router_source = match source_type { @@ -1370,20 +1357,11 @@ impl HatPubSubTrait for HatCode { } tracing::trace!("get_matching_subscriptions({})", key_expr,); - let res = Resource::get_resource(&tables.root_res, key_expr); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, key_expr))); - let master = !hat!(tables).full_net(WhatAmI::Peer) || *hat!(tables).elect_router(&tables.zid, key_expr, hat!(tables).shared_nodes.iter()) == tables.zid; - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); - + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { if master { let net = hat!(tables).routers_net.as_ref().unwrap(); insert_faces_for_subs( diff --git a/zenoh/src/net/routing/hat/router/queries.rs b/zenoh/src/net/routing/hat/router/queries.rs index 2021750f5..f0a67931b 100644 --- a/zenoh/src/net/routing/hat/router/queries.rs +++ b/zenoh/src/net/routing/hat/router/queries.rs @@ -12,7 +12,6 @@ // ZettaScale Zenoh Team, // use std::{ - borrow::Cow, collections::HashMap, sync::{atomic::Ordering, Arc}, }; @@ -495,10 +494,10 @@ fn simple_qabls(res: &Arc) -> Vec> { } #[inline] -fn remote_simple_qabls(res: &Arc, face: &Arc) -> bool { +fn remote_simple_qabls(res: &Arc, face_id: usize) -> bool { res.session_ctxs .values() - .any(|ctx| ctx.face.id != face.id && ctx.qabl.is_some()) + .any(|ctx| ctx.face.id != face_id && ctx.qabl.is_some()) } #[inline] @@ -568,40 +567,40 @@ fn propagate_forget_simple_queryable( ), ); } - for res in face_hat!(&mut face) + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + face_hat_mut!(&mut face) .local_qabls - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade().is_some_and(|m| { - m.context.is_some() - && (remote_simple_qabls(&m, &face) - || remote_linkstatepeer_qabls(tables, &m) - || remote_router_qabls(tables, &m)) - }) - }) { - if let Some((id, _)) = face_hat_mut!(&mut face).local_qabls.remove(&res) { - send_declare( - &face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::UndeclareQueryable(UndeclareQueryable { - id, - ext_wire_expr: WireExprType::null(), - }), - }, - res.expr().to_string(), - ), - ); + .retain(|res, &mut (id, _)| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { + m.context.is_some() + && (remote_simple_qabls(&m, face_id) + || remote_linkstatepeer_qabls(tables, &m) + || remote_router_qabls(tables, &m)) + }) { + send_declare( + &primitives, + RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareQueryable(UndeclareQueryable { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr().to_string(), + ), + ); + return false; + } } - } - } + true + }); } } @@ -808,7 +807,7 @@ pub(super) fn undeclare_simple_queryable( } if simple_qabls.len() == 1 && !router_qabls && !linkstatepeer_qabls { - let mut face = &mut simple_qabls[0]; + let face = &mut simple_qabls[0]; if let Some((id, _)) = face_hat_mut!(face).local_qabls.remove(res) { send_declare( &face.primitives, @@ -827,23 +826,19 @@ pub(super) fn undeclare_simple_queryable( ), ); } - for res in face_hat!(face) - .local_qabls - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade().is_some_and(|m| { + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + face_hat_mut!(face).local_qabls.retain(|res, &mut (id, _)| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { m.context.is_some() - && (remote_simple_qabls(&m, face) + && (remote_simple_qabls(&m, face_id) || remote_linkstatepeer_qabls(tables, &m) || remote_router_qabls(tables, &m)) - }) - }) { - if let Some((id, _)) = face_hat_mut!(&mut face).local_qabls.remove(&res) { + }) { send_declare( - &face.primitives, + &primitives, RoutingContext::with_expr( Declare { interest_id: None, @@ -858,9 +853,11 @@ pub(super) fn undeclare_simple_queryable( res.expr().to_string(), ), ); + return false; } } - } + true + }) } } } @@ -1238,7 +1235,7 @@ pub(crate) fn declare_qabl_interest( } else { for qabl in hat!(tables).router_qabls.iter() { if qabl.context.is_some() - && (remote_simple_qabls(qabl, face) + && (remote_simple_qabls(qabl, face.id) || remote_linkstatepeer_qabls(tables, qabl) || remote_router_qabls(tables, qabl)) { @@ -1435,19 +1432,12 @@ impl HatQueriesTrait for HatCode { return EMPTY_ROUTE.clone(); } }; - let res = Resource::get_resource(expr.prefix, expr.suffix); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, &key_expr))); let master = !hat!(tables).full_net(WhatAmI::Peer) || *hat!(tables).elect_router(&tables.zid, &key_expr, hat!(tables).shared_nodes.iter()) == tables.zid; - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { let complete = DEFAULT_INCLUDER.includes(mres.expr().as_bytes(), key_expr.as_bytes()); if master || source_type == WhatAmI::Router { let net = hat!(tables).routers_net.as_ref().unwrap(); @@ -1525,19 +1515,12 @@ impl HatQueriesTrait for HatCode { complete ); crate::net::routing::dispatcher::pubsub::get_matching_subscriptions(tables, key_expr); - let res = Resource::get_resource(&tables.root_res, key_expr); - let matches = res - .as_ref() - .and_then(|res| res.context.as_ref()) - .map(|ctx| Cow::from(&ctx.matches)) - .unwrap_or_else(|| Cow::from(Resource::get_matches(tables, key_expr))); let master = !hat!(tables).full_net(WhatAmI::Peer) || *hat!(tables).elect_router(&tables.zid, key_expr, hat!(tables).shared_nodes.iter()) == tables.zid; - for mres in matches.iter() { - let mres = mres.upgrade().unwrap(); + for mres in Resource::get_matches(&tables.root_res, &key_expr).iter() { if complete && !KeyExpr::keyexpr_include(mres.expr(), key_expr) { continue; } diff --git a/zenoh/src/net/routing/hat/router/token.rs b/zenoh/src/net/routing/hat/router/token.rs index 0c6f43221..0c5618a80 100644 --- a/zenoh/src/net/routing/hat/router/token.rs +++ b/zenoh/src/net/routing/hat/router/token.rs @@ -343,10 +343,15 @@ fn simple_tokens(res: &Arc) -> Vec> { } #[inline] -fn remote_simple_tokens(tables: &Tables, res: &Arc, face: &Arc) -> bool { +fn remote_simple_tokens( + tables: &Tables, + res: &Arc, + face_id: usize, + face_zid: ZenohIdProto, +) -> bool { res.session_ctxs .values() - .any(|ctx| (ctx.face.id != face.id || face.zid == tables.zid) && ctx.token) + .any(|ctx| (ctx.face.id != face_id || face_zid == tables.zid) && ctx.token) } #[inline] @@ -451,70 +456,41 @@ fn propagate_forget_simple_token( ), ); } - for res in face_hat!(&mut face) + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + let face_zid = face.zid; + face_hat_mut!(&mut face) .local_tokens - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade().is_some_and(|m| { - m.context.is_some() - && (remote_simple_tokens(tables, &m, &face) - || remote_linkstatepeer_tokens(tables, &m) - || remote_router_tokens(tables, &m)) - }) - }) { - if let Some(id) = face_hat_mut!(&mut face).local_tokens.remove(&res) { - send_declare( - &face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::UndeclareToken(UndeclareToken { - id, - ext_wire_expr: WireExprType::null(), - }), - }, - res.expr().to_string(), - ), - ); - } else if face_hat!(face) - .remote_interests - .values() - .any(|i| i.options.tokens() && i.matches(&res) && !i.options.aggregate()) - && src_face.map_or(true, |src_face| { - src_face.whatami != WhatAmI::Peer - || face.whatami != WhatAmI::Peer - || hat!(tables).failover_brokering(src_face.zid, face.zid) - }) - { - // Token has never been declared on this face. - // Send an Undeclare with a one shot generated id and a WireExpr ext. - send_declare( - &face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::UndeclareToken(UndeclareToken { - id: face_hat!(face).next_id.fetch_add(1, Ordering::SeqCst), - ext_wire_expr: WireExprType { - wire_expr: Resource::get_best_key(&res, "", face.id), - }, - }), - }, - res.expr().to_string(), - ), - ); + .retain(|res, &mut id| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { + m.context.is_some() + && (remote_simple_tokens(tables, &m, face_id, face_zid) + || remote_linkstatepeer_tokens(tables, &m) + || remote_router_tokens(tables, &m)) + }) { + send_declare( + &primitives, + RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareToken(UndeclareToken { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr().to_string(), + ), + ); + return false; + } } - } - } + true + }); } } @@ -739,40 +715,41 @@ pub(super) fn undeclare_simple_token( ), ); } - for res in face_hat!(face) + let root = tables.root_res.clone(); + let primitives = face.primitives.clone(); + let face_id = face.id; + let face_zid = face.zid; + face_hat_mut!(&mut face) .local_tokens - .keys() - .cloned() - .collect::>>() - { - if !res.context().matches.iter().any(|m| { - m.upgrade().is_some_and(|m| { - m.context.is_some() - && (remote_simple_tokens(tables, &m, face) - || remote_linkstatepeer_tokens(tables, &m) - || remote_router_tokens(tables, &m)) - }) - }) { - if let Some(id) = face_hat_mut!(&mut face).local_tokens.remove(&res) { - send_declare( - &face.primitives, - RoutingContext::with_expr( - Declare { - interest_id: None, - ext_qos: ext::QoSType::DECLARE, - ext_tstamp: None, - ext_nodeid: ext::NodeIdType::DEFAULT, - body: DeclareBody::UndeclareToken(UndeclareToken { - id, - ext_wire_expr: WireExprType::null(), - }), - }, - res.expr().to_string(), - ), - ); + .retain(|res, &mut id| { + if let Some(key_expr) = res.key_expr() { + if !Resource::any_matches(&root, &key_expr, |m| { + m.context.is_some() + && (remote_simple_tokens(tables, &m, face_id, face_zid) + || remote_linkstatepeer_tokens(tables, &m) + || remote_router_tokens(tables, &m)) + }) { + send_declare( + &primitives, + RoutingContext::with_expr( + Declare { + interest_id: None, + ext_qos: ext::QoSType::DECLARE, + ext_tstamp: None, + ext_nodeid: ext::NodeIdType::DEFAULT, + body: DeclareBody::UndeclareToken(UndeclareToken { + id, + ext_wire_expr: WireExprType::null(), + }), + }, + res.expr().to_string(), + ), + ); + return false; + } } - } - } + true + }); } } } @@ -1003,7 +980,7 @@ pub(crate) fn declare_token_interest( if hat!(tables).router_tokens.iter().any(|token| { token.context.is_some() && token.matches(res) - && (remote_simple_tokens(tables, token, face) + && (remote_simple_tokens(tables, token, face.id, face.zid) || remote_linkstatepeer_tokens(tables, token) || remote_router_tokens(tables, token)) }) { diff --git a/zenoh/src/net/tests/tables.rs b/zenoh/src/net/tests/tables.rs index b2df34f9d..75da400ff 100644 --- a/zenoh/src/net/tests/tables.rs +++ b/zenoh/src/net/tests/tables.rs @@ -157,13 +157,10 @@ fn match_test() { } for key_expr1 in key_exprs.iter() { - let res_matches = Resource::get_matches(&zread!(tables.tables), key_expr1); + let res_matches = Resource::get_matches(&zread!(tables.tables).root_res, key_expr1); dbg!(res_matches.len()); for key_expr2 in key_exprs.iter() { - if res_matches - .iter() - .any(|m| m.upgrade().unwrap().expr() == key_expr2.as_str()) - { + if res_matches.iter().any(|m| m.expr() == key_expr2.as_str()) { assert!(dbg!(dbg!(key_expr1).intersects(dbg!(key_expr2)))); } else { assert!(!dbg!(dbg!(key_expr1).intersects(dbg!(key_expr2))));