diff --git a/Cargo.lock b/Cargo.lock index bf42e1e1e6..1f913ffcf9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -907,7 +907,6 @@ version = "0.114.0-pre" dependencies = [ "ckb-app-config", "ckb-async-runtime", - "ckb-channel", "ckb-db-schema", "ckb-jsonrpc-types", "ckb-logger", diff --git a/util/indexer/Cargo.toml b/util/indexer/Cargo.toml index 294921a99b..ea01bb6ffe 100644 --- a/util/indexer/Cargo.toml +++ b/util/indexer/Cargo.toml @@ -22,7 +22,6 @@ ckb-notify = { path = "../../notify", version = "= 0.114.0-pre" } ckb-store = { path = "../../store", version = "= 0.114.0-pre" } ckb-stop-handler = { path = "../stop-handler", version = "= 0.114.0-pre" } ckb-async-runtime = { path = "../runtime", version = "= 0.114.0-pre" } -ckb-channel = { path = "../channel", version = "= 0.114.0-pre" } rhai = { version = "1.10.0", features = ["no_function", "no_float", "no_module", "sync"]} serde_json = "1.0" numext-fixed-uint = "0.1" diff --git a/util/indexer/src/service.rs b/util/indexer/src/service.rs index 0bbb0c2056..edc2f01d96 100644 --- a/util/indexer/src/service.rs +++ b/util/indexer/src/service.rs @@ -18,7 +18,7 @@ use ckb_jsonrpc_types::{ }; use ckb_logger::{error, info}; use ckb_notify::NotifyController; -use ckb_stop_handler::{new_crossbeam_exit_rx, new_tokio_exit_rx, CancellationToken}; +use ckb_stop_handler::{has_received_stop_signal, new_tokio_exit_rx, CancellationToken}; use ckb_store::ChainStore; use ckb_types::{ core::{self, BlockNumber}, @@ -153,14 +153,10 @@ impl IndexerService { } } } - let stop_rx = new_crossbeam_exit_rx(); loop { - ckb_channel::select! { - recv(stop_rx) -> _ =>{ - info!("apply_init_tip received exit signal, exit now"); - break; - }, - default() => {}, + if has_received_stop_signal() { + info!("apply_init_tip received exit signal, exit now"); + break; } if let Err(e) = self.secondary_db.try_catch_up_with_primary() { @@ -197,6 +193,11 @@ impl IndexerService { CustomFilters::new(self.block_filter.as_deref(), self.cell_filter.as_deref()), ); loop { + if has_received_stop_signal() { + info!("try_loop_sync received exit signal, exit now"); + break; + } + if let Some((tip_number, tip_hash)) = indexer.tip().expect("get tip should be OK") { match self.get_block_by_number(tip_number + 1) { Some(block) => { @@ -237,12 +238,13 @@ impl IndexerService { let poll_service = self.clone(); self.async_handle.spawn(async move { let _initial_finished = initial_syncing.await; - info!("initial_syncing finished"); if stop.is_cancelled() { info!("Indexer received exit signal, cancel new_block_watcher task, exit now"); return; } + info!("initial_syncing finished"); + let mut new_block_watcher = notify_controller .watch_new_block(SUBSCRIBER_NAME.to_string()) .await; diff --git a/util/stop-handler/src/lib.rs b/util/stop-handler/src/lib.rs index 48309b03e5..c3bf144004 100644 --- a/util/stop-handler/src/lib.rs +++ b/util/stop-handler/src/lib.rs @@ -1,8 +1,8 @@ //! TODO(doc): @keroro520 pub use stop_register::{ - broadcast_exit_signals, new_crossbeam_exit_rx, new_tokio_exit_rx, register_thread, - wait_all_ckb_services_exit, + broadcast_exit_signals, has_received_stop_signal, new_crossbeam_exit_rx, new_tokio_exit_rx, + register_thread, wait_all_ckb_services_exit, }; pub use tokio_util::sync::CancellationToken; diff --git a/util/stop-handler/src/stop_register.rs b/util/stop-handler/src/stop_register.rs index ccc3d9a6f4..c9146332dc 100644 --- a/util/stop-handler/src/stop_register.rs +++ b/util/stop-handler/src/stop_register.rs @@ -1,6 +1,7 @@ use ckb_channel::TrySendError; use ckb_logger::{debug, info, trace, warn}; use ckb_util::Mutex; +use std::sync::atomic::AtomicBool; use tokio_util::sync::CancellationToken; struct CkbServiceHandles { @@ -34,6 +35,9 @@ static CKB_HANDLES: once_cell::sync::Lazy> = }) }); +static RECEIVED_STOP_SIGNAL: once_cell::sync::Lazy = + once_cell::sync::Lazy::new(AtomicBool::default); + static TOKIO_EXIT: once_cell::sync::Lazy = once_cell::sync::Lazy::new(CancellationToken::new); @@ -52,9 +56,15 @@ pub fn new_crossbeam_exit_rx() -> ckb_channel::Receiver<()> { rx } +/// Check if the ckb process has received stop signal +pub fn has_received_stop_signal() -> bool { + RECEIVED_STOP_SIGNAL.load(std::sync::atomic::Ordering::SeqCst) +} + /// Broadcast exit signals to all threads and all tokio tasks pub fn broadcast_exit_signals() { debug!("Received exit signal; broadcasting exit signal to all threads"); + RECEIVED_STOP_SIGNAL.store(true, std::sync::atomic::Ordering::SeqCst); TOKIO_EXIT.cancel(); CROSSBEAM_EXIT_SENDERS .lock()