Skip to content

Commit

Permalink
backport to 034
Browse files Browse the repository at this point in the history
  • Loading branch information
tzemanovic committed Nov 8, 2023
1 parent f8e6302 commit dd4299d
Showing 1 changed file with 119 additions and 36 deletions.
155 changes: 119 additions & 36 deletions src/v034/server.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
use std::convert::{TryFrom, TryInto};
use std::pin::Pin;
use std::sync::Arc;

use backoff::ExponentialBackoff;
use futures::future::{FutureExt, TryFutureExt};
use futures::sink::SinkExt;
use futures::stream::{FuturesOrdered, StreamExt};
use futures::stream::{FuturesOrdered, Peekable, StreamExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::Mutex;
use tokio::{
net::{TcpListener, ToSocketAddrs},
select,
};

use crate::v034::codec::{Decode, Encode};
use crate::BoxError;
use tendermint::abci::MethodKind;
use tendermint::v0_34::abci::response::Exception;
use tendermint::v0_34::abci::{
ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest,
MempoolResponse, Request, Response, SnapshotRequest, SnapshotResponse,
};
use tendermint_proto::v0_34::abci as pb;
use tokio_util::codec::{FramedRead, FramedWrite};
use tower::{Service, ServiceExt};

Expand Down Expand Up @@ -109,18 +116,25 @@ impl<C, M, I, S> Server<C, M, I, S>
where
C: Service<ConsensusRequest, Response = ConsensusResponse, Error = BoxError>
+ Send
+ Sync
+ Clone
+ 'static,
C::Future: Send + 'static,
M: Service<MempoolRequest, Response = MempoolResponse, Error = BoxError>
+ Send
+ Sync
+ Clone
+ 'static,
M::Future: Send + 'static,
I: Service<InfoRequest, Response = InfoResponse, Error = BoxError> + Send + Clone + 'static,
I: Service<InfoRequest, Response = InfoResponse, Error = BoxError>
+ Send
+ Sync
+ Clone
+ 'static,
I::Future: Send + 'static,
S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError>
+ Send
+ Sync
+ Clone
+ 'static,
S::Future: Send + 'static,
Expand All @@ -146,7 +160,7 @@ where
snapshot: self.snapshot.clone(),
};
let (read, write) = socket.into_split();
tokio::spawn(async move { conn.run(read, write).await.unwrap() });
tokio::spawn(async move { conn.run_with_backoff(read, write).await.unwrap() });
}
Err(e) => {
tracing::error!({ %e }, "error accepting new connection");
Expand Down Expand Up @@ -174,7 +188,7 @@ where
snapshot: self.snapshot.clone(),
};
let (read, write) = socket.into_split();
tokio::spawn(async move { conn.run(read, write).await.unwrap() });
tokio::spawn(async move { conn.run_with_backoff(read, write).await.unwrap() });
}
Err(e) => {
tracing::error!({ %e }, "error accepting new connection");
Expand All @@ -184,97 +198,166 @@ where
}
}

#[derive(Clone)]
struct Connection<C, M, I, S> {
consensus: C,
mempool: M,
info: I,
snapshot: S,
}

type StreamAndSink<R, W> = (
Peekable<FramedRead<R, Decode<pb::Request>>>,
FramedWrite<W, Encode<pb::Response>>,
);

impl<C, M, I, S> Connection<C, M, I, S>
where
C: Service<ConsensusRequest, Response = ConsensusResponse, Error = BoxError> + Send + 'static,
C: Service<ConsensusRequest, Response = ConsensusResponse, Error = BoxError>
+ Clone
+ Send
+ 'static,
C::Future: Send + 'static,
M: Service<MempoolRequest, Response = MempoolResponse, Error = BoxError> + Send + 'static,
M: Service<MempoolRequest, Response = MempoolResponse, Error = BoxError>
+ Clone
+ Send
+ 'static,
M::Future: Send + 'static,
I: Service<InfoRequest, Response = InfoResponse, Error = BoxError> + Send + 'static,
I: Service<InfoRequest, Response = InfoResponse, Error = BoxError> + Clone + Send + 'static,
I::Future: Send + 'static,
S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError> + Send + 'static,
S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError>
+ Clone
+ Send
+ 'static,
S::Future: Send + 'static,
{
// XXX handle errors gracefully
// figure out how / if to return errors to tendermint
async fn run(
mut self,
async fn run_with_backoff(
self,
read: impl AsyncReadExt + std::marker::Unpin,
write: impl AsyncWriteExt + std::marker::Unpin,
) -> Result<(), BoxError> {
tracing::info!("listening for requests");

use tendermint_proto::v0_34::abci as pb;

let (mut request_stream, mut response_sink) = {
use crate::v034::codec::{Decode, Encode};
let (request_stream, response_sink) = {
(
FramedRead::new(read, Decode::<pb::Request>::default()),
FramedRead::new(read, Decode::<pb::Request>::default()).peekable(),
FramedWrite::new(write, Encode::<pb::Response>::default()),
)
};

let stream_and_sink = Arc::new(Mutex::new((request_stream, response_sink)));
backoff::future::retry::<_, BoxError, _, _, _>(ExponentialBackoff::default(), || async {
let mut stream_and_sink = stream_and_sink.lock().await;
let run_result = self.clone().run(&mut stream_and_sink).await;

if let Err(e) = run_result {
match e.downcast::<tower::load_shed::error::Overloaded>() {
Err(e) => {
tracing::error!("error {e} in a connection handler");
return Err(backoff::Error::Permanent(e));
}
Ok(e) => {
tracing::warn!("a service is overloaded - backing off");
return Err(backoff::Error::transient(e));
}
}
}
Ok(())
})
.await
}

async fn run(
mut self,
stream_and_sink: &mut StreamAndSink<
impl AsyncReadExt + std::marker::Unpin,
impl AsyncWriteExt + std::marker::Unpin,
>,
) -> Result<(), BoxError> {
tracing::info!("listening for requests");

let (request_stream, response_sink) = stream_and_sink;
let mut pinned_stream = Pin::new(request_stream);
let mut responses = FuturesOrdered::new();

// We only peek the next request once it's popped from the request_stream
// to avoid crashing Tendermint in case the service call fails because
// it's e.g. overloaded.
let mut peeked_req = false;
loop {
select! {
req = request_stream.next() => {
let proto = match req.transpose()? {
Some(proto) => proto,
req = pinned_stream.as_mut().peek(), if !peeked_req => {
peeked_req = true;
let proto = match req {
Some(Ok(proto)) => proto.clone(),
Some(Err(_)) => return Err(pinned_stream.next().await.unwrap().unwrap_err()),
None => return Ok(()),
};
let request = Request::try_from(proto)?;
tracing::debug!(?request, "new request");
match request.kind() {
let kind = request.kind();
match &kind {
MethodKind::Consensus => {
let request = request.try_into().expect("checked kind");
let response = self.consensus.ready().await?.call(request);
// Need to box here for type erasure
responses.push_back(response.map_ok(Response::from).boxed());
responses.push_back(response.map_ok(Response::from).map(|r| (r, kind)).boxed());
}
MethodKind::Mempool => {
let request = request.try_into().expect("checked kind");
let response = self.mempool.ready().await?.call(request);
responses.push_back(response.map_ok(Response::from).boxed());
responses.push_back(response.map_ok(Response::from).map(|r| (r, kind)).boxed());
}
MethodKind::Snapshot => {
let request = request.try_into().expect("checked kind");
let response = self.snapshot.ready().await?.call(request);
responses.push_back(response.map_ok(Response::from).boxed());
responses.push_back(response.map_ok(Response::from).map(|r| (r, kind)).boxed());
}
MethodKind::Info => {
let request = request.try_into().expect("checked kind");
let response = self.info.ready().await?.call(request);
responses.push_back(response.map_ok(Response::from).boxed());
responses.push_back(response.map_ok(Response::from).map(|r| (r, kind)).boxed());
}
MethodKind::Flush => {
// Instead of propagating Flush requests to the application,
// handle them here by awaiting all pending responses.
tracing::debug!(responses.len = responses.len(), "flushing responses");
while let Some(response) = responses.next().await {
// XXX: sometimes we might want to send errors to tendermint
// https://docs.tendermint.com/v0.32/spec/abci/abci.html#errors
while let Some((response, kind)) = responses.next().await {
tracing::debug!(?response, "flushing response");
response_sink.send(response?.into()).await?;
let response = match response {
Ok(rsp) => rsp,
Err(err) => match kind {
// TODO: allow to fail on Snapshot?
MethodKind::Info =>
Response::Exception(Exception{error:err.to_string()}),
_ => return Err(err)
}
};
response_sink.send(response.into()).await?;
}
// Allow to peek next request if the `?` above didn't fail ...
peeked_req = false;
// ... and pop the last peeked request
pinned_stream.next().await.unwrap()?;
// Now we need to tell Tendermint we've flushed responses
response_sink.send(Response::Flush.into()).await?;
}
}
}
rsp = responses.next(), if !responses.is_empty() => {
let response = rsp.expect("didn't poll when responses was empty");
// XXX: sometimes we might want to send errors to tendermint
// https://docs.tendermint.com/v0.32/spec/abci/abci.html#errors
let (rsp, kind) = rsp.expect("didn't poll when responses was empty");
let response = match rsp {
Ok(rsp) => rsp,
Err(err) => match kind {
// TODO: allow to fail on Snapshot?
MethodKind::Info =>
Response::Exception(Exception{error:err.to_string()}),
_ => return Err(err)
}
};
// Allow to peek next request if the `?` above didn't fail ...
peeked_req = false;
// ... and pop the last peeked request
pinned_stream.next().await.unwrap()?;
tracing::debug!(?response, "sending response");
response_sink.send(response?.into()).await?;
response_sink.send(response.into()).await?;
}
}
}
Expand Down

0 comments on commit dd4299d

Please sign in to comment.