From 274852c802ddd56a6b79f1f88af0ee2da0bc3422 Mon Sep 17 00:00:00 2001 From: OnlyF0uR <29165327+OnlyF0uR@users.noreply.github.com> Date: Tue, 5 Nov 2024 20:28:15 +0100 Subject: [PATCH] feat: DAG --- Cargo.lock | 9 ++ Cargo.toml | 3 + README.md | 1 + crates/cesium-material/src/keys.rs | 8 +- crates/cesium-proc/Cargo.toml | 11 ++ crates/cesium-proc/src/gossip/mod.rs | 1 + crates/cesium-proc/src/graph/dag.rs | 200 +++++++++++++++++++++++++ crates/cesium-proc/src/graph/errors.rs | 33 ++++ crates/cesium-proc/src/graph/mod.rs | 3 + crates/cesium-proc/src/graph/node.rs | 19 +++ crates/cesium-proc/src/lib.rs | 2 + 11 files changed, 288 insertions(+), 2 deletions(-) create mode 100644 crates/cesium-proc/Cargo.toml create mode 100644 crates/cesium-proc/src/gossip/mod.rs create mode 100644 crates/cesium-proc/src/graph/dag.rs create mode 100644 crates/cesium-proc/src/graph/errors.rs create mode 100644 crates/cesium-proc/src/graph/mod.rs create mode 100644 crates/cesium-proc/src/graph/node.rs create mode 100644 crates/cesium-proc/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 92c9f3a..dc07ccd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -340,6 +340,15 @@ dependencies = [ name = "cesium-node" version = "0.2.0" +[[package]] +name = "cesium-proc" +version = "0.2.0" +dependencies = [ + "cesium-material", + "dashmap", + "tokio", +] + [[package]] name = "cesium-state" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 2853d8e..1e4a662 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "crates/cesium-mempool", "crates/cesium-state", "crates/cesium-econ", + "crates/cesium-proc", "crates/selenide-sdk", "crates/selenide-sdk-macros", "crates/selenide-wasm", @@ -56,6 +57,7 @@ serde = { version = "1.0.213", features = ["derive"] } wasmer = { version = "5.0.0", features = ["singlepass"] } wasmer-middlewares = "5.0.0" clap = { version = "4.5.20", features = ["derive"]} +dashmap = "6.1.0" # Inner libs cesium-storage = { path = "crates/cesium-storage" } cesium-material = { path = "crates/cesium-material" } @@ -63,6 +65,7 @@ cesium-mempool = { path = "crates/cesium-mempool" } cesium-state = { path = "crates/cesium-state" } cesium-econ = { path = "crates/cesium-econ" } cesium-runtime = { path = "crates/cesium-runtime" } +cesium-proc = { path = "crates/cesium-proc" } selenide-sdk = { path = "crates/selenide-sdk" } selenide-sdk-macros = { path = "crates/selenide-sdk-macros" } selenide-wasm = { path = "crates/selenide-wasm" } \ No newline at end of file diff --git a/README.md b/README.md index 1c4328f..a9eaf8f 100644 --- a/README.md +++ b/README.md @@ -3,3 +3,4 @@ ### Dependencies - LLVM (for compiling rocksdb) +- Emscripten (optional, for compiling C to wasm) diff --git a/crates/cesium-material/src/keys.rs b/crates/cesium-material/src/keys.rs index 65f30c8..5460650 100644 --- a/crates/cesium-material/src/keys.rs +++ b/crates/cesium-material/src/keys.rs @@ -153,7 +153,9 @@ pub fn sig_byte_len(msg_len: usize) -> usize { SIG_BYTE_LEN + msg_len } -fn slice_to_array_48(slice: &[T]) -> Result<&[T; 48], Box> { +pub fn slice_to_array_48( + slice: &[T], +) -> Result<&[T; 48], Box> { if slice.len() == 48 { Ok(unsafe { &*(slice.as_ptr() as *const [T; 48]) }) } else { @@ -161,7 +163,9 @@ fn slice_to_array_48(slice: &[T]) -> Result<&[T; 48], Box(slice: &[T]) -> Result<&[T; 96], Box> { +pub fn slice_to_array_96( + slice: &[T], +) -> Result<&[T; 96], Box> { if slice.len() == 96 { Ok(unsafe { &*(slice.as_ptr() as *const [T; 96]) }) } else { diff --git a/crates/cesium-proc/Cargo.toml b/crates/cesium-proc/Cargo.toml new file mode 100644 index 0000000..e40303d --- /dev/null +++ b/crates/cesium-proc/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "cesium-proc" +edition = "2021" +version.workspace = true +license = "GPL-3.0" +publish = false + +[dependencies] +tokio.workspace = true +cesium-material.workspace = true +dashmap.workspace = true \ No newline at end of file diff --git a/crates/cesium-proc/src/gossip/mod.rs b/crates/cesium-proc/src/gossip/mod.rs new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/crates/cesium-proc/src/gossip/mod.rs @@ -0,0 +1 @@ + diff --git a/crates/cesium-proc/src/graph/dag.rs b/crates/cesium-proc/src/graph/dag.rs new file mode 100644 index 0000000..9c939b5 --- /dev/null +++ b/crates/cesium-proc/src/graph/dag.rs @@ -0,0 +1,200 @@ +use cesium_material::{ + id::generate_id, + keys::{slice_to_array_48, SIG_BYTE_LEN}, +}; +use dashmap::DashMap; +use std::sync::Arc; + +use super::{ + errors::GraphError, + node::{GraphNode, NodeInput}, +}; + +pub struct Graph { + nodes: Arc>>, + pack_iv_count: usize, + pack_min_conf: u32, + pack_proportion: f32, +} + +impl Graph { + pub fn new(pack_iv_count: usize, pack_min_conf: u32, pack_proportion: f32) -> Self { + Self { + nodes: Arc::new(DashMap::new()), + pack_iv_count, + pack_min_conf, + pack_proportion, + } + } + + pub fn default() -> Self { + Self::new(2500, 5, 0.45) + } + + // The minimal amount of nodes required to kick off the graph is + pub async fn add_genesis(&self, input: &NodeInput) -> Result<(), GraphError> { + if let Err(e) = self.validate_item(input) { + return Err(e); + } + + // Compute random node_id of 48 characters + let node_id = generate_id(); + let node_id = match slice_to_array_48(node_id.as_slice()) { + Ok(id) => *id, + // This should never happen + Err(_) => return Err(GraphError::InvalidNodeId), + }; + + let node = GraphNode { + id: node_id, + instructions: input.instructions.clone(), + prev_nodes: vec![], + confirmations: 0.into(), + }; + let node_arc: Arc = Arc::new(node); + + // Add node to the graph + self.nodes.insert(node_id, node_arc); + Ok(()) + } + + pub async fn add_item(&self, input: &NodeInput) -> Result<(), GraphError> { + if let Err(e) = self.validate_item(input) { + return Err(e); + } + + // Compute random node_id of 48 characters + let node_id = generate_id(); + let node_id = match slice_to_array_48(node_id.as_slice()) { + Ok(id) => *id, + // This should never happen + Err(_) => return Err(GraphError::InvalidNodeId), + }; + + let ref_nodes = self.get_pending_nodes().await; + if ref_nodes.is_empty() { + return Err(GraphError::MissingGenesisNode); + } + + for ref_node in &ref_nodes { + self.validate_node(ref_node).await?; + } + + let node = GraphNode { + id: node_id, + instructions: input.instructions.clone(), + prev_nodes: ref_nodes.iter().map(|n| n.id).collect(), + confirmations: 0.into(), + }; + let node_arc = Arc::new(node); + + // Add node to the graph + self.nodes.insert(node_id, node_arc); + + // if nodes length + if self.nodes.len() >= self.pack_iv_count { + self.pack_history().await?; + } + + Ok(()) + } + + pub fn set_interval_count(&mut self, count: usize) { + self.pack_iv_count = count; + } + + pub fn set_min_confirmations(&mut self, count: u32) { + self.pack_min_conf = count; + } + + pub fn set_proportion(&mut self, proportion: f32) { + self.pack_proportion = proportion; + } + + async fn pack_history(&self) -> Result<(), GraphError> { + // Get all nodes with 5 or more confirmations + let nodes = self.get_packable_nodes().await; + println!("Nodes to pack: {:?}", nodes); + + // TODO: This + // This function will get the 45% of nodes with the most confirmations provided + // they have 5 or more confirmations, then writes them to rocksdb and removes them + // from memory + Ok(()) + } + + async fn get_packable_nodes(&self) -> Vec> { + let packable_count = (self.nodes.len() as f32 * self.pack_proportion).ceil() as usize; + self.get_nodes_with_sorting(true, packable_count).await + } + + async fn get_pending_nodes(&self) -> Vec> { + self.get_nodes_with_sorting(false, 5).await + } + + fn validate_item(&self, input: &NodeInput) -> Result<(), GraphError> { + // Some very basic validation + if input.instructions.is_empty() { + return Err(GraphError::InvalidNodeInput); + } + + if input.digest.len() < SIG_BYTE_LEN { + return Err(GraphError::InvalidNodeInput); + } + + // TODO: Signature check + + // TODO: Instruction validity check (balances, enough reserved gas, etc.) + + Ok(()) + } + + async fn validate_node(&self, node: &GraphNode) -> Result<(), GraphError> { + // TODO: Validate the current node + + // Get a read lock on the node's previous nodes + let prev_nodes: Vec> = { + node.prev_nodes + .iter() + .filter_map(|id| self.nodes.get(id)) + .map(|node| node.value().clone()) + .collect() + }; + + // Acquire write locks on the confirmations of the previous nodes + let mut prev_nodes_confirmation_locks = Vec::with_capacity(prev_nodes.len()); + for prev_node in &prev_nodes { + prev_nodes_confirmation_locks.push(prev_node.confirmations.write().await); + } + + // Update the confirmations of the previous nodes + for mut confirmations_lock in prev_nodes_confirmation_locks { + *confirmations_lock += 1; + } + + Ok(()) + } + + async fn get_nodes_with_sorting(&self, take_top: bool, limit: usize) -> Vec> { + // Preallocate vector with known capacity + let mut nodes_with_confirmations = Vec::with_capacity(self.nodes.len()); + + // Collect nodes and confirmations + for node in self.nodes.iter() { + let confirmations = *node.confirmations.read().await; + nodes_with_confirmations.push((node.clone(), confirmations)); + } + + // Sort by confirmation count + nodes_with_confirmations.sort_by(|a, b| a.1.cmp(&b.1)); + + // Take from either end of the sorted list depending on take_top + let iter: Box, u32)>> = if take_top { + Box::new(nodes_with_confirmations.iter().rev().take(limit)) + } else { + Box::new(nodes_with_confirmations.iter().take(limit)) + }; + + iter.map(|(node, _)| node.clone()).collect() + } +} diff --git a/crates/cesium-proc/src/graph/errors.rs b/crates/cesium-proc/src/graph/errors.rs new file mode 100644 index 0000000..663ab2f --- /dev/null +++ b/crates/cesium-proc/src/graph/errors.rs @@ -0,0 +1,33 @@ +use std::{error::Error, fmt}; + +#[derive(Debug)] +pub enum GraphError { + MissingGenesisNode, + InvalidNodeInput, + InvalidNodeId, + ReferenceNodeMismatch, +} + +// Implement Display for custom error formatting +impl fmt::Display for GraphError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + GraphError::MissingGenesisNode => write!(f, "Missing genesis node"), + GraphError::InvalidNodeInput => write!(f, "Invalid node input"), + GraphError::InvalidNodeId => write!(f, "Invalid node id"), + GraphError::ReferenceNodeMismatch => write!(f, "Reference node mismatch"), + } + } +} + +// Implement the Error trait for custom error handling +impl Error for GraphError { + fn source(&self) -> Option<&(dyn Error + 'static)> { + match *self { + GraphError::MissingGenesisNode => None, + GraphError::InvalidNodeInput => None, + GraphError::InvalidNodeId => None, + GraphError::ReferenceNodeMismatch => None, + } + } +} diff --git a/crates/cesium-proc/src/graph/mod.rs b/crates/cesium-proc/src/graph/mod.rs new file mode 100644 index 0000000..e7865dd --- /dev/null +++ b/crates/cesium-proc/src/graph/mod.rs @@ -0,0 +1,3 @@ +pub mod dag; +pub mod errors; +pub mod node; diff --git a/crates/cesium-proc/src/graph/node.rs b/crates/cesium-proc/src/graph/node.rs new file mode 100644 index 0000000..d243359 --- /dev/null +++ b/crates/cesium-proc/src/graph/node.rs @@ -0,0 +1,19 @@ +use tokio::sync::RwLock; + +#[derive(Debug)] +pub struct NodeInput { + pub instructions: Vec>, + pub digest: Vec, + pub reserved_gas: u128, + pub priority_fee: u128, +} + +pub type NodeId = [u8; 48]; + +#[derive(Debug)] +pub struct GraphNode { + pub id: NodeId, + pub instructions: Vec>, + pub prev_nodes: Vec, + pub confirmations: RwLock, +} diff --git a/crates/cesium-proc/src/lib.rs b/crates/cesium-proc/src/lib.rs new file mode 100644 index 0000000..5c1abb3 --- /dev/null +++ b/crates/cesium-proc/src/lib.rs @@ -0,0 +1,2 @@ +pub mod gossip; +pub mod graph;