Skip to content

Commit

Permalink
upd
Browse files Browse the repository at this point in the history
Signed-off-by: Akira Hayakawa <[email protected]>
  • Loading branch information
akiradeveloper committed Oct 21, 2024
1 parent 6fc975b commit 0175462
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 59 deletions.
14 changes: 12 additions & 2 deletions tests/env/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl Node {

info!("env: create db");
let db = {
let db = match pstate {
let db = match &pstate {
Some(file) => match redb::Database::create(file.log_file.path()) {
Ok(x) => x,
Err(e) => {
Expand Down Expand Up @@ -197,7 +197,7 @@ impl Env {
.entry(id)
.or_insert_with(|| port_check::free_local_ipv4_port().unwrap());
let snap_states = self.penv.as_mut().map(|env| env.get(id));
let node = Node::new(id, free_port, n_shards, file0).unwrap();
let node = Node::new(id, free_port, self.n_shards, snap_states).unwrap();
port_check::is_port_reachable_with_timeout(
node.address().to_string(),
Duration::from_secs(5),
Expand Down Expand Up @@ -260,3 +260,13 @@ impl Env {
anyhow::bail!("failed to connect to id={}", id);
}
}

#[cfg(test)]
mod tests {
#[test]
fn test_temp_file() {
let tmp = tempfile::NamedTempFile::new().unwrap();
let db1 = redb::Database::create(tmp.path()).unwrap();
let db2 = redb::Database::create(tmp.path()).unwrap();
}
}
10 changes: 5 additions & 5 deletions tests/env/tests/persistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,22 @@ use std::time::Duration;

#[tokio::test(flavor = "multi_thread")]
async fn create() -> Result<()> {
let mut env = env::Env::new(true, true);
env.add_node(0, 1);
let mut env = env::Env::new(1, true, true);
env.add_node(0);
env.check_connectivity(0).await?;
Ok(())
}

#[tokio::test(flavor = "multi_thread")]
async fn create_remove() -> Result<()> {
let mut env = env::Env::new(true, true);
env.add_node(0, 1);
let mut env = env::Env::new(1, true, true);
env.add_node(0);
env.check_connectivity(0).await?;
let addr0 = env.address(0);
env.remove_node(0);
tokio::time::sleep(Duration::from_secs(1)).await;

env.add_node(0, 1);
env.add_node(0);
env.check_connectivity(0).await?;
let addr1 = env.address(0);

Expand Down
4 changes: 2 additions & 2 deletions tests/sorock-tests/tests/6_persistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ async fn n3_restore() -> Result<()> {
cluster.env().remove_node(2);
tokio::time::sleep(Duration::from_secs(1)).await;

cluster.env().add_node(0, 1);
cluster.env().add_node(1, 1);
cluster.env().add_node(0);
cluster.env().add_node(1);
// Wait for election.
tokio::time::sleep(Duration::from_secs(5)).await;
assert_eq!(cluster.user(1).read(0).await?, cur_state);
Expand Down
3 changes: 2 additions & 1 deletion tests/testapp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ tracing.workspace = true
uuid.workspace = true

sorock.workspace = true
serde_json = "1.0.132"

[build-dependencies]
protox.workspace = true
tonic-build.workspace = true
tonic-build.workspace = true
96 changes: 47 additions & 49 deletions tests/testapp/src/raft_process/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use anyhow::ensure;
use bytes::Bytes;
use futures::TryStreamExt;
use sorock::process::*;
use spin::RwLock;
use spin::{Mutex, RwLock};
use std::collections::BTreeMap;
use std::fs::{File, OpenOptions};
use std::io::{Read, Write};
Expand Down Expand Up @@ -49,11 +49,12 @@ impl SnapshotTable {
fn insert(&mut self, idx: u64, state: AppState) {
self.inner.insert(idx, state);
}
fn get(&self, idx: &u64) -> Option<AppState> {
fn get(&self, idx: &u64) -> Option<&AppState> {
self.inner.get(idx)
}
fn split_off(&mut self, idx: &u64) -> BTreeMap<u64, AppState> {
self.inner.split_off(idx)
fn delete_before(&mut self, idx: &u64) {
let latter = self.inner.split_off(idx);
self.inner = latter;
}
fn contains_key(&self, idx: &u64) -> bool {
self.inner.contains_key(idx)
Expand Down Expand Up @@ -85,60 +86,66 @@ impl Snapshots {
};
Self { inner }
}
fn put(&mut self, idx: u64, state: AppState) {
fn contains_key(&self, idx: &u64) -> bool {
match &self.inner {
SnapshotsInner::Mem(m) => m.contains_key(idx),
SnapshotsInner::Disk(f) => {
let data: SnapshotTable = serde_json::from_reader(f).unwrap();
data.contains_key(idx)
}
}
}
fn insert(&mut self, idx: u64, state: AppState) {
match &mut self.inner {
SnapshotsInner::Mem(m) => {
m.insert(idx, state);
}
SnapshotsInner::Disk(f) => {
let mut f = f.reopen().unwrap();
let bytes = state.serialize();
f.write_all(&bytes).unwrap();
let mut snap: SnapshotTable = {
let mut data = vec![];
f.read_to_end(&mut data);
serde_json::from_slice(&data).unwrap()
};
snap.insert(idx, state);
let data = serde_json::to_vec(&snap).unwrap();
f.write_all(&data).unwrap();
}
}
}
fn get(&self, idx: &u64) -> Option<AppState> {
match &self.inner {
SnapshotsInner::Mem(m) => m.get(idx),
SnapshotsInner::Mem(m) => m.get(idx).cloned(),
SnapshotsInner::Disk(f) => {
let mut f = f.reopen().unwrap();
let mut buf = vec![];
f.read_to_end(&mut buf).unwrap();
Some(AppState::deserialize(&buf))
let data: SnapshotTable = serde_json::from_reader(f).unwrap();
data.get(idx).cloned()
}
}
}
fn split_off(&mut self, idx: &u64) -> BTreeMap<u64, AppState> {
fn delete_before(&mut self, idx: &u64) {
match &mut self.inner {
SnapshotsInner::Mem(m) => m.split_off(idx),
SnapshotsInner::Mem(m) => {
m.delete_before(idx);
}
SnapshotsInner::Disk(f) => {
let mut f = f.reopen().unwrap();
let mut buf = vec![];
f.read_to_end(&mut buf).unwrap();
let mut m = BTreeMap::new();
let mut cursor = std::io::Cursor::new(&buf);
while cursor.position() < buf.len() as u64 {
let snap = AppState::deserialize(&buf);
m.insert(snap.0, snap);
}
m.split_off(idx)
let mut snap: SnapshotTable = {
let mut data = vec![];
f.read_to_end(&mut data);
serde_json::from_slice(&data).unwrap()
};

snap.delete_before(idx);

let data = serde_json::to_vec(&snap).unwrap();
f.write_all(&data).unwrap();
}
}
}
fn get_latest_snapshot(&self, idx: u64) -> u64 {
fn get_latest_snapshot(&self) -> u64 {
match &self.inner {
SnapshotsInner::Mem(m) => m.get_latest_snapshot(),
SnapshotsInner::Disk(f) => {
let mut f = f.reopen().unwrap();
let mut buf = vec![];
f.read_to_end(&mut buf).unwrap();
let mut m = BTreeMap::new();
let mut cursor = std::io::Cursor::new(&buf);
while cursor.position() < buf.len() as u64 {
let snap = AppState::deserialize(&buf);
m.insert(snap.0, snap);
}
m.get_latest_snapshot()
let snap: SnapshotTable = serde_json::from_reader(f).unwrap();
snap.get_latest_snapshot()
}
}
}
Expand Down Expand Up @@ -203,7 +210,7 @@ impl RaftApp for AppMain {

async fn install_snapshot(&self, snapshot_index: Index) -> Result<()> {
ensure!(self.snapshots.read().contains_key(&snapshot_index));
let snapshot = *self.snapshots.read().get(&snapshot_index).unwrap();
let snapshot = self.snapshots.read().get(&snapshot_index).unwrap();

let mut cur_state = self.state.write();
cur_state.state_index = snapshot_index;
Expand All @@ -220,29 +227,20 @@ impl RaftApp for AppMain {

async fn open_snapshot(&self, x: Index) -> Result<SnapshotStream> {
ensure!(self.snapshots.read().contains_key(&x));
let cur_state = *self.snapshots.read().get(&x).unwrap();
let cur_state = self.snapshots.read().get(&x).unwrap();
let snap = AppSnapshot(cur_state);
let st = snap.into_stream();
Ok(st)
}

async fn delete_snapshots_before(&self, x: Index) -> Result<()> {
let mut snapshots = self.snapshots.write();
let latter = snapshots.split_off(&x);
*snapshots = latter;
snapshots.delete_before(&x);
Ok(())
}

async fn get_latest_snapshot(&self) -> Result<Index> {
let k = {
let mut out = vec![];
let snapshots = self.snapshots.read();
for (&i, _) in snapshots.iter() {
out.push(i);
}
out.sort();
out.pop().unwrap_or(0)
};
let k = self.snapshots.read().get_latest_snapshot();
Ok(k)
}
}

0 comments on commit 0175462

Please sign in to comment.