diff --git a/little_raft/src/cluster.rs b/little_raft/src/cluster.rs index f3cd755..e810f32 100644 --- a/little_raft/src/cluster.rs +++ b/little_raft/src/cluster.rs @@ -19,4 +19,6 @@ where fn get_pending_transitions(&mut self) -> Vec; fn halt(&self) -> bool; + + fn register_leader(&mut self, leader_id: Option); } diff --git a/little_raft/src/replica.rs b/little_raft/src/replica.rs index 8ade248..813e212 100644 --- a/little_raft/src/replica.rs +++ b/little_raft/src/replica.rs @@ -287,6 +287,7 @@ where fn load_new_entries(&mut self) { for transition in self.cluster.lock().unwrap().get_pending_transitions() { if self.state == State::Leader { + // panic!("got a transition am a leader"); self.entries.push(Entry { index: self.entries.len(), transition: transition, @@ -296,6 +297,8 @@ where let mut state_machine = self.state_machine.lock().unwrap(); state_machine .register_transition_state(transition.get_id(), TransitionState::Queued); + } else { + // panic!("got a transition but am not a leader"); } } } @@ -309,6 +312,7 @@ where last_index, } => { if term > self.current_term { + self.cluster.lock().unwrap().register_leader(Some(from_id)); self.become_follower(term); } else if success { self.next_index.insert(from_id, last_index + 1); @@ -339,6 +343,7 @@ where }, ); } else if self.current_term < term { + self.cluster.lock().unwrap().register_leader(Some(from_id)); self.become_follower(term); } @@ -346,6 +351,7 @@ where if self.entries[self.entries.len() - 1].index <= last_log_index && self.entries[self.entries.len() - 1].term <= last_log_term { + self.cluster.lock().unwrap().register_leader(Some(from_id)); self.cluster.lock().unwrap().send( from_id, Message::VoteResponse { @@ -434,6 +440,7 @@ where } } + self.cluster.lock().unwrap().register_leader(Some(from_id)); self.cluster.lock().unwrap().send( from_id, Message::AppendEntryResponse { @@ -499,6 +506,7 @@ where vote_granted: bool, ) { if term > self.current_term { + self.cluster.lock().unwrap().register_leader(Some(from_id)); self.become_follower(term); } else if vote_granted { if let Some(cur_votes) = &mut self.current_votes { @@ -517,6 +525,7 @@ where message: Message, ) { if term > self.current_term { + self.cluster.lock().unwrap().register_leader(Some(from_id)); self.become_follower(term); self.process_message_as_follower(message); } else { @@ -538,6 +547,7 @@ where message: Message, ) { if term >= self.current_term { + self.cluster.lock().unwrap().register_leader(Some(from_id)); self.become_follower(term); self.process_message_as_follower(message); } else { @@ -554,6 +564,7 @@ where } fn become_leader(&mut self) { + self.cluster.lock().unwrap().register_leader(Some(self.id)); self.state = State::Leader; self.current_votes = None; self.voted_for = None; diff --git a/little_raft/tests/raft.rs b/little_raft/tests/raft.rs index 5617ea9..c1b2b7e 100644 --- a/little_raft/tests/raft.rs +++ b/little_raft/tests/raft.rs @@ -1,5 +1,5 @@ use crossbeam_channel as channel; -use crossbeam_channel::{unbounded, Receiver, Sender}; +use crossbeam_channel::{unbounded, Sender}; use little_raft::{ cluster::Cluster, message::Message, @@ -56,9 +56,23 @@ struct MyCluster { pending_transitions: Vec, pending_messages: Vec>, halt: bool, + leader: bool, + id: usize, } impl Cluster for MyCluster { + fn register_leader(&mut self, leader_id: Option) { + if let Some(id) = leader_id { + if id == self.id { + self.leader = true; + } else { + self.leader = false; + } + } else { + self.leader = false; + } + } + fn send(&mut self, to_id: usize, message: Message) { if let Some(transmitter) = self.transmitters.get(&to_id) { match transmitter.send(message) { @@ -103,6 +117,8 @@ fn run_replicas() { pending_transitions: Vec::new(), pending_messages: Vec::new(), halt: false, + leader: false, + id: i, })); clusters.push((cluster.clone(), receivers.remove(&i).unwrap())); @@ -152,49 +168,86 @@ fn run_replicas() { }); } - thread::sleep(Duration::from_secs(5)); - for (cluster, _) in clusters.into_iter() { + thread::sleep(Duration::from_secs(2)); + for (i, (cluster, _)) in clusters.iter().enumerate() { + let mut c = cluster.lock().unwrap(); + if c.leader { + c.pending_transitions + .push(ArithmeticOperation { delta: 5, id: 1 }); + let _ = notifiers[i].1.send(()); + break; + } + } + + thread::sleep(Duration::from_secs(2)); + for (i, (cluster, _)) in clusters.iter().enumerate() { + let mut c = cluster.lock().unwrap(); + if c.leader { + c.pending_transitions + .push(ArithmeticOperation { delta: -51, id: 2 }); + let _ = notifiers[i].1.send(()); + break; + } + } + + thread::sleep(Duration::from_secs(2)); + for (i, (cluster, _)) in clusters.iter().enumerate() { + let mut c = cluster.lock().unwrap(); + if c.leader { + c.pending_transitions + .push(ArithmeticOperation { delta: -511, id: 3 }); + let _ = notifiers[i].1.send(()); + break; + } + } + + thread::sleep(Duration::from_secs(2)); + for (i, (cluster, _)) in clusters.iter().enumerate() { + let mut c = cluster.lock().unwrap(); + if c.leader { + c.pending_transitions + .push(ArithmeticOperation { delta: 3, id: 4 }); + let _ = notifiers[i].1.send(()); + break; + } + } + + thread::sleep(Duration::from_secs(2)); + for (cluster, _) in clusters.iter() { let mut c = cluster.lock().unwrap(); c.halt = true; } + thread::sleep(Duration::from_secs(5)); - thread::sleep(Duration::from_secs(1)); - // thread::sleep(std::time::Duration::from_secs(2)); - // task_tx.send(ArithmeticOperation { delta: 5, id: 1 })?; - // task_tx.send(ArithmeticOperation { delta: -51, id: 2 })?; - // task_tx.send(ArithmeticOperation { delta: -511, id: 3 })?; - // task_tx.send(ArithmeticOperation { delta: 3, id: 4 })?; - // thread::sleep(std::time::Duration::from_secs(2)); - - // let applied_transactions: Vec<(usize, usize)> = applied_rx.try_iter().collect(); - // let expected_vec: Vec = vec![0, 1, 2, 3, 4]; - // assert_eq!( - // expected_vec, - // applied_transactions.iter().fold(Vec::new(), |mut acc, x| { - // if x.0 == 0 { - // acc.push(x.1); - // }; - // acc - // }) - // ); - - // assert_eq!( - // expected_vec, - // applied_transactions.iter().fold(Vec::new(), |mut acc, x| { - // if x.0 == 1 { - // acc.push(x.1); - // }; - // acc - // }) - // ); - - // assert_eq!( - // expected_vec, - // applied_transactions.iter().fold(Vec::new(), |mut acc, x| { - // if x.0 == 2 { - // acc.push(x.1); - // }; - // acc - // }) - // ); + let applied_transactions: Vec<(usize, usize)> = applied_rx.try_iter().collect(); + let expected_vec: Vec = vec![0, 1, 2, 3, 4]; + assert_eq!( + expected_vec, + applied_transactions.iter().fold(Vec::new(), |mut acc, x| { + if x.0 == 0 { + acc.push(x.1); + }; + acc + }) + ); + + assert_eq!( + expected_vec, + applied_transactions.iter().fold(Vec::new(), |mut acc, x| { + if x.0 == 1 { + acc.push(x.1); + }; + acc + }) + ); + + assert_eq!( + expected_vec, + applied_transactions.iter().fold(Vec::new(), |mut acc, x| { + if x.0 == 2 { + acc.push(x.1); + }; + acc + }) + ); }