Skip to content

Commit

Permalink
More subscribe testing with check_subscription_along_property_path()
Browse files Browse the repository at this point in the history
  • Loading branch information
Fanda Vacek committed Oct 21, 2024
1 parent 274a890 commit 0c0f602
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 30 deletions.
8 changes: 5 additions & 3 deletions src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,9 +405,11 @@ fn fix_request_frame_shv_root(mut frame: RpcFrame, connection_kind: &ConnectionK
};
// println!("current path: {shv_path}");
let shv_path = if starts_with_path(&shv_path, ".broker") {
if frame.method() == Some(METH_SUBSCRIBE) || frame.method() == Some(METH_UNSUBSCRIBE) {
// prepend exported root to subscribed path
frame = fix_subscribe_param(frame, shv_root)?;
if let ConnectionKind::ToParentBroker { .. } = connection_kind {
if frame.method() == Some(METH_SUBSCRIBE) || frame.method() == Some(METH_UNSUBSCRIBE) {
// prepend exported root to subscribed path
frame = fix_subscribe_param(frame, shv_root)?;
}
}
shv_path
} else if is_dot_local_request(&frame) {
Expand Down
65 changes: 38 additions & 27 deletions tests/test_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,45 +136,55 @@ fn test_broker() -> shvrpc::Result<()> {
println!("---broker---: .broker:mounts()");
assert_eq!(shv_call_child(".broker", "mounts", "")?, vec![RpcValue::from("test/device")].into());
println!("====== subscriptions =====");
fn check_subscription(property_path: &str, subscribe_path: &str, port: i32) -> shvrpc::Result<()> {
//let info = shv_call_child(".broker/currentClient", "info", "")?;
//println!("INFO: {info}");
let calls: Vec<String> = vec![
format!(r#".broker/currentClient:subscribe ["{subscribe_path}:*:chng"]"#),
format!(r#"{property_path}:set 42"#),
format!(r#".broker/currentClient:unsubscribe ["{subscribe_path}:*:chng"]"#),
format!(r#"{property_path}:set 123"#),
];
println!("shv_call_many property: {property_path}");
for c in calls.iter() { println!("\t{}", c); }
let values = shv_call_many(calls, Some(port))?;
println!("shv_call_many result:");
for v in values.iter() { println!("\t{}", v); }
let expected: Vec<String> = vec![
"RES true".into(), // response to subscribe
format!("SIG {property_path}:chng 42"), // SIG chng
"RES null".into(), // response to SET
"RES true".into(), // response to unsubscribe
"RES null".into(), // response to SET
];
for (no, val) in values.iter().enumerate() {
assert_eq!(&expected[no], val);
}
Ok(())
}
check_subscription("test/device/state/number", "test/**", 3756)?;

println!("====== child broker =====");
assert_eq!(shv_call_parent("test", "ls", r#""child-broker""#)?, RpcValue::from(true));
assert_eq!(shv_call_parent("test/child-broker/device/.app", "name", "")?, RpcValue::from("shvbrokertestingdevice"));
assert_eq!(shv_call_parent("test/child-broker/device/state/number", "get", "")?, RpcValue::from(123));
check_subscription("test/child-broker/device/state/number", "test/**", 3755)?;
check_subscription("test/child-broker/device/state/number", "test/child-broker/**", 3755)?;
check_subscription("test/child-broker/device/state/number", "test/child-broker/device/**", 3755)?;
check_subscription("test/child-broker/device/state/number", "test/child-broker/device/state/**", 3755)?;

test_child_broker_as_client()?;

Ok(())
}

fn check_subscription(property_path: &str, subscribe_path: &str, port: i32) -> shvrpc::Result<()> {
//let info = shv_call_child(".broker/currentClient", "info", "")?;
//println!("INFO: {info}");
let calls: Vec<String> = vec![
format!(r#".broker/currentClient:subscribe ["{subscribe_path}:*:chng"]"#),
format!(r#"{property_path}:set 42"#),
format!(r#".broker/currentClient:unsubscribe ["{subscribe_path}:*:chng"]"#),
format!(r#"{property_path}:set 123"#),
];
println!("shv_call_many property: {property_path}");
for c in calls.iter() { println!("\t{}", c); }
let values = shv_call_many(calls, Some(port))?;
println!("shv_call_many result:");
for v in values.iter() { println!("\t{}", v); }
let expected: Vec<String> = vec![
"RES true".into(), // response to subscribe
format!("SIG {property_path}:chng 42"), // SIG chng
"RES null".into(), // response to SET
"RES true".into(), // response to unsubscribe
"RES null".into(), // response to SET
];
for (no, val) in values.iter().enumerate() {
assert_eq!(&expected[no], val);
}
Ok(())
}
fn check_subscription_along_property_path(property_path: &str, port: i32) -> shvrpc::Result<()> {
let dirs = property_path.split('/').collect::<Vec<_>>();
for i in 1 .. dirs.len() - 1 {
let subscribe_path = dirs[.. i].join("/") + "/**";
check_subscription(property_path, &subscribe_path, port)?
}
Ok(())
}
fn test_child_broker_as_client() -> shvrpc::Result<()> {
let mut config = BrokerConfig::default();
config.listen.tcp = Some("localhost:3754".into());
Expand Down Expand Up @@ -207,5 +217,6 @@ fn test_child_broker_as_client() -> shvrpc::Result<()> {
shv_call(path, method, param, Some(3754))
}
assert_eq!(shv_call_3754("test/child-device/.app", "name", "")?, RpcValue::from("shvbrokertestingdevice"));
//check_subscription_along_property_path("test/child-device/state/number", 3754)?;
Ok(())
}

0 comments on commit 0c0f602

Please sign in to comment.