From 6fc975bceb07ea084ef1ec9cdbca4d404b2b7875 Mon Sep 17 00:00:00 2001 From: Akira Hayakawa Date: Mon, 21 Oct 2024 17:40:22 +0900 Subject: [PATCH] test: Implement persistent snapshot --- sorock/src/process/app.rs | 1 + sorock/src/process/command_log/mod.rs | 2 +- tests/env/src/lib.rs | 53 +++++++--- tests/env/tests/env.rs | 16 +-- tests/sorock-tests/src/lib.rs | 4 +- tests/sorock-tests/tests/6_persistency.rs | 7 +- tests/testapp/Cargo.toml | 1 + tests/testapp/src/raft_process/mod.rs | 116 +++++++++++++++++++++- 8 files changed, 172 insertions(+), 28 deletions(-) diff --git a/sorock/src/process/app.rs b/sorock/src/process/app.rs index 46807f5b..ff31166b 100644 --- a/sorock/src/process/app.rs +++ b/sorock/src/process/app.rs @@ -31,6 +31,7 @@ impl App { if snapshot_index == 1 { return Ok(()); } + info!("install snapshot@{snapshot_index}"); self.install_snapshot(snapshot_index).await?; Ok(()) } diff --git a/sorock/src/process/command_log/mod.rs b/sorock/src/process/command_log/mod.rs index c1ce6730..fed5c246 100644 --- a/sorock/src/process/command_log/mod.rs +++ b/sorock/src/process/command_log/mod.rs @@ -76,7 +76,7 @@ impl CommandLog { self.kern_pointer.store(start_index, Ordering::SeqCst); self.user_pointer.store(start_index, Ordering::SeqCst); - info!("restore state: snapshot_index={start_index}"); + info!("restore state: commit_index={start_index}"); Ok(()) } } diff --git a/tests/env/src/lib.rs b/tests/env/src/lib.rs index 904adb24..f4443400 100644 --- a/tests/env/src/lib.rs +++ b/tests/env/src/lib.rs @@ -16,7 +16,12 @@ struct Node { abort_tx0: Option>, } impl Node { - pub fn new(id: u8, port: u16, n_shards: u32, file: Option>) -> Result { + pub fn new( + id: u8, + port: u16, + n_shards: u32, + pstate: Option>, + ) -> Result { let nd_tag = format!("ND{port}>"); let (tx, rx) = tokio::sync::oneshot::channel(); @@ -31,8 +36,8 @@ impl Node { info!("env: create db"); let db = { - let db = match file { - Some(file) => match redb::Database::create(file.path()) { + let db = match pstate { + Some(file) => match redb::Database::create(file.log_file.path()) { Ok(x) => x, Err(e) => { error!("failed to create db: {:?}", e); @@ -49,9 +54,12 @@ impl Node { info!("env: db created"); for shard_id in 0..n_shards { + let state = pstate + .as_ref() + .map(|env| env.snapshot_file[shard_id as usize].path()); let (log, ballot) = db.get(shard_id).unwrap(); let driver = node.get_driver(shard_id); - let process = testapp::raft_process::new(log, ballot, driver) + let process = testapp::raft_process::new(state, log, ballot, driver) .await .unwrap(); node.attach_process(shard_id, process); @@ -110,31 +118,51 @@ impl Drop for Node { } } +struct PersistentState { + log_file: NamedTempFile, + snapshot_file: Vec, +} +impl PersistentState { + fn new(n_shards: u32) -> Self { + let log_file = NamedTempFile::new().unwrap(); + let snapshot_file = (0..n_shards) + .map(|_| NamedTempFile::new().unwrap()) + .collect(); + Self { + log_file, + snapshot_file, + } + } +} + struct PersistentEnv { - files: HashMap>, + files: HashMap>, + n_shards: u32, } impl PersistentEnv { - fn new() -> Self { + fn new(n_shards: u32) -> Self { Self { files: HashMap::new(), + n_shards, } } - fn get(&mut self, id: u8) -> Arc { + fn get(&mut self, id: u8) -> Arc { self.files .entry(id) - .or_insert_with(|| Arc::new(NamedTempFile::new().unwrap())) + .or_insert_with(|| Arc::new(PersistentState::new(self.n_shards))) .clone() } } pub struct Env { + n_shards: u32, allocated_ports: HashMap, nodes: HashMap, conn_cache: spin::Mutex>, penv: Option, } impl Env { - pub fn new(with_persistency: bool, with_logging: bool) -> Self { + pub fn new(n_shards: u32, with_persistency: bool, with_logging: bool) -> Self { INIT.call_once(|| { // On terminating the tokio runtime, // flooding stack traces are printed and they are super noisy. @@ -150,11 +178,12 @@ impl Env { } }); let penv = if with_persistency { - Some(PersistentEnv::new()) + Some(PersistentEnv::new(n_shards)) } else { None }; Self { + n_shards, nodes: HashMap::new(), allocated_ports: HashMap::new(), conn_cache: spin::Mutex::new(HashMap::new()), @@ -162,12 +191,12 @@ impl Env { } } - pub fn add_node(&mut self, id: u8, n_shards: u32) { + pub fn add_node(&mut self, id: u8) { let free_port = *self .allocated_ports .entry(id) .or_insert_with(|| port_check::free_local_ipv4_port().unwrap()); - let file0 = self.penv.as_mut().map(|env| env.get(id)); + let snap_states = self.penv.as_mut().map(|env| env.get(id)); let node = Node::new(id, free_port, n_shards, file0).unwrap(); port_check::is_port_reachable_with_timeout( node.address().to_string(), diff --git a/tests/env/tests/env.rs b/tests/env/tests/env.rs index 61fc976c..48de7e4d 100644 --- a/tests/env/tests/env.rs +++ b/tests/env/tests/env.rs @@ -2,16 +2,16 @@ use anyhow::Result; #[tokio::test(flavor = "multi_thread")] async fn create() -> Result<()> { - let mut env = env::Env::new(false, true); - env.add_node(0, 1); + let mut env = env::Env::new(1, false, true); + env.add_node(0); env.check_connectivity(0).await?; Ok(()) } #[tokio::test(flavor = "multi_thread")] async fn create_remove() -> Result<()> { - let mut env = env::Env::new(false, true); - env.add_node(0, 1); + let mut env = env::Env::new(1, false, true); + env.add_node(0); env.check_connectivity(0).await?; let mut cli = env.connect_ping_client(0).await?; @@ -24,8 +24,8 @@ async fn create_remove() -> Result<()> { #[tokio::test(flavor = "multi_thread")] async fn panic_loop() -> Result<()> { - let mut env = env::Env::new(false, true); - env.add_node(0, 1); + let mut env = env::Env::new(1, false, true); + env.add_node(0); env.check_connectivity(0).await?; for _ in 0..10 { @@ -39,8 +39,8 @@ async fn panic_loop() -> Result<()> { #[tokio::test(flavor = "multi_thread")] async fn drop_env() -> Result<()> { for _ in 0..10 { - let mut env = env::Env::new(false, true); - env.add_node(0, 1); + let mut env = env::Env::new(1, false, true); + env.add_node(0); env.check_connectivity(0).await?; } diff --git a/tests/sorock-tests/src/lib.rs b/tests/sorock-tests/src/lib.rs index dc30103d..68af61d1 100644 --- a/tests/sorock-tests/src/lib.rs +++ b/tests/sorock-tests/src/lib.rs @@ -31,9 +31,9 @@ impl Builder { pub async fn build(self, n: u8, p: u32) -> Result { ensure!(n > 0); ensure!(p > 0); - let mut env = Env::new(self.with_persistency, self.with_logging); + let mut env = Env::new(p, self.with_persistency, self.with_logging); for id in 0..n { - env.add_node(id, p); + env.add_node(id); env.check_connectivity(0).await?; } Ok(Cluster { env }) diff --git a/tests/sorock-tests/tests/6_persistency.rs b/tests/sorock-tests/tests/6_persistency.rs index f10ad630..bde2b438 100644 --- a/tests/sorock-tests/tests/6_persistency.rs +++ b/tests/sorock-tests/tests/6_persistency.rs @@ -14,11 +14,16 @@ async fn n3_restore() -> Result<()> { cluster.add_server(0, 1, 2).await?; let mut cur_state = 0; - for _ in 0..10 { + for i in 0..10 { let add_v = rand::thread_rng().gen_range(1..=9); let old_v = cluster.user(0).fetch_add(0, add_v).await?; assert_eq!(old_v, cur_state); cur_state += add_v; + + if i == 5 { + cluster.user(0).make_snapshot(0).await?; + cluster.user(1).make_snapshot(0).await?; + } } cluster.env().remove_node(0); diff --git a/tests/testapp/Cargo.toml b/tests/testapp/Cargo.toml index 371bd927..610d5555 100644 --- a/tests/testapp/Cargo.toml +++ b/tests/testapp/Cargo.toml @@ -13,6 +13,7 @@ bytes.workspace = true futures.workspace = true serde = { version = "1.0", features = ["derive"] } spin = "0.9" +tempfile = "3.13.0" tokio = { version = "1", features = ["full", "tracing"] } tokio-retry = "0.3" tokio-util = "0.7" diff --git a/tests/testapp/src/raft_process/mod.rs b/tests/testapp/src/raft_process/mod.rs index 9dc3b201..d70570d5 100644 --- a/tests/testapp/src/raft_process/mod.rs +++ b/tests/testapp/src/raft_process/mod.rs @@ -6,15 +6,19 @@ use futures::TryStreamExt; use sorock::process::*; use spin::RwLock; use std::collections::BTreeMap; +use std::fs::{File, OpenOptions}; +use std::io::{Read, Write}; +use std::path::Path; mod snapshot_io; pub async fn new( + snap_file: Option>, log: impl RaftLogStore, ballot: impl RaftBallotStore, driver: sorock::RaftDriver, ) -> Result { - let app_main = AppMain::new(); + let app_main = AppMain::new(snap_file); let process = RaftProcess::new(app_main, log, ballot, driver).await?; Ok(process) } @@ -37,21 +41,125 @@ impl AppSnapshot { } } +#[derive(serde::Serialize, serde::Deserialize)] +struct SnapshotTable { + inner: BTreeMap, +} +impl SnapshotTable { + fn insert(&mut self, idx: u64, state: AppState) { + self.inner.insert(idx, state); + } + fn get(&self, idx: &u64) -> Option { + self.inner.get(idx) + } + fn split_off(&mut self, idx: &u64) -> BTreeMap { + self.inner.split_off(idx) + } + fn contains_key(&self, idx: &u64) -> bool { + self.inner.contains_key(idx) + } + fn get_latest_snapshot(&self) -> u64 { + let mut out = vec![]; + for (&i, _) in &self.inner { + out.push(i); + } + out.sort(); + out.pop().unwrap_or(0) + } +} + +enum SnapshotsInner { + Mem(SnapshotTable), + Disk(File), +} +struct Snapshots { + inner: SnapshotsInner, +} +impl Snapshots { + fn new(file: Option) -> Self { + let inner = match file { + Some(f) => SnapshotsInner::Disk(f), + None => SnapshotsInner::Mem(SnapshotTable { + inner: BTreeMap::new(), + }), + }; + Self { inner } + } + fn put(&mut self, idx: u64, state: AppState) { + match &mut self.inner { + SnapshotsInner::Mem(m) => { + m.insert(idx, state); + } + SnapshotsInner::Disk(f) => { + let mut f = f.reopen().unwrap(); + let bytes = state.serialize(); + f.write_all(&bytes).unwrap(); + } + } + } + fn get(&self, idx: &u64) -> Option { + match &self.inner { + SnapshotsInner::Mem(m) => m.get(idx), + SnapshotsInner::Disk(f) => { + let mut f = f.reopen().unwrap(); + let mut buf = vec![]; + f.read_to_end(&mut buf).unwrap(); + Some(AppState::deserialize(&buf)) + } + } + } + fn split_off(&mut self, idx: &u64) -> BTreeMap { + match &mut self.inner { + SnapshotsInner::Mem(m) => m.split_off(idx), + SnapshotsInner::Disk(f) => { + let mut f = f.reopen().unwrap(); + let mut buf = vec![]; + f.read_to_end(&mut buf).unwrap(); + let mut m = BTreeMap::new(); + let mut cursor = std::io::Cursor::new(&buf); + while cursor.position() < buf.len() as u64 { + let snap = AppState::deserialize(&buf); + m.insert(snap.0, snap); + } + m.split_off(idx) + } + } + } + fn get_latest_snapshot(&self, idx: u64) -> u64 { + match &self.inner { + SnapshotsInner::Mem(m) => m.get_latest_snapshot(), + SnapshotsInner::Disk(f) => { + let mut f = f.reopen().unwrap(); + let mut buf = vec![]; + f.read_to_end(&mut buf).unwrap(); + let mut m = BTreeMap::new(); + let mut cursor = std::io::Cursor::new(&buf); + while cursor.position() < buf.len() as u64 { + let snap = AppState::deserialize(&buf); + m.insert(snap.0, snap); + } + m.get_latest_snapshot() + } + } + } +} + struct InnerState { state_index: u64, counter: u64, } struct AppMain { state: RwLock, - snapshots: RwLock>, + snapshots: RwLock, } impl AppMain { - pub fn new() -> Self { + pub fn new(snap_file: Option>) -> Self { let init_state = InnerState { state_index: 0, counter: 0, }; - let snapshots = BTreeMap::new(); + let file = snap_file.map(|f| File::open(f).unwrap()); + let snapshots = Snapshots::new(file); Self { state: RwLock::new(init_state), snapshots: RwLock::new(snapshots),