From ef356416f9fe106910df57c963bab7ce6d06639c Mon Sep 17 00:00:00 2001 From: ilya Date: Wed, 15 Dec 2021 16:53:50 +0000 Subject: [PATCH] mvp of snapshotting --- Cargo.lock | 26 +++++++ little_raft/Cargo.toml | 1 + little_raft/src/message.rs | 18 ++++- little_raft/src/replica.rs | 116 +++++++++++++++++++++-------- little_raft/src/state_machine.rs | 15 +++- little_raft/tests/raft_stable.rs | 18 ++++- little_raft/tests/raft_unstable.rs | 16 +++- 7 files changed, 173 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 034d9d5..a4e4259 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,6 +8,22 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" +[[package]] +name = "byteorder" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" + +[[package]] +name = "bytes" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c" +dependencies = [ + "byteorder", + "iovec", +] + [[package]] name = "cfg-if" version = "1.0.0" @@ -104,6 +120,15 @@ dependencies = [ "wasi", ] +[[package]] +name = "iovec" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" +dependencies = [ + "libc", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -120,6 +145,7 @@ checksum = "9385f66bf6105b241aa65a61cb923ef20efc665cb9f9bb50ac2f0c4b7f378d41" name = "little_raft" version = "0.1.6" dependencies = [ + "bytes", "crossbeam", "crossbeam-channel", "rand", diff --git a/little_raft/Cargo.toml b/little_raft/Cargo.toml index 29a050d..07d3346 100644 --- a/little_raft/Cargo.toml +++ b/little_raft/Cargo.toml @@ -19,3 +19,4 @@ crossbeam-channel = "0.5.1" crossbeam = "0.8.0" timer = "0.1.3" time = "0.1.39" +bytes = "0.4.7" diff --git a/little_raft/src/message.rs b/little_raft/src/message.rs index a9f7b17..041d68b 100644 --- a/little_raft/src/message.rs +++ b/little_raft/src/message.rs @@ -1,5 +1,6 @@ use crate::replica::ReplicaID; use crate::state_machine::StateMachineTransition; +use bytes::Bytes; /// LogEntry is a state machine transition along with some metadata needed for /// Raft. @@ -23,7 +24,7 @@ where /// AppendEntryRequest is used by the Leader to send out logs for other /// replicas to append to their log. It also has information on what logs /// are ready to be applied to the state machine. AppendEntryRequest is also - /// used as a heart beat message by the Leader even when no new logs need to + /// used as a heartbeat message by the Leader even when no new logs need to /// be processed. AppendEntryRequest { from_id: ReplicaID, @@ -58,4 +59,19 @@ where term: usize, vote_granted: bool, }, + + InstallSnapshotRequest { + from_id: ReplicaID, + term: usize, + last_included_index: usize, + last_included_term: usize, + offset: usize, + data: Bytes, + done: bool, + }, + + InstallSnapshotResponse { + from_id: ReplicaID, + term: usize, + }, } diff --git a/little_raft/src/replica.rs b/little_raft/src/replica.rs index 34e4f28..ed20631 100644 --- a/little_raft/src/replica.rs +++ b/little_raft/src/replica.rs @@ -1,7 +1,7 @@ use crate::{ cluster::Cluster, message::{LogEntry, Message}, - state_machine::{StateMachine, StateMachineTransition, TransitionState}, + state_machine::{Snapshot, StateMachine, StateMachineTransition, TransitionState}, timer::Timer, }; use crossbeam_channel::{Receiver, Select}; @@ -28,10 +28,10 @@ pub type ReplicaID = usize; /// to maintain the consistency of the user-defined StateMachine across the /// cluster. It uses the user-defined Cluster implementation to talk to other /// Replicas, be it over the network or pigeon post. -pub struct Replica +pub struct Replica where T: StateMachineTransition, - S: StateMachine, + M: StateMachine, C: Cluster, { /// ID of this Replica. @@ -41,7 +41,7 @@ where peer_ids: Vec, /// User-defined state machine that the cluster Replicates. - state_machine: Arc>, + state_machine: Arc>, /// Interface a Replica uses to communicate with the rest of the cluster. cluster: Arc>, @@ -90,12 +90,18 @@ where /// If no heartbeat message is received by the deadline, the Replica will /// start an election. next_election_deadline: Instant, + + snapshot_delta: usize, + + snapshot: Option, + + index_offset: usize, } -impl Replica +impl Replica where T: StateMachineTransition, - S: StateMachine, + M: StateMachine, C: Cluster, { /// Create a new Replica. @@ -128,11 +134,19 @@ where id: ReplicaID, peer_ids: Vec, cluster: Arc>, - state_machine: Arc>, + state_machine: Arc>, + snapshot_delta: usize, noop_transition: T, heartbeat_timeout: Duration, election_timeout_range: (Duration, Duration), - ) -> Replica { + ) -> Replica { + let snapshot = state_machine.lock().unwrap().load_snapshot(); + let index_offset = if let Some(ref snapshot) = snapshot { + snapshot.last_included_index + } else { + 0 + }; + Replica { state_machine, cluster, @@ -155,6 +169,9 @@ where election_timeout: election_timeout_range, heartbeat_timer: Timer::new(heartbeat_timeout), next_election_deadline: Instant::now(), + snapshot: snapshot, + snapshot_delta: snapshot_delta, + index_offset: index_offset, } } @@ -226,8 +243,8 @@ where self.broadcast_message(|peer_id: ReplicaID| Message::AppendEntryRequest { term: self.current_term, from_id: self.id, - prev_log_index: self.next_index[&peer_id] - 1, - prev_log_term: self.log[self.next_index[&peer_id] - 1].term, + prev_log_index: self.next_index[&peer_id] - 1 + self.index_offset, + prev_log_term: self.log[self.next_index[&peer_id] - 1 - self.index_offset].term, entries: self.get_entries_for_peer(peer_id), commit_index: self.commit_index, }); @@ -316,13 +333,14 @@ where // Get log entries that have not been acknowledged by the peer. fn get_entries_for_peer(&self, peer_id: ReplicaID) -> Vec> { - self.log[self.next_index[&peer_id]..self.log.len()].to_vec() + self.log[self.next_index[&peer_id] - self.index_offset..self.log.len()].to_vec() } // Apply entries that are ready to be applied. fn apply_ready_entries(&mut self) { // Move the commit index to the latest log index that has been // replicated on the majority of the replicas. + let mut state_machine = self.state_machine.lock().unwrap(); if self.state == State::Leader && self.commit_index < self.log.len() - 1 { let mut n = self.log.len() - 1; let old_commit_index = self.commit_index; @@ -334,7 +352,7 @@ where ); if num_replications * 2 >= self.peer_ids.len() - && self.log[n].term == self.current_term + && self.log[n - self.index_offset].term == self.current_term { self.commit_index = n; } @@ -342,9 +360,8 @@ where } for i in old_commit_index + 1..=self.commit_index { - let mut state_machine = self.state_machine.lock().unwrap(); state_machine.register_transition_state( - self.log[i].transition.get_id(), + self.log[i - self.index_offset].transition.get_id(), TransitionState::Committed, ); } @@ -353,19 +370,47 @@ where // Apply entries that are behind the currently committed index. while self.commit_index > self.last_applied { self.last_applied += 1; - let mut state_machine = self.state_machine.lock().unwrap(); - state_machine.apply_transition(self.log[self.last_applied].transition.clone()); + state_machine.apply_transition( + self.log[self.last_applied - self.index_offset] + .transition + .clone(), + ); state_machine.register_transition_state( - self.log[self.last_applied].transition.get_id(), + self.log[self.last_applied - self.index_offset] + .transition + .get_id(), TransitionState::Applied, ); - } + + if self.commit_index - self.last_applied == 1 && self.snapshot_delta > 0 { + let curr_delta = if let Some(snapshot) = &self.snapshot { + self.last_applied - snapshot.last_included_index + } else { + self.last_applied + }; + + if curr_delta >= self.snapshot_delta { + println!("snapshotting!!!"); + let last_applied = self.last_applied; + self.snapshot = Some(state_machine.create_snapshot( + last_applied, + self.log[last_applied - self.index_offset].term, + )); + println!("{:?}", self.log); + self.log.retain(|l| l.index > last_applied); + self.index_offset = last_applied+1; + println!("{}", self.index_offset); + println!("{:?}", self.log); + } + } + } } fn load_new_transitions(&mut self) { // Load new transitions. Ignore the transitions if the replica is not // the Leader. - let transitions = self.state_machine.lock().unwrap().get_pending_transitions(); + let mut state_machine = self.state_machine.lock().unwrap(); + let transitions = state_machine.get_pending_transitions(); for transition in transitions { if self.state == State::Leader { self.log.push(LogEntry { @@ -374,7 +419,6 @@ where term: self.current_term, }); - let mut state_machine = self.state_machine.lock().unwrap(); state_machine .register_transition_state(transition.get_id(), TransitionState::Queued); } @@ -453,12 +497,13 @@ where } if self.voted_for == None || self.voted_for == Some(from_id) { - if self.log[self.log.len() - 1].index <= last_log_index - && self.log[self.log.len() - 1].term <= last_log_term + if self.log[self.log.len() - 1 - self.index_offset].index <= last_log_index + && self.log[self.log.len() - 1 - self.index_offset].term <= last_log_term { // If the criteria are met, grant the vote. - self.cluster.lock().unwrap().register_leader(None); - self.cluster.lock().unwrap().send_message( + let mut cluster = self.cluster.lock().unwrap(); + cluster.register_leader(None); + cluster.send_message( from_id, Message::VoteResponse { from_id: self.id, @@ -515,7 +560,8 @@ where return; // If our log doesn't contain an entry at prev_log_index with the // prev_log_term term, reply false. - } else if prev_log_index >= self.log.len() || self.log[prev_log_index].term != prev_log_term + } else if prev_log_index >= self.log.len() + || self.log[prev_log_index - self.index_offset].term != prev_log_term { self.cluster.lock().unwrap().send_message( from_id, @@ -523,7 +569,7 @@ where from_id: self.id, term: self.current_term, success: false, - last_index: self.log.len() - 1, + last_index: self.log.len() + self.index_offset - 1, mismatch_index: Some(prev_log_index), }, ); @@ -532,12 +578,14 @@ where for entry in entries { // Drop local inconsistent logs. - if entry.index < self.log.len() && entry.term != self.log[entry.index].term { - self.log.truncate(entry.index); + if entry.index < self.log.len() + self.index_offset + && entry.term != self.log[entry.index - self.index_offset].term + { + self.log.truncate(entry.index - self.index_offset); } // Push received logs. - if entry.index == self.log.len() { + if entry.index == self.log.len() + self.index_offset { self.log.push(entry); } } @@ -551,8 +599,10 @@ where self.log[self.log.len() - 1].index } } - self.cluster.lock().unwrap().register_leader(Some(from_id)); - self.cluster.lock().unwrap().send_message( + + let mut cluster = self.cluster.lock().unwrap(); + cluster.register_leader(Some(from_id)); + cluster.send_message( from_id, Message::AppendEntryResponse { from_id: self.id, @@ -591,6 +641,7 @@ where ), Message::AppendEntryResponse { .. } => { /* ignore */ } Message::VoteResponse { .. } => { /* ignore */ } + _ => {} } } @@ -608,6 +659,7 @@ where vote_granted, } => self.process_vote_response_as_candidate(from_id, term, vote_granted), Message::AppendEntryResponse { .. } => { /* ignore */ } + _ => {} } } @@ -726,7 +778,7 @@ where self.broadcast_message(|_: usize| Message::VoteRequest { from_id: self.id, term: self.current_term, - last_log_index: self.log.len() - 1, + last_log_index: self.log.len() + self.index_offset - 1, last_log_term: self.log[self.log.len() - 1].term, }); diff --git a/little_raft/src/state_machine.rs b/little_raft/src/state_machine.rs index fe445ac..336f675 100644 --- a/little_raft/src/state_machine.rs +++ b/little_raft/src/state_machine.rs @@ -1,3 +1,4 @@ +use bytes::Bytes; use std::fmt::Debug; /// TransitionState describes the state of a particular transition. @@ -29,9 +30,15 @@ pub trait StateMachineTransition: Clone + Debug { fn get_id(&self) -> Self::TransitionID; } +pub struct Snapshot { + pub last_included_index: usize, + pub last_included_term: usize, + pub data: Bytes, +} + /// StateMachine describes a user-defined state machine that is replicated -/// across the cluster. Raft can Replica whatever distributed state machine can -/// implement this trait. +/// across the cluster. Raft can replicate whatever distributed state machine +/// can implement this trait. pub trait StateMachine where T: StateMachineTransition, @@ -53,4 +60,8 @@ where /// discard them. get_pending_transitions must not return the same /// transition twice. fn get_pending_transitions(&mut self) -> Vec; + + fn load_snapshot(&mut self) -> Option; + + fn create_snapshot(&mut self, index: usize, term: usize) -> Snapshot; } diff --git a/little_raft/tests/raft_stable.rs b/little_raft/tests/raft_stable.rs index 389c0f8..bb4fdd6 100644 --- a/little_raft/tests/raft_stable.rs +++ b/little_raft/tests/raft_stable.rs @@ -1,10 +1,11 @@ use crossbeam_channel as channel; use crossbeam_channel::{unbounded, Receiver, Sender}; +use bytes::Bytes; use little_raft::{ cluster::Cluster, message::Message, replica::Replica, - state_machine::{StateMachine, StateMachineTransition, TransitionState}, + state_machine::{StateMachine, StateMachineTransition, TransitionState, Snapshot}, }; use std::sync::{Arc, Mutex}; @@ -62,6 +63,18 @@ impl StateMachine for Calculator { self.pending_transitions = Vec::new(); cur } + + fn load_snapshot(&mut self) -> Option { + None + } + + fn create_snapshot(&mut self, index: usize, term: usize) -> Snapshot { + Snapshot { + last_included_index: index, + last_included_term: term, + data: Bytes::new(), + } + } } // Our test replicas will be running each in its own thread. @@ -284,6 +297,7 @@ fn run_replicas() { let (applied_transitions_tx, applied_transitions_rx) = unbounded(); let state_machines = create_state_machines(n, applied_transitions_tx); let (message_tx, transition_tx, message_rx, transition_rx) = create_notifiers(n); + for i in 0..n { let noop = noop.clone(); let local_peer_ids = peer_ids[i].clone(); @@ -291,12 +305,14 @@ fn run_replicas() { let state_machine = state_machines[i].clone(); let m_rx = message_rx[i].clone(); let t_rx = transition_rx[i].clone(); + thread::spawn(move || { let mut replica = Replica::new( i, local_peer_ids, cluster, state_machine, + 1, noop.clone(), HEARTBEAT_TIMEOUT, (MIN_ELECTION_TIMEOUT, MAX_ELECTION_TIMEOUT), diff --git a/little_raft/tests/raft_unstable.rs b/little_raft/tests/raft_unstable.rs index 09591f4..c791dcd 100644 --- a/little_raft/tests/raft_unstable.rs +++ b/little_raft/tests/raft_unstable.rs @@ -1,10 +1,11 @@ use crossbeam_channel as channel; use crossbeam_channel::{unbounded, Receiver, Sender}; +use bytes::Bytes; use little_raft::{ cluster::Cluster, message::Message, replica::Replica, - state_machine::{StateMachine, StateMachineTransition, TransitionState}, + state_machine::{StateMachine, StateMachineTransition, TransitionState, Snapshot}, }; use std::sync::{Arc, Mutex}; @@ -62,6 +63,18 @@ impl StateMachine for Calculator { self.pending_transitions = Vec::new(); cur } + + fn load_snapshot(&mut self) -> Option { + None + } + + fn create_snapshot(&mut self, index: usize, term: usize) -> Snapshot { + Snapshot { + last_included_index: index, + last_included_term: term, + data: Bytes::new(), + } + } } // Our test replicas will be running each in its own thread. @@ -297,6 +310,7 @@ fn run_replicas() { local_peer_ids, cluster, state_machine, + 1, noop.clone(), HEARTBEAT_TIMEOUT, (MIN_ELECTION_TIMEOUT, MAX_ELECTION_TIMEOUT),