diff --git a/Cargo.lock b/Cargo.lock index 65aca87..adff143 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -646,9 +646,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.102" +version = "0.2.103" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2a5ac8f984bfcf3a823267e5fde638acc3325f6496633a5da6bb6eb2171e103" +checksum = "dd8f7255a17a627354f321ef0055d63b898c6fb27eff628af4d1b66b7331edf6" [[package]] name = "lock_api" diff --git a/bin/config-template.toml b/bin/config-template.toml index 0d9ee5f..0d20ff6 100644 --- a/bin/config-template.toml +++ b/bin/config-template.toml @@ -1,8 +1,13 @@ [ws] # Elara kv websocket service address addr = "127.0.0.1:9002" +# default to 10 +heartbeat_interval_sec = 10 [client] +# response timeout +# default to 3000 +timeout_ms = 3000 max_request_cap = 256 max_cap_per_subscription = 64 @@ -10,5 +15,5 @@ max_cap_per_subscription = 64 [nodes.polkadot] url = "wss://rpc.polkadot.io" # kusama config -#[nodes.kusama] -#url = "wss://kusama-rpc.polkadot.io" +[nodes.kusama] +url = "wss://kusama-rpc.polkadot.io" diff --git a/bin/main.rs b/bin/main.rs index ab123a2..73755d3 100644 --- a/bin/main.rs +++ b/bin/main.rs @@ -11,7 +11,7 @@ async fn main() -> Result<()> { let opt = CliOpts::init(); let toml = fs::read_to_string(opt.config.as_path())?; let config = ServiceConfig::parse(toml)?; - log::debug!("Load config: {:#?}", config); + log::info!("Load config: {:#?}", config); start_server(config).await } diff --git a/src/config.rs b/src/config.rs index 7893934..47cc7b0 100644 --- a/src/config.rs +++ b/src/config.rs @@ -43,6 +43,7 @@ struct ServiceInnerConfig { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct WsConfig { pub addr: String, + pub heartbeat_interval_sec: Option, } #[derive(Clone, Debug, Serialize, Deserialize)] @@ -52,6 +53,7 @@ pub struct NodeConfig { #[derive(Copy, Clone, Debug, Serialize, Deserialize)] pub struct RpcClientConfig { + pub timeout_ms: Option, pub max_request_cap: Option, pub max_cap_per_subscription: Option, } @@ -74,6 +76,7 @@ fn test_toml_config() { let config = ServiceInnerConfig { ws: WsConfig { addr: "localhost:9002".into(), + heartbeat_interval_sec: Some(10), }, nodes, client: None, diff --git a/src/lib.rs b/src/lib.rs index 2e4f677..7de4d65 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,8 +53,8 @@ impl From<&str> for Chain { } } -const CONN_EXPIRED_TIME_SECS: u64 = 10; -const CHECK_CONN_ALIVE_SECS: u64 = 10; +const CONN_EXPIRED_TIME_SEC: u64 = 10; +const HEARTBEAT_INTERVAL_SEC: u64 = 10; pub async fn start_server(config: ServiceConfig) -> Result<()> { let server = WsServer::bind(config.ws.addr.as_str()).await?; @@ -103,7 +103,7 @@ async fn remove_expired_connections(mut conns: WsConnections) { log::info!("Total connection num: {}", conns.len().await); } - tokio::time::sleep(Duration::from_secs(CONN_EXPIRED_TIME_SECS)).await; + tokio::time::sleep(Duration::from_secs(CONN_EXPIRED_TIME_SEC)).await; } } @@ -117,7 +117,8 @@ async fn create_clients( let mut client = RpcClient::new(chain.clone(), node.url.clone(), config.client).await?; subscribe_chain(connections.clone(), &mut client).await; let client = Arc::new(RwLock::new(client)); - tokio::spawn(health_check(connections.clone(), client.clone())); + let config = config.clone(); + tokio::spawn(health_check(connections.clone(), client.clone(), config)); // maintains these clients clients.insert(chain.clone(), client); } @@ -126,14 +127,28 @@ async fn create_clients( } // if rpc client is not alive, we reconnect to peer -async fn health_check(connections: WsConnections, client: ArcRpcClient) { +async fn health_check(connections: WsConnections, client: ArcRpcClient, config: ServiceConfig) { loop { - tokio::time::sleep(Duration::from_secs(CHECK_CONN_ALIVE_SECS)).await; + tokio::time::sleep(Duration::from_secs( + config + .ws + .heartbeat_interval_sec + .unwrap_or(HEARTBEAT_INTERVAL_SEC), + )) + .await; + let mut is_alive = true; + // use read lock. + { + let client = client.read().await; + if !client.is_alive().await { + is_alive = false; + } + } - let mut client = client.write().await; - if !client.is_alive().await { + if !is_alive { + let mut client = client.write().await; log::info!( - "Trying to reconnect to `{}` to `{}`...", + "Trying to reconnect to `{}`/`{}`...", client.chain(), client.addr() ); diff --git a/src/rpc_client.rs b/src/rpc_client.rs index 2f352c4..d64b14f 100644 --- a/src/rpc_client.rs +++ b/src/rpc_client.rs @@ -15,6 +15,7 @@ use crate::{ substrate::constants::{state_getRuntimeVersion, system_health}, Chain, }; +use tokio::time::Duration; pub type Result = std::result::Result; pub type NotificationStream = WsSubscription; @@ -25,6 +26,7 @@ pub struct RpcClient { ws: WsClient, chain: Chain, addr: String, + config: Option, // TODO: use arc for splitting lifetime pub ctx: Option, } @@ -40,26 +42,33 @@ pub type ArcRpcClient = Arc>; pub type RpcClients = HashMap; impl RpcClient { - pub async fn new( - chain: Chain, - addr: impl Into, - config: Option, - ) -> Result { - let addr = addr.into(); + async fn create_ws_client(addr: &str, config: Option) -> Result { let ws = if let Some(config) = config { WsClient::builder() + .timeout(Duration::from_millis(config.timeout_ms.unwrap_or(3000))) .max_concurrent_request_capacity(config.max_request_cap.unwrap_or(256)) .max_capacity_per_subscription(config.max_cap_per_subscription.unwrap_or(64)) - .build(addr.as_str()) + .build(addr) .await? } else { - WsClient::new(addr.as_str()).await? + WsClient::new(addr).await? }; + Ok(ws) + } + + pub async fn new( + chain: Chain, + addr: impl Into, + config: Option, + ) -> Result { + let addr = addr.into(); + let ws = Self::create_ws_client(addr.as_str(), config).await?; Ok(Self { chain, ws, addr, + config, ctx: Default::default(), }) } @@ -114,7 +123,7 @@ impl RpcClient { // After `reconnect`ed, client need to re`subscribe`. pub async fn reconnect(&mut self) -> Result<()> { - self.ws = WsClient::new(self.addr.as_str()).await?; + self.ws = Self::create_ws_client(self.addr.as_str(), self.config).await?; Ok(()) }