Skip to content

Commit

Permalink
Merge pull request nervosnetwork#4333 from eval-exec/exec/relayer-han…
Browse files Browse the repository at this point in the history
…dle-accept_block_error

Modify the method `Relayer::accept_block` to return `StatusCode::BlockIsInvalid` when `shared.insert_new_block()` produces an error.
  • Loading branch information
zhangsoledad authored Feb 1, 2024
2 parents 73acfed + 740f2f3 commit dfa4f37
Show file tree
Hide file tree
Showing 7 changed files with 247 additions and 116 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ ckb-chain = { path = "../chain", version = "= 0.114.0-pre", features = ["mock"]
faux = "^0.1"
once_cell = "1.8.0"
ckb-systemtime = { path = "../util/systemtime", version = "= 0.114.0-pre" , features = ["enable_faketime"]}
ckb-proposal-table = { path = "../util/proposal-table", version = "= 0.114.0-pre" }

[features]
default = []
Expand Down
5 changes: 3 additions & 2 deletions sync/src/relayer/block_transactions_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,10 @@ impl<'a> BlockTransactionsProcess<'a> {
match ret {
ReconstructionResult::Block(block) => {
pending.remove();
self.relayer
let status = self
.relayer
.accept_block(self.nc.as_ref(), self.peer, block);
return Status::ok();
return status;
}
ReconstructionResult::Missing(transactions, uncles) => {
// We need to get all transactions and uncles that do not exist locally
Expand Down
5 changes: 3 additions & 2 deletions sync/src/relayer/compact_block_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,16 @@ impl<'a> CompactBlockProcess<'a> {
>= block.epoch().number()
});
shrink_to_fit!(pending_compact_blocks, 20);
self.relayer
let status = self
.relayer
.accept_block(self.nc.as_ref(), self.peer, block);

if let Some(metrics) = ckb_metrics::handle() {
metrics
.ckb_relay_cb_verify_duration
.observe(instant.elapsed().as_secs_f64());
}
Status::ok()
status
}
ReconstructionResult::Missing(transactions, uncles) => {
let missing_transactions: Vec<u32> =
Expand Down
166 changes: 95 additions & 71 deletions sync/src/relayer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ use self::transaction_hashes_process::TransactionHashesProcess;
use self::transactions_process::TransactionsProcess;
use crate::block_status::BlockStatus;
use crate::types::{ActiveChain, BlockNumberAndHash, SyncShared};
use crate::utils::{metric_ckb_message_bytes, send_message_to, MetricDirection};
use crate::utils::{
is_internal_db_error, metric_ckb_message_bytes, send_message_to, MetricDirection,
};
use crate::{Status, StatusCode};
use ckb_chain::chain::ChainController;
use ckb_constant::sync::BAD_MESSAGE_BAN_TIME;
Expand Down Expand Up @@ -284,99 +286,121 @@ impl Relayer {
nc: &dyn CKBProtocolContext,
peer: PeerIndex,
block: core::BlockView,
) {
) -> Status {
if self
.shared()
.active_chain()
.contains_block_status(&block.hash(), BlockStatus::BLOCK_STORED)
{
return;
return Status::ok();
}

let boxed = Arc::new(block);
if self
let boxed: Arc<BlockView> = Arc::new(block);
match self
.shared()
.insert_new_block(&self.chain, Arc::clone(&boxed))
.unwrap_or(false)
{
Ok(true) => self.broadcast_compact_block(nc, peer, &boxed),
Ok(false) => debug_target!(
crate::LOG_TARGET_RELAY,
"Relayer accept_block received an uncle block, don't broadcast compact block"
),
Err(err) => {
if !is_internal_db_error(&err) {
return StatusCode::BlockIsInvalid.with_context(format!(
"{}, error: {}",
boxed.hash(),
err,
));
}
}
}
Status::ok()
}

fn broadcast_compact_block(
&self,
nc: &dyn CKBProtocolContext,
peer: PeerIndex,
boxed: &Arc<BlockView>,
) {
debug_target!(
crate::LOG_TARGET_RELAY,
"[block_relay] relayer accept_block {} {}",
boxed.header().hash(),
unix_time_as_millis()
);
let block_hash = boxed.hash();
self.shared().state().remove_header_view(&block_hash);
let cb = packed::CompactBlock::build_from_block(boxed, &HashSet::new());
let message = packed::RelayMessage::new_builder().set(cb).build();

let selected_peers: Vec<PeerIndex> = nc
.connected_peers()
.into_iter()
.filter(|target_peer| peer != *target_peer)
.take(MAX_RELAY_PEERS)
.collect();
if let Err(err) = nc.quick_filter_broadcast(
TargetSession::Multi(Box::new(selected_peers.into_iter())),
message.as_bytes(),
) {
debug_target!(
crate::LOG_TARGET_RELAY,
"[block_relay] relayer accept_block {} {}",
boxed.header().hash(),
unix_time_as_millis()
"relayer send block when accept block error: {:?}",
err,
);
let block_hash = boxed.hash();
self.shared().state().remove_header_view(&block_hash);
let cb = packed::CompactBlock::build_from_block(&boxed, &HashSet::new());
let message = packed::RelayMessage::new_builder().set(cb).build();
}

if let Some(p2p_control) = nc.p2p_control() {
let snapshot = self.shared.shared().snapshot();
let parent_chain_root = {
let mmr = snapshot.chain_root_mmr(boxed.header().number() - 1);
match mmr.get_root() {
Ok(root) => root,
Err(err) => {
error_target!(
crate::LOG_TARGET_RELAY,
"Generate last state to light client failed: {:?}",
err
);
return;
}
}
};

let selected_peers: Vec<PeerIndex> = nc
let tip_header = packed::VerifiableHeader::new_builder()
.header(boxed.header().data())
.uncles_hash(boxed.calc_uncles_hash())
.extension(Pack::pack(&boxed.extension()))
.parent_chain_root(parent_chain_root)
.build();
let light_client_message = {
let content = packed::SendLastState::new_builder()
.last_header(tip_header)
.build();
packed::LightClientMessage::new_builder()
.set(content)
.build()
};
let light_client_peers: HashSet<PeerIndex> = nc
.connected_peers()
.into_iter()
.filter(|target_peer| peer != *target_peer)
.take(MAX_RELAY_PEERS)
.filter_map(|index| nc.get_peer(index).map(|peer| (index, peer)))
.filter(|(_id, peer)| peer.if_lightclient_subscribed)
.map(|(id, _)| id)
.collect();
if let Err(err) = nc.quick_filter_broadcast(
TargetSession::Multi(Box::new(selected_peers.into_iter())),
message.as_bytes(),
if let Err(err) = p2p_control.filter_broadcast(
TargetSession::Filter(Box::new(move |id| light_client_peers.contains(id))),
SupportProtocols::LightClient.protocol_id(),
light_client_message.as_bytes(),
) {
debug_target!(
crate::LOG_TARGET_RELAY,
"relayer send block when accept block error: {:?}",
"relayer send last state to light client when accept block, error: {:?}",
err,
);
}

if let Some(p2p_control) = nc.p2p_control() {
let snapshot = self.shared.shared().snapshot();
let parent_chain_root = {
let mmr = snapshot.chain_root_mmr(boxed.header().number() - 1);
match mmr.get_root() {
Ok(root) => root,
Err(err) => {
error_target!(
crate::LOG_TARGET_RELAY,
"Generate last state to light client failed: {:?}",
err
);
return;
}
}
};

let tip_header = packed::VerifiableHeader::new_builder()
.header(boxed.header().data())
.uncles_hash(boxed.calc_uncles_hash())
.extension(Pack::pack(&boxed.extension()))
.parent_chain_root(parent_chain_root)
.build();
let light_client_message = {
let content = packed::SendLastState::new_builder()
.last_header(tip_header)
.build();
packed::LightClientMessage::new_builder()
.set(content)
.build()
};
let light_client_peers: HashSet<PeerIndex> = nc
.connected_peers()
.into_iter()
.filter_map(|index| nc.get_peer(index).map(|peer| (index, peer)))
.filter(|(_id, peer)| peer.if_lightclient_subscribed)
.map(|(id, _)| id)
.collect();
if let Err(err) = p2p_control.filter_broadcast(
TargetSession::Filter(Box::new(move |id| light_client_peers.contains(id))),
SupportProtocols::LightClient.protocol_id(),
light_client_message.as_bytes(),
) {
debug_target!(
crate::LOG_TARGET_RELAY,
"relayer send last state to light client when accept block, error: {:?}",
err,
);
}
}
}
}

Expand Down
78 changes: 46 additions & 32 deletions sync/src/relayer/tests/compact_block_process.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use crate::block_status::BlockStatus;
use crate::relayer::compact_block_process::CompactBlockProcess;
use crate::relayer::tests::helper::{build_chain, new_header_builder, MockProtocolContext};
use crate::relayer::tests::helper::{
build_chain, gen_block, new_header_builder, MockProtocolContext,
};
use crate::{Status, StatusCode};
use ckb_chain::chain::ChainService;
use ckb_network::{PeerIndex, SupportProtocols};
use ckb_store::ChainStore;
use ckb_systemtime::unix_time_as_millis;
use ckb_tx_pool::{PlugTarget, TxEntry};
use ckb_types::core::{BlockView, HeaderView};
use ckb_types::prelude::*;
use ckb_types::{
bytes::Bytes,
Expand Down Expand Up @@ -152,25 +156,36 @@ fn test_unknow_parent() {
#[test]
fn test_accept_not_a_better_block() {
let (relayer, _) = build_chain(5);
let header = {
let tip_header = {
let active_chain = relayer.shared.active_chain();
active_chain.tip_header()
};
let second_to_last_header: HeaderView = {
let tip_header: HeaderView = relayer.shared().store().get_tip_header().unwrap();
let second_to_last_header = relayer
.shared()
.store()
.get_block_header(&tip_header.data().raw().parent_hash())
.unwrap();
second_to_last_header
};

// The timestamp is random, so it may be not a better block.
let not_sure_a_better_header = header
.as_advanced_builder()
.timestamp((header.timestamp() + 1).pack())
.build();

let block = BlockBuilder::default()
.header(not_sure_a_better_header)
.transaction(TransactionBuilder::default().build())
.build();
let uncle_block: BlockView = gen_block(
&second_to_last_header,
relayer.shared().shared(),
1,
1,
None,
);
//uncle_block's block_hash must not equal to tip's block_hash
assert_ne!(uncle_block.header().hash(), tip_header.hash());
// uncle_block's difficulty must less than tip's difficulty
assert!(uncle_block.difficulty().lt(&tip_header.difficulty()));

let mut prefilled_transactions_indexes = HashSet::new();
prefilled_transactions_indexes.insert(0);
let compact_block = CompactBlock::build_from_block(&block, &prefilled_transactions_indexes);
let compact_block =
CompactBlock::build_from_block(&uncle_block, &prefilled_transactions_indexes);

let mock_protocol_context = MockProtocolContext::new(SupportProtocols::RelayV2);
let nc = Arc::new(mock_protocol_context);
Expand All @@ -182,7 +197,7 @@ fn test_accept_not_a_better_block() {
Arc::<MockProtocolContext>::clone(&nc),
peer_index,
);
assert_eq!(compact_block_process.execute(), Status::ok(),);
assert_eq!(compact_block_process.execute(), Status::ok());
}

#[test]
Expand Down Expand Up @@ -323,18 +338,15 @@ fn test_accept_block() {
active_chain.tip_header()
};

let header = new_header_builder(relayer.shared.shared(), &parent).build();

let uncle = BlockBuilder::default().build();
let ext = packed::BlockExtBuilder::default()
.verified(Some(true).pack())
.build();
let uncle = gen_block(&parent, relayer.shared().shared(), 0, 1, None);

let block = BlockBuilder::default()
.header(header)
.transaction(TransactionBuilder::default().build())
.uncle(uncle.as_uncle())
.build();
let block = gen_block(
&parent,
relayer.shared().shared(),
0,
1,
Some(uncle.as_uncle()),
);

let mock_block_1 = BlockBuilder::default()
.number(4.pack())
Expand Down Expand Up @@ -365,16 +377,18 @@ fn test_accept_block() {
);
}

let uncle_hash = uncle.hash();
{
let db_txn = relayer.shared().shared().store().begin_transaction();
db_txn.insert_block(&uncle).unwrap();
db_txn.attach_block(&uncle).unwrap();
db_txn.insert_block_ext(&uncle_hash, &ext.unpack()).unwrap();
db_txn.commit().unwrap();
let chain_controller = {
let proposal_window = ckb_proposal_table::ProposalTable::new(
relayer.shared().shared().consensus().tx_proposal_window(),
);
let chain_service =
ChainService::new(relayer.shared().shared().to_owned(), proposal_window);
chain_service.start::<&str>(None)
};
chain_controller.process_block(Arc::new(uncle)).unwrap();
}

relayer.shared().shared().refresh_snapshot();
let mut prefilled_transactions_indexes = HashSet::new();
prefilled_transactions_indexes.insert(0);
let compact_block = CompactBlock::build_from_block(&block, &prefilled_transactions_indexes);
Expand Down
Loading

0 comments on commit dfa4f37

Please sign in to comment.