Skip to content

Commit

Permalink
TCP tunelling via broker #3 tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Fanda Vacek committed Sep 2, 2024
1 parent 51dc5b8 commit 13da929
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 28 deletions.
19 changes: 14 additions & 5 deletions src/brokerimpl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -624,19 +624,28 @@ impl BrokerState {
let _ = sender.send(ExecSql { query }).await;
});
}
pub(crate) fn create_tunnel(&mut self, frame: &RpcFrame) -> shvrpc::Result<String> {
pub(crate) fn create_tunnel(&mut self, frame: &RpcFrame) -> shvrpc::Result<()> {
let tunid = self.next_tunnel_number;
self.next_tunnel_number += 1;
let tunid = format!("{tunid}");
debug!("create_tunnel: {tunid}");
let rq = frame.to_rpcmesage()?;
let caller_ids = rq.caller_ids();
let param = rq.param().unwrap_or_default().as_map();
let host = param.get("host").unwrap_or_default().to_string();
let host = param.get("host").ok_or("'host' parameter must be provided")?.as_str().to_string();
let (sender, receiver) = channel::unbounded::<ToRemoteMsg>();
let tun = OpenTunnelNode { caller_ids, sender };
task::spawn(tunnel_task(tunid.clone(), frame.meta.clone(), host, receiver, self.command_sender.clone()));
self.open_tunnels.insert(tunid.clone(), tun);
Ok(tunid)
let command_sender = self.command_sender.clone();
let rq_meta = frame.meta.clone();
let tunid2 = tunid.clone();
task::spawn(async move {
if let Err(e) = tunnel_task(tunid2.clone(), rq_meta, host, receiver, command_sender.clone()).await {
error!("{}", e)
}
command_sender.send(BrokerCommand::CloseTunnel(tunid2)).await
});
self.open_tunnels.insert(tunid, tun);
Ok(())
}
pub(crate) fn close_tunnel(&mut self, tunid: &str) -> shvrpc::Result<bool> {
if let Some(tun) = self.open_tunnels.remove(tunid) {
Expand Down
3 changes: 3 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ impl Default for BrokerConfig {
("tester".to_string(), Role {
roles: vec!["client".to_string()],
access: vec![
AccessRule { shv_ri: ".app/tunnel:create".into(), grant: "wr".to_string() },
AccessRule { shv_ri: ".app/tunnel:ls".into(), grant: "su".to_string() },
AccessRule { shv_ri: ".app/tunnel:dir".into(), grant: "su".to_string() },
AccessRule { shv_ri: "test/**:*".into(), grant: "cfg".to_string() },
],
}),
Expand Down
2 changes: 1 addition & 1 deletion src/shvnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ impl dyn ShvNode {
Ok(ProcessRequestRetval::Retval(children.into()))
}
LsParam::Exists(path) => {
Ok(ProcessRequestRetval::Retval(children.iter().find(|s| *s == &path).into()))
Ok(ProcessRequestRetval::Retval(children.iter().any(|s| s == &path).into()))
}
}

Expand Down
71 changes: 69 additions & 2 deletions src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ use crate::brokerimpl::BrokerImpl;
use async_std::channel::{Receiver, Sender};
use async_std::{channel, task};
use rusqlite::Connection;
use shvproto::{RpcValue};
use shvproto::{Map, RpcValue};
use shvrpc::rpcframe::RpcFrame;
use shvrpc::{RpcMessage, RpcMessageMetaTags};
use shvrpc::rpc::{ShvRI, SubscriptionParam};
use shvrpc::rpcmessage::{PeerId, RpcError};
use shvrpc::rpcmessage::{PeerId, RpcError, RpcErrorCode};
use shvrpc::util::join_path;
use crate::brokerimpl::{BrokerToPeerMessage, PeerKind, BrokerCommand};
use crate::config::{AccessRule, BrokerConfig, Mount, Password, Role, User};
Expand Down Expand Up @@ -194,5 +194,72 @@ async fn test_broker_loop() {
}
}
}
broker_task.cancel().await;
}

#[async_std::test]
async fn test_tunnel_loop() {
let config = BrokerConfig::default();
let access = config.access.clone();
let broker = BrokerImpl::new(access, None);
let broker_sender = broker.command_sender.clone();
let broker_task = task::spawn(crate::brokerimpl::broker_loop(broker));

let (peer_writer, peer_reader) = channel::unbounded::<BrokerToPeerMessage>();
let client_id = 3;

let call_ctx = CallCtx {
writer: &broker_sender,
reader: &peer_reader,
client_id,
};

// login
let user = "test";
//let password = "test";
broker_sender.send(BrokerCommand::NewPeer {
peer_id: client_id,
peer_kind: PeerKind::Client,
user: user.to_string(),
mount_point: None,
device_id: None,
sender: peer_writer.clone() }).await.unwrap();

let tunid = call(".app/tunnel", "create", None, &call_ctx).await;
// host param is missing
assert!(tunid.is_err());

let param = Map::from([("host".to_string(), "localhost:54321".into())]);
let tunid = call(".app/tunnel", "create", Some(param.into()), &call_ctx).await;
// service not running
assert_eq!(tunid.err().unwrap().code, RpcErrorCode::MethodCallException);

// service is running
// ncat -e /bin/cat -k -l 8888
let param = Map::from([("host".to_string(), "localhost:8888".into())]);
let tunid = call(".app/tunnel", "create", Some(param.into()), &call_ctx).await.unwrap();
assert!(tunid.is_string());

let tunid = tunid.as_str();

let res = call(".app/tunnel", "ls", None, &call_ctx).await.unwrap();
assert_eq!(res.as_list(), &[tunid.into()].to_vec());
let res = call(".app/tunnel", "ls", Some(tunid.into()), &call_ctx).await.unwrap();
assert!(res.as_bool());

let data = "hello".as_bytes();
let res = call(&format!(".app/tunnel/{tunid}"), "write", Some(data.into()), &call_ctx).await.unwrap();
assert_eq!(res.as_blob(), data);

let data = "tunnel".as_bytes();
let res = call(&format!(".app/tunnel/{tunid}"), "write", Some(data.into()), &call_ctx).await.unwrap();
assert_eq!(res.as_blob(), data);

let res = call(&format!(".app/tunnel/{tunid}"), "close", None, &call_ctx).await.unwrap();
assert!(res.as_bool());

let res = call(".app/tunnel", "ls", None, &call_ctx).await.unwrap();
assert!(res.as_list().is_empty());

broker_task.cancel().await;
}
60 changes: 40 additions & 20 deletions src/tunnelnode.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use async_std::{channel, task};
use async_std::channel::{Receiver, Sender};
use async_std::io::{BufReader, BufWriter, WriteExt};
use async_std::io::{BufReader, BufWriter};
use async_std::net::{TcpStream};
use futures::{select, AsyncReadExt};
use futures::{select, AsyncReadExt, AsyncWriteExt};
use shvproto::{MetaMap, RpcValue};
use shvrpc::{Error, RpcMessage, RpcMessageMetaTags};
use shvrpc::{Error, RpcMessageMetaTags};
use shvrpc::metamethod::{AccessLevel, Flag, MetaMethod};
use shvrpc::rpcframe::RpcFrame;
use shvrpc::rpcmessage::{PeerId, RqId};
use shvrpc::rpcmessage::{PeerId, RpcError, RpcErrorCode, RqId};
use crate::brokerimpl::{state_reader, state_writer, BrokerCommand, NodeRequestContext, SharedBrokerState};
use crate::shvnode::{is_request_granted_methods, ProcessRequestRetval, ShvNode, META_METHOD_PUBLIC_DIR, METH_DIR, METH_LS};
use futures::FutureExt;
use log::{debug, error};
use log::{debug, error, log, Level};
use crate::shvnode;

const META_METHOD_PRIVATE_DIR: MetaMethod = MetaMethod { name: METH_DIR, flags: Flag::None as u32, access: AccessLevel::Superuser, param: "DirParam", result: "DirResult", signals: &[], description: "" };
Expand Down Expand Up @@ -71,8 +71,8 @@ impl ShvNode for TunnelNode {
if tunid.is_empty() {
match method {
METH_CREATE => {
let tunid = state_writer(&ctx.state).create_tunnel(frame)?;
Ok(ProcessRequestRetval::Retval(tunid.into()))
state_writer(&ctx.state).create_tunnel(frame)?;
Ok(ProcessRequestRetval::RetvalDeferred)
}
_ => {
Ok(ProcessRequestRetval::MethodNotFound)
Expand All @@ -82,7 +82,7 @@ impl ShvNode for TunnelNode {
match method {
METH_WRITE => {
let rq = frame.to_rpcmesage()?;
let data = rq.result()?.as_blob().to_vec();
let data = rq.param().ok_or("Param missing")?.as_blob().to_vec();
state_reader(&ctx.state).write_tunnel(tunid, rq.request_id().unwrap_or_default(), data)?;
Ok(ProcessRequestRetval::RetvalDeferred)
}
Expand All @@ -97,6 +97,7 @@ impl ShvNode for TunnelNode {
}
}
}
#[derive(Debug)]
pub(crate) enum ToRemoteMsg {
SendData(RqId, Vec<u8>),
DestroyConnection,
Expand All @@ -107,16 +108,33 @@ pub(crate) struct OpenTunnelNode {
}

pub(crate) async fn tunnel_task(tunnel_id: String, request_meta: MetaMap, addr: String, from_broker_receiver: Receiver<ToRemoteMsg>, to_broker_sender: Sender<BrokerCommand>) -> shvrpc::Result<()> {
let stream = TcpStream::connect(addr).await?;
let peer_id= *request_meta.caller_ids().first().ok_or("Invalid peer id")?;
let mut response_meta = RpcFrame::prepare_response_meta(&request_meta)?;
log!(target: "Tunnel", Level::Debug, "connecting to: {addr} ...");
let stream = match TcpStream::connect(addr).await {
Ok(stream) => {
log!(target: "Tunnel", Level::Debug, "connected OK");
to_broker_sender.send(BrokerCommand::SendResponse {
peer_id,
meta: response_meta.clone(),
result: Ok(tunnel_id.clone().into()),
}).await?;
stream
}
Err(e) => {
to_broker_sender.send(BrokerCommand::SendResponse {
peer_id,
meta: response_meta.clone(),
result: Err(RpcError{ code: RpcErrorCode::MethodCallException, message: e.to_string() }),
}).await?;
return Err(e.to_string().into())
}
};
let (reader, writer) = stream.split();
let mut read_buff: [u8; 256] = [0; 256];
let mut write_buff: Vec<u8> = vec![];
let mut response_buff: Vec<u8> = vec![];
let mut request_id = None;
let mut response_meta = RpcMessage::from_meta(request_meta).meta().clone();
let peer_id = 0;
//let caller_ids = tunnel.caller_ids.clone();
let mut reader = BufReader::new(reader);
let mut fut_from_broker = from_broker_receiver.recv().fuse();
let (write_task_sender, write_task_receiver) = channel::unbounded::<Vec<u8>>();
task::spawn(async move {
let mut writer = BufWriter::new(writer);
Expand All @@ -127,6 +145,8 @@ pub(crate) async fn tunnel_task(tunnel_id: String, request_meta: MetaMap, addr:
break;
} else {
writer.write_all(&data).await?;
writer.flush().await?;
// println!("DATA written: {:?}", data);
}
}
Err(e) => {
Expand All @@ -147,7 +167,6 @@ pub(crate) async fn tunnel_task(tunnel_id: String, request_meta: MetaMap, addr:
}
}
loop {
let make_response2 = make_response;
select! {
bytes_read = reader.read(&mut read_buff).fuse() => match bytes_read {
Ok(bytes_read) => {
Expand All @@ -157,24 +176,25 @@ pub(crate) async fn tunnel_task(tunnel_id: String, request_meta: MetaMap, addr:
break;
} else {
let mut data = read_buff[.. bytes_read].to_vec();
write_buff.append(&mut data);
to_broker_sender.send(make_response(peer_id, response_meta.clone(), &mut write_buff)).await?;
response_buff.append(&mut data);
to_broker_sender.send(make_response(peer_id, response_meta.clone(), &mut response_buff)).await?;
}
},
Err(e) => {
error!("tunnel socket error: {e}");
break;
}
},
cmd = fut_from_broker => match cmd {
cmd = from_broker_receiver.recv().fuse() => match cmd {
Ok(cmd) => {
// println!("CMD: {:?}", cmd);
match cmd {
ToRemoteMsg::SendData(rqid, data) => {
if request_id.is_none() {
request_id = Some(rqid);
response_meta.set_request_id(rqid);
if !write_buff.is_empty() {
to_broker_sender.send(make_response2(peer_id, response_meta.clone(), &mut write_buff)).await?;
if !response_buff.is_empty() {
to_broker_sender.send(make_response(peer_id, response_meta.clone(), &mut response_buff)).await?;
}
}
write_task_sender.send(data).await?;
Expand Down

0 comments on commit 13da929

Please sign in to comment.