Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix missing slot regression, better track of payload attributes in logs #459

Merged
merged 5 commits into from
Mar 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -409,9 +409,12 @@ where
Err(err) => {
if err.is_consistent_db_view_err() {
let last_block_number = self.provider.last_block_number().unwrap_or_default();

debug!(
block_number,
last_block_number, "Can't build on this head, cancelling slot"
payload_id = self.building_ctx.payload_id,
last_block_number,
"Can't build on this head, cancelling slot"
);
self.cancel_on_fatal_error.cancel();
}
Expand Down
13 changes: 10 additions & 3 deletions crates/rbuilder/src/building/builders/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ pub mod parallel_builder;

use crate::{
building::{BlockBuildingContext, BuiltBlockTrace, SimulatedOrderSink, Sorting},
live_builder::{payload_events::MevBoostSlotData, simulation::SimulatedOrderCommand},
live_builder::{
payload_events::{InternalPayloadId, MevBoostSlotData},
simulation::SimulatedOrderCommand,
},
primitives::{AccountNonce, OrderId, SimulatedOrder},
provider::StateProviderFactory,
utils::{is_provider_factory_health_error, NonceCache},
Expand Down Expand Up @@ -254,12 +257,16 @@ pub struct BacktestSimulateBlockInput<'a, P> {

/// Handles error from block filling stage.
/// Answers if block filling should continue.
pub fn handle_building_error(err: eyre::Report) -> bool {
pub fn handle_building_error(err: eyre::Report, payload_id: InternalPayloadId) -> bool {
// @Types
let err_str = err.to_string();
if !err_str.contains("Profit too low") {
if is_provider_factory_health_error(&err) {
info!(?err, "Cancelling building due to provider factory error");
info!(
payload_id,
?err,
"Cancelling building due to provider factory error"
);
return false;
} else {
warn!(?err, "Error filling orders");
Expand Down
3 changes: 2 additions & 1 deletion crates/rbuilder/src/building/builders/ordering_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub fn run_ordering_builder<P>(input: LiveBuilderInput<P>, config: &OrderingBuil
where
P: StateProviderFactory + Clone + 'static,
{
let payload_id = input.ctx.payload_id;
let mut order_intake_consumer = OrderIntakeConsumer::new(
input.provider.clone(),
input.input,
Expand Down Expand Up @@ -112,7 +113,7 @@ where
}
}
Err(err) => {
if !handle_building_error(err) {
if !handle_building_error(err, payload_id) {
break 'building;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ where
}
Err(err) => {
let _span = info_span!("Parallel builder failed to build new block",run_id = self.run_id,version = version,err=?err).entered();
if !handle_building_error(err) {
if !handle_building_error(err, self.ctx.payload_id) {
return false;
}
}
Expand Down
5 changes: 5 additions & 0 deletions crates/rbuilder/src/building/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use reth_primitives::BlockBody;
use reth_primitives_traits::{proofs, Block as _};

use crate::{
live_builder::payload_events::InternalPayloadId,
primitives::{Order, OrderId, SimValue, SimulatedOrder, TransactionSignedEcRecoveredWithBlobs},
provider::RootHasher,
roothash::RootHashError,
Expand Down Expand Up @@ -93,6 +94,7 @@ pub struct BlockBuildingContext {
/// Version of the EVM that we are going to use
pub spec_id: SpecId,
pub root_hasher: Arc<dyn RootHasher>,
pub payload_id: InternalPayloadId,
}

impl BlockBuildingContext {
Expand All @@ -109,6 +111,7 @@ impl BlockBuildingContext {
extra_data: Vec<u8>,
spec_id: Option<SpecId>,
root_hasher: Arc<dyn RootHasher>,
payload_id: InternalPayloadId,
) -> Option<BlockBuildingContext> {
let attributes = EthPayloadBuilderAttributes::try_new(
attributes.data.parent_block_hash,
Expand Down Expand Up @@ -171,6 +174,7 @@ impl BlockBuildingContext {
excess_blob_gas,
spec_id,
root_hasher,
payload_id,
})
}

Expand Down Expand Up @@ -254,6 +258,7 @@ impl BlockBuildingContext {
excess_blob_gas: onchain_block.header.excess_blob_gas,
spec_id,
root_hasher,
payload_id: 0,
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/rbuilder/src/building/testing/test_chain_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ impl TestBlockContextBuilder {
vec![],
Some(SpecId::SHANGHAI),
self.root_hasher,
0,
)
.unwrap();
if self.use_suggested_fee_recipient_as_coinbase {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,20 +99,20 @@ impl SequentialSealerBidMakerProcess {
{
Ok(finalize_res) => match finalize_res {
Ok(res) => self.sink.new_block(res.block),
Err(error) => {
if error.is_critical() {
Err(err) => {
if err.is_critical() {
error!(
builder_name,
block_number,
?error,
block = block_number,
?err,
"Error on finalize_block on SequentialSealerBidMaker"
)
}
}
},
Err(error) => error!(
block_number,
?error,
Err(err) => error!(
block = block_number,
?err,
"Error on join finalize_block on SequentialSealerBidMaker"
),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ async fn run_submit_to_relays_job(
true_bid_value = format_ether(block.trace.true_bid_value),
seen_competition_bid = format_ether(block.trace.seen_competition_bid.unwrap_or_default()),
block = block.sealed_block.number,
slot = slot_data.slot(),
payload_id = slot_data.payload_id,
hash = ?block.sealed_block.hash(),
gas = block.sealed_block.gas_used,
txs = block.sealed_block.body().transactions.len(),
Expand Down
73 changes: 72 additions & 1 deletion crates/rbuilder/src/live_builder/building/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ use crate::{
use revm_primitives::Address;
use tokio::sync::{broadcast, mpsc};
use tokio_util::sync::CancellationToken;
use tracing::{debug, trace};
use tracing::{debug, info, trace, warn};
use unfinished_block_building_sink_muxer::UnfinishedBlockBuildingSinkMuxer;

/// Interval for checking if last block still corresponds to the parent of the given block building context
const CHECK_LAST_BLOCK_INTERVAL: Duration = Duration::from_millis(100);

use super::{
order_input::{
self, order_replacement_manager::OrderReplacementManager, orderpool::OrdersForBlock,
Expand Down Expand Up @@ -79,6 +82,15 @@ where
cancel.cancel();
});

{
let provider = self.provider.clone();
let block_ctx = block_ctx.clone();
let block_cancellation = block_cancellation.clone();
tokio::task::spawn_blocking(move || {
run_check_if_parent_block_is_last_block(provider, block_ctx, block_cancellation);
});
}

let (orders_for_block, sink) = OrdersForBlock::new_with_sink();
// add OrderReplacementManager to manage replacements and cancellations
let order_replacement_manager = OrderReplacementManager::new(Box::new(sink));
Expand Down Expand Up @@ -208,3 +220,62 @@ fn merge_and_send(
}
trace!("Cancelling merge_and_send job, source stopped");
}

fn run_check_if_parent_block_is_last_block<P>(
provider: P,
block_ctx: BlockBuildingContext,
block_cancellation: CancellationToken,
) where
P: StateProviderFactory + Clone + 'static,
{
loop {
std::thread::sleep(CHECK_LAST_BLOCK_INTERVAL);
if block_cancellation.is_cancelled() {
return;
}
let last_block_number = match provider.last_block_number() {
Ok(n) => n,
Err(err) => {
warn!(?err, "Failed to get last block number");
continue;
}
};
if last_block_number + 1 != block_ctx.block() {
info!(
reason = "last block number",
last_block_number,
block = block_ctx.block(),
payload_id = block_ctx.payload_id,
"Cancelling building job"
);
block_cancellation.cancel();
return;
}

let last_block_hash = match provider.block_hash(last_block_number) {
Ok(Some(h)) => h,
Ok(None) => {
warn!(err = "hash is missing", "Failed to get last block hash");
continue;
}
Err(err) => {
warn!(?err, "Failed to get last block hash");
continue;
}
};

let parent_hash = block_ctx.attributes.parent;
if last_block_hash != parent_hash {
info!(
reason = "last block hash",
?last_block_hash,
?parent_hash,
block = block_ctx.block(),
payload_id = block_ctx.payload_id,
"Cancelling building job"
);
block_cancellation.cancel();
return;
}
}
}
52 changes: 29 additions & 23 deletions crates/rbuilder/src/live_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,33 +209,37 @@ where
let blocklist = self.blocklist_provider.get_blocklist()?;
if blocklist.contains(&payload.fee_recipient()) {
warn!(
slot = payload.slot(),
fee_recipient = ?payload.fee_recipient(),
"Fee recipient is in blocklist"
);
slot = payload.slot(),
fee_recipient = ?payload.fee_recipient(),
payload_id = payload.payload_id,
"Fee recipient is in blocklist"
);
continue;
}
let current_time = OffsetDateTime::now_utc();
// see if we can get parent header in a reasonable time
let time_to_slot = payload.timestamp() - current_time;
debug!(
slot = payload.slot(),
block = payload.block(),
?current_time,
payload_timestamp = ?payload.timestamp(),
?time_to_slot,
parent_hash = ?payload.parent_block_hash(),
provider_head_state = ?ProviderHeadState::new(&self.provider),
"Received payload, time till slot timestamp",
);
slot = payload.slot(),
block = payload.block(),
payload_id = payload.payload_id,
?current_time,
payload_timestamp = ?payload.timestamp(),
?time_to_slot,
parent_hash = ?payload.parent_block_hash(),
provider_head_state = ?ProviderHeadState::new(&self.provider),
"Received payload, time till slot timestamp",
);

let time_until_slot_end = time_to_slot + timings.slot_proposal_duration;
if time_until_slot_end.is_negative() {
warn!(
slot = payload.slot(),
parent_hash = ?payload.parent_block_hash(),
"Slot already ended, skipping block building"
);
slot = payload.slot(),
block = payload.block(),
payload_id = payload.payload_id,
parent_hash = ?payload.parent_block_hash(),
"Slot already ended, skipping block building"
);
continue;
};

Expand All @@ -247,18 +251,19 @@ where
{
Ok(header) => header,
Err(err) => {
warn!(parent_hash = ?payload.parent_block_hash(), ?err, "Failed to get parent header for new slot");
warn!(payload_id = payload.payload_id, parent_hash = ?payload.parent_block_hash(), ?err, "Failed to get parent header for new slot");
continue;
}
}
};

debug!(
slot = payload.slot(),
block = payload.block(),
parent_hash = ?payload.parent_block_hash(),
"Got header for slot"
);
slot = payload.slot(),
block = payload.block(),
payload_id = payload.payload_id,
parent_hash = ?payload.parent_block_hash(),
"Got header for slot"
);

// notify the order pool that there is a new header
if let Err(err) = header_sender.send(parent_header.clone()).await {
Expand All @@ -280,6 +285,7 @@ where
self.extra_data.clone(),
None,
root_hasher,
payload.payload_id,
) {
mark_building_started(block_ctx.timestamp());
builder_pool.start_block_building(
Expand Down
8 changes: 4 additions & 4 deletions crates/rbuilder/src/live_builder/order_input/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,8 @@ where
tokio::select! {
header = header_receiver.recv() => {
if let Some(header) = header {
let block_number = header.number;
set_current_block(block_number);
let current_block = header.number;
set_current_block(current_block);
let state = match provider_factory.latest() {
Ok(state) => state,
Err(err) => {
Expand All @@ -346,13 +346,13 @@ where
let mut orderpool = orderpool.lock();
let start = Instant::now();

orderpool.head_updated(block_number, &state);
orderpool.head_updated(current_block, &state);

let update_time = start.elapsed();
let (tx_count, bundle_count) = orderpool.content_count();
set_ordepool_count(tx_count, bundle_count);
debug!(
block_number,
current_block,
tx_count,
bundle_count,
update_time_ms = update_time.as_millis(),
Expand Down
Loading
Loading