diff --git a/core/node/da_clients/src/eigen/sdk.rs b/core/node/da_clients/src/eigen/sdk.rs index 9b7d992b530..ef3175b39c5 100644 --- a/core/node/da_clients/src/eigen/sdk.rs +++ b/core/node/da_clients/src/eigen/sdk.rs @@ -1,5 +1,6 @@ use std::{str::FromStr, time::Duration}; +use backon::{ConstantBuilder, Retryable}; use secp256k1::{ecdsa::RecoverableSignature, SecretKey}; use tokio::{sync::mpsc, time::Instant}; use tokio_stream::{wrappers::ReceiverStream, StreamExt}; @@ -35,6 +36,7 @@ pub(crate) struct RawEigenClient { } pub(crate) const DATA_CHUNK_SIZE: usize = 32; +pub(crate) const AVG_BLOCK_TIME: u64 = 12; impl RawEigenClient { pub(crate) const BUFFER_SIZE: usize = 1000; @@ -104,17 +106,18 @@ impl RawEigenClient { blob_info: BlobInfo, disperse_elapsed: Duration, ) -> anyhow::Result<()> { - let start = Instant::now(); - while Instant::now() - start - < (Duration::from_millis(self.config.status_query_timeout) - disperse_elapsed) - { - tokio::time::sleep(Duration::from_secs(12)).await; // avg block time - match self.verifier.verify_certificate(blob_info.clone()).await { - Ok(_) => return Ok(()), - Err(_) => continue, - } - } - Err(anyhow::anyhow!("Failed to validate certificate")) + (|| async { self.verifier.verify_certificate(blob_info.clone()).await }) + .retry( + &ConstantBuilder::default() + .with_delay(Duration::from_secs(AVG_BLOCK_TIME)) + .with_max_times( + (self.config.status_query_timeout + - disperse_elapsed.as_millis() as u64 / AVG_BLOCK_TIME) + as usize, + ), + ) + .await + .map_err(|_| anyhow::anyhow!("Failed to verify certificate")) } async fn dispatch_blob_authenticated(&self, data: Vec) -> anyhow::Result { @@ -259,24 +262,24 @@ impl RawEigenClient { async fn await_for_inclusion( &self, - mut client: DisperserClient, + client: DisperserClient, disperse_blob_reply: DisperseBlobReply, ) -> anyhow::Result { let polling_request = disperser::BlobStatusRequest { request_id: disperse_blob_reply.request_id, }; - let start_time = Instant::now(); - while Instant::now() - start_time < Duration::from_millis(self.config.status_query_timeout) - { - tokio::time::sleep(Duration::from_millis(self.config.status_query_interval)).await; - let resp = client + let blob_info = (|| async { + let mut client_clone = client.clone(); + let resp = client_clone .get_blob_status(polling_request.clone()) .await? .into_inner(); match disperser::BlobStatus::try_from(resp.status)? { - disperser::BlobStatus::Processing | disperser::BlobStatus::Dispersing => {} + disperser::BlobStatus::Processing | disperser::BlobStatus::Dispersing => { + return Err(anyhow::anyhow!("Blob is still processing")) + } disperser::BlobStatus::Failed => { return Err(anyhow::anyhow!("Blob dispatch failed")) } @@ -290,6 +293,7 @@ impl RawEigenClient { .ok_or_else(|| anyhow::anyhow!("No blob header in response"))?; return Ok(blob_info); } + return Err(anyhow::anyhow!("Blob is still processing")); } disperser::BlobStatus::Finalized => { let blob_info = resp @@ -300,9 +304,18 @@ impl RawEigenClient { _ => return Err(anyhow::anyhow!("Received unknown blob status")), } - } - - Err(anyhow::anyhow!("Failed to disperse blob (timeout)")) + }) + .retry( + &ConstantBuilder::default() + .with_delay(Duration::from_millis(self.config.status_query_interval)) + .with_max_times( + (self.config.status_query_timeout / self.config.status_query_interval) as usize, + ), + ) + .when(|e| e.to_string().contains("Blob is still processing")) + .await?; + + return Ok(blob_info); } #[cfg(test)]