Skip to content

Commit

Permalink
Add publishers and queriers to admin space (#1682)
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart authored Jan 6, 2025
1 parent f0bdbea commit de8e257
Show file tree
Hide file tree
Showing 10 changed files with 217 additions and 1 deletion.
19 changes: 19 additions & 0 deletions zenoh/src/net/routing/hat/client/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,25 @@ impl HatPubSubTrait for HatCode {
Vec::from_iter(subs)
}

fn get_publications(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)> {
let mut result = HashMap::new();
for face in tables.faces.values() {
for interest in face_hat!(face).remote_interests.values() {
if interest.options.subscribers() {
if let Some(res) = interest.res.as_ref() {
let sources = result.entry(res.clone()).or_insert_with(Sources::default);
match face.whatami {
WhatAmI::Router => sources.routers.push(face.zid),
WhatAmI::Peer => sources.peers.push(face.zid),
WhatAmI::Client => sources.clients.push(face.zid),
}
}
}
}
}
result.into_iter().collect()
}

fn compute_data_route(
&self,
tables: &Tables,
Expand Down
19 changes: 19 additions & 0 deletions zenoh/src/net/routing/hat/client/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,25 @@ impl HatQueriesTrait for HatCode {
Vec::from_iter(qabls)
}

fn get_queriers(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)> {
let mut result = HashMap::new();
for face in tables.faces.values() {
for interest in face_hat!(face).remote_interests.values() {
if interest.options.queryables() {
if let Some(res) = interest.res.as_ref() {
let sources = result.entry(res.clone()).or_insert_with(Sources::default);
match face.whatami {
WhatAmI::Router => sources.routers.push(face.zid),
WhatAmI::Peer => sources.peers.push(face.zid),
WhatAmI::Client => sources.clients.push(face.zid),
}
}
}
}
}
result.into_iter().collect()
}

fn compute_query_route(
&self,
tables: &Tables,
Expand Down
19 changes: 19 additions & 0 deletions zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,25 @@ impl HatPubSubTrait for HatCode {
.collect()
}

fn get_publications(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)> {
let mut result = HashMap::new();
for face in tables.faces.values() {
for interest in face_hat!(face).remote_interests.values() {
if interest.options.subscribers() {
if let Some(res) = interest.res.as_ref() {
let sources = result.entry(res.clone()).or_insert_with(Sources::default);
match face.whatami {
WhatAmI::Router => sources.routers.push(face.zid),
WhatAmI::Peer => sources.peers.push(face.zid),
WhatAmI::Client => sources.clients.push(face.zid),
}
}
}
}
}
result.into_iter().collect()
}

fn compute_data_route(
&self,
tables: &Tables,
Expand Down
19 changes: 19 additions & 0 deletions zenoh/src/net/routing/hat/linkstate_peer/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -936,6 +936,25 @@ impl HatQueriesTrait for HatCode {
.collect()
}

fn get_queriers(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)> {
let mut result = HashMap::new();
for face in tables.faces.values() {
for interest in face_hat!(face).remote_interests.values() {
if interest.options.queryables() {
if let Some(res) = interest.res.as_ref() {
let sources = result.entry(res.clone()).or_insert_with(Sources::default);
match face.whatami {
WhatAmI::Router => sources.routers.push(face.zid),
WhatAmI::Peer => sources.peers.push(face.zid),
WhatAmI::Client => sources.clients.push(face.zid),
}
}
}
}
}
result.into_iter().collect()
}

fn compute_query_route(
&self,
tables: &Tables,
Expand Down
6 changes: 5 additions & 1 deletion zenoh/src/net/routing/hat/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ zconfigurable! {
pub static ref TREES_COMPUTATION_DELAY_MS: u64 = 100;
}

#[derive(serde::Serialize)]
#[derive(Default, serde::Serialize)]
pub(crate) struct Sources {
routers: Vec<ZenohIdProto>,
peers: Vec<ZenohIdProto>,
Expand Down Expand Up @@ -181,6 +181,8 @@ pub(crate) trait HatPubSubTrait {

fn get_subscriptions(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)>;

fn get_publications(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)>;

fn compute_data_route(
&self,
tables: &Tables,
Expand Down Expand Up @@ -223,6 +225,8 @@ pub(crate) trait HatQueriesTrait {

fn get_queryables(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)>;

fn get_queriers(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)>;

fn compute_query_route(
&self,
tables: &Tables,
Expand Down
19 changes: 19 additions & 0 deletions zenoh/src/net/routing/hat/p2p_peer/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,25 @@ impl HatPubSubTrait for HatCode {
Vec::from_iter(subs)
}

fn get_publications(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)> {
let mut result = HashMap::new();
for face in tables.faces.values() {
for interest in face_hat!(face).remote_interests.values() {
if interest.options.subscribers() {
if let Some(res) = interest.res.as_ref() {
let sources = result.entry(res.clone()).or_insert_with(Sources::default);
match face.whatami {
WhatAmI::Router => sources.routers.push(face.zid),
WhatAmI::Peer => sources.peers.push(face.zid),
WhatAmI::Client => sources.clients.push(face.zid),
}
}
}
}
}
result.into_iter().collect()
}

fn compute_data_route(
&self,
tables: &Tables,
Expand Down
19 changes: 19 additions & 0 deletions zenoh/src/net/routing/hat/p2p_peer/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,25 @@ impl HatQueriesTrait for HatCode {
Vec::from_iter(qabls)
}

fn get_queriers(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)> {
let mut result = HashMap::new();
for face in tables.faces.values() {
for interest in face_hat!(face).remote_interests.values() {
if interest.options.queryables() {
if let Some(res) = interest.res.as_ref() {
let sources = result.entry(res.clone()).or_insert_with(Sources::default);
match face.whatami {
WhatAmI::Router => sources.routers.push(face.zid),
WhatAmI::Peer => sources.peers.push(face.zid),
WhatAmI::Client => sources.clients.push(face.zid),
}
}
}
}
}
result.into_iter().collect()
}

fn compute_query_route(
&self,
tables: &Tables,
Expand Down
19 changes: 19 additions & 0 deletions zenoh/src/net/routing/hat/router/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1183,6 +1183,25 @@ impl HatPubSubTrait for HatCode {
.collect()
}

fn get_publications(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)> {
let mut result = HashMap::new();
for face in tables.faces.values() {
for interest in face_hat!(face).remote_interests.values() {
if interest.options.subscribers() {
if let Some(res) = interest.res.as_ref() {
let sources = result.entry(res.clone()).or_insert_with(Sources::default);
match face.whatami {
WhatAmI::Router => sources.routers.push(face.zid),
WhatAmI::Peer => sources.peers.push(face.zid),
WhatAmI::Client => sources.clients.push(face.zid),
}
}
}
}
}
result.into_iter().collect()
}

fn compute_data_route(
&self,
tables: &Tables,
Expand Down
19 changes: 19 additions & 0 deletions zenoh/src/net/routing/hat/router/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1395,6 +1395,25 @@ impl HatQueriesTrait for HatCode {
.collect()
}

fn get_queriers(&self, tables: &Tables) -> Vec<(Arc<Resource>, Sources)> {
let mut result = HashMap::new();
for face in tables.faces.values() {
for interest in face_hat!(face).remote_interests.values() {
if interest.options.queryables() {
if let Some(res) = interest.res.as_ref() {
let sources = result.entry(res.clone()).or_insert_with(Sources::default);
match face.whatami {
WhatAmI::Router => sources.routers.push(face.zid),
WhatAmI::Peer => sources.peers.push(face.zid),
WhatAmI::Client => sources.clients.push(face.zid),
}
}
}
}
}
result.into_iter().collect()
}

fn compute_query_route(
&self,
tables: &Tables,
Expand Down
60 changes: 60 additions & 0 deletions zenoh/src/net/runtime/adminspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,12 +184,24 @@ impl AdminSpace {
.unwrap(),
Arc::new(subscribers_data),
);
handlers.insert(
format!("@/{zid_str}/{whatami_str}/publisher/**")
.try_into()
.unwrap(),
Arc::new(publishers_data),
);
handlers.insert(
format!("@/{zid_str}/{whatami_str}/queryable/**")
.try_into()
.unwrap(),
Arc::new(queryables_data),
);
handlers.insert(
format!("@/{zid_str}/{whatami_str}/queriers/**")
.try_into()
.unwrap(),
Arc::new(queriers_data),
);

#[cfg(feature = "plugins")]
handlers.insert(
Expand Down Expand Up @@ -730,6 +742,30 @@ fn subscribers_data(context: &AdminContext, query: Query) {
}
}

fn publishers_data(context: &AdminContext, query: Query) {
let tables = zread!(context.runtime.state.router.tables.tables);
for sub in tables.hat_code.get_publications(&tables) {
let key = KeyExpr::try_from(format!(
"@/{}/{}/publisher/{}",
context.runtime.state.zid,
context.runtime.state.whatami,
sub.0.expr()
))
.unwrap();
if query.key_expr().intersects(&key) {
let payload =
ZBytes::from(serde_json::to_string(&sub.1).unwrap_or_else(|_| "{}".to_string()));
if let Err(e) = query
.reply(key, payload)
.encoding(Encoding::APPLICATION_JSON)
.wait()
{
tracing::error!("Error sending AdminSpace reply: {:?}", e);
}
}
}
}

fn queryables_data(context: &AdminContext, query: Query) {
let tables = zread!(context.runtime.state.router.tables.tables);
for qabl in tables.hat_code.get_queryables(&tables) {
Expand All @@ -754,6 +790,30 @@ fn queryables_data(context: &AdminContext, query: Query) {
}
}

fn queriers_data(context: &AdminContext, query: Query) {
let tables = zread!(context.runtime.state.router.tables.tables);
for sub in tables.hat_code.get_queriers(&tables) {
let key = KeyExpr::try_from(format!(
"@/{}/{}/querier/{}",
context.runtime.state.zid,
context.runtime.state.whatami,
sub.0.expr()
))
.unwrap();
if query.key_expr().intersects(&key) {
let payload =
ZBytes::from(serde_json::to_string(&sub.1).unwrap_or_else(|_| "{}".to_string()));
if let Err(e) = query
.reply(key, payload)
.encoding(Encoding::APPLICATION_JSON)
.wait()
{
tracing::error!("Error sending AdminSpace reply: {:?}", e);
}
}
}
}

#[cfg(feature = "plugins")]
fn plugins_data(context: &AdminContext, query: Query) {
let guard = context.runtime.plugins_manager();
Expand Down

0 comments on commit de8e257

Please sign in to comment.