Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Fanda Vacek committed Sep 3, 2024
1 parent 6dace71 commit 67a4d54
Showing 1 changed file with 14 additions and 9 deletions.
23 changes: 14 additions & 9 deletions src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use shvproto::{Map, RpcValue};
use shvrpc::rpcframe::RpcFrame;
use shvrpc::{RpcMessage, RpcMessageMetaTags};
use shvrpc::rpc::{ShvRI, SubscriptionParam};
use shvrpc::rpcmessage::{PeerId, RpcError, RpcErrorCode};
use shvrpc::rpcmessage::{PeerId, RpcError, RpcErrorCode, RqId};
use shvrpc::util::join_path;
use crate::brokerimpl::{BrokerToPeerMessage, PeerKind, BrokerCommand};
use crate::config::{AccessRule, BrokerConfig, Mount, Password, Role, User};
Expand All @@ -17,10 +17,10 @@ struct CallCtx<'a> {
reader: &'a Receiver<BrokerToPeerMessage>,
client_id: PeerId,
}

async fn call(shv_path: &str, method: &str, param: Option<RpcValue>, ctx: &CallCtx<'_>) -> Result<RpcValue, RpcError> {
let msg = RpcMessage::new_request(shv_path, method, param);
let frame = RpcFrame::from_rpcmessage(&msg).expect("valid message");
async fn call2(shv_path: &str, method: &str, param: Option<RpcValue>, ctx: &CallCtx<'_>, resp_rq_id: Option<RqId>) -> Result<(RqId, RpcValue), RpcError> {
let rq = RpcMessage::new_request(shv_path, method, param);
let rqid = if let Some(resp_rq_id) = resp_rq_id { Some(resp_rq_id) } else { rq.request_id() };
let frame = RpcFrame::from_rpcmessage(&rq).expect("valid message");
println!("request: {}", frame.to_rpcmesage().unwrap());
ctx.writer.send(BrokerCommand::FrameReceived { peer_id: ctx.client_id, frame }).await.unwrap();
let retval = loop {
Expand All @@ -32,7 +32,7 @@ async fn call(shv_path: &str, method: &str, param: Option<RpcValue>, ctx: &CallC
panic!("unexpected message: {:?}", msg);
}
};
if msg.is_response() {
if msg.is_response() && msg.request_id() == rqid {
println!("response: {msg}");
break msg.result().cloned()
} else {
Expand All @@ -41,7 +41,12 @@ async fn call(shv_path: &str, method: &str, param: Option<RpcValue>, ctx: &CallC
continue;
}
};
retval
retval.map(|v| (rqid.unwrap_or_default(), v))
}

async fn call(shv_path: &str, method: &str, param: Option<RpcValue>, ctx: &CallCtx<'_>) -> Result<RpcValue, RpcError> {
let ret = call2(shv_path, method, param, ctx, None).await;
ret.map(|(_rqid, val)| val )
}

#[async_std::test]
Expand Down Expand Up @@ -248,11 +253,11 @@ async fn test_tunnel_loop() {
assert!(res.as_bool());

let data = "hello".as_bytes();
let res = call(&format!(".app/tunnel/{tunid}"), "write", Some(data.into()), &call_ctx).await.unwrap();
let (tun_rq_id, res) = call2(&format!(".app/tunnel/{tunid}"), "write", Some(data.into()), &call_ctx, None).await.unwrap();
assert_eq!(res.as_blob(), data);

let data = "tunnel".as_bytes();
let res = call(&format!(".app/tunnel/{tunid}"), "write", Some(data.into()), &call_ctx).await.unwrap();
let (_, res) = call2(&format!(".app/tunnel/{tunid}"), "write", Some(data.into()), &call_ctx, Some(tun_rq_id)).await.unwrap();
assert_eq!(res.as_blob(), data);

let res = call(&format!(".app/tunnel/{tunid}"), "close", None, &call_ctx).await.unwrap();
Expand Down

0 comments on commit 67a4d54

Please sign in to comment.