Skip to content

Commit

Permalink
Send lsmod signal, when tunnel is created or destroyied
Browse files Browse the repository at this point in the history
  • Loading branch information
Fanda Vacek committed Sep 4, 2024
1 parent 67a4d54 commit 12c36b9
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 8 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "shvbroker"
version = "3.1.0"
version = "3.1.1"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
34 changes: 27 additions & 7 deletions src/brokerimpl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub(crate) enum BrokerCommand {
peer_id: PeerId,
},
SendResponse {peer_id: PeerId, meta: MetaMap, result: Result<RpcValue, RpcError>},
SendSignal {shv_path: String, signal: String, source: String, param: RpcValue},
RpcCall {
client_id: PeerId,
request: RpcMessage,
Expand Down Expand Up @@ -635,16 +636,21 @@ impl BrokerState {
let host = param.get("host").ok_or("'host' parameter must be provided")?.as_str().to_string();
let (sender, receiver) = channel::unbounded::<ToRemoteMsg>();
let tun = OpenTunnelNode { caller_ids, sender };
let command_sender = self.command_sender.clone();
let rq_meta = frame.meta.clone();
let tunid2 = tunid.clone();
self.open_tunnels.insert(tunid.clone(), tun);
let command_sender = self.command_sender.clone();
task::spawn(async move {
if let Err(e) = tunnel_task(tunid2.clone(), rq_meta, host, receiver, command_sender.clone()).await {
command_sender.send(BrokerCommand::SendSignal {
shv_path: format!(".app/tunnel/{tunid}"),
signal: "lsmod".to_string(),
source: "ls".to_string(),
param: Map::from([(tunid.clone(), true.into())]).into(),
}).await?;
if let Err(e) = tunnel_task(tunid.clone(), rq_meta, host, receiver, command_sender.clone()).await {
error!("{}", e)
}
command_sender.send(BrokerCommand::CloseTunnel(tunid2)).await
command_sender.send(BrokerCommand::CloseTunnel(tunid)).await
});
self.open_tunnels.insert(tunid, tun);
Ok(())
}
pub(crate) fn close_tunnel(&mut self, tunid: &str) -> shvrpc::Result<bool> {
Expand Down Expand Up @@ -964,6 +970,13 @@ impl BrokerImpl {
let peer_sender = state_reader(&self.state).peers.get(&peer_id).ok_or("Invalid peer ID")?.sender.clone();
peer_sender.send(BrokerToPeerMessage::SendFrame(RpcFrame::from_rpcmessage(&msg)?)).await?;
}
BrokerCommand::SendSignal { shv_path, signal, source, param } => {
let msg = RpcMessage::new_signal_with_source(&shv_path, &signal, &source, Some(param));
let senders: Vec<_> = state_reader(&self.state).peers.values().map(|peer| peer.sender.clone()).collect();
for sender in senders {
sender.send(BrokerToPeerMessage::SendFrame(RpcFrame::from_rpcmessage(&msg)?)).await?;
}
}
BrokerCommand::RpcCall { client_id, request, response_sender } => {
let request_meta = request.meta().clone();
let mut rq2 = request;
Expand All @@ -983,11 +996,18 @@ impl BrokerImpl {
0
});
} else {
error!("SQL config is disabled, use --use-acces-db CLI switch.")
error!("SQL config is disabled, use --use-access-db CLI switch.")
}
}
BrokerCommand::CloseTunnel(tunnel_id) => {
state_writer(&self.state).close_tunnel(&tunnel_id)?;
if state_writer(&self.state).close_tunnel(&tunnel_id)? {
self.command_sender.send(BrokerCommand::SendSignal {
shv_path: format!(".app/tunnel/{tunnel_id}"),
signal: "lsmod".to_string(),
source: "ls".to_string(),
param: Map::from([(tunnel_id, false.into())]).into(),
}).await?;
}
}
}
Ok(())
Expand Down

0 comments on commit 12c36b9

Please sign in to comment.