diff --git a/Cargo.toml b/Cargo.toml index fd0a636..eea8064 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ edition = "2021" simple_logger = { git = "https://github.com/fvacek/rust-simple_logger.git", branch = "main", features = ["stderr"] } futures = "0.3.29" -async-std = "1.12.0" +async-std = { version = "1.12.0", features = ["attributes"] } log = "0.4.20" bytes = "1.5.0" sha1 = "0.10.6" diff --git a/src/bin/shvbroker/main.rs b/src/bin/shvbroker.rs similarity index 58% rename from src/bin/shvbroker/main.rs rename to src/bin/shvbroker.rs index 20a5304..8ad518c 100644 --- a/src/bin/shvbroker/main.rs +++ b/src/bin/shvbroker.rs @@ -1,22 +1,12 @@ use std::{fs}; use std::path::Path; -use futures::{StreamExt}; -use async_std::{channel, net::{TcpListener}, prelude::*, task}; +use async_std::{task}; use log::*; use simple_logger::SimpleLogger; use shv::util::{join_path, parse_log_verbosity}; -use crate::config::{AccessControl, BrokerConfig}; +use shv::broker::config::{AccessControl, BrokerConfig}; use clap::{Parser}; -type Result = std::result::Result>; -type Sender = async_std::channel::Sender; -type Receiver = async_std::channel::Receiver; - -mod config; -mod node; -mod peer; -mod broker; - #[derive(Parser, Debug)] struct CliOpts { /// Config file path @@ -30,7 +20,7 @@ struct CliOpts { verbose: Option, } -pub(crate) fn main() -> Result<()> { +pub(crate) fn main() -> shv::Result<()> { let cli_opts = CliOpts::parse(); let mut logger = SimpleLogger::new(); @@ -84,43 +74,7 @@ pub(crate) fn main() -> Result<()> { info!("Creating access file {access_file}"); fs::write(access_file, serde_yaml::to_string(&access)?)?; } - task::block_on(accept_loop(config, access)) + task::block_on(shv::broker::accept_loop(config, access)) } -async fn accept_loop(config: BrokerConfig, access: AccessControl) -> Result<()> { - if let Some(address) = config.listen.tcp.clone() { - let (broker_sender, broker_receiver) = channel::unbounded(); - let parent_broker_peer_config = config.parent_broker.clone(); - let broker_task = task::spawn(broker::broker_loop(broker_receiver, access)); - if parent_broker_peer_config.enabled { - spawn_and_log_error(peer::parent_broker_peer_loop(1, parent_broker_peer_config, broker_sender.clone())); - } - info!("Listening on TCP: {}", address); - let listener = TcpListener::bind(address).await?; - info!("bind OK"); - let mut client_id = 2; // parent broker has client_id == 1 - let mut incoming = listener.incoming(); - while let Some(stream) = incoming.next().await { - let stream = stream?; - debug!("Accepting from: {}", stream.peer_addr()?); - spawn_and_log_error(peer::peer_loop(client_id, broker_sender.clone(), stream)); - client_id += 1; - } - drop(broker_sender); - broker_task.await; - } else { - return Err("No port to listen on specified".into()); - } - Ok(()) -} -fn spawn_and_log_error(fut: F) -> task::JoinHandle<()> - where - F: Future> + Send + 'static, -{ - task::spawn(async move { - if let Err(e) = fut.await { - eprintln!("{}", e) - } - }) -} diff --git a/src/bin/shvbroker/config.rs b/src/broker/config.rs similarity index 97% rename from src/bin/shvbroker/config.rs rename to src/broker/config.rs index ed95854..ede6584 100644 --- a/src/bin/shvbroker/config.rs +++ b/src/broker/config.rs @@ -3,7 +3,7 @@ use std::fs; use std::path::Path; use log::info; use serde::{Serialize, Deserialize}; -use shv::client::ClientConfig; +use crate::client::ClientConfig; #[derive(Serialize, Deserialize, Debug)] pub struct BrokerConfig { @@ -68,13 +68,13 @@ pub struct Mount { pub description: String, } impl AccessControl { - pub fn from_file(file_name: &str) -> shv::Result { + pub fn from_file(file_name: &str) -> crate::Result { let content = fs::read_to_string(file_name)?; Ok(serde_yaml::from_str(&content)?) } } impl BrokerConfig { - pub fn from_file(file_name: &str) -> shv::Result { + pub fn from_file(file_name: &str) -> crate::Result { let content = fs::read_to_string(file_name)?; Ok(serde_yaml::from_str(&content)?) } diff --git a/src/bin/shvbroker/broker.rs b/src/broker/mod.rs similarity index 83% rename from src/bin/shvbroker/broker.rs rename to src/broker/mod.rs index c8121ad..746e3c7 100644 --- a/src/bin/shvbroker/broker.rs +++ b/src/broker/mod.rs @@ -1,16 +1,25 @@ +use async_std::{channel, task}; +use async_std::net::TcpListener; +use crate::broker::config::{AccessControl, BrokerConfig, Password}; use std::collections::{BTreeMap, HashMap}; use std::collections::hash_map::Entry; use glob::Pattern; -use log::{error, info, Level, log, warn}; -use shv::{List, RpcMessage, RpcMessageMetaTags, RpcValue, rpcvalue, util}; -use shv::rpc::Subscription; -use shv::rpcframe::RpcFrame; -use shv::rpcmessage::{CliId, RpcError, RpcErrorCode}; -use shv::shvnode::{find_longest_prefix, process_local_dir_ls, RequestCommand, ShvNode, SIG_CHNG}; -use shv::util::sha1_hash; -use crate::config::{AccessControl, Password}; -use crate::{node, Receiver, Sender}; -use crate::node::BrokerCommand; +use log::{debug, error, info, Level, log, warn}; +use crate::{List, RpcMessage, RpcMessageMetaTags, RpcValue, rpcvalue, util}; +use crate::rpc::Subscription; +use crate::rpcframe::RpcFrame; +use crate::rpcmessage::{CliId, RpcError, RpcErrorCode}; +use crate::shvnode::{find_longest_prefix, process_local_dir_ls, RequestCommand, ShvNode, SIG_CHNG}; +use crate::util::sha1_hash; +use crate::broker::node::BrokerCommand; +use async_std::stream::StreamExt; + +pub mod config; +pub mod peer; +pub mod node; + +type Sender = async_std::channel::Sender; +type Receiver = async_std::channel::Receiver; pub(crate) enum LoginResult { Ok, @@ -87,7 +96,7 @@ struct ParsedAccessRule { grant: String, } impl ParsedAccessRule { - pub fn new(path: &str, method: &str, grant: &str) -> shv::Result { + pub fn new(path: &str, method: &str, grant: &str) -> crate::Result { let method = if method.is_empty() { "?*" } else { method }; let path = if path.is_empty() { "**" } else { path }; match Pattern::new(method) { @@ -108,7 +117,7 @@ impl ParsedAccessRule { } struct Broker { peers: HashMap, - mounts: BTreeMap>, + mounts: BTreeMap>, access: AccessControl, role_access: HashMap>, } @@ -331,7 +340,7 @@ pub(crate) async fn broker_loop(events: Receiver, access: AccessCon let mut broker = Broker::new(access); - broker.mounts.insert(".app".into(), Mount::Node(Box::new(shv::shvnode::AppNode { app_name: "shvbroker", ..Default::default() }))); + broker.mounts.insert(".app".into(), Mount::Node(Box::new(crate::shvnode::AppNode { app_name: "shvbroker", ..Default::default() }))); broker.mounts.insert(".app/broker".into(), Mount::Node(Box::new(node::AppBrokerNode { }))); broker.mounts.insert(".app/broker/currentClient".into(), Mount::Node(Box::new(node::AppBrokerCurrentClientNode { }))); loop { @@ -522,3 +531,88 @@ pub(crate) async fn broker_loop(events: Receiver, access: AccessCon } } } +pub async fn accept_loop(config: BrokerConfig, access: AccessControl) -> crate::Result<()> { + if let Some(address) = config.listen.tcp.clone() { + let (broker_sender, broker_receiver) = channel::unbounded(); + let parent_broker_peer_config = config.parent_broker.clone(); + let broker_task = task::spawn(crate::broker::broker_loop(broker_receiver, access)); + if parent_broker_peer_config.enabled { + crate::spawn_and_log_error(peer::parent_broker_peer_loop(1, parent_broker_peer_config, broker_sender.clone())); + } + info!("Listening on TCP: {}", address); + let listener = TcpListener::bind(address).await?; + info!("bind OK"); + let mut client_id = 2; // parent broker has client_id == 1 + let mut incoming = listener.incoming(); + while let Some(stream) = incoming.next().await { + let stream = stream?; + debug!("Accepting from: {}", stream.peer_addr()?); + crate::spawn_and_log_error(peer::peer_loop(client_id, broker_sender.clone(), stream)); + client_id += 1; + } + drop(broker_sender); + broker_task.await; + } else { + return Err("No port to listen on specified".into()); + } + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[async_std::test] + async fn test_broker_loop() { + let config = BrokerConfig::default(); + let access = config.access.clone(); + let (broker_writer, broker_reader) = channel::unbounded(); + //let parent_broker_peer_config = config.parent_broker.clone(); + let broker_task = task::spawn(crate::broker::broker_loop(broker_reader, access)); + + let (peer_writer, peer_reader) = channel::unbounded::(); + let client_id = 2; + + struct CallCtx<'a> { + writer: &'a Sender, + reader: &'a Receiver, + client_id: CliId, + } + async fn call(path: &str, method: &str, param: Option, ctx: &CallCtx<'_>) -> RpcValue { + let msg = RpcMessage::new_request(path, method, param); + let frame = RpcFrame::from_rpcmessage(msg).expect("valid message"); + ctx.writer.send(ClientEvent::Frame { client_id: ctx.client_id, frame }).await.unwrap(); + if let PeerEvent::Message(msg) = ctx.reader.recv().await.unwrap() { + msg.result().unwrap().clone() + } else { + panic!("Response expected"); + } + } + let call_ctx = CallCtx{ + writer: &broker_writer, + reader: &peer_reader, + client_id, + }; + + // login + broker_writer.send(ClientEvent::NewPeer { client_id, sender: peer_writer, peer_kind: PeerKind::Client }).await.unwrap(); + broker_writer.send(ClientEvent::GetPassword { client_id, user: "admin".into() }).await.unwrap(); + peer_reader.recv().await.unwrap(); + let register_device = ClientEvent::RegisterDevice { client_id, device_id: Some("test-device".into()), mount_point: Default::default() }; + broker_writer.send(register_device).await.unwrap(); + + // device should be mounted as 'shv/dev/test' + let resp = call("shv/dev", "ls", Some("test".into()), &call_ctx).await; + assert_eq!(resp, RpcValue::from(true)); + + // test current client info + let resp = call(".app/broker/currentClient", "info", None, &call_ctx).await; + let m = resp.as_map(); + assert_eq!(m.get("clientId").unwrap(), &RpcValue::from(2)); + assert_eq!(m.get("mountPoint").unwrap(), &RpcValue::from("shv/dev/test")); + assert_eq!(m.get("userName").unwrap(), &RpcValue::from("admin")); + assert_eq!(m.get("subscriptions").unwrap(), &RpcValue::from(List::new())); + + broker_task.cancel().await; + } +} \ No newline at end of file diff --git a/src/bin/shvbroker/node.rs b/src/broker/node.rs similarity index 90% rename from src/bin/shvbroker/node.rs rename to src/broker/node.rs index f0b6783..8af7b6a 100644 --- a/src/bin/shvbroker/node.rs +++ b/src/broker/node.rs @@ -1,8 +1,8 @@ -use shv::metamethod::{Access, Flag, MetaMethod}; -use shv::{RpcMessage, RpcMessageMetaTags}; -use shv::rpc::Subscription; -use shv::rpcmessage::{CliId, RpcError}; -use shv::shvnode::{RequestCommand, ShvNode}; +use crate::metamethod::{Access, Flag, MetaMethod}; +use crate::{RpcMessage, RpcMessageMetaTags}; +use crate::rpc::Subscription; +use crate::rpcmessage::{CliId, RpcError}; +use crate::shvnode::{RequestCommand, ShvNode}; const METH_CLIENT_INFO: &str = "clientInfo"; const METH_MOUNTED_CLIENT_INFO: &str = "mountedClientInfo"; @@ -86,7 +86,7 @@ impl ShvNode for AppBrokerCurrentClientNode { RequestCommand::::Custom(BrokerCommand::Subscribe(subscription)) } Err(err) => { - RequestCommand::Error(RpcError{ code: shv::rpcmessage::RpcErrorCode::InvalidParam, message: err.to_string() }) + RequestCommand::Error(RpcError{ code: crate::rpcmessage::RpcErrorCode::InvalidParam, message: err.to_string() }) } } } @@ -96,7 +96,7 @@ impl ShvNode for AppBrokerCurrentClientNode { RequestCommand::::Custom(BrokerCommand::Unsubscribe(subscription)) } Err(err) => { - RequestCommand::Error(RpcError{ code: shv::rpcmessage::RpcErrorCode::InvalidParam, message: err.to_string() }) + RequestCommand::Error(RpcError{ code: crate::rpcmessage::RpcErrorCode::InvalidParam, message: err.to_string() }) } } } diff --git a/src/bin/shvbroker/peer.rs b/src/broker/peer.rs similarity index 91% rename from src/bin/shvbroker/peer.rs rename to src/broker/peer.rs index 5d57cf7..2b7d5b3 100644 --- a/src/bin/shvbroker/peer.rs +++ b/src/broker/peer.rs @@ -6,26 +6,26 @@ use futures::FutureExt; use log::{debug, error, info, warn}; use rand::distributions::{Alphanumeric, DistString}; use url::Url; -use shv::{client, RpcMessage, RpcMessageMetaTags, RpcValue}; -use shv::client::LoginParams; -use shv::rpcframe::RpcFrame; -use shv::shvnode::METH_PING; -use shv::util::{join_path, login_from_url, sha1_hash}; +use crate::{client, RpcMessage, RpcMessageMetaTags, RpcValue}; +use crate::client::LoginParams; +use crate::rpcframe::RpcFrame; +use crate::shvnode::METH_PING; +use crate::util::{join_path, login_from_url, sha1_hash}; use crate::broker::{ClientEvent, LoginResult, PeerEvent, PeerKind}; -use crate::config::ParentBrokerConfig; -use crate::Sender; +use crate::broker::config::ParentBrokerConfig; +use crate::broker::Sender; pub(crate) async fn peer_loop(client_id: i32, broker_writer: Sender, stream: TcpStream) -> crate::Result<()> { let (socket_reader, mut writer) = (&stream, &stream); - let (peer_writer, peer_receiver) = channel::unbounded::(); + let (peer_writer, peer_reader) = channel::unbounded::(); broker_writer.send(ClientEvent::NewPeer { client_id, sender: peer_writer, peer_kind: PeerKind::Client }).await.unwrap(); //let stream_wr = stream.clone(); let mut brd = BufReader::new(socket_reader); - let mut frame_reader = shv::connection::FrameReader::new(&mut brd); - let mut frame_writer = shv::connection::FrameWriter::new(&mut writer); + let mut frame_reader = crate::connection::FrameReader::new(&mut brd); + let mut frame_writer = crate::connection::FrameWriter::new(&mut writer); let mut device_options = RpcValue::null(); let login_result = loop { @@ -41,7 +41,7 @@ pub(crate) async fn peer_loop(client_id: i32, broker_writer: Sender continue; } let nonce = Alphanumeric.sample_string(&mut rand::thread_rng(), 16); - let mut result = shv::Map::new(); + let mut result = crate::Map::new(); result.insert("nonce".into(), RpcValue::from(&nonce)); frame_writer.send_result(resp_meta, result.into()).await?; nonce @@ -64,7 +64,7 @@ pub(crate) async fn peer_loop(client_id: i32, broker_writer: Sender let login_type = login.get("type").map(|v| v.as_str()).unwrap_or(""); broker_writer.send(ClientEvent::GetPassword { client_id, user: user.to_string() }).await.unwrap(); - match peer_receiver.recv().await? { + match peer_reader.recv().await? { PeerEvent::PasswordSha1(broker_shapass) => { let chkpwd = || { match broker_shapass { @@ -86,7 +86,7 @@ pub(crate) async fn peer_loop(client_id: i32, broker_writer: Sender } }; if chkpwd() { - let mut result = shv::Map::new(); + let mut result = crate::Map::new(); result.insert("clientId".into(), RpcValue::from(client_id)); frame_writer.send_result(resp_meta, result.into()).await?; if let Some(options) = params.get("options") { @@ -133,7 +133,7 @@ pub(crate) async fn peer_loop(client_id: i32, broker_writer: Sender break; } }, - event = peer_receiver.recv().fuse() => match event { + event = peer_reader.recv().fuse() => match event { Err(e) => { debug!("Peer channel closed: {}", &e); break; @@ -165,7 +165,7 @@ pub(crate) async fn peer_loop(client_id: i32, broker_writer: Sender info!("Client loop exit, client id: {}", client_id); Ok(()) } -pub async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, broker_writer: Sender) -> shv::Result<()> { +pub(crate) async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, broker_writer: Sender) -> crate::Result<()> { info!("Connecting to parent broker: {}", &config.client.url); let url = Url::parse(&config.client.url)?; let (scheme, host, port) = (url.scheme(), url.host_str().unwrap_or_default(), url.port().unwrap_or(3755)); @@ -179,8 +179,8 @@ pub async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, let (reader, mut writer) = (&stream, &stream); let mut brd = BufReader::new(reader); - let mut frame_reader = shv::connection::FrameReader::new(&mut brd); - let mut frame_writer = shv::connection::FrameWriter::new(&mut writer); + let mut frame_reader = crate::connection::FrameReader::new(&mut brd); + let mut frame_writer = crate::connection::FrameWriter::new(&mut writer); // login let (user, password) = login_from_url(&url); diff --git a/src/connection.rs b/src/connection.rs index e1c7c9f..d4168f6 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -1,39 +1,10 @@ use crate::writer::Writer; use async_std::io; -use crate::rpcframe::{Protocol, RpcFrame}; +use crate::rpcframe::{RpcFrame}; use futures::{AsyncReadExt, AsyncWriteExt}; use log::*; use crate::{ChainPackWriter, MetaMap, RpcMessage, RpcMessageMetaTags, RpcValue}; use crate::rpcmessage::{RpcError, RpcErrorCode, RqId}; -// use log::*; - -//pub trait Reader = async_std::io::Write + std::marker::Unpin; -// pub async fn send_frame(writer: &mut W, frame: RpcFrame) -> crate::Result<()> { -// log!(target: "RpcMsg", Level::Debug, "S<== {}", &frame.to_rpcmesage().unwrap_or_default()); -// let mut meta_data = Vec::new(); -// { -// let mut wr = ChainPackWriter::new(&mut meta_data); -// wr.write_meta(&frame.meta)?; -// } -// let mut header = Vec::new(); -// let mut wr = ChainPackWriter::new(&mut header); -// let msg_len = 1 + meta_data.len() + frame.data.len(); -// wr.write_uint_data(msg_len as u64)?; -// header.push(frame.protocol as u8); -// writer.write(&header).await?; -// writer.write(&meta_data).await?; -// writer.write(&frame.data).await?; -// // Ensure the encoded frame is written to the socket. The calls above -// // are to the buffered stream and writes. Calling `flush` writes the -// // remaining contents of the buffer to the socket. -// writer.flush().await?; -// Ok(()) -// } -// pub async fn send_message(writer: &mut W, msg: &RpcMessage) -> crate::Result<()> { -// let frame = RpcFrame::from_rpcmessage(Protocol::ChainPack, &msg)?; -// send_frame(writer, frame).await?; -// Ok(()) -// } pub struct FrameReader<'a, R: async_std::io::Read + std::marker::Unpin> { buffer: Vec, @@ -133,7 +104,7 @@ impl<'a, W: io::Write + std::marker::Unpin> FrameWriter<'a, W> { } pub async fn send_message(&mut self, msg: RpcMessage) -> crate::Result<()> { - let frame = RpcFrame::from_rpcmessage(Protocol::ChainPack, &msg)?; + let frame = RpcFrame::from_rpcmessage(msg)?; self.send_frame(frame).await?; Ok(()) } diff --git a/src/lib.rs b/src/lib.rs index 1bc1ba3..981979c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,10 +15,10 @@ pub mod connection; pub mod client; pub mod shvnode; pub mod rpc; +pub mod broker; -pub type Error = Box; -pub type Result = std::result::Result; - +use std::future::Future; +use async_std::task; pub use datetime::DateTime; pub use decimal::Decimal; pub use metamap::MetaMap; @@ -29,4 +29,18 @@ pub use rpcvalue::Value; pub use writer::{Writer, WriteResult}; pub use chainpack::{ChainPackReader, ChainPackWriter}; -pub use cpon::{CponReader, CponWriter}; \ No newline at end of file +pub use cpon::{CponReader, CponWriter}; + +pub type Error = Box; +pub type Result = std::result::Result; + +pub fn spawn_and_log_error(fut: F) -> task::JoinHandle<()> + where + F: Future> + Send + 'static, +{ + task::spawn(async move { + if let Err(e) = fut.await { + eprintln!("{}", e) + } + }) +} \ No newline at end of file diff --git a/src/rpcframe.rs b/src/rpcframe.rs index b0f73a0..9fb96a9 100644 --- a/src/rpcframe.rs +++ b/src/rpcframe.rs @@ -2,7 +2,7 @@ use std::fmt; use std::io::{Cursor, BufReader}; // use tracing::{instrument}; use bytes::Buf; -use crate::{ChainPackReader, ChainPackWriter, CponReader, CponWriter, MetaMap, RpcMessage, RpcMessageMetaTags, rpctype, RpcValue}; +use crate::{ChainPackReader, ChainPackWriter, CponReader, MetaMap, RpcMessage, RpcMessageMetaTags, rpctype, RpcValue}; use crate::writer::Writer; use crate::reader::Reader; use log::*; @@ -31,20 +31,14 @@ impl RpcFrame { pub fn new(protocol: Protocol, meta: MetaMap, data: Vec) -> RpcFrame { RpcFrame { protocol, meta, data } } - pub fn from_rpcmessage(protocol: Protocol, msg: &RpcMessage) -> crate::Result { + pub fn from_rpcmessage(msg: RpcMessage) -> crate::Result { let mut data = Vec::new(); - match &protocol { - Protocol::ChainPack => { - let mut wr = ChainPackWriter::new(&mut data); - wr.write_value(&msg.as_rpcvalue().value())?; - } - Protocol::Cpon => { - let mut wr = CponWriter::new(&mut data); - wr.write_value(&msg.as_rpcvalue().value())?; - } + { + let mut wr = ChainPackWriter::new(&mut data); + wr.write_value(&msg.as_rpcvalue().value())?; } let meta = msg.as_rpcvalue().meta().clone(); - Ok(RpcFrame { protocol, meta, data }) + Ok(RpcFrame { protocol: Protocol::ChainPack, meta, data }) } pub fn to_rpcmesage(&self) -> crate::Result { let mut buff = BufReader::new(&*self.data);