Skip to content

Commit

Permalink
shvcall can process input from stdin if called without --path and --m…
Browse files Browse the repository at this point in the history
…ethod
  • Loading branch information
Fanda Vacek committed Dec 23, 2023
1 parent 63ce857 commit cc972b6
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 51 deletions.
10 changes: 5 additions & 5 deletions src/bin/shvbroker/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,11 @@ pub(crate) fn main() -> Result<()> {
}
logger.init().unwrap();

trace!("trace message");
debug!("debug message");
info!("info message");
warn!("warn message");
error!("error message");
//trace!("trace message");
//debug!("debug message");
//info!("info message");
//warn!("warn message");
//error!("error message");
log!(target: "RpcMsg", Level::Debug, "RPC message");
log!(target: "Acl", Level::Debug, "ACL message");

Expand Down
6 changes: 3 additions & 3 deletions src/bin/shvbroker/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use shv::metamethod::{Access, Flag, MetaMethod};
use shv::{RpcMessage, RpcMessageMetaTags, RpcValue};
use shv::rpc::Subscription;
use shv::rpcmessage::RpcError;
use shv::shvnode::{DIR_LS_METHODS, ProcessRequestResult, ShvNode};
use shv::shvnode::{ProcessRequestResult, ShvNode};
use crate::{Broker};

const METH_CLIENT_INFO: &str = "clientInfo";
Expand All @@ -20,7 +20,7 @@ const APP_BROKER_METHODS: [MetaMethod; 4] = [
pub(crate) struct AppBrokerNode {}
impl ShvNode<crate::Broker> for AppBrokerNode {
fn methods(&self) -> Vec<&MetaMethod> {
DIR_LS_METHODS.iter().chain(APP_BROKER_METHODS.iter()).collect()
APP_BROKER_METHODS.iter().collect()
}

fn process_request(&mut self, rq: &RpcMessage, broker: &mut crate::Broker) -> ProcessRequestResult {
Expand Down Expand Up @@ -66,7 +66,7 @@ const METH_UNSUBSCRIBE: &str = "unsubscribe";
pub(crate) struct AppBrokerCurrentClientNode {}
impl ShvNode<Broker> for AppBrokerCurrentClientNode {
fn methods(&self) -> Vec<&MetaMethod> {
DIR_LS_METHODS.iter().chain(APP_BROKER_CURRENT_CLIENT_METHODS.iter()).collect()
APP_BROKER_CURRENT_CLIENT_METHODS.iter().collect()
}

fn process_request(&mut self, rq: &RpcMessage, broker: &mut Broker) -> ProcessRequestResult {
Expand Down
124 changes: 94 additions & 30 deletions src/bin/shvcall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use percent_encoding::percent_decode;
use simple_logger::SimpleLogger;
use url::Url;
use shv::client::LoginParams;
use shv::connection::FrameReader;
use shv::util::parse_log_verbosity;

type Result = shv::Result<()>;

Expand All @@ -18,12 +20,15 @@ struct Opts {
///Url to connect to, example tcp://localhost:3755, localsocket:path/to/socket
#[structopt(name = "url", short = "-s", long = "--url")]
url: String,
#[structopt(short = "-p", long = "--path")]
path: String,
#[structopt(short = "-m", long = "--method")]
method: String,
#[structopt(short = "-a", long = "--param")]
#[structopt(short = "p", long = "path")]
path: Option<String>,
#[structopt(short = "m", long = "method")]
method: Option<String>,
#[structopt(short = "a", long = "param")]
param: Option<String>,
/// Write only result, trim RPC message
#[structopt(short = "r", long = "trim-meta")]
trim_meta: bool,
/// Verbose mode (module, .)
#[structopt(short = "v", long = "verbose")]
verbose: Option<String>,
Expand All @@ -37,15 +42,10 @@ pub(crate) fn main() -> Result {
let opts = Opts::from_args();

let mut logger = SimpleLogger::new();
logger = logger.with_level(LevelFilter::Error);
logger = logger.with_level(LevelFilter::Info);
if let Some(module_names) = &opts.verbose {
for module_name in module_names.split(',') {
let module_name = if module_name == "." {
module_path!().to_string()
} else {
module_name.to_string()
};
logger = logger.with_module_level(&module_name, LevelFilter::Trace);
for (module, level) in parse_log_verbosity(&module_names, module_path!()) {
logger = logger.with_module_level(module, level);
}
}
logger.init().unwrap();
Expand Down Expand Up @@ -78,29 +78,93 @@ async fn make_call(url: &Url, opts: &Opts) -> Result {
};

client::login(&mut frame_reader, &mut writer, &login_params).await?;

let param = match &opts.param {
None => None,
Some(p) => {
if p.is_empty() {
None
info!("Connected to broker.");
async fn print_resp(stdout: &mut io::Stdout, resp: &RpcMessage, to_chainpack: bool, trim_meta: bool) -> Result {
let rpcval = if resp.is_success() {
if trim_meta {
resp.result().expect("result should be set").clone()
} else {
Some(RpcValue::from_cpon(&p)?)
resp.as_rpcvalue().clone()
}
} else {
resp.error().expect("error should be set").to_rpcvalue()
};
if to_chainpack {
let bytes = rpcval.to_chainpack();
stdout.write_all(&bytes).await?;
Ok(stdout.flush().await?)
} else {
let bytes = rpcval.to_cpon().into_bytes();
stdout.write_all(&bytes).await?;
Ok(stdout.write_all("\n".as_bytes()).await?)
}
};
let rpcmsg = RpcMessage::new_request(&opts.path, &opts.method, param);
shv::connection::send_message(&mut writer, &rpcmsg).await?;
}
async fn print_error(stdout: &mut io::Stdout, err: &str, to_chainpack: bool) -> Result {
if to_chainpack {
Ok(())
} else {
stdout.write_all(err.as_bytes()).await?;
Ok(stdout.write_all("\n".as_bytes()).await?)
}
}
async fn send_request(mut writer: &TcpStream, reader: &mut FrameReader<'_, BufReader<&TcpStream>>, path: &str, method: &str, param: &str) -> shv::Result<RpcMessage> {
let param = if param.is_empty() {
None
} else {
Some(RpcValue::from_cpon(param)?)
};
let rpcmsg = RpcMessage::new_request(path, method, param);
shv::connection::send_message(&mut writer, &rpcmsg).await?;

let resp = frame_reader.receive_message().await?.ok_or("Receive error")?;
let resp = reader.receive_message().await?.ok_or("Receive error")?;
Ok(resp)

}
if opts.path.is_none() && opts.method.is_some() {
return Err("--path parameter missing".into())
}
if opts.path.is_some() && opts.method.is_none() {
return Err("--method parameter missing".into())
}
let mut stdout = io::stdout();
let response_bytes = if opts.chainpack {
resp.as_rpcvalue().to_chainpack()
if opts.path.is_none() && opts.method.is_none() {
let stdin = io::stdin();
loop {
let mut line = String::new();
match stdin.read_line(&mut line).await {
Ok(nbytes) => {
if nbytes == 0 {
// stream closed
break;
} else {
let method_ix = match line.find(':') {
None => {
print_error(&mut stdout, &format!("Invalid line format, method not found: {line}"), opts.chainpack).await?;
break;
}
Some(ix) => { ix }
};
let param_ix = line.find(' ');
let path = line[..method_ix].trim();
let (method, param) = match param_ix {
None => { (line[method_ix + 1 .. ].trim(), "") }
Some(ix) => { (line[method_ix + 1 .. ix].trim(), line[ix + 1 ..].trim()) }
};
let resp = send_request(writer, &mut frame_reader, &path, &method, &param).await?;
print_resp(&mut stdout, &resp, opts.chainpack, opts.trim_meta).await?;
}
}
Err(err) => { return Err(format!("Read line error: {err}").into()) }
}
}
} else {
resp.to_cpon().into_bytes()
};
stdout.write_all(&response_bytes).await?;
stdout.flush().await?;
let path = opts.path.clone().unwrap_or_default();
let method = opts.method.clone().unwrap_or_default();
let param = opts.param.clone().unwrap_or_default();
let resp = send_request(writer, &mut frame_reader, &path, &method, &param).await?;
print_resp(&mut stdout, &resp, opts.chainpack, opts.trim_meta).await?;
}

Ok(())
}

Expand Down
23 changes: 15 additions & 8 deletions tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,26 @@ impl Drop for KillProcessGuard {
}

pub fn rpcmsg_from_output(output: Output) -> shv::Result<RpcMessage> {
if !output.status.success() {
let errmsg = std::str::from_utf8(&output.stderr)?;
return Err(format!("Process exited with error code {:?}, stderr: {}", output.status.code(), errmsg).into());
}
let out = output.stdout;
let cpon = std::str::from_utf8(&out)?;
let rv = RpcValue::from_cpon(cpon)?;
let rv = rpcvalue_from_output(output)?;
Ok(RpcMessage::from_rpcvalue(rv)?)
}
pub fn result_from_output(output: Output) -> shv::Result<RpcValue> {
pub fn rpcvalue_from_output(output: Output) -> shv::Result<RpcValue> {
let out = bytes_from_output(output)?;
let cpon = std::str::from_utf8(&out)?;
Ok(RpcValue::from_cpon(cpon)?)
}
pub fn bytes_from_output(output: Output) -> shv::Result<Vec<u8>> {
if !output.status.success() {
let errmsg = std::str::from_utf8(&output.stderr)?;
return Err(format!("Process exited with error code {:?}, stderr: {}", output.status.code(), errmsg).into());
}
Ok(output.stdout)
}
pub fn text_from_output(output: Output) -> shv::Result<String> {
let bytes = bytes_from_output(output)?;
Ok(String::from_utf8(bytes)?)
}
pub fn result_from_output(output: Output) -> shv::Result<RpcValue> {
let msg = rpcmsg_from_output(output)?;
let result = msg.result()?;
//println!("cpon: {}, expected: {}", result, expected_value.to_cpon());
Expand All @@ -47,6 +53,7 @@ pub fn result_from_output(output: Output) -> shv::Result<RpcValue> {
}
pub fn shv_call(path: &str, method: &str, param: &str) -> shv::Result<RpcValue> {
let output = Command::new("target/debug/shvcall")
.arg("-v").arg(".:T")
.arg("--url").arg("tcp://admin:admin@localhost")
.arg("--path").arg(path)
.arg("--method").arg(method)
Expand Down
31 changes: 26 additions & 5 deletions tests/test_shvcall.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
extern crate shv;

use std::process::{Command};
use std::io::Write;
use std::process::{Command, Stdio};
use std::thread;
use std::time::Duration;
use shv::{RpcValue};
use crate::common::shv_call;
use crate::common::{shv_call, text_from_output};

mod common;

Expand All @@ -25,15 +26,35 @@ fn test_cannot_connect() -> shv::Result<()> {
Ok(())
}
#[test]
fn test_call_ping() -> shv::Result<()> {
fn test_call_ping_stdin() -> shv::Result<()> {
let mut broker_process_guard = common::KillProcessGuard::new(Command::new("target/debug/shvbroker")
.arg("-v").arg(".:W")
//.arg("-v").arg("Acl")
.arg("-v").arg(".:I")
.spawn()?);
thread::sleep(Duration::from_millis(100));
assert!(broker_process_guard.is_running());

assert_eq!(shv_call(".app", "ping", "")?, RpcValue::null());

let mut child = Command::new("target/debug/shvcall")
.arg("--url").arg("tcp://admin:admin@localhost")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.arg("-r")
.arg("-v").arg(".:I")
.spawn()?;
let mut stdin = child.stdin.take().expect("shvcall should be running");
thread::spawn(move || {
stdin.write_all(".app:ping\n".as_bytes()).expect("Failed to write to stdin");
stdin.write_all(".app:name\n".as_bytes()).expect("Failed to write to stdin");
});
let output = child.wait_with_output()?;
let out = text_from_output(output)?;
let expected: [&str; 2] = ["null", "\"shvbroker\""];
for (no, line) in out.split(|b| b == '\n').enumerate() {
if no < expected.len() {
assert_eq!(expected[no], line);
}
}

Ok(())
}

0 comments on commit cc972b6

Please sign in to comment.