Skip to content

Commit

Permalink
Timeout on broker capabilities discovery RPC calls #5
Browse files Browse the repository at this point in the history
  • Loading branch information
Fanda Vacek committed Aug 27, 2024
1 parent 64b2dbd commit 65d2a24
Showing 1 changed file with 27 additions and 15 deletions.
42 changes: 27 additions & 15 deletions src/brokerimpl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -935,27 +935,39 @@ impl BrokerImpl {
log!(target: "Subscr", Level::Debug, "Device subscribe path resolved already, peer_id: {client_id}, path: {:?}", &subpath);
return Ok(subpath)
}
async fn check_path(client_id: PeerId, path: &str, broker_command_sender: &Sender<BrokerCommand>) -> shvrpc::Result<Option<String>> {
let (response_sender, response_receiver) = unbounded();
let request = RpcMessage::new_request(path, METH_DIR, Some(METH_SUBSCRIBE.into()));
let cmd = BrokerCommand::RpcCall {
client_id,
request,
response_sender,
};
broker_command_sender.send(cmd).await?;
let resp = response_receiver.recv().await?.to_rpcmesage()?;
if let Ok(val) = resp.result() {
if !val.is_null() {
return Ok(Some(path.to_string()));
async fn check_path_with_timeout(client_id: PeerId, path: &str, broker_command_sender: &Sender<BrokerCommand>) -> shvrpc::Result<Option<String>> {
async fn check_path(client_id: PeerId, path: &str, broker_command_sender: &Sender<BrokerCommand>) -> shvrpc::Result<Option<String>> {
let (response_sender, response_receiver) = unbounded();
let request = RpcMessage::new_request(path, METH_DIR, Some(METH_SUBSCRIBE.into()));
let cmd = BrokerCommand::RpcCall {
client_id,
request,
response_sender,
};
broker_command_sender.send(cmd).await?;
let resp = response_receiver.recv().await?.to_rpcmesage()?;
if let Ok(val) = resp.result() {
if !val.is_null() {
return Ok(Some(path.to_string()));
}
}
Ok(None)
}
let dur = Duration::from_secs(5);
let fut = check_path(client_id, path, broker_command_sender);
match future::timeout(dur, fut).await {
Ok(res) => {
res
}
Err(err) => {
Err(err.into())
}
}
Ok(None)
}
let broker_command_sender = state_reader(&state).command_sender.clone();
let mut subscribe_path = SubscribePath::NotBroker;
for path in [".broker/currentClient", ".broker/app"] {
match check_path(client_id, path, &broker_command_sender).await {
match check_path_with_timeout(client_id, path, &broker_command_sender).await {
Ok(path) => {
if path.is_some() {
subscribe_path = SubscribePath::CanSubscribe(path.unwrap().clone());
Expand Down

0 comments on commit 65d2a24

Please sign in to comment.