From 51c82f79194b5956f0b0ecd80ade26bae11af6f3 Mon Sep 17 00:00:00 2001 From: ilya Date: Wed, 1 Dec 2021 14:37:37 +0000 Subject: [PATCH] Broadcast new transitions immediately, not waiting for heartbeat timer --- Cargo.lock | 2 +- little_raft/Cargo.toml | 2 +- little_raft/src/replica.rs | 21 +++++++++++++-------- 3 files changed, 15 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 806abba..034d9d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -118,7 +118,7 @@ checksum = "9385f66bf6105b241aa65a61cb923ef20efc665cb9f9bb50ac2f0c4b7f378d41" [[package]] name = "little_raft" -version = "0.1.3" +version = "0.1.6" dependencies = [ "crossbeam", "crossbeam-channel", diff --git a/little_raft/Cargo.toml b/little_raft/Cargo.toml index ad44d83..29a050d 100644 --- a/little_raft/Cargo.toml +++ b/little_raft/Cargo.toml @@ -1,7 +1,7 @@ [package] description = "The lightest distributed consensus library. Run your own replicated state machine!" name = "little_raft" -version = "0.1.4" +version = "0.1.6" authors = ["Ilya Andreev "] edition = "2018" license = "MIT" diff --git a/little_raft/src/replica.rs b/little_raft/src/replica.rs index 1397328..f0277eb 100644 --- a/little_raft/src/replica.rs +++ b/little_raft/src/replica.rs @@ -212,25 +212,30 @@ where oper.recv(recv_transition) .expect("could not react to a new transition"); self.load_new_transitions(); + self.broadcast_append_entry_request(); } // Broadcast heartbeat messages. i if i == heartbeat => { oper.recv(recv_heartbeat) .expect("could not react to the heartbeat"); - self.broadcast_message(|peer_id: ReplicaID| Message::AppendEntryRequest { - term: self.current_term, - from_id: self.id, - prev_log_index: self.next_index[&peer_id] - 1, - prev_log_term: self.log[self.next_index[&peer_id] - 1].term, - entries: self.get_entries_for_peer(peer_id), - commit_index: self.commit_index, - }); + self.broadcast_append_entry_request(); self.heartbeat_timer.renew(); } _ => unreachable!(), } } + fn broadcast_append_entry_request(&mut self) { + self.broadcast_message(|peer_id: ReplicaID| Message::AppendEntryRequest { + term: self.current_term, + from_id: self.id, + prev_log_index: self.next_index[&peer_id] - 1, + prev_log_term: self.log[self.next_index[&peer_id] - 1].term, + entries: self.get_entries_for_peer(peer_id), + commit_index: self.commit_index, + }); + } + fn poll_as_follower(&mut self, recv_msg: &Receiver<()>) { match recv_msg.recv_deadline(self.next_election_deadline) { // Process pending messages.