Skip to content

Commit

Permalink
fixed merge conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
idky137 committed Jun 6, 2024
2 parents b1b09d3 + b119bf9 commit 9c73ccd
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 14 deletions.
8 changes: 4 additions & 4 deletions zingo-proxyd/src/bin/zingoproxyd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@ use std::{
Arc,
},
};

use zingoproxylib::proxy::spawn_proxy;

use ctrlc;

#[tokio::main]
async fn main() {
let online = Arc::new(AtomicBool::new(true));
Expand Down Expand Up @@ -43,5 +40,8 @@ async fn main() {
)
.await;

while online.load(Ordering::SeqCst) {}
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(500));
while online.load(Ordering::SeqCst) {
interval.tick().await;
}
}
8 changes: 8 additions & 0 deletions zingo-proxyd/src/nym_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,16 @@ impl NymServer {
) -> tokio::task::JoinHandle<Result<(), tonic::transport::Error>> {
let mut request_in: Vec<ReconstructedMessage> = Vec::new();
tokio::task::spawn(async move {
// NOTE: This interval may need to be reduced or removed / moved once scale testing begins.
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(50));
while online.load(Ordering::SeqCst) {
while let Some(request_nym) = self.0 .0.wait_for_messages().await {
if request_nym.is_empty() {
interval.tick().await;
if !online.load(Ordering::SeqCst) {
println!("Nym server shutting down.");
return Ok(());
}
continue;
}
request_in = request_nym;
Expand Down Expand Up @@ -74,6 +81,7 @@ impl NymServer {
.await
.unwrap();
}
println!("Nym server shutting down.");
Ok(())
})
}
Expand Down
39 changes: 29 additions & 10 deletions zingo-proxyd/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,37 @@ impl ProxyServer {
tokio::task::spawn(async move {
let svc = CompactTxStreamerServer::new(self.0);
let sockaddr = SocketAddr::new(std::net::IpAddr::V4(Ipv4Addr::LOCALHOST), port.into());
println!("@zingoproxyd: GRPC server listening on: {sockaddr}.");
while online.load(Ordering::SeqCst) {
let server = tonic::transport::Server::builder()
.add_service(svc.clone())
.serve(sockaddr)
.await;
match server {
Ok(_) => (),
Err(e) => return Err(e),
println!("@zingoproxyd: gRPC server listening on: {sockaddr}");

let server = tonic::transport::Server::builder()
.add_service(svc.clone())
.serve(sockaddr);

let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(500));
tokio::select! {
result = server => {
match result {
Ok(_) => {
// TODO: Gracefully restart gRPC server.
println!("@zingoproxyd: gRPC Server closed early. Restart required");
Ok(())
}
Err(e) => {
// TODO: restart server or set online to false and exit
println!("@zingoproxyd: gRPC Server closed with error: {}. Restart required", e);
Err(e)
}
}
}
_ = async {
while online.load(Ordering::SeqCst) {
interval.tick().await;
}
} => {
println!("@zingoproxyd: gRPC server shutting down.");
Ok(())
}
}
Ok(())
})
}

Expand Down

0 comments on commit 9c73ccd

Please sign in to comment.