Skip to content

Commit

Permalink
Add support for signal source
Browse files Browse the repository at this point in the history
  • Loading branch information
Fanda Vacek committed Jul 17, 2024
1 parent a942dcf commit cecaaeb
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 58 deletions.
38 changes: 13 additions & 25 deletions src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use async_std::channel::Sender;
use async_std::net::TcpListener;
use crate::config::{AccessControl, BrokerConfig};
use shvrpc::metamethod::AccessLevel;
use glob::{Pattern};
use log::{debug, info, warn};
use shvproto::{MetaMap, RpcValue};
use shvrpc::rpc::{SubscriptionPattern};
Expand Down Expand Up @@ -92,9 +91,9 @@ impl Peer {
subscribe_path: None,
}
}
pub(crate) fn is_signal_subscribed(&self, path: &str, method: &str) -> bool {
pub(crate) fn is_signal_subscribed(&self, path: &str, signal: &str, source: &str) -> bool {
for subs in self.subscriptions.iter() {
if subs.match_shv_method(path, method) {
if subs.match_shv_method(path, signal, source) {
return true;
}
}
Expand Down Expand Up @@ -144,35 +143,24 @@ pub(crate) enum Mount {
}

pub(crate) struct ParsedAccessRule {
pub(crate) path_method: SubscriptionPattern,
pub(crate) path_signal_source: SubscriptionPattern,
// Needed in order to pass 'dot-local' in 'Access' meta-attribute
// to support the dot-local hack on older brokers
pub(crate) grant_str: String,
pub(crate) grant_lvl: AccessLevel,
}

impl ParsedAccessRule {
pub fn new(path: &str, method: &str, grant: &str) -> shvrpc::Result<Self> {
let method = if method.is_empty() { "?*" } else { method };
let path = if path.is_empty() { "**" } else { path };
match Pattern::new(method) {
Ok(method) => {
match Pattern::new(path) {
Ok(path) => {
Ok(Self {
path_method: SubscriptionPattern { paths: path, methods: method },
grant_str: grant.to_string(),
grant_lvl: grant
.split(',')
.find_map(AccessLevel::from_str)
.unwrap_or_else(|| panic!("Invalid access grant `{grant}`")),
})
}
Err(err) => { Err(format!("{}", &err).into()) }
}
}
Err(err) => { Err(format!("{}", &err).into()) }
}
pub fn new(paths: &str, signal: &str, source: &str, grant: &str) -> shvrpc::Result<Self> {
let subpat = SubscriptionPattern::new(paths, signal, source)?;
Ok(Self {
path_signal_source: subpat,
grant_str: grant.to_string(),
grant_lvl: grant
.split(',')
.find_map(AccessLevel::from_str)
.unwrap_or_else(|| panic!("Invalid access grant `{grant}`")),
})
}
}

Expand Down
36 changes: 16 additions & 20 deletions src/brokerimpl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl BrokerImpl {
for (name, role) in &access.roles {
let mut list: Vec<ParsedAccessRule> = Default::default();
for rule in &role.access {
match ParsedAccessRule::new(&rule.paths, &rule.methods, &rule.grant) {
match ParsedAccessRule::new(&rule.paths, &rule.signal, &rule.source, &rule.grant) {
Ok(rule) => {
list.push(rule);
}
Expand Down Expand Up @@ -140,7 +140,7 @@ impl BrokerImpl {
if let Some(mount_point) = self.client_id_to_mount_point(peer_id) {
let new_path = shvrpc::util::join_path(&mount_point, frame.shv_path().unwrap_or_default());
for (cli_id, peer) in self.peers.iter() {
if &peer_id != cli_id && peer.is_signal_subscribed(&new_path, frame.method().unwrap_or_default()) {
if &peer_id != cli_id && peer.is_signal_subscribed(&new_path, frame.method().unwrap_or_default(), frame.source().unwrap_or_default()) {
let mut frame = frame.clone();
frame.set_shvpath(&new_path);
peer.sender.send(BrokerToPeerMessage::SendFrame(frame)).await?;
Expand Down Expand Up @@ -311,7 +311,7 @@ impl BrokerImpl {
if let Some(peer) = self.peers.get(&device.peer_id) {
if peer.is_broker()? {
if let Ok(Some((_local, remote))) = split_glob_on_match(&new_subscription.paths, mount_point) {
to_subscribe.push((device.peer_id, Subscription::new(remote, &new_subscription.methods)));
to_subscribe.push((device.peer_id, Subscription::new(remote, &new_subscription.signal, &new_subscription.source)));
}
}
}
Expand Down Expand Up @@ -371,22 +371,17 @@ impl BrokerImpl {
let peer = self.peers.get_mut(&client_id).ok_or_else(|| format!("Invalid client ID: {client_id}"))?;
//let subscribe_path = peer.broker_subscribe_path()?;
let mount_point = peer.mount_point.clone().ok_or_else(|| format!("Mount point is missing, client ID: {client_id}"))?;
let mut subscribed = HashMap::new();
for peer in self.peers.values() {
//if id == &client_id { continue }
for subscr in &peer.subscriptions {
subscribed.insert(subscr.to_string(), (subscr.paths.as_str(), subscr.methods.as_str()));
}
}
let mut to_subscribe = HashSet::new();
for (_sig, (paths, methods)) in subscribed {
if let Ok(Some((_local, remote))) = split_glob_on_match(paths, &mount_point) {
to_subscribe.insert(Subscription::new(remote, methods).to_string());
} else {
error!("Invalid pattern '{paths}'.")
for peer in self.peers.values() {
for subpat in &peer.subscriptions {
if let Ok(Some((_local, remote))) = split_glob_on_match(&subpat.paths.as_str(), &mount_point) {
to_subscribe.insert(Subscription::new(remote, subpat.signal.as_str(), subpat.source.as_str()));
} else {
error!("Invalid pattern '{}'.", subpat.paths.as_str())
}
}
}
let to_subscribe = to_subscribe.iter().map(|s| Subscription::from_str_unchecked(s)).collect();
let to_subscribe = to_subscribe.into_iter().collect();
self.call_subscribe_async(client_id, to_subscribe)?;
Ok(())
}
Expand Down Expand Up @@ -447,8 +442,9 @@ impl BrokerImpl {
fn grant_for_request(&self, client_id: CliId, frame: &RpcFrame) -> Result<(Option<i32>, Option<String>), RpcError> {
log!(target: "Access", Level::Debug, "======================= grant_for_request {}", &frame);
let shv_path = frame.shv_path().unwrap_or_default();
let method = frame.method().unwrap_or("");
if method.is_empty() {
let signal = frame.method().unwrap_or_default();
let source = frame.source().unwrap_or_default();
if signal.is_empty() {
Err(RpcError::new(RpcErrorCode::PermissionDenied, "Method is empty"))
} else {
let peer = self.peers.get(&client_id).ok_or_else(|| RpcError::new(RpcErrorCode::InternalError, "Peer not found"))?;
Expand All @@ -460,8 +456,8 @@ impl BrokerImpl {
if let Some(rules) = self.role_access.get(&role_name) {
log!(target: "Access", Level::Debug, "----------- access for role: {}", role_name);
for rule in rules {
log!(target: "Access", Level::Debug, "\trule: {}", rule.path_method);
if rule.path_method.match_shv_method(shv_path, method) {
log!(target: "Access", Level::Debug, "\trule: {}", rule.path_signal_source);
if rule.path_signal_source.match_shv_method(shv_path, signal, source) {
log!(target: "Access", Level::Debug, "\t\t HIT");
return Ok((Some(rule.grant_lvl as i32), Some(rule.grant_str.clone())));
}
Expand Down
16 changes: 9 additions & 7 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ pub struct Role {
pub struct AccessRule {
pub paths: String,
#[serde(default)]
pub methods: String,
pub signal: String,
#[serde(default)]
pub source: String,
pub grant: String,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
Expand Down Expand Up @@ -124,7 +126,7 @@ impl Default for BrokerConfig {
("su".to_string(), Role {
roles: vec![],
access: vec![
AccessRule { paths: "**".to_string(), methods: "".to_string(), grant: "su,dot-local".to_string() },
AccessRule { paths: "**".to_string(), signal: "".to_string(), source: "".to_string(), grant: "su,dot-local".to_string() },
],
}),
("client".to_string(), Role { roles: vec!["ping".to_string(), "subscribe".to_string(), "browse".to_string()], access: vec![] }),
Expand All @@ -140,26 +142,26 @@ impl Default for BrokerConfig {
("tester".to_string(), Role {
roles: vec!["client".to_string()],
access: vec![
AccessRule { paths: "test/**".to_string(), methods: "".to_string(), grant: "cfg".to_string() },
AccessRule { paths: "test/**".to_string(), signal: "".to_string(), source: "".to_string(), grant: "cfg".to_string() },
],
}),
("ping".to_string(), Role {
roles: vec![],
access: vec![
AccessRule { paths: ".app".to_string(), methods: "ping".to_string(), grant: "wr".to_string() },
AccessRule { paths: ".app".to_string(), signal: "ping".to_string(), source: "".to_string(), grant: "wr".to_string() },
],
}),
("subscribe".to_string(), Role {
roles: vec![],
access: vec![
AccessRule { paths: ".app/broker/currentClient".to_string(), methods: "subscribe".to_string(), grant: "wr".to_string() },
AccessRule { paths: ".app/broker/currentClient".to_string(), methods: "unsubscribe".to_string(), grant: "wr".to_string() },
AccessRule { paths: ".app/broker/currentClient".to_string(), signal: "subscribe".to_string(), source: "".to_string(), grant: "wr".to_string() },
AccessRule { paths: ".app/broker/currentClient".to_string(), signal: "unsubscribe".to_string(), source: "".to_string(), grant: "wr".to_string() },
],
}),
("browse".to_string(), Role {
roles: vec![],
access: vec![
AccessRule { paths: "**".to_string(), methods: "".to_string(), grant: "bws".to_string() },
AccessRule { paths: "**".to_string(), signal: "".to_string(), source: "".to_string(), grant: "bws".to_string() },
],
}),
]),
Expand Down
12 changes: 6 additions & 6 deletions tests/child-broker-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,22 @@ access:
- client
access:
- paths: test/**
methods: ''
signal: ''
grant: cfg
browse:
roles: []
access:
- paths: '**'
methods: ''
signal: ''
grant: bws
subscribe:
roles: []
access:
- paths: .app/broker/currentClient
methods: subscribe
signal: subscribe
grant: wr
- paths: .app/broker/currentClient
methods: unsubscribe
signal: unsubscribe
grant: wr
device:
roles:
Expand All @@ -71,13 +71,13 @@ access:
roles: []
access:
- paths: .app
methods: ping
signal: ping
grant: wr
su:
roles: []
access:
- paths: '**'
methods: ''
signal: ''
grant: su
child-broker:
roles:
Expand Down

0 comments on commit cecaaeb

Please sign in to comment.