Skip to content

Commit

Permalink
Notify peer if tunnel is closed by 'tunnel/<id>:close'
Browse files Browse the repository at this point in the history
  • Loading branch information
Fanda Vacek committed Sep 3, 2024
1 parent 62e5e6d commit 6dace71
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 12 deletions.
5 changes: 1 addition & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "shvbroker"
version = "3.0.3"
version = "3.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand All @@ -26,9 +26,6 @@ serde_json = "1.0"
clap = { version = "4.4.12", features = ["derive"] }
crc = "3.2.1"
rusqlite = { version = "0.32.1", features = ["bundled"] }
#tryvial = "0.2.0"
#async-trait = "0.1.80"
#getrandom = { version = "0.2", features = ["js"] }

# For local development
#[patch."https://github.com/silicon-heaven/libshvproto-rs"]
Expand Down
1 change: 1 addition & 0 deletions src/bin/shvbroker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub(crate) fn main() -> shvrpc::Result<()> {
logger = logger.with_level(LevelFilter::Info);
if let Some(module_names) = cli_opts.verbose {
for (module, level) in parse_log_verbosity(&module_names, module_path!()) {
//println!("log level for {module} == {level}");
logger = logger.with_module_level(module, level);
}
}
Expand Down
34 changes: 26 additions & 8 deletions src/tunnelnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use shvrpc::rpcmessage::{PeerId, RpcError, RpcErrorCode, RqId};
use crate::brokerimpl::{state_reader, state_writer, BrokerCommand, NodeRequestContext, SharedBrokerState};
use crate::shvnode::{is_request_granted_methods, ProcessRequestRetval, ShvNode, META_METHOD_PUBLIC_DIR, METH_DIR, METH_LS};
use futures::FutureExt;
use log::{debug, error, log, Level};
use log::{error, log, Level};
use crate::shvnode;

const META_METHOD_PRIVATE_DIR: MetaMethod = MetaMethod { name: METH_DIR, flags: Flag::None as u32, access: AccessLevel::Superuser, param: "DirParam", result: "DirResult", signals: &[], description: "" };
Expand Down Expand Up @@ -82,7 +82,7 @@ impl ShvNode for TunnelNode {
match method {
METH_WRITE => {
let rq = frame.to_rpcmesage()?;
let data = rq.param().ok_or("Param missing")?.as_blob().to_vec();
let data = rq.param().unwrap_or_default().as_blob().to_vec();
state_reader(&ctx.state).write_tunnel(tunid, rq.request_id().unwrap_or_default(), data)?;
Ok(ProcessRequestRetval::RetvalDeferred)
}
Expand Down Expand Up @@ -137,24 +137,28 @@ pub(crate) async fn tunnel_task(tunnel_id: String, request_meta: MetaMap, addr:
let mut reader = BufReader::new(reader);
let (write_task_sender, write_task_receiver) = channel::unbounded::<Vec<u8>>();
task::spawn(async move {
log!(target: "Tunnel", Level::Debug, "ENTER write task");
let mut writer = BufWriter::new(writer);
loop {
match write_task_receiver.recv().await {
Ok(data) => {
log!(target: "Tunnel", Level::Trace, "write_task_receiver read {} bytes to write.", data.len());
if data.is_empty() {
break;
} else {
log!(target: "Tunnel", Level::Trace, "Write {} bytes to client socket.", data.len());
writer.write_all(&data).await?;
writer.flush().await?;
// println!("DATA written: {:?}", data);
}
}
Err(e) => {
debug!("read broker channel error: {e}");
error!("write broker channel error: {e}");
break;
}
}
}
log!(target: "Tunnel", Level::Debug, "EXIT write task");
Ok::<(), Error>(())
});
fn make_response(peer_id: PeerId, response_meta: MetaMap, data: &mut Vec<u8>) -> BrokerCommand {
Expand All @@ -166,13 +170,20 @@ pub(crate) async fn tunnel_task(tunnel_id: String, request_meta: MetaMap, addr:
result: Ok(blob),
}
}
fn make_err_response(peer_id: PeerId, response_meta: MetaMap, err: RpcError) -> BrokerCommand {
BrokerCommand::SendResponse {
peer_id,
meta: response_meta,
result: Err(err),
}
}
loop {
select! {
bytes_read = reader.read(&mut read_buff).fuse() => match bytes_read {
Ok(bytes_read) => {
debug!("read: {bytes_read}");
log!(target: "Tunnel", Level::Trace, "Read {bytes_read} bytes from client socket.");
if bytes_read == 0 {
debug!("socket closed?");
log!(target: "Tunnel", Level::Trace, "Client socket closed.");
break;
} else {
let mut data = read_buff[.. bytes_read].to_vec();
Expand All @@ -187,19 +198,26 @@ pub(crate) async fn tunnel_task(tunnel_id: String, request_meta: MetaMap, addr:
},
cmd = from_broker_receiver.recv().fuse() => match cmd {
Ok(cmd) => {
// println!("CMD: {:?}", cmd);
log!(target: "Tunnel", Level::Trace, "CMD: {:?}", cmd);
match cmd {
ToRemoteMsg::SendData(rqid, data) => {
if request_id.is_none() {
request_id = Some(rqid);
response_meta.set_request_id(rqid);
if !response_buff.is_empty() {
log!(target: "Tunnel", Level::Trace, "to_broker_sender send: {} bytes to {peer_id}", response_buff.len());
to_broker_sender.send(make_response(peer_id, response_meta.clone(), &mut response_buff)).await?;
}
}
write_task_sender.send(data).await?;
if !data.is_empty() {
log!(target: "Tunnel", Level::Trace, "write_task_sender send: {} bytes", data.len());
write_task_sender.send(data).await?;
}
}
ToRemoteMsg::DestroyConnection => {
to_broker_sender.send(make_err_response(peer_id, response_meta.clone(), RpcError::new(RpcErrorCode::MethodCallCancelled, "Tunnel closed."))).await?;
break
}
ToRemoteMsg::DestroyConnection => { break }
}
}
Err(e) => {
Expand Down

0 comments on commit 6dace71

Please sign in to comment.