From 1c6a2168d41dd089aee93c28e78f7021dc655a18 Mon Sep 17 00:00:00 2001 From: Jaroslav Beran Date: Thu, 7 Mar 2024 16:25:24 +0100 Subject: [PATCH] Reuse unresolved futures in select-in-loop When waiting for multiple futures in select inside a loop, we want to reset only a future that got resolved in that loop iteration and let the other futures in their current state. Failing to do so can lead to data loss, e.g. when a FrameReader is in the middle of receiving a frame and a timeout future gets resolved first, calling `frame_reader.receive_frame()` in next loop iteration will cause that the frame being received so far gets lost and the frame_reader will start receiving new frame. --- src/broker/peer.rs | 20 ++++++++++++++++---- src/device/mod.rs | 46 ++++++++++++++++++++++++++++------------------ 2 files changed, 44 insertions(+), 22 deletions(-) diff --git a/src/broker/peer.rs b/src/broker/peer.rs index 8a4d904..edee6e3 100644 --- a/src/broker/peer.rs +++ b/src/broker/peer.rs @@ -116,19 +116,24 @@ pub(crate) async fn peer_loop(client_id: i32, broker_writer: Sender match frame { + frame = fut_receive_frame => match frame { Ok(frame) => { // log!(target: "RpcMsg", Level::Debug, "----> Recv frame, client id: {}", client_id); broker_writer.send(BrokerCommand::FrameReceived { client_id, frame }).await?; + drop(fut_receive_frame); + fut_receive_frame = frame_reader.receive_frame().fuse(); } Err(e) => { error!("Read socket error: {}", &e); break; } }, - event = peer_reader.recv().fuse() => match event { + event = fut_receive_broker_event => match event { Err(e) => { debug!("Peer channel closed: {}", &e); break; @@ -151,6 +156,7 @@ pub(crate) async fn peer_loop(client_id: i32, broker_writer: Sender(); broker_writer.send(BrokerCommand::NewPeer { client_id, sender: broker_to_peer_sender, peer_kind: PeerKind::ParentBroker }).await?; + + let mut fut_receive_frame = frame_reader.receive_frame().fuse(); + let mut fut_receive_broker_event = broker_to_peer_receiver.recv().fuse(); loop { let fut_timeout = future::timeout(heartbeat_interval, future::pending::<()>()); select! { @@ -246,7 +255,7 @@ async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, bro debug!("sending ping"); frame_writer.send_message(msg).await?; }, - res_frame = frame_reader.receive_frame().fuse() => match res_frame { + res_frame = fut_receive_frame => match res_frame { Ok(mut frame) => { if frame.is_request() { fn is_dot_local_granted(frame: &RpcFrame) -> bool { @@ -279,12 +288,14 @@ async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, bro frame.set_shvpath(&shv_path); broker_writer.send(BrokerCommand::FrameReceived { client_id, frame }).await.unwrap(); } + drop(fut_receive_frame); + fut_receive_frame = frame_reader.receive_frame().fuse(); } Err(e) => { return Err(format!("Read frame error: {e}").into()); } }, - event = broker_to_peer_receiver.recv().fuse() => match event { + event = fut_receive_broker_event => match event { Err(e) => { debug!("broker loop has closed peer channel, client ID {client_id}"); return Err(e.into()); @@ -321,6 +332,7 @@ async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, bro frame_writer.send_message(rpcmsg).await?; }, } + fut_receive_broker_event = broker_to_peer_receiver.recv().fuse(); } } } diff --git a/src/device/mod.rs b/src/device/mod.rs index cdaa443..41b488b 100644 --- a/src/device/mod.rs +++ b/src/device/mod.rs @@ -109,6 +109,9 @@ async fn peer_loop(config: &ClientConfig, device_sender: Sender) let mut ping_rq_id = 0; let mut time_to_ping = heartbeat_interval; let mut recent_ping_ts = Instant::now(); + + let mut fut_receive_frame = frame_reader.receive_frame().fuse(); + let mut fut_receive_command = receiver.recv().fuse(); loop { select! { _ = future::timeout(time_to_ping, future::pending::<()>()).fuse() => { @@ -119,30 +122,37 @@ async fn peer_loop(config: &ClientConfig, device_sender: Sender) frame_writer.send_frame(frame).await?; recent_ping_ts = Instant::now(); }, - frame = frame_reader.receive_frame().fuse() => match frame { - Ok(frame) => { - if frame.request_id().unwrap_or_default() == ping_rq_id { - debug!("ping response received: {:?}", frame); - } else { - device_sender.send(DeviceCommand::FrameReceived(frame)).await?; + frame = fut_receive_frame => { + match frame { + Ok(frame) => { + if frame.request_id().unwrap_or_default() == ping_rq_id { + debug!("ping response received: {:?}", frame); + } else { + device_sender.send(DeviceCommand::FrameReceived(frame)).await?; + } + } + Err(err) => { + error!("Receive frame error: {}", err); } } - Err(err) => { - error!("Receive frame error: {}", err); - } + drop(fut_receive_frame); + fut_receive_frame = frame_reader.receive_frame().fuse(); }, - command = receiver.recv().fuse() => match command { - Ok(command) => { - match command { - DeviceToPeerCommand::SendFrame(frame) => { - recent_ping_ts = Instant::now(); - frame_writer.send_frame(frame).await? + command = fut_receive_command => { + match command { + Ok(command) => { + match command { + DeviceToPeerCommand::SendFrame(frame) => { + recent_ping_ts = Instant::now(); + frame_writer.send_frame(frame).await? + } } } + Err(err) => { + warn!("Receive command error: {}", err); + } } - Err(err) => { - warn!("Receive command error: {}", err); - } + fut_receive_command = receiver.recv().fuse(); }, } let elapsed = recent_ping_ts.elapsed();