Merge pull request nervosnetwork#4247 from EthanYuan/indexer-set-init-tip-hash

feat: introduce `init_tip_hash` setting for indexer to skip previous block synchronization
zhangsoledad authored Jan 11, 2024
2 parents ba67744 + 4851b30 commit 88a70a6
Showing 7 changed files with 84 additions and 6 deletions.
4 changes: 4 additions & 0 deletions ckb-bin/src/subcommand/reset_data.rs
@@ -16,6 +16,10 @@ pub fn reset_data(args: ResetDataArgs) -> Result<(), ExitCode> {
        target_dirs.push(args.db_path);
    }

    if args.indexer {
        target_dirs.push(args.indexer_path);
    }

    if args.network {
        target_dirs.push(args.network_dir);
    }
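This hunk only adds the indexer store directory to the list of targets; the actual deletion happens later in `reset_data`, outside this hunk. A minimal hedged sketch of that pattern (not the literal ckb implementation) showing how collected `target_dirs` would be removed:

```rust
use std::fs;
use std::path::PathBuf;

// Hedged sketch, not the actual ckb code: each collected directory that
// exists is removed recursively.
fn remove_targets(target_dirs: Vec<PathBuf>) -> std::io::Result<()> {
    for dir in target_dirs {
        if dir.exists() {
            println!("Deleting {}", dir.display());
            fs::remove_dir_all(&dir)?;
        }
    }
    Ok(())
}

fn main() -> std::io::Result<()> {
    // Illustrative path only; the real value comes from the node's indexer config.
    remove_targets(vec![PathBuf::from("data/indexer/store")])
}
```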
7 changes: 7 additions & 0 deletions resource/ckb.toml
@@ -196,6 +196,13 @@ block_uncles_cache_size = 30
# # Or you may want use more flexible scripts, block template as arg.
# notify_scripts = ["{cmd} {blocktemplate}"]
#
# # CKB built-in indexer settings. Existing indexes can be cleaned up using the `ckb reset-data --indexer` subcommand.
# [indexer_v2]
# # Indexing the pending txs in the ckb tx-pool
# index_tx_pool = false
# # Customize block filtering rules to index only retained blocks
# block_filter = "block.header.number.to_uint() >= \"0x0\".to_uint()"
# # Customize cell filtering rules to index only retained cells
# cell_filter = "let script = output.type;script!=() && script.code_hash == \"0x00000000000000000000000000000000000000000000000000545950455f4944\""
# # The initial tip can be set higher than the current indexer tip as the starting height for indexing.
# init_tip_hash = "0x8fbd0ec887159d2814cee475911600e3589849670f5ee1ed9798b38fdeef4e44"
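Note that `init_tip_hash` takes a block hash rather than a height. One way to look up the hash for a chosen starting height is the node's JSON-RPC `get_block_hash` method. The sketch below is a hedged helper, not part of this commit; it assumes a local node with JSON-RPC on the default 127.0.0.1:8114 and the `reqwest` (blocking, json features) and `serde_json` crates:

```rust
use serde_json::{json, Value};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Ask the node for the hash of the block at the chosen starting height.
    let request = json!({
        "id": 1,
        "jsonrpc": "2.0",
        "method": "get_block_hash",
        "params": ["0x6b6b59"] // block number 7,039,833, hex-encoded
    });

    let response: Value = reqwest::blocking::Client::new()
        .post("http://127.0.0.1:8114")
        .json(&request)
        .send()?
        .json()?;

    // The "result" field holds the 0x-prefixed block hash to copy into ckb.toml.
    println!("init_tip_hash = {}", response["result"]);
    Ok(())
}
```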
4 changes: 4 additions & 0 deletions util/app-config/src/args.rs
@@ -160,6 +160,8 @@ pub struct ResetDataArgs {
    pub all: bool,
    /// Reset database.
    pub database: bool,
    /// Reset indexer.
    pub indexer: bool,
    /// Reset all network data, including the secret key and peer store.
    pub network: bool,
    /// Reset network peer store.
@@ -172,6 +174,8 @@ pub struct ResetDataArgs {
    pub data_dir: PathBuf,
    /// The path to the database directory.
    pub db_path: PathBuf,
    /// The path to the indexer directory.
    pub indexer_path: PathBuf,
    /// The path to the network data directory.
    pub network_dir: PathBuf,
    /// The path to the network peer store directory.
6 changes: 6 additions & 0 deletions util/app-config/src/cli.rs
@@ -266,6 +266,12 @@ fn reset_data() -> Command {
                .action(clap::ArgAction::SetTrue)
                .help("Delete only `data/db`"),
        )
        .arg(
            Arg::new(ARG_INDEXER)
                .long(ARG_INDEXER)
                .action(clap::ArgAction::SetTrue)
                .help("Delete only `data/indexer/store`"),
        )
        .arg(
            Arg::new(ARG_NETWORK)
                .long(ARG_NETWORK)
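For reference, the new argument follows clap 4's boolean-flag pattern: `ArgAction::SetTrue` defaults the flag to `false` and flips it to `true` when `--indexer` is passed, which is what `Setup::reset_data` later reads with `matches.get_flag(cli::ARG_INDEXER)`. A small self-contained sketch, with a literal "indexer" id standing in for the `ARG_INDEXER` constant:

```rust
use clap::{Arg, ArgAction, Command};

fn main() {
    // "indexer" stands in for the ARG_INDEXER constant used in ckb's cli.rs.
    let cmd = Command::new("reset-data").arg(
        Arg::new("indexer")
            .long("indexer")
            .action(ArgAction::SetTrue)
            .help("Delete only `data/indexer/store`"),
    );

    // `get_flag` returns false when the flag is absent, true when it is passed.
    let matches = cmd.get_matches_from(["reset-data", "--indexer"]);
    assert!(matches.get_flag("indexer"));
}
```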
5 changes: 5 additions & 0 deletions util/app-config/src/configs/indexer.rs
@@ -1,3 +1,4 @@
use ckb_types::H256;
use serde::{Deserialize, Serialize};
use std::num::NonZeroUsize;
use std::path::{Path, PathBuf};
@@ -28,6 +29,9 @@ pub struct IndexerConfig {
    /// Maximal db info log files to be kept.
    #[serde(default)]
    pub db_keep_log_file_num: Option<NonZeroUsize>,
    /// The init tip block hash
    #[serde(default)]
    pub init_tip_hash: Option<H256>,
}

const fn default_poll_interval() -> u64 {
@@ -45,6 +49,7 @@ impl Default for IndexerConfig {
            cell_filter: None,
            db_background_jobs: None,
            db_keep_log_file_num: None,
            init_tip_hash: None,
        }
    }
}
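Because the new field is an `Option<H256>` marked `#[serde(default)]`, existing `ckb.toml` files that never mention `init_tip_hash` keep deserializing unchanged. A minimal sketch of that behavior, using a trimmed stand-in struct with `String` in place of `H256` so it does not depend on the exact serde format of `ckb_types` (assumes the `serde` and `toml` crates):

```rust
use serde::Deserialize;

// Trimmed, hypothetical stand-in for IndexerConfig; `String` replaces `H256`
// purely to keep this sketch self-contained.
#[derive(Debug, Deserialize)]
struct IndexerConfigSketch {
    #[serde(default)]
    init_tip_hash: Option<String>,
}

fn main() {
    // Old config files without the field still parse; the option defaults to None.
    let old: IndexerConfigSketch = toml::from_str("").unwrap();
    assert!(old.init_tip_hash.is_none());

    // New config files can opt in to a starting tip.
    let new: IndexerConfigSketch = toml::from_str(
        r#"init_tip_hash = "0x8fbd0ec887159d2814cee475911600e3589849670f5ee1ed9798b38fdeef4e44""#,
    )
    .unwrap();
    assert!(new.init_tip_hash.is_some());
}
```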
4 changes: 4 additions & 0 deletions util/app-config/src/lib.rs
@@ -320,6 +320,7 @@ impl Setup {
        let config = self.config.into_ckb()?;
        let data_dir = config.data_dir;
        let db_path = config.db.path;
        let indexer_path = config.indexer.store;
        let network_config = config.network;
        let network_dir = network_config.path.clone();
        let network_peer_store_path = network_config.peer_store_path();
@@ -329,6 +330,7 @@ impl Setup {
        let force = matches.get_flag(cli::ARG_FORCE);
        let all = matches.get_flag(cli::ARG_ALL);
        let database = matches.get_flag(cli::ARG_DATABASE);
        let indexer = matches.get_flag(cli::ARG_INDEXER);
        let network = matches.get_flag(cli::ARG_NETWORK);
        let network_peer_store = matches.get_flag(cli::ARG_NETWORK_PEER_STORE);
        let network_secret_key = matches.get_flag(cli::ARG_NETWORK_SECRET_KEY);
@@ -338,12 +340,14 @@ impl Setup {
            force,
            all,
            database,
            indexer,
            network,
            network_peer_store,
            network_secret_key,
            logs,
            data_dir,
            db_path,
            indexer_path,
            network_dir,
            network_peer_store_path,
            network_secret_key_path,
60 changes: 54 additions & 6 deletions util/indexer/src/service.rs
@@ -1,10 +1,10 @@
//!The indexer service.

use crate::error::Error;
use crate::indexer::{self, extract_raw_data, CustomFilters, Indexer, Key, KeyPrefix, Value};
use crate::pool::Pool;
use crate::store::{IteratorDirection, RocksdbStore, SecondaryDB, Store};
use crate::store::{Batch, IteratorDirection, RocksdbStore, SecondaryDB, Store};

use crate::error::Error;
use ckb_app_config::{DBConfig, IndexerConfig};
use ckb_async_runtime::{
    tokio::{self, time},
@@ -20,11 +20,17 @@ use ckb_logger::{error, info};
use ckb_notify::NotifyController;
use ckb_stop_handler::{new_tokio_exit_rx, CancellationToken};
use ckb_store::ChainStore;
use ckb_types::{core, packed, prelude::*, H256};
use ckb_types::{
    core::{self, BlockNumber},
    packed,
    prelude::*,
    H256,
};
use rocksdb::{prelude::*, Direction, IteratorMode};
use std::convert::TryInto;
use std::num::NonZeroUsize;
use std::sync::{Arc, RwLock};
use std::thread::sleep;
use std::time::Duration;

const SUBSCRIBER_NAME: &str = "Indexer";
@@ -41,6 +47,7 @@ pub struct IndexerService {
    async_handle: Handle,
    block_filter: Option<String>,
    cell_filter: Option<String>,
    init_tip_hash: Option<H256>,
}

impl IndexerService {
@@ -76,6 +83,7 @@ impl IndexerService {
            poll_interval: Duration::from_secs(config.poll_interval),
            block_filter: config.block_filter.clone(),
            cell_filter: config.cell_filter.clone(),
            init_tip_hash: config.init_tip_hash.clone(),
        }
    }

@@ -127,6 +135,44 @@ impl IndexerService {
        });
    }

    fn apply_init_tip(&self) {
        if let Some(init_tip_hash) = &self.init_tip_hash {
            let indexer_tip = self
                .store
                .iter([KeyPrefix::Header as u8 + 1], IteratorDirection::Reverse)
                .expect("iter Header should be OK")
                .next()
                .map(|(key, _)| {
                    BlockNumber::from_be_bytes(key[1..9].try_into().expect("stored block key"))
                });
            if let Some(indexer_tip) = indexer_tip {
                if let Some(init_tip) = self.secondary_db.get_block_header(&init_tip_hash.pack()) {
                    if indexer_tip >= init_tip.number() {
                        return;
                    }
                }
            }
            loop {
                if let Err(e) = self.secondary_db.try_catch_up_with_primary() {
                    error!("secondary_db try_catch_up_with_primary error {}", e);
                }
                if let Some(header) = self.secondary_db.get_block_header(&init_tip_hash.pack()) {
                    let init_tip_number = header.number();
                    let mut batch = self.store.batch().expect("create batch should be OK");
                    batch
                        .put_kv(
                            Key::Header(init_tip_number, &init_tip_hash.pack(), true),
                            vec![],
                        )
                        .expect("insert init tip header should be OK");
                    batch.commit().expect("commit batch should be OK");
                    break;
                }
                sleep(Duration::from_secs(1));
            }
        }
    }

    fn try_loop_sync(&self) {
        // assume that long fork will not happen >= 100 blocks.
        let keep_num = 100;
@@ -171,9 +217,11 @@ impl IndexerService {
    /// Processes that handle block cell and expect to be spawned to run in tokio runtime
    pub fn spawn_poll(&self, notify_controller: NotifyController) {
        let initial_service = self.clone();
        let initial_syncing = self
            .async_handle
            .spawn_blocking(move || initial_service.try_loop_sync());
        let initial_syncing = self.async_handle.spawn_blocking(move || {
            initial_service.apply_init_tip();
            initial_service.try_loop_sync()
        });

        let stop: CancellationToken = new_tokio_exit_rx();
        let async_handle = self.async_handle.clone();
        let poll_service = self.clone();
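`apply_init_tip` relies on the indexer's header key layout: decoding `key[1..9]` as a big-endian `BlockNumber` implies keys of the form prefix byte, 8-byte big-endian block number, block hash, so a reverse iteration starting just past the `Header` prefix returns the highest indexed block first. Writing a single header key at the `init_tip_hash` height then makes that block the indexer tip, so later sync rounds only fetch blocks after it, which is how earlier history gets skipped. A hedged sketch of that assumed layout (the prefix constant below is a placeholder, not the real `KeyPrefix::Header` value):

```rust
// Hedged sketch of the assumed key layout: [prefix][block_number BE][block_hash].
// HEADER_PREFIX is a placeholder value, not the real ckb-indexer KeyPrefix::Header.
const HEADER_PREFIX: u8 = 0xc0;

fn encode_header_key(number: u64, hash: &[u8; 32]) -> Vec<u8> {
    let mut key = Vec::with_capacity(1 + 8 + 32);
    key.push(HEADER_PREFIX);
    key.extend_from_slice(&number.to_be_bytes());
    key.extend_from_slice(hash);
    key
}

fn decode_block_number(key: &[u8]) -> u64 {
    // Mirrors the `key[1..9]` slice used by `apply_init_tip`.
    u64::from_be_bytes(key[1..9].try_into().expect("8-byte block number"))
}

fn main() {
    let key = encode_header_key(11_369_300, &[0u8; 32]);
    assert_eq!(decode_block_number(&key), 11_369_300);
    // Because block numbers are big-endian, lexicographic key order matches
    // numeric order, so a reverse scan over the Header range finds the tip.
}
```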
