diff --git a/src/brokerimpl.rs b/src/brokerimpl.rs index 9439b56..8de2907 100644 --- a/src/brokerimpl.rs +++ b/src/brokerimpl.rs @@ -7,7 +7,7 @@ use async_std::channel::{Receiver, Sender, unbounded}; use async_std::{future}; use async_std::net::TcpListener; use log::{debug, error, info, log, warn}; -use crate::config::{AccessConfig, BrokerConfig, Password}; +use crate::config::{AccessConfig, BrokerConfig, ConnectionKind, Password}; use shvrpc::rpcframe::RpcFrame; use shvrpc::rpcmessage::{PeerId, RpcError, RpcErrorCode, RqId, Tag}; use shvproto::{Map, MetaMap, RpcValue, rpcvalue}; @@ -62,9 +62,6 @@ pub(crate) enum BrokerCommand { NewPeer { peer_id: PeerId, peer_kind: PeerKind, - user: String, - mount_point: Option, - device_id: Option, sender: Sender, }, FrameReceived { @@ -102,10 +99,12 @@ pub(crate) enum SubscribePath { } #[derive(Debug, Clone)] pub(crate) enum PeerKind { - Client, - ParentBroker, - //ChildBroker, + Client { + user: String, + }, + Broker(ConnectionKind), Device { + user: String, device_id: Option, mount_point: String, subscribe_path: Option, @@ -114,7 +113,6 @@ pub(crate) enum PeerKind { #[derive(Debug)] pub(crate) struct Peer { pub(crate) peer_kind: PeerKind, - pub(crate) user: String, pub(crate) sender: Sender, pub(crate) subscriptions: Vec, pub(crate) forwarded_subscriptions: Vec, @@ -130,7 +128,14 @@ impl Peer { } false } -} + pub(crate) fn user(&self) -> Option<&str> { + match &self.peer_kind { + PeerKind::Client { user, .. } => { Some(user) } + PeerKind::Broker(_) => None, + PeerKind::Device { user, .. } => { Some(user) } + } + } + } pub(crate) enum Mount { Peer(PeerId), @@ -253,37 +258,51 @@ impl BrokerState { return Err(RpcError::new(RpcErrorCode::InvalidRequest, e)) } }; - match peer.peer_kind { - PeerKind::ParentBroker => { - log!(target: "Access", Level::Debug, "ParentBroker: {}", client_id); - let access = frame.tag(Tag::Access as i32); - let access_level = frame.tag(Tag::AccessLevel as i32); - if access_level.is_some() || access.is_some() { - log!(target: "Access", Level::Debug, "\tGranted access: {:?}, access level: {:?}", access, access_level); - Ok((access_level.map(RpcValue::as_i32), access.map(RpcValue::as_str).map(|s| s.to_string()))) - } else { - log!(target: "Access", Level::Debug, "\tPermissionDenied"); - Err(RpcError::new(RpcErrorCode::PermissionDenied, "")) + if let Some(user) = peer.user() { + log!(target: "Access", Level::Debug, "Peer: {}", client_id); + if let Some(flatten_roles) = self.flatten_roles(user) { + log!(target: "Access", Level::Debug, "user: {}, flatten roles: {:?}", user, flatten_roles); + for role_name in flatten_roles { + if let Some(rules) = self.role_access.get(&role_name) { + log!(target: "Access", Level::Debug, "----------- access for role: {}", role_name); + for rule in rules { + log!(target: "Access", Level::Debug, "\trule: {}", rule.glob.as_str()); + if rule.glob.match_shv_ri(&ri) { + log!(target: "Access", Level::Debug, "\t\t HIT"); + return Ok((Some(rule.access_level as i32), Some(rule.access.clone()))); + } + } + } } } - _ => { - log!(target: "Access", Level::Debug, "Peer: {}", client_id); - if let Some(flatten_roles) = self.flatten_roles(&peer.user) { - log!(target: "Access", Level::Debug, "user: {}, flatten roles: {:?}", &peer.user, flatten_roles); - for role_name in flatten_roles { - if let Some(rules) = self.role_access.get(&role_name) { - log!(target: "Access", Level::Debug, "----------- access for role: {}", role_name); - for rule in rules { - log!(target: "Access", Level::Debug, "\trule: {}", rule.glob.as_str()); - if rule.glob.match_shv_ri(&ri) { - log!(target: "Access", Level::Debug, "\t\t HIT"); - return Ok((Some(rule.access_level as i32), Some(rule.access.clone()))); - } + Err(RpcError::new(RpcErrorCode::PermissionDenied, format!("Access denied for user: {}", user))) + } else { + match &peer.peer_kind { + PeerKind::Broker(connection_kind) => { + match connection_kind { + ConnectionKind::ToParentBroker { .. } => { + log!(target: "Access", Level::Debug, "ParentBroker: {}", client_id); + let access = frame.tag(Tag::Access as i32); + let access_level = frame.tag(Tag::AccessLevel as i32); + if access_level.is_some() || access.is_some() { + log!(target: "Access", Level::Debug, "\tGranted access: {:?}, access level: {:?}", access, access_level); + Ok((access_level.map(RpcValue::as_i32), access.map(RpcValue::as_str).map(|s| s.to_string()))) + } else { + log!(target: "Access", Level::Debug, "\tPermissionDenied"); + Err(RpcError::new(RpcErrorCode::PermissionDenied, "")) } } + ConnectionKind::ToChildBroker { .. } => { + // requests from child broker should not be allowed + log!(target: "Access", Level::Debug, "\tPermissionDenied"); + Err(RpcError::new(RpcErrorCode::PermissionDenied, "")) + } } } - Err(RpcError::new(RpcErrorCode::PermissionDenied, format!("Access denied for user: {}", peer.user))) + _ => { + log!(target: "Access", Level::Debug, "\tWeird peer kind"); + Err(RpcError::new(RpcErrorCode::PermissionDenied, "")) + } } } } @@ -333,50 +352,44 @@ impl BrokerState { Err("Not a device".into()) } } - fn add_peer(&mut self, peer_id: PeerId, user: String, peer_kind: PeerKind, mount_point: Option, device_id: Option, sender: Sender) -> shvrpc::Result<()> { + fn add_peer(&mut self, peer_id: PeerId, peer_kind: PeerKind, sender: Sender) -> shvrpc::Result<()> { if self.peers.contains_key(&peer_id) { // this might happen when connection to parent broker is restored // after parent broker reset - // note that parent broker connection has always ID == 1 panic!("Peer ID: {peer_id} exists already!"); } let client_path = join_path(DIR_BROKER, &format!("client/{}", peer_id)); - let effective_mount_point = 'mount_point: { - if let Some(ref mount_point) = mount_point { + let mut peer_kind = peer_kind; + let effective_mount_point = match &mut peer_kind { + PeerKind::Client { .. } => { None } + PeerKind::Broker(connection_kind) => { + match connection_kind { + ConnectionKind::ToParentBroker { .. } => { None } + ConnectionKind::ToChildBroker { mount_point, .. } => { Some(mount_point.to_string()) } + } + } + PeerKind::Device { device_id, mount_point, .. } => loop { if mount_point.starts_with("test/") { info!("Client id: {} mounted on path: '{}'", peer_id, &mount_point); - break 'mount_point Some(mount_point.clone()); + break Some(mount_point.clone()); } - } - if let Some(device_id) = &device_id { - match self.access.mounts.get(device_id) { - None => { - warn!("Cannot find mount-point for device ID: {device_id}"); - None - } - Some(mount) => { - let mount_point = mount.mount_point.clone(); - info!("Client id: {}, device id: {} mounted on path: '{}'", peer_id, device_id, &mount_point); - Some(mount_point) + if let Some(device_id) = &device_id { + match self.access.mounts.get(device_id) { + None => { + return Err(format!("Cannot find mount-point for device ID: {device_id}").into()); + } + Some(mount) => { + *mount_point = mount.mount_point.clone(); + info!("Client id: {}, device id: {} mounted on path: '{}'", peer_id, device_id, &mount_point); + break Some(mount_point.clone()); + } } } - } else { - None - } - }; - let effective_peer_kind = match peer_kind { - PeerKind::ParentBroker => { PeerKind::ParentBroker } - _ => { - if let Some(mount_point) = &effective_mount_point { - PeerKind::Device{ device_id, mount_point: mount_point.clone(), subscribe_path: None } - } else { - PeerKind::Client - } + break None } }; let peer = Peer { - peer_kind: effective_peer_kind, - user, + peer_kind, sender, subscriptions: vec![], forwarded_subscriptions: vec![], @@ -412,7 +425,7 @@ impl BrokerState { }; rpcvalue::Map::from([ ("clientId".to_string(), client_id.into()), - ("userName".to_string(), RpcValue::from(&peer.user)), + ("userName".to_string(), RpcValue::from(peer.user().unwrap_or_default())), ("deviceId".to_string(), RpcValue::from(device_id)), ("mountPoint".to_string(), RpcValue::from(mount_point)), ("subscriptions".to_string(), subs.into()), @@ -974,13 +987,10 @@ impl BrokerImpl { } BrokerCommand::NewPeer { peer_id, - user, peer_kind, - mount_point, - device_id, sender} => { info!("New peer, id: {peer_id}."); - state_writer(&self.state).add_peer(peer_id, user, peer_kind, mount_point, device_id, sender)?; + state_writer(&self.state).add_peer(peer_id, peer_kind, sender)?; let mount_point = state_reader(&self.state).mount_point(peer_id); if let Some(mount_point) = mount_point { let (shv_path, dir) = split_mount_point(&mount_point)?; diff --git a/src/config.rs b/src/config.rs index d12ceb2..0cd98b8 100644 --- a/src/config.rs +++ b/src/config.rs @@ -19,22 +19,22 @@ pub struct BrokerConfig { pub access: AccessConfig, } #[derive(Serialize, Deserialize, Clone, Debug)] -pub enum TreeDirection { +pub enum ConnectionKind { ToParentBroker {shv_root: String}, ToChildBroker {shv_root: String, mount_point: String}, } -impl Default for TreeDirection { +impl Default for ConnectionKind { fn default() -> Self { - TreeDirection::ToParentBroker { shv_root: "".to_string() } + ConnectionKind::ToParentBroker { shv_root: "".to_string() } } } #[derive(Serialize, Deserialize, Debug, Clone, Default)] pub struct BrokerConnectionConfig { #[serde(default)] pub enabled:bool, - pub client: ClientConfig, #[serde(default)] - pub tree_direction: TreeDirection, + pub connection_kind: ConnectionKind, + pub client: ClientConfig, } type DeviceId = String; #[derive(Serialize, Deserialize, Debug, Default, Clone)] diff --git a/src/peer.rs b/src/peer.rs index fab0d62..9771f73 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -21,7 +21,7 @@ use crate::brokerimpl::{BrokerCommand, BrokerToPeerMessage, PeerKind}; use shvrpc::framerw::{FrameReader, FrameWriter}; use shvrpc::rpc::{ShvRI, SubscriptionParam}; use shvrpc::streamrw::{StreamFrameReader, StreamFrameWriter}; -use crate::config::{BrokerConnectionConfig, TreeDirection}; +use crate::config::{BrokerConnectionConfig, ConnectionKind}; static G_PEER_COUNT: AtomicI64 = AtomicI64::new(0); pub(crate) fn next_peer_id() -> i64 { @@ -81,7 +81,7 @@ pub(crate) async fn server_peer_loop(peer_id: PeerId, broker_writer: Sender { info!("Parent broker peer loop finished without error"); } @@ -245,18 +252,20 @@ fn is_dot_local_request(frame: &RpcFrame) -> bool { false } async fn process_client_peer_frame(peer_id: PeerId, mut frame: RpcFrame, config: &BrokerConnectionConfig, broker_writer: Sender) -> shvrpc::Result<()> { - match &config.tree_direction { - TreeDirection::ToParentBroker{ shv_root } => { + match &config.connection_kind { + ConnectionKind::ToParentBroker{ shv_root } => { // Only RPC requests can be received from parent broker, // no signals, no responses if frame.is_request() { frame = fix_request_frame_shv_root(frame, shv_root)?; broker_writer.send(BrokerCommand::FrameReceived { peer_id, frame }).await?; + } else if frame.is_response() { + broker_writer.send(BrokerCommand::FrameReceived { peer_id, frame }).await?; } else { - warn!("RPC signal or response should not be received from client connection to parent broker: {}", &frame); + warn!("RPC signal should not be received from client connection to parent broker: {}", &frame); } } - TreeDirection::ToChildBroker{ shv_root, .. } => { + ConnectionKind::ToChildBroker{ shv_root, .. } => { // Only RPC signals and responses can be received from parent broker, // no requests let mut frame = frame; @@ -272,7 +281,7 @@ async fn process_client_peer_frame(peer_id: PeerId, mut frame: RpcFrame, config: }; Ok(()) } -async fn client_peer_loop(peer_id: PeerId, config: BrokerConnectionConfig, broker_writer: Sender) -> shvrpc::Result<()> { +async fn broker_client_connection_loop(peer_id: PeerId, config: BrokerConnectionConfig, broker_writer: Sender) -> shvrpc::Result<()> { let url = Url::parse(&config.client.url)?; let (scheme, host, port) = (url.scheme(), url.host_str().unwrap_or_default(), url.port().unwrap_or(3755)); if scheme != "tcp" { @@ -308,10 +317,7 @@ async fn client_peer_loop(peer_id: PeerId, config: BrokerConnectionConfig, broke let (broker_to_peer_sender, broker_to_peer_receiver) = channel::unbounded::(); broker_writer.send(BrokerCommand::NewPeer { peer_id, - peer_kind: PeerKind::ParentBroker, - user: "".into(), - mount_point: None, - device_id: None, + peer_kind: PeerKind::Broker(config.connection_kind.clone()), sender: broker_to_peer_sender, }).await?; @@ -358,15 +364,15 @@ async fn client_peer_loop(peer_id: PeerId, config: BrokerConnectionConfig, broke BrokerToPeerMessage::SendFrame(frame) => { // log!(target: "RpcMsg", Level::Debug, "<---- Send frame, client id: {}", client_id); let mut frame = frame; - match &config.tree_direction { - TreeDirection::ToParentBroker{shv_root} => { + match &config.connection_kind { + ConnectionKind::ToParentBroker{shv_root} => { if frame.is_signal() { if let Some(new_path) = cut_prefix(frame.shv_path().unwrap_or_default(), shv_root) { frame.set_shvpath(&new_path); } } } - TreeDirection::ToChildBroker{shv_root, .. } => { + ConnectionKind::ToChildBroker{shv_root, .. } => { if frame.is_request() { frame = fix_request_frame_shv_root(frame, shv_root)?; } diff --git a/src/test.rs b/src/test.rs index 2baf5f6..7c250d4 100644 --- a/src/test.rs +++ b/src/test.rs @@ -71,10 +71,12 @@ async fn test_broker_loop() { //let password = "admin"; broker_sender.send(BrokerCommand::NewPeer { peer_id: client_id, - peer_kind: PeerKind::Client, - user: user.to_string(), - mount_point: Some("test/device".into()), - device_id: None, + peer_kind: PeerKind::Device{ + user: user.to_string(), + device_id: None, + mount_point: "test/device".to_string(), + subscribe_path: None + }, sender: peer_writer.clone() }).await.unwrap(); loop { @@ -404,10 +406,7 @@ async fn test_tunnel_loop() { //let password = "test"; broker_sender.send(BrokerCommand::NewPeer { peer_id: client_id, - peer_kind: PeerKind::Client, - user: user.to_string(), - mount_point: None, - device_id: None, + peer_kind: PeerKind::Client{ user: user.to_string() }, sender: peer_writer.clone() }).await.unwrap(); let tunid = call(".app/tunnel", "create", None, &call_ctx).await; diff --git a/tests/test_broker.rs b/tests/test_broker.rs index 9c04340..ccfb48c 100644 --- a/tests/test_broker.rs +++ b/tests/test_broker.rs @@ -4,7 +4,7 @@ use shvproto::{RpcValue, rpcvalue}; use shvrpc::client::ClientConfig; use shvrpc::metamethod; use shvrpc::metamethod::{Flag, MetaMethod}; -use shvbroker::config::{BrokerConfig, BrokerConnectionConfig, TreeDirection}; +use shvbroker::config::{BrokerConfig, BrokerConnectionConfig, ConnectionKind}; use crate::common::{KillProcessGuard, shv_call, shv_call_many}; use shvbroker::shvnode::{METH_DIR, METH_LS, METH_NAME, METH_PING}; @@ -32,7 +32,7 @@ fn test_broker() -> shvrpc::Result<()> { heartbeat_interval: "1m".to_string(), reconnect_interval: None, }, - tree_direction: TreeDirection::ToParentBroker { + connection_kind: ConnectionKind::ToParentBroker { shv_root: "test".to_string(), }, } @@ -175,40 +175,40 @@ fn test_broker() -> shvrpc::Result<()> { Ok(()) } -fn test_child_broker_as_client() -> shvrpc::Result<()> { - let mut config = BrokerConfig::default(); - config.listen.tcp = Some("localhost:3754".into()); - config.connections = vec![ - BrokerConnectionConfig { - enabled: true, - client: ClientConfig{ - url: "tcp://localhost:3755?user=test&password=test".to_string(), - device_id: None, - mount: None, - heartbeat_interval: "1m".to_string(), - reconnect_interval: None, - }, - tree_direction: TreeDirection::ToChildBroker { - shv_root: "test/child-broker/device".to_string(), - mount_point: "test/child-device".to_string(), - }, - } - ]; - let cfg_fn = "/tmp/test-broker-config2.yaml"; - fs::write(cfg_fn, &serde_yaml::to_string(&config)?)?; - let mut broker_process_guard = KillProcessGuard::new(Command::new("target/debug/shvbroker") - .arg("--config").arg(cfg_fn) - //.arg("-v").arg("Acc") - .spawn()?); - thread::sleep(Duration::from_millis(100)); - assert!(broker_process_guard.is_running()); - - pub fn shv_call_3754(path: &str, method: &str, param: &str) -> shvrpc::Result { - shv_call(path, method, param, Some(3754)) - } - //assert_eq!(shv_call_3754("test/child-device/.app", "name", "")?, RpcValue::from("shvbrokertestingdevice")); - - thread::sleep(Duration::from_millis(1000 * 60 * 5)); - - Ok(()) -} \ No newline at end of file +// fn test_child_broker_as_client() -> shvrpc::Result<()> { +// let mut config = BrokerConfig::default(); +// config.listen.tcp = Some("localhost:3754".into()); +// config.connections = vec![ +// BrokerConnectionConfig { +// enabled: true, +// client: ClientConfig{ +// url: "tcp://localhost:3755?user=test&password=test".to_string(), +// device_id: None, +// mount: None, +// heartbeat_interval: "1m".to_string(), +// reconnect_interval: None, +// }, +// connection_kind: ConnectionKind::ToChildBroker { +// shv_root: "test/child-broker/device".to_string(), +// mount_point: "test/child-device".to_string(), +// }, +// } +// ]; +// let cfg_fn = "/tmp/test-broker-config2.yaml"; +// fs::write(cfg_fn, &serde_yaml::to_string(&config)?)?; +// let mut broker_process_guard = KillProcessGuard::new(Command::new("target/debug/shvbroker") +// .arg("--config").arg(cfg_fn) +// //.arg("-v").arg("Acc") +// .spawn()?); +// thread::sleep(Duration::from_millis(100)); +// assert!(broker_process_guard.is_running()); +// +// pub fn shv_call_3754(path: &str, method: &str, param: &str) -> shvrpc::Result { +// shv_call(path, method, param, Some(3754)) +// } +// //assert_eq!(shv_call_3754("test/child-device/.app", "name", "")?, RpcValue::from("shvbrokertestingdevice")); +// +// thread::sleep(Duration::from_millis(1000 * 60 * 5)); +// +// Ok(()) +// } \ No newline at end of file