Skip to content

Commit

Permalink
Update libshvrpc
Browse files Browse the repository at this point in the history
  • Loading branch information
Fanda Vacek committed Oct 14, 2024
1 parent 0feae34 commit 9da28cd
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 27 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "shvcall"
version = "3.2.1"
version = "3.2.2"
edition = "2021"

[features]
Expand Down
44 changes: 18 additions & 26 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use futures::{select, AsyncReadExt};
use futures::AsyncWriteExt;
use log::*;
use shvrpc::client::LoginParams;
use shvrpc::framerw::{FrameReader, FrameWriter, RpcFrameReception};
use shvrpc::framerw::{FrameReader, FrameWriter};
use shvrpc::rpcmessage::{RqId};
use shvrpc::serialrw::{SerialFrameReader, SerialFrameWriter};
use shvrpc::streamrw::{StreamFrameReader, StreamFrameWriter};
Expand Down Expand Up @@ -383,40 +383,32 @@ async fn make_call(mut frame_reader: BoxedFrameReader, mut frame_writer: BoxedFr
};
let param = opts.param.clone().unwrap_or_default();
let rqid = send_request(&mut *frame_writer, &path, &method, &param).await?;
async fn read_meta(frame_reader: &mut BoxedFrameReader, rq_id: RqId) {
async fn receive_response(frame_reader: &mut BoxedFrameReader, rq_id: RqId) -> shvrpc::Result<RpcFrame> {
loop {
if let Ok(RpcFrameReception::MetaAnnouncement { response_id, ..}) = frame_reader.receive_frame_or_meta().await {
if let Some(response_id) = response_id {
if response_id == rq_id {
break;
}
}
let frame = frame_reader.receive_frame().await?;
if frame.is_response() && frame.request_id().unwrap_or_default() == rq_id {
return Ok(frame);
}
}
}
let read_meta_fut = read_meta(&mut frame_reader, rqid);
if opts.timeout > 0 {
if read_meta_fut.timeout(Duration::from_millis(opts.timeout)).await.is_err() {
return Err(format!("Method call response timeout after {} msec.", opts.timeout).into())
let res = if opts.timeout > 0 {
match receive_response(&mut frame_reader, rqid).timeout(Duration::from_millis(opts.timeout)).await {
Ok(maybe_frame) => { maybe_frame }
Err(_) => {
return Err(format!("Method call response timeout after {} msec.", opts.timeout).into())
}
}
} else {
read_meta_fut.await
receive_response(&mut frame_reader, rqid).await
};
return match frame_reader.receive_frame_or_meta().await {
return match res {
Ok(frame) => {
match frame {
RpcFrameReception::MetaAnnouncement { .. } => {
Err("Unexpected meta received.".into())
}
RpcFrameReception::Frame(frame) => {
let resp = frame.to_rpcmesage()?;
print_resp(&mut stdout, &resp, (&*opts.output_format).into()).await?;
Ok(())
}
}
let resp = frame.to_rpcmesage()?;
print_resp(&mut stdout, &resp, (&*opts.output_format).into()).await?;
Ok(())
}
Err(e) => {
Err(e.into())
Err(e)
}
}
}
Expand Down Expand Up @@ -483,7 +475,7 @@ async fn make_tunnel(mut frame_reader: BoxedFrameReader, mut frame_writer: Boxed
get_frame_fut = frame_reader.receive_frame().fuse();
}
Err(e) => {
info!("RPC socket read error: {e}");
info!("RPC socket read error: {}", shvrpc::Error::from(e));
break;
}
}
Expand Down

0 comments on commit 9da28cd

Please sign in to comment.