Skip to content

Commit

Permalink
GH-1101 Add previous to block_notice_message which allows for immedia…
Browse files Browse the repository at this point in the history
…te determination if a block request is needed. Also include in the block request_message the peer head. The peer head allows the node to determine if on a fork and send from LIB instead. Also create block_on_fork function to remove duplicate code.
  • Loading branch information
heifner committed Jan 25, 2025
1 parent a2d6f9e commit ad712db
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 61 deletions.
5 changes: 2 additions & 3 deletions plugins/net_plugin/include/eosio/net_plugin/protocol.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,6 @@ namespace eosio {
request_message() : req_trx(), req_blocks() {}
ordered_txn_ids req_trx;
ordered_blk_ids req_blocks;

bool operator==(const request_message&) const noexcept = default;
};

struct sync_request_message {
Expand All @@ -142,6 +140,7 @@ namespace eosio {
};

struct block_notice_message {
block_id_type previous;
block_id_type id;
};

Expand Down Expand Up @@ -176,7 +175,7 @@ FC_REFLECT( eosio::notice_message, (known_trx)(known_blocks) )
FC_REFLECT( eosio::request_message, (req_trx)(req_blocks) )
FC_REFLECT( eosio::sync_request_message, (start_block)(end_block) )
FC_REFLECT( eosio::block_nack_message, (id) )
FC_REFLECT( eosio::block_notice_message, (id) )
FC_REFLECT( eosio::block_notice_message, (previous)(id) )

/**
*
Expand Down
141 changes: 83 additions & 58 deletions plugins/net_plugin/net_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -926,8 +926,7 @@ namespace eosio {
static constexpr uint16_t consecutive_block_nacks_threshold{2}; // stop sending blocks when reached
block_num_type consecutive_blocks_nacks{0};
block_id_type last_block_nack;
block_id_type last_block_notice;
request_message last_request_message GUARDED_BY(conn_mtx);
block_id_type last_block_nack_request_message_id GUARDED_BY(conn_mtx);

connection_status get_status()const;

Expand Down Expand Up @@ -1010,6 +1009,7 @@ namespace eosio {
/** @} */

void blk_send_branch( const block_id_type& msg_head_id );
void blk_send_branch_from_nack_request( const block_id_type& msg_head_id, const block_id_type& req_id );
void blk_send_branch( uint32_t msg_head_num, uint32_t fork_db_root_num, uint32_t head_num );

void enqueue( const net_message& msg );
Expand Down Expand Up @@ -1210,6 +1210,24 @@ namespace eosio {

//---------------------------------------------------------------------------

struct on_fork_t {
bool on_fork = true;
bool unknown = true;
};
on_fork_t block_on_fork(const block_id_type& id) { // thread safe
auto id_num = block_header::num_from_id(id);
bool on_fork = false;
bool unknown_block = true;
try {
const controller& cc = my_impl->chain_plug->chain();
std::optional<block_id_type> my_id = cc.fork_block_id_for_num( id_num ); // thread-safe
unknown_block = !my_id;
on_fork = my_id != id;
} catch( ... ) {
}
return { on_fork, unknown_block };
}

connection::connection( const string& endpoint, const string& this_address )
: peer_addr( endpoint ),
strand( boost::asio::make_strand(my_impl->thread_pool.get_executor()) ),
Expand Down Expand Up @@ -1441,8 +1459,7 @@ namespace eosio {
last_handshake_sent = handshake_message();
last_close = fc::time_point::now();
conn_node_id = fc::sha256();
last_request_message.req_blocks.mode = none;
last_request_message.req_blocks.ids.clear();
last_block_nack_request_message_id = block_id_type{};
}
peer_fork_db_root_num = 0;
peer_ping_time_ns = std::numeric_limits<decltype(peer_ping_time_ns)::value_type>::max();
Expand All @@ -1461,7 +1478,6 @@ namespace eosio {
last_vote_received = time_point{};
consecutive_blocks_nacks = 0;
last_block_nack = block_id_type{};
last_block_notice = block_id_type{};

uint32_t head_num = my_impl->get_chain_head_num();
if (last_received_block_num >= head_num) {
Expand Down Expand Up @@ -1499,18 +1515,12 @@ namespace eosio {
if( fork_db_root_num == 0 ) return; // if fork_db_root_id is null (we have not received handshake or reset)

auto msg_head_num = block_header::num_from_id(msg_head_id);
bool on_fork = msg_head_num == 0;
bool unknown_block = false;
if( !on_fork ) {
try {
const controller& cc = my_impl->chain_plug->chain();
std::optional<block_id_type> my_id = cc.fork_block_id_for_num( msg_head_num ); // thread-safe
unknown_block = !my_id;
on_fork = my_id != msg_head_id;
} catch( ... ) {
unknown_block = true;
}
if (msg_head_num == 0) {
blk_send_branch( msg_head_num, fork_db_root_num, head_num );
return;
}

auto [on_fork, unknown_block] = block_on_fork(msg_head_id);
if( unknown_block ) {
peer_ilog( this, "Peer asked for unknown block ${mn}, sending: benign_other go away", ("mn", msg_head_num) );
no_retry = benign_other;
Expand All @@ -1523,6 +1533,23 @@ namespace eosio {
}
}

// called from connection strand
void connection::blk_send_branch_from_nack_request( const block_id_type& msg_head_id, const block_id_type& req_id ) {
auto [on_fork, unknown_block] = block_on_fork(msg_head_id);
uint32_t head_num = my_impl->get_chain_head_num();
// peer head might be unknown if our LIB has moved past it, so if unknown then just send the requested block
if (on_fork) { // send from lib if we know they are on a fork
// a more complicated better approach would be to find where the fork branches and send from there, for now use lib
uint32_t fork_db_root_num = my_impl->get_fork_db_root_num();
// --fork_db_root_num since blk_send_branch adds one to the request, and we want to start at fork_db_root_num
blk_send_branch( --fork_db_root_num, 0, head_num);
} else {
auto msg_req_num = block_header::num_from_id(req_id);
// --msg_req_num since blk_send_branch adds one to the request, and we need to start at msg_req_num
blk_send_branch( --msg_req_num, 0, head_num );
}
}

// called from connection strand
void connection::blk_send_branch( uint32_t msg_head_num, uint32_t fork_db_root_num, uint32_t head_num ) {
if( !peer_requested ) {
Expand Down Expand Up @@ -2406,11 +2433,9 @@ namespace eosio {
}
c->peer_syncing_from_us = false;
try {
controller& cc = my_impl->chain_plug->chain();
std::optional<block_id_type> fork_db_head_id = cc.fork_block_id_for_num( msg.fork_db_head_num ); // thread-safe
if (fork_db_head_id && fork_db_head_id != msg.fork_db_head_id) { // possible for fork_db_root to move and fork_db_head_num not be found if running with no block-log
peer_dlog(c, "Sending catch_up request_message sync 4, fhead ${fh} != msg.fhead ${mfh}",
("fh", *fork_db_head_id)("mfh", msg.fork_db_head_id));
auto [on_fork, unknown_block] = block_on_fork(msg.fork_db_head_id); // thread safe
if (on_fork) { // possible for fork_db_root to move and fork_db_head_num not be found if running with no block-log
peer_dlog(c, "Sending catch_up request_message sync 4, msg.fhead ${mfh} on fork", ("mfh", msg.fork_db_head_id));
request_message req;
req.req_blocks.mode = catch_up;
req.req_trx.mode = none;
Expand Down Expand Up @@ -2751,9 +2776,9 @@ namespace eosio {
if(my_impl->sync_master->syncing_from_peer() ) return;

block_buffer_factory buff_factory;
buffer_factory block_id_buff_factory;
buffer_factory block_notice_buff_factory;
const auto bnum = b->block_num();
my_impl->connections.for_each_block_connection( [this, &id, &bnum, &b, &buff_factory, &block_id_buff_factory]( auto& cp ) {
my_impl->connections.for_each_block_connection( [this, &id, &bnum, &b, &buff_factory, &block_notice_buff_factory]( auto& cp ) {
fc_dlog( logger, "socket_is_open ${s}, state ${c}, syncing ${ss}, connection - ${cid}",
("s", cp->socket_is_open())("c", connection::state_str(cp->state()))("ss", cp->peer_syncing_from_us.load())("cid", cp->connection_id) );
if( !cp->current() ) return;
Expand All @@ -2767,7 +2792,7 @@ namespace eosio {
if (cp->consecutive_blocks_nacks > connection::consecutive_block_nacks_threshold) {
// only send block_notice if we didn't produce the block, otherwise broadcast the block below
if (!my_impl->is_producer(b->producer)) {
auto send_buffer = block_id_buff_factory.get_send_buffer( block_notice_message{id} );
auto send_buffer = block_notice_buff_factory.get_send_buffer( block_notice_message{b->previous, id} );
boost::asio::post(cp->strand, [cp, send_buffer{std::move(send_buffer)}, bnum]() {
cp->latest_blk_time = std::chrono::steady_clock::now();
peer_dlog( cp, "bcast block_notice ${b}", ("b", bnum) );
Expand Down Expand Up @@ -3506,12 +3531,11 @@ namespace eosio {

if( peer_fork_db_root_num <= fork_db_root_num && peer_fork_db_root_num > 0 ) {
try {
controller& cc = my_impl->chain_plug->chain();
std::optional<block_id_type> peer_fork_db_root_id = cc.fork_block_id_for_num( peer_fork_db_root_num ); // thread-safe
if (!peer_fork_db_root_id) {
auto [on_fork, unknown_block] = block_on_fork(msg.fork_db_root_id); // thread safe
if (unknown_block) {
// can be not found if running with a truncated block log
peer_dlog( this, "peer froot block ${n} is unknown", ("n", peer_fork_db_root_num) );
} else if (msg.fork_db_root_id != peer_fork_db_root_id) {
} else if (on_fork) {
peer_wlog( this, "Peer chain is forked, sending: forked go away" );
no_retry = go_away_reason::forked;
enqueue( go_away_message( go_away_reason::forked ) );
Expand Down Expand Up @@ -3711,7 +3735,7 @@ namespace eosio {
}

void connection::handle_message( const request_message& msg ) {
if( msg.req_blocks.ids.size() > 1 ) {
if( msg.req_blocks.ids.size() > 2 ) {
peer_wlog( this, "Invalid request_message, req_blocks.ids.size ${s}, closing",
("s", msg.req_blocks.ids.size()) );
close();
Expand All @@ -3730,16 +3754,14 @@ namespace eosio {
}
case normal : {
if (protocol_version >= proto_block_nack) {
if (!msg.req_blocks.ids.empty()) {
const block_id_type& id = msg.req_blocks.ids.back();
if (msg.req_blocks.ids.size() == 2 && msg.req_trx.ids.empty()) {
const block_id_type& req_id = msg.req_blocks.ids[0]; // 0 - req_id, 1 - peer_head_id
peer_dlog( this, "${d} request_message:normal #${bn}:${id}",
("d", is_blocks_connection() ? "received" : "ignoring")("bn", block_header::num_from_id(id))("id",id) );
("d", is_blocks_connection() ? "received" : "ignoring")("bn", block_header::num_from_id(req_id))("id",id) );
if (!is_blocks_connection())
return;
uint32_t head_num = my_impl->get_chain_head_num();
auto msg_head_num = block_header::num_from_id(id);
// --msg_head_num since blk_send_branch adds one to request and we need to start at msg_head_num
blk_send_branch( --msg_head_num, 0, head_num );
const block_id_type& peer_head_id = msg.req_blocks.ids[1];
blk_send_branch_from_nack_request(req_id, peer_head_id);
return;
}
}
Expand Down Expand Up @@ -3837,37 +3859,40 @@ namespace eosio {

// called from connection strand
void connection::handle_message( const block_notice_message& msg ) {
if (block_header::num_from_id(msg.id)-1 != block_header::num_from_id(msg.previous)) {
peer_dlog(this, "Invalid block_notice_message ${id}, closing", ("id", msg.id));
close();
return;
}

auto fork_db_root_num = my_impl->get_fork_db_root_num();
if (block_header::num_from_id(msg.id) <= fork_db_root_num)
return;

latest_blk_time = std::chrono::steady_clock::now();
if (my_impl->dispatcher.have_block(msg.id)) {
my_impl->dispatcher.add_peer_block(msg.id, connection_id);
} else {
if (!last_block_notice.empty() && !my_impl->dispatcher.have_block(last_block_notice)) { // still don't have previous block
if (block_header::num_from_id(last_block_notice) == block_header::num_from_id(msg.id) - 1) {
peer_dlog(this, "Received 2 consecutive unknown block notices, checking already requested");
request_message req;
req.req_blocks.mode = normal;
req.req_blocks.ids.push_back(last_block_notice);
bool already_requested = my_impl->connections.any_of_block_connections([&req](const auto& c) {
fc::lock_guard g_conn( c->conn_mtx );
return c->last_request_message == req;
});
if (!already_requested) {
peer_ilog(this, "Received 2 consecutive unknown block notices, requesting blocks from ${bn}",
("bn", block_header::num_from_id(last_block_notice)));
send_block_nack({});
{
fc::lock_guard g_conn( conn_mtx );
last_request_message = req;
}
enqueue( req );
}
} else if (!my_impl->dispatcher.have_block(msg.previous)) { // still don't have previous block
peer_dlog(this, "Received unknown block notice, checking already requested");
request_message req;
req.req_blocks.mode = normal;
req.req_blocks.ids.push_back(msg.previous);
bool already_requested = my_impl->connections.any_of_block_connections([&req](const auto& c) {
fc::lock_guard g_conn(c->conn_mtx);
return c->last_block_nack_request_message_id == req.req_blocks.ids[0];
});
if (!already_requested) {
peer_ilog(this, "Received unknown block notice, requesting blocks from ${bn}",
("bn", block_header::num_from_id(msg.previous)));
block_id_type head_id = my_impl->get_chain_info().head_id;
req.req_blocks.ids.push_back(head_id);
send_block_nack({});
{
fc::lock_guard g_conn(conn_mtx);
last_block_nack_request_message_id = req.req_blocks.ids[0];
}
enqueue(req);
}
last_block_notice = msg.id;
}
}

Expand Down

0 comments on commit ad712db

Please sign in to comment.