diff --git a/Cargo.toml b/Cargo.toml index 0689384..a7fc1e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "shvcall" -version = "3.2.1" +version = "3.2.2" edition = "2021" [features] diff --git a/src/main.rs b/src/main.rs index 7ca8447..f0b9254 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,7 +10,7 @@ use futures::{select, AsyncReadExt}; use futures::AsyncWriteExt; use log::*; use shvrpc::client::LoginParams; -use shvrpc::framerw::{FrameReader, FrameWriter, RpcFrameReception}; +use shvrpc::framerw::{FrameReader, FrameWriter}; use shvrpc::rpcmessage::{RqId}; use shvrpc::serialrw::{SerialFrameReader, SerialFrameWriter}; use shvrpc::streamrw::{StreamFrameReader, StreamFrameWriter}; @@ -383,40 +383,32 @@ async fn make_call(mut frame_reader: BoxedFrameReader, mut frame_writer: BoxedFr }; let param = opts.param.clone().unwrap_or_default(); let rqid = send_request(&mut *frame_writer, &path, &method, ¶m).await?; - async fn read_meta(frame_reader: &mut BoxedFrameReader, rq_id: RqId) { + async fn receive_response(frame_reader: &mut BoxedFrameReader, rq_id: RqId) -> shvrpc::Result { loop { - if let Ok(RpcFrameReception::MetaAnnouncement { response_id, ..}) = frame_reader.receive_frame_or_meta().await { - if let Some(response_id) = response_id { - if response_id == rq_id { - break; - } - } + let frame = frame_reader.receive_frame().await?; + if frame.is_response() && frame.request_id().unwrap_or_default() == rq_id { + return Ok(frame); } } } - let read_meta_fut = read_meta(&mut frame_reader, rqid); - if opts.timeout > 0 { - if read_meta_fut.timeout(Duration::from_millis(opts.timeout)).await.is_err() { - return Err(format!("Method call response timeout after {} msec.", opts.timeout).into()) + let res = if opts.timeout > 0 { + match receive_response(&mut frame_reader, rqid).timeout(Duration::from_millis(opts.timeout)).await { + Ok(maybe_frame) => { maybe_frame } + Err(_) => { + return Err(format!("Method call response timeout after {} msec.", opts.timeout).into()) + } } } else { - read_meta_fut.await + receive_response(&mut frame_reader, rqid).await }; - return match frame_reader.receive_frame_or_meta().await { + return match res { Ok(frame) => { - match frame { - RpcFrameReception::MetaAnnouncement { .. } => { - Err("Unexpected meta received.".into()) - } - RpcFrameReception::Frame(frame) => { - let resp = frame.to_rpcmesage()?; - print_resp(&mut stdout, &resp, (&*opts.output_format).into()).await?; - Ok(()) - } - } + let resp = frame.to_rpcmesage()?; + print_resp(&mut stdout, &resp, (&*opts.output_format).into()).await?; + Ok(()) } Err(e) => { - Err(e.into()) + Err(e) } } } @@ -483,7 +475,7 @@ async fn make_tunnel(mut frame_reader: BoxedFrameReader, mut frame_writer: Boxed get_frame_fut = frame_reader.receive_frame().fuse(); } Err(e) => { - info!("RPC socket read error: {e}"); + info!("RPC socket read error: {}", shvrpc::Error::from(e)); break; } }