Skip to content

Commit

Permalink
refactor: remove precomputed matches in routing tree
Browse files Browse the repository at this point in the history
Before, everytime a node was inserted in the tree, all the matching
nodes were computed and the list was added to it and maintained each
time the tree is modified.
This PR remove this matches precomputation to only compute them when it
is needed.
  • Loading branch information
wyfo committed Jan 30, 2025
1 parent ba64d7a commit 6c16b49
Show file tree
Hide file tree
Showing 21 changed files with 578 additions and 885 deletions.
8 changes: 1 addition & 7 deletions zenoh/src/net/routing/dispatcher/interests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::{

use async_trait::async_trait;
use tokio_util::sync::CancellationToken;
use zenoh_keyexpr::keyexpr;
use zenoh_protocol::{
core::WireExpr,
network::{
Expand Down Expand Up @@ -220,15 +219,10 @@ pub(crate) fn declare_interest(
} else {
let mut fullexpr = prefix.expr().to_string();
fullexpr.push_str(expr.suffix.as_ref());
let mut matches = keyexpr::new(fullexpr.as_str())
.map(|ke| Resource::get_matches(&rtables, ke))
.unwrap_or_default();
drop(rtables);
let mut wtables = zwrite!(tables_ref.tables);
let mut res =
let res =
Resource::make_resource(&mut wtables, &mut prefix, expr.suffix.as_ref());
matches.push(Arc::downgrade(&res));
Resource::match_resource(&wtables, &mut res, matches);
(res, wtables)
};

Expand Down
28 changes: 11 additions & 17 deletions zenoh/src/net/routing/dispatcher/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ use std::sync::Arc;

use zenoh_core::zread;
use zenoh_protocol::{
core::{key_expr::keyexpr, Reliability, WireExpr},
core::{Reliability, WireExpr},
network::{declare::SubscriberId, push::ext, Push},
zenoh::PushBody,
};
use zenoh_sync::get_mut_unchecked;

use super::{
face::FaceState,
Expand Down Expand Up @@ -72,15 +71,10 @@ pub(crate) fn declare_subscription(
} else {
let mut fullexpr = prefix.expr().to_string();
fullexpr.push_str(expr.suffix.as_ref());
let mut matches = keyexpr::new(fullexpr.as_str())
.map(|ke| Resource::get_matches(&rtables, ke))
.unwrap_or_default();
drop(rtables);
let mut wtables = zwrite!(tables.tables);
let mut res =
let res =
Resource::make_resource(&mut wtables, &mut prefix, expr.suffix.as_ref());
matches.push(Arc::downgrade(&res));
Resource::match_resource(&wtables, &mut res, matches);
(res, wtables)
};

Expand Down Expand Up @@ -156,16 +150,16 @@ pub(crate) fn undeclare_subscription(
}
}

pub(crate) fn disable_matches_data_routes(_tables: &mut Tables, res: &mut Arc<Resource>) {
pub(crate) fn disable_matches_data_routes(tables: &mut Tables, res: &mut Arc<Resource>) {
if res.context.is_some() {
get_mut_unchecked(res).context_mut().disable_data_routes();
for match_ in &res.context().matches {
let mut match_ = match_.upgrade().unwrap();
if !Arc::ptr_eq(&match_, res) {
get_mut_unchecked(&mut match_)
.context_mut()
.disable_data_routes();
}
if let Some(key_expr) = res.key_expr() {
Resource::iter_matches(&tables.root_res, key_expr, |m| {
if !Arc::ptr_eq(res, m) {
unsafe { &mut *Arc::as_ptr(m).cast_mut() }
.context_mut()
.disable_data_routes()
}
})
}
}
}
Expand Down
27 changes: 11 additions & 16 deletions zenoh/src/net/routing/dispatcher/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use zenoh_buffers::ZBuf;
#[cfg(feature = "stats")]
use zenoh_protocol::zenoh::reply::ReplyBody;
use zenoh_protocol::{
core::{key_expr::keyexpr, Encoding, WireExpr},
core::{Encoding, WireExpr},
network::{
declare::{ext, queryable::ext::QueryableInfoType, QueryableId},
request::{
Expand Down Expand Up @@ -99,15 +99,10 @@ pub(crate) fn declare_queryable(
} else {
let mut fullexpr = prefix.expr().to_string();
fullexpr.push_str(expr.suffix.as_ref());
let mut matches = keyexpr::new(fullexpr.as_str())
.map(|ke| Resource::get_matches(&rtables, ke))
.unwrap_or_default();
drop(rtables);
let mut wtables = zwrite!(tables.tables);
let mut res =
let res =
Resource::make_resource(&mut wtables, &mut prefix, expr.suffix.as_ref());
matches.push(Arc::downgrade(&res));
Resource::match_resource(&wtables, &mut res, matches);
(res, wtables)
};

Expand Down Expand Up @@ -339,16 +334,16 @@ impl Timed for QueryCleanup {
}
}

pub(crate) fn disable_matches_query_routes(_tables: &mut Tables, res: &mut Arc<Resource>) {
pub(crate) fn disable_matches_query_routes(tables: &mut Tables, res: &mut Arc<Resource>) {
if res.context.is_some() {
get_mut_unchecked(res).context_mut().disable_query_routes();
for match_ in &res.context().matches {
let mut match_ = match_.upgrade().unwrap();
if !Arc::ptr_eq(&match_, res) {
get_mut_unchecked(&mut match_)
.context_mut()
.disable_query_routes();
}
if let Some(key_expr) = res.key_expr() {
Resource::iter_matches(&tables.root_res, key_expr, |m| {
if !Arc::ptr_eq(res, m) {
unsafe { &mut *Arc::as_ptr(m).cast_mut() }
.context_mut()
.disable_query_routes()
}
})
}
}
}
Expand Down
Loading

0 comments on commit 6c16b49

Please sign in to comment.