Skip to content

Commit

Permalink
feat, prelim rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
OnlyF0uR committed Nov 19, 2024
1 parent 8468bae commit d1db881
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 16 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/cesium-nucleus/src/graph/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ impl<'a> Graph<'a> {
Self::new(account, 2500, 5, 0.45)
}

pub fn sync(&self) {
// TODO: Sync the graph with other validators
}

// The minimal amount of nodes required to kick off the graph is
pub async fn add_genesis(&self, input: &Transaction) -> Result<(), GraphError> {
if let Err(e) = self.validate_item(input) {
Expand Down
4 changes: 3 additions & 1 deletion crates/cesium-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ futures.workspace = true
jsonrpsee = { workspace = true, features = ["server", "macros"]}
hex.workspace = true
cesium-nebula.workspace = true
cesium-nucleus.workspace = true

[dev-dependencies]
jsonrpsee = { workspace = true, features = ["http-client", "ws-client", "macros"]}
jsonrpsee = { workspace = true, features = ["http-client", "ws-client", "macros"]}
cesium-crypto.workspace = true
49 changes: 37 additions & 12 deletions crates/cesium-rpc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::Mutex;

use cesium_nebula::transaction::{Transaction, TransactionError};
use cesium_nucleus::graph::mempool::Graph;
use hex::FromHexError;
use jsonrpsee::{
core::{async_trait, SubscriptionResult},
Expand All @@ -18,6 +20,7 @@ pub enum RpcError {
HexError(hex::FromHexError),
RpcError(String),
TxError(TransactionError),
GraphError(cesium_nucleus::graph::errors::GraphError),
}

impl From<RpcError> for ErrorObject<'static> {
Expand All @@ -27,6 +30,7 @@ impl From<RpcError> for ErrorObject<'static> {
RpcError::HexError(e) => ErrorObject::owned(2, "Hex Error", Some(e.to_string())),
RpcError::RpcError(e) => ErrorObject::owned(2, "RPC Error", Some(e)),
RpcError::TxError(e) => ErrorObject::owned(2, "Transaction Error", Some(e.to_string())),
RpcError::GraphError(e) => ErrorObject::owned(2, "Graph Error", Some(e.to_string())),
}
}
}
Expand All @@ -49,6 +53,12 @@ impl From<TransactionError> for RpcError {
}
}

impl From<cesium_nucleus::graph::errors::GraphError> for RpcError {
fn from(e: cesium_nucleus::graph::errors::GraphError) -> Self {
RpcError::GraphError(e)
}
}

#[rpc(server)]
pub trait Rpc {
#[method(name = "getVersion")]
Expand Down Expand Up @@ -85,7 +95,15 @@ pub trait Rpc {
async fn account_sub(&self) -> SubscriptionResult;
}

pub struct RpcServerImpl;
pub struct RpcServerImpl {
dag: Arc<Mutex<Graph<'static>>>, // If possible, make Graph 'static
}

impl RpcServerImpl {
pub fn new(dag: Arc<Mutex<Graph<'static>>>) -> Self {
Self { dag }
}
}

#[async_trait]
impl RpcServer for RpcServerImpl {
Expand All @@ -111,7 +129,8 @@ impl RpcServer for RpcServerImpl {
return Err(TransactionError::InvalidSignature.into());
}

// TODO: Submit the transaction to the dag/network
// TODO: May still need to do some things here?
self.dag.lock().await.add_item(&tx).await?;

Ok("todo".to_string())
}
Expand Down Expand Up @@ -177,29 +196,31 @@ impl RpcServer for RpcServerImpl {
}
}

async fn run_server() -> Result<SocketAddr, RpcError> {
pub async fn start_rpc(dag: &Arc<Mutex<Graph<'static>>>) -> Result<String, RpcError> {
let rpc_middleware = jsonrpsee::server::middleware::rpc::RpcServiceBuilder::new();
let server = jsonrpsee::server::Server::builder()
.set_rpc_middleware(rpc_middleware)
.build("127.0.0.1:0")
.await?;

let addr = server.local_addr()?;
let handle = server.start(RpcServerImpl.into_rpc());
let rpc_server = RpcServerImpl::new(Arc::clone(dag));
let handle = server.start(rpc_server.into_rpc());

tokio::spawn(handle.stopped());

Ok(addr)
}

pub async fn start_rpc() -> Result<String, RpcError> {
let server_addr = run_server().await?;
let url = format!("ws://{}", server_addr);
let url = format!("ws://{}", addr);
Ok(url)
}

#[cfg(test)]
mod tests {
use std::sync::Arc;

use tokio::sync::Mutex;

use cesium_crypto::keys::Account;
use cesium_nucleus::graph::mempool::Graph;
use jsonrpsee::{
core::{client::ClientT, ClientError},
rpc_params,
Expand All @@ -208,7 +229,11 @@ mod tests {

#[tokio::test]
async fn test_get_version() {
let url = super::start_rpc().await.unwrap();
// Create the account and wrap it in Arc
let acc = Box::leak(Box::new(Account::create()));
let dag = Arc::new(Mutex::new(Graph::default(acc)));

let url = super::start_rpc(&dag).await.unwrap();

let client = WsClientBuilder::default().build(&url).await.unwrap();
let result: Result<String, ClientError> = client.request("getVersion", rpc_params!()).await;
Expand Down
5 changes: 4 additions & 1 deletion validator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,7 @@ publish = false

[dependencies]
dirs.workspace = true
cesium-crypto.workspace = true
cesium-crypto.workspace = true
cesium-nucleus.workspace = true
tokio.workspace = true
cesium-rpc.workspace = true
25 changes: 23 additions & 2 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::{fs::File, io::Write, path::PathBuf};
use std::{fs::File, io::Write, path::PathBuf, sync::Arc};

use cesium_crypto::keys::Account;
use cesium_nucleus::graph::mempool::Graph;
use cesium_rpc::start_rpc;
use tokio::sync::Mutex;

fn handle_account(cesium_dir: &PathBuf) -> Account {
let account_sk_path = cesium_dir.join("account.sk");
Expand Down Expand Up @@ -56,7 +59,8 @@ fn handle_account(cesium_dir: &PathBuf) -> Account {
account
}

fn main() {
#[tokio::main]
async fn main() {
let home_dir = match dirs::home_dir() {
Some(path) => path,
None => {
Expand All @@ -83,4 +87,21 @@ fn main() {
Address: {}"#,
account.to_public_key_readable()
);

// Get a dag instance
let acc = Box::leak(Box::new(account));
let dag = Arc::new(Mutex::new(Graph::default(acc)));

// This will also spawn a tokio process
let url = start_rpc(&dag).await.unwrap();
println!(
"RPC server started at: {}",
url.split("://").collect::<Vec<&str>>()[1]
);

// Now we just need to keep this process running
// TODO: Handle this properly
loop {
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
}
}

0 comments on commit d1db881

Please sign in to comment.