diff --git a/Cargo.toml b/Cargo.toml index f46579c..719a1fb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ pin-project = "1" futures = "0.3" tracing = "0.1" prost = "0.12" +backoff = { version = "0.4.0", features = ["tokio"] } [dev-dependencies] structopt = "0.3" diff --git a/src/v037/server.rs b/src/v037/server.rs index 3e46c06..2e3c1b4 100644 --- a/src/v037/server.rs +++ b/src/v037/server.rs @@ -1,9 +1,13 @@ use std::convert::{TryFrom, TryInto}; +use std::pin::Pin; +use std::sync::Arc; +use backoff::ExponentialBackoff; use futures::future::{FutureExt, TryFutureExt}; use futures::sink::SinkExt; -use futures::stream::{FuturesOrdered, StreamExt}; +use futures::stream::{FuturesOrdered, Peekable, StreamExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::sync::Mutex; use tokio::{ net::{TcpListener, ToSocketAddrs}, select, @@ -20,10 +24,12 @@ use std::path::Path; #[cfg(target_family = "unix")] use tokio::net::UnixListener; +use crate::v037::codec::{Decode, Encode}; use tendermint::v0_37::abci::{ ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest, MempoolResponse, Request, Response, SnapshotRequest, SnapshotResponse, }; +use tendermint_proto::v0_37::abci as pb; /// An ABCI server which listens for connections and forwards requests to four /// component ABCI [`Service`]s. @@ -111,18 +117,25 @@ impl Server where C: Service + Send + + Sync + Clone + 'static, C::Future: Send + 'static, M: Service + Send + + Sync + Clone + 'static, M::Future: Send + 'static, - I: Service + Send + Clone + 'static, + I: Service + + Send + + Sync + + Clone + + 'static, I::Future: Send + 'static, S: Service + Send + + Sync + Clone + 'static, S::Future: Send + 'static, @@ -148,7 +161,7 @@ where snapshot: self.snapshot.clone(), }; let (read, write) = socket.into_split(); - tokio::spawn(async move { conn.run(read, write).await.unwrap() }); + tokio::spawn(async move { conn.run_with_backoff(read, write).await.unwrap() }); } Err(e) => { tracing::error!({ %e }, "error accepting new connection"); @@ -176,7 +189,7 @@ where snapshot: self.snapshot.clone(), }; let (read, write) = socket.into_split(); - tokio::spawn(async move { conn.run(read, write).await.unwrap() }); + tokio::spawn(async move { conn.run_with_backoff(read, write).await.unwrap() }); } Err(e) => { tracing::error!({ %e }, "error accepting new connection"); @@ -186,6 +199,7 @@ where } } +#[derive(Clone)] struct Connection { consensus: C, mempool: M, @@ -193,43 +207,93 @@ struct Connection { snapshot: S, } +type StreamAndSink = ( + Peekable>>, + FramedWrite>, +); + impl Connection where - C: Service + Send + 'static, + C: Service + + Clone + + Send + + 'static, C::Future: Send + 'static, - M: Service + Send + 'static, + M: Service + + Clone + + Send + + 'static, + C: Service + + Clone + + Send + + 'static, M::Future: Send + 'static, - I: Service + Send + 'static, + I: Service + Clone + Send + 'static, I::Future: Send + 'static, - S: Service + Send + 'static, + S: Service + + Clone + + Send + + 'static, S::Future: Send + 'static, { - // XXX handle errors gracefully - // figure out how / if to return errors to tendermint - async fn run( - mut self, + async fn run_with_backoff( + self, read: impl AsyncReadExt + std::marker::Unpin, write: impl AsyncWriteExt + std::marker::Unpin, ) -> Result<(), BoxError> { - tracing::info!("listening for requests"); - - use tendermint_proto::v0_37::abci as pb; - - let (mut request_stream, mut response_sink) = { - use crate::v037::codec::{Decode, Encode}; + let (request_stream, response_sink) = { ( - FramedRead::new(read, Decode::::default()), + FramedRead::new(read, Decode::::default()).peekable(), FramedWrite::new(write, Encode::::default()), ) }; + let stream_and_sink = Arc::new(Mutex::new((request_stream, response_sink))); + backoff::future::retry::<_, BoxError, _, _, _>(ExponentialBackoff::default(), || async { + let mut stream_and_sink = stream_and_sink.lock().await; + let run_result = self.clone().run(&mut stream_and_sink).await; + + if let Err(e) = run_result { + match e.downcast::() { + Err(e) => { + tracing::error!("error {e} in a connection handler"); + return Err(backoff::Error::Permanent(e)); + } + Ok(e) => { + tracing::warn!("a service is overloaded - backing off"); + return Err(backoff::Error::transient(e)); + } + } + } + Ok(()) + }) + .await + } + + async fn run( + mut self, + stream_and_sink: &mut StreamAndSink< + impl AsyncReadExt + std::marker::Unpin, + impl AsyncWriteExt + std::marker::Unpin, + >, + ) -> Result<(), BoxError> { + tracing::info!("listening for requests"); + + let (request_stream, response_sink) = stream_and_sink; + let mut pinned_stream = Pin::new(request_stream); let mut responses = FuturesOrdered::new(); + // We only peek the next request once it's popped from the request_stream + // to avoid crashing Tendermint in case the service call fails because + // it's e.g. overloaded. + let mut peeked_req = false; loop { select! { - req = request_stream.next() => { - let proto = match req.transpose()? { - Some(proto) => proto, + req = pinned_stream.as_mut().peek(), if !peeked_req => { + peeked_req = true; + let proto = match req { + Some(Ok(proto)) => proto.clone(), + Some(Err(_)) => return Err(pinned_stream.next().await.unwrap().unwrap_err()), None => return Ok(()), }; let request = Request::try_from(proto)?; @@ -266,17 +330,25 @@ where tracing::debug!(?response, "flushing response"); response_sink.send(response?.into()).await?; } + // Allow to peek next request if none of the `response?` above failed ... + peeked_req = false; + // ... and pop the last peeked request + pinned_stream.next().await.unwrap()?; // Now we need to tell Tendermint we've flushed responses response_sink.send(Response::Flush.into()).await?; } } } rsp = responses.next(), if !responses.is_empty() => { - let response = rsp.expect("didn't poll when responses was empty"); + let response = rsp.expect("didn't poll when responses was empty")?; + // Allow to peek next request if the `?` above didn't fail ... + peeked_req = false; + // ... and pop the last peeked request + pinned_stream.next().await.unwrap()?; // XXX: sometimes we might want to send errors to tendermint // https://docs.tendermint.com/v0.32/spec/abci/abci.html#errors tracing::debug!(?response, "sending response"); - response_sink.send(response?.into()).await?; + response_sink.send(response.into()).await?; } } }