diff --git a/Cargo.lock b/Cargo.lock index e17c95f..00fa58f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -118,7 +118,7 @@ checksum = "9385f66bf6105b241aa65a61cb923ef20efc665cb9f9bb50ac2f0c4b7f378d41" [[package]] name = "little_raft" -version = "0.1.0" +version = "0.1.2" dependencies = [ "crossbeam", "crossbeam-channel", diff --git a/README.md b/README.md index b3ae4c3..3beda73 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,134 @@ -# Raft Distributed Consensus Library +# Little Raft +The lightest distributed consensus library. Run your own replicated state machine! :heart: +## Using +To use this library, you only need to do three things. + +1. Implement the StateMachine that you want your cluster to maintain. Little Raft will take care of replicating this machine across the cluster and achieving consensus on its state. +```rust +/// StateMachine describes a user-defined state machine that is replicated +/// across the cluster. Raft can Replica whatever distributed state machine can +/// implement this trait. +pub trait StateMachine +where + T: StateMachineTransition, +{ + /// This is a hook that the local Replica will call each time the state of a + /// particular transition changes. It is up to the user what to do with that + /// information. + fn register_transition_state(&mut self, transition_id: T::TransitionID, state: TransitionState); + + /// When a particular transition is ready to be applied, the Replica will + /// call apply_transition to apply said transition to the local state + /// machine. + fn apply_transition(&mut self, transition: T); + + /// This function is used to receive transitions from the user that need to + /// be applied to the replicated state machine. Note that while all Replicas + /// poll get_pending_transitions periodically, only the Leader Replica + /// actually processes them. All other Replicas discard pending transitions. + /// get_pending_transitions must not return the same transition twice. 
+ fn get_pending_transitions(&mut self) -> Vec; +} +``` + +2. Implement the Cluster abstraction so that the local Replica can communicate with other nodes. +```rust +/// Cluster is used for the local Raft Replica to communicate with the rest of +/// the Raft cluster. It is up to the user how to abstract that communication. +/// The Cluster trait also contains hooks which the Replica will use to inform +/// the crate user of state changes. +pub trait Cluster +where + T: StateMachineTransition, +{ + /// This function is used to deliver messages to target Replicas. The + /// Replica will provide the to_id of the other Replica it's trying to send + /// its message to and provide the message itself. The send_message + /// implementation must not block but is allowed to silently fail -- Raft + /// exists to achieve consensus in spite of failures, after all. + fn send_message(&mut self, to_id: usize, message: Message); + + /// This function is used by the Replica to receive pending messages from + /// the cluster. The receive_messages implementation must not block and must + /// not return the same message more than once. + fn receive_messages(&mut self) -> Vec>; + + /// By returning true from halt you can signal to the Replica that it should + /// stop running. + fn halt(&self) -> bool; + + /// This function is a hook that the Replica uses to inform the user of the + /// Leader change. The leader_id is an Option because the Leader + /// might be unknown for a period of time. Remember that only Leaders can + /// process transitions submitted by the Raft users, so the leader_id can be + /// used to redirect the requests from non-Leader nodes to the Leader node. + fn register_leader(&mut self, leader_id: Option); +} +``` +3. Start your replica! +```rust + /// Create a new Replica. + /// + /// id is the ID of this Replica within the cluster. + /// + /// peer_ids is a vector of IDs of all other Replicas in the cluster. 
+ /// + /// cluster represents the abstraction the Replica uses to talk with other + /// Replicas. + /// + /// state_machine is the state machine that Raft maintains. + /// + /// noop_transition is a transition that can be applied to the state machine + /// multiple times with no effect. + /// + /// heartbeat_timeout defines how often the Leader Replica sends out + /// heartbeat messages. + /// + /// election_timeout_range defines the election timeout interval. If the + /// Replica gets no messages from the Leader before the timeout, it + /// initiates an election. + /// + /// In practice, pick election_timeout_range to be 2-3x the value of + /// heartbeat_timeout, depending on your particular use-case network latency + /// and responsiveness needs. An election_timeout_range / heartbeat_timeout + /// ratio that's too low might cause unwarranted re-elections in the + /// cluster. + pub fn new( + id: ReplicaID, + peer_ids: Vec, + cluster: Arc>, + state_machine: Arc>, + noop_transition: T, + heartbeat_timeout: Duration, + election_timeout_range: (Duration, Duration), + ) -> Replica; + + /// This function starts the Replica and blocks forever. + /// + /// recv_msg is a channel on which the user must notify the Replica whenever + /// new messages from the Cluster are available. The Replica will not poll + /// for messages from the Cluster unless notified through recv_msg. + /// + /// recv_msg is a channel on which the user must notify the Replica whenever + /// new messages from the Cluster are available. The Replica will not poll + /// for messages from the Cluster unless notified through recv_msg. + /// + /// recv_transition is a channel on which the user must notify the Replica + /// whenever new transitions to be processed for the StateMachine are + /// available. The Replica will not poll for pending transitions for the + /// StateMachine unless notified through recv_transition. 
+ pub fn start(&mut self, recv_msg: Receiver<()>, recv_transition: Receiver<()>); +``` + + +With that, you're good to go. We are working on examples, but for now you can look at the `little_raft/tests` directory. We're working on adding more tests. + + +## Testing +Run `cargo test`. + +## Contributing +Contributions are very welcome! Do remember that one of the goals of this library is to be as small and simple as possible. Let's keep the code in `little_raft/src` under 1,000 lines. No PRs breaking this rule will be merged. + +You are welcome to pick up and work on any of the issues open for this project. Or you can submit new issues if anything comes up from your experience using this library. \ No newline at end of file diff --git a/input.txt b/input.txt deleted file mode 100644 index 1a064a5..0000000 --- a/input.txt +++ /dev/null @@ -1,2 +0,0 @@ -1: Apply+1 // -2: Apply+110 // \ No newline at end of file diff --git a/little_raft/Cargo.toml b/little_raft/Cargo.toml index 3e05eb3..c13acbf 100644 --- a/little_raft/Cargo.toml +++ b/little_raft/Cargo.toml @@ -1,8 +1,15 @@ [package] +description = "The lightest distributed consensus library. Run your own replicated state machine!" 
name = "little_raft" -version = "0.1.0" -authors = ["ilya "] +version = "0.1.2" +authors = ["Ilya Andreev "] edition = "2018" +license = "MIT" +homepage = "https://github.com/andreev-io/little-raft" +repository = "https://github.com/andreev-io/little-raft" +readme = "../README.md" +keywords = ["distributed-systems", "raft", "consensus"] +categories = ["concurrency", "database", "database-implementations"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/little_raft/src/cluster.rs b/little_raft/src/cluster.rs index 25501aa..cbe4563 100644 --- a/little_raft/src/cluster.rs +++ b/little_raft/src/cluster.rs @@ -1,23 +1,33 @@ -use crate::{message::Message, state_machine::StateMachineTransition}; -use std::time::Duration; +use crate::{message::Message, replica::ReplicaID, state_machine::StateMachineTransition}; -// Cluster provides the means of communication of a replica with the rest of the -// cluster and the user. +/// Cluster is used for the local Raft Replica to communicate with the rest of +/// the Raft cluster. It is up to the user how to abstract that communication. +/// The Cluster trait also contains hooks which the Replica will use to inform +/// the crate user of state changes. pub trait Cluster where T: StateMachineTransition, { - // This function is used to deliver messages to target replicas. The - // algorithm assumes that send can silently fail. - fn send(&self, to_id: usize, message: Message); - // This function is used to received messages for the replicas. This - // function must block until timeout expires or a message is received, - // whichever comes first. - fn receive_timeout(&self, timeout: Duration) -> Option>; - // This function is used to receive actions from the user that the - // distributed state machine needs to replicate and apply. All replicas poll - // this function periodically but only Leaders merit the return value. - // Non-Leaders ignore the return value of get_action. 
- fn get_pending_transitions(&self) -> Vec; - fn register_leader_change(&mut self, leader_id: Option); + /// This function is used to deliver messages to target Replicas. The + /// Replica will provide the to_id of the other Replica it's trying to send + /// its message to and provide the message itself. The send_message + /// implementation must not block but is allowed to silently fail -- Raft + /// exists to achieve consensus in spite of failures, after all. + fn send_message(&mut self, to_id: usize, message: Message); + + /// This function is used by the Replica to receive pending messages from + /// the cluster. The receive_messages implementation must not block and must + /// not return the same message more than once. + fn receive_messages(&mut self) -> Vec>; + + /// By returning true from halt you can signal to the Replica that it should + /// stop running. + fn halt(&self) -> bool; + + /// This function is a hook that the Replica uses to inform the user of the + /// Leader change. The leader_id is an Option because the Leader + /// might be unknown for a period of time. Remember that only Leaders can + /// process transitions submitted by the Raft users, so the leader_id can be + /// used to redirect the requests from non-Leader nodes to the Leader node. + fn register_leader(&mut self, leader_id: Option); } diff --git a/little_raft/src/lib.rs b/little_raft/src/lib.rs index e91291e..9841ddf 100644 --- a/little_raft/src/lib.rs +++ b/little_raft/src/lib.rs @@ -1,5 +1,19 @@ +//! This crate is a small but full-featured implementation of the Raft +//! distributed consensus protocol. By using this library, you can run a +//! replicated state machine in your own cluster. The cluster could be comprised +//! of dozens of physical servers in different parts of the world or of two +//! threads on a single CPU. +//! +//! The goal of this library is to provide a generic implementation of the +//! algorithm that the library user can leverage in their own way. It is +//! 
entirely up to the user how to configure the Raft cluster, how to ensure +//! communication between the nodes, how to process client's messages, how to do +//! service discovery, and what kind of state machine to replicate. +//! +//! The implementation is kept as simple as possible on purpose, with the entire +//! library code base fitting in under 1,000 lines of code. pub mod cluster; -mod heartbeat_timer; pub mod message; pub mod replica; pub mod state_machine; +mod timer; diff --git a/little_raft/src/message.rs b/little_raft/src/message.rs index 69ef0be..129600f 100644 --- a/little_raft/src/message.rs +++ b/little_raft/src/message.rs @@ -1,11 +1,10 @@ use crate::replica::ReplicaID; use crate::state_machine::StateMachineTransition; -// Entry describes a user-defined transition of the distributed state machine. -// It has some associated metadata, namely the term when the entry was created -// and its index in the log. -#[derive(Clone, Debug)] -pub struct Entry +/// LogEntry is a state machine transition along with some metadata needed for +/// Raft. +#[derive(Clone, Debug, Copy, PartialEq, Eq, PartialOrd)] +pub struct LogEntry where T: StateMachineTransition, { @@ -14,41 +13,45 @@ where pub term: usize, } -// Message describes messages that the replicas pass between each other to -// achieve consensus on the distributed state machine. -#[derive(Debug)] +/// Message describes messages that the replicas pass between each other to +/// achieve consensus on the distributed state machine. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd)] pub enum Message where T: StateMachineTransition, { - // AppendEntryRequest is used by Leaders to send out logs for other replicas - // to append to their log. It also has information on what logs are ready to - // be applied to the state machine. AppendEntryRequest is also used as a - // heart beat message by Leaders even when no new logs need to be processed. 
+ /// AppendEntryRequest is used by the Leader to send out logs for other + /// replicas to append to their log. It also has information on what logs + /// are ready to be applied to the state machine. AppendEntryRequest is also + /// used as a heart beat message by the Leader even when no new logs need to + /// be processed. AppendEntryRequest { from_id: ReplicaID, term: usize, prev_log_index: usize, prev_log_term: usize, - entries: Vec>, + entries: Vec>, commit_index: usize, }, - // AppendEntryResponse is used by replicas to respond to AppendEntryRequest - // messages. + + /// AppendEntryResponse is used by replicas to respond to AppendEntryRequest + /// messages. AppendEntryResponse { from_id: ReplicaID, term: usize, success: bool, last_index: usize, }, - // VoteRequest is used by Candidates to solicit votes for themselves. + + /// VoteRequest is used by Candidates to solicit votes for themselves. VoteRequest { from_id: ReplicaID, term: usize, last_log_index: usize, last_log_term: usize, }, - // VoteResponse is used by replicas to respond to VoteRequest messages. + + /// VoteResponse is used by replicas to respond to VoteRequest messages. VoteResponse { from_id: ReplicaID, term: usize, diff --git a/little_raft/src/replica.rs b/little_raft/src/replica.rs index a18b9bd..b1ef9c6 100644 --- a/little_raft/src/replica.rs +++ b/little_raft/src/replica.rs @@ -1,13 +1,15 @@ use crate::{ cluster::Cluster, - heartbeat_timer::HeartbeatTimer, - message::{Entry, Message}, + message::{LogEntry, Message}, state_machine::{StateMachine, StateMachineTransition, TransitionState}, + timer::Timer, }; +use crossbeam_channel::{Receiver, Select}; use rand::Rng; +use std::sync::{Arc, Mutex}; use std::{ collections::{BTreeMap, BTreeSet}, - time::Duration, + time::{Duration, Instant}, }; #[derive(Clone, Copy, PartialEq, Debug)] @@ -17,68 +19,118 @@ enum State { Leader, } +/// ReplicaID is a type alias used to identify Raft nodes. 
pub type ReplicaID = usize; -pub struct Replica +/// Replica describes the local instance running the Raft algorithm. Its goal is +/// to maintain the consistency of the user-defined StateMachine across the +/// cluster. It uses the user-defined Cluster implementation to talk to other +/// Replicas, be it over the network or pigeon post. +pub struct Replica where T: StateMachineTransition, S: StateMachine, + C: Cluster, { - // ID of this replica. + /// ID of this Replica. id: ReplicaID, - // IDs of other replicas in the cluster. + + /// IDs of other Replicas in the cluster. peer_ids: Vec, - // User-defined state machine that the cluster replicates. - state_machine: Box, - // Interface a replica uses to communicate with the rest of the cluster and - // the user. - cluster: Box>, - // Current term. + + /// User-defined state machine that the cluster Replicates. + state_machine: Arc>, + + /// Interface a Replica uses to communicate with the rest of the cluster. + cluster: Arc>, + + /// Current term. current_term: usize, - // ID of peers with votes for self. + + /// ID of peers with votes for self. current_votes: Option>>, - // State of this replica. + + /// State of this Replica. state: State, - // Who the last vote was cast for. + + /// Who the last vote was cast for. voted_for: Option, - // entries this replica is aware of. - entries: Vec>, - // Index of the highest transition known to be committed. + + /// entries this Replica is aware of. + log: Vec>, + + /// Index of the highest transition known to be committed. commit_index: usize, - // Index of the highest transition applied to the state machine. + + /// Index of the highest transition applied to the local state machine. last_applied: usize, - // For each server, index of the next log entry to send to that server. Only - // present on leaders. + + /// For each server, index of the next log entry to send to that server. + /// Only present on leaders. 
next_index: BTreeMap, - // For each server, index of highest log entry known to be replicated on - // that server. Only present on leaders. + + /// For each server, index of highest log entry known to be replicated on + /// that server. Only present on leaders. match_index: BTreeMap, - // No-op transition used to force a faster replica update when a cluster Leader - // changes. + + /// No-op transition used to force a faster Replica update when a cluster + /// Leader changes. Applying this transition multiple times must have no + /// effect on the state machine. noop_transition: T, - rng: rand::prelude::ThreadRng, - leader_id: Option, + + /// Timer used for heartbeat messages. + heartbeat_timer: Timer, + + /// Timeout range within which a randomized timeout is picked for when to start a + /// new Leader election if the current Leader is not sending heartbeats. + election_timeout: (Duration, Duration), + + /// If no heartbeat message is received by the deadline, the Replica will + /// start an election. + next_election_deadline: Instant, } -// Replica describes a single state machine running the Raft algorithm. -// Instances of replicas communicate with each other to achieve consensus on the -// state of the user-defined state machine. -impl Replica +impl Replica where T: StateMachineTransition, S: StateMachine, + C: Cluster, { - // Create a new Replica. Provide its unique identifier within the cluster, a - // vector of its peers identifiers (all peers in the cluster), the state - // machine that the cluster maintains, and an instance of a no-op transition - // used for faster forced updates. + /// Create a new Replica. + /// + /// id is the ID of this Replica within the cluster. + /// + /// peer_ids is a vector of IDs of all other Replicas in the cluster. + /// + /// cluster represents the abstraction the Replica uses to talk with other + /// Replicas. + /// + /// state_machine is the state machine that Raft maintains. 
+ /// + /// noop_transition is a transition that can be applied to the state machine + /// multiple times with no effect. + /// + /// heartbeat_timeout defines how often the Leader Replica sends out + /// heartbeat messages. + /// + /// election_timeout_range defines the election timeout interval. If the + /// Replica gets no messages from the Leader before the timeout, it + /// initiates an election. + /// + /// In practice, pick election_timeout_range to be 2-3x the value of + /// heartbeat_timeout, depending on your particular use-case network latency + /// and responsiveness needs. An election_timeout_range / heartbeat_timeout + /// ratio that's too low might cause unwarranted re-elections in the + /// cluster. pub fn new( id: ReplicaID, peer_ids: Vec, - cluster: Box>, - state_machine: Box, + cluster: Arc>, + state_machine: Arc>, noop_transition: T, - ) -> Replica { + heartbeat_timeout: Duration, + election_timeout_range: (Duration, Duration), + ) -> Replica { Replica { state_machine: state_machine, cluster: cluster, @@ -88,7 +140,7 @@ where current_votes: None, state: State::Follower, voted_for: None, - entries: vec![Entry { + log: vec![LogEntry { term: 0, index: 0, transition: noop_transition, @@ -98,108 +150,166 @@ where last_applied: 0, next_index: BTreeMap::new(), match_index: BTreeMap::new(), - rng: rand::thread_rng(), - leader_id: None, + election_timeout: election_timeout_range, + heartbeat_timer: Timer::new(heartbeat_timeout), + next_election_deadline: Instant::now(), } } - // This function starts the replica and blocks forever. Election duration is - // randomized to avoid perpetual elections cycles. Recommended min and max - // election timeouts are 2,500 and 3,500 milliseconds, respectively. The - // heartbeat timeout defines how often the Leader notifies other replicas of - // its liveness. Recommended heartbeat timeout is 1 second. 
- // - // We recommend that min election timeout should be a multiple of the - // heartbeat timeout, depending on expected network latency. - pub fn start( - &mut self, - min_election_timeout: u64, - max_election_timeout: u64, - heartbeat_timeout: std::time::Duration, - ) { - self.poll( - (min_election_timeout, max_election_timeout), - heartbeat_timeout, - ); - } + /// This function starts the Replica and blocks forever. + /// + /// recv_msg is a channel on which the user must notify the Replica whenever + /// new messages from the Cluster are available. The Replica will not poll + /// for messages from the Cluster unless notified through recv_msg. + /// + /// recv_msg is a channel on which the user must notify the Replica whenever + /// new messages from the Cluster are available. The Replica will not poll + /// for messages from the Cluster unless notified through recv_msg. + /// + /// recv_transition is a channel on which the user must notify the Replica + /// whenever new transitions to be processed for the StateMachine are + /// available. The Replica will not poll for pending transitions for the + /// StateMachine unless notified through recv_transition. 
+ pub fn start(&mut self, recv_msg: Receiver<()>, recv_transition: Receiver<()>) { + loop { + if self.cluster.lock().unwrap().halt() { + return; + } - fn broadcast_message(&self, message_generator: F) - where - F: Fn(usize) -> Message, - { - self.peer_ids.iter().for_each(|peer_id| { - self.cluster - .send(peer_id.clone(), message_generator(peer_id.clone())) - }); - } + match self.state { + State::Leader => self.poll_as_leader(&recv_msg, &recv_transition), + State::Follower => self.poll_as_follower(&recv_msg), + State::Candidate => self.poll_as_candidate(&recv_msg), + } - fn get_entries_for_peer(&self, peer_id: ReplicaID) -> Vec> { - self.entries[self.next_index[&peer_id]..self.entries.len()].to_vec() + self.apply_ready_entries(); + } } - fn poll(&mut self, election_timeout: (u64, u64), heartbeat_timeout: std::time::Duration) { - let mut heartbeat_timer = HeartbeatTimer::new(heartbeat_timeout); + fn poll_as_leader(&mut self, recv_msg: &Receiver<()>, recv_transition: &Receiver<()>) { + let mut select = Select::new(); + let recv_heartbeat = self.heartbeat_timer.get_rx(); + let (msg, transition, heartbeat) = ( + select.recv(recv_msg), + select.recv(recv_transition), + select.recv(recv_heartbeat), + ); - loop { - let election_timeout = - Duration::from_millis(self.rng.gen_range(election_timeout.0..=election_timeout.1)); - match self.state { - State::Leader => self.poll_as_leader(&mut heartbeat_timer, heartbeat_timeout), - State::Follower => self.poll_as_follower(election_timeout), - State::Candidate => self.poll_as_candidate(election_timeout), + let oper = select.select(); + match oper.index() { + // Process pending messages. + i if i == msg => { + oper.recv(recv_msg) + .expect("could not react to a new message"); + let messages = self.cluster.lock().unwrap().receive_messages(); + for message in messages { + self.process_message_as_leader(message); + } } - - self.apply_ready_entries(); - self.load_new_entries(); + // Process pending transitions. 
+ i if i == transition => { + oper.recv(recv_transition) + .expect("could not react to a new transition"); + self.load_new_transitions(); + } + // Broadcast heartbeat messages. + i if i == heartbeat => { + oper.recv(recv_heartbeat) + .expect("could not react to the heartbeat"); + self.broadcast_message(|peer_id: ReplicaID| Message::AppendEntryRequest { + term: self.current_term, + from_id: self.id, + prev_log_index: self.next_index[&peer_id] - 1, + prev_log_term: self.log[self.next_index[&peer_id] - 1].term, + entries: self.get_entries_for_peer(peer_id), + commit_index: self.commit_index, + }); + self.heartbeat_timer.renew(); + } + _ => unreachable!(), } } - fn poll_as_leader( - &mut self, - heartbeat_timer: &mut HeartbeatTimer, - heartbeat_timeout: Duration, - ) { - if heartbeat_timer.fired() { - self.broadcast_message(|peer_id: ReplicaID| Message::AppendEntryRequest { - term: self.current_term, - from_id: self.id, - prev_log_index: self.next_index[&peer_id] - 1, - prev_log_term: self.entries[self.next_index[&peer_id] - 1].term, - entries: self.get_entries_for_peer(peer_id), - commit_index: self.commit_index, - }); - heartbeat_timer.renew(); + fn poll_as_follower(&mut self, recv_msg: &Receiver<()>) { + match recv_msg.recv_deadline(self.next_election_deadline) { + // Process pending messages. + Ok(_) => { + let messages = self.cluster.lock().unwrap().receive_messages(); + // Update the election deadline if more than zero messages were + // actually received. + if messages.len() != 0 { + self.update_election_deadline(); + } + + for message in messages { + self.process_message_as_follower(message); + } + } + // Become candidate and update election deadline. + _ => { + self.become_candidate(); + self.update_election_deadline(); + } } - let message = self.cluster.receive_timeout(heartbeat_timeout); - if let Some(msg) = message { - self.process_message_as_leader(msg) - }; + // Load new transitions. 
+ self.load_new_transitions(); } - fn poll_as_follower(&mut self, election_timeout: Duration) { - let message = self.cluster.receive_timeout(election_timeout); - if let Some(msg) = message { - self.process_message_as_follower(msg); - } else { - self.become_candidate(); - } + fn update_election_deadline(&mut self) { + // Randomize each election deadline within the allowed range. + self.next_election_deadline = Instant::now() + + rand::thread_rng().gen_range(self.election_timeout.0..=self.election_timeout.1); } - fn poll_as_candidate(&mut self, election_timeout: Duration) { - let message = self.cluster.receive_timeout(election_timeout); - if let Some(msg) = message { - self.process_message_as_candidate(msg); - } else { - self.become_candidate(); + fn poll_as_candidate(&mut self, recv_msg: &Receiver<()>) { + match recv_msg.recv_deadline(self.next_election_deadline) { + Ok(_) => { + // Process pending messages. + let messages = self.cluster.lock().unwrap().receive_messages(); + // Update the election deadline if more than zero messages were + // actually received. + if messages.len() != 0 { + self.update_election_deadline(); + } + for message in messages { + self.process_message_as_candidate(message); + } + } + // Become candidate and update election deadline. + _ => { + self.become_candidate(); + self.update_election_deadline(); + } } + + // Load new transitions. + self.load_new_transitions(); + } + + fn broadcast_message(&self, message_generator: F) + where + F: Fn(usize) -> Message, + { + self.peer_ids.iter().for_each(|peer_id| { + self.cluster + .lock() + .unwrap() + .send_message(peer_id.clone(), message_generator(peer_id.clone())) + }); } + // Get log entries that have not been acknowledged by the peer. + fn get_entries_for_peer(&self, peer_id: ReplicaID) -> Vec> { + self.log[self.next_index[&peer_id]..self.log.len()].to_vec() + } + + // Apply entries that are ready to be applied. 
fn apply_ready_entries(&mut self) { // Move the commit index to the latest log index that has been // replicated on the majority of the replicas. - if self.state == State::Leader && self.commit_index < self.entries.len() - 1 { - let mut n = self.entries.len() - 1; + if self.state == State::Leader && self.commit_index < self.log.len() - 1 { + let mut n = self.log.len() - 1; let old_commit_index = self.commit_index; while n > self.commit_index { let num_replications = @@ -209,7 +319,7 @@ where ); if num_replications * 2 >= self.peer_ids.len() - && self.entries[n].term == self.current_term + && self.log[n].term == self.current_term { self.commit_index = n; } @@ -217,8 +327,9 @@ where } for i in old_commit_index + 1..=self.commit_index { - self.state_machine.register_transition_state( - self.entries[i].transition.get_id(), + let mut state_machine = self.state_machine.lock().unwrap(); + state_machine.register_transition_state( + self.log[i].transition.get_id(), TransitionState::Committed, ); } @@ -227,25 +338,29 @@ where // Apply entries that are behind the currently committed index. while self.commit_index > self.last_applied { self.last_applied += 1; - self.state_machine - .apply_transition(self.entries[self.last_applied].transition); - self.state_machine.register_transition_state( - self.entries[self.last_applied].transition.get_id(), + let mut state_machine = self.state_machine.lock().unwrap(); + state_machine.apply_transition(self.log[self.last_applied].transition); + state_machine.register_transition_state( + self.log[self.last_applied].transition.get_id(), TransitionState::Applied, ); } } - fn load_new_entries(&mut self) { - for transition in self.cluster.get_pending_transitions().iter() { + fn load_new_transitions(&mut self) { + // Load new transitions. Ignore the transitions if the replica is not + // the Leader. 
+ let transitions = self.state_machine.lock().unwrap().get_pending_transitions(); + for transition in transitions { if self.state == State::Leader { - self.entries.push(Entry { - index: self.entries.len(), - transition: *transition, + self.log.push(LogEntry { + index: self.log.len(), + transition: transition, term: self.current_term, }); - self.state_machine + let mut state_machine = self.state_machine.lock().unwrap(); + state_machine .register_transition_state(transition.get_id(), TransitionState::Queued); } } @@ -260,11 +375,15 @@ where last_index, } => { if term > self.current_term { + // Become follower if another node's term is higher. + self.cluster.lock().unwrap().register_leader(Some(from_id)); self.become_follower(term); } else if success { + // Update information about the peer's logs. self.next_index.insert(from_id, last_index + 1); self.match_index.insert(from_id, last_index); } else { + // Update information about the peer's logs. self.next_index .insert(from_id, self.next_index[&from_id] - 1); } @@ -281,7 +400,8 @@ where last_log_term: usize, ) { if self.current_term > term { - self.cluster.send( + // Do not vote for Replicas that are behind. + self.cluster.lock().unwrap().send_message( from_id, Message::VoteResponse { from_id: self.id, @@ -290,14 +410,18 @@ where }, ); } else if self.current_term < term { + // Become follower if the other replica's term is higher. + self.cluster.lock().unwrap().register_leader(Some(from_id)); self.become_follower(term); } if self.voted_for == None || self.voted_for == Some(from_id) { - if self.entries[self.entries.len() - 1].index <= last_log_index - && self.entries[self.entries.len() - 1].term <= last_log_term + if self.log[self.log.len() - 1].index <= last_log_index + && self.log[self.log.len() - 1].term <= last_log_term { - self.cluster.send( + // If the criteria are met, grant the vote. 
+ self.cluster.lock().unwrap().register_leader(Some(from_id)); + self.cluster.lock().unwrap().send_message( from_id, Message::VoteResponse { from_id: self.id, @@ -306,7 +430,8 @@ where }, ); } else { - self.cluster.send( + // If the criteria are not met, do not grant the vote. + self.cluster.lock().unwrap().send_message( from_id, Message::VoteResponse { from_id: self.id, @@ -316,7 +441,8 @@ where ); } } else { - self.cluster.send( + // If voted for someone else, don't grant the vote. + self.cluster.lock().unwrap().send_message( from_id, Message::VoteResponse { from_id: self.id, @@ -327,57 +453,39 @@ where } } - fn register_leader(&mut self, leader_id: usize) { - match self.leader_id { - Some(cur_leader_id) => { - if cur_leader_id != leader_id { - self.leader_id = Some(leader_id); - self.cluster.register_leader_change(Some(leader_id)); - } - } - None => { - self.leader_id = Some(leader_id); - self.cluster.register_leader_change(Some(leader_id)); - } - } - } - fn process_append_entry_request_as_follower( &mut self, from_id: ReplicaID, term: usize, prev_log_index: usize, prev_log_term: usize, - entries: Vec>, + entries: Vec>, commit_index: usize, ) { - self.register_leader(from_id); - // Check that the leader's term is at least as large as ours. if self.current_term > term { - self.cluster.send( + self.cluster.lock().unwrap().send_message( from_id, Message::AppendEntryResponse { from_id: self.id, term: self.current_term, success: false, - last_index: self.entries.len() - 1, + last_index: self.log.len() - 1, }, ); return; // If our log doesn't contain an entry at prev_log_index with the // prev_log_term term, reply false. 
- } else if prev_log_index >= self.entries.len() - || self.entries[prev_log_index].term != prev_log_term + } else if prev_log_index >= self.log.len() || self.log[prev_log_index].term != prev_log_term { - self.cluster.send( + self.cluster.lock().unwrap().send_message( from_id, Message::AppendEntryResponse { from_id: self.id, term: self.current_term, success: false, - last_index: self.entries.len() - 1, + last_index: self.log.len() - 1, }, ); @@ -385,30 +493,35 @@ where } for entry in entries { - if entry.index < self.entries.len() && entry.term != self.entries[entry.index].term { - self.entries.truncate(entry.index); + // Drop local inconsistent logs. + if entry.index < self.log.len() && entry.term != self.log[entry.index].term { + self.log.truncate(entry.index); } - if entry.index == self.entries.len() { - self.entries.push(entry); + // Push received logs. + if entry.index == self.log.len() { + self.log.push(entry); } } - if commit_index > self.commit_index && self.entries.len() != 0 { - self.commit_index = if commit_index < self.entries[self.entries.len() - 1].index { + // Update local commit index to either the received commit index or the + // latest local log position, whichever is smaller. 
+ if commit_index > self.commit_index && self.log.len() != 0 { + self.commit_index = if commit_index < self.log[self.log.len() - 1].index { commit_index } else { - self.entries[self.entries.len() - 1].index + self.log[self.log.len() - 1].index } } - self.cluster.send( + self.cluster.lock().unwrap().register_leader(Some(from_id)); + self.cluster.lock().unwrap().send_message( from_id, Message::AppendEntryResponse { from_id: self.id, term: self.current_term, success: true, - last_index: self.entries.len() - 1, + last_index: self.log.len() - 1, }, ); } @@ -467,11 +580,16 @@ where vote_granted: bool, ) { if term > self.current_term { + self.cluster.lock().unwrap().register_leader(Some(from_id)); self.become_follower(term); } else if vote_granted { + // Record that the vote has been granted. if let Some(cur_votes) = &mut self.current_votes { cur_votes.insert(from_id); - if cur_votes.len() * 2 >= self.peer_ids.len() { + // If more than half of the cluster has voted for the Replica + // (the Replica itself included), it's time to become the + // Leader. 
+ if cur_votes.len() * 2 > self.peer_ids.len() { self.become_leader(); } } @@ -485,10 +603,11 @@ where message: Message, ) { if term > self.current_term { + self.cluster.lock().unwrap().register_leader(Some(from_id)); self.become_follower(term); self.process_message_as_follower(message); } else { - self.cluster.send( + self.cluster.lock().unwrap().send_message( from_id, Message::VoteResponse { from_id: self.id, @@ -505,51 +624,49 @@ where from_id: ReplicaID, message: Message, ) { - self.register_leader(from_id); - if term >= self.current_term { + self.cluster.lock().unwrap().register_leader(Some(from_id)); self.become_follower(term); self.process_message_as_follower(message); } else { - self.cluster.send( + self.cluster.lock().unwrap().send_message( from_id, Message::AppendEntryResponse { from_id: self.id, term: self.current_term, success: false, - last_index: self.entries.len() - 1, + last_index: self.log.len() - 1, }, ); } } fn become_leader(&mut self) { - self.register_leader(self.id); + self.cluster.lock().unwrap().register_leader(Some(self.id)); self.state = State::Leader; self.current_votes = None; self.voted_for = None; self.next_index = BTreeMap::new(); self.match_index = BTreeMap::new(); for peer_id in &self.peer_ids { - self.next_index.insert(peer_id.clone(), self.entries.len()); + self.next_index.insert(peer_id.clone(), self.log.len()); self.match_index.insert(peer_id.clone(), 0); } - // If the previous leader had some uncommitted entries that were - // replicated to this now-leader server, this replica will not commit + // If the previous Leader had some uncommitted entries that were + // replicated to this now-Leader server, this replica will not commit // them until its commit index advances to a log entry appended in this - // leader's term. To carry out this operation as soon as the new leader + // Leader's term. To carry out this operation as soon as the new Leader // emerges, append a no-op entry. 
This is a neat optimization described - // in the part 8 of the paper). - self.entries.push(Entry { - index: self.entries.len(), + // in the part 8 of the paper. + self.log.push(LogEntry { + index: self.log.len(), transition: self.noop_transition, term: self.current_term, }); } fn become_follower(&mut self, term: usize) { - self.leader_id = None; self.current_term = term; self.state = State::Follower; self.current_votes = None; @@ -557,7 +674,6 @@ where } fn become_candidate(&mut self) { - self.leader_id = None; // Increase current term. self.current_term += 1; // Claim yourself a candidate. @@ -571,8 +687,8 @@ where self.broadcast_message(|_: usize| Message::VoteRequest { from_id: self.id, term: self.current_term, - last_log_index: self.entries.len() - 1, - last_log_term: self.entries[self.entries.len() - 1].term, + last_log_index: self.log.len() - 1, + last_log_term: self.log[self.log.len() - 1].term, }); } } diff --git a/little_raft/src/state_machine.rs b/little_raft/src/state_machine.rs index d62c52f..7abdef1 100644 --- a/little_raft/src/state_machine.rs +++ b/little_raft/src/state_machine.rs @@ -1,32 +1,55 @@ use std::fmt::Debug; -// State of a particular transition. +/// TransitionState describes the state of a particular transition. #[derive(Clone, Copy, Debug, PartialEq)] pub enum TransitionState { - // Transition being queued means that the replica is aware of it and is - // replicating it across the cluster. + /// Queued transitions have been received from the user but have not been + /// processed yet. They are in the queue. + /// Queued, - // Transition being committed means that the entry is guaranteed to be in - // the log of all future leaders in the cluster. + + /// Committed transitions have not yet been applied to the state machine but + /// have already been replicated across the cluster such that they are + /// guaranteed to be present in the log of all future cluster leaders. 
Committed, - // Entry being applied means that it has been applied to the state machine. + + /// Applied transitions have been replicated across the cluster and have + /// been applied to the local state machine. Applied, } +/// StateMachineTransition describes a user-defined transition that can be +/// applied to the state machine replicated by Raft. pub trait StateMachineTransition: Copy + Clone + Debug { + /// TransitionID is used to identify the transition. type TransitionID: Eq; + + /// get_id is used by the Replica to identify the transition to be able to + /// call register_transition_state. fn get_id(&self) -> Self::TransitionID; } -// StateMachine describes a user-defined state machine that is replicated across -// the cluster. +/// StateMachine describes a user-defined state machine that is replicated +/// across the cluster. Raft can Replica whatever distributed state machine can +/// implement this trait. pub trait StateMachine where T: StateMachineTransition, { + /// This is a hook that the local Replica will call each time the state of a + /// particular transition changes. It is up to the user what to do with that + /// information. fn register_transition_state(&mut self, transition_id: T::TransitionID, state: TransitionState); - // When apply_transition is called, the local state machine must apply the - // specified transition. + /// When a particular transition is ready to be applied, the Replica will + /// call apply_transition to apply said transition to the local state + /// machine. fn apply_transition(&mut self, transition: T); + + /// This function is used to receive transitions from the user that need to + /// be applied to the replicated state machine. Note that while all Replicas + /// poll get_pending_transitions periodically, only the Leader Replica + /// actually processes them. All other Replicas discard pending transitions. + /// get_pending_transitions must not return the same transition twice. 
+ fn get_pending_transitions(&mut self) -> Vec; } diff --git a/little_raft/src/heartbeat_timer.rs b/little_raft/src/timer.rs similarity index 56% rename from little_raft/src/heartbeat_timer.rs rename to little_raft/src/timer.rs index 10033a2..732ab17 100644 --- a/little_raft/src/heartbeat_timer.rs +++ b/little_raft/src/timer.rs @@ -1,22 +1,21 @@ use crossbeam::channel::{bounded, Receiver}; use std::{thread, time::Duration}; -pub struct HeartbeatTimer { - timeout: Duration, +pub struct Timer { rx: Receiver<()>, + timeout: Duration, } -// HeartbeatTimer fires after the specified duration. The timer can be renewed. -impl HeartbeatTimer { - pub fn new(timeout: Duration) -> HeartbeatTimer { +// Timer fires after the specified duration. The timer can be renewed. +impl Timer { + pub fn new(timeout: Duration) -> Timer { let (tx, rx) = bounded(1); - thread::spawn(move || { thread::sleep(timeout); - tx.send(()).unwrap(); + let _ = tx.send(()); }); - HeartbeatTimer { + Timer { timeout: timeout, rx: rx, } @@ -27,16 +26,13 @@ impl HeartbeatTimer { let timeout = self.timeout; thread::spawn(move || { thread::sleep(timeout); - tx.send(()).unwrap(); + let _ = tx.send(()); }); self.rx = rx; } - pub fn fired(&mut self) -> bool { - match self.rx.try_recv() { - Ok(_) => true, - Err(_) => false, - } + pub fn get_rx(&self) -> &Receiver<()> { + &self.rx } } diff --git a/little_raft/tests/raft.rs b/little_raft/tests/raft.rs index d9086a6..afffc63 100644 --- a/little_raft/tests/raft.rs +++ b/little_raft/tests/raft.rs @@ -1,16 +1,18 @@ -use crossbeam_channel::{unbounded, Receiver, SendError, Sender}; +use crossbeam_channel as channel; +use crossbeam_channel::{unbounded, Sender}; use little_raft::{ cluster::Cluster, message::Message, replica::Replica, state_machine::{StateMachine, StateMachineTransition, TransitionState}, }; +use std::sync::{Arc, Mutex}; -use std::{collections::BTreeMap, thread}; +use std::{collections::BTreeMap, thread, time::Duration}; -const HEARTBEAT_TIMEOUT: u64 = 200; 
-const ELECTION_MIN_TIMEOUT: u64 = 500; -const ELECTION_MAX_TIMEOUT: u64 = 700; +const HEARTBEAT_TIMEOUT: Duration = Duration::from_millis(500); +const ELECTION_MIN_TIMEOUT: Duration = Duration::from_millis(750); +const ELECTION_MAX_TIMEOUT: Duration = Duration::from_millis(950); #[derive(Clone, Copy, Debug)] struct ArithmeticOperation { @@ -28,16 +30,13 @@ impl StateMachineTransition for ArithmeticOperation { struct Calculator { id: usize, value: i32, - value_tx: Sender, applied_ids_tx: Sender<(usize, usize)>, + pending_transitions: Vec, } impl StateMachine for Calculator { fn apply_transition(&mut self, transition: ArithmeticOperation) { self.value += transition.delta; - self.value_tx - .send(self.value) - .expect("could not send calculator value"); } fn register_transition_state( @@ -51,18 +50,36 @@ impl StateMachine for Calculator { .expect("could not send applied transition id"); } } + + fn get_pending_transitions(&mut self) -> Vec { + let cur = self.pending_transitions.clone(); + self.pending_transitions = Vec::new(); + cur + } } struct MyCluster { - receiver: Receiver>, transmitters: BTreeMap>>, - tasks: Receiver, + pending_messages: Vec>, + halt: bool, + leader: bool, id: usize, - leader: Option, } impl Cluster for MyCluster { - fn send(&self, to_id: usize, message: Message) { + fn register_leader(&mut self, leader_id: Option) { + if let Some(id) = leader_id { + if id == self.id { + self.leader = true; + } else { + self.leader = false; + } + } else { + self.leader = false; + } + } + + fn send_message(&mut self, to_id: usize, message: Message) { if let Some(transmitter) = self.transmitters.get(&to_id) { match transmitter.send(message) { Ok(_) => {} @@ -71,38 +88,19 @@ impl Cluster for MyCluster { } } - fn receive_timeout( - &self, - timeout: std::time::Duration, - ) -> Option> { - match self.receiver.recv_timeout(timeout) { - Ok(t) => Some(t), - Err(_) => None, - } + fn halt(&self) -> bool { + self.halt } - fn get_pending_transitions(&self) -> Vec { - if let 
Some(cur_leader) = self.leader { - if cur_leader == self.id { - return match self.tasks.try_recv() { - Ok(t) => vec![t; 1], - Err(_) => vec![], - }; - } - } - - Vec::new() - } - - fn register_leader_change(&mut self, leader_id: Option) { - self.leader = leader_id; + fn receive_messages(&mut self) -> Vec> { + let cur = self.pending_messages.clone(); + self.pending_messages = Vec::new(); + cur } } #[test] -fn run_replicas() -> Result<(), SendError> { - // Create transmitters and receivers that replicas will be communicating - // through. In this test, replicas communicate over mspc channels. +fn run_replicas() { let (mut transmitters, mut receivers) = (BTreeMap::new(), BTreeMap::new()); for i in 0..=2 { let (tx, rx) = unbounded::>(); @@ -110,21 +108,20 @@ fn run_replicas() -> Result<(), SendError> { receivers.insert(i, rx); } - // Create clusters and mspc channel for each of the replicas. The channels - // are used to send mathematical operations for the cluster to pipe to the - // replicas. - let (task_tx, task_rx) = unbounded::(); - let (calculator_tx, _calculator_rx) = unbounded::(); - let (applied_tx, applied_rx) = unbounded::<(usize, usize)>(); + let (mut state_machines, mut clusters, mut notifiers) = (Vec::new(), Vec::new(), Vec::new()); + let (applied_tx, applied_rx) = unbounded(); for i in 0..=2 { - let cluster = MyCluster { - id: i, - receiver: receivers.remove(&i).unwrap(), + // Create the cluster. + let cluster = Arc::new(Mutex::new(MyCluster { transmitters: transmitters.clone(), - tasks: task_rx.clone(), - leader: None, - }; + pending_messages: Vec::new(), + halt: false, + leader: false, + id: i, + })); + clusters.push((cluster.clone(), receivers.remove(&i).unwrap())); + // Create peer ids. let mut peer_ids = Vec::new(); for n in 0..=2 { if n != i { @@ -132,36 +129,124 @@ fn run_replicas() -> Result<(), SendError> { } } - let new_calculator_tx = calculator_tx.clone(); + // Create the state machine. 
let new_applied_tx = applied_tx.clone(); + let state_machine = Arc::new(Mutex::new(Calculator { + id: i, + value: 0, + pending_transitions: Vec::new(), + applied_ids_tx: new_applied_tx, + })); + + // Create noop transition. + let noop = ArithmeticOperation { delta: 0, id: 0 }; + state_machines.push(state_machine.clone()); + let (message_notifier_tx, message_notifier_rx) = channel::unbounded(); + let (transition_notifier_tx, transition_notifier_rx) = channel::unbounded(); + notifiers.push((message_notifier_tx, transition_notifier_tx)); thread::spawn(move || { - Replica::new( + let mut replica = Replica::new( i, peer_ids, - Box::new(cluster), - Box::new(Calculator { - id: i, - value: 0, - value_tx: new_calculator_tx, - applied_ids_tx: new_applied_tx, - }), - ArithmeticOperation { delta: 0, id: 0 }, - ) - .start( - ELECTION_MIN_TIMEOUT, - ELECTION_MAX_TIMEOUT, - std::time::Duration::from_millis(HEARTBEAT_TIMEOUT), + cluster, + state_machine, + noop, + HEARTBEAT_TIMEOUT, + (ELECTION_MIN_TIMEOUT, ELECTION_MAX_TIMEOUT), ); + replica.start(message_notifier_rx, transition_notifier_rx); }); } - thread::sleep(std::time::Duration::from_secs(2)); - task_tx.send(ArithmeticOperation { delta: 5, id: 1 })?; - task_tx.send(ArithmeticOperation { delta: -51, id: 2 })?; - task_tx.send(ArithmeticOperation { delta: -511, id: 3 })?; - task_tx.send(ArithmeticOperation { delta: 3, id: 4 })?; - thread::sleep(std::time::Duration::from_secs(2)); + let new_clusters = clusters.clone(); + for (i, (cluster, receiver)) in new_clusters.into_iter().enumerate() { + let message_notifier = notifiers[i].0.clone(); + thread::spawn(move || loop { + let msg = receiver.recv().unwrap(); + cluster.lock().unwrap().pending_messages.push(msg); + let _ = message_notifier.send(()); + }); + } + thread::sleep(Duration::from_secs(3)); + for (i, (cluster, _)) in clusters.iter().enumerate() { + let mut leader_id = None; + if cluster.lock().unwrap().leader { + leader_id = Some(i); + } + + if let Some(l_id) = leader_id { 
+ state_machines[l_id] + .lock() + .unwrap() + .pending_transitions + .push(ArithmeticOperation { delta: 5, id: 1 }); + let _ = notifiers[l_id].1.send(()); + break; + } + } + + thread::sleep(Duration::from_secs(2)); + for (i, (cluster, _)) in clusters.iter().enumerate() { + let mut leader_id = None; + if cluster.lock().unwrap().leader { + leader_id = Some(i); + } + + if let Some(l_id) = leader_id { + state_machines[l_id] + .lock() + .unwrap() + .pending_transitions + .push(ArithmeticOperation { delta: -51, id: 2 }); + let _ = notifiers[l_id].1.send(()); + break; + } + } + + thread::sleep(Duration::from_secs(2)); + for (i, (cluster, _)) in clusters.iter().enumerate() { + let mut leader_id = None; + if cluster.lock().unwrap().leader { + leader_id = Some(i); + } + + if let Some(l_id) = leader_id { + state_machines[l_id] + .lock() + .unwrap() + .pending_transitions + .push(ArithmeticOperation { delta: -511, id: 3 }); + let _ = notifiers[l_id].1.send(()); + break; + } + } + + thread::sleep(Duration::from_secs(2)); + for (i, (cluster, _)) in clusters.iter().enumerate() { + let mut leader_id = None; + if cluster.lock().unwrap().leader { + leader_id = Some(i); + } + + if let Some(l_id) = leader_id { + state_machines[l_id] + .lock() + .unwrap() + .pending_transitions + .push(ArithmeticOperation { delta: 3, id: 4 }); + let _ = notifiers[l_id].1.send(()); + break; + } + } + + thread::sleep(Duration::from_secs(2)); + for (cluster, _) in clusters.iter() { + let mut c = cluster.lock().unwrap(); + c.halt = true; + } + + thread::sleep(Duration::from_secs(5)); let applied_transactions: Vec<(usize, usize)> = applied_rx.try_iter().collect(); let expected_vec: Vec = vec![0, 1, 2, 3, 4]; assert_eq!( @@ -193,6 +278,4 @@ fn run_replicas() -> Result<(), SendError> { acc }) ); - - Ok(()) }