Skip to content
This repository has been archived by the owner on Aug 9, 2022. It is now read-only.

Commit

Permalink
fix: fix blocking timeout and add more related config (#61)
Browse files Browse the repository at this point in the history
* fix: fix blocking timeout and more related config; decrease lock time for health check

* fix test

* fmt
  • Loading branch information
yjhmelody authored Sep 28, 2021
1 parent 23569a1 commit ea82501
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 23 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions bin/config-template.toml
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
[ws]
# Elara kv websocket service address
addr = "127.0.0.1:9002"
# default to 10
heartbeat_interval_sec = 10

[client]
# response timeout
# default to 3000
timeout_ms = 3000
max_request_cap = 256
max_cap_per_subscription = 64

# polkadot config
[nodes.polkadot]
url = "wss://rpc.polkadot.io"
# kusama config
#[nodes.kusama]
#url = "wss://kusama-rpc.polkadot.io"
[nodes.kusama]
url = "wss://kusama-rpc.polkadot.io"
2 changes: 1 addition & 1 deletion bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async fn main() -> Result<()> {
let opt = CliOpts::init();
let toml = fs::read_to_string(opt.config.as_path())?;
let config = ServiceConfig::parse(toml)?;
log::debug!("Load config: {:#?}", config);
log::info!("Load config: {:#?}", config);

start_server(config).await
}
3 changes: 3 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ struct ServiceInnerConfig {
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct WsConfig {
pub addr: String,
pub heartbeat_interval_sec: Option<u64>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
Expand All @@ -52,6 +53,7 @@ pub struct NodeConfig {

#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
pub struct RpcClientConfig {
pub timeout_ms: Option<u64>,
pub max_request_cap: Option<usize>,
pub max_cap_per_subscription: Option<usize>,
}
Expand All @@ -74,6 +76,7 @@ fn test_toml_config() {
let config = ServiceInnerConfig {
ws: WsConfig {
addr: "localhost:9002".into(),
heartbeat_interval_sec: Some(10),
},
nodes,
client: None,
Expand Down
33 changes: 24 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ impl From<&str> for Chain {
}
}

const CONN_EXPIRED_TIME_SECS: u64 = 10;
const CHECK_CONN_ALIVE_SECS: u64 = 10;
const CONN_EXPIRED_TIME_SEC: u64 = 10;
const HEARTBEAT_INTERVAL_SEC: u64 = 10;

pub async fn start_server(config: ServiceConfig) -> Result<()> {
let server = WsServer::bind(config.ws.addr.as_str()).await?;
Expand Down Expand Up @@ -103,7 +103,7 @@ async fn remove_expired_connections(mut conns: WsConnections) {
log::info!("Total connection num: {}", conns.len().await);
}

tokio::time::sleep(Duration::from_secs(CONN_EXPIRED_TIME_SECS)).await;
tokio::time::sleep(Duration::from_secs(CONN_EXPIRED_TIME_SEC)).await;
}
}

Expand All @@ -117,7 +117,8 @@ async fn create_clients(
let mut client = RpcClient::new(chain.clone(), node.url.clone(), config.client).await?;
subscribe_chain(connections.clone(), &mut client).await;
let client = Arc::new(RwLock::new(client));
tokio::spawn(health_check(connections.clone(), client.clone()));
let config = config.clone();
tokio::spawn(health_check(connections.clone(), client.clone(), config));
// maintains these clients
clients.insert(chain.clone(), client);
}
Expand All @@ -126,14 +127,28 @@ async fn create_clients(
}

// if rpc client is not alive, we reconnect to peer
async fn health_check(connections: WsConnections, client: ArcRpcClient) {
async fn health_check(connections: WsConnections, client: ArcRpcClient, config: ServiceConfig) {
loop {
tokio::time::sleep(Duration::from_secs(CHECK_CONN_ALIVE_SECS)).await;
tokio::time::sleep(Duration::from_secs(
config
.ws
.heartbeat_interval_sec
.unwrap_or(HEARTBEAT_INTERVAL_SEC),
))
.await;
let mut is_alive = true;
// use read lock.
{
let client = client.read().await;
if !client.is_alive().await {
is_alive = false;
}
}

let mut client = client.write().await;
if !client.is_alive().await {
if !is_alive {
let mut client = client.write().await;
log::info!(
"Trying to reconnect to `{}` to `{}`...",
"Trying to reconnect to `{}`/`{}`...",
client.chain(),
client.addr()
);
Expand Down
27 changes: 18 additions & 9 deletions src/rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::{
substrate::constants::{state_getRuntimeVersion, system_health},
Chain,
};
use tokio::time::Duration;

pub type Result<T, E = WsClientError> = std::result::Result<T, E>;
pub type NotificationStream = WsSubscription<SubscriptionNotification>;
Expand All @@ -25,6 +26,7 @@ pub struct RpcClient {
ws: WsClient,
chain: Chain,
addr: String,
config: Option<RpcClientConfig>,
// TODO: use arc for splitting lifetime
pub ctx: Option<RpcClientCtx>,
}
Expand All @@ -40,26 +42,33 @@ pub type ArcRpcClient = Arc<RwLock<RpcClient>>;
pub type RpcClients = HashMap<Chain, ArcRpcClient>;

impl RpcClient {
pub async fn new(
chain: Chain,
addr: impl Into<String>,
config: Option<RpcClientConfig>,
) -> Result<Self> {
let addr = addr.into();
async fn create_ws_client(addr: &str, config: Option<RpcClientConfig>) -> Result<WsClient> {
let ws = if let Some(config) = config {
WsClient::builder()
.timeout(Duration::from_millis(config.timeout_ms.unwrap_or(3000)))
.max_concurrent_request_capacity(config.max_request_cap.unwrap_or(256))
.max_capacity_per_subscription(config.max_cap_per_subscription.unwrap_or(64))
.build(addr.as_str())
.build(addr)
.await?
} else {
WsClient::new(addr.as_str()).await?
WsClient::new(addr).await?
};
Ok(ws)
}

pub async fn new(
chain: Chain,
addr: impl Into<String>,
config: Option<RpcClientConfig>,
) -> Result<Self> {
let addr = addr.into();
let ws = Self::create_ws_client(addr.as_str(), config).await?;

Ok(Self {
chain,
ws,
addr,
config,
ctx: Default::default(),
})
}
Expand Down Expand Up @@ -114,7 +123,7 @@ impl RpcClient {

// After `reconnect`ed, client need to re`subscribe`.
pub async fn reconnect(&mut self) -> Result<()> {
self.ws = WsClient::new(self.addr.as_str()).await?;
self.ws = Self::create_ws_client(self.addr.as_str(), self.config).await?;
Ok(())
}

Expand Down

0 comments on commit ea82501

Please sign in to comment.