Skip to content

Commit

Permalink
Connectivity to parent broker implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
Fanda Vacek committed Jan 1, 2024
1 parent a9f9cca commit d3229a5
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 29 deletions.
6 changes: 5 additions & 1 deletion src/broker/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ impl Default for BrokerConfig {
users: HashMap::from([
("admin".to_string(), User { password: Password::Plain("admin".into()), roles: vec!["su".to_string()] }),
("user".to_string(), User { password: Password::Plain("user".into()), roles: vec!["client".to_string()] }),
("child-broker".to_string(), User { password: Password::Plain("child-broker".into()), roles: vec!["child-broker".to_string()] }),
("tester".to_string(), User { password: Password::Sha1("ab4d8d2a5f480a137067da17100271cd176607a1".into()), roles: vec!["tester".to_string()] }),
]),
roles: HashMap::from([
Expand All @@ -126,6 +127,8 @@ impl Default for BrokerConfig {
].into(),
}),
("client".to_string(), Role { roles: vec!["ping".to_string(), "subscribe".to_string(), "browse".to_string()], access: vec![] }),
("device".to_string(), Role { roles: vec!["client".to_string()], access: vec![] }),
("child-broker".to_string(), Role { roles: vec!["device".to_string()], access: vec![] }),
("tester".to_string(), Role {
roles: vec!["client".to_string()].into(),
access: vec![
Expand Down Expand Up @@ -153,7 +156,8 @@ impl Default for BrokerConfig {
}),
]),
mounts: HashMap::from([
("test-device".into(), Mount{ mount_point: "shv/dev/test".to_string(), description: "Testing device mount-point".to_string() })
("test-device".into(), Mount{ mount_point: "shv/test/device".to_string(), description: "Testing device mount-point".to_string() }),
("test-child-broker".into(), Mount{ mount_point: "shv/test/child-broker".to_string(), description: "Testing child broker mount-point".to_string() }),
]),
},
}
Expand Down
6 changes: 3 additions & 3 deletions src/broker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ pub async fn accept_loop(config: BrokerConfig, access: AccessControl) -> crate::
let parent_broker_peer_config = config.parent_broker.clone();
let broker_task = task::spawn(crate::broker::broker_loop(broker_receiver, access));
if parent_broker_peer_config.enabled {
crate::spawn_and_log_error(peer::parent_broker_peer_loop(1, parent_broker_peer_config, broker_sender.clone()));
crate::spawn_and_log_error(peer::parent_broker_peer_loop_with_reconnect(1, parent_broker_peer_config, broker_sender.clone()));
}
info!("Listening on TCP: {}", address);
let listener = TcpListener::bind(address).await?;
Expand Down Expand Up @@ -602,14 +602,14 @@ mod tests {
broker_writer.send(register_device).await.unwrap();

// device should be mounted as 'shv/dev/test'
let resp = call("shv/dev", "ls", Some("test".into()), &call_ctx).await;
let resp = call("shv/test", "ls", Some("device".into()), &call_ctx).await;
assert_eq!(resp, RpcValue::from(true));

// test current client info
let resp = call(".app/broker/currentClient", "info", None, &call_ctx).await;
let m = resp.as_map();
assert_eq!(m.get("clientId").unwrap(), &RpcValue::from(2));
assert_eq!(m.get("mountPoint").unwrap(), &RpcValue::from("shv/dev/test"));
assert_eq!(m.get("mountPoint").unwrap(), &RpcValue::from("shv/test/device"));
assert_eq!(m.get("userName").unwrap(), &RpcValue::from("admin"));
assert_eq!(m.get("subscriptions").unwrap(), &RpcValue::from(List::new()));

Expand Down
37 changes: 33 additions & 4 deletions src/broker/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use async_std::io::BufReader;
use async_std::net::TcpStream;
use futures::select;
use futures::FutureExt;
use log::{debug, error, info, warn};
use log::{debug, error, info};
use rand::distributions::{Alphanumeric, DistString};
use url::Url;
use crate::{client, RpcMessage, RpcMessageMetaTags, RpcValue};
Expand Down Expand Up @@ -165,16 +165,45 @@ pub(crate) async fn peer_loop(client_id: i32, broker_writer: Sender<ClientEvent>
info!("Client loop exit, client id: {}", client_id);
Ok(())
}
pub(crate) async fn parent_broker_peer_loop_with_reconnect(client_id: i32, config: ParentBrokerConfig, broker_writer: Sender<ClientEvent>) -> crate::Result<()> {
let url = Url::parse(&config.client.url)?;
if url.scheme() != "tcp" {
return Err(format!("Scheme {} is not supported yet.", url.scheme()).into());
}
let reconnect_interval: std::time::Duration = loop {
if let Some(time_str) = &config.client.reconnect_interval {
if let Ok(interval) = duration_str::parse(time_str) {
break interval
}
}
const DEFAULT_RECONNECT_INTERVAL_SEC: u64 = 10;
info!("Parent broker connection reconnect interval is not set explicitly, default value {DEFAULT_RECONNECT_INTERVAL_SEC} will be used.");
break std::time::Duration::from_secs(DEFAULT_RECONNECT_INTERVAL_SEC)
};
info!("Reconnect interval set to: {:?}", reconnect_interval);
loop {
match parent_broker_peer_loop(client_id, config.clone(), broker_writer.clone()).await {
Ok(_) => {
info!("Parent broker peer loop finished without error");
}
Err(err) => {
error!("Parent broker peer loop finished with error: {err}");
}
}
info!("Reconnecting to parent broker after: {:?}", reconnect_interval);
async_std::task::sleep(reconnect_interval).await;
}
}

pub(crate) async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, broker_writer: Sender<ClientEvent>) -> crate::Result<()> {
info!("Connecting to parent broker: {}", &config.client.url);
let url = Url::parse(&config.client.url)?;
let (scheme, host, port) = (url.scheme(), url.host_str().unwrap_or_default(), url.port().unwrap_or(3755));
if scheme != "tcp" {
return Err(format!("Scheme {scheme} is not supported yet.").into());
}
let address = format!("{host}:{port}");
// Establish a connection
info!("Connecting to parent broker: {address}");
info!("Connecting to parent broker: tcp://{address}");
let stream = TcpStream::connect(&address).await?;
let (reader, mut writer) = (&stream, &stream);

Expand All @@ -195,7 +224,7 @@ pub(crate) async fn parent_broker_peer_loop(client_id: i32, config: ParentBroker
};

info!("Parent broker connected OK");
warn!("Heartbeat interval set to: {:?}", &heartbeat_interval);
info!("Heartbeat interval set to: {:?}", &heartbeat_interval);
client::login(&mut frame_reader, &mut frame_writer, &login_params).await?;

let (peer_writer, peer_receiver) = channel::unbounded::<PeerEvent>();
Expand Down
88 changes: 88 additions & 0 deletions tests/child-broker-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
listen:
tcp: localhost:3756
ssl: null
editable_access: false
data_directory: null
parent_broker:
enabled: true
client:
url: tcp://child-broker@localhost:3755?password=child-broker
device_id: "test-child-broker"
mount: null
heartbeat_interval: 1m
reconnect_interval: null
exported_root: 'test'
access:
users:
tester:
password: !Sha1 ab4d8d2a5f480a137067da17100271cd176607a1
roles:
- tester
admin:
password: !Plain admin
roles:
- su
user:
password: !Plain user
roles:
- client
child-broker:
password: !Plain child-broker
roles:
- child-broker
roles:
tester:
roles:
- client
access:
- paths: test/**
methods: ''
grant: cfg
browse:
roles: []
access:
- paths: '**'
methods: ''
grant: bws
subscribe:
roles: []
access:
- paths: .app/broker/currentClient
methods: subscribe
grant: wr
- paths: .app/broker/currentClient
methods: unsubscribe
grant: wr
device:
roles:
- client
access: []
client:
roles:
- ping
- subscribe
- browse
access: []
ping:
roles: []
access:
- paths: .app
methods: ping
grant: wr
su:
roles: []
access:
- paths: '**'
methods: ''
grant: su
child-broker:
roles:
- device
access: []
mounts:
test-child-broker:
mountPoint: shv/test/child-broker
description: Testing child broker mount-point
test-device:
mountPoint: shv/test/device
description: Testing device mount-point
10 changes: 6 additions & 4 deletions tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,11 @@ impl ShvCallOutputFormat {
}
}
}
pub fn shv_call(path: &str, method: &str, param: &str) -> shv::Result<RpcValue> {
pub fn shv_call(path: &str, method: &str, param: &str, port: Option<i32>) -> shv::Result<RpcValue> {
let port = port.unwrap_or(3755);
let output = Command::new("target/debug/shvcall")
.arg("-v").arg(".:T")
.arg("--url").arg("tcp://admin:admin@localhost")
.arg("--url").arg(format!("tcp://admin@localhost:{port}?password=admin"))
.arg("--path").arg(path)
.arg("--method").arg(method)
.arg("--param").arg(param)
Expand All @@ -104,11 +105,12 @@ pub fn shv_call(path: &str, method: &str, param: &str) -> shv::Result<RpcValue>

result_from_output(output)
}
pub fn shv_call_many(calls: Vec<String>, output_format: ShvCallOutputFormat) -> shv::Result<Vec<String>> {
pub fn shv_call_many(calls: Vec<String>, output_format: ShvCallOutputFormat, port: Option<i32>) -> shv::Result<Vec<String>> {
let port = port.unwrap_or(3755);
let mut cmd = Command::new("target/debug/shvcall");
cmd.stdin(Stdio::piped())
.stdout(Stdio::piped())
.arg("--url").arg("tcp://admin:admin@localhost")
.arg("--url").arg(format!("tcp://admin@localhost:{port}?password=admin"))
.arg("--output-format").arg(output_format.as_str())
.arg("-v").arg(".:I");
let mut child = cmd.spawn()?;
Expand Down
53 changes: 38 additions & 15 deletions tests/test_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,25 @@ fn test_broker() -> shv::Result<()> {
thread::sleep(Duration::from_millis(100));
assert!(broker_process_guard.is_running());

let mut child_broker_process_guard = KillProcessGuard::new(Command::new("target/debug/shvbroker")
.arg("--config").arg("tests/child-broker-config.yaml")
//.arg("-v").arg("Acc")
.spawn()?);
thread::sleep(Duration::from_millis(100));
assert!(child_broker_process_guard.is_running());

pub fn shv_call_parent(path: &str, method: &str, param: &str) -> shv::Result<RpcValue> {
shv_call(path, method, param, None)
}
pub fn shv_call_child(path: &str, method: &str, param: &str) -> shv::Result<RpcValue> {
shv_call(path, method, param, Some(3756))
}

println!("====== broker =====");
println!("---broker---: :ls(\".app\")");
assert_eq!(shv_call("", "ls", r#"".app""#)?, RpcValue::from(true));
assert_eq!(shv_call(".app", "ls", r#""broker""#)?, RpcValue::from(true));
assert_eq!(shv_call(".app/broker", "ls", r#""client""#)?, RpcValue::from(true));
assert_eq!(shv_call_child("", "ls", r#"".app""#)?, RpcValue::from(true));
assert_eq!(shv_call_child(".app", "ls", r#""broker""#)?, RpcValue::from(true));
assert_eq!(shv_call_child(".app/broker", "ls", r#""client""#)?, RpcValue::from(true));
{
println!("---broker---: .app:dir()");
let expected_methods = vec![
Expand All @@ -32,7 +46,7 @@ fn test_broker() -> shv::Result<()> {
MetaMethod { name: METH_PING.into(), ..Default::default() },
];
{
let methods = shv_call(".app", "dir", "")?;
let methods = shv_call_child(".app", "dir", "")?;
let methods = methods.as_list();
'outer: for xmm in expected_methods.iter() {
for mm in methods.iter() {
Expand All @@ -47,7 +61,7 @@ fn test_broker() -> shv::Result<()> {
}
println!("---broker---: .app:dir(true)");
{
let methods = shv_call(".app", "dir", "true")?;
let methods = shv_call_child(".app", "dir", "true")?;
let methods = methods.as_list();
'outer: for xmm in expected_methods.iter() {
for mm in methods.iter() {
Expand All @@ -62,39 +76,43 @@ fn test_broker() -> shv::Result<()> {
}
println!("---broker---: .app:dir(\"ping\")");
{
let method = shv_call(".app", "dir", r#""ping""#)?;
let method = shv_call_child(".app", "dir", r#""ping""#)?;
assert!(method.is_imap());
let name = method.as_imap().get(&metamethod::DirAttribute::Name.into()).ok_or("Name attribute doesn't exist")?.as_str();
assert_eq!(name, "ping");
}
}
println!("---broker---: .app:ping()");
assert_eq!(shv_call(".app", "ping", "")?, RpcValue::null());
assert_eq!(shv_call_child(".app", "ping", "")?, RpcValue::null());

println!("====== device =====");
let mut device_process_guard = common::KillProcessGuard {
child: Command::new("target/debug/examples/device")
.arg("--url").arg("tcp://admin:admin@localhost")
.arg("--url").arg("tcp://admin:admin@localhost:3756")
.arg("--mount").arg("test/device")
.spawn()?
};
thread::sleep(Duration::from_millis(100));
assert!(device_process_guard.is_running());

println!("---broker---: test:ls()");
assert_eq!(shv_call("test", "ls", "")?, vec![RpcValue::from("device")].into());
assert_eq!(shv_call_child("test", "ls", "")?, vec![RpcValue::from("device")].into());
assert_eq!(shv_call_parent("shv/test/child-broker", "ls", "")?, vec![RpcValue::from("device")].into());
println!("---broker---: test/device:ls()");
assert_eq!(shv_call("test/device", "ls", "")?, vec![RpcValue::from(".app"), RpcValue::from("number")].into());
assert_eq!(shv_call_child("test/device", "ls", "")?, vec![RpcValue::from(".app"), RpcValue::from("number")].into());
assert_eq!(shv_call_parent("shv/test/child-broker/device", "ls", "")?, vec![RpcValue::from(".app"), RpcValue::from("number")].into());
println!("---broker---: test/device/.app:ping()");
assert_eq!(shv_call("test/device/.app", "ping", "")?, RpcValue::null());
assert_eq!(shv_call_child("test/device/.app", "ping", "")?, RpcValue::null());
assert_eq!(shv_call_parent("shv/test/child-broker/device/.app", "ping", "")?, RpcValue::null());
println!("---broker---: test/device/number:ls()");
assert_eq!(shv_call("test/device/number", "ls", "")?, rpcvalue::List::new().into());
assert_eq!(shv_call_child("test/device/number", "ls", "")?, rpcvalue::List::new().into());
assert_eq!(shv_call_parent("shv/test/child-broker/device/number", "ls", "")?, rpcvalue::List::new().into());

println!("---broker---: .app/broker:clients()");
assert_eq!(shv_call(".app/broker", "clients", "")?.as_list().len(), 2); // [device-id, shvcall-id]
assert!(shv_call_child(".app/broker", "clients", "")?.as_list().len() > 0);

println!("---broker---: .app/broker:mounts()");
assert_eq!(shv_call(".app/broker", "mounts", "")?, vec![RpcValue::from("test/device")].into());
assert_eq!(shv_call_child(".app/broker", "mounts", "")?, vec![RpcValue::from("test/device")].into());
{
println!("====== subscriptions =====");
let calls: Vec<String> = vec![
Expand All @@ -103,7 +121,7 @@ fn test_broker() -> shv::Result<()> {
r#".app/broker/currentClient:unsubscribe {"methods": "chng", "paths": "test/**"}"#.into(),
r#"test/device/number:set 123"#.into(),
];
let values = shv_call_many(calls, ShvCallOutputFormat::Simple)?;
let values = shv_call_many(calls, ShvCallOutputFormat::Simple, Some(3756))?;
for v in values.iter() {
println!("\t{}", v);
}
Expand All @@ -118,5 +136,10 @@ fn test_broker() -> shv::Result<()> {
assert_eq!(expected[no], val);
}
}

println!("====== child broker =====");
assert_eq!(shv_call_parent("shv/test", "ls", r#""child-broker""#)?, RpcValue::from(true));
assert_eq!(shv_call_parent("shv/test/child-broker/device/.app", "name", "")?, RpcValue::from("device"));

Ok(())
}
4 changes: 2 additions & 2 deletions tests/test_shvcall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ fn test_call_ping_stdin() -> shv::Result<()> {
assert!(broker_process_guard.is_running());

println!("---shvcall---: .app:ping()");
assert_eq!(shv_call(".app", "ping", "")?, RpcValue::null());
assert_eq!(shv_call(".app", "ping", "", None)?, RpcValue::null());

println!("---shvcall---: .app:name()");
let calls: Vec<String> = vec![
".app:ping".into(),
".app:name".into(),
];
let values = shv_call_many(calls, ShvCallOutputFormat::Value)?;
let values = shv_call_many(calls, ShvCallOutputFormat::Value, None)?;
let expected = vec!["null", r#""shvbroker""#];
for (no, val) in values.iter().enumerate() {
assert_eq!(&expected[no], val);
Expand Down

0 comments on commit d3229a5

Please sign in to comment.