Skip to content

Commit

Permalink
test: Implement persistent snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
akiradeveloper committed Oct 21, 2024
1 parent 6eac056 commit 6fc975b
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 28 deletions.
1 change: 1 addition & 0 deletions sorock/src/process/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ impl App {
if snapshot_index == 1 {
return Ok(());
}
info!("install snapshot@{snapshot_index}");
self.install_snapshot(snapshot_index).await?;
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion sorock/src/process/command_log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl CommandLog {
self.kern_pointer.store(start_index, Ordering::SeqCst);
self.user_pointer.store(start_index, Ordering::SeqCst);

info!("restore state: snapshot_index={start_index}");
info!("restore state: commit_index={start_index}");
Ok(())
}
}
Expand Down
53 changes: 41 additions & 12 deletions tests/env/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,12 @@ struct Node {
abort_tx0: Option<tokio::sync::oneshot::Sender<()>>,
}
impl Node {
pub fn new(id: u8, port: u16, n_shards: u32, file: Option<Arc<NamedTempFile>>) -> Result<Self> {
pub fn new(
id: u8,
port: u16,
n_shards: u32,
pstate: Option<Arc<PersistentState>>,
) -> Result<Self> {
let nd_tag = format!("ND{port}>");
let (tx, rx) = tokio::sync::oneshot::channel();

Expand All @@ -31,8 +36,8 @@ impl Node {

info!("env: create db");
let db = {
let db = match file {
Some(file) => match redb::Database::create(file.path()) {
let db = match pstate {
Some(file) => match redb::Database::create(file.log_file.path()) {
Ok(x) => x,
Err(e) => {
error!("failed to create db: {:?}", e);
Expand All @@ -49,9 +54,12 @@ impl Node {
info!("env: db created");

for shard_id in 0..n_shards {
let state = pstate
.as_ref()
.map(|env| env.snapshot_file[shard_id as usize].path());
let (log, ballot) = db.get(shard_id).unwrap();
let driver = node.get_driver(shard_id);
let process = testapp::raft_process::new(log, ballot, driver)
let process = testapp::raft_process::new(state, log, ballot, driver)
.await
.unwrap();
node.attach_process(shard_id, process);
Expand Down Expand Up @@ -110,31 +118,51 @@ impl Drop for Node {
}
}

struct PersistentState {
log_file: NamedTempFile,
snapshot_file: Vec<NamedTempFile>,
}
impl PersistentState {
fn new(n_shards: u32) -> Self {
let log_file = NamedTempFile::new().unwrap();
let snapshot_file = (0..n_shards)
.map(|_| NamedTempFile::new().unwrap())
.collect();
Self {
log_file,
snapshot_file,
}
}
}

struct PersistentEnv {
files: HashMap<u8, Arc<NamedTempFile>>,
files: HashMap<u8, Arc<PersistentState>>,
n_shards: u32,
}
impl PersistentEnv {
fn new() -> Self {
fn new(n_shards: u32) -> Self {
Self {
files: HashMap::new(),
n_shards,
}
}
fn get(&mut self, id: u8) -> Arc<NamedTempFile> {
fn get(&mut self, id: u8) -> Arc<PersistentState> {
self.files
.entry(id)
.or_insert_with(|| Arc::new(NamedTempFile::new().unwrap()))
.or_insert_with(|| Arc::new(PersistentState::new(self.n_shards)))
.clone()
}
}

pub struct Env {
n_shards: u32,
allocated_ports: HashMap<u8, u16>,
nodes: HashMap<u8, Node>,
conn_cache: spin::Mutex<HashMap<u8, Channel>>,
penv: Option<PersistentEnv>,
}
impl Env {
pub fn new(with_persistency: bool, with_logging: bool) -> Self {
pub fn new(n_shards: u32, with_persistency: bool, with_logging: bool) -> Self {
INIT.call_once(|| {
// On terminating the tokio runtime,
// flooding stack traces are printed and they are super noisy.
Expand All @@ -150,24 +178,25 @@ impl Env {
}
});
let penv = if with_persistency {
Some(PersistentEnv::new())
Some(PersistentEnv::new(n_shards))
} else {
None
};
Self {
n_shards,
nodes: HashMap::new(),
allocated_ports: HashMap::new(),
conn_cache: spin::Mutex::new(HashMap::new()),
penv,
}
}

pub fn add_node(&mut self, id: u8, n_shards: u32) {
pub fn add_node(&mut self, id: u8) {
let free_port = *self
.allocated_ports
.entry(id)
.or_insert_with(|| port_check::free_local_ipv4_port().unwrap());
let file0 = self.penv.as_mut().map(|env| env.get(id));
let snap_states = self.penv.as_mut().map(|env| env.get(id));
let node = Node::new(id, free_port, n_shards, file0).unwrap();
port_check::is_port_reachable_with_timeout(
node.address().to_string(),
Expand Down
16 changes: 8 additions & 8 deletions tests/env/tests/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ use anyhow::Result;

#[tokio::test(flavor = "multi_thread")]
async fn create() -> Result<()> {
let mut env = env::Env::new(false, true);
env.add_node(0, 1);
let mut env = env::Env::new(1, false, true);
env.add_node(0);
env.check_connectivity(0).await?;
Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn create_remove() -> Result<()> {
let mut env = env::Env::new(false, true);
env.add_node(0, 1);
let mut env = env::Env::new(1, false, true);
env.add_node(0);
env.check_connectivity(0).await?;

let mut cli = env.connect_ping_client(0).await?;
Expand All @@ -24,8 +24,8 @@ async fn create_remove() -> Result<()> {

#[tokio::test(flavor = "multi_thread")]
async fn panic_loop() -> Result<()> {
let mut env = env::Env::new(false, true);
env.add_node(0, 1);
let mut env = env::Env::new(1, false, true);
env.add_node(0);
env.check_connectivity(0).await?;

for _ in 0..10 {
Expand All @@ -39,8 +39,8 @@ async fn panic_loop() -> Result<()> {
#[tokio::test(flavor = "multi_thread")]
async fn drop_env() -> Result<()> {
for _ in 0..10 {
let mut env = env::Env::new(false, true);
env.add_node(0, 1);
let mut env = env::Env::new(1, false, true);
env.add_node(0);
env.check_connectivity(0).await?;
}

Expand Down
4 changes: 2 additions & 2 deletions tests/sorock-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ impl Builder {
pub async fn build(self, n: u8, p: u32) -> Result<Cluster> {
ensure!(n > 0);
ensure!(p > 0);
let mut env = Env::new(self.with_persistency, self.with_logging);
let mut env = Env::new(p, self.with_persistency, self.with_logging);
for id in 0..n {
env.add_node(id, p);
env.add_node(id);
env.check_connectivity(0).await?;
}
Ok(Cluster { env })
Expand Down
7 changes: 6 additions & 1 deletion tests/sorock-tests/tests/6_persistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,16 @@ async fn n3_restore() -> Result<()> {
cluster.add_server(0, 1, 2).await?;

let mut cur_state = 0;
for _ in 0..10 {
for i in 0..10 {
let add_v = rand::thread_rng().gen_range(1..=9);
let old_v = cluster.user(0).fetch_add(0, add_v).await?;
assert_eq!(old_v, cur_state);
cur_state += add_v;

if i == 5 {
cluster.user(0).make_snapshot(0).await?;
cluster.user(1).make_snapshot(0).await?;
}
}

cluster.env().remove_node(0);
Expand Down
1 change: 1 addition & 0 deletions tests/testapp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ bytes.workspace = true
futures.workspace = true
serde = { version = "1.0", features = ["derive"] }
spin = "0.9"
tempfile = "3.13.0"
tokio = { version = "1", features = ["full", "tracing"] }
tokio-retry = "0.3"
tokio-util = "0.7"
Expand Down
116 changes: 112 additions & 4 deletions tests/testapp/src/raft_process/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,19 @@ use futures::TryStreamExt;
use sorock::process::*;
use spin::RwLock;
use std::collections::BTreeMap;
use std::fs::{File, OpenOptions};
use std::io::{Read, Write};
use std::path::Path;

mod snapshot_io;

pub async fn new(
snap_file: Option<impl AsRef<Path>>,
log: impl RaftLogStore,
ballot: impl RaftBallotStore,
driver: sorock::RaftDriver,
) -> Result<RaftProcess> {
let app_main = AppMain::new();
let app_main = AppMain::new(snap_file);
let process = RaftProcess::new(app_main, log, ballot, driver).await?;
Ok(process)
}
Expand All @@ -37,21 +41,125 @@ impl AppSnapshot {
}
}

#[derive(serde::Serialize, serde::Deserialize)]
struct SnapshotTable {
inner: BTreeMap<u64, AppState>,
}
impl SnapshotTable {
fn insert(&mut self, idx: u64, state: AppState) {
self.inner.insert(idx, state);
}
fn get(&self, idx: &u64) -> Option<AppState> {
self.inner.get(idx)
}
fn split_off(&mut self, idx: &u64) -> BTreeMap<u64, AppState> {
self.inner.split_off(idx)
}
fn contains_key(&self, idx: &u64) -> bool {
self.inner.contains_key(idx)
}
fn get_latest_snapshot(&self) -> u64 {
let mut out = vec![];
for (&i, _) in &self.inner {
out.push(i);
}
out.sort();
out.pop().unwrap_or(0)
}
}

enum SnapshotsInner {
Mem(SnapshotTable),
Disk(File),
}
struct Snapshots {
inner: SnapshotsInner,
}
impl Snapshots {
fn new(file: Option<File>) -> Self {
let inner = match file {
Some(f) => SnapshotsInner::Disk(f),
None => SnapshotsInner::Mem(SnapshotTable {
inner: BTreeMap::new(),
}),
};
Self { inner }
}
fn put(&mut self, idx: u64, state: AppState) {
match &mut self.inner {
SnapshotsInner::Mem(m) => {
m.insert(idx, state);
}
SnapshotsInner::Disk(f) => {
let mut f = f.reopen().unwrap();
let bytes = state.serialize();
f.write_all(&bytes).unwrap();
}
}
}
fn get(&self, idx: &u64) -> Option<AppState> {
match &self.inner {
SnapshotsInner::Mem(m) => m.get(idx),
SnapshotsInner::Disk(f) => {
let mut f = f.reopen().unwrap();
let mut buf = vec![];
f.read_to_end(&mut buf).unwrap();
Some(AppState::deserialize(&buf))
}
}
}
fn split_off(&mut self, idx: &u64) -> BTreeMap<u64, AppState> {
match &mut self.inner {
SnapshotsInner::Mem(m) => m.split_off(idx),
SnapshotsInner::Disk(f) => {
let mut f = f.reopen().unwrap();
let mut buf = vec![];
f.read_to_end(&mut buf).unwrap();
let mut m = BTreeMap::new();
let mut cursor = std::io::Cursor::new(&buf);
while cursor.position() < buf.len() as u64 {
let snap = AppState::deserialize(&buf);
m.insert(snap.0, snap);
}
m.split_off(idx)
}
}
}
fn get_latest_snapshot(&self, idx: u64) -> u64 {
match &self.inner {
SnapshotsInner::Mem(m) => m.get_latest_snapshot(),
SnapshotsInner::Disk(f) => {
let mut f = f.reopen().unwrap();
let mut buf = vec![];
f.read_to_end(&mut buf).unwrap();
let mut m = BTreeMap::new();
let mut cursor = std::io::Cursor::new(&buf);
while cursor.position() < buf.len() as u64 {
let snap = AppState::deserialize(&buf);
m.insert(snap.0, snap);
}
m.get_latest_snapshot()
}
}
}
}

struct InnerState {
state_index: u64,
counter: u64,
}
struct AppMain {
state: RwLock<InnerState>,
snapshots: RwLock<BTreeMap<u64, AppState>>,
snapshots: RwLock<Snapshots>,
}
impl AppMain {
pub fn new() -> Self {
pub fn new(snap_file: Option<impl AsRef<Path>>) -> Self {
let init_state = InnerState {
state_index: 0,
counter: 0,
};
let snapshots = BTreeMap::new();
let file = snap_file.map(|f| File::open(f).unwrap());
let snapshots = Snapshots::new(file);
Self {
state: RwLock::new(init_state),
snapshots: RwLock::new(snapshots),
Expand Down

0 comments on commit 6fc975b

Please sign in to comment.