Skip to content

Commit

Permalink
Add timeout CLI switch
Browse files Browse the repository at this point in the history
  • Loading branch information
Fanda Vacek committed Oct 8, 2024
1 parent c24ea0c commit 6342c94
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 22 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "shvcall"
version = "3.1.0"
version = "3.2.0"
edition = "2021"

[features]
Expand All @@ -16,8 +16,9 @@ async-std = { version = "1.12.0", features = ["attributes"] }
log = "0.4.20"
clap = { version = "4.4.12", features = ["derive"] }
rustyline-async = { version = "0.4.1", optional = true }
crossterm = { version = "0.27.0", optional = true }
crossterm = { version = "0.28", optional = true }
url = "2.4.1"
futures-time = "3.0.0"

# For local development
#[patch."https://github.com/silicon-heaven/libshvproto-rs"]
Expand Down
70 changes: 50 additions & 20 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ use async_std::os::unix::net::UnixStream;
use async_std::{channel, io, task};
use clap::Parser;
use futures::io::BufWriter;
use futures::{select, AsyncReadExt, FutureExt};
use futures::{select, AsyncReadExt};
use futures::AsyncWriteExt;
use log::*;
use shvrpc::client::LoginParams;
use shvrpc::framerw::{FrameReader, FrameWriter};
use shvrpc::framerw::{FrameReader, FrameWriter, RpcFrameReception};
use shvrpc::rpcmessage::{RqId};
use shvrpc::serialrw::{SerialFrameReader, SerialFrameWriter};
use shvrpc::streamrw::{StreamFrameReader, StreamFrameWriter};
Expand All @@ -21,6 +21,9 @@ use url::Url;
use async_std::channel::{Sender};
use async_std::stream::StreamExt;
use shvrpc::rpcframe::RpcFrame;
use futures_time::future::FutureExt as ff;
use futures_time::time::Duration;
use futures::FutureExt;

#[cfg(feature = "readline")]
use crossterm::tty::IsTty;
Expand All @@ -38,13 +41,14 @@ struct Opts {
///Url to connect to, example tcp://admin@localhost:3755?password=dj4j5HHb, localsocket:path/to/socket
#[arg(name = "url", short = 's', long = "url")]
url: String,
#[arg(short = 't', long = "path")]
path: Option<String>,
/// Method can be specified also together with path like: shv/path:method
/// Method is specified together with path like: shv/path:method
#[arg(short, long)]
method: Option<String>,
#[arg(short, long)]
param: Option<String>,
/// Timeout in milliseconds, value 0 means wait forever.
#[arg(short, long, default_value = "5000")]
timeout: u64,
/// Output format: [ cpon | chainpack | simple | value | "Placeholders {PATH} {METHOD} {VALUE} in any number and combination in custom string." ]
#[arg(short = 'o', long = "output-format", default_value = "cpon")]
output_format: String,
Expand Down Expand Up @@ -106,7 +110,7 @@ pub(crate) fn main() -> Result {
logger.init().unwrap();

info!("=====================================================");
info!("{} starting", std::module_path!());
info!("{} starting", module_path!());
info!("=====================================================");

// let rpc_timeout = Duration::from_millis(DEFAULT_RPC_TIMEOUT_MSEC);
Expand Down Expand Up @@ -271,11 +275,11 @@ async fn make_call(mut frame_reader: BoxedFrameReader, mut frame_writer: BoxedFr
};
Ok((path, method, param))
}
if opts.path.is_some() && opts.method.is_none() {
if opts.method.is_none() {
return Err("--method parameter missing".into());
}
let mut stdout = io::stdout();
if opts.path.is_none() && opts.method.is_none() {
if opts.method.is_none() {
if is_tty() {
#[cfg(feature = "readline")]
{
Expand Down Expand Up @@ -371,23 +375,49 @@ async fn make_call(mut frame_reader: BoxedFrameReader, mut frame_writer: BoxedFr
}
}
} else {
let mut path = opts.path.clone().unwrap_or_default();
let mut method = opts.method.clone().unwrap_or_default();
if let Some(ix) = method.find(':') {
path = method[0..ix].to_owned();
method = method[ix + 1..].to_owned();
}
let method = opts.method.clone().unwrap();
let (path, method) = if let Some(ix) = method.find(':') {
(method[0..ix].to_owned(), method[ix + 1..].to_owned())
} else {
return Err("--method parameter must be in form shv/path:method".into());
};
let param = opts.param.clone().unwrap_or_default();
let rqid = send_request(&mut *frame_writer, &path, &method, &param).await?;
loop {
let resp = frame_reader.receive_message().await?;
if resp.request_id().unwrap_or_default() == rqid {
print_resp(&mut stdout, &resp, (&*opts.output_format).into()).await?;
break
async fn read_meta(frame_reader: &mut BoxedFrameReader, rq_id: RqId) {
loop {
if let Ok(RpcFrameReception::Meta { request_id, shv_path, method, ..}) = frame_reader.receive_frame_or_meta().await {
if request_id.unwrap_or_default() == rq_id && method.is_none() && shv_path.is_none() {
break;
}
}
}
}
let read_meta_fut = read_meta(&mut frame_reader, rqid);
if opts.timeout > 0 {
if read_meta_fut.timeout(Duration::from_millis(opts.timeout)).await.is_err() {
return Err(format!("Method call response timeout after {} msec.", opts.timeout).into())
}
} else {
read_meta_fut.await
};
return match frame_reader.receive_frame_or_meta().await {
Ok(frame) => {
match frame {
RpcFrameReception::Meta { .. } => {
Err("Unexpected meta received.".into())
}
RpcFrameReception::Frame(frame) => {
let resp = frame.to_rpcmesage()?;
print_resp(&mut stdout, &resp, (&*opts.output_format).into()).await?;
Ok(())
}
}
}
Err(e) => {
Err(e.into())
}
}
}

Ok(())
}
fn split_quoted(s: &str) -> Vec<&str> {
Expand Down

0 comments on commit 6342c94

Please sign in to comment.