diff --git a/Cargo.toml b/Cargo.toml index 25e34a7..2606575 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "shvbroker" -version = "3.1.3" +version = "3.1.4" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/src/brokerimpl.rs b/src/brokerimpl.rs index 8de2907..67ba5ef 100644 --- a/src/brokerimpl.rs +++ b/src/brokerimpl.rs @@ -368,10 +368,10 @@ impl BrokerState { ConnectionKind::ToChildBroker { mount_point, .. } => { Some(mount_point.to_string()) } } } - PeerKind::Device { device_id, mount_point, .. } => loop { + PeerKind::Device { device_id, mount_point, .. } => 'find_mount: { if mount_point.starts_with("test/") { info!("Client id: {} mounted on path: '{}'", peer_id, &mount_point); - break Some(mount_point.clone()); + break 'find_mount Some(mount_point.clone()); } if let Some(device_id) = &device_id { match self.access.mounts.get(device_id) { @@ -381,11 +381,11 @@ impl BrokerState { Some(mount) => { *mount_point = mount.mount_point.clone(); info!("Client id: {}, device id: {} mounted on path: '{}'", peer_id, device_id, &mount_point); - break Some(mount_point.clone()); + break 'find_mount Some(mount_point.clone()); } } } - break None + None } }; let peer = Peer { diff --git a/src/peer.rs b/src/peer.rs index 9771f73..61da671 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -51,6 +51,8 @@ pub(crate) async fn server_peer_loop(peer_id: PeerId, broker_writer: Sender match frame { Ok(frame) => { - //println!("=====================> Recv frame, client id: {peer_id}, frame: {}", frame); broker_writer.send(BrokerCommand::FrameReceived { peer_id, frame }).await?; drop(fut_receive_frame); fut_receive_frame = frame_reader.receive_frame().fuse(); @@ -251,13 +252,13 @@ fn is_dot_local_request(frame: &RpcFrame) -> bool { } false } -async fn process_client_peer_frame(peer_id: PeerId, mut frame: RpcFrame, config: &BrokerConnectionConfig, broker_writer: Sender) -> shvrpc::Result<()> { - match &config.connection_kind { - ConnectionKind::ToParentBroker{ shv_root } => { +async fn process_broker_client_peer_frame(peer_id: PeerId, mut frame: RpcFrame, connection_kind: &ConnectionKind, broker_writer: Sender) -> shvrpc::Result<()> { + match &connection_kind { + ConnectionKind::ToParentBroker{ .. } => { // Only RPC requests can be received from parent broker, // no signals, no responses if frame.is_request() { - frame = fix_request_frame_shv_root(frame, shv_root)?; + frame = fix_request_frame_shv_root(frame, connection_kind)?; broker_writer.send(BrokerCommand::FrameReceived { peer_id, frame }).await?; } else if frame.is_response() { broker_writer.send(BrokerCommand::FrameReceived { peer_id, frame }).await?; @@ -297,6 +298,8 @@ async fn broker_client_connection_loop(peer_id: PeerId, config: BrokerConnection let bwr = BufWriter::new(writer); let mut frame_reader = StreamFrameReader::new(brd); let mut frame_writer = StreamFrameWriter::new(bwr); + frame_reader.set_peer_id(peer_id); + frame_writer.set_peer_id(peer_id); // login let (user, password) = login_from_url(&url); @@ -339,7 +342,7 @@ async fn broker_client_connection_loop(peer_id: PeerId, config: BrokerConnection }, res_frame = fut_receive_frame => match res_frame { Ok(frame) => { - process_client_peer_frame(peer_id, frame, &config, broker_writer.clone()).await?; + process_broker_client_peer_frame(peer_id, frame, &config.connection_kind, broker_writer.clone()).await?; drop(fut_receive_frame); fut_receive_frame = frame_reader.receive_frame().fuse(); } @@ -372,9 +375,9 @@ async fn broker_client_connection_loop(peer_id: PeerId, config: BrokerConnection } } } - ConnectionKind::ToChildBroker{shv_root, .. } => { + ConnectionKind::ToChildBroker{ .. } => { if frame.is_request() { - frame = fix_request_frame_shv_root(frame, shv_root)?; + frame = fix_request_frame_shv_root(frame, &config.connection_kind)?; } } } @@ -390,9 +393,17 @@ async fn broker_client_connection_loop(peer_id: PeerId, config: BrokerConnection }; Ok(()) } -fn fix_request_frame_shv_root(mut frame: RpcFrame, shv_root: &str) -> shvrpc::Result { +fn fix_request_frame_shv_root(mut frame: RpcFrame, connection_kind: &ConnectionKind) -> shvrpc::Result { let shv_path = frame.shv_path().unwrap_or_default().to_owned(); - println!("current path: {shv_path}"); + let (add_dot_local_hack, shv_root) = match connection_kind { + ConnectionKind::ToParentBroker { shv_root } => { + (shv_path.is_empty(), shv_root) + } + ConnectionKind::ToChildBroker { shv_root, .. } => { + (&shv_path == shv_root, shv_root) + } + }; + // println!("current path: {shv_path}"); let shv_path = if starts_with_path(&shv_path, ".broker") { if frame.method() == Some(METH_SUBSCRIBE) || frame.method() == Some(METH_UNSUBSCRIBE) { // prepend exported root to subscribed path @@ -403,13 +414,13 @@ fn fix_request_frame_shv_root(mut frame: RpcFrame, shv_root: &str) -> shvrpc::Re // hack to enable parent broker to call paths under exported_root strip_prefix_path(&shv_path, DOT_LOCAL_DIR).expect("DOT_LOCAL_DIR").to_string() } else { - if shv_path.is_empty() && is_dot_local_granted(&frame) { + if add_dot_local_hack && is_dot_local_granted(&frame) { frame.meta.insert(DOT_LOCAL_HACK, true.into()); } join_path(shv_root, &shv_path) }; frame.set_shvpath(&shv_path); - println!("new path: {}", frame.shv_path().unwrap_or_default()); + // println!("new path: {}", frame.shv_path().unwrap_or_default()); Ok(frame) } fn fix_subscribe_param(frame: RpcFrame, exported_root: &str) -> shvrpc::Result { diff --git a/src/shvnode.rs b/src/shvnode.rs index 0768e6b..bd33d38 100644 --- a/src/shvnode.rs +++ b/src/shvnode.rs @@ -104,7 +104,8 @@ pub fn process_local_dir_ls(mounts: &BTreeMap, frame: &RpcFrame) - return None } let shv_path = frame.shv_path().unwrap_or_default(); - let children_on_path = children_on_path(mounts, shv_path).map(|children| { + let children = children_on_path(mounts, shv_path); + let children = children.map(|children| { if frame.meta.get(DOT_LOCAL_HACK).is_some() { let mut children = children; children.insert(0, DOT_LOCAL_DIR.into()); @@ -114,13 +115,13 @@ pub fn process_local_dir_ls(mounts: &BTreeMap, frame: &RpcFrame) - } }); let mount_pair = find_longest_prefix(mounts, shv_path); - if mount_pair.is_none() && children_on_path.is_none() { + if mount_pair.is_none() && children.is_none() { // path doesn't exist return Some(Err(RpcError::new(RpcErrorCode::MethodNotFound, format!("Invalid shv path: {}", shv_path)))) } let is_mount_point = mount_pair.is_some() && mount_pair.unwrap().1.is_empty(); - let is_remote_dir = mount_pair.is_some() && children_on_path.is_none(); - let is_tree_leaf = mount_pair.is_some() && children_on_path.is_some() && children_on_path.as_ref().unwrap().is_empty(); + let is_remote_dir = mount_pair.is_some() && children.is_none(); + let is_tree_leaf = mount_pair.is_some() && children.is_some() && children.as_ref().unwrap().is_empty(); //println!("shv path: {shv_path}, method: {method}, mount pair: {:?}", mount_pair); //println!("is_mount_point: {is_mount_point}, is_tree_leaf: {is_tree_leaf}"); if method == METH_DIR && !is_mount_point && !is_remote_dir && !is_tree_leaf { @@ -135,7 +136,7 @@ pub fn process_local_dir_ls(mounts: &BTreeMap, frame: &RpcFrame) - if method == METH_LS && !is_tree_leaf && !is_remote_dir { // ls on not-leaf node must be resolved locally if let Ok(rpcmsg) = frame.to_rpcmesage() { - let ls = ls_children_to_result(children_on_path, rpcmsg.param().into()); + let ls = ls_children_to_result(children, rpcmsg.param().into()); return Some(ls) } else { return Some(Err(RpcError::new(RpcErrorCode::InvalidRequest, "Cannot convert RPC frame to Rpc message"))) diff --git a/tests/test_broker.rs b/tests/test_broker.rs index ccfb48c..cb17601 100644 --- a/tests/test_broker.rs +++ b/tests/test_broker.rs @@ -37,7 +37,7 @@ fn test_broker() -> shvrpc::Result<()> { }, } ]; - let cfg_fn = "/tmp/test-broker-config1.yaml"; + let cfg_fn = "/tmp/test-broker-config3756.yaml"; fs::write(cfg_fn, &serde_yaml::to_string(&config)?)?; let mut process_guard = KillProcessGuard::new(Command::new("target/debug/shvbroker") .arg("--config").arg(cfg_fn) @@ -170,45 +170,42 @@ fn test_broker() -> shvrpc::Result<()> { assert_eq!(shv_call_parent("test/child-broker/device/state/number", "get", "")?, RpcValue::from(123)); check_subscription("test/child-broker/device/state/number", "test/**", 3755)?; - //test_child_broker_as_client()?; + test_child_broker_as_client()?; Ok(()) } -// fn test_child_broker_as_client() -> shvrpc::Result<()> { -// let mut config = BrokerConfig::default(); -// config.listen.tcp = Some("localhost:3754".into()); -// config.connections = vec![ -// BrokerConnectionConfig { -// enabled: true, -// client: ClientConfig{ -// url: "tcp://localhost:3755?user=test&password=test".to_string(), -// device_id: None, -// mount: None, -// heartbeat_interval: "1m".to_string(), -// reconnect_interval: None, -// }, -// connection_kind: ConnectionKind::ToChildBroker { -// shv_root: "test/child-broker/device".to_string(), -// mount_point: "test/child-device".to_string(), -// }, -// } -// ]; -// let cfg_fn = "/tmp/test-broker-config2.yaml"; -// fs::write(cfg_fn, &serde_yaml::to_string(&config)?)?; -// let mut broker_process_guard = KillProcessGuard::new(Command::new("target/debug/shvbroker") -// .arg("--config").arg(cfg_fn) -// //.arg("-v").arg("Acc") -// .spawn()?); -// thread::sleep(Duration::from_millis(100)); -// assert!(broker_process_guard.is_running()); -// -// pub fn shv_call_3754(path: &str, method: &str, param: &str) -> shvrpc::Result { -// shv_call(path, method, param, Some(3754)) -// } -// //assert_eq!(shv_call_3754("test/child-device/.app", "name", "")?, RpcValue::from("shvbrokertestingdevice")); -// -// thread::sleep(Duration::from_millis(1000 * 60 * 5)); -// -// Ok(()) -// } \ No newline at end of file +fn test_child_broker_as_client() -> shvrpc::Result<()> { + let mut config = BrokerConfig::default(); + config.listen.tcp = Some("localhost:3754".into()); + config.connections = vec![ + BrokerConnectionConfig { + enabled: true, + client: ClientConfig{ + url: "tcp://localhost:3755?user=test&password=test".to_string(), + device_id: None, + mount: None, + heartbeat_interval: "1m".to_string(), + reconnect_interval: None, + }, + connection_kind: ConnectionKind::ToChildBroker { + shv_root: "test/child-broker/device".to_string(), + mount_point: "test/child-device".to_string(), + }, + } + ]; + let cfg_fn = "/tmp/test-broker-config3754.yaml"; + fs::write(cfg_fn, &serde_yaml::to_string(&config)?)?; + let mut broker_process_guard = KillProcessGuard::new(Command::new("target/debug/shvbroker") + .arg("--config").arg(cfg_fn) + //.arg("-v").arg("Acc") + .spawn()?); + thread::sleep(Duration::from_millis(100)); + assert!(broker_process_guard.is_running()); + + pub fn shv_call_3754(path: &str, method: &str, param: &str) -> shvrpc::Result { + shv_call(path, method, param, Some(3754)) + } + assert_eq!(shv_call_3754("test/child-device/.app", "name", "")?, RpcValue::from("shvbrokertestingdevice")); + Ok(()) +} \ No newline at end of file