diff --git a/src/brokerimpl.rs b/src/brokerimpl.rs index a39d8fd..9439b56 100644 --- a/src/brokerimpl.rs +++ b/src/brokerimpl.rs @@ -104,7 +104,7 @@ pub(crate) enum SubscribePath { pub(crate) enum PeerKind { Client, ParentBroker, - ChildBroker, + //ChildBroker, Device { device_id: Option, mount_point: String, diff --git a/src/config.rs b/src/config.rs index 5410b6c..d12ceb2 100644 --- a/src/config.rs +++ b/src/config.rs @@ -21,7 +21,7 @@ pub struct BrokerConfig { #[derive(Serialize, Deserialize, Clone, Debug)] pub enum TreeDirection { ToParentBroker {shv_root: String}, - ToChildBroker {shv_root: String, user: String, mount_point: String}, + ToChildBroker {shv_root: String, mount_point: String}, } impl Default for TreeDirection { fn default() -> Self { diff --git a/src/peer.rs b/src/peer.rs index 7f2ec41..fab0d62 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -244,35 +244,32 @@ fn is_dot_local_request(frame: &RpcFrame) -> bool { } false } -async fn process_client_peer_frame_from_parent_broker(peer_id: PeerId, mut frame: RpcFrame, config: &BrokerConnectionConfig, broker_writer: Sender) -> shvrpc::Result<()> { - let TreeDirection::ToParentBroker { shv_root } = &config.tree_direction else { - panic!("wrong enum path"); - }; - // Only RPC requests can be received from parent broker, - // no signals, no responses - if frame.is_request() { - frame = fix_request_frame_shv_root(frame, shv_root)?; - broker_writer.send(BrokerCommand::FrameReceived { peer_id, frame }).await?; - } else { - warn!("RPC signal or response should not be received from client connection to parent broker: {}", &frame); - } - Ok(()) -} -async fn process_client_peer_frame_from_child_broker(peer_id: PeerId, frame: RpcFrame, config: &BrokerConnectionConfig, broker_writer: Sender) -> shvrpc::Result<()> { - let TreeDirection::ToChildBroker { shv_root, .. } = &config.tree_direction else { - panic!("wrong enum path"); +async fn process_client_peer_frame(peer_id: PeerId, mut frame: RpcFrame, config: &BrokerConnectionConfig, broker_writer: Sender) -> shvrpc::Result<()> { + match &config.tree_direction { + TreeDirection::ToParentBroker{ shv_root } => { + // Only RPC requests can be received from parent broker, + // no signals, no responses + if frame.is_request() { + frame = fix_request_frame_shv_root(frame, shv_root)?; + broker_writer.send(BrokerCommand::FrameReceived { peer_id, frame }).await?; + } else { + warn!("RPC signal or response should not be received from client connection to parent broker: {}", &frame); + } + } + TreeDirection::ToChildBroker{ shv_root, .. } => { + // Only RPC signals and responses can be received from parent broker, + // no requests + let mut frame = frame; + if frame.is_signal() { + frame.set_shvpath(&join_path(shv_root, frame.shv_path().unwrap_or_default())); + } + if frame.is_response() || frame.is_signal() { + broker_writer.send(BrokerCommand::FrameReceived { peer_id, frame }).await?; + } else { + warn!("RPC request should not be received from client connection to child broker: {}", &frame); + } + } }; - // Only RPC signals and responses can be received from parent broker, - // no requests - let mut frame = frame; - if frame.is_signal() { - frame.set_shvpath(&join_path(shv_root, frame.shv_path().unwrap_or_default())); - } - if frame.is_response() || frame.is_signal() { - broker_writer.send(BrokerCommand::FrameReceived { peer_id, frame }).await?; - } else { - warn!("RPC request should not be received from client connection to child broker: {}", &frame); - } Ok(()) } async fn client_peer_loop(peer_id: PeerId, config: BrokerConnectionConfig, broker_writer: Sender) -> shvrpc::Result<()> { @@ -336,14 +333,7 @@ async fn client_peer_loop(peer_id: PeerId, config: BrokerConnectionConfig, broke }, res_frame = fut_receive_frame => match res_frame { Ok(frame) => { - match &config.tree_direction { - TreeDirection::ToParentBroker{..} => { - process_client_peer_frame_from_parent_broker(peer_id, frame, &config, broker_writer.clone()).await?; - } - TreeDirection::ToChildBroker{..} => { - process_client_peer_frame_from_child_broker(peer_id, frame, &config, broker_writer.clone()).await?; - } - } + process_client_peer_frame(peer_id, frame, &config, broker_writer.clone()).await?; drop(fut_receive_frame); fut_receive_frame = frame_reader.receive_frame().fuse(); } diff --git a/src/test.rs b/src/test.rs index 25665f7..2baf5f6 100644 --- a/src/test.rs +++ b/src/test.rs @@ -213,6 +213,175 @@ async fn test_broker_loop() { broker_task.cancel().await; } +// #[async_std::test] +// async fn test_to_child_broker_connection() { +// let mut config = BrokerConfig::default(); +// config.connections = vec![ +// BrokerConnectionConfig { +// enabled: true, +// client: ClientConfig{ +// url: "localhost".to_string(), +// device_id: None, +// mount: None, +// heartbeat_interval: "1m".to_string(), +// reconnect_interval: None, +// }, +// tree_direction: TreeDirection::ToChildBroker { +// shv_root: ".broker".to_string(), +// mount_point: "test/child-broker".to_string(), +// }, +// } +// ]; +// let access = config.access.clone(); +// let sql = Connection::open_in_memory().unwrap(); +// let broker = BrokerImpl::new(&config, access, Some(sql)); +// let broker_sender = broker.command_sender.clone(); +// let broker_task = task::spawn(crate::brokerimpl::broker_loop(broker)); +// +// let (peer_writer, peer_reader) = channel::unbounded::(); +// let client_id = 2; +// +// let call_ctx = CallCtx { +// writer: &broker_sender, +// reader: &peer_reader, +// client_id, +// }; +// +// // login +// let user = "test"; +// //let password = "admin"; +// broker_sender.send(BrokerCommand::NewPeer { +// peer_id: client_id, +// peer_kind: PeerKind::Client, +// user: user.to_string(), +// mount_point: Some("test/device".into()), +// device_id: None, +// sender: peer_writer.clone() }).await.unwrap(); +// +// let resp = call(".broker", "ls", Some("access".into()), &call_ctx).await.unwrap(); +// assert_eq!(resp, RpcValue::from(true)); +// let resp = call(".broker/access", "ls", None, &call_ctx).await.unwrap(); +// assert!(resp.is_list()); +// assert!(resp.as_list().iter().any(|s| s.as_str() == "mounts")); +// let resp = call(".broker/acce", "ls", None, &call_ctx).await; +// assert!(resp.is_err()); +// +// // device should be mounted as 'shv/dev/test' +// let resp = call("test", "ls", Some("device".into()), &call_ctx).await.unwrap(); +// assert_eq!(resp, RpcValue::from(true)); +// +// // test current client info +// let resp = call(".broker/currentClient", "info", None, &call_ctx).await.unwrap(); +// let m = resp.as_map(); +// assert_eq!(m.get("clientId").unwrap(), &RpcValue::from(2)); +// assert_eq!(m.get("mountPoint").unwrap(), &RpcValue::from("test/device")); +// assert_eq!(m.get("userName").unwrap(), &RpcValue::from(user)); +// assert_eq!(m.get("subscriptions").unwrap(), &RpcValue::from(shvproto::Map::new())); +// +// // subscriptions +// let subs_ri = "shv/**:*"; +// let subs = SubscriptionParam { ri: ShvRI::try_from(subs_ri).unwrap(), ttl: None }; +// { +// // subscribe +// let result = call(".broker/currentClient", METH_SUBSCRIBE, Some(subs.to_rpcvalue()), &call_ctx).await.unwrap(); +// assert!(result.as_bool()); +// // cannot subscribe the same twice +// let result = call(".broker/currentClient", METH_SUBSCRIBE, Some(subs.to_rpcvalue()), &call_ctx).await.unwrap(); +// assert!(!result.as_bool()); +// let resp = call(".broker/currentClient", "subscriptions", None, &call_ctx).await.unwrap(); +// let subs_map = resp.as_map(); +// // let s = format!("{:?}", subs_map); +// assert_eq!(subs_map.len(), 1); +// assert_eq!(subs_map.first_key_value().unwrap().0, subs_ri); +// } +// { +// call(".broker/currentClient", METH_UNSUBSCRIBE, Some(subs.to_rpcvalue()), &call_ctx).await.unwrap(); +// let resp = call(".broker/currentClient", "info", None, &call_ctx).await.unwrap(); +// let subs = resp.as_map().get("subscriptions").unwrap(); +// let subs_map = subs.as_map(); +// assert_eq!(subs_map.len(), 0); +// } +// +// let config = BrokerConfig::default(); +// let users: Vec<_> = config.access.users.keys().map(|k| k.to_string()).collect(); +// let roles: Vec<_> = config.access.roles.keys().map(|k| k.to_string()).collect(); +// // access/mounts +// { +// let path = ".broker/access/mounts"; +// { +// let resp = call(path, METH_LS, None, &call_ctx).await.unwrap(); +// let list = resp.as_list(); +// assert_eq!(list, RpcValue::from(["test-child-broker","test-device"].to_vec()).as_list()); +// let resp = call(&join_path(path, "test-device"), METH_VALUE, None, &call_ctx).await.unwrap(); +// let mount1 = Mount::try_from(&resp).unwrap(); +// let mount2 = Mount { mount_point: "test/device".to_string(), description: "Testing device mount-point".to_string() }; +// assert_eq!(mount1, mount2); +// } +// { +// let mount = Mount{ mount_point: "foo".to_string(), description: "bar".to_string() }; +// call(path, METH_SET_VALUE, Some(vec!["baz".into(), mount.to_rpcvalue().unwrap()].into()), &call_ctx).await.unwrap(); +// let resp = call(path, METH_LS, None, &call_ctx).await.unwrap(); +// let list = resp.as_list(); +// assert_eq!(list, RpcValue::from(["baz", "test-child-broker","test-device"].to_vec()).as_list()); +// let resp = call(&join_path(path, "baz"), METH_VALUE, None, &call_ctx).await.unwrap(); +// assert_eq!(mount, Mount::try_from(&resp).unwrap()); +// } +// +// // access/users +// { +// let path = ".broker/access/users"; +// { +// let resp = call(path, METH_LS, None, &call_ctx).await.unwrap(); +// let list = resp.as_list(); +// assert_eq!(list, RpcValue::from(users.clone()).as_list()); +// let resp = call(&join_path(path, "test"), METH_VALUE, None, &call_ctx).await.unwrap(); +// let user1 = User::try_from(&resp).unwrap(); +// let user2 = User { password: Password::Plain("test".into()), roles: vec!["tester".into()] }; +// assert_eq!(user1, user2); +// } +// { +// let user = User { password: Password::Plain("foo".into()), roles: vec!["bar".into()] }; +// call(path, METH_SET_VALUE, Some(vec!["baz".into(), user.to_rpcvalue().unwrap()].into()), &call_ctx).await.unwrap(); +// let resp = call(path, METH_LS, None, &call_ctx).await.unwrap(); +// let list = resp.as_list(); +// let mut users = users; +// users.push("baz".to_string()); +// users.sort(); +// assert_eq!(list, RpcValue::from(users).as_list()); +// let resp = call(&join_path(path, "baz"), METH_VALUE, None, &call_ctx).await.unwrap(); +// assert_eq!(user, User::try_from(&resp).unwrap()); +// } +// } +// +// // access/roles +// { +// let path = ".broker/access/roles"; +// { +// let resp = call(path, METH_LS, None, &call_ctx).await.unwrap(); +// let list = resp.as_list(); +// assert_eq!(list, RpcValue::from(roles.clone()).as_list()); +// let resp = call(&join_path(path, "tester"), METH_VALUE, None, &call_ctx).await.unwrap(); +// let role1 = Role::try_from(&resp).unwrap(); +// let role2 = config.access.roles.get("tester").unwrap(); +// assert_eq!(&role1, role2); +// } +// { +// let role = Role { roles: vec!["foo".into()], access: vec![AccessRule{ shv_ri: "bar/**:*".into(), grant: "cfg".into() }] }; +// call(path, METH_SET_VALUE, Some(vec!["baz".into(), role.to_rpcvalue().unwrap()].into()), &call_ctx).await.unwrap(); +// let resp = call(path, METH_LS, None, &call_ctx).await.unwrap(); +// let list = resp.as_list(); +// let mut roles = roles; +// roles.push("baz".to_string()); +// roles.sort(); +// assert_eq!(list, RpcValue::from(roles).as_list()); +// let resp = call(&join_path(path, "baz"), METH_VALUE, None, &call_ctx).await.unwrap(); +// assert_eq!(role, Role::try_from(&resp).unwrap()); +// } +// } +// } +// broker_task.cancel().await; +// } + #[async_std::test] async fn test_tunnel_loop() { let config = BrokerConfig::default(); diff --git a/tests/child-broker-config.yaml b/tests/child-broker-config.yaml deleted file mode 100644 index 6ad2629..0000000 --- a/tests/child-broker-config.yaml +++ /dev/null @@ -1,85 +0,0 @@ -listen: - tcp: localhost:3756 -connections: -- enabled: true - client: - url: tcp://child-broker@localhost:3755?password=child-broker - device_id: "test-child-broker" - mount: null - heartbeat_interval: 1m - reconnect_interval: null - tree_direction: !ToParentBroker - shv_root: 'test' - -access: - users: - tester: - password: !Sha1 ab4d8d2a5f480a137067da17100271cd176607a1 - roles: - - tester - test: - password: !Plain test - roles: - - tester - admin: - password: !Plain admin - roles: - - su - user: - password: !Plain user - roles: - - client - child-broker: - password: !Plain child-broker - roles: - - child-broker - roles: - tester: - roles: - - client - access: - - shvRI: 'test/**:*' - grant: cfg - browse: - roles: [] - access: - - shvRI: '**:*' - grant: bws - subscribe: - roles: [] - access: - - shvRI: '.broker/currentClient:subscribe' - grant: wr - - shvRI: '.broker/currentClient:unsubscribe' - grant: wr - device: - roles: - - client - access: [] - client: - roles: - - ping - - subscribe - - browse - access: [] - ping: - roles: [] - access: - - shvRI: '.app:ping' - grant: wr - su: - roles: [] - access: - - shvRI: '**:*' - grant: su - child-broker: - roles: - - device - access: [] - mounts: - test-child-broker: - mountPoint: test/child-broker - description: Testing child broker mount-point - test-device: - mountPoint: test/device - description: Testing device mount-point diff --git a/tests/common/mod.rs b/tests/common/mod.rs index fa29b7a..a5b85fa 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -73,8 +73,7 @@ pub fn shv_call(path: &str, method: &str, param: &str, port: Option) -> shv let output = match Command::new(SHVCALL_BINARY) .arg("-v").arg(".:T") .arg("--url").arg(format!("tcp://localhost:{port}?user=admin&password=admin")) - .arg("--path").arg(path) - .arg("--method").arg(method) + .arg("--method").arg(format!("{path}:{method}")) .arg("--param").arg(param) //.arg("--output-format").arg(output_format.as_str()) .output() { diff --git a/tests/test_broker.rs b/tests/test_broker.rs index 67b6104..9c04340 100644 --- a/tests/test_broker.rs +++ b/tests/test_broker.rs @@ -1,8 +1,10 @@ use std::process::{Command}; -use std::{thread, time::Duration}; +use std::{fs, thread, time::Duration}; use shvproto::{RpcValue, rpcvalue}; +use shvrpc::client::ClientConfig; use shvrpc::metamethod; use shvrpc::metamethod::{Flag, MetaMethod}; +use shvbroker::config::{BrokerConfig, BrokerConnectionConfig, TreeDirection}; use crate::common::{KillProcessGuard, shv_call, shv_call_many}; use shvbroker::shvnode::{METH_DIR, METH_LS, METH_NAME, METH_PING}; @@ -17,12 +19,34 @@ fn test_broker() -> shvrpc::Result<()> { thread::sleep(Duration::from_millis(100)); assert!(broker_process_guard.is_running()); - let mut child_broker_process_guard = KillProcessGuard::new(Command::new("target/debug/shvbroker") - .arg("--config").arg("tests/child-broker-config.yaml") - //.arg("-v").arg("Acc") - .spawn()?); - thread::sleep(Duration::from_millis(100)); - assert!(child_broker_process_guard.is_running()); + let _process_guard_3756 = { + let mut config = BrokerConfig::default(); + config.listen.tcp = Some("localhost:3756".into()); + config.connections = vec![ + BrokerConnectionConfig { + enabled: true, + client: ClientConfig{ + url: "tcp://child-broker@localhost:3755?password=child-broker".to_string(), + device_id: Some("test-child-broker".into()), + mount: None, + heartbeat_interval: "1m".to_string(), + reconnect_interval: None, + }, + tree_direction: TreeDirection::ToParentBroker { + shv_root: "test".to_string(), + }, + } + ]; + let cfg_fn = "/tmp/test-broker-config1.yaml"; + fs::write(cfg_fn, &serde_yaml::to_string(&config)?)?; + let mut process_guard = KillProcessGuard::new(Command::new("target/debug/shvbroker") + .arg("--config").arg(cfg_fn) + //.arg("-v").arg("Acc") + .spawn()?); + thread::sleep(Duration::from_millis(100)); + assert!(process_guard.is_running()); + process_guard + }; pub fn shv_call_parent(path: &str, method: &str, param: &str) -> shvrpc::Result { shv_call(path, method, param, None) @@ -146,5 +170,45 @@ fn test_broker() -> shvrpc::Result<()> { assert_eq!(shv_call_parent("test/child-broker/device/state/number", "get", "")?, RpcValue::from(123)); check_subscription("test/child-broker/device/state/number", "test/**", 3755)?; + //test_child_broker_as_client()?; + Ok(()) } + +fn test_child_broker_as_client() -> shvrpc::Result<()> { + let mut config = BrokerConfig::default(); + config.listen.tcp = Some("localhost:3754".into()); + config.connections = vec![ + BrokerConnectionConfig { + enabled: true, + client: ClientConfig{ + url: "tcp://localhost:3755?user=test&password=test".to_string(), + device_id: None, + mount: None, + heartbeat_interval: "1m".to_string(), + reconnect_interval: None, + }, + tree_direction: TreeDirection::ToChildBroker { + shv_root: "test/child-broker/device".to_string(), + mount_point: "test/child-device".to_string(), + }, + } + ]; + let cfg_fn = "/tmp/test-broker-config2.yaml"; + fs::write(cfg_fn, &serde_yaml::to_string(&config)?)?; + let mut broker_process_guard = KillProcessGuard::new(Command::new("target/debug/shvbroker") + .arg("--config").arg(cfg_fn) + //.arg("-v").arg("Acc") + .spawn()?); + thread::sleep(Duration::from_millis(100)); + assert!(broker_process_guard.is_running()); + + pub fn shv_call_3754(path: &str, method: &str, param: &str) -> shvrpc::Result { + shv_call(path, method, param, Some(3754)) + } + //assert_eq!(shv_call_3754("test/child-device/.app", "name", "")?, RpcValue::from("shvbrokertestingdevice")); + + thread::sleep(Duration::from_millis(1000 * 60 * 5)); + + Ok(()) +} \ No newline at end of file