From 0175462b8cba219ce2a5428fa55441f13d7f4a19 Mon Sep 17 00:00:00 2001 From: Akira Hayakawa Date: Mon, 21 Oct 2024 19:58:35 +0900 Subject: [PATCH] upd Signed-off-by: Akira Hayakawa --- tests/env/src/lib.rs | 14 +++- tests/env/tests/persistency.rs | 10 +-- tests/sorock-tests/tests/6_persistency.rs | 4 +- tests/testapp/Cargo.toml | 3 +- tests/testapp/src/raft_process/mod.rs | 96 +++++++++++------------ 5 files changed, 68 insertions(+), 59 deletions(-) diff --git a/tests/env/src/lib.rs b/tests/env/src/lib.rs index f4443400..cb2c42a9 100644 --- a/tests/env/src/lib.rs +++ b/tests/env/src/lib.rs @@ -36,7 +36,7 @@ impl Node { info!("env: create db"); let db = { - let db = match pstate { + let db = match &pstate { Some(file) => match redb::Database::create(file.log_file.path()) { Ok(x) => x, Err(e) => { @@ -197,7 +197,7 @@ impl Env { .entry(id) .or_insert_with(|| port_check::free_local_ipv4_port().unwrap()); let snap_states = self.penv.as_mut().map(|env| env.get(id)); - let node = Node::new(id, free_port, n_shards, file0).unwrap(); + let node = Node::new(id, free_port, self.n_shards, snap_states).unwrap(); port_check::is_port_reachable_with_timeout( node.address().to_string(), Duration::from_secs(5), @@ -260,3 +260,13 @@ impl Env { anyhow::bail!("failed to connect to id={}", id); } } + +#[cfg(test)] +mod tests { + #[test] + fn test_temp_file() { + let tmp = tempfile::NamedTempFile::new().unwrap(); + let db1 = redb::Database::create(tmp.path()).unwrap(); + let db2 = redb::Database::create(tmp.path()).unwrap(); + } +} diff --git a/tests/env/tests/persistency.rs b/tests/env/tests/persistency.rs index ca087cfd..0192dcdb 100644 --- a/tests/env/tests/persistency.rs +++ b/tests/env/tests/persistency.rs @@ -3,22 +3,22 @@ use std::time::Duration; #[tokio::test(flavor = "multi_thread")] async fn create() -> Result<()> { - let mut env = env::Env::new(true, true); - env.add_node(0, 1); + let mut env = env::Env::new(1, true, true); + env.add_node(0); env.check_connectivity(0).await?; Ok(()) } #[tokio::test(flavor = "multi_thread")] async fn create_remove() -> Result<()> { - let mut env = env::Env::new(true, true); - env.add_node(0, 1); + let mut env = env::Env::new(1, true, true); + env.add_node(0); env.check_connectivity(0).await?; let addr0 = env.address(0); env.remove_node(0); tokio::time::sleep(Duration::from_secs(1)).await; - env.add_node(0, 1); + env.add_node(0); env.check_connectivity(0).await?; let addr1 = env.address(0); diff --git a/tests/sorock-tests/tests/6_persistency.rs b/tests/sorock-tests/tests/6_persistency.rs index bde2b438..b3f556ec 100644 --- a/tests/sorock-tests/tests/6_persistency.rs +++ b/tests/sorock-tests/tests/6_persistency.rs @@ -31,8 +31,8 @@ async fn n3_restore() -> Result<()> { cluster.env().remove_node(2); tokio::time::sleep(Duration::from_secs(1)).await; - cluster.env().add_node(0, 1); - cluster.env().add_node(1, 1); + cluster.env().add_node(0); + cluster.env().add_node(1); // Wait for election. tokio::time::sleep(Duration::from_secs(5)).await; assert_eq!(cluster.user(1).read(0).await?, cur_state); diff --git a/tests/testapp/Cargo.toml b/tests/testapp/Cargo.toml index 610d5555..30d41fc4 100644 --- a/tests/testapp/Cargo.toml +++ b/tests/testapp/Cargo.toml @@ -22,7 +22,8 @@ tracing.workspace = true uuid.workspace = true sorock.workspace = true +serde_json = "1.0.132" [build-dependencies] protox.workspace = true -tonic-build.workspace = true \ No newline at end of file +tonic-build.workspace = true diff --git a/tests/testapp/src/raft_process/mod.rs b/tests/testapp/src/raft_process/mod.rs index d70570d5..115d4a4e 100644 --- a/tests/testapp/src/raft_process/mod.rs +++ b/tests/testapp/src/raft_process/mod.rs @@ -4,7 +4,7 @@ use anyhow::ensure; use bytes::Bytes; use futures::TryStreamExt; use sorock::process::*; -use spin::RwLock; +use spin::{Mutex, RwLock}; use std::collections::BTreeMap; use std::fs::{File, OpenOptions}; use std::io::{Read, Write}; @@ -49,11 +49,12 @@ impl SnapshotTable { fn insert(&mut self, idx: u64, state: AppState) { self.inner.insert(idx, state); } - fn get(&self, idx: &u64) -> Option { + fn get(&self, idx: &u64) -> Option<&AppState> { self.inner.get(idx) } - fn split_off(&mut self, idx: &u64) -> BTreeMap { - self.inner.split_off(idx) + fn delete_before(&mut self, idx: &u64) { + let latter = self.inner.split_off(idx); + self.inner = latter; } fn contains_key(&self, idx: &u64) -> bool { self.inner.contains_key(idx) @@ -85,60 +86,66 @@ impl Snapshots { }; Self { inner } } - fn put(&mut self, idx: u64, state: AppState) { + fn contains_key(&self, idx: &u64) -> bool { + match &self.inner { + SnapshotsInner::Mem(m) => m.contains_key(idx), + SnapshotsInner::Disk(f) => { + let data: SnapshotTable = serde_json::from_reader(f).unwrap(); + data.contains_key(idx) + } + } + } + fn insert(&mut self, idx: u64, state: AppState) { match &mut self.inner { SnapshotsInner::Mem(m) => { m.insert(idx, state); } SnapshotsInner::Disk(f) => { - let mut f = f.reopen().unwrap(); - let bytes = state.serialize(); - f.write_all(&bytes).unwrap(); + let mut snap: SnapshotTable = { + let mut data = vec![]; + f.read_to_end(&mut data); + serde_json::from_slice(&data).unwrap() + }; + snap.insert(idx, state); + let data = serde_json::to_vec(&snap).unwrap(); + f.write_all(&data).unwrap(); } } } fn get(&self, idx: &u64) -> Option { match &self.inner { - SnapshotsInner::Mem(m) => m.get(idx), + SnapshotsInner::Mem(m) => m.get(idx).cloned(), SnapshotsInner::Disk(f) => { - let mut f = f.reopen().unwrap(); - let mut buf = vec![]; - f.read_to_end(&mut buf).unwrap(); - Some(AppState::deserialize(&buf)) + let data: SnapshotTable = serde_json::from_reader(f).unwrap(); + data.get(idx).cloned() } } } - fn split_off(&mut self, idx: &u64) -> BTreeMap { + fn delete_before(&mut self, idx: &u64) { match &mut self.inner { - SnapshotsInner::Mem(m) => m.split_off(idx), + SnapshotsInner::Mem(m) => { + m.delete_before(idx); + } SnapshotsInner::Disk(f) => { - let mut f = f.reopen().unwrap(); - let mut buf = vec![]; - f.read_to_end(&mut buf).unwrap(); - let mut m = BTreeMap::new(); - let mut cursor = std::io::Cursor::new(&buf); - while cursor.position() < buf.len() as u64 { - let snap = AppState::deserialize(&buf); - m.insert(snap.0, snap); - } - m.split_off(idx) + let mut snap: SnapshotTable = { + let mut data = vec![]; + f.read_to_end(&mut data); + serde_json::from_slice(&data).unwrap() + }; + + snap.delete_before(idx); + + let data = serde_json::to_vec(&snap).unwrap(); + f.write_all(&data).unwrap(); } } } - fn get_latest_snapshot(&self, idx: u64) -> u64 { + fn get_latest_snapshot(&self) -> u64 { match &self.inner { SnapshotsInner::Mem(m) => m.get_latest_snapshot(), SnapshotsInner::Disk(f) => { - let mut f = f.reopen().unwrap(); - let mut buf = vec![]; - f.read_to_end(&mut buf).unwrap(); - let mut m = BTreeMap::new(); - let mut cursor = std::io::Cursor::new(&buf); - while cursor.position() < buf.len() as u64 { - let snap = AppState::deserialize(&buf); - m.insert(snap.0, snap); - } - m.get_latest_snapshot() + let snap: SnapshotTable = serde_json::from_reader(f).unwrap(); + snap.get_latest_snapshot() } } } @@ -203,7 +210,7 @@ impl RaftApp for AppMain { async fn install_snapshot(&self, snapshot_index: Index) -> Result<()> { ensure!(self.snapshots.read().contains_key(&snapshot_index)); - let snapshot = *self.snapshots.read().get(&snapshot_index).unwrap(); + let snapshot = self.snapshots.read().get(&snapshot_index).unwrap(); let mut cur_state = self.state.write(); cur_state.state_index = snapshot_index; @@ -220,7 +227,7 @@ impl RaftApp for AppMain { async fn open_snapshot(&self, x: Index) -> Result { ensure!(self.snapshots.read().contains_key(&x)); - let cur_state = *self.snapshots.read().get(&x).unwrap(); + let cur_state = self.snapshots.read().get(&x).unwrap(); let snap = AppSnapshot(cur_state); let st = snap.into_stream(); Ok(st) @@ -228,21 +235,12 @@ impl RaftApp for AppMain { async fn delete_snapshots_before(&self, x: Index) -> Result<()> { let mut snapshots = self.snapshots.write(); - let latter = snapshots.split_off(&x); - *snapshots = latter; + snapshots.delete_before(&x); Ok(()) } async fn get_latest_snapshot(&self) -> Result { - let k = { - let mut out = vec![]; - let snapshots = self.snapshots.read(); - for (&i, _) in snapshots.iter() { - out.push(i); - } - out.sort(); - out.pop().unwrap_or(0) - }; + let k = self.snapshots.read().get_latest_snapshot(); Ok(k) } }