Skip to content

Commit

Permalink
process_client_peer_frame_from_parent_broker() and process_client_pee…
Browse files Browse the repository at this point in the history
…r_frame_from_child_broker()

replaced by process_client_peer_frame()
  • Loading branch information
Fanda Vacek committed Oct 20, 2024
1 parent 5f6b92d commit 0ecd755
Show file tree
Hide file tree
Showing 7 changed files with 269 additions and 132 deletions.
2 changes: 1 addition & 1 deletion src/brokerimpl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ pub(crate) enum SubscribePath {
pub(crate) enum PeerKind {
Client,
ParentBroker,
ChildBroker,
//ChildBroker,
Device {
device_id: Option<String>,
mount_point: String,
Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub struct BrokerConfig {
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum TreeDirection {
ToParentBroker {shv_root: String},
ToChildBroker {shv_root: String, user: String, mount_point: String},
ToChildBroker {shv_root: String, mount_point: String},
}
impl Default for TreeDirection {
fn default() -> Self {
Expand Down
62 changes: 26 additions & 36 deletions src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,35 +244,32 @@ fn is_dot_local_request(frame: &RpcFrame) -> bool {
}
false
}
async fn process_client_peer_frame_from_parent_broker(peer_id: PeerId, mut frame: RpcFrame, config: &BrokerConnectionConfig, broker_writer: Sender<BrokerCommand>) -> shvrpc::Result<()> {
let TreeDirection::ToParentBroker { shv_root } = &config.tree_direction else {
panic!("wrong enum path");
};
// Only RPC requests can be received from parent broker,
// no signals, no responses
if frame.is_request() {
frame = fix_request_frame_shv_root(frame, shv_root)?;
broker_writer.send(BrokerCommand::FrameReceived { peer_id, frame }).await?;
} else {
warn!("RPC signal or response should not be received from client connection to parent broker: {}", &frame);
}
Ok(())
}
async fn process_client_peer_frame_from_child_broker(peer_id: PeerId, frame: RpcFrame, config: &BrokerConnectionConfig, broker_writer: Sender<BrokerCommand>) -> shvrpc::Result<()> {
let TreeDirection::ToChildBroker { shv_root, .. } = &config.tree_direction else {
panic!("wrong enum path");
async fn process_client_peer_frame(peer_id: PeerId, mut frame: RpcFrame, config: &BrokerConnectionConfig, broker_writer: Sender<BrokerCommand>) -> shvrpc::Result<()> {
match &config.tree_direction {
TreeDirection::ToParentBroker{ shv_root } => {
// Only RPC requests can be received from parent broker,
// no signals, no responses
if frame.is_request() {
frame = fix_request_frame_shv_root(frame, shv_root)?;
broker_writer.send(BrokerCommand::FrameReceived { peer_id, frame }).await?;
} else {
warn!("RPC signal or response should not be received from client connection to parent broker: {}", &frame);
}
}
TreeDirection::ToChildBroker{ shv_root, .. } => {
// Only RPC signals and responses can be received from parent broker,
// no requests
let mut frame = frame;
if frame.is_signal() {
frame.set_shvpath(&join_path(shv_root, frame.shv_path().unwrap_or_default()));
}
if frame.is_response() || frame.is_signal() {
broker_writer.send(BrokerCommand::FrameReceived { peer_id, frame }).await?;
} else {
warn!("RPC request should not be received from client connection to child broker: {}", &frame);
}
}
};
// Only RPC signals and responses can be received from parent broker,
// no requests
let mut frame = frame;
if frame.is_signal() {
frame.set_shvpath(&join_path(shv_root, frame.shv_path().unwrap_or_default()));
}
if frame.is_response() || frame.is_signal() {
broker_writer.send(BrokerCommand::FrameReceived { peer_id, frame }).await?;
} else {
warn!("RPC request should not be received from client connection to child broker: {}", &frame);
}
Ok(())
}
async fn client_peer_loop(peer_id: PeerId, config: BrokerConnectionConfig, broker_writer: Sender<BrokerCommand>) -> shvrpc::Result<()> {
Expand Down Expand Up @@ -336,14 +333,7 @@ async fn client_peer_loop(peer_id: PeerId, config: BrokerConnectionConfig, broke
},
res_frame = fut_receive_frame => match res_frame {
Ok(frame) => {
match &config.tree_direction {
TreeDirection::ToParentBroker{..} => {
process_client_peer_frame_from_parent_broker(peer_id, frame, &config, broker_writer.clone()).await?;
}
TreeDirection::ToChildBroker{..} => {
process_client_peer_frame_from_child_broker(peer_id, frame, &config, broker_writer.clone()).await?;
}
}
process_client_peer_frame(peer_id, frame, &config, broker_writer.clone()).await?;
drop(fut_receive_frame);
fut_receive_frame = frame_reader.receive_frame().fuse();
}
Expand Down
169 changes: 169 additions & 0 deletions src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,175 @@ async fn test_broker_loop() {
broker_task.cancel().await;
}

// #[async_std::test]
// async fn test_to_child_broker_connection() {
// let mut config = BrokerConfig::default();
// config.connections = vec![
// BrokerConnectionConfig {
// enabled: true,
// client: ClientConfig{
// url: "localhost".to_string(),
// device_id: None,
// mount: None,
// heartbeat_interval: "1m".to_string(),
// reconnect_interval: None,
// },
// tree_direction: TreeDirection::ToChildBroker {
// shv_root: ".broker".to_string(),
// mount_point: "test/child-broker".to_string(),
// },
// }
// ];
// let access = config.access.clone();
// let sql = Connection::open_in_memory().unwrap();
// let broker = BrokerImpl::new(&config, access, Some(sql));
// let broker_sender = broker.command_sender.clone();
// let broker_task = task::spawn(crate::brokerimpl::broker_loop(broker));
//
// let (peer_writer, peer_reader) = channel::unbounded::<BrokerToPeerMessage>();
// let client_id = 2;
//
// let call_ctx = CallCtx {
// writer: &broker_sender,
// reader: &peer_reader,
// client_id,
// };
//
// // login
// let user = "test";
// //let password = "admin";
// broker_sender.send(BrokerCommand::NewPeer {
// peer_id: client_id,
// peer_kind: PeerKind::Client,
// user: user.to_string(),
// mount_point: Some("test/device".into()),
// device_id: None,
// sender: peer_writer.clone() }).await.unwrap();
//
// let resp = call(".broker", "ls", Some("access".into()), &call_ctx).await.unwrap();
// assert_eq!(resp, RpcValue::from(true));
// let resp = call(".broker/access", "ls", None, &call_ctx).await.unwrap();
// assert!(resp.is_list());
// assert!(resp.as_list().iter().any(|s| s.as_str() == "mounts"));
// let resp = call(".broker/acce", "ls", None, &call_ctx).await;
// assert!(resp.is_err());
//
// // device should be mounted as 'shv/dev/test'
// let resp = call("test", "ls", Some("device".into()), &call_ctx).await.unwrap();
// assert_eq!(resp, RpcValue::from(true));
//
// // test current client info
// let resp = call(".broker/currentClient", "info", None, &call_ctx).await.unwrap();
// let m = resp.as_map();
// assert_eq!(m.get("clientId").unwrap(), &RpcValue::from(2));
// assert_eq!(m.get("mountPoint").unwrap(), &RpcValue::from("test/device"));
// assert_eq!(m.get("userName").unwrap(), &RpcValue::from(user));
// assert_eq!(m.get("subscriptions").unwrap(), &RpcValue::from(shvproto::Map::new()));
//
// // subscriptions
// let subs_ri = "shv/**:*";
// let subs = SubscriptionParam { ri: ShvRI::try_from(subs_ri).unwrap(), ttl: None };
// {
// // subscribe
// let result = call(".broker/currentClient", METH_SUBSCRIBE, Some(subs.to_rpcvalue()), &call_ctx).await.unwrap();
// assert!(result.as_bool());
// // cannot subscribe the same twice
// let result = call(".broker/currentClient", METH_SUBSCRIBE, Some(subs.to_rpcvalue()), &call_ctx).await.unwrap();
// assert!(!result.as_bool());
// let resp = call(".broker/currentClient", "subscriptions", None, &call_ctx).await.unwrap();
// let subs_map = resp.as_map();
// // let s = format!("{:?}", subs_map);
// assert_eq!(subs_map.len(), 1);
// assert_eq!(subs_map.first_key_value().unwrap().0, subs_ri);
// }
// {
// call(".broker/currentClient", METH_UNSUBSCRIBE, Some(subs.to_rpcvalue()), &call_ctx).await.unwrap();
// let resp = call(".broker/currentClient", "info", None, &call_ctx).await.unwrap();
// let subs = resp.as_map().get("subscriptions").unwrap();
// let subs_map = subs.as_map();
// assert_eq!(subs_map.len(), 0);
// }
//
// let config = BrokerConfig::default();
// let users: Vec<_> = config.access.users.keys().map(|k| k.to_string()).collect();
// let roles: Vec<_> = config.access.roles.keys().map(|k| k.to_string()).collect();
// // access/mounts
// {
// let path = ".broker/access/mounts";
// {
// let resp = call(path, METH_LS, None, &call_ctx).await.unwrap();
// let list = resp.as_list();
// assert_eq!(list, RpcValue::from(["test-child-broker","test-device"].to_vec()).as_list());
// let resp = call(&join_path(path, "test-device"), METH_VALUE, None, &call_ctx).await.unwrap();
// let mount1 = Mount::try_from(&resp).unwrap();
// let mount2 = Mount { mount_point: "test/device".to_string(), description: "Testing device mount-point".to_string() };
// assert_eq!(mount1, mount2);
// }
// {
// let mount = Mount{ mount_point: "foo".to_string(), description: "bar".to_string() };
// call(path, METH_SET_VALUE, Some(vec!["baz".into(), mount.to_rpcvalue().unwrap()].into()), &call_ctx).await.unwrap();
// let resp = call(path, METH_LS, None, &call_ctx).await.unwrap();
// let list = resp.as_list();
// assert_eq!(list, RpcValue::from(["baz", "test-child-broker","test-device"].to_vec()).as_list());
// let resp = call(&join_path(path, "baz"), METH_VALUE, None, &call_ctx).await.unwrap();
// assert_eq!(mount, Mount::try_from(&resp).unwrap());
// }
//
// // access/users
// {
// let path = ".broker/access/users";
// {
// let resp = call(path, METH_LS, None, &call_ctx).await.unwrap();
// let list = resp.as_list();
// assert_eq!(list, RpcValue::from(users.clone()).as_list());
// let resp = call(&join_path(path, "test"), METH_VALUE, None, &call_ctx).await.unwrap();
// let user1 = User::try_from(&resp).unwrap();
// let user2 = User { password: Password::Plain("test".into()), roles: vec!["tester".into()] };
// assert_eq!(user1, user2);
// }
// {
// let user = User { password: Password::Plain("foo".into()), roles: vec!["bar".into()] };
// call(path, METH_SET_VALUE, Some(vec!["baz".into(), user.to_rpcvalue().unwrap()].into()), &call_ctx).await.unwrap();
// let resp = call(path, METH_LS, None, &call_ctx).await.unwrap();
// let list = resp.as_list();
// let mut users = users;
// users.push("baz".to_string());
// users.sort();
// assert_eq!(list, RpcValue::from(users).as_list());
// let resp = call(&join_path(path, "baz"), METH_VALUE, None, &call_ctx).await.unwrap();
// assert_eq!(user, User::try_from(&resp).unwrap());
// }
// }
//
// // access/roles
// {
// let path = ".broker/access/roles";
// {
// let resp = call(path, METH_LS, None, &call_ctx).await.unwrap();
// let list = resp.as_list();
// assert_eq!(list, RpcValue::from(roles.clone()).as_list());
// let resp = call(&join_path(path, "tester"), METH_VALUE, None, &call_ctx).await.unwrap();
// let role1 = Role::try_from(&resp).unwrap();
// let role2 = config.access.roles.get("tester").unwrap();
// assert_eq!(&role1, role2);
// }
// {
// let role = Role { roles: vec!["foo".into()], access: vec![AccessRule{ shv_ri: "bar/**:*".into(), grant: "cfg".into() }] };
// call(path, METH_SET_VALUE, Some(vec!["baz".into(), role.to_rpcvalue().unwrap()].into()), &call_ctx).await.unwrap();
// let resp = call(path, METH_LS, None, &call_ctx).await.unwrap();
// let list = resp.as_list();
// let mut roles = roles;
// roles.push("baz".to_string());
// roles.sort();
// assert_eq!(list, RpcValue::from(roles).as_list());
// let resp = call(&join_path(path, "baz"), METH_VALUE, None, &call_ctx).await.unwrap();
// assert_eq!(role, Role::try_from(&resp).unwrap());
// }
// }
// }
// broker_task.cancel().await;
// }

#[async_std::test]
async fn test_tunnel_loop() {
let config = BrokerConfig::default();
Expand Down
85 changes: 0 additions & 85 deletions tests/child-broker-config.yaml

This file was deleted.

3 changes: 1 addition & 2 deletions tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ pub fn shv_call(path: &str, method: &str, param: &str, port: Option<i32>) -> shv
let output = match Command::new(SHVCALL_BINARY)
.arg("-v").arg(".:T")
.arg("--url").arg(format!("tcp://localhost:{port}?user=admin&password=admin"))
.arg("--path").arg(path)
.arg("--method").arg(method)
.arg("--method").arg(format!("{path}:{method}"))
.arg("--param").arg(param)
//.arg("--output-format").arg(output_format.as_str())
.output() {
Expand Down
Loading

0 comments on commit 0ecd755

Please sign in to comment.