From c2f8f8c161e75904c13ec17b5663f37378a0796b Mon Sep 17 00:00:00 2001 From: Eval EXEC Date: Sat, 14 Dec 2024 13:34:30 +0800 Subject: [PATCH] wip Signed-off-by: Eval EXEC --- sync/src/synchronizer/get_headers_process.rs | 26 +++++++++++++++---- sync/src/synchronizer/headers_process.rs | 27 ++++++++++++++------ sync/src/synchronizer/mod.rs | 7 ++++- sync/src/types/mod.rs | 15 ++++++++--- 4 files changed, 57 insertions(+), 18 deletions(-) diff --git a/sync/src/synchronizer/get_headers_process.rs b/sync/src/synchronizer/get_headers_process.rs index 3445f55c072..d7565c7a5f2 100644 --- a/sync/src/synchronizer/get_headers_process.rs +++ b/sync/src/synchronizer/get_headers_process.rs @@ -74,17 +74,33 @@ impl<'a> GetHeadersProcess<'a> { ); self.synchronizer.peers().getheaders_received(self.peer); - let headers_vec: Vec> = - active_chain.get_locator_responses(block_number, &hash_stop); - // response headers - debug!("headers len={}", headers_vec.len()); - for headers in headers_vec { + let hash_size: packed::Uint32 = 20_u32.pack(); + let length_20_for_test = packed::Byte32::new_unchecked(hash_size.as_bytes()); + if hash_stop.eq(&length_20_for_test) { + let headers: Vec = + active_chain.get_locator_response(block_number, &hash_stop); + // response headers + + debug!("headers len={}", headers.len()); let content = packed::SendHeaders::new_builder() .headers(headers.into_iter().map(|x| x.data()).pack()) .build(); let message = packed::SyncMessage::new_builder().set(content).build(); attempt!(send_message_to(self.nc, self.peer, &message)); + } else { + let headers_vec: Vec> = + active_chain.get_locator_responses(block_number, &hash_stop); + // response headers + + debug!("headers vec len={}", headers_vec.len()); + for headers in headers_vec { + let content = packed::SendHeaders::new_builder() + .headers(headers.into_iter().map(|x| x.data()).pack()) + .build(); + let message = packed::SyncMessage::new_builder().set(content).build(); + attempt!(send_message_to(self.nc, self.peer, &message)); + } } } else { return StatusCode::GetHeadersMissCommonAncestors diff --git a/sync/src/synchronizer/headers_process.rs b/sync/src/synchronizer/headers_process.rs index 309244da8f5..b16182cc50b 100644 --- a/sync/src/synchronizer/headers_process.rs +++ b/sync/src/synchronizer/headers_process.rs @@ -52,12 +52,13 @@ impl<'a> HeadersProcess<'a> { true } - fn is_parent_exists(&self, first_header: &core:HeaderView) -> bool { + fn is_parent_exists(&self, first_header: &core::HeaderView) -> bool { let shared: &SyncShared = self.synchronizer.shared(); - shared.get_header_fields(first_header.parent_hash).is_some() + shared + .get_header_fields(&first_header.parent_hash()) + .is_some() } - pub fn accept_first(&self, first: &core::HeaderView) -> ValidationResult { let shared: &SyncShared = self.synchronizer.shared(); let verifier = HeaderVerifier::new(shared, shared.consensus()); @@ -98,8 +99,6 @@ impl<'a> HeadersProcess<'a> { pub fn execute(self) -> Status { debug!("HeadersProcess begins"); - let shared: &SyncShared = self.synchronizer.shared(); - let consensus = shared.consensus(); let headers = self .message .headers() @@ -107,6 +106,12 @@ impl<'a> HeadersProcess<'a> { .into_iter() .map(packed::Header::into_view) .collect::>(); + self.execute_inner(headers) + } + + fn execute_inner(self, headers: Vec) -> Status { + let shared: &SyncShared = self.synchronizer.shared(); + let consensus = shared.consensus(); if headers.len() > MAX_HEADERS_LEN { warn!("HeadersProcess is oversized"); @@ -136,7 +141,9 @@ impl<'a> HeadersProcess<'a> { if !self.is_parent_exists(&headers[0]) { // put the headers into a memory cache - self.synchronizer.header_cache.insert(headers[0].parent_hash, headers); + self.synchronizer + .header_cache + .insert(headers[0].parent_hash(), headers); // verify them later return Status::ok(); } @@ -226,8 +233,12 @@ impl<'a> HeadersProcess<'a> { { // these headers verify success // may the headers's tail header_hash exist in headers_cahce? - if let Some(headers) = self.synchronizer.headers_cache.get(headers.last().expect("last header must exist").hash){ - HeadersProcess::new().execute(); + if let Some(headers) = self + .synchronizer + .header_cache + .get(headers.last().expect("last header must exist").hash()) + { + return self.execute_inner(headers); } } diff --git a/sync/src/synchronizer/mod.rs b/sync/src/synchronizer/mod.rs index c716b9f1d05..0f07f056713 100644 --- a/sync/src/synchronizer/mod.rs +++ b/sync/src/synchronizer/mod.rs @@ -44,11 +44,14 @@ use ckb_systemtime::unix_time_as_millis; #[cfg(test)] use ckb_types::core; +use ckb_types::core::HeaderView; +use ckb_types::packed::Header; use ckb_types::{ core::BlockNumber, packed::{self, Byte32}, prelude::*, }; +use std::collections::HashMap; use std::{ collections::HashSet, sync::{atomic::Ordering, Arc}, @@ -303,7 +306,7 @@ pub struct Synchronizer { pub shared: Arc, // First Headers's parent_hash -> Headers - pub(crate) header_cache: HashMap>, + pub(crate) header_cache: HashMap>, fetch_channel: Option>, } @@ -312,10 +315,12 @@ impl Synchronizer { /// /// This is a runtime sync protocol shared state, and any Sync protocol messages will be processed and forwarded by it pub fn new(chain: ChainController, shared: Arc) -> Synchronizer { + let header_cache = HashMap::new(); Synchronizer { chain, shared, fetch_channel: None, + header_cache, } } diff --git a/sync/src/types/mod.rs b/sync/src/types/mod.rs index 7c520d0ca22..741150b48e0 100644 --- a/sync/src/types/mod.rs +++ b/sync/src/types/mod.rs @@ -1914,9 +1914,14 @@ impl ActiveChain { block_number: BlockNumber, hash_stop: &Byte32, ) -> Vec> { - (0..32).iter().map(|index| { - get_locator_response(block_number + (i * MAX_HEADERS_LEN), &Byte32::default()) - }).collect(); + (0..32) + .map(|index| { + self.get_locator_response( + block_number + (index as u64 * MAX_HEADERS_LEN as u64), + &Byte32::default(), + ) + }) + .collect() } pub fn send_getheaders_to_peer( @@ -1955,9 +1960,11 @@ impl ActiveChain { block_number_and_hash.hash() ); let locator_hash = self.get_locator(block_number_and_hash); + let hash_size: packed::Uint32 = 20_u32.pack(); + let length_20_for_test = packed::Byte32::new_unchecked(hash_size.as_bytes()); let content = packed::GetHeaders::new_builder() .block_locator_hashes(locator_hash.pack()) - .hash_stop(packed::Byte32::zero()) + .hash_stop(length_20_for_test) .build(); let message = packed::SyncMessage::new_builder().set(content).build(); let _status = send_message(SupportProtocols::Sync.protocol_id(), nc, peer, &message);