Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
Signed-off-by: Anthony Griffon <[email protected]>
  • Loading branch information
Miaxos committed Feb 28, 2024
1 parent d303659 commit 3e76d79
Show file tree
Hide file tree
Showing 19 changed files with 909 additions and 30 deletions.
368 changes: 338 additions & 30 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ resolver = "2"

members = [
"app/roster",
"lib/lan_protocol",
]

[workspace.package]
Expand All @@ -17,10 +18,15 @@ config = "0.14"
derive_builder = "0.20"
dotenv = "0.15"
monoio = "0.2.2"
openraft = { git = "https://github.com/miaxos/openraft.git", branch = "add-monoio-runtime", features = ["monoio"] }
insta = { version = "1", features = ["yaml"] }
serde = "1"
tracing = "0.1"
tracing-subscriber = "0.3"
url = "2.5.0"
ulid = "1.1.2"

roster-lan-protocol = { version = "0.*.*", path = "./lib/lan_protocol" }

[profile.dev]
panic = "abort"
Expand Down
4 changes: 4 additions & 0 deletions app/roster/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,22 @@ futures-locks = "0.7"
indexmap = "2"
local-sync = "0.1"
monoio = { workspace = true, features = ["bytes", "sync", "iouring"] }
openraft = { workspace = true, features = ["serde", "monoio"] }
rustc-hash = "1.1.0"
scc = "2"
sharded-thread = "1"
serde.workspace = true
thiserror = "1"
url = { workspace = true, features = ["serde"] }
rand = "0.8"
zstd = "0.13"

# Logging
tracing = { workspace = true, features = ["attributes"] }
tracing-subscriber = { workspace = true, features = ["registry", "env-filter", "json"] }

roster-lan-protocol.workspace = true

[target.'cfg(windows)'.dependencies]
monoio = { workspace = true, features = ["bytes", "legacy"] }

Expand Down
32 changes: 32 additions & 0 deletions app/roster/src/domain/cluster/lan_cluster/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
mod node;
mod snapshot;
mod state_machine;
mod store;
use std::io::Cursor;

pub use node::LANNode;
use openraft::storage::Adaptor;
use openraft::MonoioRuntime;
use roster_lan_protocol::{
RequestEnveloppe, RequestLAN, ResponseEnveloppe, ResponseLAN,
};
use store::Store;

pub type NodeRaftID = u64;

openraft::declare_raft_types!(
pub TypeConfig: D = RequestLAN, R = ResponseLAN, NodeId = NodeRaftID, Node = LANNode,
Entry = openraft::Entry<TypeConfig>, SnapshotData = Cursor<Vec<u8>>, AsyncRuntime = MonoioRuntime
);

pub type LogStore = Adaptor<TypeConfig, Store>;
/*
pub type StateMachineStore = Adaptor<TypeConfig, Arc<Store>>;
pub type Raft =
openraft::Raft<TypeConfig, Network, LogStore, StateMachineStore>;
*/

/// [LANCluster] is the cluster where we'll distribute every hash keys between
/// roster instances
#[derive(Debug, Clone)]
pub struct LANCluster {}
35 changes: 35 additions & 0 deletions app/roster/src/domain/cluster/lan_cluster/node.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use std::fmt::Display;

/// An implementation of trait [`Node`] that contains minimal node information.
///
/// The most common usage is to store the connecting address of a node.
/// So that an application does not need an additional store to support its
/// [`RaftNetwork`](crate::RaftNetwork) implementation.
///
/// An application is also free not to use this storage and implements its own
/// node-id to address mapping.
#[derive(
Debug, Clone, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize,
)]
pub struct LANNode {
/// User defined string that represent the endpoint of the target node.
///
/// It is used by [`RaftNetwork`](crate::RaftNetwork) for connecting to
/// target node.
pub addr: String,
}

impl LANNode {
/// Creates as [`BasicNode`].
pub fn new(addr: impl ToString) -> Self {
Self {
addr: addr.to_string(),
}
}
}

impl Display for LANNode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.addr)
}
}
72 changes: 72 additions & 0 deletions app/roster/src/domain/cluster/lan_cluster/snapshot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use openraft::{
add_async_trait, BasicNode, RaftSnapshotBuilder, Snapshot, SnapshotMeta,
StorageError, StorageIOError,
};

use super::store::Store;
use super::{NodeRaftID, TypeConfig};

#[derive(Debug)]
pub struct StoredSnapshot {
pub meta: SnapshotMeta<NodeRaftID, BasicNode>,

/// The data of the state machine at the time of this snapshot.
pub data: Vec<u8>,
}

#[add_async_trait]
impl RaftSnapshotBuilder<TypeConfig> for Store {
async fn build_snapshot(
&mut self,
) -> Result<Snapshot<TypeConfig>, StorageError<NodeRaftID>> {
todo!()
/*
let data;
let last_applied_log;
let last_membership;
{
// Serialize the data of the state machine.
let state_machine = self.state_machine.read().await;
data = serde_json::to_vec(&*state_machine)
.map_err(|e| StorageIOError::read_state_machine(&e))?;
last_applied_log = state_machine.last_applied_log;
last_membership = state_machine.last_membership.clone();
}
let snapshot_idx = {
let mut l = self.snapshot_idx.lock().unwrap();
*l += 1;
*l
};
let snapshot_id = if let Some(last) = last_applied_log {
format!("{}-{}-{}", last.leader_id, last.index, snapshot_idx)
} else {
format!("--{}", snapshot_idx)
};
let meta = SnapshotMeta {
last_log_id: last_applied_log,
last_membership,
snapshot_id,
};
let snapshot = StoredSnapshot {
meta: meta.clone(),
data: data.clone(),
};
{
let mut current_snapshot = self.current_snapshot.write().await;
*current_snapshot = Some(snapshot);
}
Ok(Snapshot {
meta,
snapshot: Box::new(Cursor::new(data)),
})
*/
}
}
13 changes: 13 additions & 0 deletions app/roster/src/domain/cluster/lan_cluster/state_machine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use openraft::{LogId, StoredMembership};
use serde::{Deserialize, Serialize};

use super::{LANNode, NodeRaftID};

/// This state represents a copy of the data between each node. We have to be
/// careful with what is stored here as it'll be shared with every Node of the
/// LocalCluster.
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct StateMachine {
pub last_applied_log: Option<LogId<NodeRaftID>>,
pub last_membership: StoredMembership<NodeRaftID, LANNode>,
}
Loading

0 comments on commit 3e76d79

Please sign in to comment.