Skip to content

Commit

Permalink
Merge pull request #9 from j4r0u53k/fix-heartbeat-to-parent-broker
Browse files Browse the repository at this point in the history
Fix parent broker connection heartbeat interval
  • Loading branch information
fvacek authored Mar 7, 2024
2 parents c2b4e25 + 7e72d6a commit e3d8be3
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions src/broker/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,15 +245,19 @@ async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, bro

let mut fut_receive_frame = frame_reader.receive_frame().fuse();
let mut fut_receive_broker_event = broker_to_peer_receiver.recv().fuse();
let make_timeout = || {
Box::pin(future::timeout(heartbeat_interval, future::pending::<()>())).fuse()
};
let mut fut_timeout = make_timeout();
loop {
let fut_timeout = future::timeout(heartbeat_interval, future::pending::<()>());
select! {
res_timeout = fut_timeout.fuse() => {
res_timeout = fut_timeout => {
assert!(res_timeout.is_err());
// send heartbeat
let msg = RpcMessage::new_request(".app", METH_PING, None);
debug!("sending ping");
frame_writer.send_message(msg).await?;
fut_timeout = make_timeout();
},
res_frame = fut_receive_frame => match res_frame {
Ok(mut frame) => {
Expand Down Expand Up @@ -319,6 +323,7 @@ async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, bro
}
debug!("Sending rpc frame");
frame_writer.send_frame(frame).await?;
fut_timeout = make_timeout();
}
BrokerToPeerMessage::SendMessage(rpcmsg) => {
// log!(target: "RpcMsg", Level::Debug, "<---- Send message, client id: {}", client_id);
Expand All @@ -330,6 +335,7 @@ async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, bro
}
debug!("Sending rpc message");
frame_writer.send_message(rpcmsg).await?;
fut_timeout = make_timeout();
},
}
fut_receive_broker_event = broker_to_peer_receiver.recv().fuse();
Expand Down

0 comments on commit e3d8be3

Please sign in to comment.