From 24da3f3f674ad651d89e9feaf679c5267e164e0c Mon Sep 17 00:00:00 2001 From: yukang Date: Tue, 9 May 2023 20:09:51 +0800 Subject: [PATCH 01/71] move alert and chain to jsonrpc_utils --- Cargo.lock | 335 +++++++++++++++++++++++++++++---- ckb-bin/src/subcommand/run.rs | 7 +- rpc/Cargo.toml | 7 + rpc/src/lib.rs | 5 +- rpc/src/module/alert.rs | 13 +- rpc/src/module/chain.rs | 101 +++++----- rpc/src/module/mod.rs | 9 +- rpc/src/module/subscription.rs | 20 +- rpc/src/module/test.rs | 40 ++-- rpc/src/server.rs | 139 +++++--------- rpc/src/service_builder.rs | 46 +++-- util/launcher/Cargo.toml | 4 + util/launcher/src/lib.rs | 78 ++++---- 13 files changed, 531 insertions(+), 273 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 88de4180ce..756ae02d27 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -86,13 +86,13 @@ checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" [[package]] name = "async-trait" -version = "0.1.52" +version = "0.1.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "061a7acccaa286c011ddc30970520b98fa40e00c9d644633fb26b5fc63a265e3" +checksum = "b9ccdd8f2a161be9bd5c023df56f1b2a0bd1d83872ae53b71a84a12c9bf6e842" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.12", ] [[package]] @@ -124,6 +124,59 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "axum" +version = "0.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2fb79c228270dcf2426e74864cabc94babb5dbab01a4314e702d2f16540e1591" +dependencies = [ + "async-trait", + "axum-core", + "base64 0.21.0", + "bitflags", + "bytes 1.4.0", + "futures-util", + "http", + "http-body", + "hyper", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sha1", + "sync_wrapper", + "tokio", + "tokio-tungstenite", + "tower", + "tower-http", + "tower-layer", + "tower-service", +] + +[[package]] +name = "axum-core" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" +dependencies = [ + "async-trait", + "bytes 1.4.0", + "futures-util", + "http", + "http-body", + "mime", + "rustversion", + "tower-layer", + "tower-service", +] + [[package]] name = "backtrace" version = "0.3.67" @@ -139,6 +192,12 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base64" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" + [[package]] name = "base64" version = "0.21.0" @@ -163,7 +222,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn", + "syn 1.0.109", "which", ] @@ -472,7 +531,7 @@ dependencies = [ name = "ckb-bin" version = "0.112.0-pre" dependencies = [ - "base64", + "base64 0.21.0", "ckb-app-config", "ckb-async-runtime", "ckb-build-info", @@ -700,7 +759,7 @@ dependencies = [ "ckb-fixed-hash-core", "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -792,6 +851,7 @@ dependencies = [ name = "ckb-launcher" version = "0.112.0-pre" dependencies = [ + "async-trait", "ckb-app-config", "ckb-async-runtime", "ckb-block-filter", @@ -824,8 +884,11 @@ dependencies = [ "ckb-types", "ckb-verification", "ckb-verification-traits", + "http-body", + "jsonrpc-utils", "num_cpus", "once_cell", + "quote", "tempfile", ] @@ -957,14 +1020,14 @@ name = "ckb-migration-template" version = "0.112.0-pre" dependencies = [ "quote", - "syn", + "syn 1.0.109", ] [[package]] name = "ckb-miner" version = "0.112.0-pre" dependencies = [ - "base64", + "base64 0.21.0", "ckb-app-config", "ckb-async-runtime", "ckb-channel", @@ -1093,7 +1156,7 @@ version = "0.112.0-pre" dependencies = [ "ckb-occupied-capacity-core", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -1171,6 +1234,8 @@ dependencies = [ name = "ckb-rpc" version = "0.112.0-pre" dependencies = [ + "async-trait", + "axum", "ckb-app-config", "ckb-chain", "ckb-chain-spec", @@ -1200,6 +1265,7 @@ dependencies = [ "ckb-util", "ckb-verification", "ckb-verification-traits", + "futures-util", "itertools 0.11.0", "jsonrpc-core", "jsonrpc-derive", @@ -1207,13 +1273,16 @@ dependencies = [ "jsonrpc-pubsub", "jsonrpc-server-utils", "jsonrpc-tcp-server", + "jsonrpc-utils", "jsonrpc-ws-server", "pretty_assertions", + "quote", "reqwest", "serde", "serde_json", "tempfile", "tokio", + "tokio-util 0.7.8", ] [[package]] @@ -1815,7 +1884,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ccc0a48a9b826acdf4028595adc9db92caea352f7af011a3034acd172a52a0aa" dependencies = [ "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -1868,7 +1937,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn", + "syn 1.0.109", ] [[package]] @@ -1879,7 +1948,7 @@ checksum = "ddfc69c5bfcbd2fc09a0f38451d2daf0e372e367986a83906d1b0dbc88134fb5" dependencies = [ "darling_core", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -1918,7 +1987,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn", + "syn 1.0.109", ] [[package]] @@ -1998,7 +2067,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -2097,7 +2166,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn", + "syn 1.0.109", "uuid 0.8.2", ] @@ -2246,7 +2315,7 @@ checksum = "33c1e13800337f4d4d7a316bf45a567dbcb6ffe087f16424852d97e97a91f512" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -2521,9 +2590,9 @@ dependencies = [ [[package]] name = "http" -version = "0.2.6" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "31f4c6746584866f0feabcc69893c5b51beef3831656a968ed7ae254cdc4fd03" +checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482" dependencies = [ "bytes 1.4.0", "fnv", @@ -2532,15 +2601,21 @@ dependencies = [ [[package]] name = "http-body" -version = "0.4.4" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6" +checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ "bytes 1.4.0", "http", "pin-project-lite", ] +[[package]] +name = "http-range-header" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bfe8eed0a9285ef776bb792479ea3834e8b94e13d615c2f66d03dd50a435a29" + [[package]] name = "httparse" version = "1.8.0" @@ -2820,7 +2895,7 @@ dependencies = [ "proc-macro-crate", "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -2885,6 +2960,36 @@ dependencies = [ "tower-service", ] +[[package]] +name = "jsonrpc-utils" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f82ed5157e86d6cd293f7d72d6af1000e8b8dbd8e4dd4493d71d0ffb48226348" +dependencies = [ + "anyhow", + "axum", + "futures-core", + "futures-util", + "hex", + "jsonrpc-core", + "jsonrpc-utils-macros", + "pin-project-lite", + "rand 0.8.5", + "serde_json", + "tokio", +] + +[[package]] +name = "jsonrpc-utils-macros" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8727d2c8bb2833f22c4306879a5cf4cc4a8659170c211e9b523b86aab654f823" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.12", +] + [[package]] name = "jsonrpc-ws-server" version = "18.0.0" @@ -3022,6 +3127,12 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f" +[[package]] +name = "matchit" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40" + [[package]] name = "memchr" version = "2.4.1" @@ -3163,7 +3274,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -3274,7 +3385,7 @@ checksum = "621fe0f044729f810c6815cdd77e8f5e0cd803ce4f6a38380ebfc1322af98661" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -3309,7 +3420,7 @@ dependencies = [ "numext-fixed-uint-core", "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -3368,7 +3479,7 @@ checksum = "b501e44f11665960c7e7fcf062c7d96a14ade4aa98116c004b2e37b5be7d736c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -3555,11 +3666,31 @@ dependencies = [ "siphasher", ] +[[package]] +name = "pin-project" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c95a7476719eab1e366eaf73d0260af3021184f18177925b07f54b30089ceead" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39407670928234ebc5e6e580247dd567ad73a3578460c5990f9503df207e8f07" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.12", +] + [[package]] name = "pin-project-lite" -version = "0.2.8" +version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e280fbe77cc62c91527259e9442153f4688736748d24660126286329742b4c6c" +checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" [[package]] name = "pin-utils" @@ -3654,7 +3785,7 @@ dependencies = [ "proc-macro-error-attr", "proc-macro2", "quote", - "syn", + "syn 1.0.109", "version_check", ] @@ -3708,7 +3839,7 @@ dependencies = [ "lazy_static", "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -3762,9 +3893,9 @@ checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3" [[package]] name = "quote" -version = "1.0.23" +version = "1.0.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8856d8364d252a14d474036ea1358d63c9e6965c8e5c1885c18f73d70bff9c7b" +checksum = "8f4f29d145265ec1c483c7c654450edde0bfe043d3938d6972630663356d9500" dependencies = [ "proc-macro2", ] @@ -3923,7 +4054,7 @@ version = "0.11.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ba30cc2c0cd02af1222ed216ba659cdb2f879dfe3181852fe7c50b1d0005949" dependencies = [ - "base64", + "base64 0.21.0", "bytes 1.4.0", "encoding_rs", "futures-core", @@ -3987,7 +4118,7 @@ checksum = "db74e3fdd29d969a0ec1f8e79171a6f0f71d0429293656901db382d248c4c021" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -4064,6 +4195,12 @@ dependencies = [ "windows-sys 0.45.0", ] +[[package]] +name = "rustversion" +version = "1.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f3208ce4d8448b3f3e7d168a73f5e0c43a61e32930de3bceeccedb388b6bf06" + [[package]] name = "rusty-fork" version = "0.3.0" @@ -4124,7 +4261,7 @@ checksum = "aaaae8f38bb311444cfb7f1979af0bc9240d95795f75f9ceddf6a59b79ceffa0" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -4282,7 +4419,7 @@ checksum = "d7e29c4601e36bcec74a223228dce795f4cd3616341a4af93520ca1a837c087d" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -4296,6 +4433,15 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7f05c1d5476066defcdfacce1f52fc3cae3af1d3089727100c02ae92e5abbe0" +dependencies = [ + "serde", +] + [[package]] name = "serde_plain" version = "0.3.0" @@ -4329,6 +4475,17 @@ dependencies = [ "opaque-debug 0.2.3", ] +[[package]] +name = "sha1" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "006769ba83e921b3085caa8334186b00cf92b4cb1a6cf4632fbccc8eff5c7549" +dependencies = [ + "cfg-if 1.0.0", + "cpufeatures", + "digest 0.10.3", +] + [[package]] name = "sha2" version = "0.10.1" @@ -4462,6 +4619,23 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "syn" +version = "2.0.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79d9531f94112cfc3e4c8f5f02cb2b58f72c97b7efd85f70203cc6d8efda5927" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + [[package]] name = "synstructure" version = "0.12.6" @@ -4470,7 +4644,7 @@ checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", "unicode-xid", ] @@ -4589,7 +4763,7 @@ checksum = "5420d42e90af0c38c3290abcca25b9b3bdf379fc9f55c528f53a269d9c9a267e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -4724,7 +4898,7 @@ checksum = "b557f72f448c511a979e2564e55d74e6c4432fc96ff4f6241bc6bded342643b7" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", ] [[package]] @@ -4748,6 +4922,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54319c93411147bced34cb5609a80e0a8e44c5999c93903a81cd866630ec0bfd" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.6.10" @@ -4799,6 +4985,47 @@ dependencies = [ "serde", ] +[[package]] +name = "tower" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +dependencies = [ + "futures-core", + "futures-util", + "pin-project", + "pin-project-lite", + "tokio", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tower-http" +version = "0.3.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f873044bf02dd1e8239e9c1293ea39dad76dc594ec16185d0a1bf31d8dc8d858" +dependencies = [ + "bitflags", + "bytes 1.4.0", + "futures-core", + "futures-util", + "http", + "http-body", + "http-range-header", + "pin-project-lite", + "tower", + "tower-layer", + "tower-service", +] + +[[package]] +name = "tower-layer" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" + [[package]] name = "tower-service" version = "0.3.1" @@ -4812,6 +5039,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2d8d93354fe2a8e50d5953f5ae2e47a3fc2ef03292e7ea46e3cc38f549525fb9" dependencies = [ "cfg-if 1.0.0", + "log", "pin-project-lite", "tracing-core", ] @@ -4876,6 +5104,25 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" +[[package]] +name = "tungstenite" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30ee6ab729cd4cf0fd55218530c4522ed30b7b6081752839b68fcec8d0960788" +dependencies = [ + "base64 0.13.1", + "byteorder", + "bytes 1.4.0", + "http", + "httparse", + "log", + "rand 0.8.5", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "typenum" version = "1.15.0" @@ -4988,6 +5235,12 @@ dependencies = [ "serde", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "uuid" version = "0.8.2" @@ -5081,7 +5334,7 @@ dependencies = [ "log", "proc-macro2", "quote", - "syn", + "syn 1.0.109", "wasm-bindgen-shared", ] @@ -5115,7 +5368,7 @@ checksum = "bfa8a30d46208db204854cadbb5d4baf5fcf8071ba5bf48190c3e59937962ebc" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -5469,6 +5722,6 @@ checksum = "3f8f187641dad4f680d25c4bfc4225b418165984179f26ca76ec4fb6441d3a17" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.109", "synstructure", ] diff --git a/ckb-bin/src/subcommand/run.rs b/ckb-bin/src/subcommand/run.rs index fd6355af33..12dbc4b3da 100644 --- a/ckb-bin/src/subcommand/run.rs +++ b/ckb-bin/src/subcommand/run.rs @@ -1,6 +1,6 @@ use crate::helper::deadlock_detection; use ckb_app_config::{ExitCode, RunArgs}; -use ckb_async_runtime::Handle; +use ckb_async_runtime::{tokio, Handle}; use ckb_build_info::Version; use ckb_launcher::Launcher; use ckb_logger::info; @@ -45,12 +45,15 @@ pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), launcher.start_block_filter(&shared); - let (network_controller, _rpc_server) = launcher.start_network_and_rpc( + let rt = tokio::runtime::Runtime::new().unwrap(); + + let network_controller = launcher.start_network_and_rpc( &shared, chain_controller.clone(), miner_enable, pack.take_relay_tx_receiver(), ); + let network_controller = rt.block_on(network_controller); let tx_pool_builder = pack.take_tx_pool_builder(); tx_pool_builder.start(network_controller.clone()); diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 212b5b2d06..dd7b93e1cc 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -45,6 +45,13 @@ ckb-pow = { path = "../pow", version = "= 0.112.0-pre" } ckb-indexer = { path = "../util/indexer", version = "= 0.112.0-pre" } itertools.workspace = true tokio = "1" +quote = "1.0.27" +async-trait = "0.1" +jsonrpc-utils = { version = "0.2.0", features = ["server", "macros", "axum"] } +axum = "0.6.1" +tokio-util = { version = "0.7.3", features = ["codec"] } +futures-util = { version = "0.3.21"} + [dev-dependencies] reqwest = { version = "0.11.4", features = ["blocking", "json"] } diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index ebbd2227e4..9dbb12bbb0 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -10,9 +10,12 @@ pub mod module; #[cfg(test)] mod tests; +use jsonrpc_core::MetaIoHandler; +use jsonrpc_utils::pub_sub::Session; + pub use crate::error::RPCError; pub use crate::server::RpcServer; pub use crate::service_builder::ServiceBuilder; #[doc(hidden)] -pub type IoHandler = jsonrpc_pubsub::PubSubHandler>; +pub type IoHandler = MetaIoHandler>; \ No newline at end of file diff --git a/rpc/src/module/alert.rs b/rpc/src/module/alert.rs index cce4254e0f..0735d85020 100644 --- a/rpc/src/module/alert.rs +++ b/rpc/src/module/alert.rs @@ -6,8 +6,10 @@ use ckb_network_alert::{notifier::Notifier as AlertNotifier, verifier::Verifier use ckb_types::{packed, prelude::*}; use ckb_util::Mutex; use jsonrpc_core::Result; -use jsonrpc_derive::rpc; use std::sync::Arc; +use jsonrpc_utils::rpc; +use async_trait::async_trait; + /// RPC Module Alert for network alerts. /// @@ -15,7 +17,8 @@ use std::sync::Arc; /// /// The alerts must be signed by 2-of-4 signatures, where the public keys are hard-coded in the source code /// and belong to early CKB developers. -#[rpc(server)] +#[rpc] +#[async_trait] pub trait AlertRpc { /// Sends an alert. /// @@ -67,9 +70,10 @@ pub trait AlertRpc { /// } /// ``` #[rpc(name = "send_alert")] - fn send_alert(&self, alert: Alert) -> Result<()>; + async fn send_alert(&self, alert: Alert) -> Result<()>; } +#[derive(Clone)] pub(crate) struct AlertRpcImpl { network_controller: NetworkController, verifier: Arc, @@ -90,8 +94,9 @@ impl AlertRpcImpl { } } +#[async_trait] impl AlertRpc for AlertRpcImpl { - fn send_alert(&self, alert: Alert) -> Result<()> { + async fn send_alert(&self, alert: Alert) -> Result<()> { let alert: packed::Alert = alert.into(); let now_ms = ckb_systemtime::unix_time_as_millis(); let notice_until: u64 = alert.raw().notice_until().unpack(); diff --git a/rpc/src/module/chain.rs b/rpc/src/module/chain.rs index 36e0f4238f..1cf89d4821 100644 --- a/rpc/src/module/chain.rs +++ b/rpc/src/module/chain.rs @@ -26,9 +26,10 @@ use ckb_types::{ use ckb_verification::ScriptVerifier; use ckb_verification::TxVerifyEnv; use jsonrpc_core::Result; -use jsonrpc_derive::rpc; use std::collections::HashSet; use std::sync::Arc; +use jsonrpc_utils::rpc; +use async_trait::async_trait; /// RPC Module Chain for methods related to the canonical chain. /// @@ -52,7 +53,8 @@ use std::sync::Arc; /// * it is found as an output in any transaction in the [canonical chain](#canonical-chain), /// and /// * it is not found as an input in any transaction in the canonical chain. -#[rpc(server)] +#[rpc] +#[async_trait] pub trait ChainRpc { /// Returns the information about a block by hash. /// @@ -178,7 +180,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_block")] - fn get_block( + async fn get_block( &self, block_hash: H256, verbosity: Option, @@ -312,7 +314,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_block_by_number")] - fn get_block_by_number( + async fn get_block_by_number( &self, block_number: BlockNumber, verbosity: Option, @@ -391,7 +393,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_header")] - fn get_header( + async fn get_header( &self, block_hash: H256, verbosity: Option, @@ -472,7 +474,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_header_by_number")] - fn get_header_by_number( + async fn get_header_by_number( &self, block_number: BlockNumber, verbosity: Option, @@ -526,7 +528,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_block_filter")] - fn get_block_filter(&self, block_hash: H256) -> Result>; + async fn get_block_filter(&self, block_hash: H256) -> Result>; /// Returns the information about a transaction requested by transaction hash. /// @@ -648,7 +650,7 @@ pub trait ChainRpc { /// ``` /// #[rpc(name = "get_transaction")] - fn get_transaction( + async fn get_transaction( &self, tx_hash: H256, verbosity: Option, @@ -696,7 +698,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_block_hash")] - fn get_block_hash(&self, block_number: BlockNumber) -> Result>; + async fn get_block_hash(&self, block_number: BlockNumber) -> Result>; /// Returns the header with the highest block number in the [canonical chain](#canonical-chain). /// @@ -762,7 +764,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_tip_header")] - fn get_tip_header(&self, verbosity: Option) -> Result>; + async fn get_tip_header(&self, verbosity: Option) -> Result>; /// Returns the status of a cell. The RPC returns extra information if it is a [live cell](#live-cell). /// @@ -828,7 +830,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_live_cell")] - fn get_live_cell(&self, out_point: OutPoint, with_data: bool) -> Result; + async fn get_live_cell(&self, out_point: OutPoint, with_data: bool) -> Result; /// Returns the highest block number in the [canonical chain](#canonical-chain). /// @@ -858,7 +860,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_tip_block_number")] - fn get_tip_block_number(&self) -> Result; + async fn get_tip_block_number(&self) -> Result; /// Returns the epoch with the highest number in the [canonical chain](#canonical-chain). /// @@ -894,7 +896,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_current_epoch")] - fn get_current_epoch(&self) -> Result; + async fn get_current_epoch(&self) -> Result; /// Returns the epoch in the [canonical chain](#canonical-chain) with the specific epoch number. /// @@ -940,7 +942,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_epoch_by_number")] - fn get_epoch_by_number(&self, epoch_number: EpochNumber) -> Result>; + async fn get_epoch_by_number(&self, epoch_number: EpochNumber) -> Result>; /// Returns increased issuance, miner reward, and the total transaction fee of a block. /// @@ -1004,7 +1006,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_block_economic_state")] - fn get_block_economic_state(&self, block_hash: H256) -> Result>; + async fn get_block_economic_state(&self, block_hash: H256) -> Result>; /// Returns a Merkle proof that transactions are included in a block. /// @@ -1045,7 +1047,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_transaction_proof")] - fn get_transaction_proof( + async fn get_transaction_proof( &self, tx_hashes: Vec, block_hash: Option, @@ -1091,7 +1093,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "verify_transaction_proof")] - fn verify_transaction_proof(&self, tx_proof: TransactionProof) -> Result>; + async fn verify_transaction_proof(&self, tx_proof: TransactionProof) -> Result>; /// Returns a Merkle proof of transactions' witness included in a block. /// @@ -1137,7 +1139,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_transaction_and_witness_proof")] - fn get_transaction_and_witness_proof( + async fn get_transaction_and_witness_proof( &self, tx_hashes: Vec, block_hash: Option, @@ -1188,7 +1190,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "verify_transaction_and_witness_proof")] - fn verify_transaction_and_witness_proof( + async fn verify_transaction_and_witness_proof( &self, tx_proof: TransactionAndWitnessProof, ) -> Result>; @@ -1302,7 +1304,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_fork_block")] - fn get_fork_block( + async fn get_fork_block( &self, block_hash: H256, verbosity: Option, @@ -1396,7 +1398,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_consensus")] - fn get_consensus(&self) -> Result; + async fn get_consensus(&self) -> Result; /// Returns the past median time by block hash. /// @@ -1436,7 +1438,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_block_median_time")] - fn get_block_median_time(&self, block_hash: H256) -> Result>; + async fn get_block_median_time(&self, block_hash: H256) -> Result>; /// `estimate_cycles` run a transaction and return the execution consumed cycles. /// @@ -1515,7 +1517,7 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "estimate_cycles")] - fn estimate_cycles(&self, tx: Transaction) -> Result; + async fn estimate_cycles(&self, tx: Transaction) -> Result; /// Returns the fee_rate statistics of confirmed blocks on the chain /// @@ -1561,7 +1563,7 @@ pub trait ChainRpc { note = "Please use the RPC method [`get_fee_rate_statistics`](#tymethod.get_fee_rate_statistics) instead" )] #[rpc(name = "get_fee_rate_statics")] - fn get_fee_rate_statics(&self, target: Option) -> Result>; + async fn get_fee_rate_statics(&self, target: Option) -> Result>; /// Returns the fee_rate statistics of confirmed blocks on the chain /// @@ -1603,9 +1605,10 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_fee_rate_statistics")] - fn get_fee_rate_statistics(&self, target: Option) -> Result>; + async fn get_fee_rate_statistics(&self, target: Option) -> Result>; } +#[derive(Clone)] pub(crate) struct ChainRpcImpl { pub shared: Shared, } @@ -1614,8 +1617,10 @@ const DEFAULT_BLOCK_VERBOSITY_LEVEL: u32 = 2; const DEFAULT_HEADER_VERBOSITY_LEVEL: u32 = 1; const DEFAULT_GET_TRANSACTION_VERBOSITY_LEVEL: u32 = 2; +#[async_trait] + impl ChainRpc for ChainRpcImpl { - fn get_block( + async fn get_block( &self, block_hash: H256, verbosity: Option, @@ -1627,7 +1632,7 @@ impl ChainRpc for ChainRpcImpl { self.get_block_by_hash(&snapshot, &block_hash, verbosity, with_cycles) } - fn get_block_by_number( + async fn get_block_by_number( &self, block_number: BlockNumber, verbosity: Option, @@ -1653,7 +1658,7 @@ impl ChainRpc for ChainRpcImpl { ret } - fn get_header( + async fn get_header( &self, block_hash: H256, verbosity: Option, @@ -1680,7 +1685,7 @@ impl ChainRpc for ChainRpcImpl { } } - fn get_header_by_number( + async fn get_header_by_number( &self, block_number: BlockNumber, verbosity: Option, @@ -1715,7 +1720,7 @@ impl ChainRpc for ChainRpcImpl { }) } - fn get_block_filter(&self, block_hash: H256) -> Result> { + async fn get_block_filter(&self, block_hash: H256) -> Result> { let store = self.shared.store(); let block_hash = block_hash.pack(); if !store.is_main_chain(&block_hash) { @@ -1732,7 +1737,7 @@ impl ChainRpc for ChainRpcImpl { })) } - fn get_transaction( + async fn get_transaction( &self, tx_hash: H256, verbosity: Option, @@ -1765,7 +1770,7 @@ impl ChainRpc for ChainRpcImpl { } } - fn get_block_hash(&self, block_number: BlockNumber) -> Result> { + async fn get_block_hash(&self, block_number: BlockNumber) -> Result> { Ok(self .shared .snapshot() @@ -1773,7 +1778,7 @@ impl ChainRpc for ChainRpcImpl { .map(|h| h.unpack())) } - fn get_tip_header(&self, verbosity: Option) -> Result> { + async fn get_tip_header(&self, verbosity: Option) -> Result> { let verbosity = verbosity .map(|v| v.value()) .unwrap_or(DEFAULT_HEADER_VERBOSITY_LEVEL); @@ -1790,13 +1795,13 @@ impl ChainRpc for ChainRpcImpl { } } - fn get_current_epoch(&self) -> Result { + async fn get_current_epoch(&self) -> Result { Ok(EpochView::from_ext( self.shared.snapshot().epoch_ext().pack(), )) } - fn get_epoch_by_number(&self, epoch_number: EpochNumber) -> Result> { + async fn get_epoch_by_number(&self, epoch_number: EpochNumber) -> Result> { let snapshot = self.shared.snapshot(); Ok(snapshot .get_epoch_index(epoch_number.into()) @@ -1807,7 +1812,7 @@ impl ChainRpc for ChainRpcImpl { })) } - fn get_live_cell(&self, out_point: OutPoint, with_data: bool) -> Result { + async fn get_live_cell(&self, out_point: OutPoint, with_data: bool) -> Result { let cell_status = self .shared .snapshot() @@ -1816,11 +1821,11 @@ impl ChainRpc for ChainRpcImpl { Ok(cell_status.into()) } - fn get_tip_block_number(&self) -> Result { + async fn get_tip_block_number(&self) -> Result { Ok(self.shared.snapshot().tip_header().number().into()) } - fn get_block_economic_state(&self, block_hash: H256) -> Result> { + async fn get_block_economic_state(&self, block_hash: H256) -> Result> { let snapshot = self.shared.snapshot(); let block_number = if let Some(block_number) = snapshot.get_block_number(&block_hash.pack()) @@ -1888,7 +1893,7 @@ impl ChainRpc for ChainRpcImpl { })) } - fn get_transaction_proof( + async fn get_transaction_proof( &self, tx_hashes: Vec, block_hash: Option, @@ -1910,7 +1915,7 @@ impl ChainRpc for ChainRpcImpl { }) } - fn verify_transaction_proof(&self, tx_proof: TransactionProof) -> Result> { + async fn verify_transaction_proof(&self, tx_proof: TransactionProof) -> Result> { let snapshot = self.shared.snapshot(); snapshot @@ -1953,7 +1958,7 @@ impl ChainRpc for ChainRpcImpl { }) } - fn get_transaction_and_witness_proof( + async fn get_transaction_and_witness_proof( &self, tx_hashes: Vec, block_hash: Option, @@ -1977,7 +1982,7 @@ impl ChainRpc for ChainRpcImpl { }) } - fn verify_transaction_and_witness_proof( + async fn verify_transaction_and_witness_proof( &self, tx_proof: TransactionAndWitnessProof, ) -> Result> { @@ -2049,7 +2054,7 @@ impl ChainRpc for ChainRpcImpl { }) } - fn get_fork_block( + async fn get_fork_block( &self, block_hash: H256, verbosity: Option, @@ -2077,12 +2082,12 @@ impl ChainRpc for ChainRpcImpl { } } - fn get_consensus(&self) -> Result { + async fn get_consensus(&self) -> Result { let consensus = self.shared.consensus().clone(); Ok(consensus.into()) } - fn get_block_median_time(&self, block_hash: H256) -> Result> { + async fn get_block_median_time(&self, block_hash: H256) -> Result> { let block_hash = block_hash.pack(); let snapshot = self.shared.snapshot(); if !snapshot.is_main_chain(&block_hash) { @@ -2096,17 +2101,17 @@ impl ChainRpc for ChainRpcImpl { Ok(Some(median_time.into())) } - fn estimate_cycles(&self, tx: Transaction) -> Result { + async fn estimate_cycles(&self, tx: Transaction) -> Result { let tx: packed::Transaction = tx.into(); CyclesEstimator::new(&self.shared).run(tx) } - fn get_fee_rate_statics(&self, target: Option) -> Result> { + async fn get_fee_rate_statics(&self, target: Option) -> Result> { Ok(FeeRateCollector::new(self.shared.snapshot().as_ref()) .statistics(target.map(Into::into))) } - fn get_fee_rate_statistics(&self, target: Option) -> Result> { + async fn get_fee_rate_statistics(&self, target: Option) -> Result> { Ok(FeeRateCollector::new(self.shared.snapshot().as_ref()) .statistics(target.map(Into::into))) } diff --git a/rpc/src/module/mod.rs b/rpc/src/module/mod.rs index e55ac1bde0..fb40cbb187 100644 --- a/rpc/src/module/mod.rs +++ b/rpc/src/module/mod.rs @@ -119,7 +119,7 @@ mod miner; mod net; pub(crate) mod pool; mod stats; -mod subscription; +//mod subscription; mod test; pub(crate) use self::alert::AlertRpcImpl; @@ -131,10 +131,12 @@ pub(crate) use self::miner::MinerRpcImpl; pub(crate) use self::net::NetRpcImpl; pub(crate) use self::pool::PoolRpcImpl; pub(crate) use self::stats::StatsRpcImpl; -pub(crate) use self::subscription::{SubscriptionRpcImpl, SubscriptionSession}; +//pub(crate) use self::subscription::SubscriptionSession; pub(crate) use self::test::IntegrationTestRpcImpl; +pub use self::alert::add_alert_rpc_methods; pub use self::alert::AlertRpc; +pub use self::chain::add_chain_rpc_methods; pub use self::chain::ChainRpc; pub use self::debug::DebugRpc; pub use self::experiment::ExperimentRpc; @@ -143,5 +145,6 @@ pub use self::miner::MinerRpc; pub use self::net::NetRpc; pub use self::pool::PoolRpc; pub use self::stats::StatsRpc; -pub use self::subscription::SubscriptionRpc; +pub use self::test::add_integration_test_rpc_methods; +//pub use self::subscription::SubscriptionRpc; pub use self::test::IntegrationTestRpc; diff --git a/rpc/src/module/subscription.rs b/rpc/src/module/subscription.rs index c5ca5e5160..84d1c62213 100644 --- a/rpc/src/module/subscription.rs +++ b/rpc/src/module/subscription.rs @@ -1,12 +1,17 @@ +use async_trait::async_trait; use ckb_jsonrpc_types::Topic; use ckb_notify::NotifyController; use jsonrpc_core::{Metadata, Result}; -use jsonrpc_derive::rpc; +//use jsonrpc_derive::rpc; use jsonrpc_pubsub::{ typed::{Sink, Subscriber}, - PubSubMetadata, Session, SubscriptionId, + SubscriptionId, }; +use jsonrpc_utils::rpc; + +use jsonrpc_utils::{axum_utils::handle_jsonrpc, pub_sub::Session}; + use std::collections::HashMap; use std::collections::HashSet; use std::sync::Arc; @@ -18,7 +23,7 @@ use tokio::runtime::Handle; const SUBSCRIBER_NAME: &str = "TcpSubscription"; -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct SubscriptionSession { pub(crate) subscription_ids: Arc>>, pub(crate) session: Arc, @@ -35,12 +40,6 @@ impl SubscriptionSession { impl Metadata for SubscriptionSession {} -impl PubSubMetadata for SubscriptionSession { - fn session(&self) -> Option> { - Some(Arc::clone(&self.session)) - } -} - /// RPC Module Subscription that CKB node will push new messages to subscribers. /// /// RPC subscriptions require a full duplex connection. CKB offers such connections in the form of @@ -78,7 +77,8 @@ impl PubSubMetadata for SubscriptionSession { /// socket.send(`{"id": 2, "jsonrpc": "2.0", "method": "unsubscribe", "params": ["0x0"]}`) /// ``` #[allow(clippy::needless_return)] -#[rpc(server)] +#[rpc] +#[async_trait] pub trait SubscriptionRpc { /// Context to implement the subscription RPC. type Metadata; diff --git a/rpc/src/module/test.rs b/rpc/src/module/test.rs index 2f886970b5..493540e1e8 100644 --- a/rpc/src/module/test.rs +++ b/rpc/src/module/test.rs @@ -1,4 +1,5 @@ use crate::error::RPCError; +use async_trait::async_trait; use ckb_chain::chain::ChainController; use ckb_dao::DaoCalculator; use ckb_jsonrpc_types::{Block, BlockTemplate, Byte32, EpochNumberWithFraction, Transaction}; @@ -20,12 +21,13 @@ use ckb_types::{ }; use ckb_verification_traits::Switch; use jsonrpc_core::Result; -use jsonrpc_derive::rpc; +use jsonrpc_utils::rpc; use std::collections::HashSet; use std::sync::Arc; /// RPC for Integration Test. -#[rpc(server)] +#[rpc] +#[async_trait] pub trait IntegrationTestRpc { /// process block without any block verification. /// @@ -105,7 +107,11 @@ pub trait IntegrationTestRpc { /// } /// ``` #[rpc(name = "process_block_without_verify")] - fn process_block_without_verify(&self, data: Block, broadcast: bool) -> Result>; + async fn process_block_without_verify( + &self, + data: Block, + broadcast: bool, + ) -> Result>; /// Truncate chain to specified tip hash, can only truncate less then 50000 blocks each time. /// @@ -138,7 +144,7 @@ pub trait IntegrationTestRpc { /// } /// ``` #[rpc(name = "truncate")] - fn truncate(&self, target_tip_hash: H256) -> Result<()>; + async fn truncate(&self, target_tip_hash: H256) -> Result<()>; /// Generate block(with verification) and broadcast the block. /// @@ -168,7 +174,7 @@ pub trait IntegrationTestRpc { /// } /// ``` #[rpc(name = "generate_block")] - fn generate_block(&self) -> Result; + async fn generate_block(&self) -> Result; /// Generate epochs during development, can be useful for scenarios /// like testing DAO-related functionalities. @@ -291,7 +297,7 @@ pub trait IntegrationTestRpc { /// } /// ``` #[rpc(name = "notify_transaction")] - fn notify_transaction(&self, transaction: Transaction) -> Result; + async fn notify_transaction(&self, transaction: Transaction) -> Result; /// Generate block with block template, attach calculated dao field to build new block, /// @@ -397,7 +403,7 @@ pub trait IntegrationTestRpc { /// } /// ``` #[rpc(name = "generate_block_with_template")] - fn generate_block_with_template(&self, block_template: BlockTemplate) -> Result; + async fn generate_block_with_template(&self, block_template: BlockTemplate) -> Result; /// Return calculated dao field according to specified block template. /// @@ -501,17 +507,23 @@ pub trait IntegrationTestRpc { /// } /// ``` #[rpc(name = "calculate_dao_field")] - fn calculate_dao_field(&self, block_template: BlockTemplate) -> Result; + async fn calculate_dao_field(&self, block_template: BlockTemplate) -> Result; } +#[derive(Clone)] pub(crate) struct IntegrationTestRpcImpl { pub network_controller: NetworkController, pub shared: Shared, pub chain: ChainController, } +#[async_trait] impl IntegrationTestRpc for IntegrationTestRpcImpl { - fn process_block_without_verify(&self, data: Block, broadcast: bool) -> Result> { + async fn process_block_without_verify( + &self, + data: Block, + broadcast: bool, + ) -> Result> { let block: packed::Block = data.into(); let block: Arc = Arc::new(block.into_view()); let ret = self @@ -536,7 +548,7 @@ impl IntegrationTestRpc for IntegrationTestRpcImpl { } } - fn truncate(&self, target_tip_hash: H256) -> Result<()> { + async fn truncate(&self, target_tip_hash: H256) -> Result<()> { let header = { let snapshot = self.shared.snapshot(); let header = snapshot @@ -568,7 +580,7 @@ impl IntegrationTestRpc for IntegrationTestRpcImpl { Ok(()) } - fn generate_block(&self) -> Result { + async fn generate_block(&self) -> Result { let tx_pool = self.shared.tx_pool_controller(); let block_template = tx_pool .get_block_template(None, None, None) @@ -617,8 +629,8 @@ impl IntegrationTestRpc for IntegrationTestRpcImpl { Ok(tx_hash.unpack()) } - fn generate_block_with_template(&self, block_template: BlockTemplate) -> Result { - let dao_field = self.calculate_dao_field(block_template.clone())?; + async fn generate_block_with_template(&self, block_template: BlockTemplate) -> Result { + let dao_field = self.calculate_dao_field(block_template.clone()).await?; let mut update_dao_template = block_template; update_dao_template.dao = dao_field; @@ -626,7 +638,7 @@ impl IntegrationTestRpc for IntegrationTestRpcImpl { self.process_and_announce_block(block) } - fn calculate_dao_field(&self, block_template: BlockTemplate) -> Result { + async fn calculate_dao_field(&self, block_template: BlockTemplate) -> Result { let snapshot: &Snapshot = &self.shared.snapshot(); let consensus = snapshot.consensus(); let parent_header = snapshot diff --git a/rpc/src/server.rs b/rpc/src/server.rs index 44d0cd8328..775c204647 100644 --- a/rpc/src/server.rs +++ b/rpc/src/server.rs @@ -1,20 +1,17 @@ -use crate::module::{SubscriptionRpc, SubscriptionRpcImpl, SubscriptionSession}; use crate::IoHandler; use ckb_app_config::RpcConfig; -use ckb_logger::info; use ckb_notify::NotifyController; -use jsonrpc_pubsub::Session; -use jsonrpc_server_utils::cors::AccessControlAllowOrigin; -use jsonrpc_server_utils::hosts::DomainsValidation; -use std::net::{SocketAddr, ToSocketAddrs}; +use futures_util::{SinkExt, TryStreamExt}; +use jsonrpc_core::MetaIoHandler; +use jsonrpc_utils::axum_utils::jsonrpc_router; +use jsonrpc_utils::stream::{serve_stream_sink, StreamMsg, StreamServerConfig}; +use std::sync::Arc; +use tokio::net::TcpListener; use tokio::runtime::Handle; +use tokio_util::codec::{FramedRead, FramedWrite, LinesCodec, LinesCodecError}; #[doc(hidden)] -pub struct RpcServer { - pub(crate) http: jsonrpc_http_server::Server, - pub(crate) _tcp: Option, - pub(crate) _ws: Option, -} +pub struct RpcServer {} impl RpcServer { /// Creates an RPC server. @@ -24,92 +21,56 @@ impl RpcServer { /// * `config` - RPC config options. /// * `io_handler` - RPC methods handler. See [ServiceBuilder](../service_builder/struct.ServiceBuilder.html). /// * `notify_controller` - Controller emitting notifications. - pub fn new( + pub async fn start_jsonrpc_server( config: RpcConfig, io_handler: IoHandler, notify_controller: &NotifyController, handle: Handle, - ) -> RpcServer { - let http = jsonrpc_http_server::ServerBuilder::new(io_handler.clone()) - .cors(DomainsValidation::AllowOnly(vec![ - AccessControlAllowOrigin::Null, - AccessControlAllowOrigin::Any, - ])) - .event_loop_executor(handle.clone()) - .max_request_body_size(config.max_request_body_size) - .health_api(("/ping", "ping")) - .start_http( - &config - .listen_address - .to_socket_addrs() - .expect("config listen_address parsed") - .next() - .expect("config listen_address parsed"), - ) - .expect("Start Jsonrpc HTTP service"); - info!("Listen HTTP RPCServer on address {}", config.listen_address); - - let _tcp = config - .tcp_listen_address - .as_ref() - .map(|tcp_listen_address| { - let subscription_rpc_impl = - SubscriptionRpcImpl::new(notify_controller.clone(), handle.clone()); - let mut handler = io_handler.clone(); - if config.subscription_enable() { - handler.extend_with(subscription_rpc_impl.to_delegate()); - } - let tcp_server = jsonrpc_tcp_server::ServerBuilder::with_meta_extractor( - handler, - |context: &jsonrpc_tcp_server::RequestContext| { - Some(SubscriptionSession::new(Session::new( - context.sender.clone(), - ))) - }, - ) - .start( - &tcp_listen_address - .to_socket_addrs() - .expect("config tcp_listen_address parsed") - .next() - .expect("config tcp_listen_address parsed"), - ) - .expect("Start Jsonrpc TCP service"); - info!("Listen TCP RPCServer on address {}", tcp_listen_address); - - tcp_server - }); + ) -> Result<(), String> { + let rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2); - let _ws = config.ws_listen_address.as_ref().map(|ws_listen_address| { - let subscription_rpc_impl = SubscriptionRpcImpl::new(notify_controller.clone(), handle); - let mut handler = io_handler.clone(); - if config.subscription_enable() { - handler.extend_with(subscription_rpc_impl.to_delegate()); - } - let ws_server = jsonrpc_ws_server::ServerBuilder::with_meta_extractor( - handler, - |context: &jsonrpc_ws_server::RequestContext| { - Some(SubscriptionSession::new(Session::new(context.sender()))) - }, - ) - .start( - &ws_listen_address - .to_socket_addrs() - .expect("config ws_listen_address parsed") - .next() - .expect("config ws_listen_address parsed"), - ) - .expect("Start Jsonrpc WebSocket service"); - info!("Listen WS RPCServer on address {}", ws_listen_address); + let rpc = Arc::new(rpc); + let stream_config = StreamServerConfig::default() + .with_channel_size(4) + .with_pipeline_size(4); - ws_server + // HTTP and WS server. + let ws_config = stream_config.clone().with_keep_alive(true); + let app = jsonrpc_router("/", rpc.clone(), ws_config); + // You can use additional tower-http middlewares to add e.g. CORS. + let http = tokio::spawn(async move { + axum::Server::bind(&"0.0.0.0:3000".parse().unwrap()) + .serve(app.into_make_service()) + .await + .unwrap(); }); - RpcServer { http, _tcp, _ws } - } + // TCP server. + + // TCP server with line delimited json codec. + // + // You can also use other transports (e.g. TLS, unix socket) and codecs + // (e.g. netstring, JSON splitter). + let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap(); + let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024); + while let Ok((s, _)) = listener.accept().await { + let rpc = rpc.clone(); + let stream_config = stream_config.clone(); + let codec = codec.clone(); + tokio::spawn(async move { + let (r, w) = s.into_split(); + let r = FramedRead::new(r, codec.clone()).map_ok(StreamMsg::Str); + let w = FramedWrite::new(w, codec).with(|msg| async move { + Ok::<_, LinesCodecError>(match msg { + StreamMsg::Str(msg) => msg, + _ => "".into(), + }) + }); + tokio::pin!(w); + drop(serve_stream_sink(&rpc, w, r, stream_config).await); + }); + } - /// Gets the HTTP RPC endpoint. - pub fn http_address(&self) -> &SocketAddr { - self.http.address() + Ok(()) } } diff --git a/rpc/src/service_builder.rs b/rpc/src/service_builder.rs index 19372ce8a0..749d45014e 100644 --- a/rpc/src/service_builder.rs +++ b/rpc/src/service_builder.rs @@ -1,10 +1,11 @@ #![allow(deprecated)] use crate::error::RPCError; -use crate::module::SubscriptionSession; +//use crate::module::SubscriptionSession; use crate::module::{ - AlertRpc, AlertRpcImpl, ChainRpc, ChainRpcImpl, DebugRpc, DebugRpcImpl, ExperimentRpc, - ExperimentRpcImpl, IndexerRpc, IndexerRpcImpl, IntegrationTestRpc, IntegrationTestRpcImpl, - MinerRpc, MinerRpcImpl, NetRpc, NetRpcImpl, PoolRpc, PoolRpcImpl, StatsRpc, StatsRpcImpl, + add_alert_rpc_methods, add_chain_rpc_methods, add_integration_test_rpc_methods, AlertRpcImpl, + ChainRpcImpl, DebugRpc, DebugRpcImpl, ExperimentRpc, ExperimentRpcImpl, IndexerRpc, + IndexerRpcImpl, IntegrationTestRpcImpl, MinerRpc, MinerRpcImpl, NetRpc, NetRpcImpl, PoolRpc, + PoolRpcImpl, StatsRpc, StatsRpcImpl, }; use crate::IoHandler; use ckb_app_config::{DBConfig, IndexerConfig, RpcConfig}; @@ -17,6 +18,7 @@ use ckb_shared::shared::Shared; use ckb_sync::SyncShared; use ckb_types::packed::Script; use ckb_util::Mutex; +use jsonrpc_core::MetaIoHandler; use jsonrpc_core::RemoteProcedure; use std::sync::Arc; @@ -26,6 +28,7 @@ const DEPRECATED_RPC_PREFIX: &str = "deprecated."; pub struct ServiceBuilder<'a> { config: &'a RpcConfig, io_handler: IoHandler, + rpc_hander: MetaIoHandler>, } impl<'a> ServiceBuilder<'a> { @@ -34,20 +37,20 @@ impl<'a> ServiceBuilder<'a> { Self { config, io_handler: IoHandler::default(), + rpc_hander: MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2), } } /// Mounts methods from module Chain if it is enabled in the config. pub fn enable_chain(mut self, shared: Shared) -> Self { - let rpc_methods = ChainRpcImpl { shared }.to_delegate(); if self.config.chain_enable() { - self.add_methods(rpc_methods); - } else { - self.update_disabled_methods("Chain", rpc_methods); + let methods = ChainRpcImpl { shared }; + add_chain_rpc_methods(&mut self.rpc_hander, methods); } self } + /* /// Mounts methods from module Pool if it is enabled in the config. pub fn enable_pool( mut self, @@ -147,24 +150,21 @@ impl<'a> ServiceBuilder<'a> { network_controller: NetworkController, chain: ChainController, ) -> Self { - let rpc_methods = IntegrationTestRpcImpl { - shared: shared.clone(), - network_controller, - chain, - } - .to_delegate(); - if self.config.integration_test_enable() { // IntegrationTest only on Dummy PoW chain + /* assert_eq!( shared.consensus().pow, Pow::Dummy, "Only run integration test on Dummy PoW chain" ); - - self.add_methods(rpc_methods); - } else { - self.update_disabled_methods("IntegrationTest", rpc_methods); + */ + let methods = IntegrationTestRpcImpl { + shared: shared.clone(), + network_controller, + chain, + }; + add_integration_test_rpc_methods(&mut self.rpc_hander, methods); } self } @@ -176,12 +176,9 @@ impl<'a> ServiceBuilder<'a> { alert_notifier: Arc>, network_controller: NetworkController, ) -> Self { - let rpc_methods = - AlertRpcImpl::new(alert_verifier, alert_notifier, network_controller).to_delegate(); if self.config.alert_enable() { - self.add_methods(rpc_methods); - } else { - self.update_disabled_methods("Alert", rpc_methods); + let methods = AlertRpcImpl::new(alert_verifier, alert_notifier, network_controller); + add_alert_rpc_methods(&mut self.rpc_hander, methods); } self } @@ -252,6 +249,7 @@ impl<'a> ServiceBuilder<'a> { } })); } + */ /// Builds the RPC methods handler used in the RPC server. pub fn build(self) -> IoHandler { diff --git a/util/launcher/Cargo.toml b/util/launcher/Cargo.toml index a1ec81525d..126bac1813 100644 --- a/util/launcher/Cargo.toml +++ b/util/launcher/Cargo.toml @@ -45,6 +45,10 @@ ckb-hash = { path = "../hash", version = "= 0.112.0-pre" } num_cpus = "1.10" once_cell = "1.8.0" tempfile.workspace = true +quote = "1.0.27" +async-trait = "0.1" +http-body = "0.4.5" +jsonrpc-utils = { version = "0.2.0", features = ["server", "macros", "axum"] } [dev-dependencies] ckb-systemtime = {path = "../systemtime", version = "= 0.112.0-pre", features = ["enable_faketime"] } diff --git a/util/launcher/src/lib.rs b/util/launcher/src/lib.rs index ad56947f35..1a640db8be 100644 --- a/util/launcher/src/lib.rs +++ b/util/launcher/src/lib.rs @@ -38,6 +38,9 @@ use ckb_tx_pool::service::TxVerificationResult; use ckb_types::prelude::*; use ckb_verification::GenesisVerifier; use ckb_verification_traits::Verifier; +use jsonrpc_utils::{ + axum_utils::jsonrpc_router, pub_sub::PublishMsg, rpc, stream::StreamServerConfig, +}; use std::sync::Arc; pub use crate::shared_builder::{SharedBuilder, SharedPackage}; @@ -263,13 +266,13 @@ impl Launcher { } /// Start network service and rpc serve - pub fn start_network_and_rpc( + pub async fn start_network_and_rpc( &self, shared: &Shared, chain_controller: ChainController, miner_enable: bool, relay_tx_receiver: Receiver, - ) -> (NetworkController, RpcServer) { + ) -> NetworkController { let sync_shared = Arc::new(SyncShared::with_tmpdir( shared.clone(), self.args.config.network.sync.clone(), @@ -385,47 +388,48 @@ impl Launcher { .expect("Start network service failed"); let rpc_config = self.adjust_rpc_config(); - let builder = ServiceBuilder::new(&rpc_config) - .enable_chain(shared.clone()) - .enable_pool( - shared.clone(), - rpc_config - .extra_well_known_lock_scripts - .iter() - .map(|script| script.clone().into()) - .collect(), - rpc_config - .extra_well_known_type_scripts - .iter() - .map(|script| script.clone().into()) - .collect(), - ) - .enable_miner( - shared.clone(), - network_controller.clone(), - chain_controller.clone(), - miner_enable, - ) - .enable_net(network_controller.clone(), sync_shared) - .enable_stats(shared.clone(), Arc::clone(&alert_notifier)) - .enable_experiment(shared.clone()) - .enable_integration_test(shared.clone(), network_controller.clone(), chain_controller) - .enable_alert(alert_verifier, alert_notifier, network_controller.clone()) - .enable_indexer( - shared.clone(), - &self.args.config.db, - &self.args.config.indexer, - ) - .enable_debug(); + let builder = ServiceBuilder::new(&rpc_config).enable_chain(shared.clone()); + // .enable_pool( + // shared.clone(), + // rpc_config + // .extra_well_known_lock_scripts + // .iter() + // .map(|script| script.clone().into()) + // .collect(), + // rpc_config + // .extra_well_known_type_scripts + // .iter() + // .map(|script| script.clone().into()) + // .collect(), + // ) + // .enable_miner( + // shared.clone(), + // network_controller.clone(), + // chain_controller.clone(), + // miner_enable, + // ) + // .enable_net(network_controller.clone(), sync_shared) + // .enable_stats(shared.clone(), Arc::clone(&alert_notifier)) + // .enable_experiment(shared.clone()) + // .enable_integration_test(shared.clone(), network_controller.clone(), chain_controller) + // .enable_alert(alert_verifier, alert_notifier, network_controller.clone()) + // .enable_indexer( + // shared.clone(), + // &self.args.config.db, + // &self.args.config.indexer, + // ) + //.enable_debug(); let io_handler = builder.build(); - let rpc_server = RpcServer::new( + RpcServer::start_jsonrpc_server( rpc_config.clone(), io_handler, shared.notify_controller(), self.async_handle.clone().into_inner(), - ); + ) + .await + .expect("Start rpc server failed"); - (network_controller, rpc_server) + network_controller } } From 90bad30b97d1539d21103c104c2d10ee9ba80e63 Mon Sep 17 00:00:00 2001 From: yukang Date: Wed, 20 Sep 2023 04:55:04 +0800 Subject: [PATCH 02/71] make the ping router works --- Cargo.lock | 3 +++ Cargo.toml | 3 +++ ckb-bin/src/lib.rs | 9 +++---- ckb-bin/src/subcommand/run.rs | 49 ++++++++++++++++++++++++----------- rpc/src/module/alert.rs | 12 ++++++--- rpc/src/module/test.rs | 2 +- rpc/src/server.rs | 13 +++++----- rpc/src/service_builder.rs | 12 +++++---- src/main.rs | 4 ++- util/launcher/src/lib.rs | 14 +++++----- 10 files changed, 77 insertions(+), 44 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 756ae02d27..848852244d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -456,9 +456,11 @@ dependencies = [ name = "ckb" version = "0.112.0-pre" dependencies = [ + "ckb-async-runtime", "ckb-bin", "ckb-build-info", "tikv-jemallocator", + "tokio", ] [[package]] @@ -4887,6 +4889,7 @@ dependencies = [ "signal-hook-registry", "socket2 0.4.9", "tokio-macros", + "tracing", "windows-sys 0.42.0", ] diff --git a/Cargo.toml b/Cargo.toml index b02a4f2080..a1189a54c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,9 @@ ckb-build-info = { path = "util/build-info", version = "= 0.112.0-pre" } [dependencies] ckb-build-info = { path = "util/build-info", version = "= 0.112.0-pre" } ckb-bin = { path = "ckb-bin", version = "= 0.112.0-pre" } +tokio = { version = "1", features = ["full", "tracing"] } +ckb-async-runtime = { path = "util/runtime", version = "= 0.112.0-pre" } + [dev-dependencies] diff --git a/ckb-bin/src/lib.rs b/ckb-bin/src/lib.rs index 8596f504d0..1030d4a86c 100644 --- a/ckb-bin/src/lib.rs +++ b/ckb-bin/src/lib.rs @@ -6,7 +6,7 @@ mod setup_guard; mod subcommand; use ckb_app_config::{cli, ExitCode, Setup}; -use ckb_async_runtime::new_global_runtime; +use ckb_async_runtime::Handle; use ckb_build_info::Version; use ckb_logger::{debug, info}; use ckb_network::tokio; @@ -25,7 +25,7 @@ pub(crate) const LOG_TARGET_SENTRY: &str = "sentry"; /// /// * `version` - The version is passed in so the bin crate can collect the version without trigger /// re-linking. -pub fn run_app(version: Version) -> Result<(), ExitCode> { +pub async fn run_app(version: Version, mut handle: Handle) -> Result<(), ExitCode> { // Always print backtrace on panic. ::std::env::set_var("RUST_BACKTRACE", "full"); @@ -58,14 +58,13 @@ pub fn run_app(version: Version) -> Result<(), ExitCode> { .expect("SubcommandRequiredElseHelp"); let is_silent_logging = is_silent_logging(cmd); - let (mut handle, mut handle_stop_rx, _runtime) = new_global_runtime(); let setup = Setup::from_matches(bin_name, cmd, matches)?; let _guard = SetupGuard::from_setup(&setup, &version, handle.clone(), is_silent_logging)?; raise_fd_limit(); let ret = match cmd { - cli::CMD_RUN => subcommand::run(setup.run(matches)?, version, handle.clone()), + cli::CMD_RUN => subcommand::run(setup.run(matches)?, version, handle.clone()).await, cli::CMD_MINER => subcommand::miner(setup.miner(matches)?, handle.clone()), cli::CMD_REPLAY => subcommand::replay(setup.replay(matches)?, handle.clone()), cli::CMD_EXPORT => subcommand::export(setup.export(matches)?, handle.clone()), @@ -81,7 +80,7 @@ pub fn run_app(version: Version) -> Result<(), ExitCode> { tokio::task::block_in_place(|| { debug!("waiting all tokio tasks done"); - handle_stop_rx.blocking_recv(); + //handle_stop_rx.blocking_recv(); info!("ckb shutdown"); }); } diff --git a/ckb-bin/src/subcommand/run.rs b/ckb-bin/src/subcommand/run.rs index 12dbc4b3da..1b5bc48a97 100644 --- a/ckb-bin/src/subcommand/run.rs +++ b/ckb-bin/src/subcommand/run.rs @@ -1,6 +1,9 @@ use crate::helper::deadlock_detection; use ckb_app_config::{ExitCode, RunArgs}; -use ckb_async_runtime::{tokio, Handle}; +use ckb_async_runtime::{ + tokio::{self, spawn}, + Handle, +}; use ckb_build_info::Version; use ckb_launcher::Launcher; use ckb_logger::info; @@ -8,11 +11,20 @@ use ckb_stop_handler::{broadcast_exit_signals, wait_all_ckb_services_exit}; use ckb_types::core::cell::setup_system_cell_cache; -pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), ExitCode> { +// pub fn run( +// args: RunArgs, +// version: Version, +// async_handle: Handle, +// runtime: Runtime, +// ) -> Result<(), ExitCode> { +// //runtime.spawn_blocking(|| run_inner(args, version, async_handle)); +// runtime.block_on(run_inner(args, version, async_handle)) +// } + +pub async fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), ExitCode> { deadlock_detection(); info!("ckb version: {}", version); - let mut launcher = Launcher::new(args, version, async_handle); let block_assembler_config = launcher.sanitize_block_assembler_config()?; @@ -45,18 +57,25 @@ pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), launcher.start_block_filter(&shared); - let rt = tokio::runtime::Runtime::new().unwrap(); - - let network_controller = launcher.start_network_and_rpc( - &shared, - chain_controller.clone(), - miner_enable, - pack.take_relay_tx_receiver(), - ); - let network_controller = rt.block_on(network_controller); - - let tx_pool_builder = pack.take_tx_pool_builder(); - tx_pool_builder.start(network_controller.clone()); + let rpc_task = spawn(async move { + let network_controller = launcher + .start_network_and_rpc( + &shared, + chain_controller.clone(), + miner_enable, + pack.take_relay_tx_receiver(), + ) + .await; + eprintln!("network_controller begin to run ...."); + eprintln!("end network_controller run ...."); + + let tx_pool_builder = pack.take_tx_pool_builder(); + tx_pool_builder.start(network_controller.clone()); + }); + + tokio::select! { + _ = rpc_task => {}, + }; ctrlc::set_handler(|| { info!("Trapped exit signal, exiting..."); diff --git a/rpc/src/module/alert.rs b/rpc/src/module/alert.rs index 0735d85020..d15686f899 100644 --- a/rpc/src/module/alert.rs +++ b/rpc/src/module/alert.rs @@ -1,4 +1,5 @@ use crate::error::RPCError; +use async_trait::async_trait; use ckb_jsonrpc_types::Alert; use ckb_logger::error; use ckb_network::{NetworkController, SupportProtocols}; @@ -6,10 +7,8 @@ use ckb_network_alert::{notifier::Notifier as AlertNotifier, verifier::Verifier use ckb_types::{packed, prelude::*}; use ckb_util::Mutex; use jsonrpc_core::Result; -use std::sync::Arc; use jsonrpc_utils::rpc; -use async_trait::async_trait; - +use std::sync::Arc; /// RPC Module Alert for network alerts. /// @@ -71,6 +70,9 @@ pub trait AlertRpc { /// ``` #[rpc(name = "send_alert")] async fn send_alert(&self, alert: Alert) -> Result<()>; + + #[rpc(name = "hello")] + async fn hello(&self) -> Result; } #[derive(Clone)] @@ -126,4 +128,8 @@ impl AlertRpc for AlertRpcImpl { )), } } + + async fn hello(&self) -> Result { + Ok(format!("Hello, Yukang!")) + } } diff --git a/rpc/src/module/test.rs b/rpc/src/module/test.rs index 493540e1e8..9a8aa0aa12 100644 --- a/rpc/src/module/test.rs +++ b/rpc/src/module/test.rs @@ -617,7 +617,7 @@ impl IntegrationTestRpc for IntegrationTestRpcImpl { Ok(current_epoch.full_value().into()) } - fn notify_transaction(&self, tx: Transaction) -> Result { + async fn notify_transaction(&self, tx: Transaction) -> Result { let tx: packed::Transaction = tx.into(); let tx: core::TransactionView = tx.into_view(); let tx_pool = self.shared.tx_pool_controller(); diff --git a/rpc/src/server.rs b/rpc/src/server.rs index 775c204647..d6dbbae889 100644 --- a/rpc/src/server.rs +++ b/rpc/src/server.rs @@ -22,36 +22,35 @@ impl RpcServer { /// * `io_handler` - RPC methods handler. See [ServiceBuilder](../service_builder/struct.ServiceBuilder.html). /// * `notify_controller` - Controller emitting notifications. pub async fn start_jsonrpc_server( - config: RpcConfig, io_handler: IoHandler, notify_controller: &NotifyController, handle: Handle, ) -> Result<(), String> { - let rpc = MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2); - - let rpc = Arc::new(rpc); + let rpc = Arc::new(io_handler); let stream_config = StreamServerConfig::default() .with_channel_size(4) .with_pipeline_size(4); // HTTP and WS server. let ws_config = stream_config.clone().with_keep_alive(true); - let app = jsonrpc_router("/", rpc.clone(), ws_config); + let app = jsonrpc_router("/rpc", rpc.clone(), ws_config); // You can use additional tower-http middlewares to add e.g. CORS. let http = tokio::spawn(async move { - axum::Server::bind(&"0.0.0.0:3000".parse().unwrap()) + axum::Server::bind(&"0.0.0.0:8114".parse().unwrap()) .serve(app.into_make_service()) .await .unwrap(); }); + eprintln!("started http ..........."); + // TCP server. // TCP server with line delimited json codec. // // You can also use other transports (e.g. TLS, unix socket) and codecs // (e.g. netstring, JSON splitter). - let listener = TcpListener::bind("0.0.0.0:3001").await.unwrap(); + let listener = TcpListener::bind("0.0.0.0:8116").await.unwrap(); let codec = LinesCodec::new_with_max_length(2 * 1024 * 1024); while let Ok((s, _)) = listener.accept().await { let rpc = rpc.clone(); diff --git a/rpc/src/service_builder.rs b/rpc/src/service_builder.rs index 749d45014e..f9c90e5341 100644 --- a/rpc/src/service_builder.rs +++ b/rpc/src/service_builder.rs @@ -36,7 +36,7 @@ impl<'a> ServiceBuilder<'a> { pub fn new(config: &'a RpcConfig) -> Self { Self { config, - io_handler: IoHandler::default(), + io_handler: IoHandler::with_compatibility(jsonrpc_core::Compatibility::V2), rpc_hander: MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2), } } @@ -45,7 +45,7 @@ impl<'a> ServiceBuilder<'a> { pub fn enable_chain(mut self, shared: Shared) -> Self { if self.config.chain_enable() { let methods = ChainRpcImpl { shared }; - add_chain_rpc_methods(&mut self.rpc_hander, methods); + add_chain_rpc_methods(&mut self.io_handler, methods); } self } @@ -167,7 +167,7 @@ impl<'a> ServiceBuilder<'a> { add_integration_test_rpc_methods(&mut self.rpc_hander, methods); } self - } + }*/ /// Mounts methods from module Alert if it is enabled in the config. pub fn enable_alert( @@ -177,12 +177,14 @@ impl<'a> ServiceBuilder<'a> { network_controller: NetworkController, ) -> Self { if self.config.alert_enable() { + eprintln!("enable_alert ............."); let methods = AlertRpcImpl::new(alert_verifier, alert_notifier, network_controller); add_alert_rpc_methods(&mut self.rpc_hander, methods); } self } + /* /// Mounts methods from module Debug if it is enabled in the config. pub fn enable_debug(mut self) -> Self { if self.config.debug_enable() { @@ -254,8 +256,8 @@ impl<'a> ServiceBuilder<'a> { /// Builds the RPC methods handler used in the RPC server. pub fn build(self) -> IoHandler { let mut io_handler = self.io_handler; - io_handler.add_sync_method("ping", |_| Ok("pong".into())); - + io_handler.add_method("@ping", |_| async move { Ok("pong".into()) }); + eprintln!("build ............."); io_handler } } diff --git a/src/main.rs b/src/main.rs index 15f623d055..24fa39c5dd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,10 +5,12 @@ use ckb_build_info::Version; #[cfg(all(not(target_env = "msvc"), not(target_os = "macos")))] #[global_allocator] static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc; +use ckb_async_runtime::new_global_runtime; fn main() { let version = get_version(); - if let Some(exit_code) = run_app(version).err() { + let (handle, _handle_stop_rx, runtime) = new_global_runtime(); + if let Some(exit_code) = runtime.block_on(run_app(version, handle)).err() { ::std::process::exit(exit_code.into()); } } diff --git a/util/launcher/src/lib.rs b/util/launcher/src/lib.rs index 1a640db8be..0e62f4b4ad 100644 --- a/util/launcher/src/lib.rs +++ b/util/launcher/src/lib.rs @@ -29,7 +29,8 @@ use ckb_network::{ use ckb_network_alert::alert_relayer::AlertRelayer; use ckb_proposal_table::ProposalTable; use ckb_resource::Resource; -use ckb_rpc::{RpcServer, ServiceBuilder}; +use ckb_rpc::RpcServer; +use ckb_rpc::ServiceBuilder; use ckb_shared::Shared; use ckb_store::{ChainDB, ChainStore}; @@ -197,9 +198,9 @@ impl Launcher { &self, block_assembler_config: Option, ) -> Result<(Shared, SharedPackage), ExitCode> { - self.async_handle.block_on(observe_listen_port_occupancy( - &self.args.config.network.listen_addresses, - ))?; + // self.async_handle.block_on(observe_listen_port_occupancy( + // &self.args.config.network.listen_addresses, + // ))?; let shared_builder = SharedBuilder::new( &self.args.config.bin_name, @@ -388,7 +389,7 @@ impl Launcher { .expect("Start network service failed"); let rpc_config = self.adjust_rpc_config(); - let builder = ServiceBuilder::new(&rpc_config).enable_chain(shared.clone()); + let builder = ServiceBuilder::new(&rpc_config).enable_chain(shared.clone()) // .enable_pool( // shared.clone(), // rpc_config @@ -412,7 +413,7 @@ impl Launcher { // .enable_stats(shared.clone(), Arc::clone(&alert_notifier)) // .enable_experiment(shared.clone()) // .enable_integration_test(shared.clone(), network_controller.clone(), chain_controller) - // .enable_alert(alert_verifier, alert_notifier, network_controller.clone()) + .enable_alert(alert_verifier, alert_notifier, network_controller.clone()); // .enable_indexer( // shared.clone(), // &self.args.config.db, @@ -422,7 +423,6 @@ impl Launcher { let io_handler = builder.build(); RpcServer::start_jsonrpc_server( - rpc_config.clone(), io_handler, shared.notify_controller(), self.async_handle.clone().into_inner(), From 26062f8d3590d6488af6e1ba4a39d767a0b40e39 Mon Sep 17 00:00:00 2001 From: yukang Date: Wed, 20 Sep 2023 12:23:44 +0800 Subject: [PATCH 03/71] fix rpc api router issues --- Cargo.lock | 2 ++ rpc/Cargo.toml | 1 + rpc/src/module/debug.rs | 30 ++++++++++++++------ rpc/src/module/miner.rs | 16 +++++++---- rpc/src/module/mod.rs | 2 ++ rpc/src/server.rs | 21 ++++++++++---- rpc/src/service_builder.rs | 57 ++++++++++++-------------------------- util/launcher/src/lib.rs | 21 ++++++-------- 8 files changed, 78 insertions(+), 72 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 848852244d..3517e75d1d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1285,6 +1285,7 @@ dependencies = [ "tempfile", "tokio", "tokio-util 0.7.8", + "tower-http", ] [[package]] @@ -5018,6 +5019,7 @@ dependencies = [ "http-body", "http-range-header", "pin-project-lite", + "tokio", "tower", "tower-layer", "tower-service", diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index dd7b93e1cc..e2e45cd31e 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -51,6 +51,7 @@ jsonrpc-utils = { version = "0.2.0", features = ["server", "macros", "axum"] } axum = "0.6.1" tokio-util = { version = "0.7.3", features = ["codec"] } futures-util = { version = "0.3.21"} +tower-http = { version = "0.3.5", features = ["timeout"] } [dev-dependencies] diff --git a/rpc/src/module/debug.rs b/rpc/src/module/debug.rs index eabf49d87d..dd0bdd1f40 100644 --- a/rpc/src/module/debug.rs +++ b/rpc/src/module/debug.rs @@ -1,15 +1,16 @@ +use async_trait::async_trait; use ckb_jsonrpc_types::{ExtraLoggerConfig, MainLoggerConfig}; use ckb_logger_service::Logger; use jsonrpc_core::{Error, ErrorCode::InternalError, Result}; -use jsonrpc_derive::rpc; +use jsonrpc_utils::rpc; use std::time; - /// RPC Module Debug for internal RPC methods. /// /// **This module is for CKB developers and will not guarantee compatibility.** The methods here /// will be changed or removed without advanced notification. -#[rpc(server)] #[doc(hidden)] +#[rpc] +#[async_trait] pub trait DebugRpc { /// Dumps jemalloc memory profiling information into a file. /// @@ -17,10 +18,10 @@ pub trait DebugRpc { /// /// The RPC returns the path to the dumped file on success or returns an error on failure. #[rpc(name = "jemalloc_profiling_dump")] - fn jemalloc_profiling_dump(&self) -> Result; + async fn jemalloc_profiling_dump(&self) -> Result; /// Changes main logger config options while CKB is running. #[rpc(name = "update_main_logger")] - fn update_main_logger(&self, config: MainLoggerConfig) -> Result<()>; + async fn update_main_logger(&self, config: MainLoggerConfig) -> Result<()>; /// Sets logger config options for extra loggers. /// /// CKB nodes allow setting up extra loggers. These loggers will have their own log files and @@ -32,13 +33,20 @@ pub trait DebugRpc { /// * `config_opt` - Adds a new logger or update an existing logger when this is not null. /// Removes the logger when this is null. #[rpc(name = "set_extra_logger")] - fn set_extra_logger(&self, name: String, config_opt: Option) -> Result<()>; + async fn set_extra_logger( + &self, + name: String, + config_opt: Option, + ) -> Result<()>; } +#[derive(Clone)] pub(crate) struct DebugRpcImpl {} +#[async_trait] + impl DebugRpc for DebugRpcImpl { - fn jemalloc_profiling_dump(&self) -> Result { + async fn jemalloc_profiling_dump(&self) -> Result { let timestamp = time::SystemTime::now() .duration_since(time::SystemTime::UNIX_EPOCH) .unwrap() @@ -54,7 +62,7 @@ impl DebugRpc for DebugRpcImpl { } } - fn update_main_logger(&self, config: MainLoggerConfig) -> Result<()> { + async fn update_main_logger(&self, config: MainLoggerConfig) -> Result<()> { let MainLoggerConfig { filter, to_stdout, @@ -71,7 +79,11 @@ impl DebugRpc for DebugRpcImpl { }) } - fn set_extra_logger(&self, name: String, config_opt: Option) -> Result<()> { + async fn set_extra_logger( + &self, + name: String, + config_opt: Option, + ) -> Result<()> { if let Err(err) = Logger::check_extra_logger_name(&name) { return Err(Error { code: InternalError, diff --git a/rpc/src/module/miner.rs b/rpc/src/module/miner.rs index 5313fc4c34..b163da5438 100644 --- a/rpc/src/module/miner.rs +++ b/rpc/src/module/miner.rs @@ -1,4 +1,5 @@ use crate::error::RPCError; +use async_trait::async_trait; use ckb_chain::chain::ChainController; use ckb_jsonrpc_types::{Block, BlockTemplate, Uint64, Version}; use ckb_logger::{debug, error, info, warn}; @@ -9,7 +10,7 @@ use ckb_types::{core, packed, prelude::*, H256}; use ckb_verification::HeaderVerifier; use ckb_verification_traits::Verifier; use jsonrpc_core::{Error, Result}; -use jsonrpc_derive::rpc; +use jsonrpc_utils::rpc; use std::collections::HashSet; use std::fmt::Debug; use std::sync::Arc; @@ -18,7 +19,8 @@ use std::sync::Arc; /// /// A miner gets a template from CKB, optionally selects transactions, resolves the PoW puzzle, and /// submits the found new block. -#[rpc(server)] +#[rpc] +#[async_trait] pub trait MinerRpc { /// Returns block template for miners. /// @@ -131,7 +133,7 @@ pub trait MinerRpc { /// } /// ``` #[rpc(name = "get_block_template")] - fn get_block_template( + async fn get_block_template( &self, bytes_limit: Option, proposals_limit: Option, @@ -220,17 +222,19 @@ pub trait MinerRpc { /// } /// ``` #[rpc(name = "submit_block")] - fn submit_block(&self, work_id: String, block: Block) -> Result; + async fn submit_block(&self, work_id: String, block: Block) -> Result; } +#[derive(Clone)] pub(crate) struct MinerRpcImpl { pub network_controller: NetworkController, pub shared: Shared, pub chain: ChainController, } +#[async_trait] impl MinerRpc for MinerRpcImpl { - fn get_block_template( + async fn get_block_template( &self, bytes_limit: Option, proposals_limit: Option, @@ -252,7 +256,7 @@ impl MinerRpc for MinerRpcImpl { }) } - fn submit_block(&self, work_id: String, block: Block) -> Result { + async fn submit_block(&self, work_id: String, block: Block) -> Result { let block: packed::Block = block.into(); let block: Arc = Arc::new(block.into_view()); let header = block.header(); diff --git a/rpc/src/module/mod.rs b/rpc/src/module/mod.rs index fb40cbb187..f79cbbb84a 100644 --- a/rpc/src/module/mod.rs +++ b/rpc/src/module/mod.rs @@ -138,9 +138,11 @@ pub use self::alert::add_alert_rpc_methods; pub use self::alert::AlertRpc; pub use self::chain::add_chain_rpc_methods; pub use self::chain::ChainRpc; +pub use self::debug::add_debug_rpc_methods; pub use self::debug::DebugRpc; pub use self::experiment::ExperimentRpc; pub use self::indexer::IndexerRpc; +pub use self::miner::add_miner_rpc_methods; pub use self::miner::MinerRpc; pub use self::net::NetRpc; pub use self::pool::PoolRpc; diff --git a/rpc/src/server.rs b/rpc/src/server.rs index d6dbbae889..ade1630297 100644 --- a/rpc/src/server.rs +++ b/rpc/src/server.rs @@ -1,14 +1,17 @@ use crate::IoHandler; -use ckb_app_config::RpcConfig; +use axum::routing::post; +use axum::{Extension, Router}; use ckb_notify::NotifyController; use futures_util::{SinkExt, TryStreamExt}; -use jsonrpc_core::MetaIoHandler; -use jsonrpc_utils::axum_utils::jsonrpc_router; +use jsonrpc_utils::axum_utils::{handle_jsonrpc, handle_jsonrpc_ws}; +use jsonrpc_utils::pub_sub::Session; use jsonrpc_utils::stream::{serve_stream_sink, StreamMsg, StreamServerConfig}; use std::sync::Arc; +use std::time::Duration; use tokio::net::TcpListener; use tokio::runtime::Handle; use tokio_util::codec::{FramedRead, FramedWrite, LinesCodec, LinesCodecError}; +use tower_http::timeout::TimeoutLayer; #[doc(hidden)] pub struct RpcServer {} @@ -32,10 +35,18 @@ impl RpcServer { .with_pipeline_size(4); // HTTP and WS server. + let method_router = + post(handle_jsonrpc::>).get(handle_jsonrpc_ws::>); let ws_config = stream_config.clone().with_keep_alive(true); - let app = jsonrpc_router("/rpc", rpc.clone(), ws_config); + let app = Router::new() + .route("/", method_router.clone()) + .route("/*path", method_router) + .layer(Extension(rpc.clone())) + .layer(Extension(ws_config)) + .layer(TimeoutLayer::new(Duration::from_secs(30))); + // You can use additional tower-http middlewares to add e.g. CORS. - let http = tokio::spawn(async move { + let _http = tokio::spawn(async move { axum::Server::bind(&"0.0.0.0:8114".parse().unwrap()) .serve(app.into_make_service()) .await diff --git a/rpc/src/service_builder.rs b/rpc/src/service_builder.rs index f9c90e5341..a804564d57 100644 --- a/rpc/src/service_builder.rs +++ b/rpc/src/service_builder.rs @@ -1,11 +1,11 @@ #![allow(deprecated)] -use crate::error::RPCError; //use crate::module::SubscriptionSession; use crate::module::{ - add_alert_rpc_methods, add_chain_rpc_methods, add_integration_test_rpc_methods, AlertRpcImpl, - ChainRpcImpl, DebugRpc, DebugRpcImpl, ExperimentRpc, ExperimentRpcImpl, IndexerRpc, - IndexerRpcImpl, IntegrationTestRpcImpl, MinerRpc, MinerRpcImpl, NetRpc, NetRpcImpl, PoolRpc, - PoolRpcImpl, StatsRpc, StatsRpcImpl, + add_alert_rpc_methods, add_chain_rpc_methods, add_debug_rpc_methods, + add_integration_test_rpc_methods, add_miner_rpc_methods, AlertRpcImpl, ChainRpcImpl, DebugRpc, + DebugRpcImpl, ExperimentRpc, ExperimentRpcImpl, IndexerRpc, IndexerRpcImpl, + IntegrationTestRpcImpl, MinerRpc, MinerRpcImpl, NetRpc, NetRpcImpl, PoolRpc, PoolRpcImpl, + StatsRpc, StatsRpcImpl, }; use crate::IoHandler; use ckb_app_config::{DBConfig, IndexerConfig, RpcConfig}; @@ -28,7 +28,6 @@ const DEPRECATED_RPC_PREFIX: &str = "deprecated."; pub struct ServiceBuilder<'a> { config: &'a RpcConfig, io_handler: IoHandler, - rpc_hander: MetaIoHandler>, } impl<'a> ServiceBuilder<'a> { @@ -37,7 +36,6 @@ impl<'a> ServiceBuilder<'a> { Self { config, io_handler: IoHandler::with_compatibility(jsonrpc_core::Compatibility::V2), - rpc_hander: MetaIoHandler::with_compatibility(jsonrpc_core::Compatibility::V2), } } @@ -71,6 +69,7 @@ impl<'a> ServiceBuilder<'a> { } self } + */ /// Mounts methods from module Miner if `enable` is `true` and it is enabled in the config. pub fn enable_miner( @@ -84,16 +83,16 @@ impl<'a> ServiceBuilder<'a> { shared, chain, network_controller, - } - .to_delegate(); + }; if enable && self.config.miner_enable() { - self.add_methods(rpc_methods); + add_miner_rpc_methods(&mut self.io_handler, rpc_methods); } else { - self.update_disabled_methods("Miner", rpc_methods); + //self.update_disabled_methods("Miner", rpc_methods); } self } + /* /// Mounts methods from module Net if it is enabled in the config. pub fn enable_net( mut self, @@ -142,6 +141,7 @@ impl<'a> ServiceBuilder<'a> { } self } + */ /// Mounts methods from module Integration if it is enabled in the config. pub fn enable_integration_test( @@ -164,10 +164,10 @@ impl<'a> ServiceBuilder<'a> { network_controller, chain, }; - add_integration_test_rpc_methods(&mut self.rpc_hander, methods); + add_integration_test_rpc_methods(&mut self.io_handler, methods); } self - }*/ + } /// Mounts methods from module Alert if it is enabled in the config. pub fn enable_alert( @@ -179,20 +179,21 @@ impl<'a> ServiceBuilder<'a> { if self.config.alert_enable() { eprintln!("enable_alert ............."); let methods = AlertRpcImpl::new(alert_verifier, alert_notifier, network_controller); - add_alert_rpc_methods(&mut self.rpc_hander, methods); + add_alert_rpc_methods(&mut self.io_handler, methods); } self } - /* /// Mounts methods from module Debug if it is enabled in the config. pub fn enable_debug(mut self) -> Self { if self.config.debug_enable() { - self.io_handler.extend_with(DebugRpcImpl {}.to_delegate()); + //self.io_handler.extend_with(DebugRpcImpl {}.to_delegate()); + add_debug_rpc_methods(&mut self.io_handler, DebugRpcImpl {}); } self } + /* /// Mounts methods from module Indexer if it is enabled in the config. pub fn enable_indexer( mut self, @@ -227,30 +228,6 @@ impl<'a> ServiceBuilder<'a> { ) }); } - - fn add_methods(&mut self, rpc_methods: I) - where - I: IntoIterator>)>, - { - let enable_deprecated_rpc = self.config.enable_deprecated_rpc; - self.io_handler - .extend_with(rpc_methods.into_iter().map(|(name, method)| { - if let Some(deprecated_method_name) = name.strip_prefix(DEPRECATED_RPC_PREFIX) { - ( - deprecated_method_name.to_owned(), - if enable_deprecated_rpc { - method - } else { - RemoteProcedure::Method(Arc::new(|_param, _meta| async { - Err(RPCError::rpc_method_is_deprecated()) - })) - }, - ) - } else { - (name, method) - } - })); - } */ /// Builds the RPC methods handler used in the RPC server. diff --git a/util/launcher/src/lib.rs b/util/launcher/src/lib.rs index 0e62f4b4ad..9cddfecfc5 100644 --- a/util/launcher/src/lib.rs +++ b/util/launcher/src/lib.rs @@ -39,9 +39,6 @@ use ckb_tx_pool::service::TxVerificationResult; use ckb_types::prelude::*; use ckb_verification::GenesisVerifier; use ckb_verification_traits::Verifier; -use jsonrpc_utils::{ - axum_utils::jsonrpc_router, pub_sub::PublishMsg, rpc, stream::StreamServerConfig, -}; use std::sync::Arc; pub use crate::shared_builder::{SharedBuilder, SharedPackage}; @@ -403,23 +400,23 @@ impl Launcher { // .map(|script| script.clone().into()) // .collect(), // ) - // .enable_miner( - // shared.clone(), - // network_controller.clone(), - // chain_controller.clone(), - // miner_enable, - // ) + .enable_miner( + shared.clone(), + network_controller.clone(), + chain_controller.clone(), + miner_enable, + ) // .enable_net(network_controller.clone(), sync_shared) // .enable_stats(shared.clone(), Arc::clone(&alert_notifier)) // .enable_experiment(shared.clone()) - // .enable_integration_test(shared.clone(), network_controller.clone(), chain_controller) - .enable_alert(alert_verifier, alert_notifier, network_controller.clone()); + .enable_integration_test(shared.clone(), network_controller.clone(), chain_controller) + .enable_alert(alert_verifier, alert_notifier, network_controller.clone()) // .enable_indexer( // shared.clone(), // &self.args.config.db, // &self.args.config.indexer, // ) - //.enable_debug(); + .enable_debug(); let io_handler = builder.build(); RpcServer::start_jsonrpc_server( From 99922fd519b268d71d8c1dc3e58e4565c33fccf5 Mon Sep 17 00:00:00 2001 From: yukang Date: Wed, 20 Sep 2023 17:29:08 +0800 Subject: [PATCH 04/71] migrate more rpc apis to jsonrpc_utils --- rpc/src/lib.rs | 2 +- rpc/src/module/experiment.rs | 16 +++++---- rpc/src/module/indexer.rs | 24 ++++++++------ rpc/src/module/mod.rs | 5 +++ rpc/src/module/net.rs | 48 ++++++++++++++------------- rpc/src/module/pool.rs | 32 ++++++++++-------- rpc/src/module/stats.rs | 16 +++++---- rpc/src/server.rs | 4 +-- rpc/src/service_builder.rs | 55 ++++++++++++------------------- util/indexer/src/service.rs | 1 + util/launcher/src/lib.rs | 63 ++++++++++++++++++------------------ 11 files changed, 139 insertions(+), 127 deletions(-) diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 9dbb12bbb0..812df193ee 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -18,4 +18,4 @@ pub use crate::server::RpcServer; pub use crate::service_builder::ServiceBuilder; #[doc(hidden)] -pub type IoHandler = MetaIoHandler>; \ No newline at end of file +pub type IoHandler = MetaIoHandler>; diff --git a/rpc/src/module/experiment.rs b/rpc/src/module/experiment.rs index 0486c9009a..6f8491b600 100644 --- a/rpc/src/module/experiment.rs +++ b/rpc/src/module/experiment.rs @@ -1,5 +1,6 @@ use crate::error::RPCError; use crate::module::chain::CyclesEstimator; +use async_trait::async_trait; use ckb_dao::DaoCalculator; use ckb_jsonrpc_types::{ Capacity, DaoWithdrawingCalculationKind, EstimateCycles, OutPoint, Transaction, @@ -8,14 +9,15 @@ use ckb_shared::{shared::Shared, Snapshot}; use ckb_store::ChainStore; use ckb_types::{core, packed, prelude::*}; use jsonrpc_core::Result; -use jsonrpc_derive::rpc; +use jsonrpc_utils::rpc; /// RPC Module Experiment for experimenting methods. /// /// **EXPERIMENTAL warning** /// /// The methods here may be removed or changed in future releases without prior notifications. -#[rpc(server)] +#[rpc] +#[async_trait] pub trait ExperimentRpc { /// Dry run a transaction and return the execution cycles. /// @@ -98,7 +100,7 @@ pub trait ExperimentRpc { note = "Please use the RPC method [`estimate_cycles`](#tymethod.estimate_cycles) instead" )] #[rpc(name = "dry_run_transaction")] - fn dry_run_transaction(&self, tx: Transaction) -> Result; + async fn dry_run_transaction(&self, tx: Transaction) -> Result; /// Calculates the maximum withdrawal one can get, given a referenced DAO cell, and /// a withdrawing block hash. @@ -155,24 +157,26 @@ pub trait ExperimentRpc { /// } /// ``` #[rpc(name = "calculate_dao_maximum_withdraw")] - fn calculate_dao_maximum_withdraw( + async fn calculate_dao_maximum_withdraw( &self, out_point: OutPoint, kind: DaoWithdrawingCalculationKind, ) -> Result; } +#[derive(Clone)] pub(crate) struct ExperimentRpcImpl { pub shared: Shared, } +#[async_trait] impl ExperimentRpc for ExperimentRpcImpl { - fn dry_run_transaction(&self, tx: Transaction) -> Result { + async fn dry_run_transaction(&self, tx: Transaction) -> Result { let tx: packed::Transaction = tx.into(); CyclesEstimator::new(&self.shared).run(tx) } - fn calculate_dao_maximum_withdraw( + async fn calculate_dao_maximum_withdraw( &self, out_point: OutPoint, kind: DaoWithdrawingCalculationKind, diff --git a/rpc/src/module/indexer.rs b/rpc/src/module/indexer.rs index 02e93b1c8b..a0bdf67bb4 100644 --- a/rpc/src/module/indexer.rs +++ b/rpc/src/module/indexer.rs @@ -1,14 +1,16 @@ use crate::error::RPCError; +use async_trait::async_trait; use ckb_indexer::IndexerHandle; use ckb_jsonrpc_types::{ IndexerCell, IndexerCellsCapacity, IndexerOrder, IndexerPagination, IndexerSearchKey, IndexerTip, IndexerTx, JsonBytes, Uint32, }; use jsonrpc_core::Result; -use jsonrpc_derive::rpc; +use jsonrpc_utils::rpc; /// RPC Module Indexer. -#[rpc(server)] +#[rpc] +#[async_trait] pub trait IndexerRpc { /// Returns the indexed tip /// @@ -41,7 +43,7 @@ pub trait IndexerRpc { /// } /// ``` #[rpc(name = "get_indexer_tip")] - fn get_indexer_tip(&self) -> Result>; + async fn get_indexer_tip(&self) -> Result>; /// Returns the live cells collection by the lock or type script. /// @@ -387,7 +389,7 @@ pub trait IndexerRpc { /// } /// ``` #[rpc(name = "get_cells")] - fn get_cells( + async fn get_cells( &self, search_key: IndexerSearchKey, order: IndexerOrder, @@ -799,7 +801,7 @@ pub trait IndexerRpc { /// } /// ``` #[rpc(name = "get_transactions")] - fn get_transactions( + async fn get_transactions( &self, search_key: IndexerSearchKey, order: IndexerOrder, @@ -862,12 +864,13 @@ pub trait IndexerRpc { /// } /// ``` #[rpc(name = "get_cells_capacity")] - fn get_cells_capacity( + async fn get_cells_capacity( &self, search_key: IndexerSearchKey, ) -> Result>; } +#[derive(Clone)] pub(crate) struct IndexerRpcImpl { pub(crate) handle: IndexerHandle, } @@ -878,14 +881,15 @@ impl IndexerRpcImpl { } } +#[async_trait] impl IndexerRpc for IndexerRpcImpl { - fn get_indexer_tip(&self) -> Result> { + async fn get_indexer_tip(&self) -> Result> { self.handle .get_indexer_tip() .map_err(|e| RPCError::custom(RPCError::Indexer, e)) } - fn get_cells( + async fn get_cells( &self, search_key: IndexerSearchKey, order: IndexerOrder, @@ -897,7 +901,7 @@ impl IndexerRpc for IndexerRpcImpl { .map_err(|e| RPCError::custom(RPCError::Indexer, e)) } - fn get_transactions( + async fn get_transactions( &self, search_key: IndexerSearchKey, order: IndexerOrder, @@ -909,7 +913,7 @@ impl IndexerRpc for IndexerRpcImpl { .map_err(|e| RPCError::custom(RPCError::Indexer, e)) } - fn get_cells_capacity( + async fn get_cells_capacity( &self, search_key: IndexerSearchKey, ) -> Result> { diff --git a/rpc/src/module/mod.rs b/rpc/src/module/mod.rs index f79cbbb84a..0764848246 100644 --- a/rpc/src/module/mod.rs +++ b/rpc/src/module/mod.rs @@ -140,12 +140,17 @@ pub use self::chain::add_chain_rpc_methods; pub use self::chain::ChainRpc; pub use self::debug::add_debug_rpc_methods; pub use self::debug::DebugRpc; +pub use self::experiment::add_experiment_rpc_methods; pub use self::experiment::ExperimentRpc; +pub use self::indexer::add_indexer_rpc_methods; pub use self::indexer::IndexerRpc; pub use self::miner::add_miner_rpc_methods; pub use self::miner::MinerRpc; +pub use self::net::add_net_rpc_methods; pub use self::net::NetRpc; +pub use self::pool::add_pool_rpc_methods; pub use self::pool::PoolRpc; +pub use self::stats::add_stats_rpc_methods; pub use self::stats::StatsRpc; pub use self::test::add_integration_test_rpc_methods; //pub use self::subscription::SubscriptionRpc; diff --git a/rpc/src/module/net.rs b/rpc/src/module/net.rs index bef8e6d91c..602b2b85d6 100644 --- a/rpc/src/module/net.rs +++ b/rpc/src/module/net.rs @@ -1,4 +1,5 @@ use crate::error::RPCError; +use async_trait::async_trait; use ckb_jsonrpc_types::{ BannedAddr, LocalNode, LocalNodeProtocol, NodeAddress, PeerSyncState, RemoteNode, RemoteNodeProtocol, SyncState, Timestamp, @@ -7,14 +8,15 @@ use ckb_network::{extract_peer_id, multiaddr::Multiaddr, NetworkController}; use ckb_sync::SyncShared; use ckb_systemtime::unix_time_as_millis; use jsonrpc_core::Result; -use jsonrpc_derive::rpc; +use jsonrpc_utils::rpc; use std::sync::Arc; const MAX_ADDRS: usize = 50; const DEFAULT_BAN_DURATION: u64 = 24 * 60 * 60 * 1000; // 1 day /// RPC Module Net for P2P network. -#[rpc(server)] +#[rpc] +#[async_trait] pub trait NetRpc { /// Returns the local node information. /// @@ -74,7 +76,7 @@ pub trait NetRpc { /// } /// ``` #[rpc(name = "local_node_info")] - fn local_node_info(&self) -> Result; + async fn local_node_info(&self) -> Result; /// Returns the connected peers' information. /// @@ -218,7 +220,7 @@ pub trait NetRpc { /// } /// ``` #[rpc(name = "get_peers")] - fn get_peers(&self) -> Result>; + async fn get_peers(&self) -> Result>; /// Returns all banned IPs/Subnets. /// @@ -252,7 +254,7 @@ pub trait NetRpc { /// } /// ``` #[rpc(name = "get_banned_addresses")] - fn get_banned_addresses(&self) -> Result>; + async fn get_banned_addresses(&self) -> Result>; /// Clears all banned IPs/Subnets. /// @@ -279,7 +281,7 @@ pub trait NetRpc { /// } /// ``` #[rpc(name = "clear_banned_addresses")] - fn clear_banned_addresses(&self) -> Result<()>; + async fn clear_banned_addresses(&self) -> Result<()>; /// Inserts or deletes an IP/Subnet from the banned list /// @@ -328,7 +330,7 @@ pub trait NetRpc { /// } /// ``` #[rpc(name = "set_ban")] - fn set_ban( + async fn set_ban( &self, address: String, command: String, @@ -371,7 +373,7 @@ pub trait NetRpc { /// } /// ``` #[rpc(name = "sync_state")] - fn sync_state(&self) -> Result; + async fn sync_state(&self) -> Result; /// Disable/enable all p2p network activity /// @@ -404,7 +406,7 @@ pub trait NetRpc { /// } /// ``` #[rpc(name = "set_network_active")] - fn set_network_active(&self, state: bool) -> Result<()>; + async fn set_network_active(&self, state: bool) -> Result<()>; /// Attempts to add a node to the peers list and try connecting to it. /// @@ -464,7 +466,7 @@ pub trait NetRpc { /// } /// ``` #[rpc(name = "add_node")] - fn add_node(&self, peer_id: String, address: String) -> Result<()>; + async fn add_node(&self, peer_id: String, address: String) -> Result<()>; /// Attempts to remove a node from the peers list and try disconnecting from it. /// @@ -501,7 +503,7 @@ pub trait NetRpc { /// } /// ``` #[rpc(name = "remove_node")] - fn remove_node(&self, peer_id: String) -> Result<()>; + async fn remove_node(&self, peer_id: String) -> Result<()>; /// Requests that a ping is sent to all connected peers, to measure ping time. /// @@ -528,16 +530,18 @@ pub trait NetRpc { /// } /// ``` #[rpc(name = "ping_peers")] - fn ping_peers(&self) -> Result<()>; + async fn ping_peers(&self) -> Result<()>; } +#[derive(Clone)] pub(crate) struct NetRpcImpl { pub network_controller: NetworkController, pub sync_shared: Arc, } +#[async_trait] impl NetRpc for NetRpcImpl { - fn local_node_info(&self) -> Result { + async fn local_node_info(&self) -> Result { Ok(LocalNode { version: self.network_controller.version().to_owned(), node_id: self.network_controller.node_id(), @@ -565,7 +569,7 @@ impl NetRpc for NetRpcImpl { }) } - fn get_peers(&self) -> Result> { + async fn get_peers(&self) -> Result> { let peers: Vec = self .network_controller .connected_peers() @@ -652,7 +656,7 @@ impl NetRpc for NetRpcImpl { Ok(peers) } - fn get_banned_addresses(&self) -> Result> { + async fn get_banned_addresses(&self) -> Result> { Ok(self .network_controller .get_banned_addrs() @@ -666,12 +670,12 @@ impl NetRpc for NetRpcImpl { .collect()) } - fn clear_banned_addresses(&self) -> Result<()> { + async fn clear_banned_addresses(&self) -> Result<()> { self.network_controller.clear_banned_addrs(); Ok(()) } - fn set_ban( + async fn set_ban( &self, address: String, command: String, @@ -709,7 +713,7 @@ impl NetRpc for NetRpcImpl { } } - fn sync_state(&self) -> Result { + async fn sync_state(&self) -> Result { let chain = self.sync_shared.active_chain(); let state = chain.shared().state(); let (fast_time, normal_time, low_time) = state.read_inflight_blocks().division_point(); @@ -729,12 +733,12 @@ impl NetRpc for NetRpcImpl { Ok(sync_state) } - fn set_network_active(&self, state: bool) -> Result<()> { + async fn set_network_active(&self, state: bool) -> Result<()> { self.network_controller.set_active(state); Ok(()) } - fn add_node(&self, peer_id: String, address: String) -> Result<()> { + async fn add_node(&self, peer_id: String, address: String) -> Result<()> { if let Ok(multiaddr) = address.parse::() { if extract_peer_id(&multiaddr).is_some() { self.network_controller.add_node(multiaddr) @@ -745,14 +749,14 @@ impl NetRpc for NetRpcImpl { Ok(()) } - fn remove_node(&self, peer_id: String) -> Result<()> { + async fn remove_node(&self, peer_id: String) -> Result<()> { if let Ok(id) = peer_id.parse() { self.network_controller.remove_node(&id) } Ok(()) } - fn ping_peers(&self) -> Result<()> { + async fn ping_peers(&self) -> Result<()> { self.network_controller.ping_peers(); Ok(()) } diff --git a/rpc/src/module/pool.rs b/rpc/src/module/pool.rs index 1fe3e45de6..d554cc81fb 100644 --- a/rpc/src/module/pool.rs +++ b/rpc/src/module/pool.rs @@ -1,4 +1,5 @@ use crate::error::RPCError; +use async_trait::async_trait; use ckb_chain_spec::consensus::Consensus; use ckb_constant::hardfork::{mainnet, testnet}; use ckb_jsonrpc_types::{OutputsValidator, RawTxPool, Script, Transaction, TxPoolInfo}; @@ -7,11 +8,12 @@ use ckb_shared::shared::Shared; use ckb_types::{core, packed, prelude::*, H256}; use ckb_verification::{Since, SinceMetric}; use jsonrpc_core::Result; -use jsonrpc_derive::rpc; +use jsonrpc_utils::rpc; use std::sync::Arc; /// RPC Module Pool for transaction memory pool. -#[rpc(server)] +#[rpc] +#[async_trait] pub trait PoolRpc { /// Submits a new transaction into the transaction pool. If the transaction is already in the /// pool, rebroadcast it to peers. @@ -99,7 +101,7 @@ pub trait PoolRpc { /// } /// ``` #[rpc(name = "send_transaction")] - fn send_transaction( + async fn send_transaction( &self, tx: Transaction, outputs_validator: Option, @@ -140,7 +142,7 @@ pub trait PoolRpc { /// } /// ``` #[rpc(name = "remove_transaction")] - fn remove_transaction(&self, tx_hash: H256) -> Result; + async fn remove_transaction(&self, tx_hash: H256) -> Result; /// Returns the transaction pool information. /// @@ -180,7 +182,7 @@ pub trait PoolRpc { /// } /// ``` #[rpc(name = "tx_pool_info")] - fn tx_pool_info(&self) -> Result; + async fn tx_pool_info(&self) -> Result; /// Removes all transactions from the transaction pool. /// @@ -207,7 +209,7 @@ pub trait PoolRpc { /// } /// ``` #[rpc(name = "clear_tx_pool")] - fn clear_tx_pool(&self) -> Result<()>; + async fn clear_tx_pool(&self) -> Result<()>; /// Returns all transaction ids in tx pool as a json array of string transaction ids. /// ## Params @@ -251,7 +253,7 @@ pub trait PoolRpc { /// } /// ``` #[rpc(name = "get_raw_tx_pool")] - fn get_raw_tx_pool(&self, verbose: Option) -> Result; + async fn get_raw_tx_pool(&self, verbose: Option) -> Result; /// Returns whether tx-pool service is started, ready for request. /// @@ -278,9 +280,10 @@ pub trait PoolRpc { /// } /// ``` #[rpc(name = "tx_pool_ready")] - fn tx_pool_ready(&self) -> Result; + async fn tx_pool_ready(&self) -> Result; } +#[derive(Clone)] pub(crate) struct PoolRpcImpl { shared: Shared, well_known_lock_scripts: Vec, @@ -385,13 +388,14 @@ fn build_well_known_type_scripts(chain_spec_name: &str) -> Vec { }).expect("checked json str").into_iter().map(Into::into).collect() } +#[async_trait] impl PoolRpc for PoolRpcImpl { - fn tx_pool_ready(&self) -> Result { + async fn tx_pool_ready(&self) -> Result { let tx_pool = self.shared.tx_pool_controller(); Ok(tx_pool.service_started()) } - fn send_transaction( + async fn send_transaction( &self, tx: Transaction, outputs_validator: Option, @@ -434,7 +438,7 @@ impl PoolRpc for PoolRpcImpl { } } - fn remove_transaction(&self, tx_hash: H256) -> Result { + async fn remove_transaction(&self, tx_hash: H256) -> Result { let tx_pool = self.shared.tx_pool_controller(); tx_pool.remove_local_tx(tx_hash.pack()).map_err(|e| { @@ -443,7 +447,7 @@ impl PoolRpc for PoolRpcImpl { }) } - fn tx_pool_info(&self) -> Result { + async fn tx_pool_info(&self) -> Result { let tx_pool = self.shared.tx_pool_controller(); let get_tx_pool_info = tx_pool.get_tx_pool_info(); if let Err(e) = get_tx_pool_info { @@ -456,7 +460,7 @@ impl PoolRpc for PoolRpcImpl { Ok(tx_pool_info.into()) } - fn clear_tx_pool(&self) -> Result<()> { + async fn clear_tx_pool(&self) -> Result<()> { let snapshot = Arc::clone(&self.shared.snapshot()); let tx_pool = self.shared.tx_pool_controller(); tx_pool @@ -466,7 +470,7 @@ impl PoolRpc for PoolRpcImpl { Ok(()) } - fn get_raw_tx_pool(&self, verbose: Option) -> Result { + async fn get_raw_tx_pool(&self, verbose: Option) -> Result { let tx_pool = self.shared.tx_pool_controller(); let raw = if verbose.unwrap_or(false) { diff --git a/rpc/src/module/stats.rs b/rpc/src/module/stats.rs index b4162b0e40..c9591cda9f 100644 --- a/rpc/src/module/stats.rs +++ b/rpc/src/module/stats.rs @@ -1,3 +1,4 @@ +use async_trait::async_trait; use ckb_jsonrpc_types::{AlertMessage, ChainInfo, DeploymentInfo, DeploymentPos, DeploymentsInfo}; use ckb_network_alert::notifier::Notifier as AlertNotifier; use ckb_shared::shared::Shared; @@ -5,12 +6,13 @@ use ckb_traits::HeaderFieldsProvider; use ckb_types::prelude::Unpack; use ckb_util::Mutex; use jsonrpc_core::Result; -use jsonrpc_derive::rpc; +use jsonrpc_utils::rpc; use std::collections::BTreeMap; use std::sync::Arc; /// RPC Module Stats for getting various statistic data. -#[rpc(server)] +#[rpc] +#[async_trait] pub trait StatsRpc { /// Returns statistics about the chain. /// @@ -51,7 +53,7 @@ pub trait StatsRpc { /// } /// ``` #[rpc(name = "get_blockchain_info")] - fn get_blockchain_info(&self) -> Result; + async fn get_blockchain_info(&self) -> Result; /// Returns statistics about the chain. /// @@ -96,16 +98,18 @@ pub trait StatsRpc { /// } /// ``` #[rpc(name = "get_deployments_info")] - fn get_deployments_info(&self) -> Result; + async fn get_deployments_info(&self) -> Result; } +#[derive(Clone)] pub(crate) struct StatsRpcImpl { pub shared: Shared, pub alert_notifier: Arc>, } +#[async_trait] impl StatsRpc for StatsRpcImpl { - fn get_blockchain_info(&self) -> Result { + async fn get_blockchain_info(&self) -> Result { let chain = self.shared.consensus().id.clone(); let (tip_header, median_time) = { let snapshot = self.shared.snapshot(); @@ -147,7 +151,7 @@ impl StatsRpc for StatsRpcImpl { }) } - fn get_deployments_info(&self) -> Result { + async fn get_deployments_info(&self) -> Result { let snapshot = self.shared.snapshot(); let deployments: BTreeMap = self .shared diff --git a/rpc/src/server.rs b/rpc/src/server.rs index ade1630297..f83e197728 100644 --- a/rpc/src/server.rs +++ b/rpc/src/server.rs @@ -26,8 +26,8 @@ impl RpcServer { /// * `notify_controller` - Controller emitting notifications. pub async fn start_jsonrpc_server( io_handler: IoHandler, - notify_controller: &NotifyController, - handle: Handle, + _notify_controller: &NotifyController, + _handle: Handle, ) -> Result<(), String> { let rpc = Arc::new(io_handler); let stream_config = StreamServerConfig::default() diff --git a/rpc/src/service_builder.rs b/rpc/src/service_builder.rs index a804564d57..4d0b540401 100644 --- a/rpc/src/service_builder.rs +++ b/rpc/src/service_builder.rs @@ -2,12 +2,12 @@ //use crate::module::SubscriptionSession; use crate::module::{ add_alert_rpc_methods, add_chain_rpc_methods, add_debug_rpc_methods, - add_integration_test_rpc_methods, add_miner_rpc_methods, AlertRpcImpl, ChainRpcImpl, DebugRpc, - DebugRpcImpl, ExperimentRpc, ExperimentRpcImpl, IndexerRpc, IndexerRpcImpl, - IntegrationTestRpcImpl, MinerRpc, MinerRpcImpl, NetRpc, NetRpcImpl, PoolRpc, PoolRpcImpl, - StatsRpc, StatsRpcImpl, + add_experiment_rpc_methods, add_indexer_rpc_methods, add_integration_test_rpc_methods, + add_miner_rpc_methods, add_net_rpc_methods, add_pool_rpc_methods, add_stats_rpc_methods, + AlertRpcImpl, ChainRpcImpl, DebugRpcImpl, ExperimentRpcImpl, IndexerRpcImpl, + IntegrationTestRpcImpl, MinerRpcImpl, NetRpcImpl, PoolRpcImpl, StatsRpcImpl, }; -use crate::IoHandler; +use crate::{IoHandler, RPCError}; use ckb_app_config::{DBConfig, IndexerConfig, RpcConfig}; use ckb_chain::chain::ChainController; use ckb_indexer::IndexerService; @@ -18,8 +18,6 @@ use ckb_shared::shared::Shared; use ckb_sync::SyncShared; use ckb_types::packed::Script; use ckb_util::Mutex; -use jsonrpc_core::MetaIoHandler; -use jsonrpc_core::RemoteProcedure; use std::sync::Arc; const DEPRECATED_RPC_PREFIX: &str = "deprecated."; @@ -48,7 +46,6 @@ impl<'a> ServiceBuilder<'a> { self } - /* /// Mounts methods from module Pool if it is enabled in the config. pub fn enable_pool( mut self, @@ -60,16 +57,14 @@ impl<'a> ServiceBuilder<'a> { shared, extra_well_known_lock_scripts, extra_well_known_type_scripts, - ) - .to_delegate(); + ); if self.config.pool_enable() { - self.add_methods(rpc_methods); + add_pool_rpc_methods(&mut self.io_handler, rpc_methods); } else { - self.update_disabled_methods("Pool", rpc_methods); + //self.update_disabled_methods("Pool", rpc_methods); } self } - */ /// Mounts methods from module Miner if `enable` is `true` and it is enabled in the config. pub fn enable_miner( @@ -92,7 +87,6 @@ impl<'a> ServiceBuilder<'a> { self } - /* /// Mounts methods from module Net if it is enabled in the config. pub fn enable_net( mut self, @@ -102,12 +96,11 @@ impl<'a> ServiceBuilder<'a> { let rpc_methods = NetRpcImpl { network_controller, sync_shared, - } - .to_delegate(); + }; if self.config.net_enable() { - self.add_methods(rpc_methods); + add_net_rpc_methods(&mut self.io_handler, rpc_methods); } else { - self.update_disabled_methods("Net", rpc_methods); + //self.update_disabled_methods("Net", rpc_methods); } self } @@ -121,27 +114,25 @@ impl<'a> ServiceBuilder<'a> { let rpc_methods = StatsRpcImpl { shared, alert_notifier, - } - .to_delegate(); + }; if self.config.stats_enable() { - self.add_methods(rpc_methods); + add_stats_rpc_methods(&mut self.io_handler, rpc_methods); } else { - self.update_disabled_methods("Stats", rpc_methods); + //self.update_disabled_methods("Stats", rpc_methods); } self } /// Mounts methods from module Experiment if it is enabled in the config. pub fn enable_experiment(mut self, shared: Shared) -> Self { - let rpc_methods = ExperimentRpcImpl { shared }.to_delegate(); + let rpc_methods = ExperimentRpcImpl { shared }; if self.config.experiment_enable() { - self.add_methods(rpc_methods); + add_experiment_rpc_methods(&mut self.io_handler, rpc_methods); } else { - self.update_disabled_methods("Experiment", rpc_methods); + //self.update_disabled_methods("Experiment", rpc_methods); } self } - */ /// Mounts methods from module Integration if it is enabled in the config. pub fn enable_integration_test( @@ -152,13 +143,11 @@ impl<'a> ServiceBuilder<'a> { ) -> Self { if self.config.integration_test_enable() { // IntegrationTest only on Dummy PoW chain - /* assert_eq!( shared.consensus().pow, Pow::Dummy, "Only run integration test on Dummy PoW chain" ); - */ let methods = IntegrationTestRpcImpl { shared: shared.clone(), network_controller, @@ -177,7 +166,6 @@ impl<'a> ServiceBuilder<'a> { network_controller: NetworkController, ) -> Self { if self.config.alert_enable() { - eprintln!("enable_alert ............."); let methods = AlertRpcImpl::new(alert_verifier, alert_notifier, network_controller); add_alert_rpc_methods(&mut self.io_handler, methods); } @@ -193,7 +181,6 @@ impl<'a> ServiceBuilder<'a> { self } - /* /// Mounts methods from module Indexer if it is enabled in the config. pub fn enable_indexer( mut self, @@ -203,12 +190,12 @@ impl<'a> ServiceBuilder<'a> { ) -> Self { let indexer = IndexerService::new(db_config, indexer_config, shared.async_handle().clone()); let indexer_handle = indexer.handle(); - let rpc_methods = IndexerRpcImpl::new(indexer_handle).to_delegate(); + let rpc_methods = IndexerRpcImpl::new(indexer_handle); if self.config.indexer_enable() { start_indexer(&shared, indexer, indexer_config.index_tx_pool); - self.add_methods(rpc_methods); + add_indexer_rpc_methods(&mut self.io_handler, rpc_methods); } else { - self.update_disabled_methods("Indexer", rpc_methods); + //self.update_disabled_methods("Indexer", rpc_methods); } self } @@ -228,13 +215,11 @@ impl<'a> ServiceBuilder<'a> { ) }); } - */ /// Builds the RPC methods handler used in the RPC server. pub fn build(self) -> IoHandler { let mut io_handler = self.io_handler; io_handler.add_method("@ping", |_| async move { Ok("pong".into()) }); - eprintln!("build ............."); io_handler } } diff --git a/util/indexer/src/service.rs b/util/indexer/src/service.rs index bcdc89c7ce..42e46903ac 100644 --- a/util/indexer/src/service.rs +++ b/util/indexer/src/service.rs @@ -253,6 +253,7 @@ impl IndexerService { /// /// The handle is internally reference-counted and can be freely cloned. /// A handle can be obtained using the IndexerService::handle method. +#[derive(Clone)] pub struct IndexerHandle { pub(crate) store: RocksdbStore, pub(crate) pool: Option>>, diff --git a/util/launcher/src/lib.rs b/util/launcher/src/lib.rs index 9cddfecfc5..f15603e4ff 100644 --- a/util/launcher/src/lib.rs +++ b/util/launcher/src/lib.rs @@ -386,37 +386,38 @@ impl Launcher { .expect("Start network service failed"); let rpc_config = self.adjust_rpc_config(); - let builder = ServiceBuilder::new(&rpc_config).enable_chain(shared.clone()) - // .enable_pool( - // shared.clone(), - // rpc_config - // .extra_well_known_lock_scripts - // .iter() - // .map(|script| script.clone().into()) - // .collect(), - // rpc_config - // .extra_well_known_type_scripts - // .iter() - // .map(|script| script.clone().into()) - // .collect(), - // ) - .enable_miner( - shared.clone(), - network_controller.clone(), - chain_controller.clone(), - miner_enable, - ) - // .enable_net(network_controller.clone(), sync_shared) - // .enable_stats(shared.clone(), Arc::clone(&alert_notifier)) - // .enable_experiment(shared.clone()) - .enable_integration_test(shared.clone(), network_controller.clone(), chain_controller) - .enable_alert(alert_verifier, alert_notifier, network_controller.clone()) - // .enable_indexer( - // shared.clone(), - // &self.args.config.db, - // &self.args.config.indexer, - // ) - .enable_debug(); + let builder = ServiceBuilder::new(&rpc_config) + .enable_chain(shared.clone()) + .enable_pool( + shared.clone(), + rpc_config + .extra_well_known_lock_scripts + .iter() + .map(|script| script.clone().into()) + .collect(), + rpc_config + .extra_well_known_type_scripts + .iter() + .map(|script| script.clone().into()) + .collect(), + ) + .enable_miner( + shared.clone(), + network_controller.clone(), + chain_controller.clone(), + miner_enable, + ) + .enable_net(network_controller.clone(), sync_shared) + .enable_stats(shared.clone(), Arc::clone(&alert_notifier)) + .enable_experiment(shared.clone()) + .enable_integration_test(shared.clone(), network_controller.clone(), chain_controller) + .enable_alert(alert_verifier, alert_notifier, network_controller.clone()) + .enable_indexer( + shared.clone(), + &self.args.config.db, + &self.args.config.indexer, + ) + .enable_debug(); let io_handler = builder.build(); RpcServer::start_jsonrpc_server( From c29c0219960fc8faded97c5f57138c8948195100 Mon Sep 17 00:00:00 2001 From: yukang Date: Wed, 20 Sep 2023 18:33:43 +0800 Subject: [PATCH 05/71] fix deprecated issues --- ckb-bin/src/lib.rs | 9 +-- ckb-bin/src/subcommand/run.rs | 50 ++++++------- rpc/src/module/alert.rs | 7 -- rpc/src/module/chain.rs | 44 +++++++++--- rpc/src/service_builder.rs | 132 ++++++++++++++++++++++++---------- src/main.rs | 4 +- util/launcher/src/lib.rs | 6 +- 7 files changed, 155 insertions(+), 97 deletions(-) diff --git a/ckb-bin/src/lib.rs b/ckb-bin/src/lib.rs index 1030d4a86c..8596f504d0 100644 --- a/ckb-bin/src/lib.rs +++ b/ckb-bin/src/lib.rs @@ -6,7 +6,7 @@ mod setup_guard; mod subcommand; use ckb_app_config::{cli, ExitCode, Setup}; -use ckb_async_runtime::Handle; +use ckb_async_runtime::new_global_runtime; use ckb_build_info::Version; use ckb_logger::{debug, info}; use ckb_network::tokio; @@ -25,7 +25,7 @@ pub(crate) const LOG_TARGET_SENTRY: &str = "sentry"; /// /// * `version` - The version is passed in so the bin crate can collect the version without trigger /// re-linking. -pub async fn run_app(version: Version, mut handle: Handle) -> Result<(), ExitCode> { +pub fn run_app(version: Version) -> Result<(), ExitCode> { // Always print backtrace on panic. ::std::env::set_var("RUST_BACKTRACE", "full"); @@ -58,13 +58,14 @@ pub async fn run_app(version: Version, mut handle: Handle) -> Result<(), ExitCod .expect("SubcommandRequiredElseHelp"); let is_silent_logging = is_silent_logging(cmd); + let (mut handle, mut handle_stop_rx, _runtime) = new_global_runtime(); let setup = Setup::from_matches(bin_name, cmd, matches)?; let _guard = SetupGuard::from_setup(&setup, &version, handle.clone(), is_silent_logging)?; raise_fd_limit(); let ret = match cmd { - cli::CMD_RUN => subcommand::run(setup.run(matches)?, version, handle.clone()).await, + cli::CMD_RUN => subcommand::run(setup.run(matches)?, version, handle.clone()), cli::CMD_MINER => subcommand::miner(setup.miner(matches)?, handle.clone()), cli::CMD_REPLAY => subcommand::replay(setup.replay(matches)?, handle.clone()), cli::CMD_EXPORT => subcommand::export(setup.export(matches)?, handle.clone()), @@ -80,7 +81,7 @@ pub async fn run_app(version: Version, mut handle: Handle) -> Result<(), ExitCod tokio::task::block_in_place(|| { debug!("waiting all tokio tasks done"); - //handle_stop_rx.blocking_recv(); + handle_stop_rx.blocking_recv(); info!("ckb shutdown"); }); } diff --git a/ckb-bin/src/subcommand/run.rs b/ckb-bin/src/subcommand/run.rs index 1b5bc48a97..012d38d0cd 100644 --- a/ckb-bin/src/subcommand/run.rs +++ b/ckb-bin/src/subcommand/run.rs @@ -11,21 +11,11 @@ use ckb_stop_handler::{broadcast_exit_signals, wait_all_ckb_services_exit}; use ckb_types::core::cell::setup_system_cell_cache; -// pub fn run( -// args: RunArgs, -// version: Version, -// async_handle: Handle, -// runtime: Runtime, -// ) -> Result<(), ExitCode> { -// //runtime.spawn_blocking(|| run_inner(args, version, async_handle)); -// runtime.block_on(run_inner(args, version, async_handle)) -// } - -pub async fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), ExitCode> { +pub fn run(args: RunArgs, version: Version, async_handle: Handle) -> Result<(), ExitCode> { deadlock_detection(); info!("ckb version: {}", version); - let mut launcher = Launcher::new(args, version, async_handle); + let mut launcher = Launcher::new(args, version, async_handle.clone()); let block_assembler_config = launcher.sanitize_block_assembler_config()?; let miner_enable = block_assembler_config.is_some(); @@ -57,26 +47,26 @@ pub async fn run(args: RunArgs, version: Version, async_handle: Handle) -> Resul launcher.start_block_filter(&shared); - let rpc_task = spawn(async move { - let network_controller = launcher - .start_network_and_rpc( - &shared, - chain_controller.clone(), - miner_enable, - pack.take_relay_tx_receiver(), - ) - .await; - eprintln!("network_controller begin to run ...."); - eprintln!("end network_controller run ...."); - - let tx_pool_builder = pack.take_tx_pool_builder(); - tx_pool_builder.start(network_controller.clone()); + async_handle.block_on(async move { + let rpc_task = spawn(async move { + let network_controller = launcher + .start_network_and_rpc( + &shared, + chain_controller.clone(), + miner_enable, + pack.take_relay_tx_receiver(), + ) + .await; + + let tx_pool_builder = pack.take_tx_pool_builder(); + tx_pool_builder.start(network_controller.clone()); + }); + + tokio::select! { + _ = rpc_task => {}, + } }); - tokio::select! { - _ = rpc_task => {}, - }; - ctrlc::set_handler(|| { info!("Trapped exit signal, exiting..."); broadcast_exit_signals(); diff --git a/rpc/src/module/alert.rs b/rpc/src/module/alert.rs index d15686f899..3ce3c661fe 100644 --- a/rpc/src/module/alert.rs +++ b/rpc/src/module/alert.rs @@ -70,9 +70,6 @@ pub trait AlertRpc { /// ``` #[rpc(name = "send_alert")] async fn send_alert(&self, alert: Alert) -> Result<()>; - - #[rpc(name = "hello")] - async fn hello(&self) -> Result; } #[derive(Clone)] @@ -128,8 +125,4 @@ impl AlertRpc for AlertRpcImpl { )), } } - - async fn hello(&self) -> Result { - Ok(format!("Hello, Yukang!")) - } } diff --git a/rpc/src/module/chain.rs b/rpc/src/module/chain.rs index 1cf89d4821..c3d32af4d1 100644 --- a/rpc/src/module/chain.rs +++ b/rpc/src/module/chain.rs @@ -1,5 +1,6 @@ use crate::error::RPCError; use crate::util::FeeRateCollector; +use async_trait::async_trait; use ckb_jsonrpc_types::{ BlockEconomicState, BlockFilter, BlockNumber, BlockResponse, BlockView, CellWithStatus, Consensus, EpochNumber, EpochView, EstimateCycles, FeeRateStatistics, HeaderView, OutPoint, @@ -26,10 +27,9 @@ use ckb_types::{ use ckb_verification::ScriptVerifier; use ckb_verification::TxVerifyEnv; use jsonrpc_core::Result; +use jsonrpc_utils::rpc; use std::collections::HashSet; use std::sync::Arc; -use jsonrpc_utils::rpc; -use async_trait::async_trait; /// RPC Module Chain for methods related to the canonical chain. /// @@ -764,7 +764,8 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_tip_header")] - async fn get_tip_header(&self, verbosity: Option) -> Result>; + async fn get_tip_header(&self, verbosity: Option) + -> Result>; /// Returns the status of a cell. The RPC returns extra information if it is a [live cell](#live-cell). /// @@ -1006,7 +1007,10 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_block_economic_state")] - async fn get_block_economic_state(&self, block_hash: H256) -> Result>; + async fn get_block_economic_state( + &self, + block_hash: H256, + ) -> Result>; /// Returns a Merkle proof that transactions are included in a block. /// @@ -1562,8 +1566,11 @@ pub trait ChainRpc { since = "0.109.0", note = "Please use the RPC method [`get_fee_rate_statistics`](#tymethod.get_fee_rate_statistics) instead" )] - #[rpc(name = "get_fee_rate_statics")] - async fn get_fee_rate_statics(&self, target: Option) -> Result>; + #[rpc(name = "deprecated.get_fee_rate_statics")] + async fn get_fee_rate_statics( + &self, + target: Option, + ) -> Result>; /// Returns the fee_rate statistics of confirmed blocks on the chain /// @@ -1605,7 +1612,10 @@ pub trait ChainRpc { /// } /// ``` #[rpc(name = "get_fee_rate_statistics")] - async fn get_fee_rate_statistics(&self, target: Option) -> Result>; + async fn get_fee_rate_statistics( + &self, + target: Option, + ) -> Result>; } #[derive(Clone)] @@ -1778,7 +1788,10 @@ impl ChainRpc for ChainRpcImpl { .map(|h| h.unpack())) } - async fn get_tip_header(&self, verbosity: Option) -> Result> { + async fn get_tip_header( + &self, + verbosity: Option, + ) -> Result> { let verbosity = verbosity .map(|v| v.value()) .unwrap_or(DEFAULT_HEADER_VERBOSITY_LEVEL); @@ -1825,7 +1838,10 @@ impl ChainRpc for ChainRpcImpl { Ok(self.shared.snapshot().tip_header().number().into()) } - async fn get_block_economic_state(&self, block_hash: H256) -> Result> { + async fn get_block_economic_state( + &self, + block_hash: H256, + ) -> Result> { let snapshot = self.shared.snapshot(); let block_number = if let Some(block_number) = snapshot.get_block_number(&block_hash.pack()) @@ -2106,12 +2122,18 @@ impl ChainRpc for ChainRpcImpl { CyclesEstimator::new(&self.shared).run(tx) } - async fn get_fee_rate_statics(&self, target: Option) -> Result> { + async fn get_fee_rate_statics( + &self, + target: Option, + ) -> Result> { Ok(FeeRateCollector::new(self.shared.snapshot().as_ref()) .statistics(target.map(Into::into))) } - async fn get_fee_rate_statistics(&self, target: Option) -> Result> { + async fn get_fee_rate_statistics( + &self, + target: Option, + ) -> Result> { Ok(FeeRateCollector::new(self.shared.snapshot().as_ref()) .statistics(target.map(Into::into))) } diff --git a/rpc/src/service_builder.rs b/rpc/src/service_builder.rs index 4d0b540401..ee7ac5b6a9 100644 --- a/rpc/src/service_builder.rs +++ b/rpc/src/service_builder.rs @@ -18,6 +18,9 @@ use ckb_shared::shared::Shared; use ckb_sync::SyncShared; use ckb_types::packed::Script; use ckb_util::Mutex; +use jsonrpc_core::MetaIoHandler; +use jsonrpc_core::RemoteProcedure; +use jsonrpc_utils::pub_sub::Session; use std::sync::Arc; const DEPRECATED_RPC_PREFIX: &str = "deprecated."; @@ -39,9 +42,13 @@ impl<'a> ServiceBuilder<'a> { /// Mounts methods from module Chain if it is enabled in the config. pub fn enable_chain(mut self, shared: Shared) -> Self { + let mut meta_io = MetaIoHandler::default(); if self.config.chain_enable() { let methods = ChainRpcImpl { shared }; - add_chain_rpc_methods(&mut self.io_handler, methods); + add_chain_rpc_methods(&mut meta_io, methods); + self.add_methods(meta_io); + } else { + self.update_disabled_methods("Chain", meta_io); } self } @@ -53,15 +60,17 @@ impl<'a> ServiceBuilder<'a> { extra_well_known_lock_scripts: Vec