Skip to content

Commit

Permalink
feat(jstzd): spawn jstz-node to jstzd server
Browse files Browse the repository at this point in the history
  • Loading branch information
ryutamago committed Nov 29, 2024
1 parent 3933223 commit 67bff8b
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 19 deletions.
4 changes: 3 additions & 1 deletion crates/jstzd/src/task/child_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use std::sync::Arc;
use tokio::process::Child;
use tokio::sync::RwLock;

pub type SharedChildWrapper = Arc<RwLock<AsyncDropper<ChildWrapper>>>;
pub type Shared<T> = Arc<RwLock<T>>;

pub type SharedChildWrapper = Shared<AsyncDropper<ChildWrapper>>;

#[derive(Default)]
pub struct ChildWrapper {
Expand Down
70 changes: 56 additions & 14 deletions crates/jstzd/src/task/jstzd.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use super::{
// child_wrapper::Shared,
child_wrapper::Shared,
jstz_node::JstzNode,
octez_baker::OctezBaker,
octez_node::OctezNode,
octez_rollup::OctezRollup,
Expand All @@ -14,6 +17,7 @@ use axum::{
routing::{get, put},
Router,
};
use jstz_node::config::JstzNodeConfig;
use octez::r#async::{
baker::OctezBakerConfig,
client::{OctezClient, OctezClientConfig},
Expand All @@ -26,10 +30,20 @@ use serde::Serialize;
use std::sync::Arc;
use tokio::{net::TcpListener, sync::RwLock, task::JoinHandle};

trait IntoShared {
fn into_shared(self) -> Shared<Self>;
}

impl<T: Task> IntoShared for T {
fn into_shared(self) -> Shared<Self> {
Arc::new(RwLock::new(self))
}
}
struct Jstzd {
octez_node: Arc<RwLock<OctezNode>>,
baker: Arc<RwLock<OctezBaker>>,
rollup: Arc<RwLock<OctezRollup>>,
octez_node: Shared<OctezNode>,
baker: Shared<OctezBaker>,
rollup: Shared<OctezRollup>,
jstz_node: Shared<JstzNode>,
}

#[derive(Clone, Serialize)]
Expand All @@ -43,6 +57,8 @@ pub struct JstzdConfig {
#[serde(skip_serializing)]
octez_rollup_config: OctezRollupConfig,
#[serde(skip_serializing)]
jstz_node_config: JstzNodeConfig,
#[serde(skip_serializing)]
protocol_params: ProtocolParameter,
}

Expand All @@ -52,13 +68,15 @@ impl JstzdConfig {
baker_config: OctezBakerConfig,
octez_client_config: OctezClientConfig,
octez_rollup_config: OctezRollupConfig,
jstz_node_config: JstzNodeConfig,
protocol_params: ProtocolParameter,
) -> Self {
Self {
octez_node_config,
baker_config,
octez_client_config,
octez_rollup_config,
jstz_node_config,
protocol_params,
}
}
Expand Down Expand Up @@ -91,10 +109,13 @@ impl Task for Jstzd {
let baker = OctezBaker::spawn(config.baker_config.clone()).await?;
Self::wait_for_block_level(&config.octez_node_config.rpc_endpoint, 3).await?;
let rollup = OctezRollup::spawn(config.octez_rollup_config.clone()).await?;
let jstz_node = JstzNode::spawn(config.jstz_node_config).await?;
Self::wait_for_jstz_node(&jstz_node).await?;
Ok(Self {
octez_node: Arc::new(RwLock::new(octez_node)),
baker: Arc::new(RwLock::new(baker)),
rollup: Arc::new(RwLock::new(rollup)),
octez_node: octez_node.into_shared(),
baker: baker.into_shared(),
rollup: rollup.into_shared(),
jstz_node: jstz_node.into_shared(),
})
}

Expand All @@ -103,6 +124,7 @@ impl Task for Jstzd {
self.octez_node.write().await.kill(),
self.baker.write().await.kill(),
self.rollup.write().await.kill(),
self.jstz_node.write().await.kill(),
])
.await;

Expand All @@ -125,6 +147,7 @@ impl Task for Jstzd {
self.octez_node.read().await.health_check(),
self.baker.read().await.health_check(),
self.rollup.read().await.health_check(),
self.jstz_node.read().await.health_check(),
])
.await;

Expand All @@ -138,7 +161,7 @@ impl Task for Jstzd {
}

if !err.is_empty() {
Err(anyhow::anyhow!("failed to perform health check: {:?}", err))
bail!("failed to perform health check: {:?}", err)
} else {
Ok(healthy)
}
Expand Down Expand Up @@ -210,11 +233,19 @@ impl Jstzd {
}
Ok(())
}

async fn wait_for_jstz_node(jstz_node: &JstzNode) -> Result<()> {
let ready = retry(10, 1000, || async { jstz_node.health_check().await }).await;
if !ready {
bail!("jstz node is still not ready after retries");
}
Ok(())
}
}

#[derive(Clone, Default)]
pub struct JstzdServerInner {
state: Arc<RwLock<ServerState>>,
state: Shared<ServerState>,
}

#[derive(Default)]
Expand Down Expand Up @@ -314,6 +345,19 @@ impl JstzdServer {
None => false,
}
}

pub async fn jstz_node_healthy(&self) -> bool {
match &self.inner.state.read().await.jstzd {
Some(v) => v
.jstz_node
.read()
.await
.health_check()
.await
.unwrap_or(false),
None => false,
}
}
}

async fn health_check(state: &ServerState) -> bool {
Expand Down Expand Up @@ -345,31 +389,29 @@ async fn shutdown(state: &mut ServerState) -> Result<()> {
Ok(())
}

async fn health_check_handler(
state: State<Arc<RwLock<ServerState>>>,
) -> http::StatusCode {
async fn health_check_handler(state: State<Shared<ServerState>>) -> http::StatusCode {
let lock = state.read().await;
match health_check(&lock).await {
true => http::StatusCode::OK,
_ => http::StatusCode::INTERNAL_SERVER_ERROR,
}
}

async fn shutdown_handler(state: State<Arc<RwLock<ServerState>>>) -> http::StatusCode {
async fn shutdown_handler(state: State<Shared<ServerState>>) -> http::StatusCode {
let mut lock = state.write().await;
if shutdown(&mut lock).await.is_err() {
return http::StatusCode::INTERNAL_SERVER_ERROR;
};
http::StatusCode::NO_CONTENT
}

async fn all_config_handler(state: State<Arc<RwLock<ServerState>>>) -> impl IntoResponse {
async fn all_config_handler(state: State<Shared<ServerState>>) -> impl IntoResponse {
let config = &state.read().await.jstzd_config_json;
serde_json::to_string(config).unwrap().into_response()
}

async fn config_handler(
state: State<Arc<RwLock<ServerState>>>,
state: State<Shared<ServerState>>,
Path(config_type): Path<String>,
) -> impl IntoResponse {
let config = &state.read().await.jstzd_config_json;
Expand Down
24 changes: 20 additions & 4 deletions crates/jstzd/tests/jstzd_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::path::PathBuf;
use std::str::FromStr;

use http::Uri;
use jstz_node::config::JstzNodeConfig;
use jstzd::task::jstzd::{JstzdConfig, JstzdServer};
use jstzd::task::utils::retry;
use jstzd::{EXCHANGER_ADDRESS, JSTZ_NATIVE_BRIDGE_ADDRESS, JSTZ_ROLLUP_ADDRESS};
Expand Down Expand Up @@ -41,10 +42,15 @@ async fn jstzd_test() {
Uri::from_str(&format!("http://127.0.0.1:{}", unused_port())).unwrap(),
)
.unwrap();
let jstz_node_rpc_endpoint = Endpoint::localhost(unused_port());
let jstzd_port = unused_port();
let (mut jstzd, config, kernel_debug_file) =
create_jstzd_server(&octez_node_rpc_endpoint, &rollup_rpc_endpoint, jstzd_port)
.await;
let (mut jstzd, config, kernel_debug_file) = create_jstzd_server(
&octez_node_rpc_endpoint,
&rollup_rpc_endpoint,
&jstz_node_rpc_endpoint,
jstzd_port,
)
.await;

jstzd.run().await.unwrap();
ensure_jstzd_components_are_up(&jstzd, &octez_node_rpc_endpoint, jstzd_port).await;
Expand Down Expand Up @@ -74,6 +80,7 @@ async fn jstzd_test() {
async fn create_jstzd_server(
octez_node_rpc_endpoint: &Endpoint,
rollup_rpc_endpoint: &Endpoint,
jstz_node_rpc_endpoint: &Endpoint,
jstzd_port: u16,
) -> (JstzdServer, JstzdConfig, NamedTempFile) {
let run_options = OctezNodeRunOptionsBuilder::new()
Expand Down Expand Up @@ -142,12 +149,18 @@ async fn create_jstzd_server(
.set_kernel_debug_file(kernel_debug_file.path())
.build()
.expect("failed to build rollup config");
let jstz_node_config = JstzNodeConfig::new(
jstz_node_rpc_endpoint,
&rollup_config.rpc_endpoint,
kernel_debug_file.path(),
);

let config = JstzdConfig::new(
octez_node_config,
baker_config,
octez_client_config.clone(),
rollup_config.clone(),
jstz_node_config,
protocol_params,
);
(
Expand Down Expand Up @@ -181,6 +194,9 @@ async fn ensure_jstzd_components_are_up(
let rollup_running =
retry(30, 1000, || async { Ok(jstzd.rollup_healthy().await) }).await;
assert!(rollup_running);
let jstz_node_running =
retry(30, 1000, || async { Ok(jstzd.jstz_node_healthy().await) }).await;
assert!(jstz_node_running);
assert!(jstzd.health_check().await);
}

Expand Down Expand Up @@ -216,8 +232,8 @@ async fn ensure_jstzd_components_are_down(
.await;
assert!(node_destroyed);
assert!(!jstzd.baker_healthy().await);

assert!(!jstzd.rollup_healthy().await);
assert!(!jstzd.jstz_node_healthy().await);
assert!(!jstzd.health_check().await);
}

Expand Down

0 comments on commit 67bff8b

Please sign in to comment.