From 4fcdd2959b15b26e7dc367e069dc42004ef03589 Mon Sep 17 00:00:00 2001 From: Fanda Vacek Date: Mon, 16 Sep 2024 17:07:45 +0200 Subject: [PATCH] TCP tunelling via broker #3, close tunnel after inactivity period --- Cargo.toml | 2 +- src/brokerimpl.rs | 109 +++++++++++++++++++++++++++++--------------- src/tunnelnode.rs | 53 +++++++++++++++------ tests/common/mod.rs | 2 +- 4 files changed, 114 insertions(+), 52 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 081fc4a..b8f2d02 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "shvbroker" -version = "3.1.1" +version = "3.1.2" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/src/brokerimpl.rs b/src/brokerimpl.rs index f12a736..80dbf7c 100644 --- a/src/brokerimpl.rs +++ b/src/brokerimpl.rs @@ -25,7 +25,7 @@ use async_std::stream::StreamExt; use futures::FutureExt; use shvrpc::rpcmessage::Tag::RevCallerIds; use crate::brokerimpl::BrokerCommand::ExecSql; -use crate::tunnelnode::{tunnel_task, OpenTunnelNode, ToRemoteMsg, TunnelNode}; +use crate::tunnelnode::{ActiveTunnel, ToRemoteMsg, TunnelNode}; #[derive(Debug)] pub(crate) struct Subscription { @@ -84,7 +84,8 @@ pub(crate) enum BrokerCommand { ExecSql { query: String, }, - CloseTunnel(String), + TunnelActive(String), + TunnelClosed(String), } #[derive(Debug)] @@ -217,7 +218,7 @@ pub struct BrokerState { pub(crate) command_sender: Sender, - open_tunnels: BTreeMap, + active_tunnels: BTreeMap, next_tunnel_number: u64, } pub(crate) type SharedBrokerState = Arc>; @@ -633,52 +634,39 @@ impl BrokerState { let _ = sender.send(ExecSql { query }).await; }); } - pub(crate) fn create_tunnel(&mut self, frame: &RpcFrame) -> shvrpc::Result<()> { + pub(crate) fn create_tunnel(&mut self, request: &RpcMessage) -> shvrpc::Result<(String, Receiver)> { let tunid = self.next_tunnel_number; self.next_tunnel_number += 1; let tunid = format!("{tunid}"); debug!("create_tunnel: {tunid}"); - let rq = frame.to_rpcmesage()?; - let caller_ids = rq.caller_ids(); - let param = rq.param().unwrap_or_default().as_map(); - let host = param.get("host").ok_or("'host' parameter must be provided")?.as_str().to_string(); + let caller_ids = request.caller_ids(); let (sender, receiver) = channel::unbounded::(); - let tun = OpenTunnelNode { caller_ids, sender }; - let rq_meta = frame.meta.clone(); - self.open_tunnels.insert(tunid.clone(), tun); - let command_sender = self.command_sender.clone(); - task::spawn(async move { - command_sender.send(BrokerCommand::SendSignal { - shv_path: format!(".app/tunnel/{tunid}"), - signal: "lsmod".to_string(), - source: "ls".to_string(), - param: Map::from([(tunid.clone(), true.into())]).into(), - }).await?; - if let Err(e) = tunnel_task(tunid.clone(), rq_meta, host, receiver, command_sender.clone()).await { - error!("{}", e) - } - command_sender.send(BrokerCommand::CloseTunnel(tunid)).await - }); - Ok(()) + let tun = ActiveTunnel { caller_ids, sender, last_activity: None }; + self.active_tunnels.insert(tunid.clone(), tun); + Ok((tunid, receiver)) } - pub(crate) fn close_tunnel(&mut self, tunid: &str) -> shvrpc::Result { - if let Some(tun) = self.open_tunnels.remove(tunid) { + pub(crate) fn close_tunnel(&mut self, tunid: &str) -> shvrpc::Result> { + debug!("close_tunnel: {tunid}"); + if let Some(tun) = self.active_tunnels.remove(tunid) { let sender = tun.sender; task::spawn(async move { let _ = sender.send(ToRemoteMsg::DestroyConnection).await; }); - Ok(true) + Ok(Some(tun.last_activity.is_some())) } else { // might be callback of previous close_tunel() - Ok(false) + Ok(None) } } - pub(crate) fn open_tunnel_ids(&self) -> Vec { - let keys = self.open_tunnels.keys().cloned().collect(); + pub(crate) fn active_tunnel_ids(&self) -> Vec { + let keys = self.active_tunnels.iter() + //.filter(|(_id, tun)| tun.last_activity.is_some()) + .map(|(id, _tun)| id.clone()) + .collect(); keys } pub(crate) fn is_request_granted(&self, tunid: &str, frame: &RpcFrame) -> bool { - if let Some(tun) = self.open_tunnels.get(tunid) { + if let Some(tun) = self.active_tunnels.get(tunid) { let cids = frame.caller_ids(); cids == tun.caller_ids || AccessLevel::try_from(frame.access_level().unwrap_or(0)).unwrap_or(AccessLevel::Browse) == AccessLevel::Superuser } else { @@ -686,16 +674,35 @@ impl BrokerState { } } pub(crate) fn write_tunnel(&self, tunid: &str, rqid: RqId, data: Vec) -> shvrpc::Result<()> { - if let Some(tun) = self.open_tunnels.get(tunid) { + if let Some(tun) = self.active_tunnels.get(tunid) { let sender = tun.sender.clone(); task::spawn(async move { - sender.send(ToRemoteMsg::SendData(rqid, data)).await + sender.send(ToRemoteMsg::WriteData(rqid, data)).await }); Ok(()) } else { Err(format!("Invalid tunnel ID: {tunid}").into()) } } + pub(crate) fn touch_tunnel(&mut self, tunid: &str) { + if let Some(tun) = self.active_tunnels.get_mut(tunid) { + tun.last_activity = Some(Instant::now()); + } + } + pub(crate) fn last_tunnel_activity(&self, tunid: &str) -> Option { + if let Some(tun) = self.active_tunnels.get(tunid) { + tun.last_activity + } else { + None + } + } + pub(crate) fn is_tunnel_active(&self, tunid: &str) -> bool { + if let Some(tun) = self.active_tunnels.get(tunid) { + tun.last_activity.is_some() + } else { + false + } + } } enum UpdateSqlOperation<'a> { Insert{id: &'a str, json: String}, @@ -750,7 +757,7 @@ impl BrokerImpl { access, role_access, command_sender: command_sender.clone(), - open_tunnels: Default::default(), + active_tunnels: Default::default(), next_tunnel_number: 1, }; let mut broker = Self { @@ -1033,8 +1040,36 @@ impl BrokerImpl { error!("SQL config is disabled, use --use-access-db CLI switch.") } } - BrokerCommand::CloseTunnel(tunnel_id) => { - if state_writer(&self.state).close_tunnel(&tunnel_id)? { + BrokerCommand::TunnelActive(tunnel_id) => { + self.command_sender.send(BrokerCommand::SendSignal { + shv_path: format!(".app/tunnel/{tunnel_id}"), + signal: "lsmod".to_string(), + source: "ls".to_string(), + param: Map::from([(tunnel_id.clone(), true.into())]).into(), + }).await?; + let command_sender = self.command_sender.clone(); + let state = self.state.clone(); + let tunid = tunnel_id.clone(); + task::spawn(async move { + const TIMEOUT: Duration = Duration::from_secs(60 * 60); + loop { + let last_activity = state_reader(&state).last_tunnel_activity(&tunid); + if let Some(last_activity) = last_activity { + if Instant::now().duration_since(last_activity) > TIMEOUT { + break; + } + } else { + break; + } + task::sleep(TIMEOUT / 60).await; + } + log!(target: "Tunnel", Level::Warn, "Closing tunnel: {tunid} as inactive for {:#?}", TIMEOUT); + command_sender.send(BrokerCommand::TunnelClosed(tunid)).await + }); + } + BrokerCommand::TunnelClosed(tunnel_id) => { + let closed = state_writer(&self.state).close_tunnel(&tunnel_id)?; + if let Some(true) = closed { self.command_sender.send(BrokerCommand::SendSignal { shv_path: format!(".app/tunnel/{tunnel_id}"), signal: "lsmod".to_string(), diff --git a/src/tunnelnode.rs b/src/tunnelnode.rs index 50233a5..424d6e3 100644 --- a/src/tunnelnode.rs +++ b/src/tunnelnode.rs @@ -1,3 +1,4 @@ +use std::time::{Instant}; use async_std::{channel, task}; use async_std::channel::{Receiver, Sender}; use async_std::io::{BufReader, BufWriter}; @@ -44,7 +45,7 @@ impl ShvNode for TunnelNode { } } fn children(&self, shv_path: &str, broker_state: &SharedBrokerState) -> Option> { - let tunnels = state_reader(broker_state).open_tunnel_ids(); + let tunnels = state_reader(broker_state).active_tunnel_ids(); if shv_path.is_empty() { Some(tunnels) } else if tunnels.contains(&shv_path.to_string()) { @@ -71,7 +72,20 @@ impl ShvNode for TunnelNode { if tunid.is_empty() { match method { METH_CREATE => { - state_writer(&ctx.state).create_tunnel(frame)?; + let rq = frame.to_rpcmesage()?; + let param = rq.param().unwrap_or_default().as_map(); + let host = param.get("host").ok_or("'host' parameter must be provided")?.as_str().to_string(); + let (tunid, receiver) = state_writer(&ctx.state).create_tunnel(&rq)?; + let rq_meta = rq.meta().clone(); + let state = ctx.state.clone(); + let command_sender = state_reader(&state).command_sender.clone(); + let tunid2 = tunid.clone(); + task::spawn(async move { + if let Err(e) = tunnel_task(tunid2.clone(), rq_meta, host, receiver, state).await { + error!("{}", e) + } + command_sender.send(BrokerCommand::TunnelClosed(tunid2)).await + }); Ok(ProcessRequestRetval::RetvalDeferred) } _ => { @@ -87,8 +101,13 @@ impl ShvNode for TunnelNode { Ok(ProcessRequestRetval::RetvalDeferred) } METH_CLOSE => { - let res = state_writer(&ctx.state).close_tunnel(tunid)?; - Ok(ProcessRequestRetval::Retval(res.into())) + let command_sender = state_reader(&ctx.state).command_sender.clone(); + let is_active = state_reader(&ctx.state).is_tunnel_active(tunid); + let tunid = tunid.to_string(); + task::spawn(async move { + let _ = command_sender.send(BrokerCommand::TunnelClosed(tunid)).await; + }); + Ok(ProcessRequestRetval::Retval(is_active.into())) } _ => { Ok(ProcessRequestRetval::MethodNotFound) @@ -99,17 +118,19 @@ impl ShvNode for TunnelNode { } #[derive(Debug)] pub(crate) enum ToRemoteMsg { - SendData(RqId, Vec), + WriteData(RqId, Vec), DestroyConnection, } -pub(crate) struct OpenTunnelNode { +pub(crate) struct ActiveTunnel { pub(crate) caller_ids: Vec, pub(crate) sender: Sender, + pub(crate) last_activity: Option, } -pub(crate) async fn tunnel_task(tunnel_id: String, request_meta: MetaMap, addr: String, from_broker_receiver: Receiver, to_broker_sender: Sender) -> shvrpc::Result<()> { +pub(crate) async fn tunnel_task(tunnel_id: String, request_meta: MetaMap, addr: String, from_broker_receiver: Receiver, state: SharedBrokerState) -> shvrpc::Result<()> { let peer_id= *request_meta.caller_ids().first().ok_or("Invalid peer id")?; let mut response_meta = RpcFrame::prepare_response_meta(&request_meta)?; + let to_broker_sender = state_reader(&state).command_sender.clone(); log!(target: "Tunnel", Level::Debug, "connecting to: {addr} ..."); let stream = match TcpStream::connect(addr).await { Ok(stream) => { @@ -130,10 +151,12 @@ pub(crate) async fn tunnel_task(tunnel_id: String, request_meta: MetaMap, addr: return Err(e.to_string().into()) } }; + state_writer(&state).touch_tunnel(&tunnel_id); + to_broker_sender.send(BrokerCommand::TunnelActive(tunnel_id.clone())).await?; let (reader, writer) = stream.split(); let mut read_buff: [u8; 256] = [0; 256]; let mut response_buff: Vec = vec![]; - let mut request_id = None; + let mut write_request_id = None; let mut reader = BufReader::new(reader); let (write_task_sender, write_task_receiver) = channel::unbounded::>(); task::spawn(async move { @@ -182,6 +205,7 @@ pub(crate) async fn tunnel_task(tunnel_id: String, request_meta: MetaMap, addr: bytes_read = reader.read(&mut read_buff).fuse() => match bytes_read { Ok(bytes_read) => { log!(target: "Tunnel", Level::Trace, "Read {bytes_read} bytes from client socket."); + state_writer(&state).touch_tunnel(&tunnel_id); if bytes_read == 0 { log!(target: "Tunnel", Level::Trace, "Client socket closed."); break; @@ -200,9 +224,10 @@ pub(crate) async fn tunnel_task(tunnel_id: String, request_meta: MetaMap, addr: Ok(cmd) => { log!(target: "Tunnel", Level::Trace, "CMD: {:?}", cmd); match cmd { - ToRemoteMsg::SendData(rqid, data) => { - if request_id.is_none() { - request_id = Some(rqid); + ToRemoteMsg::WriteData(rqid, data) => { + state_writer(&state).touch_tunnel(&tunnel_id); + if write_request_id.is_none() { + write_request_id = Some(rqid); response_meta.set_request_id(rqid); if !response_buff.is_empty() { log!(target: "Tunnel", Level::Trace, "to_broker_sender send: {} bytes to {peer_id}", response_buff.len()); @@ -215,7 +240,9 @@ pub(crate) async fn tunnel_task(tunnel_id: String, request_meta: MetaMap, addr: } } ToRemoteMsg::DestroyConnection => { - to_broker_sender.send(make_err_response(peer_id, response_meta.clone(), RpcError::new(RpcErrorCode::MethodCallCancelled, "Tunnel closed."))).await?; + if write_request_id.is_some() { + to_broker_sender.send(make_err_response(peer_id, response_meta.clone(), RpcError::new(RpcErrorCode::MethodCallCancelled, format!("Tunnel: {tunnel_id} closed.")))).await?; + } break } } @@ -229,6 +256,6 @@ pub(crate) async fn tunnel_task(tunnel_id: String, request_meta: MetaMap, addr: } // cancel write task write_task_sender.send(vec![]).await?; - to_broker_sender.send(BrokerCommand::CloseTunnel(tunnel_id)).await?; + to_broker_sender.send(BrokerCommand::TunnelClosed(tunnel_id)).await?; Ok(()) } \ No newline at end of file diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 152abd8..fa29b7a 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -53,7 +53,7 @@ pub fn text_from_output(output: Output) -> shvrpc::Result { pub fn string_list_from_output(output: Output) -> shvrpc::Result> { let bytes = text_from_output(output)?; let mut values = Vec::new(); - for cpon in bytes.split(|b| b == '\n').filter(|line| !line.is_empty()) { + for cpon in bytes.split('\n').filter(|line| !line.is_empty()) { values.push(cpon.trim().to_owned()); } Ok(values)