Skip to content

Commit

Permalink
implemented address balance rpcs
Browse files Browse the repository at this point in the history
  • Loading branch information
idky137 committed Nov 6, 2024
1 parent d3ef4f9 commit ab5e73a
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 11 deletions.
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions zaino-fetch/src/jsonrpc/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ impl JsonRpcConnector {
}
if !status.is_success() {
return Err(JsonRpcConnectorError::new(format!(
"Error: Error status from node's rpc server: {}",
status
"Error: Error status from node's rpc server: {}, {}",
status, body_str
)));
}

Expand Down Expand Up @@ -189,7 +189,7 @@ impl JsonRpcConnector {
&self,
addresses: Vec<String>,
) -> Result<GetBalanceResponse, JsonRpcConnectorError> {
let params = vec![serde_json::to_value(addresses)?];
let params = vec![serde_json::json!({ "addresses": addresses })];
self.send_request("getaddressbalance", params).await
}

Expand Down
1 change: 1 addition & 0 deletions zaino-serve/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ tokio-stream = "0.1"
futures = "0.3.30"
async-stream = "0.3"
crossbeam-channel = "0.5"
lazy-regex = "3.3"

[build-dependencies]
whoami = "1.5"
142 changes: 134 additions & 8 deletions zaino-serve/src/rpc/service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Lightwallet service RPC implementations.

use futures::StreamExt;
use hex::FromHex;
use tokio::time::timeout;
use tokio_stream::wrappers::ReceiverStream;
Expand All @@ -24,6 +25,21 @@ use zaino_proto::proto::{
},
};

/// T Address Regex
static TADDR_REGEX: lazy_regex::Lazy<lazy_regex::regex::Regex> =
lazy_regex::lazy_regex!(r"^t[a-zA-Z0-9]{34}$");

/// Checks for valid t Address.
///
/// Returns Some(taddress) if address is valid else none.
fn check_taddress(taddr: &str) -> Option<&str> {
if TADDR_REGEX.is_match(taddr) {
Some(taddr)
} else {
None
}
}

/// Stream of RawTransactions, output type of get_taddress_txids.
pub struct RawTransactionStream {
inner: ReceiverStream<Result<RawTransaction, tonic::Status>>,
Expand Down Expand Up @@ -783,11 +799,10 @@ impl CompactTxStreamer for GrpcClient {
})
}

/// This RPC has not been implemented as it is not currently used by zingolib.
/// If you require this RPC please open an issue or PR at the Zingo-Indexer github (https://github.com/zingolabs/zingo-indexer).
/// Returns the total balance for a list of taddrs
fn get_taddress_balance<'life0, 'async_trait>(
&'life0 self,
_request: tonic::Request<AddressList>,
request: tonic::Request<AddressList>,
) -> core::pin::Pin<
Box<
dyn core::future::Future<
Expand All @@ -802,17 +817,39 @@ impl CompactTxStreamer for GrpcClient {
{
println!("[TEST] Received call of get_taddress_balance.");
Box::pin(async {
Err(tonic::Status::unimplemented("get_taddress_balance not yet implemented. If you require this RPC please open an issue or PR at the Zingo-Indexer github (https://github.com/zingolabs/zingo-indexer)."))
let zebrad_client = JsonRpcConnector::new(
self.zebrad_uri.clone(),
Some("xxxxxx".to_string()),
Some("xxxxxx".to_string()),
)
.await?;
let taddrs = request.into_inner().addresses;
if !taddrs.iter().all(|taddr| check_taddress(taddr).is_some()) {
return Err(tonic::Status::invalid_argument(
"Error: One or more invalid taddresses given.",
));
}
let balance = zebrad_client.get_address_balance(taddrs).await?;
let checked_balance: i64 = match i64::try_from(balance.balance) {
Ok(balance) => balance,
Err(_) => {
return Err(tonic::Status::unknown(
"Error: Error converting balance from u64 to i64.",
));
}
};
Ok(tonic::Response::new(Balance {
value_zat: checked_balance,
}))
})
}

/// This RPC has not been implemented as it is not currently used by zingolib.
/// If you require this RPC please open an issue or PR at the Zingo-Indexer github (https://github.com/zingolabs/zingo-indexer).
/// Returns the total balance for a list of taddrs
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn get_taddress_balance_stream<'life0, 'async_trait>(
&'life0 self,
_request: tonic::Request<tonic::Streaming<Address>>,
request: tonic::Request<tonic::Streaming<Address>>,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = Result<tonic::Response<Balance>, tonic::Status>>
Expand All @@ -826,7 +863,96 @@ impl CompactTxStreamer for GrpcClient {
{
println!("[TEST] Received call of get_taddress_balance_stream.");
Box::pin(async {
Err(tonic::Status::unimplemented("get_taddress_balance_stream not yet implemented. If you require this RPC please open an issue or PR at the Zingo-Indexer github (https://github.com/zingolabs/zingo-indexer)."))
let zebrad_client = JsonRpcConnector::new(
self.zebrad_uri.clone(),
Some("xxxxxx".to_string()),
Some("xxxxxx".to_string()),
)
.await?;
let (channel_tx, mut channel_rx) = tokio::sync::mpsc::channel::<String>(32);
let fetcher_task_handle = tokio::spawn(async move {
let fetcher_timeout = timeout(std::time::Duration::from_secs(30), async {
let mut total_balance: u64 = 0;
loop {
match channel_rx.recv().await {
Some(taddr) => {
if check_taddress(taddr.as_str()).is_some() {
let balance =
zebrad_client.get_address_balance(vec![taddr]).await?;
total_balance += balance.balance;
} else {
return Err(tonic::Status::invalid_argument(
"Error: One or more invalid taddresses given.",
));
}
}
None => {
return Ok(total_balance);
}
}
}
})
.await;
match fetcher_timeout {
Ok(result) => result,
Err(_) => Err(tonic::Status::deadline_exceeded(
"Error: get_taddress_balance_stream request timed out.",
)),
}
});
let addr_recv_timeout = timeout(std::time::Duration::from_secs(30), async {
let mut address_stream = request.into_inner();
while let Some(address_result) = address_stream.next().await {
// TODO: Hide server error from clients before release. Currently useful for dev purposes.
let address = address_result.map_err(|e| {
tonic::Status::unknown(format!("Failed to read from stream: {}", e))
})?;
if channel_tx.send(address.address).await.is_err() {
// TODO: Hide server error from clients before release. Currently useful for dev purposes.
return Err(tonic::Status::unknown(
"Error: Failed to send address to balance task.",
));
}
}
drop(channel_tx);
Ok::<(), tonic::Status>(())
})
.await;
match addr_recv_timeout {
Ok(Ok(())) => {}
Ok(Err(e)) => {
fetcher_task_handle.abort();
return Err(e);
}
Err(_) => {
fetcher_task_handle.abort();
return Err(tonic::Status::deadline_exceeded(
"Error: get_taddress_balance_stream request timed out in address loop.",
));
}
}
match fetcher_task_handle.await {
Ok(Ok(total_balance)) => {
let checked_balance: i64 = match i64::try_from(total_balance) {
Ok(balance) => balance,
Err(_) => {
// TODO: Hide server error from clients before release. Currently useful for dev purposes.
return Err(tonic::Status::unknown(
"Error: Error converting balance from u64 to i64.",
));
}
};
Ok(tonic::Response::new(Balance {
value_zat: checked_balance,
}))
}
Ok(Err(e)) => Err(e),
// TODO: Hide server error from clients before release. Currently useful for dev purposes.
Err(e) => Err(tonic::Status::unknown(format!(
"Fetcher Task failed: {}",
e
))),
}
})
}

Expand Down

0 comments on commit ab5e73a

Please sign in to comment.