Skip to content

Commit

Permalink
test: Add persistency test
Browse files Browse the repository at this point in the history
  • Loading branch information
akiradeveloper committed Oct 21, 2024
1 parent d595d2f commit e972d63
Show file tree
Hide file tree
Showing 13 changed files with 381 additions and 51 deletions.
4 changes: 3 additions & 1 deletion sorock/src/backend/redb/log.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use super::*;

use std::time::Duration;

mod value {
use super::*;

Expand Down Expand Up @@ -68,7 +70,7 @@ impl Reaper {

pub fn reap(&self) -> Result<()> {
// Blocked until the first element is received.
let head = self.rx.recv()?;
let head = self.rx.recv_timeout(Duration::from_millis(100))?;
let tail = self.rx.drain();

let mut notifiers = vec![];
Expand Down
7 changes: 6 additions & 1 deletion sorock/src/backend/redb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,22 @@ mod log;
pub struct Backend {
db: Arc<redb::Database>,
tx: log::Sender,
_kill_tx: flume::Receiver<()>,
}
impl Backend {
pub fn new(redb: redb::Database) -> Self {
let db = Arc::new(redb);

let (reaper, tx) = log::Reaper::new(db.clone());
let (kill_rx, _kill_tx) = flume::bounded(1);
std::thread::spawn(move || loop {
if kill_rx.is_disconnected() {
break;
}
reaper.reap().ok();
});

Self { db, tx }
Self { db, tx, _kill_tx }
}

pub fn get(&self, shard_id: u32) -> Result<(impl RaftLogStore, impl RaftBallotStore)> {
Expand Down
1 change: 1 addition & 0 deletions sorock/src/process/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ impl App {
if snapshot_index == 1 {
return Ok(());
}
info!("install snapshot@{snapshot_index}");
self.install_snapshot(snapshot_index).await?;
Ok(())
}
Expand Down
6 changes: 5 additions & 1 deletion sorock/src/process/command_log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ impl CommandLog {
pub async fn restore_state(&self) -> Result<()> {
let log_last_index = self.get_log_last_index().await?;
let snapshot_index = match self.find_last_snapshot_index(log_last_index).await? {
Some(x) => x,
Some(x) => {
info!("restore state: found snapshot_index={x}");
x
}
None => 0,
};
let start_index = if snapshot_index == 0 {
Expand All @@ -73,6 +76,7 @@ impl CommandLog {
self.kern_pointer.store(start_index, Ordering::SeqCst);
self.user_pointer.store(start_index, Ordering::SeqCst);

info!("restore state: commit_index={start_index}");
Ok(())
}
}
Expand Down
1 change: 1 addition & 0 deletions tests/env/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ anyhow.workspace = true
port_check = "0.2.1"
redb.workspace = true
spin.workspace = true
tempfile = "3.13.0"
tokio.workspace = true
tonic.workspace = true
tracing.workspace = true
Expand Down
100 changes: 89 additions & 11 deletions tests/env/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use anyhow::Result;
use core::error;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Once;
use std::time::Duration;
use tempfile::NamedTempFile;
use tonic::codegen::CompressionEncoding;
use tonic::transport::{Channel, Endpoint, Uri};
use tracing::info;
use tracing::{error, info};

static INIT: Once = Once::new();

Expand All @@ -13,29 +16,50 @@ struct Node {
abort_tx0: Option<tokio::sync::oneshot::Sender<()>>,
}
impl Node {
pub fn new(id: u8, port: u16, n_shards: u32) -> Result<Self> {
pub fn new(
id: u8,
port: u16,
n_shards: u32,
pstate: Option<Arc<PersistentState>>,
) -> Result<Self> {
let nd_tag = format!("ND{port}>");
let (tx, rx) = tokio::sync::oneshot::channel();

let svc_task = async move {
info!("add (id={id})");
info!("env: add (id={id})");

let node_id = {
let address = format!("http://127.0.0.1:{port}");
address.parse().unwrap()
};
let node = sorock::RaftNode::new(node_id);

info!("env: create db");
let db = {
let mem = redb::backends::InMemoryBackend::new();
let db = redb::Database::builder().create_with_backend(mem).unwrap();
let db = match &pstate {
Some(file) => match redb::Database::create(file.log_file.path()) {
Ok(x) => x,
Err(e) => {
error!("failed to create db: {:?}", e);
panic!()
}
},
None => {
let mem = redb::backends::InMemoryBackend::new();
redb::Database::builder().create_with_backend(mem).unwrap()
}
};
sorock::backend::redb::Backend::new(db)
};
info!("env: db created");

for shard_id in 0..n_shards {
let state = pstate
.as_ref()
.map(|env| env.snapshot_files[shard_id as usize].path());
let (log, ballot) = db.get(shard_id).unwrap();
let driver = node.get_driver(shard_id);
let process = testapp::raft_process::new(log, ballot, driver)
let process = testapp::raft_process::new(state, log, ballot, driver)
.await
.unwrap();
node.attach_process(shard_id, process);
Expand All @@ -57,7 +81,7 @@ impl Node {
.add_service(reflection_svc)
.add_service(ping_svc)
.serve_with_shutdown(socket, async {
info!("remove (id={id})");
info!("env: remove (id={id})");
rx.await.ok();
})
.await
Expand Down Expand Up @@ -93,12 +117,54 @@ impl Drop for Node {
tx.send(()).ok();
}
}

struct PersistentState {
log_file: NamedTempFile,
snapshot_files: Vec<NamedTempFile>,
}
impl PersistentState {
fn new(n_shards: u32) -> Self {
let log_file = NamedTempFile::new().unwrap();
let mut snapshot_files = vec![];
for _ in 0..n_shards {
let snapshot_file = NamedTempFile::new().unwrap();
snapshot_files.push(snapshot_file);
}
Self {
log_file,
snapshot_files,
}
}
}

struct PersistentEnv {
files: HashMap<u8, Arc<PersistentState>>,
n_shards: u32,
}
impl PersistentEnv {
fn new(n_shards: u32) -> Self {
Self {
files: HashMap::new(),
n_shards,
}
}
fn get(&mut self, id: u8) -> Arc<PersistentState> {
self.files
.entry(id)
.or_insert_with(|| Arc::new(PersistentState::new(self.n_shards)))
.clone()
}
}

pub struct Env {
n_shards: u32,
allocated_ports: HashMap<u8, u16>,
nodes: HashMap<u8, Node>,
conn_cache: spin::Mutex<HashMap<u8, Channel>>,
penv: Option<PersistentEnv>,
}
impl Env {
pub fn new(with_logging: bool) -> Self {
pub fn new(n_shards: u32, with_persistency: bool, with_logging: bool) -> Self {
INIT.call_once(|| {
// On terminating the tokio runtime,
// flooding stack traces are printed and they are super noisy.
Expand All @@ -113,15 +179,27 @@ impl Env {
tracing_subscriber::fmt().event_format(format).init();
}
});
let penv = if with_persistency {
Some(PersistentEnv::new(n_shards))
} else {
None
};
Self {
n_shards,
nodes: HashMap::new(),
allocated_ports: HashMap::new(),
conn_cache: spin::Mutex::new(HashMap::new()),
penv,
}
}

pub fn add_node(&mut self, id: u8, n_shards: u32) {
let free_port = port_check::free_local_ipv4_port().unwrap();
let node = Node::new(id, free_port, n_shards).unwrap();
pub fn add_node(&mut self, id: u8) {
let free_port = *self
.allocated_ports
.entry(id)
.or_insert_with(|| port_check::free_local_ipv4_port().unwrap());
let snap_states = self.penv.as_mut().map(|env| env.get(id));
let node = Node::new(id, free_port, self.n_shards, snap_states).unwrap();
port_check::is_port_reachable_with_timeout(
node.address().to_string(),
Duration::from_secs(5),
Expand Down
16 changes: 8 additions & 8 deletions tests/env/tests/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ use anyhow::Result;

#[tokio::test(flavor = "multi_thread")]
async fn create() -> Result<()> {
let mut env = env::Env::new(true);
env.add_node(0, 1);
let mut env = env::Env::new(1, false, true);
env.add_node(0);
env.check_connectivity(0).await?;
Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn create_remove() -> Result<()> {
let mut env = env::Env::new(true);
env.add_node(0, 1);
let mut env = env::Env::new(1, false, true);
env.add_node(0);
env.check_connectivity(0).await?;

let mut cli = env.connect_ping_client(0).await?;
Expand All @@ -24,8 +24,8 @@ async fn create_remove() -> Result<()> {

#[tokio::test(flavor = "multi_thread")]
async fn panic_loop() -> Result<()> {
let mut env = env::Env::new(true);
env.add_node(0, 1);
let mut env = env::Env::new(1, false, true);
env.add_node(0);
env.check_connectivity(0).await?;

for _ in 0..10 {
Expand All @@ -39,8 +39,8 @@ async fn panic_loop() -> Result<()> {
#[tokio::test(flavor = "multi_thread")]
async fn drop_env() -> Result<()> {
for _ in 0..10 {
let mut env = env::Env::new(true);
env.add_node(0, 1);
let mut env = env::Env::new(1, false, true);
env.add_node(0);
env.check_connectivity(0).await?;
}

Expand Down
27 changes: 27 additions & 0 deletions tests/env/tests/persistency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use anyhow::Result;
use std::time::Duration;

#[tokio::test(flavor = "multi_thread")]
async fn create() -> Result<()> {
let mut env = env::Env::new(1, true, true);
env.add_node(0);
env.check_connectivity(0).await?;
Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn create_remove() -> Result<()> {
let mut env = env::Env::new(1, true, true);
env.add_node(0);
env.check_connectivity(0).await?;
let addr0 = env.address(0);
env.remove_node(0);
tokio::time::sleep(Duration::from_secs(1)).await;

env.add_node(0);
env.check_connectivity(0).await?;
let addr1 = env.address(0);

assert_eq!(addr0, addr1);
Ok(())
}
17 changes: 14 additions & 3 deletions tests/sorock-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,22 @@ use env::Env;
use sorock::service::raft::client::*;

pub struct Builder {
with_persistency: bool,
with_logging: bool,
}
impl Builder {
fn new() -> Self {
Self { with_logging: true }
Self {
with_persistency: false,
with_logging: true,
}
}

pub fn with_persistency(self, b: bool) -> Self {
Self {
with_persistency: b,
..self
}
}

pub fn with_logging(self, b: bool) -> Self {
Expand All @@ -20,9 +31,9 @@ impl Builder {
pub async fn build(self, n: u8, p: u32) -> Result<Cluster> {
ensure!(n > 0);
ensure!(p > 0);
let mut env = Env::new(self.with_logging);
let mut env = Env::new(p, self.with_persistency, self.with_logging);
for id in 0..n {
env.add_node(id, p);
env.add_node(id);
env.check_connectivity(0).await?;
}
Ok(Cluster { env })
Expand Down
41 changes: 41 additions & 0 deletions tests/sorock-tests/tests/6_persistency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use anyhow::Result;
use rand::Rng;
use sorock_tests::*;
use std::time::Duration;

#[tokio::test(flavor = "multi_thread")]
async fn n3_restore() -> Result<()> {
let mut cluster = Cluster::builder()
.with_persistency(true)
.build(3, 1)
.await?;
cluster.add_server(0, 0, 0).await?;
cluster.add_server(0, 0, 1).await?;
cluster.add_server(0, 1, 2).await?;

let mut cur_state = 0;
for i in 0..10 {
let add_v = rand::thread_rng().gen_range(1..=9);
let old_v = cluster.user(0).fetch_add(0, add_v).await?;
assert_eq!(old_v, cur_state);
cur_state += add_v;

if i == 5 {
cluster.user(0).make_snapshot(0).await?;
cluster.user(1).make_snapshot(0).await?;
}
}

cluster.env().remove_node(0);
cluster.env().remove_node(1);
cluster.env().remove_node(2);
tokio::time::sleep(Duration::from_secs(1)).await;

cluster.env().add_node(0);
cluster.env().add_node(1);
// Wait for election.
tokio::time::sleep(Duration::from_secs(5)).await;
assert_eq!(cluster.user(1).read(0).await?, cur_state);

Ok(())
}
Loading

0 comments on commit e972d63

Please sign in to comment.