From 2e5fd6f53a8d489ec07dc6f54bf039d5c6eb6809 Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Mon, 26 Feb 2024 01:34:38 +0000 Subject: [PATCH] blockstore: send duplicate proofs for chained merkle root conflicts --- core/src/window_service.rs | 23 +++ ledger/src/blockstore.rs | 249 +++++++++++++++++++++++++++++++-- ledger/src/blockstore_meta.rs | 6 + ledger/src/shred.rs | 23 ++- ledger/src/shred/merkle.rs | 68 ++++++++- ledger/src/shred/shred_code.rs | 7 + ledger/src/shred/shred_data.rs | 7 + sdk/src/feature_set.rs | 5 + 8 files changed, 377 insertions(+), 11 deletions(-) diff --git a/core/src/window_service.rs b/core/src/window_service.rs index 7d939eeea8de44..17c5b0ea945182 100644 --- a/core/src/window_service.rs +++ b/core/src/window_service.rs @@ -169,6 +169,11 @@ fn run_check_duplicate( shred_slot, &root_bank, ); + let chained_merkle_conflict_duplicate_proofs = cluster_nodes::check_feature_activation( + &feature_set::chained_merkle_conflict_duplicate_proofs::id(), + shred_slot, + &root_bank, + ); let (shred1, shred2) = match shred { PossibleDuplicateShred::LastIndexConflict(shred, conflict) | PossibleDuplicateShred::ErasureConflict(shred, conflict) => { @@ -196,6 +201,24 @@ fn run_check_duplicate( return Ok(()); } } + PossibleDuplicateShred::ChainedMerkleRootConflict(shred, conflict) => { + if chained_merkle_conflict_duplicate_proofs { + // Although this proof can be immediately stored on detection, we wait until + // here in order to check the feature flag, as storage in blockstore can + // preclude the detection of other duplicate proofs in this slot + if blockstore.has_duplicate_shreds_in_slot(shred_slot) { + return Ok(()); + } + blockstore.store_duplicate_slot( + shred_slot, + conflict.clone(), + shred.clone().into_payload(), + )?; + (shred, conflict) + } else { + return Ok(()); + } + } PossibleDuplicateShred::Exists(shred) => { // Unlike the other cases we have to wait until here to decide to handle the duplicate and store // in blockstore. This is because the duplicate could have been part of the same insert batch, diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index f8c8330843dfce..b5c7f18914c7a5 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -67,11 +67,15 @@ use { borrow::Cow, cell::RefCell, cmp, - collections::{hash_map::Entry as HashMapEntry, BTreeSet, HashMap, HashSet, VecDeque}, + collections::{ + btree_map::Entry as BTreeMapEntry, hash_map::Entry as HashMapEntry, BTreeMap, BTreeSet, + HashMap, HashSet, VecDeque, + }, convert::TryInto, fmt::Write, fs, io::{Error as IoError, ErrorKind}, + ops::Bound, path::{Path, PathBuf}, rc::Rc, sync::{ @@ -141,6 +145,7 @@ pub enum PossibleDuplicateShred { LastIndexConflict(/* original */ Shred, /* conflict */ Vec), // The index of this shred conflicts with `slot_meta.last_index` ErasureConflict(/* original */ Shred, /* conflict */ Vec), // The coding shred has a conflict in the erasure_meta MerkleRootConflict(/* original */ Shred, /* conflict */ Vec), // Merkle root conflict in the same fec set + ChainedMerkleRootConflict(/* original */ Shred, /* conflict */ Vec), // Merkle root chaining conflict with previous fec set } impl PossibleDuplicateShred { @@ -150,6 +155,7 @@ impl PossibleDuplicateShred { Self::LastIndexConflict(shred, _) => shred.slot(), Self::ErasureConflict(shred, _) => shred.slot(), Self::MerkleRootConflict(shred, _) => shred.slot(), + Self::ChainedMerkleRootConflict(shred, _) => shred.slot(), } } } @@ -486,6 +492,55 @@ impl Blockstore { self.erasure_meta_cf.get((slot, u64::from(fec_set_index))) } + /// Attempts to find the previous consecutive erasure set for `erasure_set`. + /// + /// Checks the map `erasure_metas`, if not present scans blockstore. Returns None + /// if the previous consecutive erasure set is not present in either. + fn previous_erasure_set( + &self, + erasure_set: ErasureSetId, + erasure_metas: &BTreeMap>, + ) -> Result> { + let (slot, fec_set_index) = erasure_set.store_key(); + + // Check the previous entry from the in memory map to see if it is the consecutive + // set to `erasure set` + let candidate_erasure_entry = erasure_metas + .range(( + Bound::Included(ErasureSetId::new(slot, 0)), + Bound::Excluded(erasure_set), + )) + .next_back(); + if let Some((candidate_erasure_set, candidate_erasure_meta)) = candidate_erasure_entry { + if candidate_erasure_meta.as_ref().next_set_index() == fec_set_index { + return Ok(Some(*candidate_erasure_set)); + } + } + + // Consecutive set was not found in memory, scan blockstore for a potential candidate + let Some(((candidate_slot, candidate_fec_set_index), candidate_erasure_meta)) = self + .erasure_meta_cf + .iter(IteratorMode::From( + (slot, u64::from(fec_set_index)), + IteratorDirection::Reverse, + ))? + .next() + else { + // Blockstore is empty + return Ok(None); + }; + let candidate_erasure_meta: ErasureMeta = deserialize(candidate_erasure_meta.as_ref())?; + // Check if this is actually the consecutive erasure set + if candidate_erasure_meta.next_set_index() == fec_set_index { + let candidate_erasure_set = ErasureSetId::new( + candidate_slot, + u32::try_from(candidate_fec_set_index).unwrap(), + ); + return Ok(Some(candidate_erasure_set)); + } + Ok(None) + } + fn merkle_root_meta(&self, erasure_set: ErasureSetId) -> Result> { self.merkle_root_meta_cf.get(erasure_set.store_key()) } @@ -758,7 +813,7 @@ impl Blockstore { fn try_shred_recovery( &self, - erasure_metas: &HashMap>, + erasure_metas: &BTreeMap>, index_working_set: &mut HashMap, prev_inserted_shreds: &HashMap, reed_solomon_cache: &ReedSolomonCache, @@ -877,7 +932,7 @@ impl Blockstore { let mut write_batch = self.db.batch()?; let mut just_inserted_shreds = HashMap::with_capacity(shreds.len()); - let mut erasure_metas = HashMap::new(); + let mut erasure_metas = BTreeMap::new(); let mut merkle_root_metas = HashMap::new(); let mut slot_meta_working_set = HashMap::new(); let mut index_working_set = HashMap::new(); @@ -1202,7 +1257,7 @@ impl Blockstore { fn check_insert_coding_shred( &self, shred: Shred, - erasure_metas: &mut HashMap>, + erasure_metas: &mut BTreeMap>, merkle_root_metas: &mut HashMap>, index_working_set: &mut HashMap, write_batch: &mut WriteBatch, @@ -1258,11 +1313,13 @@ impl Blockstore { } } + let mut new_erasure_meta = false; let erasure_meta_entry = erasure_metas.entry(erasure_set).or_insert_with(|| { self.erasure_meta(erasure_set) .expect("Expect database get to succeed") .map(WorkingEntry::Clean) .unwrap_or_else(|| { + new_erasure_meta = true; WorkingEntry::Dirty(ErasureMeta::from_coding_shred(&shred).unwrap()) }) }); @@ -1330,6 +1387,17 @@ impl Blockstore { .or_insert(WorkingEntry::Dirty(MerkleRootMeta::from_shred(&shred))); } + if new_erasure_meta { + // First coding shred from this erasure batch, check merkle root chaining to previous and next erasure batches + self.check_chained_merkle_root_consistency( + &shred, + just_received_shreds, + erasure_metas, + merkle_root_metas, + duplicate_shreds, + ); + } + if let HashMapEntry::Vacant(entry) = just_received_shreds.entry(shred.id()) { metrics.num_coding_shreds_inserted += 1; entry.insert(shred); @@ -1410,7 +1478,7 @@ impl Blockstore { fn check_insert_data_shred( &self, shred: Shred, - erasure_metas: &mut HashMap>, + erasure_metas: &mut BTreeMap>, merkle_root_metas: &mut HashMap>, index_working_set: &mut HashMap, slot_meta_working_set: &mut HashMap, @@ -1505,7 +1573,7 @@ impl Blockstore { just_inserted_shreds.insert(shred.id(), shred); index_meta_working_set_entry.did_insert_occur = true; slot_meta_entry.did_insert_occur = true; - if let HashMapEntry::Vacant(entry) = erasure_metas.entry(erasure_set) { + if let BTreeMapEntry::Vacant(entry) = erasure_metas.entry(erasure_set) { if let Some(meta) = self.erasure_meta(erasure_set).unwrap() { entry.insert(WorkingEntry::Clean(meta)); } @@ -1621,6 +1689,169 @@ impl Blockstore { false } + /// Returns true if there is no chaining conflict between + /// the `shred` and `merkle_root_meta` of the next or previous + /// FEC set, or if shreds from the next or previous set are + /// yet to be received. + /// + /// Otherwise return false and add duplicate proof to + /// `duplicate_shreds`. + /// + /// This is intended to be used right after `shred`'s `erasure_meta` + /// has been created for the first time and loaded into `erasure_metas`. + fn check_chained_merkle_root_consistency( + &self, + shred: &Shred, + just_inserted_shreds: &HashMap, + erasure_metas: &BTreeMap>, + merkle_root_metas: &mut HashMap>, + duplicate_shreds: &mut Vec, + ) -> bool { + let slot = shred.slot(); + let erasure_set = shred.erasure_set(); + let fec_set_index = shred.fec_set_index(); + let Some(erasure_meta_entry) = erasure_metas.get(&erasure_set) else { + error!( + "Checking chained merkle root consistency on an erasure meta {:?} + that is not loaded in memory, programmer error", + erasure_set + ); + return true; + }; + let erasure_meta = erasure_meta_entry.as_ref(); + debug_assert!(erasure_meta.check_coding_shred(shred)); + + if fec_set_index == 0 { + // Although the first fec set chains to the last fec set of the parent block, + // if this chain is incorrect we do not know which block is the duplicate until votes + // are received. We instead delay this check until the block reaches duplicate + // confirmation. + return true; + } + + // If a shred from the next fec set has already been inserted, check the chaining + let next_erasure_set = ErasureSetId::new(slot, erasure_meta.next_set_index()); + if let HashMapEntry::Vacant(entry) = merkle_root_metas.entry(next_erasure_set) { + if let Some(meta) = self.merkle_root_meta(next_erasure_set).unwrap() { + entry.insert(WorkingEntry::Clean(meta)); + } + } + if let Some(next_merkle_root_meta) = + merkle_root_metas.get(&next_erasure_set).map(AsRef::as_ref) + { + let next_shred_id = ShredId::new( + slot, + next_merkle_root_meta.first_received_shred_index(), + next_merkle_root_meta.first_received_shred_type(), + ); + let next_shred = + Self::get_shred_from_just_inserted_or_db(self, just_inserted_shreds, next_shred_id) + .expect("Shred indicated by merkle root meta must exist") + .into_owned(); + let merkle_root = shred.merkle_root().ok(); + let chained_merkle_root = shred::layout::get_chained_merkle_root(&next_shred); + + if !self.check_chaining(merkle_root, chained_merkle_root) { + warn!( + "Received conflicting chained merkle roots for slot: {}, + shred {:?} type {:?} has merkle root {:?}, however + next fec set shred {:?} type {:?} chains to merkle root {:?}. Reporting as duplicate", + slot, + erasure_set, + shred.shred_type(), + merkle_root, + next_erasure_set, + next_merkle_root_meta.first_received_shred_type(), + chained_merkle_root, + ); + + if !self.has_duplicate_shreds_in_slot(shred.slot()) { + duplicate_shreds.push(PossibleDuplicateShred::ChainedMerkleRootConflict( + shred.clone(), + next_shred, + )); + } + return false; + } + } + + // If a shred from the previous fec set has already been inserted, check the chaining. + // Since we cannot compute the previous fec set index, we check the in memory map, otherwise + // check the previous key from blockstore to see if it is consecutive with our current set. + let Some(prev_erasure_set) = self + .previous_erasure_set(erasure_set, erasure_metas) + .expect("Expect database operations to succeed") + else { + // No shreds from the previous erasure batch have been received, + // so nothing to check. Once the previous erasure batch is received, + // we will verify this chain through the forward check above. + return true; + }; + if let HashMapEntry::Vacant(entry) = merkle_root_metas.entry(prev_erasure_set) { + if let Some(meta) = self.merkle_root_meta(prev_erasure_set).unwrap() { + entry.insert(WorkingEntry::Clean(meta)); + } + } + let Some(prev_merkle_root_meta_entry) = merkle_root_metas.get(prev_erasure_set) else { + // This can happen if the previous erasure meta was not consistent with the shred + // received. The merkle root meta is only created if the shred is inserted, + // however the erasure meta always exists. + return true; + }; + let prev_merkle_root_meta = prev_merkle_root_meta_entry.as_ref(); + + let prev_shred_id = ShredId::new( + slot, + prev_merkle_root_meta.first_received_shred_index(), + prev_merkle_root_meta.first_received_shred_type(), + ); + let prev_shred = + Self::get_shred_from_just_inserted_or_db(self, just_inserted_shreds, prev_shred_id) + .expect("Shred indicated by merkle root meta must exist") + .into_owned(); + let merkle_root = shred::layout::get_merkle_root(&prev_shred); + let chained_merkle_root = shred.chained_merkle_root().ok(); + + if !self.check_chaining(merkle_root, chained_merkle_root) { + warn!( + "Received conflicting chained merkle roots for slot: {}, + shred {:?} type {:?} chains to merkle root {:?}, however + previous fec set shred {:?} type {:?} has merkle root {:?}. Reporting as duplicate", + slot, + shred.erasure_set(), + shred.shred_type(), + chained_merkle_root, + prev_erasure_set, + prev_merkle_root_meta.first_received_shred_type(), + merkle_root, + ); + + if !self.has_duplicate_shreds_in_slot(shred.slot()) { + duplicate_shreds.push(PossibleDuplicateShred::ChainedMerkleRootConflict( + shred.clone(), + prev_shred, + )); + } + return false; + } + + true + } + + /// Checks if the chained merkle root == merkle root + /// + /// Returns true if no conflict, or if chained merkle roots are not enabled + fn check_chaining(&self, merkle_root: Option, chained_merkle_root: Option) -> bool { + let Some(chained_merkle_root) = chained_merkle_root else { + // Chained merkle roots have not been enabled yet + return true; + }; + if merkle_root == Some(chained_merkle_root) { + return true; + } + false + } + fn should_insert_data_shred( &self, shred: &Shred, @@ -6917,7 +7148,7 @@ pub mod tests { let (_, coding_shreds, _) = setup_erasure_shreds(slot, parent_slot, 10); let coding_shred = coding_shreds[index as usize].clone(); - let mut erasure_metas = HashMap::new(); + let mut erasure_metas = BTreeMap::new(); let mut merkle_root_metas = HashMap::new(); let mut index_working_set = HashMap::new(); let mut just_received_shreds = HashMap::new(); @@ -7112,7 +7343,7 @@ pub mod tests { setup_erasure_shreds_with_index(slot, parent_slot, 10, fec_set_index); let data_shred = data_shreds[0].clone(); - let mut erasure_metas = HashMap::new(); + let mut erasure_metas = BTreeMap::new(); let mut merkle_root_metas = HashMap::new(); let mut index_working_set = HashMap::new(); let mut just_received_shreds = HashMap::new(); @@ -7329,7 +7560,7 @@ pub mod tests { 0, // version ); - let mut erasure_metas = HashMap::new(); + let mut erasure_metas = BTreeMap::new(); let mut merkle_root_metas = HashMap::new(); let mut index_working_set = HashMap::new(); let mut just_received_shreds = HashMap::new(); diff --git a/ledger/src/blockstore_meta.rs b/ledger/src/blockstore_meta.rs index c8b5f6cb4fee99..de93c6ea633250 100644 --- a/ledger/src/blockstore_meta.rs +++ b/ledger/src/blockstore_meta.rs @@ -392,6 +392,12 @@ impl ErasureMeta { self.first_coding_index..self.first_coding_index + num_coding } + pub(crate) fn next_set_index(&self) -> u32 { + u32::try_from(self.set_index) + .unwrap() + .saturating_add(self.config.num_data as u32) + } + pub(crate) fn status(&self, index: &Index) -> ErasureMetaStatus { use ErasureMetaStatus::*; diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 24d5000b65311b..51ef6a562ed346 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -292,10 +292,14 @@ impl ShredId { } /// Tuple which identifies erasure coding set that the shred belongs to. -#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] +#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, PartialOrd, Ord)] pub(crate) struct ErasureSetId(Slot, /*fec_set_index:*/ u32); impl ErasureSetId { + pub(crate) fn new(slot: Slot, fec_set_index: u32) -> Self { + Self(slot, fec_set_index) + } + pub(crate) fn slot(&self) -> Slot { self.0 } @@ -352,6 +356,7 @@ impl Shred { dispatch!(pub(crate) fn erasure_shard_index(&self) -> Result); dispatch!(pub fn into_payload(self) -> Vec); + dispatch!(pub fn chained_merkle_root(&self) -> Result); dispatch!(pub fn merkle_root(&self) -> Result); dispatch!(pub fn payload(&self) -> &Vec); dispatch!(pub fn sanitize(&self) -> Result<(), Error>); @@ -726,6 +731,22 @@ pub mod layout { } } + pub fn get_chained_merkle_root(shred: &[u8]) -> Option { + match get_shred_variant(shred).ok()? { + ShredVariant::LegacyCode | ShredVariant::LegacyData => None, + ShredVariant::MerkleCode { + proof_size, + chained, + resigned, + } => merkle::ShredCode::get_chained_merkle_root(shred, proof_size, chained, resigned), + ShredVariant::MerkleData { + proof_size, + chained, + resigned, + } => merkle::ShredData::get_chained_merkle_root(shred, proof_size, chained, resigned), + } + } + // Minimally corrupts the packet so that the signature no longer verifies. #[cfg(test)] pub(crate) fn corrupt_packet( diff --git a/ledger/src/shred/merkle.rs b/ledger/src/shred/merkle.rs index b785eeb6dc32cc..e00dc9edb13c74 100644 --- a/ledger/src/shred/merkle.rs +++ b/ledger/src/shred/merkle.rs @@ -193,6 +193,18 @@ impl ShredData { Ok(Self::SIZE_OF_HEADERS + Self::capacity(proof_size, /*chained:*/ true, resigned)?) } + fn get_chained_merkle_root_offset(proof_size: u8, resigned: bool) -> Result { + Ok(Self::SIZE_OF_HEADERS + Self::capacity(proof_size, /*chained:*/ true, resigned)?) + } + + pub(super) fn chained_merkle_root(&self) -> Result { + let offset = self.chained_merkle_root_offset()?; + self.payload + .get(offset..offset + SIZE_OF_MERKLE_ROOT) + .map(Hash::new) + .ok_or(Error::InvalidPayloadSize(self.payload.len())) + } + fn set_chained_merkle_root(&mut self, chained_merkle_root: &Hash) -> Result<(), Error> { let offset = self.chained_merkle_root_offset()?; let Some(buffer) = self.payload.get_mut(offset..offset + SIZE_OF_MERKLE_ROOT) else { @@ -307,6 +319,31 @@ impl ShredData { let node = get_merkle_node(shred, SIZE_OF_SIGNATURE..proof_offset).ok()?; get_merkle_root(index, node, proof).ok() } + + pub(super) fn get_chained_merkle_root( + shred: &[u8], + proof_size: u8, + chained: bool, + resigned: bool, + ) -> Option { + debug_assert_eq!( + shred::layout::get_shred_variant(shred).unwrap(), + ShredVariant::MerkleData { + proof_size, + chained, + resigned + } + ); + + if !chained { + return None; + } + + let offset = Self::get_chained_merkle_root_offset(proof_size, resigned).ok()?; + shred + .get(offset..offset + SIZE_OF_MERKLE_ROOT) + .map(Hash::new) + } } impl ShredCode { @@ -364,7 +401,11 @@ impl ShredCode { Ok(Self::SIZE_OF_HEADERS + Self::capacity(proof_size, /*chained:*/ true, resigned)?) } - fn chained_merkle_root(&self) -> Result { + fn get_chained_merkle_root_offset(proof_size: u8, resigned: bool) -> Result { + Ok(Self::SIZE_OF_HEADERS + Self::capacity(proof_size, /*chained:*/ true, resigned)?) + } + + pub(super) fn chained_merkle_root(&self) -> Result { let offset = self.chained_merkle_root_offset()?; self.payload .get(offset..offset + SIZE_OF_MERKLE_ROOT) @@ -487,6 +528,31 @@ impl ShredCode { let node = get_merkle_node(shred, SIZE_OF_SIGNATURE..proof_offset).ok()?; get_merkle_root(index, node, proof).ok() } + + pub(super) fn get_chained_merkle_root( + shred: &[u8], + proof_size: u8, + chained: bool, + resigned: bool, + ) -> Option { + debug_assert_eq!( + shred::layout::get_shred_variant(shred).unwrap(), + ShredVariant::MerkleCode { + proof_size, + chained, + resigned + } + ); + + if !chained { + return None; + } + + let offset = Self::get_chained_merkle_root_offset(proof_size, resigned).ok()?; + shred + .get(offset..offset + SIZE_OF_MERKLE_ROOT) + .map(Hash::new) + } } impl<'a> ShredTrait<'a> for ShredData { diff --git a/ledger/src/shred/shred_code.rs b/ledger/src/shred/shred_code.rs index 0ad97a0f729a77..067d7edaf437eb 100644 --- a/ledger/src/shred/shred_code.rs +++ b/ledger/src/shred/shred_code.rs @@ -47,6 +47,13 @@ impl ShredCode { } } + pub(super) fn chained_merkle_root(&self) -> Result { + match self { + Self::Legacy(_) => Err(Error::InvalidShredType), + Self::Merkle(shred) => shred.chained_merkle_root(), + } + } + pub(super) fn merkle_root(&self) -> Result { match self { Self::Legacy(_) => Err(Error::InvalidShredType), diff --git a/ledger/src/shred/shred_data.rs b/ledger/src/shred/shred_data.rs index 15f407172cfc4b..ac409376370420 100644 --- a/ledger/src/shred/shred_data.rs +++ b/ledger/src/shred/shred_data.rs @@ -41,6 +41,13 @@ impl ShredData { } } + pub(super) fn chained_merkle_root(&self) -> Result { + match self { + Self::Legacy(_) => Err(Error::InvalidShredType), + Self::Merkle(shred) => shred.chained_merkle_root(), + } + } + pub(super) fn merkle_root(&self) -> Result { match self { Self::Legacy(_) => Err(Error::InvalidShredType), diff --git a/sdk/src/feature_set.rs b/sdk/src/feature_set.rs index 98dc5a4037bd05..6a8414a5cbd724 100644 --- a/sdk/src/feature_set.rs +++ b/sdk/src/feature_set.rs @@ -768,6 +768,10 @@ pub mod enable_gossip_duplicate_proof_ingestion { solana_sdk::declare_id!("FNKCMBzYUdjhHyPdsKG2LSmdzH8TCHXn3ytj8RNBS4nG"); } +pub mod chained_merkle_conflict_duplicate_proofs { + solana_sdk::declare_id!("chaie9S2zVfuxJKNRGkyTDokLwWxx6kD2ZLsqQHaDD8"); +} + pub mod enable_chained_merkle_shreds { solana_sdk::declare_id!("7uZBkJXJ1HkuP6R3MJfZs7mLwymBcDbKdqbF51ZWLier"); } @@ -965,6 +969,7 @@ lazy_static! { (enable_gossip_duplicate_proof_ingestion::id(), "enable gossip duplicate proof ingestion #32963"), (enable_chained_merkle_shreds::id(), "Enable chained Merkle shreds #34916"), (remove_rounding_in_fee_calculation::id(), "Removing unwanted rounding in fee calculation #34982"), + (chained_merkle_conflict_duplicate_proofs::id(), "generate duplicate proofs for chained merkle root conflicts #102"), /*************** ADD NEW FEATURES HERE ***************/ ] .iter()