From ae470d70041339f00de5d404ff75bcc3e0cf31eb Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Mon, 4 Mar 2024 15:46:49 +1100 Subject: [PATCH] Add `DataColumnSidecarsByRoot ` req/resp protocol (#5196) * Add stub for `DataColumnsByRoot` * Add basic implementation of serving RPC data column from DA checker. * Store data columns in early attester cache and blobs db. * Apply suggestions from code review Co-authored-by: Eitan Seri-Levi Co-authored-by: Jacob Kaufmann * Fix build. * Store `DataColumnInfo` in database and various cleanups. * Update `DataColumnSidecar` ssz max size and remove panic code. --------- Co-authored-by: Eitan Seri-Levi Co-authored-by: Jacob Kaufmann --- beacon_node/beacon_chain/src/beacon_chain.rs | 95 ++++++-- .../beacon_chain/src/blob_verification.rs | 41 +--- .../beacon_chain/src/block_verification.rs | 32 +++ .../src/block_verification_types.rs | 35 ++- beacon_node/beacon_chain/src/builder.rs | 10 + .../src/data_availability_checker.rs | 102 +++++--- .../availability_view.rs | 95 +++++++- .../child_components.rs | 14 +- .../src/data_availability_checker/error.rs | 2 + .../overflow_lru_cache.rs | 168 +++++++++++++- .../processing_cache.rs | 6 + .../src/data_column_verification.rs | 218 ++++++++++++++++++ .../beacon_chain/src/early_attester_cache.rs | 14 +- .../beacon_chain/src/historical_blocks.rs | 25 +- beacon_node/beacon_chain/src/lib.rs | 1 + beacon_node/beacon_chain/tests/store_tests.rs | 4 +- beacon_node/beacon_processor/src/lib.rs | 17 +- .../src/peer_manager/mod.rs | 10 +- .../src/rpc/codec/ssz_snappy.rs | 32 ++- .../lighthouse_network/src/rpc/config.rs | 13 ++ .../lighthouse_network/src/rpc/methods.rs | 33 ++- beacon_node/lighthouse_network/src/rpc/mod.rs | 1 + .../lighthouse_network/src/rpc/outbound.rs | 9 + .../lighthouse_network/src/rpc/protocol.rs | 22 +- .../src/rpc/rate_limiter.rs | 14 ++ .../src/service/api_types.rs | 13 +- .../lighthouse_network/src/service/mod.rs | 15 ++ .../gossip_methods.rs | 58 ++++- 
.../src/network_beacon_processor/mod.rs | 21 +- .../network_beacon_processor/rpc_methods.rs | 93 +++++++- .../network_beacon_processor/sync_methods.rs | 5 + beacon_node/network/src/router.rs | 19 +- .../sync/block_lookups/single_block_lookup.rs | 2 + .../network/src/sync/block_lookups/tests.rs | 4 +- beacon_node/network/src/sync/manager.rs | 2 +- beacon_node/store/src/errors.rs | 2 + beacon_node/store/src/hot_cold_store.rs | 202 +++++++++++++++- beacon_node/store/src/lib.rs | 6 + beacon_node/store/src/metadata.rs | 28 +++ beacon_node/store/src/metrics.rs | 4 + consensus/types/src/chain_spec.rs | 38 +++ consensus/types/src/data_column_sidecar.rs | 71 +++++- 42 files changed, 1465 insertions(+), 131 deletions(-) create mode 100644 beacon_node/beacon_chain/src/data_column_verification.rs diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index ff856ef7f35..58960ed8078 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -7,9 +7,7 @@ use crate::attester_cache::{AttesterCache, AttesterCacheKey}; use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckCaches}; use crate::beacon_proposer_cache::compute_proposer_duties_from_head; use crate::beacon_proposer_cache::BeaconProposerCache; -use crate::blob_verification::{ - GossipBlobError, GossipVerifiedBlob, GossipVerifiedDataColumnSidecar, -}; +use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use crate::block_times_cache::BlockTimesCache; use crate::block_verification::POS_PANDA_BANNER; use crate::block_verification::{ @@ -25,6 +23,7 @@ use crate::chain_config::ChainConfig; use crate::data_availability_checker::{ Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker, }; +use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn}; use crate::early_attester_cache::EarlyAttesterCache; use crate::errors::{BeaconChainError as Error, 
BlockProductionError}; use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend}; @@ -124,6 +123,7 @@ use tokio_stream::Stream; use tree_hash::TreeHash; use types::beacon_state::CloneConfig; use types::blob_sidecar::{BlobSidecarList, FixedBlobSidecarList}; +use types::data_column_sidecar::DataColumnSidecarList; use types::payload::BlockProductionVersion; use types::*; @@ -1176,6 +1176,15 @@ impl BeaconChain { .map_or_else(|| self.get_blobs(block_root), Ok) } + pub fn get_data_columns_checking_early_attester_cache( + &self, + block_root: &Hash256, + ) -> Result, Error> { + self.early_attester_cache + .get_data_columns(*block_root) + .map_or_else(|| self.get_data_columns(block_root), Ok) + } + /// Returns the block at the given root, if any. /// /// ## Errors @@ -1251,6 +1260,20 @@ impl BeaconChain { } } + /// Returns the data columns at the given root, if any. + /// + /// ## Errors + /// May return a database error. + pub fn get_data_columns( + &self, + block_root: &Hash256, + ) -> Result, Error> { + match self.store.get_data_columns(block_root)? 
{ + Some(data_columns) => Ok(data_columns), + None => Ok(DataColumnSidecarList::default()), + } + } + pub fn get_blinded_block( &self, block_root: &Hash256, @@ -2088,10 +2111,10 @@ impl BeaconChain { self: &Arc, data_column_sidecar: Arc>, subnet_id: u64, - ) -> Result, GossipBlobError> { + ) -> Result, GossipDataColumnError> { metrics::inc_counter(&metrics::BLOBS_COLUMN_SIDECAR_PROCESSING_REQUESTS); let _timer = metrics::start_timer(&metrics::DATA_COLUMN_SIDECAR_GOSSIP_VERIFICATION_TIMES); - GossipVerifiedDataColumnSidecar::new(data_column_sidecar, subnet_id, self).map(|v| { + GossipVerifiedDataColumn::new(data_column_sidecar, subnet_id, self).map(|v| { metrics::inc_counter(&metrics::DATA_COLUMNS_SIDECAR_PROCESSING_SUCCESSES); v }) @@ -2912,18 +2935,28 @@ impl BeaconChain { self.remove_notified(&block_root, r) } - pub fn process_gossip_data_column( + /// Cache the data column in the processing cache, process it, then evict it from the cache if it was + /// imported or errors. + pub async fn process_gossip_data_column( self: &Arc, - gossip_verified_data_column: GossipVerifiedDataColumnSidecar, - ) { - let data_column = gossip_verified_data_column.as_data_column(); - // TODO(das) send to DA checker - info!( - self.log, - "Processed gossip data column"; - "index" => data_column.index, - "slot" => data_column.slot().as_u64() - ); + data_column: GossipVerifiedDataColumn, + ) -> Result> { + let block_root = data_column.block_root(); + + // If this block has already been imported to forkchoice it must have been available, so + // we don't need to process its samples again. 
+ if self + .canonical_head + .fork_choice_read_lock() + .contains_block(&block_root) + { + return Err(BlockError::BlockIsAlreadyKnown); + } + + let r = self + .check_gossip_data_column_availability_and_import(data_column) + .await; + self.remove_notified(&block_root, r) } /// Cache the blobs in the processing cache, process it, then evict it from the cache if it was @@ -3198,6 +3231,23 @@ impl BeaconChain { self.process_availability(slot, availability).await } + /// Checks if the provided data column can make any cached blocks available, and imports immediately + /// if so, otherwise caches the data column in the data availability checker. + async fn check_gossip_data_column_availability_and_import( + self: &Arc, + data_column: GossipVerifiedDataColumn, + ) -> Result> { + let slot = data_column.slot(); + if let Some(slasher) = self.slasher.as_ref() { + slasher.accept_block_header(data_column.signed_block_header()); + } + let availability = self + .data_availability_checker + .put_gossip_data_column(data_column)?; + + self.process_availability(slot, availability).await + } + /// Checks if the provided blobs can make any cached blocks available, and imports immediately /// if so, otherwise caches the blob in the data availability checker. async fn check_rpc_blob_availability_and_import( @@ -3475,7 +3525,7 @@ impl BeaconChain { // If the write fails, revert fork choice to the version from disk, else we can // end up with blocks in fork choice that are missing from disk. 
// See https://github.com/sigp/lighthouse/issues/2028 - let (_, signed_block, blobs) = signed_block.deconstruct(); + let (_, signed_block, blobs, data_columns) = signed_block.deconstruct(); let block = signed_block.message(); ops.extend( confirmed_state_roots @@ -3496,6 +3546,17 @@ impl BeaconChain { } } + if let Some(data_columns) = data_columns { + if !data_columns.is_empty() { + debug!( + self.log, "Writing data_columns to store"; + "block_root" => %block_root, + "count" => data_columns.len(), + ); + ops.push(StoreOp::PutDataColumns(block_root, data_columns)); + } + } + let txn_lock = self.store.hot_db.begin_rw_transaction(); if let Err(e) = self.store.do_atomically_with_block_and_blobs_cache(ops) { diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index ffc64f9d10e..f2d150d72bf 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -17,8 +17,7 @@ use ssz_types::VariableList; use tree_hash::TreeHash; use types::blob_sidecar::BlobIdentifier; use types::{ - BeaconStateError, BlobSidecar, CloneConfig, DataColumnSidecar, EthSpec, Hash256, - SignedBeaconBlockHeader, Slot, + BeaconStateError, BlobSidecar, CloneConfig, EthSpec, Hash256, SignedBeaconBlockHeader, Slot, }; /// An error occurred while validating a gossip blob. @@ -185,33 +184,6 @@ pub type GossipVerifiedBlobList = VariableList< <::EthSpec as EthSpec>::MaxBlobsPerBlock, >; -#[derive(Debug)] -pub struct GossipVerifiedDataColumnSidecar { - data_column_sidecar: Arc>, -} - -impl GossipVerifiedDataColumnSidecar { - pub fn new( - column_sidecar: Arc>, - subnet_id: u64, - chain: &BeaconChain, - ) -> Result> { - let header = column_sidecar.signed_block_header.clone(); - // We only process slashing info if the gossip verification failed - // since we do not process the blob any further in that case. 
- validate_data_column_sidecar_for_gossip(column_sidecar, subnet_id, chain).map_err(|e| { - process_block_slash_info::<_, GossipBlobError>( - chain, - BlockSlashInfo::from_early_error_blob(header, e), - ) - }) - } - - pub fn as_data_column(&self) -> &Arc> { - &self.data_column_sidecar - } -} - /// A wrapper around a `BlobSidecar` that indicates it has been approved for re-gossiping on /// the p2p network. #[derive(Debug)] @@ -675,17 +647,6 @@ pub fn validate_blob_sidecar_for_gossip( }) } -pub fn validate_data_column_sidecar_for_gossip( - data_column_sidecar: Arc>, - _subnet: u64, - _chain: &BeaconChain, -) -> Result, GossipBlobError> { - // TODO(das): validate kzg commitments, cell proofs etc - Ok(GossipVerifiedDataColumnSidecar { - data_column_sidecar: data_column_sidecar.clone(), - }) -} - /// Returns the canonical root of the given `blob`. /// /// Use this function to ensure that we report the blob hashing time Prometheus metric. diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index ac3d3e3ab80..b896327e06f 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -53,6 +53,7 @@ use crate::block_verification_types::{ AsBlock, BlockContentsError, BlockImportData, GossipVerifiedBlockContents, RpcBlock, }; use crate::data_availability_checker::{AvailabilityCheckError, MaybeAvailableBlock}; +use crate::data_column_verification::GossipDataColumnError; use crate::eth1_finalization_cache::Eth1FinalizationData; use crate::execution_payload::{ is_optimistic_candidate_block, validate_execution_payload_for_gossip, validate_merge_block, @@ -528,6 +529,20 @@ impl BlockSlashInfo> { } } +impl BlockSlashInfo> { + pub fn from_early_error_data_column( + header: SignedBeaconBlockHeader, + e: GossipDataColumnError, + ) -> Self { + match e { + GossipDataColumnError::ProposalSignatureInvalid => BlockSlashInfo::SignatureInvalid(e), + // 
`InvalidSignature` could indicate any signature in the block, so we want + // to recheck the proposer signature alone. + _ => BlockSlashInfo::SignatureNotChecked(header, e), + } + } +} + /// Process invalid blocks to see if they are suitable for the slasher. /// /// If no slasher is configured, this is a no-op. @@ -2002,6 +2017,23 @@ impl BlockBlobError for GossipBlobError { } } +impl BlockBlobError for GossipDataColumnError { + fn not_later_than_parent_error(data_column_slot: Slot, parent_slot: Slot) -> Self { + GossipDataColumnError::DataColumnIsNotLaterThanParent { + data_column_slot, + parent_slot, + } + } + + fn unknown_validator_error(validator_index: u64) -> Self { + GossipDataColumnError::UnknownValidator(validator_index) + } + + fn proposer_signature_invalid() -> Self { + GossipDataColumnError::ProposalSignatureInvalid + } +} + /// Performs a cheap (time-efficient) state advancement so the committees and proposer shuffling for /// `slot` can be obtained from `state`. /// diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index edba7a211cb..263a6eab074 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -9,6 +9,7 @@ use ssz_types::VariableList; use state_processing::ConsensusContext; use std::sync::Arc; use types::blob_sidecar::{BlobIdentifier, BlobSidecarError, FixedBlobSidecarList}; +use types::data_column_sidecar::DataColumnSidecarList; use types::{ BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, Epoch, EthSpec, Hash256, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, @@ -43,6 +44,7 @@ impl RpcBlock { match &self.block { RpcBlockInner::Block(block) => block, RpcBlockInner::BlockAndBlobs(block, _) => block, + RpcBlockInner::BlockAndDataColumns(block, _) => block, } } @@ -50,6 +52,7 @@ impl RpcBlock { match &self.block { RpcBlockInner::Block(block) => block.clone(), 
RpcBlockInner::BlockAndBlobs(block, _) => block.clone(), + RpcBlockInner::BlockAndDataColumns(block, _) => block.clone(), } } @@ -57,6 +60,7 @@ impl RpcBlock { match &self.block { RpcBlockInner::Block(_) => None, RpcBlockInner::BlockAndBlobs(_, blobs) => Some(blobs), + RpcBlockInner::BlockAndDataColumns(_, _) => None, } } } @@ -72,6 +76,9 @@ enum RpcBlockInner { /// This variant is used with parent lookups and by-range responses. It should have all blobs /// ordered, all block roots matching, and the correct number of blobs for this block. BlockAndBlobs(Arc>, BlobSidecarList), + /// This variant is used with parent lookups and by-range responses. It should have all data columns + /// ordered, all block roots matching, and the correct number of data columns for this block. + BlockAndDataColumns(Arc>, DataColumnSidecarList), } impl RpcBlock { @@ -141,25 +148,36 @@ impl RpcBlock { Self::new(Some(block_root), block, blobs) } + #[allow(clippy::type_complexity)] pub fn deconstruct( self, ) -> ( Hash256, Arc>, Option>, + Option>, ) { let block_root = self.block_root(); match self.block { - RpcBlockInner::Block(block) => (block_root, block, None), - RpcBlockInner::BlockAndBlobs(block, blobs) => (block_root, block, Some(blobs)), + RpcBlockInner::Block(block) => (block_root, block, None, None), + RpcBlockInner::BlockAndBlobs(block, blobs) => (block_root, block, Some(blobs), None), + RpcBlockInner::BlockAndDataColumns(block, data_columns) => { + (block_root, block, None, Some(data_columns)) + } } } pub fn n_blobs(&self) -> usize { match &self.block { - RpcBlockInner::Block(_) => 0, + RpcBlockInner::Block(_) | RpcBlockInner::BlockAndDataColumns(_, _) => 0, RpcBlockInner::BlockAndBlobs(_, blobs) => blobs.len(), } } + pub fn n_data_columns(&self) -> usize { + match &self.block { + RpcBlockInner::Block(_) | RpcBlockInner::BlockAndBlobs(_, _) => 0, + RpcBlockInner::BlockAndDataColumns(_, data_columns) => data_columns.len(), + } + } } /// A block that has gone through all pre-deneb 
block processing checks including block processing @@ -485,12 +503,13 @@ impl AsBlock for AvailableBlock { } fn into_rpc_block(self) -> RpcBlock { - let (block_root, block, blobs_opt) = self.deconstruct(); + let (block_root, block, blobs_opt, data_columns_opt) = self.deconstruct(); // Circumvent the constructor here, because an Available block will have already had // consistency checks performed. - let inner = match blobs_opt { - None => RpcBlockInner::Block(block), - Some(blobs) => RpcBlockInner::BlockAndBlobs(block, blobs), + let inner = match (blobs_opt, data_columns_opt) { + (None, None) => RpcBlockInner::Block(block), + (Some(blobs), _) => RpcBlockInner::BlockAndBlobs(block, blobs), + (_, Some(data_columns)) => RpcBlockInner::BlockAndDataColumns(block, data_columns), }; RpcBlock { block_root, @@ -522,12 +541,14 @@ impl AsBlock for RpcBlock { match &self.block { RpcBlockInner::Block(block) => block, RpcBlockInner::BlockAndBlobs(block, _) => block, + RpcBlockInner::BlockAndDataColumns(block, _) => block, } } fn block_cloned(&self) -> Arc> { match &self.block { RpcBlockInner::Block(block) => block.clone(), RpcBlockInner::BlockAndBlobs(block, _) => block.clone(), + RpcBlockInner::BlockAndDataColumns(block, _) => block.clone(), } } fn canonical_root(&self) -> Hash256 { diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index c75c3f695b3..a1d2706726b 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -407,6 +407,11 @@ where .init_blob_info(genesis.beacon_block.slot()) .map_err(|e| format!("Failed to initialize genesis blob info: {:?}", e))?, ); + self.pending_io_batch.push( + store + .init_data_column_info(genesis.beacon_block.slot()) + .map_err(|e| format!("Failed to initialize genesis data column info: {:?}", e))?, + ); let fc_store = BeaconForkChoiceStore::get_forkchoice_store(store, &genesis) .map_err(|e| format!("Unable to initialize fork choice store: {e:?}"))?; @@ 
-564,6 +569,11 @@ where .init_blob_info(weak_subj_block.slot()) .map_err(|e| format!("Failed to initialize blob info: {:?}", e))?, ); + self.pending_io_batch.push( + store + .init_data_column_info(weak_subj_block.slot()) + .map_err(|e| format!("Failed to initialize data column info: {:?}", e))?, + ); // Store pruning checkpoint to prevent attempting to prune before the anchor state. self.pending_io_batch diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index f906032ecd2..9a4f5eea048 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -22,7 +22,9 @@ use std::sync::Arc; use task_executor::TaskExecutor; use types::beacon_block_body::KzgCommitmentOpts; use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList}; -use types::{BlobSidecarList, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot}; +use types::{ + BlobSidecarList, ChainSpec, DataColumnSidecar, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot, +}; mod availability_view; mod child_components; @@ -31,7 +33,9 @@ mod overflow_lru_cache; mod processing_cache; mod state_lru_cache; +use crate::data_column_verification::{verify_kzg_for_data_column_list, GossipVerifiedDataColumn}; pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory}; +use types::data_column_sidecar::{DataColumnIdentifier, DataColumnSidecarList}; use types::non_zero_usize::new_non_zero_usize; /// The LRU Cache stores `PendingComponents` which can store up to @@ -200,6 +204,14 @@ impl DataAvailabilityChecker { .and_then(|cached| cached.block.clone()) } + /// Get a data column from the availability cache. 
+ pub fn get_data_column( + &self, + data_column_id: &DataColumnIdentifier, + ) -> Result>>, AvailabilityCheckError> { + self.availability_cache.peek_data_column(data_column_id) + } + /// Put a list of blobs received via RPC into the availability cache. This performs KZG /// verification on the blobs in the list. pub fn put_rpc_blobs( @@ -231,6 +243,21 @@ impl DataAvailabilityChecker { .put_kzg_verified_blobs(gossip_blob.block_root(), vec![gossip_blob.into_inner()]) } + /// Check if we've cached other data columns for this block. If it satisfies the custody requirement and we also + /// have a block cached, return the `Availability` variant triggering block import. + /// Otherwise cache the data column sidecar. + /// + /// This should only accept gossip verified data columns, so we should not have to worry about dupes. + pub fn put_gossip_data_column( + &self, + gossip_data_column: GossipVerifiedDataColumn, + ) -> Result, AvailabilityCheckError> { + self.availability_cache.put_kzg_verified_data_columns( + gossip_data_column.block_root(), + vec![gossip_data_column.into_inner()], + ) + } + /// Check if we have all the blobs for a block. Returns `Availability` which has information /// about whether all components have been received or more are required. 
pub fn put_pending_executed_block( @@ -250,9 +277,9 @@ impl DataAvailabilityChecker { &self, block: RpcBlock, ) -> Result, AvailabilityCheckError> { - let (block_root, block, blobs) = block.deconstruct(); - match blobs { - None => { + let (block_root, block, blobs, data_columns) = block.deconstruct(); + match (blobs, data_columns) { + (None, None) => { if self.blobs_required_for_block(&block) { Ok(MaybeAvailableBlock::AvailabilityPending { block_root, block }) } else { @@ -260,25 +287,35 @@ impl DataAvailabilityChecker { block_root, block, blobs: None, + data_columns: None, })) } } - Some(blob_list) => { - let verified_blobs = if self.blobs_required_for_block(&block) { - let kzg = self - .kzg - .as_ref() - .ok_or(AvailabilityCheckError::KzgNotInitialized)?; - verify_kzg_for_blob_list(blob_list.iter(), kzg) - .map_err(AvailabilityCheckError::Kzg)?; - Some(blob_list) - } else { - None - }; + (maybe_blob_list, maybe_data_column_list) => { + let (verified_blobs, verified_data_column) = + if self.blobs_required_for_block(&block) { + let kzg = self + .kzg + .as_ref() + .ok_or(AvailabilityCheckError::KzgNotInitialized)?; + + if let Some(blob_list) = maybe_blob_list.as_ref() { + verify_kzg_for_blob_list(blob_list.iter(), kzg) + .map_err(AvailabilityCheckError::Kzg)?; + } + if let Some(data_column_list) = maybe_data_column_list.as_ref() { + verify_kzg_for_data_column_list(data_column_list.iter(), kzg) + .map_err(AvailabilityCheckError::Kzg)?; + } + (maybe_blob_list, maybe_data_column_list) + } else { + (None, None) + }; Ok(MaybeAvailableBlock::Available(AvailableBlock { block_root, block, blobs: verified_blobs, + data_columns: verified_data_column, })) } } @@ -314,9 +351,9 @@ impl DataAvailabilityChecker { } for block in blocks { - let (block_root, block, blobs) = block.deconstruct(); - match blobs { - None => { + let (block_root, block, blobs, data_columns) = block.deconstruct(); + match (blobs, data_columns) { + (None, None) => { if self.blobs_required_for_block(&block) { 
results.push(MaybeAvailableBlock::AvailabilityPending { block_root, block }) } else { @@ -324,20 +361,23 @@ impl DataAvailabilityChecker { block_root, block, blobs: None, + data_columns: None, })) } } - Some(blob_list) => { - let verified_blobs = if self.blobs_required_for_block(&block) { - Some(blob_list) - } else { - None - }; + (maybe_blob_list, maybe_data_column_list) => { + let (verified_blobs, verified_data_columns) = + if self.blobs_required_for_block(&block) { + (maybe_blob_list, maybe_data_column_list) + } else { + (None, None) + }; // already verified kzg for all blobs results.push(MaybeAvailableBlock::Available(AvailableBlock { block_root, block, blobs: verified_blobs, + data_columns: verified_data_columns, })) } } @@ -564,6 +604,7 @@ pub struct AvailableBlock { block_root: Hash256, block: Arc>, blobs: Option>, + data_columns: Option>, } impl AvailableBlock { @@ -571,11 +612,13 @@ impl AvailableBlock { block_root: Hash256, block: Arc>, blobs: Option>, + data_columns: Option>, ) -> Self { Self { block_root, block, blobs, + data_columns, } } @@ -590,19 +633,26 @@ impl AvailableBlock { self.blobs.as_ref() } + pub fn data_columns(&self) -> Option<&DataColumnSidecarList> { + self.data_columns.as_ref() + } + + #[allow(clippy::type_complexity)] pub fn deconstruct( self, ) -> ( Hash256, Arc>, Option>, + Option>, ) { let AvailableBlock { block_root, block, blobs, + data_columns, } = self; - (block_root, block, blobs) + (block_root, block, blobs, data_columns) } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs b/beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs index 65093db26bd..f79f28b1cad 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/availability_view.rs @@ -4,11 +4,12 @@ use crate::blob_verification::KzgVerifiedBlob; use crate::block_verification_types::AsBlock; use 
crate::data_availability_checker::overflow_lru_cache::PendingComponents; use crate::data_availability_checker::ProcessingComponents; +use crate::data_column_verification::KzgVerifiedDataColumn; use kzg::KzgCommitment; use ssz_types::FixedVector; use std::sync::Arc; use types::beacon_block_body::KzgCommitments; -use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; +use types::{BlobSidecar, DataColumnSidecar, EthSpec, SignedBeaconBlock}; /// Defines an interface for managing data availability with two key invariants: /// @@ -26,12 +27,20 @@ pub trait AvailabilityView { /// The type representing a blob in the implementation. Must implement `Clone`. type BlobType: Clone + GetCommitment; + /// The type representing a data column in the implementation. + type DataColumnType: Clone; + /// Returns an immutable reference to the cached block. fn get_cached_block(&self) -> &Option; /// Returns an immutable reference to the fixed vector of cached blobs. fn get_cached_blobs(&self) -> &FixedVector, E::MaxBlobsPerBlock>; + /// Returns an immutable reference to the fixed vector of cached data columns. + fn get_cached_data_columns( + &self, + ) -> &FixedVector, E::DataColumnCount>; + /// Returns a mutable reference to the cached block. fn get_cached_block_mut(&mut self) -> &mut Option; @@ -40,6 +49,11 @@ pub trait AvailabilityView { &mut self, ) -> &mut FixedVector, E::MaxBlobsPerBlock>; + /// Returns a mutable reference to the fixed vector of cached data columns. + fn get_cached_data_columns_mut( + &mut self, + ) -> &mut FixedVector, E::DataColumnCount>; + /// Checks if a block exists in the cache. /// /// Returns: @@ -61,6 +75,18 @@ pub trait AvailabilityView { .unwrap_or(false) } + /// Checks if a data column exists at the given index in the cache. + /// + /// Returns: + /// - `true` if a data column exists at the given index. + /// - `false` otherwise. 
+ fn data_column_exists(&self, data_colum_index: usize) -> bool { + self.get_cached_data_columns() + .get(data_colum_index) + .map(|d| d.is_some()) + .unwrap_or(false) + } + /// Returns the number of blobs that are expected to be present. Returns `None` if we don't have a /// block. /// @@ -90,6 +116,42 @@ pub trait AvailabilityView { } } + /// Inserts a data column at a specific index in the cache. + /// + /// Existing data column at the index will be replaced. + fn insert_data_column_at_index( + &mut self, + data_column_index: usize, + data_column: Self::DataColumnType, + ) { + if let Some(b) = self + .get_cached_data_columns_mut() + .get_mut(data_column_index) + { + *b = Some(data_column); + } + } + + /// Merges a given set of data columns into the cache. + /// + /// Data columns are only inserted if: + /// 1. The data column entry at the index is empty and no block exists. + /// 2. The block exists and its commitments matches the data column's commitments. + fn merge_data_columns( + &mut self, + data_columns: FixedVector, E::DataColumnCount>, + ) { + for (index, data_column) in data_columns.iter().cloned().enumerate() { + let Some(data_column) = data_column else { + continue; + }; + // TODO(das): Add equivalent checks for data columns if necessary + if !self.data_column_exists(index) { + self.insert_data_column_at_index(index, data_column) + } + } + } + /// Merges a given set of blobs into the cache. /// /// Blobs are only inserted if: @@ -148,14 +210,16 @@ pub trait AvailabilityView { /// - `$struct_name`: The name of the struct for which to implement `AvailabilityView`. /// - `$block_type`: The type to use for `BlockType` in the `AvailabilityView` trait. /// - `$blob_type`: The type to use for `BlobType` in the `AvailabilityView` trait. +/// - `$data_column_type`: The type to use for `DataColumnType` in the `AvailabilityView` trait. /// - `$block_field`: The field name in the struct that holds the cached block. 
-/// - `$blob_field`: The field name in the struct that holds the cached blobs. +/// - `$data_column_field`: The field name in the struct that holds the cached data columns. #[macro_export] macro_rules! impl_availability_view { - ($struct_name:ident, $block_type:ty, $blob_type:ty, $block_field:ident, $blob_field:ident) => { + ($struct_name:ident, $block_type:ty, $blob_type:ty, $data_column_type:ty, $block_field:ident, $blob_field:ident, $data_column_field:ident) => { impl AvailabilityView for $struct_name { type BlockType = $block_type; type BlobType = $blob_type; + type DataColumnType = $data_column_type; fn get_cached_block(&self) -> &Option { &self.$block_field @@ -167,6 +231,12 @@ macro_rules! impl_availability_view { &self.$blob_field } + fn get_cached_data_columns( + &self, + ) -> &FixedVector, E::DataColumnCount> { + &self.$data_column_field + } + fn get_cached_block_mut(&mut self) -> &mut Option { &mut self.$block_field } @@ -176,6 +246,12 @@ macro_rules! impl_availability_view { ) -> &mut FixedVector, E::MaxBlobsPerBlock> { &mut self.$blob_field } + + fn get_cached_data_columns_mut( + &mut self, + ) -> &mut FixedVector, E::DataColumnCount> { + &mut self.$data_column_field + } } }; } @@ -184,24 +260,30 @@ impl_availability_view!( ProcessingComponents, Arc>, KzgCommitment, + (), block, - blob_commitments + blob_commitments, + data_column_opts ); impl_availability_view!( PendingComponents, DietAvailabilityPendingExecutedBlock, KzgVerifiedBlob, + KzgVerifiedDataColumn, executed_block, - verified_blobs + verified_blobs, + verified_data_columns ); impl_availability_view!( ChildComponents, Arc>, Arc>, + Arc>, downloaded_block, - downloaded_blobs + downloaded_blobs, + downloaded_data_columns ); pub trait GetCommitments { @@ -247,6 +329,7 @@ impl GetCommitments for Arc> { .unwrap_or_default() } } + impl GetCommitment for Arc> { fn get_commitment(&self) -> &KzgCommitment { &self.kzg_commitment diff --git 
a/beacon_node/beacon_chain/src/data_availability_checker/child_components.rs b/beacon_node/beacon_chain/src/data_availability_checker/child_components.rs index 028bf9d67c8..09cc5da9027 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/child_components.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/child_components.rs @@ -3,6 +3,7 @@ use crate::data_availability_checker::AvailabilityView; use bls::Hash256; use std::sync::Arc; use types::blob_sidecar::FixedBlobSidecarList; +use types::data_column_sidecar::FixedDataColumnSidecarList; use types::{EthSpec, SignedBeaconBlock}; /// For requests triggered by an `UnknownBlockParent` or `UnknownBlobParent`, this struct @@ -13,15 +14,19 @@ pub struct ChildComponents { pub block_root: Hash256, pub downloaded_block: Option>>, pub downloaded_blobs: FixedBlobSidecarList, + pub downloaded_data_columns: FixedDataColumnSidecarList, } impl From> for ChildComponents { fn from(value: RpcBlock) -> Self { - let (block_root, block, blobs) = value.deconstruct(); + let (block_root, block, blobs, data_columns) = value.deconstruct(); let fixed_blobs = blobs.map(|blobs| { FixedBlobSidecarList::from(blobs.into_iter().map(Some).collect::>()) }); - Self::new(block_root, Some(block), fixed_blobs) + let fixed_data_columns = data_columns.map(|data_columns| { + FixedDataColumnSidecarList::from(data_columns.into_iter().map(Some).collect::>()) + }); + Self::new(block_root, Some(block), fixed_blobs, fixed_data_columns) } } @@ -31,12 +36,14 @@ impl ChildComponents { block_root, downloaded_block: None, downloaded_blobs: <_>::default(), + downloaded_data_columns: <_>::default(), } } pub fn new( block_root: Hash256, block: Option>>, blobs: Option>, + data_columns: Option>, ) -> Self { let mut cache = Self::empty(block_root); if let Some(block) = block { @@ -45,6 +52,9 @@ impl ChildComponents { if let Some(blobs) = blobs { cache.merge_blobs(blobs); } + if let Some(data_columns) = data_columns { + 
cache.merge_data_columns(data_columns); + } cache } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/error.rs b/beacon_node/beacon_chain/src/data_availability_checker/error.rs index 0804fe3b9ab..9e52b34185f 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/error.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/error.rs @@ -14,6 +14,7 @@ pub enum Error { SszTypes(ssz_types::Error), MissingBlobs, BlobIndexInvalid(u64), + DataColumnIndexInvalid(u64), StoreError(store::Error), DecodeError(ssz::DecodeError), ParentStateMissing(Hash256), @@ -42,6 +43,7 @@ impl Error { | Error::RebuildingStateCaches(_) => ErrorCategory::Internal, Error::Kzg(_) | Error::BlobIndexInvalid(_) + | Error::DataColumnIndexInvalid(_) | Error::KzgCommitmentMismatch { .. } | Error::KzgVerificationFailed => ErrorCategory::Malicious, } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 80cbc6c8990..246daf9579d 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -35,6 +35,7 @@ use crate::block_verification_types::{ }; use crate::data_availability_checker::availability_view::AvailabilityView; use crate::data_availability_checker::{Availability, AvailabilityCheckError}; +use crate::data_column_verification::KzgVerifiedDataColumn; use crate::store::{DBColumn, KeyValueStore}; use crate::BeaconChainTypes; use lru::LruCache; @@ -45,7 +46,8 @@ use ssz_types::{FixedVector, VariableList}; use std::num::NonZeroUsize; use std::{collections::HashSet, sync::Arc}; use types::blob_sidecar::BlobIdentifier; -use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256}; +use types::data_column_sidecar::DataColumnIdentifier; +use types::{BlobSidecar, ChainSpec, DataColumnSidecar, Epoch, EthSpec, Hash256}; /// This represents 
the components of a partially available block /// @@ -55,6 +57,7 @@ use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256}; pub struct PendingComponents { pub block_root: Hash256, pub verified_blobs: FixedVector>, T::MaxBlobsPerBlock>, + pub verified_data_columns: FixedVector>, T::DataColumnCount>, pub executed_block: Option>, } @@ -63,6 +66,7 @@ impl PendingComponents { Self { block_root, verified_blobs: FixedVector::default(), + verified_data_columns: FixedVector::default(), executed_block: None, } } @@ -82,6 +86,7 @@ impl PendingComponents { let Self { block_root, verified_blobs, + verified_data_columns, executed_block, } = self; @@ -100,6 +105,14 @@ impl PendingComponents { }; let verified_blobs = VariableList::new(verified_blobs)?; + // TODO(das) Do we need a check here for number of expected custody columns? + let verified_data_columns = verified_data_columns + .into_iter() + .cloned() + .filter_map(|d| d.map(|d| d.to_data_column())) + .collect::>() + .into(); + let executed_block = recover(diet_executed_block)?; let AvailabilityPendingExecutedBlock { @@ -112,6 +125,7 @@ impl PendingComponents { block_root, block, blobs: Some(verified_blobs), + data_columns: Some(verified_data_columns), }; Ok(Availability::Available(Box::new( AvailableExecutedBlock::new(available_block, import_data, payload_verification_outcome), @@ -133,6 +147,16 @@ impl PendingComponents { }); } } + for maybe_data_column in self.verified_data_columns.iter() { + if maybe_data_column.is_some() { + return maybe_data_column.as_ref().map(|kzg_verified_data_column| { + kzg_verified_data_column + .as_data_column() + .slot() + .epoch(T::slots_per_epoch()) + }); + } + } None }) } @@ -144,6 +168,7 @@ impl PendingComponents { enum OverflowKey { Block(Hash256), Blob(Hash256, u8), + DataColumn(Hash256, u8), } impl OverflowKey { @@ -160,10 +185,27 @@ impl OverflowKey { Ok(Self::Blob(blob_id.block_root, blob_id.index as u8)) } + pub fn from_data_column_id( + data_column_id: DataColumnIdentifier, + ) 
-> Result { + if data_column_id.index >= E::number_of_columns() as u64 + || data_column_id.index > u8::MAX as u64 + { + return Err(AvailabilityCheckError::DataColumnIndexInvalid( + data_column_id.index, + )); + } + Ok(Self::DataColumn( + data_column_id.block_root, + data_column_id.index as u8, + )) + } + pub fn root(&self) -> &Hash256 { match self { Self::Block(root) => root, Self::Blob(root, _) => root, + Self::DataColumn(root, _) => root, } } } @@ -203,6 +245,22 @@ impl OverflowStore { .put_bytes(col.as_str(), &key.as_ssz_bytes(), &blob.as_ssz_bytes())? } + for data_column in Vec::from(pending_components.verified_data_columns) + .into_iter() + .flatten() + { + let key = OverflowKey::from_data_column_id::(DataColumnIdentifier { + block_root, + index: data_column.data_column_index(), + })?; + + self.0.hot_db.put_bytes( + col.as_str(), + &key.as_ssz_bytes(), + &data_column.as_ssz_bytes(), + )? + } + Ok(()) } @@ -236,6 +294,16 @@ impl OverflowStore { .ok_or(AvailabilityCheckError::BlobIndexInvalid(index as u64))? = Some(KzgVerifiedBlob::from_ssz_bytes(value_bytes.as_slice())?); } + OverflowKey::DataColumn(_, index) => { + *maybe_pending_components + .get_or_insert_with(|| PendingComponents::empty(block_root)) + .verified_data_columns + .get_mut(index as usize) + .ok_or(AvailabilityCheckError::DataColumnIndexInvalid(index as u64))? = + Some(KzgVerifiedDataColumn::from_ssz_bytes( + value_bytes.as_slice(), + )?); + } } } @@ -267,6 +335,23 @@ impl OverflowStore { .map_err(|e| e.into()) } + /// Load a single data column from the database + pub fn load_data_column( + &self, + data_column_id: &DataColumnIdentifier, + ) -> Result>>, AvailabilityCheckError> { + let key = OverflowKey::from_data_column_id::(*data_column_id)?; + + self.0 + .hot_db + .get_bytes(DBColumn::OverflowLRUCache.as_str(), &key.as_ssz_bytes())? 
+ .map(|data_column_bytes| { + Arc::>::from_ssz_bytes(data_column_bytes.as_slice()) + }) + .transpose() + .map_err(|e| e.into()) + } + /// Delete a set of keys from the database pub fn delete_keys(&self, keys: &Vec) -> Result<(), AvailabilityCheckError> { for key in keys { @@ -322,6 +407,25 @@ impl Critical { } } + /// This only checks for the data columns in memory + pub fn peek_data_column( + &self, + data_column_id: &DataColumnIdentifier, + ) -> Result>>, AvailabilityCheckError> { + if let Some(pending_components) = self.in_memory.peek(&data_column_id.block_root) { + Ok(pending_components + .verified_data_columns + .get(data_column_id.index as usize) + .ok_or(AvailabilityCheckError::DataColumnIndexInvalid( + data_column_id.index, + ))? + .as_ref() + .map(|data_column| data_column.clone_data_column())) + } else { + Ok(None) + } + } + /// Puts the pending components in the LRU cache. If the cache /// is at capacity, the LRU entry is written to the store first pub fn put_pending_components( @@ -425,6 +529,55 @@ impl OverflowLRUCache { } } + /// Fetch a data column from the cache without affecting the LRU ordering + pub fn peek_data_column( + &self, + data_column_id: &DataColumnIdentifier, + ) -> Result>>, AvailabilityCheckError> { + let read_lock = self.critical.read(); + if let Some(data_column) = read_lock.peek_data_column(data_column_id)? 
{ + Ok(Some(data_column)) + } else if read_lock.store_keys.contains(&data_column_id.block_root) { + drop(read_lock); + self.overflow_store.load_data_column(data_column_id) + } else { + Ok(None) + } + } + + pub fn put_kzg_verified_data_columns< + I: IntoIterator>, + >( + &self, + block_root: Hash256, + kzg_verified_data_columns: I, + ) -> Result, AvailabilityCheckError> { + let mut fixed_data_columns = FixedVector::default(); + + for data_column in kzg_verified_data_columns { + if let Some(data_column_opt) = + fixed_data_columns.get_mut(data_column.data_column_index() as usize) + { + *data_column_opt = Some(data_column); + } + } + + let mut write_lock = self.critical.write(); + + // Grab existing entry or create a new entry. + let mut pending_components = write_lock + .pop_pending_components(block_root, &self.overflow_store)? + .unwrap_or_else(|| PendingComponents::empty(block_root)); + + // Merge in the data columns. + pending_components.merge_data_columns(fixed_data_columns); + + write_lock.put_pending_components(block_root, pending_components, &self.overflow_store)?; + + // TODO(das): Currently this does not change availability status and nor import yet. + Ok(Availability::MissingComponents(block_root)) + } + pub fn put_kzg_verified_blobs>>( &self, block_root: Hash256, @@ -660,6 +813,14 @@ impl OverflowLRUCache { .slot() .epoch(T::EthSpec::slots_per_epoch()) } + OverflowKey::DataColumn(_, _) => { + KzgVerifiedDataColumn::::from_ssz_bytes( + value_bytes.as_slice(), + )? 
+ .as_data_column() + .slot() + .epoch(T::EthSpec::slots_per_epoch()) + } }; current_block_data = Some(BlockData { keys: vec![overflow_key], @@ -713,6 +874,10 @@ impl ssz::Encode for OverflowKey { block_hash.ssz_append(buf); buf.push(*index + 1) } + OverflowKey::DataColumn(block_hash, index) => { + block_hash.ssz_append(buf); + buf.push(*index + 1) + } } } @@ -724,6 +889,7 @@ impl ssz::Encode for OverflowKey { match self { Self::Block(root) => root.ssz_bytes_len() + 1, Self::Blob(root, _) => root.ssz_bytes_len() + 1, + Self::DataColumn(root, _) => root.ssz_bytes_len() + 1, } } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/processing_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/processing_cache.rs index af94803dcfb..7abbd700104 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/processing_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/processing_cache.rs @@ -1,4 +1,5 @@ use crate::data_availability_checker::AvailabilityView; +use ssz_types::FixedVector; use std::collections::hash_map::Entry; use std::collections::HashMap; use std::sync::Arc; @@ -53,6 +54,9 @@ pub struct ProcessingComponents { /// `KzgCommitments` for blobs are always known, even if we haven't seen the block. See /// `AvailabilityView`'s trait definition for more details. pub blob_commitments: KzgCommitmentOpts, + // TODO(das): `KzgCommitments` are available in every data column sidecar, hence it may not be useful to store them + // again here and a `()` may be sufficient to indicate what we have. 
+ pub data_column_opts: FixedVector, E::DataColumnCount>, } impl ProcessingComponents { @@ -61,6 +65,7 @@ impl ProcessingComponents { slot, block: None, blob_commitments: KzgCommitmentOpts::::default(), + data_column_opts: FixedVector::default(), } } } @@ -73,6 +78,7 @@ impl ProcessingComponents { slot: Slot::new(0), block: None, blob_commitments: KzgCommitmentOpts::::default(), + data_column_opts: FixedVector::default(), } } } diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs new file mode 100644 index 00000000000..2ea8fe1ae4f --- /dev/null +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -0,0 +1,218 @@ +use crate::block_verification::{process_block_slash_info, BlockSlashInfo}; +use crate::{BeaconChain, BeaconChainError, BeaconChainTypes}; +use derivative::Derivative; +use kzg::{Error as KzgError, Kzg}; +use ssz_derive::{Decode, Encode}; +use std::sync::Arc; +use types::data_column_sidecar::{ColumnIndex, DataColumnIdentifier}; +use types::{BeaconStateError, DataColumnSidecar, EthSpec, Hash256, SignedBeaconBlockHeader, Slot}; + +/// An error occurred while validating a gossip data column. +#[derive(Debug)] +pub enum GossipDataColumnError { + /// There was an error whilst processing the data column. It is not known if it is + /// valid or invalid. + /// + /// ## Peer scoring + /// + /// We were unable to process this data column due to an internal error. It's + /// unclear if the data column is valid. + BeaconChainError(BeaconChainError), + + /// The proposal signature in invalid. + /// + /// ## Peer scoring + /// + /// The data column is invalid and the peer is faulty. + ProposalSignatureInvalid, + + /// The proposal_index corresponding to data column.beacon_block_root is not known. + /// + /// ## Peer scoring + /// + /// The data column is invalid and the peer is faulty. + UnknownValidator(u64), + + /// The provided data column is not from a later slot than its parent. 
+ /// + /// ## Peer scoring + /// + /// The data column is invalid and the peer is faulty. + DataColumnIsNotLaterThanParent { + data_column_slot: Slot, + parent_slot: Slot, + }, + + /// `Kzg` struct hasn't been initialized. This is an internal error. + /// + /// ## Peer scoring + /// + /// The peer isn't faulty, This is an internal error. + KzgNotInitialized, + + /// The kzg verification failed. + /// + /// ## Peer scoring + /// + /// The data column sidecar is invalid and the peer is faulty. + KzgError(kzg::Error), + + /// The provided data column's parent block is unknown. + /// + /// ## Peer scoring + /// + /// We cannot process the data column without validating its parent, the peer isn't necessarily faulty. + DataColumnParentUnknown(Arc>), +} + +impl From for GossipDataColumnError { + fn from(e: BeaconChainError) -> Self { + GossipDataColumnError::BeaconChainError(e) + } +} + +impl From for GossipDataColumnError { + fn from(e: BeaconStateError) -> Self { + GossipDataColumnError::BeaconChainError(BeaconChainError::BeaconStateError(e)) + } +} + +/// A wrapper around a `DataColumnSidecar` that indicates it has been approved for re-gossiping on +/// the p2p network. +#[derive(Debug)] +pub struct GossipVerifiedDataColumn { + block_root: Hash256, + data_column: KzgVerifiedDataColumn, +} + +impl GossipVerifiedDataColumn { + pub fn new( + column_sidecar: Arc>, + subnet_id: u64, + chain: &BeaconChain, + ) -> Result> { + let header = column_sidecar.signed_block_header.clone(); + // We only process slashing info if the gossip verification failed + // since we do not process the data column any further in that case. 
+ validate_data_column_sidecar_for_gossip(column_sidecar, subnet_id, chain).map_err(|e| { + process_block_slash_info::<_, GossipDataColumnError>( + chain, + BlockSlashInfo::from_early_error_data_column(header, e), + ) + }) + } + + pub fn id(&self) -> DataColumnIdentifier { + DataColumnIdentifier { + block_root: self.block_root, + index: self.data_column.data_column_index(), + } + } + + pub fn as_data_column(&self) -> &DataColumnSidecar { + self.data_column.as_data_column() + } + + pub fn block_root(&self) -> Hash256 { + self.block_root + } + + pub fn slot(&self) -> Slot { + self.data_column.data_column.slot() + } + + pub fn index(&self) -> ColumnIndex { + self.data_column.data_column.index + } + + pub fn signed_block_header(&self) -> SignedBeaconBlockHeader { + self.data_column.data_column.signed_block_header.clone() + } + + pub fn into_inner(self) -> KzgVerifiedDataColumn { + self.data_column + } +} + +/// Wrapper over a `DataColumnSidecar` for which we have completed kzg verification. +#[derive(Debug, Derivative, Clone, Encode, Decode)] +#[derivative(PartialEq, Eq)] +#[ssz(struct_behaviour = "transparent")] +pub struct KzgVerifiedDataColumn { + data_column: Arc>, +} + +impl KzgVerifiedDataColumn { + pub fn new(data_column: Arc>, kzg: &Kzg) -> Result { + verify_kzg_for_data_column(data_column, kzg) + } + pub fn to_data_column(self) -> Arc> { + self.data_column + } + pub fn as_data_column(&self) -> &DataColumnSidecar { + &self.data_column + } + /// This is cheap as we're calling clone on an Arc + pub fn clone_data_column(&self) -> Arc> { + self.data_column.clone() + } + + pub fn data_column_index(&self) -> u64 { + self.data_column.index + } +} + +/// Complete kzg verification for a `DataColumnSidecar`. +/// +/// Returns an error if the kzg verification check fails. 
+pub fn verify_kzg_for_data_column( + data_column: Arc>, + _kzg: &Kzg, +) -> Result, KzgError> { + // TODO(das): validate data column + // validate_blob::( + // kzg, + // &data_column.blob, + // data_column.kzg_commitment, + // data_column.kzg_proof, + // )?; + Ok(KzgVerifiedDataColumn { data_column }) +} + +/// Complete kzg verification for a list of `DataColumnSidecar`s. +/// Returns an error if any of the `DataColumnSidecar`s fails kzg verification. +/// +/// Note: This function should be preferred over calling `verify_kzg_for_data_column` +/// in a loop since this function kzg verifies a list of data columns more efficiently. +pub fn verify_kzg_for_data_column_list<'a, T: EthSpec, I>( + _data_column_iter: I, + _kzg: &'a Kzg, +) -> Result<(), KzgError> +where + I: Iterator>>, +{ + // TODO(das): implement kzg verification + Ok(()) +} + +pub fn validate_data_column_sidecar_for_gossip( + data_column: Arc>, + _subnet: u64, + chain: &BeaconChain, +) -> Result, GossipDataColumnError> { + // TODO(das): validate gossip rules + let block_root = data_column.block_root(); + + // Kzg verification for gossip data column sidecar + let kzg = chain + .kzg + .as_ref() + .ok_or(GossipDataColumnError::KzgNotInitialized)?; + let kzg_verified_data_column = + KzgVerifiedDataColumn::new(data_column, kzg).map_err(GossipDataColumnError::KzgError)?; + + Ok(GossipVerifiedDataColumn { + block_root, + data_column: kzg_verified_data_column, + }) +} diff --git a/beacon_node/beacon_chain/src/early_attester_cache.rs b/beacon_node/beacon_chain/src/early_attester_cache.rs index da3c2c8a1e9..50e5578c5fc 100644 --- a/beacon_node/beacon_chain/src/early_attester_cache.rs +++ b/beacon_node/beacon_chain/src/early_attester_cache.rs @@ -7,6 +7,7 @@ use parking_lot::RwLock; use proto_array::Block as ProtoBlock; use std::sync::Arc; use types::blob_sidecar::BlobSidecarList; +use types::data_column_sidecar::DataColumnSidecarList; use types::*; pub struct CacheItem { @@ -23,6 +24,7 @@ pub struct CacheItem { 
*/ block: Arc>, blobs: Option>, + data_columns: Option>, proto_block: ProtoBlock, } @@ -70,7 +72,7 @@ impl EarlyAttesterCache { }, }; - let (_, block, blobs) = block.deconstruct(); + let (_, block, blobs, data_columns) = block.deconstruct(); let item = CacheItem { epoch, committee_lengths, @@ -79,6 +81,7 @@ impl EarlyAttesterCache { target, block, blobs, + data_columns, proto_block, }; @@ -167,6 +170,15 @@ impl EarlyAttesterCache { .and_then(|item| item.blobs.clone()) } + /// Returns the data columns, if `block_root` matches the cached item. + pub fn get_data_columns(&self, block_root: Hash256) -> Option> { + self.item + .read() + .as_ref() + .filter(|item| item.beacon_block_root == block_root) + .and_then(|item| item.data_columns.clone()) + } + /// Returns the proto-array block, if `block_root` matches the cached item. pub fn get_proto_block(&self, block_root: Hash256) -> Option { self.item diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index 85208c8ad6f..526027b998f 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -9,6 +9,7 @@ use state_processing::{ use std::borrow::Cow; use std::iter; use std::time::Duration; +use store::metadata::DataColumnInfo; use store::{chunked_vector::BlockRoots, AnchorInfo, BlobInfo, ChunkWriter, KeyValueStore}; use types::{Hash256, Slot}; @@ -66,6 +67,7 @@ impl BeaconChain { .get_anchor_info() .ok_or(HistoricalBlockError::NoAnchorInfo)?; let blob_info = self.store.get_blob_info(); + let data_column_info = self.store.get_data_column_info(); // Take all blocks with slots less than the oldest block slot. 
let num_relevant = blocks.partition_point(|available_block| { @@ -100,6 +102,7 @@ impl BeaconChain { let mut chunk_writer = ChunkWriter::::new(&self.store.cold_db, prev_block_slot.as_usize())?; let mut new_oldest_blob_slot = blob_info.oldest_blob_slot; + let mut new_oldest_data_column_slot = data_column_info.oldest_data_column_slot; let mut blob_batch = Vec::with_capacity(n_blobs_lists_to_import); let mut cold_batch = Vec::with_capacity(blocks_to_import.len()); @@ -107,7 +110,8 @@ impl BeaconChain { let mut signed_blocks = Vec::with_capacity(blocks_to_import.len()); for available_block in blocks_to_import.into_iter().rev() { - let (block_root, block, maybe_blobs) = available_block.deconstruct(); + let (block_root, block, maybe_blobs, maybe_data_columns) = + available_block.deconstruct(); if block_root != expected_block_root { return Err(HistoricalBlockError::MismatchedBlockRoot { @@ -127,6 +131,12 @@ impl BeaconChain { self.store .blobs_as_kv_store_ops(&block_root, blobs, &mut blob_batch); } + // Store the data columns too + if let Some(data_columns) = maybe_data_columns { + new_oldest_data_column_slot = Some(block.slot()); + self.store + .data_columns_as_kv_store_ops(&block_root, data_columns, &mut blob_batch); + } // Store block roots, including at all skip slots in the freezer DB. for slot in (block.slot().as_usize()..prev_block_slot.as_usize()).rev() { @@ -220,6 +230,19 @@ impl BeaconChain { } } + // Update the data column info. + if new_oldest_data_column_slot != data_column_info.oldest_data_column_slot { + if let Some(oldest_data_column_slot) = new_oldest_data_column_slot { + let new_data_column_info = DataColumnInfo { + oldest_data_column_slot: Some(oldest_data_column_slot), + }; + anchor_and_blob_batch.push( + self.store + .compare_and_set_data_column_info(data_column_info, new_data_column_info)?, + ); + } + } + // Update the anchor. 
let new_anchor = AnchorInfo { oldest_block_slot: prev_block_slot, diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 522009b1b27..20023b2f299 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -18,6 +18,7 @@ pub mod canonical_head; pub mod capella_readiness; pub mod chain_config; pub mod data_availability_checker; +pub mod data_column_verification; pub mod deneb_readiness; mod early_attester_cache; mod errors; diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index ff201729821..cec5f22af5a 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -2569,10 +2569,10 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { // signatures correctly. Regression test for https://github.com/sigp/lighthouse/pull/5120. let mut batch_with_invalid_first_block = available_blocks.clone(); batch_with_invalid_first_block[0] = { - let (block_root, block, blobs) = available_blocks[0].clone().deconstruct(); + let (block_root, block, blobs, data_columns) = available_blocks[0].clone().deconstruct(); let mut corrupt_block = (*block).clone(); *corrupt_block.signature_mut() = Signature::empty(); - AvailableBlock::__new_for_testing(block_root, Arc::new(corrupt_block), blobs) + AvailableBlock::__new_for_testing(block_root, Arc::new(corrupt_block), blobs, data_columns) }; // Importing the invalid batch should error. diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 8b1c127d300..5ac1aaac4c7 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -181,6 +181,10 @@ const MAX_BLOCKS_BY_ROOTS_QUEUE_LEN: usize = 1_024; /// will be stored before we start dropping them. 
const MAX_BLOBS_BY_ROOTS_QUEUE_LEN: usize = 1_024; +/// The maximum number of queued `DataColumnsByRootRequest` objects received from the network RPC that +/// will be stored before we start dropping them. +const MAX_DATA_COLUMNS_BY_ROOTS_QUEUE_LEN: usize = 2_048; + /// Maximum number of `SignedBlsToExecutionChange` messages to queue before dropping them. /// /// This value is set high to accommodate the large spike that is expected immediately after Capella @@ -247,6 +251,7 @@ pub const BLOCKS_BY_RANGE_REQUEST: &str = "blocks_by_range_request"; pub const BLOCKS_BY_ROOTS_REQUEST: &str = "blocks_by_roots_request"; pub const BLOBS_BY_RANGE_REQUEST: &str = "blobs_by_range_request"; pub const BLOBS_BY_ROOTS_REQUEST: &str = "blobs_by_roots_request"; +pub const DATA_COLUMNS_BY_ROOTS_REQUEST: &str = "data_columns_by_roots_request"; pub const LIGHT_CLIENT_BOOTSTRAP_REQUEST: &str = "light_client_bootstrap"; pub const UNKNOWN_BLOCK_ATTESTATION: &str = "unknown_block_attestation"; pub const UNKNOWN_BLOCK_AGGREGATE: &str = "unknown_block_aggregate"; @@ -624,6 +629,7 @@ pub enum Work { BlocksByRootsRequest(BlockingFnWithManualSendOnIdle), BlobsByRangeRequest(BlockingFn), BlobsByRootsRequest(BlockingFn), + DataColumnsByRootsRequest(BlockingFn), GossipBlsToExecutionChange(BlockingFn), LightClientBootstrapRequest(BlockingFn), ApiRequestP0(BlockingOrAsync), @@ -665,6 +671,7 @@ impl Work { Work::BlocksByRootsRequest(_) => BLOCKS_BY_ROOTS_REQUEST, Work::BlobsByRangeRequest(_) => BLOBS_BY_RANGE_REQUEST, Work::BlobsByRootsRequest(_) => BLOBS_BY_ROOTS_REQUEST, + Work::DataColumnsByRootsRequest(_) => DATA_COLUMNS_BY_ROOTS_REQUEST, Work::LightClientBootstrapRequest(_) => LIGHT_CLIENT_BOOTSTRAP_REQUEST, Work::UnknownBlockAttestation { .. } => UNKNOWN_BLOCK_ATTESTATION, Work::UnknownBlockAggregate { .. 
} => UNKNOWN_BLOCK_AGGREGATE, @@ -824,6 +831,7 @@ impl BeaconProcessor { let mut bbroots_queue = FifoQueue::new(MAX_BLOCKS_BY_ROOTS_QUEUE_LEN); let mut blbroots_queue = FifoQueue::new(MAX_BLOBS_BY_ROOTS_QUEUE_LEN); let mut blbrange_queue = FifoQueue::new(MAX_BLOBS_BY_RANGE_QUEUE_LEN); + let mut dcbroots_queue = FifoQueue::new(MAX_DATA_COLUMNS_BY_ROOTS_QUEUE_LEN); let mut gossip_bls_to_execution_change_queue = FifoQueue::new(MAX_BLS_TO_EXECUTION_CHANGE_QUEUE_LEN); @@ -1123,6 +1131,8 @@ impl BeaconProcessor { self.spawn_worker(item, idle_tx); } else if let Some(item) = blbroots_queue.pop() { self.spawn_worker(item, idle_tx); + } else if let Some(item) = dcbroots_queue.pop() { + self.spawn_worker(item, idle_tx); // Check slashings after all other consensus messages so we prioritize // following head. // @@ -1276,6 +1286,9 @@ impl BeaconProcessor { Work::BlobsByRootsRequest { .. } => { blbroots_queue.push(work, work_id, &self.log) } + Work::DataColumnsByRootsRequest { .. } => { + dcbroots_queue.push(work, work_id, &self.log) + } Work::UnknownLightClientOptimisticUpdate { .. 
} => { unknown_light_client_update_queue.push(work, work_id, &self.log) } @@ -1477,7 +1490,9 @@ impl BeaconProcessor { | Work::GossipDataColumnSidecar(work) => task_spawner.spawn_async(async move { work.await; }), - Work::BlobsByRangeRequest(process_fn) | Work::BlobsByRootsRequest(process_fn) => { + Work::BlobsByRangeRequest(process_fn) + | Work::BlobsByRootsRequest(process_fn) + | Work::DataColumnsByRootsRequest(process_fn) => { task_spawner.spawn_blocking(process_fn) } Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => { diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 7c126fe1b9d..32a7e06868c 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -530,7 +530,10 @@ impl PeerManager { RPCResponseErrorCode::Unknown => PeerAction::HighToleranceError, RPCResponseErrorCode::ResourceUnavailable => { // Don't ban on this because we want to retry with a block by root request. 
- if matches!(protocol, Protocol::BlobsByRoot) { + if matches!( + protocol, + Protocol::BlobsByRoot | Protocol::DataColumnsByRoot + ) { return; } @@ -563,8 +566,9 @@ impl PeerManager { Protocol::BlocksByRange => PeerAction::MidToleranceError, Protocol::BlocksByRoot => PeerAction::MidToleranceError, Protocol::BlobsByRange => PeerAction::MidToleranceError, - Protocol::LightClientBootstrap => PeerAction::LowToleranceError, Protocol::BlobsByRoot => PeerAction::MidToleranceError, + Protocol::DataColumnsByRoot => PeerAction::MidToleranceError, + Protocol::LightClientBootstrap => PeerAction::LowToleranceError, Protocol::Goodbye => PeerAction::LowToleranceError, Protocol::MetaData => PeerAction::LowToleranceError, Protocol::Status => PeerAction::LowToleranceError, @@ -583,6 +587,7 @@ impl PeerManager { Protocol::BlocksByRoot => return, Protocol::BlobsByRange => return, Protocol::BlobsByRoot => return, + Protocol::DataColumnsByRoot => return, Protocol::Goodbye => return, Protocol::LightClientBootstrap => return, Protocol::MetaData => PeerAction::Fatal, @@ -601,6 +606,7 @@ impl PeerManager { Protocol::BlocksByRoot => PeerAction::MidToleranceError, Protocol::BlobsByRange => PeerAction::MidToleranceError, Protocol::BlobsByRoot => PeerAction::MidToleranceError, + Protocol::DataColumnsByRoot => PeerAction::MidToleranceError, Protocol::LightClientBootstrap => return, Protocol::Goodbye => return, Protocol::MetaData => return, diff --git a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs index 7a7f2969f16..0fb90c5d36d 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec/ssz_snappy.rs @@ -15,12 +15,12 @@ use std::io::{Read, Write}; use std::marker::PhantomData; use std::sync::Arc; use tokio_util::codec::{Decoder, Encoder}; -use types::ChainSpec; use types::{ BlobSidecar, EthSpec, ForkContext, ForkName, Hash256, LightClientBootstrap, 
RuntimeVariableList, SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockCapella, SignedBeaconBlockDeneb, SignedBeaconBlockMerge, }; +use types::{ChainSpec, DataColumnSidecar}; use unsigned_varint::codec::Uvi; const CONTEXT_BYTES_LEN: usize = 4; @@ -74,6 +74,7 @@ impl Encoder> for SSZSnappyInboundCodec< RPCResponse::BlocksByRoot(res) => res.as_ssz_bytes(), RPCResponse::BlobsByRange(res) => res.as_ssz_bytes(), RPCResponse::BlobsByRoot(res) => res.as_ssz_bytes(), + RPCResponse::DataColumnsByRoot(res) => res.as_ssz_bytes(), RPCResponse::LightClientBootstrap(res) => res.as_ssz_bytes(), RPCResponse::Pong(res) => res.data.as_ssz_bytes(), RPCResponse::MetaData(res) => @@ -230,6 +231,7 @@ impl Encoder> for SSZSnappyOutboundCodec< }, OutboundRequest::BlobsByRange(req) => req.as_ssz_bytes(), OutboundRequest::BlobsByRoot(req) => req.blob_ids.as_ssz_bytes(), + OutboundRequest::DataColumnsByRoot(req) => req.data_column_ids.as_ssz_bytes(), OutboundRequest::Ping(req) => req.as_ssz_bytes(), OutboundRequest::MetaData(_) => return Ok(()), // no metadata to encode }; @@ -498,6 +500,14 @@ fn handle_rpc_request( )?, }))) } + SupportedProtocol::DataColumnsByRootV1 => Ok(Some(InboundRequest::DataColumnsByRoot( + DataColumnsByRootRequest { + data_column_ids: RuntimeVariableList::from_ssz_bytes( + decoded_buffer, + spec.max_request_data_column_sidecars as usize, + )?, + }, + ))), SupportedProtocol::PingV1 => Ok(Some(InboundRequest::Ping(Ping { data: u64::from_ssz_bytes(decoded_buffer)?, }))), @@ -584,6 +594,23 @@ fn handle_rpc_response( ), )), }, + SupportedProtocol::DataColumnsByRootV1 => match fork_name { + // TODO(das): update fork name + Some(ForkName::Deneb) => Ok(Some(RPCResponse::DataColumnsByRoot(Arc::new( + DataColumnSidecar::from_ssz_bytes(decoded_buffer)?, + )))), + Some(_) => Err(RPCError::ErrorResponse( + RPCResponseErrorCode::InvalidRequest, + "Invalid fork name for data columns by root".to_string(), + )), + None => Err(RPCError::ErrorResponse( 
+ RPCResponseErrorCode::InvalidRequest, + format!( + "No context bytes provided for {:?} response", + versioned_protocol + ), + )), + }, SupportedProtocol::PingV1 => Ok(Some(RPCResponse::Pong(Ping { data: u64::from_ssz_bytes(decoded_buffer)?, }))), @@ -945,6 +972,9 @@ mod tests { OutboundRequest::BlobsByRoot(bbroot) => { assert_eq!(decoded, InboundRequest::BlobsByRoot(bbroot)) } + OutboundRequest::DataColumnsByRoot(dcbroot) => { + assert_eq!(decoded, InboundRequest::DataColumnsByRoot(dcbroot)) + } OutboundRequest::Ping(ping) => { assert_eq!(decoded, InboundRequest::Ping(ping)) } diff --git a/beacon_node/lighthouse_network/src/rpc/config.rs b/beacon_node/lighthouse_network/src/rpc/config.rs index 9895149198a..ea780e1dff7 100644 --- a/beacon_node/lighthouse_network/src/rpc/config.rs +++ b/beacon_node/lighthouse_network/src/rpc/config.rs @@ -91,6 +91,7 @@ pub struct RateLimiterConfig { pub(super) blocks_by_root_quota: Quota, pub(super) blobs_by_range_quota: Quota, pub(super) blobs_by_root_quota: Quota, + pub(super) data_columns_by_root_quota: Quota, pub(super) light_client_bootstrap_quota: Quota, } @@ -103,6 +104,7 @@ impl RateLimiterConfig { pub const DEFAULT_BLOCKS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10); pub const DEFAULT_BLOBS_BY_RANGE_QUOTA: Quota = Quota::n_every(768, 10); pub const DEFAULT_BLOBS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10); + pub const DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA: Quota = Quota::n_every(128, 10); pub const DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA: Quota = Quota::one_every(10); } @@ -117,6 +119,7 @@ impl Default for RateLimiterConfig { blocks_by_root_quota: Self::DEFAULT_BLOCKS_BY_ROOT_QUOTA, blobs_by_range_quota: Self::DEFAULT_BLOBS_BY_RANGE_QUOTA, blobs_by_root_quota: Self::DEFAULT_BLOBS_BY_ROOT_QUOTA, + data_columns_by_root_quota: Self::DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA, light_client_bootstrap_quota: Self::DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA, } } @@ -143,6 +146,10 @@ impl Debug for RateLimiterConfig { .field("blocks_by_root", 
fmt_q!(&self.blocks_by_root_quota)) .field("blobs_by_range", fmt_q!(&self.blobs_by_range_quota)) .field("blobs_by_root", fmt_q!(&self.blobs_by_root_quota)) + .field( + "data_columns_by_root", + fmt_q!(&self.data_columns_by_root_quota), + ) .finish() } } @@ -163,6 +170,7 @@ impl FromStr for RateLimiterConfig { let mut blocks_by_root_quota = None; let mut blobs_by_range_quota = None; let mut blobs_by_root_quota = None; + let mut data_columns_by_root_quota = None; let mut light_client_bootstrap_quota = None; for proto_def in s.split(';') { @@ -175,6 +183,9 @@ impl FromStr for RateLimiterConfig { Protocol::BlocksByRoot => blocks_by_root_quota = blocks_by_root_quota.or(quota), Protocol::BlobsByRange => blobs_by_range_quota = blobs_by_range_quota.or(quota), Protocol::BlobsByRoot => blobs_by_root_quota = blobs_by_root_quota.or(quota), + Protocol::DataColumnsByRoot => { + data_columns_by_root_quota = data_columns_by_root_quota.or(quota) + } Protocol::Ping => ping_quota = ping_quota.or(quota), Protocol::MetaData => meta_data_quota = meta_data_quota.or(quota), Protocol::LightClientBootstrap => { @@ -194,6 +205,8 @@ impl FromStr for RateLimiterConfig { blobs_by_range_quota: blobs_by_range_quota .unwrap_or(Self::DEFAULT_BLOBS_BY_RANGE_QUOTA), blobs_by_root_quota: blobs_by_root_quota.unwrap_or(Self::DEFAULT_BLOBS_BY_ROOT_QUOTA), + data_columns_by_root_quota: data_columns_by_root_quota + .unwrap_or(Self::DEFAULT_DATA_COLUMNS_BY_ROOT_QUOTA), light_client_bootstrap_quota: light_client_bootstrap_quota .unwrap_or(Self::DEFAULT_LIGHT_CLIENT_BOOTSTRAP_QUOTA), }) diff --git a/beacon_node/lighthouse_network/src/rpc/methods.rs b/beacon_node/lighthouse_network/src/rpc/methods.rs index cd3579ad6e3..efc80d55faa 100644 --- a/beacon_node/lighthouse_network/src/rpc/methods.rs +++ b/beacon_node/lighthouse_network/src/rpc/methods.rs @@ -12,9 +12,10 @@ use std::sync::Arc; use strum::IntoStaticStr; use superstruct::superstruct; use types::blob_sidecar::BlobIdentifier; +use 
types::data_column_sidecar::DataColumnIdentifier; use types::{ - blob_sidecar::BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256, LightClientBootstrap, - RuntimeVariableList, SignedBeaconBlock, Slot, + blob_sidecar::BlobSidecar, ChainSpec, DataColumnSidecar, Epoch, EthSpec, Hash256, + LightClientBootstrap, RuntimeVariableList, SignedBeaconBlock, Slot, }; /// Maximum length of error message. @@ -366,6 +367,13 @@ impl BlobsByRootRequest { } } +/// Request a number of data columns from a peer. +#[derive(Clone, Debug, PartialEq)] +pub struct DataColumnsByRootRequest { + /// The list of beacon block roots and column indices being requested. + pub data_column_ids: RuntimeVariableList, +} + /* RPC Handling and Grouping */ // Collection of enums and structs used by the Codecs to encode/decode RPC messages @@ -390,6 +398,9 @@ pub enum RPCResponse { /// A response to a get BLOBS_BY_ROOT request. BlobsByRoot(Arc>), + /// A response to a get DATA_COLUMN_SIDECARS_BY_ROOT request. + DataColumnsByRoot(Arc>), + /// A PONG response to a PING request. Pong(Ping), @@ -411,6 +422,9 @@ pub enum ResponseTermination { /// Blobs by root stream termination. BlobsByRoot, + + /// Data column sidecars by root stream termination. 
+ DataColumnsByRoot, } /// The structured response containing a result/code indicating success or failure @@ -482,6 +496,7 @@ impl RPCCodedResponse { RPCResponse::BlocksByRoot(_) => true, RPCResponse::BlobsByRange(_) => true, RPCResponse::BlobsByRoot(_) => true, + RPCResponse::DataColumnsByRoot(_) => true, RPCResponse::Pong(_) => false, RPCResponse::MetaData(_) => false, RPCResponse::LightClientBootstrap(_) => false, @@ -520,6 +535,7 @@ impl RPCResponse { RPCResponse::BlocksByRoot(_) => Protocol::BlocksByRoot, RPCResponse::BlobsByRange(_) => Protocol::BlobsByRange, RPCResponse::BlobsByRoot(_) => Protocol::BlobsByRoot, + RPCResponse::DataColumnsByRoot(_) => Protocol::DataColumnsByRoot, RPCResponse::Pong(_) => Protocol::Ping, RPCResponse::MetaData(_) => Protocol::MetaData, RPCResponse::LightClientBootstrap(_) => Protocol::LightClientBootstrap, @@ -563,6 +579,9 @@ impl std::fmt::Display for RPCResponse { RPCResponse::BlobsByRoot(sidecar) => { write!(f, "BlobsByRoot: Blob slot: {}", sidecar.slot()) } + RPCResponse::DataColumnsByRoot(sidecar) => { + write!(f, "DataColumnsByRoot: Data column slot: {}", sidecar.slot()) + } RPCResponse::Pong(ping) => write!(f, "Pong: {}", ping.data), RPCResponse::MetaData(metadata) => write!(f, "Metadata: {}", metadata.seq_number()), RPCResponse::LightClientBootstrap(bootstrap) => { @@ -645,6 +664,16 @@ impl std::fmt::Display for BlobsByRangeRequest { } } +impl std::fmt::Display for DataColumnsByRootRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Request: DataColumnsByRoot: Number of Requested Data Column Ids: {}", + self.data_column_ids.len() + ) + } +} + impl slog::KV for StatusMessage { fn serialize( &self, diff --git a/beacon_node/lighthouse_network/src/rpc/mod.rs b/beacon_node/lighthouse_network/src/rpc/mod.rs index e22e5273866..a32db20441a 100644 --- a/beacon_node/lighthouse_network/src/rpc/mod.rs +++ b/beacon_node/lighthouse_network/src/rpc/mod.rs @@ -414,6 +414,7 @@ where 
ResponseTermination::BlocksByRoot => Protocol::BlocksByRoot, ResponseTermination::BlobsByRange => Protocol::BlobsByRange, ResponseTermination::BlobsByRoot => Protocol::BlobsByRoot, + ResponseTermination::DataColumnsByRoot => Protocol::DataColumnsByRoot, }, ), }; diff --git a/beacon_node/lighthouse_network/src/rpc/outbound.rs b/beacon_node/lighthouse_network/src/rpc/outbound.rs index 713e9e0ec9d..89762fc623d 100644 --- a/beacon_node/lighthouse_network/src/rpc/outbound.rs +++ b/beacon_node/lighthouse_network/src/rpc/outbound.rs @@ -37,6 +37,7 @@ pub enum OutboundRequest { BlocksByRoot(BlocksByRootRequest), BlobsByRange(BlobsByRangeRequest), BlobsByRoot(BlobsByRootRequest), + DataColumnsByRoot(DataColumnsByRootRequest), Ping(Ping), MetaData(MetadataRequest), } @@ -80,6 +81,10 @@ impl OutboundRequest { SupportedProtocol::BlobsByRootV1, Encoding::SSZSnappy, )], + OutboundRequest::DataColumnsByRoot(_) => vec![ProtocolId::new( + SupportedProtocol::DataColumnsByRootV1, + Encoding::SSZSnappy, + )], OutboundRequest::Ping(_) => vec![ProtocolId::new( SupportedProtocol::PingV1, Encoding::SSZSnappy, @@ -101,6 +106,7 @@ impl OutboundRequest { OutboundRequest::BlocksByRoot(req) => req.block_roots().len() as u64, OutboundRequest::BlobsByRange(req) => req.max_blobs_requested::(), OutboundRequest::BlobsByRoot(req) => req.blob_ids.len() as u64, + OutboundRequest::DataColumnsByRoot(req) => req.data_column_ids.len() as u64, OutboundRequest::Ping(_) => 1, OutboundRequest::MetaData(_) => 1, } @@ -121,6 +127,7 @@ impl OutboundRequest { }, OutboundRequest::BlobsByRange(_) => SupportedProtocol::BlobsByRangeV1, OutboundRequest::BlobsByRoot(_) => SupportedProtocol::BlobsByRootV1, + OutboundRequest::DataColumnsByRoot(_) => SupportedProtocol::DataColumnsByRootV1, OutboundRequest::Ping(_) => SupportedProtocol::PingV1, OutboundRequest::MetaData(req) => match req { MetadataRequest::V1(_) => SupportedProtocol::MetaDataV1, @@ -139,6 +146,7 @@ impl OutboundRequest { OutboundRequest::BlocksByRoot(_) => 
ResponseTermination::BlocksByRoot, OutboundRequest::BlobsByRange(_) => ResponseTermination::BlobsByRange, OutboundRequest::BlobsByRoot(_) => ResponseTermination::BlobsByRoot, + OutboundRequest::DataColumnsByRoot(_) => ResponseTermination::DataColumnsByRoot, OutboundRequest::Status(_) => unreachable!(), OutboundRequest::Goodbye(_) => unreachable!(), OutboundRequest::Ping(_) => unreachable!(), @@ -196,6 +204,7 @@ impl std::fmt::Display for OutboundRequest { OutboundRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req), OutboundRequest::BlobsByRange(req) => write!(f, "Blobs by range: {:?}", req), OutboundRequest::BlobsByRoot(req) => write!(f, "Blobs by root: {:?}", req), + OutboundRequest::DataColumnsByRoot(req) => write!(f, "Data columns by root: {:?}", req), OutboundRequest::Ping(ping) => write!(f, "Ping: {}", ping.data), OutboundRequest::MetaData(_) => write!(f, "MetaData request"), } diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 9c174b8e425..d03f45211f1 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -21,8 +21,8 @@ use tokio_util::{ }; use types::{ BeaconBlock, BeaconBlockAltair, BeaconBlockBase, BeaconBlockCapella, BeaconBlockMerge, - BlobSidecar, ChainSpec, EmptyBlock, EthSpec, ForkContext, ForkName, MainnetEthSpec, Signature, - SignedBeaconBlock, + BlobSidecar, ChainSpec, DataColumnSidecar, EmptyBlock, EthSpec, ForkContext, ForkName, + MainnetEthSpec, Signature, SignedBeaconBlock, }; lazy_static! { @@ -166,6 +166,9 @@ pub enum Protocol { /// The `BlobsByRoot` protocol name. #[strum(serialize = "blob_sidecars_by_root")] BlobsByRoot, + /// The `DataColumnSidecarsByRoot` protocol name. + #[strum(serialize = "data_column_sidecars_by_root")] + DataColumnsByRoot, /// The `Ping` protocol name. Ping, /// The `MetaData` protocol name. 
@@ -185,6 +188,7 @@ impl Protocol { Protocol::BlocksByRoot => Some(ResponseTermination::BlocksByRoot), Protocol::BlobsByRange => Some(ResponseTermination::BlobsByRange), Protocol::BlobsByRoot => Some(ResponseTermination::BlobsByRoot), + Protocol::DataColumnsByRoot => Some(ResponseTermination::DataColumnsByRoot), Protocol::Ping => None, Protocol::MetaData => None, Protocol::LightClientBootstrap => None, @@ -209,6 +213,7 @@ pub enum SupportedProtocol { BlocksByRootV2, BlobsByRangeV1, BlobsByRootV1, + DataColumnsByRootV1, PingV1, MetaDataV1, MetaDataV2, @@ -226,6 +231,7 @@ impl SupportedProtocol { SupportedProtocol::BlocksByRootV2 => "2", SupportedProtocol::BlobsByRangeV1 => "1", SupportedProtocol::BlobsByRootV1 => "1", + SupportedProtocol::DataColumnsByRootV1 => "1", SupportedProtocol::PingV1 => "1", SupportedProtocol::MetaDataV1 => "1", SupportedProtocol::MetaDataV2 => "2", @@ -243,6 +249,7 @@ impl SupportedProtocol { SupportedProtocol::BlocksByRootV2 => Protocol::BlocksByRoot, SupportedProtocol::BlobsByRangeV1 => Protocol::BlobsByRange, SupportedProtocol::BlobsByRootV1 => Protocol::BlobsByRoot, + SupportedProtocol::DataColumnsByRootV1 => Protocol::DataColumnsByRoot, SupportedProtocol::PingV1 => Protocol::Ping, SupportedProtocol::MetaDataV1 => Protocol::MetaData, SupportedProtocol::MetaDataV2 => Protocol::MetaData, @@ -369,6 +376,7 @@ impl ProtocolId { ::ssz_fixed_len(), ), Protocol::BlobsByRoot => RpcLimits::new(0, spec.max_blobs_by_root_request), + Protocol::DataColumnsByRoot => RpcLimits::new(0, spec.max_data_columns_by_root_request), Protocol::Ping => RpcLimits::new( ::ssz_fixed_len(), ::ssz_fixed_len(), @@ -393,6 +401,10 @@ impl ProtocolId { Protocol::BlocksByRoot => rpc_block_limits_by_fork(fork_context.current_fork()), Protocol::BlobsByRange => rpc_blob_limits::(), Protocol::BlobsByRoot => rpc_blob_limits::(), + Protocol::DataColumnsByRoot => RpcLimits::new( + DataColumnSidecar::::min_size(), + DataColumnSidecar::::max_size(), + ), Protocol::Ping => 
RpcLimits::new( ::ssz_fixed_len(), ::ssz_fixed_len(), @@ -416,6 +428,7 @@ impl ProtocolId { | SupportedProtocol::BlocksByRootV2 | SupportedProtocol::BlobsByRangeV1 | SupportedProtocol::BlobsByRootV1 + | SupportedProtocol::DataColumnsByRootV1 | SupportedProtocol::LightClientBootstrapV1 => true, SupportedProtocol::StatusV1 | SupportedProtocol::BlocksByRootV1 @@ -527,6 +540,7 @@ pub enum InboundRequest { BlocksByRoot(BlocksByRootRequest), BlobsByRange(BlobsByRangeRequest), BlobsByRoot(BlobsByRootRequest), + DataColumnsByRoot(DataColumnsByRootRequest), LightClientBootstrap(LightClientBootstrapRequest), Ping(Ping), MetaData(MetadataRequest), @@ -545,6 +559,7 @@ impl InboundRequest { InboundRequest::BlocksByRoot(req) => req.block_roots().len() as u64, InboundRequest::BlobsByRange(req) => req.max_blobs_requested::(), InboundRequest::BlobsByRoot(req) => req.blob_ids.len() as u64, + InboundRequest::DataColumnsByRoot(req) => req.data_column_ids.len() as u64, InboundRequest::Ping(_) => 1, InboundRequest::MetaData(_) => 1, InboundRequest::LightClientBootstrap(_) => 1, @@ -566,6 +581,7 @@ impl InboundRequest { }, InboundRequest::BlobsByRange(_) => SupportedProtocol::BlobsByRangeV1, InboundRequest::BlobsByRoot(_) => SupportedProtocol::BlobsByRootV1, + InboundRequest::DataColumnsByRoot(_) => SupportedProtocol::DataColumnsByRootV1, InboundRequest::Ping(_) => SupportedProtocol::PingV1, InboundRequest::MetaData(req) => match req { MetadataRequest::V1(_) => SupportedProtocol::MetaDataV1, @@ -585,6 +601,7 @@ impl InboundRequest { InboundRequest::BlocksByRoot(_) => ResponseTermination::BlocksByRoot, InboundRequest::BlobsByRange(_) => ResponseTermination::BlobsByRange, InboundRequest::BlobsByRoot(_) => ResponseTermination::BlobsByRoot, + InboundRequest::DataColumnsByRoot(_) => ResponseTermination::DataColumnsByRoot, InboundRequest::Status(_) => unreachable!(), InboundRequest::Goodbye(_) => unreachable!(), InboundRequest::Ping(_) => unreachable!(), @@ -693,6 +710,7 @@ impl 
std::fmt::Display for InboundRequest { InboundRequest::BlocksByRoot(req) => write!(f, "Blocks by root: {:?}", req), InboundRequest::BlobsByRange(req) => write!(f, "Blobs by range: {:?}", req), InboundRequest::BlobsByRoot(req) => write!(f, "Blobs by root: {:?}", req), + InboundRequest::DataColumnsByRoot(req) => write!(f, "Data columns by root: {:?}", req), InboundRequest::Ping(ping) => write!(f, "Ping: {}", ping.data), InboundRequest::MetaData(_) => write!(f, "MetaData request"), InboundRequest::LightClientBootstrap(bootstrap) => { diff --git a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs index 0b57374e8b6..b9ada25c1de 100644 --- a/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs +++ b/beacon_node/lighthouse_network/src/rpc/rate_limiter.rs @@ -98,6 +98,8 @@ pub struct RPCRateLimiter { blbrange_rl: Limiter, /// BlobsByRoot rate limiter. blbroot_rl: Limiter, + /// DataColumnssByRoot rate limiter. + dcbroot_rl: Limiter, /// LightClientBootstrap rate limiter. lcbootstrap_rl: Limiter, } @@ -130,6 +132,8 @@ pub struct RPCRateLimiterBuilder { blbrange_quota: Option, /// Quota for the BlobsByRoot protocol. blbroot_quota: Option, + /// Quota for the DataColumnsByRoot protocol. + dcbroot_quota: Option, /// Quota for the LightClientBootstrap protocol. 
lcbootstrap_quota: Option, } @@ -147,6 +151,7 @@ impl RPCRateLimiterBuilder { Protocol::BlocksByRoot => self.bbroots_quota = q, Protocol::BlobsByRange => self.blbrange_quota = q, Protocol::BlobsByRoot => self.blbroot_quota = q, + Protocol::DataColumnsByRoot => self.dcbroot_quota = q, Protocol::LightClientBootstrap => self.lcbootstrap_quota = q, } self @@ -176,6 +181,10 @@ impl RPCRateLimiterBuilder { .blbroot_quota .ok_or("BlobsByRoot quota not specified")?; + let dcbroot_quota = self + .dcbroot_quota + .ok_or("DataColumnsByRoot quota not specified")?; + // create the rate limiters let ping_rl = Limiter::from_quota(ping_quota)?; let metadata_rl = Limiter::from_quota(metadata_quota)?; @@ -185,6 +194,7 @@ impl RPCRateLimiterBuilder { let bbrange_rl = Limiter::from_quota(bbrange_quota)?; let blbrange_rl = Limiter::from_quota(blbrange_quota)?; let blbroot_rl = Limiter::from_quota(blbroots_quota)?; + let dcbroot_rl = Limiter::from_quota(dcbroot_quota)?; let lcbootstrap_rl = Limiter::from_quota(lcbootstrap_quote)?; // check for peers to prune every 30 seconds, starting in 30 seconds @@ -201,6 +211,7 @@ impl RPCRateLimiterBuilder { bbrange_rl, blbrange_rl, blbroot_rl, + dcbroot_rl, lcbootstrap_rl, init_time: Instant::now(), }) @@ -243,6 +254,7 @@ impl RPCRateLimiter { blocks_by_root_quota, blobs_by_range_quota, blobs_by_root_quota, + data_columns_by_root_quota, light_client_bootstrap_quota, } = config; @@ -255,6 +267,7 @@ impl RPCRateLimiter { .set_quota(Protocol::BlocksByRoot, blocks_by_root_quota) .set_quota(Protocol::BlobsByRange, blobs_by_range_quota) .set_quota(Protocol::BlobsByRoot, blobs_by_root_quota) + .set_quota(Protocol::DataColumnsByRoot, data_columns_by_root_quota) .set_quota(Protocol::LightClientBootstrap, light_client_bootstrap_quota) .build() } @@ -283,6 +296,7 @@ impl RPCRateLimiter { Protocol::BlocksByRoot => &mut self.bbroots_rl, Protocol::BlobsByRange => &mut self.blbrange_rl, Protocol::BlobsByRoot => &mut self.blbroot_rl, + Protocol::DataColumnsByRoot 
=> &mut self.dcbroot_rl, Protocol::LightClientBootstrap => &mut self.lcbootstrap_rl, }; check(limiter) diff --git a/beacon_node/lighthouse_network/src/service/api_types.rs b/beacon_node/lighthouse_network/src/service/api_types.rs index 96c9d283327..e12904a0a5e 100644 --- a/beacon_node/lighthouse_network/src/service/api_types.rs +++ b/beacon_node/lighthouse_network/src/service/api_types.rs @@ -1,9 +1,9 @@ use std::sync::Arc; use libp2p::swarm::ConnectionId; -use types::{BlobSidecar, EthSpec, LightClientBootstrap, SignedBeaconBlock}; +use types::{BlobSidecar, DataColumnSidecar, EthSpec, LightClientBootstrap, SignedBeaconBlock}; -use crate::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest}; +use crate::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRootRequest}; use crate::rpc::{ methods::{ BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, @@ -42,6 +42,8 @@ pub enum Request { LightClientBootstrap(LightClientBootstrapRequest), /// A request blobs root request. BlobsByRoot(BlobsByRootRequest), + /// A request data columns root request. + DataColumnsByRoot(DataColumnsByRootRequest), } impl std::convert::From for OutboundRequest { @@ -69,6 +71,7 @@ impl std::convert::From for OutboundRequest { } Request::BlobsByRange(r) => OutboundRequest::BlobsByRange(r), Request::BlobsByRoot(r) => OutboundRequest::BlobsByRoot(r), + Request::DataColumnsByRoot(r) => OutboundRequest::DataColumnsByRoot(r), Request::Status(s) => OutboundRequest::Status(s), } } @@ -92,6 +95,8 @@ pub enum Response { BlocksByRoot(Option>>), /// A response to a get BLOBS_BY_ROOT request. BlobsByRoot(Option>>), + /// A response to a get DATA_COLUMN_SIDECARS_BY_ROOT request. + DataColumnsByRoot(Option>>), /// A response to a LightClientUpdate request. 
LightClientBootstrap(LightClientBootstrap), } @@ -115,6 +120,10 @@ impl std::convert::From> for RPCCodedResponse RPCCodedResponse::Success(RPCResponse::BlobsByRange(b)), None => RPCCodedResponse::StreamTermination(ResponseTermination::BlobsByRange), }, + Response::DataColumnsByRoot(r) => match r { + Some(d) => RPCCodedResponse::Success(RPCResponse::DataColumnsByRoot(d)), + None => RPCCodedResponse::StreamTermination(ResponseTermination::DataColumnsByRoot), + }, Response::Status(s) => RPCCodedResponse::Success(RPCResponse::Status(s)), Response::LightClientBootstrap(b) => { RPCCodedResponse::Success(RPCResponse::LightClientBootstrap(b)) diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index bc1b60189e7..89c4ce6df1f 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -1137,6 +1137,9 @@ impl Network { Request::BlobsByRoot { .. } => { metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["blobs_by_root"]) } + Request::DataColumnsByRoot { .. 
} => { + metrics::inc_counter_vec(&metrics::TOTAL_RPC_REQUESTS, &["data_columns_by_root"]) + } } NetworkEvent::RequestReceived { peer_id, @@ -1458,6 +1461,14 @@ impl Network { self.build_request(peer_request_id, peer_id, Request::BlobsByRoot(req)); Some(event) } + InboundRequest::DataColumnsByRoot(req) => { + let event = self.build_request( + peer_request_id, + peer_id, + Request::DataColumnsByRoot(req), + ); + Some(event) + } InboundRequest::LightClientBootstrap(req) => { let event = self.build_request( peer_request_id, @@ -1499,6 +1510,9 @@ impl Network { RPCResponse::BlobsByRoot(resp) => { self.build_response(id, peer_id, Response::BlobsByRoot(Some(resp))) } + RPCResponse::DataColumnsByRoot(resp) => { + self.build_response(id, peer_id, Response::DataColumnsByRoot(Some(resp))) + } // Should never be reached RPCResponse::LightClientBootstrap(bootstrap) => { self.build_response(id, peer_id, Response::LightClientBootstrap(bootstrap)) @@ -1511,6 +1525,7 @@ impl Network { ResponseTermination::BlocksByRoot => Response::BlocksByRoot(None), ResponseTermination::BlobsByRange => Response::BlobsByRange(None), ResponseTermination::BlobsByRoot => Response::BlobsByRoot(None), + ResponseTermination::DataColumnsByRoot => Response::DataColumnsByRoot(None), }; self.build_response(id, peer_id, response) } diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index fb76f2a2677..62a1216f13b 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -4,10 +4,9 @@ use crate::{ service::NetworkMessage, sync::SyncMessage, }; -use beacon_chain::blob_verification::{ - GossipBlobError, GossipVerifiedBlob, GossipVerifiedDataColumnSidecar, -}; +use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; use beacon_chain::block_verification_types::AsBlock; +use 
beacon_chain::data_column_verification::GossipVerifiedDataColumn; use beacon_chain::store::Error; use beacon_chain::{ attestation_verification::{self, Error as AttnError, VerifiedAttestation}, @@ -869,12 +868,59 @@ impl NetworkBeaconProcessor { pub async fn process_gossip_verified_data_column( self: &Arc, - _peer_id: PeerId, - verified_data_column: GossipVerifiedDataColumnSidecar, + peer_id: PeerId, + verified_data_column: GossipVerifiedDataColumn, // This value is not used presently, but it might come in handy for debugging. _seen_duration: Duration, ) { - self.chain.process_gossip_data_column(verified_data_column); + let block_root = verified_data_column.block_root(); + let data_column_slot = verified_data_column.slot(); + let data_column_index = verified_data_column.id().index; + + match self + .chain + .process_gossip_data_column(verified_data_column) + .await + { + Ok(AvailabilityProcessingStatus::Imported(block_root)) => { + // Note: Reusing block imported metric here + metrics::inc_counter(&metrics::BEACON_PROCESSOR_GOSSIP_BLOCK_IMPORTED_TOTAL); + info!( + self.log, + "Gossipsub data column processed, imported fully available block"; + "block_root" => %block_root + ); + self.chain.recompute_head_at_current_slot().await; + } + Ok(AvailabilityProcessingStatus::MissingComponents(slot, block_root)) => { + trace!( + self.log, + "Processed data column, waiting for other components"; + "slot" => %slot, + "data_column_index" => %data_column_index, + "block_root" => %block_root, + ); + } + Err(err) => { + debug!( + self.log, + "Invalid gossip data column"; + "outcome" => ?err, + "block root" => ?block_root, + "block slot" => data_column_slot, + "data column index" => data_column_index, + ); + self.gossip_penalize_peer( + peer_id, + PeerAction::MidToleranceError, + "bad_gossip_data_column_ssz", + ); + trace!( + self.log, + "Invalid gossip data column ssz"; + ); + } + } } /// Process the beacon block received from the gossip network and: diff --git 
a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 8361554fa9d..7c444b8b52e 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -13,7 +13,9 @@ use beacon_processor::{ WorkEvent as BeaconWorkEvent, }; use environment::null_logger; -use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest}; +use lighthouse_network::rpc::methods::{ + BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRootRequest, +}; use lighthouse_network::{ rpc::{BlocksByRangeRequest, BlocksByRootRequest, LightClientBootstrapRequest, StatusMessage}, Client, MessageId, NetworkGlobals, PeerId, PeerRequestId, @@ -618,6 +620,23 @@ impl NetworkBeaconProcessor { }) } + /// Create a new work event to process `DataColumnsByRootRequest`s from the RPC network. + pub fn send_data_columns_by_roots_request( + self: &Arc, + peer_id: PeerId, + request_id: PeerRequestId, + request: DataColumnsByRootRequest, + ) -> Result<(), Error> { + let processor = self.clone(); + let process_fn = + move || processor.handle_data_columns_by_root_request(peer_id, request_id, request); + + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::DataColumnsByRootsRequest(Box::new(process_fn)), + }) + } + /// Create a new work event to process `LightClientBootstrap`s from the RPC network. 
pub fn send_light_client_bootstrap_request( self: &Arc, diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs index 66c98ff3b84..167601afbe9 100644 --- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs @@ -5,7 +5,9 @@ use crate::sync::SyncMessage; use beacon_chain::{BeaconChainError, BeaconChainTypes, HistoricalBlockError, WhenSlotSkipped}; use beacon_processor::SendOnDrop; use itertools::process_results; -use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest}; +use lighthouse_network::rpc::methods::{ + BlobsByRangeRequest, BlobsByRootRequest, DataColumnsByRootRequest, +}; use lighthouse_network::rpc::StatusMessage; use lighthouse_network::rpc::*; use lighthouse_network::{PeerId, PeerRequestId, ReportSource, Response, SyncInfo}; @@ -16,6 +18,7 @@ use std::sync::Arc; use task_executor::TaskExecutor; use tokio_stream::StreamExt; use types::blob_sidecar::BlobIdentifier; +use types::data_column_sidecar::DataColumnIdentifier; use types::{Epoch, EthSpec, ForkName, Hash256, Slot}; impl NetworkBeaconProcessor { @@ -294,6 +297,94 @@ impl NetworkBeaconProcessor { } } + /// Handle a `DataColumnsByRoot` request from the peer. + pub fn handle_data_columns_by_root_request( + self: Arc, + peer_id: PeerId, + request_id: PeerRequestId, + request: DataColumnsByRootRequest, + ) { + let Some(requested_root) = request + .data_column_ids + .as_slice() + .first() + .map(|id| id.block_root) + else { + // No data column ids requested. + return; + }; + let requested_indices = request + .data_column_ids + .as_slice() + .iter() + .map(|id| id.index) + .collect::>(); + let mut send_data_column_count = 0; + + let mut data_column_list_results = HashMap::new(); + for id in request.data_column_ids.as_slice() { + // Attempt to get the data columns from the RPC cache. 
+ if let Ok(Some(data_column)) = self.chain.data_availability_checker.get_data_column(id) + { + self.send_response( + peer_id, + Response::DataColumnsByRoot(Some(data_column)), + request_id, + ); + send_data_column_count += 1; + } else { + let DataColumnIdentifier { + block_root: root, + index, + } = id; + + let data_column_list_result = match data_column_list_results.entry(root) { + Entry::Vacant(entry) => entry.insert( + self.chain + .get_data_columns_checking_early_attester_cache(root), + ), + Entry::Occupied(entry) => entry.into_mut(), + }; + + match data_column_list_result.as_ref() { + Ok(data_columns_sidecar_list) => { + 'inner: for data_column_sidecar in data_columns_sidecar_list.iter() { + if data_column_sidecar.index == *index { + self.send_response( + peer_id, + Response::DataColumnsByRoot(Some(data_column_sidecar.clone())), + request_id, + ); + send_data_column_count += 1; + break 'inner; + } + } + } + Err(e) => { + debug!( + self.log, + "Error fetching data column for peer"; + "peer" => %peer_id, + "request_root" => ?root, + "error" => ?e, + ); + } + } + } + } + debug!( + self.log, + "Received DataColumnsByRoot Request"; + "peer" => %peer_id, + "request_root" => %requested_root, + "request_indices" => ?requested_indices, + "returned" => send_data_column_count + ); + + // send stream termination + self.send_response(peer_id, Response::DataColumnsByRoot(None), request_id); + } + /// Handle a `BlocksByRoot` request from the peer. 
pub fn handle_light_client_bootstrap( self: &Arc, diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 8894d5d9fd9..7acb99a616e 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -361,6 +361,10 @@ impl NetworkBeaconProcessor { .iter() .map(|wrapped| wrapped.n_blobs()) .sum::(); + let n_data_columns = downloaded_blocks + .iter() + .map(|wrapped| wrapped.n_data_columns()) + .sum::(); match self.process_backfill_blocks(downloaded_blocks) { (_, Ok(_)) => { @@ -370,6 +374,7 @@ impl NetworkBeaconProcessor { "last_block_slot" => end_slot, "processed_blocks" => sent_blocks, "processed_blobs" => n_blobs, + "processed_data_columns" => n_data_columns, "service"=> "sync"); BatchProcessResult::Success { was_non_empty: sent_blocks > 0, diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 3d88f764e0e..23b14ac1439 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -27,7 +27,7 @@ use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::sync::mpsc; use tokio_stream::wrappers::UnboundedReceiverStream; -use types::{BlobSidecar, EthSpec, SignedBeaconBlock}; +use types::{BlobSidecar, DataColumnSidecar, EthSpec, SignedBeaconBlock}; /// Handles messages from the network and routes them to the appropriate service to be handled. 
pub struct Router { @@ -216,6 +216,10 @@ impl Router { self.network_beacon_processor .send_blobs_by_roots_request(peer_id, request_id, request), ), + Request::DataColumnsByRoot(request) => self.handle_beacon_processor_send_result( + self.network_beacon_processor + .send_data_columns_by_roots_request(peer_id, request_id, request), + ), Request::LightClientBootstrap(request) => self.handle_beacon_processor_send_result( self.network_beacon_processor .send_light_client_bootstrap_request(peer_id, request_id, request), @@ -250,6 +254,9 @@ impl Router { Response::BlobsByRoot(blob) => { self.on_blobs_by_root_response(peer_id, request_id, blob); } + Response::DataColumnsByRoot(data_column) => { + self.on_data_columns_by_root_response(peer_id, request_id, data_column); + } Response::LightClientBootstrap(_) => unreachable!(), } } @@ -637,6 +644,16 @@ impl Router { }); } + /// Handle a `DataColumnsByRoot` response from the peer. + pub fn on_data_columns_by_root_response( + &mut self, + _peer_id: PeerId, + _request_id: RequestId, + _data_column_sidecar: Option>>, + ) { + // TODO(das) implement `DataColumnsByRoot` response handling + } + fn handle_beacon_processor_send_result( &mut self, result: Result<(), crate::network_beacon_processor::Error>, diff --git a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs index 8c60621f1c7..989bfab00f0 100644 --- a/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs +++ b/beacon_node/network/src/sync/block_lookups/single_block_lookup.rs @@ -179,11 +179,13 @@ impl SingleBlockLookup { block_root: _, downloaded_block, downloaded_blobs, + downloaded_data_columns, } = components; if let Some(block) = downloaded_block { existing_components.merge_block(block); } existing_components.merge_blobs(downloaded_blobs); + existing_components.merge_data_columns(downloaded_data_columns); } else { self.child_components = Some(components); } diff --git 
a/beacon_node/network/src/sync/block_lookups/tests.rs b/beacon_node/network/src/sync/block_lookups/tests.rs index c506696b9d3..f81f16dfb57 100644 --- a/beacon_node/network/src/sync/block_lookups/tests.rs +++ b/beacon_node/network/src/sync/block_lookups/tests.rs @@ -1249,7 +1249,7 @@ mod deneb_only { RequestTrigger::GossipUnknownParentBlock { .. } => { bl.search_child_block( block_root, - ChildComponents::new(block_root, Some(block.clone()), None), + ChildComponents::new(block_root, Some(block.clone()), None, None), &[peer_id], &mut cx, ); @@ -1274,7 +1274,7 @@ mod deneb_only { *lookup_blobs.index_mut(0) = Some(single_blob); bl.search_child_block( child_root, - ChildComponents::new(child_root, None, Some(lookup_blobs)), + ChildComponents::new(child_root, None, Some(lookup_blobs), None), &[peer_id], &mut cx, ); diff --git a/beacon_node/network/src/sync/manager.rs b/beacon_node/network/src/sync/manager.rs index acb735ea442..7fff76dd9eb 100644 --- a/beacon_node/network/src/sync/manager.rs +++ b/beacon_node/network/src/sync/manager.rs @@ -644,7 +644,7 @@ impl SyncManager { block_root, parent_root, blob_slot, - ChildComponents::new(block_root, None, Some(blobs)), + ChildComponents::new(block_root, None, Some(blobs), None), ); } SyncMessage::UnknownBlockHashFromAttestation(peer_id, block_hash) => { diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs index 96e02b80ff8..7e5b539406c 100644 --- a/beacon_node/store/src/errors.rs +++ b/beacon_node/store/src/errors.rs @@ -27,6 +27,8 @@ pub enum Error { AnchorInfoConcurrentMutation, /// The store's `blob_info` was mutated concurrently, the latest modification wasn't applied. BlobInfoConcurrentMutation, + /// The store's `data_column_info` was mutated concurrently, the latest modification wasn't applied. + DataColumnInfoConcurrentMutation, /// The block or state is unavailable due to weak subjectivity sync. 
HistoryUnavailable, /// State reconstruction cannot commence because not all historic blocks are known. diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 4bdb0deca33..5acd8ff8445 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -12,9 +12,10 @@ use crate::leveldb_store::BytesKey; use crate::leveldb_store::LevelDB; use crate::memory_store::MemoryStore; use crate::metadata::{ - AnchorInfo, BlobInfo, CompactionTimestamp, PruningCheckpoint, SchemaVersion, ANCHOR_INFO_KEY, - BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION, - PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN, + AnchorInfo, BlobInfo, CompactionTimestamp, DataColumnInfo, PruningCheckpoint, SchemaVersion, + ANCHOR_INFO_KEY, BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION, + DATA_COLUMN_INFO_KEY, PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY, + STATE_UPPER_LIMIT_NO_RETAIN, }; use crate::metrics; use crate::{ @@ -40,6 +41,7 @@ use std::path::Path; use std::sync::Arc; use std::time::Duration; use types::blob_sidecar::BlobSidecarList; +use types::data_column_sidecar::DataColumnSidecarList; use types::*; /// On-disk database that stores finalized states efficiently. @@ -57,6 +59,8 @@ pub struct HotColdDB, Cold: ItemStore> { anchor_info: RwLock>, /// The starting slots for the range of blobs stored in the database. blob_info: RwLock, + /// The starting slots for the range of data columns stored in the database. + data_column_info: RwLock, pub(crate) config: StoreConfig, /// Cold database containing compact historical data. 
pub cold_db: Cold, @@ -82,6 +86,7 @@ pub struct HotColdDB, Cold: ItemStore> { struct BlockCache { block_cache: LruCache>, blob_cache: LruCache>, + data_column_cache: LruCache>, } impl BlockCache { @@ -89,6 +94,7 @@ impl BlockCache { Self { block_cache: LruCache::new(size), blob_cache: LruCache::new(size), + data_column_cache: LruCache::new(size), } } pub fn put_block(&mut self, block_root: Hash256, block: SignedBeaconBlock) { @@ -97,12 +103,25 @@ impl BlockCache { pub fn put_blobs(&mut self, block_root: Hash256, blobs: BlobSidecarList) { self.blob_cache.put(block_root, blobs); } + pub fn put_data_columns( + &mut self, + block_root: Hash256, + data_columns: DataColumnSidecarList, + ) { + self.data_column_cache.put(block_root, data_columns); + } pub fn get_block<'a>(&'a mut self, block_root: &Hash256) -> Option<&'a SignedBeaconBlock> { self.block_cache.get(block_root) } pub fn get_blobs<'a>(&'a mut self, block_root: &Hash256) -> Option<&'a BlobSidecarList> { self.blob_cache.get(block_root) } + pub fn get_data_columns<'a>( + &'a mut self, + block_root: &Hash256, + ) -> Option<&'a DataColumnSidecarList> { + self.data_column_cache.get(block_root) + } pub fn delete_block(&mut self, block_root: &Hash256) { let _ = self.block_cache.pop(block_root); } @@ -176,6 +195,7 @@ impl HotColdDB, MemoryStore> { split: RwLock::new(Split::default()), anchor_info: RwLock::new(None), blob_info: RwLock::new(BlobInfo::default()), + data_column_info: RwLock::new(DataColumnInfo::default()), cold_db: MemoryStore::open(), blobs_db: MemoryStore::open(), hot_db: MemoryStore::open(), @@ -213,6 +233,7 @@ impl HotColdDB, LevelDB> { split: RwLock::new(Split::default()), anchor_info: RwLock::new(None), blob_info: RwLock::new(BlobInfo::default()), + data_column_info: RwLock::new(DataColumnInfo::default()), cold_db: LevelDB::open(cold_path)?, blobs_db: LevelDB::open(blobs_db_path)?, hot_db: LevelDB::open(hot_path)?, @@ -290,11 +311,35 @@ impl HotColdDB, LevelDB> { }, }; 
db.compare_and_set_blob_info_with_write(<_>::default(), new_blob_info.clone())?; + + let data_column_info = db.load_data_column_info()?; + let new_data_column_info = match &data_column_info { + // TODO(das): update to EIP-7594 fork + Some(data_column_info) => { + // Set the oldest data column slot to the Deneb fork slot if it is not yet set. + let oldest_data_column_slot = + data_column_info.oldest_data_column_slot.or(deneb_fork_slot); + DataColumnInfo { + oldest_data_column_slot, + } + } + // First start. + None => DataColumnInfo { + // Set the oldest data column slot to the Deneb fork slot if it is not yet set. + oldest_data_column_slot: deneb_fork_slot, + }, + }; + db.compare_and_set_data_column_info_with_write( + <_>::default(), + new_data_column_info.clone(), + )?; + info!( db.log, "Blob DB initialized"; "path" => ?blobs_db_path, "oldest_blob_slot" => ?new_blob_info.oldest_blob_slot, + "oldest_data_column_slot" => ?new_data_column_info.oldest_data_column_slot, ); // Ensure that the schema version of the on-disk database matches the software. 
@@ -607,6 +652,19 @@ impl, Cold: ItemStore> HotColdDB ops.push(KeyValueStoreOp::PutKeyValue(db_key, blobs.as_ssz_bytes())); } + pub fn data_columns_as_kv_store_ops( + &self, + key: &Hash256, + data_columns: DataColumnSidecarList, + ops: &mut Vec, + ) { + let db_key = get_key_for_col(DBColumn::BeaconDataColumn.into(), key.as_bytes()); + ops.push(KeyValueStoreOp::PutKeyValue( + db_key, + data_columns.as_ssz_bytes(), + )); + } + pub fn put_state_summary( &self, state_root: &Hash256, @@ -896,6 +954,14 @@ impl, Cold: ItemStore> HotColdDB self.blobs_as_kv_store_ops(&block_root, blobs, &mut key_value_batch); } + StoreOp::PutDataColumns(block_root, data_columns) => { + self.data_columns_as_kv_store_ops( + &block_root, + data_columns, + &mut key_value_batch, + ); + } + StoreOp::PutStateSummary(state_root, summary) => { key_value_batch.push(summary.as_kv_store_op(state_root)); } @@ -920,6 +986,12 @@ impl, Cold: ItemStore> HotColdDB key_value_batch.push(KeyValueStoreOp::DeleteKey(key)); } + StoreOp::DeleteDataColumns(block_root) => { + let key = + get_key_for_col(DBColumn::BeaconDataColumn.into(), block_root.as_bytes()); + key_value_batch.push(KeyValueStoreOp::DeleteKey(key)); + } + StoreOp::DeleteState(state_root, slot) => { let state_summary_key = get_key_for_col(DBColumn::BeaconStateSummary.into(), state_root.as_bytes()); @@ -950,9 +1022,10 @@ impl, Cold: ItemStore> HotColdDB batch: Vec>, ) -> Result<(), Error> { let mut blobs_to_delete = Vec::new(); + let mut data_columns_to_delete = Vec::new(); let (blobs_ops, hot_db_ops): (Vec>, Vec>) = batch.into_iter().partition(|store_op| match store_op { - StoreOp::PutBlobs(_, _) => true, + StoreOp::PutBlobs(_, _) | StoreOp::PutDataColumns(_, _) => true, StoreOp::DeleteBlobs(block_root) => { match self.get_blobs(block_root) { Ok(Some(blob_sidecar_list)) => { @@ -969,6 +1042,22 @@ impl, Cold: ItemStore> HotColdDB } true } + StoreOp::DeleteDataColumns(block_root) => { + match self.get_data_columns(block_root) { + 
Ok(Some(data_column_sidecar_list)) => { + data_columns_to_delete.push((*block_root, data_column_sidecar_list)); + } + Err(e) => { + error!( + self.log, "Error getting data columns"; + "block_root" => %block_root, + "error" => ?e + ); + } + _ => (), + } + true + } StoreOp::PutBlock(_, _) | StoreOp::DeleteBlock(_) => false, _ => false, }); @@ -1000,10 +1089,19 @@ impl, Cold: ItemStore> HotColdDB for op in blob_cache_ops.iter_mut() { let reverse_op = match op { StoreOp::PutBlobs(block_root, _) => StoreOp::DeleteBlobs(*block_root), + StoreOp::PutDataColumns(block_root, _) => { + StoreOp::DeleteDataColumns(*block_root) + } StoreOp::DeleteBlobs(_) => match blobs_to_delete.pop() { Some((block_root, blobs)) => StoreOp::PutBlobs(block_root, blobs), None => return Err(HotColdDBError::Rollback.into()), }, + StoreOp::DeleteDataColumns(_) => match data_columns_to_delete.pop() { + Some((block_root, data_columns)) => { + StoreOp::PutDataColumns(block_root, data_columns) + } + None => return Err(HotColdDBError::Rollback.into()), + }, _ => return Err(HotColdDBError::Rollback.into()), }; *op = reverse_op; @@ -1021,6 +1119,8 @@ impl, Cold: ItemStore> HotColdDB StoreOp::PutBlobs(_, _) => (), + StoreOp::PutDataColumns(_, _) => (), + StoreOp::PutState(_, _) => (), StoreOp::PutStateSummary(_, _) => (), @@ -1035,6 +1135,8 @@ impl, Cold: ItemStore> HotColdDB StoreOp::DeleteBlobs(_) => (), + StoreOp::DeleteDataColumns(_) => (), + StoreOp::DeleteState(_, _) => (), StoreOp::DeleteExecutionPayload(_) => (), @@ -1448,6 +1550,32 @@ impl, Cold: ItemStore> HotColdDB } } + /// Fetch data_columns for a given block from the store. + pub fn get_data_columns( + &self, + block_root: &Hash256, + ) -> Result>, Error> { + // Check the cache. 
+ if let Some(data_columns) = self.block_cache.lock().get_data_columns(block_root) { + metrics::inc_counter(&metrics::BEACON_DATA_COLUMNS_CACHE_HIT_COUNT); + return Ok(Some(data_columns.clone())); + } + + match self + .blobs_db + .get_bytes(DBColumn::BeaconDataColumn.into(), block_root.as_bytes())? + { + Some(ref data_columns_bytes) => { + let data_columns = DataColumnSidecarList::from_ssz_bytes(data_columns_bytes)?; + self.block_cache + .lock() + .put_data_columns(*block_root, data_columns.clone()); + Ok(Some(data_columns)) + } + None => Ok(None), + } + } + /// Get a reference to the `ChainSpec` used by the database. pub fn get_chain_spec(&self) -> &ChainSpec { &self.spec @@ -1644,6 +1772,24 @@ impl, Cold: ItemStore> HotColdDB self.blob_info.read_recursive().clone() } + /// Initialize the `DataColumnInfo` when starting from genesis or a checkpoint. + pub fn init_data_column_info(&self, anchor_slot: Slot) -> Result { + let oldest_data_column_slot = self.spec.deneb_fork_epoch.map(|fork_epoch| { + std::cmp::max(anchor_slot, fork_epoch.start_slot(E::slots_per_epoch())) + }); + let data_column_info = DataColumnInfo { + oldest_data_column_slot, + }; + self.compare_and_set_data_column_info(self.get_data_column_info(), data_column_info) + } + + /// Get a clone of the store's data column info. + /// + /// To do mutations, use `compare_and_set_data_column_info`. + pub fn get_data_column_info(&self) -> DataColumnInfo { + self.data_column_info.read_recursive().clone() + } + /// Atomically update the blob info from `prev_value` to `new_value`. /// /// Return a `KeyValueStoreOp` which should be written to disk, possibly atomically with other @@ -1689,6 +1835,54 @@ impl, Cold: ItemStore> HotColdDB blob_info.as_kv_store_op(BLOB_INFO_KEY) } + /// Atomically update the data column info from `prev_value` to `new_value`. + /// + /// Return a `KeyValueStoreOp` which should be written to disk, possibly atomically with other + /// values. 
 + /// + /// Return a `DataColumnInfoConcurrentMutation` error if the `prev_value` provided + /// is not correct. + pub fn compare_and_set_data_column_info( + &self, + prev_value: DataColumnInfo, + new_value: DataColumnInfo, + ) -> Result { + let mut data_column_info = self.data_column_info.write(); + if *data_column_info == prev_value { + let kv_op = self.store_data_column_info_in_batch(&new_value); + *data_column_info = new_value; + Ok(kv_op) + } else { + Err(Error::DataColumnInfoConcurrentMutation) + } + } + + /// As for `compare_and_set_data_column_info`, but also writes the data column info to disk immediately. + pub fn compare_and_set_data_column_info_with_write( + &self, + prev_value: DataColumnInfo, + new_value: DataColumnInfo, + ) -> Result<(), Error> { + let kv_store_op = self.compare_and_set_data_column_info(prev_value, new_value)?; + self.hot_db.do_atomically(vec![kv_store_op]) + } + + /// Load the data column info from disk, but do not set `self.data_column_info`. + fn load_data_column_info(&self) -> Result, Error> { + self.hot_db.get(&DATA_COLUMN_INFO_KEY) + } + + /// Store the given `data_column_info` to disk. + /// + /// The argument is intended to be `self.data_column_info`, but is passed manually to avoid issues + /// with recursive locking. + fn store_data_column_info_in_batch( + &self, + data_column_info: &DataColumnInfo, + ) -> KeyValueStoreOp { + data_column_info.as_kv_store_op(DATA_COLUMN_INFO_KEY) + } + /// Return the slot-window describing the available historic states. + /// + /// Returns `(lower_limit, upper_limit)`. 
diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs index e86689b0cf1..31b0bd5658e 100644 --- a/beacon_node/store/src/lib.rs +++ b/beacon_node/store/src/lib.rs @@ -42,6 +42,7 @@ pub use metrics::scrape_for_metrics; use parking_lot::MutexGuard; use std::sync::Arc; use strum::{EnumString, IntoStaticStr}; +use types::data_column_sidecar::DataColumnSidecarList; pub use types::*; pub type ColumnIter<'a, K> = Box), Error>> + 'a>; @@ -203,11 +204,13 @@ pub enum StoreOp<'a, E: EthSpec> { PutBlock(Hash256, Arc>), PutState(Hash256, &'a BeaconState), PutBlobs(Hash256, BlobSidecarList), + PutDataColumns(Hash256, DataColumnSidecarList), PutStateSummary(Hash256, HotStateSummary), PutStateTemporaryFlag(Hash256), DeleteStateTemporaryFlag(Hash256), DeleteBlock(Hash256), DeleteBlobs(Hash256), + DeleteDataColumns(Hash256), DeleteState(Hash256, Option), DeleteExecutionPayload(Hash256), KeyValueOp(KeyValueStoreOp), @@ -223,6 +226,8 @@ pub enum DBColumn { BeaconBlock, #[strum(serialize = "blb")] BeaconBlob, + #[strum(serialize = "bdc")] + BeaconDataColumn, /// For full `BeaconState`s in the hot database (finalized or fork-boundary states). 
#[strum(serialize = "ste")] BeaconState, @@ -294,6 +299,7 @@ impl DBColumn { | Self::BeaconBlock | Self::BeaconState | Self::BeaconBlob + | Self::BeaconDataColumn | Self::BeaconStateSummary | Self::BeaconStateTemporary | Self::ExecPayload diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index 1675051bd80..5aada7c95a5 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -16,6 +16,7 @@ pub const PRUNING_CHECKPOINT_KEY: Hash256 = Hash256::repeat_byte(3); pub const COMPACTION_TIMESTAMP_KEY: Hash256 = Hash256::repeat_byte(4); pub const ANCHOR_INFO_KEY: Hash256 = Hash256::repeat_byte(5); pub const BLOB_INFO_KEY: Hash256 = Hash256::repeat_byte(6); +pub const DATA_COLUMN_INFO_KEY: Hash256 = Hash256::repeat_byte(7); /// State upper limit value used to indicate that a node is not storing historic states. pub const STATE_UPPER_LIMIT_NO_RETAIN: Slot = Slot::new(u64::MAX); @@ -152,3 +153,30 @@ impl StoreItem for BlobInfo { Ok(Self::from_ssz_bytes(bytes)?) } } + +/// Database parameters relevant to data column sync. +#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize, Default)] +pub struct DataColumnInfo { + /// The slot after which data columns are or *will be* available (>=). + /// + /// If this slot is in the future, then it is the first slot of the EIP-7594 fork, from which + /// data columns will be available. + /// + /// If the `oldest_data_column_slot` is `None` then this means that the EIP-7594 fork epoch is + /// not yet known. + pub oldest_data_column_slot: Option, +} + +impl StoreItem for DataColumnInfo { + fn db_column() -> DBColumn { + DBColumn::BeaconMeta + } + + fn as_store_bytes(&self) -> Vec { + self.as_ssz_bytes() + } + + fn from_store_bytes(bytes: &[u8]) -> Result { + Ok(Self::from_ssz_bytes(bytes)?) 
+ } +} diff --git a/beacon_node/store/src/metrics.rs b/beacon_node/store/src/metrics.rs index 2d901fdd932..a5aba47830c 100644 --- a/beacon_node/store/src/metrics.rs +++ b/beacon_node/store/src/metrics.rs @@ -105,6 +105,10 @@ lazy_static! { "store_beacon_blobs_cache_hit_total", "Number of hits to the store's blob cache" ); + pub static ref BEACON_DATA_COLUMNS_CACHE_HIT_COUNT: Result = try_create_int_counter( + "store_beacon_data_columns_cache_hit_total", + "Number of hits to the store's data column cache" + ); pub static ref BEACON_BLOCK_READ_TIMES: Result = try_create_histogram( "store_beacon_block_read_overhead_seconds", "Overhead on reading a beacon block from the DB (e.g., decoding)" diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index ac22c8e7613..e5f94bfe3ae 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -1,5 +1,6 @@ use crate::application_domain::{ApplicationDomain, APPLICATION_DOMAIN_BUILDER}; use crate::blob_sidecar::BlobIdentifier; +use crate::data_column_sidecar::DataColumnIdentifier; use crate::*; use int_to_bytes::int_to_bytes4; use serde::Deserialize; @@ -202,6 +203,7 @@ pub struct ChainSpec { */ pub max_request_blocks_deneb: u64, pub max_request_blob_sidecars: u64, + pub max_request_data_column_sidecars: u64, pub min_epochs_for_blob_sidecars_requests: u64, pub blob_sidecar_subnet_count: u64, pub data_column_sidecar_subnet_count: u64, @@ -214,6 +216,7 @@ pub struct ChainSpec { pub max_blocks_by_root_request: usize, pub max_blocks_by_root_request_deneb: usize, pub max_blobs_by_root_request: usize, + pub max_data_columns_by_root_request: usize, /* * Application params @@ -723,6 +726,7 @@ impl ChainSpec { */ max_request_blocks_deneb: default_max_request_blocks_deneb(), max_request_blob_sidecars: default_max_request_blob_sidecars(), + max_request_data_column_sidecars: default_max_request_data_column_sidecars(), min_epochs_for_blob_sidecars_requests: 
default_min_epochs_for_blob_sidecars_requests(), blob_sidecar_subnet_count: default_blob_sidecar_subnet_count(), data_column_sidecar_subnet_count: default_data_column_sidecar_subnet_count(), @@ -733,6 +737,7 @@ impl ChainSpec { max_blocks_by_root_request: default_max_blocks_by_root_request(), max_blocks_by_root_request_deneb: default_max_blocks_by_root_request_deneb(), max_blobs_by_root_request: default_max_blobs_by_root_request(), + max_data_columns_by_root_request: default_data_columns_by_root_request(), /* * Application specific @@ -990,6 +995,7 @@ impl ChainSpec { */ max_request_blocks_deneb: default_max_request_blocks_deneb(), max_request_blob_sidecars: default_max_request_blob_sidecars(), + max_request_data_column_sidecars: default_max_request_data_column_sidecars(), min_epochs_for_blob_sidecars_requests: 16384, blob_sidecar_subnet_count: default_blob_sidecar_subnet_count(), data_column_sidecar_subnet_count: default_data_column_sidecar_subnet_count(), @@ -1000,6 +1006,7 @@ impl ChainSpec { max_blocks_by_root_request: default_max_blocks_by_root_request(), max_blocks_by_root_request_deneb: default_max_blocks_by_root_request_deneb(), max_blobs_by_root_request: default_max_blobs_by_root_request(), + max_data_columns_by_root_request: default_data_columns_by_root_request(), /* * Application specific @@ -1170,6 +1177,9 @@ pub struct Config { #[serde(default = "default_max_request_blob_sidecars")] #[serde(with = "serde_utils::quoted_u64")] max_request_blob_sidecars: u64, + #[serde(default = "default_max_request_data_column_sidecars")] + #[serde(with = "serde_utils::quoted_u64")] + max_request_data_column_sidecars: u64, #[serde(default = "default_min_epochs_for_blob_sidecars_requests")] #[serde(with = "serde_utils::quoted_u64")] min_epochs_for_blob_sidecars_requests: u64, @@ -1280,6 +1290,10 @@ const fn default_max_request_blob_sidecars() -> u64 { 768 } +const fn default_max_request_data_column_sidecars() -> u64 { + 16384 +} + const fn 
default_min_epochs_for_blob_sidecars_requests() -> u64 { 4096 } @@ -1329,6 +1343,20 @@ fn max_blobs_by_root_request_common(max_request_blob_sidecars: u64) -> usize { .len() } +fn max_data_columns_by_root_request_common(max_request_data_column_sidecars: u64) -> usize { + let max_request_data_column_sidecars = max_request_data_column_sidecars as usize; + let empty_data_column_id = DataColumnIdentifier { + block_root: Hash256::zero(), + index: 0, + }; + RuntimeVariableList::from_vec( + vec![empty_data_column_id; max_request_data_column_sidecars], + max_request_data_column_sidecars, + ) + .as_ssz_bytes() + .len() +} + fn default_max_blocks_by_root_request() -> usize { max_blocks_by_root_request_common(default_max_request_blocks()) } @@ -1341,6 +1369,10 @@ fn default_max_blobs_by_root_request() -> usize { max_blobs_by_root_request_common(default_max_request_blob_sidecars()) } +fn default_data_columns_by_root_request() -> usize { + max_data_columns_by_root_request_common(default_max_request_data_column_sidecars()) +} + impl Default for Config { fn default() -> Self { let chain_spec = MainnetEthSpec::default_spec(); @@ -1458,6 +1490,7 @@ impl Config { attestation_subnet_shuffling_prefix_bits: spec.attestation_subnet_shuffling_prefix_bits, max_request_blocks_deneb: spec.max_request_blocks_deneb, max_request_blob_sidecars: spec.max_request_blob_sidecars, + max_request_data_column_sidecars: spec.max_request_data_column_sidecars, min_epochs_for_blob_sidecars_requests: spec.min_epochs_for_blob_sidecars_requests, blob_sidecar_subnet_count: spec.blob_sidecar_subnet_count, data_column_sidecar_subnet_count: spec.data_column_sidecar_subnet_count, @@ -1524,6 +1557,7 @@ impl Config { maximum_gossip_clock_disparity_millis, max_request_blocks_deneb, max_request_blob_sidecars, + max_request_data_column_sidecars, min_epochs_for_blob_sidecars_requests, blob_sidecar_subnet_count, data_column_sidecar_subnet_count, @@ -1583,6 +1617,7 @@ impl Config { maximum_gossip_clock_disparity_millis, 
max_request_blocks_deneb, max_request_blob_sidecars, + max_request_data_column_sidecars, min_epochs_for_blob_sidecars_requests, blob_sidecar_subnet_count, data_column_sidecar_subnet_count, @@ -1593,6 +1628,9 @@ impl Config { max_request_blocks_deneb, ), max_blobs_by_root_request: max_blobs_by_root_request_common(max_request_blob_sidecars), + max_data_columns_by_root_request: max_data_columns_by_root_request_common( + max_request_data_column_sidecars, + ), ..chain_spec.clone() }) diff --git a/consensus/types/src/data_column_sidecar.rs b/consensus/types/src/data_column_sidecar.rs index 310c13a5e94..a6fc4c56745 100644 --- a/consensus/types/src/data_column_sidecar.rs +++ b/consensus/types/src/data_column_sidecar.rs @@ -1,21 +1,37 @@ use crate::beacon_block_body::KzgCommitments; use crate::test_utils::TestRandom; -use crate::{BlobSidecarList, EthSpec, Hash256, KzgProofs, SignedBeaconBlockHeader, Slot}; +use crate::{ + BeaconBlockHeader, BlobSidecarList, EthSpec, Hash256, KzgProofs, SignedBeaconBlockHeader, Slot, +}; +use bls::Signature; use derivative::Derivative; +use kzg::{KzgCommitment, KzgProof}; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; use safe_arith::ArithError; use serde::{Deserialize, Serialize}; +use ssz::Encode; use ssz_derive::{Decode, Encode}; +use ssz_types::typenum::Unsigned; use ssz_types::Error as SszError; use ssz_types::{FixedVector, VariableList}; +use std::sync::Arc; use test_random_derive::TestRandom; use tree_hash::TreeHash; use tree_hash_derive::TreeHash; pub type ColumnIndex = u64; pub type Cell = FixedVector::FieldElementsPerCell>; -pub type DataColumn = VariableList, ::MaxBlobsPerBlock>; +pub type DataColumn = VariableList, ::MaxBlobCommitmentsPerBlock>; + +/// Container of the data that identifies an individual data column. 
+#[derive( + Serialize, Deserialize, Encode, Decode, TreeHash, Copy, Clone, Debug, PartialEq, Eq, Hash, +)] +pub struct DataColumnIdentifier { + pub block_root: Hash256, + pub index: ColumnIndex, +} #[derive( Debug, @@ -121,6 +137,13 @@ impl DataColumnSidecar { Ok(column) } + pub fn id(&self) -> DataColumnIdentifier { + DataColumnIdentifier { + block_root: self.block_root(), + index: self.index, + } + } + pub fn slot(&self) -> Slot { self.signed_block_header.message.slot } @@ -128,6 +151,45 @@ impl DataColumnSidecar { pub fn block_root(&self) -> Hash256 { self.signed_block_header.message.tree_hash_root() } + + pub fn min_size() -> usize { + // min size is one cell + Self { + index: 0, + column: VariableList::new(vec![Cell::::default()]).unwrap(), + kzg_commitments: VariableList::new(vec![KzgCommitment::empty_for_testing()]).unwrap(), + kzg_proofs: VariableList::new(vec![KzgProof::empty()]).unwrap(), + signed_block_header: SignedBeaconBlockHeader { + message: BeaconBlockHeader::empty(), + signature: Signature::empty(), + }, + kzg_commitments_inclusion_proof: Default::default(), + } + .as_ssz_bytes() + .len() + } + + pub fn max_size() -> usize { + Self { + index: 0, + column: VariableList::new(vec![Cell::::default(); T::MaxBlobsPerBlock::to_usize()]) + .unwrap(), + kzg_commitments: VariableList::new(vec![ + KzgCommitment::empty_for_testing(); + T::MaxBlobsPerBlock::to_usize() + ]) + .unwrap(), + kzg_proofs: VariableList::new(vec![KzgProof::empty(); T::MaxBlobsPerBlock::to_usize()]) + .unwrap(), + signed_block_header: SignedBeaconBlockHeader { + message: BeaconBlockHeader::empty(), + signature: Signature::empty(), + }, + kzg_commitments_inclusion_proof: Default::default(), + } + .as_ssz_bytes() + .len() + } } #[derive(Debug)] @@ -151,6 +213,11 @@ impl From for DataColumnSidecarError { } } +pub type DataColumnSidecarList = + VariableList>, ::DataColumnCount>; +pub type FixedDataColumnSidecarList = + FixedVector>>, ::DataColumnCount>; + #[cfg(test)] mod test { use 
crate::beacon_block::EmptyBlock;