Skip to content

Commit

Permalink
backoff retry when service is overloaded
Browse files Browse the repository at this point in the history
  • Loading branch information
tzemanovic committed Nov 8, 2023
1 parent bc0352e commit 0da0f1f
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 24 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pin-project = "1"
futures = "0.3"
tracing = "0.1"
prost = "0.12"
backoff = { version = "0.4.0", features = ["tokio"] }

[dev-dependencies]
structopt = "0.3"
Expand Down
120 changes: 96 additions & 24 deletions src/v037/server.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use std::convert::{TryFrom, TryInto};
use std::pin::Pin;
use std::sync::Arc;

use backoff::ExponentialBackoff;
use futures::future::{FutureExt, TryFutureExt};
use futures::sink::SinkExt;
use futures::stream::{FuturesOrdered, StreamExt};
use futures::stream::{FuturesOrdered, Peekable, StreamExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::Mutex;
use tokio::{
net::{TcpListener, ToSocketAddrs},
select,
Expand All @@ -20,10 +24,12 @@ use std::path::Path;
#[cfg(target_family = "unix")]
use tokio::net::UnixListener;

use crate::v037::codec::{Decode, Encode};
use tendermint::v0_37::abci::{
ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest,
MempoolResponse, Request, Response, SnapshotRequest, SnapshotResponse,
};
use tendermint_proto::v0_37::abci as pb;

/// An ABCI server which listens for connections and forwards requests to four
/// component ABCI [`Service`]s.
Expand Down Expand Up @@ -111,18 +117,25 @@ impl<C, M, I, S> Server<C, M, I, S>
where
C: Service<ConsensusRequest, Response = ConsensusResponse, Error = BoxError>
+ Send
+ Sync
+ Clone
+ 'static,
C::Future: Send + 'static,
M: Service<MempoolRequest, Response = MempoolResponse, Error = BoxError>
+ Send
+ Sync
+ Clone
+ 'static,
M::Future: Send + 'static,
I: Service<InfoRequest, Response = InfoResponse, Error = BoxError> + Send + Clone + 'static,
I: Service<InfoRequest, Response = InfoResponse, Error = BoxError>
+ Send
+ Sync
+ Clone
+ 'static,
I::Future: Send + 'static,
S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError>
+ Send
+ Sync
+ Clone
+ 'static,
S::Future: Send + 'static,
Expand All @@ -148,7 +161,7 @@ where
snapshot: self.snapshot.clone(),
};
let (read, write) = socket.into_split();
tokio::spawn(async move { conn.run(read, write).await.unwrap() });
tokio::spawn(async move { conn.run_with_backoff(read, write).await.unwrap() });
}
Err(e) => {
tracing::error!({ %e }, "error accepting new connection");
Expand Down Expand Up @@ -176,7 +189,7 @@ where
snapshot: self.snapshot.clone(),
};
let (read, write) = socket.into_split();
tokio::spawn(async move { conn.run(read, write).await.unwrap() });
tokio::spawn(async move { conn.run_with_backoff(read, write).await.unwrap() });
}
Err(e) => {
tracing::error!({ %e }, "error accepting new connection");
Expand All @@ -186,50 +199,101 @@ where
}
}

/// The four service handles used to serve a single accepted connection.
///
/// `Clone` is derived because `run_with_backoff` calls `self.clone().run(..)`
/// on every attempt, so each attempt consumes its own copy of the handles.
#[derive(Clone)]
struct Connection<C, M, I, S> {
/// Handle bounded as `Service<ConsensusRequest>` by the `impl` block.
consensus: C,
/// Handle bounded as `Service<MempoolRequest>` by the `impl` block.
mempool: M,
/// Handle bounded as `Service<InfoRequest>` by the `impl` block.
info: I,
/// Handle bounded as `Service<SnapshotRequest>` by the `impl` block.
snapshot: S,
}

/// The two framed halves of a connection, bundled so they can be passed to
/// `run` as one value: a peekable stream of `pb::Request` frames read from `R`
/// and a sink that writes `pb::Response` frames to `W`.
///
/// The request side is `Peekable` so that a request can be inspected without
/// being removed from the stream.
type StreamAndSink<R, W> = (
Peekable<FramedRead<R, Decode<pb::Request>>>,
FramedWrite<W, Encode<pb::Response>>,
);

impl<C, M, I, S> Connection<C, M, I, S>
where
C: Service<ConsensusRequest, Response = ConsensusResponse, Error = BoxError> + Send + 'static,
C: Service<ConsensusRequest, Response = ConsensusResponse, Error = BoxError>
+ Clone
+ Send
+ 'static,
C::Future: Send + 'static,
M: Service<MempoolRequest, Response = MempoolResponse, Error = BoxError> + Send + 'static,
M: Service<MempoolRequest, Response = MempoolResponse, Error = BoxError>
+ Clone
+ Send
+ 'static,
C: Service<ConsensusRequest, Response = ConsensusResponse, Error = BoxError>
+ Clone
+ Send
+ 'static,
M::Future: Send + 'static,
I: Service<InfoRequest, Response = InfoResponse, Error = BoxError> + Send + 'static,
I: Service<InfoRequest, Response = InfoResponse, Error = BoxError> + Clone + Send + 'static,
I::Future: Send + 'static,
S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError> + Send + 'static,
S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError>
+ Clone
+ Send
+ 'static,
S::Future: Send + 'static,
{
// XXX handle errors gracefully
// figure out how / if to return errors to tendermint
async fn run(
mut self,
async fn run_with_backoff(
self,
read: impl AsyncReadExt + std::marker::Unpin,
write: impl AsyncWriteExt + std::marker::Unpin,
) -> Result<(), BoxError> {
tracing::info!("listening for requests");

use tendermint_proto::v0_37::abci as pb;

let (mut request_stream, mut response_sink) = {
use crate::v037::codec::{Decode, Encode};
let (request_stream, response_sink) = {
(
FramedRead::new(read, Decode::<pb::Request>::default()),
FramedRead::new(read, Decode::<pb::Request>::default()).peekable(),
FramedWrite::new(write, Encode::<pb::Response>::default()),
)
};

let stream_and_sink = Arc::new(Mutex::new((request_stream, response_sink)));
backoff::future::retry::<_, BoxError, _, _, _>(ExponentialBackoff::default(), || async {
let mut stream_and_sink = stream_and_sink.lock().await;
let run_result = self.clone().run(&mut stream_and_sink).await;

if let Err(e) = run_result {
match e.downcast::<tower::load_shed::error::Overloaded>() {
Err(e) => {
tracing::error!("error {e} in a connection handler");
return Err(backoff::Error::Permanent(e));
}
Ok(e) => {
tracing::warn!("a service is overloaded - backing off");
return Err(backoff::Error::transient(e));
}
}
}
Ok(())
})
.await
}

async fn run(
mut self,
stream_and_sink: &mut StreamAndSink<
impl AsyncReadExt + std::marker::Unpin,
impl AsyncWriteExt + std::marker::Unpin,
>,
) -> Result<(), BoxError> {
tracing::info!("listening for requests");

let (request_stream, response_sink) = stream_and_sink;
let mut pinned_stream = Pin::new(request_stream);
let mut responses = FuturesOrdered::new();

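// We only peek at the next request here and pop it from the request_stream
// once its response has been obtained successfully. If the service call
// fails (e.g. because the service is overloaded), the request is still in
// the stream, which avoids crashing Tendermint.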
let mut peeked_req = false;
loop {
select! {
req = request_stream.next() => {
let proto = match req.transpose()? {
Some(proto) => proto,
req = pinned_stream.as_mut().peek(), if !peeked_req => {
peeked_req = true;
let proto = match req {
Some(Ok(proto)) => proto.clone(),
Some(Err(_)) => return Err(pinned_stream.next().await.unwrap().unwrap_err()),
None => return Ok(()),
};
let request = Request::try_from(proto)?;
Expand Down Expand Up @@ -266,17 +330,25 @@ where
tracing::debug!(?response, "flushing response");
response_sink.send(response?.into()).await?;
}
// None of the `response?` above failed, so allow peeking at the next request ...
peeked_req = false;
// ... and pop the last peeked request
pinned_stream.next().await.unwrap()?;
// Now we need to tell Tendermint we've flushed responses
response_sink.send(Response::Flush.into()).await?;
}
}
}
rsp = responses.next(), if !responses.is_empty() => {
let response = rsp.expect("didn't poll when responses was empty");
let response = rsp.expect("didn't poll when responses was empty")?;
// The `?` above didn't fail, so allow peeking at the next request ...
peeked_req = false;
// ... and pop the last peeked request
pinned_stream.next().await.unwrap()?;
// XXX: sometimes we might want to send errors to tendermint
// https://docs.tendermint.com/v0.32/spec/abci/abci.html#errors
tracing::debug!(?response, "sending response");
response_sink.send(response?.into()).await?;
response_sink.send(response.into()).await?;
}
}
}
Expand Down

0 comments on commit 0da0f1f

Please sign in to comment.