Skip to content

Commit

Permalink
Better test
Browse files Browse the repository at this point in the history
  • Loading branch information
andreev-io committed Aug 17, 2021
1 parent 5399867 commit 4365fd7
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 42 deletions.
2 changes: 2 additions & 0 deletions little_raft/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ where
fn get_pending_transitions(&mut self) -> Vec<T>;

fn halt(&self) -> bool;

fn register_leader(&mut self, leader_id: Option<usize>);
}
11 changes: 11 additions & 0 deletions little_raft/src/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ where
fn load_new_entries(&mut self) {
for transition in self.cluster.lock().unwrap().get_pending_transitions() {
if self.state == State::Leader {
// panic!("got a transition am a leader");
self.entries.push(Entry {
index: self.entries.len(),
transition: transition,
Expand All @@ -296,6 +297,8 @@ where
let mut state_machine = self.state_machine.lock().unwrap();
state_machine
.register_transition_state(transition.get_id(), TransitionState::Queued);
} else {
// panic!("got a transition but am not a leader");
}
}
}
Expand All @@ -309,6 +312,7 @@ where
last_index,
} => {
if term > self.current_term {
self.cluster.lock().unwrap().register_leader(Some(from_id));
self.become_follower(term);
} else if success {
self.next_index.insert(from_id, last_index + 1);
Expand Down Expand Up @@ -339,13 +343,15 @@ where
},
);
} else if self.current_term < term {
self.cluster.lock().unwrap().register_leader(Some(from_id));
self.become_follower(term);
}

if self.voted_for == None || self.voted_for == Some(from_id) {
if self.entries[self.entries.len() - 1].index <= last_log_index
&& self.entries[self.entries.len() - 1].term <= last_log_term
{
self.cluster.lock().unwrap().register_leader(Some(from_id));
self.cluster.lock().unwrap().send(
from_id,
Message::VoteResponse {
Expand Down Expand Up @@ -434,6 +440,7 @@ where
}
}

self.cluster.lock().unwrap().register_leader(Some(from_id));
self.cluster.lock().unwrap().send(
from_id,
Message::AppendEntryResponse {
Expand Down Expand Up @@ -499,6 +506,7 @@ where
vote_granted: bool,
) {
if term > self.current_term {
self.cluster.lock().unwrap().register_leader(Some(from_id));
self.become_follower(term);
} else if vote_granted {
if let Some(cur_votes) = &mut self.current_votes {
Expand All @@ -517,6 +525,7 @@ where
message: Message<T>,
) {
if term > self.current_term {
self.cluster.lock().unwrap().register_leader(Some(from_id));
self.become_follower(term);
self.process_message_as_follower(message);
} else {
Expand All @@ -538,6 +547,7 @@ where
message: Message<T>,
) {
if term >= self.current_term {
self.cluster.lock().unwrap().register_leader(Some(from_id));
self.become_follower(term);
self.process_message_as_follower(message);
} else {
Expand All @@ -554,6 +564,7 @@ where
}

fn become_leader(&mut self) {
self.cluster.lock().unwrap().register_leader(Some(self.id));
self.state = State::Leader;
self.current_votes = None;
self.voted_for = None;
Expand Down
137 changes: 95 additions & 42 deletions little_raft/tests/raft.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crossbeam_channel as channel;
use crossbeam_channel::{unbounded, Receiver, Sender};
use crossbeam_channel::{unbounded, Sender};
use little_raft::{
cluster::Cluster,
message::Message,
Expand Down Expand Up @@ -56,9 +56,23 @@ struct MyCluster {
pending_transitions: Vec<ArithmeticOperation>,
pending_messages: Vec<Message<ArithmeticOperation>>,
halt: bool,
leader: bool,
id: usize,
}

impl Cluster<ArithmeticOperation> for MyCluster {
fn register_leader(&mut self, leader_id: Option<usize>) {
if let Some(id) = leader_id {
if id == self.id {
self.leader = true;
} else {
self.leader = false;
}
} else {
self.leader = false;
}
}

fn send(&mut self, to_id: usize, message: Message<ArithmeticOperation>) {
if let Some(transmitter) = self.transmitters.get(&to_id) {
match transmitter.send(message) {
Expand Down Expand Up @@ -103,6 +117,8 @@ fn run_replicas() {
pending_transitions: Vec::new(),
pending_messages: Vec::new(),
halt: false,
leader: false,
id: i,
}));
clusters.push((cluster.clone(), receivers.remove(&i).unwrap()));

Expand Down Expand Up @@ -152,49 +168,86 @@ fn run_replicas() {
});
}

thread::sleep(Duration::from_secs(5));
for (cluster, _) in clusters.into_iter() {
thread::sleep(Duration::from_secs(2));
for (i, (cluster, _)) in clusters.iter().enumerate() {
let mut c = cluster.lock().unwrap();
if c.leader {
c.pending_transitions
.push(ArithmeticOperation { delta: 5, id: 1 });
let _ = notifiers[i].1.send(());
break;
}
}

thread::sleep(Duration::from_secs(2));
for (i, (cluster, _)) in clusters.iter().enumerate() {
let mut c = cluster.lock().unwrap();
if c.leader {
c.pending_transitions
.push(ArithmeticOperation { delta: -51, id: 2 });
let _ = notifiers[i].1.send(());
break;
}
}

thread::sleep(Duration::from_secs(2));
for (i, (cluster, _)) in clusters.iter().enumerate() {
let mut c = cluster.lock().unwrap();
if c.leader {
c.pending_transitions
.push(ArithmeticOperation { delta: -511, id: 3 });
let _ = notifiers[i].1.send(());
break;
}
}

thread::sleep(Duration::from_secs(2));
for (i, (cluster, _)) in clusters.iter().enumerate() {
let mut c = cluster.lock().unwrap();
if c.leader {
c.pending_transitions
.push(ArithmeticOperation { delta: 3, id: 4 });
let _ = notifiers[i].1.send(());
break;
}
}

thread::sleep(Duration::from_secs(2));
for (cluster, _) in clusters.iter() {
let mut c = cluster.lock().unwrap();
c.halt = true;
}
thread::sleep(Duration::from_secs(5));

thread::sleep(Duration::from_secs(1));
// thread::sleep(std::time::Duration::from_secs(2));
// task_tx.send(ArithmeticOperation { delta: 5, id: 1 })?;
// task_tx.send(ArithmeticOperation { delta: -51, id: 2 })?;
// task_tx.send(ArithmeticOperation { delta: -511, id: 3 })?;
// task_tx.send(ArithmeticOperation { delta: 3, id: 4 })?;
// thread::sleep(std::time::Duration::from_secs(2));

// let applied_transactions: Vec<(usize, usize)> = applied_rx.try_iter().collect();
// let expected_vec: Vec<usize> = vec![0, 1, 2, 3, 4];
// assert_eq!(
// expected_vec,
// applied_transactions.iter().fold(Vec::new(), |mut acc, x| {
// if x.0 == 0 {
// acc.push(x.1);
// };
// acc
// })
// );

// assert_eq!(
// expected_vec,
// applied_transactions.iter().fold(Vec::new(), |mut acc, x| {
// if x.0 == 1 {
// acc.push(x.1);
// };
// acc
// })
// );

// assert_eq!(
// expected_vec,
// applied_transactions.iter().fold(Vec::new(), |mut acc, x| {
// if x.0 == 2 {
// acc.push(x.1);
// };
// acc
// })
// );
let applied_transactions: Vec<(usize, usize)> = applied_rx.try_iter().collect();
let expected_vec: Vec<usize> = vec![0, 1, 2, 3, 4];
assert_eq!(
expected_vec,
applied_transactions.iter().fold(Vec::new(), |mut acc, x| {
if x.0 == 0 {
acc.push(x.1);
};
acc
})
);

assert_eq!(
expected_vec,
applied_transactions.iter().fold(Vec::new(), |mut acc, x| {
if x.0 == 1 {
acc.push(x.1);
};
acc
})
);

assert_eq!(
expected_vec,
applied_transactions.iter().fold(Vec::new(), |mut acc, x| {
if x.0 == 2 {
acc.push(x.1);
};
acc
})
);
}

0 comments on commit 4365fd7

Please sign in to comment.