Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse unresolved futures in select-in-loop #8

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions src/broker/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,19 +116,24 @@ pub(crate) async fn peer_loop(client_id: i32, broker_writer: Sender<BrokerComman
subscribe_path: None,
};
broker_writer.send(register_device).await.unwrap();

let mut fut_receive_frame = frame_reader.receive_frame().fuse();
let mut fut_receive_broker_event = peer_reader.recv().fuse();
loop {
select! {
frame = frame_reader.receive_frame().fuse() => match frame {
frame = fut_receive_frame => match frame {
Ok(frame) => {
// log!(target: "RpcMsg", Level::Debug, "----> Recv frame, client id: {}", client_id);
broker_writer.send(BrokerCommand::FrameReceived { client_id, frame }).await?;
drop(fut_receive_frame);
fut_receive_frame = frame_reader.receive_frame().fuse();
}
Err(e) => {
error!("Read socket error: {}", &e);
break;
}
},
event = peer_reader.recv().fuse() => match event {
event = fut_receive_broker_event => match event {
Err(e) => {
debug!("Peer channel closed: {}", &e);
break;
Expand All @@ -151,6 +156,7 @@ pub(crate) async fn peer_loop(client_id: i32, broker_writer: Sender<BrokerComman
frame_writer.send_message(rpcmsg).await?;
},
}
fut_receive_broker_event = peer_reader.recv().fuse();
}
}
}
Expand Down Expand Up @@ -236,6 +242,9 @@ async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, bro

let (broker_to_peer_sender, broker_to_peer_receiver) = channel::unbounded::<BrokerToPeerMessage>();
broker_writer.send(BrokerCommand::NewPeer { client_id, sender: broker_to_peer_sender, peer_kind: PeerKind::ParentBroker }).await?;

let mut fut_receive_frame = frame_reader.receive_frame().fuse();
let mut fut_receive_broker_event = broker_to_peer_receiver.recv().fuse();
loop {
let fut_timeout = future::timeout(heartbeat_interval, future::pending::<()>());
select! {
Expand All @@ -246,7 +255,7 @@ async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, bro
debug!("sending ping");
frame_writer.send_message(msg).await?;
},
res_frame = frame_reader.receive_frame().fuse() => match res_frame {
res_frame = fut_receive_frame => match res_frame {
Ok(mut frame) => {
if frame.is_request() {
fn is_dot_local_granted(frame: &RpcFrame) -> bool {
Expand Down Expand Up @@ -279,12 +288,14 @@ async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, bro
frame.set_shvpath(&shv_path);
broker_writer.send(BrokerCommand::FrameReceived { client_id, frame }).await.unwrap();
}
drop(fut_receive_frame);
fut_receive_frame = frame_reader.receive_frame().fuse();
}
Err(e) => {
return Err(format!("Read frame error: {e}").into());
}
},
event = broker_to_peer_receiver.recv().fuse() => match event {
event = fut_receive_broker_event => match event {
Err(e) => {
debug!("broker loop has closed peer channel, client ID {client_id}");
return Err(e.into());
Expand Down Expand Up @@ -321,6 +332,7 @@ async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, bro
frame_writer.send_message(rpcmsg).await?;
},
}
fut_receive_broker_event = broker_to_peer_receiver.recv().fuse();
}
}
}
Expand Down
46 changes: 28 additions & 18 deletions src/device/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ async fn peer_loop(config: &ClientConfig, device_sender: Sender<DeviceCommand>)
let mut ping_rq_id = 0;
let mut time_to_ping = heartbeat_interval;
let mut recent_ping_ts = Instant::now();

let mut fut_receive_frame = frame_reader.receive_frame().fuse();
let mut fut_receive_command = receiver.recv().fuse();
loop {
select! {
_ = future::timeout(time_to_ping, future::pending::<()>()).fuse() => {
Expand All @@ -119,30 +122,37 @@ async fn peer_loop(config: &ClientConfig, device_sender: Sender<DeviceCommand>)
frame_writer.send_frame(frame).await?;
recent_ping_ts = Instant::now();
},
frame = frame_reader.receive_frame().fuse() => match frame {
Ok(frame) => {
if frame.request_id().unwrap_or_default() == ping_rq_id {
debug!("ping response received: {:?}", frame);
} else {
device_sender.send(DeviceCommand::FrameReceived(frame)).await?;
frame = fut_receive_frame => {
match frame {
Ok(frame) => {
if frame.request_id().unwrap_or_default() == ping_rq_id {
debug!("ping response received: {:?}", frame);
} else {
device_sender.send(DeviceCommand::FrameReceived(frame)).await?;
}
}
Err(err) => {
error!("Receive frame error: {}", err);
}
}
Err(err) => {
error!("Receive frame error: {}", err);
}
drop(fut_receive_frame);
fut_receive_frame = frame_reader.receive_frame().fuse();
},
command = receiver.recv().fuse() => match command {
Ok(command) => {
match command {
DeviceToPeerCommand::SendFrame(frame) => {
recent_ping_ts = Instant::now();
frame_writer.send_frame(frame).await?
command = fut_receive_command => {
match command {
Ok(command) => {
match command {
DeviceToPeerCommand::SendFrame(frame) => {
recent_ping_ts = Instant::now();
frame_writer.send_frame(frame).await?
}
}
}
Err(err) => {
warn!("Receive command error: {}", err);
}
}
Err(err) => {
warn!("Receive command error: {}", err);
}
fut_receive_command = receiver.recv().fuse();
},
}
let elapsed = recent_ping_ts.elapsed();
Expand Down
Loading