From 0e0d8fb842993c7d6b6aafcc053526d7ceae35fd Mon Sep 17 00:00:00 2001 From: ilya Date: Mon, 16 Aug 2021 23:15:05 +0100 Subject: [PATCH] Finalizing abstractions and adding simple tests --- Cargo.lock | 100 ---------------- Cargo.toml | 1 - example/Cargo.toml | 15 --- example/src/main.rs | 188 ----------------------------- little_raft/src/cluster.rs | 3 +- little_raft/src/message.rs | 17 +-- little_raft/src/replica.rs | 100 ++++++---------- little_raft/src/state_machine.rs | 19 ++- little_raft/tests/raft.rs | 198 +++++++++++++++++++++++++++++++ 9 files changed, 255 insertions(+), 386 deletions(-) delete mode 100644 example/Cargo.toml delete mode 100644 example/src/main.rs create mode 100644 little_raft/tests/raft.rs diff --git a/Cargo.lock b/Cargo.lock index c74a672..e17c95f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,38 +2,12 @@ # It is not intended for manual editing. version = 3 -[[package]] -name = "ansi_term" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee49baf6cb617b853aa8d93bf420db2383fab46d314482ca2803b40d5fde979b" -dependencies = [ - "winapi", -] - -[[package]] -name = "atty" -version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" -dependencies = [ - "hermit-abi", - "libc", - "winapi", -] - [[package]] name = "autocfg" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" -[[package]] -name = "bitflags" -version = "1.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" - [[package]] name = "cfg-if" version = "1.0.0" @@ -50,32 +24,6 @@ dependencies = [ "time", ] -[[package]] -name = "clap" -version = "2.33.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002" -dependencies = [ - "ansi_term", - "atty", - "bitflags", - "strsim", - "textwrap", - "unicode-width", - "vec_map", -] - -[[package]] -name = "colored" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3616f750b84d8f0de8a58bda93e08e2a81ad3f523089b05f1dffecab48c6cbd" -dependencies = [ - "atty", - "lazy_static", - "winapi", -] - [[package]] name = "crossbeam" version = "0.8.0" @@ -145,18 +93,6 @@ dependencies = [ "lazy_static", ] -[[package]] -name = "example" -version = "0.1.0" -dependencies = [ - "clap", - "colored", - "crossbeam", - "crossbeam-channel", - "little_raft", - "time", -] - [[package]] name = "getrandom" version = "0.2.2" @@ -168,15 +104,6 @@ dependencies = [ "wasi", ] -[[package]] -name = "hermit-abi" -version = "0.1.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "322f4de77956e22ed0e5032c359a0f1273f1f7f0d79bfa3b8ffbc730d7fbcc5c" -dependencies = [ - "libc", -] - [[package]] name = "lazy_static" version = "1.4.0" @@ -302,21 +229,6 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d29ab0c6d3fc0ee92fe66e2d99f700eab17a8d57d1c1d3b748380fb20baa78cd" -[[package]] -name = "strsim" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a" - -[[package]] -name = "textwrap" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" -dependencies = [ - "unicode-width", -] - [[package]] name = "time" version = "0.1.43" @@ -336,18 +248,6 @@ dependencies = [ "chrono", ] -[[package]] -name = "unicode-width" -version = "0.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3" - -[[package]] -name = "vec_map" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191" - [[package]] name = "wasi" version = "0.10.2+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index 1c439d8..1162617 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,4 @@ [workspace] members = [ - "example", "little_raft" ] \ No newline at end of file diff --git a/example/Cargo.toml b/example/Cargo.toml deleted file mode 100644 index baa237c..0000000 --- a/example/Cargo.toml +++ /dev/null @@ -1,15 +0,0 @@ -[package] -name = "example" -version = "0.1.0" -authors = ["ilya "] -edition = "2018" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -crossbeam-channel = "0.5.1" -crossbeam = "0.8.0" -time = "0.1.39" -colored = "2" -clap = "2.33.3" -little_raft = { path = "../little_raft" } diff --git a/example/src/main.rs b/example/src/main.rs deleted file mode 100644 index db4ddf5..0000000 --- a/example/src/main.rs +++ /dev/null @@ -1,188 +0,0 @@ -use little_raft::{ - cluster::Cluster, - message::Message, - replica::Replica, - state_machine::{StateMachine, StateMachineTransition}, -}; -use std::{ - collections::BTreeMap, - fs, - sync::mpsc::{channel, Receiver, Sender}, - thread, -}; - -const HEARTBEAT_TIMEOUT: u64 = 1000; -const ELECTION_MIN_TIMEOUT: u64 = 2500; -const ELECTION_MAX_TIMEOUT: u64 = 3500; - -#[derive(Clone, Copy)] -struct ArithmeticalTransition { - delta: i32, - id: usize, -} - -impl StateMachineTransition for ArithmeticalTransition { - type TransitionID = usize; - fn get_id(&self) -> Self::TransitionID { - self.id - } -} - -struct Calculator { - value: i32, -} - -impl StateMachine for Calculator { - fn apply_transition(&mut self, transition: ArithmeticalTransition) { - self.value += transition.delta; - } -} - -struct MyCluster { - receiver: Receiver>, - transmitters: BTreeMap>>, - tasks: Receiver, -} - -impl Cluster for MyCluster { - fn send(&self, to_id: usize, message: Message) { - if let Some(transmitter) = self.transmitters.get(&to_id) { - match transmitter.send(message) { - Ok(_) => {} - Err(t) => println!("{}", t), - } - } - } - - fn receive_timeout( - &self, - timeout: std::time::Duration, - ) -> Option> { - match self.receiver.recv_timeout(timeout) { - Ok(t) => Some(t), - Err(_) => None, - } - } - - fn get_transitions(&self) -> Vec { - match self.tasks.try_recv() { - Ok(t) => vec![t; 1], - Err(_) => vec![], - } - } -} - -// Start a simple cluster with 3 replicas. The distributed state machine -// maintains a number that a user can add to or subtract from. -fn main() { - // Create transmitter and receivers that replicas will be communicating - // through. In these example, replicas communicate over mspc channels. - let mut transmitters = BTreeMap::new(); - let mut receivers = BTreeMap::new(); - for i in 0..=2 { - let (tx, rx) = channel::>(); - transmitters.insert(i, tx); - receivers.insert(i, rx); - } - - // Create a cluster abstraction and an mspc channel for each of the - // replicas. The channel is used to send mathematical operations for the - // cluster to process to replicas. - let mut clusters = BTreeMap::new(); - let mut task_transmitters = BTreeMap::new(); - for i in 0..=2 { - let (task_tx, task_rx) = channel::(); - task_transmitters.insert(i, task_tx); - clusters.insert( - i, - MyCluster { - receiver: receivers.remove(&i).unwrap(), - transmitters: transmitters.clone(), - tasks: task_rx, - }, - ); - } - - for i in (0..2).rev() { - let cluster = clusters.remove(&i).unwrap(); - let peer_ids = transmitters.keys().cloned().filter(|id| id != &i).collect(); - thread::spawn(move || { - Replica::new( - i, - peer_ids, - Box::new(cluster), - Box::new(Calculator { value: 0 }), - ArithmeticalTransition { delta: 0, id: 0 }, - ) - .start( - ELECTION_MIN_TIMEOUT, - ELECTION_MAX_TIMEOUT, - std::time::Duration::from_millis(HEARTBEAT_TIMEOUT), - ); - }); - } - - process_control_messages(task_transmitters); -} - -fn parse_control_line(s: &str) -> (usize, String) { - let entry: Vec<&str> = s.split(":").collect(); - let idx = entry[0].parse::().expect("non-digit index"); - let mut command = entry[1].to_string(); - command.retain(|c| c != '/' && c != ' '); - return (idx, command); -} - -// This function blocks forever. -fn process_control_messages(transmitters: BTreeMap>) { - let mut next_unprocessed_line: usize = 0; - let mut cur_id = 1; - loop { - let buffer = match fs::read_to_string("input.txt") { - Ok(buf) => buf, - Err(_) => { - println!("Could not open input.txt"); - continue; - } - }; - - let lines: Vec<&str> = buffer.split("\n").collect(); - if lines.len() >= next_unprocessed_line + 1 && lines[next_unprocessed_line].contains("//") { - let (id, mut command) = parse_control_line(lines[next_unprocessed_line]); - next_unprocessed_line += 1; - - let delta = if command.contains("Apply") { - let delta = command - .split("Apply") - .nth(1) - .unwrap() - .parse::() - .expect("unparseable delta"); - command = String::from("Apply"); - delta - } else { - 0 - }; - - println!("Action for {:?} with delta {:?}", id, delta); - match command.as_str() { - "Apply" => { - transmitters - .get(&id) - .unwrap() - .send(ArithmeticalTransition { - delta: delta, - id: cur_id, - }) - .unwrap_or_else(|error| { - println!("{}", error); - }); - cur_id += 1; - } - _ => {} - }; - } - - thread::sleep(std::time::Duration::from_millis(100)); - } -} diff --git a/little_raft/src/cluster.rs b/little_raft/src/cluster.rs index 55753c6..25501aa 100644 --- a/little_raft/src/cluster.rs +++ b/little_raft/src/cluster.rs @@ -18,5 +18,6 @@ where // distributed state machine needs to replicate and apply. All replicas poll // this function periodically but only Leaders merit the return value. // Non-Leaders ignore the return value of get_action. - fn get_transitions(&self) -> Vec; + fn get_pending_transitions(&self) -> Vec; + fn register_leader_change(&mut self, leader_id: Option); } diff --git a/little_raft/src/message.rs b/little_raft/src/message.rs index e0b59a4..69ef0be 100644 --- a/little_raft/src/message.rs +++ b/little_raft/src/message.rs @@ -4,7 +4,7 @@ use crate::state_machine::StateMachineTransition; // Entry describes a user-defined transition of the distributed state machine. // It has some associated metadata, namely the term when the entry was created // and its index in the log. -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Entry where T: StateMachineTransition, @@ -12,24 +12,11 @@ where pub transition: T, pub index: usize, pub term: usize, - pub state: EntryState, -} - -// State of a particular entry. -#[derive(Clone, Copy)] -pub enum EntryState { - // Entry being queued means that the replica is aware of it and is - // replicating it across the cluster. - Queued, - // Entry being committed means that the entry is guaranteed to be in the log - // of all future leaders in the cluster. - Committed, - // Entry being applied means that it has been applied to the state machine. - Applied, } // Message describes messages that the replicas pass between each other to // achieve consensus on the distributed state machine. +#[derive(Debug)] pub enum Message where T: StateMachineTransition, diff --git a/little_raft/src/replica.rs b/little_raft/src/replica.rs index 3a60eee..a18b9bd 100644 --- a/little_raft/src/replica.rs +++ b/little_raft/src/replica.rs @@ -1,8 +1,8 @@ use crate::{ cluster::Cluster, heartbeat_timer::HeartbeatTimer, - message::{Entry, EntryState, Message}, - state_machine::{StateMachine, StateMachineTransition}, + message::{Entry, Message}, + state_machine::{StateMachine, StateMachineTransition, TransitionState}, }; use rand::Rng; use std::{ @@ -10,7 +10,7 @@ use std::{ time::Duration, }; -#[derive(Clone, Copy, PartialEq)] +#[derive(Clone, Copy, PartialEq, Debug)] enum State { Follower, Candidate, @@ -92,7 +92,6 @@ where term: 0, index: 0, transition: noop_transition, - state: EntryState::Queued, }], noop_transition: noop_transition, commit_index: 0, @@ -124,47 +123,6 @@ where ); } - // Check if the replica is the current Leader. If yes, the Result is Ok. If - // not, the result if Err with the ID of the current Leader, if it's known. - pub fn is_leader(&self) -> Result<(), Option> { - match self.state { - State::Leader => Ok(()), - _ => { - if let Some(leader_id) = self.leader_id { - Err(Some(leader_id)) - } else { - Err(None) - } - } - } - } - - pub fn get_transition_status(&self, transition_id: T::TransitionID) -> Result { - if let Some(entry) = self - .entries - .iter() - .find(|entry| entry.transition.get_id() == transition_id) - { - Ok(entry.state) - } else { - Err(()) - } - } - - pub fn get_last_transition_statuses(&self, mut n: usize) -> Vec<(T::TransitionID, EntryState)> { - let mut results = Vec::new(); - while n > 0 && n <= self.entries.len() { - let index = self.entries.len() - n; - let id = self.entries[index].transition.get_id(); - let state = self.entries[index].state; - results.push((id, state)); - - n -= 1; - } - - results - } - fn broadcast_message(&self, message_generator: F) where F: Fn(usize) -> Message, @@ -193,7 +151,6 @@ where self.apply_ready_entries(); self.load_new_entries(); - std::thread::sleep(std::time::Duration::from_millis(5000)); } } @@ -243,6 +200,7 @@ where // replicated on the majority of the replicas. if self.state == State::Leader && self.commit_index < self.entries.len() - 1 { let mut n = self.entries.len() - 1; + let old_commit_index = self.commit_index; while n > self.commit_index { let num_replications = self.match_index.iter().fold( @@ -258,18 +216,11 @@ where n -= 1; } - let mut last_committed = self.commit_index; - loop { - match self.entries[last_committed].state { - EntryState::Queued => { - self.entries[last_committed].state = EntryState::Committed; - if last_committed == 0 { - break; - } - last_committed -= 1; - } - _ => break, - } + for i in old_commit_index + 1..=self.commit_index { + self.state_machine.register_transition_state( + self.entries[i].transition.get_id(), + TransitionState::Committed, + ); } } @@ -278,19 +229,24 @@ where self.last_applied += 1; self.state_machine .apply_transition(self.entries[self.last_applied].transition); - self.entries[self.last_applied].state = EntryState::Applied; + self.state_machine.register_transition_state( + self.entries[self.last_applied].transition.get_id(), + TransitionState::Applied, + ); } } fn load_new_entries(&mut self) { - for transition in self.cluster.get_transitions().iter() { + for transition in self.cluster.get_pending_transitions().iter() { if self.state == State::Leader { self.entries.push(Entry { index: self.entries.len(), transition: *transition, term: self.current_term, - state: EntryState::Queued, }); + + self.state_machine + .register_transition_state(transition.get_id(), TransitionState::Queued); } } } @@ -371,6 +327,21 @@ where } } + fn register_leader(&mut self, leader_id: usize) { + match self.leader_id { + Some(cur_leader_id) => { + if cur_leader_id != leader_id { + self.leader_id = Some(leader_id); + self.cluster.register_leader_change(Some(leader_id)); + } + } + None => { + self.leader_id = Some(leader_id); + self.cluster.register_leader_change(Some(leader_id)); + } + } + } + fn process_append_entry_request_as_follower( &mut self, from_id: ReplicaID, @@ -380,7 +351,7 @@ where entries: Vec>, commit_index: usize, ) { - self.leader_id = Some(from_id); + self.register_leader(from_id); // Check that the leader's term is at least as large as ours. if self.current_term > term { @@ -534,7 +505,7 @@ where from_id: ReplicaID, message: Message, ) { - self.leader_id = Some(from_id); + self.register_leader(from_id); if term >= self.current_term { self.become_follower(term); @@ -553,7 +524,7 @@ where } fn become_leader(&mut self) { - self.leader_id = Some(self.id); + self.register_leader(self.id); self.state = State::Leader; self.current_votes = None; self.voted_for = None; @@ -574,7 +545,6 @@ where index: self.entries.len(), transition: self.noop_transition, term: self.current_term, - state: EntryState::Queued, }); } diff --git a/little_raft/src/state_machine.rs b/little_raft/src/state_machine.rs index f6c27dc..d62c52f 100644 --- a/little_raft/src/state_machine.rs +++ b/little_raft/src/state_machine.rs @@ -1,4 +1,19 @@ -pub trait StateMachineTransition: Copy + Clone { +use std::fmt::Debug; + +// State of a particular transition. +#[derive(Clone, Copy, Debug, PartialEq)] +pub enum TransitionState { + // Transition being queued means that the replica is aware of it and is + // replicating it across the cluster. + Queued, + // Transition being committed means that the entry is guaranteed to be in + // the log of all future leaders in the cluster. + Committed, + // Entry being applied means that it has been applied to the state machine. + Applied, +} + +pub trait StateMachineTransition: Copy + Clone + Debug { type TransitionID: Eq; fn get_id(&self) -> Self::TransitionID; } @@ -9,6 +24,8 @@ pub trait StateMachine where T: StateMachineTransition, { + fn register_transition_state(&mut self, transition_id: T::TransitionID, state: TransitionState); + // When apply_transition is called, the local state machine must apply the // specified transition. fn apply_transition(&mut self, transition: T); diff --git a/little_raft/tests/raft.rs b/little_raft/tests/raft.rs new file mode 100644 index 0000000..d9086a6 --- /dev/null +++ b/little_raft/tests/raft.rs @@ -0,0 +1,198 @@ +use crossbeam_channel::{unbounded, Receiver, SendError, Sender}; +use little_raft::{ + cluster::Cluster, + message::Message, + replica::Replica, + state_machine::{StateMachine, StateMachineTransition, TransitionState}, +}; + +use std::{collections::BTreeMap, thread}; + +const HEARTBEAT_TIMEOUT: u64 = 200; +const ELECTION_MIN_TIMEOUT: u64 = 500; +const ELECTION_MAX_TIMEOUT: u64 = 700; + +#[derive(Clone, Copy, Debug)] +struct ArithmeticOperation { + delta: i32, + id: usize, +} + +impl StateMachineTransition for ArithmeticOperation { + type TransitionID = usize; + fn get_id(&self) -> Self::TransitionID { + self.id + } +} + +struct Calculator { + id: usize, + value: i32, + value_tx: Sender, + applied_ids_tx: Sender<(usize, usize)>, +} + +impl StateMachine for Calculator { + fn apply_transition(&mut self, transition: ArithmeticOperation) { + self.value += transition.delta; + self.value_tx + .send(self.value) + .expect("could not send calculator value"); + } + + fn register_transition_state( + &mut self, + transition_id: ::TransitionID, + state: TransitionState, + ) { + if state == TransitionState::Applied { + self.applied_ids_tx + .send((self.id, transition_id)) + .expect("could not send applied transition id"); + } + } +} + +struct MyCluster { + receiver: Receiver>, + transmitters: BTreeMap>>, + tasks: Receiver, + id: usize, + leader: Option, +} + +impl Cluster for MyCluster { + fn send(&self, to_id: usize, message: Message) { + if let Some(transmitter) = self.transmitters.get(&to_id) { + match transmitter.send(message) { + Ok(_) => {} + Err(t) => println!("{}", t), + } + } + } + + fn receive_timeout( + &self, + timeout: std::time::Duration, + ) -> Option> { + match self.receiver.recv_timeout(timeout) { + Ok(t) => Some(t), + Err(_) => None, + } + } + + fn get_pending_transitions(&self) -> Vec { + if let Some(cur_leader) = self.leader { + if cur_leader == self.id { + return match self.tasks.try_recv() { + Ok(t) => vec![t; 1], + Err(_) => vec![], + }; + } + } + + Vec::new() + } + + fn register_leader_change(&mut self, leader_id: Option) { + self.leader = leader_id; + } +} + +#[test] +fn run_replicas() -> Result<(), SendError> { + // Create transmitters and receivers that replicas will be communicating + // through. In this test, replicas communicate over mspc channels. + let (mut transmitters, mut receivers) = (BTreeMap::new(), BTreeMap::new()); + for i in 0..=2 { + let (tx, rx) = unbounded::>(); + transmitters.insert(i, tx); + receivers.insert(i, rx); + } + + // Create clusters and mspc channel for each of the replicas. The channels + // are used to send mathematical operations for the cluster to pipe to the + // replicas. + let (task_tx, task_rx) = unbounded::(); + let (calculator_tx, _calculator_rx) = unbounded::(); + let (applied_tx, applied_rx) = unbounded::<(usize, usize)>(); + for i in 0..=2 { + let cluster = MyCluster { + id: i, + receiver: receivers.remove(&i).unwrap(), + transmitters: transmitters.clone(), + tasks: task_rx.clone(), + leader: None, + }; + + let mut peer_ids = Vec::new(); + for n in 0..=2 { + if n != i { + peer_ids.push(n); + } + } + + let new_calculator_tx = calculator_tx.clone(); + let new_applied_tx = applied_tx.clone(); + thread::spawn(move || { + Replica::new( + i, + peer_ids, + Box::new(cluster), + Box::new(Calculator { + id: i, + value: 0, + value_tx: new_calculator_tx, + applied_ids_tx: new_applied_tx, + }), + ArithmeticOperation { delta: 0, id: 0 }, + ) + .start( + ELECTION_MIN_TIMEOUT, + ELECTION_MAX_TIMEOUT, + std::time::Duration::from_millis(HEARTBEAT_TIMEOUT), + ); + }); + } + + thread::sleep(std::time::Duration::from_secs(2)); + task_tx.send(ArithmeticOperation { delta: 5, id: 1 })?; + task_tx.send(ArithmeticOperation { delta: -51, id: 2 })?; + task_tx.send(ArithmeticOperation { delta: -511, id: 3 })?; + task_tx.send(ArithmeticOperation { delta: 3, id: 4 })?; + thread::sleep(std::time::Duration::from_secs(2)); + + let applied_transactions: Vec<(usize, usize)> = applied_rx.try_iter().collect(); + let expected_vec: Vec = vec![0, 1, 2, 3, 4]; + assert_eq!( + expected_vec, + applied_transactions.iter().fold(Vec::new(), |mut acc, x| { + if x.0 == 0 { + acc.push(x.1); + }; + acc + }) + ); + + assert_eq!( + expected_vec, + applied_transactions.iter().fold(Vec::new(), |mut acc, x| { + if x.0 == 1 { + acc.push(x.1); + }; + acc + }) + ); + + assert_eq!( + expected_vec, + applied_transactions.iter().fold(Vec::new(), |mut acc, x| { + if x.0 == 2 { + acc.push(x.1); + }; + acc + }) + ); + + Ok(()) +}