diff --git a/cluster_benchmark/tests/benchmark/store.rs b/cluster_benchmark/tests/benchmark/store.rs index 88e5fc95c..48870a447 100644 --- a/cluster_benchmark/tests/benchmark/store.rs +++ b/cluster_benchmark/tests/benchmark/store.rs @@ -16,6 +16,7 @@ use openraft::storage::RaftLogStorage; use openraft::storage::RaftSnapshotBuilder; use openraft::storage::RaftStateMachine; use openraft::storage::Snapshot; +use openraft::raft::ExampleSnapshot; use openraft::Entry; use openraft::EntryPayload; use openraft::LogId; @@ -41,7 +42,7 @@ pub type NodeId = u64; openraft::declare_raft_types!( pub TypeConfig: D = ClientRequest, R = ClientResponse, NodeId = NodeId, Node = (), - Entry = Entry, SnapshotData = Cursor>, AsyncRuntime = TokioRuntime + Entry = Entry, SnapshotData = ExampleSnapshot, AsyncRuntime = TokioRuntime ); #[derive(Debug)] @@ -165,7 +166,7 @@ impl RaftSnapshotBuilder for Arc { Ok(Snapshot { meta, - snapshot: Box::new(Cursor::new(data)), + snapshot: Box::new(Cursor::new(data).into()), }) } } @@ -279,7 +280,7 @@ impl RaftStateMachine for Arc { async fn begin_receiving_snapshot( &mut self, ) -> Result::SnapshotData>, StorageError> { - Ok(Box::new(Cursor::new(Vec::new()))) + Ok(Box::new(Cursor::new(Vec::new()).into())) } #[tracing::instrument(level = "trace", skip(self, snapshot))] @@ -314,7 +315,7 @@ impl RaftStateMachine for Arc { let data = snapshot.data.clone(); Ok(Some(Snapshot { meta: snapshot.meta.clone(), - snapshot: Box::new(Cursor::new(data)), + snapshot: Box::new(Cursor::new(data).into()), })) } None => Ok(None), diff --git a/examples/raft-kv-memstore/src/lib.rs b/examples/raft-kv-memstore/src/lib.rs index 328a52ad6..296461014 100644 --- a/examples/raft-kv-memstore/src/lib.rs +++ b/examples/raft-kv-memstore/src/lib.rs @@ -1,13 +1,13 @@ #![allow(clippy::uninlined_format_args)] #![deny(unused_qualifications)] -use std::io::Cursor; use std::sync::Arc; use actix_web::middleware; use actix_web::middleware::Logger; use actix_web::web::Data; use actix_web::HttpServer; +use openraft::raft::ExampleSnapshot; use openraft::storage::Adaptor; use openraft::BasicNode; use openraft::Config; @@ -32,7 +32,7 @@ pub type NodeId = u64; openraft::declare_raft_types!( /// Declare the type configuration for example K/V store. pub TypeConfig: D = Request, R = Response, NodeId = NodeId, Node = BasicNode, - Entry = openraft::Entry, SnapshotData = Cursor>, AsyncRuntime = TokioRuntime + Entry = openraft::Entry, SnapshotData = ExampleSnapshot, AsyncRuntime = TokioRuntime ); pub type LogStore = Adaptor>; diff --git a/examples/raft-kv-memstore/src/store/mod.rs b/examples/raft-kv-memstore/src/store/mod.rs index c30aedd32..fb5fdf69d 100644 --- a/examples/raft-kv-memstore/src/store/mod.rs +++ b/examples/raft-kv-memstore/src/store/mod.rs @@ -153,7 +153,7 @@ impl RaftSnapshotBuilder for Arc { Ok(Snapshot { meta, - snapshot: Box::new(Cursor::new(data)), + snapshot: Box::new(Cursor::new(data).into()), }) } } @@ -280,7 +280,7 @@ impl RaftStorage for Arc { async fn begin_receiving_snapshot( &mut self, ) -> Result::SnapshotData>, StorageError> { - Ok(Box::new(Cursor::new(Vec::new()))) + Ok(Box::new(Cursor::new(Vec::new()).into())) } #[tracing::instrument(level = "trace", skip(self, snapshot))] @@ -320,7 +320,7 @@ impl RaftStorage for Arc { let data = snapshot.data.clone(); Ok(Some(Snapshot { meta: snapshot.meta.clone(), - snapshot: Box::new(Cursor::new(data)), + snapshot: Box::new(Cursor::new(data).into()), })) } None => Ok(None), diff --git a/examples/raft-kv-rocksdb/src/lib.rs b/examples/raft-kv-rocksdb/src/lib.rs index afbc82672..0cd17afb6 100644 --- a/examples/raft-kv-rocksdb/src/lib.rs +++ b/examples/raft-kv-rocksdb/src/lib.rs @@ -8,6 +8,7 @@ use std::sync::Arc; use async_std::net::TcpListener; use async_std::task; +use openraft::raft::ExampleSnapshot; use openraft::storage::Adaptor; use openraft::Config; use openraft::TokioRuntime; @@ -42,7 +43,7 @@ impl Display for Node { openraft::declare_raft_types!( /// Declare the type configuration for example K/V store. pub TypeConfig: D = Request, R = Response, NodeId = NodeId, Node = Node, - Entry = openraft::Entry, SnapshotData = Cursor>, AsyncRuntime = TokioRuntime + Entry = openraft::Entry, SnapshotData = ExampleSnapshot, AsyncRuntime = TokioRuntime ); pub type LogStore = Adaptor>; diff --git a/examples/raft-kv-rocksdb/src/store.rs b/examples/raft-kv-rocksdb/src/store.rs index 970ae7441..9d47363fb 100644 --- a/examples/raft-kv-rocksdb/src/store.rs +++ b/examples/raft-kv-rocksdb/src/store.rs @@ -387,7 +387,7 @@ impl RaftSnapshotBuilder for Arc { Ok(Snapshot { meta, - snapshot: Box::new(Cursor::new(data)), + snapshot: Box::new(Cursor::new(data).into()), }) } } @@ -511,7 +511,7 @@ impl RaftStorage for Arc { async fn begin_receiving_snapshot( &mut self, ) -> Result::SnapshotData>, StorageError> { - Ok(Box::new(Cursor::new(Vec::new()))) + Ok(Box::new(Cursor::new(Vec::new()).into())) } #[tracing::instrument(level = "trace", skip(self, snapshot))] @@ -549,7 +549,7 @@ impl RaftStorage for Arc { let data = snapshot.data.clone(); Ok(Some(Snapshot { meta: snapshot.meta, - snapshot: Box::new(Cursor::new(data)), + snapshot: Box::new(Cursor::new(data).into()), })) } None => Ok(None), diff --git a/memstore/src/lib.rs b/memstore/src/lib.rs index cc6f10740..3323e19ce 100644 --- a/memstore/src/lib.rs +++ b/memstore/src/lib.rs @@ -12,6 +12,7 @@ use std::sync::Arc; use std::sync::Mutex; use openraft::async_trait::async_trait; +use openraft::raft::ExampleSnapshot; use openraft::storage::LogState; use openraft::storage::RaftLogReader; use openraft::storage::RaftSnapshotBuilder; @@ -75,7 +76,7 @@ pub type MemNodeId = u64; openraft::declare_raft_types!( /// Declare the type configuration for `MemStore`. pub TypeConfig: D = ClientRequest, R = ClientResponse, NodeId = MemNodeId, Node = (), - Entry = Entry, SnapshotData = Cursor>, AsyncRuntime = TokioRuntime + Entry = Entry, SnapshotData = ExampleSnapshot, AsyncRuntime = TokioRuntime ); /// The application snapshot type which the `MemStore` works with. @@ -279,7 +280,7 @@ impl RaftSnapshotBuilder for Arc { Ok(Snapshot { meta, - snapshot: Box::new(Cursor::new(data)), + snapshot: Box::new(Cursor::new(data).into()), }) } } @@ -438,7 +439,7 @@ impl RaftStorage for Arc { async fn begin_receiving_snapshot( &mut self, ) -> Result::SnapshotData>, StorageError> { - Ok(Box::new(Cursor::new(Vec::new()))) + Ok(Box::new(Cursor::new(Vec::new()).into())) } #[tracing::instrument(level = "trace", skip(self, snapshot))] @@ -485,7 +486,7 @@ impl RaftStorage for Arc { let data = snapshot.data.clone(); Ok(Some(Snapshot { meta: snapshot.meta.clone(), - snapshot: Box::new(Cursor::new(data)), + snapshot: Box::new(Cursor::new(data).into()), })) } None => Ok(None), diff --git a/openraft/src/compat/compat07.rs b/openraft/src/compat/compat07.rs index 922e147c3..1a3f0ab38 100644 --- a/openraft/src/compat/compat07.rs +++ b/openraft/src/compat/compat07.rs @@ -414,12 +414,11 @@ pub mod testing { #[cfg(test)] mod tests { - use std::io::Cursor; - use maplit::btreemap; use maplit::btreeset; use crate::compat::Upgrade; + use crate::raft::ExampleSnapshot; use crate::CommittedLeaderId; use crate::TokioRuntime; @@ -512,7 +511,7 @@ mod tests { crate::declare_raft_types!( pub TestingConfig: D = u64, R = u64, NodeId = u64, Node = crate::EmptyNode, - Entry = crate::Entry, SnapshotData = Cursor>, + Entry = crate::Entry, SnapshotData = ExampleSnapshot, AsyncRuntime = TokioRuntime ); diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index 972a51872..07daf0b05 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -1321,8 +1321,12 @@ where let st = self.engine.state.io_state_mut(); st.update_snapshot(last_log_id); } - sm::Response::ReceiveSnapshotChunk(_) => { + sm::Response::ReceiveSnapshotChunk(meta) => { tracing::info!("sm::StateMachine command done: ReceiveSnapshotChunk: {}", func_name!()); + + if let Some(meta) = meta { + self.engine.following_handler().install_snapshot(meta); + } } sm::Response::InstallSnapshot(meta) => { tracing::info!( diff --git a/openraft/src/core/sm/mod.rs b/openraft/src/core/sm/mod.rs index 4a50e1daf..3e5a6e7d6 100644 --- a/openraft/src/core/sm/mod.rs +++ b/openraft/src/core/sm/mod.rs @@ -4,7 +4,6 @@ //! It is responsible for applying log entries, building/receiving snapshot and sending responses //! to the RaftCore. -use tokio::io::AsyncWriteExt; use tokio::sync::mpsc; use tokio::sync::oneshot; @@ -13,7 +12,9 @@ use crate::core::streaming_state::Streaming; use crate::core::ApplyResult; use crate::core::ApplyingEntry; use crate::entry::RaftPayload; +use crate::raft::InstallSnapshotData; use crate::raft::InstallSnapshotRequest; +use crate::raft::SnapshotManifest; use crate::storage::RaftStateMachine; use crate::summary::MessageSummary; use crate::AsyncRuntime; @@ -23,7 +24,6 @@ use crate::RaftTypeConfig; use crate::Snapshot; use crate::SnapshotMeta; use crate::StorageError; -use crate::StorageIOError; pub(crate) mod command; pub(crate) mod response; @@ -241,17 +241,23 @@ where "sending back snapshot: meta: {:?}", snapshot.as_ref().map(|s| s.meta.summary()) ); + let _ = tx.send(snapshot); + Ok(()) } #[tracing::instrument(level = "info", skip_all)] - async fn receive_snapshot_chunk(&mut self, req: InstallSnapshotRequest) -> Result<(), StorageError> { + async fn receive_snapshot_chunk( + &mut self, + req: InstallSnapshotRequest, + ) -> Result>, StorageError> { let snapshot_meta = req.meta.clone(); - let done = req.done; - let offset = req.offset; - - let req_id = SnapshotRequestId::new(*req.vote.leader_id(), snapshot_meta.snapshot_id.clone(), offset); + let req_id = SnapshotRequestId::new( + *req.vote.leader_id(), + snapshot_meta.snapshot_id.clone(), + req.data.chunk_id(), + ); tracing::info!( req = display(req.summary()), @@ -260,33 +266,42 @@ where func_name!() ); - let curr_id = self.streaming.as_ref().map(|s| &s.snapshot_id); + let done = match req.data { + InstallSnapshotData::Manifest(manifest) => { + let streaming_manifest = self.state_machine.begin_receiving_snapshot().await?; + self.streaming = Some(Streaming::new(req.meta, manifest, streaming_manifest)); - // Changed to another stream. re-init snapshot state. - if curr_id != Some(&req.meta.snapshot_id) { - let snapshot_data = self.state_machine.begin_receiving_snapshot().await?; - self.streaming = Some(Streaming::new(req.meta.snapshot_id.clone(), snapshot_data)); - } - - let streaming = self.streaming.as_mut().unwrap(); - streaming.receive(req).await?; - - tracing::info!(snapshot_req_id = debug(&req_id), "received snapshot chunk"); + false + } + InstallSnapshotData::Chunk(chunk) => match self.streaming.as_mut() { + Some(streaming) => { + if !streaming.receive(chunk).await? { + // we received a chunk that doesn't exist in the manifest + tracing::warn!( + snapshot_req_id = debug(&req_id), + "{} chunk does not exist in manifest", + func_name!() + ); + } + + streaming.manifest.is_complete() + } + None => { + tracing::error!("should never happen"); + false + } + }, + }; if done { + tracing::info!("store completed streaming snapshot: {:?}", snapshot_meta); let streaming = self.streaming.take().unwrap(); - let mut data = streaming.snapshot_data; - - data.as_mut() - .shutdown() - .await - .map_err(|e| StorageIOError::write_snapshot(Some(snapshot_meta.signature()), &e))?; + self.received = Some(Snapshot::new(snapshot_meta.clone(), streaming.streaming_data)); - tracing::info!("store completed streaming snapshot: {:?}", snapshot_meta); - self.received = Some(Snapshot::new(snapshot_meta, data)); + return Ok(Some(snapshot_meta)); } - Ok(()) + Ok(None) } #[tracing::instrument(level = "info", skip_all)] diff --git a/openraft/src/core/sm/response.rs b/openraft/src/core/sm/response.rs index f4b7383f7..5dc5b5808 100644 --- a/openraft/src/core/sm/response.rs +++ b/openraft/src/core/sm/response.rs @@ -14,8 +14,8 @@ where C: RaftTypeConfig /// When finishing receiving a snapshot chunk. /// - /// It does not return any value to RaftCore. - ReceiveSnapshotChunk(()), + /// Returns if the snapshot is complete + ReceiveSnapshotChunk(Option>), /// When finishing installing a snapshot. /// diff --git a/openraft/src/core/snapshot_state.rs b/openraft/src/core/snapshot_state.rs index 10c437ca5..fd6fc7585 100644 --- a/openraft/src/core/snapshot_state.rs +++ b/openraft/src/core/snapshot_state.rs @@ -5,18 +5,18 @@ use crate::SnapshotId; /// A global unique id of install-snapshot request. #[derive(Debug, Clone)] #[derive(PartialEq, Eq)] -pub(crate) struct SnapshotRequestId { +pub(crate) struct SnapshotRequestId { pub(crate) leader_id: LeaderId, pub(crate) snapshot_id: SnapshotId, - pub(crate) offset: u64, + pub(crate) chunk_id: Option, } -impl SnapshotRequestId { - pub(crate) fn new(leader_id: LeaderId, snapshot_id: SnapshotId, offset: u64) -> Self { +impl SnapshotRequestId { + pub(crate) fn new(leader_id: LeaderId, snapshot_id: SnapshotId, chunk_id: Option) -> Self { Self { leader_id, snapshot_id, - offset, + chunk_id, } } } diff --git a/openraft/src/core/streaming_state.rs b/openraft/src/core/streaming_state.rs index 53528e086..3d898bc85 100644 --- a/openraft/src/core/streaming_state.rs +++ b/openraft/src/core/streaming_state.rs @@ -1,66 +1,56 @@ -use std::io::SeekFrom; - -use tokio::io::AsyncSeekExt; -use tokio::io::AsyncWriteExt; - -use crate::raft::InstallSnapshotRequest; +use crate::raft::SnapshotChunk; +use crate::raft::SnapshotData; +use crate::raft::SnapshotManifest; +use crate::type_config::RTCSnapshotChunk; +use crate::type_config::RTCSnapshotData; +use crate::type_config::RTCSnapshotManifest; use crate::ErrorSubject; use crate::ErrorVerb; use crate::RaftTypeConfig; -use crate::SnapshotId; +use crate::SnapshotMeta; use crate::StorageError; +use crate::ToStorageResult; /// The Raft node is streaming in a snapshot from the leader. pub(crate) struct Streaming where C: RaftTypeConfig { - /// The offset of the last byte written to the snapshot. - pub(crate) offset: u64, - /// The ID of the snapshot being written. - pub(crate) snapshot_id: SnapshotId, + pub(crate) snapshot_meta: SnapshotMeta, /// A handle to the snapshot writer. - pub(crate) snapshot_data: Box, + pub(crate) streaming_data: Box>, + + pub(crate) manifest: RTCSnapshotManifest, } impl Streaming where C: RaftTypeConfig { - pub(crate) fn new(snapshot_id: SnapshotId, snapshot_data: Box) -> Self { + pub(crate) fn new( + snapshot_meta: SnapshotMeta, + manifest: RTCSnapshotManifest, + streaming_data: Box>, + ) -> Self { Self { - offset: 0, - snapshot_id, - snapshot_data, + snapshot_meta, + manifest, + streaming_data, } } - /// Receive a chunk of snapshot data. - pub(crate) async fn receive(&mut self, req: InstallSnapshotRequest) -> Result> { - // TODO: check id? + /// Receive a chunk of snapshot data. Returns true if it was a new chunk + pub(crate) async fn receive(&mut self, chunk: RTCSnapshotChunk) -> Result> { + let chunk_id = chunk.id(); + let err_x = || { + ( + ErrorSubject::Snapshot(Some(self.snapshot_meta.signature())), + ErrorVerb::Write, + ) + }; - // Always seek to the target offset if not an exact match. - if req.offset != self.offset { - if let Err(err) = self.snapshot_data.as_mut().seek(SeekFrom::Start(req.offset)).await { - return Err(StorageError::from_io_error( - ErrorSubject::Snapshot(Some(req.meta.signature())), - ErrorVerb::Seek, - err, - )); - } - self.offset = req.offset; - } + self.streaming_data.as_mut().receive(chunk).await.sto_res(err_x)?; - // Write the next segment & update offset. - let res = self.snapshot_data.as_mut().write_all(&req.data).await; - if let Err(err) = res { - return Err(StorageError::from_io_error( - ErrorSubject::Snapshot(Some(req.meta.signature())), - ErrorVerb::Write, - err, - )); - } - self.offset += req.data.len() as u64; - Ok(req.done) + self.manifest.receive(&chunk_id).sto_res(err_x) } } diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index fe0e1c26d..2033816c8 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -483,17 +483,10 @@ where C: RaftTypeConfig pub(crate) fn install_snapshot(&mut self, req: InstallSnapshotRequest) -> Result<(), InstallSnapshotError> { tracing::info!(req = display(req.summary()), "{}", func_name!()); - let done = req.done; - let snapshot_meta = req.meta.clone(); - let mut fh = self.following_handler(); fh.receive_snapshot_chunk(req)?; - if done { - fh.install_snapshot(snapshot_meta); - } - Ok(()) } diff --git a/openraft/src/engine/handler/following_handler/mod.rs b/openraft/src/engine/handler/following_handler/mod.rs index 52d0318f6..0c5fec724 100644 --- a/openraft/src/engine/handler/following_handler/mod.rs +++ b/openraft/src/engine/handler/following_handler/mod.rs @@ -1,3 +1,4 @@ +use std::ops::Deref; use std::sync::Arc; use crate::core::sm; @@ -254,19 +255,17 @@ where C: RaftTypeConfig let snapshot_id = &req.meta.snapshot_id; - let curr_id = self.state.snapshot_streaming.as_ref().map(|s| &s.snapshot_id); + let cur_id = self.state.snapshot_streaming.as_ref().map(|s| s.snapshot_id.deref()); - // Changed to another stream. re-init snapshot state. - if curr_id != Some(&req.meta.snapshot_id) { - if req.offset > 0 { + if cur_id != Some(&req.meta.snapshot_id) { + // request from another snapshot stream. we only keep track of one at a time. + if !req.data.is_manifest() { let mismatch = SnapshotMismatch { expect: SnapshotSegmentId { - id: snapshot_id.clone(), - offset: 0, + id: cur_id.unwrap_or("").to_string(), }, got: SnapshotSegmentId { id: snapshot_id.clone(), - offset: req.offset, }, }; @@ -274,18 +273,15 @@ where C: RaftTypeConfig return Err(mismatch.into()); } - if req.offset == 0 { - self.state.snapshot_streaming = Some(StreamingState { - offset: 0, - snapshot_id: snapshot_id.clone(), - }); - } + // Changed to another stream. re-init snapshot state. + self.state.snapshot_streaming = Some(StreamingState { + snapshot_id: snapshot_id.clone(), + }); } - self.state.snapshot_streaming.as_mut().unwrap().offset = req.offset; - let sm_cmd = sm::Command::receive(req); self.output.push_command(Command::from(sm_cmd)); + Ok(()) } diff --git a/openraft/src/engine/handler/following_handler/receive_snapshot_chunk_test.rs b/openraft/src/engine/handler/following_handler/receive_snapshot_chunk_test.rs index bf915853c..0379f9d0f 100644 --- a/openraft/src/engine/handler/following_handler/receive_snapshot_chunk_test.rs +++ b/openraft/src/engine/handler/following_handler/receive_snapshot_chunk_test.rs @@ -7,6 +7,10 @@ use crate::engine::Command; use crate::engine::Engine; use crate::error::InstallSnapshotError; use crate::error::SnapshotMismatch; +use crate::raft::ExampleChunkId; +use crate::raft::ExampleManifest; +use crate::raft::ExampleSnapshotChunk; +use crate::raft::InstallSnapshotData; use crate::raft::InstallSnapshotRequest; use crate::raft_state::StreamingState; use crate::testing::log_id; @@ -43,30 +47,34 @@ fn make_req(offset: u64) -> InstallSnapshotRequest { InstallSnapshotRequest { vote: Vote::new_committed(2, 1), meta: make_meta(), - offset, - data: vec![], - done: false, + data: InstallSnapshotData::Chunk(ExampleSnapshotChunk { + chunk_id: ExampleChunkId { + offset: offset as usize, + len: 0, + }, + data: vec![], + }), + } +} + +fn make_manifest() -> InstallSnapshotRequest { + InstallSnapshotRequest { + vote: Vote::new_committed(2, 1), + meta: make_meta(), + data: InstallSnapshotData::Manifest(ExampleManifest { chunks: btreeset! {} }), } } #[test] -fn test_receive_snapshot_chunk_new_chunk() -> anyhow::Result<()> { +fn test_receive_snapshot_chunk_new_chunk_no_manifest() -> anyhow::Result<()> { let mut eng = eng(); assert!(eng.state.snapshot_streaming.is_none()); - eng.following_handler().receive_snapshot_chunk(make_req(0))?; + let res = eng.following_handler().receive_snapshot_chunk(make_req(0)); - assert_eq!( - Some(StreamingState { - offset: 0, - snapshot_id: "1-2-3-4".to_string(), - }), - eng.state.snapshot_streaming - ); - assert_eq!( - vec![Command::from(sm::Command::receive(make_req(0)).with_seq(1))], - eng.output.take_commands() - ); + assert!(res.is_err()); + assert_eq!(None, eng.state.snapshot_streaming); + assert_eq!(Vec::>::new(), eng.output.take_commands()); Ok(()) } @@ -76,7 +84,6 @@ fn test_receive_snapshot_chunk_continue_receive_chunk() -> anyhow::Result<()> { let mut eng = eng(); eng.state.snapshot_streaming = Some(StreamingState { - offset: 0, snapshot_id: "1-2-3-4".to_string(), }); @@ -84,7 +91,6 @@ fn test_receive_snapshot_chunk_continue_receive_chunk() -> anyhow::Result<()> { assert_eq!( Some(StreamingState { - offset: 2, snapshot_id: "1-2-3-4".to_string(), }), eng.state.snapshot_streaming @@ -98,26 +104,25 @@ fn test_receive_snapshot_chunk_continue_receive_chunk() -> anyhow::Result<()> { } #[test] -fn test_receive_snapshot_chunk_diff_id_offset_0() -> anyhow::Result<()> { - // When receiving a chunk with different snapshot id and offset 0, starts a new snapshot streaming. +fn test_receive_snapshot_chunk_diff_id_manifest() -> anyhow::Result<()> { + // When receiving a chunk with different snapshot id and is a manifest, starts a new snapshot + // streaming. let mut eng = eng(); eng.state.snapshot_streaming = Some(StreamingState { - offset: 2, snapshot_id: "1-2-3-100".to_string(), }); - eng.following_handler().receive_snapshot_chunk(make_req(0))?; + eng.following_handler().receive_snapshot_chunk(make_manifest())?; assert_eq!( Some(StreamingState { - offset: 0, snapshot_id: "1-2-3-4".to_string(), }), eng.state.snapshot_streaming ); assert_eq!( - vec![Command::from(sm::Command::receive(make_req(0)).with_seq(1))], + vec![Command::from(sm::Command::receive(make_manifest()).with_seq(1))], eng.output.take_commands() ); @@ -131,7 +136,6 @@ fn test_receive_snapshot_chunk_diff_id_offset_gt_0() -> anyhow::Result<()> { let mut eng = eng(); eng.state.snapshot_streaming = Some(StreamingState { - offset: 2, snapshot_id: "1-2-3-100".to_string(), }); @@ -140,12 +144,10 @@ fn test_receive_snapshot_chunk_diff_id_offset_gt_0() -> anyhow::Result<()> { assert_eq!( Err(InstallSnapshotError::from(SnapshotMismatch { expect: SnapshotSegmentId { - id: "1-2-3-4".to_string(), - offset: 0 + id: "1-2-3-100".to_string(), }, got: SnapshotSegmentId { id: "1-2-3-4".to_string(), - offset: 3 }, })), res @@ -153,7 +155,6 @@ fn test_receive_snapshot_chunk_diff_id_offset_gt_0() -> anyhow::Result<()> { assert_eq!( Some(StreamingState { - offset: 2, snapshot_id: "1-2-3-100".to_string(), }), eng.state.snapshot_streaming, diff --git a/openraft/src/engine/testing.rs b/openraft/src/engine/testing.rs index 12b01ba87..3588dcb2c 100644 --- a/openraft/src/engine/testing.rs +++ b/openraft/src/engine/testing.rs @@ -1,5 +1,4 @@ -use std::io::Cursor; - +use crate::raft::ExampleSnapshot; use crate::RaftTypeConfig; use crate::TokioRuntime; @@ -13,6 +12,6 @@ impl RaftTypeConfig for UTConfig { type NodeId = u64; type Node = (); type Entry = crate::Entry; - type SnapshotData = Cursor>; + type SnapshotData = ExampleSnapshot; type AsyncRuntime = TokioRuntime; } diff --git a/openraft/src/raft/message/client_write.rs b/openraft/src/raft/message/client_write.rs index b063c8c89..bb615ac2f 100644 --- a/openraft/src/raft/message/client_write.rs +++ b/openraft/src/raft/message/client_write.rs @@ -1,6 +1,6 @@ use std::fmt::Debug; -use crate::AppDataResponse; +#[cfg(feature = "serde")] use crate::AppDataResponse; use crate::LogId; use crate::Membership; use crate::MessageSummary; diff --git a/openraft/src/raft/message/install_snapshot.rs b/openraft/src/raft/message/install_snapshot.rs index 3372e3f81..ae6fcbb69 100644 --- a/openraft/src/raft/message/install_snapshot.rs +++ b/openraft/src/raft/message/install_snapshot.rs @@ -1,12 +1,26 @@ +use std::collections::BTreeSet; +use std::fmt; +use std::fmt::Debug; +use std::fmt::Display; +use std::io::Cursor; + +use anyerror::AnyError; +use async_trait::async_trait; +use derive_more::Display; + +use crate::type_config::RTCSnapshotChunk; +use crate::type_config::RTCSnapshotChunkId; +use crate::type_config::RTCSnapshotManifest; use crate::MessageSummary; use crate::NodeId; +use crate::OptionalSerde; use crate::RaftTypeConfig; use crate::SnapshotMeta; use crate::Vote; /// An RPC sent by the Raft leader to send chunks of a snapshot to a follower (§7). -#[derive(Clone, Debug)] -#[derive(PartialEq, Eq)] +#[derive(Clone)] +#[derive(PartialEq)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] pub struct InstallSnapshotRequest { pub vote: Vote, @@ -14,25 +28,89 @@ pub struct InstallSnapshotRequest { /// Metadata of a snapshot: snapshot_id, last_log_ed membership etc. pub meta: SnapshotMeta, - /// The byte offset where this chunk of data is positioned in the snapshot file. - pub offset: u64, /// The raw bytes of the snapshot chunk, starting at `offset`. - pub data: Vec, + pub data: InstallSnapshotData, +} + +/// An RPC sent by the Raft leader to send chunks of a snapshot to a follower (§7). +#[derive(Clone)] +#[derive(PartialEq)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize), serde(bound = ""))] +pub enum InstallSnapshotData { + Manifest(RTCSnapshotManifest), + Chunk(RTCSnapshotChunk), +} + +impl InstallSnapshotData { + pub fn chunk_id(&self) -> Option> { + match self { + Self::Manifest(_) => None, + Self::Chunk(c) => Some(c.id()), + } + } + + pub fn is_manifest(&self) -> bool { + match self { + InstallSnapshotData::Manifest(_) => true, + InstallSnapshotData::Chunk(_) => false, + } + } +} + +pub trait SnapshotManifest: Clone + Send + Sync + Default + PartialEq + OptionalSerde { + type Iter: Iterator + Send + Sync; + type ChunkId; + + // Get a list of the remaining chunks of the snapshot to send + fn chunks_to_send(&self) -> Self::Iter; + + // Apply a received chunk to the manifest. This removes it from the list of chunks that need to + // be sent/received + // Returns if true if this chunk was not sent/received before. False if it has been seen + // before. + fn receive(&mut self, c: &Self::ChunkId) -> Result; + + // Return if the snapshot has received all chunks + fn is_complete(&self) -> bool; +} + +#[async_trait] +pub trait SnapshotData: Send + Sync { + type ChunkId: Eq + PartialEq + Send + Sync + Display + Debug + OptionalSerde + 'static; + type Chunk: SnapshotChunk; + type Manifest: SnapshotManifest; + + // Generate the manifest for this snapshot. The manifest should be able to keep track of all + // the chunks to send or receive + async fn manifest(&self) -> Self::Manifest; + + // Get the chunk to be sent to the follower + async fn get_chunk(&self, id: &Self::ChunkId) -> Result; + + // Receive the chunk sent to this node and apply it. + async fn receive(&mut self, c: Self::Chunk) -> Result<(), AnyError>; +} + +pub trait SnapshotChunk: Clone + PartialEq + Send + Sync + OptionalSerde { + type ChunkId; + + fn id(&self) -> Self::ChunkId; +} - /// Will be `true` if this is the last chunk in the snapshot. - pub done: bool, +impl Debug for InstallSnapshotRequest +where C::D: Debug +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("InstallSnapshotRequest") + .field("vote", &self.vote) + .field("meta", &self.meta) + .finish() + } } impl MessageSummary> for InstallSnapshotRequest { fn summary(&self) -> String { - format!( - "vote={}, meta={}, offset={}, len={}, done={}", - self.vote, - self.meta, - self.offset, - self.data.len(), - self.done - ) + format!("vote={}, meta={}", self.vote, self.meta,) } } @@ -45,3 +123,123 @@ impl MessageSummary> for InstallSna pub struct InstallSnapshotResponse { pub vote: Vote, } + +#[derive(Clone, Default, PartialEq, Eq, PartialOrd, Ord)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] +pub struct ExampleManifest { + pub chunks: BTreeSet, +} + +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Display)] +#[display(fmt = "(offset: {}, len: {})", offset, len)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] +pub struct ExampleChunkId { + pub offset: usize, + pub len: usize, +} + +#[derive(Clone)] +pub struct ExampleSnapshot { + pub chunk_len: usize, + pub data: Vec, +} + +impl ExampleSnapshot { + pub fn into_inner(self) -> Vec { + self.data + } + + pub fn get_ref(&self) -> &[u8] { + &self.data + } +} + +#[derive(Clone, PartialEq, Eq, Debug, Display)] +#[display(fmt = "{}", chunk_id)] +#[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] +pub struct ExampleSnapshotChunk { + pub chunk_id: ExampleChunkId, + pub data: Vec, +} + +impl SnapshotChunk for ExampleSnapshotChunk { + type ChunkId = ExampleChunkId; + + fn id(&self) -> Self::ChunkId { + self.chunk_id.clone() + } +} + +impl SnapshotManifest for ExampleManifest { + type Iter = std::collections::btree_set::IntoIter; + type ChunkId = ExampleChunkId; + + fn chunks_to_send(&self) -> Self::Iter { + self.chunks.clone().into_iter() + } + + fn receive(&mut self, c: &Self::ChunkId) -> Result { + Ok(self.chunks.remove(c)) + } + + fn is_complete(&self) -> bool { + self.chunks.is_empty() + } +} + +#[async_trait] +impl SnapshotData for ExampleSnapshot { + type Chunk = ExampleSnapshotChunk; + type ChunkId = ExampleChunkId; + type Manifest = ExampleManifest; + + async fn manifest(&self) -> Self::Manifest { + let chunks: BTreeSet<_> = self + .data + .as_slice() + .chunks(self.chunk_len) + .enumerate() + .map(|(i, c)| ExampleChunkId { + offset: i * self.chunk_len, + len: c.len(), + }) + .collect(); + + ExampleManifest { chunks } + } + + async fn get_chunk(&self, id: &Self::ChunkId) -> Result { + Ok(ExampleSnapshotChunk { + chunk_id: id.clone(), + data: self.data[id.offset..(id.offset + id.len)].to_vec(), + }) + } + + async fn receive(&mut self, c: Self::Chunk) -> Result<(), AnyError> { + if self.data.len() < (c.chunk_id.offset + c.chunk_id.len) { + self.data.extend_from_slice(&vec![0; (c.chunk_id.offset + c.chunk_id.len) - self.data.len()]); + } + + let _: Vec<_> = self.data.splice(c.chunk_id.offset..(c.chunk_id.offset + c.chunk_id.len), c.data).collect(); + + Ok(()) + } +} + +impl<'a> From<&'a [u8]> for ExampleSnapshot { + fn from(value: &'a [u8]) -> Self { + Self { + chunk_len: 1024, + data: value.to_vec(), + } + } +} + +impl From>> for ExampleSnapshot { + fn from(value: Cursor>) -> Self { + Self { + chunk_len: 1024, + data: value.into_inner(), + } + } +} diff --git a/openraft/src/raft/message/mod.rs b/openraft/src/raft/message/mod.rs index 786fca895..65f86da69 100644 --- a/openraft/src/raft/message/mod.rs +++ b/openraft/src/raft/message/mod.rs @@ -12,7 +12,15 @@ mod client_write; pub use append_entries::AppendEntriesRequest; pub use append_entries::AppendEntriesResponse; pub use client_write::ClientWriteResponse; +pub use install_snapshot::ExampleChunkId; +pub use install_snapshot::ExampleManifest; +pub use install_snapshot::ExampleSnapshot; +pub use install_snapshot::ExampleSnapshotChunk; +pub use install_snapshot::InstallSnapshotData; pub use install_snapshot::InstallSnapshotRequest; pub use install_snapshot::InstallSnapshotResponse; +pub use install_snapshot::SnapshotChunk; +pub use install_snapshot::SnapshotData; +pub use install_snapshot::SnapshotManifest; pub use vote::VoteRequest; pub use vote::VoteResponse; diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index 3c3ae1ef8..3481dca2e 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -17,8 +17,16 @@ use maplit::btreemap; pub use message::AppendEntriesRequest; pub use message::AppendEntriesResponse; pub use message::ClientWriteResponse; +pub use message::ExampleChunkId; +pub use message::ExampleManifest; +pub use message::ExampleSnapshot; +pub use message::ExampleSnapshotChunk; +pub use message::InstallSnapshotData; pub use message::InstallSnapshotRequest; pub use message::InstallSnapshotResponse; +pub use message::SnapshotChunk; +pub use message::SnapshotData; +pub use message::SnapshotManifest; pub use message::VoteRequest; pub use message::VoteResponse; use tokio::sync::mpsc; @@ -80,7 +88,7 @@ use crate::StorageHelper; /// NodeId = u64, /// Node = openraft::BasicNode, /// Entry = openraft::Entry, -/// SnapshotData = Cursor>, +/// SnapshotData = openraft::ExampleSnapshot, /// AsyncRuntime = openraft::TokioRuntime, /// ); /// ``` diff --git a/openraft/src/raft_state/snapshot_streaming.rs b/openraft/src/raft_state/snapshot_streaming.rs index 0c697e78d..cb4696c5f 100644 --- a/openraft/src/raft_state/snapshot_streaming.rs +++ b/openraft/src/raft_state/snapshot_streaming.rs @@ -4,9 +4,6 @@ use crate::SnapshotId; #[derive(Debug, Clone)] #[derive(PartialEq, Eq)] pub(crate) struct StreamingState { - /// The offset of the last byte written to the snapshot. - pub(crate) offset: u64, - /// The ID of the snapshot being written. pub(crate) snapshot_id: SnapshotId, } diff --git a/openraft/src/raft_types.rs b/openraft/src/raft_types.rs index 5a963b399..d42a73d5e 100644 --- a/openraft/src/raft_types.rs +++ b/openraft/src/raft_types.rs @@ -1,3 +1,4 @@ +use std::fmt::Debug; use std::fmt::Display; use std::fmt::Formatter; @@ -13,22 +14,10 @@ pub type SnapshotId = String; pub struct SnapshotSegmentId { /// The unique identifier of the snapshot stream. pub id: SnapshotId, - - /// The offset of this segment in the entire snapshot data. - pub offset: u64, -} - -impl From<(D, u64)> for SnapshotSegmentId { - fn from(v: (D, u64)) -> Self { - SnapshotSegmentId { - id: v.0.to_string(), - offset: v.1, - } - } } impl Display for SnapshotSegmentId { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "{}+{}", self.id, self.offset) + write!(f, "{}", self.id) } } diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index 478e9cd5e..bdf48619b 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -4,7 +4,6 @@ mod replication_session_id; mod response; use std::fmt; -use std::io::SeekFrom; use std::sync::Arc; use std::time::Duration; @@ -12,8 +11,6 @@ use anyerror::AnyError; use futures::future::FutureExt; pub(crate) use replication_session_id::ReplicationSessionId; pub(crate) use response::Response; -use tokio::io::AsyncReadExt; -use tokio::io::AsyncSeekExt; use tokio::select; use tokio::sync::mpsc; use tokio::sync::oneshot; @@ -38,9 +35,13 @@ use crate::network::RaftNetworkFactory; use crate::raft::AppendEntriesRequest; use crate::raft::AppendEntriesResponse; use crate::raft::InstallSnapshotRequest; +use crate::raft::InstallSnapshotResponse; +use crate::raft::SnapshotData; +use crate::raft::SnapshotManifest; use crate::storage::RaftLogReader; use crate::storage::RaftLogStorage; use crate::storage::Snapshot; +use crate::type_config::RTCSnapshotManifest; use crate::utime::UTime; use crate::AsyncRuntime; use crate::ErrorSubject; @@ -632,7 +633,7 @@ where snapshot.as_ref().map(|x| &x.meta).summary() ); - let mut snapshot = match snapshot { + let snapshot: Snapshot = match snapshot { None => { let io_err = StorageIOError::read_snapshot(None, AnyError::error("snapshot not found")); let sto_err = StorageError::IO { source: io_err }; @@ -643,52 +644,49 @@ where let err_x = || (ErrorSubject::Snapshot(Some(snapshot.meta.signature())), ErrorVerb::Read); - let mut offset = 0; - let end = snapshot.snapshot.seek(SeekFrom::End(0)).await.sto_res(err_x)?; - let mut buf = Vec::with_capacity(self.config.snapshot_max_chunk_size as usize); - - loop { - // Build the RPC. - snapshot.snapshot.seek(SeekFrom::Start(offset)).await.sto_res(err_x)?; - let n_read = snapshot.snapshot.read_buf(&mut buf).await.sto_res(err_x)?; - - let leader_time = ::Instant::now(); + let mut manifest: RTCSnapshotManifest = snapshot.snapshot.manifest().await; + let leader_time = ::Instant::now(); + let snap_timeout = self.config.send_snapshot_timeout(); - let done = (offset + n_read as u64) == end; + // currently not doing anything with response for manifest + let _resp = loop { let req = InstallSnapshotRequest { vote: self.session_id.vote, meta: snapshot.meta.clone(), - offset, - data: Vec::from(&buf[..n_read]), - done, + data: crate::raft::InstallSnapshotData::Manifest(manifest.clone()), }; - buf.clear(); - // Send the RPC over to the target. - tracing::debug!( - snapshot_size = req.data.len(), - req.offset, - end, - req.done, - "sending snapshot chunk" - ); + match self.send_snapshot(req, snap_timeout).await? { + // send failed + None => { + // If sender is closed, return at once + self.try_drain_events().await?; - let snap_timeout = if done { - self.config.install_snapshot_timeout() - } else { - self.config.send_snapshot_timeout() - }; + // Sleep a short time otherwise in test environment it is a dead-loop that + // never yields. Because network implementation does + // not yield. + C::AsyncRuntime::sleep(Duration::from_millis(10)).await; + } + Some(resp) => break resp, + } + }; - let option = RPCOption::new(snap_timeout); + for chunk_id in manifest.chunks_to_send() { + let res = loop { + let chunk = snapshot.snapshot.get_chunk(&chunk_id).await.sto_res(err_x)?; - let res = C::AsyncRuntime::timeout(snap_timeout, self.network.install_snapshot(req, option)).await; + // Send the RPC over to the target. + tracing::debug!(?chunk_id, "sending snapshot chunk"); - let res = match res { - Ok(outer_res) => match outer_res { - Ok(res) => res, - Err(err) => { - tracing::warn!(error=%err, "error sending InstallSnapshot RPC to target"); + let req = InstallSnapshotRequest { + vote: self.session_id.vote, + meta: snapshot.meta.clone(), + data: crate::raft::InstallSnapshotData::Chunk(chunk), + }; + match self.send_snapshot(req, snap_timeout).await? { + // send failed + None => { // If sender is closed, return at once self.try_drain_events().await?; @@ -696,23 +694,13 @@ where // never yields. Because network implementation does // not yield. C::AsyncRuntime::sleep(Duration::from_millis(10)).await; - continue; } - }, - Err(err) => { - // TODO(2): add backoff when Unreachable is returned - tracing::warn!(error=%err, "timeout while sending InstallSnapshot RPC to target"); - - // If sender is closed, return at once - self.try_drain_events().await?; - - // Sleep a short time otherwise in test environment it is a dead-loop that never - // yields. Because network implementation does not yield. - C::AsyncRuntime::sleep(Duration::from_millis(10)).await; - continue; + Some(resp) => break resp, } }; + manifest.receive(&chunk_id).sto_res(err_x)?; + // Handle response conditions. if res.vote > self.session_id.vote { return Err(ReplicationError::HigherVote(HigherVote { @@ -721,26 +709,48 @@ where })); } - // If we just sent the final chunk of the snapshot, then transition to lagging state. - if done { - tracing::debug!( - "done install snapshot: snapshot last_log_id: {:?}, matching: {}", - snapshot.meta.last_log_id, - self.matching.summary(), - ); + self.try_drain_events().await?; + } + + tracing::debug!( + "done install snapshot: snapshot last_log_id: {:?}, matching: {}", + snapshot.meta.last_log_id, + self.matching.summary(), + ); - // TODO: update leader lease for every successfully sent chunk. - self.update_matching(request_id, leader_time, snapshot.meta.last_log_id); + self.update_matching(request_id, leader_time, snapshot.meta.last_log_id); + Ok(None) + } + + /// Send the RPC over to the target. Will + #[tracing::instrument(level = "info", skip_all)] + async fn send_snapshot( + &mut self, + req: InstallSnapshotRequest, + snap_timeout: Duration, + ) -> Result>, ReplicationError> { + // tracing::debug!(?req.chunk_id(), "sending snapshot request"); + let option = RPCOption::new(snap_timeout); + + let res = C::AsyncRuntime::timeout(snap_timeout, self.network.install_snapshot(req, option)).await; + + let res: InstallSnapshotResponse = match res { + Ok(outer_res) => match outer_res { + Ok(res) => res, + Err(err) => { + tracing::warn!(error=%err, "error sending InstallSnapshot RPC to target"); + return Ok(None); + } + }, + Err(err) => { + // TODO(2): add backoff when Unreachable is returned + tracing::warn!(error=%err, "timeout while sending InstallSnapshot RPC to target"); return Ok(None); } + }; - // Everything is good, so update offset for sending the next chunk. - offset += n_read as u64; - - // Check raft channel to ensure we are staying up-to-date, then loop. - self.try_drain_events().await?; - } + Ok(Some(res)) } } diff --git a/openraft/src/storage_error.rs b/openraft/src/storage_error.rs index df65b7065..d39a7dc5b 100644 --- a/openraft/src/storage_error.rs +++ b/openraft/src/storage_error.rs @@ -35,6 +35,22 @@ where NID: NodeId } } +impl ToStorageResult for Result +where NID: NodeId +{ + fn sto_res(self, f: F) -> Result> + where F: FnOnce() -> (ErrorSubject, ErrorVerb) { + match self { + Ok(x) => Ok(x), + Err(e) => { + let (subject, verb) = f(); + let io_err = StorageIOError::new(subject, verb, e); + Err(io_err.into()) + } + } + } +} + /// An error that occurs when the RaftStore impl runs defensive check of input or output. /// E.g. re-applying an log entry is a violation that may be a potential bug. #[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)] diff --git a/openraft/src/type_config.rs b/openraft/src/type_config.rs index 6753a5eb3..95902bf79 100644 --- a/openraft/src/type_config.rs +++ b/openraft/src/type_config.rs @@ -1,17 +1,13 @@ use std::fmt::Debug; -use tokio::io::AsyncRead; -use tokio::io::AsyncSeek; -use tokio::io::AsyncWrite; - use crate::entry::FromAppData; use crate::entry::RaftEntry; +use crate::raft::SnapshotData; use crate::AppData; use crate::AppDataResponse; use crate::AsyncRuntime; use crate::Node; use crate::NodeId; -use crate::OptionalSend; /// Configuration of types used by the [`Raft`] core engine. /// @@ -34,7 +30,7 @@ use crate::OptionalSend; /// NodeId = u64, /// Node = openraft::BasicNode, /// Entry = openraft::Entry, -/// SnapshotData = Cursor>, +/// SnapshotData = ExampleSnapshot, /// AsyncRuntime = openraft::TokioRuntime, /// ); /// ``` @@ -61,8 +57,13 @@ pub trait RaftTypeConfig: /// /// See the [storage chapter of the guide](https://datafuselabs.github.io/openraft/getting-started.html#implement-raftstorage) /// for details on where and how this is used. - type SnapshotData: AsyncRead + AsyncWrite + AsyncSeek + OptionalSend + Sync + Unpin + 'static; + type SnapshotData: SnapshotData + 'static; /// Asynchronous runtime type. type AsyncRuntime: AsyncRuntime; } + +pub(crate) type RTCSnapshotData = ::SnapshotData; +pub(crate) type RTCSnapshotChunkId = <::SnapshotData as SnapshotData>::ChunkId; +pub(crate) type RTCSnapshotChunk = <::SnapshotData as SnapshotData>::Chunk; +pub(crate) type RTCSnapshotManifest = <::SnapshotData as SnapshotData>::Manifest; diff --git a/rocksstore-compat07/src/lib.rs b/rocksstore-compat07/src/lib.rs index 90b497d15..3c41f50e4 100644 --- a/rocksstore-compat07/src/lib.rs +++ b/rocksstore-compat07/src/lib.rs @@ -35,6 +35,7 @@ use byteorder::WriteBytesExt; use openraft::async_trait::async_trait; use openraft::compat::compat07; use openraft::compat::Upgrade; +use openraft::raft::ExampleSnapshot; use openraft::AnyError; use openraft::EmptyNode; use openraft::Entry; @@ -67,7 +68,7 @@ pub type RocksNodeId = u64; openraft::declare_raft_types!( /// Declare the type configuration for `MemStore`. pub TypeConfig: D = RocksRequest, R = RocksResponse, NodeId = RocksNodeId, Node = EmptyNode, - Entry = Entry, SnapshotData = Cursor>, AsyncRuntime = TokioRuntime + Entry = Entry, SnapshotData = ExampleSnapshot, AsyncRuntime = TokioRuntime ); #[derive(Serialize, Deserialize, Debug, Clone)] @@ -444,7 +445,7 @@ impl RaftSnapshotBuilder for Arc { Ok(Snapshot { meta, - snapshot: Box::new(Cursor::new(data)), + snapshot: Box::new(Cursor::new(data).into()), }) } } @@ -598,7 +599,7 @@ impl RaftStorage for Arc { async fn begin_receiving_snapshot( &mut self, ) -> Result::SnapshotData>, StorageError> { - Ok(Box::new(Cursor::new(Vec::new()))) + Ok(Box::new(Cursor::new(Vec::new()).into())) } #[tracing::instrument(level = "trace", skip(self, snapshot))] @@ -660,7 +661,7 @@ impl RaftStorage for Arc { Ok(Some(Snapshot { meta, - snapshot: Box::new(Cursor::new(d)), + snapshot: Box::new(Cursor::new(d).into()), })) } diff --git a/rocksstore/src/lib.rs b/rocksstore/src/lib.rs index 7cd8a25a5..ce715cb36 100644 --- a/rocksstore/src/lib.rs +++ b/rocksstore/src/lib.rs @@ -16,6 +16,7 @@ use byteorder::BigEndian; use byteorder::ReadBytesExt; use byteorder::WriteBytesExt; use openraft::async_trait::async_trait; +use openraft::raft::ExampleSnapshot; use openraft::storage::LogState; use openraft::storage::Snapshot; use openraft::AnyError; @@ -47,7 +48,7 @@ pub type RocksNodeId = u64; openraft::declare_raft_types!( /// Declare the type configuration for `MemStore`. pub TypeConfig: D = RocksRequest, R = RocksResponse, NodeId = RocksNodeId, Node = BasicNode, - Entry = Entry, SnapshotData = Cursor>, AsyncRuntime = TokioRuntime + Entry = Entry, SnapshotData = ExampleSnapshot, AsyncRuntime = TokioRuntime ); /** @@ -413,7 +414,7 @@ impl RaftSnapshotBuilder for Arc { Ok(Snapshot { meta, - snapshot: Box::new(Cursor::new(data)), + snapshot: Box::new(Cursor::new(data).into()), }) } } @@ -546,7 +547,7 @@ impl RaftStorage for Arc { async fn begin_receiving_snapshot( &mut self, ) -> Result::SnapshotData>, StorageError> { - Ok(Box::new(Cursor::new(Vec::new()))) + Ok(Box::new(Cursor::new(Vec::new()).into())) } #[tracing::instrument(level = "trace", skip(self, snapshot))] @@ -587,7 +588,7 @@ impl RaftStorage for Arc { let data = snapshot.data.clone(); Ok(Some(Snapshot { meta: snapshot.meta, - snapshot: Box::new(Cursor::new(data)), + snapshot: Box::new(Cursor::new(data).into()), })) } None => Ok(None), diff --git a/sledstore/src/lib.rs b/sledstore/src/lib.rs index 757cd176e..193499e36 100644 --- a/sledstore/src/lib.rs +++ b/sledstore/src/lib.rs @@ -15,6 +15,7 @@ use byteorder::BigEndian; use byteorder::ByteOrder; use byteorder::ReadBytesExt; use openraft::async_trait::async_trait; +use openraft::raft::ExampleSnapshot as RaftExampleSnapshot; use openraft::storage::LogState; use openraft::storage::Snapshot; use openraft::AnyError; @@ -42,7 +43,7 @@ pub type ExampleNodeId = u64; openraft::declare_raft_types!( /// Declare the type configuration for example K/V store. pub TypeConfig: D = ExampleRequest, R = ExampleResponse, NodeId = ExampleNodeId, Node = BasicNode, - Entry = Entry, SnapshotData = Cursor>, AsyncRuntime = TokioRuntime + Entry = Entry, SnapshotData = RaftExampleSnapshot, AsyncRuntime = TokioRuntime ); /** @@ -460,7 +461,7 @@ impl RaftSnapshotBuilder for Arc { Ok(Snapshot { meta, - snapshot: Box::new(Cursor::new(data)), + snapshot: Box::new(Cursor::new(data).into()), }) } } @@ -622,7 +623,7 @@ impl RaftStorage for Arc { async fn begin_receiving_snapshot( &mut self, ) -> Result::SnapshotData>, StorageError> { - Ok(Box::new(Cursor::new(Vec::new()))) + Ok(Box::new(Cursor::new(Vec::new()).into())) } #[tracing::instrument(level = "trace", skip(self, snapshot))] @@ -660,7 +661,7 @@ impl RaftStorage for Arc { let data = snapshot.data.clone(); Ok(Some(Snapshot { meta: snapshot.meta, - snapshot: Box::new(Cursor::new(data)), + snapshot: Box::new(Cursor::new(data).into()), })) } None => Ok(None), diff --git a/stores/rocksstore-v2/src/lib.rs b/stores/rocksstore-v2/src/lib.rs index 4f85945ee..f68e3f7df 100644 --- a/stores/rocksstore-v2/src/lib.rs +++ b/stores/rocksstore-v2/src/lib.rs @@ -39,6 +39,7 @@ use openraft::StorageError; use openraft::StorageIOError; use openraft::StoredMembership; use openraft::TokioRuntime; +use openraft::raft::ExampleSnapshot; use openraft::Vote; use rand::Rng; use rocksdb::ColumnFamily; @@ -54,7 +55,7 @@ pub type RocksNodeId = u64; openraft::declare_raft_types!( /// Declare the type configuration. pub TypeConfig: D = RocksRequest, R = RocksResponse, NodeId = RocksNodeId, Node = BasicNode, - Entry = Entry, SnapshotData = Cursor>, AsyncRuntime = TokioRuntime + Entry = Entry, SnapshotData = ExampleSnapshot, AsyncRuntime = TokioRuntime ); /** @@ -305,7 +306,7 @@ impl RaftSnapshotBuilder for RocksStateMachine { Ok(Snapshot { meta, - snapshot: Box::new(Cursor::new(data)), + snapshot: Box::new(Cursor::new(data).into()), }) } } @@ -455,7 +456,7 @@ impl RaftStateMachine for RocksStateMachine { async fn begin_receiving_snapshot( &mut self, ) -> Result::SnapshotData>, StorageError> { - Ok(Box::new(Cursor::new(Vec::new()))) + Ok(Box::new(Cursor::new(Vec::new()).into())) } async fn install_snapshot( @@ -510,7 +511,7 @@ impl RaftStateMachine for RocksStateMachine { Ok(Some(Snapshot { meta: snapshot.meta, - snapshot: Box::new(Cursor::new(data)), + snapshot: Box::new(Cursor::new(data).into()), })) } } diff --git a/tests/tests/snapshot_streaming/t10_api_install_snapshot.rs b/tests/tests/snapshot_streaming/t10_api_install_snapshot.rs index f018608b0..6018bb25b 100644 --- a/tests/tests/snapshot_streaming/t10_api_install_snapshot.rs +++ b/tests/tests/snapshot_streaming/t10_api_install_snapshot.rs @@ -2,6 +2,10 @@ use std::sync::Arc; use anyhow::Result; use maplit::btreeset; +use openraft::raft::ExampleChunkId; +use openraft::raft::ExampleManifest; +use openraft::raft::ExampleSnapshotChunk; +use openraft::raft::InstallSnapshotData; use openraft::raft::InstallSnapshotRequest; use openraft::CommittedLeaderId; use openraft::Config; @@ -48,18 +52,20 @@ async fn snapshot_arguments() -> Result<()> { }), last_membership: Default::default(), }, - offset: 0, - data: vec![1, 2, 3], - done: false, + data: InstallSnapshotData::Manifest(ExampleManifest::default()), }; tracing::info!(log_index, "--- only allow to begin a new session when offset is 0"); { let mut req = make_req(); - req.offset = 2; + req.data = InstallSnapshotData::Chunk(ExampleSnapshotChunk { + chunk_id: ExampleChunkId { offset: 2, len: 0 }, + data: vec![], + }); + let res = n.0.install_snapshot(req).await; assert_eq!( - "snapshot segment id mismatch, expect: ss1+0, got: ss1+2", + "snapshot segment id mismatch, expect: , got: ss1", res.unwrap_err().to_string() ); } @@ -72,11 +78,15 @@ async fn snapshot_arguments() -> Result<()> { tracing::info!("-- continue write with different id"); { let mut req = make_req(); - req.offset = 3; req.meta.snapshot_id = "ss2".into(); + req.data = InstallSnapshotData::Chunk(ExampleSnapshotChunk { + chunk_id: ExampleChunkId { offset: 3, len: 0 }, + data: vec![], + }); + let res = n.0.install_snapshot(req).await; assert_eq!( - "snapshot segment id mismatch, expect: ss2+0, got: ss2+3", + "snapshot segment id mismatch, expect: ss1, got: ss2", res.unwrap_err().to_string() ); } @@ -84,12 +94,22 @@ async fn snapshot_arguments() -> Result<()> { tracing::info!("-- write from offset=0 with different id, create a new session"); { let mut req = make_req(); - req.offset = 0; req.meta.snapshot_id = "ss2".into(); n.0.install_snapshot(req).await?; let mut req = make_req(); - req.offset = 3; + req.data = InstallSnapshotData::Chunk(ExampleSnapshotChunk { + chunk_id: ExampleChunkId { offset: 0, len: 0 }, + data: vec![], + }); + req.meta.snapshot_id = "ss2".into(); + n.0.install_snapshot(req).await?; + + let mut req = make_req(); + req.data = InstallSnapshotData::Chunk(ExampleSnapshotChunk { + chunk_id: ExampleChunkId { offset: 3, len: 0 }, + data: vec![], + }); req.meta.snapshot_id = "ss2".into(); n.0.install_snapshot(req).await?; } @@ -97,7 +117,11 @@ async fn snapshot_arguments() -> Result<()> { tracing::info!("-- continue write with mismatched offset is allowed"); { let mut req = make_req(); - req.offset = 8; + req.data = InstallSnapshotData::Chunk(ExampleSnapshotChunk { + chunk_id: ExampleChunkId { offset: 8, len: 0 }, + data: vec![], + }); + req.meta.snapshot_id = "ss2".into(); n.0.install_snapshot(req).await?; } diff --git a/tests/tests/snapshot_streaming/t33_snapshot_delete_conflict_logs.rs b/tests/tests/snapshot_streaming/t33_snapshot_delete_conflict_logs.rs index 77aa5d9bb..20e35f942 100644 --- a/tests/tests/snapshot_streaming/t33_snapshot_delete_conflict_logs.rs +++ b/tests/tests/snapshot_streaming/t33_snapshot_delete_conflict_logs.rs @@ -8,7 +8,10 @@ use openraft::network::RPCOption; use openraft::network::RaftNetwork; use openraft::network::RaftNetworkFactory; use openraft::raft::AppendEntriesRequest; +use openraft::raft::InstallSnapshotData; use openraft::raft::InstallSnapshotRequest; +use openraft::raft::SnapshotData; +use openraft::raft::SnapshotManifest; use openraft::storage::RaftLogStorage; use openraft::storage::RaftStateMachine; use openraft::testing; @@ -54,6 +57,8 @@ async fn snapshot_delete_conflicting_logs() -> Result<()> { let mut router = RaftRouter::new(config.clone()); let mut log_index; + // cargo test --package tests --test snapshot_streaming -- + // t33_snapshot_delete_conflict_logs::snapshot_delete_conflicting_logs --exact --nocapture tracing::info!("--- manually init node-0 with a higher vote, in order to override conflict log on learner later"); { @@ -147,17 +152,28 @@ async fn snapshot_delete_conflicting_logs() -> Result<()> { b.build_snapshot().await? }; + let manifest = snap.snapshot.manifest().await; + + let mut client = router.new_client(1, &()).await; + let req = InstallSnapshotRequest { vote: sto0.read_vote().await?.unwrap(), meta: snap.meta.clone(), - offset: 0, - data: snap.snapshot.into_inner(), - done: true, + data: InstallSnapshotData::Manifest(manifest.clone()), }; - let option = RPCOption::new(Duration::from_millis(1_000)); + client.install_snapshot(req, RPCOption::new(Duration::from_millis(1_000))).await?; - router.new_client(1, &()).await.install_snapshot(req, option).await?; + for chunk_id in manifest.chunks_to_send() { + let chunk = snap.snapshot.get_chunk(&chunk_id).await?; + let req = InstallSnapshotRequest { + vote: sto0.read_vote().await?.unwrap(), + meta: snap.meta.clone(), + data: InstallSnapshotData::Chunk(chunk), + }; + + client.install_snapshot(req, RPCOption::new(Duration::from_millis(1_000))).await?; + } tracing::info!(log_index, "--- DONE installing snapshot");