Skip to content

Commit

Permalink
Move subscribe_path and mount_point fields to Peer structure
Browse files Browse the repository at this point in the history
  • Loading branch information
Fanda Vacek committed Oct 22, 2024
1 parent 0c0f602 commit 4041e40
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 106 deletions.
132 changes: 67 additions & 65 deletions src/brokerimpl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use shvrpc::metamethod::{AccessLevel};
use shvrpc::{RpcMessage, RpcMessageMetaTags};
use crate::spawn::spawn_and_log_error;
use futures::select;
use crate::peer;
use crate::{cut_prefix, peer};
use crate::peer::next_peer_id;
use async_std::stream::StreamExt;
use futures::FutureExt;
Expand Down Expand Up @@ -74,7 +74,7 @@ pub(crate) enum BrokerCommand {
SendResponse {peer_id: PeerId, meta: MetaMap, result: Result<RpcValue, RpcError>},
SendSignal {shv_path: String, signal: String, source: String, param: RpcValue},
RpcCall {
client_id: PeerId,
peer_id: PeerId,
request: RpcMessage,
response_sender: Sender<RpcFrame>,
},
Expand Down Expand Up @@ -106,14 +106,15 @@ pub(crate) enum PeerKind {
Device {
user: String,
device_id: Option<String>,
mount_point: String,
subscribe_path: Option<SubscribePath>,
mount_point: Option<String>,
},
}
#[derive(Debug)]
pub(crate) struct Peer {
pub(crate) peer_kind: PeerKind,
pub(crate) sender: Sender<BrokerToPeerMessage>,
pub(crate) mount_point: Option<String>,
pub(crate) subscribe_path: Option<SubscribePath>,
pub(crate) subscriptions: Vec<Subscription>,
pub(crate) forwarded_subscriptions: Vec<ForwardedSubscription>,
}
Expand Down Expand Up @@ -233,15 +234,7 @@ pub(crate) type SharedBrokerState = Arc<RwLock<BrokerState>>;
impl BrokerState {
fn mount_point(&self, peer_id: PeerId) -> Option<String> {
self.peers.get(&peer_id)
.and_then(|peer| {
if let PeerKind::Device { mount_point, .. } = &peer.peer_kind {
if mount_point.is_empty() {
None
} else {
Some(mount_point.clone())
}
} else { None }
})
.and_then(|peer| peer.mount_point.clone())
}
fn grant_for_request(&self, client_id: PeerId, frame: &RpcFrame) -> Result<(Option<i32>, Option<String>), RpcError> {
log!(target: "Access", Level::Debug, "======================= grant_for_request {}", &frame);
Expand Down Expand Up @@ -330,12 +323,11 @@ impl BrokerState {
}
}
fn remove_peer(&mut self, peer_id: PeerId) -> shvrpc::Result<Option<String>> {
let mount_point = self.mount_point(peer_id);
self.peers.remove(&peer_id);
let mut mount_point = None;
self.mounts.retain(|k, v| {
self.mounts.retain(|_k, v| {
if let Mount::Peer(id) = v {
if *id == peer_id {
mount_point = Some(k.clone());
return false
}
}
Expand All @@ -345,12 +337,8 @@ impl BrokerState {
}
fn set_subscribe_path(&mut self, peer_id: PeerId, subscribe_path: SubscribePath) -> shvrpc::Result<()> {
let peer = self.peers.get_mut(&peer_id).ok_or("Peer not found")?;
if let PeerKind::Device { subscribe_path: subpath, .. } = &mut peer.peer_kind {
*subpath = Some(subscribe_path);
Ok(())
} else {
Err("Not a device".into())
}
peer.subscribe_path = Some(subscribe_path);
Ok(())
}
fn add_peer(&mut self, peer_id: PeerId, peer_kind: PeerKind, sender: Sender<BrokerToPeerMessage>) -> shvrpc::Result<()> {
if self.peers.contains_key(&peer_id) {
Expand All @@ -359,29 +347,36 @@ impl BrokerState {
panic!("Peer ID: {peer_id} exists already!");
}
let client_path = join_path(DIR_BROKER, &format!("client/{}", peer_id));
let mut peer_kind = peer_kind;
let effective_mount_point = match &mut peer_kind {
let effective_mount_point = match &peer_kind {
PeerKind::Client { .. } => { None }
PeerKind::Broker(connection_kind) => {
match connection_kind {
ConnectionKind::ToParentBroker { .. } => { None }
ConnectionKind::ToChildBroker { mount_point, .. } => { Some(mount_point.to_string()) }
ConnectionKind::ToChildBroker { mount_point, .. } => {
if mount_point.is_empty() {
None
} else {
Some(mount_point.to_string())
}
}
}
}
PeerKind::Device { device_id, mount_point, .. } => 'find_mount: {
if mount_point.starts_with("test/") {
info!("Client id: {} mounted on path: '{}'", peer_id, &mount_point);
break 'find_mount Some(mount_point.clone());
if let Some(mount_point) = mount_point {
if mount_point.starts_with("test/") {
info!("Client id: {} mounted on path: '{}'", peer_id, &mount_point);
break 'find_mount Some(mount_point.clone());
}
}
if let Some(device_id) = &device_id {
match self.access.mounts.get(device_id) {
None => {
return Err(format!("Cannot find mount-point for device ID: {device_id}").into());
}
Some(mount) => {
*mount_point = mount.mount_point.clone();
let mount_point = mount.mount_point.clone();
info!("Client id: {}, device id: {} mounted on path: '{}'", peer_id, device_id, &mount_point);
break 'find_mount Some(mount_point.clone());
break 'find_mount Some(mount_point);
}
}
}
Expand All @@ -391,6 +386,8 @@ impl BrokerState {
let peer = Peer {
peer_kind,
sender,
mount_point: effective_mount_point.clone(),
subscribe_path: None,
subscriptions: vec![],
forwarded_subscriptions: vec![],
};
Expand Down Expand Up @@ -418,16 +415,16 @@ impl BrokerState {
}
fn peer_to_info(client_id: PeerId, peer: &Peer) -> rpcvalue::Map {
let subs = Self::subscriptions_to_map(&peer.subscriptions);
let (device_id, mount_point) = if let PeerKind::Device { mount_point, device_id, .. } = &peer.peer_kind {
(device_id.clone().unwrap_or_default(), mount_point.clone())
let device_id = if let PeerKind::Device { device_id, .. } = &peer.peer_kind {
device_id.clone().unwrap_or_default()
} else {
("".to_owned(), "".to_owned())
"".to_owned()
};
rpcvalue::Map::from([
("clientId".to_string(), client_id.into()),
("userName".to_string(), RpcValue::from(peer.user().unwrap_or_default())),
("deviceId".to_string(), RpcValue::from(device_id)),
("mountPoint".to_string(), RpcValue::from(mount_point)),
("mountPoint".to_string(), RpcValue::from(peer.mount_point.clone().unwrap_or_default())),
("subscriptions".to_string(), subs.into()),
]
)
Expand All @@ -437,7 +434,7 @@ impl BrokerState {
}
pub(crate) fn mounted_client_info(&self, mount_point: &str) -> Option<rpcvalue::Map> {
for (client_id, peer) in &self.peers {
if let PeerKind::Device {mount_point: mount_point1, ..} = &peer.peer_kind {
if let Some(mount_point1) = &peer.mount_point {
if mount_point1 == mount_point {
return Some(BrokerState::peer_to_info(*client_id, peer));
}
Expand All @@ -464,32 +461,28 @@ impl BrokerState {
let peer = self.peers.get(&client_id).ok_or_else(|| format!("Invalid client ID: {client_id}"))?;
Ok(Self::subscriptions_to_map(&peer.subscriptions))
}
pub(crate) fn subscribe(&mut self, client_id: PeerId, subpar: &SubscriptionParam) -> shvrpc::Result<bool> {
let peer = self.peers.get_mut(&client_id).ok_or_else(|| format!("Invalid client ID: {client_id}"))?;
pub(crate) fn subscribe(&mut self, peer_id: PeerId, subpar: &SubscriptionParam) -> shvrpc::Result<bool> {
let peer = self.peers.get_mut(&peer_id).ok_or_else(|| format!("Invalid client ID: {peer_id}"))?;
if let Some(sub) = peer.subscriptions.iter_mut().find(|sub| sub.param.ri == subpar.ri) {
log!(target: "Subscr", Level::Debug, "Changing subscription TTL for client id: {} - {:?}", client_id, subpar);
log!(target: "Subscr", Level::Debug, "Changing subscription TTL for client id: {} - {:?}", peer_id, subpar);
sub.param.ttl = subpar.ttl;
Ok(false)
} else {
log!(target: "Subscr", Level::Debug, "Adding subscription for client id: {} - {:?}", client_id, subpar);
log!(target: "Subscr", Level::Debug, "Adding subscription for client id: {} - {:?}", peer_id, subpar);
peer.subscriptions.push(Subscription::new(subpar)?);
Ok(true)
}
}
pub(crate) fn unsubscribe(&mut self, client_id: PeerId, subpar: &SubscriptionParam) -> shvrpc::Result<bool> {
log!(target: "Subscr", Level::Debug, "Removing subscription for client id: {} - {:?}", client_id, subpar);
let peer = self.peers.get_mut(&client_id).ok_or_else(|| format!("Invalid client ID: {client_id}"))?;
pub(crate) fn unsubscribe(&mut self, peer_id: PeerId, subpar: &SubscriptionParam) -> shvrpc::Result<bool> {
log!(target: "Subscr", Level::Debug, "Removing subscription for client id: {} - {:?}", peer_id, subpar);
let peer = self.peers.get_mut(&peer_id).ok_or_else(|| format!("Invalid client ID: {peer_id}"))?;
let cnt = peer.subscriptions.len();
peer.subscriptions.retain(|subscr| subscr.param.ri != subpar.ri);
Ok(cnt != peer.subscriptions.len())
}
fn is_subscribe_path_resolved(&self, peer_id: PeerId) -> shvrpc::Result<Option<SubscribePath>> {
fn subscribe_path(&self, peer_id: PeerId) -> shvrpc::Result<Option<SubscribePath>> {
let peer = self.peers.get(&peer_id).ok_or_else(|| format!("Invalid peer ID: {peer_id}"))?;
if let PeerKind::Device{ subscribe_path, .. } = &peer.peer_kind {
Ok(subscribe_path.clone())
} else {
Err(format!("Not device: {:?}", peer).into())
}
Ok(peer.subscribe_path.clone())
}
pub(crate) fn gc_subscriptions(&mut self) {
let now = Instant::now();
Expand Down Expand Up @@ -520,17 +513,19 @@ impl BrokerState {
const DEFAULT_TTL: Option<u32> = Some(10 * 60);
let mut to_forward: HashMap<PeerId, HashSet<ShvRI>> = Default::default();
for (peer_id, peer) in &self.peers {
if let PeerKind::Device { mount_point, subscribe_path: Some(SubscribePath::CanSubscribe(_)), .. } = &peer.peer_kind {
if let Some(SubscribePath::CanSubscribe(_)) = &peer.subscribe_path {
for ri in &fwd_subs {
if let Ok(Some((_local, remote))) = split_glob_on_match(ri.path(), mount_point) {
log!(target: "Subscr", Level::Debug, "Schedule forward subscription: {}:{}:{:?}", remote, ri.method(), ri.signal());
let ri = ShvRI::from_path_method_signal(remote, ri.method(), ri.signal())?;
if let Some(val) = to_forward.get_mut(peer_id) {
val.insert(ri);
} else {
let mut set1: HashSet<ShvRI> = Default::default();
set1.insert(ri);
to_forward.insert(*peer_id, set1);
if let Some(mount_point) = &peer.mount_point {
if let Ok(Some((_local, remote))) = split_glob_on_match(ri.path(), mount_point) {
log!(target: "Subscr", Level::Debug, "Schedule forward subscription: {}:{}:{:?}", remote, ri.method(), ri.signal());
let ri = ShvRI::from_path_method_signal(remote, ri.method(), ri.signal())?;
if let Some(val) = to_forward.get_mut(peer_id) {
val.insert(ri);
} else {
let mut set1: HashSet<ShvRI> = Default::default();
set1.insert(ri);
to_forward.insert(*peer_id, set1);
}
}
}
}
Expand Down Expand Up @@ -907,8 +902,15 @@ impl BrokerImpl {
{
let state = state_reader(&self.state);
if let Some(peer) = state.peers.get(&peer_id) {
if let PeerKind::Device {mount_point, .. } = &peer.peer_kind {
let new_path = join_path(mount_point, frame.shv_path().unwrap_or_default());
let mut shv_path = frame.shv_path().unwrap_or_default().to_string();
if let PeerKind::Broker(ConnectionKind::ToChildBroker { shv_root, .. }) = &peer.peer_kind {
// remove shv_root in notifications coming from child broker
if let Some(new_path) = cut_prefix(&shv_path, shv_root) {
shv_path = new_path;
}
}
if let Some(mount_point) = &peer.mount_point {
let new_path = join_path(mount_point, &shv_path);
for (tested_peer_id, peer) in state.peers.iter() {
let ri = ShvRI::from_path_method_signal(&new_path, frame.source().unwrap_or_default(), frame.method())?;
if &peer_id != tested_peer_id && peer.is_signal_subscribed(&ri) {
Expand Down Expand Up @@ -1037,7 +1039,7 @@ impl BrokerImpl {
sender.send(BrokerToPeerMessage::SendFrame(RpcFrame::from_rpcmessage(&msg)?)).await?;
}
}
BrokerCommand::RpcCall { client_id, request, response_sender } => {
BrokerCommand::RpcCall { peer_id: client_id, request, response_sender } => {
let request_meta = request.meta().clone();
let mut rq2 = request;
// broker calls can have any access level, set 'su' to bypass client access control
Expand Down Expand Up @@ -1114,7 +1116,7 @@ impl BrokerImpl {
}
async fn check_subscribe_path(state: SharedBrokerState, client_id: PeerId) -> shvrpc::Result<SubscribePath> {
log!(target: "Subscr", Level::Debug, "check_subscribe_path, peer_id: {client_id}");
if let Some(subpath) = state_reader(&state).is_subscribe_path_resolved(client_id)? {
if let Some(subpath) = state_reader(&state).subscribe_path(client_id)? {
log!(target: "Subscr", Level::Debug, "Device subscribe path resolved already, peer_id: {client_id}, path: {:?}", &subpath);
return Ok(subpath)
}
Expand All @@ -1123,7 +1125,7 @@ impl BrokerImpl {
let (response_sender, response_receiver) = unbounded();
let request = RpcMessage::new_request(path, METH_DIR, Some(METH_SUBSCRIBE.into()));
let cmd = BrokerCommand::RpcCall {
client_id,
peer_id: client_id,
request,
response_sender,
};
Expand Down Expand Up @@ -1169,7 +1171,7 @@ impl BrokerImpl {
pub(crate) async fn renew_forwarded_subscriptions(state: SharedBrokerState) -> shvrpc::Result<()> {
let mut to_subscribe: HashMap<_, _> = Default::default();
for (peer_id, peer) in &mut state_writer(&state).peers {
if let PeerKind::Device { subscribe_path: Some(SubscribePath::CanSubscribe(subpath)), .. } = &peer.peer_kind {
if let Some(SubscribePath::CanSubscribe(subpath)) = &peer.subscribe_path {
let mut to_subscribe_peer: Vec<_> = Default::default();
for subscr in &mut peer.forwarded_subscriptions {
if subscr.subscribed.is_none() {
Expand Down Expand Up @@ -1198,7 +1200,7 @@ impl BrokerImpl {
log!(target: "Subscr", Level::Debug, "call_subscribe, peer_id: {peer_id}, subscriptions: {:?}", &subscription);
let (response_sender, response_receiver) = unbounded();
let cmd = BrokerCommand::RpcCall {
client_id: peer_id,
peer_id,
request: RpcMessage::new_request(subscribe_path, METH_SUBSCRIBE, Some(subscription.to_rpcvalue())),
response_sender,
};
Expand Down
15 changes: 14 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,17 @@ mod spawn {
})
}
}
pub(crate) use spawn::spawn_and_log_error;
pub(crate) use spawn::spawn_and_log_error;

fn cut_prefix(shv_path: &str, prefix: &str) -> Option<String> {
if shv_path.starts_with(prefix) && (shv_path.len() == prefix.len() || shv_path[prefix.len() ..].starts_with('/')) {
let shv_path = &shv_path[prefix.len() ..];
if let Some(stripped_path) = shv_path.strip_prefix('/') {
Some(stripped_path.to_string())
} else {
Some(shv_path.to_string())
}
} else {
None
}
}
Loading

0 comments on commit 4041e40

Please sign in to comment.