Skip to content

Commit

Permalink
Make Interceptor framework updatable
Browse files Browse the repository at this point in the history
  • Loading branch information
sashacmc committed Feb 7, 2025
1 parent 69f810d commit 9424893
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 35 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions zenoh/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ zenoh-util = { workspace = true }
zenoh-runtime = { workspace = true }
zenoh-task = { workspace = true }
once_cell = { workspace = true }
arc-swap = "1.7.1"

[dev-dependencies]
tokio = { workspace = true }
Expand Down
66 changes: 39 additions & 27 deletions zenoh/src/net/primitives/mux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use std::sync::OnceLock;
use arc_swap::ArcSwap;
use std::sync::{Arc, OnceLock};

use zenoh_protocol::{
core::Reliability,
Expand All @@ -32,15 +33,15 @@ use crate::net::routing::{
pub struct Mux {
pub handler: TransportUnicast,
pub(crate) face: OnceLock<WeakFace>,
pub(crate) interceptor: InterceptorsChain,
pub(crate) interceptor: ArcSwap<InterceptorsChain>,
}

impl Mux {
pub(crate) fn new(handler: TransportUnicast, interceptor: InterceptorsChain) -> Mux {
pub(crate) fn new(handler: TransportUnicast, interceptor: Arc<InterceptorsChain>) -> Mux {
Mux {
handler,
face: OnceLock::new(),
interceptor,
interceptor: ArcSwap::new(interceptor),
}
}
}
Expand All @@ -67,7 +68,7 @@ impl EPrimitives for Mux {
let cache = prefix
.as_ref()
.and_then(|p| p.get_egress_cache(ctx.outface.get().unwrap()));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
if let Some(ctx) = self.interceptor.load().intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
}
Expand All @@ -93,7 +94,7 @@ impl EPrimitives for Mux {
let cache = prefix
.as_ref()
.and_then(|p| p.get_egress_cache(ctx.outface.get().unwrap()));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
if let Some(ctx) = self.interceptor.load().intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
}
Expand All @@ -105,7 +106,8 @@ impl EPrimitives for Mux {
#[cfg(feature = "stats")]
size: None,
};
if self.interceptor.interceptors.is_empty() {
let interceptor = self.interceptor.load();
if interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get().and_then(|f| f.upgrade()) {
let ctx = RoutingContext::new_out(msg, face.clone());
Expand All @@ -115,7 +117,7 @@ impl EPrimitives for Mux {
.flatten()
.cloned();
let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(&face));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
if let Some(ctx) = interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
} else {
Expand All @@ -130,7 +132,8 @@ impl EPrimitives for Mux {
#[cfg(feature = "stats")]
size: None,
};
if self.interceptor.interceptors.is_empty() {
let interceptor = self.interceptor.load();
if interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get().and_then(|f| f.upgrade()) {
let ctx = RoutingContext::new_out(msg, face.clone());
Expand All @@ -140,7 +143,7 @@ impl EPrimitives for Mux {
.flatten()
.cloned();
let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(&face));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
if let Some(ctx) = interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
} else {
Expand All @@ -155,7 +158,8 @@ impl EPrimitives for Mux {
#[cfg(feature = "stats")]
size: None,
};
if self.interceptor.interceptors.is_empty() {
let interceptor = self.interceptor.load();
if interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get().and_then(|f| f.upgrade()) {
let ctx = RoutingContext::new_out(msg, face.clone());
Expand All @@ -165,7 +169,7 @@ impl EPrimitives for Mux {
.flatten()
.cloned();
let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(&face));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
if let Some(ctx) = interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
} else {
Expand All @@ -180,7 +184,8 @@ impl EPrimitives for Mux {
#[cfg(feature = "stats")]
size: None,
};
if self.interceptor.interceptors.is_empty() {
let interceptor = self.interceptor.load();
if interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get().and_then(|f| f.upgrade()) {
let ctx = RoutingContext::new_out(msg, face.clone());
Expand All @@ -190,7 +195,7 @@ impl EPrimitives for Mux {
.flatten()
.cloned();
let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(&face));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
if let Some(ctx) = interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
} else {
Expand All @@ -206,15 +211,18 @@ impl EPrimitives for Mux {
pub struct McastMux {
pub handler: TransportMulticast,
pub(crate) face: OnceLock<Face>,
pub(crate) interceptor: InterceptorsChain,
pub(crate) interceptor: ArcSwap<InterceptorsChain>,
}

impl McastMux {
pub(crate) fn new(handler: TransportMulticast, interceptor: InterceptorsChain) -> McastMux {
pub(crate) fn new(
handler: TransportMulticast,
interceptor: Arc<InterceptorsChain>,
) -> McastMux {
McastMux {
handler,
face: OnceLock::new(),
interceptor,
interceptor: ArcSwap::new(interceptor),
}
}
}
Expand All @@ -241,7 +249,7 @@ impl EPrimitives for McastMux {
let cache = prefix
.as_ref()
.and_then(|p| p.get_egress_cache(ctx.outface.get().unwrap()));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
if let Some(ctx) = self.interceptor.load().intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
}
Expand All @@ -267,7 +275,7 @@ impl EPrimitives for McastMux {
let cache = prefix
.as_ref()
.and_then(|p| p.get_egress_cache(ctx.outface.get().unwrap()));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
if let Some(ctx) = self.interceptor.load().intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
}
Expand All @@ -279,7 +287,8 @@ impl EPrimitives for McastMux {
#[cfg(feature = "stats")]
size: None,
};
if self.interceptor.interceptors.is_empty() {
let interceptor = self.interceptor.load();
if interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get() {
let ctx = RoutingContext::new_out(msg, face.clone());
Expand All @@ -289,7 +298,7 @@ impl EPrimitives for McastMux {
.flatten()
.cloned();
let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
if let Some(ctx) = interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
} else {
Expand All @@ -304,7 +313,8 @@ impl EPrimitives for McastMux {
#[cfg(feature = "stats")]
size: None,
};
if self.interceptor.interceptors.is_empty() {
let interceptor = self.interceptor.load();
if interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get() {
let ctx = RoutingContext::new_out(msg, face.clone());
Expand All @@ -314,7 +324,7 @@ impl EPrimitives for McastMux {
.flatten()
.cloned();
let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
if let Some(ctx) = interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
} else {
Expand All @@ -329,7 +339,8 @@ impl EPrimitives for McastMux {
#[cfg(feature = "stats")]
size: None,
};
if self.interceptor.interceptors.is_empty() {
let interceptor = self.interceptor.load();
if interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get() {
let ctx = RoutingContext::new_out(msg, face.clone());
Expand All @@ -339,7 +350,7 @@ impl EPrimitives for McastMux {
.flatten()
.cloned();
let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
if let Some(ctx) = interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
} else {
Expand All @@ -354,7 +365,8 @@ impl EPrimitives for McastMux {
#[cfg(feature = "stats")]
size: None,
};
if self.interceptor.interceptors.is_empty() {
let interceptor = self.interceptor.load();
if interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get() {
let ctx = RoutingContext::new_out(msg, face.clone());
Expand All @@ -364,7 +376,7 @@ impl EPrimitives for McastMux {
.flatten()
.cloned();
let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
if let Some(ctx) = interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
} else {
Expand Down
51 changes: 45 additions & 6 deletions zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::{
time::Duration,
};

use arc_swap::ArcSwapOption;
use tokio_util::sync::CancellationToken;
use zenoh_protocol::{
core::{ExprId, Reliability, WhatAmI, WireExpr, ZenohIdProto},
Expand Down Expand Up @@ -46,7 +47,10 @@ use crate::{
primitives::{McastMux, Mux, Primitives},
routing::{
dispatcher::interests::finalize_pending_interests,
interceptor::{InterceptorTrait, InterceptorsChain},
interceptor::{
EgressInterceptor, IngressInterceptor, InterceptorFactory, InterceptorTrait,
InterceptorsChain,
},
},
},
};
Expand All @@ -73,7 +77,7 @@ pub struct FaceState {
pub(crate) next_qid: RequestId,
pub(crate) pending_queries: HashMap<RequestId, (Arc<Query>, CancellationToken)>,
pub(crate) mcast_group: Option<TransportMulticast>,
pub(crate) in_interceptors: Option<Arc<InterceptorsChain>>,
pub(crate) in_interceptors: ArcSwapOption<InterceptorsChain>,
pub(crate) hat: Box<dyn Any + Send + Sync>,
pub(crate) task_controller: TaskController,
}
Expand Down Expand Up @@ -105,7 +109,7 @@ impl FaceState {
next_qid: 0,
pending_queries: HashMap::new(),
mcast_group,
in_interceptors,
in_interceptors: in_interceptors.into(),
hat,
task_controller: TaskController::default(),
})
Expand Down Expand Up @@ -145,7 +149,7 @@ impl FaceState {

pub(crate) fn update_interceptors_caches(&self, res: &mut Arc<Resource>) {
if let Ok(expr) = KeyExpr::try_from(res.expr().to_string()) {
if let Some(interceptor) = self.in_interceptors.as_ref() {
if let Some(interceptor) = self.in_interceptors.load().as_ref() {
let cache = interceptor.compute_keyexpr_cache(&expr);
get_mut_unchecked(
get_mut_unchecked(res)
Expand All @@ -156,7 +160,7 @@ impl FaceState {
.in_interceptor_cache = cache;
}
if let Some(mux) = self.primitives.as_any().downcast_ref::<Mux>() {
let cache = mux.interceptor.compute_keyexpr_cache(&expr);
let cache = mux.interceptor.load().compute_keyexpr_cache(&expr);
get_mut_unchecked(
get_mut_unchecked(res)
.session_ctxs
Expand All @@ -166,7 +170,7 @@ impl FaceState {
.e_interceptor_cache = cache;
}
if let Some(mux) = self.primitives.as_any().downcast_ref::<McastMux>() {
let cache = mux.interceptor.compute_keyexpr_cache(&expr);
let cache = mux.interceptor.load().compute_keyexpr_cache(&expr);
get_mut_unchecked(
get_mut_unchecked(res)
.session_ctxs
Expand All @@ -177,6 +181,41 @@ impl FaceState {
}
}
}

pub(crate) fn regen_interceptors(&self, factories: &Vec<InterceptorFactory>) {
if let Some(mux) = self.primitives.as_any().downcast_ref::<&mut Mux>() {
let (ingress, egress): (Vec<_>, Vec<_>) = factories
.iter()
.map(|itor| itor.new_transport_unicast(&mux.handler))
.unzip();
let (ingress, egress) = (
Arc::new(InterceptorsChain::from(
ingress.into_iter().flatten().collect::<Vec<_>>(),
)),
InterceptorsChain::from(egress.into_iter().flatten().collect::<Vec<_>>()),
);
mux.interceptor.store(Arc::new(egress));
self.in_interceptors.store(Some(ingress));
}
if let Some(mux) = self.primitives.as_any().downcast_ref::<&mut McastMux>() {
let interceptor = InterceptorsChain::from(
factories
.iter()
.filter_map(|itor| itor.new_transport_multicast(&mux.handler))
.collect::<Vec<EgressInterceptor>>(),
);
mux.interceptor.store(Arc::new(interceptor));
}
if let Some(transport) = &self.mcast_group {
let interceptor = Arc::new(InterceptorsChain::from(
factories
.iter()
.filter_map(|itor| itor.new_peer_multicast(&transport))
.collect::<Vec<IngressInterceptor>>(),
));
self.in_interceptors.store(Some(interceptor));
}
}
}

impl fmt::Display for FaceState {
Expand Down
7 changes: 7 additions & 0 deletions zenoh/src/net/routing/dispatcher/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,13 @@ impl Tables {
pub(crate) fn disable_all_routes(&mut self) {
self.routes_version = self.routes_version.saturating_add(1);
}

#[allow(dead_code)]
pub(crate) fn regen_interceptors(&self) {
self.faces
.values()
.for_each(|face| face.regen_interceptors(&self.interceptors));
}
}

pub struct TablesLock {
Expand Down
Loading

0 comments on commit 9424893

Please sign in to comment.