Skip to content

Commit

Permalink
get_mempool_stream implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
idky137 committed Jun 17, 2024
1 parent 6c35b78 commit 5c1d8b5
Show file tree
Hide file tree
Showing 9 changed files with 255 additions and 54 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 10 additions & 4 deletions integration-tests/tests/integrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,14 @@ mod wallet_basic {
)])
.await
.unwrap();
zingo_client
.do_send(vec![(
&zingolib::get_base_address!(zingo_client, "sapling"),
250_000,
None,
)])
.await
.unwrap();

let zingo_client_saved = zingo_client.export_save_buffer_async().await.unwrap();
let zingo_client_loaded = std::sync::Arc::new(
Expand All @@ -431,19 +439,17 @@ mod wallet_basic {
);
LightClient::start_mempool_monitor(zingo_client_loaded.clone());
// This seems to be long enough for the mempool monitor to kick in.
// One second is insufficient. Even if this fails, this can only ever be
// a false negative, giving us a balance of 100_000. Still, could be improved.
tokio::time::sleep(std::time::Duration::from_secs(5)).await;

let balance = zingo_client.do_balance().await;
println!("@zingoproxytest: zingo_client balance: \n{:#?}.", balance);
assert_eq!(balance.unverified_sapling_balance.unwrap(), 250_000);
assert_eq!(balance.unverified_sapling_balance.unwrap(), 500_000);

test_manager.regtest_manager.generate_n_blocks(1).unwrap();
zingo_client.do_sync(false).await.unwrap();
let balance = zingo_client.do_balance().await;
println!("@zingoproxytest: zingo_client balance: \n{:#?}.", balance);
assert_eq!(balance.verified_sapling_balance.unwrap(), 250_000);
assert_eq!(balance.verified_sapling_balance.unwrap(), 500_000);

drop_test_manager(
Some(test_manager.temp_conf_dir.path().to_path_buf()),
Expand Down
2 changes: 1 addition & 1 deletion zingo-proxyd/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub async fn spawn_proxy(
let mut handles = vec![];
let nym_addr_out: Option<String>;

// startup_message();
startup_message();
println!("@zingoproxyd: Launching Zingo-Proxy..\n@zingoproxyd: Launching gRPC Server..");
let proxy_handle = spawn_server(proxy_port, lwd_port, zebrad_port, online.clone()).await;
handles.push(proxy_handle);
Expand Down
4 changes: 1 addition & 3 deletions zingo-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,10 @@ serde_json = { version = "1.0.117", features = ["preserve_order"] }
jsonrpc-core = "18.0.0"

indexmap = { version = "2.2.6", features = ["serde"] }

base64 = "0.13.0"
tokio-stream = "0.1"
futures = "0.3.30"

byteorder = "1"
sha2 = "0.10"

thiserror = "1.0.59"

1 change: 1 addition & 0 deletions zingo-rpc/src/blockcache.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Zingo-Proxy Block Cache and Mempool State Engine.
pub mod block;
pub mod mempool;
pub mod transaction;
pub mod utils;
132 changes: 132 additions & 0 deletions zingo-rpc/src/blockcache/mempool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
//! Zingo-Proxy mempool state functionality.
use std::{collections::HashSet, time::SystemTime};
use thiserror::Error;
use tokio::sync::{Mutex, RwLock};

use crate::jsonrpc::connector::{JsonRpcConnector, JsonRpcConnectorError};

/// Mempool state information.
pub struct Mempool {
/// Txids currently in the mempool.
txids: RwLock<Vec<String>>,
/// Txids that have already been added to Zingo-Proxy's mempool.
txids_seen: Mutex<HashSet<String>>,
/// System time when the mempool was last updated.
last_sync_time: Mutex<SystemTime>,
/// Blockchain data, used to check when a new block has been mined.
best_block_hash: RwLock<Option<zebra_chain::block::Hash>>,
}

/// Mempool Error struct.
#[derive(Error, Debug)]
pub enum MempoolError {
/// Errors from the JsonRPC client.
#[error("JsonRpcConnectorError: {0}")]
JsonRpcError(#[from] JsonRpcConnectorError),
}

impl Mempool {
/// Returns an empty mempool.
pub fn new() -> Self {
Mempool {
txids: RwLock::new(Vec::new()),
txids_seen: Mutex::new(HashSet::new()),
last_sync_time: Mutex::new(SystemTime::now()),
best_block_hash: RwLock::new(None),
}
}

/// Updates the mempool, returns true if the current block in the mempool has been mined.
pub async fn update(&self, zebrad_uri: &http::Uri) -> Result<bool, MempoolError> {
self.update_last_sync_time().await?;
let mined = self.check_and_update_best_block_hash(zebrad_uri).await?;
if mined {
self.reset_txids().await?;
self.update_txids(zebrad_uri).await?;
Ok(true)
} else {
self.update_txids(zebrad_uri).await?;
Ok(false)
}
}

/// Updates the txids in the mempool.
async fn update_txids(&self, zebrad_uri: &http::Uri) -> Result<(), MempoolError> {
let node_txids = JsonRpcConnector::new(
zebrad_uri.clone(),
Some("xxxxxx".to_string()),
Some("xxxxxx".to_string()),
)
.await
.get_raw_mempool()
.await?
.transactions;
let mut txids_seen = self.txids_seen.lock().await;
let mut txids = self.txids.write().await;
for txid in node_txids {
if !txids_seen.contains(&txid) {
txids.push(txid.clone());
}
txids_seen.insert(txid);
}
Ok(())
}

/// Updates the system last sync time.
async fn update_last_sync_time(&self) -> Result<(), MempoolError> {
let mut last_sync_time = self.last_sync_time.lock().await;
*last_sync_time = SystemTime::now();
Ok(())
}

/// Updates the mempool blockchain info, returns true if the current block in the mempool has been mined.
async fn check_and_update_best_block_hash(
&self,
zebrad_uri: &http::Uri,
) -> Result<bool, MempoolError> {
let node_best_block_hash = JsonRpcConnector::new(
zebrad_uri.clone(),
Some("xxxxxx".to_string()),
Some("xxxxxx".to_string()),
)
.await
.get_blockchain_info()
.await?
.best_block_hash;

let mut last_best_block_hash = self.best_block_hash.write().await;

if let Some(ref last_hash) = *last_best_block_hash {
if node_best_block_hash == *last_hash {
return Ok(false);
}
}

*last_best_block_hash = Some(node_best_block_hash);
Ok(true)
}

/// Clears the txids currently held in the mempool.
async fn reset_txids(&self) -> Result<(), MempoolError> {
let mut txids = self.txids.write().await;
txids.clear();
let mut txids_seen = self.txids_seen.lock().await;
txids_seen.clear();
Ok(())
}

/// Returns the txids currently in the mempool.
pub async fn get_mempool_txids(&self) -> Result<Vec<String>, MempoolError> {
let txids = self.txids.read().await;
Ok(txids.clone())
}

/// Returns the hash of the block currently in the mempool.
pub async fn get_best_block_hash(
&self,
) -> Result<Option<zebra_chain::block::Hash>, MempoolError> {
let best_block_hash = self.best_block_hash.read().await;
Ok(best_block_hash.clone())
}
}
14 changes: 1 addition & 13 deletions zingo-rpc/src/jsonrpc/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ impl JsonRpcConnector {

/// Sends a jsonRPC request and returns the response.
///
/// NOTE/TODO: This function currently resends the call up to 5 times on a server response of "Work queue depth exceeded".
/// TODO: This function currently resends the call up to 5 times on a server response of "Work queue depth exceeded".
/// This is because the node's queue can become overloaded and stop servicing RPCs.
/// This functionality is weak and should be incorporated in Zingo-Proxy's queue mechanism [WIP] that handles various errors appropriately.
pub async fn send_request<T: Serialize, R: for<'de> Deserialize<'de>>(
Expand Down Expand Up @@ -219,15 +219,6 @@ impl JsonRpcConnector {
Box::new(e),
)
})?;

// let test_response: RpcResponse<R> =
// serde_json::from_slice(&body_bytes).unwrap_or_else(|e| {
// panic!(
// "Failed to deserialize response: {}\nBody bytes: {:?}",
// e,
// String::from_utf8_lossy(&body_bytes)
// )
// });
let body_str = String::from_utf8_lossy(&body_bytes);
if body_str.contains("Work queue depth exceeded") {
if attempts >= max_attempts {
Expand All @@ -238,7 +229,6 @@ impl JsonRpcConnector {
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
continue;
}

let response: RpcResponse<R> = serde_json::from_slice(&body_bytes).map_err(|e| {
JsonRpcConnectorError::new_with_source(
"Failed to deserialize response",
Expand Down Expand Up @@ -364,8 +354,6 @@ impl JsonRpcConnector {
/// zcashd reference: [`getrawmempool`](https://zcash.github.io/rpc/getrawmempool.html)
/// method: post
/// tags: blockchain
///
/// NOTE: Currently unused by Zingo-Proxy and untested!
pub async fn get_raw_mempool(&self) -> Result<TxidsResponse, JsonRpcConnectorError> {
self.send_request::<(), TxidsResponse>("getrawmempool", ())
.await
Expand Down
7 changes: 7 additions & 0 deletions zingo-rpc/src/jsonrpc/primitives.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,13 @@ impl<'de> Deserialize<'de> for GetTransactionResponse {
confirmations: v["confirmations"].as_u64().unwrap() as u32,
};
Ok(obj)
} else if v.get("hex").is_some() && v.get("txid").is_some() {
let obj = GetTransactionResponse::Object {
hex: serde_json::from_value(v["hex"].clone()).unwrap(),
height: -1 as i32,
confirmations: 0 as u32,
};
Ok(obj)
} else {
let raw = GetTransactionResponse::Raw(serde_json::from_value(v.clone()).unwrap());
Ok(raw)
Expand Down
Loading

0 comments on commit 5c1d8b5

Please sign in to comment.