Skip to content

Commit

Permalink
Change: Snapshot Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
zach-schoenberger committed Oct 19, 2023
1 parent d8e0417 commit 055202a
Show file tree
Hide file tree
Showing 33 changed files with 2,901 additions and 288 deletions.
9 changes: 5 additions & 4 deletions cluster_benchmark/tests/benchmark/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use openraft::storage::RaftLogStorage;
use openraft::storage::RaftSnapshotBuilder;
use openraft::storage::RaftStateMachine;
use openraft::storage::Snapshot;
use openraft::raft::ExampleSnapshot;
use openraft::Entry;
use openraft::EntryPayload;
use openraft::LogId;
Expand All @@ -41,7 +42,7 @@ pub type NodeId = u64;

openraft::declare_raft_types!(
pub TypeConfig: D = ClientRequest, R = ClientResponse, NodeId = NodeId, Node = (),
Entry = Entry<TypeConfig>, SnapshotData = Cursor<Vec<u8>>, AsyncRuntime = TokioRuntime
Entry = Entry<TypeConfig>, SnapshotData = ExampleSnapshot, AsyncRuntime = TokioRuntime
);

#[derive(Debug)]
Expand Down Expand Up @@ -165,7 +166,7 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<StateMachineStore> {

Ok(Snapshot {
meta,
snapshot: Box::new(Cursor::new(data)),
snapshot: Box::new(Cursor::new(data).into()),
})
}
}
Expand Down Expand Up @@ -279,7 +280,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
async fn begin_receiving_snapshot(
&mut self,
) -> Result<Box<<TypeConfig as RaftTypeConfig>::SnapshotData>, StorageError<NodeId>> {
Ok(Box::new(Cursor::new(Vec::new())))
Ok(Box::new(Cursor::new(Vec::new()).into()))
}

#[tracing::instrument(level = "trace", skip(self, snapshot))]
Expand Down Expand Up @@ -314,7 +315,7 @@ impl RaftStateMachine<TypeConfig> for Arc<StateMachineStore> {
let data = snapshot.data.clone();
Ok(Some(Snapshot {
meta: snapshot.meta.clone(),
snapshot: Box::new(Cursor::new(data)),
snapshot: Box::new(Cursor::new(data).into()),
}))
}
None => Ok(None),
Expand Down
4 changes: 2 additions & 2 deletions examples/raft-kv-memstore/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
#![allow(clippy::uninlined_format_args)]
#![deny(unused_qualifications)]

use std::io::Cursor;
use std::sync::Arc;

use actix_web::middleware;
use actix_web::middleware::Logger;
use actix_web::web::Data;
use actix_web::HttpServer;
use openraft::raft::ExampleSnapshot;
use openraft::storage::Adaptor;
use openraft::BasicNode;
use openraft::Config;
Expand All @@ -32,7 +32,7 @@ pub type NodeId = u64;
openraft::declare_raft_types!(
/// Declare the type configuration for example K/V store.
pub TypeConfig: D = Request, R = Response, NodeId = NodeId, Node = BasicNode,
Entry = openraft::Entry<TypeConfig>, SnapshotData = Cursor<Vec<u8>>, AsyncRuntime = TokioRuntime
Entry = openraft::Entry<TypeConfig>, SnapshotData = ExampleSnapshot, AsyncRuntime = TokioRuntime
);

pub type LogStore = Adaptor<TypeConfig, Arc<Store>>;
Expand Down
6 changes: 3 additions & 3 deletions examples/raft-kv-memstore/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<Store> {

Ok(Snapshot {
meta,
snapshot: Box::new(Cursor::new(data)),
snapshot: Box::new(Cursor::new(data).into()),
})
}
}
Expand Down Expand Up @@ -280,7 +280,7 @@ impl RaftStorage<TypeConfig> for Arc<Store> {
async fn begin_receiving_snapshot(
&mut self,
) -> Result<Box<<TypeConfig as RaftTypeConfig>::SnapshotData>, StorageError<NodeId>> {
Ok(Box::new(Cursor::new(Vec::new())))
Ok(Box::new(Cursor::new(Vec::new()).into()))
}

#[tracing::instrument(level = "trace", skip(self, snapshot))]
Expand Down Expand Up @@ -320,7 +320,7 @@ impl RaftStorage<TypeConfig> for Arc<Store> {
let data = snapshot.data.clone();
Ok(Some(Snapshot {
meta: snapshot.meta.clone(),
snapshot: Box::new(Cursor::new(data)),
snapshot: Box::new(Cursor::new(data).into()),
}))
}
None => Ok(None),
Expand Down
3 changes: 2 additions & 1 deletion examples/raft-kv-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::sync::Arc;

use async_std::net::TcpListener;
use async_std::task;
use openraft::raft::ExampleSnapshot;
use openraft::storage::Adaptor;
use openraft::Config;
use openraft::TokioRuntime;
Expand Down Expand Up @@ -42,7 +43,7 @@ impl Display for Node {
openraft::declare_raft_types!(
/// Declare the type configuration for example K/V store.
pub TypeConfig: D = Request, R = Response, NodeId = NodeId, Node = Node,
Entry = openraft::Entry<TypeConfig>, SnapshotData = Cursor<Vec<u8>>, AsyncRuntime = TokioRuntime
Entry = openraft::Entry<TypeConfig>, SnapshotData = ExampleSnapshot, AsyncRuntime = TokioRuntime
);

pub type LogStore = Adaptor<TypeConfig, Arc<Store>>;
Expand Down
6 changes: 3 additions & 3 deletions examples/raft-kv-rocksdb/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<Store> {

Ok(Snapshot {
meta,
snapshot: Box::new(Cursor::new(data)),
snapshot: Box::new(Cursor::new(data).into()),
})
}
}
Expand Down Expand Up @@ -511,7 +511,7 @@ impl RaftStorage<TypeConfig> for Arc<Store> {
async fn begin_receiving_snapshot(
&mut self,
) -> Result<Box<<TypeConfig as RaftTypeConfig>::SnapshotData>, StorageError<NodeId>> {
Ok(Box::new(Cursor::new(Vec::new())))
Ok(Box::new(Cursor::new(Vec::new()).into()))
}

#[tracing::instrument(level = "trace", skip(self, snapshot))]
Expand Down Expand Up @@ -549,7 +549,7 @@ impl RaftStorage<TypeConfig> for Arc<Store> {
let data = snapshot.data.clone();
Ok(Some(Snapshot {
meta: snapshot.meta,
snapshot: Box::new(Cursor::new(data)),
snapshot: Box::new(Cursor::new(data).into()),
}))
}
None => Ok(None),
Expand Down
9 changes: 5 additions & 4 deletions memstore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::sync::Arc;
use std::sync::Mutex;

use openraft::async_trait::async_trait;
use openraft::raft::ExampleSnapshot;
use openraft::storage::LogState;
use openraft::storage::RaftLogReader;
use openraft::storage::RaftSnapshotBuilder;
Expand Down Expand Up @@ -75,7 +76,7 @@ pub type MemNodeId = u64;
openraft::declare_raft_types!(
/// Declare the type configuration for `MemStore`.
pub TypeConfig: D = ClientRequest, R = ClientResponse, NodeId = MemNodeId, Node = (),
Entry = Entry<TypeConfig>, SnapshotData = Cursor<Vec<u8>>, AsyncRuntime = TokioRuntime
Entry = Entry<TypeConfig>, SnapshotData = ExampleSnapshot, AsyncRuntime = TokioRuntime
);

/// The application snapshot type which the `MemStore` works with.
Expand Down Expand Up @@ -279,7 +280,7 @@ impl RaftSnapshotBuilder<TypeConfig> for Arc<MemStore> {

Ok(Snapshot {
meta,
snapshot: Box::new(Cursor::new(data)),
snapshot: Box::new(Cursor::new(data).into()),
})
}
}
Expand Down Expand Up @@ -438,7 +439,7 @@ impl RaftStorage<TypeConfig> for Arc<MemStore> {
async fn begin_receiving_snapshot(
&mut self,
) -> Result<Box<<TypeConfig as RaftTypeConfig>::SnapshotData>, StorageError<MemNodeId>> {
Ok(Box::new(Cursor::new(Vec::new())))
Ok(Box::new(Cursor::new(Vec::new()).into()))
}

#[tracing::instrument(level = "trace", skip(self, snapshot))]
Expand Down Expand Up @@ -485,7 +486,7 @@ impl RaftStorage<TypeConfig> for Arc<MemStore> {
let data = snapshot.data.clone();
Ok(Some(Snapshot {
meta: snapshot.meta.clone(),
snapshot: Box::new(Cursor::new(data)),
snapshot: Box::new(Cursor::new(data).into()),
}))
}
None => Ok(None),
Expand Down
5 changes: 2 additions & 3 deletions openraft/src/compat/compat07.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,12 +414,11 @@ pub mod testing {

#[cfg(test)]
mod tests {
use std::io::Cursor;

use maplit::btreemap;
use maplit::btreeset;

use crate::compat::Upgrade;
use crate::raft::ExampleSnapshot;
use crate::CommittedLeaderId;
use crate::TokioRuntime;

Expand Down Expand Up @@ -512,7 +511,7 @@ mod tests {
crate::declare_raft_types!(
pub TestingConfig:
D = u64, R = u64, NodeId = u64, Node = crate::EmptyNode,
Entry = crate::Entry<TestingConfig>, SnapshotData = Cursor<Vec<u8>>,
Entry = crate::Entry<TestingConfig>, SnapshotData = ExampleSnapshot,
AsyncRuntime = TokioRuntime
);

Expand Down
6 changes: 5 additions & 1 deletion openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1321,8 +1321,12 @@ where
let st = self.engine.state.io_state_mut();
st.update_snapshot(last_log_id);
}
sm::Response::ReceiveSnapshotChunk(_) => {
sm::Response::ReceiveSnapshotChunk(meta) => {
tracing::info!("sm::StateMachine command done: ReceiveSnapshotChunk: {}", func_name!());

if let Some(meta) = meta {
self.engine.following_handler().install_snapshot(meta);
}
}
sm::Response::InstallSnapshot(meta) => {
tracing::info!(
Expand Down
69 changes: 42 additions & 27 deletions openraft/src/core/sm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
//! It is responsible for applying log entries, building/receiving snapshot and sending responses
//! to the RaftCore.
use tokio::io::AsyncWriteExt;
use tokio::sync::mpsc;
use tokio::sync::oneshot;

Expand All @@ -13,7 +12,9 @@ use crate::core::streaming_state::Streaming;
use crate::core::ApplyResult;
use crate::core::ApplyingEntry;
use crate::entry::RaftPayload;
use crate::raft::InstallSnapshotData;
use crate::raft::InstallSnapshotRequest;
use crate::raft::SnapshotManifest;
use crate::storage::RaftStateMachine;
use crate::summary::MessageSummary;
use crate::AsyncRuntime;
Expand All @@ -23,7 +24,6 @@ use crate::RaftTypeConfig;
use crate::Snapshot;
use crate::SnapshotMeta;
use crate::StorageError;
use crate::StorageIOError;

pub(crate) mod command;
pub(crate) mod response;
Expand Down Expand Up @@ -241,17 +241,23 @@ where
"sending back snapshot: meta: {:?}",
snapshot.as_ref().map(|s| s.meta.summary())
);

let _ = tx.send(snapshot);

Ok(())
}

#[tracing::instrument(level = "info", skip_all)]
async fn receive_snapshot_chunk(&mut self, req: InstallSnapshotRequest<C>) -> Result<(), StorageError<C::NodeId>> {
async fn receive_snapshot_chunk(
&mut self,
req: InstallSnapshotRequest<C>,
) -> Result<Option<SnapshotMeta<C::NodeId, C::Node>>, StorageError<C::NodeId>> {
let snapshot_meta = req.meta.clone();
let done = req.done;
let offset = req.offset;

let req_id = SnapshotRequestId::new(*req.vote.leader_id(), snapshot_meta.snapshot_id.clone(), offset);
let req_id = SnapshotRequestId::new(
*req.vote.leader_id(),
snapshot_meta.snapshot_id.clone(),
req.data.chunk_id(),
);

tracing::info!(
req = display(req.summary()),
Expand All @@ -260,33 +266,42 @@ where
func_name!()
);

let curr_id = self.streaming.as_ref().map(|s| &s.snapshot_id);
let done = match req.data {
InstallSnapshotData::Manifest(manifest) => {
let streaming_manifest = self.state_machine.begin_receiving_snapshot().await?;
self.streaming = Some(Streaming::new(req.meta, manifest, streaming_manifest));

// Changed to another stream. re-init snapshot state.
if curr_id != Some(&req.meta.snapshot_id) {
let snapshot_data = self.state_machine.begin_receiving_snapshot().await?;
self.streaming = Some(Streaming::new(req.meta.snapshot_id.clone(), snapshot_data));
}

let streaming = self.streaming.as_mut().unwrap();
streaming.receive(req).await?;

tracing::info!(snapshot_req_id = debug(&req_id), "received snapshot chunk");
false
}
InstallSnapshotData::Chunk(chunk) => match self.streaming.as_mut() {
Some(streaming) => {
if !streaming.receive(chunk).await? {
// we received a chunk that doesn't exist in the manifest
tracing::warn!(
snapshot_req_id = debug(&req_id),
"{} chunk does not exist in manifest",
func_name!()
);
}

streaming.manifest.is_complete()
}
None => {
tracing::error!("should never happen");
false
}
},
};

if done {
tracing::info!("store completed streaming snapshot: {:?}", snapshot_meta);
let streaming = self.streaming.take().unwrap();
let mut data = streaming.snapshot_data;

data.as_mut()
.shutdown()
.await
.map_err(|e| StorageIOError::write_snapshot(Some(snapshot_meta.signature()), &e))?;
self.received = Some(Snapshot::new(snapshot_meta.clone(), streaming.streaming_data));

tracing::info!("store completed streaming snapshot: {:?}", snapshot_meta);
self.received = Some(Snapshot::new(snapshot_meta, data));
return Ok(Some(snapshot_meta));
}

Ok(())
Ok(None)
}

#[tracing::instrument(level = "info", skip_all)]
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/core/sm/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ where C: RaftTypeConfig

/// When finishing receiving a snapshot chunk.
///
/// It does not return any value to RaftCore.
ReceiveSnapshotChunk(()),
/// Returns the snapshot meta if the snapshot is complete, otherwise `None`.
ReceiveSnapshotChunk(Option<SnapshotMeta<C::NodeId, C::Node>>),

/// When finishing installing a snapshot.
///
Expand Down
10 changes: 5 additions & 5 deletions openraft/src/core/snapshot_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@ use crate::SnapshotId;
/// A global unique id of install-snapshot request.
#[derive(Debug, Clone)]
#[derive(PartialEq, Eq)]
pub(crate) struct SnapshotRequestId<NID: NodeId> {
pub(crate) struct SnapshotRequestId<NID: NodeId, ChunkId> {
pub(crate) leader_id: LeaderId<NID>,
pub(crate) snapshot_id: SnapshotId,
pub(crate) offset: u64,
pub(crate) chunk_id: Option<ChunkId>,
}

impl<NID: NodeId> SnapshotRequestId<NID> {
pub(crate) fn new(leader_id: LeaderId<NID>, snapshot_id: SnapshotId, offset: u64) -> Self {
impl<NID: NodeId, ChunkId> SnapshotRequestId<NID, ChunkId> {
pub(crate) fn new(leader_id: LeaderId<NID>, snapshot_id: SnapshotId, chunk_id: Option<ChunkId>) -> Self {
Self {
leader_id,
snapshot_id,
offset,
chunk_id,
}
}
}
Loading

0 comments on commit 055202a

Please sign in to comment.