Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
tzemanovic committed Nov 6, 2023
1 parent ba5d245 commit 9ff6a79
Showing 1 changed file with 223 additions and 72 deletions.
295 changes: 223 additions & 72 deletions src/server.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
use std::convert::{TryFrom, TryInto};
use std::ops::DerefMut;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

Check warning on line 5 in src/server.rs

View workflow job for this annotation

GitHub Actions / Check

unused import: `std::time::Duration`

use backoff::ExponentialBackoff;
use futures::future::{FutureExt, TryFutureExt};
use futures::sink::SinkExt;
use futures::stream::{FuturesOrdered, StreamExt};
use futures::stream::{FuturesOrdered, Peekable, StreamExt};
use futures::Future;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

Check warning on line 12 in src/server.rs

View workflow job for this annotation

GitHub Actions / Check

unused imports: `AsyncReadExt`, `AsyncWriteExt`
use tokio::net::tcp::{ReadHalf, WriteHalf};
use tokio::sync::{Mutex, MutexGuard};
use tokio::{
net::{TcpListener, TcpStream, ToSocketAddrs},
select,
Expand All @@ -15,8 +22,9 @@ use tracing::Instrument;
use tracing::Level;

Check warning on line 22 in src/server.rs

View workflow job for this annotation

GitHub Actions / Check

unused import: `tracing::Level`

use crate::{
BoxError, ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest,
MempoolResponse, MethodKind, Request, Response, SnapshotRequest, SnapshotResponse,
response, BoxError, ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse,

Check warning on line 25 in src/server.rs

View workflow job for this annotation

GitHub Actions / Check

unused import: `response`
MempoolRequest, MempoolResponse, MethodKind, Request, Response, SnapshotRequest,
SnapshotResponse,
};

/// An ABCI server which listens for connections and forwards requests to four
Expand Down Expand Up @@ -161,52 +169,73 @@ where
let local_addr = listener.local_addr()?;
tracing::info!(?local_addr, "bound tcp listener");

// set parent: None for the connection span, as it should
// exist independently of the listener's spans.
// let span = tracing::span!(parent: None, Level::ERROR, "abci", ?addr);

let listener_clone = listener.clone();

Check warning on line 176 in src/server.rs

View workflow job for this annotation

GitHub Actions / Check

unused variable: `listener_clone`

// let socket = Arc::new(socket);
// let conn_clone = conn.clone();
// TODO: loop only if there's no error
let mut i = 0;
loop {
// set parent: None for the connection span, as it should
// exist independently of the listener's spans.
// let span = tracing::span!(parent: None, Level::ERROR, "abci", ?addr);

let server = self.clone();
let listener_clone = listener.clone();
tokio::spawn(async move {
let s = server.clone();
backoff::future::retry::<_, BoxError, _, _, _>(
ExponentialBackoff::default(),
|| async {
match listener_clone.accept().await {
Ok((socket, _addr)) => {
let conn = Connection {
consensus: s.consensus.clone(),
mempool: s.mempool.clone(),
info: s.info.clone(),
snapshot: s.snapshot.clone(),
};

if let Err(e) = conn.run(socket).await {
match e.downcast::<tower::load_shed::error::Overloaded>() {
Err(e) => {
tracing::error!({ %e }, "error in a connection handler");
return Err(backoff::Error::Permanent(e));
}
Ok(e) => {
tracing::warn!("Service overloaded - backing off");
return Err(backoff::Error::transient(e));
}
}
}
Ok(())
}
Err(e) => {
tracing::error!({ %e }, "error accepting new tcp connection");
Ok(())
}
}
},
)
.await
});
// .instrument(span);
println!("am looping {i}");
match listener.accept().await {
Ok((socket, _addr)) => {
println!(
"Accepting inner, socket: {socket:?}, linger: {:?}",
socket.linger()
);
// socket.set_linger(Some(Duration::new(100, 0))).unwrap();
let conn = Connection {
consensus: self.consensus.clone(),
mempool: self.mempool.clone(),
info: self.info.clone(),
snapshot: self.snapshot.clone(),
};

// let conn_clone = conn_clone.clone();
tokio::spawn(async move {
conn.run_with_backoff(socket, i).await.unwrap();
println!("Task {i} done");
// println!("am looping");
// backoff::future::retry::<_, BoxError, _, _, _>(
// ExponentialBackoff::default(),
// || async {
// if let Err(e) = conn_clone.clone().run(socket).await {
// match e.downcast::<tower::load_shed::error::Overloaded>() {
// Err(e) => {
// println!("error {e} in a connection handler");
// return Err(backoff::Error::Permanent(e));
// }
// Ok(e) => {
// println!("Overloaded - backing off");
// return Err(backoff::Error::transient(e));
// }
// }
// } else {
// println!("Conn ran without error");
// Ok(())
// }
// },
// )
// .await
// .unwrap();

// println!("Task done");
});
println!("Task spawned");
}

Err(e) => {
println!("error {e} accepting new tcp connection (inner)");
// return Err(backoff::Error::Permanent(e));
}
}
i = i + 1;
}
// .instrument(span);
}
}

Expand All @@ -220,42 +249,157 @@ struct Connection<C, M, I, S> {

impl<C, M, I, S> Connection<C, M, I, S>
where
C: Service<ConsensusRequest, Response = ConsensusResponse, Error = BoxError> + Send + 'static,
C: Service<ConsensusRequest, Response = ConsensusResponse, Error = BoxError>
+ Send
+ Clone
+ 'static,
C::Future: Send + 'static,
M: Service<MempoolRequest, Response = MempoolResponse, Error = BoxError> + Send + 'static,
M: Service<MempoolRequest, Response = MempoolResponse, Error = BoxError>
+ Send
+ Clone
+ 'static,
M::Future: Send + 'static,
I: Service<InfoRequest, Response = InfoResponse, Error = BoxError> + Send + 'static,
I: Service<InfoRequest, Response = InfoResponse, Error = BoxError> + Send + Clone + 'static,
I::Future: Send + 'static,
S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError> + Send + 'static,
S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError>
+ Send
+ Clone
+ 'static,
S::Future: Send + 'static,
{
async fn run_with_backoff(self, mut socket: TcpStream, i: u64) -> Result<(), BoxError> {

Check warning on line 270 in src/server.rs

View workflow job for this annotation

GitHub Actions / Check

variable does not need to be mutable
let mut pinned_socket = Pin::new(Box::new(socket));
// TODO try to keep the stream and sink open instead
// let s = Arc::new(Mutex::new(socket));
let (read, write) = pinned_socket.split();
// let (mut request_stream, mut response_sink) = {
let rw = {
use crate::codec::{Decode, Encode};
use tendermint_proto::abci as pb;
let mut responses = FuturesOrdered::new();

Check warning on line 279 in src/server.rs

View workflow job for this annotation

GitHub Actions / Check

variable does not need to be mutable
Arc::new(Mutex::new((
FramedRead::new(read, Decode::<pb::Request>::default()).peekable(),
FramedWrite::new(write, Encode::<pb::Response>::default()),
responses,
)))
};

backoff::future::retry::<_, BoxError, _, _, _>(ExponentialBackoff::default(), || async {
println!("Trying to lock {i}");
// let mut s_guard = s.lock().await;
let mut rw_guard = rw.lock().await;
println!("Locked {i}");
// let (read, write) = s_guard.split();
// let (mut request_stream, mut response_sink) = {
// use crate::codec::{Decode, Encode};
// use tendermint_proto::abci as pb;
// (
// FramedRead::new(read, Decode::<pb::Request>::default()),
// FramedWrite::new(write, Encode::<pb::Response>::default()),
// )
// };

println!("Trying to run {i}");
let run_result = self
.clone()
// .run(&mut request_stream, &mut response_sink, i)
.run(&mut rw_guard, i)
.await;

if let Err(e) = run_result {
println!("some error in a connection handler");
match e.downcast::<tower::load_shed::error::Overloaded>() {
Err(e) => {
println!("error {e} in a connection handler");
return Err(backoff::Error::Permanent(e));
}
Ok(e) => {
println!(
"Overloaded on {i} - backing off after trying to send reponses, if any"
);

let (_, response_sink, responses) = rw_guard.deref_mut();
if !responses.is_empty() {
println!("Sending reponses first");
while let Some(rsp) = responses.next().await {
let response = rsp.expect("didn't poll when responses was empty");
// XXX: sometimes we might want to send errors to tendermint
// https://docs.tendermint.com/v0.32/spec/abci/abci.html#errors
tracing::debug!(?response, "sending response");
println!("sending response");
response_sink.send(response.into()).await?;
}
}

return Err(backoff::Error::transient(e));
}
}
} else {
println!("Conn {i} ran without error");
Ok(())
}
})
.await?;
println!("Backoff {i} finished");
// self.run_with_backoff(Arc::into_inner(s).unwrap().into_inner(), i)
// .await
Ok(())
}
// XXX handle errors gracefully
// figure out how / if to return errors to tendermint
async fn run(mut self, mut socket: TcpStream) -> Result<(), BoxError> {
async fn run(
mut self,
rw_guard: &mut MutexGuard<
'_,
(
Peekable<
FramedRead<ReadHalf<'_>, crate::codec::Decode<tendermint_proto::abci::Request>>,
>,
FramedWrite<WriteHalf<'_>, crate::codec::Encode<tendermint_proto::abci::Response>>,
FuturesOrdered<Pin<Box<dyn Future<Output = Result<Response, BoxError>> + Send>>>,
),
>,
// request_stream: &mut FramedRead<
// ReadHalf<'_>,
// crate::codec::Decode<tendermint_proto::abci::Request>,
// >,
// response_sink: &mut FramedWrite<
// WriteHalf<'_>,
// crate::codec::Encode<tendermint_proto::abci::Response>,
// >,
i: u64,
) -> Result<(), BoxError> {
tracing::info!("listening for requests");

use tendermint_proto::abci as pb;

Check warning on line 374 in src/server.rs

View workflow job for this annotation

GitHub Actions / Check

unused import: `tendermint_proto::abci as pb`

let (mut request_stream, mut response_sink) = {
use crate::codec::{Decode, Encode};
let (read, write) = socket.split();
(
FramedRead::new(read, Decode::<pb::Request>::default()),
FramedWrite::new(write, Encode::<pb::Response>::default()),
)
};

let mut responses = FuturesOrdered::new();
let (request_stream, response_sink, responses) = rw_guard.deref_mut();
let mut pinned_stream = Pin::new(request_stream);

let mut peeked = false;
loop {
println!("Run loop, i {i}");
select! {
req = request_stream.next() => {
let proto = match req.transpose()? {
Some(proto) => proto,
None => return Ok(()),
req = pinned_stream.as_mut().peek(), if !peeked => {
peeked = true;
let proto = match req {
Some(Ok(proto)) => proto.clone(),
Some(Err(err)) => {panic!("TODO:Request err")}

Check warning on line 387 in src/server.rs

View workflow job for this annotation

GitHub Actions / Check

unused variable: `err`
None => {
println!("HUH {i}");
return Ok(());
},
};
// let proto = match req.transpose()? {
// Some(proto) => proto,
// None => {
// println!("HUH {i}");
// return Ok(());
// },
// };
let request = Request::try_from(proto)?;
tracing::debug!(?request, "new request");
println!("new request on {i}, kind {:?}", request.kind());
match request.kind() {
MethodKind::Consensus => {
let request = request.try_into().expect("checked kind");
Expand All @@ -266,7 +410,7 @@ where
responses.push_back(response.map_ok(Response::from).boxed());
},
Err(err) => {
tracing::error!("consensus service is not ready: {}", err);
println!("consensus service is not ready: {}", err);
}
}
}
Expand All @@ -279,7 +423,7 @@ where
responses.push_back(response.map_ok(Response::from).boxed());
},
Err(err) => {
tracing::error!("mempool service is not ready: {}", err);
println!("mempool service is not ready: {}", err);
}
}
}
Expand All @@ -292,7 +436,7 @@ where
responses.push_back(response.map_ok(Response::from).boxed());
},
Err(err) => {
tracing::error!("snapshot service is not ready: {}", err);
println!("snapshot service is not ready: {}", err);
}
}
}
Expand All @@ -305,7 +449,7 @@ where
responses.push_back(response.map_ok(Response::from).boxed());
},
Err(err) => {
tracing::error!("info service is not ready: {}", err);
println!("info service is not ready: {}", err);
}
}
}
Expand All @@ -317,19 +461,26 @@ where
// XXX: sometimes we might want to send errors to tendermint
// https://docs.tendermint.com/v0.32/spec/abci/abci.html#errors
tracing::debug!(?response, "flushing response");
println!("flushing response");
response_sink.send(response?.into()).await?;
}
peeked = false;
pinned_stream.next().await.unwrap().unwrap();
println!("send flush done");
// Now we need to tell Tendermint we've flushed responses
response_sink.send(Response::Flush(Default::default()).into()).await?;
}
}
}
rsp = responses.next(), if !responses.is_empty() => {
let response = rsp.expect("didn't poll when responses was empty");
peeked = false;
let response = rsp.expect("didn't poll when responses was empty")?;
pinned_stream.next().await.unwrap().unwrap();
// XXX: sometimes we might want to send errors to tendermint
// https://docs.tendermint.com/v0.32/spec/abci/abci.html#errors
tracing::debug!(?response, "sending response");
response_sink.send(response?.into()).await?;
println!("sending response");
response_sink.send(response.into()).await?;
}
}
}
Expand Down

0 comments on commit 9ff6a79

Please sign in to comment.