Skip to content

Commit

Permalink
TCP tunelling via broker #3, close tunnel after inactivity period
Browse files Browse the repository at this point in the history
  • Loading branch information
Fanda Vacek committed Sep 17, 2024
1 parent bbc66af commit 4fcdd29
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 52 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "shvbroker"
version = "3.1.1"
version = "3.1.2"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
109 changes: 72 additions & 37 deletions src/brokerimpl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use async_std::stream::StreamExt;
use futures::FutureExt;
use shvrpc::rpcmessage::Tag::RevCallerIds;
use crate::brokerimpl::BrokerCommand::ExecSql;
use crate::tunnelnode::{tunnel_task, OpenTunnelNode, ToRemoteMsg, TunnelNode};
use crate::tunnelnode::{ActiveTunnel, ToRemoteMsg, TunnelNode};

#[derive(Debug)]
pub(crate) struct Subscription {
Expand Down Expand Up @@ -84,7 +84,8 @@ pub(crate) enum BrokerCommand {
ExecSql {
query: String,
},
CloseTunnel(String),
TunnelActive(String),
TunnelClosed(String),
}

#[derive(Debug)]
Expand Down Expand Up @@ -217,7 +218,7 @@ pub struct BrokerState {

pub(crate) command_sender: Sender<BrokerCommand>,

open_tunnels: BTreeMap<String, OpenTunnelNode>,
active_tunnels: BTreeMap<String, ActiveTunnel>,
next_tunnel_number: u64,
}
pub(crate) type SharedBrokerState = Arc<RwLock<BrokerState>>;
Expand Down Expand Up @@ -633,69 +634,75 @@ impl BrokerState {
let _ = sender.send(ExecSql { query }).await;
});
}
pub(crate) fn create_tunnel(&mut self, frame: &RpcFrame) -> shvrpc::Result<()> {
pub(crate) fn create_tunnel(&mut self, request: &RpcMessage) -> shvrpc::Result<(String, Receiver<ToRemoteMsg>)> {
let tunid = self.next_tunnel_number;
self.next_tunnel_number += 1;
let tunid = format!("{tunid}");
debug!("create_tunnel: {tunid}");
let rq = frame.to_rpcmesage()?;
let caller_ids = rq.caller_ids();
let param = rq.param().unwrap_or_default().as_map();
let host = param.get("host").ok_or("'host' parameter must be provided")?.as_str().to_string();
let caller_ids = request.caller_ids();
let (sender, receiver) = channel::unbounded::<ToRemoteMsg>();
let tun = OpenTunnelNode { caller_ids, sender };
let rq_meta = frame.meta.clone();
self.open_tunnels.insert(tunid.clone(), tun);
let command_sender = self.command_sender.clone();
task::spawn(async move {
command_sender.send(BrokerCommand::SendSignal {
shv_path: format!(".app/tunnel/{tunid}"),
signal: "lsmod".to_string(),
source: "ls".to_string(),
param: Map::from([(tunid.clone(), true.into())]).into(),
}).await?;
if let Err(e) = tunnel_task(tunid.clone(), rq_meta, host, receiver, command_sender.clone()).await {
error!("{}", e)
}
command_sender.send(BrokerCommand::CloseTunnel(tunid)).await
});
Ok(())
let tun = ActiveTunnel { caller_ids, sender, last_activity: None };
self.active_tunnels.insert(tunid.clone(), tun);
Ok((tunid, receiver))
}
pub(crate) fn close_tunnel(&mut self, tunid: &str) -> shvrpc::Result<bool> {
if let Some(tun) = self.open_tunnels.remove(tunid) {
pub(crate) fn close_tunnel(&mut self, tunid: &str) -> shvrpc::Result<Option<bool>> {
debug!("close_tunnel: {tunid}");
if let Some(tun) = self.active_tunnels.remove(tunid) {
let sender = tun.sender;
task::spawn(async move {
let _ = sender.send(ToRemoteMsg::DestroyConnection).await;
});
Ok(true)
Ok(Some(tun.last_activity.is_some()))
} else {
// might be callback of previous close_tunel()
Ok(false)
Ok(None)
}
}
pub(crate) fn open_tunnel_ids(&self) -> Vec<String> {
let keys = self.open_tunnels.keys().cloned().collect();
pub(crate) fn active_tunnel_ids(&self) -> Vec<String> {
let keys = self.active_tunnels.iter()
//.filter(|(_id, tun)| tun.last_activity.is_some())
.map(|(id, _tun)| id.clone())
.collect();
keys
}
pub(crate) fn is_request_granted(&self, tunid: &str, frame: &RpcFrame) -> bool {
if let Some(tun) = self.open_tunnels.get(tunid) {
if let Some(tun) = self.active_tunnels.get(tunid) {
let cids = frame.caller_ids();
cids == tun.caller_ids || AccessLevel::try_from(frame.access_level().unwrap_or(0)).unwrap_or(AccessLevel::Browse) == AccessLevel::Superuser
} else {
false
}
}
pub(crate) fn write_tunnel(&self, tunid: &str, rqid: RqId, data: Vec<u8>) -> shvrpc::Result<()> {
if let Some(tun) = self.open_tunnels.get(tunid) {
if let Some(tun) = self.active_tunnels.get(tunid) {
let sender = tun.sender.clone();
task::spawn(async move {
sender.send(ToRemoteMsg::SendData(rqid, data)).await
sender.send(ToRemoteMsg::WriteData(rqid, data)).await
});
Ok(())
} else {
Err(format!("Invalid tunnel ID: {tunid}").into())
}
}
pub(crate) fn touch_tunnel(&mut self, tunid: &str) {
if let Some(tun) = self.active_tunnels.get_mut(tunid) {
tun.last_activity = Some(Instant::now());
}
}
pub(crate) fn last_tunnel_activity(&self, tunid: &str) -> Option<Instant> {
if let Some(tun) = self.active_tunnels.get(tunid) {
tun.last_activity
} else {
None
}
}
pub(crate) fn is_tunnel_active(&self, tunid: &str) -> bool {
if let Some(tun) = self.active_tunnels.get(tunid) {
tun.last_activity.is_some()
} else {
false
}
}
}
enum UpdateSqlOperation<'a> {
Insert{id: &'a str, json: String},
Expand Down Expand Up @@ -750,7 +757,7 @@ impl BrokerImpl {
access,
role_access,
command_sender: command_sender.clone(),
open_tunnels: Default::default(),
active_tunnels: Default::default(),
next_tunnel_number: 1,
};
let mut broker = Self {
Expand Down Expand Up @@ -1033,8 +1040,36 @@ impl BrokerImpl {
error!("SQL config is disabled, use --use-access-db CLI switch.")
}
}
BrokerCommand::CloseTunnel(tunnel_id) => {
if state_writer(&self.state).close_tunnel(&tunnel_id)? {
BrokerCommand::TunnelActive(tunnel_id) => {
self.command_sender.send(BrokerCommand::SendSignal {
shv_path: format!(".app/tunnel/{tunnel_id}"),
signal: "lsmod".to_string(),
source: "ls".to_string(),
param: Map::from([(tunnel_id.clone(), true.into())]).into(),
}).await?;
let command_sender = self.command_sender.clone();
let state = self.state.clone();
let tunid = tunnel_id.clone();
task::spawn(async move {
const TIMEOUT: Duration = Duration::from_secs(60 * 60);
loop {
let last_activity = state_reader(&state).last_tunnel_activity(&tunid);
if let Some(last_activity) = last_activity {
if Instant::now().duration_since(last_activity) > TIMEOUT {
break;
}
} else {
break;
}
task::sleep(TIMEOUT / 60).await;
}
log!(target: "Tunnel", Level::Warn, "Closing tunnel: {tunid} as inactive for {:#?}", TIMEOUT);
command_sender.send(BrokerCommand::TunnelClosed(tunid)).await
});
}
BrokerCommand::TunnelClosed(tunnel_id) => {
let closed = state_writer(&self.state).close_tunnel(&tunnel_id)?;
if let Some(true) = closed {
self.command_sender.send(BrokerCommand::SendSignal {
shv_path: format!(".app/tunnel/{tunnel_id}"),
signal: "lsmod".to_string(),
Expand Down
53 changes: 40 additions & 13 deletions src/tunnelnode.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::time::{Instant};
use async_std::{channel, task};
use async_std::channel::{Receiver, Sender};
use async_std::io::{BufReader, BufWriter};
Expand Down Expand Up @@ -44,7 +45,7 @@ impl ShvNode for TunnelNode {
}
}
fn children(&self, shv_path: &str, broker_state: &SharedBrokerState) -> Option<Vec<String>> {
let tunnels = state_reader(broker_state).open_tunnel_ids();
let tunnels = state_reader(broker_state).active_tunnel_ids();
if shv_path.is_empty() {
Some(tunnels)
} else if tunnels.contains(&shv_path.to_string()) {
Expand All @@ -71,7 +72,20 @@ impl ShvNode for TunnelNode {
if tunid.is_empty() {
match method {
METH_CREATE => {
state_writer(&ctx.state).create_tunnel(frame)?;
let rq = frame.to_rpcmesage()?;
let param = rq.param().unwrap_or_default().as_map();
let host = param.get("host").ok_or("'host' parameter must be provided")?.as_str().to_string();
let (tunid, receiver) = state_writer(&ctx.state).create_tunnel(&rq)?;
let rq_meta = rq.meta().clone();
let state = ctx.state.clone();
let command_sender = state_reader(&state).command_sender.clone();
let tunid2 = tunid.clone();
task::spawn(async move {
if let Err(e) = tunnel_task(tunid2.clone(), rq_meta, host, receiver, state).await {
error!("{}", e)
}
command_sender.send(BrokerCommand::TunnelClosed(tunid2)).await
});
Ok(ProcessRequestRetval::RetvalDeferred)
}
_ => {
Expand All @@ -87,8 +101,13 @@ impl ShvNode for TunnelNode {
Ok(ProcessRequestRetval::RetvalDeferred)
}
METH_CLOSE => {
let res = state_writer(&ctx.state).close_tunnel(tunid)?;
Ok(ProcessRequestRetval::Retval(res.into()))
let command_sender = state_reader(&ctx.state).command_sender.clone();
let is_active = state_reader(&ctx.state).is_tunnel_active(tunid);
let tunid = tunid.to_string();
task::spawn(async move {
let _ = command_sender.send(BrokerCommand::TunnelClosed(tunid)).await;
});
Ok(ProcessRequestRetval::Retval(is_active.into()))
}
_ => {
Ok(ProcessRequestRetval::MethodNotFound)
Expand All @@ -99,17 +118,19 @@ impl ShvNode for TunnelNode {
}
#[derive(Debug)]
pub(crate) enum ToRemoteMsg {
SendData(RqId, Vec<u8>),
WriteData(RqId, Vec<u8>),
DestroyConnection,
}
pub(crate) struct OpenTunnelNode {
pub(crate) struct ActiveTunnel {
pub(crate) caller_ids: Vec<PeerId>,
pub(crate) sender: Sender<ToRemoteMsg>,
pub(crate) last_activity: Option<Instant>,
}

pub(crate) async fn tunnel_task(tunnel_id: String, request_meta: MetaMap, addr: String, from_broker_receiver: Receiver<ToRemoteMsg>, to_broker_sender: Sender<BrokerCommand>) -> shvrpc::Result<()> {
pub(crate) async fn tunnel_task(tunnel_id: String, request_meta: MetaMap, addr: String, from_broker_receiver: Receiver<ToRemoteMsg>, state: SharedBrokerState) -> shvrpc::Result<()> {
let peer_id= *request_meta.caller_ids().first().ok_or("Invalid peer id")?;
let mut response_meta = RpcFrame::prepare_response_meta(&request_meta)?;
let to_broker_sender = state_reader(&state).command_sender.clone();
log!(target: "Tunnel", Level::Debug, "connecting to: {addr} ...");
let stream = match TcpStream::connect(addr).await {
Ok(stream) => {
Expand All @@ -130,10 +151,12 @@ pub(crate) async fn tunnel_task(tunnel_id: String, request_meta: MetaMap, addr:
return Err(e.to_string().into())
}
};
state_writer(&state).touch_tunnel(&tunnel_id);
to_broker_sender.send(BrokerCommand::TunnelActive(tunnel_id.clone())).await?;
let (reader, writer) = stream.split();
let mut read_buff: [u8; 256] = [0; 256];
let mut response_buff: Vec<u8> = vec![];
let mut request_id = None;
let mut write_request_id = None;
let mut reader = BufReader::new(reader);
let (write_task_sender, write_task_receiver) = channel::unbounded::<Vec<u8>>();
task::spawn(async move {
Expand Down Expand Up @@ -182,6 +205,7 @@ pub(crate) async fn tunnel_task(tunnel_id: String, request_meta: MetaMap, addr:
bytes_read = reader.read(&mut read_buff).fuse() => match bytes_read {
Ok(bytes_read) => {
log!(target: "Tunnel", Level::Trace, "Read {bytes_read} bytes from client socket.");
state_writer(&state).touch_tunnel(&tunnel_id);
if bytes_read == 0 {
log!(target: "Tunnel", Level::Trace, "Client socket closed.");
break;
Expand All @@ -200,9 +224,10 @@ pub(crate) async fn tunnel_task(tunnel_id: String, request_meta: MetaMap, addr:
Ok(cmd) => {
log!(target: "Tunnel", Level::Trace, "CMD: {:?}", cmd);
match cmd {
ToRemoteMsg::SendData(rqid, data) => {
if request_id.is_none() {
request_id = Some(rqid);
ToRemoteMsg::WriteData(rqid, data) => {
state_writer(&state).touch_tunnel(&tunnel_id);
if write_request_id.is_none() {
write_request_id = Some(rqid);
response_meta.set_request_id(rqid);
if !response_buff.is_empty() {
log!(target: "Tunnel", Level::Trace, "to_broker_sender send: {} bytes to {peer_id}", response_buff.len());
Expand All @@ -215,7 +240,9 @@ pub(crate) async fn tunnel_task(tunnel_id: String, request_meta: MetaMap, addr:
}
}
ToRemoteMsg::DestroyConnection => {
to_broker_sender.send(make_err_response(peer_id, response_meta.clone(), RpcError::new(RpcErrorCode::MethodCallCancelled, "Tunnel closed."))).await?;
if write_request_id.is_some() {
to_broker_sender.send(make_err_response(peer_id, response_meta.clone(), RpcError::new(RpcErrorCode::MethodCallCancelled, format!("Tunnel: {tunnel_id} closed.")))).await?;
}
break
}
}
Expand All @@ -229,6 +256,6 @@ pub(crate) async fn tunnel_task(tunnel_id: String, request_meta: MetaMap, addr:
}
// cancel write task
write_task_sender.send(vec![]).await?;
to_broker_sender.send(BrokerCommand::CloseTunnel(tunnel_id)).await?;
to_broker_sender.send(BrokerCommand::TunnelClosed(tunnel_id)).await?;
Ok(())
}
2 changes: 1 addition & 1 deletion tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub fn text_from_output(output: Output) -> shvrpc::Result<String> {
pub fn string_list_from_output(output: Output) -> shvrpc::Result<Vec<String>> {
let bytes = text_from_output(output)?;
let mut values = Vec::new();
for cpon in bytes.split(|b| b == '\n').filter(|line| !line.is_empty()) {
for cpon in bytes.split('\n').filter(|line| !line.is_empty()) {
values.push(cpon.trim().to_owned());
}
Ok(values)
Expand Down

0 comments on commit 4fcdd29

Please sign in to comment.