Skip to content

Commit

Permalink
Connect to foreign broker and mount arbitrary path to own tree #11
Browse files Browse the repository at this point in the history
  • Loading branch information
Fanda Vacek committed Oct 21, 2024
1 parent cc2b5b0 commit 274a890
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 61 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "shvbroker"
version = "3.1.3"
version = "3.1.4"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
8 changes: 4 additions & 4 deletions src/brokerimpl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,10 +368,10 @@ impl BrokerState {
ConnectionKind::ToChildBroker { mount_point, .. } => { Some(mount_point.to_string()) }
}
}
PeerKind::Device { device_id, mount_point, .. } => loop {
PeerKind::Device { device_id, mount_point, .. } => 'find_mount: {
if mount_point.starts_with("test/") {
info!("Client id: {} mounted on path: '{}'", peer_id, &mount_point);
break Some(mount_point.clone());
break 'find_mount Some(mount_point.clone());
}
if let Some(device_id) = &device_id {
match self.access.mounts.get(device_id) {
Expand All @@ -381,11 +381,11 @@ impl BrokerState {
Some(mount) => {
*mount_point = mount.mount_point.clone();
info!("Client id: {}, device id: {} mounted on path: '{}'", peer_id, device_id, &mount_point);
break Some(mount_point.clone());
break 'find_mount Some(mount_point.clone());
}
}
}
break None
None
}
};
let peer = Peer {
Expand Down
35 changes: 23 additions & 12 deletions src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ pub(crate) async fn server_peer_loop(peer_id: PeerId, broker_writer: Sender<Brok

let mut frame_reader = StreamFrameReader::new(brd);
let mut frame_writer = StreamFrameWriter::new(bwr);
frame_reader.set_peer_id(peer_id);
frame_writer.set_peer_id(peer_id);

let mut device_options = RpcValue::null();
let mut user;
Expand Down Expand Up @@ -156,7 +158,6 @@ pub(crate) async fn server_peer_loop(peer_id: PeerId, broker_writer: Sender<Brok
select! {
frame = fut_receive_frame => match frame {
Ok(frame) => {
//println!("=====================> Recv frame, client id: {peer_id}, frame: {}", frame);
broker_writer.send(BrokerCommand::FrameReceived { peer_id, frame }).await?;
drop(fut_receive_frame);
fut_receive_frame = frame_reader.receive_frame().fuse();
Expand Down Expand Up @@ -251,13 +252,13 @@ fn is_dot_local_request(frame: &RpcFrame) -> bool {
}
false
}
async fn process_client_peer_frame(peer_id: PeerId, mut frame: RpcFrame, config: &BrokerConnectionConfig, broker_writer: Sender<BrokerCommand>) -> shvrpc::Result<()> {
match &config.connection_kind {
ConnectionKind::ToParentBroker{ shv_root } => {
async fn process_broker_client_peer_frame(peer_id: PeerId, mut frame: RpcFrame, connection_kind: &ConnectionKind, broker_writer: Sender<BrokerCommand>) -> shvrpc::Result<()> {
match &connection_kind {
ConnectionKind::ToParentBroker{ .. } => {
// Only RPC requests can be received from parent broker,
// no signals, no responses
if frame.is_request() {
frame = fix_request_frame_shv_root(frame, shv_root)?;
frame = fix_request_frame_shv_root(frame, connection_kind)?;
broker_writer.send(BrokerCommand::FrameReceived { peer_id, frame }).await?;
} else if frame.is_response() {
broker_writer.send(BrokerCommand::FrameReceived { peer_id, frame }).await?;
Expand Down Expand Up @@ -297,6 +298,8 @@ async fn broker_client_connection_loop(peer_id: PeerId, config: BrokerConnection
let bwr = BufWriter::new(writer);
let mut frame_reader = StreamFrameReader::new(brd);
let mut frame_writer = StreamFrameWriter::new(bwr);
frame_reader.set_peer_id(peer_id);
frame_writer.set_peer_id(peer_id);

// login
let (user, password) = login_from_url(&url);
Expand Down Expand Up @@ -339,7 +342,7 @@ async fn broker_client_connection_loop(peer_id: PeerId, config: BrokerConnection
},
res_frame = fut_receive_frame => match res_frame {
Ok(frame) => {
process_client_peer_frame(peer_id, frame, &config, broker_writer.clone()).await?;
process_broker_client_peer_frame(peer_id, frame, &config.connection_kind, broker_writer.clone()).await?;
drop(fut_receive_frame);
fut_receive_frame = frame_reader.receive_frame().fuse();
}
Expand Down Expand Up @@ -372,9 +375,9 @@ async fn broker_client_connection_loop(peer_id: PeerId, config: BrokerConnection
}
}
}
ConnectionKind::ToChildBroker{shv_root, .. } => {
ConnectionKind::ToChildBroker{ .. } => {
if frame.is_request() {
frame = fix_request_frame_shv_root(frame, shv_root)?;
frame = fix_request_frame_shv_root(frame, &config.connection_kind)?;
}
}
}
Expand All @@ -390,9 +393,17 @@ async fn broker_client_connection_loop(peer_id: PeerId, config: BrokerConnection
};
Ok(())
}
fn fix_request_frame_shv_root(mut frame: RpcFrame, shv_root: &str) -> shvrpc::Result<RpcFrame> {
fn fix_request_frame_shv_root(mut frame: RpcFrame, connection_kind: &ConnectionKind) -> shvrpc::Result<RpcFrame> {
let shv_path = frame.shv_path().unwrap_or_default().to_owned();
println!("current path: {shv_path}");
let (add_dot_local_hack, shv_root) = match connection_kind {
ConnectionKind::ToParentBroker { shv_root } => {
(shv_path.is_empty(), shv_root)
}
ConnectionKind::ToChildBroker { shv_root, .. } => {
(&shv_path == shv_root, shv_root)
}
};
// println!("current path: {shv_path}");
let shv_path = if starts_with_path(&shv_path, ".broker") {
if frame.method() == Some(METH_SUBSCRIBE) || frame.method() == Some(METH_UNSUBSCRIBE) {
// prepend exported root to subscribed path
Expand All @@ -403,13 +414,13 @@ fn fix_request_frame_shv_root(mut frame: RpcFrame, shv_root: &str) -> shvrpc::Re
// hack to enable parent broker to call paths under exported_root
strip_prefix_path(&shv_path, DOT_LOCAL_DIR).expect("DOT_LOCAL_DIR").to_string()
} else {
if shv_path.is_empty() && is_dot_local_granted(&frame) {
if add_dot_local_hack && is_dot_local_granted(&frame) {
frame.meta.insert(DOT_LOCAL_HACK, true.into());
}
join_path(shv_root, &shv_path)
};
frame.set_shvpath(&shv_path);
println!("new path: {}", frame.shv_path().unwrap_or_default());
// println!("new path: {}", frame.shv_path().unwrap_or_default());
Ok(frame)
}
fn fix_subscribe_param(frame: RpcFrame, exported_root: &str) -> shvrpc::Result<RpcFrame> {
Expand Down
11 changes: 6 additions & 5 deletions src/shvnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ pub fn process_local_dir_ls<V>(mounts: &BTreeMap<String, V>, frame: &RpcFrame) -
return None
}
let shv_path = frame.shv_path().unwrap_or_default();
let children_on_path = children_on_path(mounts, shv_path).map(|children| {
let children = children_on_path(mounts, shv_path);
let children = children.map(|children| {
if frame.meta.get(DOT_LOCAL_HACK).is_some() {
let mut children = children;
children.insert(0, DOT_LOCAL_DIR.into());
Expand All @@ -114,13 +115,13 @@ pub fn process_local_dir_ls<V>(mounts: &BTreeMap<String, V>, frame: &RpcFrame) -
}
});
let mount_pair = find_longest_prefix(mounts, shv_path);
if mount_pair.is_none() && children_on_path.is_none() {
if mount_pair.is_none() && children.is_none() {
// path doesn't exist
return Some(Err(RpcError::new(RpcErrorCode::MethodNotFound, format!("Invalid shv path: {}", shv_path))))
}
let is_mount_point = mount_pair.is_some() && mount_pair.unwrap().1.is_empty();
let is_remote_dir = mount_pair.is_some() && children_on_path.is_none();
let is_tree_leaf = mount_pair.is_some() && children_on_path.is_some() && children_on_path.as_ref().unwrap().is_empty();
let is_remote_dir = mount_pair.is_some() && children.is_none();
let is_tree_leaf = mount_pair.is_some() && children.is_some() && children.as_ref().unwrap().is_empty();
//println!("shv path: {shv_path}, method: {method}, mount pair: {:?}", mount_pair);
//println!("is_mount_point: {is_mount_point}, is_tree_leaf: {is_tree_leaf}");
if method == METH_DIR && !is_mount_point && !is_remote_dir && !is_tree_leaf {
Expand All @@ -135,7 +136,7 @@ pub fn process_local_dir_ls<V>(mounts: &BTreeMap<String, V>, frame: &RpcFrame) -
if method == METH_LS && !is_tree_leaf && !is_remote_dir {
// ls on not-leaf node must be resolved locally
if let Ok(rpcmsg) = frame.to_rpcmesage() {
let ls = ls_children_to_result(children_on_path, rpcmsg.param().into());
let ls = ls_children_to_result(children, rpcmsg.param().into());
return Some(ls)
} else {
return Some(Err(RpcError::new(RpcErrorCode::InvalidRequest, "Cannot convert RPC frame to Rpc message")))
Expand Down
75 changes: 36 additions & 39 deletions tests/test_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ fn test_broker() -> shvrpc::Result<()> {
},
}
];
let cfg_fn = "/tmp/test-broker-config1.yaml";
let cfg_fn = "/tmp/test-broker-config3756.yaml";
fs::write(cfg_fn, &serde_yaml::to_string(&config)?)?;
let mut process_guard = KillProcessGuard::new(Command::new("target/debug/shvbroker")
.arg("--config").arg(cfg_fn)
Expand Down Expand Up @@ -170,45 +170,42 @@ fn test_broker() -> shvrpc::Result<()> {
assert_eq!(shv_call_parent("test/child-broker/device/state/number", "get", "")?, RpcValue::from(123));
check_subscription("test/child-broker/device/state/number", "test/**", 3755)?;

//test_child_broker_as_client()?;
test_child_broker_as_client()?;

Ok(())
}

// fn test_child_broker_as_client() -> shvrpc::Result<()> {
// let mut config = BrokerConfig::default();
// config.listen.tcp = Some("localhost:3754".into());
// config.connections = vec![
// BrokerConnectionConfig {
// enabled: true,
// client: ClientConfig{
// url: "tcp://localhost:3755?user=test&password=test".to_string(),
// device_id: None,
// mount: None,
// heartbeat_interval: "1m".to_string(),
// reconnect_interval: None,
// },
// connection_kind: ConnectionKind::ToChildBroker {
// shv_root: "test/child-broker/device".to_string(),
// mount_point: "test/child-device".to_string(),
// },
// }
// ];
// let cfg_fn = "/tmp/test-broker-config2.yaml";
// fs::write(cfg_fn, &serde_yaml::to_string(&config)?)?;
// let mut broker_process_guard = KillProcessGuard::new(Command::new("target/debug/shvbroker")
// .arg("--config").arg(cfg_fn)
// //.arg("-v").arg("Acc")
// .spawn()?);
// thread::sleep(Duration::from_millis(100));
// assert!(broker_process_guard.is_running());
//
// pub fn shv_call_3754(path: &str, method: &str, param: &str) -> shvrpc::Result<RpcValue> {
// shv_call(path, method, param, Some(3754))
// }
// //assert_eq!(shv_call_3754("test/child-device/.app", "name", "")?, RpcValue::from("shvbrokertestingdevice"));
//
// thread::sleep(Duration::from_millis(1000 * 60 * 5));
//
// Ok(())
// }
fn test_child_broker_as_client() -> shvrpc::Result<()> {
let mut config = BrokerConfig::default();
config.listen.tcp = Some("localhost:3754".into());
config.connections = vec![
BrokerConnectionConfig {
enabled: true,
client: ClientConfig{
url: "tcp://localhost:3755?user=test&password=test".to_string(),
device_id: None,
mount: None,
heartbeat_interval: "1m".to_string(),
reconnect_interval: None,
},
connection_kind: ConnectionKind::ToChildBroker {
shv_root: "test/child-broker/device".to_string(),
mount_point: "test/child-device".to_string(),
},
}
];
let cfg_fn = "/tmp/test-broker-config3754.yaml";
fs::write(cfg_fn, &serde_yaml::to_string(&config)?)?;
let mut broker_process_guard = KillProcessGuard::new(Command::new("target/debug/shvbroker")
.arg("--config").arg(cfg_fn)
//.arg("-v").arg("Acc")
.spawn()?);
thread::sleep(Duration::from_millis(100));
assert!(broker_process_guard.is_running());

pub fn shv_call_3754(path: &str, method: &str, param: &str) -> shvrpc::Result<RpcValue> {
shv_call(path, method, param, Some(3754))
}
assert_eq!(shv_call_3754("test/child-device/.app", "name", "")?, RpcValue::from("shvbrokertestingdevice"));
Ok(())
}

0 comments on commit 274a890

Please sign in to comment.