Merge branch 'develop' into develop
EthanYuan authored Aug 19, 2024
2 parents 35b7944 + 75728e6 commit 2c9c5d1
Showing 28 changed files with 310 additions and 72 deletions.
2 changes: 1 addition & 1 deletion network/src/tests/peer_registry.rs
@@ -191,7 +191,7 @@ fn test_accept_inbound_peer_eviction() {
peer.connected_time = now - Duration::from_secs(10);
};
}
// thoses peers will not be protect, we add them to evict_targets
// these peers will not be protect, we add them to evict_targets
for _ in 0..longest_connection_time_peers_count {
let peer_addr = peers_iter.next().unwrap();
let peer_id = extract_peer_id(peer_addr).unwrap();
6 changes: 6 additions & 0 deletions resource/ckb.toml
@@ -212,6 +212,12 @@ block_uncles_cache_size = 30
# cell_filter = "let script = output.type;script!=() && script.code_hash == \"0x00000000000000000000000000000000000000000000000000545950455f4944\""
# # The initial tip can be set higher than the current indexer tip as the starting height for indexing.
# init_tip_hash = "0x8fbd0ec887159d2814cee475911600e3589849670f5ee1ed9798b38fdeef4e44"
# By default, there is no limit on the size of an indexer request.
# However, serde JSON serialization consumes roughly 10x the memory of
# the underlying data, so unbounded requests can make the machine unresponsive.
# We recommend capping memory consumption at about 2 GB, which corresponds
# to a limit of 400.
# request_limit = 400
#
# # CKB rich-indexer has its unique configuration.
# [indexer_v2.rich_indexer]
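The configured cap works like any paginated-query limit: the server clamps whatever page size the caller asks for. A minimal sketch of that idea, assuming hypothetical names (`effective_limit`, `DEFAULT_LIMIT`) rather than CKB's actual indexer code:

```rust
/// Illustrative sketch only: clamp a caller-supplied page size to the
/// configured `request_limit` so one request cannot balloon serialization
/// memory (roughly 10x the raw data, per the comment above).
fn effective_limit(requested: Option<u32>, request_limit: Option<u32>) -> u32 {
    // Hypothetical default page size when the caller specifies none.
    const DEFAULT_LIMIT: u32 = 100;
    let requested = requested.unwrap_or(DEFAULT_LIMIT);
    match request_limit {
        Some(cap) => requested.min(cap), // a configured cap bounds every request
        None => requested,               // no cap configured: pass through
    }
}

fn main() {
    assert_eq!(effective_limit(Some(10_000), Some(400)), 400);
    assert_eq!(effective_limit(None, Some(400)), 100);
    assert_eq!(effective_limit(Some(10_000), None), 10_000);
}
```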
2 changes: 2 additions & 0 deletions rpc/README.md
@@ -926,6 +926,7 @@ Response
"block_hash": null,
"block_number": null,
"status": "pending",
"tx_index": null,
"reason": null
}
}
@@ -946,6 +947,7 @@ The response looks like below when `verbosity` is 0.
"block_hash": null,
"block_number": null,
"status": "pending",
"tx_index": null,
"reason": null
}
}
4 changes: 4 additions & 0 deletions rpc/src/module/chain.rs
@@ -625,6 +625,7 @@ pub trait ChainRpc {
/// "block_hash": null,
/// "block_number": null,
/// "status": "pending",
/// "tx_index": null,
/// "reason": null
/// }
/// }
@@ -645,6 +646,7 @@
/// "block_hash": null,
/// "block_number": null,
/// "status": "pending",
/// "tx_index": null,
/// "reason": null
/// }
/// }
@@ -2159,6 +2161,7 @@ impl ChainRpcImpl {
None,
tx_info.block_number,
tx_info.block_hash.unpack(),
tx_info.index as u32,
cycles,
None,
));
@@ -2207,6 +2210,7 @@ impl ChainRpcImpl {
Some(tx),
tx_info.block_number,
tx_info.block_hash.unpack(),
tx_info.index as u32,
cycles,
None,
));
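The new `tx_index` mirrors `tx_info.index`: it stays `null` while the transaction only sits in the pool and is populated once the transaction is committed into a block. A minimal sketch of that shape, using simplified stand-ins for the real `ckb-jsonrpc-types` definitions:

```rust
// Simplified stand-in for the response type; the real definitions live in
// ckb-jsonrpc-types and differ in detail (e.g. H256 instead of [u8; 32]).
#[derive(Debug)]
struct TxStatusView {
    block_hash: Option<[u8; 32]>, // None while the tx is still pending
    block_number: Option<u64>,    // None while the tx is still pending
    status: &'static str,         // "pending", "proposed", "committed", ...
    tx_index: Option<u32>,        // position within the block once committed
}

fn pending() -> TxStatusView {
    TxStatusView { block_hash: None, block_number: None, status: "pending", tx_index: None }
}

fn committed(hash: [u8; 32], number: u64, index: u32) -> TxStatusView {
    // Once committed, tx_index records where in the block the tx landed,
    // matching the `tx_info.index as u32` plumbing in the hunks above.
    TxStatusView {
        block_hash: Some(hash),
        block_number: Some(number),
        status: "committed",
        tx_index: Some(index),
    }
}

fn main() {
    println!("{:?}", pending());
    println!("{:?}", committed([0u8; 32], 11_900_000, 3));
}
```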
8 changes: 7 additions & 1 deletion test/src/main.rs
@@ -148,13 +148,19 @@ fn main() {
let mut test_results = Vec::new();
let mut worker_running = worker_count;
let mut done_specs = 0;
let mut started_sequential = false;
while worker_running > 0 {
if max_time > 0 && start_time.elapsed().as_secs() > max_time {
// shutdown, specs running too long
workers.shutdown();
break;
}

if worker_running == 1 && !started_sequential {
started_sequential = true;
workers.start_sequencial()
}

let msg = match notify_rx.recv_timeout(Duration::from_secs(5)) {
Ok(msg) => msg,
Err(err) => {
@@ -398,7 +404,6 @@ fn all_specs() -> Vec<Box<dyn Spec>> {
Box::new(BlockSyncNonAncestorBestBlocks),
Box::new(RequestUnverifiedBlocks),
Box::new(SyncTimeout),
Box::new(SyncChurn),
Box::new(SyncInvalid),
Box::new(GetBlockFilterCheckPoints),
Box::new(GetBlockFilterHashes),
@@ -591,6 +596,7 @@ fn all_specs() -> Vec<Box<dyn Spec>> {
Box::new(CheckVmVersion2),
Box::new(CheckVmBExtension),
Box::new(RandomlyKill),
Box::new(SyncChurn),
];
specs.shuffle(&mut thread_rng());
specs
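Read together with the test/src/worker.rs changes below, the intent appears to be: parallel workers defer the disruptive specs to a shared sequential queue, and once only one worker is still running, main tells it to start draining that queue. Relocating `Box::new(SyncChurn)` next to `Box::new(RandomlyKill)` merely groups the two sequential specs together in the source; the list is shuffled before use anyway.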
9 changes: 6 additions & 3 deletions test/src/node.rs
@@ -681,9 +681,9 @@ impl Node {
status,
self.log_path().display()
);
info!("Last 200 lines of log:");
info!("Last 500 lines of log:");
self.print_last_500_lines_log(&self.log_path());
info!("End of last 200 lines of log");
info!("End of last 500 lines of log");
// parent process will exit
return;
}
@@ -693,6 +693,9 @@
error,
self.log_path().display()
);
info!("Last 500 lines of log:");
self.print_last_500_lines_log(&self.log_path());
info!("End of last 500 lines of log");
return;
}
}
@@ -708,7 +711,7 @@
self.set_node_id(node_info.node_id.as_str());
}

fn print_last_500_lines_log(&self, log_file: &Path) {
pub(crate) fn print_last_500_lines_log(&self, log_file: &Path) {
let file = File::open(log_file).expect("open log file");
let reader = BufReader::new(file);
let lines: Vec<String> = reader.lines().map(|line| line.unwrap()).collect();
7 changes: 7 additions & 0 deletions test/src/specs/sync/block_sync.rs
@@ -21,6 +21,9 @@ impl Spec for BlockSyncFromOne {

// NOTE: ENSURE node0 and node1 are in genesis state.
fn run(&self, nodes: &mut Vec<Node>) {
nodes
.iter()
.for_each(|node| node.wait_find_unverified_blocks_finished());
let node0 = &nodes[0];
let node1 = &nodes[1];
let (rpc_client0, rpc_client1) = (node0.rpc_client(), node1.rpc_client());
@@ -307,6 +310,10 @@ impl Spec for BlockSyncNonAncestorBestBlocks {
crate::setup!(num_nodes: 2);

fn run(&self, nodes: &mut Vec<Node>) {
nodes
.iter()
.for_each(|node| node.wait_find_unverified_blocks_finished());

let node0 = &nodes[0];
let node1 = &nodes[1];
out_ibd_mode(nodes);
3 changes: 3 additions & 0 deletions test/src/specs/sync/ibd_process.rs
@@ -8,6 +8,9 @@ impl Spec for IBDProcess {
crate::setup!(num_nodes: 3);

fn run(&self, nodes: &mut Vec<Node>) {
nodes
.iter()
.for_each(|node| node.wait_find_unverified_blocks_finished());
info!("Running IBD process");

let node0 = &nodes[0];
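The `wait_find_unverified_blocks_finished()` calls added to these sync specs (BlockSyncFromOne, BlockSyncNonAncestorBestBlocks, IBDProcess) follow one pattern: every node must finish its startup search for unverified blocks before the spec runs, presumably so that background verification cannot race with the spec's assumptions about a fresh node's chain state.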
17 changes: 13 additions & 4 deletions test/src/specs/sync/sync_churn.rs
@@ -36,17 +36,27 @@ impl Spec for SyncChurn {

let (restart_stopped_tx, restart_stopped_rx) = mpsc::channel();

#[cfg(target_os = "linux")]
const NUM_MINED_BLOCKS: usize = 10000;
#[cfg(target_os = "linux")]
const NUM_RESTART: usize = 100;

#[cfg(not(target_os = "linux"))]
const NUM_MINED_BLOCKS: usize = 1000;
#[cfg(not(target_os = "linux"))]
const NUM_RESTART: usize = 20;

let mining_thread = thread::spawn(move || {
let mut rng = rand::thread_rng();
loop {
let mining_node = select_random_node(&mut rng, &mut mining_nodes);
mining_node.mine(1);
// Because the test that waits for nodes to sync has an implicit maximum waiting time
// (currently 60 seconds, we can sync about 200 blocks per second, so a maxium blocks of 10000 is reasonable)
// (currently 60 seconds, we can sync about 200 blocks per second, so a maximum blocks of 10000 is reasonable)
// and the implicit waiting time is not long enough when there are too many blocks to sync,
// so we stop mining when the tip block number exceeds NUM_MINED_BLOCKS.
// Otherwise nodes may not be able to sync within the implicit waiting time.
let too_many_blocks = mining_node.get_tip_block_number() > 10000;
let too_many_blocks = mining_node.get_tip_block_number() > NUM_MINED_BLOCKS as u64;
if too_many_blocks || restart_stopped_rx.try_recv().is_ok() {
break;
}
@@ -57,8 +67,7 @@ impl Spec for SyncChurn {
let restart_thread = thread::spawn(move || {
let mut rng = rand::thread_rng();
// It takes about 1 second to restart a node, so NUM_RESTART restarts take about NUM_RESTART seconds.
let num_restarts = 100;
for _ in 0..num_restarts {
for _ in 0..NUM_RESTART {
let node = select_random_node(&mut rng, &mut churn_nodes);
info!("Restarting node {}", node.node_id());
node.stop();
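The arithmetic behind the comment above: at roughly 200 blocks per second, the 10000-block cap used on Linux syncs in about 50 seconds, just inside the implicit 60-second wait; the 1000-block cap on other platforms leaves a far wider margin, at the cost of a less demanding test.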
5 changes: 5 additions & 0 deletions test/src/specs/tx_pool/send_large_cycles_tx.rs
@@ -123,6 +123,11 @@ impl Spec for SendLargeCyclesTxToRelay {
.transaction
.is_some()
});
if !result {
info!("node0 last 500 log begin");
node0.print_last_500_lines_log(&node0.log_path());
info!("node0 last 500 log end");
}
assert!(result, "Node0 should accept tx");
}

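Printing the last 500 log lines before the assertion, here and in the node startup path in test/src/node.rs above, makes a one-off CI failure diagnosable from the captured output alone; that is also why `print_last_500_lines_log` was widened from private to `pub(crate)`.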
85 changes: 76 additions & 9 deletions test/src/worker.rs
@@ -13,6 +13,7 @@ use std::time::Instant;
#[derive(PartialEq, Eq)]
pub enum Command {
Shutdown,
StartSequencial,
}

/// Notify from worker
@@ -45,6 +46,9 @@ pub struct Worker {
inbox: Receiver<Command>,
outbox: Sender<Notify>,
start_port: Arc<AtomicU16>,

sequencial_tasks: Arc<Mutex<Vec<Box<dyn Spec>>>>,
sequencial_worker: bool,
}

impl Clone for Worker {
@@ -54,13 +58,18 @@
inbox: self.inbox.clone(),
outbox: self.outbox.clone(),
start_port: Arc::clone(&self.start_port),
sequencial_tasks: Arc::clone(&self.sequencial_tasks),
sequencial_worker: self.sequencial_worker,
}
}
}

const SEQUENCIAL_TASKS: &[&str] = &["RandomlyKill", "SyncChurn"];

impl Worker {
pub fn new(
tasks: Arc<Mutex<Vec<Box<dyn Spec>>>>,
sequencial_tasks: Arc<Mutex<Vec<Box<dyn Spec>>>>,
inbox: Receiver<Command>,
outbox: Sender<Notify>,
start_port: Arc<AtomicU16>,
@@ -70,12 +79,16 @@
inbox,
outbox,
start_port,
sequencial_tasks,
sequencial_worker: false,
}
}

/// start handling tasks
pub fn start(self) -> JoinHandle<()> {
thread::spawn(move || {
let mut start_sequencial_task = false;

loop {
let msg = match self.inbox.try_recv() {
Ok(msg) => Some(msg),
@@ -88,20 +101,54 @@
}
};
// check command
if Some(Command::Shutdown) == msg {
self.outbox.send(Notify::Stop).unwrap();
return;
match msg {
Some(Command::StartSequencial) => {
start_sequencial_task = true;
}
Some(Command::Shutdown) => {
self.outbox.send(Notify::Stop).unwrap();
return;
}
_ => {}
}

// pick a spec to run
let spec = match self.tasks.lock().pop() {
Some(spec) => spec,

let task = self.tasks.lock().pop();
match task {
Some(spec) => {
// If the spec is listed in SEQUENCIAL_TASKS (RandomlyKill or SyncChurn), defer it to sequencial_tasks.
if SEQUENCIAL_TASKS.contains(&spec.name()) {
info!("push {} to sequencial_tasks", spec.name());
self.sequencial_tasks.lock().push(spec);
} else {
self.run_spec(spec.as_ref(), 0);
}
}
None => {
self.outbox.send(Notify::Stop).unwrap();
return;
if self.sequencial_worker {
info!("sequencial worker is waiting for command");
if start_sequencial_task {
match self.sequencial_tasks.lock().pop() {
Some(spec) => {
self.run_spec(spec.as_ref(), 0);
}
None => {
info!("sequencial worker has no task to run");
self.outbox.send(Notify::Stop).unwrap();
return;
}
};
} else {
info!("sequencial worker is waiting for parallel workers finish");
std::thread::sleep(std::time::Duration::from_secs(1));
}
} else {
self.outbox.send(Notify::Stop).unwrap();
return;
}
}
};

self.run_spec(spec.as_ref(), 0);
}
})
}
@@ -176,13 +223,17 @@ impl Workers {
start_port: u16,
) -> Self {
let start_port = Arc::new(AtomicU16::new(start_port));

let sequencial_tasks = Arc::new(Mutex::new(Vec::new()));
let workers: Vec<_> = (0..count)
.map({
let tasks = Arc::clone(&tasks);
let sequencial_tasks = Arc::clone(&sequencial_tasks);
move |_| {
let (command_tx, command_rx) = unbounded();
let worker = Worker::new(
Arc::clone(&tasks),
Arc::clone(&sequencial_tasks),
command_rx,
outbox.clone(),
Arc::clone(&start_port),
@@ -200,6 +251,8 @@

/// start all workers
pub fn start(&mut self) {
self.workers.first_mut().unwrap().1.sequencial_worker = true;

let mut join_handles = Vec::new();
for w in self.workers.iter_mut() {
let h = w.1.clone().start();
@@ -208,6 +261,20 @@
self.join_handles.replace(join_handles);
}

pub fn start_sequencial(&mut self) {
if let Err(err) = self
.workers
.first()
.unwrap()
.0
.send(Command::StartSequencial)
{
error!("start sequencial worker failed, error: {}", err);
} else {
info!("start sequencial worker success")
}
}

/// shutdown all workers, must call join_all after this.
pub fn shutdown(&mut self) {
if self.is_shutdown {
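Reduced to its essentials, the scheduling pattern introduced here is: every worker drains a shared parallel queue; specs tagged in `SEQUENCIAL_TASKS` are pushed onto a shared sequential queue instead of being run; and one designated worker drains that queue once it is told the parallel phase is over. A self-contained sketch of the idea, using std primitives instead of the harness's channels and `parking_lot` mutexes (names are illustrative, and the spelling is normalized):

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Tasks that must not run concurrently with anything else.
const SEQUENTIAL: &[&str] = &["RandomlyKill", "SyncChurn"];

fn run(name: &str) {
    println!("running {name}");
}

fn main() {
    let tasks = Arc::new(Mutex::new(vec!["A", "SyncChurn", "B", "RandomlyKill"]));
    let deferred = Arc::new(Mutex::new(Vec::new()));

    // Parallel phase: workers pop tasks, deferring the tagged ones.
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let (tasks, deferred) = (Arc::clone(&tasks), Arc::clone(&deferred));
            thread::spawn(move || loop {
                // Hold the lock only long enough to pop one task.
                let task = tasks.lock().unwrap().pop();
                match task {
                    Some(t) if SEQUENTIAL.contains(&t) => deferred.lock().unwrap().push(t),
                    Some(t) => run(t),
                    None => break,
                }
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }

    // Sequential phase: a single thread drains the deferred tasks, so the
    // disruptive specs never overlap with each other or with parallel work.
    loop {
        let task = deferred.lock().unwrap().pop();
        match task {
            Some(t) => run(t),
            None => break,
        }
    }
}
```

In the harness itself the handoff is the `Command::StartSequencial` message: `Workers::start` marks the first worker as the `sequencial_worker`, and main.rs sends the command once `worker_running == 1`.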