Skip to content

Commit

Permalink
Broker implementation moved to crate to enable unit testing of shvbroker
Browse files Browse the repository at this point in the history
  • Loading branch information
Fanda Vacek committed Jan 1, 2024
1 parent e0b1c45 commit 6034ded
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 138 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ edition = "2021"
simple_logger = { git = "https://github.com/fvacek/rust-simple_logger.git", branch = "main", features = ["stderr"] }

futures = "0.3.29"
async-std = "1.12.0"
async-std = { version = "1.12.0", features = ["attributes"] }
log = "0.4.20"
bytes = "1.5.0"
sha1 = "0.10.6"
Expand Down
54 changes: 4 additions & 50 deletions src/bin/shvbroker/main.rs → src/bin/shvbroker.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,12 @@
use std::{fs};
use std::path::Path;
use futures::{StreamExt};
use async_std::{channel, net::{TcpListener}, prelude::*, task};
use async_std::{task};
use log::*;
use simple_logger::SimpleLogger;
use shv::util::{join_path, parse_log_verbosity};
use crate::config::{AccessControl, BrokerConfig};
use shv::broker::config::{AccessControl, BrokerConfig};
use clap::{Parser};

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
type Sender<T> = async_std::channel::Sender<T>;
type Receiver<T> = async_std::channel::Receiver<T>;

mod config;
mod node;
mod peer;
mod broker;

#[derive(Parser, Debug)]
struct CliOpts {
/// Config file path
Expand All @@ -30,7 +20,7 @@ struct CliOpts {
verbose: Option<String>,
}

pub(crate) fn main() -> Result<()> {
pub(crate) fn main() -> shv::Result<()> {
let cli_opts = CliOpts::parse();

let mut logger = SimpleLogger::new();
Expand Down Expand Up @@ -84,43 +74,7 @@ pub(crate) fn main() -> Result<()> {
info!("Creating access file {access_file}");
fs::write(access_file, serde_yaml::to_string(&access)?)?;
}
task::block_on(accept_loop(config, access))
task::block_on(shv::broker::accept_loop(config, access))
}

async fn accept_loop(config: BrokerConfig, access: AccessControl) -> Result<()> {
if let Some(address) = config.listen.tcp.clone() {
let (broker_sender, broker_receiver) = channel::unbounded();
let parent_broker_peer_config = config.parent_broker.clone();
let broker_task = task::spawn(broker::broker_loop(broker_receiver, access));
if parent_broker_peer_config.enabled {
spawn_and_log_error(peer::parent_broker_peer_loop(1, parent_broker_peer_config, broker_sender.clone()));
}
info!("Listening on TCP: {}", address);
let listener = TcpListener::bind(address).await?;
info!("bind OK");
let mut client_id = 2; // parent broker has client_id == 1
let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
let stream = stream?;
debug!("Accepting from: {}", stream.peer_addr()?);
spawn_and_log_error(peer::peer_loop(client_id, broker_sender.clone(), stream));
client_id += 1;
}
drop(broker_sender);
broker_task.await;
} else {
return Err("No port to listen on specified".into());
}
Ok(())
}

fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>
where
F: Future<Output = Result<()>> + Send + 'static,
{
task::spawn(async move {
if let Err(e) = fut.await {
eprintln!("{}", e)
}
})
}
6 changes: 3 additions & 3 deletions src/bin/shvbroker/config.rs → src/broker/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::fs;
use std::path::Path;
use log::info;
use serde::{Serialize, Deserialize};
use shv::client::ClientConfig;
use crate::client::ClientConfig;

#[derive(Serialize, Deserialize, Debug)]
pub struct BrokerConfig {
Expand Down Expand Up @@ -68,13 +68,13 @@ pub struct Mount {
pub description: String,
}
impl AccessControl {
pub fn from_file(file_name: &str) -> shv::Result<Self> {
pub fn from_file(file_name: &str) -> crate::Result<Self> {
let content = fs::read_to_string(file_name)?;
Ok(serde_yaml::from_str(&content)?)
}
}
impl BrokerConfig {
pub fn from_file(file_name: &str) -> shv::Result<Self> {
pub fn from_file(file_name: &str) -> crate::Result<Self> {
let content = fs::read_to_string(file_name)?;
Ok(serde_yaml::from_str(&content)?)
}
Expand Down
120 changes: 107 additions & 13 deletions src/bin/shvbroker/broker.rs → src/broker/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,25 @@
use async_std::{channel, task};
use async_std::net::TcpListener;
use crate::broker::config::{AccessControl, BrokerConfig, Password};
use std::collections::{BTreeMap, HashMap};
use std::collections::hash_map::Entry;
use glob::Pattern;
use log::{error, info, Level, log, warn};
use shv::{List, RpcMessage, RpcMessageMetaTags, RpcValue, rpcvalue, util};
use shv::rpc::Subscription;
use shv::rpcframe::RpcFrame;
use shv::rpcmessage::{CliId, RpcError, RpcErrorCode};
use shv::shvnode::{find_longest_prefix, process_local_dir_ls, RequestCommand, ShvNode, SIG_CHNG};
use shv::util::sha1_hash;
use crate::config::{AccessControl, Password};
use crate::{node, Receiver, Sender};
use crate::node::BrokerCommand;
use log::{debug, error, info, Level, log, warn};
use crate::{List, RpcMessage, RpcMessageMetaTags, RpcValue, rpcvalue, util};
use crate::rpc::Subscription;
use crate::rpcframe::RpcFrame;
use crate::rpcmessage::{CliId, RpcError, RpcErrorCode};
use crate::shvnode::{find_longest_prefix, process_local_dir_ls, RequestCommand, ShvNode, SIG_CHNG};
use crate::util::sha1_hash;
use crate::broker::node::BrokerCommand;
use async_std::stream::StreamExt;

pub mod config;
pub mod peer;
pub mod node;

type Sender<T> = async_std::channel::Sender<T>;
type Receiver<T> = async_std::channel::Receiver<T>;

pub(crate) enum LoginResult {
Ok,
Expand Down Expand Up @@ -87,7 +96,7 @@ struct ParsedAccessRule {
grant: String,
}
impl ParsedAccessRule {
pub fn new(path: &str, method: &str, grant: &str) -> shv::Result<Self> {
pub fn new(path: &str, method: &str, grant: &str) -> crate::Result<Self> {
let method = if method.is_empty() { "?*" } else { method };
let path = if path.is_empty() { "**" } else { path };
match Pattern::new(method) {
Expand All @@ -108,7 +117,7 @@ impl ParsedAccessRule {
}
struct Broker {
peers: HashMap<CliId, Peer>,
mounts: BTreeMap<String, Mount<crate::node::BrokerCommand>>,
mounts: BTreeMap<String, Mount<crate::broker::node::BrokerCommand>>,
access: AccessControl,
role_access: HashMap<String, Vec<ParsedAccessRule>>,
}
Expand Down Expand Up @@ -331,7 +340,7 @@ pub(crate) async fn broker_loop(events: Receiver<ClientEvent>, access: AccessCon

let mut broker = Broker::new(access);

broker.mounts.insert(".app".into(), Mount::Node(Box::new(shv::shvnode::AppNode { app_name: "shvbroker", ..Default::default() })));
broker.mounts.insert(".app".into(), Mount::Node(Box::new(crate::shvnode::AppNode { app_name: "shvbroker", ..Default::default() })));
broker.mounts.insert(".app/broker".into(), Mount::Node(Box::new(node::AppBrokerNode { })));
broker.mounts.insert(".app/broker/currentClient".into(), Mount::Node(Box::new(node::AppBrokerCurrentClientNode { })));
loop {
Expand Down Expand Up @@ -522,3 +531,88 @@ pub(crate) async fn broker_loop(events: Receiver<ClientEvent>, access: AccessCon
}
}
}
pub async fn accept_loop(config: BrokerConfig, access: AccessControl) -> crate::Result<()> {
if let Some(address) = config.listen.tcp.clone() {
let (broker_sender, broker_receiver) = channel::unbounded();
let parent_broker_peer_config = config.parent_broker.clone();
let broker_task = task::spawn(crate::broker::broker_loop(broker_receiver, access));
if parent_broker_peer_config.enabled {
crate::spawn_and_log_error(peer::parent_broker_peer_loop(1, parent_broker_peer_config, broker_sender.clone()));
}
info!("Listening on TCP: {}", address);
let listener = TcpListener::bind(address).await?;
info!("bind OK");
let mut client_id = 2; // parent broker has client_id == 1
let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
let stream = stream?;
debug!("Accepting from: {}", stream.peer_addr()?);
crate::spawn_and_log_error(peer::peer_loop(client_id, broker_sender.clone(), stream));
client_id += 1;
}
drop(broker_sender);
broker_task.await;
} else {
return Err("No port to listen on specified".into());
}
Ok(())
}

#[cfg(test)]
mod tests {
use super::*;

#[async_std::test]
async fn test_broker_loop() {
let config = BrokerConfig::default();
let access = config.access.clone();
let (broker_writer, broker_reader) = channel::unbounded();
//let parent_broker_peer_config = config.parent_broker.clone();
let broker_task = task::spawn(crate::broker::broker_loop(broker_reader, access));

let (peer_writer, peer_reader) = channel::unbounded::<PeerEvent>();
let client_id = 2;

struct CallCtx<'a> {
writer: &'a Sender<ClientEvent>,
reader: &'a Receiver<PeerEvent>,
client_id: CliId,
}
async fn call(path: &str, method: &str, param: Option<RpcValue>, ctx: &CallCtx<'_>) -> RpcValue {
let msg = RpcMessage::new_request(path, method, param);
let frame = RpcFrame::from_rpcmessage(msg).expect("valid message");
ctx.writer.send(ClientEvent::Frame { client_id: ctx.client_id, frame }).await.unwrap();
if let PeerEvent::Message(msg) = ctx.reader.recv().await.unwrap() {
msg.result().unwrap().clone()
} else {
panic!("Response expected");
}
}
let call_ctx = CallCtx{
writer: &broker_writer,
reader: &peer_reader,
client_id,
};

// login
broker_writer.send(ClientEvent::NewPeer { client_id, sender: peer_writer, peer_kind: PeerKind::Client }).await.unwrap();
broker_writer.send(ClientEvent::GetPassword { client_id, user: "admin".into() }).await.unwrap();
peer_reader.recv().await.unwrap();
let register_device = ClientEvent::RegisterDevice { client_id, device_id: Some("test-device".into()), mount_point: Default::default() };
broker_writer.send(register_device).await.unwrap();

// device should be mounted as 'shv/dev/test'
let resp = call("shv/dev", "ls", Some("test".into()), &call_ctx).await;
assert_eq!(resp, RpcValue::from(true));

// test current client info
let resp = call(".app/broker/currentClient", "info", None, &call_ctx).await;
let m = resp.as_map();
assert_eq!(m.get("clientId").unwrap(), &RpcValue::from(2));
assert_eq!(m.get("mountPoint").unwrap(), &RpcValue::from("shv/dev/test"));
assert_eq!(m.get("userName").unwrap(), &RpcValue::from("admin"));
assert_eq!(m.get("subscriptions").unwrap(), &RpcValue::from(List::new()));

broker_task.cancel().await;
}
}
14 changes: 7 additions & 7 deletions src/bin/shvbroker/node.rs → src/broker/node.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use shv::metamethod::{Access, Flag, MetaMethod};
use shv::{RpcMessage, RpcMessageMetaTags};
use shv::rpc::Subscription;
use shv::rpcmessage::{CliId, RpcError};
use shv::shvnode::{RequestCommand, ShvNode};
use crate::metamethod::{Access, Flag, MetaMethod};
use crate::{RpcMessage, RpcMessageMetaTags};
use crate::rpc::Subscription;
use crate::rpcmessage::{CliId, RpcError};
use crate::shvnode::{RequestCommand, ShvNode};

const METH_CLIENT_INFO: &str = "clientInfo";
const METH_MOUNTED_CLIENT_INFO: &str = "mountedClientInfo";
Expand Down Expand Up @@ -86,7 +86,7 @@ impl ShvNode<BrokerCommand> for AppBrokerCurrentClientNode {
RequestCommand::<BrokerCommand>::Custom(BrokerCommand::Subscribe(subscription))
}
Err(err) => {
RequestCommand::Error(RpcError{ code: shv::rpcmessage::RpcErrorCode::InvalidParam, message: err.to_string() })
RequestCommand::Error(RpcError{ code: crate::rpcmessage::RpcErrorCode::InvalidParam, message: err.to_string() })
}
}
}
Expand All @@ -96,7 +96,7 @@ impl ShvNode<BrokerCommand> for AppBrokerCurrentClientNode {
RequestCommand::<BrokerCommand>::Custom(BrokerCommand::Unsubscribe(subscription))
}
Err(err) => {
RequestCommand::Error(RpcError{ code: shv::rpcmessage::RpcErrorCode::InvalidParam, message: err.to_string() })
RequestCommand::Error(RpcError{ code: crate::rpcmessage::RpcErrorCode::InvalidParam, message: err.to_string() })
}
}
}
Expand Down
34 changes: 17 additions & 17 deletions src/bin/shvbroker/peer.rs → src/broker/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,26 @@ use futures::FutureExt;
use log::{debug, error, info, warn};
use rand::distributions::{Alphanumeric, DistString};
use url::Url;
use shv::{client, RpcMessage, RpcMessageMetaTags, RpcValue};
use shv::client::LoginParams;
use shv::rpcframe::RpcFrame;
use shv::shvnode::METH_PING;
use shv::util::{join_path, login_from_url, sha1_hash};
use crate::{client, RpcMessage, RpcMessageMetaTags, RpcValue};
use crate::client::LoginParams;
use crate::rpcframe::RpcFrame;
use crate::shvnode::METH_PING;
use crate::util::{join_path, login_from_url, sha1_hash};
use crate::broker::{ClientEvent, LoginResult, PeerEvent, PeerKind};
use crate::config::ParentBrokerConfig;
use crate::Sender;
use crate::broker::config::ParentBrokerConfig;
use crate::broker::Sender;

pub(crate) async fn peer_loop(client_id: i32, broker_writer: Sender<ClientEvent>, stream: TcpStream) -> crate::Result<()> {
let (socket_reader, mut writer) = (&stream, &stream);
let (peer_writer, peer_receiver) = channel::unbounded::<PeerEvent>();
let (peer_writer, peer_reader) = channel::unbounded::<PeerEvent>();

broker_writer.send(ClientEvent::NewPeer { client_id, sender: peer_writer, peer_kind: PeerKind::Client }).await.unwrap();

//let stream_wr = stream.clone();
let mut brd = BufReader::new(socket_reader);

let mut frame_reader = shv::connection::FrameReader::new(&mut brd);
let mut frame_writer = shv::connection::FrameWriter::new(&mut writer);
let mut frame_reader = crate::connection::FrameReader::new(&mut brd);
let mut frame_writer = crate::connection::FrameWriter::new(&mut writer);

let mut device_options = RpcValue::null();
let login_result = loop {
Expand All @@ -41,7 +41,7 @@ pub(crate) async fn peer_loop(client_id: i32, broker_writer: Sender<ClientEvent>
continue;
}
let nonce = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
let mut result = shv::Map::new();
let mut result = crate::Map::new();
result.insert("nonce".into(), RpcValue::from(&nonce));
frame_writer.send_result(resp_meta, result.into()).await?;
nonce
Expand All @@ -64,7 +64,7 @@ pub(crate) async fn peer_loop(client_id: i32, broker_writer: Sender<ClientEvent>
let login_type = login.get("type").map(|v| v.as_str()).unwrap_or("");

broker_writer.send(ClientEvent::GetPassword { client_id, user: user.to_string() }).await.unwrap();
match peer_receiver.recv().await? {
match peer_reader.recv().await? {
PeerEvent::PasswordSha1(broker_shapass) => {
let chkpwd = || {
match broker_shapass {
Expand All @@ -86,7 +86,7 @@ pub(crate) async fn peer_loop(client_id: i32, broker_writer: Sender<ClientEvent>
}
};
if chkpwd() {
let mut result = shv::Map::new();
let mut result = crate::Map::new();
result.insert("clientId".into(), RpcValue::from(client_id));
frame_writer.send_result(resp_meta, result.into()).await?;
if let Some(options) = params.get("options") {
Expand Down Expand Up @@ -133,7 +133,7 @@ pub(crate) async fn peer_loop(client_id: i32, broker_writer: Sender<ClientEvent>
break;
}
},
event = peer_receiver.recv().fuse() => match event {
event = peer_reader.recv().fuse() => match event {
Err(e) => {
debug!("Peer channel closed: {}", &e);
break;
Expand Down Expand Up @@ -165,7 +165,7 @@ pub(crate) async fn peer_loop(client_id: i32, broker_writer: Sender<ClientEvent>
info!("Client loop exit, client id: {}", client_id);
Ok(())
}
pub async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, broker_writer: Sender<ClientEvent>) -> shv::Result<()> {
pub(crate) async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, broker_writer: Sender<ClientEvent>) -> crate::Result<()> {
info!("Connecting to parent broker: {}", &config.client.url);
let url = Url::parse(&config.client.url)?;
let (scheme, host, port) = (url.scheme(), url.host_str().unwrap_or_default(), url.port().unwrap_or(3755));
Expand All @@ -179,8 +179,8 @@ pub async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig,
let (reader, mut writer) = (&stream, &stream);

let mut brd = BufReader::new(reader);
let mut frame_reader = shv::connection::FrameReader::new(&mut brd);
let mut frame_writer = shv::connection::FrameWriter::new(&mut writer);
let mut frame_reader = crate::connection::FrameReader::new(&mut brd);
let mut frame_writer = crate::connection::FrameWriter::new(&mut writer);

// login
let (user, password) = login_from_url(&url);
Expand Down
Loading

0 comments on commit 6034ded

Please sign in to comment.