diff --git a/src/broker/peer.rs b/src/broker/peer.rs index 8a4d904..edee6e3 100644 --- a/src/broker/peer.rs +++ b/src/broker/peer.rs @@ -116,19 +116,24 @@ pub(crate) async fn peer_loop(client_id: i32, broker_writer: Sender match frame { + frame = fut_receive_frame => match frame { Ok(frame) => { // log!(target: "RpcMsg", Level::Debug, "----> Recv frame, client id: {}", client_id); broker_writer.send(BrokerCommand::FrameReceived { client_id, frame }).await?; + drop(fut_receive_frame); + fut_receive_frame = frame_reader.receive_frame().fuse(); } Err(e) => { error!("Read socket error: {}", &e); break; } }, - event = peer_reader.recv().fuse() => match event { + event = fut_receive_broker_event => match event { Err(e) => { debug!("Peer channel closed: {}", &e); break; @@ -151,6 +156,7 @@ pub(crate) async fn peer_loop(client_id: i32, broker_writer: Sender(); broker_writer.send(BrokerCommand::NewPeer { client_id, sender: broker_to_peer_sender, peer_kind: PeerKind::ParentBroker }).await?; + + let mut fut_receive_frame = frame_reader.receive_frame().fuse(); + let mut fut_receive_broker_event = broker_to_peer_receiver.recv().fuse(); loop { let fut_timeout = future::timeout(heartbeat_interval, future::pending::<()>()); select! { @@ -246,7 +255,7 @@ async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, bro debug!("sending ping"); frame_writer.send_message(msg).await?; }, - res_frame = frame_reader.receive_frame().fuse() => match res_frame { + res_frame = fut_receive_frame => match res_frame { Ok(mut frame) => { if frame.is_request() { fn is_dot_local_granted(frame: &RpcFrame) -> bool { @@ -279,12 +288,14 @@ async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, bro frame.set_shvpath(&shv_path); broker_writer.send(BrokerCommand::FrameReceived { client_id, frame }).await.unwrap(); } + drop(fut_receive_frame); + fut_receive_frame = frame_reader.receive_frame().fuse(); } Err(e) => { return Err(format!("Read frame error: {e}").into()); } }, - event = broker_to_peer_receiver.recv().fuse() => match event { + event = fut_receive_broker_event => match event { Err(e) => { debug!("broker loop has closed peer channel, client ID {client_id}"); return Err(e.into()); @@ -321,6 +332,7 @@ async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, bro frame_writer.send_message(rpcmsg).await?; }, } + fut_receive_broker_event = broker_to_peer_receiver.recv().fuse(); } } } diff --git a/src/device/mod.rs b/src/device/mod.rs index cdaa443..41b488b 100644 --- a/src/device/mod.rs +++ b/src/device/mod.rs @@ -109,6 +109,9 @@ async fn peer_loop(config: &ClientConfig, device_sender: Sender) let mut ping_rq_id = 0; let mut time_to_ping = heartbeat_interval; let mut recent_ping_ts = Instant::now(); + + let mut fut_receive_frame = frame_reader.receive_frame().fuse(); + let mut fut_receive_command = receiver.recv().fuse(); loop { select! { _ = future::timeout(time_to_ping, future::pending::<()>()).fuse() => { @@ -119,30 +122,37 @@ async fn peer_loop(config: &ClientConfig, device_sender: Sender) frame_writer.send_frame(frame).await?; recent_ping_ts = Instant::now(); }, - frame = frame_reader.receive_frame().fuse() => match frame { - Ok(frame) => { - if frame.request_id().unwrap_or_default() == ping_rq_id { - debug!("ping response received: {:?}", frame); - } else { - device_sender.send(DeviceCommand::FrameReceived(frame)).await?; + frame = fut_receive_frame => { + match frame { + Ok(frame) => { + if frame.request_id().unwrap_or_default() == ping_rq_id { + debug!("ping response received: {:?}", frame); + } else { + device_sender.send(DeviceCommand::FrameReceived(frame)).await?; + } + } + Err(err) => { + error!("Receive frame error: {}", err); } } - Err(err) => { - error!("Receive frame error: {}", err); - } + drop(fut_receive_frame); + fut_receive_frame = frame_reader.receive_frame().fuse(); }, - command = receiver.recv().fuse() => match command { - Ok(command) => { - match command { - DeviceToPeerCommand::SendFrame(frame) => { - recent_ping_ts = Instant::now(); - frame_writer.send_frame(frame).await? + command = fut_receive_command => { + match command { + Ok(command) => { + match command { + DeviceToPeerCommand::SendFrame(frame) => { + recent_ping_ts = Instant::now(); + frame_writer.send_frame(frame).await? + } } } + Err(err) => { + warn!("Receive command error: {}", err); + } } - Err(err) => { - warn!("Receive command error: {}", err); - } + fut_receive_command = receiver.recv().fuse(); }, } let elapsed = recent_ping_ts.elapsed();