Skip to content

Commit

Permalink
Peer structure simplified
Browse files Browse the repository at this point in the history
  • Loading branch information
Fanda Vacek committed Oct 20, 2024
1 parent 0ecd755 commit cc2b5b0
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 140 deletions.
148 changes: 79 additions & 69 deletions src/brokerimpl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use async_std::channel::{Receiver, Sender, unbounded};
use async_std::{future};
use async_std::net::TcpListener;
use log::{debug, error, info, log, warn};
use crate::config::{AccessConfig, BrokerConfig, Password};
use crate::config::{AccessConfig, BrokerConfig, ConnectionKind, Password};
use shvrpc::rpcframe::RpcFrame;
use shvrpc::rpcmessage::{PeerId, RpcError, RpcErrorCode, RqId, Tag};
use shvproto::{Map, MetaMap, RpcValue, rpcvalue};
Expand Down Expand Up @@ -62,9 +62,6 @@ pub(crate) enum BrokerCommand {
NewPeer {
peer_id: PeerId,
peer_kind: PeerKind,
user: String,
mount_point: Option<String>,
device_id: Option<String>,
sender: Sender<BrokerToPeerMessage>,
},
FrameReceived {
Expand Down Expand Up @@ -102,10 +99,12 @@ pub(crate) enum SubscribePath {
}
#[derive(Debug, Clone)]
pub(crate) enum PeerKind {
Client,
ParentBroker,
//ChildBroker,
Client {
user: String,
},
Broker(ConnectionKind),
Device {
user: String,
device_id: Option<String>,
mount_point: String,
subscribe_path: Option<SubscribePath>,
Expand All @@ -114,7 +113,6 @@ pub(crate) enum PeerKind {
#[derive(Debug)]
pub(crate) struct Peer {
pub(crate) peer_kind: PeerKind,
pub(crate) user: String,
pub(crate) sender: Sender<BrokerToPeerMessage>,
pub(crate) subscriptions: Vec<Subscription>,
pub(crate) forwarded_subscriptions: Vec<ForwardedSubscription>,
Expand All @@ -130,7 +128,14 @@ impl Peer {
}
false
}
}
pub(crate) fn user(&self) -> Option<&str> {
match &self.peer_kind {
PeerKind::Client { user, .. } => { Some(user) }
PeerKind::Broker(_) => None,
PeerKind::Device { user, .. } => { Some(user) }
}
}
}

pub(crate) enum Mount {
Peer(PeerId),
Expand Down Expand Up @@ -253,37 +258,51 @@ impl BrokerState {
return Err(RpcError::new(RpcErrorCode::InvalidRequest, e))
}
};
match peer.peer_kind {
PeerKind::ParentBroker => {
log!(target: "Access", Level::Debug, "ParentBroker: {}", client_id);
let access = frame.tag(Tag::Access as i32);
let access_level = frame.tag(Tag::AccessLevel as i32);
if access_level.is_some() || access.is_some() {
log!(target: "Access", Level::Debug, "\tGranted access: {:?}, access level: {:?}", access, access_level);
Ok((access_level.map(RpcValue::as_i32), access.map(RpcValue::as_str).map(|s| s.to_string())))
} else {
log!(target: "Access", Level::Debug, "\tPermissionDenied");
Err(RpcError::new(RpcErrorCode::PermissionDenied, ""))
if let Some(user) = peer.user() {
log!(target: "Access", Level::Debug, "Peer: {}", client_id);
if let Some(flatten_roles) = self.flatten_roles(user) {
log!(target: "Access", Level::Debug, "user: {}, flatten roles: {:?}", user, flatten_roles);
for role_name in flatten_roles {
if let Some(rules) = self.role_access.get(&role_name) {
log!(target: "Access", Level::Debug, "----------- access for role: {}", role_name);
for rule in rules {
log!(target: "Access", Level::Debug, "\trule: {}", rule.glob.as_str());
if rule.glob.match_shv_ri(&ri) {
log!(target: "Access", Level::Debug, "\t\t HIT");
return Ok((Some(rule.access_level as i32), Some(rule.access.clone())));
}
}
}
}
}
_ => {
log!(target: "Access", Level::Debug, "Peer: {}", client_id);
if let Some(flatten_roles) = self.flatten_roles(&peer.user) {
log!(target: "Access", Level::Debug, "user: {}, flatten roles: {:?}", &peer.user, flatten_roles);
for role_name in flatten_roles {
if let Some(rules) = self.role_access.get(&role_name) {
log!(target: "Access", Level::Debug, "----------- access for role: {}", role_name);
for rule in rules {
log!(target: "Access", Level::Debug, "\trule: {}", rule.glob.as_str());
if rule.glob.match_shv_ri(&ri) {
log!(target: "Access", Level::Debug, "\t\t HIT");
return Ok((Some(rule.access_level as i32), Some(rule.access.clone())));
}
Err(RpcError::new(RpcErrorCode::PermissionDenied, format!("Access denied for user: {}", user)))
} else {
match &peer.peer_kind {
PeerKind::Broker(connection_kind) => {
match connection_kind {
ConnectionKind::ToParentBroker { .. } => {
log!(target: "Access", Level::Debug, "ParentBroker: {}", client_id);
let access = frame.tag(Tag::Access as i32);
let access_level = frame.tag(Tag::AccessLevel as i32);
if access_level.is_some() || access.is_some() {
log!(target: "Access", Level::Debug, "\tGranted access: {:?}, access level: {:?}", access, access_level);
Ok((access_level.map(RpcValue::as_i32), access.map(RpcValue::as_str).map(|s| s.to_string())))
} else {
log!(target: "Access", Level::Debug, "\tPermissionDenied");
Err(RpcError::new(RpcErrorCode::PermissionDenied, ""))
}
}
ConnectionKind::ToChildBroker { .. } => {
// requests from child broker should not be allowed
log!(target: "Access", Level::Debug, "\tPermissionDenied");
Err(RpcError::new(RpcErrorCode::PermissionDenied, ""))
}
}
}
Err(RpcError::new(RpcErrorCode::PermissionDenied, format!("Access denied for user: {}", peer.user)))
_ => {
log!(target: "Access", Level::Debug, "\tWeird peer kind");
Err(RpcError::new(RpcErrorCode::PermissionDenied, ""))
}
}
}
}
Expand Down Expand Up @@ -333,50 +352,44 @@ impl BrokerState {
Err("Not a device".into())
}
}
fn add_peer(&mut self, peer_id: PeerId, user: String, peer_kind: PeerKind, mount_point: Option<String>, device_id: Option<String>, sender: Sender<BrokerToPeerMessage>) -> shvrpc::Result<()> {
fn add_peer(&mut self, peer_id: PeerId, peer_kind: PeerKind, sender: Sender<BrokerToPeerMessage>) -> shvrpc::Result<()> {
if self.peers.contains_key(&peer_id) {
// this might happen when connection to parent broker is restored
// after parent broker reset
// note that parent broker connection has always ID == 1
panic!("Peer ID: {peer_id} exists already!");
}
let client_path = join_path(DIR_BROKER, &format!("client/{}", peer_id));
let effective_mount_point = 'mount_point: {
if let Some(ref mount_point) = mount_point {
let mut peer_kind = peer_kind;
let effective_mount_point = match &mut peer_kind {
PeerKind::Client { .. } => { None }
PeerKind::Broker(connection_kind) => {
match connection_kind {
ConnectionKind::ToParentBroker { .. } => { None }
ConnectionKind::ToChildBroker { mount_point, .. } => { Some(mount_point.to_string()) }
}
}
PeerKind::Device { device_id, mount_point, .. } => loop {
if mount_point.starts_with("test/") {
info!("Client id: {} mounted on path: '{}'", peer_id, &mount_point);
break 'mount_point Some(mount_point.clone());
break Some(mount_point.clone());
}
}
if let Some(device_id) = &device_id {
match self.access.mounts.get(device_id) {
None => {
warn!("Cannot find mount-point for device ID: {device_id}");
None
}
Some(mount) => {
let mount_point = mount.mount_point.clone();
info!("Client id: {}, device id: {} mounted on path: '{}'", peer_id, device_id, &mount_point);
Some(mount_point)
if let Some(device_id) = &device_id {
match self.access.mounts.get(device_id) {
None => {
return Err(format!("Cannot find mount-point for device ID: {device_id}").into());
}
Some(mount) => {
*mount_point = mount.mount_point.clone();
info!("Client id: {}, device id: {} mounted on path: '{}'", peer_id, device_id, &mount_point);
break Some(mount_point.clone());
}
}
}
} else {
None
}
};
let effective_peer_kind = match peer_kind {
PeerKind::ParentBroker => { PeerKind::ParentBroker }
_ => {
if let Some(mount_point) = &effective_mount_point {
PeerKind::Device{ device_id, mount_point: mount_point.clone(), subscribe_path: None }
} else {
PeerKind::Client
}
break None
}
};
let peer = Peer {
peer_kind: effective_peer_kind,
user,
peer_kind,
sender,
subscriptions: vec![],
forwarded_subscriptions: vec![],
Expand Down Expand Up @@ -412,7 +425,7 @@ impl BrokerState {
};
rpcvalue::Map::from([
("clientId".to_string(), client_id.into()),
("userName".to_string(), RpcValue::from(&peer.user)),
("userName".to_string(), RpcValue::from(peer.user().unwrap_or_default())),
("deviceId".to_string(), RpcValue::from(device_id)),
("mountPoint".to_string(), RpcValue::from(mount_point)),
("subscriptions".to_string(), subs.into()),
Expand Down Expand Up @@ -974,13 +987,10 @@ impl BrokerImpl {
}
BrokerCommand::NewPeer {
peer_id,
user,
peer_kind,
mount_point,
device_id,
sender} => {
info!("New peer, id: {peer_id}.");
state_writer(&self.state).add_peer(peer_id, user, peer_kind, mount_point, device_id, sender)?;
state_writer(&self.state).add_peer(peer_id, peer_kind, sender)?;
let mount_point = state_reader(&self.state).mount_point(peer_id);
if let Some(mount_point) = mount_point {
let (shv_path, dir) = split_mount_point(&mount_point)?;
Expand Down
10 changes: 5 additions & 5 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,22 @@ pub struct BrokerConfig {
pub access: AccessConfig,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum TreeDirection {
pub enum ConnectionKind {
ToParentBroker {shv_root: String},
ToChildBroker {shv_root: String, mount_point: String},
}
impl Default for TreeDirection {
impl Default for ConnectionKind {
fn default() -> Self {
TreeDirection::ToParentBroker { shv_root: "".to_string() }
ConnectionKind::ToParentBroker { shv_root: "".to_string() }
}
}
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct BrokerConnectionConfig {
#[serde(default)]
pub enabled:bool,
pub client: ClientConfig,
#[serde(default)]
pub tree_direction: TreeDirection,
pub connection_kind: ConnectionKind,
pub client: ClientConfig,
}
type DeviceId = String;
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
Expand Down
44 changes: 25 additions & 19 deletions src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::brokerimpl::{BrokerCommand, BrokerToPeerMessage, PeerKind};
use shvrpc::framerw::{FrameReader, FrameWriter};
use shvrpc::rpc::{ShvRI, SubscriptionParam};
use shvrpc::streamrw::{StreamFrameReader, StreamFrameWriter};
use crate::config::{BrokerConnectionConfig, TreeDirection};
use crate::config::{BrokerConnectionConfig, ConnectionKind};

static G_PEER_COUNT: AtomicI64 = AtomicI64::new(0);
pub(crate) fn next_peer_id() -> i64 {
Expand Down Expand Up @@ -81,7 +81,7 @@ pub(crate) async fn server_peer_loop(peer_id: PeerId, broker_writer: Sender<Brok
debug!("Client ID: {peer_id}, login received.");
let params = rpcmsg.param().ok_or("No login params")?.as_map();
let login = params.get("login").ok_or("Invalid login params")?.as_map();
user = login.get("user").ok_or("User login param is missing")?.clone();
user = login.get("user").ok_or("User login param is missing")?.as_str().to_string();
let password = login.get("password").ok_or("Password login param is missing")?.as_str();
let login_type = login.get("type").map(|v| v.as_str()).unwrap_or("");

Expand Down Expand Up @@ -133,13 +133,20 @@ pub(crate) async fn server_peer_loop(peer_id: PeerId, broker_writer: Sender<Brok
let device_id = device_options.as_map().get("deviceId").map(|v| v.as_str().to_string());
let mount_point = device_options.as_map().get("mountPoint").map(|v| v.as_str().to_string());
debug!("Client ID: {peer_id} login success.");
let peer_kind = if device_id.is_some() || mount_point.is_some() {
PeerKind::Device {
user,
device_id,
mount_point: mount_point.unwrap_or_default(),
subscribe_path: None,
}
} else {
PeerKind::Client { user }
};
broker_writer.send(
BrokerCommand::NewPeer {
peer_id,
peer_kind: PeerKind::Client,
user: user.as_str().to_string(),
mount_point,
device_id,
peer_kind,
sender: peer_writer
}).await?;

Expand Down Expand Up @@ -202,7 +209,7 @@ pub(crate) async fn client_peer_loop_with_reconnect(peer_id: PeerId, config: Bro
};
info!("Reconnect interval set to: {:?}", reconnect_interval);
loop {
match client_peer_loop(peer_id, config.clone(), broker_writer.clone()).await {
match broker_client_connection_loop(peer_id, config.clone(), broker_writer.clone()).await {
Ok(_) => {
info!("Parent broker peer loop finished without error");
}
Expand Down Expand Up @@ -245,18 +252,20 @@ fn is_dot_local_request(frame: &RpcFrame) -> bool {
false
}
async fn process_client_peer_frame(peer_id: PeerId, mut frame: RpcFrame, config: &BrokerConnectionConfig, broker_writer: Sender<BrokerCommand>) -> shvrpc::Result<()> {
match &config.tree_direction {
TreeDirection::ToParentBroker{ shv_root } => {
match &config.connection_kind {
ConnectionKind::ToParentBroker{ shv_root } => {
// Only RPC requests can be received from parent broker,
// no signals, no responses
if frame.is_request() {
frame = fix_request_frame_shv_root(frame, shv_root)?;
broker_writer.send(BrokerCommand::FrameReceived { peer_id, frame }).await?;
} else if frame.is_response() {
broker_writer.send(BrokerCommand::FrameReceived { peer_id, frame }).await?;
} else {
warn!("RPC signal or response should not be received from client connection to parent broker: {}", &frame);
warn!("RPC signal should not be received from client connection to parent broker: {}", &frame);
}
}
TreeDirection::ToChildBroker{ shv_root, .. } => {
ConnectionKind::ToChildBroker{ shv_root, .. } => {
// Only RPC signals and responses can be received from parent broker,
// no requests
let mut frame = frame;
Expand All @@ -272,7 +281,7 @@ async fn process_client_peer_frame(peer_id: PeerId, mut frame: RpcFrame, config:
};
Ok(())
}
async fn client_peer_loop(peer_id: PeerId, config: BrokerConnectionConfig, broker_writer: Sender<BrokerCommand>) -> shvrpc::Result<()> {
async fn broker_client_connection_loop(peer_id: PeerId, config: BrokerConnectionConfig, broker_writer: Sender<BrokerCommand>) -> shvrpc::Result<()> {
let url = Url::parse(&config.client.url)?;
let (scheme, host, port) = (url.scheme(), url.host_str().unwrap_or_default(), url.port().unwrap_or(3755));
if scheme != "tcp" {
Expand Down Expand Up @@ -308,10 +317,7 @@ async fn client_peer_loop(peer_id: PeerId, config: BrokerConnectionConfig, broke
let (broker_to_peer_sender, broker_to_peer_receiver) = channel::unbounded::<BrokerToPeerMessage>();
broker_writer.send(BrokerCommand::NewPeer {
peer_id,
peer_kind: PeerKind::ParentBroker,
user: "".into(),
mount_point: None,
device_id: None,
peer_kind: PeerKind::Broker(config.connection_kind.clone()),
sender: broker_to_peer_sender,
}).await?;

Expand Down Expand Up @@ -358,15 +364,15 @@ async fn client_peer_loop(peer_id: PeerId, config: BrokerConnectionConfig, broke
BrokerToPeerMessage::SendFrame(frame) => {
// log!(target: "RpcMsg", Level::Debug, "<---- Send frame, client id: {}", client_id);
let mut frame = frame;
match &config.tree_direction {
TreeDirection::ToParentBroker{shv_root} => {
match &config.connection_kind {
ConnectionKind::ToParentBroker{shv_root} => {
if frame.is_signal() {
if let Some(new_path) = cut_prefix(frame.shv_path().unwrap_or_default(), shv_root) {
frame.set_shvpath(&new_path);
}
}
}
TreeDirection::ToChildBroker{shv_root, .. } => {
ConnectionKind::ToChildBroker{shv_root, .. } => {
if frame.is_request() {
frame = fix_request_frame_shv_root(frame, shv_root)?;
}
Expand Down
Loading

0 comments on commit cc2b5b0

Please sign in to comment.