From 6eac056018ff2ca346b207e9e6f8619c66ce0f49 Mon Sep 17 00:00:00 2001 From: Akira Hayakawa Date: Mon, 21 Oct 2024 00:31:19 +0000 Subject: [PATCH] test: Add persistency test --- sorock/src/backend/redb/log.rs | 4 +- sorock/src/backend/redb/mod.rs | 7 ++- sorock/src/process/command_log/mod.rs | 6 ++- tests/env/Cargo.toml | 1 + tests/env/src/lib.rs | 65 +++++++++++++++++++---- tests/env/tests/env.rs | 8 +-- tests/env/tests/persistency.rs | 27 ++++++++++ tests/sorock-tests/src/lib.rs | 15 +++++- tests/sorock-tests/tests/6_persistency.rs | 36 +++++++++++++ 9 files changed, 151 insertions(+), 18 deletions(-) create mode 100644 tests/env/tests/persistency.rs create mode 100644 tests/sorock-tests/tests/6_persistency.rs diff --git a/sorock/src/backend/redb/log.rs b/sorock/src/backend/redb/log.rs index 84b4ae8a..ef9233f5 100644 --- a/sorock/src/backend/redb/log.rs +++ b/sorock/src/backend/redb/log.rs @@ -1,5 +1,7 @@ use super::*; +use std::time::Duration; + mod value { use super::*; @@ -68,7 +70,7 @@ impl Reaper { pub fn reap(&self) -> Result<()> { // Blocked until the first element is received. - let head = self.rx.recv()?; + let head = self.rx.recv_timeout(Duration::from_millis(100))?; let tail = self.rx.drain(); let mut notifiers = vec![]; diff --git a/sorock/src/backend/redb/mod.rs b/sorock/src/backend/redb/mod.rs index 701a072d..769758b3 100644 --- a/sorock/src/backend/redb/mod.rs +++ b/sorock/src/backend/redb/mod.rs @@ -12,17 +12,22 @@ mod log; pub struct Backend { db: Arc, tx: log::Sender, + _kill_tx: flume::Receiver<()>, } impl Backend { pub fn new(redb: redb::Database) -> Self { let db = Arc::new(redb); let (reaper, tx) = log::Reaper::new(db.clone()); + let (kill_rx, _kill_tx) = flume::bounded(1); std::thread::spawn(move || loop { + if kill_rx.is_disconnected() { + break; + } reaper.reap().ok(); }); - Self { db, tx } + Self { db, tx, _kill_tx } } pub fn get(&self, shard_id: u32) -> Result<(impl RaftLogStore, impl RaftBallotStore)> { diff --git a/sorock/src/process/command_log/mod.rs b/sorock/src/process/command_log/mod.rs index 9c563fdb..c1ce6730 100644 --- a/sorock/src/process/command_log/mod.rs +++ b/sorock/src/process/command_log/mod.rs @@ -58,7 +58,10 @@ impl CommandLog { pub async fn restore_state(&self) -> Result<()> { let log_last_index = self.get_log_last_index().await?; let snapshot_index = match self.find_last_snapshot_index(log_last_index).await? { - Some(x) => x, + Some(x) => { + info!("restore state: found snapshot_index={x}"); + x + } None => 0, }; let start_index = if snapshot_index == 0 { @@ -73,6 +76,7 @@ impl CommandLog { self.kern_pointer.store(start_index, Ordering::SeqCst); self.user_pointer.store(start_index, Ordering::SeqCst); + info!("restore state: snapshot_index={start_index}"); Ok(()) } } diff --git a/tests/env/Cargo.toml b/tests/env/Cargo.toml index fa0a2c99..b015217b 100644 --- a/tests/env/Cargo.toml +++ b/tests/env/Cargo.toml @@ -10,6 +10,7 @@ anyhow.workspace = true port_check = "0.2.1" redb.workspace = true spin.workspace = true +tempfile = "3.13.0" tokio.workspace = true tonic.workspace = true tracing.workspace = true diff --git a/tests/env/src/lib.rs b/tests/env/src/lib.rs index 50bf2a74..904adb24 100644 --- a/tests/env/src/lib.rs +++ b/tests/env/src/lib.rs @@ -1,10 +1,13 @@ use anyhow::Result; +use core::error; use std::collections::HashMap; +use std::sync::Arc; use std::sync::Once; use std::time::Duration; +use tempfile::NamedTempFile; use tonic::codegen::CompressionEncoding; use tonic::transport::{Channel, Endpoint, Uri}; -use tracing::info; +use tracing::{error, info}; static INIT: Once = Once::new(); @@ -13,12 +16,12 @@ struct Node { abort_tx0: Option>, } impl Node { - pub fn new(id: u8, port: u16, n_shards: u32) -> Result { + pub fn new(id: u8, port: u16, n_shards: u32, file: Option>) -> Result { let nd_tag = format!("ND{port}>"); let (tx, rx) = tokio::sync::oneshot::channel(); let svc_task = async move { - info!("add (id={id})"); + info!("env: add (id={id})"); let node_id = { let address = format!("http://127.0.0.1:{port}"); @@ -26,11 +29,24 @@ impl Node { }; let node = sorock::RaftNode::new(node_id); + info!("env: create db"); let db = { - let mem = redb::backends::InMemoryBackend::new(); - let db = redb::Database::builder().create_with_backend(mem).unwrap(); + let db = match file { + Some(file) => match redb::Database::create(file.path()) { + Ok(x) => x, + Err(e) => { + error!("failed to create db: {:?}", e); + panic!() + } + }, + None => { + let mem = redb::backends::InMemoryBackend::new(); + redb::Database::builder().create_with_backend(mem).unwrap() + } + }; sorock::backend::redb::Backend::new(db) }; + info!("env: db created"); for shard_id in 0..n_shards { let (log, ballot) = db.get(shard_id).unwrap(); @@ -57,7 +73,7 @@ impl Node { .add_service(reflection_svc) .add_service(ping_svc) .serve_with_shutdown(socket, async { - info!("remove (id={id})"); + info!("env: remove (id={id})"); rx.await.ok(); }) .await @@ -93,12 +109,32 @@ impl Drop for Node { tx.send(()).ok(); } } + +struct PersistentEnv { + files: HashMap>, +} +impl PersistentEnv { + fn new() -> Self { + Self { + files: HashMap::new(), + } + } + fn get(&mut self, id: u8) -> Arc { + self.files + .entry(id) + .or_insert_with(|| Arc::new(NamedTempFile::new().unwrap())) + .clone() + } +} + pub struct Env { + allocated_ports: HashMap, nodes: HashMap, conn_cache: spin::Mutex>, + penv: Option, } impl Env { - pub fn new(with_logging: bool) -> Self { + pub fn new(with_persistency: bool, with_logging: bool) -> Self { INIT.call_once(|| { // On terminating the tokio runtime, // flooding stack traces are printed and they are super noisy. @@ -113,15 +149,26 @@ impl Env { tracing_subscriber::fmt().event_format(format).init(); } }); + let penv = if with_persistency { + Some(PersistentEnv::new()) + } else { + None + }; Self { nodes: HashMap::new(), + allocated_ports: HashMap::new(), conn_cache: spin::Mutex::new(HashMap::new()), + penv, } } pub fn add_node(&mut self, id: u8, n_shards: u32) { - let free_port = port_check::free_local_ipv4_port().unwrap(); - let node = Node::new(id, free_port, n_shards).unwrap(); + let free_port = *self + .allocated_ports + .entry(id) + .or_insert_with(|| port_check::free_local_ipv4_port().unwrap()); + let file0 = self.penv.as_mut().map(|env| env.get(id)); + let node = Node::new(id, free_port, n_shards, file0).unwrap(); port_check::is_port_reachable_with_timeout( node.address().to_string(), Duration::from_secs(5), diff --git a/tests/env/tests/env.rs b/tests/env/tests/env.rs index fc7b2d0b..61fc976c 100644 --- a/tests/env/tests/env.rs +++ b/tests/env/tests/env.rs @@ -2,7 +2,7 @@ use anyhow::Result; #[tokio::test(flavor = "multi_thread")] async fn create() -> Result<()> { - let mut env = env::Env::new(true); + let mut env = env::Env::new(false, true); env.add_node(0, 1); env.check_connectivity(0).await?; Ok(()) @@ -10,7 +10,7 @@ async fn create() -> Result<()> { #[tokio::test(flavor = "multi_thread")] async fn create_remove() -> Result<()> { - let mut env = env::Env::new(true); + let mut env = env::Env::new(false, true); env.add_node(0, 1); env.check_connectivity(0).await?; @@ -24,7 +24,7 @@ async fn create_remove() -> Result<()> { #[tokio::test(flavor = "multi_thread")] async fn panic_loop() -> Result<()> { - let mut env = env::Env::new(true); + let mut env = env::Env::new(false, true); env.add_node(0, 1); env.check_connectivity(0).await?; @@ -39,7 +39,7 @@ async fn panic_loop() -> Result<()> { #[tokio::test(flavor = "multi_thread")] async fn drop_env() -> Result<()> { for _ in 0..10 { - let mut env = env::Env::new(true); + let mut env = env::Env::new(false, true); env.add_node(0, 1); env.check_connectivity(0).await?; } diff --git a/tests/env/tests/persistency.rs b/tests/env/tests/persistency.rs new file mode 100644 index 00000000..ca087cfd --- /dev/null +++ b/tests/env/tests/persistency.rs @@ -0,0 +1,27 @@ +use anyhow::Result; +use std::time::Duration; + +#[tokio::test(flavor = "multi_thread")] +async fn create() -> Result<()> { + let mut env = env::Env::new(true, true); + env.add_node(0, 1); + env.check_connectivity(0).await?; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn create_remove() -> Result<()> { + let mut env = env::Env::new(true, true); + env.add_node(0, 1); + env.check_connectivity(0).await?; + let addr0 = env.address(0); + env.remove_node(0); + tokio::time::sleep(Duration::from_secs(1)).await; + + env.add_node(0, 1); + env.check_connectivity(0).await?; + let addr1 = env.address(0); + + assert_eq!(addr0, addr1); + Ok(()) +} diff --git a/tests/sorock-tests/src/lib.rs b/tests/sorock-tests/src/lib.rs index 5f248640..dc30103d 100644 --- a/tests/sorock-tests/src/lib.rs +++ b/tests/sorock-tests/src/lib.rs @@ -3,11 +3,22 @@ use env::Env; use sorock::service::raft::client::*; pub struct Builder { + with_persistency: bool, with_logging: bool, } impl Builder { fn new() -> Self { - Self { with_logging: true } + Self { + with_persistency: false, + with_logging: true, + } + } + + pub fn with_persistency(self, b: bool) -> Self { + Self { + with_persistency: b, + ..self + } } pub fn with_logging(self, b: bool) -> Self { @@ -20,7 +31,7 @@ impl Builder { pub async fn build(self, n: u8, p: u32) -> Result { ensure!(n > 0); ensure!(p > 0); - let mut env = Env::new(self.with_logging); + let mut env = Env::new(self.with_persistency, self.with_logging); for id in 0..n { env.add_node(id, p); env.check_connectivity(0).await?; diff --git a/tests/sorock-tests/tests/6_persistency.rs b/tests/sorock-tests/tests/6_persistency.rs new file mode 100644 index 00000000..f10ad630 --- /dev/null +++ b/tests/sorock-tests/tests/6_persistency.rs @@ -0,0 +1,36 @@ +use anyhow::Result; +use rand::Rng; +use sorock_tests::*; +use std::time::Duration; + +#[tokio::test(flavor = "multi_thread")] +async fn n3_restore() -> Result<()> { + let mut cluster = Cluster::builder() + .with_persistency(true) + .build(3, 1) + .await?; + cluster.add_server(0, 0, 0).await?; + cluster.add_server(0, 0, 1).await?; + cluster.add_server(0, 1, 2).await?; + + let mut cur_state = 0; + for _ in 0..10 { + let add_v = rand::thread_rng().gen_range(1..=9); + let old_v = cluster.user(0).fetch_add(0, add_v).await?; + assert_eq!(old_v, cur_state); + cur_state += add_v; + } + + cluster.env().remove_node(0); + cluster.env().remove_node(1); + cluster.env().remove_node(2); + tokio::time::sleep(Duration::from_secs(1)).await; + + cluster.env().add_node(0, 1); + cluster.env().add_node(1, 1); + // Wait for election. + tokio::time::sleep(Duration::from_secs(5)).await; + assert_eq!(cluster.user(1).read(0).await?, cur_state); + + Ok(()) +}