Skip to content

Commit

Permalink
Move .app/broker to .broker according to spec SHV RPC 3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Fanda Vacek committed Jul 19, 2024
1 parent 8ccc125 commit 828788a
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 50 deletions.
48 changes: 26 additions & 22 deletions src/brokerimpl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ use shvrpc::rpcmessage::{CliId, RpcError, RpcErrorCode, Tag};
use shvproto::{List, RpcValue, rpcvalue};
use shvrpc::rpc::{Subscription, SubscriptionPattern};
use crate::shvnode::{DIR_APP, AppNode, find_longest_prefix, METH_DIR, METH_LS, process_local_dir_ls, ShvNode};
use shvrpc::util::{sha1_hash, split_glob_on_match};
use shvrpc::util::{join_path, sha1_hash, split_glob_on_match};
use log::Level;
use crate::node::{DIR_APP_BROKER_CURRENTCLIENT, DIR_APP_BROKER, AppBrokerCurrentClientNode, AppBrokerNode, METH_SUBSCRIBE};
use crate::node::{DIR_BROKER_CURRENTCLIENT, DIR_BROKER, BrokerCurrentClientNode, BrokerNode, METH_SUBSCRIBE};
use shvrpc::metamethod::{MetaMethod, AccessLevel};
use shvrpc::{RpcMessage, RpcMessageMetaTags};
use crate::{node, shvnode};
Expand All @@ -26,8 +26,8 @@ pub struct BrokerImpl {
pub(crate) command_receiver: Receiver<BrokerCommand>,

node_app: AppNode,
node_app_broker: AppBrokerNode,
node_app_broker_currentclient: AppBrokerCurrentClientNode,
node_app_broker: BrokerNode,
node_app_broker_currentclient: BrokerCurrentClientNode,

}

Expand Down Expand Up @@ -64,12 +64,12 @@ impl BrokerImpl {
shv_version_major: 3,
shv_version_minor: 0,
},
node_app_broker: AppBrokerNode {},
node_app_broker_currentclient: AppBrokerCurrentClientNode {},
node_app_broker: BrokerNode {},
node_app_broker_currentclient: BrokerCurrentClientNode {},
};
broker.mounts.insert(DIR_APP.into(), Mount::Node(broker.node_app.new_shvnode()));
broker.mounts.insert(DIR_APP_BROKER.into(), Mount::Node(broker.node_app_broker.new_shvnode()));
broker.mounts.insert(DIR_APP_BROKER_CURRENTCLIENT.into(), Mount::Node(broker.node_app_broker_currentclient.new_shvnode()));
broker.mounts.insert(DIR_BROKER.into(), Mount::Node(broker.node_app_broker.new_shvnode()));
broker.mounts.insert(DIR_BROKER_CURRENTCLIENT.into(), Mount::Node(broker.node_app_broker_currentclient.new_shvnode()));
broker
}

Expand Down Expand Up @@ -203,7 +203,7 @@ impl BrokerImpl {
info!("Unmounting path: '{}'", path);
self.mounts.remove(&path);
}
let client_path = format!(".app/broker/client/{}", peer_id);
let client_path = join_path(DIR_BROKER, &format!("client/{}", peer_id));
self.mounts.remove(&client_path);
self.pending_rpc_calls.retain(|c| c.client_id != peer_id);
}
Expand Down Expand Up @@ -261,7 +261,7 @@ impl BrokerImpl {
}
}
pub(crate) async fn mount_device(&mut self, client_id: i32, device_id: Option<String>, mount_point: Option<String>, subscribe_path: Option<SubscribePath>) -> shvrpc::Result<()> {
let client_path = format!(".app/broker/client/{}", client_id);
let client_path = join_path(DIR_BROKER, &format!("client/{}", client_id));
self.mounts.insert(client_path, Mount::Peer(Device { peer_id: client_id }));
let mount_point = 'mount_point: {
if let Some(ref mount_point) = mount_point {
Expand All @@ -270,8 +270,8 @@ impl BrokerImpl {
break 'mount_point Some(mount_point.clone());
}
}
if let Some(device_id) = device_id {
match self.access.mounts.get(&device_id) {
if let Some(device_id) = &device_id {
match self.access.mounts.get(device_id) {
None => {
warn!("Cannot find mount-point for device ID: {device_id}");
None
Expand Down Expand Up @@ -351,11 +351,11 @@ impl BrokerImpl {
Ok(false)
}
let found_cmd = BrokerCommand::PropagateSubscriptions { client_id };
if let Ok(true) = check_path(client_id, "/.app/broker/currentClient", &broker_command_sender).await {
if let Ok(true) = check_path(client_id, "/.broker/currentClient", &broker_command_sender).await {
let _ = broker_command_sender.send(found_cmd).await;
} else if let Ok(true) = check_path(client_id, ".app/broker/currentClient", &broker_command_sender).await{
let _ = broker_command_sender.send(found_cmd).await;
} else if let Ok(true) = check_path(client_id,".broker/app", &broker_command_sender).await {
} else if let Ok(true) = check_path(client_id,"/.app/broker/currentClient", &broker_command_sender).await {
let _ = broker_command_sender.send(found_cmd).await;
} else {
let cmd = BrokerCommand::SetSubscribeMethodPath {
Expand Down Expand Up @@ -445,11 +445,12 @@ impl BrokerImpl {
let signal = frame.method().unwrap_or_default();
let source = frame.source().unwrap_or_default();
if signal.is_empty() {
Err(RpcError::new(RpcErrorCode::PermissionDenied, "Method is empty"))
return Err(RpcError::new(RpcErrorCode::PermissionDenied, "Method is empty"))
} else {
let peer = self.peers.get(&client_id).ok_or_else(|| RpcError::new(RpcErrorCode::InternalError, "Peer not found"))?;
match peer.peer_kind {
PeerKind::Client => {
log!(target: "Access", Level::Debug, "Peer: {}", client_id);
if let Some(flatten_roles) = self.flatten_roles(&peer.user) {
log!(target: "Access", Level::Debug, "user: {}, flatten roles: {:?}", &peer.user, flatten_roles);
for role_name in flatten_roles {
Expand All @@ -465,15 +466,18 @@ impl BrokerImpl {
}
}
}
Err(RpcError::new(RpcErrorCode::PermissionDenied, format!("Access denied for user: {}", peer.user)))
return Err(RpcError::new(RpcErrorCode::PermissionDenied, format!("Access denied for user: {}", peer.user)))
}
PeerKind::ParentBroker => {
let access = frame.tag(Tag::Access as i32).map(RpcValue::as_str).map(String::from);
let access_level = frame.tag(Tag::AccessLevel as i32).map(RpcValue::as_i32);
log!(target: "Access", Level::Debug, "ParentBroker: {}", client_id);
let access = frame.tag(Tag::Access as i32);
let access_level = frame.tag(Tag::AccessLevel as i32);
if access_level.is_some() || access.is_some() {
Ok((access_level, access))
log!(target: "Access", Level::Debug, "\tGranted access: {:?}, access level: {:?}", access, access_level);
return Ok((access_level.map(RpcValue::as_i32), access.map(RpcValue::as_str).map(|s| s.to_string())))
} else {
Err(RpcError::new(RpcErrorCode::PermissionDenied, ""))
log!(target: "Access", Level::Debug, "\tPermissionDenied");
return Err(RpcError::new(RpcErrorCode::PermissionDenied, ""))
}
}
}
Expand Down Expand Up @@ -572,7 +576,7 @@ impl BrokerImpl {
_ => {}
}
}
DIR_APP_BROKER => {
DIR_BROKER => {
match ctx.method.name {
node::METH_CLIENT_INFO => {
let peer_id = rq.param().unwrap_or_default().as_i32();
Expand Down Expand Up @@ -615,7 +619,7 @@ impl BrokerImpl {
_ => {}
}
}
DIR_APP_BROKER_CURRENTCLIENT => {
DIR_BROKER_CURRENTCLIENT => {
match ctx.method.name {
node::METH_SUBSCRIBE => {
let subscription = Subscription::from_rpcvalue(rq.param().unwrap_or_default());
Expand Down
8 changes: 4 additions & 4 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ impl Default for BrokerConfig {
//("parent-broker".to_string(), Role { roles: vec![],
// access: vec![
// AccessRule { paths: ".app/**".to_string(), methods: "".to_string(), grant: "rd".to_string() },
// AccessRule { paths: ".app/broker/currentClient".to_string(), methods: "*subscribe".to_string(), grant: "wr".to_string() },
// AccessRule { paths: ".app/broker/currentClient".to_string(), methods: "subscriptions".to_string(), grant: "rd".to_string() },
// AccessRule { paths: ".broker/currentClient".to_string(), methods: "*subscribe".to_string(), grant: "wr".to_string() },
// AccessRule { paths: ".broker/currentClient".to_string(), methods: "subscriptions".to_string(), grant: "rd".to_string() },
// ]
//}),
("child-broker".to_string(), Role { roles: vec!["device".to_string()], access: vec![] }),
Expand All @@ -127,8 +127,8 @@ impl Default for BrokerConfig {
("subscribe".to_string(), Role {
roles: vec![],
access: vec![
AccessRule { paths: ".app/broker/currentClient".to_string(), signal: "subscribe".to_string(), source: "".to_string(), grant: "wr".to_string() },
AccessRule { paths: ".app/broker/currentClient".to_string(), signal: "unsubscribe".to_string(), source: "".to_string(), grant: "wr".to_string() },
AccessRule { paths: ".broker/currentClient".to_string(), signal: "subscribe".to_string(), source: "".to_string(), grant: "wr".to_string() },
AccessRule { paths: ".broker/currentClient".to_string(), signal: "unsubscribe".to_string(), source: "".to_string(), grant: "wr".to_string() },
],
}),
("browse".to_string(), Role {
Expand Down
12 changes: 6 additions & 6 deletions src/node.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use shvrpc::metamethod::{AccessLevel, Flag, MetaMethod};
use crate::shvnode::{META_METHOD_DIR, META_METHOD_LS, ShvNode};

pub const DIR_APP_BROKER: &str = ".app/broker";
pub const DIR_APP_BROKER_CURRENTCLIENT: &str = ".app/broker/currentClient";
pub const DIR_BROKER: &str = ".broker";
pub const DIR_BROKER_CURRENTCLIENT: &str = ".broker/currentClient";

pub const METH_CLIENT_INFO: &str = "clientInfo";
pub const METH_MOUNTED_CLIENT_INFO: &str = "mountedClientInfo";
Expand Down Expand Up @@ -33,8 +33,8 @@ pub const METH_UNSUBSCRIBE: &str = "unsubscribe";
pub const METH_SUBSCRIPTIONS: &str = "subscriptions";


pub(crate) struct AppBrokerNode {}
impl AppBrokerNode {
pub(crate) struct BrokerNode {}
impl BrokerNode {
pub fn new_shvnode(&self) -> ShvNode {
ShvNode { methods: vec![
&META_METHOD_DIR,
Expand Down Expand Up @@ -79,8 +79,8 @@ const META_METH_SUBSCRIBE: MetaMethod = MetaMethod { name: METH_SUBSCRIBE, flags
const META_METH_UNSUBSCRIBE: MetaMethod = MetaMethod { name: METH_UNSUBSCRIBE, flags: Flag::None as u32, access: AccessLevel::Browse, param: "SubscribeParams", result: "void", description: "" };
const META_METH_SUBSCRIPTIONS: MetaMethod = MetaMethod { name: METH_SUBSCRIPTIONS, flags: Flag::None as u32, access: AccessLevel::Browse, param: "void", result: "List", description: "" };

pub(crate) struct AppBrokerCurrentClientNode {}
impl AppBrokerCurrentClientNode {
pub(crate) struct BrokerCurrentClientNode {}
impl BrokerCurrentClientNode {
pub fn new_shvnode(&self) -> ShvNode {
ShvNode { methods: vec![
&META_METHOD_DIR,
Expand Down
14 changes: 7 additions & 7 deletions src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ async fn test_broker_loop() {
let register_device = BrokerCommand::RegisterDevice {
client_id, device_id: Some("test-device".into()),
mount_point: Default::default(),
subscribe_path: Some(SubscribePath::CanSubscribe(".app/broker/currentClient".into()))
subscribe_path: Some(SubscribePath::CanSubscribe(".broker/currentClient".into()))
};
broker_sender.send(register_device).await.unwrap();

Expand All @@ -91,7 +91,7 @@ async fn test_broker_loop() {
assert_eq!(resp, RpcValue::from(true));

// test current client info
let resp = call(".app/broker/currentClient", "info", None, &call_ctx).await;
let resp = call(".broker/currentClient", "info", None, &call_ctx).await;
let m = resp.as_map();
assert_eq!(m.get("clientId").unwrap(), &RpcValue::from(2));
assert_eq!(m.get("mountPoint").unwrap(), &RpcValue::from("shv/test/device"));
Expand All @@ -102,19 +102,19 @@ async fn test_broker_loop() {
let subs = Subscription::new("shv/**", "", "");
{
// subscribe
let result = call(".app/broker/currentClient", METH_SUBSCRIBE, Some(subs.to_rpcvalue()), &call_ctx).await;
let result = call(".broker/currentClient", METH_SUBSCRIBE, Some(subs.to_rpcvalue()), &call_ctx).await;
assert_eq!(result.as_bool(), true);
// cannot subscribe the same twice
let result = call(".app/broker/currentClient", METH_SUBSCRIBE, Some(subs.to_rpcvalue()), &call_ctx).await;
let result = call(".broker/currentClient", METH_SUBSCRIBE, Some(subs.to_rpcvalue()), &call_ctx).await;
assert_eq!(result.as_bool(), false);
let resp = call(".app/broker/currentClient", "info", None, &call_ctx).await;
let resp = call(".broker/currentClient", "info", None, &call_ctx).await;
let subs = resp.as_map().get("subscriptions").unwrap();
let subs_list = subs.as_list();
assert_eq!(subs_list.len(), 1);
}
{
call(".app/broker/currentClient", METH_UNSUBSCRIBE, Some(subs.to_rpcvalue()), &call_ctx).await;
let resp = call(".app/broker/currentClient", "info", None, &call_ctx).await;
call(".broker/currentClient", METH_UNSUBSCRIBE, Some(subs.to_rpcvalue()), &call_ctx).await;
let resp = call(".broker/currentClient", "info", None, &call_ctx).await;
let subs = resp.as_map().get("subscriptions").unwrap();
let subs_list = subs.as_list();
assert_eq!(subs_list.len(), 0);
Expand Down
4 changes: 2 additions & 2 deletions tests/child-broker-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ access:
subscribe:
roles: []
access:
- paths: .app/broker/currentClient
- paths: .broker/currentClient
signal: subscribe
grant: wr
- paths: .app/broker/currentClient
- paths: .broker/currentClient
signal: unsubscribe
grant: wr
device:
Expand Down
23 changes: 14 additions & 9 deletions tests/test_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ fn test_broker() -> shvrpc::Result<()> {
println!("====== broker =====");
println!("---broker---: :ls(\".app\")");
assert_eq!(shv_call_child("", "ls", r#"".app""#)?, RpcValue::from(true));
assert_eq!(shv_call_child(".app", "ls", r#""broker""#)?, RpcValue::from(true));
assert_eq!(shv_call_child(".app/broker", "ls", r#""client""#)?, RpcValue::from(true));
//assert_eq!(shv_call_child(".app", "ls", r#""broker""#)?, RpcValue::from(true));
assert_eq!(shv_call_child(".broker", "ls", r#""client""#)?, RpcValue::from(true));
{
println!("---broker---: .app:dir()");
let expected_methods = vec![
Expand Down Expand Up @@ -106,22 +106,27 @@ fn test_broker() -> shvrpc::Result<()> {
println!("---broker---: test/device/number:ls()");
assert_eq!(shv_call_child("test/device/state/number", "ls", "")?, rpcvalue::List::new().into());
assert_eq!(shv_call_parent("shv/test/child-broker/device/state/number", "ls", "")?, rpcvalue::List::new().into());
println!("---broker---: .app/broker:clients()");
assert!(!shv_call_child(".app/broker", "clients", "")?.as_list().is_empty());
assert_eq!(shv_call_parent("shv/test/child-broker/device/state/number", "set", "27")?, ().into());
assert_eq!(shv_call_parent("shv/test/child-broker/device/state/number", "get", "")?, 27.into());
println!("---broker---: .broker:clients()");
assert!(!shv_call_child(".broker", "clients", "")?.as_list().is_empty());

println!("---broker---: .app/broker:mounts()");
assert_eq!(shv_call_child(".app/broker", "mounts", "")?, vec![RpcValue::from("test/device")].into());
println!("---broker---: .broker:mounts()");
assert_eq!(shv_call_child(".broker", "mounts", "")?, vec![RpcValue::from("test/device")].into());
println!("====== subscriptions =====");
fn check_subscription(property_path: &str, subscribe_path: &str, port: i32) -> shvrpc::Result<()> {
//let info = shv_call_child(".app/broker/currentClient", "info", "")?;
//let info = shv_call_child(".broker/currentClient", "info", "")?;
//println!("INFO: {info}");
let calls: Vec<String> = vec![
format!(r#".app/broker/currentClient:subscribe {{"methods": "chng", "paths": "{subscribe_path}"}}"#),
format!(r#".broker/currentClient:subscribe {{"methods": "chng", "paths": "{subscribe_path}"}}"#),
format!(r#"{property_path}:set 42"#),
format!(r#".app/broker/currentClient:unsubscribe {{"methods": "chng", "paths": "{subscribe_path}"}}"#),
format!(r#".broker/currentClient:unsubscribe {{"methods": "chng", "paths": "{subscribe_path}"}}"#),
format!(r#"{property_path}:set 123"#),
];
println!("shv_call_many property: {property_path}");
for c in calls.iter() { println!("\t{}", c); }
let values = shv_call_many(calls, Some(port))?;
println!("shv_call_many result:");
for v in values.iter() { println!("\t{}", v); }
let expected: Vec<String> = vec![
"RES true".into(), // response to subscribe
Expand Down

0 comments on commit 828788a

Please sign in to comment.