Skip to content

Commit

Permalink
Merge pull request #4351 from eval-exec/exec/indexer-try_loop_sync-exit
Browse files Browse the repository at this point in the history
`IndexerService::try_loop_sync` should not run if ckb has received stop signal
  • Loading branch information
quake authored Feb 19, 2024
2 parents 2615601 + b3dde39 commit 4cd7845
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 13 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion util/indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ ckb-notify = { path = "../../notify", version = "= 0.114.0-pre" }
ckb-store = { path = "../../store", version = "= 0.114.0-pre" }
ckb-stop-handler = { path = "../stop-handler", version = "= 0.114.0-pre" }
ckb-async-runtime = { path = "../runtime", version = "= 0.114.0-pre" }
ckb-channel = { path = "../channel", version = "= 0.114.0-pre" }
rhai = { version = "1.10.0", features = ["no_function", "no_float", "no_module", "sync"]}
serde_json = "1.0"
numext-fixed-uint = "0.1"
Expand Down
20 changes: 11 additions & 9 deletions util/indexer/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use ckb_jsonrpc_types::{
};
use ckb_logger::{error, info};
use ckb_notify::NotifyController;
use ckb_stop_handler::{new_crossbeam_exit_rx, new_tokio_exit_rx, CancellationToken};
use ckb_stop_handler::{has_received_stop_signal, new_tokio_exit_rx, CancellationToken};
use ckb_store::ChainStore;
use ckb_types::{
core::{self, BlockNumber},
Expand Down Expand Up @@ -153,14 +153,10 @@ impl IndexerService {
}
}
}
let stop_rx = new_crossbeam_exit_rx();
loop {
ckb_channel::select! {
recv(stop_rx) -> _ =>{
info!("apply_init_tip received exit signal, exit now");
break;
},
default() => {},
if has_received_stop_signal() {
info!("apply_init_tip received exit signal, exit now");
break;
}

if let Err(e) = self.secondary_db.try_catch_up_with_primary() {
Expand Down Expand Up @@ -197,6 +193,11 @@ impl IndexerService {
CustomFilters::new(self.block_filter.as_deref(), self.cell_filter.as_deref()),
);
loop {
if has_received_stop_signal() {
info!("try_loop_sync received exit signal, exit now");
break;
}

if let Some((tip_number, tip_hash)) = indexer.tip().expect("get tip should be OK") {
match self.get_block_by_number(tip_number + 1) {
Some(block) => {
Expand Down Expand Up @@ -237,12 +238,13 @@ impl IndexerService {
let poll_service = self.clone();
self.async_handle.spawn(async move {
let _initial_finished = initial_syncing.await;
info!("initial_syncing finished");
if stop.is_cancelled() {
info!("Indexer received exit signal, cancel new_block_watcher task, exit now");
return;
}

info!("initial_syncing finished");

let mut new_block_watcher = notify_controller
.watch_new_block(SUBSCRIBER_NAME.to_string())
.await;
Expand Down
4 changes: 2 additions & 2 deletions util/stop-handler/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
//! TODO(doc): @keroro520
pub use stop_register::{
broadcast_exit_signals, new_crossbeam_exit_rx, new_tokio_exit_rx, register_thread,
wait_all_ckb_services_exit,
broadcast_exit_signals, has_received_stop_signal, new_crossbeam_exit_rx, new_tokio_exit_rx,
register_thread, wait_all_ckb_services_exit,
};

pub use tokio_util::sync::CancellationToken;
Expand Down
10 changes: 10 additions & 0 deletions util/stop-handler/src/stop_register.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use ckb_channel::TrySendError;
use ckb_logger::{debug, info, trace, warn};
use ckb_util::Mutex;
use std::sync::atomic::AtomicBool;
use tokio_util::sync::CancellationToken;

struct CkbServiceHandles {
Expand Down Expand Up @@ -34,6 +35,9 @@ static CKB_HANDLES: once_cell::sync::Lazy<Mutex<CkbServiceHandles>> =
})
});

static RECEIVED_STOP_SIGNAL: once_cell::sync::Lazy<AtomicBool> =
once_cell::sync::Lazy::new(AtomicBool::default);

static TOKIO_EXIT: once_cell::sync::Lazy<CancellationToken> =
once_cell::sync::Lazy::new(CancellationToken::new);

Expand All @@ -52,9 +56,15 @@ pub fn new_crossbeam_exit_rx() -> ckb_channel::Receiver<()> {
rx
}

/// Check if the ckb process has received stop signal
pub fn has_received_stop_signal() -> bool {
RECEIVED_STOP_SIGNAL.load(std::sync::atomic::Ordering::SeqCst)
}

/// Broadcast exit signals to all threads and all tokio tasks
pub fn broadcast_exit_signals() {
debug!("Received exit signal; broadcasting exit signal to all threads");
RECEIVED_STOP_SIGNAL.store(true, std::sync::atomic::Ordering::SeqCst);
TOKIO_EXIT.cancel();
CROSSBEAM_EXIT_SENDERS
.lock()
Expand Down

0 comments on commit 4cd7845

Please sign in to comment.