Skip to content

Commit

Permalink
feat: store safe block num as well (#11648)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse authored Oct 10, 2024
1 parent 250785f commit 1ba631b
Show file tree
Hide file tree
Showing 14 changed files with 130 additions and 44 deletions.
2 changes: 1 addition & 1 deletion crates/blockchain-tree/src/externals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use reth_db_api::{cursor::DbCursorRO, transaction::DbTx};
use reth_node_types::NodeTypesWithDB;
use reth_primitives::StaticFileSegment;
use reth_provider::{
providers::ProviderNodeTypes, FinalizedBlockReader, FinalizedBlockWriter, ProviderFactory,
providers::ProviderNodeTypes, ChainStateBlockReader, ChainStateBlockWriter, ProviderFactory,
StaticFileProviderFactory, StatsReader,
};
use reth_storage_errors::provider::ProviderResult;
Expand Down
8 changes: 6 additions & 2 deletions crates/chain-state/src/chain_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ pub struct ChainInfoTracker {
impl ChainInfoTracker {
/// Create a new chain info container for the given canonical head and finalized header if it
/// exists.
pub fn new(head: SealedHeader, finalized: Option<SealedHeader>) -> Self {
pub fn new(
head: SealedHeader,
finalized: Option<SealedHeader>,
safe: Option<SealedHeader>,
) -> Self {
let (finalized_block, _) = watch::channel(finalized);
let (safe_block, _) = watch::channel(None);
let (safe_block, _) = watch::channel(safe);

Self {
inner: Arc::new(ChainInfoInner {
Expand Down
17 changes: 11 additions & 6 deletions crates/chain-state/src/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,13 @@ impl CanonicalInMemoryState {
numbers: BTreeMap<u64, B256>,
pending: Option<BlockState>,
finalized: Option<SealedHeader>,
safe: Option<SealedHeader>,
) -> Self {
let in_memory_state = InMemoryState::new(blocks, numbers, pending);
let header = in_memory_state
.head_state()
.map_or_else(SealedHeader::default, |state| state.block_ref().block().header.clone());
let chain_info_tracker = ChainInfoTracker::new(header, finalized);
let chain_info_tracker = ChainInfoTracker::new(header, finalized, safe);
let (canon_state_notification_sender, _) =
broadcast::channel(CANON_STATE_NOTIFICATION_CHANNEL_SIZE);

Expand All @@ -193,13 +194,17 @@ impl CanonicalInMemoryState {

/// Create an empty state.
pub fn empty() -> Self {
Self::new(HashMap::default(), BTreeMap::new(), None, None)
Self::new(HashMap::default(), BTreeMap::new(), None, None, None)
}

/// Create a new in memory state with the given local head and finalized header
/// if it exists.
pub fn with_head(head: SealedHeader, finalized: Option<SealedHeader>) -> Self {
let chain_info_tracker = ChainInfoTracker::new(head, finalized);
pub fn with_head(
head: SealedHeader,
finalized: Option<SealedHeader>,
safe: Option<SealedHeader>,
) -> Self {
let chain_info_tracker = ChainInfoTracker::new(head, finalized, safe);
let in_memory_state = InMemoryState::default();
let (canon_state_notification_sender, _) =
broadcast::channel(CANON_STATE_NOTIFICATION_CHANNEL_SIZE);
Expand Down Expand Up @@ -1255,7 +1260,7 @@ mod tests {
numbers.insert(2, block2.block().hash());
numbers.insert(3, block3.block().hash());

let canonical_state = CanonicalInMemoryState::new(blocks, numbers, None, None);
let canonical_state = CanonicalInMemoryState::new(blocks, numbers, None, None, None);

let historical: StateProviderBox = Box::new(MockStateProvider);

Expand Down Expand Up @@ -1297,7 +1302,7 @@ mod tests {
let mut numbers = BTreeMap::new();
numbers.insert(1, hash);

let state = CanonicalInMemoryState::new(blocks, numbers, None, None);
let state = CanonicalInMemoryState::new(blocks, numbers, None, None, None);
let chain: Vec<_> = state.canonical_chain().collect();

assert_eq!(chain.len(), 1);
Expand Down
2 changes: 1 addition & 1 deletion crates/cli/commands/src/stage/unwind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use reth_node_builder::{NodeTypesWithDB, NodeTypesWithEngine};
use reth_node_core::args::NetworkArgs;
use reth_provider::{
providers::ProviderNodeTypes, BlockExecutionWriter, BlockNumReader, ChainSpecProvider,
FinalizedBlockReader, FinalizedBlockWriter, ProviderFactory, StaticFileProviderFactory,
ChainStateBlockReader, ChainStateBlockWriter, ProviderFactory, StaticFileProviderFactory,
};
use reth_prune::PruneModes;
use reth_stages::{
Expand Down
9 changes: 7 additions & 2 deletions crates/consensus/beacon/src/engine/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,13 @@ where
let (header, seal) = sealed.into_parts();
let genesis_block = SealedHeader::new(header, seal);

let blockchain_provider =
BlockchainProvider::with_blocks(provider_factory.clone(), tree, genesis_block, None);
let blockchain_provider = BlockchainProvider::with_blocks(
provider_factory.clone(),
tree,
genesis_block,
None,
None,
);

let pruner = Pruner::new_with_factory(
provider_factory.clone(),
Expand Down
18 changes: 17 additions & 1 deletion crates/engine/tree/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use reth_chain_state::ExecutedBlock;
use reth_errors::ProviderError;
use reth_provider::{
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader,
DatabaseProviderFactory, FinalizedBlockWriter, ProviderFactory, StaticFileProviderFactory,
ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory, StaticFileProviderFactory,
};
use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
use reth_stages_api::{MetricEvent, MetricEventsSender};
Expand Down Expand Up @@ -97,6 +97,11 @@ impl<N: ProviderNodeTypes> PersistenceService<N> {
provider.save_finalized_block_number(finalized_block)?;
provider.commit()?;
}
PersistenceAction::SaveSafeBlock(safe_block) => {
let provider = self.provider.database_provider_rw()?;
provider.save_safe_block_number(safe_block)?;
provider.commit()?;
}
}
}
Ok(())
Expand Down Expand Up @@ -176,6 +181,9 @@ pub enum PersistenceAction {

/// Update the persisted finalized block on disk
SaveFinalizedBlock(u64),

/// Update the persisted safe block on disk
SaveSafeBlock(u64),
}

/// A handle to the persistence service
Expand Down Expand Up @@ -251,6 +259,14 @@ impl PersistenceHandle {
self.send_action(PersistenceAction::SaveFinalizedBlock(finalized_block))
}

/// Persists the finalized block number on disk.
pub fn save_safe_block_number(
&self,
safe_block: u64,
) -> Result<(), SendError<PersistenceAction>> {
self.send_action(PersistenceAction::SaveSafeBlock(safe_block))
}

/// Tells the persistence service to remove blocks above a certain block number. The removed
/// blocks are returned by the service.
///
Expand Down
13 changes: 9 additions & 4 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2401,8 +2401,13 @@ where
// if the safe block is not known, we can't update the safe block
return Err(OnForkChoiceUpdated::invalid_state())
}
Ok(Some(finalized)) => {
self.canonical_in_memory_state.set_safe(finalized);
Ok(Some(safe)) => {
if Some(safe.num_hash()) != self.canonical_in_memory_state.get_safe_num_hash() {
// we're also persisting the safe block on disk so we can reload it on
// restart this is required by optimism which queries the safe block: <https://github.com/ethereum-optimism/optimism/blob/c383eb880f307caa3ca41010ec10f30f08396b2e/op-node/rollup/sync/start.go#L65-L65>
let _ = self.persistence.save_safe_block_number(safe.number);
self.canonical_in_memory_state.set_safe(safe);
}
}
Err(err) => {
error!(target: "engine::tree", %err, "Failed to fetch safe block header");
Expand Down Expand Up @@ -2680,7 +2685,7 @@ mod tests {
let (header, seal) = sealed.into_parts();
let header = SealedHeader::new(header, seal);
let engine_api_tree_state = EngineApiTreeState::new(10, 10, header.num_hash());
let canonical_in_memory_state = CanonicalInMemoryState::with_head(header, None);
let canonical_in_memory_state = CanonicalInMemoryState::with_head(header, None, None);

let (to_payload_service, _payload_command_rx) = unbounded_channel();
let payload_builder = PayloadBuilderHandle::new(to_payload_service);
Expand Down Expand Up @@ -2744,7 +2749,7 @@ mod tests {
let last_executed_block = blocks.last().unwrap().clone();
let pending = Some(BlockState::new(last_executed_block));
self.tree.canonical_in_memory_state =
CanonicalInMemoryState::new(state_by_hash, hash_by_number, pending, None);
CanonicalInMemoryState::new(state_by_hash, hash_by_number, pending, None, None);

self.blocks = blocks.clone();
self.persist_blocks(
Expand Down
4 changes: 2 additions & 2 deletions crates/stages/api/src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ pub use event::*;
use futures_util::Future;
use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH;
use reth_provider::{
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, DatabaseProviderFactory,
FinalizedBlockReader, FinalizedBlockWriter, ProviderFactory, StageCheckpointReader,
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, ChainStateBlockReader,
ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory, StageCheckpointReader,
StageCheckpointWriter, StaticFileProviderFactory,
};
use reth_prune::PrunerBuilder;
Expand Down
11 changes: 7 additions & 4 deletions crates/storage/db/src/tables/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,8 @@ tables! {
pub enum ChainStateKey {
/// Last finalized block key
LastFinalizedBlock,
/// Last finalized block key
LastSafeBlockBlock,
}

impl Encode for ChainStateKey {
Expand All @@ -424,16 +426,17 @@ impl Encode for ChainStateKey {
fn encode(self) -> Self::Encoded {
match self {
Self::LastFinalizedBlock => [0],
Self::LastSafeBlockBlock => [1],
}
}
}

impl Decode for ChainStateKey {
fn decode(value: &[u8]) -> Result<Self, reth_db_api::DatabaseError> {
if value == [0] {
Ok(Self::LastFinalizedBlock)
} else {
Err(reth_db_api::DatabaseError::Decode)
match value {
[0] => Ok(Self::LastFinalizedBlock),
[1] => Ok(Self::LastSafeBlockBlock),
_ => Err(reth_db_api::DatabaseError::Decode),
}
}
}
Expand Down
20 changes: 17 additions & 3 deletions crates/storage/provider/src/providers/blockchain_provider.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::{
providers::StaticFileProvider, AccountReader, BlockHashReader, BlockIdReader, BlockNumReader,
BlockReader, BlockReaderIdExt, BlockSource, CanonChainTracker, CanonStateNotifications,
CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, DatabaseProviderFactory,
DatabaseProviderRO, EvmEnvProvider, FinalizedBlockReader, HeaderProvider, ProviderError,
CanonStateSubscriptions, ChainSpecProvider, ChainStateBlockReader, ChangeSetReader,
DatabaseProviderFactory, DatabaseProviderRO, EvmEnvProvider, HeaderProvider, ProviderError,
ProviderFactory, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt,
RequestsProvider, StageCheckpointReader, StateProviderBox, StateProviderFactory, StateReader,
StaticFileProviderFactory, TransactionVariant, TransactionsProvider, WithdrawalsProvider,
Expand Down Expand Up @@ -93,9 +93,23 @@ impl<N: ProviderNodeTypes> BlockchainProvider2<N> {
.map(|num| provider.sealed_header(num))
.transpose()?
.flatten();
let safe_header = provider
.last_safe_block_number()?
.or_else(|| {
// for the purpose of this we can also use the finalized block if we don't have the
// safe block
provider.last_finalized_block_number().ok().flatten()
})
.map(|num| provider.sealed_header(num))
.transpose()?
.flatten();
Ok(Self {
database,
canonical_in_memory_state: CanonicalInMemoryState::with_head(latest, finalized_header),
canonical_in_memory_state: CanonicalInMemoryState::with_head(
latest,
finalized_header,
safe_header,
),
})
}

Expand Down
37 changes: 28 additions & 9 deletions crates/storage/provider/src/providers/database/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ use crate::{
},
writer::UnifiedStorageWriter,
AccountReader, BlockExecutionReader, BlockExecutionWriter, BlockHashReader, BlockNumReader,
BlockReader, BlockWriter, BundleStateInit, DBProvider, EvmEnvProvider, FinalizedBlockReader,
FinalizedBlockWriter, HashingWriter, HeaderProvider, HeaderSyncGap, HeaderSyncGapProvider,
HistoricalStateProvider, HistoryWriter, LatestStateProvider, OriginalValuesKnown,
ProviderError, PruneCheckpointReader, PruneCheckpointWriter, RequestsProvider, RevertsInit,
StageCheckpointReader, StateChangeWriter, StateProviderBox, StateReader, StateWriter,
StaticFileProviderFactory, StatsReader, StorageReader, StorageTrieWriter, TransactionVariant,
TransactionsProvider, TransactionsProviderExt, TrieWriter, WithdrawalsProvider,
BlockReader, BlockWriter, BundleStateInit, ChainStateBlockReader, ChainStateBlockWriter,
DBProvider, EvmEnvProvider, HashingWriter, HeaderProvider, HeaderSyncGap,
HeaderSyncGapProvider, HistoricalStateProvider, HistoryWriter, LatestStateProvider,
OriginalValuesKnown, ProviderError, PruneCheckpointReader, PruneCheckpointWriter,
RequestsProvider, RevertsInit, StageCheckpointReader, StateChangeWriter, StateProviderBox,
StateReader, StateWriter, StaticFileProviderFactory, StatsReader, StorageReader,
StorageTrieWriter, TransactionVariant, TransactionsProvider, TransactionsProviderExt,
TrieWriter, WithdrawalsProvider,
};
use alloy_eips::BlockHashOrNumber;
use alloy_primitives::{keccak256, Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, U256};
Expand Down Expand Up @@ -3596,7 +3597,7 @@ impl<TX: DbTx, Spec: Send + Sync> StatsReader for DatabaseProvider<TX, Spec> {
}
}

impl<TX: DbTx, Spec: Send + Sync> FinalizedBlockReader for DatabaseProvider<TX, Spec> {
impl<TX: DbTx, Spec: Send + Sync> ChainStateBlockReader for DatabaseProvider<TX, Spec> {
fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
let mut finalized_blocks = self
.tx
Expand All @@ -3608,14 +3609,32 @@ impl<TX: DbTx, Spec: Send + Sync> FinalizedBlockReader for DatabaseProvider<TX,
let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
Ok(last_finalized_block_number)
}

fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>> {
let mut finalized_blocks = self
.tx
.cursor_read::<tables::ChainState>()?
.walk(Some(tables::ChainStateKey::LastSafeBlockBlock))?
.take(1)
.collect::<Result<BTreeMap<tables::ChainStateKey, BlockNumber>, _>>()?;

let last_finalized_block_number = finalized_blocks.pop_first().map(|pair| pair.1);
Ok(last_finalized_block_number)
}
}

impl<TX: DbTxMut, Spec: Send + Sync> FinalizedBlockWriter for DatabaseProvider<TX, Spec> {
impl<TX: DbTxMut, Spec: Send + Sync> ChainStateBlockWriter for DatabaseProvider<TX, Spec> {
fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
Ok(self
.tx
.put::<tables::ChainState>(tables::ChainStateKey::LastFinalizedBlock, block_number)?)
}

fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()> {
Ok(self
.tx
.put::<tables::ChainState>(tables::ChainStateKey::LastSafeBlockBlock, block_number)?)
}
}

impl<TX: DbTx + 'static, Spec: Send + Sync + 'static> DBProvider for DatabaseProvider<TX, Spec> {
Expand Down
16 changes: 12 additions & 4 deletions crates/storage/provider/src/providers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use crate::{
AccountReader, BlockHashReader, BlockIdReader, BlockNumReader, BlockReader, BlockReaderIdExt,
BlockSource, BlockchainTreePendingStateProvider, CanonChainTracker, CanonStateNotifications,
CanonStateSubscriptions, ChainSpecProvider, ChangeSetReader, DatabaseProviderFactory,
EvmEnvProvider, FinalizedBlockReader, FullExecutionDataProvider, HeaderProvider, ProviderError,
PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt, RequestsProvider,
CanonStateSubscriptions, ChainSpecProvider, ChainStateBlockReader, ChangeSetReader,
DatabaseProviderFactory, EvmEnvProvider, FullExecutionDataProvider, HeaderProvider,
ProviderError, PruneCheckpointReader, ReceiptProvider, ReceiptProviderIdExt, RequestsProvider,
StageCheckpointReader, StateProviderBox, StateProviderFactory, StaticFileProviderFactory,
TransactionVariant, TransactionsProvider, TreeViewer, WithdrawalsProvider,
};
Expand Down Expand Up @@ -109,8 +109,9 @@ impl<N: ProviderNodeTypes> BlockchainProvider<N> {
tree: Arc<dyn TreeViewer>,
latest: SealedHeader,
finalized: Option<SealedHeader>,
safe: Option<SealedHeader>,
) -> Self {
Self { database, tree, chain_info: ChainInfoTracker::new(latest, finalized) }
Self { database, tree, chain_info: ChainInfoTracker::new(latest, finalized, safe) }
}

/// Create a new provider using only the database and the tree, fetching the latest header from
Expand All @@ -128,11 +129,18 @@ impl<N: ProviderNodeTypes> BlockchainProvider<N> {
.transpose()?
.flatten();

let safe_header = provider
.last_safe_block_number()?
.map(|num| provider.sealed_header(num))
.transpose()?
.flatten();

Ok(Self::with_blocks(
database,
tree,
SealedHeader::new(latest_header, best.best_hash),
finalized_header,
safe_header,
))
}

Expand Down
15 changes: 11 additions & 4 deletions crates/storage/provider/src/traits/finalized_block.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
use alloy_primitives::BlockNumber;
use reth_errors::ProviderResult;

/// Functionality to read the last known finalized block from the database.
pub trait FinalizedBlockReader: Send + Sync {
/// Functionality to read the last known chain blocks from the database.
pub trait ChainStateBlockReader: Send + Sync {
/// Returns the last finalized block number.
///
/// If no finalized block has been written yet, this returns `None`.
fn last_finalized_block_number(&self) -> ProviderResult<Option<BlockNumber>>;
/// Returns the last safe block number.
///
/// If no safe block has been written yet, this returns `None`.
fn last_safe_block_number(&self) -> ProviderResult<Option<BlockNumber>>;
}

/// Functionality to write the last known finalized block to the database.
pub trait FinalizedBlockWriter: Send + Sync {
/// Functionality to write the last known chain blocks to the database.
pub trait ChainStateBlockWriter: Send + Sync {
/// Saves the given finalized block number in the DB.
fn save_finalized_block_number(&self, block_number: BlockNumber) -> ProviderResult<()>;

/// Saves the given safe block number in the DB.
fn save_safe_block_number(&self, block_number: BlockNumber) -> ProviderResult<()>;
}
Loading

0 comments on commit 1ba631b

Please sign in to comment.