From 3495ce42fdebc3e5cef7265e3e47efbb1689a8da Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Sun, 18 Feb 2024 10:35:21 +0800 Subject: [PATCH 1/5] Add has_received_stop_signal function to check if the ckb process has received stop signal --- util/stop-handler/src/lib.rs | 4 ++-- util/stop-handler/src/stop_register.rs | 10 ++++++++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/util/stop-handler/src/lib.rs b/util/stop-handler/src/lib.rs index 48309b03e5..c3bf144004 100644 --- a/util/stop-handler/src/lib.rs +++ b/util/stop-handler/src/lib.rs @@ -1,8 +1,8 @@ //! TODO(doc): @keroro520 pub use stop_register::{ - broadcast_exit_signals, new_crossbeam_exit_rx, new_tokio_exit_rx, register_thread, - wait_all_ckb_services_exit, + broadcast_exit_signals, has_received_stop_signal, new_crossbeam_exit_rx, new_tokio_exit_rx, + register_thread, wait_all_ckb_services_exit, }; pub use tokio_util::sync::CancellationToken; diff --git a/util/stop-handler/src/stop_register.rs b/util/stop-handler/src/stop_register.rs index ccc3d9a6f4..c9146332dc 100644 --- a/util/stop-handler/src/stop_register.rs +++ b/util/stop-handler/src/stop_register.rs @@ -1,6 +1,7 @@ use ckb_channel::TrySendError; use ckb_logger::{debug, info, trace, warn}; use ckb_util::Mutex; +use std::sync::atomic::AtomicBool; use tokio_util::sync::CancellationToken; struct CkbServiceHandles { @@ -34,6 +35,9 @@ static CKB_HANDLES: once_cell::sync::Lazy> = }) }); +static RECEIVED_STOP_SIGNAL: once_cell::sync::Lazy = + once_cell::sync::Lazy::new(AtomicBool::default); + static TOKIO_EXIT: once_cell::sync::Lazy = once_cell::sync::Lazy::new(CancellationToken::new); @@ -52,9 +56,15 @@ pub fn new_crossbeam_exit_rx() -> ckb_channel::Receiver<()> { rx } +/// Check if the ckb process has received stop signal +pub fn has_received_stop_signal() -> bool { + RECEIVED_STOP_SIGNAL.load(std::sync::atomic::Ordering::SeqCst) +} + /// Broadcast exit signals to all threads and all tokio tasks pub fn broadcast_exit_signals() { debug!("Received exit signal; broadcasting exit signal to all threads"); + RECEIVED_STOP_SIGNAL.store(true, std::sync::atomic::Ordering::SeqCst); TOKIO_EXIT.cancel(); CROSSBEAM_EXIT_SENDERS .lock() From 2f6b67d7106424b09ef898eb9a090e107345e516 Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Sun, 18 Feb 2024 10:07:51 +0800 Subject: [PATCH 2/5] IndexerService::try_loop_sync should not run if ckb has received stop signal Signed-off-by: Eval EXEC --- util/indexer/src/service.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/util/indexer/src/service.rs b/util/indexer/src/service.rs index 0bbb0c2056..425340ecff 100644 --- a/util/indexer/src/service.rs +++ b/util/indexer/src/service.rs @@ -18,7 +18,9 @@ use ckb_jsonrpc_types::{ }; use ckb_logger::{error, info}; use ckb_notify::NotifyController; -use ckb_stop_handler::{new_crossbeam_exit_rx, new_tokio_exit_rx, CancellationToken}; +use ckb_stop_handler::{ + has_received_stop_signal, new_crossbeam_exit_rx, new_tokio_exit_rx, CancellationToken, +}; use ckb_store::ChainStore; use ckb_types::{ core::{self, BlockNumber}, @@ -229,7 +231,9 @@ impl IndexerService { let initial_service = self.clone(); let initial_syncing = self.async_handle.spawn_blocking(move || { initial_service.apply_init_tip(); - initial_service.try_loop_sync() + if !has_received_stop_signal() { + initial_service.try_loop_sync() + } }); let stop: CancellationToken = new_tokio_exit_rx(); From b36241d81ecebd3a13497f4b660ec3df52798bcd Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Sun, 18 Feb 2024 11:23:04 +0800 Subject: [PATCH 3/5] IndexerService::try_loop_sync should break if has_received_stop_signal is true Signed-off-by: Eval EXEC --- util/indexer/src/service.rs | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/util/indexer/src/service.rs b/util/indexer/src/service.rs index 425340ecff..df1d9b69dc 100644 --- a/util/indexer/src/service.rs +++ b/util/indexer/src/service.rs @@ -18,9 +18,7 @@ use ckb_jsonrpc_types::{ }; use ckb_logger::{error, info}; use ckb_notify::NotifyController; -use ckb_stop_handler::{ - has_received_stop_signal, new_crossbeam_exit_rx, new_tokio_exit_rx, CancellationToken, -}; +use ckb_stop_handler::{has_received_stop_signal, new_tokio_exit_rx, CancellationToken}; use ckb_store::ChainStore; use ckb_types::{ core::{self, BlockNumber}, @@ -155,14 +153,10 @@ impl IndexerService { } } } - let stop_rx = new_crossbeam_exit_rx(); loop { - ckb_channel::select! { - recv(stop_rx) -> _ =>{ - info!("apply_init_tip received exit signal, exit now"); - break; - }, - default() => {}, + if has_received_stop_signal() { + info!("apply_init_tip received exit signal, exit now"); + break; } if let Err(e) = self.secondary_db.try_catch_up_with_primary() { @@ -199,6 +193,11 @@ impl IndexerService { CustomFilters::new(self.block_filter.as_deref(), self.cell_filter.as_deref()), ); loop { + if has_received_stop_signal() { + info!("try_loop_sync received exit signal, exit now"); + break; + } + if let Some((tip_number, tip_hash)) = indexer.tip().expect("get tip should be OK") { match self.get_block_by_number(tip_number + 1) { Some(block) => { @@ -231,9 +230,7 @@ impl IndexerService { let initial_service = self.clone(); let initial_syncing = self.async_handle.spawn_blocking(move || { initial_service.apply_init_tip(); - if !has_received_stop_signal() { - initial_service.try_loop_sync() - } + initial_service.try_loop_sync() }); let stop: CancellationToken = new_tokio_exit_rx(); From 40fdf3fef129f3e2a79e1a7609178070b4e8852f Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Sun, 18 Feb 2024 11:36:29 +0800 Subject: [PATCH 4/5] Remove ckb-channel dependency from ckb-indexer --- Cargo.lock | 1 - util/indexer/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bf42e1e1e6..1f913ffcf9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -907,7 +907,6 @@ version = "0.114.0-pre" dependencies = [ "ckb-app-config", "ckb-async-runtime", - "ckb-channel", "ckb-db-schema", "ckb-jsonrpc-types", "ckb-logger", diff --git a/util/indexer/Cargo.toml b/util/indexer/Cargo.toml index 294921a99b..ea01bb6ffe 100644 --- a/util/indexer/Cargo.toml +++ b/util/indexer/Cargo.toml @@ -22,7 +22,6 @@ ckb-notify = { path = "../../notify", version = "= 0.114.0-pre" } ckb-store = { path = "../../store", version = "= 0.114.0-pre" } ckb-stop-handler = { path = "../stop-handler", version = "= 0.114.0-pre" } ckb-async-runtime = { path = "../runtime", version = "= 0.114.0-pre" } -ckb-channel = { path = "../channel", version = "= 0.114.0-pre" } rhai = { version = "1.10.0", features = ["no_function", "no_float", "no_module", "sync"]} serde_json = "1.0" numext-fixed-uint = "0.1" From b3dde3953edc16a3e469f16c12e9e4458bb4132f Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Sun, 18 Feb 2024 11:44:30 +0800 Subject: [PATCH 5/5] Reorder initial_syncing finished message --- util/indexer/src/service.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/util/indexer/src/service.rs b/util/indexer/src/service.rs index df1d9b69dc..edc2f01d96 100644 --- a/util/indexer/src/service.rs +++ b/util/indexer/src/service.rs @@ -238,12 +238,13 @@ impl IndexerService { let poll_service = self.clone(); self.async_handle.spawn(async move { let _initial_finished = initial_syncing.await; - info!("initial_syncing finished"); if stop.is_cancelled() { info!("Indexer received exit signal, cancel new_block_watcher task, exit now"); return; } + info!("initial_syncing finished"); + let mut new_block_watcher = notify_controller .watch_new_block(SUBSCRIBER_NAME.to_string()) .await;