diff --git a/src/broker/peer.rs b/src/broker/peer.rs index edee6e3..97f6f2b 100644 --- a/src/broker/peer.rs +++ b/src/broker/peer.rs @@ -245,15 +245,19 @@ async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, bro let mut fut_receive_frame = frame_reader.receive_frame().fuse(); let mut fut_receive_broker_event = broker_to_peer_receiver.recv().fuse(); + let make_timeout = || { + Box::pin(future::timeout(heartbeat_interval, future::pending::<()>())).fuse() + }; + let mut fut_timeout = make_timeout(); loop { - let fut_timeout = future::timeout(heartbeat_interval, future::pending::<()>()); select! { - res_timeout = fut_timeout.fuse() => { + res_timeout = fut_timeout => { assert!(res_timeout.is_err()); // send heartbeat let msg = RpcMessage::new_request(".app", METH_PING, None); debug!("sending ping"); frame_writer.send_message(msg).await?; + fut_timeout = make_timeout(); }, res_frame = fut_receive_frame => match res_frame { Ok(mut frame) => { @@ -319,6 +323,7 @@ async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, bro } debug!("Sending rpc frame"); frame_writer.send_frame(frame).await?; + fut_timeout = make_timeout(); } BrokerToPeerMessage::SendMessage(rpcmsg) => { // log!(target: "RpcMsg", Level::Debug, "<---- Send message, client id: {}", client_id); @@ -330,6 +335,7 @@ async fn parent_broker_peer_loop(client_id: i32, config: ParentBrokerConfig, bro } debug!("Sending rpc message"); frame_writer.send_message(rpcmsg).await?; + fut_timeout = make_timeout(); }, } fut_receive_broker_event = broker_to_peer_receiver.recv().fuse();