Skip to content

Commit

Permalink
Merge pull request #8 from j4r0u53k/reuse-unresolved-futures-in-selec…
Browse files Browse the repository at this point in the history
…t-loop

Reuse unresolved futures in select-in-loop
  • Loading branch information
fvacek authored Mar 7, 2024
2 parents dcb8652 + 1c6a216 commit c2b4e25
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 22 deletions.
20 changes: 16 additions & 4 deletions src/broker/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,19 +116,24 @@ pub(crate) async fn peer_loop(client_id: i32, broker_writer: Sender<BrokerComman
subscribe_path: None,
};
broker_writer.send(register_device).await.unwrap();

let mut fut_receive_frame = frame_reader.receive_frame().fuse();
let mut fut_receive_broker_event = peer_reader.recv().fuse();
loop {
select! {
frame = frame_reader.receive_frame().fuse() => match frame {
frame = fut_receive_frame => match frame {
Ok(frame) => {
// log!(target: "RpcMsg", Level::Debug, "----> Recv frame, client id: {}", client_id);
broker_writer.send(BrokerCommand::FrameReceived { client_id, frame }).await?;
drop(fut_receive_frame);
fut_receive_frame = frame_reader.receive_frame().fuse();
}
Err(e) => {
error!("Read socket error: {}", &e);
break;
}
},
event = peer_reader.recv().fuse() => match event {
event = fut_receive_broker_event => match event {
Err(e) => {
debug!("Peer channel closed: {}", &e);
break;
Expand All @@ -151,6 +156,7 @@ pub(crate) async fn peer_loop(client_id: i32, broker_writer: Sender<BrokerComman
frame_writer.send_message(rpcmsg).await?;
},
}
fut_receive_broker_event = peer_reader.recv().fuse();
}
}
}
Expand Down Expand Up @@ -236,6 +242,9 @@ async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, bro

let (broker_to_peer_sender, broker_to_peer_receiver) = channel::unbounded::<BrokerToPeerMessage>();
broker_writer.send(BrokerCommand::NewPeer { client_id, sender: broker_to_peer_sender, peer_kind: PeerKind::ParentBroker }).await?;

let mut fut_receive_frame = frame_reader.receive_frame().fuse();
let mut fut_receive_broker_event = broker_to_peer_receiver.recv().fuse();
loop {
let fut_timeout = future::timeout(heartbeat_interval, future::pending::<()>());
select! {
Expand All @@ -246,7 +255,7 @@ async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, bro
debug!("sending ping");
frame_writer.send_message(msg).await?;
},
res_frame = frame_reader.receive_frame().fuse() => match res_frame {
res_frame = fut_receive_frame => match res_frame {
Ok(mut frame) => {
if frame.is_request() {
fn is_dot_local_granted(frame: &RpcFrame) -> bool {
Expand Down Expand Up @@ -279,12 +288,14 @@ async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, bro
frame.set_shvpath(&shv_path);
broker_writer.send(BrokerCommand::FrameReceived { client_id, frame }).await.unwrap();
}
drop(fut_receive_frame);
fut_receive_frame = frame_reader.receive_frame().fuse();
}
Err(e) => {
return Err(format!("Read frame error: {e}").into());
}
},
event = broker_to_peer_receiver.recv().fuse() => match event {
event = fut_receive_broker_event => match event {
Err(e) => {
debug!("broker loop has closed peer channel, client ID {client_id}");
return Err(e.into());
Expand Down Expand Up @@ -321,6 +332,7 @@ async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, bro
frame_writer.send_message(rpcmsg).await?;
},
}
fut_receive_broker_event = broker_to_peer_receiver.recv().fuse();
}
}
}
Expand Down
46 changes: 28 additions & 18 deletions src/device/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ async fn peer_loop(config: &ClientConfig, device_sender: Sender<DeviceCommand>)
let mut ping_rq_id = 0;
let mut time_to_ping = heartbeat_interval;
let mut recent_ping_ts = Instant::now();

let mut fut_receive_frame = frame_reader.receive_frame().fuse();
let mut fut_receive_command = receiver.recv().fuse();
loop {
select! {
_ = future::timeout(time_to_ping, future::pending::<()>()).fuse() => {
Expand All @@ -119,30 +122,37 @@ async fn peer_loop(config: &ClientConfig, device_sender: Sender<DeviceCommand>)
frame_writer.send_frame(frame).await?;
recent_ping_ts = Instant::now();
},
frame = frame_reader.receive_frame().fuse() => match frame {
Ok(frame) => {
if frame.request_id().unwrap_or_default() == ping_rq_id {
debug!("ping response received: {:?}", frame);
} else {
device_sender.send(DeviceCommand::FrameReceived(frame)).await?;
frame = fut_receive_frame => {
match frame {
Ok(frame) => {
if frame.request_id().unwrap_or_default() == ping_rq_id {
debug!("ping response received: {:?}", frame);
} else {
device_sender.send(DeviceCommand::FrameReceived(frame)).await?;
}
}
Err(err) => {
error!("Receive frame error: {}", err);
}
}
Err(err) => {
error!("Receive frame error: {}", err);
}
drop(fut_receive_frame);
fut_receive_frame = frame_reader.receive_frame().fuse();
},
command = receiver.recv().fuse() => match command {
Ok(command) => {
match command {
DeviceToPeerCommand::SendFrame(frame) => {
recent_ping_ts = Instant::now();
frame_writer.send_frame(frame).await?
command = fut_receive_command => {
match command {
Ok(command) => {
match command {
DeviceToPeerCommand::SendFrame(frame) => {
recent_ping_ts = Instant::now();
frame_writer.send_frame(frame).await?
}
}
}
Err(err) => {
warn!("Receive command error: {}", err);
}
}
Err(err) => {
warn!("Receive command error: {}", err);
}
fut_receive_command = receiver.recv().fuse();
},
}
let elapsed = recent_ping_ts.elapsed();
Expand Down

0 comments on commit c2b4e25

Please sign in to comment.