Skip to content

Commit

Permalink
Add --version CLI param
Browse files Browse the repository at this point in the history
  • Loading branch information
Fanda Vacek committed Oct 30, 2024
1 parent f40953e commit 960f106
Showing 1 changed file with 112 additions and 46 deletions.
158 changes: 112 additions & 46 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,38 @@
use std::collections::BTreeMap;
use std::future::Future;
use async_std::channel::Sender;
use async_std::io::BufReader;
use async_std::net::{TcpListener, TcpStream};
use async_std::os::unix::net::UnixStream;
use async_std::{channel, io, task};
use clap::Parser;
use futures::io::BufWriter;
use futures::{select, AsyncReadExt};
use futures::AsyncWriteExt;
use futures::{select, AsyncReadExt};
use futures::{FutureExt, StreamExt};
use futures_time::future::FutureExt as ff;
use futures_time::time::Duration;
use log::*;
use shvrpc::client::LoginParams;
use shvrpc::framerw::{FrameReader, FrameWriter};
use shvrpc::rpcmessage::{RqId};
use shvrpc::rpcframe::RpcFrame;
use shvrpc::rpcmessage::RqId;
use shvrpc::serialrw::{SerialFrameReader, SerialFrameWriter};
use shvrpc::streamrw::{StreamFrameReader, StreamFrameWriter};
use shvrpc::util::{login_from_url, parse_log_verbosity};
use shvrpc::{client, RpcMessage, RpcMessageMetaTags};
use shvrpc::{client, Error, RpcMessage, RpcMessageMetaTags};
use simple_logger::SimpleLogger;
use std::collections::BTreeMap;
use std::future::Future;
use url::Url;
use async_std::channel::{Sender};
use shvrpc::rpcframe::RpcFrame;
use futures_time::future::FutureExt as ff;
use futures_time::time::Duration;
use futures::{FutureExt, StreamExt};

#[cfg(feature = "readline")]
use crossterm::tty::IsTty;
use futures::stream::FuturesUnordered;
#[cfg(feature = "readline")]
use rustyline_async::ReadlineEvent;
use shvproto::{Map, RpcValue};
use shvrpc::rpc::ShvRI;
#[cfg(feature = "readline")]
use std::io::Write;
use futures::stream::FuturesUnordered;
use shvrpc::rpc::ShvRI;

type Result = shvrpc::Result<()>;

Expand All @@ -41,7 +41,7 @@ type Result = shvrpc::Result<()>;
struct Opts {
///Url to connect to, example tcp://admin@localhost:3755?password=dj4j5HHb, localsocket:path/to/socket
#[arg(name = "url", short = 's', long = "url")]
url: String,
url: Option<String>,
/// Method is specified together with path like: shv/path:method
#[arg(short, long)]
method: Option<String>,
Expand All @@ -62,6 +62,8 @@ struct Opts {
/// Verbose mode (module, .)
#[arg(short, long)]
verbose: Option<String>,
#[arg(long)]
version: bool,
}
enum OutputFormat {
Cpon,
Expand Down Expand Up @@ -102,8 +104,17 @@ where
}

pub(crate) fn main() -> Result {

let opts = Opts::parse();

//println!("opts.version: {}", opts.version);
let app_name = env!("CARGO_PKG_NAME");
let app_version = env!("CARGO_PKG_VERSION");
if opts.version == true {
println!("{app_name} ver. {app_version}");
return Ok(())
}

let mut logger = SimpleLogger::new();
logger = logger.with_level(LevelFilter::Info);
if let Some(module_names) = &opts.verbose {
Expand All @@ -114,11 +125,11 @@ pub(crate) fn main() -> Result {
logger.init().unwrap();

info!("=====================================================");
info!("{} starting", module_path!());
info!("{app_name} ver. {app_version}");
info!("=====================================================");

// let rpc_timeout = Duration::from_millis(DEFAULT_RPC_TIMEOUT_MSEC);
let url = Url::parse(&opts.url)?;
let url = opts.url.as_ref().ok_or::<Error>("URL param must be provided.".into())?;
let url = Url::parse(url)?;

task::block_on(try_main(&url, opts))
}
Expand Down Expand Up @@ -193,7 +204,11 @@ async fn send_request(
};
frame_writer.send_request(path, method, param).await
}
async fn make_call(mut frame_reader: BoxedFrameReader, mut frame_writer: BoxedFrameWriter, opts: &Opts) -> Result {
async fn make_call(
mut frame_reader: BoxedFrameReader,
mut frame_writer: BoxedFrameWriter,
opts: &Opts,
) -> Result {
async fn print_resp(
stdout: &mut io::Stdout,
resp: &RpcMessage,
Expand Down Expand Up @@ -384,7 +399,10 @@ async fn make_call(mut frame_reader: BoxedFrameReader, mut frame_writer: BoxedFr
};
let param = opts.param.clone().unwrap_or_default();
let rqid = send_request(&mut *frame_writer, &path, &method, &param).await?;
async fn receive_response(frame_reader: &mut BoxedFrameReader, rq_id: RqId) -> shvrpc::Result<RpcFrame> {
async fn receive_response(
frame_reader: &mut BoxedFrameReader,
rq_id: RqId,
) -> shvrpc::Result<RpcFrame> {
loop {
let frame = frame_reader.receive_frame().await?;
if frame.is_response() && frame.request_id().unwrap_or_default() == rq_id {
Expand All @@ -393,10 +411,17 @@ async fn make_call(mut frame_reader: BoxedFrameReader, mut frame_writer: BoxedFr
}
}
let res = if opts.timeout > 0 {
match receive_response(&mut frame_reader, rqid).timeout(Duration::from_millis(opts.timeout)).await {
Ok(maybe_frame) => { maybe_frame }
match receive_response(&mut frame_reader, rqid)
.timeout(Duration::from_millis(opts.timeout))
.await
{
Ok(maybe_frame) => maybe_frame,
Err(_) => {
return Err(format!("Method call response timeout after {} msec.", opts.timeout).into())
return Err(format!(
"Method call response timeout after {} msec.",
opts.timeout
)
.into())
}
}
} else {
Expand All @@ -408,14 +433,15 @@ async fn make_call(mut frame_reader: BoxedFrameReader, mut frame_writer: BoxedFr
print_resp(&mut stdout, &resp, (&*opts.output_format).into()).await?;
Ok(())
}
Err(e) => {
Err(e)
}
}
Err(e) => Err(e),
};
}
Ok(())
}
async fn receive_response(frame_reader: &mut BoxedFrameReader, rq_id: RqId) -> shvrpc::Result<RpcFrame> {
async fn receive_response(
frame_reader: &mut BoxedFrameReader,
rq_id: RqId,
) -> shvrpc::Result<RpcFrame> {
loop {
let frame = frame_reader.receive_frame().await?;
if frame.is_response() && frame.request_id().unwrap_or_default() == rq_id {
Expand All @@ -439,42 +465,76 @@ async fn make_burst_call(url: &Url, opts: &Opts) -> Result {
let method = opts.method.clone().unwrap();
let ri = ShvRI::try_from(method)?;
let param = opts.param.clone().map(|p| RpcValue::from_cpon(&p).unwrap());
async fn burst_task(url: Url, path: String, method: String, param: Option<RpcValue>, taskno: i32, count: i32) {
async fn burst_task(
url: Url,
path: String,
method: String,
param: Option<RpcValue>,
taskno: i32,
count: i32,
) {
println!("Starting burst task #{taskno}, {count} calls of {path}:{method}");
let (mut frame_reader, mut frame_writer) = login(&url).await.unwrap();
for _ in 0 .. count {
let rqid = frame_writer.send_request(&path, &method, param.clone()).await.unwrap();
for _ in 0..count {
let rqid = frame_writer
.send_request(&path, &method, param.clone())
.await
.unwrap();
receive_response(&mut frame_reader, rqid).await.unwrap();
}
println!("Burst task #{taskno} finished, after {count} calls made successfully.");
}
(0..ntask).map(|taskno| {
task::spawn(burst_task(url.clone(), ri.path().to_owned(), ri.method().to_owned(), param.clone(), taskno + 1, nmsg))
}).collect::<FuturesUnordered<_>>().collect::<Vec<_>>().await;
(0..ntask)
.map(|taskno| {
task::spawn(burst_task(
url.clone(),
ri.path().to_owned(),
ri.method().to_owned(),
param.clone(),
taskno + 1,
nmsg,
))
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<_>>()
.await;

Ok(())
}
fn split_quoted(s: &str) -> Vec<&str> {
let mut wrapped = false;
let ret = s.split(|c| {
if c == '[' {
wrapped = true;
} else if c == ']' {
wrapped = false;
}
c == ':' && !wrapped
}).collect::<Vec<&str>>();
let ret = s
.split(|c| {
if c == '[' {
wrapped = true;
} else if c == ']' {
wrapped = false;
}
c == ':' && !wrapped
})
.collect::<Vec<&str>>();
ret
}
async fn make_tunnel(mut frame_reader: BoxedFrameReader, mut frame_writer: BoxedFrameWriter, opts: &Opts) -> Result {
async fn make_tunnel(
mut frame_reader: BoxedFrameReader,
mut frame_writer: BoxedFrameWriter,
opts: &Opts,
) -> Result {
let tunnel_str = opts.tunnel.as_ref().unwrap().as_str();
let tunnel: Vec<_> = split_quoted(tunnel_str);
let tunnel = &tunnel[..];
if tunnel.len() < 3 || tunnel.len() > 4 {
return Err(format!("Invalid tunnel specification: {tunnel_str}").into());
}
let (local_host, tunnel) = if tunnel.len() == 4 {
(if tunnel[0].is_empty() { "0.0.0.0" } else { tunnel[0] }, &tunnel[ 1 ..])
(
if tunnel[0].is_empty() {
"0.0.0.0"
} else {
tunnel[0]
},
&tunnel[1..],
)
} else {
("127.0.0.1", tunnel)
};
Expand Down Expand Up @@ -567,15 +627,19 @@ async fn make_tunnel(mut frame_reader: BoxedFrameReader, mut frame_writer: Boxed
let rq = RpcMessage::new_request(".app/tunnel", "create", Some(tun_opts.into()));
let rqid = rq.request_id().unwrap();
let (sender, receiver) = channel::unbounded::<RpcFrame>();
reader_cmd_sender.send(RpcReaderCmd::RegisterResponse(rqid, sender, true)).await?;
reader_cmd_sender
.send(RpcReaderCmd::RegisterResponse(rqid, sender, true))
.await?;
writer_sender.send(rq.to_frame()?).await?;
let resp = receiver.recv().await?;
resp.to_rpcmesage()?.result()?.as_str().to_owned()
};
let rq = RpcMessage::new_request(&format!(".app/tunnel/{tunid}"), "write", None);
let rqid = rq.request_id().unwrap();
let (sender, receiver) = channel::unbounded::<RpcFrame>();
reader_cmd_sender.send(RpcReaderCmd::RegisterResponse(rqid, sender, false)).await?;
reader_cmd_sender
.send(RpcReaderCmd::RegisterResponse(rqid, sender, false))
.await?;
writer_sender.send(rq.to_frame()?).await?;
let (mut sock_reader, mut sock_writer) = stream.split();
let mut sock_read_buff: [u8; 1024] = [0; 1024];
Expand Down Expand Up @@ -607,15 +671,17 @@ async fn make_tunnel(mut frame_reader: BoxedFrameReader, mut frame_writer: Boxed
}
}
}
reader_cmd_sender.send(RpcReaderCmd::UnregisterResponse(rqid)).await?;
reader_cmd_sender
.send(RpcReaderCmd::UnregisterResponse(rqid))
.await?;
Ok(())
});
}
Ok(())
}
async fn try_main(url: &Url, opts: Opts) -> Result {
if opts.burst.is_some() {
return make_burst_call(url, &opts).await
return make_burst_call(url, &opts).await;
}
let (frame_reader, frame_writer) = login(url).await?;
let res = if opts.tunnel.is_some() {
Expand Down

0 comments on commit 960f106

Please sign in to comment.