From d1db881d59b1dcc226f6baf0408ef0e33faef2e3 Mon Sep 17 00:00:00 2001 From: OnlyF0uR <29165327+OnlyF0uR@users.noreply.github.com> Date: Tue, 19 Nov 2024 18:49:46 +0100 Subject: [PATCH] feat, prelim rpc --- Cargo.lock | 5 +++ crates/cesium-nucleus/src/graph/mempool.rs | 4 ++ crates/cesium-rpc/Cargo.toml | 4 +- crates/cesium-rpc/src/lib.rs | 49 ++++++++++++++++------ validator/Cargo.toml | 5 ++- validator/src/main.rs | 25 ++++++++++- 6 files changed, 76 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5a2a5cb..8e7a080 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -367,7 +367,9 @@ dependencies = [ name = "cesium-rpc" version = "0.3.0" dependencies = [ + "cesium-crypto", "cesium-nebula", + "cesium-nucleus", "futures", "hex", "jsonrpsee", @@ -3044,7 +3046,10 @@ name = "validator" version = "0.3.0" dependencies = [ "cesium-crypto", + "cesium-nucleus", + "cesium-rpc", "dirs", + "tokio", ] [[package]] diff --git a/crates/cesium-nucleus/src/graph/mempool.rs b/crates/cesium-nucleus/src/graph/mempool.rs index 967e275..3887bb1 100644 --- a/crates/cesium-nucleus/src/graph/mempool.rs +++ b/crates/cesium-nucleus/src/graph/mempool.rs @@ -36,6 +36,10 @@ impl<'a> Graph<'a> { Self::new(account, 2500, 5, 0.45) } + pub fn sync(&self) { + // TODO: Sync the graph with other validators + } + // The minimal amount of nodes required to kick off the graph is pub async fn add_genesis(&self, input: &Transaction) -> Result<(), GraphError> { if let Err(e) = self.validate_item(input) { diff --git a/crates/cesium-rpc/Cargo.toml b/crates/cesium-rpc/Cargo.toml index f7e8ce7..8a633c6 100644 --- a/crates/cesium-rpc/Cargo.toml +++ b/crates/cesium-rpc/Cargo.toml @@ -11,6 +11,8 @@ futures.workspace = true jsonrpsee = { workspace = true, features = ["server", "macros"]} hex.workspace = true cesium-nebula.workspace = true +cesium-nucleus.workspace = true [dev-dependencies] -jsonrpsee = { workspace = true, features = ["http-client", "ws-client", "macros"]} \ No newline at end of file +jsonrpsee = { workspace = true, features = ["http-client", "ws-client", "macros"]} +cesium-crypto.workspace = true \ No newline at end of file diff --git a/crates/cesium-rpc/src/lib.rs b/crates/cesium-rpc/src/lib.rs index c04ca3a..a48cbf6 100644 --- a/crates/cesium-rpc/src/lib.rs +++ b/crates/cesium-rpc/src/lib.rs @@ -1,6 +1,8 @@ -use std::net::SocketAddr; +use std::sync::Arc; +use tokio::sync::Mutex; use cesium_nebula::transaction::{Transaction, TransactionError}; +use cesium_nucleus::graph::mempool::Graph; use hex::FromHexError; use jsonrpsee::{ core::{async_trait, SubscriptionResult}, @@ -18,6 +20,7 @@ pub enum RpcError { HexError(hex::FromHexError), RpcError(String), TxError(TransactionError), + GraphError(cesium_nucleus::graph::errors::GraphError), } impl From for ErrorObject<'static> { @@ -27,6 +30,7 @@ impl From for ErrorObject<'static> { RpcError::HexError(e) => ErrorObject::owned(2, "Hex Error", Some(e.to_string())), RpcError::RpcError(e) => ErrorObject::owned(2, "RPC Error", Some(e)), RpcError::TxError(e) => ErrorObject::owned(2, "Transaction Error", Some(e.to_string())), + RpcError::GraphError(e) => ErrorObject::owned(2, "Graph Error", Some(e.to_string())), } } } @@ -49,6 +53,12 @@ impl From for RpcError { } } +impl From for RpcError { + fn from(e: cesium_nucleus::graph::errors::GraphError) -> Self { + RpcError::GraphError(e) + } +} + #[rpc(server)] pub trait Rpc { #[method(name = "getVersion")] @@ -85,7 +95,15 @@ pub trait Rpc { async fn account_sub(&self) -> SubscriptionResult; } -pub struct RpcServerImpl; +pub struct RpcServerImpl { + dag: Arc>>, // If possible, make Graph 'static +} + +impl RpcServerImpl { + pub fn new(dag: Arc>>) -> Self { + Self { dag } + } +} #[async_trait] impl RpcServer for RpcServerImpl { @@ -111,7 +129,8 @@ impl RpcServer for RpcServerImpl { return Err(TransactionError::InvalidSignature.into()); } - // TODO: Submit the transaction to the dag/network + // TODO: May still need to do some things here? + self.dag.lock().await.add_item(&tx).await?; Ok("todo".to_string()) } @@ -177,7 +196,7 @@ impl RpcServer for RpcServerImpl { } } -async fn run_server() -> Result { +pub async fn start_rpc(dag: &Arc>>) -> Result { let rpc_middleware = jsonrpsee::server::middleware::rpc::RpcServiceBuilder::new(); let server = jsonrpsee::server::Server::builder() .set_rpc_middleware(rpc_middleware) @@ -185,21 +204,23 @@ async fn run_server() -> Result { .await?; let addr = server.local_addr()?; - let handle = server.start(RpcServerImpl.into_rpc()); + let rpc_server = RpcServerImpl::new(Arc::clone(dag)); + let handle = server.start(rpc_server.into_rpc()); tokio::spawn(handle.stopped()); - Ok(addr) -} - -pub async fn start_rpc() -> Result { - let server_addr = run_server().await?; - let url = format!("ws://{}", server_addr); + let url = format!("ws://{}", addr); Ok(url) } #[cfg(test)] mod tests { + use std::sync::Arc; + + use tokio::sync::Mutex; + + use cesium_crypto::keys::Account; + use cesium_nucleus::graph::mempool::Graph; use jsonrpsee::{ core::{client::ClientT, ClientError}, rpc_params, @@ -208,7 +229,11 @@ mod tests { #[tokio::test] async fn test_get_version() { - let url = super::start_rpc().await.unwrap(); + // Create the account and wrap it in Arc + let acc = Box::leak(Box::new(Account::create())); + let dag = Arc::new(Mutex::new(Graph::default(acc))); + + let url = super::start_rpc(&dag).await.unwrap(); let client = WsClientBuilder::default().build(&url).await.unwrap(); let result: Result = client.request("getVersion", rpc_params!()).await; diff --git a/validator/Cargo.toml b/validator/Cargo.toml index 5d891d3..1ad0fa5 100644 --- a/validator/Cargo.toml +++ b/validator/Cargo.toml @@ -8,4 +8,7 @@ publish = false [dependencies] dirs.workspace = true -cesium-crypto.workspace = true \ No newline at end of file +cesium-crypto.workspace = true +cesium-nucleus.workspace = true +tokio.workspace = true +cesium-rpc.workspace = true \ No newline at end of file diff --git a/validator/src/main.rs b/validator/src/main.rs index 2976baa..ca6e9d0 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1,6 +1,9 @@ -use std::{fs::File, io::Write, path::PathBuf}; +use std::{fs::File, io::Write, path::PathBuf, sync::Arc}; use cesium_crypto::keys::Account; +use cesium_nucleus::graph::mempool::Graph; +use cesium_rpc::start_rpc; +use tokio::sync::Mutex; fn handle_account(cesium_dir: &PathBuf) -> Account { let account_sk_path = cesium_dir.join("account.sk"); @@ -56,7 +59,8 @@ fn handle_account(cesium_dir: &PathBuf) -> Account { account } -fn main() { +#[tokio::main] +async fn main() { let home_dir = match dirs::home_dir() { Some(path) => path, None => { @@ -83,4 +87,21 @@ fn main() { Address: {}"#, account.to_public_key_readable() ); + + // Get a dag instance + let acc = Box::leak(Box::new(account)); + let dag = Arc::new(Mutex::new(Graph::default(acc))); + + // This will also spawn a tokio process + let url = start_rpc(&dag).await.unwrap(); + println!( + "RPC server started at: {}", + url.split("://").collect::>()[1] + ); + + // Now we just need to keep this process running + // TODO: Handle this properly + loop { + tokio::time::sleep(std::time::Duration::from_secs(60)).await; + } }