Skip to content

Commit

Permalink
Merge pull request #10 from andreev-io/ilya/transition-ids
Browse files Browse the repository at this point in the history
Adding IDs to state machine transitions and adding transition statuses
  • Loading branch information
andreev-io committed Aug 15, 2021
2 parents 87c1fb5 + b541464 commit f3178ef
Show file tree
Hide file tree
Showing 5 changed files with 257 additions and 132 deletions.
53 changes: 36 additions & 17 deletions example/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use little_raft::{
cluster::Cluster, message::Message, replica::Replica, state_machine::StateMachine,
cluster::Cluster,
message::Message,
replica::Replica,
state_machine::{StateMachine, StateMachineTransition},
};
use std::{
collections::BTreeMap,
Expand All @@ -13,28 +16,36 @@ const ELECTION_MIN_TIMEOUT: u64 = 2500;
const ELECTION_MAX_TIMEOUT: u64 = 3500;

#[derive(Clone, Copy)]
struct MathAction {
struct ArithmeticalTransition {
delta: i32,
id: usize,
}

impl StateMachineTransition for ArithmeticalTransition {
type TransitionID = usize;
fn get_id(&self) -> Self::TransitionID {
self.id
}
}

struct Calculator {
value: i32,
}

impl StateMachine<MathAction> for Calculator {
fn apply_action(&mut self, action: MathAction) {
self.value += action.delta;
impl StateMachine<ArithmeticalTransition> for Calculator {
fn apply_transition(&mut self, transition: ArithmeticalTransition) {
self.value += transition.delta;
}
}

struct MyCluster {
receiver: Receiver<Message<MathAction>>,
transmitters: BTreeMap<usize, Sender<Message<MathAction>>>,
tasks: Receiver<MathAction>,
receiver: Receiver<Message<ArithmeticalTransition>>,
transmitters: BTreeMap<usize, Sender<Message<ArithmeticalTransition>>>,
tasks: Receiver<ArithmeticalTransition>,
}

impl Cluster<MathAction> for MyCluster {
fn send(&self, to_id: usize, message: Message<MathAction>) {
impl Cluster<ArithmeticalTransition> for MyCluster {
fn send(&self, to_id: usize, message: Message<ArithmeticalTransition>) {
if let Some(transmitter) = self.transmitters.get(&to_id) {
match transmitter.send(message) {
Ok(_) => {}
Expand All @@ -43,14 +54,17 @@ impl Cluster<MathAction> for MyCluster {
}
}

fn receive_timeout(&self, timeout: std::time::Duration) -> Option<Message<MathAction>> {
fn receive_timeout(
&self,
timeout: std::time::Duration,
) -> Option<Message<ArithmeticalTransition>> {
match self.receiver.recv_timeout(timeout) {
Ok(t) => Some(t),
Err(_) => None,
}
}

fn get_actions(&self) -> Vec<MathAction> {
fn get_transitions(&self) -> Vec<ArithmeticalTransition> {
match self.tasks.try_recv() {
Ok(t) => vec![t; 1],
Err(_) => vec![],
Expand All @@ -66,7 +80,7 @@ fn main() {
let mut transmitters = BTreeMap::new();
let mut receivers = BTreeMap::new();
for i in 0..=2 {
let (tx, rx) = channel::<Message<MathAction>>();
let (tx, rx) = channel::<Message<ArithmeticalTransition>>();
transmitters.insert(i, tx);
receivers.insert(i, rx);
}
Expand All @@ -77,7 +91,7 @@ fn main() {
let mut clusters = BTreeMap::new();
let mut task_transmitters = BTreeMap::new();
for i in 0..=2 {
let (task_tx, task_rx) = channel::<MathAction>();
let (task_tx, task_rx) = channel::<ArithmeticalTransition>();
task_transmitters.insert(i, task_tx);
clusters.insert(
i,
Expand All @@ -98,7 +112,7 @@ fn main() {
peer_ids,
Box::new(cluster),
Box::new(Calculator { value: 0 }),
MathAction { delta: 0 },
ArithmeticalTransition { delta: 0, id: 0 },
)
.start(
ELECTION_MIN_TIMEOUT,
Expand All @@ -120,8 +134,9 @@ fn parse_control_line(s: &str) -> (usize, String) {
}

// This function blocks forever.
fn process_control_messages(transmitters: BTreeMap<usize, Sender<MathAction>>) {
fn process_control_messages(transmitters: BTreeMap<usize, Sender<ArithmeticalTransition>>) {
let mut next_unprocessed_line: usize = 0;
let mut cur_id = 1;
loop {
let buffer = match fs::read_to_string("input.txt") {
Ok(buf) => buf,
Expand Down Expand Up @@ -155,10 +170,14 @@ fn process_control_messages(transmitters: BTreeMap<usize, Sender<MathAction>>) {
transmitters
.get(&id)
.unwrap()
.send(MathAction { delta: delta })
.send(ArithmeticalTransition {
delta: delta,
id: cur_id,
})
.unwrap_or_else(|error| {
println!("{}", error);
});
cur_id += 1;
}
_ => {}
};
Expand Down
13 changes: 8 additions & 5 deletions little_raft/src/cluster.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
use crate::message::Message;
use crate::{message::Message, state_machine::StateMachineTransition};
use std::time::Duration;

// Cluster provides the means of communication of a replica with the rest of the
// cluster and the user.
pub trait Cluster<A> {
pub trait Cluster<T>
where
T: StateMachineTransition,
{
// This function is used to deliver messages to target replicas. The
// algorithm assumes that send can silently fail.
fn send(&self, to_id: usize, message: Message<A>);
fn send(&self, to_id: usize, message: Message<T>);
// This function is used to received messages for the replicas. This
// function must block until timeout expires or a message is received,
// whichever comes first.
fn receive_timeout(&self, timeout: Duration) -> Option<Message<A>>;
fn receive_timeout(&self, timeout: Duration) -> Option<Message<T>>;
// This function is used to receive actions from the user that the
// distributed state machine needs to replicate and apply. All replicas poll
// this function periodically but only Leaders merit the return value.
// Non-Leaders ignore the return value of get_action.
fn get_actions(&self) -> Vec<A>;
fn get_transitions(&self) -> Vec<T>;
}
58 changes: 41 additions & 17 deletions little_raft/src/message.rs
Original file line number Diff line number Diff line change
@@ -1,45 +1,69 @@
// Action describes a user-defined transition of the distributed state machine.
// It has some associated metadata, namely the term when the action was created
use crate::replica::ReplicaID;
use crate::state_machine::StateMachineTransition;

// Entry describes a user-defined transition of the distributed state machine.
// It has some associated metadata, namely the term when the entry was created
// and its index in the log.
#[derive(Clone)]
pub struct Action<A> {
pub action: A,
pub struct Entry<T>
where
T: StateMachineTransition,
{
pub transition: T,
pub index: usize,
pub term: usize,
pub state: EntryState,
}

// State of a particular entry.
#[derive(Clone, Copy)]
pub enum EntryState {
// Entry being queued means that the replica is aware of it and is
// replicating it across the cluster.
Queued,
// Entry being committed means that the entry is guaranteed to be in the log
// of all future leaders in the cluster.
Committed,
// Entry being applied means that it has been applied to the state machine.
Applied,
}

// Message describes messages that the replicas pass between each other to
// achieve consensus on the distributed state machine.
pub enum Message<A> {
// ActionRequest is used by Leaders to send out actions for other nodes to
// append to their log. It also has information on what actions are ready to
// be applied to the state machine. ActionRequest is also used as a heart
// beat message by Leaders even when no new actions need to be processed.
ActionRequest {
from_id: usize,
pub enum Message<T>
where
T: StateMachineTransition,
{
// AppendEntryRequest is used by Leaders to send out logs for other replicas
// to append to their log. It also has information on what logs are ready to
// be applied to the state machine. AppendEntryRequest is also used as a
// heart beat message by Leaders even when no new logs need to be processed.
AppendEntryRequest {
from_id: ReplicaID,
term: usize,
prev_log_index: usize,
prev_log_term: usize,
actions: Vec<Action<A>>,
entries: Vec<Entry<T>>,
commit_index: usize,
},
// ActionResponse is used by replicas to respond to ActionRequest messages.
ActionResponse {
from_id: usize,
// AppendEntryResponse is used by replicas to respond to AppendEntryRequest
// messages.
AppendEntryResponse {
from_id: ReplicaID,
term: usize,
success: bool,
last_index: usize,
},
// VoteRequest is used by Candidates to solicit votes for themselves.
VoteRequest {
from_id: usize,
from_id: ReplicaID,
term: usize,
last_log_index: usize,
last_log_term: usize,
},
// VoteResponse is used by replicas to respond to VoteRequest messages.
VoteResponse {
from_id: usize,
from_id: ReplicaID,
term: usize,
vote_granted: bool,
},
Expand Down
Loading

0 comments on commit f3178ef

Please sign in to comment.