Skip to content

Commit

Permalink
Add gossip_target configuration option (#1678)
Browse files Browse the repository at this point in the history
* Add gossip target option for p2p peers

* Apply gossip_target option to routers and linkstate peers

* homogenize

* add gossip_target to DEFAULT_CONFIG.json5

* Improve DEFAULT_CONFIG.json5 doc

* zenoh::open returns an error if 'client' is defined as gossip target
  • Loading branch information
OlivierHecart authored Jan 7, 2025
1 parent 29746d9 commit e9298c6
Show file tree
Hide file tree
Showing 13 changed files with 154 additions and 81 deletions.
8 changes: 7 additions & 1 deletion DEFAULT_CONFIG.json5
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@
/// It mostly makes sense when using "linkstate" routing mode where all nodes in the subsystem don't have
/// direct connectivity with each other.
multihop: false,
/// Which type of Zenoh instances to send gossip messages to.
/// Accepts a single value (e.g. target: ["router", "peer"]) which applies whatever the configured "mode" is,
/// or different values for router, peer or client mode (e.g. target: { router: ["router", "peer"], peer: ["router"] }).
/// Each value is a list of: "peer", "router" and/or "client".
target: { router: ["router", "peer"], peer: ["router", "peer"]},
/// Which type of Zenoh instances to automatically establish sessions with upon discovery on gossip.
/// Accepts a single value (e.g. autoconnect: ["router", "peer"]) which applies whatever the configured "mode" is,
/// or different values for router, peer or client mode (e.g. autoconnect: { router: [], peer: ["router", "peer"] }).
Expand Down Expand Up @@ -180,7 +185,8 @@
/// When set to true a router will forward data between two peers
/// directly connected to it if it detects that those peers are not
/// connected to each other.
/// The failover brokering only works if gossip discovery is enabled.
/// The failover brokering only works if gossip discovery is enabled
/// and peers are configured with gossip target "router".
peers_failover_brokering: true,
},
/// The routing strategy to use in peers and it's configuration.
Expand Down
9 changes: 9 additions & 0 deletions commons/zenoh-config/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,15 @@ pub mod scouting {
pub mod gossip {
pub const enabled: bool = true;
pub const multihop: bool = false;
pub mod target {
pub const router: &crate::WhatAmIMatcher = // "router|peer"
&crate::WhatAmIMatcher::empty().router().peer();
pub const peer: &crate::WhatAmIMatcher = // "router|peer"
&crate::WhatAmIMatcher::empty().router().peer();
pub const client: &crate::WhatAmIMatcher = // ""
&crate::WhatAmIMatcher::empty();
mode_accessor!(crate::WhatAmIMatcher);
}
pub mod autoconnect {
pub const router: &crate::WhatAmIMatcher = // ""
&crate::WhatAmIMatcher::empty();
Expand Down
2 changes: 2 additions & 0 deletions commons/zenoh-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ validated_struct::validator! {
/// It mostly makes sense when using "linkstate" routing mode where all nodes in the subsystem don't have
/// direct connectivity with each other.
multihop: Option<bool>,
/// Which type of Zenoh instances to send gossip messages to.
target: Option<ModeDependentValue<WhatAmIMatcher>>,
/// Which type of Zenoh instances to automatically establish sessions with upon discovery through gossip.
autoconnect: Option<ModeDependentValue<WhatAmIMatcher>>,
},
Expand Down
4 changes: 3 additions & 1 deletion zenoh/src/net/routing/hat/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ impl HatTables {
pub(crate) struct HatCode {}

impl HatBaseTrait for HatCode {
fn init(&self, _tables: &mut Tables, _runtime: Runtime) {}
fn init(&self, _tables: &mut Tables, _runtime: Runtime) -> ZResult<()> {
Ok(())
}

fn new_tables(&self, _router_peers_failover_brokering: bool) -> Box<dyn Any + Send + Sync> {
Box::new(HatTables::new())
Expand Down
8 changes: 7 additions & 1 deletion zenoh/src/net/routing/hat/linkstate_peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,16 @@ impl HatTables {
pub(crate) struct HatCode {}

impl HatBaseTrait for HatCode {
fn init(&self, tables: &mut Tables, runtime: Runtime) {
fn init(&self, tables: &mut Tables, runtime: Runtime) -> ZResult<()> {
let config_guard = runtime.config().lock();
let config = &config_guard.0;
let whatami = tables.whatami;
let gossip = unwrap_or_default!(config.scouting().gossip().enabled());
let gossip_multihop = unwrap_or_default!(config.scouting().gossip().multihop());
let gossip_target = *unwrap_or_default!(config.scouting().gossip().target().get(whatami));
if gossip_target.matches(WhatAmI::Client) {
bail!("\"client\" is not allowed as gossip target")
}
let autoconnect = if gossip {
*unwrap_or_default!(config.scouting().gossip().autoconnect().get(whatami))
} else {
Expand All @@ -205,8 +209,10 @@ impl HatBaseTrait for HatCode {
router_peers_failover_brokering,
gossip,
gossip_multihop,
gossip_target,
autoconnect,
));
Ok(())
}

fn new_tables(&self, _router_peers_failover_brokering: bool) -> Box<dyn Any + Send + Sync> {
Expand Down
77 changes: 44 additions & 33 deletions zenoh/src/net/routing/hat/linkstate_peer/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::net::{
runtime::{Runtime, WeakRuntime},
};

#[derive(Clone)]
#[derive(Clone, Default)]
struct Details {
zid: bool,
locators: bool,
Expand Down Expand Up @@ -119,6 +119,7 @@ pub(super) struct Network {
pub(super) router_peers_failover_brokering: bool,
pub(super) gossip: bool,
pub(super) gossip_multihop: bool,
pub(super) gossip_target: WhatAmIMatcher,
pub(super) autoconnect: WhatAmIMatcher,
pub(super) idx: NodeIndex,
pub(super) links: VecMap<Link>,
Expand All @@ -138,6 +139,7 @@ impl Network {
router_peers_failover_brokering: bool,
gossip: bool,
gossip_multihop: bool,
gossip_target: WhatAmIMatcher,
autoconnect: WhatAmIMatcher,
) -> Self {
let mut graph = petgraph::stable_graph::StableGraph::default();
Expand All @@ -155,6 +157,7 @@ impl Network {
router_peers_failover_brokering,
gossip,
gossip_multihop,
gossip_target,
autoconnect,
idx,
links: VecMap::new(),
Expand Down Expand Up @@ -225,7 +228,7 @@ impl Network {
idx
}

fn make_link_state(&self, idx: NodeIndex, details: Details) -> LinkState {
fn make_link_state(&self, idx: NodeIndex, details: &Details) -> LinkState {
let links = if details.links {
self.graph[idx]
.links
Expand Down Expand Up @@ -268,10 +271,10 @@ impl Network {
}
}

fn make_msg(&self, idxs: Vec<(NodeIndex, Details)>) -> Result<NetworkMessage, DidntWrite> {
fn make_msg(&self, idxs: &Vec<(NodeIndex, Details)>) -> Result<NetworkMessage, DidntWrite> {
let mut link_states = vec![];
for (idx, details) in idxs {
link_states.push(self.make_link_state(idx, details));
link_states.push(self.make_link_state(*idx, details));
}
let codec = Zenoh080Routing::new();
let mut buf = ZBuf::empty();
Expand All @@ -285,8 +288,11 @@ impl Network {
.into())
}

fn send_on_link(&self, idxs: Vec<(NodeIndex, Details)>, transport: &TransportUnicast) {
if let Ok(msg) = self.make_msg(idxs) {
fn send_on_link(&self, mut idxs: Vec<(NodeIndex, Details)>, transport: &TransportUnicast) {
for idx in &mut idxs {
idx.1.locators = self.propagate_locators(idx.0, transport);
}
if let Ok(msg) = self.make_msg(&idxs) {
tracing::trace!("{} Send to {:?} {:?}", self.name, transport.get_zid(), msg);
if let Err(e) = transport.schedule(msg) {
tracing::debug!("{} Error sending LinkStateList: {}", self.name, e);
Expand All @@ -296,30 +302,35 @@ impl Network {
}
}

fn send_on_links<P>(&self, idxs: Vec<(NodeIndex, Details)>, mut parameters: P)
fn send_on_links<P>(&self, mut idxs: Vec<(NodeIndex, Details)>, mut parameters: P)
where
P: FnMut(&Link) -> bool,
{
if let Ok(msg) = self.make_msg(idxs) {
for link in self.links.values() {
for link in self.links.values() {
for idx in &mut idxs {
idx.1.locators = self.propagate_locators(idx.0, &link.transport);
}
if let Ok(msg) = self.make_msg(&idxs) {
if parameters(link) {
tracing::trace!("{} Send to {} {:?}", self.name, link.zid, msg);
if let Err(e) = link.transport.schedule(msg.clone()) {
tracing::debug!("{} Error sending LinkStateList: {}", self.name, e);
}
}
} else {
tracing::error!("Failed to encode Linkstate message");
}
} else {
tracing::error!("Failed to encode Linkstate message");
}
}

// Indicates if locators should be included when propagating Linkstate message
// from the given node.
// Returns true if gossip is enabled and if multihop gossip is enabled or
// the node is one of self neighbours.
fn propagate_locators(&self, idx: NodeIndex) -> bool {
fn propagate_locators(&self, idx: NodeIndex, target: &TransportUnicast) -> bool {
let target_whatami = target.get_whatami().unwrap_or_default();
self.gossip
&& self.gossip_target.matches(target_whatami)
&& (self.gossip_multihop
|| idx == self.idx
|| self.links.values().any(|link| {
Expand Down Expand Up @@ -491,8 +502,8 @@ impl Network {
idx,
Details {
zid: true,
locators: true,
links: false,
..Default::default()
},
)],
|link| link.zid != zid,
Expand Down Expand Up @@ -657,20 +668,20 @@ impl Network {
Vec<(Vec<ZenohIdProto>, NodeIndex, bool)>,
Vec<(Vec<ZenohIdProto>, NodeIndex, bool)>,
) = link_states.into_iter().partition(|(_, _, new)| *new);
let new_idxs = new_idxs
.into_iter()
.map(|(_, idx1, _new_node)| {
(
idx1,
Details {
zid: true,
locators: self.propagate_locators(idx1),
links: true,
},
)
})
.collect::<Vec<(NodeIndex, Details)>>();
for link in self.links.values() {
let new_idxs = new_idxs
.iter()
.map(|(_, idx1, _new_node)| {
(
*idx1,
Details {
zid: true,
links: true,
..Default::default()
},
)
})
.collect::<Vec<(NodeIndex, Details)>>();
if link.zid != src {
let updated_idxs: Vec<(NodeIndex, Details)> = updated_idxs
.clone()
Expand All @@ -681,8 +692,8 @@ impl Network {
idx1,
Details {
zid: false,
locators: self.propagate_locators(idx1),
links: true,
..Default::default()
},
))
} else {
Expand Down Expand Up @@ -761,16 +772,16 @@ impl Network {
idx,
Details {
zid: true,
locators: false,
links: false,
..Default::default()
},
),
(
self.idx,
Details {
zid: false,
locators: self.propagate_locators(idx),
links: true,
..Default::default()
},
),
]
Expand All @@ -779,8 +790,8 @@ impl Network {
self.idx,
Details {
zid: false,
locators: self.propagate_locators(idx),
links: true,
..Default::default()
},
)]
},
Expand All @@ -806,11 +817,11 @@ impl Network {
idx,
Details {
zid: true,
locators: self.propagate_locators(idx),
links: self.full_linkstate
|| (self.router_peers_failover_brokering
&& idx == self.idx
&& whatami == WhatAmI::Router),
..Default::default()
},
)
})
Expand Down Expand Up @@ -840,8 +851,8 @@ impl Network {
self.idx,
Details {
zid: false,
locators: self.gossip,
links: true,
..Default::default()
},
)],
|_| true,
Expand All @@ -858,8 +869,8 @@ impl Network {
self.idx,
Details {
zid: false,
locators: self.gossip,
links: true,
..Default::default()
},
)],
|link| {
Expand Down
2 changes: 1 addition & 1 deletion zenoh/src/net/routing/hat/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub(crate) trait HatTrait:
}

pub(crate) trait HatBaseTrait {
fn init(&self, tables: &mut Tables, runtime: Runtime);
fn init(&self, tables: &mut Tables, runtime: Runtime) -> ZResult<()>;

fn new_tables(&self, router_peers_failover_brokering: bool) -> Box<dyn Any + Send + Sync>;

Expand Down
27 changes: 20 additions & 7 deletions zenoh/src/net/routing/hat/p2p_peer/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub(super) struct Network {
pub(super) router_peers_failover_brokering: bool,
pub(super) gossip: bool,
pub(super) gossip_multihop: bool,
pub(super) gossip_target: WhatAmIMatcher,
pub(super) autoconnect: WhatAmIMatcher,
pub(super) wait_declares: bool,
pub(super) idx: NodeIndex,
Expand All @@ -113,6 +114,7 @@ impl Network {
router_peers_failover_brokering: bool,
gossip: bool,
gossip_multihop: bool,
gossip_target: WhatAmIMatcher,
autoconnect: WhatAmIMatcher,
wait_declares: bool,
) -> Self {
Expand All @@ -130,6 +132,7 @@ impl Network {
router_peers_failover_brokering,
gossip,
gossip_multihop,
gossip_target,
autoconnect,
wait_declares,
idx,
Expand Down Expand Up @@ -231,13 +234,18 @@ impl Network {
}

fn send_on_link(&self, idxs: Vec<(NodeIndex, Details)>, transport: &TransportUnicast) {
if let Ok(msg) = self.make_msg(idxs) {
tracing::trace!("{} Send to {:?} {:?}", self.name, transport.get_zid(), msg);
if let Err(e) = transport.schedule(msg) {
tracing::debug!("{} Error sending LinkStateList: {}", self.name, e);
if transport
.get_whatami()
.is_ok_and(|w| self.gossip_target.matches(w))
{
if let Ok(msg) = self.make_msg(idxs) {
tracing::trace!("{} Send to {:?} {:?}", self.name, transport.get_zid(), msg);
if let Err(e) = transport.schedule(msg) {
tracing::debug!("{} Error sending LinkStateList: {}", self.name, e);
}
} else {
tracing::error!("Failed to encode Linkstate message");
}
} else {
tracing::error!("Failed to encode Linkstate message");
}
}

Expand All @@ -247,7 +255,12 @@ impl Network {
{
if let Ok(msg) = self.make_msg(idxs) {
for link in self.links.values() {
if parameters(link) {
if link
.transport
.get_whatami()
.is_ok_and(|w| self.gossip_target.matches(w))
&& parameters(link)
{
tracing::trace!("{} Send to {} {:?}", self.name, link.zid, msg);
if let Err(e) = link.transport.schedule(msg.clone()) {
tracing::debug!("{} Error sending LinkStateList: {}", self.name, e);
Expand Down
Loading

0 comments on commit e9298c6

Please sign in to comment.