Skip to content

Commit

Permalink
Mounts added to broker config.
Browse files Browse the repository at this point in the history
  • Loading branch information
Fanda Vacek committed Jan 1, 2024
1 parent 5ad4957 commit e0b1c45
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 19 deletions.
65 changes: 49 additions & 16 deletions src/bin/shvbroker/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ pub(crate) enum ClientEvent {
NewPeer {
client_id: CliId,
sender: Sender<PeerEvent>,
peer_kind: PeerKind,
},
RegisterDevice {
client_id: CliId,
Expand All @@ -49,11 +50,19 @@ pub(crate) enum PeerEvent {
DisconnectByBroker,
}
#[derive(Debug)]
pub enum PeerKind {
Client,
ParentBroker,
#[allow(dead_code)]
ChildBroker,
}
#[derive(Debug)]
struct Peer {
sender: Sender<PeerEvent>,
user: String,
mount_point: Option<String>,
subscriptions: Vec<Subscription>,
peer_kind: PeerKind,
}
impl Peer {
fn is_signal_subscribed(&self, path: &str, method: &str) -> bool {
Expand Down Expand Up @@ -161,9 +170,17 @@ impl Broker {
}
}
if let Some(device_id) = device_id {
let mount_point = "test/".to_owned() + &device_id;
info!("Client id: {}, device id: {} mounted on path: '{}'", client_id, device_id, &mount_point);
break Some(mount_point);
break match self.access.mounts.get(&device_id) {
None => {
warn!("Cannot find mount-point for device ID: {device_id}");
None
}
Some(mount) => {
let mount_point = mount.path.clone();
info!("Client id: {}, device id: {} mounted on path: '{}'", client_id, device_id, &mount_point);
Some(mount_point)
}
};
}
break None;
} {
Expand Down Expand Up @@ -216,20 +233,34 @@ impl Broker {
None
} else {
if let Some(peer) = self.peers.get(&client_id) {
if let Some(flatten_roles) = self.flatten_roles(&peer.user) {
log!(target: "Acc", Level::Debug, "user: {}, flatten roles: {:?}", &peer.user, flatten_roles);
for role_name in flatten_roles {
if let Some(rules) = self.role_access.get(role_name) {
log!(target: "Acc", Level::Debug, "----------- access for role: {}", role_name);
for rule in rules {
log!(target: "Acc", Level::Debug, "\trule: {}", rule.path_method);
if rule.path_method.match_shv_method(shv_path, method) {
log!(target: "Acc", Level::Debug, "\t\t HIT");
return Some(rule.grant.clone());
match peer.peer_kind {
PeerKind::Client => {
if let Some(flatten_roles) = self.flatten_roles(&peer.user) {
log!(target: "Acc", Level::Debug, "user: {}, flatten roles: {:?}", &peer.user, flatten_roles);
for role_name in flatten_roles {
if let Some(rules) = self.role_access.get(role_name) {
log!(target: "Acc", Level::Debug, "----------- access for role: {}", role_name);
for rule in rules {
log!(target: "Acc", Level::Debug, "\trule: {}", rule.path_method);
if rule.path_method.match_shv_method(shv_path, method) {
log!(target: "Acc", Level::Debug, "\t\t HIT");
return Some(rule.grant.clone());
}
}
}
}
}
}
PeerKind::ParentBroker => {
match frame.access() {
None => { return None }
Some(access) => { return Some(access.to_owned()) }
};
}
PeerKind::ChildBroker => {
warn!("Child broker request are not supported yet.");
return None;
}
}
}
None
Expand Down Expand Up @@ -316,7 +347,9 @@ pub(crate) async fn broker_loop(events: Receiver<ClientEvent>, access: AccessCon
type Command = RequestCommand<BrokerCommand>;
let shv_path = frame.shv_path().unwrap_or_default().to_string();
let response_meta= RpcFrame::prepare_response_meta(&frame.meta);
let command = match broker.grant_for_request(client_id, &frame) {

let grant_for_request = broker.grant_for_request(client_id, &frame);
let command = match grant_for_request {
None => {
Command::Error(RpcError::new(RpcErrorCode::PermissionDenied, &format!("Cannot resolve call method grant for current user.")))
}
Expand Down Expand Up @@ -453,14 +486,15 @@ pub(crate) async fn broker_loop(events: Receiver<ClientEvent>, access: AccessCon
}
}
}
ClientEvent::NewPeer { client_id, sender } => match broker.peers.entry(client_id) {
ClientEvent::NewPeer { client_id, sender, peer_kind } => match broker.peers.entry(client_id) {
Entry::Occupied(..) => (),
Entry::Vacant(entry) => {
entry.insert(Peer {
sender,
user: "".to_string(),
mount_point: None,
subscriptions: vec![],
peer_kind,
});
}
},
Expand All @@ -487,5 +521,4 @@ pub(crate) async fn broker_loop(events: Receiver<ClientEvent>, access: AccessCon
}
}
}
//drop(peers);
}
10 changes: 10 additions & 0 deletions src/bin/shvbroker/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub struct ParentBrokerConfig {
pub struct AccessControl {
pub users: HashMap<String, User>,
pub roles: HashMap<String, Role>,
pub mounts: HashMap<String, Mount>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Listen {
Expand Down Expand Up @@ -60,6 +61,12 @@ pub struct AccessRule {
pub methods: String,
pub grant: String,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Mount {
pub path: String,
#[serde(default)]
pub description: String,
}
impl AccessControl {
pub fn from_file(file_name: &str) -> shv::Result<Self> {
let content = fs::read_to_string(file_name)?;
Expand Down Expand Up @@ -144,6 +151,9 @@ impl Default for BrokerConfig {
],
}),
]),
mounts: HashMap::from([
("test-device".into(), Mount{ path: "shv/dev/test".to_string(), description: "Testing device mount-point".to_string() })
]),
},
}
}
Expand Down
6 changes: 3 additions & 3 deletions src/bin/shvbroker/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ use shv::client::LoginParams;
use shv::rpcframe::RpcFrame;
use shv::shvnode::METH_PING;
use shv::util::{join_path, login_from_url, sha1_hash};
use crate::broker::{ClientEvent, LoginResult, PeerEvent};
use crate::broker::{ClientEvent, LoginResult, PeerEvent, PeerKind};
use crate::config::ParentBrokerConfig;
use crate::Sender;

pub(crate) async fn peer_loop(client_id: i32, broker_writer: Sender<ClientEvent>, stream: TcpStream) -> crate::Result<()> {
let (socket_reader, mut writer) = (&stream, &stream);
let (peer_writer, peer_receiver) = channel::unbounded::<PeerEvent>();

broker_writer.send(ClientEvent::NewPeer { client_id, sender: peer_writer }).await.unwrap();
broker_writer.send(ClientEvent::NewPeer { client_id, sender: peer_writer, peer_kind: PeerKind::Client }).await.unwrap();

//let stream_wr = stream.clone();
let mut brd = BufReader::new(socket_reader);
Expand Down Expand Up @@ -199,7 +199,7 @@ pub async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig,
client::login(&mut frame_reader, &mut frame_writer, &login_params).await?;

let (peer_writer, peer_receiver) = channel::unbounded::<PeerEvent>();
broker_writer.send(ClientEvent::NewPeer { client_id, sender: peer_writer }).await.unwrap();
broker_writer.send(ClientEvent::NewPeer { client_id, sender: peer_writer, peer_kind: PeerKind::ParentBroker }).await.unwrap();

loop {
let fut_timeout = future::timeout(heartbeat_interval, future::pending::<()>());
Expand Down

0 comments on commit e0b1c45

Please sign in to comment.