diff --git a/sync/src/synchronizer/get_headers_process.rs b/sync/src/synchronizer/get_headers_process.rs index 3445f55c072..d06e27afc5f 100644 --- a/sync/src/synchronizer/get_headers_process.rs +++ b/sync/src/synchronizer/get_headers_process.rs @@ -74,17 +74,33 @@ impl<'a> GetHeadersProcess<'a> { ); self.synchronizer.peers().getheaders_received(self.peer); - let headers_vec: Vec> = - active_chain.get_locator_responses(block_number, &hash_stop); - // response headers - debug!("headers len={}", headers_vec.len()); - for headers in headers_vec { + let length_20_for_test = + packed::Byte32::new_unchecked(packed::Uint32::from(20).as_bytes()); + if hash_stop.eq(length_20_for_test) { + let headers: Vec = + active_chain.get_locator_response(block_number, &hash_stop); + // response headers + + debug!("headers len={}", headers_vec.len()); let content = packed::SendHeaders::new_builder() .headers(headers.into_iter().map(|x| x.data()).pack()) .build(); let message = packed::SyncMessage::new_builder().set(content).build(); attempt!(send_message_to(self.nc, self.peer, &message)); + } else { + let headers_vec: Vec> = + active_chain.get_locator_responses(block_number, &hash_stop); + // response headers + + debug!("headers vec len={}", headers_vec.len()); + for headers in headers_vec { + let content = packed::SendHeaders::new_builder() + .headers(headers.into_iter().map(|x| x.data()).pack()) + .build(); + let message = packed::SyncMessage::new_builder().set(content).build(); + attempt!(send_message_to(self.nc, self.peer, &message)); + } } } else { return StatusCode::GetHeadersMissCommonAncestors diff --git a/sync/src/synchronizer/headers_process.rs b/sync/src/synchronizer/headers_process.rs index 309244da8f5..29d907f9b62 100644 --- a/sync/src/synchronizer/headers_process.rs +++ b/sync/src/synchronizer/headers_process.rs @@ -54,7 +54,7 @@ impl<'a> HeadersProcess<'a> { fn is_parent_exists(&self, first_header: &core:HeaderView) -> bool { let shared: &SyncShared = self.synchronizer.shared(); - shared.get_header_fields(first_header.parent_hash).is_some() + shared.get_header_fields(first_header.parent_hash()).is_some() } @@ -98,8 +98,6 @@ impl<'a> HeadersProcess<'a> { pub fn execute(self) -> Status { debug!("HeadersProcess begins"); - let shared: &SyncShared = self.synchronizer.shared(); - let consensus = shared.consensus(); let headers = self .message .headers() @@ -107,6 +105,12 @@ impl<'a> HeadersProcess<'a> { .into_iter() .map(packed::Header::into_view) .collect::>(); + self.execute_inner(headers) + } + + fn execute_inner(self, headers: Vec) -> Status { + let shared: &SyncShared = self.synchronizer.shared(); + let consensus = shared.consensus(); if headers.len() > MAX_HEADERS_LEN { warn!("HeadersProcess is oversized"); @@ -136,7 +140,7 @@ impl<'a> HeadersProcess<'a> { if !self.is_parent_exists(&headers[0]) { // put the headers into a memory cache - self.synchronizer.header_cache.insert(headers[0].parent_hash, headers); + self.synchronizer.header_cache.insert(headers[0].parent_hash(), headers); // verify them later return Status::ok(); } @@ -226,8 +230,8 @@ impl<'a> HeadersProcess<'a> { { // these headers verify success // may the headers's tail header_hash exist in headers_cahce? - if let Some(headers) = self.synchronizer.headers_cache.get(headers.last().expect("last header must exist").hash){ - HeadersProcess::new().execute(); + if let Some(headers) = self.synchronizer.header_cache.get(headers.last().expect("last header must exist").hash){ + return self.execute_inner(headers); } } diff --git a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs index c716b9f1d05..b03a0c75e27 100644 --- a/sync/src/synchronizer/mod.rs +++ b/sync/src/synchronizer/mod.rs @@ -44,11 +44,13 @@ use ckb_systemtime::unix_time_as_millis; #[cfg(test)] use ckb_types::core; +use ckb_types::packed::Header; use ckb_types::{ core::BlockNumber, packed::{self, Byte32}, prelude::*, }; +use std::collections::HashMap; use std::{ collections::HashSet, sync::{atomic::Ordering, Arc}, @@ -312,10 +314,12 @@ impl Synchronizer { /// /// This is a runtime sync protocol shared state, and any Sync protocol messages will be processed and forwarded by it pub fn new(chain: ChainController, shared: Arc) -> Synchronizer { + let header_cache = HashMap::new(); Synchronizer { chain, shared, fetch_channel: None, + header_cache, } } diff --git a/sync/src/types/mod.rs b/sync/src/types/mod.rs index 7c520d0ca22..0b3c9289312 100644 --- a/sync/src/types/mod.rs +++ b/sync/src/types/mod.rs @@ -1914,9 +1914,12 @@ impl ActiveChain { block_number: BlockNumber, hash_stop: &Byte32, ) -> Vec> { - (0..32).iter().map(|index| { - get_locator_response(block_number + (i * MAX_HEADERS_LEN), &Byte32::default()) - }).collect(); + (0..32) + .iter() + .map(|index| { + get_locator_response(block_number + (i * MAX_HEADERS_LEN), &Byte32::default()) + }) + .collect(); } pub fn send_getheaders_to_peer( @@ -1955,9 +1958,10 @@ impl ActiveChain { block_number_and_hash.hash() ); let locator_hash = self.get_locator(block_number_and_hash); + let length_20_for_test = packed::Byte32::new_unchecked(packed::Uint32::from(20).as_bytes()); let content = packed::GetHeaders::new_builder() .block_locator_hashes(locator_hash.pack()) - .hash_stop(packed::Byte32::zero()) + .hash_stop(length_20_for_test) .build(); let message = packed::SyncMessage::new_builder().set(content).build(); let _status = send_message(SupportProtocols::Sync.protocol_id(), nc, peer, &message);