From 995a0a32162b3e5e98b1b7498b7314fcbb2fb6be Mon Sep 17 00:00:00 2001 From: Erwan Date: Sat, 17 Feb 2024 18:05:29 -0500 Subject: [PATCH] tower-abci: support CometBFT `0.38.x` --- examples/kvstore_38/main.rs | 237 +++++++++++++++++++++++++ src/lib.rs | 8 + src/v038/codec.rs | 110 ++++++++++++ src/v038/server.rs | 278 ++++++++++++++++++++++++++++++ src/v038/split.rs | 335 ++++++++++++++++++++++++++++++++++++ 5 files changed, 968 insertions(+) create mode 100644 examples/kvstore_38/main.rs create mode 100644 src/v038/codec.rs create mode 100644 src/v038/server.rs create mode 100644 src/v038/split.rs diff --git a/examples/kvstore_38/main.rs b/examples/kvstore_38/main.rs new file mode 100644 index 0000000..2767d90 --- /dev/null +++ b/examples/kvstore_38/main.rs @@ -0,0 +1,237 @@ +//! Example ABCI application, an in-memory key-value store. + +use std::{ + collections::HashMap, + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use bytes::Bytes; +use futures::future::FutureExt; +use structopt::StructOpt; +use tower::{Service, ServiceBuilder}; + +use tendermint::{ + abci::{ + response::{self, PrepareProposal}, + Event, EventAttributeIndexExt, + }, + v0_38::abci::request, +}; + +use tower_abci::{ + v038::{split, Server}, + BoxError, +}; + +use tendermint::abci::types::ExecTxResult; +use tendermint::v0_38::abci::{Request, Response}; + +/// In-memory, hashmap-backed key-value store ABCI application. +#[derive(Clone, Debug, Default)] +pub struct KVStore { + store: HashMap, + height: u32, + app_hash: [u8; 8], +} + +impl Service for KVStore { + type Response = Response; + type Error = BoxError; + type Future = Pin> + Send + 'static>>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Request) -> Self::Future { + tracing::info!(?req); + + let rsp = match req { + // handled messages + Request::Info(_) => Response::Info(self.info()), + Request::Query(query) => Response::Query(self.query(query.data)), + // Note: https://github.com/tendermint/tendermint/blob/v0.38.x/spec/abci/abci%2B%2B_tmint_expected_behavior.md#adapting-existing-applications-that-use-abci + Request::PrepareProposal(prepare_prop) => Response::PrepareProposal(PrepareProposal { + txs: prepare_prop.txs, + }), + Request::ProcessProposal(..) => { + Response::ProcessProposal(response::ProcessProposal::Accept) + } + Request::ExtendVote(vote) => Response::ExtendVote(self.extend_vote(vote)), + Request::VerifyVoteExtension(vote) => { + Response::VerifyVoteExtension(self.verify_vote(vote)) + } + Request::FinalizeBlock(block) => Response::FinalizeBlock(self.finalize_block(block)), + Request::Commit => Response::Commit(self.commit()), + + // unhandled messages + Request::Flush => Response::Flush, + Request::Echo(_) => Response::Echo(Default::default()), + Request::InitChain(_) => Response::InitChain(Default::default()), + Request::CheckTx(_) => Response::CheckTx(Default::default()), + Request::ListSnapshots => Response::ListSnapshots(Default::default()), + Request::OfferSnapshot(_) => Response::OfferSnapshot(Default::default()), + Request::LoadSnapshotChunk(_) => Response::LoadSnapshotChunk(Default::default()), + Request::ApplySnapshotChunk(_) => Response::ApplySnapshotChunk(Default::default()), + }; + tracing::info!(?rsp); + async move { Ok(rsp) }.boxed() + } +} + +impl KVStore { + fn info(&self) -> response::Info { + response::Info { + data: "tower-abci-kvstore-example".to_string(), + version: "0.1.0".to_string(), + app_version: 1, + last_block_height: self.height.into(), + last_block_app_hash: self.app_hash.to_vec().try_into().unwrap(), + } + } + + fn query(&self, query: Bytes) -> response::Query { + let key = String::from_utf8(query.to_vec()).unwrap(); + let (value, log) = match self.store.get(&key) { + Some(value) => (value.clone(), "exists".to_string()), + None => ("".to_string(), "does not exist".to_string()), + }; + + response::Query { + log, + key: key.into_bytes().into(), + value: value.into_bytes().into(), + ..Default::default() + } + } + + fn execute_tx(&mut self, tx: Bytes) -> ExecTxResult { + let tx = String::from_utf8(tx.to_vec()).unwrap(); + let tx_parts = tx.split('=').collect::>(); + let (key, value) = match (tx_parts.first(), tx_parts.get(1)) { + (Some(key), Some(value)) => (*key, *value), + _ => (tx.as_ref(), tx.as_ref()), + }; + self.store.insert(key.to_string(), value.to_string()); + + ExecTxResult { + events: vec![Event::new( + "app", + vec![ + ("key", key).index(), + ("index_key", "index is working").index(), + ("noindex_key", "noindex is working").no_index(), + ], + )], + ..Default::default() + } + } + + fn finalize_block(&mut self, block: request::FinalizeBlock) -> response::FinalizeBlock { + let mut tx_results = Vec::new(); + for tx in block.txs { + tx_results.push(self.execute_tx(tx)); + } + response::FinalizeBlock { + events: vec![Event::new( + "app", + vec![("num_tx", format!("{}", tx_results.len())).index()], + )], + tx_results, + validator_updates: vec![], + consensus_param_updates: None, + app_hash: self + .compute_apphash() + .to_vec() + .try_into() + .expect("vec to `AppHash` conversion is actually infaillible."), + } + } + + fn commit(&mut self) -> response::Commit { + let retain_height = self.height.into(); + // As in the other kvstore examples, just use store.len() as the "hash" + self.app_hash = self.compute_apphash(); + self.height += 1; + + response::Commit { + // This field is ignored for CometBFT >= 0.38 + data: Bytes::default(), + retain_height, + } + } + + fn extend_vote(&self, _vote: request::ExtendVote) -> response::ExtendVote { + response::ExtendVote { + vote_extension: Bytes::default(), + } + } + + fn verify_vote(&self, _vote: request::VerifyVoteExtension) -> response::VerifyVoteExtension { + response::VerifyVoteExtension::Accept + } + + fn compute_apphash(&self) -> [u8; 8] { + (self.store.len() as u64).to_be_bytes() + } +} + +#[derive(Debug, StructOpt)] +struct Opt { + /// Bind the TCP server to this host. + #[structopt(short, long, default_value = "127.0.0.1")] + host: String, + + /// Bind the TCP server to this port. + #[structopt(short, long, default_value = "26658")] + port: u16, + + /// Bind the UDS server to this path + #[structopt(long)] + uds: Option, +} + +#[tokio::main] +async fn main() { + tracing_subscriber::fmt::init(); + let opt = Opt::from_args(); + + // Construct our ABCI application. + let service = KVStore::default(); + + // Split it into components. + let (consensus, mempool, snapshot, info) = split::service(service, 1); + + // Hand those components to the ABCI server, but customize request behavior + // for each category -- for instance, apply load-shedding only to mempool + // and info requests, but not to consensus requests. + let server_builder = Server::builder() + .consensus(consensus) + .snapshot(snapshot) + .mempool( + ServiceBuilder::new() + .load_shed() + .buffer(10) + .service(mempool), + ) + .info( + ServiceBuilder::new() + .load_shed() + .buffer(100) + .rate_limit(50, std::time::Duration::from_secs(1)) + .service(info), + ); + + let server = server_builder.finish().unwrap(); + + if let Some(uds_path) = opt.uds { + server.listen_unix(uds_path).await.unwrap(); + } else { + server + .listen_tcp(format!("{}:{}", opt.host, opt.port)) + .await + .unwrap(); + } +} diff --git a/src/lib.rs b/src/lib.rs index 536cf28..a06c9b7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,5 +21,13 @@ pub mod v037 { pub use server::ServerBuilder; } +pub mod v038 { + mod codec; + mod server; + pub mod split; + pub use server::Server; + pub use server::ServerBuilder; +} + /// A convenient error type alias. pub type BoxError = Box; diff --git a/src/v038/codec.rs b/src/v038/codec.rs new file mode 100644 index 0000000..e99cbe7 --- /dev/null +++ b/src/v038/codec.rs @@ -0,0 +1,110 @@ +use std::marker::PhantomData; + +use tokio_util::codec::{Decoder, Encoder}; + +use bytes::{BufMut, BytesMut}; + +pub struct Decode { + state: DecodeState, + _marker: PhantomData, +} + +impl Default for Decode { + fn default() -> Self { + Self { + state: DecodeState::Head, + _marker: PhantomData, + } + } +} + +#[derive(Debug)] +enum DecodeState { + Head, + Body { len: usize }, +} + +impl Decoder for Decode { + type Item = M; + type Error = crate::BoxError; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + match self.state { + DecodeState::Head => { + tracing::trace!(?src, "decoding head"); + // we don't use decode_varint directly, because it advances the + // buffer regardless of success, but Decoder assumes that when + // the buffer advances we've consumed the data. this is sort of + // a sad hack, but it works. + // TODO(erwan): fix this + + // Tendermint socket protocol: + // "Messages are serialized using Protobuf3 and length-prefixed + // with an unsigned varint" + // See: https://github.com/tendermint/tendermint/blob/v0.38.x/spec/abci/abci++_client_server.md#socket + let mut tmp = src.clone().freeze(); + let len = match prost::encoding::decode_varint(&mut tmp) { + Ok(_) => { + // advance the real buffer + prost::encoding::decode_varint(src).unwrap() as usize + } + Err(_) => { + tracing::trace!(?self.state, src.len = src.len(), "waiting for header data"); + return Ok(None); + } + }; + self.state = DecodeState::Body { len }; + tracing::trace!(?self.state, "ready for body"); + + // Recurse to attempt body decoding. + self.decode(src) + } + DecodeState::Body { len } => { + if src.len() < len { + tracing::trace!(?self.state, src.len = src.len(), "waiting for body"); + return Ok(None); + } + + let body = src.split_to(len); + tracing::trace!(?body, "decoding body"); + let message = M::decode(body)?; + + // Now reset the decoder state for the next message. + self.state = DecodeState::Head; + + Ok(Some(message)) + } + } + } +} + +pub struct Encode { + _marker: PhantomData, +} + +impl Default for Encode { + fn default() -> Self { + Self { + _marker: PhantomData, + } + } +} + +impl Encoder for Encode { + type Error = crate::BoxError; + + fn encode(&mut self, item: M, dst: &mut BytesMut) -> Result<(), Self::Error> { + let mut buf = BytesMut::new(); + item.encode(&mut buf)?; + let buf = buf.freeze(); + + // Tendermint socket protocol: + // "Messages are serialized using Protobuf3 and length-prefixed + // with an unsigned varint" + // See: https://github.com/tendermint/tendermint/blob/v0.38.x/spec/abci/abci++_client_server.md#socket + prost::encoding::encode_varint(buf.len() as u64, dst); + dst.put(buf); + + Ok(()) + } +} diff --git a/src/v038/server.rs b/src/v038/server.rs new file mode 100644 index 0000000..881fdb1 --- /dev/null +++ b/src/v038/server.rs @@ -0,0 +1,278 @@ +use std::convert::{TryFrom, TryInto}; +use std::path::Path; + +use futures::future::{FutureExt, TryFutureExt}; +use futures::sink::SinkExt; +use futures::stream::{FuturesOrdered, StreamExt}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::{ + net::{TcpListener, ToSocketAddrs, UnixListener}, + select, +}; +use tokio_util::codec::{FramedRead, FramedWrite}; +use tower::{Service, ServiceExt}; + +use crate::BoxError; +use tendermint::abci::MethodKind; + +use tendermint::v0_38::abci::{ + ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest, + MempoolResponse, Request, Response, SnapshotRequest, SnapshotResponse, +}; + +/// An ABCI server which listens for connections and forwards requests to four +/// component ABCI [`Service`]s. +pub struct Server { + consensus: C, + mempool: M, + info: I, + snapshot: S, +} + +pub struct ServerBuilder { + consensus: Option, + mempool: Option, + info: Option, + snapshot: Option, +} + +impl Default for ServerBuilder { + fn default() -> Self { + Self { + consensus: None, + mempool: None, + info: None, + snapshot: None, + } + } +} + +impl ServerBuilder +where + C: Service + + Send + + Clone + + 'static, + C::Future: Send + 'static, + M: Service + + Send + + Clone + + 'static, + M::Future: Send + 'static, + I: Service + Send + Clone + 'static, + I::Future: Send + 'static, + S: Service + + Send + + Clone + + 'static, + S::Future: Send + 'static, +{ + pub fn consensus(mut self, consensus: C) -> Self { + self.consensus = Some(consensus); + self + } + + pub fn mempool(mut self, mempool: M) -> Self { + self.mempool = Some(mempool); + self + } + + pub fn info(mut self, info: I) -> Self { + self.info = Some(info); + self + } + + pub fn snapshot(mut self, snapshot: S) -> Self { + self.snapshot = Some(snapshot); + self + } + + pub fn finish(self) -> Option> { + let consensus = self.consensus?; + let mempool = self.mempool?; + let info = self.info?; + let snapshot = self.snapshot?; + + Some(Server { + consensus, + mempool, + info, + snapshot, + }) + } +} + +impl Server +where + C: Service + + Send + + Clone + + 'static, + C::Future: Send + 'static, + M: Service + + Send + + Clone + + 'static, + M::Future: Send + 'static, + I: Service + Send + Clone + 'static, + I::Future: Send + 'static, + S: Service + + Send + + Clone + + 'static, + S::Future: Send + 'static, +{ + pub fn builder() -> ServerBuilder { + ServerBuilder::default() + } + + pub async fn listen_unix(self, path: impl AsRef) -> Result<(), BoxError> { + let listener = UnixListener::bind(path)?; + let addr = listener.local_addr()?; + tracing::info!(?addr, "ABCI server starting on uds"); + + loop { + match listener.accept().await { + Ok((socket, _addr)) => { + tracing::debug!(?_addr, "accepted new connection"); + let conn = Connection { + consensus: self.consensus.clone(), + mempool: self.mempool.clone(), + info: self.info.clone(), + snapshot: self.snapshot.clone(), + }; + let (read, write) = socket.into_split(); + tokio::spawn(async move { conn.run(read, write).await.unwrap() }); + } + Err(e) => { + tracing::error!({ %e }, "error accepting new connection"); + } + } + } + } + + pub async fn listen_tcp( + self, + addr: A, + ) -> Result<(), BoxError> { + let listener = TcpListener::bind(addr).await?; + let addr = listener.local_addr()?; + tracing::info!(?addr, "ABCI server starting on tcp socket"); + + loop { + match listener.accept().await { + Ok((socket, _addr)) => { + tracing::debug!(?_addr, "accepted new connection"); + let conn = Connection { + consensus: self.consensus.clone(), + mempool: self.mempool.clone(), + info: self.info.clone(), + snapshot: self.snapshot.clone(), + }; + let (read, write) = socket.into_split(); + tokio::spawn(async move { conn.run(read, write).await.unwrap() }); + } + Err(e) => { + tracing::error!({ %e }, "error accepting new connection"); + } + } + } + } +} + +struct Connection { + consensus: C, + mempool: M, + info: I, + snapshot: S, +} + +impl Connection +where + C: Service + Send + 'static, + C::Future: Send + 'static, + M: Service + Send + 'static, + M::Future: Send + 'static, + I: Service + Send + 'static, + I::Future: Send + 'static, + S: Service + Send + 'static, + S::Future: Send + 'static, +{ + // XXX handle errors gracefully + // figure out how / if to return errors to tendermint + async fn run( + mut self, + read: impl AsyncReadExt + std::marker::Unpin, + write: impl AsyncWriteExt + std::marker::Unpin, + ) -> Result<(), BoxError> { + tracing::info!("listening for requests"); + + use tendermint_proto::v0_38::abci as pb; + + let (mut request_stream, mut response_sink) = { + use crate::v038::codec::{Decode, Encode}; + ( + FramedRead::new(read, Decode::::default()), + FramedWrite::new(write, Encode::::default()), + ) + }; + + let mut responses = FuturesOrdered::new(); + + loop { + select! { + req = request_stream.next() => { + let proto = match req.transpose()? { + Some(proto) => proto, + None => return Ok(()), + }; + let request = Request::try_from(proto)?; + tracing::debug!(?request, "new request"); + match request.kind() { + MethodKind::Consensus => { + let request = request.try_into().expect("checked kind"); + let response = self.consensus.ready().await?.call(request); + // Need to box here for type erasure + responses.push_back(response.map_ok(Response::from).boxed()); + } + MethodKind::Mempool => { + let request = request.try_into().expect("checked kind"); + let response = self.mempool.ready().await?.call(request); + responses.push_back(response.map_ok(Response::from).boxed()); + } + MethodKind::Snapshot => { + let request = request.try_into().expect("checked kind"); + let response = self.snapshot.ready().await?.call(request); + responses.push_back(response.map_ok(Response::from).boxed()); + } + MethodKind::Info => { + let request = request.try_into().expect("checked kind"); + let response = self.info.ready().await?.call(request); + responses.push_back(response.map_ok(Response::from).boxed()); + } + MethodKind::Flush => { + // Instead of propagating Flush requests to the application, + // handle them here by awaiting all pending responses. + tracing::debug!(responses.len = responses.len(), "flushing responses"); + while let Some(response) = responses.next().await { + // XXX: sometimes we might want to send errors to tendermint + // https://docs.tendermint.com/v0.32/spec/abci/abci.html#errors + tracing::debug!(?response, "flushing response"); + response_sink.send(response?.into()).await?; + } + // Now we need to tell Tendermint we've flushed responses + response_sink.send(Response::Flush.into()).await?; + } + } + } + rsp = responses.next(), if !responses.is_empty() => { + let response = rsp.expect("didn't poll when responses was empty"); + // XXX: sometimes we might want to send errors to tendermint + // https://docs.tendermint.com/v0.32/spec/abci/abci.html#errors + tracing::debug!(?response, "sending response"); + response_sink.send(response?.into()).await?; + } + } + } + } +} diff --git a/src/v038/split.rs b/src/v038/split.rs new file mode 100644 index 0000000..b58344d --- /dev/null +++ b/src/v038/split.rs @@ -0,0 +1,335 @@ +//! Splits a single [`Service`] implementing all of ABCI into four cloneable +//! component services, each implementing one category of ABCI requests. +//! +//! The component services share access to the main service via message-passing +//! over buffered channels. This means that the component services can be cloned +//! to provide shared access to the ABCI application, which processes +//! requests sequentially with the following prioritization: +//! +//! 1. [`ConsensusRequest`]s sent to the [`Consensus`] service; +//! 2. [`MempoolRequest`]s sent to the [`Mempool`] service; +//! 3. [`SnapshotRequest`]s sent to the [`Snapshot`] service; +//! 4. [`InfoRequest`]s sent to the [`Info`] service. +//! +//! The ABCI service can execute these requests synchronously, in +//! [`Service::call`](tower::Service::call), or asynchronously, by immediately +//! returning a future that will be executed on the caller's task. Or, it can +//! split the difference and perform some amount of synchronous work and defer +//! the rest to be performed asynchronously. +//! +//! Because each category of requests is handled by a different service, request +//! behavior can be customized on a per-category basis using Tower +//! [`Layer`](tower::Layer)s. For instance, load-shedding can be added to +//! [`InfoRequest`]s but not [`ConsensusRequest`]s, or different categories can +//! have different timeout policies, or different types of instrumentation. + +use std::task::{Context, Poll}; + +use tower::Service; + +use crate::{buffer4::Buffer, BoxError}; +use tendermint::v0_38::abci::{ + ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest, + MempoolResponse, Request, Response, SnapshotRequest, SnapshotResponse, +}; + +/// Splits a single `service` implementing all of ABCI into four cloneable +/// component services, each implementing one category of ABCI requests. See the +/// module documentation for details. +/// +/// The `bound` parameter bounds the size of each component's request queue. For +/// the same reason as in Tower's [`Buffer`](tower::buffer::Buffer) middleware, +/// it's advisable to set the `bound` to be at least the number of concurrent +/// requests to the component services. However, large buffers hide backpressure +/// from propagating to the caller. +pub fn service(service: S, bound: usize) -> (Consensus, Mempool, Snapshot, Info) +where + S: Service + Send + 'static, + S::Future: Send + 'static, +{ + let bound = std::cmp::max(1, bound); + let (buffer1, buffer2, buffer3, buffer4) = Buffer::new(service, bound); + + ( + Consensus { inner: buffer1 }, + Mempool { inner: buffer2 }, + Snapshot { inner: buffer3 }, + Info { inner: buffer4 }, + ) +} + +/// Forwards consensus requests to a shared backing service. +pub struct Consensus +where + S: Service, +{ + inner: Buffer, +} + +// Implementing Clone manually avoids an (incorrect) derived S: Clone bound +impl Clone for Consensus +where + S: Service, +{ + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl Service for Consensus +where + S: Service, +{ + type Response = ConsensusResponse; + type Error = BoxError; + type Future = futures::ConsensusFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: ConsensusRequest) -> Self::Future { + futures::ConsensusFuture { + inner: self.inner.call(req.into()), + } + } +} + +/// Forwards mempool requests to a shared backing service. +pub struct Mempool +where + S: Service, +{ + inner: Buffer, +} + +// Implementing Clone manually avoids an (incorrect) derived S: Clone bound +impl Clone for Mempool +where + S: Service, +{ + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl Service for Mempool +where + S: Service, +{ + type Response = MempoolResponse; + type Error = BoxError; + type Future = futures::MempoolFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: MempoolRequest) -> Self::Future { + futures::MempoolFuture { + inner: self.inner.call(req.into()), + } + } +} + +/// Forwards info requests to a shared backing service. +pub struct Info +where + S: Service, +{ + inner: Buffer, +} + +// Implementing Clone manually avoids an (incorrect) derived S: Clone bound +impl Clone for Info +where + S: Service, +{ + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl Service for Info +where + S: Service, +{ + type Response = InfoResponse; + type Error = BoxError; + type Future = futures::InfoFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: InfoRequest) -> Self::Future { + futures::InfoFuture { + inner: self.inner.call(req.into()), + } + } +} + +/// Forwards snapshot requests to a shared backing service. +pub struct Snapshot +where + S: Service, +{ + inner: Buffer, +} + +// Implementing Clone manually avoids an (incorrect) derived S: Clone bound +impl Clone for Snapshot +where + S: Service, +{ + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + } + } +} + +impl Service for Snapshot +where + S: Service, +{ + type Response = SnapshotResponse; + type Error = BoxError; + type Future = futures::SnapshotFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: SnapshotRequest) -> Self::Future { + futures::SnapshotFuture { + inner: self.inner.call(req.into()), + } + } +} + +// this is all "necessary" only because rust does not support full GATs or allow +// specifying a concrete (but unnameable) associated type using impl Trait. +// this means that Tower services either have to have handwritten futures +// or box the futures they return. Boxing a few futures is not really a big deal +// but it's nice to avoid deeply nested boxes arising from service combinators like +// these ones. + +// https://github.com/rust-lang/rust/issues/63063 fixes this +/// Futures types. +pub mod futures { + use pin_project::pin_project; + use std::{convert::TryInto, future::Future, pin::Pin}; + + use super::*; + + #[pin_project] + pub struct ConsensusFuture + where + S: Service, + { + #[pin] + pub(super) inner: as Service>::Future, + } + + impl Future for ConsensusFuture + where + S: Service, + { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + match this.inner.poll(cx) { + Poll::Ready(rsp) => Poll::Ready( + rsp.map(|rsp| rsp.try_into().expect("service gave wrong response type")), + ), + Poll::Pending => Poll::Pending, + } + } + } + + #[pin_project] + pub struct MempoolFuture + where + S: Service, + { + #[pin] + pub(super) inner: as Service>::Future, + } + + impl Future for MempoolFuture + where + S: Service, + { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + match this.inner.poll(cx) { + Poll::Ready(rsp) => Poll::Ready( + rsp.map(|rsp| rsp.try_into().expect("service gave wrong response type")), + ), + Poll::Pending => Poll::Pending, + } + } + } + + #[pin_project] + pub struct InfoFuture + where + S: Service, + { + #[pin] + pub(super) inner: as Service>::Future, + } + + impl Future for InfoFuture + where + S: Service, + { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + match this.inner.poll(cx) { + Poll::Ready(rsp) => Poll::Ready( + rsp.map(|rsp| rsp.try_into().expect("service gave wrong response type")), + ), + Poll::Pending => Poll::Pending, + } + } + } + + #[pin_project] + pub struct SnapshotFuture + where + S: Service, + { + #[pin] + pub(super) inner: as Service>::Future, + } + + impl Future for SnapshotFuture + where + S: Service, + { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + match this.inner.poll(cx) { + Poll::Ready(rsp) => Poll::Ready( + rsp.map(|rsp| rsp.try_into().expect("service gave wrong response type")), + ), + Poll::Pending => Poll::Pending, + } + } + } +}