Skip to content

Commit

Permalink
allow to drop failed info queries
Browse files Browse the repository at this point in the history
  • Loading branch information
tzemanovic committed Nov 8, 2023
1 parent 0da0f1f commit f8e6302
Showing 1 changed file with 28 additions and 12 deletions.
40 changes: 28 additions & 12 deletions src/v037/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use std::path::Path;
use tokio::net::UnixListener;

use crate::v037::codec::{Decode, Encode};
use tendermint::v0_37::abci::response::Exception;
use tendermint::v0_37::abci::{
ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest,
MempoolResponse, Request, Response, SnapshotRequest, SnapshotResponse,
Expand Down Expand Up @@ -298,37 +299,45 @@ where
};
let request = Request::try_from(proto)?;
tracing::debug!(?request, "new request");
match request.kind() {
let kind = request.kind();
match &kind {
MethodKind::Consensus => {
let request = request.try_into().expect("checked kind");
let response = self.consensus.ready().await?.call(request);
// Need to box here for type erasure
responses.push_back(response.map_ok(Response::from).boxed());
responses.push_back(response.map_ok(Response::from).map(|r| (r, kind)).boxed());
}
MethodKind::Mempool => {
let request = request.try_into().expect("checked kind");
let response = self.mempool.ready().await?.call(request);
responses.push_back(response.map_ok(Response::from).boxed());
responses.push_back(response.map_ok(Response::from).map(|r| (r, kind)).boxed());
}
MethodKind::Snapshot => {
let request = request.try_into().expect("checked kind");
let response = self.snapshot.ready().await?.call(request);
responses.push_back(response.map_ok(Response::from).boxed());
responses.push_back(response.map_ok(Response::from).map(|r| (r, kind)).boxed());
}
MethodKind::Info => {
let request = request.try_into().expect("checked kind");
let response = self.info.ready().await?.call(request);
responses.push_back(response.map_ok(Response::from).boxed());
responses.push_back(response.map_ok(Response::from).map(|r| (r, kind)).boxed());
}
MethodKind::Flush => {
// Instead of propagating Flush requests to the application,
// handle them here by awaiting all pending responses.
tracing::debug!(responses.len = responses.len(), "flushing responses");
while let Some(response) = responses.next().await {
// XXX: sometimes we might want to send errors to tendermint
// https://docs.tendermint.com/v0.32/spec/abci/abci.html#errors
while let Some((response, kind)) = responses.next().await {
tracing::debug!(?response, "flushing response");
response_sink.send(response?.into()).await?;
let response = match response {
Ok(rsp) => rsp,
Err(err) => match kind {
// TODO: allow to fail on Snapshot?
MethodKind::Info =>
Response::Exception(Exception{error:err.to_string()}),
_ => return Err(err)
}
};
response_sink.send(response.into()).await?;
}
// Allow to peek next request if none of the `response?` above failed ...
peeked_req = false;
Expand All @@ -340,13 +349,20 @@ where
}
}
rsp = responses.next(), if !responses.is_empty() => {
let response = rsp.expect("didn't poll when responses was empty")?;
let (rsp, kind) = rsp.expect("didn't poll when responses was empty");
let response = match rsp {
Ok(rsp) => rsp,
Err(err) => match kind {
// TODO: allow to fail on Snapshot?
MethodKind::Info =>
Response::Exception(Exception{error:err.to_string()}),
_ => return Err(err)
}
};
// Allow to peek next request if the `?` above didn't fail ...
peeked_req = false;
// ... and pop the last peeked request
pinned_stream.next().await.unwrap()?;
// XXX: sometimes we might want to send errors to tendermint
// https://docs.tendermint.com/v0.32/spec/abci/abci.html#errors
tracing::debug!(?response, "sending response");
response_sink.send(response.into()).await?;
}
Expand Down

0 comments on commit f8e6302

Please sign in to comment.