From 7b2d860c14c848b4e68eea656948ec21e8d837c9 Mon Sep 17 00:00:00 2001 From: Fanda Vacek Date: Wed, 27 Dec 2023 16:41:29 +0100 Subject: [PATCH] shvcall more output formats --- examples/device.rs | 9 +-- src/bin/shvbroker/main.rs | 18 ++++-- src/bin/shvcall.rs | 127 ++++++++++++++++++++++++-------------- src/rpc.rs | 6 ++ src/rpcmessage.rs | 9 ++- tests/common/mod.rs | 51 ++++++++++++--- tests/test_broker.rs | 44 ++++++------- tests/test_shvcall.rs | 11 ++-- 8 files changed, 180 insertions(+), 95 deletions(-) diff --git a/examples/device.rs b/examples/device.rs index 9ede2ef..163a267 100644 --- a/examples/device.rs +++ b/examples/device.rs @@ -115,12 +115,13 @@ async fn try_main(url: &Url, opts: &Opts) -> shv::Result<()> { let mut resp = RpcMessage::from_meta(meta); match result { Ok((value, signal)) => { - resp.set_result(value); - shv::connection::send_message(&mut writer, &resp).await?; + // send signal before response, node.process_request(...) can also eventually send other signals if let Some(signal) = signal { let sig = RpcMessage::new_signal(shv_path, signal.method, Some(signal.value)); shv::connection::send_message(&mut writer, &sig).await?; } + resp.set_result(value); + shv::connection::send_message(&mut writer, &resp).await?; } Err(errmsg) => { resp.set_error(errmsg); @@ -157,10 +158,10 @@ impl ShvNode for IntPropertyNode { if v.is_int() { let v = v.as_i32(); if v == state.int_prop { - Ok((RpcValue::from(false), None)) + Ok((RpcValue::from(()), None)) } else { state.int_prop = v; - Ok((RpcValue::from(true), Some(Signal{ value: v.into(), method: SIG_CHNG }))) + Ok((RpcValue::from(()), Some(Signal{ value: v.into(), method: SIG_CHNG }))) } } else { Err(RpcError::new(RpcErrorCode::InvalidParam, "Invalid parameter")) diff --git a/src/bin/shvbroker/main.rs b/src/bin/shvbroker/main.rs index 390ce69..062622f 100644 --- a/src/bin/shvbroker/main.rs +++ b/src/bin/shvbroker/main.rs @@ -8,7 +8,7 @@ use rand::distributions::{Alphanumeric, DistString}; use log::*; use structopt::StructOpt; use shv::rpcframe::{RpcFrame}; -use shv::{RpcMessage, RpcValue, rpcvalue, util}; +use shv::{List, RpcMessage, RpcValue, rpcvalue, util}; use shv::rpcmessage::{CliId, RpcError, RpcErrorCode}; use shv::RpcMessageMetaTags; use simple_logger::SimpleLogger; @@ -419,11 +419,12 @@ impl Broker { } } fn peer_to_info(client_id: CliId, peer: &Peer) -> rpcvalue::Map { + let subs: List = peer.subscriptions.iter().map(|subs| subs.to_rpcvalue()).collect(); rpcvalue::Map::from([ ("clientId".to_string(), client_id.into()), ("userName".to_string(), RpcValue::from(&peer.user)), ("mountPoint".to_string(), RpcValue::from(peer.mount_point.clone().unwrap_or_default())), - ("subscriptions".to_string(), "NIY".into()), + ("subscriptions".to_string(), subs.into()), ] ) } @@ -454,11 +455,11 @@ impl Broker { RpcValue::from(peer.mount_point.clone().unwrap()) } ).collect() } - pub fn subscribe(&mut self, client_id: CliId, subscription: Subscription) -> Result { + pub fn subscribe(&mut self, client_id: CliId, subscription: Subscription) -> Result<()> { log!(target: "Subscr", Level::Debug, "New subscription for client id: {} - {}", client_id, &subscription); let peer = self.peers.get_mut(&client_id).expect("client ID must be valid here"); peer.subscriptions.push(subscription); - Ok(true) + Ok(()) } pub fn unsubscribe(&mut self, client_id: CliId, subscription: &Subscription) -> Result { let peer = self.peers.get_mut(&client_id).expect("client ID must be valid here"); @@ -529,16 +530,21 @@ async fn broker_loop(events: Receiver) { }; if let Some(result) = result { if let Ok(meta) = response_meta { + let peer = broker.peers.get(&client_id).unwrap(); let mut resp = RpcMessage::from_meta(meta); match result { - Ok((value, _signal)) => { + Ok((value, signal)) => { + // send signal before response, node.process_request(...) can also eventually send other signals + if let Some(signal) = signal { + let sig = RpcMessage::new_signal(&shv_path, signal.method, Some(signal.value)); + peer.sender.send(PeerEvent::Message(sig)).await.unwrap(); + } resp.set_result(value); } Err(err) => { resp.set_error(err); } } - let peer = broker.peers.get(&client_id).unwrap(); peer.sender.send(PeerEvent::Message(resp)).await.unwrap(); } } diff --git a/src/bin/shvcall.rs b/src/bin/shvcall.rs index d3e01f8..8f349f4 100644 --- a/src/bin/shvcall.rs +++ b/src/bin/shvcall.rs @@ -2,14 +2,14 @@ use async_std::{io, prelude::*}; use structopt::StructOpt; use async_std::io::{BufReader}; use async_std::net::TcpStream; -use shv::{client, RpcMessage, RpcValue}; +use shv::{client, RpcMessage, RpcMessageMetaTags, RpcValue}; use async_std::task; use log::*; use percent_encoding::percent_decode; use simple_logger::SimpleLogger; use url::Url; use shv::client::LoginParams; -use shv::connection::FrameReader; +use shv::rpcmessage::{RqId}; use shv::util::parse_log_verbosity; type Result = shv::Result<()>; @@ -26,17 +26,28 @@ struct Opts { method: Option, #[structopt(short = "a", long = "param")] param: Option, - /// Write only result, trim RPC message - #[structopt(short = "r", long = "trim-meta")] - trim_meta: bool, + /// Output format: [ cpon | chainpack | simple | value ], default is 'cpon' + #[structopt(short = "o", long = "output-format", default_value = "cpon")] + output_format: String, /// Verbose mode (module, .) #[structopt(short = "v", long = "verbose")] verbose: Option, - ///Output as Chainpack instead of default CPON - #[structopt(short = "-x", long = "--chainpack")] - chainpack: bool, } - +enum OutputFormat { + Cpon, + ChainPack, + Simple, + Value, +} +impl From<&str> for OutputFormat { + fn from(value: &str) -> Self { + let s = value.to_ascii_lowercase(); + if "chainpack".starts_with(&s) { return Self::ChainPack } + if "simple".starts_with(&s) { return Self::Simple } + if "value".starts_with(&s) { return Self::Value } + Self::Cpon + } +} // const DEFAULT_RPC_TIMEOUT_MSEC: u64 = 5000; pub(crate) fn main() -> Result { let opts = Opts::from_args(); @@ -79,35 +90,57 @@ async fn make_call(url: &Url, opts: &Opts) -> Result { client::login(&mut frame_reader, &mut writer, &login_params).await?; info!("Connected to broker."); - async fn print_resp(stdout: &mut io::Stdout, resp: &RpcMessage, to_chainpack: bool, trim_meta: bool) -> Result { - let rpcval = if resp.is_success() { - if trim_meta { - resp.result().expect("result should be set").clone() - } else { - resp.as_rpcvalue().clone() + async fn print_resp(stdout: &mut io::Stdout, resp: &RpcMessage, output_format: OutputFormat) -> Result { + let bytes = match output_format { + OutputFormat::Cpon => { + let mut s = resp.as_rpcvalue().to_cpon(); + s.push('\n'); + s.as_bytes().to_owned() + } + OutputFormat::ChainPack => { + resp.as_rpcvalue().to_chainpack().to_owned() + } + OutputFormat::Simple => { + let s = if resp.is_request() { + format!("REQ {}:{} {}\n", resp.shv_path().unwrap_or_default(), resp.method().unwrap_or_default(), resp.param().unwrap_or_default().to_cpon()) + } else if resp.is_response() { + match resp.result() { + Ok(res) => { + format!("RES {}\n", res.to_cpon()) + } + Err(err) => { + format!("ERR {}\n", err.to_string()) + } + } + } else { + format!("SIG {}:{} {}\n", resp.shv_path().unwrap_or_default(), resp.method().unwrap_or_default(), resp.param().unwrap_or_default().to_cpon()) + }; + s.as_bytes().to_owned() + } + OutputFormat::Value => { + let mut s = if resp.is_request() { + resp.param().unwrap_or_default().to_cpon() + } else if resp.is_response() { + match resp.result() { + Ok(res) => { + res.to_cpon() + } + Err(err) => { + err.to_string() + } + } + } else { + resp.param().unwrap_or_default().to_cpon() + }; + s.push('\n'); + s.as_bytes().to_owned() } - } else { - resp.error().expect("error should be set").to_rpcvalue() }; - if to_chainpack { - let bytes = rpcval.to_chainpack(); - stdout.write_all(&bytes).await?; - Ok(stdout.flush().await?) - } else { - let bytes = rpcval.to_cpon().into_bytes(); - stdout.write_all(&bytes).await?; - Ok(stdout.write_all("\n".as_bytes()).await?) - } - } - async fn print_error(stdout: &mut io::Stdout, err: &str, to_chainpack: bool) -> Result { - if to_chainpack { - Ok(()) - } else { - stdout.write_all(err.as_bytes()).await?; - Ok(stdout.write_all("\n".as_bytes()).await?) - } + stdout.write_all(&bytes).await?; + Ok(stdout.flush().await?) } - async fn send_request(mut writer: &TcpStream, reader: &mut FrameReader<'_, BufReader<&TcpStream>>, path: &str, method: &str, param: &str) -> shv::Result { + + async fn send_request(mut writer: &TcpStream, path: &str, method: &str, param: &str) -> shv::Result { let param = if param.is_empty() { None } else { @@ -115,11 +148,9 @@ async fn make_call(url: &Url, opts: &Opts) -> Result { }; let rpcmsg = RpcMessage::new_request(path, method, param); shv::connection::send_message(&mut writer, &rpcmsg).await?; - - let resp = reader.receive_message().await?.ok_or("Receive error")?; - Ok(resp) - + Ok(rpcmsg.request_id().expect("Request ID should exist here.")) } + if opts.path.is_none() && opts.method.is_some() { return Err("--path parameter missing".into()) } @@ -139,8 +170,7 @@ async fn make_call(url: &Url, opts: &Opts) -> Result { } else { let method_ix = match line.find(':') { None => { - print_error(&mut stdout, &format!("Invalid line format, method not found: {line}"), opts.chainpack).await?; - break; + return Err(format!("Invalid line format, method not found: {line}").into()); } Some(ix) => { ix } }; @@ -150,8 +180,14 @@ async fn make_call(url: &Url, opts: &Opts) -> Result { None => { (line[method_ix + 1 .. ].trim(), "") } Some(ix) => { (line[method_ix + 1 .. ix].trim(), line[ix + 1 ..].trim()) } }; - let resp = send_request(writer, &mut frame_reader, &path, &method, ¶m).await?; - print_resp(&mut stdout, &resp, opts.chainpack, opts.trim_meta).await?; + let rqid = send_request(writer, &path, &method, ¶m).await?; + loop { + let resp = frame_reader.receive_message().await?.ok_or("Receive error")?; + print_resp(&mut stdout, &resp, (&*opts.output_format).into()).await?; + if resp.is_response() && resp.request_id().unwrap_or_default() == rqid { + break; + } + } } } Err(err) => { return Err(format!("Read line error: {err}").into()) } @@ -161,8 +197,9 @@ async fn make_call(url: &Url, opts: &Opts) -> Result { let path = opts.path.clone().unwrap_or_default(); let method = opts.method.clone().unwrap_or_default(); let param = opts.param.clone().unwrap_or_default(); - let resp = send_request(writer, &mut frame_reader, &path, &method, ¶m).await?; - print_resp(&mut stdout, &resp, opts.chainpack, opts.trim_meta).await?; + send_request(writer, &path, &method, ¶m).await?; + let resp = frame_reader.receive_message().await?.ok_or("Receive error")?; + print_resp(&mut stdout, &resp, (&*opts.output_format).into()).await?; } Ok(()) diff --git a/src/rpc.rs b/src/rpc.rs index 3281bd0..3db9209 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -36,6 +36,12 @@ impl Subscription { let paths = m.get("path").unwrap_or(m.get("paths").unwrap_or_default()).as_str(); Self::new(paths, methods) } + pub fn to_rpcvalue(&self) -> RpcValue { + let mut m = Map::new(); + m.insert("paths".into(), self.paths.as_str().into()); + m.insert("methods".into(), self.methods.as_str().into()); + RpcValue::from(m) + } } impl Display for Subscription { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { diff --git a/src/rpcmessage.rs b/src/rpcmessage.rs index 7cfa40d..be149f8 100644 --- a/src/rpcmessage.rs +++ b/src/rpcmessage.rs @@ -429,7 +429,14 @@ impl RpcError { RpcValue::from(m) } } - +impl Default for RpcError { + fn default() -> Self { + RpcError { + code: RpcErrorCode::NoError, + message: "".to_string(), + } + } +} impl Debug for RpcError { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { write!(f, "code: {}, message: {}", self.code, self.message) diff --git a/tests/common/mod.rs b/tests/common/mod.rs index b6675cc..cae0fd7 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -46,14 +46,27 @@ pub fn text_from_output(output: Output) -> shv::Result { let bytes = bytes_from_output(output)?; Ok(String::from_utf8(bytes)?) } -pub fn value_list_from_output(output: Output) -> shv::Result> { +pub fn string_list_from_output(output: Output) -> shv::Result> { let bytes = text_from_output(output)?; let mut values = Vec::new(); for cpon in bytes.split(|b| b == '\n').filter(|line| !line.is_empty()) { - values.push(RpcValue::from_cpon(cpon)?); + values.push(cpon.trim().to_owned()); } Ok(values) } +//pub fn value_list_from_output(output: Output) -> shv::Result> { +// let mut values = List::new(); +// let bytes = bytes_from_output(output)?; +// let mut buff: &[u8] = &bytes; +// let mut rd = CponReader::new(&mut buff); +// loop { +// match rd.read() { +// Ok(rv) => { values.push(rv) } +// Err(_) => { break; } +// } +// } +// Ok(values) +//} pub fn result_from_output(output: Output) -> shv::Result { let msg = rpcmsg_from_output(output)?; let result = msg.result()?; @@ -61,6 +74,23 @@ pub fn result_from_output(output: Output) -> shv::Result { //assert_eq!(result, expected_value); Ok(result.clone()) } +#[allow(dead_code)] +pub enum ShvCallOutputFormat { + Cpon, + ChainPack, + Simple, + Value, +} +impl ShvCallOutputFormat { + fn as_str(&self) -> &'static str { + match self { + ShvCallOutputFormat::Cpon => { "cpon" } + ShvCallOutputFormat::ChainPack => { "chainpack" } + ShvCallOutputFormat::Simple => { "simple" } + ShvCallOutputFormat::Value => { "value" } + } + } +} pub fn shv_call(path: &str, method: &str, param: &str) -> shv::Result { let output = Command::new("target/debug/shvcall") .arg("-v").arg(".:T") @@ -68,18 +98,19 @@ pub fn shv_call(path: &str, method: &str, param: &str) -> shv::Result .arg("--path").arg(path) .arg("--method").arg(method) .arg("--param").arg(param) + //.arg("--output-format").arg(output_format.as_str()) .output()?; result_from_output(output) } -pub fn shv_call_many(calls: Vec) -> shv::Result> { - let mut child = Command::new("target/debug/shvcall") - .arg("--url").arg("tcp://admin:admin@localhost") - .stdin(Stdio::piped()) +pub fn shv_call_many(calls: Vec, output_format: ShvCallOutputFormat) -> shv::Result> { + let mut cmd = Command::new("target/debug/shvcall"); + cmd.stdin(Stdio::piped()) .stdout(Stdio::piped()) - .arg("-r") - .arg("-v").arg(".:I") - .spawn()?; + .arg("--url").arg("tcp://admin:admin@localhost") + .arg("--output-format").arg(output_format.as_str()) + .arg("-v").arg(".:I"); + let mut child = cmd.spawn()?; let mut stdin = child.stdin.take().expect("shvcall should be running"); thread::spawn(move || { for line in calls { @@ -88,5 +119,5 @@ pub fn shv_call_many(calls: Vec) -> shv::Result> { } }); let output = child.wait_with_output()?; - value_list_from_output(output) + string_list_from_output(output) } \ No newline at end of file diff --git a/tests/test_broker.rs b/tests/test_broker.rs index 9e967ca..5694410 100644 --- a/tests/test_broker.rs +++ b/tests/test_broker.rs @@ -1,11 +1,11 @@ extern crate shv; -use std::process::{Command, Output}; +use std::process::{Command}; use std::{thread, time::Duration}; -use shv::{metamethod, RpcMessage, RpcValue, rpcvalue}; +use shv::{metamethod, RpcValue, rpcvalue}; use shv::metamethod::{Flag, MetaMethod}; use shv::shvnode::{METH_DIR, METH_LS, METH_NAME, METH_PING}; -use crate::common::{KillProcessGuard, shv_call}; +use crate::common::{KillProcessGuard, shv_call, shv_call_many, ShvCallOutputFormat}; mod common; @@ -96,27 +96,23 @@ fn test_broker() -> shv::Result<()> { println!("---broker---: .app/broker:mounts()"); assert_eq!(shv_call(".app/broker", "mounts", "")?, vec![RpcValue::from("test/device")].into()); { - //println!("====== subscriptions ====="); - //let sig_trap_proc = Command::new("target/debug/shvcall") - // .arg("--url").arg("tcp://admin:admin@localhost") - // .arg("--signal-trap") - // .arg("--path").arg("test/**") - // .arg("--method").arg("") - // .arg("--timeout").arg("1s") - // .arg("-v").arg("RpcMsg") - // .spawn()?; - //call("test/device/number", "set", "1234")?; - //let output = sig_trap_proc.wait_with_output()?; - //match output.status.code() { - // Some(code) => println!("Exited with status code: {code}"), - // None => println!("Process terminated by signal") - //} - //println!("STATUS {:?}", output.status.code()); - //assert!(output.status.success()); - //let msg = rpcmsg_from_output(output)?; - //assert_eq!(msg.shv_path().ok_or("shv path missing")?, "test/device/number"); - //assert_eq!(msg.method().ok_or("method missing")?, "chng"); - //assert_eq!(msg.param().ok_or("param missing")?.as_i32(), 1234); + println!("====== subscriptions ====="); + let calls: Vec = vec![ + r#".app/broker/currentClient:subscribe {"methods": "chng", "paths": "test/**"}"#.into(), + r#"test/device/number:set 42"#.into(), + ]; + let values = shv_call_many(calls, ShvCallOutputFormat::Simple)?; + for v in values.iter() { + println!("\t{}", v); + } + let expected: Vec<&str> = vec![ + "RES null", // response to subscribe + "SIG test/device/number:chng 42", // SIG chng + "RES null", // response to subscribe + ]; + for (no, val) in values.iter().enumerate() { + assert_eq!(expected[no], val); + } } Ok(()) } diff --git a/tests/test_shvcall.rs b/tests/test_shvcall.rs index 27c8b48..9a5e9fc 100644 --- a/tests/test_shvcall.rs +++ b/tests/test_shvcall.rs @@ -1,11 +1,10 @@ extern crate shv; -use std::io::Write; -use std::process::{Command, Stdio}; +use std::process::{Command}; use std::thread; use std::time::Duration; use shv::{RpcValue}; -use crate::common::{shv_call, shv_call_many, value_list_from_output}; +use crate::common::{shv_call, shv_call_many, ShvCallOutputFormat}; mod common; @@ -33,14 +32,16 @@ fn test_call_ping_stdin() -> shv::Result<()> { thread::sleep(Duration::from_millis(100)); assert!(broker_process_guard.is_running()); + println!("---shvcall---: .app/ping()"); assert_eq!(shv_call(".app", "ping", "")?, RpcValue::null()); + println!("---shvcall---: .app/name()"); let calls: Vec = vec![ ".app:ping".into(), ".app:name".into(), ]; - let values = shv_call_many(calls)?; - let expected: [RpcValue; 2] = [RpcValue::null(), RpcValue::from("shvbroker")]; + let values = shv_call_many(calls, ShvCallOutputFormat::Value)?; + let expected = vec!["null", r#""shvbroker""#]; for (no, val) in values.iter().enumerate() { assert_eq!(&expected[no], val); }