diff --git a/Cargo.lock b/Cargo.lock index 00fa58f..806abba 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -118,7 +118,7 @@ checksum = "9385f66bf6105b241aa65a61cb923ef20efc665cb9f9bb50ac2f0c4b7f378d41" [[package]] name = "little_raft" -version = "0.1.2" +version = "0.1.3" dependencies = [ "crossbeam", "crossbeam-channel", diff --git a/README.md b/README.md index 3beda73..1f866d3 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,15 @@ # Little Raft The lightest distributed consensus library. Run your own replicated state machine! :heart: -## Using -To use this library, you only need to do three things. +## Installing +Simply import the crate. In your `Cargo.toml`, add +``` +[dependencies] +little_raft = "0.1" +``` +## Using +To start running Little Raft, you only need to do three things. 1. Implement the StateMachine that you want your cluster to maintain. Little Raft will take care of replicating this machine across the cluster and achieving consensus on its state. ```rust /// StateMachine describes a user-defined state machine that is replicated @@ -122,13 +128,28 @@ where ``` -With that, you're good to go. We are working on examples, but for now you can look at the `little_raft/tests` directory. We're working on adding more tests. +With that, you're good to go. We are working on examples, but for now you can look at the `little_raft/tests` directory and at the documentation at [https://docs.rs/little_raft/0.1.3/little_raft/](https://docs.rs/little_raft/0.1.3/little_raft/). We're working on adding more tests. ## Testing Run `cargo test`. ## Contributing -Contributions are very welcome! Do remember that one of the goals of this library is to be as small and simple as possible. Let's keep the code in `little_raft/src` under 1,000 lines. No PRs breaking this rule will be merged. +Contributions are very welcome! Do remember that one of the goals of this library is to be as small and simple as possible. Let's keep the code in `little_raft/src` **under 1,000 lines**. 
PRs breaking this rule will be declined. +```bash +> cloc little_raft/src + 6 text files. + 6 unique files. + 0 files ignored. + +github.com/AlDanial/cloc v 1.90 T=0.02 s (369.2 files/s, 56185.0 lines/s) +------------------------------------------------------------------------------- +Language files blank comment code +------------------------------------------------------------------------------- +Rust 6 82 199 632 +------------------------------------------------------------------------------- +SUM: 6 82 199 632 +------------------------------------------------------------------------------- +``` You are welcome to pick up and work on any of the issues open for this project. Or you can submit new issues if anything comes up from your experience using this library. \ No newline at end of file diff --git a/little_raft/Cargo.toml b/little_raft/Cargo.toml index c13acbf..50d12ad 100644 --- a/little_raft/Cargo.toml +++ b/little_raft/Cargo.toml @@ -1,7 +1,7 @@ [package] description = "The lightest distributed consensus library. Run your own replicated state machine!" name = "little_raft" -version = "0.1.2" +version = "0.1.3" authors = ["Ilya Andreev "] edition = "2018" license = "MIT" diff --git a/little_raft/src/cluster.rs b/little_raft/src/cluster.rs index cbe4563..67b0f4b 100644 --- a/little_raft/src/cluster.rs +++ b/little_raft/src/cluster.rs @@ -17,7 +17,8 @@ where /// This function is used by the Replica to receive pending messages from /// the cluster. The receive_messages implementation must not block and must - /// not return the same message more than once. + /// not return the same message more than once. Note that receive_messages + /// is only called when the Replica is notified via the recv_msg channel. 
fn receive_messages(&mut self) -> Vec>; /// By returning true from halt you can signal to the Replica that it should diff --git a/little_raft/src/replica.rs b/little_raft/src/replica.rs index b1ef9c6..8c852b4 100644 --- a/little_raft/src/replica.rs +++ b/little_raft/src/replica.rs @@ -203,7 +203,7 @@ where .expect("could not react to a new message"); let messages = self.cluster.lock().unwrap().receive_messages(); for message in messages { - self.process_message_as_leader(message); + self.process_message(message); } } // Process pending transitions. @@ -242,7 +242,7 @@ where } for message in messages { - self.process_message_as_follower(message); + self.process_message(message); } } // Become candidate and update elction deadline. @@ -252,10 +252,20 @@ where } } - // Load new transitions. + // Load new transitions. The follower will ignore these transitions, but + // they are still polled for periodically to ensure there are no stale + // transitions in case the Replica's state changes. self.load_new_transitions(); } + fn process_message(&mut self, message: Message) { + match self.state { + State::Leader => self.process_message_as_leader(message), + State::Candidate => self.process_message_as_candidate(message), + State::Follower => self.process_message_as_follower(message), + } + } + fn update_election_deadline(&mut self) { // Randomize each election deadline within the allowed range. self.next_election_deadline = Instant::now() @@ -273,7 +283,7 @@ where self.update_election_deadline(); } for message in messages { - self.process_message_as_candidate(message); + self.process_message(message); } } // Become candidate and update elction deadline. @@ -283,7 +293,9 @@ where } } - // Load new transitions. + // Load new transitions. The candidate will ignore these transitions, + // but they are still polled for periodically to ensure there are no + // stale transitions in case the Replica's state changes. 
self.load_new_transitions(); } @@ -376,7 +388,7 @@ where } => { if term > self.current_term { // Become follower if another node's term is higher. - self.cluster.lock().unwrap().register_leader(Some(from_id)); + self.cluster.lock().unwrap().register_leader(None); self.become_follower(term); } else if success { // Update information about the peer's logs. @@ -411,7 +423,7 @@ where ); } else if self.current_term < term { // Become follower if the other replica's term is higher. - self.cluster.lock().unwrap().register_leader(Some(from_id)); + self.cluster.lock().unwrap().register_leader(None); self.become_follower(term); } @@ -420,7 +432,7 @@ where && self.log[self.log.len() - 1].term <= last_log_term { // If the criteria are met, grant the vote. - self.cluster.lock().unwrap().register_leader(Some(from_id)); + self.cluster.lock().unwrap().register_leader(None); self.cluster.lock().unwrap().send_message( from_id, Message::VoteResponse { @@ -429,6 +441,7 @@ where vote_granted: true, }, ); + self.voted_for = Some(from_id); } else { // If the criteria are not met, do not grant the vote. self.cluster.lock().unwrap().send_message( @@ -559,7 +572,7 @@ where fn process_message_as_candidate(&mut self, message: Message) { match message { Message::AppendEntryRequest { term, from_id, .. } => { - self.process_transition_request_as_candidate(term, from_id, message) + self.process_append_entry_request_as_candidate(term, from_id, message) } Message::VoteRequest { term, from_id, .. } => { self.process_vote_request_as_candidate(term, from_id, message) @@ -580,7 +593,7 @@ where vote_granted: bool, ) { if term > self.current_term { - self.cluster.lock().unwrap().register_leader(Some(from_id)); + self.cluster.lock().unwrap().register_leader(None); self.become_follower(term); } else if vote_granted { // Record that the vote has been granted. 
@@ -603,9 +616,9 @@ where message: Message, ) { if term > self.current_term { - self.cluster.lock().unwrap().register_leader(Some(from_id)); + self.cluster.lock().unwrap().register_leader(None); self.become_follower(term); - self.process_message_as_follower(message); + self.process_message(message); } else { self.cluster.lock().unwrap().send_message( from_id, @@ -618,16 +631,16 @@ where } } - fn process_transition_request_as_candidate( + fn process_append_entry_request_as_candidate( &mut self, term: usize, from_id: ReplicaID, message: Message, ) { if term >= self.current_term { - self.cluster.lock().unwrap().register_leader(Some(from_id)); + self.cluster.lock().unwrap().register_leader(None); self.become_follower(term); - self.process_message_as_follower(message); + self.process_message(message); } else { self.cluster.lock().unwrap().send_message( from_id, diff --git a/little_raft/src/state_machine.rs b/little_raft/src/state_machine.rs index 7abdef1..b87836b 100644 --- a/little_raft/src/state_machine.rs +++ b/little_raft/src/state_machine.rs @@ -47,9 +47,10 @@ where fn apply_transition(&mut self, transition: T); /// This function is used to receive transitions from the user that need to - /// be applied to the replicated state machine. Note that while all Replicas - /// poll get_pending_transitions periodically, only the Leader Replica - /// actually processes them. All other Replicas discard pending transitions. - /// get_pending_transitions must not return the same transition twice. + /// be applied to the replicated state machine. Note that only the Leader + /// Replica processes transitions and only when notified via the + /// recv_transition channel. All other Replicas poll for transitions and + /// discard them. get_pending_transitions must not return the same + /// transition twice. 
fn get_pending_transitions(&mut self) -> Vec; } diff --git a/little_raft/tests/raft.rs b/little_raft/tests/raft.rs deleted file mode 100644 index afffc63..0000000 --- a/little_raft/tests/raft.rs +++ /dev/null @@ -1,281 +0,0 @@ -use crossbeam_channel as channel; -use crossbeam_channel::{unbounded, Sender}; -use little_raft::{ - cluster::Cluster, - message::Message, - replica::Replica, - state_machine::{StateMachine, StateMachineTransition, TransitionState}, -}; -use std::sync::{Arc, Mutex}; - -use std::{collections::BTreeMap, thread, time::Duration}; - -const HEARTBEAT_TIMEOUT: Duration = Duration::from_millis(500); -const ELECTION_MIN_TIMEOUT: Duration = Duration::from_millis(750); -const ELECTION_MAX_TIMEOUT: Duration = Duration::from_millis(950); - -#[derive(Clone, Copy, Debug)] -struct ArithmeticOperation { - delta: i32, - id: usize, -} - -impl StateMachineTransition for ArithmeticOperation { - type TransitionID = usize; - fn get_id(&self) -> Self::TransitionID { - self.id - } -} - -struct Calculator { - id: usize, - value: i32, - applied_ids_tx: Sender<(usize, usize)>, - pending_transitions: Vec, -} - -impl StateMachine for Calculator { - fn apply_transition(&mut self, transition: ArithmeticOperation) { - self.value += transition.delta; - } - - fn register_transition_state( - &mut self, - transition_id: ::TransitionID, - state: TransitionState, - ) { - if state == TransitionState::Applied { - self.applied_ids_tx - .send((self.id, transition_id)) - .expect("could not send applied transition id"); - } - } - - fn get_pending_transitions(&mut self) -> Vec { - let cur = self.pending_transitions.clone(); - self.pending_transitions = Vec::new(); - cur - } -} - -struct MyCluster { - transmitters: BTreeMap>>, - pending_messages: Vec>, - halt: bool, - leader: bool, - id: usize, -} - -impl Cluster for MyCluster { - fn register_leader(&mut self, leader_id: Option) { - if let Some(id) = leader_id { - if id == self.id { - self.leader = true; - } else { - self.leader = false; 
- } - } else { - self.leader = false; - } - } - - fn send_message(&mut self, to_id: usize, message: Message) { - if let Some(transmitter) = self.transmitters.get(&to_id) { - match transmitter.send(message) { - Ok(_) => {} - Err(t) => println!("{}", t), - } - } - } - - fn halt(&self) -> bool { - self.halt - } - - fn receive_messages(&mut self) -> Vec> { - let cur = self.pending_messages.clone(); - self.pending_messages = Vec::new(); - cur - } -} - -#[test] -fn run_replicas() { - let (mut transmitters, mut receivers) = (BTreeMap::new(), BTreeMap::new()); - for i in 0..=2 { - let (tx, rx) = unbounded::>(); - transmitters.insert(i, tx); - receivers.insert(i, rx); - } - - let (mut state_machines, mut clusters, mut notifiers) = (Vec::new(), Vec::new(), Vec::new()); - let (applied_tx, applied_rx) = unbounded(); - for i in 0..=2 { - // Create the cluster. - let cluster = Arc::new(Mutex::new(MyCluster { - transmitters: transmitters.clone(), - pending_messages: Vec::new(), - halt: false, - leader: false, - id: i, - })); - clusters.push((cluster.clone(), receivers.remove(&i).unwrap())); - - // Create peer ids. - let mut peer_ids = Vec::new(); - for n in 0..=2 { - if n != i { - peer_ids.push(n); - } - } - - // Create the state machine. - let new_applied_tx = applied_tx.clone(); - let state_machine = Arc::new(Mutex::new(Calculator { - id: i, - value: 0, - pending_transitions: Vec::new(), - applied_ids_tx: new_applied_tx, - })); - - // Create noop transition. 
- let noop = ArithmeticOperation { delta: 0, id: 0 }; - state_machines.push(state_machine.clone()); - let (message_notifier_tx, message_notifier_rx) = channel::unbounded(); - let (transition_notifier_tx, transition_notifier_rx) = channel::unbounded(); - notifiers.push((message_notifier_tx, transition_notifier_tx)); - thread::spawn(move || { - let mut replica = Replica::new( - i, - peer_ids, - cluster, - state_machine, - noop, - HEARTBEAT_TIMEOUT, - (ELECTION_MIN_TIMEOUT, ELECTION_MAX_TIMEOUT), - ); - replica.start(message_notifier_rx, transition_notifier_rx); - }); - } - - let new_clusters = clusters.clone(); - for (i, (cluster, receiver)) in new_clusters.into_iter().enumerate() { - let message_notifier = notifiers[i].0.clone(); - thread::spawn(move || loop { - let msg = receiver.recv().unwrap(); - cluster.lock().unwrap().pending_messages.push(msg); - let _ = message_notifier.send(()); - }); - } - - thread::sleep(Duration::from_secs(3)); - for (i, (cluster, _)) in clusters.iter().enumerate() { - let mut leader_id = None; - if cluster.lock().unwrap().leader { - leader_id = Some(i); - } - - if let Some(l_id) = leader_id { - state_machines[l_id] - .lock() - .unwrap() - .pending_transitions - .push(ArithmeticOperation { delta: 5, id: 1 }); - let _ = notifiers[l_id].1.send(()); - break; - } - } - - thread::sleep(Duration::from_secs(2)); - for (i, (cluster, _)) in clusters.iter().enumerate() { - let mut leader_id = None; - if cluster.lock().unwrap().leader { - leader_id = Some(i); - } - - if let Some(l_id) = leader_id { - state_machines[l_id] - .lock() - .unwrap() - .pending_transitions - .push(ArithmeticOperation { delta: -51, id: 2 }); - let _ = notifiers[l_id].1.send(()); - break; - } - } - - thread::sleep(Duration::from_secs(2)); - for (i, (cluster, _)) in clusters.iter().enumerate() { - let mut leader_id = None; - if cluster.lock().unwrap().leader { - leader_id = Some(i); - } - - if let Some(l_id) = leader_id { - state_machines[l_id] - .lock() - .unwrap() - 
.pending_transitions - .push(ArithmeticOperation { delta: -511, id: 3 }); - let _ = notifiers[l_id].1.send(()); - break; - } - } - - thread::sleep(Duration::from_secs(2)); - for (i, (cluster, _)) in clusters.iter().enumerate() { - let mut leader_id = None; - if cluster.lock().unwrap().leader { - leader_id = Some(i); - } - - if let Some(l_id) = leader_id { - state_machines[l_id] - .lock() - .unwrap() - .pending_transitions - .push(ArithmeticOperation { delta: 3, id: 4 }); - let _ = notifiers[l_id].1.send(()); - break; - } - } - - thread::sleep(Duration::from_secs(2)); - for (cluster, _) in clusters.iter() { - let mut c = cluster.lock().unwrap(); - c.halt = true; - } - - thread::sleep(Duration::from_secs(5)); - let applied_transactions: Vec<(usize, usize)> = applied_rx.try_iter().collect(); - let expected_vec: Vec = vec![0, 1, 2, 3, 4]; - assert_eq!( - expected_vec, - applied_transactions.iter().fold(Vec::new(), |mut acc, x| { - if x.0 == 0 { - acc.push(x.1); - }; - acc - }) - ); - - assert_eq!( - expected_vec, - applied_transactions.iter().fold(Vec::new(), |mut acc, x| { - if x.0 == 1 { - acc.push(x.1); - }; - acc - }) - ); - - assert_eq!( - expected_vec, - applied_transactions.iter().fold(Vec::new(), |mut acc, x| { - if x.0 == 2 { - acc.push(x.1); - }; - acc - }) - ); -} diff --git a/little_raft/tests/raft_stable.rs b/little_raft/tests/raft_stable.rs new file mode 100644 index 0000000..9ec648f --- /dev/null +++ b/little_raft/tests/raft_stable.rs @@ -0,0 +1,380 @@ +use crossbeam_channel as channel; +use crossbeam_channel::{unbounded, Receiver, Sender}; +use little_raft::{ + cluster::Cluster, + message::Message, + replica::Replica, + state_machine::{StateMachine, StateMachineTransition, TransitionState}, +}; +use std::sync::{Arc, Mutex}; + +use std::{collections::BTreeMap, thread, time::Duration}; + +const HEARTBEAT_TIMEOUT: Duration = Duration::from_millis(500); +const MIN_ELECTION_TIMEOUT: Duration = Duration::from_millis(750); +const MAX_ELECTION_TIMEOUT: Duration 
= Duration::from_millis(950); + +// Our state machine will carry out simple plus and minus operations on a +// number, starting from zero. +#[derive(Clone, Copy, Debug)] +struct ArithmeticOperation { + id: usize, + delta: i32, +} + +impl StateMachineTransition for ArithmeticOperation { + type TransitionID = usize; + fn get_id(&self) -> Self::TransitionID { + self.id + } +} + +// The Calculator is the state machine that maintains a number that we can add +// to or subtract from. ID is simply for convenience. +struct Calculator { + id: usize, + value: i32, + applied_ids_tx: Sender<(usize, usize)>, + pending_transitions: Vec, +} + +impl StateMachine for Calculator { + fn apply_transition(&mut self, transition: ArithmeticOperation) { + self.value += transition.delta; + } + + fn register_transition_state( + &mut self, + transition_id: ::TransitionID, + state: TransitionState, + ) { + // Send IDs of applied transitions down the channel so we can confirm + // they were applied in the right order. + if state == TransitionState::Applied { + self.applied_ids_tx + .send((self.id, transition_id)) + .expect("could not send applied transition id"); + } + } + + fn get_pending_transitions(&mut self) -> Vec { + let cur = self.pending_transitions.clone(); + self.pending_transitions = Vec::new(); + cur + } +} + +// Our test replicas will be running each in its own thread. 
+struct ThreadCluster { + id: usize, + is_leader: bool, + transmitters: BTreeMap>>, + pending_messages: Vec>, + halt: bool, +} + +impl Cluster for ThreadCluster { + fn register_leader(&mut self, leader_id: Option) { + if let Some(id) = leader_id { + if id == self.id { + self.is_leader = true; + } else { + self.is_leader = false; + } + } else { + self.is_leader = false; + } + } + + fn send_message(&mut self, to_id: usize, message: Message) { + if let Some(transmitter) = self.transmitters.get(&to_id) { + transmitter.send(message).expect("could not send message"); + } + } + + fn halt(&self) -> bool { + self.halt + } + + fn receive_messages(&mut self) -> Vec> { + let cur = self.pending_messages.clone(); + self.pending_messages = Vec::new(); + cur + } +} + +// Create n clusters, each with their own copy of transmitters used for +// communication between replicas (threads). +fn create_clusters( + n: usize, + transmitters: BTreeMap>>, +) -> Vec>> { + let mut clusters = Vec::new(); + for i in 0..n { + let cluster = Arc::new(Mutex::new(ThreadCluster { + id: i, + is_leader: false, + transmitters: transmitters.clone(), + pending_messages: Vec::new(), + halt: false, + })); + + clusters.push(cluster); + } + + clusters +} + +// Create channels for the threads to communicate with. +fn create_communication_between_clusters( + n: usize, +) -> ( + BTreeMap>>, + Vec>>, +) { + let (mut transmitters, mut receivers) = (BTreeMap::new(), Vec::new()); + for i in 0..n { + let (tx, rx) = unbounded::>(); + transmitters.insert(i, tx); + receivers.push(rx); + } + + (transmitters, receivers) +} + +fn create_peer_ids(n: usize) -> Vec> { + let mut all_peer_ids = Vec::new(); + for i in 0..n { + let mut peer_ids = Vec::new(); + for n in 0..n { + if n != i { + peer_ids.push(n); + } + } + all_peer_ids.push(peer_ids); + } + + all_peer_ids +} + +// Create state machines, each with its own copy on which to send +// (state_machine_id, transition_id) for transitions that have been applied. 
+fn create_state_machines( + n: usize, + applied_transitions_tx: Sender<(usize, usize)>, +) -> Vec>> { + let mut state_machines = Vec::new(); + for i in 0..n { + let state_machine = Arc::new(Mutex::new(Calculator { + id: i, + value: 0, + pending_transitions: Vec::new(), + applied_ids_tx: applied_transitions_tx.clone(), + })); + state_machines.push(state_machine); + } + state_machines +} + +// Create sending ends of message notifiers, sending ends of transition +// notifiers, receiving ends of message notifiers, receiving ends of transition +// notifiers. +fn create_notifiers( + n: usize, +) -> ( + Vec>, + Vec>, + Vec>, + Vec>, +) { + let mut message_tx = Vec::new(); + let mut message_rx = Vec::new(); + let mut transition_tx = Vec::new(); + let mut transition_rx = Vec::new(); + for _ in 0..n { + let (message_notifier_tx, message_notifier_rx) = channel::unbounded(); + let (transition_notifier_tx, transition_notifier_rx) = channel::unbounded(); + message_tx.push(message_notifier_tx); + message_rx.push(message_notifier_rx); + transition_tx.push(transition_notifier_tx); + transition_rx.push(transition_notifier_rx); + } + + (message_tx, transition_tx, message_rx, transition_rx) +} + +fn run_clusters_communication( + mut clusters: Vec>>, + mut cluster_message_receivers: Vec>>, + mut message_notifiers_tx: Vec>, +) { + for _ in (0..clusters.len()).rev() { + let cluster = clusters.pop().unwrap(); + let cluster_message_rx = cluster_message_receivers.pop().unwrap(); + let message_notifier = message_notifiers_tx.pop().unwrap(); + + // For each cluster, start a thread where we notify the cluster replica + // of a new message as soon as we receive one for it. 
+ thread::spawn(move || loop { + let msg = cluster_message_rx.recv().unwrap(); + match cluster.lock() { + Ok(mut unlocked_cluster) => { + unlocked_cluster.pending_messages.push(msg); + message_notifier + .send(()) + .expect("could not notify of message"); + } + _ => return, + } + }); + } +} + +fn run_arithmetic_operation_on_cluster( + clusters: Vec>>, + state_machines: Vec>>, + transition_notifiers: Vec>, + delta: i32, + id: usize, +) { + thread::sleep(Duration::from_secs(1)); + // Find the leader and send the transition request to it. + for cluster in clusters.iter() { + let cluster = cluster.lock().unwrap(); + if cluster.is_leader { + state_machines[cluster.id] + .lock() + .unwrap() + .pending_transitions + .push(ArithmeticOperation { + delta: delta, + id: id, + }); + transition_notifiers[cluster.id] + .send(()) + .expect("could not send transition notification"); + break; + } + } + + thread::sleep(Duration::from_secs(2)); +} + +fn halt_clusters(clusters: Vec>>) { + thread::sleep(Duration::from_secs(1)); + for cluster in clusters.iter() { + let mut c = cluster.lock().unwrap(); + c.halt = true; + } + thread::sleep(Duration::from_secs(2)); +} + +#[test] +fn run_replicas() { + let n = 3; + // We are going to test that three replicas can elect a leader and process a + // few simple operations. + // + // Main complexity of this test set up comes from the fact that everything + // is running on a single machine, so we have to keep track of every + // cluster, replica, and state machine object. In the real world usage of + // the library it's unlikely there will ever be more than a single instance + // of each object per process or even a physical machine. 
+ let (transmitters, receivers) = create_communication_between_clusters(3); + let clusters = create_clusters(n, transmitters); + let peer_ids = create_peer_ids(n); + let noop = ArithmeticOperation { delta: 0, id: 0 }; + let (applied_transitions_tx, applied_transitions_rx) = unbounded(); + let state_machines = create_state_machines(n, applied_transitions_tx); + let (message_tx, transition_tx, message_rx, transition_rx) = create_notifiers(n); + for i in 0..n { + let local_peer_ids = peer_ids[i].clone(); + let cluster = clusters[i].clone(); + let state_machine = state_machines[i].clone(); + let m_rx = message_rx[i].clone(); + let t_rx = transition_rx[i].clone(); + thread::spawn(move || { + let mut replica = Replica::new( + i, + local_peer_ids, + cluster, + state_machine, + noop.clone(), + HEARTBEAT_TIMEOUT, + (MIN_ELECTION_TIMEOUT, MAX_ELECTION_TIMEOUT), + ); + + replica.start(m_rx, t_rx); + }); + } + + run_clusters_communication(clusters.clone(), receivers.clone(), message_tx); + + run_arithmetic_operation_on_cluster( + clusters.clone(), + state_machines.clone(), + transition_tx.clone(), + 5, + 1, + ); + + run_arithmetic_operation_on_cluster( + clusters.clone(), + state_machines.clone(), + transition_tx.clone(), + -51, + 2, + ); + + run_arithmetic_operation_on_cluster( + clusters.clone(), + state_machines.clone(), + transition_tx.clone(), + -511, + 3, + ); + + run_arithmetic_operation_on_cluster( + clusters.clone(), + state_machines.clone(), + transition_tx.clone(), + 3, + 4, + ); + + halt_clusters(clusters.clone()); + + // Below we confirm that every replica applied the same transitions in the + // same order. 
+ let applied_transactions: Vec<(usize, usize)> = applied_transitions_rx.try_iter().collect(); + let expected_vec: Vec = vec![0, 1, 2, 3, 4]; + assert_eq!( + expected_vec, + applied_transactions.iter().fold(Vec::new(), |mut acc, x| { + if x.0 == 0 { + acc.push(x.1); + }; + acc + }) + ); + + assert_eq!( + expected_vec, + applied_transactions.iter().fold(Vec::new(), |mut acc, x| { + if x.0 == 1 { + acc.push(x.1); + }; + acc + }) + ); + + assert_eq!( + expected_vec, + applied_transactions.iter().fold(Vec::new(), |mut acc, x| { + if x.0 == 2 { + acc.push(x.1); + }; + acc + }) + ); +} diff --git a/little_raft/tests/raft_unstable.rs b/little_raft/tests/raft_unstable.rs new file mode 100644 index 0000000..16b3483 --- /dev/null +++ b/little_raft/tests/raft_unstable.rs @@ -0,0 +1,377 @@ +use crossbeam_channel as channel; +use crossbeam_channel::{unbounded, Receiver, Sender}; +use little_raft::{ + cluster::Cluster, + message::Message, + replica::Replica, + state_machine::{StateMachine, StateMachineTransition, TransitionState}, +}; +use std::sync::{Arc, Mutex}; + +use std::{collections::BTreeMap, thread, time::Duration}; + +const HEARTBEAT_TIMEOUT: Duration = Duration::from_millis(500); +const MIN_ELECTION_TIMEOUT: Duration = Duration::from_millis(750); +const MAX_ELECTION_TIMEOUT: Duration = Duration::from_millis(950); + +// Our state machine will carry out simple plus and minus operations on a +// number, starting from zero. +#[derive(Clone, Copy, Debug)] +struct ArithmeticOperation { + id: usize, + delta: i32, +} + +impl StateMachineTransition for ArithmeticOperation { + type TransitionID = usize; + fn get_id(&self) -> Self::TransitionID { + self.id + } +} + +// The Calculator is the state machine that maintains a number that we can add +// to or subtract from. ID is simply for convenience. 
+struct Calculator { + id: usize, + value: i32, + applied_ids_tx: Sender<(usize, usize)>, + pending_transitions: Vec, +} + +impl StateMachine for Calculator { + fn apply_transition(&mut self, transition: ArithmeticOperation) { + self.value += transition.delta; + } + + fn register_transition_state( + &mut self, + transition_id: ::TransitionID, + state: TransitionState, + ) { + // Send IDs of applied transitions down the channel so we can confirm + // they were applied in the right order. + if state == TransitionState::Applied { + self.applied_ids_tx + .send((self.id, transition_id)) + .expect("could not send applied transition id"); + } + } + + fn get_pending_transitions(&mut self) -> Vec { + let cur = self.pending_transitions.clone(); + self.pending_transitions = Vec::new(); + cur + } +} + +// Our test replicas will be running each in its own thread. +struct ThreadCluster { + id: usize, + is_leader: bool, + transmitters: BTreeMap>>, + pending_messages: Vec>, + halt: bool, +} + +impl Cluster for ThreadCluster { + fn register_leader(&mut self, leader_id: Option) { + if let Some(id) = leader_id { + if id == self.id { + self.is_leader = true; + } else { + self.is_leader = false; + } + } else { + self.is_leader = false; + } + } + + fn send_message(&mut self, to_id: usize, message: Message) { + if let Some(transmitter) = self.transmitters.get(&to_id) { + transmitter.send(message).expect("could not send message"); + } + } + + fn halt(&self) -> bool { + self.halt + } + + fn receive_messages(&mut self) -> Vec> { + let cur = self.pending_messages.clone(); + self.pending_messages = Vec::new(); + cur + } +} + +// Create n clusters, each with their own copy of transmitters used for +// communication between replicas (threads). 
+fn create_clusters( + n: usize, + transmitters: BTreeMap>>, +) -> Vec>> { + let mut clusters = Vec::new(); + for i in 0..n { + let cluster = Arc::new(Mutex::new(ThreadCluster { + id: i, + is_leader: false, + transmitters: transmitters.clone(), + pending_messages: Vec::new(), + halt: false, + })); + + clusters.push(cluster); + } + + clusters +} + +// Create channels for the threads to communicate with. +fn create_communication_between_clusters( + n: usize, +) -> ( + BTreeMap>>, + Vec>>, +) { + let (mut transmitters, mut receivers) = (BTreeMap::new(), Vec::new()); + for i in 0..n { + let (tx, rx) = unbounded::>(); + transmitters.insert(i, tx); + receivers.push(rx); + } + + (transmitters, receivers) +} + +fn create_peer_ids(n: usize) -> Vec> { + let mut all_peer_ids = Vec::new(); + for i in 0..n { + let mut peer_ids = Vec::new(); + for n in 0..n { + if n != i { + peer_ids.push(n); + } + } + all_peer_ids.push(peer_ids); + } + + all_peer_ids +} + +// Create state machines, each with its own copy on which to send +// (state_machine_id, transition_id) for transitions that have been applied. +fn create_state_machines( + n: usize, + applied_transitions_tx: Sender<(usize, usize)>, +) -> Vec>> { + let mut state_machines = Vec::new(); + for i in 0..n { + let state_machine = Arc::new(Mutex::new(Calculator { + id: i, + value: 0, + pending_transitions: Vec::new(), + applied_ids_tx: applied_transitions_tx.clone(), + })); + state_machines.push(state_machine); + } + state_machines +} + +// Create sending ends of message notifiers, sending ends of transition +// notifiers, receiving ends of message notifiers, receiving ends of transition +// notifiers. 
+fn create_notifiers( + n: usize, +) -> ( + Vec>, + Vec>, + Vec>, + Vec>, +) { + let mut message_tx = Vec::new(); + let mut message_rx = Vec::new(); + let mut transition_tx = Vec::new(); + let mut transition_rx = Vec::new(); + for _ in 0..n { + let (message_notifier_tx, message_notifier_rx) = channel::unbounded(); + let (transition_notifier_tx, transition_notifier_rx) = channel::unbounded(); + message_tx.push(message_notifier_tx); + message_rx.push(message_notifier_rx); + transition_tx.push(transition_notifier_tx); + transition_rx.push(transition_notifier_rx); + } + + (message_tx, transition_tx, message_rx, transition_rx) +} + +fn run_clusters_communication( + mut clusters: Vec>>, + mut cluster_message_receivers: Vec>>, + mut message_notifiers_tx: Vec>, +) { + for _ in (0..clusters.len()).rev() { + let cluster = clusters.pop().unwrap(); + let cluster_message_rx = cluster_message_receivers.pop().unwrap(); + let message_notifier = message_notifiers_tx.pop().unwrap(); + + // For each cluster, start a thread where we notify the cluster replica + // of a new message as soon as we receive one for it. + thread::spawn(move || loop { + let msg = cluster_message_rx.recv().unwrap(); + match cluster.lock() { + Ok(mut unlocked_cluster) => { + unlocked_cluster.pending_messages.push(msg); + message_notifier + .send(()) + .expect("could not notify of message"); + } + _ => return, + } + }); + } +} + +fn run_arithmetic_operation_on_cluster( + clusters: Vec>>, + state_machines: Vec>>, + transition_notifiers: Vec>, + delta: i32, + id: usize, +) { + thread::sleep(Duration::from_secs(1)); + // Find the leader and send the transition request to it. 
+ for cluster in clusters.iter() { + let cluster = cluster.lock().unwrap(); + if cluster.is_leader { + state_machines[cluster.id] + .lock() + .unwrap() + .pending_transitions + .push(ArithmeticOperation { + delta: delta, + id: id, + }); + transition_notifiers[cluster.id] + .send(()) + .expect("could not send transition notification"); + break; + } + } + + thread::sleep(Duration::from_secs(2)); +} + +fn halt_clusters(clusters: Vec>>) { + thread::sleep(Duration::from_secs(1)); + for cluster in clusters.iter() { + let mut c = cluster.lock().unwrap(); + c.halt = true; + } + thread::sleep(Duration::from_secs(2)); +} + +#[test] +fn run_replicas() { + let n = 3; + // We are going to test that three replicas can elect a leader and process a + // few simple operations. + // + // Main complexity of this test set up comes from the fact that everything + // is running on a single machine, so we have to keep track of every + // cluster, replica, and state machine object. In the real world usage of + // the library it's unlikely there will ever be more than a single instance + // of each object per process or even a physical machine. 
+ let (transmitters, receivers) = create_communication_between_clusters(3); + let clusters = create_clusters(n, transmitters); + let peer_ids = create_peer_ids(n); + let noop = ArithmeticOperation { delta: 0, id: 0 }; + let (applied_transitions_tx, applied_transitions_rx) = unbounded(); + let state_machines = create_state_machines(n, applied_transitions_tx); + let (message_tx, transition_tx, message_rx, transition_rx) = create_notifiers(n); + for i in 0..n { + let local_peer_ids = peer_ids[i].clone(); + let cluster = clusters[i].clone(); + let state_machine = state_machines[i].clone(); + let m_rx = message_rx[i].clone(); + let t_rx = transition_rx[i].clone(); + thread::spawn(move || { + let mut replica = Replica::new( + i, + local_peer_ids, + cluster, + state_machine, + noop.clone(), + HEARTBEAT_TIMEOUT, + (MIN_ELECTION_TIMEOUT, MAX_ELECTION_TIMEOUT), + ); + + replica.start(m_rx, t_rx); + }); + } + + run_clusters_communication(clusters.clone(), receivers.clone(), message_tx); + + run_arithmetic_operation_on_cluster( + clusters.clone(), + state_machines.clone(), + transition_tx.clone(), + 5, + 1, + ); + + // Signal to the 0th replica that it should halt, give the remaining + // replicas some time to reelect the leader, and mark the 0th replica as a + // non-leader. + clusters.clone()[0].lock().unwrap().halt = true; + thread::sleep(Duration::from_secs(2)); + clusters.clone()[0].lock().unwrap().is_leader = false; + + run_arithmetic_operation_on_cluster( + clusters.clone(), + state_machines.clone(), + transition_tx.clone(), + -51, + 2, + ); + + run_arithmetic_operation_on_cluster( + clusters.clone(), + state_machines.clone(), + transition_tx.clone(), + -511, + 3, + ); + + run_arithmetic_operation_on_cluster( + clusters.clone(), + state_machines.clone(), + transition_tx.clone(), + 3, + 4, + ); + + halt_clusters(clusters.clone()); + + // Below we confirm that every replica applied the same transitions in the + // same order. 
+ let applied_transactions: Vec<(usize, usize)> = applied_transitions_rx.try_iter().collect(); + let expected_vec: Vec = vec![1, 2, 3, 4]; + assert_eq!( + expected_vec, + applied_transactions.iter().fold(Vec::new(), |mut acc, x| { + if x.0 == 1 && x.1 != 0 { + acc.push(x.1); + }; + acc + }) + ); + + assert_eq!( + expected_vec, + applied_transactions.iter().fold(Vec::new(), |mut acc, x| { + if x.0 == 2 && x.1 != 0 { + acc.push(x.1); + }; + acc + }) + ); +}