Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Code reorg
Browse files Browse the repository at this point in the history
OlivierHecart committed Jun 12, 2024
1 parent d380738 commit 28f96e4
Showing 22 changed files with 1,225 additions and 1,180 deletions.
68 changes: 14 additions & 54 deletions zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
@@ -22,10 +22,8 @@ use tokio_util::sync::CancellationToken;
use zenoh_protocol::{
core::{ExprId, WhatAmI, ZenohId},
network::{
declare::ext,
interest::{InterestId, InterestMode, InterestOptions},
Declare, DeclareBody, DeclareFinal, Mapping, Push, Request, RequestId, Response,
ResponseFinal,
Mapping, Push, Request, RequestId, Response, ResponseFinal,
},
zenoh::RequestBody,
};
@@ -35,15 +33,17 @@ use zenoh_transport::multicast::TransportMulticast;
#[cfg(feature = "stats")]
use zenoh_transport::stats::TransportStats;

use super::{super::router::*, resource::*, tables, tables::TablesLock};
use super::{
super::router::*,
interests::{declare_interest, undeclare_interest},
resource::*,
tables::{self, TablesLock},
};
use crate::{
api::key_expr::KeyExpr,
net::{
primitives::{McastMux, Mux, Primitives},
routing::{
interceptor::{InterceptorTrait, InterceptorsChain},
RoutingContext,
},
routing::interceptor::{InterceptorTrait, InterceptorsChain},
},
};

@@ -212,57 +212,17 @@ impl Primitives for Face {
fn send_interest(&self, msg: zenoh_protocol::network::Interest) {
let ctrl_lock = zlock!(self.tables.ctrl_lock);
if msg.mode != InterestMode::Final {
if msg.options.keyexprs() && msg.mode != InterestMode::Current {
register_expr_interest(
&self.tables,
&mut self.state.clone(),
msg.id,
msg.wire_expr.as_ref(),
);
}
if msg.options.subscribers() {
declare_sub_interest(
ctrl_lock.as_ref(),
&self.tables,
&mut self.state.clone(),
msg.id,
msg.wire_expr.as_ref(),
msg.mode,
msg.options.aggregate(),
);
}
if msg.options.queryables() {
declare_qabl_interest(
ctrl_lock.as_ref(),
&self.tables,
&mut self.state.clone(),
msg.id,
msg.wire_expr.as_ref(),
msg.mode,
msg.options.aggregate(),
);
}
if msg.mode != InterestMode::Future {
self.state.primitives.send_declare(RoutingContext::new_out(
Declare {
interest_id: Some(msg.id),
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::DeclareFinal(DeclareFinal),
},
self.clone(),
));
}
} else {
unregister_expr_interest(&self.tables, &mut self.state.clone(), msg.id);
undeclare_sub_interest(
declare_interest(
ctrl_lock.as_ref(),
&self.tables,
&mut self.state.clone(),
msg.id,
msg.wire_expr.as_ref(),
msg.mode,
msg.options,
);
undeclare_qabl_interest(
} else {
undeclare_interest(
ctrl_lock.as_ref(),
&self.tables,
&mut self.state.clone(),
123 changes: 123 additions & 0 deletions zenoh/src/net/routing/dispatcher/interests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
//
// Copyright (c) 2023 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <zenoh@zettascale.tech>
//

use std::sync::Arc;

use zenoh_keyexpr::keyexpr;
use zenoh_protocol::{
core::WireExpr,
network::{
declare::ext,
interest::{InterestId, InterestMode, InterestOptions},
Declare, DeclareBody, DeclareFinal,
},
};

use super::{
face::FaceState,
tables::{register_expr_interest, TablesLock},
};
use crate::net::routing::{
hat::HatTrait,
router::{unregister_expr_interest, Resource},
RoutingContext,
};

pub(crate) fn declare_interest(
hat_code: &(dyn HatTrait + Send + Sync),
tables: &TablesLock,
face: &mut Arc<FaceState>,
id: InterestId,
expr: Option<&WireExpr>,
mode: InterestMode,
options: InterestOptions,
) {
if options.keyexprs() && mode != InterestMode::Current {
register_expr_interest(tables, face, id, expr);
}

if let Some(expr) = expr {
let rtables = zread!(tables.tables);
match rtables
.get_mapping(face, &expr.scope, expr.mapping)
.cloned()
{
Some(mut prefix) => {
tracing::debug!(
"{} Declare interest {} ({}{})",
face,
id,
prefix.expr(),
expr.suffix
);
let res = Resource::get_resource(&prefix, &expr.suffix);
let (mut res, mut wtables) = if res
.as_ref()
.map(|r| r.context.is_some())
.unwrap_or(false)
{
drop(rtables);
let wtables = zwrite!(tables.tables);
(res.unwrap(), wtables)
} else {
let mut fullexpr = prefix.expr();
fullexpr.push_str(expr.suffix.as_ref());
let mut matches = keyexpr::new(fullexpr.as_str())
.map(|ke| Resource::get_matches(&rtables, ke))
.unwrap_or_default();
drop(rtables);
let mut wtables = zwrite!(tables.tables);
let mut res =
Resource::make_resource(&mut wtables, &mut prefix, expr.suffix.as_ref());
matches.push(Arc::downgrade(&res));
Resource::match_resource(&wtables, &mut res, matches);
(res, wtables)
};

hat_code.declare_interest(&mut wtables, face, id, Some(&mut res), mode, options);
}
None => tracing::error!(
"{} Declare interest {} for unknown scope {}!",
face,
id,
expr.scope
),
}
} else {
let mut wtables = zwrite!(tables.tables);
hat_code.declare_interest(&mut wtables, face, id, None, mode, options);
}

if mode != InterestMode::Future {
face.primitives.send_declare(RoutingContext::new(Declare {
interest_id: Some(id),
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::DeclareFinal(DeclareFinal),
}));
}
}

pub(crate) fn undeclare_interest(
hat_code: &(dyn HatTrait + Send + Sync),
tables: &TablesLock,
face: &mut Arc<FaceState>,
id: InterestId,
) {
tracing::debug!("{} Undeclare interest {}", face, id,);
unregister_expr_interest(tables, face, id);
let mut wtables = zwrite!(tables.tables);
hat_code.undeclare_interest(&mut wtables, face, id);
}
1 change: 1 addition & 0 deletions zenoh/src/net/routing/dispatcher/mod.rs
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
//!
//! [Click here for Zenoh's documentation](../zenoh/index.html)
pub mod face;
pub mod interests;
pub mod pubsub;
pub mod queries;
pub mod resource;
81 changes: 0 additions & 81 deletions zenoh/src/net/routing/dispatcher/pubsub.rs
Original file line number Diff line number Diff line change
@@ -18,7 +18,6 @@ use zenoh_protocol::{
core::{key_expr::keyexpr, WhatAmI, WireExpr},
network::{
declare::{ext, subscriber::ext::SubscriberInfo, SubscriberId},
interest::{InterestId, InterestMode},
Push,
},
zenoh::PushBody,
@@ -34,86 +33,6 @@ use super::{
use crate::key_expr::KeyExpr;
use crate::net::routing::hat::HatTrait;

pub(crate) fn declare_sub_interest(
hat_code: &(dyn HatTrait + Send + Sync),
tables: &TablesLock,
face: &mut Arc<FaceState>,
id: InterestId,
expr: Option<&WireExpr>,
mode: InterestMode,
aggregate: bool,
) {
if let Some(expr) = expr {
let rtables = zread!(tables.tables);
match rtables
.get_mapping(face, &expr.scope, expr.mapping)
.cloned()
{
Some(mut prefix) => {
tracing::debug!(
"{} Declare sub interest {} ({}{})",
face,
id,
prefix.expr(),
expr.suffix
);
let res = Resource::get_resource(&prefix, &expr.suffix);
let (mut res, mut wtables) = if res
.as_ref()
.map(|r| r.context.is_some())
.unwrap_or(false)
{
drop(rtables);
let wtables = zwrite!(tables.tables);
(res.unwrap(), wtables)
} else {
let mut fullexpr = prefix.expr();
fullexpr.push_str(expr.suffix.as_ref());
let mut matches = keyexpr::new(fullexpr.as_str())
.map(|ke| Resource::get_matches(&rtables, ke))
.unwrap_or_default();
drop(rtables);
let mut wtables = zwrite!(tables.tables);
let mut res =
Resource::make_resource(&mut wtables, &mut prefix, expr.suffix.as_ref());
matches.push(Arc::downgrade(&res));
Resource::match_resource(&wtables, &mut res, matches);
(res, wtables)
};

hat_code.declare_sub_interest(
&mut wtables,
face,
id,
Some(&mut res),
mode,
aggregate,
);
}
None => tracing::error!(
"{} Declare sub interest {} for unknown scope {}!",
face,
id,
expr.scope
),
}
} else {
let mut wtables = zwrite!(tables.tables);
hat_code.declare_sub_interest(&mut wtables, face, id, None, mode, aggregate);
}
}

pub(crate) fn undeclare_sub_interest(
hat_code: &(dyn HatTrait + Send + Sync),
tables: &TablesLock,
face: &mut Arc<FaceState>,
id: InterestId,
) {
tracing::debug!("{} Undeclare sub interest {}", face, id,);
let mut wtables = zwrite!(tables.tables);
hat_code.undeclare_sub_interest(&mut wtables, face, id);
}

pub(crate) fn declare_subscription(
hat_code: &(dyn HatTrait + Send + Sync),
tables: &TablesLock,
Loading

0 comments on commit 28f96e4

Please sign in to comment.