From 67bff8bff0468470bf8d9e28621577ff027bc026 Mon Sep 17 00:00:00 2001 From: leo Date: Tue, 26 Nov 2024 14:43:03 +0000 Subject: [PATCH] feat(jstzd): spawn jstz-node to jstzd server --- crates/jstzd/src/task/child_wrapper.rs | 4 +- crates/jstzd/src/task/jstzd.rs | 70 ++++++++++++++++++++------ crates/jstzd/tests/jstzd_test.rs | 24 +++++++-- 3 files changed, 79 insertions(+), 19 deletions(-) diff --git a/crates/jstzd/src/task/child_wrapper.rs b/crates/jstzd/src/task/child_wrapper.rs index fb8a04402..b2b51cc53 100644 --- a/crates/jstzd/src/task/child_wrapper.rs +++ b/crates/jstzd/src/task/child_wrapper.rs @@ -4,7 +4,9 @@ use std::sync::Arc; use tokio::process::Child; use tokio::sync::RwLock; -pub type SharedChildWrapper = Arc>>; +pub type Shared = Arc>; + +pub type SharedChildWrapper = Shared>; #[derive(Default)] pub struct ChildWrapper { diff --git a/crates/jstzd/src/task/jstzd.rs b/crates/jstzd/src/task/jstzd.rs index 41afc60cb..8afe33257 100644 --- a/crates/jstzd/src/task/jstzd.rs +++ b/crates/jstzd/src/task/jstzd.rs @@ -1,4 +1,7 @@ use super::{ + // child_wrapper::Shared, + child_wrapper::Shared, + jstz_node::JstzNode, octez_baker::OctezBaker, octez_node::OctezNode, octez_rollup::OctezRollup, @@ -14,6 +17,7 @@ use axum::{ routing::{get, put}, Router, }; +use jstz_node::config::JstzNodeConfig; use octez::r#async::{ baker::OctezBakerConfig, client::{OctezClient, OctezClientConfig}, @@ -26,10 +30,20 @@ use serde::Serialize; use std::sync::Arc; use tokio::{net::TcpListener, sync::RwLock, task::JoinHandle}; +trait IntoShared { + fn into_shared(self) -> Shared; +} + +impl IntoShared for T { + fn into_shared(self) -> Shared { + Arc::new(RwLock::new(self)) + } +} struct Jstzd { - octez_node: Arc>, - baker: Arc>, - rollup: Arc>, + octez_node: Shared, + baker: Shared, + rollup: Shared, + jstz_node: Shared, } #[derive(Clone, Serialize)] @@ -43,6 +57,8 @@ pub struct JstzdConfig { #[serde(skip_serializing)] octez_rollup_config: OctezRollupConfig, #[serde(skip_serializing)] + jstz_node_config: JstzNodeConfig, + #[serde(skip_serializing)] protocol_params: ProtocolParameter, } @@ -52,6 +68,7 @@ impl JstzdConfig { baker_config: OctezBakerConfig, octez_client_config: OctezClientConfig, octez_rollup_config: OctezRollupConfig, + jstz_node_config: JstzNodeConfig, protocol_params: ProtocolParameter, ) -> Self { Self { @@ -59,6 +76,7 @@ impl JstzdConfig { baker_config, octez_client_config, octez_rollup_config, + jstz_node_config, protocol_params, } } @@ -91,10 +109,13 @@ impl Task for Jstzd { let baker = OctezBaker::spawn(config.baker_config.clone()).await?; Self::wait_for_block_level(&config.octez_node_config.rpc_endpoint, 3).await?; let rollup = OctezRollup::spawn(config.octez_rollup_config.clone()).await?; + let jstz_node = JstzNode::spawn(config.jstz_node_config).await?; + Self::wait_for_jstz_node(&jstz_node).await?; Ok(Self { - octez_node: Arc::new(RwLock::new(octez_node)), - baker: Arc::new(RwLock::new(baker)), - rollup: Arc::new(RwLock::new(rollup)), + octez_node: octez_node.into_shared(), + baker: baker.into_shared(), + rollup: rollup.into_shared(), + jstz_node: jstz_node.into_shared(), }) } @@ -103,6 +124,7 @@ impl Task for Jstzd { self.octez_node.write().await.kill(), self.baker.write().await.kill(), self.rollup.write().await.kill(), + self.jstz_node.write().await.kill(), ]) .await; @@ -125,6 +147,7 @@ impl Task for Jstzd { self.octez_node.read().await.health_check(), self.baker.read().await.health_check(), self.rollup.read().await.health_check(), + self.jstz_node.read().await.health_check(), ]) .await; @@ -138,7 +161,7 @@ impl Task for Jstzd { } if !err.is_empty() { - Err(anyhow::anyhow!("failed to perform health check: {:?}", err)) + bail!("failed to perform health check: {:?}", err) } else { Ok(healthy) } @@ -210,11 +233,19 @@ impl Jstzd { } Ok(()) } + + async fn wait_for_jstz_node(jstz_node: &JstzNode) -> Result<()> { + let ready = retry(10, 1000, || async { jstz_node.health_check().await }).await; + if !ready { + bail!("jstz node is still not ready after retries"); + } + Ok(()) + } } #[derive(Clone, Default)] pub struct JstzdServerInner { - state: Arc>, + state: Shared, } #[derive(Default)] @@ -314,6 +345,19 @@ impl JstzdServer { None => false, } } + + pub async fn jstz_node_healthy(&self) -> bool { + match &self.inner.state.read().await.jstzd { + Some(v) => v + .jstz_node + .read() + .await + .health_check() + .await + .unwrap_or(false), + None => false, + } + } } async fn health_check(state: &ServerState) -> bool { @@ -345,9 +389,7 @@ async fn shutdown(state: &mut ServerState) -> Result<()> { Ok(()) } -async fn health_check_handler( - state: State>>, -) -> http::StatusCode { +async fn health_check_handler(state: State>) -> http::StatusCode { let lock = state.read().await; match health_check(&lock).await { true => http::StatusCode::OK, @@ -355,7 +397,7 @@ async fn health_check_handler( } } -async fn shutdown_handler(state: State>>) -> http::StatusCode { +async fn shutdown_handler(state: State>) -> http::StatusCode { let mut lock = state.write().await; if shutdown(&mut lock).await.is_err() { return http::StatusCode::INTERNAL_SERVER_ERROR; @@ -363,13 +405,13 @@ async fn shutdown_handler(state: State>>) -> http::Statu http::StatusCode::NO_CONTENT } -async fn all_config_handler(state: State>>) -> impl IntoResponse { +async fn all_config_handler(state: State>) -> impl IntoResponse { let config = &state.read().await.jstzd_config_json; serde_json::to_string(config).unwrap().into_response() } async fn config_handler( - state: State>>, + state: State>, Path(config_type): Path, ) -> impl IntoResponse { let config = &state.read().await.jstzd_config_json; diff --git a/crates/jstzd/tests/jstzd_test.rs b/crates/jstzd/tests/jstzd_test.rs index c71e10903..31fe91109 100644 --- a/crates/jstzd/tests/jstzd_test.rs +++ b/crates/jstzd/tests/jstzd_test.rs @@ -4,6 +4,7 @@ use std::path::PathBuf; use std::str::FromStr; use http::Uri; +use jstz_node::config::JstzNodeConfig; use jstzd::task::jstzd::{JstzdConfig, JstzdServer}; use jstzd::task::utils::retry; use jstzd::{EXCHANGER_ADDRESS, JSTZ_NATIVE_BRIDGE_ADDRESS, JSTZ_ROLLUP_ADDRESS}; @@ -41,10 +42,15 @@ async fn jstzd_test() { Uri::from_str(&format!("http://127.0.0.1:{}", unused_port())).unwrap(), ) .unwrap(); + let jstz_node_rpc_endpoint = Endpoint::localhost(unused_port()); let jstzd_port = unused_port(); - let (mut jstzd, config, kernel_debug_file) = - create_jstzd_server(&octez_node_rpc_endpoint, &rollup_rpc_endpoint, jstzd_port) - .await; + let (mut jstzd, config, kernel_debug_file) = create_jstzd_server( + &octez_node_rpc_endpoint, + &rollup_rpc_endpoint, + &jstz_node_rpc_endpoint, + jstzd_port, + ) + .await; jstzd.run().await.unwrap(); ensure_jstzd_components_are_up(&jstzd, &octez_node_rpc_endpoint, jstzd_port).await; @@ -74,6 +80,7 @@ async fn jstzd_test() { async fn create_jstzd_server( octez_node_rpc_endpoint: &Endpoint, rollup_rpc_endpoint: &Endpoint, + jstz_node_rpc_endpoint: &Endpoint, jstzd_port: u16, ) -> (JstzdServer, JstzdConfig, NamedTempFile) { let run_options = OctezNodeRunOptionsBuilder::new() @@ -142,12 +149,18 @@ async fn create_jstzd_server( .set_kernel_debug_file(kernel_debug_file.path()) .build() .expect("failed to build rollup config"); + let jstz_node_config = JstzNodeConfig::new( + jstz_node_rpc_endpoint, + &rollup_config.rpc_endpoint, + kernel_debug_file.path(), + ); let config = JstzdConfig::new( octez_node_config, baker_config, octez_client_config.clone(), rollup_config.clone(), + jstz_node_config, protocol_params, ); ( @@ -181,6 +194,9 @@ async fn ensure_jstzd_components_are_up( let rollup_running = retry(30, 1000, || async { Ok(jstzd.rollup_healthy().await) }).await; assert!(rollup_running); + let jstz_node_running = + retry(30, 1000, || async { Ok(jstzd.jstz_node_healthy().await) }).await; + assert!(jstz_node_running); assert!(jstzd.health_check().await); } @@ -216,8 +232,8 @@ async fn ensure_jstzd_components_are_down( .await; assert!(node_destroyed); assert!(!jstzd.baker_healthy().await); - assert!(!jstzd.rollup_healthy().await); + assert!(!jstzd.jstz_node_healthy().await); assert!(!jstzd.health_check().await); }