Skip to content

Commit

Permalink
Merge pull request #3008 from Pana/migrateTokioRuntime3
Browse files Browse the repository at this point in the history
migrate pubsub used async runtime to tokio 1.40
  • Loading branch information
Pana authored Dec 30, 2024
2 parents b6e34f4 + 9ed5ef1 commit c243c27
Show file tree
Hide file tree
Showing 16 changed files with 272 additions and 553 deletions.
12 changes: 0 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ members = [
"crates/util/metrics",
"crates/util/priority-send-queue",
"crates/util/random_crash",
"crates/util/runtime",
"crates/util/sha3-macro",
"crates/util/solidity-abi",
"crates/util/solidity-abi-derive",
Expand Down Expand Up @@ -145,7 +144,6 @@ priority-send-queue = { path = "./crates/util/priority-send-queue" }
heap-map = { path = "./crates/util/heap-map" }
treap-map = { path = "./crates/util/treap-map" }
hibitset = { path = "./crates/util/hibitset" }
runtime = { path = "./crates/util/runtime" }
malloc_size_of = { path = "./crates/util/malloc_size_of" }
delegate = { path = "./crates/util/delegate" }
throttling = { path = "./crates/util/throttling" }
Expand Down Expand Up @@ -307,7 +305,6 @@ tracing-futures = "0.2"
# old version tokio 0.2
tokio02 = { version = "0.2", package = "tokio", features = ["full"] }
tokio01 = { version = "0.1", package = "tokio" }
tokio-timer = "0.2.13"

# crypto & hash
fixed-hash = "0.5"
Expand Down
2 changes: 0 additions & 2 deletions crates/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ log = { workspace = true }
cfx-types = { workspace = true }
cfx-addr = { workspace = true }
cfx-bytes = { workspace = true }
runtime = { workspace = true }
slab = { workspace = true }
cfxcore = { workspace = true }
network = { workspace = true }
Expand Down Expand Up @@ -113,7 +112,6 @@ cfx-rpc = { workspace = true }
cfx-rpc-utils = { workspace = true }
cfx-rpc-builder = { workspace = true }
jsonrpsee = { workspace = true }
tokio-timer = "0.2"

[dev-dependencies]
criterion = "0.3"
Expand Down
6 changes: 1 addition & 5 deletions crates/client/src/archive/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use cfxcore::{
};
use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
use parking_lot::{Condvar, Mutex};
use runtime::Runtime;
use std::sync::Arc;
use tokio::runtime::Runtime as TokioRuntime;

Expand All @@ -30,7 +29,6 @@ pub struct ArchiveClientExtraComponents {
pub rpc_tcp_server: Option<TcpServer>,
pub debug_rpc_ws_server: Option<WsServer>,
pub rpc_ws_server: Option<WsServer>,
pub runtime: Runtime,
pub sync: Arc<SynchronizationService>,
pub txpool: Arc<TransactionPool>,
pub pow: Arc<PowComputer>,
Expand All @@ -39,7 +37,7 @@ pub struct ArchiveClientExtraComponents {
/// Handle to the started ETH RPC server. This is version 2 of the ETH RPC.
/// Which use Rust async I/O
pub eth_rpc_server_handle: Option<RpcServerHandle>,
pub tokio_runtime: TokioRuntime,
pub tokio_runtime: Arc<TokioRuntime>,
}

impl MallocSizeOf for ArchiveClientExtraComponents {
Expand Down Expand Up @@ -77,7 +75,6 @@ impl ArchiveClient {
debug_rpc_ws_server,
rpc_ws_server,
pos_handler,
runtime,
eth_rpc_http_server,
eth_rpc_ws_server,
tokio_runtime,
Expand All @@ -99,7 +96,6 @@ impl ArchiveClient {
rpc_tcp_server,
debug_rpc_ws_server,
rpc_ws_server,
runtime,
sync,
txpool,
pow,
Expand Down
22 changes: 9 additions & 13 deletions crates/client/src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ use diem_types::validator_config::{
};
use malloc_size_of::{new_malloc_size_ops, MallocSizeOf, MallocSizeOfOps};
use network::NetworkService;
use runtime::Runtime;
use secret_store::{SecretStore, SharedSecretStore};
use tokio::runtime::Runtime as TokioRuntime;
use txgen::{DirectTransactionGenerator, TransactionGenerator};
Expand Down Expand Up @@ -155,8 +154,8 @@ pub fn initialize_common_modules(
Arc<AccountProvider>,
Arc<Notifications>,
PubSubClient,
Runtime,
EthPubSubClient,
Arc<TokioRuntime>,
),
String,
> {
Expand Down Expand Up @@ -460,16 +459,18 @@ pub fn initialize_common_modules(
pos_verifier.clone(),
));

let runtime = Runtime::with_default_thread_count();
let tokio_runtime =
Arc::new(TokioRuntime::new().map_err(|e| e.to_string())?);

let pubsub = PubSubClient::new(
runtime.executor(),
tokio_runtime.clone(),
consensus.clone(),
notifications.clone(),
*network.get_network_type(),
);

let eth_pubsub = EthPubSubClient::new(
runtime.executor(),
tokio_runtime.clone(),
consensus.clone(),
notifications.clone(),
);
Expand All @@ -489,8 +490,8 @@ pub fn initialize_common_modules(
accounts,
notifications,
pubsub,
runtime,
eth_pubsub,
tokio_runtime,
))
}

Expand All @@ -512,10 +513,9 @@ pub fn initialize_not_light_node_modules(
Option<WSServer>,
Option<WSServer>,
Arc<PosVerifier>,
Runtime,
Option<HttpServer>,
Option<WSServer>,
TokioRuntime,
Arc<TokioRuntime>,
Option<RpcServerHandle>,
),
String,
Expand All @@ -535,8 +535,8 @@ pub fn initialize_not_light_node_modules(
accounts,
_notifications,
pubsub,
runtime,
eth_pubsub,
tokio_runtime,
) = initialize_common_modules(conf, exit.clone(), node_type)?;

let light_provider = Arc::new(LightProvider::new(
Expand Down Expand Up @@ -763,9 +763,6 @@ pub fn initialize_not_light_node_modules(

network.start();

let tokio_runtime =
tokio::runtime::Runtime::new().map_err(|e| e.to_string())?;

let eth_rpc_http_server_addr =
conf.raw_conf.jsonrpc_http_eth_port_v2.map(|port| {
format!("0.0.0.0:{}", port)
Expand Down Expand Up @@ -796,7 +793,6 @@ pub fn initialize_not_light_node_modules(
debug_rpc_ws_server,
rpc_ws_server,
pos_verifier,
runtime,
eth_rpc_http_server,
eth_rpc_ws_server,
tokio_runtime,
Expand Down
6 changes: 1 addition & 5 deletions crates/client/src/full/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use cfxcore::{
};
use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
use parking_lot::{Condvar, Mutex};
use runtime::Runtime;
use std::sync::Arc;
use tokio::runtime::Runtime as TokioRuntime;

Expand All @@ -30,7 +29,6 @@ pub struct FullClientExtraComponents {
pub rpc_tcp_server: Option<TcpServer>,
pub debug_rpc_ws_server: Option<WsServer>,
pub rpc_ws_server: Option<WsServer>,
pub runtime: Runtime,
pub sync: Arc<SynchronizationService>,
pub txpool: Arc<TransactionPool>,
pub pow: Arc<PowComputer>,
Expand All @@ -39,7 +37,7 @@ pub struct FullClientExtraComponents {
/// Handle to the started ETH RPC server. This is version 2 of the ETH RPC.
/// Which use Rust async I/O
pub eth_rpc_server_handle: Option<RpcServerHandle>,
pub tokio_runtime: TokioRuntime,
pub tokio_runtime: Arc<TokioRuntime>,
}

impl MallocSizeOf for FullClientExtraComponents {
Expand Down Expand Up @@ -70,7 +68,6 @@ impl FullClient {
debug_rpc_ws_server,
rpc_ws_server,
pos_handler,
runtime,
eth_rpc_http_server,
eth_rpc_ws_server,
tokio_runtime,
Expand All @@ -88,7 +85,6 @@ impl FullClient {
rpc_tcp_server,
debug_rpc_ws_server,
rpc_ws_server,
runtime,
sync,
txpool,
pow,
Expand Down
5 changes: 1 addition & 4 deletions crates/client/src/light/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use cfxcore::{
TransactionPool,
};
use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
use runtime::Runtime;

pub struct LightClientExtraComponents {
pub consensus: Arc<ConsensusGraph>,
Expand All @@ -36,7 +35,6 @@ pub struct LightClientExtraComponents {
pub rpc_http_server: Option<HttpServer>,
pub rpc_tcp_server: Option<TcpServer>,
pub rpc_ws_server: Option<WsServer>,
pub runtime: Runtime,
pub secret_store: Arc<SecretStore>,
pub txpool: Arc<TransactionPool>,
pub pow: Arc<PowComputer>,
Expand Down Expand Up @@ -71,8 +69,8 @@ impl LightClient {
accounts,
notifications,
pubsub,
runtime,
eth_pubsub,
_tokio_runtime,
) = initialize_common_modules(
&mut conf,
exit.clone(),
Expand Down Expand Up @@ -183,7 +181,6 @@ impl LightClient {
rpc_http_server,
rpc_tcp_server,
rpc_ws_server,
runtime,
secret_store,
txpool,
pow,
Expand Down
36 changes: 17 additions & 19 deletions crates/client/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ fn setup_rpc_apis(
rpc.consensus.clone(),
rpc.tx_pool.clone(),
eth_pubsub.epochs_ordered(),
h.executor.clone(),
pubsub.executor.clone(),
poll_lifetime,
rpc.config.get_logs_filter_max_limit,
h.network.clone(),
Expand Down Expand Up @@ -210,25 +210,23 @@ fn setup_rpc_apis(

if let Some(poll_lifetime) = rpc.config.poll_lifetime_in_seconds
{
if let Some(h) = eth_pubsub.handler().upgrade() {
let filter_client = EthFilterClient::new(
rpc.consensus.clone(),
rpc.tx_pool.clone(),
eth_pubsub.epochs_ordered(),
h.executor.clone(),
poll_lifetime,
rpc.config.get_logs_filter_max_limit,
)
.to_delegate();
let filter_client = EthFilterClient::new(
rpc.consensus.clone(),
rpc.tx_pool.clone(),
eth_pubsub.epochs_ordered(),
eth_pubsub.executor.clone(),
poll_lifetime,
rpc.config.get_logs_filter_max_limit,
)
.to_delegate();

extend_with_interceptor(
&mut handler,
&rpc.config,
filter_client,
throttling_conf,
throttling_section,
);
}
extend_with_interceptor(
&mut handler,
&rpc.config,
filter_client,
throttling_conf,
throttling_section,
);
}
}
Api::Debug => {
Expand Down
8 changes: 3 additions & 5 deletions crates/client/src/rpc/impls/cfx/cfx_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@ use cfxcore::{
ConsensusGraph, ConsensusGraphTrait, SharedConsensusGraph,
SharedTransactionPool,
};
use futures::{FutureExt, TryFutureExt};
use itertools::zip;
use jsonrpc_core::{Error as RpcError, ErrorCode, Result as JsonRpcResult};
use parking_lot::{Mutex, RwLock};
use primitives::{
filter::LogFilter, log_entry::LocalizedLogEntry, BlockReceipts, EpochNumber,
};
use runtime::Executor;
use tokio::runtime::Runtime;

/// Something which provides data that can be filtered over.
pub trait Filterable {
Expand Down Expand Up @@ -101,7 +100,7 @@ impl CfxFilterClient {
/// Creates new Cfx filter client.
pub fn new(
consensus: SharedConsensusGraph, tx_pool: SharedTransactionPool,
epochs_ordered: Arc<Channel<(u64, Vec<H256>)>>, executor: Executor,
epochs_ordered: Arc<Channel<(u64, Vec<H256>)>>, executor: Arc<Runtime>,
poll_lifetime: u32, logs_filter_max_limit: Option<usize>,
network: Network,
) -> Self {
Expand All @@ -121,7 +120,7 @@ impl CfxFilterClient {

fn start_epochs_loop(
&self, epochs_ordered: Arc<Channel<(u64, Vec<H256>)>>,
executor: Executor,
executor: Arc<Runtime>,
) {
// subscribe to the `epochs_ordered` channel
let mut receiver = epochs_ordered.subscribe();
Expand Down Expand Up @@ -165,7 +164,6 @@ impl CfxFilterClient {
}
};

let fut = fut.unit_error().boxed().compat();
executor.spawn(fut);
}
}
Expand Down
Loading

0 comments on commit c243c27

Please sign in to comment.