Skip to content

Commit

Permalink
Merge pull request #14 from andreev-io/ilya/further
Browse files Browse the repository at this point in the history
Adding tests
  • Loading branch information
andreev-io authored Aug 21, 2021
2 parents 8cdc4ae + 5fbb76a commit ffee5ec
Show file tree
Hide file tree
Showing 9 changed files with 819 additions and 307 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 25 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
# Little Raft
The lightest distributed consensus library. Run your own replicated state machine! :heart:

## Using
To use this library, you only need to do three things.
## Installing
Simply import the crate. In your `Cargo.toml`, add
```
[dependencies]
little_raft = "0.1"
```

## Using
To start running Little Raft, you only need to do three things.
1. Implement the StateMachine that you want your cluster to maintain. Little Raft will take care of replicating this machine across the cluster and achieving consensus on its state.
```rust
/// StateMachine describes a user-defined state machine that is replicated
Expand Down Expand Up @@ -122,13 +128,28 @@ where
```


With that, you're good to go. We are working on examples, but for now you can look at the `little_raft/tests` directory. We're working on adding more tests.
With that, you're good to go. We are working on examples, but for now you can look at the `little_raft/tests` directory and at the documentation at [https://docs.rs/little_raft/0.1.3/little_raft/](https://docs.rs/little_raft/0.1.3/little_raft/). We're working on adding more tests.


## Testing
Run `cargo test`.

## Contributing
Contributions are very welcome! Do remember that one of the goals of this library is to be as small and simple as possible. Let's keep the code in `little_raft/src` under 1,000 lines. No PRs breaking this rule will be merged.
Contributions are very welcome! Do remember that one of the goals of this library is to be as small and simple as possible. Let's keep the code in `little_raft/src` **under 1,000 lines**. PRs breaking this rule will be declined.
```bash
> cloc little_raft/src
6 text files.
6 unique files.
0 files ignored.

github.com/AlDanial/cloc v 1.90 T=0.02 s (369.2 files/s, 56185.0 lines/s)
-------------------------------------------------------------------------------
Language files blank comment code
-------------------------------------------------------------------------------
Rust 6 82 199 632
-------------------------------------------------------------------------------
SUM: 6 82 199 632
-------------------------------------------------------------------------------
```

You are welcome to pick up and work on any of the issues open for this project. Or you can submit new issues if anything comes up from your experience using this library.
2 changes: 1 addition & 1 deletion little_raft/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
description = "The lightest distributed consensus library. Run your own replicated state machine!"
name = "little_raft"
version = "0.1.2"
version = "0.1.3"
authors = ["Ilya Andreev <[email protected]>"]
edition = "2018"
license = "MIT"
Expand Down
3 changes: 2 additions & 1 deletion little_raft/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ where

/// This function is used by the Replica to receive pending messages from
/// the cluster. The receive_messages implementation must not block and must
/// not return the same message more than once.
/// not return the same message more than once. Note that receive_messages
/// is only called when the Replica is notified via the recv_msg channel.
fn receive_messages(&mut self) -> Vec<Message<T>>;

/// By returning true from halt you can signal to the Replica that it should
Expand Down
43 changes: 28 additions & 15 deletions little_raft/src/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ where
.expect("could not react to a new message");
let messages = self.cluster.lock().unwrap().receive_messages();
for message in messages {
self.process_message_as_leader(message);
self.process_message(message);
}
}
// Process pending transitions.
Expand Down Expand Up @@ -242,7 +242,7 @@ where
}

for message in messages {
self.process_message_as_follower(message);
self.process_message(message);
}
}
// Become candidate and update elction deadline.
Expand All @@ -252,10 +252,20 @@ where
}
}

// Load new transitions.
// Load new transitions. The follower will ignore these transitions, but
// they are still polled for periodically to ensure there are no stale
// transitions in case the Replica's state changes.
self.load_new_transitions();
}

fn process_message(&mut self, message: Message<T>) {
match self.state {
State::Leader => self.process_message_as_leader(message),
State::Candidate => self.process_message_as_candidate(message),
State::Follower => self.process_message_as_follower(message),
}
}

fn update_election_deadline(&mut self) {
// Randomize each election deadline within the allowed range.
self.next_election_deadline = Instant::now()
Expand All @@ -273,7 +283,7 @@ where
self.update_election_deadline();
}
for message in messages {
self.process_message_as_candidate(message);
self.process_message(message);
}
}
// Become candidate and update elction deadline.
Expand All @@ -283,7 +293,9 @@ where
}
}

// Load new transitions.
// Load new transitions. The candidate will ignore these transitions,
// but they are still polled for periodically to ensure there are no
// stale transitions in case the Replica's state changes.
self.load_new_transitions();
}

Expand Down Expand Up @@ -376,7 +388,7 @@ where
} => {
if term > self.current_term {
// Become follower if another node's term is higher.
self.cluster.lock().unwrap().register_leader(Some(from_id));
self.cluster.lock().unwrap().register_leader(None);
self.become_follower(term);
} else if success {
// Update information about the peer's logs.
Expand Down Expand Up @@ -411,7 +423,7 @@ where
);
} else if self.current_term < term {
// Become follower if the other replica's term is higher.
self.cluster.lock().unwrap().register_leader(Some(from_id));
self.cluster.lock().unwrap().register_leader(None);
self.become_follower(term);
}

Expand All @@ -420,7 +432,7 @@ where
&& self.log[self.log.len() - 1].term <= last_log_term
{
// If the criteria are met, grant the vote.
self.cluster.lock().unwrap().register_leader(Some(from_id));
self.cluster.lock().unwrap().register_leader(None);
self.cluster.lock().unwrap().send_message(
from_id,
Message::VoteResponse {
Expand All @@ -429,6 +441,7 @@ where
vote_granted: true,
},
);
self.voted_for = Some(from_id);
} else {
// If the criteria are not met, do not grant the vote.
self.cluster.lock().unwrap().send_message(
Expand Down Expand Up @@ -559,7 +572,7 @@ where
fn process_message_as_candidate(&mut self, message: Message<T>) {
match message {
Message::AppendEntryRequest { term, from_id, .. } => {
self.process_transition_request_as_candidate(term, from_id, message)
self.process_append_entry_request_as_candidate(term, from_id, message)
}
Message::VoteRequest { term, from_id, .. } => {
self.process_vote_request_as_candidate(term, from_id, message)
Expand All @@ -580,7 +593,7 @@ where
vote_granted: bool,
) {
if term > self.current_term {
self.cluster.lock().unwrap().register_leader(Some(from_id));
self.cluster.lock().unwrap().register_leader(None);
self.become_follower(term);
} else if vote_granted {
// Record that the vote has been granted.
Expand All @@ -603,9 +616,9 @@ where
message: Message<T>,
) {
if term > self.current_term {
self.cluster.lock().unwrap().register_leader(Some(from_id));
self.cluster.lock().unwrap().register_leader(None);
self.become_follower(term);
self.process_message_as_follower(message);
self.process_message(message);
} else {
self.cluster.lock().unwrap().send_message(
from_id,
Expand All @@ -618,16 +631,16 @@ where
}
}

fn process_transition_request_as_candidate(
fn process_append_entry_request_as_candidate(
&mut self,
term: usize,
from_id: ReplicaID,
message: Message<T>,
) {
if term >= self.current_term {
self.cluster.lock().unwrap().register_leader(Some(from_id));
self.cluster.lock().unwrap().register_leader(None);
self.become_follower(term);
self.process_message_as_follower(message);
self.process_message(message);
} else {
self.cluster.lock().unwrap().send_message(
from_id,
Expand Down
9 changes: 5 additions & 4 deletions little_raft/src/state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ where
fn apply_transition(&mut self, transition: T);

/// This function is used to receive transitions from the user that need to
/// be applied to the replicated state machine. Note that while all Replicas
/// poll get_pending_transitions periodically, only the Leader Replica
/// actually processes them. All other Replicas discard pending transitions.
/// get_pending_transitions must not return the same transition twice.
/// be applied to the replicated state machine. Note that only the Leader
/// Replica processes transitions and only when notified via the
/// recv_transition channel. All other Replicas poll for transitions and
/// discard them. get_pending_transitions must not return the same
/// transition twice.
fn get_pending_transitions(&mut self) -> Vec<T>;
}
Loading

0 comments on commit ffee5ec

Please sign in to comment.