diff --git a/src/v037/server.rs b/src/v037/server.rs index 2e3c1b4..088ccbe 100644 --- a/src/v037/server.rs +++ b/src/v037/server.rs @@ -25,6 +25,7 @@ use std::path::Path; use tokio::net::UnixListener; use crate::v037::codec::{Decode, Encode}; +use tendermint::v0_37::abci::response::Exception; use tendermint::v0_37::abci::{ ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest, MempoolResponse, Request, Response, SnapshotRequest, SnapshotResponse, @@ -298,37 +299,45 @@ where }; let request = Request::try_from(proto)?; tracing::debug!(?request, "new request"); - match request.kind() { + let kind = request.kind(); + match &kind { MethodKind::Consensus => { let request = request.try_into().expect("checked kind"); let response = self.consensus.ready().await?.call(request); // Need to box here for type erasure - responses.push_back(response.map_ok(Response::from).boxed()); + responses.push_back(response.map_ok(Response::from).map(|r| (r, kind)).boxed()); } MethodKind::Mempool => { let request = request.try_into().expect("checked kind"); let response = self.mempool.ready().await?.call(request); - responses.push_back(response.map_ok(Response::from).boxed()); + responses.push_back(response.map_ok(Response::from).map(|r| (r, kind)).boxed()); } MethodKind::Snapshot => { let request = request.try_into().expect("checked kind"); let response = self.snapshot.ready().await?.call(request); - responses.push_back(response.map_ok(Response::from).boxed()); + responses.push_back(response.map_ok(Response::from).map(|r| (r, kind)).boxed()); } MethodKind::Info => { let request = request.try_into().expect("checked kind"); let response = self.info.ready().await?.call(request); - responses.push_back(response.map_ok(Response::from).boxed()); + responses.push_back(response.map_ok(Response::from).map(|r| (r, kind)).boxed()); } MethodKind::Flush => { // Instead of propagating Flush requests to the application, // handle them here by awaiting all pending responses. tracing::debug!(responses.len = responses.len(), "flushing responses"); - while let Some(response) = responses.next().await { - // XXX: sometimes we might want to send errors to tendermint - // https://docs.tendermint.com/v0.32/spec/abci/abci.html#errors + while let Some((response, kind)) = responses.next().await { tracing::debug!(?response, "flushing response"); - response_sink.send(response?.into()).await?; + let response = match response { + Ok(rsp) => rsp, + Err(err) => match kind { + // TODO: allow to fail on Snapshot? + MethodKind::Info => + Response::Exception(Exception{error:err.to_string()}), + _ => return Err(err) + } + }; + response_sink.send(response.into()).await?; } // Allow to peek next request if none of the `response?` above failed ... peeked_req = false; @@ -340,13 +349,20 @@ where } } rsp = responses.next(), if !responses.is_empty() => { - let response = rsp.expect("didn't poll when responses was empty")?; + let (rsp, kind) = rsp.expect("didn't poll when responses was empty"); + let response = match rsp { + Ok(rsp) => rsp, + Err(err) => match kind { + // TODO: allow to fail on Snapshot? + MethodKind::Info => + Response::Exception(Exception{error:err.to_string()}), + _ => return Err(err) + } + }; // Allow to peek next request if the `?` above didn't fail ... peeked_req = false; // ... and pop the last peeked request pinned_stream.next().await.unwrap()?; - // XXX: sometimes we might want to send errors to tendermint - // https://docs.tendermint.com/v0.32/spec/abci/abci.html#errors tracing::debug!(?response, "sending response"); response_sink.send(response.into()).await?; }