Skip to content

Commit

Permalink
feat: DAG
Browse files Browse the repository at this point in the history
  • Loading branch information
OnlyF0uR committed Nov 5, 2024
1 parent a518ad8 commit 274852c
Show file tree
Hide file tree
Showing 11 changed files with 288 additions and 2 deletions.
9 changes: 9 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ members = [
"crates/cesium-mempool",
"crates/cesium-state",
"crates/cesium-econ",
"crates/cesium-proc",
"crates/selenide-sdk",
"crates/selenide-sdk-macros",
"crates/selenide-wasm",
Expand Down Expand Up @@ -56,13 +57,15 @@ serde = { version = "1.0.213", features = ["derive"] }
wasmer = { version = "5.0.0", features = ["singlepass"] }
wasmer-middlewares = "5.0.0"
clap = { version = "4.5.20", features = ["derive"]}
dashmap = "6.1.0"
# Inner libs
cesium-storage = { path = "crates/cesium-storage" }
cesium-material = { path = "crates/cesium-material" }
cesium-mempool = { path = "crates/cesium-mempool" }
cesium-state = { path = "crates/cesium-state" }
cesium-econ = { path = "crates/cesium-econ" }
cesium-runtime = { path = "crates/cesium-runtime" }
cesium-proc = { path = "crates/cesium-proc" }
selenide-sdk = { path = "crates/selenide-sdk" }
selenide-sdk-macros = { path = "crates/selenide-sdk-macros" }
selenide-wasm = { path = "crates/selenide-wasm" }
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
### Dependencies

- LLVM (for compiling rocksdb)
- Emscripten (optional, for compiling C to wasm)
8 changes: 6 additions & 2 deletions crates/cesium-material/src/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,19 @@ pub fn sig_byte_len(msg_len: usize) -> usize {
SIG_BYTE_LEN + msg_len
}

fn slice_to_array_48<T>(slice: &[T]) -> Result<&[T; 48], Box<dyn std::error::Error + Send + Sync>> {
pub fn slice_to_array_48<T>(
slice: &[T],
) -> Result<&[T; 48], Box<dyn std::error::Error + Send + Sync>> {
if slice.len() == 48 {
Ok(unsafe { &*(slice.as_ptr() as *const [T; 48]) })
} else {
Err("Invalid slice length".into())
}
}

fn slice_to_array_96<T>(slice: &[T]) -> Result<&[T; 96], Box<dyn std::error::Error + Send + Sync>> {
pub fn slice_to_array_96<T>(
slice: &[T],
) -> Result<&[T; 96], Box<dyn std::error::Error + Send + Sync>> {
if slice.len() == 96 {
Ok(unsafe { &*(slice.as_ptr() as *const [T; 96]) })
} else {
Expand Down
11 changes: 11 additions & 0 deletions crates/cesium-proc/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "cesium-proc"
edition = "2021"
version.workspace = true
license = "GPL-3.0"
publish = false

[dependencies]
tokio.workspace = true
cesium-material.workspace = true
dashmap.workspace = true
1 change: 1 addition & 0 deletions crates/cesium-proc/src/gossip/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

200 changes: 200 additions & 0 deletions crates/cesium-proc/src/graph/dag.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
use cesium_material::{
id::generate_id,
keys::{slice_to_array_48, SIG_BYTE_LEN},
};
use dashmap::DashMap;
use std::sync::Arc;

use super::{
errors::GraphError,
node::{GraphNode, NodeInput},
};

pub struct Graph {
nodes: Arc<DashMap<[u8; 48], Arc<GraphNode>>>,
pack_iv_count: usize,
pack_min_conf: u32,
pack_proportion: f32,
}

impl Graph {
pub fn new(pack_iv_count: usize, pack_min_conf: u32, pack_proportion: f32) -> Self {
Self {
nodes: Arc::new(DashMap::new()),
pack_iv_count,
pack_min_conf,
pack_proportion,
}
}

pub fn default() -> Self {
Self::new(2500, 5, 0.45)
}

// The minimal amount of nodes required to kick off the graph is
pub async fn add_genesis(&self, input: &NodeInput) -> Result<(), GraphError> {
if let Err(e) = self.validate_item(input) {
return Err(e);
}

// Compute random node_id of 48 characters
let node_id = generate_id();
let node_id = match slice_to_array_48(node_id.as_slice()) {
Ok(id) => *id,
// This should never happen
Err(_) => return Err(GraphError::InvalidNodeId),
};

let node = GraphNode {
id: node_id,
instructions: input.instructions.clone(),
prev_nodes: vec![],
confirmations: 0.into(),
};
let node_arc: Arc<GraphNode> = Arc::new(node);

// Add node to the graph
self.nodes.insert(node_id, node_arc);
Ok(())
}

pub async fn add_item(&self, input: &NodeInput) -> Result<(), GraphError> {
if let Err(e) = self.validate_item(input) {
return Err(e);
}

// Compute random node_id of 48 characters
let node_id = generate_id();
let node_id = match slice_to_array_48(node_id.as_slice()) {
Ok(id) => *id,
// This should never happen
Err(_) => return Err(GraphError::InvalidNodeId),
};

let ref_nodes = self.get_pending_nodes().await;
if ref_nodes.is_empty() {
return Err(GraphError::MissingGenesisNode);
}

for ref_node in &ref_nodes {
self.validate_node(ref_node).await?;
}

let node = GraphNode {
id: node_id,
instructions: input.instructions.clone(),
prev_nodes: ref_nodes.iter().map(|n| n.id).collect(),
confirmations: 0.into(),
};
let node_arc = Arc::new(node);

// Add node to the graph
self.nodes.insert(node_id, node_arc);

// if nodes length
if self.nodes.len() >= self.pack_iv_count {
self.pack_history().await?;
}

Ok(())
}

pub fn set_interval_count(&mut self, count: usize) {
self.pack_iv_count = count;
}

pub fn set_min_confirmations(&mut self, count: u32) {
self.pack_min_conf = count;
}

pub fn set_proportion(&mut self, proportion: f32) {
self.pack_proportion = proportion;
}

async fn pack_history(&self) -> Result<(), GraphError> {
// Get all nodes with 5 or more confirmations
let nodes = self.get_packable_nodes().await;
println!("Nodes to pack: {:?}", nodes);

// TODO: This
// This function will get the 45% of nodes with the most confirmations provided
// they have 5 or more confirmations, then writes them to rocksdb and removes them
// from memory
Ok(())
}

async fn get_packable_nodes(&self) -> Vec<Arc<GraphNode>> {
let packable_count = (self.nodes.len() as f32 * self.pack_proportion).ceil() as usize;
self.get_nodes_with_sorting(true, packable_count).await
}

async fn get_pending_nodes(&self) -> Vec<Arc<GraphNode>> {
self.get_nodes_with_sorting(false, 5).await
}

fn validate_item(&self, input: &NodeInput) -> Result<(), GraphError> {
// Some very basic validation
if input.instructions.is_empty() {
return Err(GraphError::InvalidNodeInput);
}

if input.digest.len() < SIG_BYTE_LEN {
return Err(GraphError::InvalidNodeInput);
}

// TODO: Signature check

// TODO: Instruction validity check (balances, enough reserved gas, etc.)

Ok(())
}

async fn validate_node(&self, node: &GraphNode) -> Result<(), GraphError> {
// TODO: Validate the current node

// Get a read lock on the node's previous nodes
let prev_nodes: Vec<Arc<GraphNode>> = {
node.prev_nodes
.iter()
.filter_map(|id| self.nodes.get(id))
.map(|node| node.value().clone())
.collect()
};

// Acquire write locks on the confirmations of the previous nodes
let mut prev_nodes_confirmation_locks = Vec::with_capacity(prev_nodes.len());
for prev_node in &prev_nodes {
prev_nodes_confirmation_locks.push(prev_node.confirmations.write().await);
}

// Update the confirmations of the previous nodes
for mut confirmations_lock in prev_nodes_confirmation_locks {
*confirmations_lock += 1;
}

Ok(())
}

async fn get_nodes_with_sorting(&self, take_top: bool, limit: usize) -> Vec<Arc<GraphNode>> {
// Preallocate vector with known capacity
let mut nodes_with_confirmations = Vec::with_capacity(self.nodes.len());

// Collect nodes and confirmations
for node in self.nodes.iter() {
let confirmations = *node.confirmations.read().await;
nodes_with_confirmations.push((node.clone(), confirmations));
}

// Sort by confirmation count
nodes_with_confirmations.sort_by(|a, b| a.1.cmp(&b.1));

// Take from either end of the sorted list depending on take_top
let iter: Box<dyn Iterator<Item = &(Arc<GraphNode>, u32)>> = if take_top {
Box::new(nodes_with_confirmations.iter().rev().take(limit))
} else {
Box::new(nodes_with_confirmations.iter().take(limit))
};

iter.map(|(node, _)| node.clone()).collect()
}
}
33 changes: 33 additions & 0 deletions crates/cesium-proc/src/graph/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use std::{error::Error, fmt};

#[derive(Debug)]
pub enum GraphError {
MissingGenesisNode,
InvalidNodeInput,
InvalidNodeId,
ReferenceNodeMismatch,
}

// Implement Display for custom error formatting
impl fmt::Display for GraphError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match *self {
GraphError::MissingGenesisNode => write!(f, "Missing genesis node"),
GraphError::InvalidNodeInput => write!(f, "Invalid node input"),
GraphError::InvalidNodeId => write!(f, "Invalid node id"),
GraphError::ReferenceNodeMismatch => write!(f, "Reference node mismatch"),
}
}
}

// Implement the Error trait for custom error handling
impl Error for GraphError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match *self {
GraphError::MissingGenesisNode => None,
GraphError::InvalidNodeInput => None,
GraphError::InvalidNodeId => None,
GraphError::ReferenceNodeMismatch => None,
}
}
}
3 changes: 3 additions & 0 deletions crates/cesium-proc/src/graph/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod dag;
pub mod errors;
pub mod node;
19 changes: 19 additions & 0 deletions crates/cesium-proc/src/graph/node.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use tokio::sync::RwLock;

#[derive(Debug)]
pub struct NodeInput {
pub instructions: Vec<Vec<u8>>,
pub digest: Vec<u8>,
pub reserved_gas: u128,
pub priority_fee: u128,
}

pub type NodeId = [u8; 48];

#[derive(Debug)]
pub struct GraphNode {
pub id: NodeId,
pub instructions: Vec<Vec<u8>>,
pub prev_nodes: Vec<NodeId>,
pub confirmations: RwLock<u32>,
}
2 changes: 2 additions & 0 deletions crates/cesium-proc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod gossip;
pub mod graph;

0 comments on commit 274852c

Please sign in to comment.