From de8e257d7baeb1d6afd0d8d42712018f2a4e8136 Mon Sep 17 00:00:00 2001 From: OlivierHecart Date: Mon, 6 Jan 2025 14:36:28 +0100 Subject: [PATCH] Add publishers and queriers to admin space (#1682) --- zenoh/src/net/routing/hat/client/pubsub.rs | 19 ++++++ zenoh/src/net/routing/hat/client/queries.rs | 19 ++++++ .../net/routing/hat/linkstate_peer/pubsub.rs | 19 ++++++ .../net/routing/hat/linkstate_peer/queries.rs | 19 ++++++ zenoh/src/net/routing/hat/mod.rs | 6 +- zenoh/src/net/routing/hat/p2p_peer/pubsub.rs | 19 ++++++ zenoh/src/net/routing/hat/p2p_peer/queries.rs | 19 ++++++ zenoh/src/net/routing/hat/router/pubsub.rs | 19 ++++++ zenoh/src/net/routing/hat/router/queries.rs | 19 ++++++ zenoh/src/net/runtime/adminspace.rs | 60 +++++++++++++++++++ 10 files changed, 217 insertions(+), 1 deletion(-) diff --git a/zenoh/src/net/routing/hat/client/pubsub.rs b/zenoh/src/net/routing/hat/client/pubsub.rs index cf92614e5f..5214185852 100644 --- a/zenoh/src/net/routing/hat/client/pubsub.rs +++ b/zenoh/src/net/routing/hat/client/pubsub.rs @@ -307,6 +307,25 @@ impl HatPubSubTrait for HatCode { Vec::from_iter(subs) } + fn get_publications(&self, tables: &Tables) -> Vec<(Arc, Sources)> { + let mut result = HashMap::new(); + for face in tables.faces.values() { + for interest in face_hat!(face).remote_interests.values() { + if interest.options.subscribers() { + if let Some(res) = interest.res.as_ref() { + let sources = result.entry(res.clone()).or_insert_with(Sources::default); + match face.whatami { + WhatAmI::Router => sources.routers.push(face.zid), + WhatAmI::Peer => sources.peers.push(face.zid), + WhatAmI::Client => sources.clients.push(face.zid), + } + } + } + } + } + result.into_iter().collect() + } + fn compute_data_route( &self, tables: &Tables, diff --git a/zenoh/src/net/routing/hat/client/queries.rs b/zenoh/src/net/routing/hat/client/queries.rs index 7f2f9cb8d9..cfd675f0f6 100644 --- a/zenoh/src/net/routing/hat/client/queries.rs +++ b/zenoh/src/net/routing/hat/client/queries.rs @@ -327,6 +327,25 @@ impl HatQueriesTrait for HatCode { Vec::from_iter(qabls) } + fn get_queriers(&self, tables: &Tables) -> Vec<(Arc, Sources)> { + let mut result = HashMap::new(); + for face in tables.faces.values() { + for interest in face_hat!(face).remote_interests.values() { + if interest.options.queryables() { + if let Some(res) = interest.res.as_ref() { + let sources = result.entry(res.clone()).or_insert_with(Sources::default); + match face.whatami { + WhatAmI::Router => sources.routers.push(face.zid), + WhatAmI::Peer => sources.peers.push(face.zid), + WhatAmI::Client => sources.clients.push(face.zid), + } + } + } + } + } + result.into_iter().collect() + } + fn compute_query_route( &self, tables: &Tables, diff --git a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs index fc2074c0b9..83966ff5db 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs @@ -857,6 +857,25 @@ impl HatPubSubTrait for HatCode { .collect() } + fn get_publications(&self, tables: &Tables) -> Vec<(Arc, Sources)> { + let mut result = HashMap::new(); + for face in tables.faces.values() { + for interest in face_hat!(face).remote_interests.values() { + if interest.options.subscribers() { + if let Some(res) = interest.res.as_ref() { + let sources = result.entry(res.clone()).or_insert_with(Sources::default); + match face.whatami { + WhatAmI::Router => sources.routers.push(face.zid), + WhatAmI::Peer => sources.peers.push(face.zid), + WhatAmI::Client => sources.clients.push(face.zid), + } + } + } + } + } + result.into_iter().collect() + } + fn compute_data_route( &self, tables: &Tables, diff --git a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs index 2f7772193a..4d1949771c 100644 --- a/zenoh/src/net/routing/hat/linkstate_peer/queries.rs +++ b/zenoh/src/net/routing/hat/linkstate_peer/queries.rs @@ -936,6 +936,25 @@ impl HatQueriesTrait for HatCode { .collect() } + fn get_queriers(&self, tables: &Tables) -> Vec<(Arc, Sources)> { + let mut result = HashMap::new(); + for face in tables.faces.values() { + for interest in face_hat!(face).remote_interests.values() { + if interest.options.queryables() { + if let Some(res) = interest.res.as_ref() { + let sources = result.entry(res.clone()).or_insert_with(Sources::default); + match face.whatami { + WhatAmI::Router => sources.routers.push(face.zid), + WhatAmI::Peer => sources.peers.push(face.zid), + WhatAmI::Client => sources.clients.push(face.zid), + } + } + } + } + } + result.into_iter().collect() + } + fn compute_query_route( &self, tables: &Tables, diff --git a/zenoh/src/net/routing/hat/mod.rs b/zenoh/src/net/routing/hat/mod.rs index 92d33115f1..bb9db3536c 100644 --- a/zenoh/src/net/routing/hat/mod.rs +++ b/zenoh/src/net/routing/hat/mod.rs @@ -53,7 +53,7 @@ zconfigurable! { pub static ref TREES_COMPUTATION_DELAY_MS: u64 = 100; } -#[derive(serde::Serialize)] +#[derive(Default, serde::Serialize)] pub(crate) struct Sources { routers: Vec, peers: Vec, @@ -181,6 +181,8 @@ pub(crate) trait HatPubSubTrait { fn get_subscriptions(&self, tables: &Tables) -> Vec<(Arc, Sources)>; + fn get_publications(&self, tables: &Tables) -> Vec<(Arc, Sources)>; + fn compute_data_route( &self, tables: &Tables, @@ -223,6 +225,8 @@ pub(crate) trait HatQueriesTrait { fn get_queryables(&self, tables: &Tables) -> Vec<(Arc, Sources)>; + fn get_queriers(&self, tables: &Tables) -> Vec<(Arc, Sources)>; + fn compute_query_route( &self, tables: &Tables, diff --git a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs index 8a050d5c03..ce4295b810 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/pubsub.rs @@ -575,6 +575,25 @@ impl HatPubSubTrait for HatCode { Vec::from_iter(subs) } + fn get_publications(&self, tables: &Tables) -> Vec<(Arc, Sources)> { + let mut result = HashMap::new(); + for face in tables.faces.values() { + for interest in face_hat!(face).remote_interests.values() { + if interest.options.subscribers() { + if let Some(res) = interest.res.as_ref() { + let sources = result.entry(res.clone()).or_insert_with(Sources::default); + match face.whatami { + WhatAmI::Router => sources.routers.push(face.zid), + WhatAmI::Peer => sources.peers.push(face.zid), + WhatAmI::Client => sources.clients.push(face.zid), + } + } + } + } + } + result.into_iter().collect() + } + fn compute_data_route( &self, tables: &Tables, diff --git a/zenoh/src/net/routing/hat/p2p_peer/queries.rs b/zenoh/src/net/routing/hat/p2p_peer/queries.rs index db67952745..c7dcedf9c0 100644 --- a/zenoh/src/net/routing/hat/p2p_peer/queries.rs +++ b/zenoh/src/net/routing/hat/p2p_peer/queries.rs @@ -567,6 +567,25 @@ impl HatQueriesTrait for HatCode { Vec::from_iter(qabls) } + fn get_queriers(&self, tables: &Tables) -> Vec<(Arc, Sources)> { + let mut result = HashMap::new(); + for face in tables.faces.values() { + for interest in face_hat!(face).remote_interests.values() { + if interest.options.queryables() { + if let Some(res) = interest.res.as_ref() { + let sources = result.entry(res.clone()).or_insert_with(Sources::default); + match face.whatami { + WhatAmI::Router => sources.routers.push(face.zid), + WhatAmI::Peer => sources.peers.push(face.zid), + WhatAmI::Client => sources.clients.push(face.zid), + } + } + } + } + } + result.into_iter().collect() + } + fn compute_query_route( &self, tables: &Tables, diff --git a/zenoh/src/net/routing/hat/router/pubsub.rs b/zenoh/src/net/routing/hat/router/pubsub.rs index a1fb8de164..0ff94ef325 100644 --- a/zenoh/src/net/routing/hat/router/pubsub.rs +++ b/zenoh/src/net/routing/hat/router/pubsub.rs @@ -1183,6 +1183,25 @@ impl HatPubSubTrait for HatCode { .collect() } + fn get_publications(&self, tables: &Tables) -> Vec<(Arc, Sources)> { + let mut result = HashMap::new(); + for face in tables.faces.values() { + for interest in face_hat!(face).remote_interests.values() { + if interest.options.subscribers() { + if let Some(res) = interest.res.as_ref() { + let sources = result.entry(res.clone()).or_insert_with(Sources::default); + match face.whatami { + WhatAmI::Router => sources.routers.push(face.zid), + WhatAmI::Peer => sources.peers.push(face.zid), + WhatAmI::Client => sources.clients.push(face.zid), + } + } + } + } + } + result.into_iter().collect() + } + fn compute_data_route( &self, tables: &Tables, diff --git a/zenoh/src/net/routing/hat/router/queries.rs b/zenoh/src/net/routing/hat/router/queries.rs index 6739f22bb9..bd8ff3c330 100644 --- a/zenoh/src/net/routing/hat/router/queries.rs +++ b/zenoh/src/net/routing/hat/router/queries.rs @@ -1395,6 +1395,25 @@ impl HatQueriesTrait for HatCode { .collect() } + fn get_queriers(&self, tables: &Tables) -> Vec<(Arc, Sources)> { + let mut result = HashMap::new(); + for face in tables.faces.values() { + for interest in face_hat!(face).remote_interests.values() { + if interest.options.queryables() { + if let Some(res) = interest.res.as_ref() { + let sources = result.entry(res.clone()).or_insert_with(Sources::default); + match face.whatami { + WhatAmI::Router => sources.routers.push(face.zid), + WhatAmI::Peer => sources.peers.push(face.zid), + WhatAmI::Client => sources.clients.push(face.zid), + } + } + } + } + } + result.into_iter().collect() + } + fn compute_query_route( &self, tables: &Tables, diff --git a/zenoh/src/net/runtime/adminspace.rs b/zenoh/src/net/runtime/adminspace.rs index 6271bcf05a..3085e4dea5 100644 --- a/zenoh/src/net/runtime/adminspace.rs +++ b/zenoh/src/net/runtime/adminspace.rs @@ -184,12 +184,24 @@ impl AdminSpace { .unwrap(), Arc::new(subscribers_data), ); + handlers.insert( + format!("@/{zid_str}/{whatami_str}/publisher/**") + .try_into() + .unwrap(), + Arc::new(publishers_data), + ); handlers.insert( format!("@/{zid_str}/{whatami_str}/queryable/**") .try_into() .unwrap(), Arc::new(queryables_data), ); + handlers.insert( + format!("@/{zid_str}/{whatami_str}/queriers/**") + .try_into() + .unwrap(), + Arc::new(queriers_data), + ); #[cfg(feature = "plugins")] handlers.insert( @@ -730,6 +742,30 @@ fn subscribers_data(context: &AdminContext, query: Query) { } } +fn publishers_data(context: &AdminContext, query: Query) { + let tables = zread!(context.runtime.state.router.tables.tables); + for sub in tables.hat_code.get_publications(&tables) { + let key = KeyExpr::try_from(format!( + "@/{}/{}/publisher/{}", + context.runtime.state.zid, + context.runtime.state.whatami, + sub.0.expr() + )) + .unwrap(); + if query.key_expr().intersects(&key) { + let payload = + ZBytes::from(serde_json::to_string(&sub.1).unwrap_or_else(|_| "{}".to_string())); + if let Err(e) = query + .reply(key, payload) + .encoding(Encoding::APPLICATION_JSON) + .wait() + { + tracing::error!("Error sending AdminSpace reply: {:?}", e); + } + } + } +} + fn queryables_data(context: &AdminContext, query: Query) { let tables = zread!(context.runtime.state.router.tables.tables); for qabl in tables.hat_code.get_queryables(&tables) { @@ -754,6 +790,30 @@ fn queryables_data(context: &AdminContext, query: Query) { } } +fn queriers_data(context: &AdminContext, query: Query) { + let tables = zread!(context.runtime.state.router.tables.tables); + for sub in tables.hat_code.get_queriers(&tables) { + let key = KeyExpr::try_from(format!( + "@/{}/{}/querier/{}", + context.runtime.state.zid, + context.runtime.state.whatami, + sub.0.expr() + )) + .unwrap(); + if query.key_expr().intersects(&key) { + let payload = + ZBytes::from(serde_json::to_string(&sub.1).unwrap_or_else(|_| "{}".to_string())); + if let Err(e) = query + .reply(key, payload) + .encoding(Encoding::APPLICATION_JSON) + .wait() + { + tracing::error!("Error sending AdminSpace reply: {:?}", e); + } + } + } +} + #[cfg(feature = "plugins")] fn plugins_data(context: &AdminContext, query: Query) { let guard = context.runtime.plugins_manager();