From d28da9da5b6372e9c985b01f76ecaba78e85f1ce Mon Sep 17 00:00:00 2001 From: idky137 Date: Wed, 30 Oct 2024 11:52:10 +0000 Subject: [PATCH] implemented get_block_range_nullifiers --- zaino-serve/src/rpc/service.rs | 63 ++++++++++++++++++++++++++++++---- 1 file changed, 57 insertions(+), 6 deletions(-) diff --git a/zaino-serve/src/rpc/service.rs b/zaino-serve/src/rpc/service.rs index 0d064b4..7fa2fb5 100644 --- a/zaino-serve/src/rpc/service.rs +++ b/zaino-serve/src/rpc/service.rs @@ -273,15 +273,14 @@ impl CompactTxStreamer for GrpcClient { /// Server streaming response type for the GetBlockRangeNullifiers method. #[doc = " Server streaming response type for the GetBlockRangeNullifiers method."] - type GetBlockRangeNullifiersStream = tonic::Streaming; + type GetBlockRangeNullifiersStream = std::pin::Pin>; /// Same as GetBlockRange except actions contain only nullifiers. /// - /// This RPC has not been implemented as it is not currently used by zingolib. - /// If you require this RPC please open an issue or PR at the Zingo-Indexer github (https://github.com/zingolabs/zingo-indexer). + /// NOTE: This should be reimplemented with the introduction of the BlockCache. fn get_block_range_nullifiers<'life0, 'async_trait>( &'life0 self, - _request: tonic::Request, + request: tonic::Request, ) -> core::pin::Pin< Box< dyn core::future::Future< @@ -298,8 +297,60 @@ impl CompactTxStreamer for GrpcClient { Self: 'async_trait, { println!("[TEST] Received call of get_block_range_nullifiers."); - Box::pin(async { - Err(tonic::Status::unimplemented("get_block_range_nullifiers not yet implemented. If you require this RPC please open an issue or PR at the Zingo-Indexer github (https://github.com/zingolabs/zingo-indexer).")) + let zebrad_uri = self.zebrad_uri.clone(); + Box::pin(async move { + let blockrange = request.into_inner(); + let mut start = blockrange + .start + .map(|s| s.height as u32) + .ok_or(tonic::Status::invalid_argument("Start block not specified"))?; + let mut end = blockrange + .end + .map(|e| e.height as u32) + .ok_or(tonic::Status::invalid_argument("End block not specified"))?; + if start > end { + (start, end) = (end, start); + } + let (channel_tx, channel_rx) = tokio::sync::mpsc::channel(32); + tokio::spawn(async move { + // NOTE: This timeout is so slow due to the blockcache not being implemented. This should be reduced to 30s once functionality is in place. + let timeout = timeout(std::time::Duration::from_secs(120), async { + for height in start..=end { + let compact_block = get_nullifiers_from_node(&zebrad_uri, &height).await; + match compact_block { + Ok(block) => { + if channel_tx.send(Ok(block)).await.is_err() { + break; + } + } + Err(e) => { + if channel_tx + .send(Err(tonic::Status::internal(e.to_string()))) + .await + .is_err() + { + break; + } + } + } + } + }) + .await; + match timeout { + Ok(_) => {} + Err(_) => { + channel_tx + .send(Err(tonic::Status::internal( + "get_block_range_nullifiers gRPC request timed out", + ))) + .await + .ok(); + } + } + }); + let output_stream = CompactBlockStream::new(channel_rx); + let stream_boxed = Box::pin(output_stream); + Ok(tonic::Response::new(stream_boxed)) }) }