Skip to content

Commit

Permalink
tower-abci: support CometBFT 0.38.x (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
erwanor authored Mar 1, 2024
1 parent bc0352e commit 1cf2085
Show file tree
Hide file tree
Showing 5 changed files with 968 additions and 0 deletions.
237 changes: 237 additions & 0 deletions examples/kvstore_38/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
//! Example ABCI application, an in-memory key-value store.

use std::{
collections::HashMap,
future::Future,
pin::Pin,
task::{Context, Poll},
};

use bytes::Bytes;
use futures::future::FutureExt;
use structopt::StructOpt;
use tower::{Service, ServiceBuilder};

use tendermint::{
abci::{
response::{self, PrepareProposal},
Event, EventAttributeIndexExt,
},
v0_38::abci::request,
};

use tower_abci::{
v038::{split, Server},
BoxError,
};

use tendermint::abci::types::ExecTxResult;
use tendermint::v0_38::abci::{Request, Response};

/// In-memory, hashmap-backed key-value store ABCI application.
#[derive(Clone, Debug, Default)]
pub struct KVStore {
store: HashMap<String, String>,
height: u32,
app_hash: [u8; 8],
}

impl Service<Request> for KVStore {
type Response = Response;
type Error = BoxError;
type Future = Pin<Box<dyn Future<Output = Result<Response, BoxError>> + Send + 'static>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, req: Request) -> Self::Future {
tracing::info!(?req);

let rsp = match req {
// handled messages
Request::Info(_) => Response::Info(self.info()),
Request::Query(query) => Response::Query(self.query(query.data)),
// Note: https://github.com/tendermint/tendermint/blob/v0.38.x/spec/abci/abci%2B%2B_tmint_expected_behavior.md#adapting-existing-applications-that-use-abci
Request::PrepareProposal(prepare_prop) => Response::PrepareProposal(PrepareProposal {
txs: prepare_prop.txs,
}),
Request::ProcessProposal(..) => {
Response::ProcessProposal(response::ProcessProposal::Accept)
}
Request::ExtendVote(vote) => Response::ExtendVote(self.extend_vote(vote)),
Request::VerifyVoteExtension(vote) => {
Response::VerifyVoteExtension(self.verify_vote(vote))
}
Request::FinalizeBlock(block) => Response::FinalizeBlock(self.finalize_block(block)),
Request::Commit => Response::Commit(self.commit()),

// unhandled messages
Request::Flush => Response::Flush,
Request::Echo(_) => Response::Echo(Default::default()),
Request::InitChain(_) => Response::InitChain(Default::default()),
Request::CheckTx(_) => Response::CheckTx(Default::default()),
Request::ListSnapshots => Response::ListSnapshots(Default::default()),
Request::OfferSnapshot(_) => Response::OfferSnapshot(Default::default()),
Request::LoadSnapshotChunk(_) => Response::LoadSnapshotChunk(Default::default()),
Request::ApplySnapshotChunk(_) => Response::ApplySnapshotChunk(Default::default()),
};
tracing::info!(?rsp);
async move { Ok(rsp) }.boxed()
}
}

impl KVStore {
fn info(&self) -> response::Info {
response::Info {
data: "tower-abci-kvstore-example".to_string(),
version: "0.1.0".to_string(),
app_version: 1,
last_block_height: self.height.into(),
last_block_app_hash: self.app_hash.to_vec().try_into().unwrap(),
}
}

fn query(&self, query: Bytes) -> response::Query {
let key = String::from_utf8(query.to_vec()).unwrap();
let (value, log) = match self.store.get(&key) {
Some(value) => (value.clone(), "exists".to_string()),
None => ("".to_string(), "does not exist".to_string()),
};

response::Query {
log,
key: key.into_bytes().into(),
value: value.into_bytes().into(),
..Default::default()
}
}

fn execute_tx(&mut self, tx: Bytes) -> ExecTxResult {
let tx = String::from_utf8(tx.to_vec()).unwrap();
let tx_parts = tx.split('=').collect::<Vec<_>>();
let (key, value) = match (tx_parts.first(), tx_parts.get(1)) {
(Some(key), Some(value)) => (*key, *value),
_ => (tx.as_ref(), tx.as_ref()),
};
self.store.insert(key.to_string(), value.to_string());

ExecTxResult {
events: vec![Event::new(
"app",
vec![
("key", key).index(),
("index_key", "index is working").index(),
("noindex_key", "noindex is working").no_index(),
],
)],
..Default::default()
}
}

fn finalize_block(&mut self, block: request::FinalizeBlock) -> response::FinalizeBlock {
let mut tx_results = Vec::new();
for tx in block.txs {
tx_results.push(self.execute_tx(tx));
}
response::FinalizeBlock {
events: vec![Event::new(
"app",
vec![("num_tx", format!("{}", tx_results.len())).index()],
)],
tx_results,
validator_updates: vec![],
consensus_param_updates: None,
app_hash: self
.compute_apphash()
.to_vec()
.try_into()
.expect("vec to `AppHash` conversion is actually infaillible."),
}
}

fn commit(&mut self) -> response::Commit {
let retain_height = self.height.into();
// As in the other kvstore examples, just use store.len() as the "hash"
self.app_hash = self.compute_apphash();
self.height += 1;

response::Commit {
// This field is ignored for CometBFT >= 0.38
data: Bytes::default(),
retain_height,
}
}

fn extend_vote(&self, _vote: request::ExtendVote) -> response::ExtendVote {
response::ExtendVote {
vote_extension: Bytes::default(),
}
}

fn verify_vote(&self, _vote: request::VerifyVoteExtension) -> response::VerifyVoteExtension {
response::VerifyVoteExtension::Accept
}

fn compute_apphash(&self) -> [u8; 8] {
(self.store.len() as u64).to_be_bytes()
}
}

#[derive(Debug, StructOpt)]
struct Opt {
/// Bind the TCP server to this host.
#[structopt(short, long, default_value = "127.0.0.1")]
host: String,

/// Bind the TCP server to this port.
#[structopt(short, long, default_value = "26658")]
port: u16,

/// Bind the UDS server to this path
#[structopt(long)]
uds: Option<String>,
}

#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
let opt = Opt::from_args();

// Construct our ABCI application.
let service = KVStore::default();

// Split it into components.
let (consensus, mempool, snapshot, info) = split::service(service, 1);

// Hand those components to the ABCI server, but customize request behavior
// for each category -- for instance, apply load-shedding only to mempool
// and info requests, but not to consensus requests.
let server_builder = Server::builder()
.consensus(consensus)
.snapshot(snapshot)
.mempool(
ServiceBuilder::new()
.load_shed()
.buffer(10)
.service(mempool),
)
.info(
ServiceBuilder::new()
.load_shed()
.buffer(100)
.rate_limit(50, std::time::Duration::from_secs(1))
.service(info),
);

let server = server_builder.finish().unwrap();

if let Some(uds_path) = opt.uds {
server.listen_unix(uds_path).await.unwrap();
} else {
server
.listen_tcp(format!("{}:{}", opt.host, opt.port))
.await
.unwrap();
}
}
8 changes: 8 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,13 @@ pub mod v037 {
pub use server::ServerBuilder;
}

pub mod v038 {
mod codec;
mod server;
pub mod split;
pub use server::Server;
pub use server::ServerBuilder;
}

/// A convenient error type alias.
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
110 changes: 110 additions & 0 deletions src/v038/codec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use std::marker::PhantomData;

use tokio_util::codec::{Decoder, Encoder};

use bytes::{BufMut, BytesMut};

pub struct Decode<M> {
state: DecodeState,
_marker: PhantomData<M>,
}

impl<M> Default for Decode<M> {
fn default() -> Self {
Self {
state: DecodeState::Head,
_marker: PhantomData,
}
}
}

#[derive(Debug)]
enum DecodeState {
Head,
Body { len: usize },
}

impl<M: prost::Message + Default> Decoder for Decode<M> {
type Item = M;
type Error = crate::BoxError;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match self.state {
DecodeState::Head => {
tracing::trace!(?src, "decoding head");
// we don't use decode_varint directly, because it advances the
// buffer regardless of success, but Decoder assumes that when
// the buffer advances we've consumed the data. this is sort of
// a sad hack, but it works.
// TODO(erwan): fix this

// Tendermint socket protocol:
// "Messages are serialized using Protobuf3 and length-prefixed
// with an unsigned varint"
// See: https://github.com/tendermint/tendermint/blob/v0.38.x/spec/abci/abci++_client_server.md#socket
let mut tmp = src.clone().freeze();
let len = match prost::encoding::decode_varint(&mut tmp) {
Ok(_) => {
// advance the real buffer
prost::encoding::decode_varint(src).unwrap() as usize
}
Err(_) => {
tracing::trace!(?self.state, src.len = src.len(), "waiting for header data");
return Ok(None);
}
};
self.state = DecodeState::Body { len };
tracing::trace!(?self.state, "ready for body");

// Recurse to attempt body decoding.
self.decode(src)
}
DecodeState::Body { len } => {
if src.len() < len {
tracing::trace!(?self.state, src.len = src.len(), "waiting for body");
return Ok(None);
}

let body = src.split_to(len);
tracing::trace!(?body, "decoding body");
let message = M::decode(body)?;

// Now reset the decoder state for the next message.
self.state = DecodeState::Head;

Ok(Some(message))
}
}
}
}

pub struct Encode<M> {
_marker: PhantomData<M>,
}

impl<M> Default for Encode<M> {
fn default() -> Self {
Self {
_marker: PhantomData,
}
}
}

impl<M: prost::Message + Sized + std::fmt::Debug> Encoder<M> for Encode<M> {
type Error = crate::BoxError;

fn encode(&mut self, item: M, dst: &mut BytesMut) -> Result<(), Self::Error> {
let mut buf = BytesMut::new();
item.encode(&mut buf)?;
let buf = buf.freeze();

// Tendermint socket protocol:
// "Messages are serialized using Protobuf3 and length-prefixed
// with an unsigned varint"
// See: https://github.com/tendermint/tendermint/blob/v0.38.x/spec/abci/abci++_client_server.md#socket
prost::encoding::encode_varint(buf.len() as u64, dst);
dst.put(buf);

Ok(())
}
}
Loading

0 comments on commit 1cf2085

Please sign in to comment.