Skip to content

Commit

Permalink
shvcall more output formats
Browse files Browse the repository at this point in the history
  • Loading branch information
Fanda Vacek committed Dec 27, 2023
1 parent b23bbd8 commit 7b2d860
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 95 deletions.
9 changes: 5 additions & 4 deletions examples/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,13 @@ async fn try_main(url: &Url, opts: &Opts) -> shv::Result<()> {
let mut resp = RpcMessage::from_meta(meta);
match result {
Ok((value, signal)) => {
resp.set_result(value);
shv::connection::send_message(&mut writer, &resp).await?;
// send signal before response, node.process_request(...) can also eventually send other signals
if let Some(signal) = signal {
let sig = RpcMessage::new_signal(shv_path, signal.method, Some(signal.value));
shv::connection::send_message(&mut writer, &sig).await?;
}
resp.set_result(value);
shv::connection::send_message(&mut writer, &resp).await?;
}
Err(errmsg) => {
resp.set_error(errmsg);
Expand Down Expand Up @@ -157,10 +158,10 @@ impl ShvNode<DeviceState> for IntPropertyNode {
if v.is_int() {
let v = v.as_i32();
if v == state.int_prop {
Ok((RpcValue::from(false), None))
Ok((RpcValue::from(()), None))
} else {
state.int_prop = v;
Ok((RpcValue::from(true), Some(Signal{ value: v.into(), method: SIG_CHNG })))
Ok((RpcValue::from(()), Some(Signal{ value: v.into(), method: SIG_CHNG })))
}
} else {
Err(RpcError::new(RpcErrorCode::InvalidParam, "Invalid parameter"))
Expand Down
18 changes: 12 additions & 6 deletions src/bin/shvbroker/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use rand::distributions::{Alphanumeric, DistString};
use log::*;
use structopt::StructOpt;
use shv::rpcframe::{RpcFrame};
use shv::{RpcMessage, RpcValue, rpcvalue, util};
use shv::{List, RpcMessage, RpcValue, rpcvalue, util};
use shv::rpcmessage::{CliId, RpcError, RpcErrorCode};
use shv::RpcMessageMetaTags;
use simple_logger::SimpleLogger;
Expand Down Expand Up @@ -419,11 +419,12 @@ impl Broker {
}
}
fn peer_to_info(client_id: CliId, peer: &Peer) -> rpcvalue::Map {
let subs: List = peer.subscriptions.iter().map(|subs| subs.to_rpcvalue()).collect();
rpcvalue::Map::from([
("clientId".to_string(), client_id.into()),
("userName".to_string(), RpcValue::from(&peer.user)),
("mountPoint".to_string(), RpcValue::from(peer.mount_point.clone().unwrap_or_default())),
("subscriptions".to_string(), "NIY".into()),
("subscriptions".to_string(), subs.into()),
]
)
}
Expand Down Expand Up @@ -454,11 +455,11 @@ impl Broker {
RpcValue::from(peer.mount_point.clone().unwrap())
} ).collect()
}
pub fn subscribe(&mut self, client_id: CliId, subscription: Subscription) -> Result<bool> {
pub fn subscribe(&mut self, client_id: CliId, subscription: Subscription) -> Result<()> {
log!(target: "Subscr", Level::Debug, "New subscription for client id: {} - {}", client_id, &subscription);
let peer = self.peers.get_mut(&client_id).expect("client ID must be valid here");
peer.subscriptions.push(subscription);
Ok(true)
Ok(())
}
pub fn unsubscribe(&mut self, client_id: CliId, subscription: &Subscription) -> Result<bool> {
let peer = self.peers.get_mut(&client_id).expect("client ID must be valid here");
Expand Down Expand Up @@ -529,16 +530,21 @@ async fn broker_loop(events: Receiver<ClientEvent>) {
};
if let Some(result) = result {
if let Ok(meta) = response_meta {
let peer = broker.peers.get(&client_id).unwrap();
let mut resp = RpcMessage::from_meta(meta);
match result {
Ok((value, _signal)) => {
Ok((value, signal)) => {
// send signal before response, node.process_request(...) can also eventually send other signals
if let Some(signal) = signal {
let sig = RpcMessage::new_signal(&shv_path, signal.method, Some(signal.value));
peer.sender.send(PeerEvent::Message(sig)).await.unwrap();
}
resp.set_result(value);
}
Err(err) => {
resp.set_error(err);
}
}
let peer = broker.peers.get(&client_id).unwrap();
peer.sender.send(PeerEvent::Message(resp)).await.unwrap();
}
}
Expand Down
127 changes: 82 additions & 45 deletions src/bin/shvcall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ use async_std::{io, prelude::*};
use structopt::StructOpt;
use async_std::io::{BufReader};
use async_std::net::TcpStream;
use shv::{client, RpcMessage, RpcValue};
use shv::{client, RpcMessage, RpcMessageMetaTags, RpcValue};
use async_std::task;
use log::*;
use percent_encoding::percent_decode;
use simple_logger::SimpleLogger;
use url::Url;
use shv::client::LoginParams;
use shv::connection::FrameReader;
use shv::rpcmessage::{RqId};
use shv::util::parse_log_verbosity;

type Result = shv::Result<()>;
Expand All @@ -26,17 +26,28 @@ struct Opts {
method: Option<String>,
#[structopt(short = "a", long = "param")]
param: Option<String>,
/// Write only result, trim RPC message
#[structopt(short = "r", long = "trim-meta")]
trim_meta: bool,
/// Output format: [ cpon | chainpack | simple | value ], default is 'cpon'
#[structopt(short = "o", long = "output-format", default_value = "cpon")]
output_format: String,
/// Verbose mode (module, .)
#[structopt(short = "v", long = "verbose")]
verbose: Option<String>,
///Output as Chainpack instead of default CPON
#[structopt(short = "-x", long = "--chainpack")]
chainpack: bool,
}

enum OutputFormat {
Cpon,
ChainPack,
Simple,
Value,
}
impl From<&str> for OutputFormat {
fn from(value: &str) -> Self {
let s = value.to_ascii_lowercase();
if "chainpack".starts_with(&s) { return Self::ChainPack }
if "simple".starts_with(&s) { return Self::Simple }
if "value".starts_with(&s) { return Self::Value }
Self::Cpon
}
}
// const DEFAULT_RPC_TIMEOUT_MSEC: u64 = 5000;
pub(crate) fn main() -> Result {
let opts = Opts::from_args();
Expand Down Expand Up @@ -79,47 +90,67 @@ async fn make_call(url: &Url, opts: &Opts) -> Result {

client::login(&mut frame_reader, &mut writer, &login_params).await?;
info!("Connected to broker.");
async fn print_resp(stdout: &mut io::Stdout, resp: &RpcMessage, to_chainpack: bool, trim_meta: bool) -> Result {
let rpcval = if resp.is_success() {
if trim_meta {
resp.result().expect("result should be set").clone()
} else {
resp.as_rpcvalue().clone()
async fn print_resp(stdout: &mut io::Stdout, resp: &RpcMessage, output_format: OutputFormat) -> Result {
let bytes = match output_format {
OutputFormat::Cpon => {
let mut s = resp.as_rpcvalue().to_cpon();
s.push('\n');
s.as_bytes().to_owned()
}
OutputFormat::ChainPack => {
resp.as_rpcvalue().to_chainpack().to_owned()
}
OutputFormat::Simple => {
let s = if resp.is_request() {
format!("REQ {}:{} {}\n", resp.shv_path().unwrap_or_default(), resp.method().unwrap_or_default(), resp.param().unwrap_or_default().to_cpon())
} else if resp.is_response() {
match resp.result() {
Ok(res) => {
format!("RES {}\n", res.to_cpon())
}
Err(err) => {
format!("ERR {}\n", err.to_string())
}
}
} else {
format!("SIG {}:{} {}\n", resp.shv_path().unwrap_or_default(), resp.method().unwrap_or_default(), resp.param().unwrap_or_default().to_cpon())
};
s.as_bytes().to_owned()
}
OutputFormat::Value => {
let mut s = if resp.is_request() {
resp.param().unwrap_or_default().to_cpon()
} else if resp.is_response() {
match resp.result() {
Ok(res) => {
res.to_cpon()
}
Err(err) => {
err.to_string()
}
}
} else {
resp.param().unwrap_or_default().to_cpon()
};
s.push('\n');
s.as_bytes().to_owned()
}
} else {
resp.error().expect("error should be set").to_rpcvalue()
};
if to_chainpack {
let bytes = rpcval.to_chainpack();
stdout.write_all(&bytes).await?;
Ok(stdout.flush().await?)
} else {
let bytes = rpcval.to_cpon().into_bytes();
stdout.write_all(&bytes).await?;
Ok(stdout.write_all("\n".as_bytes()).await?)
}
}
async fn print_error(stdout: &mut io::Stdout, err: &str, to_chainpack: bool) -> Result {
if to_chainpack {
Ok(())
} else {
stdout.write_all(err.as_bytes()).await?;
Ok(stdout.write_all("\n".as_bytes()).await?)
}
stdout.write_all(&bytes).await?;
Ok(stdout.flush().await?)
}
async fn send_request(mut writer: &TcpStream, reader: &mut FrameReader<'_, BufReader<&TcpStream>>, path: &str, method: &str, param: &str) -> shv::Result<RpcMessage> {

async fn send_request(mut writer: &TcpStream, path: &str, method: &str, param: &str) -> shv::Result<RqId> {
let param = if param.is_empty() {
None
} else {
Some(RpcValue::from_cpon(param)?)
};
let rpcmsg = RpcMessage::new_request(path, method, param);
shv::connection::send_message(&mut writer, &rpcmsg).await?;

let resp = reader.receive_message().await?.ok_or("Receive error")?;
Ok(resp)

Ok(rpcmsg.request_id().expect("Request ID should exist here."))
}

if opts.path.is_none() && opts.method.is_some() {
return Err("--path parameter missing".into())
}
Expand All @@ -139,8 +170,7 @@ async fn make_call(url: &Url, opts: &Opts) -> Result {
} else {
let method_ix = match line.find(':') {
None => {
print_error(&mut stdout, &format!("Invalid line format, method not found: {line}"), opts.chainpack).await?;
break;
return Err(format!("Invalid line format, method not found: {line}").into());
}
Some(ix) => { ix }
};
Expand All @@ -150,8 +180,14 @@ async fn make_call(url: &Url, opts: &Opts) -> Result {
None => { (line[method_ix + 1 .. ].trim(), "") }
Some(ix) => { (line[method_ix + 1 .. ix].trim(), line[ix + 1 ..].trim()) }
};
let resp = send_request(writer, &mut frame_reader, &path, &method, &param).await?;
print_resp(&mut stdout, &resp, opts.chainpack, opts.trim_meta).await?;
let rqid = send_request(writer, &path, &method, &param).await?;
loop {
let resp = frame_reader.receive_message().await?.ok_or("Receive error")?;
print_resp(&mut stdout, &resp, (&*opts.output_format).into()).await?;
if resp.is_response() && resp.request_id().unwrap_or_default() == rqid {
break;
}
}
}
}
Err(err) => { return Err(format!("Read line error: {err}").into()) }
Expand All @@ -161,8 +197,9 @@ async fn make_call(url: &Url, opts: &Opts) -> Result {
let path = opts.path.clone().unwrap_or_default();
let method = opts.method.clone().unwrap_or_default();
let param = opts.param.clone().unwrap_or_default();
let resp = send_request(writer, &mut frame_reader, &path, &method, &param).await?;
print_resp(&mut stdout, &resp, opts.chainpack, opts.trim_meta).await?;
send_request(writer, &path, &method, &param).await?;
let resp = frame_reader.receive_message().await?.ok_or("Receive error")?;
print_resp(&mut stdout, &resp, (&*opts.output_format).into()).await?;
}

Ok(())
Expand Down
6 changes: 6 additions & 0 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ impl Subscription {
let paths = m.get("path").unwrap_or(m.get("paths").unwrap_or_default()).as_str();
Self::new(paths, methods)
}
pub fn to_rpcvalue(&self) -> RpcValue {
let mut m = Map::new();
m.insert("paths".into(), self.paths.as_str().into());
m.insert("methods".into(), self.methods.as_str().into());
RpcValue::from(m)
}
}
impl Display for Subscription {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Expand Down
9 changes: 8 additions & 1 deletion src/rpcmessage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,14 @@ impl RpcError {
RpcValue::from(m)
}
}

impl Default for RpcError {
fn default() -> Self {
RpcError {
code: RpcErrorCode::NoError,
message: "".to_string(),
}
}
}
impl Debug for RpcError {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "code: {}, message: {}", self.code, self.message)
Expand Down
51 changes: 41 additions & 10 deletions tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,40 +46,71 @@ pub fn text_from_output(output: Output) -> shv::Result<String> {
let bytes = bytes_from_output(output)?;
Ok(String::from_utf8(bytes)?)
}
pub fn value_list_from_output(output: Output) -> shv::Result<Vec<RpcValue>> {
pub fn string_list_from_output(output: Output) -> shv::Result<Vec<String>> {
let bytes = text_from_output(output)?;
let mut values = Vec::new();
for cpon in bytes.split(|b| b == '\n').filter(|line| !line.is_empty()) {
values.push(RpcValue::from_cpon(cpon)?);
values.push(cpon.trim().to_owned());
}
Ok(values)
}
//pub fn value_list_from_output(output: Output) -> shv::Result<Vec<RpcValue>> {
// let mut values = List::new();
// let bytes = bytes_from_output(output)?;
// let mut buff: &[u8] = &bytes;
// let mut rd = CponReader::new(&mut buff);
// loop {
// match rd.read() {
// Ok(rv) => { values.push(rv) }
// Err(_) => { break; }
// }
// }
// Ok(values)
//}
pub fn result_from_output(output: Output) -> shv::Result<RpcValue> {
let msg = rpcmsg_from_output(output)?;
let result = msg.result()?;
//println!("cpon: {}, expected: {}", result, expected_value.to_cpon());
//assert_eq!(result, expected_value);
Ok(result.clone())
}
#[allow(dead_code)]
pub enum ShvCallOutputFormat {
Cpon,
ChainPack,
Simple,
Value,
}
impl ShvCallOutputFormat {
fn as_str(&self) -> &'static str {
match self {
ShvCallOutputFormat::Cpon => { "cpon" }
ShvCallOutputFormat::ChainPack => { "chainpack" }
ShvCallOutputFormat::Simple => { "simple" }
ShvCallOutputFormat::Value => { "value" }
}
}
}
pub fn shv_call(path: &str, method: &str, param: &str) -> shv::Result<RpcValue> {
let output = Command::new("target/debug/shvcall")
.arg("-v").arg(".:T")
.arg("--url").arg("tcp://admin:admin@localhost")
.arg("--path").arg(path)
.arg("--method").arg(method)
.arg("--param").arg(param)
//.arg("--output-format").arg(output_format.as_str())
.output()?;

result_from_output(output)
}
pub fn shv_call_many(calls: Vec<String>) -> shv::Result<Vec<RpcValue>> {
let mut child = Command::new("target/debug/shvcall")
.arg("--url").arg("tcp://admin:admin@localhost")
.stdin(Stdio::piped())
pub fn shv_call_many(calls: Vec<String>, output_format: ShvCallOutputFormat) -> shv::Result<Vec<String>> {
let mut cmd = Command::new("target/debug/shvcall");
cmd.stdin(Stdio::piped())
.stdout(Stdio::piped())
.arg("-r")
.arg("-v").arg(".:I")
.spawn()?;
.arg("--url").arg("tcp://admin:admin@localhost")
.arg("--output-format").arg(output_format.as_str())
.arg("-v").arg(".:I");
let mut child = cmd.spawn()?;
let mut stdin = child.stdin.take().expect("shvcall should be running");
thread::spawn(move || {
for line in calls {
Expand All @@ -88,5 +119,5 @@ pub fn shv_call_many(calls: Vec<String>) -> shv::Result<Vec<RpcValue>> {
}
});
let output = child.wait_with_output()?;
value_list_from_output(output)
string_list_from_output(output)
}
Loading

0 comments on commit 7b2d860

Please sign in to comment.