Skip to content

Commit

Permalink
test: Add persistency test
Browse files Browse the repository at this point in the history
  • Loading branch information
akiradeveloper committed Oct 21, 2024
1 parent d595d2f commit 6eac056
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 18 deletions.
4 changes: 3 additions & 1 deletion sorock/src/backend/redb/log.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use super::*;

use std::time::Duration;

mod value {
use super::*;

Expand Down Expand Up @@ -68,7 +70,7 @@ impl Reaper {

pub fn reap(&self) -> Result<()> {
// Blocked until the first element is received.
let head = self.rx.recv()?;
let head = self.rx.recv_timeout(Duration::from_millis(100))?;
let tail = self.rx.drain();

let mut notifiers = vec![];
Expand Down
7 changes: 6 additions & 1 deletion sorock/src/backend/redb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,22 @@ mod log;
pub struct Backend {
db: Arc<redb::Database>,
tx: log::Sender,
_kill_tx: flume::Receiver<()>,
}
impl Backend {
pub fn new(redb: redb::Database) -> Self {
let db = Arc::new(redb);

let (reaper, tx) = log::Reaper::new(db.clone());
let (kill_rx, _kill_tx) = flume::bounded(1);
std::thread::spawn(move || loop {
if kill_rx.is_disconnected() {
break;
}
reaper.reap().ok();
});

Self { db, tx }
Self { db, tx, _kill_tx }
}

pub fn get(&self, shard_id: u32) -> Result<(impl RaftLogStore, impl RaftBallotStore)> {
Expand Down
6 changes: 5 additions & 1 deletion sorock/src/process/command_log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ impl CommandLog {
pub async fn restore_state(&self) -> Result<()> {
let log_last_index = self.get_log_last_index().await?;
let snapshot_index = match self.find_last_snapshot_index(log_last_index).await? {
Some(x) => x,
Some(x) => {
info!("restore state: found snapshot_index={x}");
x
}
None => 0,
};
let start_index = if snapshot_index == 0 {
Expand All @@ -73,6 +76,7 @@ impl CommandLog {
self.kern_pointer.store(start_index, Ordering::SeqCst);
self.user_pointer.store(start_index, Ordering::SeqCst);

info!("restore state: snapshot_index={start_index}");
Ok(())
}
}
Expand Down
1 change: 1 addition & 0 deletions tests/env/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ anyhow.workspace = true
port_check = "0.2.1"
redb.workspace = true
spin.workspace = true
tempfile = "3.13.0"
tokio.workspace = true
tonic.workspace = true
tracing.workspace = true
Expand Down
65 changes: 56 additions & 9 deletions tests/env/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use anyhow::Result;
use core::error;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Once;
use std::time::Duration;
use tempfile::NamedTempFile;
use tonic::codegen::CompressionEncoding;
use tonic::transport::{Channel, Endpoint, Uri};
use tracing::info;
use tracing::{error, info};

static INIT: Once = Once::new();

Expand All @@ -13,24 +16,37 @@ struct Node {
abort_tx0: Option<tokio::sync::oneshot::Sender<()>>,
}
impl Node {
pub fn new(id: u8, port: u16, n_shards: u32) -> Result<Self> {
pub fn new(id: u8, port: u16, n_shards: u32, file: Option<Arc<NamedTempFile>>) -> Result<Self> {
let nd_tag = format!("ND{port}>");
let (tx, rx) = tokio::sync::oneshot::channel();

let svc_task = async move {
info!("add (id={id})");
info!("env: add (id={id})");

let node_id = {
let address = format!("http://127.0.0.1:{port}");
address.parse().unwrap()
};
let node = sorock::RaftNode::new(node_id);

info!("env: create db");
let db = {
let mem = redb::backends::InMemoryBackend::new();
let db = redb::Database::builder().create_with_backend(mem).unwrap();
let db = match file {
Some(file) => match redb::Database::create(file.path()) {
Ok(x) => x,
Err(e) => {
error!("failed to create db: {:?}", e);
panic!()
}
},
None => {
let mem = redb::backends::InMemoryBackend::new();
redb::Database::builder().create_with_backend(mem).unwrap()
}
};
sorock::backend::redb::Backend::new(db)
};
info!("env: db created");

for shard_id in 0..n_shards {
let (log, ballot) = db.get(shard_id).unwrap();
Expand All @@ -57,7 +73,7 @@ impl Node {
.add_service(reflection_svc)
.add_service(ping_svc)
.serve_with_shutdown(socket, async {
info!("remove (id={id})");
info!("env: remove (id={id})");
rx.await.ok();
})
.await
Expand Down Expand Up @@ -93,12 +109,32 @@ impl Drop for Node {
tx.send(()).ok();
}
}

struct PersistentEnv {
files: HashMap<u8, Arc<NamedTempFile>>,
}
impl PersistentEnv {
fn new() -> Self {
Self {
files: HashMap::new(),
}
}
fn get(&mut self, id: u8) -> Arc<NamedTempFile> {
self.files
.entry(id)
.or_insert_with(|| Arc::new(NamedTempFile::new().unwrap()))
.clone()
}
}

pub struct Env {
allocated_ports: HashMap<u8, u16>,
nodes: HashMap<u8, Node>,
conn_cache: spin::Mutex<HashMap<u8, Channel>>,
penv: Option<PersistentEnv>,
}
impl Env {
pub fn new(with_logging: bool) -> Self {
pub fn new(with_persistency: bool, with_logging: bool) -> Self {
INIT.call_once(|| {
// On terminating the tokio runtime,
// flooding stack traces are printed and they are super noisy.
Expand All @@ -113,15 +149,26 @@ impl Env {
tracing_subscriber::fmt().event_format(format).init();
}
});
let penv = if with_persistency {
Some(PersistentEnv::new())
} else {
None
};
Self {
nodes: HashMap::new(),
allocated_ports: HashMap::new(),
conn_cache: spin::Mutex::new(HashMap::new()),
penv,
}
}

pub fn add_node(&mut self, id: u8, n_shards: u32) {
let free_port = port_check::free_local_ipv4_port().unwrap();
let node = Node::new(id, free_port, n_shards).unwrap();
let free_port = *self
.allocated_ports
.entry(id)
.or_insert_with(|| port_check::free_local_ipv4_port().unwrap());
let file0 = self.penv.as_mut().map(|env| env.get(id));
let node = Node::new(id, free_port, n_shards, file0).unwrap();
port_check::is_port_reachable_with_timeout(
node.address().to_string(),
Duration::from_secs(5),
Expand Down
8 changes: 4 additions & 4 deletions tests/env/tests/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ use anyhow::Result;

#[tokio::test(flavor = "multi_thread")]
async fn create() -> Result<()> {
let mut env = env::Env::new(true);
let mut env = env::Env::new(false, true);
env.add_node(0, 1);
env.check_connectivity(0).await?;
Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn create_remove() -> Result<()> {
let mut env = env::Env::new(true);
let mut env = env::Env::new(false, true);
env.add_node(0, 1);
env.check_connectivity(0).await?;

Expand All @@ -24,7 +24,7 @@ async fn create_remove() -> Result<()> {

#[tokio::test(flavor = "multi_thread")]
async fn panic_loop() -> Result<()> {
let mut env = env::Env::new(true);
let mut env = env::Env::new(false, true);
env.add_node(0, 1);
env.check_connectivity(0).await?;

Expand All @@ -39,7 +39,7 @@ async fn panic_loop() -> Result<()> {
#[tokio::test(flavor = "multi_thread")]
async fn drop_env() -> Result<()> {
for _ in 0..10 {
let mut env = env::Env::new(true);
let mut env = env::Env::new(false, true);
env.add_node(0, 1);
env.check_connectivity(0).await?;
}
Expand Down
27 changes: 27 additions & 0 deletions tests/env/tests/persistency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use anyhow::Result;
use std::time::Duration;

#[tokio::test(flavor = "multi_thread")]
async fn create() -> Result<()> {
let mut env = env::Env::new(true, true);
env.add_node(0, 1);
env.check_connectivity(0).await?;
Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn create_remove() -> Result<()> {
let mut env = env::Env::new(true, true);
env.add_node(0, 1);
env.check_connectivity(0).await?;
let addr0 = env.address(0);
env.remove_node(0);
tokio::time::sleep(Duration::from_secs(1)).await;

env.add_node(0, 1);
env.check_connectivity(0).await?;
let addr1 = env.address(0);

assert_eq!(addr0, addr1);
Ok(())
}
15 changes: 13 additions & 2 deletions tests/sorock-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,22 @@ use env::Env;
use sorock::service::raft::client::*;

pub struct Builder {
with_persistency: bool,
with_logging: bool,
}
impl Builder {
fn new() -> Self {
Self { with_logging: true }
Self {
with_persistency: false,
with_logging: true,
}
}

pub fn with_persistency(self, b: bool) -> Self {
Self {
with_persistency: b,
..self
}
}

pub fn with_logging(self, b: bool) -> Self {
Expand All @@ -20,7 +31,7 @@ impl Builder {
pub async fn build(self, n: u8, p: u32) -> Result<Cluster> {
ensure!(n > 0);
ensure!(p > 0);
let mut env = Env::new(self.with_logging);
let mut env = Env::new(self.with_persistency, self.with_logging);
for id in 0..n {
env.add_node(id, p);
env.check_connectivity(0).await?;
Expand Down
36 changes: 36 additions & 0 deletions tests/sorock-tests/tests/6_persistency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use anyhow::Result;
use rand::Rng;
use sorock_tests::*;
use std::time::Duration;

#[tokio::test(flavor = "multi_thread")]
async fn n3_restore() -> Result<()> {
let mut cluster = Cluster::builder()
.with_persistency(true)
.build(3, 1)
.await?;
cluster.add_server(0, 0, 0).await?;
cluster.add_server(0, 0, 1).await?;
cluster.add_server(0, 1, 2).await?;

let mut cur_state = 0;
for _ in 0..10 {
let add_v = rand::thread_rng().gen_range(1..=9);
let old_v = cluster.user(0).fetch_add(0, add_v).await?;
assert_eq!(old_v, cur_state);
cur_state += add_v;
}

cluster.env().remove_node(0);
cluster.env().remove_node(1);
cluster.env().remove_node(2);
tokio::time::sleep(Duration::from_secs(1)).await;

cluster.env().add_node(0, 1);
cluster.env().add_node(1, 1);
// Wait for election.
tokio::time::sleep(Duration::from_secs(5)).await;
assert_eq!(cluster.user(1).read(0).await?, cur_state);

Ok(())
}

0 comments on commit 6eac056

Please sign in to comment.