Skip to content

Commit

Permalink
feat(eigen-client-extra-features): Backon (#344)
Browse files Browse the repository at this point in the history
* Add backon instead loops

* Add constant for avg block time
  • Loading branch information
gianbelinche authored Nov 19, 2024
1 parent 986a1d2 commit 09ecfdd
Showing 1 changed file with 34 additions and 21 deletions.
55 changes: 34 additions & 21 deletions core/node/da_clients/src/eigen/sdk.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{str::FromStr, time::Duration};

use backon::{ConstantBuilder, Retryable};
use secp256k1::{ecdsa::RecoverableSignature, SecretKey};
use tokio::{sync::mpsc, time::Instant};
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
Expand Down Expand Up @@ -35,6 +36,7 @@ pub(crate) struct RawEigenClient {
}

pub(crate) const DATA_CHUNK_SIZE: usize = 32;
pub(crate) const AVG_BLOCK_TIME: u64 = 12;

impl RawEigenClient {
pub(crate) const BUFFER_SIZE: usize = 1000;
Expand Down Expand Up @@ -104,17 +106,18 @@ impl RawEigenClient {
blob_info: BlobInfo,
disperse_elapsed: Duration,
) -> anyhow::Result<()> {
let start = Instant::now();
while Instant::now() - start
< (Duration::from_millis(self.config.status_query_timeout) - disperse_elapsed)
{
tokio::time::sleep(Duration::from_secs(12)).await; // avg block time
match self.verifier.verify_certificate(blob_info.clone()).await {
Ok(_) => return Ok(()),
Err(_) => continue,
}
}
Err(anyhow::anyhow!("Failed to validate certificate"))
(|| async { self.verifier.verify_certificate(blob_info.clone()).await })
.retry(
&ConstantBuilder::default()
.with_delay(Duration::from_secs(AVG_BLOCK_TIME))
.with_max_times(
(self.config.status_query_timeout
- disperse_elapsed.as_millis() as u64 / AVG_BLOCK_TIME)
as usize,
),
)
.await
.map_err(|_| anyhow::anyhow!("Failed to verify certificate"))
}

async fn dispatch_blob_authenticated(&self, data: Vec<u8>) -> anyhow::Result<String> {
Expand Down Expand Up @@ -259,24 +262,24 @@ impl RawEigenClient {

async fn await_for_inclusion(
&self,
mut client: DisperserClient<Channel>,
client: DisperserClient<Channel>,
disperse_blob_reply: DisperseBlobReply,
) -> anyhow::Result<DisperserBlobInfo> {
let polling_request = disperser::BlobStatusRequest {
request_id: disperse_blob_reply.request_id,
};

let start_time = Instant::now();
while Instant::now() - start_time < Duration::from_millis(self.config.status_query_timeout)
{
tokio::time::sleep(Duration::from_millis(self.config.status_query_interval)).await;
let resp = client
let blob_info = (|| async {
let mut client_clone = client.clone();
let resp = client_clone
.get_blob_status(polling_request.clone())
.await?
.into_inner();

match disperser::BlobStatus::try_from(resp.status)? {
disperser::BlobStatus::Processing | disperser::BlobStatus::Dispersing => {}
disperser::BlobStatus::Processing | disperser::BlobStatus::Dispersing => {
return Err(anyhow::anyhow!("Blob is still processing"))
}
disperser::BlobStatus::Failed => {
return Err(anyhow::anyhow!("Blob dispatch failed"))
}
Expand All @@ -290,6 +293,7 @@ impl RawEigenClient {
.ok_or_else(|| anyhow::anyhow!("No blob header in response"))?;
return Ok(blob_info);
}
return Err(anyhow::anyhow!("Blob is still processing"));
}
disperser::BlobStatus::Finalized => {
let blob_info = resp
Expand All @@ -300,9 +304,18 @@ impl RawEigenClient {

_ => return Err(anyhow::anyhow!("Received unknown blob status")),
}
}

Err(anyhow::anyhow!("Failed to disperse blob (timeout)"))
})
.retry(
&ConstantBuilder::default()
.with_delay(Duration::from_millis(self.config.status_query_interval))
.with_max_times(
(self.config.status_query_timeout / self.config.status_query_interval) as usize,
),
)
.when(|e| e.to_string().contains("Blob is still processing"))
.await?;

return Ok(blob_info);
}

#[cfg(test)]
Expand Down

0 comments on commit 09ecfdd

Please sign in to comment.