diff --git a/crates/rbuilder/src/building/builders/block_building_helper.rs b/crates/rbuilder/src/building/builders/block_building_helper.rs index a6075451..c4f253ca 100644 --- a/crates/rbuilder/src/building/builders/block_building_helper.rs +++ b/crates/rbuilder/src/building/builders/block_building_helper.rs @@ -409,9 +409,12 @@ where Err(err) => { if err.is_consistent_db_view_err() { let last_block_number = self.provider.last_block_number().unwrap_or_default(); + debug!( block_number, - last_block_number, "Can't build on this head, cancelling slot" + payload_id = self.building_ctx.payload_id, + last_block_number, + "Can't build on this head, cancelling slot" ); self.cancel_on_fatal_error.cancel(); } diff --git a/crates/rbuilder/src/building/builders/mod.rs b/crates/rbuilder/src/building/builders/mod.rs index 236efc1a..bc221ff4 100644 --- a/crates/rbuilder/src/building/builders/mod.rs +++ b/crates/rbuilder/src/building/builders/mod.rs @@ -6,7 +6,10 @@ pub mod parallel_builder; use crate::{ building::{BlockBuildingContext, BuiltBlockTrace, SimulatedOrderSink, Sorting}, - live_builder::{payload_events::MevBoostSlotData, simulation::SimulatedOrderCommand}, + live_builder::{ + payload_events::{InternalPayloadId, MevBoostSlotData}, + simulation::SimulatedOrderCommand, + }, primitives::{AccountNonce, OrderId, SimulatedOrder}, provider::StateProviderFactory, utils::{is_provider_factory_health_error, NonceCache}, @@ -254,12 +257,16 @@ pub struct BacktestSimulateBlockInput<'a, P> { /// Handles error from block filling stage. /// Answers if block filling should continue. -pub fn handle_building_error(err: eyre::Report) -> bool { +pub fn handle_building_error(err: eyre::Report, payload_id: InternalPayloadId) -> bool { // @Types let err_str = err.to_string(); if !err_str.contains("Profit too low") { if is_provider_factory_health_error(&err) { - info!(?err, "Cancelling building due to provider factory error"); + info!( + payload_id, + ?err, + "Cancelling building due to provider factory error" + ); return false; } else { warn!(?err, "Error filling orders"); diff --git a/crates/rbuilder/src/building/builders/ordering_builder.rs b/crates/rbuilder/src/building/builders/ordering_builder.rs index b0ff521e..dde57098 100644 --- a/crates/rbuilder/src/building/builders/ordering_builder.rs +++ b/crates/rbuilder/src/building/builders/ordering_builder.rs @@ -62,6 +62,7 @@ pub fn run_ordering_builder

(input: LiveBuilderInput

, config: &OrderingBuil where P: StateProviderFactory + Clone + 'static, { + let payload_id = input.ctx.payload_id; let mut order_intake_consumer = OrderIntakeConsumer::new( input.provider.clone(), input.input, @@ -112,7 +113,7 @@ where } } Err(err) => { - if !handle_building_error(err) { + if !handle_building_error(err, payload_id) { break 'building; } } diff --git a/crates/rbuilder/src/building/builders/parallel_builder/block_building_result_assembler.rs b/crates/rbuilder/src/building/builders/parallel_builder/block_building_result_assembler.rs index 4f558f03..6bbbf19b 100644 --- a/crates/rbuilder/src/building/builders/parallel_builder/block_building_result_assembler.rs +++ b/crates/rbuilder/src/building/builders/parallel_builder/block_building_result_assembler.rs @@ -161,7 +161,7 @@ where } Err(err) => { let _span = info_span!("Parallel builder failed to build new block",run_id = self.run_id,version = version,err=?err).entered(); - if !handle_building_error(err) { + if !handle_building_error(err, self.ctx.payload_id) { return false; } } diff --git a/crates/rbuilder/src/building/mod.rs b/crates/rbuilder/src/building/mod.rs index 33bab78f..3e3708c4 100644 --- a/crates/rbuilder/src/building/mod.rs +++ b/crates/rbuilder/src/building/mod.rs @@ -17,6 +17,7 @@ use reth_primitives::BlockBody; use reth_primitives_traits::{proofs, Block as _}; use crate::{ + live_builder::payload_events::InternalPayloadId, primitives::{Order, OrderId, SimValue, SimulatedOrder, TransactionSignedEcRecoveredWithBlobs}, provider::RootHasher, roothash::RootHashError, @@ -93,6 +94,7 @@ pub struct BlockBuildingContext { /// Version of the EVM that we are going to use pub spec_id: SpecId, pub root_hasher: Arc, + pub payload_id: InternalPayloadId, } impl BlockBuildingContext { @@ -109,6 +111,7 @@ impl BlockBuildingContext { extra_data: Vec, spec_id: Option, root_hasher: Arc, + payload_id: InternalPayloadId, ) -> Option { let attributes = EthPayloadBuilderAttributes::try_new( attributes.data.parent_block_hash, @@ -171,6 +174,7 @@ impl BlockBuildingContext { excess_blob_gas, spec_id, root_hasher, + payload_id, }) } @@ -254,6 +258,7 @@ impl BlockBuildingContext { excess_blob_gas: onchain_block.header.excess_blob_gas, spec_id, root_hasher, + payload_id: 0, } } diff --git a/crates/rbuilder/src/building/testing/test_chain_state.rs b/crates/rbuilder/src/building/testing/test_chain_state.rs index e3dd82d3..dc9f2f69 100644 --- a/crates/rbuilder/src/building/testing/test_chain_state.rs +++ b/crates/rbuilder/src/building/testing/test_chain_state.rs @@ -381,6 +381,7 @@ impl TestBlockContextBuilder { vec![], Some(SpecId::SHANGHAI), self.root_hasher, + 0, ) .unwrap(); if self.use_suggested_fee_recipient_as_coinbase { diff --git a/crates/rbuilder/src/live_builder/block_output/bidding/sequential_sealer_bid_maker.rs b/crates/rbuilder/src/live_builder/block_output/bidding/sequential_sealer_bid_maker.rs index ac5c4474..ceaf3445 100644 --- a/crates/rbuilder/src/live_builder/block_output/bidding/sequential_sealer_bid_maker.rs +++ b/crates/rbuilder/src/live_builder/block_output/bidding/sequential_sealer_bid_maker.rs @@ -99,20 +99,20 @@ impl SequentialSealerBidMakerProcess { { Ok(finalize_res) => match finalize_res { Ok(res) => self.sink.new_block(res.block), - Err(error) => { - if error.is_critical() { + Err(err) => { + if err.is_critical() { error!( builder_name, - block_number, - ?error, + block = block_number, + ?err, "Error on finalize_block on SequentialSealerBidMaker" ) } } }, - Err(error) => error!( - block_number, - ?error, + Err(err) => error!( + block = block_number, + ?err, "Error on join finalize_block on SequentialSealerBidMaker" ), } diff --git a/crates/rbuilder/src/live_builder/block_output/relay_submit.rs b/crates/rbuilder/src/live_builder/block_output/relay_submit.rs index 2fd49aeb..4a43c135 100644 --- a/crates/rbuilder/src/live_builder/block_output/relay_submit.rs +++ b/crates/rbuilder/src/live_builder/block_output/relay_submit.rs @@ -212,6 +212,8 @@ async fn run_submit_to_relays_job( true_bid_value = format_ether(block.trace.true_bid_value), seen_competition_bid = format_ether(block.trace.seen_competition_bid.unwrap_or_default()), block = block.sealed_block.number, + slot = slot_data.slot(), + payload_id = slot_data.payload_id, hash = ?block.sealed_block.hash(), gas = block.sealed_block.gas_used, txs = block.sealed_block.body().transactions.len(), diff --git a/crates/rbuilder/src/live_builder/building/mod.rs b/crates/rbuilder/src/live_builder/building/mod.rs index 154a03a7..fe63efe9 100644 --- a/crates/rbuilder/src/live_builder/building/mod.rs +++ b/crates/rbuilder/src/live_builder/building/mod.rs @@ -17,9 +17,12 @@ use crate::{ use revm_primitives::Address; use tokio::sync::{broadcast, mpsc}; use tokio_util::sync::CancellationToken; -use tracing::{debug, trace}; +use tracing::{debug, info, trace, warn}; use unfinished_block_building_sink_muxer::UnfinishedBlockBuildingSinkMuxer; +/// Interval for checking if last block still corresponds to the parent of the given block building context +const CHECK_LAST_BLOCK_INTERVAL: Duration = Duration::from_millis(100); + use super::{ order_input::{ self, order_replacement_manager::OrderReplacementManager, orderpool::OrdersForBlock, @@ -79,6 +82,15 @@ where cancel.cancel(); }); + { + let provider = self.provider.clone(); + let block_ctx = block_ctx.clone(); + let block_cancellation = block_cancellation.clone(); + tokio::task::spawn_blocking(move || { + run_check_if_parent_block_is_last_block(provider, block_ctx, block_cancellation); + }); + } + let (orders_for_block, sink) = OrdersForBlock::new_with_sink(); // add OrderReplacementManager to manage replacements and cancellations let order_replacement_manager = OrderReplacementManager::new(Box::new(sink)); @@ -208,3 +220,62 @@ fn merge_and_send( } trace!("Cancelling merge_and_send job, source stopped"); } + +fn run_check_if_parent_block_is_last_block

( + provider: P, + block_ctx: BlockBuildingContext, + block_cancellation: CancellationToken, +) where + P: StateProviderFactory + Clone + 'static, +{ + loop { + std::thread::sleep(CHECK_LAST_BLOCK_INTERVAL); + if block_cancellation.is_cancelled() { + return; + } + let last_block_number = match provider.last_block_number() { + Ok(n) => n, + Err(err) => { + warn!(?err, "Failed to get last block number"); + continue; + } + }; + if last_block_number + 1 != block_ctx.block() { + info!( + reason = "last block number", + last_block_number, + block = block_ctx.block(), + payload_id = block_ctx.payload_id, + "Cancelling building job" + ); + block_cancellation.cancel(); + return; + } + + let last_block_hash = match provider.block_hash(last_block_number) { + Ok(Some(h)) => h, + Ok(None) => { + warn!(err = "hash is missing", "Failed to get last block hash"); + continue; + } + Err(err) => { + warn!(?err, "Failed to get last block hash"); + continue; + } + }; + + let parent_hash = block_ctx.attributes.parent; + if last_block_hash != parent_hash { + info!( + reason = "last block hash", + ?last_block_hash, + ?parent_hash, + block = block_ctx.block(), + payload_id = block_ctx.payload_id, + "Cancelling building job" + ); + block_cancellation.cancel(); + return; + } + } +} diff --git a/crates/rbuilder/src/live_builder/mod.rs b/crates/rbuilder/src/live_builder/mod.rs index d5f0a819..a4b4727e 100644 --- a/crates/rbuilder/src/live_builder/mod.rs +++ b/crates/rbuilder/src/live_builder/mod.rs @@ -209,33 +209,37 @@ where let blocklist = self.blocklist_provider.get_blocklist()?; if blocklist.contains(&payload.fee_recipient()) { warn!( - slot = payload.slot(), - fee_recipient = ?payload.fee_recipient(), - "Fee recipient is in blocklist" - ); + slot = payload.slot(), + fee_recipient = ?payload.fee_recipient(), + payload_id = payload.payload_id, + "Fee recipient is in blocklist" + ); continue; } let current_time = OffsetDateTime::now_utc(); // see if we can get parent header in a reasonable time let time_to_slot = payload.timestamp() - current_time; debug!( - slot = payload.slot(), - block = payload.block(), - ?current_time, - payload_timestamp = ?payload.timestamp(), - ?time_to_slot, - parent_hash = ?payload.parent_block_hash(), - provider_head_state = ?ProviderHeadState::new(&self.provider), - "Received payload, time till slot timestamp", - ); + slot = payload.slot(), + block = payload.block(), + payload_id = payload.payload_id, + ?current_time, + payload_timestamp = ?payload.timestamp(), + ?time_to_slot, + parent_hash = ?payload.parent_block_hash(), + provider_head_state = ?ProviderHeadState::new(&self.provider), + "Received payload, time till slot timestamp", + ); let time_until_slot_end = time_to_slot + timings.slot_proposal_duration; if time_until_slot_end.is_negative() { warn!( - slot = payload.slot(), - parent_hash = ?payload.parent_block_hash(), - "Slot already ended, skipping block building" - ); + slot = payload.slot(), + block = payload.block(), + payload_id = payload.payload_id, + parent_hash = ?payload.parent_block_hash(), + "Slot already ended, skipping block building" + ); continue; }; @@ -247,18 +251,19 @@ where { Ok(header) => header, Err(err) => { - warn!(parent_hash = ?payload.parent_block_hash(), ?err, "Failed to get parent header for new slot"); + warn!(payload_id = payload.payload_id, parent_hash = ?payload.parent_block_hash(), ?err, "Failed to get parent header for new slot"); continue; } } }; debug!( - slot = payload.slot(), - block = payload.block(), - parent_hash = ?payload.parent_block_hash(), - "Got header for slot" - ); + slot = payload.slot(), + block = payload.block(), + payload_id = payload.payload_id, + parent_hash = ?payload.parent_block_hash(), + "Got header for slot" + ); // notify the order pool that there is a new header if let Err(err) = header_sender.send(parent_header.clone()).await { @@ -280,6 +285,7 @@ where self.extra_data.clone(), None, root_hasher, + payload.payload_id, ) { mark_building_started(block_ctx.timestamp()); builder_pool.start_block_building( diff --git a/crates/rbuilder/src/live_builder/order_input/mod.rs b/crates/rbuilder/src/live_builder/order_input/mod.rs index c74a16d3..164010af 100644 --- a/crates/rbuilder/src/live_builder/order_input/mod.rs +++ b/crates/rbuilder/src/live_builder/order_input/mod.rs @@ -332,8 +332,8 @@ where tokio::select! { header = header_receiver.recv() => { if let Some(header) = header { - let block_number = header.number; - set_current_block(block_number); + let current_block = header.number; + set_current_block(current_block); let state = match provider_factory.latest() { Ok(state) => state, Err(err) => { @@ -346,13 +346,13 @@ where let mut orderpool = orderpool.lock(); let start = Instant::now(); - orderpool.head_updated(block_number, &state); + orderpool.head_updated(current_block, &state); let update_time = start.elapsed(); let (tx_count, bundle_count) = orderpool.content_count(); set_ordepool_count(tx_count, bundle_count); debug!( - block_number, + current_block, tx_count, bundle_count, update_time_ms = update_time.as_millis(), diff --git a/crates/rbuilder/src/live_builder/payload_events/mod.rs b/crates/rbuilder/src/live_builder/payload_events/mod.rs index de3d5ad9..0544423e 100644 --- a/crates/rbuilder/src/live_builder/payload_events/mod.rs +++ b/crates/rbuilder/src/live_builder/payload_events/mod.rs @@ -15,10 +15,12 @@ use crate::{ SlotSource, }, primitives::mev_boost::{MevBoostRelayID, MevBoostRelaySlotInfoProvider}, + utils::timestamp_ms_to_offset_datetime, }; use alloy_eips::{merge::SLOT_DURATION, BlockNumHash}; use alloy_primitives::{utils::format_ether, Address, B256, U256}; use alloy_rpc_types_beacon::events::PayloadAttributesEvent; +use derivative::Derivative; use std::{collections::VecDeque, sync::Arc, time::Duration}; use tokio::{sync::mpsc, task::JoinHandle}; use tokio_util::sync::CancellationToken; @@ -33,9 +35,13 @@ const NEW_PAYLOAD_RECV_TIMEOUT: Duration = SLOT_DURATION.saturating_mul(2); /// One slot (12secs) is enough so we don't saturate any resource and we don't miss to many slots. const CONSENSUS_CLIENT_RECONNECT_WAIT: Duration = SLOT_DURATION; +/// Unique paload ID used to track payload across the builder. +pub type InternalPayloadId = u64; + /// Data about a slot received from relays. /// Contains the important information needed to build and submit the block. -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Derivative)] +#[derivative(Debug, Clone, PartialEq, Eq)] pub struct MevBoostSlotData { /// The .data.payload_attributes.suggested_fee_recipient is replaced pub payload_attributes_event: PayloadAttributesEvent, @@ -43,6 +49,8 @@ pub struct MevBoostSlotData { /// List of relays agreeing to the slot_data. It may not contain all the relays (eg: errors, forks, validators registering only to some relays) pub relays: Vec, pub slot_data: SlotData, + #[derivative(PartialEq = "ignore", Hash = "ignore")] + pub payload_id: InternalPayloadId, } impl MevBoostSlotData { @@ -119,6 +127,8 @@ impl MevBoostSlotDataGenerator { pub fn spawn(self) -> (JoinHandle<()>, mpsc::UnboundedReceiver) { let relays = RelaysForSlotData::new(&self.relays); + let mut payload_counter = 0; + let (send, receive) = mpsc::unbounded_channel(); let handle = tokio::spawn(async move { let mut source = PayloadSourceMuxer::new( @@ -137,32 +147,61 @@ impl MevBoostSlotDataGenerator { return; } - let (slot_data, relays) = - if let Some(res) = relays.slot_data(event.data.proposal_slot).await { - res - } else { - continue; - }; + let payload_id: InternalPayloadId = payload_counter; + payload_counter += 1; + + let slot = event.data.proposal_slot; + let block = event.data.parent_block_number + 1; + let parent_hash = event.data.parent_block_hash; + let timestamp = + timestamp_ms_to_offset_datetime(event.data.payload_attributes.timestamp * 1000); + info!( + payload_id, + slot, + block, + ?parent_hash, + ?timestamp, + "Payload attributes received from CL client" + ); + + let (slot_data, relays) = if let Some(res) = relays.slot_data(slot).await { + res + } else { + info!( + payload_id, + reason = "no MEV-Boost relay data", + "Payload attributes discarded" + ); + continue; + }; let mut correct_event = event; correct_event .data .payload_attributes .suggested_fee_recipient = slot_data.fee_recipient; + info!(payload_id, address = ?slot_data.fee_recipient, "Payload attributes correct fee recipient set"); let mev_boost_slot_data = MevBoostSlotData { payload_attributes_event: correct_event, suggested_gas_limit: slot_data.gas_limit, relays, slot_data, + payload_id, }; match check_slot_data_for_blocklist( &mev_boost_slot_data, self.blocklist_provider.as_ref(), + payload_id, ) { Ok(can_build) => { if !can_build { + info!( + payload_id, + reason = "blocklist", + "Payload attributes discarded" + ); continue; } } @@ -175,6 +214,11 @@ impl MevBoostSlotDataGenerator { } if recently_sent_data.contains(&mev_boost_slot_data) { + info!( + payload_id, + reason = "the same payload was already sent", + "Payload attributes discarded" + ); continue; } if recently_sent_data.len() > RECENTLY_SENT_EVENTS_BUFF { @@ -185,7 +229,7 @@ impl MevBoostSlotDataGenerator { report_slot_withdrawals_to_fee_recipients(&mev_boost_slot_data); if send.send(mev_boost_slot_data).is_err() { - debug!("MevBoostSlotData events channel closed"); + debug!(payload_id, "MevBoostSlotData events channel closed"); break; } } @@ -213,9 +257,10 @@ impl SlotSource for MevBoostSlotDataGenerator { fn check_slot_data_for_blocklist( data: &MevBoostSlotData, blocklist_provider: &dyn BlockListProvider, + payload_id: InternalPayloadId, ) -> Result { if blocklist_provider.current_list_contains(&data.fee_recipient())? { - warn!(recipiend=?data.fee_recipient(),"Slot data fee recipient is in the blocklist"); + warn!(payload_id, recipiend=?data.fee_recipient(),"Slot data fee recipient is in the blocklist"); return Ok(false); } Ok(true)