Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
tzemanovic committed Oct 27, 2023
1 parent cf9573d commit ba5d245
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 20 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ futures = "0.3"
tracing = { git = "https://github.com/tokio-rs/tracing/", tag = "tracing-0.1.30" }
prost = "0.11.0"
tracing-tower = { git = "https://github.com/tokio-rs/tracing/", tag = "tracing-0.1.30" }
backoff = { version = "0.4.0", features = ["tokio"] }

[dev-dependencies]
structopt = "0.3"
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ mod codec;
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// A category of ABCI method.
#[derive(Debug)]
pub enum MethodKind {
Consensus,
Mempool,
Expand Down
81 changes: 61 additions & 20 deletions src/server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::convert::{TryFrom, TryInto};
use std::sync::Arc;

use backoff::ExponentialBackoff;
use futures::future::{FutureExt, TryFutureExt};
use futures::sink::SinkExt;
use futures::stream::{FuturesOrdered, StreamExt};
Expand All @@ -19,6 +21,7 @@ use crate::{

/// An ABCI server which listens for connections and forwards requests to four
/// component ABCI [`Service`]s.
#[derive(Clone)]
pub struct Server<C, M, I, S> {
consensus: C,
mempool: M,
Expand Down Expand Up @@ -122,18 +125,25 @@ impl<C, M, I, S> Server<C, M, I, S>
where
C: Service<ConsensusRequest, Response = ConsensusResponse, Error = BoxError>
+ Send
+ Sync
+ Clone
+ 'static,
C::Future: Send + 'static,
M: Service<MempoolRequest, Response = MempoolResponse, Error = BoxError>
+ Send
+ Sync
+ Clone
+ 'static,
M::Future: Send + 'static,
I: Service<InfoRequest, Response = InfoResponse, Error = BoxError> + Send + Clone + 'static,
I: Service<InfoRequest, Response = InfoResponse, Error = BoxError>
+ Send
+ Sync
+ Clone
+ 'static,
I::Future: Send + 'static,
S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError>
+ Send
+ Sync
+ Clone
+ 'static,
S::Future: Send + 'static,
Expand All @@ -142,34 +152,65 @@ where
ServerBuilder::default()
}

pub async fn listen<A: ToSocketAddrs + std::fmt::Debug>(self, addr: A) -> Result<(), BoxError> {
pub async fn listen<A: ToSocketAddrs + std::fmt::Debug + Copy + Send + Sync + 'static>(
self,
addr: A,
) -> Result<(), BoxError> {
tracing::info!(?addr, "starting ABCI server");
let listener = TcpListener::bind(addr).await?;
let listener = Arc::new(TcpListener::bind(addr).await?);
let local_addr = listener.local_addr()?;
tracing::info!(?local_addr, "bound tcp listener");

loop {
match listener.accept().await {
Ok((socket, addr)) => {
// set parent: None for the connection span, as it should
// exist independently of the listener's spans.
let span = tracing::span!(parent: None, Level::ERROR, "abci", ?addr);
let conn = Connection {
consensus: self.consensus.clone(),
mempool: self.mempool.clone(),
info: self.info.clone(),
snapshot: self.snapshot.clone(),
};
tokio::spawn(async move { conn.run(socket).await.unwrap() }.instrument(span));
}
Err(e) => {
tracing::warn!({ %e }, "error accepting new tcp connection");
}
}
// set parent: None for the connection span, as it should
// exist independently of the listener's spans.
// let span = tracing::span!(parent: None, Level::ERROR, "abci", ?addr);

let server = self.clone();
let listener_clone = listener.clone();
tokio::spawn(async move {
let s = server.clone();
backoff::future::retry::<_, BoxError, _, _, _>(
ExponentialBackoff::default(),
|| async {
match listener_clone.accept().await {
Ok((socket, _addr)) => {
let conn = Connection {
consensus: s.consensus.clone(),
mempool: s.mempool.clone(),
info: s.info.clone(),
snapshot: s.snapshot.clone(),
};

if let Err(e) = conn.run(socket).await {
match e.downcast::<tower::load_shed::error::Overloaded>() {
Err(e) => {
tracing::error!({ %e }, "error in a connection handler");
return Err(backoff::Error::Permanent(e));
}
Ok(e) => {
tracing::warn!("Service overloaded - backing off");
return Err(backoff::Error::transient(e));
}
}
}
Ok(())
}
Err(e) => {
tracing::error!({ %e }, "error accepting new tcp connection");
Ok(())
}
}
},
)
.await
});
// .instrument(span);
}
}
}

#[derive(Clone)]
struct Connection<C, M, I, S> {
consensus: C,
mempool: M,
Expand Down

0 comments on commit ba5d245

Please sign in to comment.