Skip to content

Commit

Permalink
updated child thread closure
Browse files Browse the repository at this point in the history
  • Loading branch information
idky137 committed Jun 5, 2024
1 parent 74a34f7 commit 5ee32c5
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 11 deletions.
8 changes: 6 additions & 2 deletions zingo-proxyd/src/bin/zingoproxyd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{

use zingoproxylib::proxy::spawn_proxy;

extern crate ctrlc;
use ctrlc;

#[tokio::main]
async fn main() {
Expand Down Expand Up @@ -42,5 +42,9 @@ async fn main() {
)
.await;

while online.load(Ordering::SeqCst) {}
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(500));

while online.load(Ordering::SeqCst) {
interval.tick().await;
}
}
7 changes: 7 additions & 0 deletions zingo-proxyd/src/nym_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,16 @@ impl NymServer {
) -> tokio::task::JoinHandle<Result<(), tonic::transport::Error>> {
let mut request_in: Vec<ReconstructedMessage> = Vec::new();
tokio::task::spawn(async move {
// NOTE: This interval may need to be reduced or removed / moved once scale testing begins.
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(50));
while online.load(Ordering::SeqCst) {
while let Some(request_nym) = self.0 .0.wait_for_messages().await {
if request_nym.is_empty() {
interval.tick().await;
if !online.load(Ordering::SeqCst) {
println!("Nym server shutting down.");
return Ok(());
}
continue;
}
request_in = request_nym;
Expand Down
37 changes: 28 additions & 9 deletions zingo-proxyd/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,36 @@ impl ProxyServer {
let svc = CompactTxStreamerServer::new(self.0);
let sockaddr = SocketAddr::new(std::net::IpAddr::V4(Ipv4Addr::LOCALHOST), port.into());
println!("GRPC server listening on: {sockaddr}");
while online.load(Ordering::SeqCst) {
let server = tonic::transport::Server::builder()
.add_service(svc.clone())
.serve(sockaddr)
.await;
match server {
Ok(_) => (),
Err(e) => return Err(e),

let server = tonic::transport::Server::builder()
.add_service(svc.clone())
.serve(sockaddr);

let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(500));
tokio::select! {
result = server => {
match result {
Ok(_) => {
// TODO: Gracefully restart gRPC server.
println!("gRPC Server closed early. Restart required");
Ok(())
}
Err(e) => {
// TODO: restart server or set online to false and exit
println!("gRPC Server closed with error: {}. Restart required", e);
Err(e)
}
}
}
_ = async {
while online.load(Ordering::SeqCst) {
interval.tick().await;
}
} => {
println!("gRPC server shutting down.");
Ok(())
}
}
Ok(())
})
}

Expand Down

0 comments on commit 5ee32c5

Please sign in to comment.