Skip to content

Commit

Permalink
refactor: use HashSet instead of Vec for network links
Browse files Browse the repository at this point in the history
Links are used a lot for `contains` operation.
  • Loading branch information
wyfo committed Jan 23, 2025
1 parent da1b0d3 commit 183bd4b
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 40 deletions.
1 change: 1 addition & 0 deletions examples/examples/z_pub_thr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use zenoh::{
Wait,
};
use zenoh_examples::CommonArgs;
use zenoh_ext::z_serialize;

fn main() {
// initiate logging
Expand Down
23 changes: 12 additions & 11 deletions zenoh/src/net/routing/hat/linkstate_peer/network.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashSet;
//
// Copyright (c) 2023 ZettaScale Technology
//
Expand Down Expand Up @@ -52,7 +53,7 @@ pub(super) struct Node {
pub(super) whatami: Option<WhatAmI>,
pub(super) locators: Option<Vec<Locator>>,
pub(super) sn: u64,
pub(super) links: Vec<ZenohIdProto>,
pub(super) links: HashSet<ZenohIdProto>,
}

impl std::fmt::Debug for Node {
Expand Down Expand Up @@ -149,7 +150,7 @@ impl Network {
whatami: Some(runtime.whatami()),
locators: None,
sn: 1,
links: vec![],
links: HashSet::new(),
});
Network {
name,
Expand Down Expand Up @@ -424,7 +425,7 @@ impl Network {
let link_states = link_states
.into_iter()
.map(|(zid, wai, locs, sn, links)| {
let links: Vec<ZenohIdProto> = links
let links: HashSet<ZenohIdProto> = links
.iter()
.filter_map(|l| {
if let Some(zid) = src_link.get_zid(l) {
Expand Down Expand Up @@ -574,7 +575,7 @@ impl Network {
}
},
)
.collect::<Vec<(Vec<ZenohIdProto>, NodeIndex, bool)>>();
.collect::<Vec<(HashSet<ZenohIdProto>, NodeIndex, bool)>>();

// Add/remove edges from graph
let mut reintroduced_nodes = vec![];
Expand All @@ -596,11 +597,11 @@ impl Network {
whatami: None,
locators: None,
sn: 0,
links: vec![],
links: HashSet::new(),
};
tracing::debug!("{} Add node (reintroduced) {}", self.name, link.clone());
let idx = self.add_node(node);
reintroduced_nodes.push((vec![], idx, true));
reintroduced_nodes.push((HashSet::new(), idx, true));
}
}
let mut edges = vec![];
Expand All @@ -626,7 +627,7 @@ impl Network {
let link_states = link_states
.into_iter()
.filter(|ls| !removed.iter().any(|(idx, _)| idx == &ls.1))
.collect::<Vec<(Vec<ZenohIdProto>, NodeIndex, bool)>>();
.collect::<Vec<(HashSet<ZenohIdProto>, NodeIndex, bool)>>();

if !self.autoconnect.is_empty() {
// Connect discovered peers
Expand Down Expand Up @@ -665,8 +666,8 @@ impl Network {
#[allow(clippy::type_complexity)] // This is only used here
if !link_states.is_empty() {
let (new_idxs, updated_idxs): (
Vec<(Vec<ZenohIdProto>, NodeIndex, bool)>,
Vec<(Vec<ZenohIdProto>, NodeIndex, bool)>,
Vec<(HashSet<ZenohIdProto>, NodeIndex, bool)>,
Vec<(HashSet<ZenohIdProto>, NodeIndex, bool)>,
) = link_states.into_iter().partition(|(_, _, new)| *new);
for link in self.links.values() {
let new_idxs = new_idxs
Expand Down Expand Up @@ -742,7 +743,7 @@ impl Network {
whatami: Some(whatami),
locators: None,
sn: 0,
links: vec![],
links: HashSet::new(),
}),
true,
)
Expand All @@ -752,7 +753,7 @@ impl Network {
tracing::trace!("Update edge (link) {} {}", self.graph[self.idx].zid, zid);
self.update_edge(self.idx, idx);
}
self.graph[self.idx].links.push(zid);
self.graph[self.idx].links.insert(zid);
self.graph[self.idx].sn += 1;

// Send updated self linkstate on all existing links except new one
Expand Down
11 changes: 6 additions & 5 deletions zenoh/src/net/routing/hat/p2p_peer/gossip.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashSet;
//
// Copyright (c) 2023 ZettaScale Technology
//
Expand Down Expand Up @@ -47,7 +48,7 @@ pub(super) struct Node {
pub(super) whatami: Option<WhatAmI>,
pub(super) locators: Option<Vec<Locator>>,
pub(super) sn: u64,
pub(super) links: Vec<ZenohIdProto>,
pub(super) links: HashSet<ZenohIdProto>,
}

impl std::fmt::Debug for Node {
Expand Down Expand Up @@ -125,7 +126,7 @@ impl Network {
whatami: Some(runtime.whatami()),
locators: None,
sn: 1,
links: vec![],
links: HashSet::new(),
});
Network {
name,
Expand Down Expand Up @@ -355,7 +356,7 @@ impl Network {
let link_states = link_states
.into_iter()
.map(|(zid, wai, locs, sn, links)| {
let links: Vec<ZenohIdProto> = links
let links: HashSet<ZenohIdProto> = links
.iter()
.filter_map(|l| {
if let Some(zid) = src_link.get_zid(l) {
Expand Down Expand Up @@ -497,13 +498,13 @@ impl Network {
whatami: Some(whatami),
locators: None,
sn: 0,
links: vec![],
links: HashSet::new(),
}),
true,
)
}
};
self.graph[self.idx].links.push(zid);
self.graph[self.idx].links.insert(zid);
self.graph[self.idx].sn += 1;

// Send updated self linkstate on all existing links except new one
Expand Down
10 changes: 6 additions & 4 deletions zenoh/src/net/routing/hat/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ impl HatTables {
.as_ref()
.unwrap()
.get_links(peer)
.iter()
.into_iter()
.flatten()
.filter(move |nid| {
if let Some(node) = self.routers_net.as_ref().unwrap().get_node(nid) {
node.whatami.unwrap_or(WhatAmI::Router) == WhatAmI::Router
Expand Down Expand Up @@ -269,7 +270,7 @@ impl HatTables {
}

#[inline]
fn failover_brokering_to(source_links: &[ZenohIdProto], dest: ZenohIdProto) -> bool {
fn failover_brokering_to(source_links: &HashSet<ZenohIdProto>, dest: ZenohIdProto) -> bool {
// if source_links is empty then gossip is probably disabled in source peer
!source_links.is_empty() && !source_links.contains(&dest)
}
Expand All @@ -281,8 +282,9 @@ impl HatTables {
.linkstatepeers_net
.as_ref()
.map(|net| {
let links = net.get_links(peer1);
let res = HatTables::failover_brokering_to(links, peer2);
let res = net
.get_links(peer1)
.is_some_and(|links| HatTables::failover_brokering_to(links, peer2));
tracing::trace!("failover_brokering {} {} : {}", peer1, peer2, res);
res
})
Expand Down
29 changes: 14 additions & 15 deletions zenoh/src/net/routing/hat/router/network.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashSet;
//
// Copyright (c) 2023 ZettaScale Technology
//
Expand Down Expand Up @@ -52,7 +53,7 @@ pub(super) struct Node {
pub(super) whatami: Option<WhatAmI>,
pub(super) locators: Option<Vec<Locator>>,
pub(super) sn: u64,
pub(super) links: Vec<ZenohIdProto>,
pub(super) links: HashSet<ZenohIdProto>,
}

impl std::fmt::Debug for Node {
Expand Down Expand Up @@ -149,7 +150,7 @@ impl Network {
whatami: Some(runtime.whatami()),
locators: None,
sn: 1,
links: vec![],
links: HashSet::new(),
});
Network {
name,
Expand Down Expand Up @@ -428,7 +429,7 @@ impl Network {
let link_states = link_states
.into_iter()
.map(|(zid, wai, locs, sn, links)| {
let links: Vec<ZenohIdProto> = links
let links: HashSet<ZenohIdProto> = links
.iter()
.filter_map(|l| {
if let Some(zid) = src_link.get_zid(l) {
Expand Down Expand Up @@ -578,7 +579,7 @@ impl Network {
}
},
)
.collect::<Vec<(Vec<ZenohIdProto>, NodeIndex, bool)>>();
.collect::<Vec<(HashSet<ZenohIdProto>, NodeIndex, bool)>>();

// Add/remove edges from graph
let mut reintroduced_nodes = vec![];
Expand All @@ -600,11 +601,11 @@ impl Network {
whatami: None,
locators: None,
sn: 0,
links: vec![],
links: HashSet::new(),
};
tracing::debug!("{} Add node (reintroduced) {}", self.name, link.clone());
let idx = self.add_node(node);
reintroduced_nodes.push((vec![], idx, true));
reintroduced_nodes.push((HashSet::new(), idx, true));
}
}
let mut edges = vec![];
Expand All @@ -630,7 +631,7 @@ impl Network {
let link_states = link_states
.into_iter()
.filter(|ls| !removed.iter().any(|(idx, _)| idx == &ls.1))
.collect::<Vec<(Vec<ZenohIdProto>, NodeIndex, bool)>>();
.collect::<Vec<(HashSet<ZenohIdProto>, NodeIndex, bool)>>();

if !self.autoconnect.is_empty() {
// Connect discovered peers
Expand Down Expand Up @@ -669,8 +670,8 @@ impl Network {
#[allow(clippy::type_complexity)] // This is only used here
if !link_states.is_empty() {
let (new_idxs, updated_idxs): (
Vec<(Vec<ZenohIdProto>, NodeIndex, bool)>,
Vec<(Vec<ZenohIdProto>, NodeIndex, bool)>,
Vec<(HashSet<ZenohIdProto>, NodeIndex, bool)>,
Vec<(HashSet<ZenohIdProto>, NodeIndex, bool)>,
) = link_states.into_iter().partition(|(_, _, new)| *new);
for link in self.links.values() {
let new_idxs = new_idxs
Expand Down Expand Up @@ -746,7 +747,7 @@ impl Network {
whatami: Some(whatami),
locators: None,
sn: 0,
links: vec![],
links: HashSet::new(),
}),
true,
)
Expand All @@ -756,7 +757,7 @@ impl Network {
tracing::trace!("Update edge (link) {} {}", self.graph[self.idx].zid, zid);
self.update_edge(self.idx, idx);
}
self.graph[self.idx].links.push(zid);
self.graph[self.idx].links.insert(zid);
self.graph[self.idx].sn += 1;

// Send updated self linkstate on all existing links except new one
Expand Down Expand Up @@ -1015,10 +1016,8 @@ impl Network {
}

#[inline]
pub(super) fn get_links(&self, node: ZenohIdProto) -> &[ZenohIdProto] {
self.get_node(&node)
.map(|node| &node.links[..])
.unwrap_or_default()
pub(super) fn get_links(&self, node: ZenohIdProto) -> Option<&HashSet<ZenohIdProto>> {
Some(&self.get_node(&node)?.links)
}
}

Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/net/routing/hat/router/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@ pub(super) fn pubsub_tree_change(
pub(super) fn pubsub_linkstate_change(
tables: &mut Tables,
zid: &ZenohIdProto,
links: &[ZenohIdProto],
links: &HashSet<ZenohIdProto>,
send_declare: &mut SendDeclare,
) {
if let Some(mut src_face) = tables.get_face(zid).cloned() {
Expand Down
4 changes: 2 additions & 2 deletions zenoh/src/net/routing/hat/router/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
//
use std::{
borrow::Cow,
collections::HashMap,
collections::{HashMap, HashSet},
sync::{atomic::Ordering, Arc},
};

Expand Down Expand Up @@ -947,7 +947,7 @@ pub(super) fn queries_remove_node(
pub(super) fn queries_linkstate_change(
tables: &mut Tables,
zid: &ZenohIdProto,
links: &[ZenohIdProto],
links: &HashSet<ZenohIdProto>,
send_declare: &mut SendDeclare,
) {
if let Some(mut src_face) = tables.get_face(zid).cloned() {
Expand Down
7 changes: 5 additions & 2 deletions zenoh/src/net/routing/hat/router/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
// ZettaScale Zenoh Team, <[email protected]>
//

use std::sync::{atomic::Ordering, Arc};
use std::{
collections::HashSet,
sync::{atomic::Ordering, Arc},
};

use petgraph::graph::NodeIndex;
use zenoh_protocol::{
Expand Down Expand Up @@ -888,7 +891,7 @@ pub(super) fn token_tree_change(
pub(super) fn token_linkstate_change(
tables: &mut Tables,
zid: &ZenohIdProto,
links: &[ZenohIdProto],
links: &HashSet<ZenohIdProto>,
send_declare: &mut SendDeclare,
) {
if let Some(mut src_face) = tables.get_face(zid).cloned() {
Expand Down

0 comments on commit 183bd4b

Please sign in to comment.