From ad01cc4cf35842dd7b7568010a0cf41a4798ebb8 Mon Sep 17 00:00:00 2001 From: idky137 Date: Thu, 8 Aug 2024 18:13:22 +0100 Subject: [PATCH] implemented Server --- zingo-rpc/src/server.rs | 452 +++++++++++++++-------------- zingo-rpc/src/server/dispatcher.rs | 53 ++-- zingo-rpc/src/server/ingestor.rs | 74 ++--- zingo-rpc/src/server/queue.rs | 31 +- zingo-rpc/src/server/worker.rs | 156 ++++------ 5 files changed, 382 insertions(+), 384 deletions(-) diff --git a/zingo-rpc/src/server.rs b/zingo-rpc/src/server.rs index 268b30d..c9591ac 100644 --- a/zingo-rpc/src/server.rs +++ b/zingo-rpc/src/server.rs @@ -5,7 +5,7 @@ use nym_sphinx_anonymous_replies::requests::AnonymousSenderTag; use std::{ net::SocketAddr, sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, }, }; @@ -18,35 +18,99 @@ pub mod request; pub mod worker; use self::{ - dispatcher::{DispatcherStatus, NymDispatcher}, + dispatcher::NymDispatcher, error::{DispatcherError, IngestorError, ServerError, WorkerError}, - ingestor::{IngestorStatus, NymIngestor, TcpIngestor}, + ingestor::{NymIngestor, TcpIngestor}, queue::Queue, request::ZingoProxyRequest, - worker::{WorkerPool, WorkerStatusType}, + worker::{WorkerPool, WorkerPoolStatus}, }; -/// -#[derive(Debug, PartialEq, Clone)] -pub struct ServerStatus { - tcp_ingestor_status: IngestorStatus, - nym_ingestor_status: IngestorStatus, - nym_dispatcher_status: DispatcherStatus, - workerpool_status: (usize, usize, Vec), - request_queue_status: (usize, usize), - nym_response_queue_status: (usize, usize), +/// Holds a thread safe reperesentation of a StatusType. +/// Possible values: +/// - [0: Spawning] +/// - [1: Listening] +/// - [2: Working] +/// - [3: Inactive] +/// - [4: Closing]. +/// - [>=5: Offline]. +/// - [>=6: Error]. +/// TODO: Define error code spec. 
+#[derive(Debug, Clone)] +pub struct AtomicStatus(Arc); + +impl AtomicStatus { + /// Creates a new AtomicStatus + pub fn new(status: usize) -> Self { + Self(Arc::new(AtomicUsize::new(status))) + } + + /// Loads the value held in the AtomicStatus + pub fn load(&self) -> usize { + self.0.load(Ordering::SeqCst) + } + + /// Sets the value held in the AtomicStatus + pub fn store(&self, status: usize) { + self.0.store(status, Ordering::SeqCst); + } } + /// Status of the server. #[derive(Debug, PartialEq, Clone)] -pub enum ServerStatusType { +pub enum StatusType { /// Running initial startup routine. - Spawning(ServerStatus), - /// Processing incoming requests. - Active(ServerStatus), - /// Waiting for node / blockcache to sync. - Hold(ServerStatus), + Spawning = 0, + /// Waiting for requests from the queue. + Listening = 1, + /// Processing requests from the queue. + Working = 2, + /// On hold, due to blockcache / node error. + Inactive = 3, /// Running shutdown routine. - Closing(ServerStatus), + Closing = 4, + /// Offline. + Offline = 5, + /// Error state. + Error = 6, +} + +impl From for StatusType { + fn from(value: usize) -> Self { + match value { + 0 => StatusType::Spawning, + 1 => StatusType::Listening, + 2 => StatusType::Working, + 3 => StatusType::Inactive, + 4 => StatusType::Closing, + 5 => StatusType::Offline, + _ => StatusType::Error, + } + } +} + +impl From for usize { + fn from(status: StatusType) -> Self { + status as usize + } +} + +impl From for StatusType { + fn from(status: AtomicStatus) -> Self { + status.load().into() + } +} + +/// Holds the status of the server and all its components. 
+#[derive(Debug, Clone)] +pub struct ServerStatus { + server_status: AtomicStatus, + tcp_ingestor_status: AtomicStatus, + nym_ingestor_status: AtomicStatus, + nym_dispatcher_status: AtomicStatus, + workerpool_status: WorkerPoolStatus, + request_queue_status: Arc, + nym_response_queue_status: Arc, } /// LightWallet server capable of servicing clients over both http and nym. @@ -64,160 +128,104 @@ pub struct Server { /// Nym response queue. nym_response_queue: Queue<(Vec, AnonymousSenderTag)>, /// Servers current status. - status: ServerStatusType, + status: ServerStatus, /// Represents the Online status of the Server. pub online: Arc, } impl Server { - /// Spawns a new server. + /// Spawns a new Server. pub async fn spawn( tcp_active: bool, - tcp_ingestor_listen_addr: SocketAddr, + tcp_ingestor_listen_addr: Option, nym_active: bool, - nym_conf_path: &str, + nym_conf_path: Option<&str>, lightwalletd_uri: Uri, zebrad_uri: Uri, max_queue_size: usize, max_worker_pool_size: usize, idle_worker_pool_size: usize, + status: ServerStatus, online: Arc, ) -> Result { - let ( - request_queue, - nym_response_queue, - tcp_ingestor, - nym_ingestor, - nym_dispatcher, - worker_pool, - ) = match (tcp_active, nym_active) { - (false, false) => Err(ServerError::ServerConfigError( - "Cannot start server with no ingestors selected, at least one of nym or tcp must be set to active in conf.".to_string(), - )), - (false, true) => { - let request_queue = Queue::new(max_queue_size); - let nym_response_queue = Queue::new(max_queue_size); - let nym_ingestor = Some( - NymIngestor::spawn(nym_conf_path, request_queue.tx().clone(), online.clone()) - .await?, - ); - let nym_dispatcher = Some( - NymDispatcher::spawn( - nym_conf_path, - nym_response_queue.rx().clone(), - nym_response_queue.tx().clone(), - online.clone(), - ) - .await?, - ); - let worker_pool = WorkerPool::spawn( - max_worker_pool_size, - idle_worker_pool_size, - request_queue.rx().clone(), + if !(tcp_active && nym_active) { + return 
Err(ServerError::ServerConfigError( + "Cannot start server with no ingestors selected, at least one of either nym or tcp must be set to active in conf.".to_string(), + )); + } + if tcp_active && tcp_ingestor_listen_addr.is_none() { + return Err(ServerError::ServerConfigError( + "TCP is active but no address provided.".to_string(), + )); + } + if nym_active && nym_conf_path.is_none() { + return Err(ServerError::ServerConfigError( + "NYM is active but no conf path provided.".to_string(), + )); + } + status.server_status.store(0); + let request_queue: Queue = + Queue::new(max_queue_size, status.request_queue_status.clone()); + status.request_queue_status.store(0, Ordering::SeqCst); + let nym_response_queue: Queue<(Vec, AnonymousSenderTag)> = + Queue::new(max_queue_size, status.nym_response_queue_status.clone()); + status.nym_response_queue_status.store(0, Ordering::SeqCst); + let tcp_ingestor = if tcp_active { + Some( + TcpIngestor::spawn( + tcp_ingestor_listen_addr.expect( + "tcp_ingestor_listen_addr returned none when used, after checks made.", + ), request_queue.tx().clone(), - nym_response_queue.tx().clone(), - lightwalletd_uri, - zebrad_uri, + status.tcp_ingestor_status.clone(), online.clone(), ) - .await; - Ok(( - request_queue, - nym_response_queue, - None, - nym_ingestor, - nym_dispatcher, - worker_pool, - )) - } - (true, false) => { - let request_queue = Queue::new(max_queue_size); - let nym_response_queue = Queue::new(max_queue_size); - let tcp_ingestor = Some( - TcpIngestor::spawn( - tcp_ingestor_listen_addr, - request_queue.tx().clone(), - online.clone(), - ) - .await?, - ); - let worker_pool = WorkerPool::spawn( - max_worker_pool_size, - idle_worker_pool_size, - request_queue.rx().clone(), + .await?, + ) + } else { + None + }; + let nym_ingestor = if nym_active { + Some( + NymIngestor::spawn( + nym_conf_path + .expect("nym_conf_path returned none when used, after checks made."), request_queue.tx().clone(), - nym_response_queue.tx().clone(), - 
lightwalletd_uri, - zebrad_uri, + status.nym_ingestor_status.clone(), online.clone(), ) - .await; - Ok(( - request_queue, - nym_response_queue, - tcp_ingestor, - None, - None, - worker_pool, - )) - } - (true, true) => { - let request_queue = Queue::new(max_queue_size); - let nym_response_queue = Queue::new(max_queue_size); - let tcp_ingestor = Some( - TcpIngestor::spawn( - tcp_ingestor_listen_addr, - request_queue.tx().clone(), - online.clone(), - ) - .await?, - ); - let nym_ingestor = Some( - NymIngestor::spawn(nym_conf_path, request_queue.tx().clone(), online.clone()) - .await?, - ); - let nym_dispatcher = Some( - NymDispatcher::spawn( - nym_conf_path, - nym_response_queue.rx().clone(), - nym_response_queue.tx().clone(), - online.clone(), - ) - .await?, - ); - let worker_pool = WorkerPool::spawn( - max_worker_pool_size, - idle_worker_pool_size, - request_queue.rx().clone(), - request_queue.tx().clone(), + .await?, + ) + } else { + None + }; + let nym_dispatcher = if nym_active { + Some( + NymDispatcher::spawn( + nym_conf_path + .expect("nym_conf_path returned none when used, after checks made."), + nym_response_queue.rx().clone(), nym_response_queue.tx().clone(), - lightwalletd_uri, - zebrad_uri, + status.nym_dispatcher_status.clone(), online.clone(), ) - .await; - Ok(( - request_queue, - nym_response_queue, - tcp_ingestor, - nym_ingestor, - nym_dispatcher, - worker_pool, - )) - } - }?; - let status = ServerStatusType::Spawning(ServerStatus { - tcp_ingestor_status: IngestorStatus::Inactive, - nym_ingestor_status: IngestorStatus::Inactive, - nym_dispatcher_status: DispatcherStatus::Inactive, - workerpool_status: ( - idle_worker_pool_size, - max_worker_pool_size, - vec![WorkerStatusType::Spawning; worker_pool.workers()], - ), - request_queue_status: (0, max_queue_size), - nym_response_queue_status: (0, max_queue_size), - }); + .await?, + ) + } else { + None + }; + let worker_pool = WorkerPool::spawn( + max_worker_pool_size, + idle_worker_pool_size, + 
request_queue.rx().clone(), + request_queue.tx().clone(), + nym_response_queue.tx().clone(), + lightwalletd_uri, + zebrad_uri, + status.workerpool_status.clone(), + online.clone(), + ) + .await; Ok(Server { tcp_ingestor, nym_ingestor, @@ -225,12 +233,17 @@ impl Server { worker_pool, request_queue, nym_response_queue, - status, + status: status.clone(), online, }) } - /// Starts the server. + /// Starts the gRPC service. + /// + /// Launches all components then enters command loop: + /// - Checks request queue and workerpool to spawn / despawn workers as required. + /// - Updates the ServerStatus. + /// - Checks for shutdown signal, shutting down server if received. pub async fn serve(mut self) -> tokio::task::JoinHandle> { tokio::task::spawn(async move { // NOTE: This interval may need to be reduced or removed / moved once scale testing begins. @@ -239,18 +252,21 @@ impl Server { let mut nym_ingestor_handle = None; let mut tcp_ingestor_handle = None; let mut worker_handles; - if let Some(dispatcher) = self.nym_dispatcher { + if let Some(dispatcher) = self.nym_dispatcher.take() { nym_dispatcher_handle = Some(dispatcher.serve().await); } - if let Some(ingestor) = self.nym_ingestor { + if let Some(ingestor) = self.nym_ingestor.take() { nym_ingestor_handle = Some(ingestor.serve().await); } - if let Some(ingestor) = self.tcp_ingestor { + if let Some(ingestor) = self.tcp_ingestor.take() { tcp_ingestor_handle = Some(ingestor.serve().await); } worker_handles = self.worker_pool.clone().serve().await; + self.status.server_status.store(1); loop { - if self.request_queue.queue_length() >= (self.request_queue.max_length() / 2) { + if self.request_queue.queue_length() >= (self.request_queue.max_length() / 2) + && (self.worker_pool.workers() < self.worker_pool.max_size()) + { match self.worker_pool.push_worker().await { Ok(handle) => { worker_handles.push(handle); @@ -259,43 +275,35 @@ impl Server { eprintln!("WorkerPool at capacity"); } } - } else { - let excess_workers: usize = 
if (self.worker_pool.workers() - - self.worker_pool.check_long_standby()) - < self.worker_pool.idle_size() - { - self.worker_pool.workers() - self.worker_pool.idle_size() - } else { - self.worker_pool.check_long_standby() - }; - for i in ((self.worker_pool.workers() - excess_workers) - ..self.worker_pool.workers()) - .rev() - { - let handle = worker_handles.remove(i); - match self.worker_pool.pop_worker(handle).await { - Ok(_) => {} - Err(e) => { - eprintln!("Failed to pop worker from pool: {}", e); - // TODO: Handle this error. - } + } else if (self.request_queue.queue_length() <= 1) + && (self.worker_pool.workers() > self.worker_pool.idle_size()) + { + let worker_index = self.worker_pool.workers() - 1; + let worker_handle = worker_handles.remove(worker_index); + match self.worker_pool.pop_worker(worker_handle).await { + Ok(_) => {} + Err(e) => { + eprintln!("Failed to pop worker from pool: {}", e); + // TODO: Handle this error. } } } - // self.check_statuses(); - // if self.check_for_shutdown().await { - // let worker_handle_options: Vec< - // Option>>, - // > = worker_handles.into_iter().map(Some).collect(); - // self.shutdown_components( - // tcp_ingestor_handle, - // nym_ingestor_handle, - // nym_dispatcher_handle, - // worker_handle_options, - // ) - // .await; - // return Ok(()); - // } + self.statuses(); + // TODO: Implement check_statuses() and run here. + if self.check_for_shutdown().await { + let worker_handle_options: Vec< + Option>>, + > = worker_handles.into_iter().map(Some).collect(); + self.shutdown_components( + tcp_ingestor_handle, + nym_ingestor_handle, + nym_dispatcher_handle, + worker_handle_options, + ) + .await; + self.status.server_status.store(5); + return Ok(()); + } interval.tick().await; } }) @@ -303,13 +311,18 @@ impl Server { /// Checks indexers online status and servers internal status for closure signal. 
pub async fn check_for_shutdown(&self) -> bool { - if let ServerStatusType::Closing(_) = self.status { + if self.status() >= 4 { return true; } if !self.check_online() { return true; } - return false; + false + } + + /// Sets the servers to close gracefully. + pub async fn shutdown(&mut self) { + self.status.server_status.store(4) } /// Sets the server's components to close gracefully. @@ -320,36 +333,51 @@ impl Server { nym_dispatcher_handle: Option>>, mut worker_handles: Vec>>>, ) { - if let Some(ingestor) = self.tcp_ingestor.as_mut() { - ingestor.shutdown().await; - if let Some(handle) = tcp_ingestor_handle { - handle.await.ok(); - } + if let Some(handle) = tcp_ingestor_handle { + self.status.tcp_ingestor_status.store(4); + handle.await.ok(); } - if let Some(ingestor) = self.nym_ingestor.as_mut() { - ingestor.shutdown().await; - if let Some(handle) = nym_ingestor_handle { - handle.await.ok(); - } + if let Some(handle) = nym_ingestor_handle { + self.status.nym_ingestor_status.store(4); + handle.await.ok(); } self.worker_pool.shutdown(&mut worker_handles).await; - if let Some(dispatcher) = self.nym_dispatcher.as_mut() { - dispatcher.shutdown().await; - if let Some(handle) = nym_dispatcher_handle { - handle.await.ok(); - } + if let Some(handle) = nym_dispatcher_handle { + self.status.nym_dispatcher_status.store(4); + handle.await.ok(); } self.online .store(false, std::sync::atomic::Ordering::SeqCst); } - /// Returns the status of the server and its parts, to be consumed by system printout. - pub async fn status(&self) -> ServerStatus { - todo!() + /// Returns the servers current status usize. + pub fn status(&self) -> usize { + self.status.server_status.load() + } + + /// Returns the servers current statustype. + pub fn statustype(&self) -> StatusType { + StatusType::from(self.status()) + } + + /// Updates and returns the status of the server and its parts. 
+ pub fn statuses(&mut self) -> ServerStatus { + self.status.server_status.load(); + self.status.tcp_ingestor_status.load(); + self.status.nym_ingestor_status.load(); + self.status.nym_dispatcher_status.load(); + self.status + .request_queue_status + .store(self.request_queue.queue_length(), Ordering::SeqCst); + self.status + .nym_response_queue_status + .store(self.nym_response_queue.queue_length(), Ordering::SeqCst); + self.worker_pool.status(); + self.status.clone() } - /// Checks status, handling errors. Returns ServerStatus. - pub async fn check_statuses(&self) -> ServerStatus { + /// Checks statuses, handling errors. + pub async fn check_statuses(&mut self) { todo!() } diff --git a/zingo-rpc/src/server/dispatcher.rs b/zingo-rpc/src/server/dispatcher.rs index 8ec6283..09c37ef 100644 --- a/zingo-rpc/src/server/dispatcher.rs +++ b/zingo-rpc/src/server/dispatcher.rs @@ -9,23 +9,13 @@ use std::sync::{ use crate::{ nym::{client::NymClient, error::NymError}, - server::error::{DispatcherError, QueueError}, - server::queue::{QueueReceiver, QueueSender}, + server::{ + error::{DispatcherError, QueueError}, + queue::{QueueReceiver, QueueSender}, + AtomicStatus, StatusType, + }, }; -/// Status of the worker. -#[derive(Debug, PartialEq, Clone)] -pub enum DispatcherStatus { - /// On hold, due to blockcache / node error. - Inactive, - /// Listening for new requests. - Listening, - /// Running shutdown routine. - Closing, - /// Offline. - Offline, -} - /// Sends gRPC responses over Nym Mixnet. pub struct NymDispatcher { /// Nym Client @@ -34,10 +24,10 @@ pub struct NymDispatcher { response_queue: QueueReceiver<(Vec, AnonymousSenderTag)>, /// Used to send requests to the queue. response_requeue: QueueSender<(Vec, AnonymousSenderTag)>, + /// Current status of the dispatcher. + status: AtomicStatus, /// Represents the Online status of the gRPC server. online: Arc, - /// Current status of the ingestor. 
- status: DispatcherStatus, } impl NymDispatcher { @@ -46,15 +36,17 @@ impl NymDispatcher { nym_conf_path: &str, response_queue: QueueReceiver<(Vec, AnonymousSenderTag)>, response_requeue: QueueSender<(Vec, AnonymousSenderTag)>, + status: AtomicStatus, online: Arc, ) -> Result { + status.store(0); let client = NymClient::spawn(&format!("{}/dispatcher", nym_conf_path)).await?; Ok(NymDispatcher { dispatcher: client, response_queue, response_requeue, online, - status: DispatcherStatus::Inactive, + status, }) } @@ -63,12 +55,13 @@ impl NymDispatcher { tokio::task::spawn(async move { // NOTE: This interval may need to be changed or removed / moved once scale testing begins. let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(50)); - // TODO Check self.status and wait on server / node if on hold. - self.status = DispatcherStatus::Listening; + // TODO Check blockcache sync status and wait on server / node if on hold. + self.status.store(1); loop { tokio::select! { _ = interval.tick() => { if self.check_for_shutdown().await { + self.status.store(5); return Ok(()); } } @@ -77,6 +70,7 @@ impl NymDispatcher { Ok(response) => { // NOTE: This may need to be removed / moved for scale use. if self.check_for_shutdown().await { + self.status.store(5); return Ok(()); } if let Err(nym_e) = self.dispatcher @@ -95,6 +89,7 @@ impl NymDispatcher { Err(_e) => { eprintln!("Failed to send response over nym: {}\nAnd failed to requeue response due to the queue being closed.\nFatal error! Nym dispatcher shutting down..", nym_e); // TODO: Handle queue closed error here. (return correct error type?) + self.status.store(6); return Ok(()); //Return Err! } } @@ -103,6 +98,7 @@ impl NymDispatcher { Err(_e) => { eprintln!("Response queue closed, nym dispatcher shutting down."); //TODO: Handle this error here (return correct error type?) + self.status.store(6); return Ok(()); // Return Err! 
} } @@ -114,23 +110,28 @@ impl NymDispatcher { /// Checks indexers online status and ingestors internal status for closure signal. pub async fn check_for_shutdown(&self) -> bool { - if let DispatcherStatus::Closing = self.status { + if self.status() >= 4 { return true; } if !self.check_online() { return true; } - return false; + false } /// Sets the dispatcher to close gracefully. pub async fn shutdown(&mut self) { - self.status = DispatcherStatus::Closing + self.status.store(4) + } + + /// Returns the dispatchers current status usize. + pub fn status(&self) -> usize { + self.status.load() } - /// Returns the dispatchers current status. - pub fn status(&self) -> DispatcherStatus { - self.status.clone() + /// Returns the dispatchers current statustype. + pub fn statustype(&self) -> StatusType { + StatusType::from(self.status()) } fn check_online(&self) -> bool { diff --git a/zingo-rpc/src/server/ingestor.rs b/zingo-rpc/src/server/ingestor.rs index 961d24e..22bd9ca 100644 --- a/zingo-rpc/src/server/ingestor.rs +++ b/zingo-rpc/src/server/ingestor.rs @@ -15,32 +15,20 @@ use crate::{ error::{IngestorError, QueueError}, queue::QueueSender, request::ZingoProxyRequest, + AtomicStatus, StatusType, }, }; -/// Status of the worker. -#[derive(Debug, PartialEq, Clone)] -pub enum IngestorStatus { - /// On hold, due to blockcache / node error. - Inactive, - /// Listening for new requests. - Listening, - /// Running shutdown routine. - Closing, - /// Offline. - Offline, -} - /// Listens for incoming gRPC requests over HTTP. pub struct TcpIngestor { /// Tcp Listener. ingestor: TcpListener, /// Used to send requests to the queue. queue: QueueSender, + /// Current status of the ingestor. + status: AtomicStatus, /// Represents the Online status of the gRPC server. online: Arc, - /// Current status of the ingestor. 
- status: IngestorStatus, } impl TcpIngestor { @@ -48,14 +36,16 @@ impl TcpIngestor { pub async fn spawn( listen_addr: SocketAddr, queue: QueueSender, + status: AtomicStatus, online: Arc, ) -> Result { + status.store(0); let listener = TcpListener::bind(listen_addr).await?; Ok(TcpIngestor { ingestor: listener, queue, online, - status: IngestorStatus::Inactive, + status, }) } @@ -65,17 +55,19 @@ impl TcpIngestor { // NOTE: This interval may need to be changed or removed / moved once scale testing begins. let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(50)); // TODO Check blockcache sync status and wait on server / node if on hold. - self.status = IngestorStatus::Listening; + self.status.store(1); loop { tokio::select! { _ = interval.tick() => { if self.check_for_shutdown().await { + self.status.store(5); return Ok(()); } } incoming = self.ingestor.accept() => { // NOTE: This may need to be removed / moved for scale use. if self.check_for_shutdown().await { + self.status.store(5); return Ok(()); } match incoming { @@ -105,23 +97,28 @@ impl TcpIngestor { /// Checks indexers online status and ingestors internal status for closure signal. pub async fn check_for_shutdown(&self) -> bool { - if let IngestorStatus::Closing = self.status { + if self.status() >= 4 { return true; } if !self.check_online() { return true; } - return false; + false } /// Sets the ingestor to close gracefully. pub async fn shutdown(&mut self) { - self.status = IngestorStatus::Closing + self.status.store(4) + } + + /// Returns the ingestor current status usize. + pub fn status(&self) -> usize { + self.status.load() } - /// Returns the ingestor current status. - pub fn status(&self) -> IngestorStatus { - self.status.clone() + /// Returns the ingestor current statustype. 
+ pub fn statustype(&self) -> StatusType { + StatusType::from(self.status()) } fn check_online(&self) -> bool { @@ -135,10 +132,10 @@ pub struct NymIngestor { ingestor: NymClient, /// Used to send requests to the queue. queue: QueueSender, + /// Current status of the ingestor. + status: AtomicStatus, /// Represents the Online status of the gRPC server. online: Arc, - /// Current status of the ingestor. - status: IngestorStatus, } impl NymIngestor { @@ -146,14 +143,17 @@ impl NymIngestor { pub async fn spawn( nym_conf_path: &str, queue: QueueSender, + status: AtomicStatus, online: Arc, ) -> Result { + status.store(0); + // TODO: HANDLE THESE ERRORS TO SMOOTH MIXNET CLIENT SPAWN PROCESS! let listener = NymClient::spawn(&format!("{}/ingestor", nym_conf_path)).await?; Ok(NymIngestor { ingestor: listener, queue, online, - status: IngestorStatus::Inactive, + status, }) } @@ -163,18 +163,19 @@ impl NymIngestor { // NOTE: This interval may need to be reduced or removed / moved once scale testing begins. let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(50)); // TODO Check blockcache sync status and wait on server / node if on hold. - self.status = IngestorStatus::Listening; - + self.status.store(1); loop { tokio::select! { _ = interval.tick() => { if self.check_for_shutdown().await { + self.status.store(5); return Ok(()) } } incoming = self.ingestor.client.wait_for_messages() => { // NOTE: This may need to be removed /moved for scale use. if self.check_for_shutdown().await { + self.status.store(5); return Ok(()) } match incoming { @@ -217,23 +218,28 @@ impl NymIngestor { /// Checks indexers online status and ingestors internal status for closure signal. pub async fn check_for_shutdown(&self) -> bool { - if let IngestorStatus::Closing = self.status { + if self.status() >= 4 { return true; } if !self.check_online() { return true; } - return false; + false } /// Sets the ingestor to close gracefully. 
pub async fn shutdown(&mut self) { - self.status = IngestorStatus::Closing + self.status.store(4) + } + + /// Returns the ingestor current status usize. + pub fn status(&self) -> usize { + self.status.load() } - /// Returns the ingestor current status. - pub fn status(&self) -> IngestorStatus { - self.status.clone() + /// Returns the ingestor current statustype. + pub fn statustype(&self) -> StatusType { + StatusType::from(self.status()) } fn check_online(&self) -> bool { diff --git a/zingo-rpc/src/server/queue.rs b/zingo-rpc/src/server/queue.rs index 641500d..38b2c3f 100644 --- a/zingo-rpc/src/server/queue.rs +++ b/zingo-rpc/src/server/queue.rs @@ -14,7 +14,7 @@ pub struct Queue { /// Max number of messages allowed in the queue. max_length: usize, /// Used to track current messages in the queue. - message_count: Arc, + queue_status: Arc, /// Queue sender. queue_tx: QueueSender, /// Queue receiver. @@ -23,20 +23,19 @@ pub struct Queue { impl Queue { /// Creates a new queue with a maximum size. - pub fn new(max_length: usize) -> Self { + pub fn new(max_length: usize, queue_status: Arc) -> Self { let (queue_tx, queue_rx) = bounded(max_length); - let message_count = Arc::new(AtomicUsize::new(0)); - + queue_status.store(0, Ordering::SeqCst); Queue { max_length, - message_count: message_count.clone(), + queue_status: queue_status.clone(), queue_tx: QueueSender { inner: queue_tx, - message_count: message_count.clone(), + queue_status: queue_status.clone(), }, queue_rx: QueueReceiver { inner: queue_rx, - message_count, + queue_status, }, } } @@ -58,7 +57,7 @@ impl Queue { /// Returns the current length of the queue. pub fn queue_length(&self) -> usize { - self.message_count.load(Ordering::SeqCst) + self.queue_status.load(Ordering::SeqCst) } } @@ -68,14 +67,14 @@ pub struct QueueSender { /// Crossbeam_Channel Sender. inner: Sender, /// Used to track current messages in the queue. 
- message_count: Arc, + queue_status: Arc, } impl Clone for QueueSender { fn clone(&self) -> Self { Self { inner: self.inner.clone(), - message_count: Arc::clone(&self.message_count), + queue_status: Arc::clone(&self.queue_status), } } } @@ -85,7 +84,7 @@ impl QueueSender { pub fn try_send(&self, message: T) -> Result<(), QueueError> { match self.inner.try_send(message) { Ok(_) => { - self.message_count.fetch_add(1, Ordering::SeqCst); + self.queue_status.fetch_add(1, Ordering::SeqCst); Ok(()) } Err(crossbeam_channel::TrySendError::Full(t)) => Err(QueueError::QueueFull(t)), @@ -95,7 +94,7 @@ impl QueueSender { /// Returns the current length of the queue. pub fn queue_length(&self) -> usize { - self.message_count.load(Ordering::SeqCst) + self.queue_status.load(Ordering::SeqCst) } } @@ -105,14 +104,14 @@ pub struct QueueReceiver { /// Crossbeam_Channel Receiver. inner: Receiver, /// Used to track current messages in the queue. - message_count: Arc, + queue_status: Arc, } impl Clone for QueueReceiver { fn clone(&self) -> Self { Self { inner: self.inner.clone(), - message_count: Arc::clone(&self.message_count), + queue_status: Arc::clone(&self.queue_status), } } } @@ -122,7 +121,7 @@ impl QueueReceiver { pub fn try_recv(&self) -> Result> { match self.inner.try_recv() { Ok(message) => { - self.message_count.fetch_sub(1, Ordering::SeqCst); + self.queue_status.fetch_sub(1, Ordering::SeqCst); Ok(message) } Err(crossbeam_channel::TryRecvError::Empty) => Err(QueueError::QueueEmpty), @@ -152,6 +151,6 @@ impl QueueReceiver { /// Returns the current length of the queue. pub fn queue_length(&self) -> usize { - self.message_count.load(Ordering::SeqCst) + self.queue_status.load(Ordering::SeqCst) } } diff --git a/zingo-rpc/src/server/worker.rs b/zingo-rpc/src/server/worker.rs index ab8037e..4f35c26 100644 --- a/zingo-rpc/src/server/worker.rs +++ b/zingo-rpc/src/server/worker.rs @@ -1,13 +1,12 @@ //! Holds the server worker implementation. 
use std::sync::{ - atomic::{AtomicBool, Ordering}, + atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, }; use http::Uri; use nym_sphinx_anonymous_replies::requests::AnonymousSenderTag; -use tokio::time::{Duration, Instant}; use tonic::transport::Server; use crate::{ @@ -16,6 +15,7 @@ use crate::{ error::{QueueError, WorkerError}, queue::{QueueReceiver, QueueSender}, request::ZingoProxyRequest, + AtomicStatus, }, }; @@ -25,64 +25,6 @@ use crate::proto::service::compact_tx_streamer_server::CompactTxStreamerServer; #[cfg(feature = "nym_poc")] use zcash_client_backend::proto::service::compact_tx_streamer_server::CompactTxStreamerServer; -/// Status of the worker. -#[derive(Debug, PartialEq, Clone, Copy)] -pub enum WorkerStatusType { - /// Running initial startup routine. - Spawning, - /// Processing requests from the queue. - Working, - /// Waiting for requests from the queue. - Standby, - /// Running shutdown routine. - Closing, -} - -/// Wrapper for StatusType that also holds initiation time, used for standby monitoring. -#[derive(Debug, Clone)] -pub enum WorkerStatus { - /// Running initial startup routine. - Spawning(Instant), - /// Processing requests from the queue. - Working(Instant), - /// Waiting for requests from the queue. - Standby(Instant), - /// Running shutdown routine. - Closing(Instant), -} - -impl WorkerStatus { - /// Create a new status with the current timestamp. - pub fn new(status: WorkerStatusType) -> WorkerStatus { - match status { - WorkerStatusType::Spawning => WorkerStatus::Spawning(Instant::now()), - WorkerStatusType::Working => WorkerStatus::Working(Instant::now()), - WorkerStatusType::Standby => WorkerStatus::Standby(Instant::now()), - WorkerStatusType::Closing => WorkerStatus::Closing(Instant::now()), - } - } - - /// Return the current status type and the duration the worker has been in this status. 
- pub fn status(&self) -> (WorkerStatusType, Duration) { - match self { - WorkerStatus::Spawning(timestamp) => (WorkerStatusType::Spawning, timestamp.elapsed()), - WorkerStatus::Working(timestamp) => (WorkerStatusType::Working, timestamp.elapsed()), - WorkerStatus::Standby(timestamp) => (WorkerStatusType::Standby, timestamp.elapsed()), - WorkerStatus::Closing(timestamp) => (WorkerStatusType::Closing, timestamp.elapsed()), - } - } - - /// Update the status to a new one, resetting the timestamp. - pub fn set(&mut self, new_status: WorkerStatusType) { - *self = match new_status { - WorkerStatusType::Spawning => WorkerStatus::Spawning(Instant::now()), - WorkerStatusType::Working => WorkerStatus::Working(Instant::now()), - WorkerStatusType::Standby => WorkerStatus::Standby(Instant::now()), - WorkerStatusType::Closing => WorkerStatus::Closing(Instant::now()), - } - } -} - /// A queue working is the entity that takes requests from the queue and processes them. /// /// TODO: - Add JsonRpcConnector to worker and pass to underlying RPC services. @@ -99,8 +41,10 @@ pub struct Worker { nym_response_queue: QueueSender<(Vec, AnonymousSenderTag)>, /// gRPC client used for processing requests received over http. grpc_client: GrpcClient, - /// Workers current status. - status: WorkerStatus, + // /// Worker's current status, includes timestamp for despawning inactive workers. + // worker_status: WorkerStatus, + /// Thread-safe worker status. + atomic_status: AtomicStatus, /// Represents the Online status of the Worker.
pub online: Arc, } @@ -114,6 +58,7 @@ impl Worker { nym_response_queue: QueueSender<(Vec, AnonymousSenderTag)>, lightwalletd_uri: Uri, zebrad_uri: Uri, + atomic_status: AtomicStatus, online: Arc, ) -> Self { let grpc_client = GrpcClient { @@ -127,7 +72,7 @@ impl Worker { requeue, nym_response_queue, grpc_client, - status: WorkerStatus::new(WorkerStatusType::Spawning), + atomic_status, online, } } @@ -141,7 +86,7 @@ impl Worker { let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(50)); let svc = CompactTxStreamerServer::new(self.grpc_client.clone()); // TODO: create tonic server here for use within loop. - self.status.set(WorkerStatusType::Standby); + self.atomic_status.store(1); loop { tokio::select! { _ = interval.tick() => { @@ -159,19 +104,20 @@ impl Worker { return Ok(()); } Err(QueueError::QueueFull(_request)) => { + self.atomic_status.store(5); eprintln!("Request Queue Full. Failed to send response to queue.\nWorker shutting down."); // TODO: Handle this error! (cancel shutdown?). return Ok(()); } Err(e) => { - self.status.set(WorkerStatusType::Closing); + self.atomic_status.store(5); eprintln!("Request Queue Closed. Failed to send response to queue: {}\nWorker shutting down.", e); // TODO: Handle queue closed error here. (return correct error?) return Ok(()); } } } else { - self.status.set(WorkerStatusType::Working); + self.atomic_status.store(2); match request { ZingoProxyRequest::TcpServerRequest(request) => { Server::builder().add_service(svc.clone()) @@ -195,7 +141,7 @@ impl Worker { // TODO: Handle this error! (open second nym responder?). } Err(e) => { - self.status.set(WorkerStatusType::Closing); + self.atomic_status.store(5); eprintln!("Response Queue Closed. Failed to send response to queue: {}\nWorker shutting down.", e); // TODO: Handle queue closed error here. (return correct error?) 
return Ok(()); @@ -211,11 +157,11 @@ impl Worker { } } } - self.status.set(WorkerStatusType::Standby); + self.atomic_status.store(1); } } Err(_e) => { - self.status.set(WorkerStatusType::Closing); + self.atomic_status.store(5); eprintln!("Queue closed, worker shutting down."); // TODO: Handle queue closed error here. (return correct error?) return Ok(()); @@ -227,20 +173,23 @@ impl Worker { }) } - /// Checks indexers online status and workers internal status for closure signal. + /// Checks for closure signals. + /// + /// Checks AtomicStatus for closure signal. + /// Checks (online) AtomicBool for fatal error signal. pub async fn check_for_shutdown(&self) -> bool { - if let WorkerStatus::Closing(_) = self.status { + if self.atomic_status() >= 4 { return true; } if !self.check_online() { return true; } - return false; + false } /// Sets the worker to close gracefully. pub async fn shutdown(&mut self) { - self.status.set(WorkerStatusType::Closing) + self.atomic_status.store(4) } /// Returns the worker's ID. @@ -248,9 +197,9 @@ impl Worker { self.worker_id } - /// Returns the workers current status. - pub fn status(&self) -> (WorkerStatusType, Duration) { - self.status.status() + /// Loads the worker's current atomic status. + pub fn atomic_status(&self) -> usize { + self.atomic_status.load() } /// Check the online status on the server. @@ -259,6 +208,13 @@ impl Worker { } } +/// Holds the status of the worker pool and its workers. +#[derive(Debug, Clone)] +pub struct WorkerPoolStatus { + workers: Arc, + statuses: Vec, +} + /// Dynamically sized pool of workers. #[derive(Debug, Clone)] pub struct WorkerPool { @@ -268,6 +224,8 @@ pub struct WorkerPool { idle_size: usize, /// Workers currently in the pool workers: Vec, + /// Status of the workerpool and its workers. + status: WorkerPoolStatus, /// Represents the Online status of the WorkerPool.
pub online: Arc, } @@ -282,6 +240,7 @@ impl WorkerPool { nym_response_queue: QueueSender<(Vec, AnonymousSenderTag)>, lightwalletd_uri: Uri, zebrad_uri: Uri, + status: WorkerPoolStatus, online: Arc, ) -> Self { let mut workers: Vec = Vec::with_capacity(max_size); @@ -294,16 +253,18 @@ impl WorkerPool { nym_response_queue.clone(), lightwalletd_uri.clone(), zebrad_uri.clone(), + status.statuses[workers.len()].clone(), online.clone(), ) .await, ); } - + status.workers.store(idle_size, Ordering::SeqCst); WorkerPool { max_size, idle_size, workers, + status, online, } } @@ -332,10 +293,12 @@ impl WorkerPool { self.workers[0].nym_response_queue.clone(), self.workers[0].grpc_client.lightwalletd_uri.clone(), self.workers[0].grpc_client.zebrad_uri.clone(), + self.status.statuses[self.workers.len()].clone(), self.online.clone(), ) .await, ); + self.status.workers.fetch_add(1, Ordering::SeqCst); Ok(self.workers[self.workers.len()].clone().serve().await) } } @@ -353,18 +316,24 @@ impl WorkerPool { match worker_handle.await { Ok(worker) => match worker { Ok(()) => { + self.status.statuses[worker_index].store(5); self.workers.pop(); + self.status.workers.fetch_sub(1, Ordering::SeqCst); return Ok(()); } Err(e) => { + self.status.statuses[worker_index].store(6); eprintln!("Worker returned error on shutdown: {}", e); - // TODO: Handle the inner WorkerError + // TODO: Handle the inner WorkerError. Return error. + self.status.workers.fetch_sub(1, Ordering::SeqCst); return Ok(()); } }, Err(e) => { + self.status.statuses[worker_index].store(6); eprintln!("Worker returned error on shutdown: {}", e); - // TODO: Handle the JoinError + // TODO: Handle the JoinError. Return error. + self.status.workers.fetch_sub(1, Ordering::SeqCst); return Ok(()); } }; @@ -386,24 +355,13 @@ impl WorkerPool { self.workers.len() } - /// Returns the statuses of all the workers in the workerpool. 
- pub fn statuses(&self) -> Vec<(WorkerStatusType, Duration)> { - let mut worker_statuses = Vec::new(); - for i in 0..self.workers.len() { - worker_statuses.push(self.workers[i].status()) + /// Fetches and returns the status of the workerpool and its workers. + pub fn status(&self) -> WorkerPoolStatus { + self.status.workers.load(Ordering::SeqCst); + for i in 0..self.workers() { + self.status.statuses[i].load(); } - worker_statuses - } - - /// Returns the number of workers in Standby mode for 30 seconds or longer. - pub fn check_long_standby(&self) -> usize { - let statuses = self.statuses(); - statuses - .iter() - .filter(|(status, duration)| { - *status == WorkerStatusType::Standby && *duration >= Duration::from_secs(30) - }) - .count() + self.status.clone() } /// Shuts down all the workers in the pool. @@ -417,16 +375,22 @@ impl WorkerPool { match worker_handle.await { Ok(worker) => match worker { Ok(()) => { + self.status.statuses[i].store(5); self.workers.pop(); + self.status.workers.fetch_sub(1, Ordering::SeqCst); } Err(e) => { + self.status.statuses[i].store(6); eprintln!("Worker returned error on shutdown: {}", e); // TODO: Handle the inner WorkerError + self.status.workers.fetch_sub(1, Ordering::SeqCst); } }, Err(e) => { + self.status.statuses[i].store(6); eprintln!("Worker returned error on shutdown: {}", e); // TODO: Handle the JoinError + self.status.workers.fetch_sub(1, Ordering::SeqCst); } }; }