From 13da92995700705de249072b4b0ea70b143dc7a9 Mon Sep 17 00:00:00 2001 From: Fanda Vacek Date: Mon, 2 Sep 2024 13:53:10 +0200 Subject: [PATCH] TCP tunelling via broker #3 tests --- src/brokerimpl.rs | 19 +++++++++---- src/config.rs | 3 ++ src/shvnode.rs | 2 +- src/test.rs | 71 +++++++++++++++++++++++++++++++++++++++++++++-- src/tunnelnode.rs | 60 ++++++++++++++++++++++++++------------- 5 files changed, 127 insertions(+), 28 deletions(-) diff --git a/src/brokerimpl.rs b/src/brokerimpl.rs index d7b4dd1..e5a8f03 100644 --- a/src/brokerimpl.rs +++ b/src/brokerimpl.rs @@ -624,19 +624,28 @@ impl BrokerState { let _ = sender.send(ExecSql { query }).await; }); } - pub(crate) fn create_tunnel(&mut self, frame: &RpcFrame) -> shvrpc::Result { + pub(crate) fn create_tunnel(&mut self, frame: &RpcFrame) -> shvrpc::Result<()> { let tunid = self.next_tunnel_number; self.next_tunnel_number += 1; let tunid = format!("{tunid}"); + debug!("create_tunnel: {tunid}"); let rq = frame.to_rpcmesage()?; let caller_ids = rq.caller_ids(); let param = rq.param().unwrap_or_default().as_map(); - let host = param.get("host").unwrap_or_default().to_string(); + let host = param.get("host").ok_or("'host' parameter must be provided")?.as_str().to_string(); let (sender, receiver) = channel::unbounded::(); let tun = OpenTunnelNode { caller_ids, sender }; - task::spawn(tunnel_task(tunid.clone(), frame.meta.clone(), host, receiver, self.command_sender.clone())); - self.open_tunnels.insert(tunid.clone(), tun); - Ok(tunid) + let command_sender = self.command_sender.clone(); + let rq_meta = frame.meta.clone(); + let tunid2 = tunid.clone(); + task::spawn(async move { + if let Err(e) = tunnel_task(tunid2.clone(), rq_meta, host, receiver, command_sender.clone()).await { + error!("{}", e) + } + command_sender.send(BrokerCommand::CloseTunnel(tunid2)).await + }); + self.open_tunnels.insert(tunid, tun); + Ok(()) } pub(crate) fn close_tunnel(&mut self, tunid: &str) -> shvrpc::Result { if let Some(tun) = self.open_tunnels.remove(tunid) { diff --git a/src/config.rs b/src/config.rs index dd37f0d..a3ecf4f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -152,6 +152,9 @@ impl Default for BrokerConfig { ("tester".to_string(), Role { roles: vec!["client".to_string()], access: vec![ + AccessRule { shv_ri: ".app/tunnel:create".into(), grant: "wr".to_string() }, + AccessRule { shv_ri: ".app/tunnel:ls".into(), grant: "su".to_string() }, + AccessRule { shv_ri: ".app/tunnel:dir".into(), grant: "su".to_string() }, AccessRule { shv_ri: "test/**:*".into(), grant: "cfg".to_string() }, ], }), diff --git a/src/shvnode.rs b/src/shvnode.rs index 78ee481..f1f941b 100644 --- a/src/shvnode.rs +++ b/src/shvnode.rs @@ -250,7 +250,7 @@ impl dyn ShvNode { Ok(ProcessRequestRetval::Retval(children.into())) } LsParam::Exists(path) => { - Ok(ProcessRequestRetval::Retval(children.iter().find(|s| *s == &path).into())) + Ok(ProcessRequestRetval::Retval(children.iter().any(|s| s == &path).into())) } } diff --git a/src/test.rs b/src/test.rs index e6082a1..2c9587a 100644 --- a/src/test.rs +++ b/src/test.rs @@ -2,11 +2,11 @@ use crate::brokerimpl::BrokerImpl; use async_std::channel::{Receiver, Sender}; use async_std::{channel, task}; use rusqlite::Connection; -use shvproto::{RpcValue}; +use shvproto::{Map, RpcValue}; use shvrpc::rpcframe::RpcFrame; use shvrpc::{RpcMessage, RpcMessageMetaTags}; use shvrpc::rpc::{ShvRI, SubscriptionParam}; -use shvrpc::rpcmessage::{PeerId, RpcError}; +use shvrpc::rpcmessage::{PeerId, RpcError, RpcErrorCode}; use shvrpc::util::join_path; use crate::brokerimpl::{BrokerToPeerMessage, PeerKind, BrokerCommand}; use crate::config::{AccessRule, BrokerConfig, Mount, Password, Role, User}; @@ -194,5 +194,72 @@ async fn test_broker_loop() { } } } + broker_task.cancel().await; +} + +#[async_std::test] +async fn test_tunnel_loop() { + let config = BrokerConfig::default(); + let access = config.access.clone(); + let broker = BrokerImpl::new(access, None); + let broker_sender = broker.command_sender.clone(); + let broker_task = task::spawn(crate::brokerimpl::broker_loop(broker)); + + let (peer_writer, peer_reader) = channel::unbounded::(); + let client_id = 3; + + let call_ctx = CallCtx { + writer: &broker_sender, + reader: &peer_reader, + client_id, + }; + + // login + let user = "test"; + //let password = "test"; + broker_sender.send(BrokerCommand::NewPeer { + peer_id: client_id, + peer_kind: PeerKind::Client, + user: user.to_string(), + mount_point: None, + device_id: None, + sender: peer_writer.clone() }).await.unwrap(); + + let tunid = call(".app/tunnel", "create", None, &call_ctx).await; + // host param is missing + assert!(tunid.is_err()); + + let param = Map::from([("host".to_string(), "localhost:54321".into())]); + let tunid = call(".app/tunnel", "create", Some(param.into()), &call_ctx).await; + // service not running + assert_eq!(tunid.err().unwrap().code, RpcErrorCode::MethodCallException); + + // service is running + // ncat -e /bin/cat -k -l 8888 + let param = Map::from([("host".to_string(), "localhost:8888".into())]); + let tunid = call(".app/tunnel", "create", Some(param.into()), &call_ctx).await.unwrap(); + assert!(tunid.is_string()); + + let tunid = tunid.as_str(); + + let res = call(".app/tunnel", "ls", None, &call_ctx).await.unwrap(); + assert_eq!(res.as_list(), &[tunid.into()].to_vec()); + let res = call(".app/tunnel", "ls", Some(tunid.into()), &call_ctx).await.unwrap(); + assert!(res.as_bool()); + + let data = "hello".as_bytes(); + let res = call(&format!(".app/tunnel/{tunid}"), "write", Some(data.into()), &call_ctx).await.unwrap(); + assert_eq!(res.as_blob(), data); + + let data = "tunnel".as_bytes(); + let res = call(&format!(".app/tunnel/{tunid}"), "write", Some(data.into()), &call_ctx).await.unwrap(); + assert_eq!(res.as_blob(), data); + + let res = call(&format!(".app/tunnel/{tunid}"), "close", None, &call_ctx).await.unwrap(); + assert!(res.as_bool()); + + let res = call(".app/tunnel", "ls", None, &call_ctx).await.unwrap(); + assert!(res.as_list().is_empty()); + broker_task.cancel().await; } \ No newline at end of file diff --git a/src/tunnelnode.rs b/src/tunnelnode.rs index 5f479e0..42e6dfe 100644 --- a/src/tunnelnode.rs +++ b/src/tunnelnode.rs @@ -1,17 +1,17 @@ use async_std::{channel, task}; use async_std::channel::{Receiver, Sender}; -use async_std::io::{BufReader, BufWriter, WriteExt}; +use async_std::io::{BufReader, BufWriter}; use async_std::net::{TcpStream}; -use futures::{select, AsyncReadExt}; +use futures::{select, AsyncReadExt, AsyncWriteExt}; use shvproto::{MetaMap, RpcValue}; -use shvrpc::{Error, RpcMessage, RpcMessageMetaTags}; +use shvrpc::{Error, RpcMessageMetaTags}; use shvrpc::metamethod::{AccessLevel, Flag, MetaMethod}; use shvrpc::rpcframe::RpcFrame; -use shvrpc::rpcmessage::{PeerId, RqId}; +use shvrpc::rpcmessage::{PeerId, RpcError, RpcErrorCode, RqId}; use crate::brokerimpl::{state_reader, state_writer, BrokerCommand, NodeRequestContext, SharedBrokerState}; use crate::shvnode::{is_request_granted_methods, ProcessRequestRetval, ShvNode, META_METHOD_PUBLIC_DIR, METH_DIR, METH_LS}; use futures::FutureExt; -use log::{debug, error}; +use log::{debug, error, log, Level}; use crate::shvnode; const META_METHOD_PRIVATE_DIR: MetaMethod = MetaMethod { name: METH_DIR, flags: Flag::None as u32, access: AccessLevel::Superuser, param: "DirParam", result: "DirResult", signals: &[], description: "" }; @@ -71,8 +71,8 @@ impl ShvNode for TunnelNode { if tunid.is_empty() { match method { METH_CREATE => { - let tunid = state_writer(&ctx.state).create_tunnel(frame)?; - Ok(ProcessRequestRetval::Retval(tunid.into())) + state_writer(&ctx.state).create_tunnel(frame)?; + Ok(ProcessRequestRetval::RetvalDeferred) } _ => { Ok(ProcessRequestRetval::MethodNotFound) @@ -82,7 +82,7 @@ impl ShvNode for TunnelNode { match method { METH_WRITE => { let rq = frame.to_rpcmesage()?; - let data = rq.result()?.as_blob().to_vec(); + let data = rq.param().ok_or("Param missing")?.as_blob().to_vec(); state_reader(&ctx.state).write_tunnel(tunid, rq.request_id().unwrap_or_default(), data)?; Ok(ProcessRequestRetval::RetvalDeferred) } @@ -97,6 +97,7 @@ impl ShvNode for TunnelNode { } } } +#[derive(Debug)] pub(crate) enum ToRemoteMsg { SendData(RqId, Vec), DestroyConnection, @@ -107,16 +108,33 @@ pub(crate) struct OpenTunnelNode { } pub(crate) async fn tunnel_task(tunnel_id: String, request_meta: MetaMap, addr: String, from_broker_receiver: Receiver, to_broker_sender: Sender) -> shvrpc::Result<()> { - let stream = TcpStream::connect(addr).await?; + let peer_id= *request_meta.caller_ids().first().ok_or("Invalid peer id")?; + let mut response_meta = RpcFrame::prepare_response_meta(&request_meta)?; + log!(target: "Tunnel", Level::Debug, "connecting to: {addr} ..."); + let stream = match TcpStream::connect(addr).await { + Ok(stream) => { + log!(target: "Tunnel", Level::Debug, "connected OK"); + to_broker_sender.send(BrokerCommand::SendResponse { + peer_id, + meta: response_meta.clone(), + result: Ok(tunnel_id.clone().into()), + }).await?; + stream + } + Err(e) => { + to_broker_sender.send(BrokerCommand::SendResponse { + peer_id, + meta: response_meta.clone(), + result: Err(RpcError{ code: RpcErrorCode::MethodCallException, message: e.to_string() }), + }).await?; + return Err(e.to_string().into()) + } + }; let (reader, writer) = stream.split(); let mut read_buff: [u8; 256] = [0; 256]; - let mut write_buff: Vec = vec![]; + let mut response_buff: Vec = vec![]; let mut request_id = None; - let mut response_meta = RpcMessage::from_meta(request_meta).meta().clone(); - let peer_id = 0; - //let caller_ids = tunnel.caller_ids.clone(); let mut reader = BufReader::new(reader); - let mut fut_from_broker = from_broker_receiver.recv().fuse(); let (write_task_sender, write_task_receiver) = channel::unbounded::>(); task::spawn(async move { let mut writer = BufWriter::new(writer); @@ -127,6 +145,8 @@ pub(crate) async fn tunnel_task(tunnel_id: String, request_meta: MetaMap, addr: break; } else { writer.write_all(&data).await?; + writer.flush().await?; + // println!("DATA written: {:?}", data); } } Err(e) => { @@ -147,7 +167,6 @@ pub(crate) async fn tunnel_task(tunnel_id: String, request_meta: MetaMap, addr: } } loop { - let make_response2 = make_response; select! { bytes_read = reader.read(&mut read_buff).fuse() => match bytes_read { Ok(bytes_read) => { @@ -157,8 +176,8 @@ pub(crate) async fn tunnel_task(tunnel_id: String, request_meta: MetaMap, addr: break; } else { let mut data = read_buff[.. bytes_read].to_vec(); - write_buff.append(&mut data); - to_broker_sender.send(make_response(peer_id, response_meta.clone(), &mut write_buff)).await?; + response_buff.append(&mut data); + to_broker_sender.send(make_response(peer_id, response_meta.clone(), &mut response_buff)).await?; } }, Err(e) => { @@ -166,15 +185,16 @@ pub(crate) async fn tunnel_task(tunnel_id: String, request_meta: MetaMap, addr: break; } }, - cmd = fut_from_broker => match cmd { + cmd = from_broker_receiver.recv().fuse() => match cmd { Ok(cmd) => { + // println!("CMD: {:?}", cmd); match cmd { ToRemoteMsg::SendData(rqid, data) => { if request_id.is_none() { request_id = Some(rqid); response_meta.set_request_id(rqid); - if !write_buff.is_empty() { - to_broker_sender.send(make_response2(peer_id, response_meta.clone(), &mut write_buff)).await?; + if !response_buff.is_empty() { + to_broker_sender.send(make_response(peer_id, response_meta.clone(), &mut response_buff)).await?; } } write_task_sender.send(data).await?;