From dd4299da14b38f8f08b6148c7a331b15718ea299 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1=C5=A1=20Zemanovi=C4=8D?= Date: Wed, 8 Nov 2023 09:32:53 +0100 Subject: [PATCH] backport to 034 --- src/v034/server.rs | 155 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 119 insertions(+), 36 deletions(-) diff --git a/src/v034/server.rs b/src/v034/server.rs index 8119624..519012a 100644 --- a/src/v034/server.rs +++ b/src/v034/server.rs @@ -1,20 +1,27 @@ use std::convert::{TryFrom, TryInto}; +use std::pin::Pin; +use std::sync::Arc; +use backoff::ExponentialBackoff; use futures::future::{FutureExt, TryFutureExt}; use futures::sink::SinkExt; -use futures::stream::{FuturesOrdered, StreamExt}; +use futures::stream::{FuturesOrdered, Peekable, StreamExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::sync::Mutex; use tokio::{ net::{TcpListener, ToSocketAddrs}, select, }; +use crate::v034::codec::{Decode, Encode}; use crate::BoxError; use tendermint::abci::MethodKind; +use tendermint::v0_34::abci::response::Exception; use tendermint::v0_34::abci::{ ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest, MempoolResponse, Request, Response, SnapshotRequest, SnapshotResponse, }; +use tendermint_proto::v0_34::abci as pb; use tokio_util::codec::{FramedRead, FramedWrite}; use tower::{Service, ServiceExt}; @@ -109,18 +116,25 @@ impl Server where C: Service + Send + + Sync + Clone + 'static, C::Future: Send + 'static, M: Service + Send + + Sync + Clone + 'static, M::Future: Send + 'static, - I: Service + Send + Clone + 'static, + I: Service + + Send + + Sync + + Clone + + 'static, I::Future: Send + 'static, S: Service + Send + + Sync + Clone + 'static, S::Future: Send + 'static, @@ -146,7 +160,7 @@ where snapshot: self.snapshot.clone(), }; let (read, write) = socket.into_split(); - tokio::spawn(async move { conn.run(read, write).await.unwrap() }); + tokio::spawn(async move { conn.run_with_backoff(read, write).await.unwrap() }); } Err(e) => { tracing::error!({ %e }, "error accepting new connection"); @@ -174,7 +188,7 @@ where snapshot: self.snapshot.clone(), }; let (read, write) = socket.into_split(); - tokio::spawn(async move { conn.run(read, write).await.unwrap() }); + tokio::spawn(async move { conn.run_with_backoff(read, write).await.unwrap() }); } Err(e) => { tracing::error!({ %e }, "error accepting new connection"); @@ -184,6 +198,7 @@ where } } +#[derive(Clone)] struct Connection { consensus: C, mempool: M, @@ -191,90 +206,158 @@ struct Connection { snapshot: S, } +type StreamAndSink = ( + Peekable>>, + FramedWrite>, +); + impl Connection where - C: Service + Send + 'static, + C: Service + + Clone + + Send + + 'static, C::Future: Send + 'static, - M: Service + Send + 'static, + M: Service + + Clone + + Send + + 'static, M::Future: Send + 'static, - I: Service + Send + 'static, + I: Service + Clone + Send + 'static, I::Future: Send + 'static, - S: Service + Send + 'static, + S: Service + + Clone + + Send + + 'static, S::Future: Send + 'static, { - // XXX handle errors gracefully - // figure out how / if to return errors to tendermint - async fn run( - mut self, + async fn run_with_backoff( + self, read: impl AsyncReadExt + std::marker::Unpin, write: impl AsyncWriteExt + std::marker::Unpin, ) -> Result<(), BoxError> { - tracing::info!("listening for requests"); - - use tendermint_proto::v0_34::abci as pb; - - let (mut request_stream, mut response_sink) = { - use crate::v034::codec::{Decode, Encode}; + let (request_stream, response_sink) = { ( - FramedRead::new(read, Decode::::default()), + FramedRead::new(read, Decode::::default()).peekable(), FramedWrite::new(write, Encode::::default()), ) }; + let stream_and_sink = Arc::new(Mutex::new((request_stream, response_sink))); + backoff::future::retry::<_, BoxError, _, _, _>(ExponentialBackoff::default(), || async { + let mut stream_and_sink = stream_and_sink.lock().await; + let run_result = self.clone().run(&mut stream_and_sink).await; + + if let Err(e) = run_result { + match e.downcast::() { + Err(e) => { + tracing::error!("error {e} in a connection handler"); + return Err(backoff::Error::Permanent(e)); + } + Ok(e) => { + tracing::warn!("a service is overloaded - backing off"); + return Err(backoff::Error::transient(e)); + } + } + } + Ok(()) + }) + .await + } + + async fn run( + mut self, + stream_and_sink: &mut StreamAndSink< + impl AsyncReadExt + std::marker::Unpin, + impl AsyncWriteExt + std::marker::Unpin, + >, + ) -> Result<(), BoxError> { + tracing::info!("listening for requests"); + + let (request_stream, response_sink) = stream_and_sink; + let mut pinned_stream = Pin::new(request_stream); let mut responses = FuturesOrdered::new(); + // We only peek the next request once it's popped from the request_stream + // to avoid crashing Tendermint in case the service call fails because + // it's e.g. overloaded. + let mut peeked_req = false; loop { select! { - req = request_stream.next() => { - let proto = match req.transpose()? { - Some(proto) => proto, + req = pinned_stream.as_mut().peek(), if !peeked_req => { + peeked_req = true; + let proto = match req { + Some(Ok(proto)) => proto.clone(), + Some(Err(_)) => return Err(pinned_stream.next().await.unwrap().unwrap_err()), None => return Ok(()), }; let request = Request::try_from(proto)?; - tracing::debug!(?request, "new request"); - match request.kind() { + let kind = request.kind(); + match &kind { MethodKind::Consensus => { let request = request.try_into().expect("checked kind"); let response = self.consensus.ready().await?.call(request); // Need to box here for type erasure - responses.push_back(response.map_ok(Response::from).boxed()); + responses.push_back(response.map_ok(Response::from).map(|r| (r, kind)).boxed()); } MethodKind::Mempool => { let request = request.try_into().expect("checked kind"); let response = self.mempool.ready().await?.call(request); - responses.push_back(response.map_ok(Response::from).boxed()); + responses.push_back(response.map_ok(Response::from).map(|r| (r, kind)).boxed()); } MethodKind::Snapshot => { let request = request.try_into().expect("checked kind"); let response = self.snapshot.ready().await?.call(request); - responses.push_back(response.map_ok(Response::from).boxed()); + responses.push_back(response.map_ok(Response::from).map(|r| (r, kind)).boxed()); } MethodKind::Info => { let request = request.try_into().expect("checked kind"); let response = self.info.ready().await?.call(request); - responses.push_back(response.map_ok(Response::from).boxed()); + responses.push_back(response.map_ok(Response::from).map(|r| (r, kind)).boxed()); } MethodKind::Flush => { // Instead of propagating Flush requests to the application, // handle them here by awaiting all pending responses. tracing::debug!(responses.len = responses.len(), "flushing responses"); - while let Some(response) = responses.next().await { - // XXX: sometimes we might want to send errors to tendermint - // https://docs.tendermint.com/v0.32/spec/abci/abci.html#errors + while let Some((response, kind)) = responses.next().await { tracing::debug!(?response, "flushing response"); - response_sink.send(response?.into()).await?; + let response = match response { + Ok(rsp) => rsp, + Err(err) => match kind { + // TODO: allow to fail on Snapshot? + MethodKind::Info => + Response::Exception(Exception{error:err.to_string()}), + _ => return Err(err) + } + }; + response_sink.send(response.into()).await?; } + // Allow to peek next request if the `?` above didn't fail ... + peeked_req = false; + // ... and pop the last peeked request + pinned_stream.next().await.unwrap()?; // Now we need to tell Tendermint we've flushed responses response_sink.send(Response::Flush.into()).await?; } } } rsp = responses.next(), if !responses.is_empty() => { - let response = rsp.expect("didn't poll when responses was empty"); - // XXX: sometimes we might want to send errors to tendermint - // https://docs.tendermint.com/v0.32/spec/abci/abci.html#errors + let (rsp, kind) = rsp.expect("didn't poll when responses was empty"); + let response = match rsp { + Ok(rsp) => rsp, + Err(err) => match kind { + // TODO: allow to fail on Snapshot? + MethodKind::Info => + Response::Exception(Exception{error:err.to_string()}), + _ => return Err(err) + } + }; + // Allow to peek next request if the `?` above didn't fail ... + peeked_req = false; + // ... and pop the last peeked request + pinned_stream.next().await.unwrap()?; tracing::debug!(?response, "sending response"); - response_sink.send(response?.into()).await?; + response_sink.send(response.into()).await?; } } }