Skip to content

Commit

Permalink
fixup! Introducing snapshotting
Browse files Browse the repository at this point in the history
  • Loading branch information
andreev-io committed Dec 22, 2021
1 parent 72ed46d commit accf685
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 62 deletions.
11 changes: 6 additions & 5 deletions little_raft/src/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
use crate::{message::Message, replica::ReplicaID, state_machine::StateMachineTransition};
use crate::{message::Message, replica::ReplicaID, state_machine::{StateMachineTransition}};

/// Cluster is used for the local Raft Replica to communicate with the rest of
/// the Raft cluster. It is up to the user how to abstract that communication.
/// The Cluster trait also contains hooks which the Replica will use to inform
/// the crate user of state changes.
pub trait Cluster<T>
pub trait Cluster<T, D>
where
T: StateMachineTransition,
D: Clone,
{
/// This function is used to deliver messages to target Replicas. The
/// Replica will provide the to_id of the other Replica it's trying to send
/// its message to and provide the message itself. The send_message
/// implementation must not block but is allowed silently fail -- Raft
/// implementation must not block but is allowed to silently fail -- Raft
/// exists to achieve consensus in spite of failures, after all.
fn send_message(&mut self, to_id: usize, message: Message<T>);
fn send_message(&mut self, to_id: usize, message: Message<T, D>);

/// This function is used by the Replica to receive pending messages from
/// the cluster. The receive_messages implementation must not block and must
/// not return the same message more than once. Note that receive_messages
/// is only called when the Replica is notified via the recv_msg channel.
fn receive_messages(&mut self) -> Vec<Message<T>>;
fn receive_messages(&mut self) -> Vec<Message<T, D>>;

/// By returning true from halt you can signal to the Replica that it should
/// stop running.
Expand Down
8 changes: 4 additions & 4 deletions little_raft/src/message.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::replica::ReplicaID;
use crate::state_machine::StateMachineTransition;
use bytes::Bytes;
use crate::state_machine::{StateMachineTransition};

/// LogEntry is a state machine transition along with some metadata needed for
/// Raft.
Expand All @@ -17,9 +16,10 @@ where
/// Message describes messages that the replicas pass between each other to
/// achieve consensus on the distributed state machine.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd)]
pub enum Message<T>
pub enum Message<T, D>
where
T: StateMachineTransition,
D: Clone,
{
/// AppendEntryRequest is used by the Leader to send out logs for other
/// replicas to append to their log. It also has information on what logs
Expand Down Expand Up @@ -66,7 +66,7 @@ where
last_included_index: usize,
last_included_term: usize,
offset: usize,
data: Bytes,
data: D,
done: bool,
},

Expand Down
37 changes: 19 additions & 18 deletions little_raft/src/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::{
},
timer::Timer,
};
use bytes::Bytes;
use crossbeam_channel::{Receiver, Select};
use rand::Rng;
use std::cmp::Ordering;
Expand Down Expand Up @@ -38,11 +37,12 @@ enum ReplicaError {
/// to maintain the consistency of the user-defined StateMachine across the
/// cluster. It uses the user-defined Cluster implementation to talk to other
/// Replicas, be it over the network or pigeon post.
pub struct Replica<C, M, T>
pub struct Replica<C, M, T, D>
where
C: Cluster<T>,
M: StateMachine<T>,
C: Cluster<T, D>,
M: StateMachine<T, D>,
T: StateMachineTransition,
D: Clone,
{
/// ID of this Replica.
id: ReplicaID,
Expand Down Expand Up @@ -109,7 +109,7 @@ where
/// The log snapshot of this Replica. Even if snapshot_delta is 0, the
/// snapshot field can be Some(_), since the Replica can be started with a
/// seed snapshot.
snapshot: Option<Snapshot>,
snapshot: Option<Snapshot<D>>,

/// The length of the log sequence that is represented by the snapshot.
/// Since compacted entries aren't in the log anymore, access to the log
Expand All @@ -121,11 +121,12 @@ where
index_offset: usize,
}

impl<C, M, T> Replica<C, M, T>
impl<C, M, T, D> Replica<C, M, T, D>
where
C: Cluster<T>,
M: StateMachine<T>,
C: Cluster<T, D>,
M: StateMachine<T, D>,
T: StateMachineTransition,
D: Clone,
{
/// Create a new Replica.
///
Expand Down Expand Up @@ -164,7 +165,7 @@ where
noop_transition: T,
heartbeat_timeout: Duration,
election_timeout_range: (Duration, Duration),
) -> Replica<C, M, T> {
) -> Replica<C, M, T, D> {
let snapshot = state_machine.lock().unwrap().get_snapshot();
// index_offset is the "length" of the snapshot, so calculate it as
// snapshot.last_included_index + 1.
Expand Down Expand Up @@ -343,7 +344,7 @@ where
self.load_new_transitions();
}

fn process_message(&mut self, message: Message<T>) {
fn process_message(&mut self, message: Message<T, D>) {
match self.state {
State::Leader => self.process_message_as_leader(message),
State::Candidate => self.process_message_as_candidate(message),
Expand Down Expand Up @@ -386,7 +387,7 @@ where

fn broadcast_message<F>(&self, message_generator: F)
where
F: Fn(usize) -> Message<T>,
F: Fn(usize) -> Message<T, D>,
{
self.peer_ids.iter().for_each(|peer_id| {
self.cluster
Expand Down Expand Up @@ -491,7 +492,7 @@ where
}
}

fn process_message_as_leader(&mut self, message: Message<T>) {
fn process_message_as_leader(&mut self, message: Message<T, D>) {
match message {
Message::AppendEntryResponse {
from_id,
Expand Down Expand Up @@ -617,7 +618,7 @@ where
last_included_index: usize,
last_included_term: usize,
_offset: usize,
data: Bytes,
data: D,
_done: bool,
) {
if self.current_term > term {
Expand Down Expand Up @@ -744,7 +745,7 @@ where
);
}

fn process_message_as_follower(&mut self, message: Message<T>) {
fn process_message_as_follower(&mut self, message: Message<T, D>) {
match message {
Message::VoteRequest {
from_id,
Expand Down Expand Up @@ -790,7 +791,7 @@ where
}
}

fn process_message_as_candidate(&mut self, message: Message<T>) {
fn process_message_as_candidate(&mut self, message: Message<T, D>) {
match message {
Message::AppendEntryRequest { term, from_id, .. } => {
self.process_append_entry_request_as_candidate(term, from_id, message)
Expand All @@ -814,7 +815,7 @@ where
&mut self,
from_id: ReplicaID,
term: usize,
message: Message<T>,
message: Message<T, D>,
) {
// If the term is greater or equal to current term, then there's an
// active Leader, so convert self to a follower. If the term is smaller
Expand Down Expand Up @@ -862,7 +863,7 @@ where
&mut self,
term: usize,
from_id: ReplicaID,
message: Message<T>,
message: Message<T, D>,
) {
if term > self.current_term {
self.cluster.lock().unwrap().register_leader(None);
Expand All @@ -884,7 +885,7 @@ where
&mut self,
term: usize,
from_id: ReplicaID,
message: Message<T>,
message: Message<T, D>,
) {
if term >= self.current_term {
self.cluster.lock().unwrap().register_leader(None);
Expand Down
14 changes: 7 additions & 7 deletions little_raft/src/state_machine.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use bytes::Bytes;
use std::fmt::Debug;

/// TransitionState describes the state of a particular transition.
Expand Down Expand Up @@ -50,18 +49,19 @@ pub trait StateMachineTransition: Clone + Debug {
/// Replica start from a saved state or perform log compaction before the log
/// sequence starts taking up too much memory.
#[derive(Clone)]
pub struct Snapshot {
pub struct Snapshot<D> where D: Clone {
pub last_included_index: usize,
pub last_included_term: usize,
pub data: Bytes,
pub data: D,
}

/// StateMachine describes a user-defined state machine that is replicated
/// across the cluster. Raft can replicate whatever distributed state machine
/// can implement this trait.
pub trait StateMachine<T>
pub trait StateMachine<T, D>
where
T: StateMachineTransition,
D: Clone,
{
/// This is a hook that the local Replica will call each time the state of a
/// particular transition changes. It is up to the user what to do with that
Expand Down Expand Up @@ -94,7 +94,7 @@ where
/// snapshot.last_included_term are truthful. However, it is up to the user
/// to put the StateMachine into the right state before returning from
/// load_snapshot().
fn get_snapshot(&mut self) -> Option<Snapshot>;
fn get_snapshot(&mut self) -> Option<Snapshot<D>>;

/// create_snapshot is periodically called by the Replica if log compaction
/// is enabled by setting snapshot_delta > 0. The implementation MUST create
Expand All @@ -107,11 +107,11 @@ where
&mut self,
last_included_index: usize,
last_included_term: usize,
) -> Snapshot;
) -> Snapshot<D>;

/// When a Replica receives a snapshot from another Replica, set_snapshot is
/// called. The StateMachine MUST then load its state from the provided
/// snapshot and potentially save said snapshot to persistent storage, same
/// way it is done in create_snapshot.
fn set_snapshot(&mut self, snapshot: Snapshot);
fn set_snapshot(&mut self, snapshot: Snapshot<D>);
}
28 changes: 14 additions & 14 deletions little_raft/tests/raft_stable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ struct Calculator {
pending_transitions: Vec<ArithmeticOperation>,
}

impl StateMachine<ArithmeticOperation> for Calculator {
impl StateMachine<ArithmeticOperation, Bytes> for Calculator {
fn apply_transition(&mut self, transition: ArithmeticOperation) {
self.value += transition.delta;
println!("id {} my value is now {} after applying delta {}", self.id, self.value, transition.delta);
Expand All @@ -66,12 +66,12 @@ impl StateMachine<ArithmeticOperation> for Calculator {
cur
}

fn get_snapshot(&mut self) -> Option<Snapshot> {
fn get_snapshot(&mut self) -> Option<Snapshot<Bytes>> {
println!("checked for snapshot");
None
}

fn create_snapshot(&mut self, index: usize, term: usize) -> Snapshot {
fn create_snapshot(&mut self, index: usize, term: usize) -> Snapshot<Bytes> {
println!("created snapshot");
Snapshot {
last_included_index: index,
Expand All @@ -80,7 +80,7 @@ impl StateMachine<ArithmeticOperation> for Calculator {
}
}

fn set_snapshot(&mut self, snapshot: Snapshot) {
fn set_snapshot(&mut self, snapshot: Snapshot<Bytes>) {
let v: Vec<u8> = snapshot.data.into_iter().collect();
self.value = i32::from_be_bytes(v[..].try_into().expect("incorrect length"));
println!("my value is now {} after loading", self.value);
Expand All @@ -91,12 +91,12 @@ impl StateMachine<ArithmeticOperation> for Calculator {
struct ThreadCluster {
id: usize,
is_leader: bool,
transmitters: BTreeMap<usize, Sender<Message<ArithmeticOperation>>>,
pending_messages: Vec<Message<ArithmeticOperation>>,
transmitters: BTreeMap<usize, Sender<Message<ArithmeticOperation, Bytes>>>,
pending_messages: Vec<Message<ArithmeticOperation, Bytes>>,
halt: bool,
}

impl Cluster<ArithmeticOperation> for ThreadCluster {
impl Cluster<ArithmeticOperation, Bytes> for ThreadCluster {
fn register_leader(&mut self, leader_id: Option<usize>) {
if let Some(id) = leader_id {
if id == self.id {
Expand All @@ -109,7 +109,7 @@ impl Cluster<ArithmeticOperation> for ThreadCluster {
}
}

fn send_message(&mut self, to_id: usize, message: Message<ArithmeticOperation>) {
fn send_message(&mut self, to_id: usize, message: Message<ArithmeticOperation, Bytes>) {
if let Some(transmitter) = self.transmitters.get(&to_id) {
transmitter.send(message).expect("could not send message");
}
Expand All @@ -119,7 +119,7 @@ impl Cluster<ArithmeticOperation> for ThreadCluster {
self.halt
}

fn receive_messages(&mut self) -> Vec<Message<ArithmeticOperation>> {
fn receive_messages(&mut self) -> Vec<Message<ArithmeticOperation, Bytes>> {
let cur = self.pending_messages.clone();
self.pending_messages = Vec::new();
cur
Expand All @@ -130,7 +130,7 @@ impl Cluster<ArithmeticOperation> for ThreadCluster {
// communication between replicas (threads).
fn create_clusters(
n: usize,
transmitters: BTreeMap<usize, Sender<Message<ArithmeticOperation>>>,
transmitters: BTreeMap<usize, Sender<Message<ArithmeticOperation, Bytes>>>,
) -> Vec<Arc<Mutex<ThreadCluster>>> {
let mut clusters = Vec::new();
for i in 0..n {
Expand All @@ -152,12 +152,12 @@ fn create_clusters(
fn create_communication_between_clusters(
n: usize,
) -> (
BTreeMap<usize, Sender<Message<ArithmeticOperation>>>,
Vec<Receiver<Message<ArithmeticOperation>>>,
BTreeMap<usize, Sender<Message<ArithmeticOperation, Bytes>>>,
Vec<Receiver<Message<ArithmeticOperation, Bytes>>>,
) {
let (mut transmitters, mut receivers) = (BTreeMap::new(), Vec::new());
for i in 0..n {
let (tx, rx) = unbounded::<Message<ArithmeticOperation>>();
let (tx, rx) = unbounded::<Message<ArithmeticOperation, Bytes>>();
transmitters.insert(i, tx);
receivers.push(rx);
}
Expand Down Expand Up @@ -228,7 +228,7 @@ fn create_notifiers(

fn run_clusters_communication(
mut clusters: Vec<Arc<Mutex<ThreadCluster>>>,
mut cluster_message_receivers: Vec<Receiver<Message<ArithmeticOperation>>>,
mut cluster_message_receivers: Vec<Receiver<Message<ArithmeticOperation, Bytes>>>,
mut message_notifiers_tx: Vec<Sender<()>>,
) {
for _ in (0..clusters.len()).rev() {
Expand Down
Loading

0 comments on commit accf685

Please sign in to comment.