From fd1954df03a5fba8ed6b69d92a1e26f8e2378aa7 Mon Sep 17 00:00:00 2001
From: behzad nouri
Date: Wed, 15 Jan 2025 18:16:38 +0000
Subject: [PATCH] retransmits recovered shreds concurrently with blockstore
 insert (#4433)

We can immediately channel recovered shreds to the retransmit stage so
that inserting recovered shreds into blockstore happens concurrently
with retransmit. This is similar to how shreds received from turbine
are concurrently inserted into blockstore while also retransmitted:
https://github.com/anza-xyz/agave/blob/525ea0c27/turbine/src/sigverify_shreds.rs#L241-L242
---
 ledger/src/blockstore.rs | 102 ++++++++++++---------------
 ledger/src/shred.rs      | 149 ++++++++++++++++++++++++++-------------
 2 files changed, 145 insertions(+), 106 deletions(-)

diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index 8ac06d33f3fe11..d8bfb21b5bb3e2 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -119,18 +119,15 @@ pub struct SignatureInfosForAddress {
 }
 
 #[derive(Error, Debug)]
-pub enum InsertDataShredError {
+enum InsertDataShredError {
+    #[error("Data shred already exists in Blockstore")]
     Exists,
+    #[error("Invalid data shred")]
     InvalidShred,
+    #[error(transparent)]
     BlockstoreError(#[from] BlockstoreError),
 }
 
-impl std::fmt::Display for InsertDataShredError {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        write!(f, "insert data shred error")
-    }
-}
-
 #[derive(Eq, PartialEq, Debug, Clone)]
 pub enum PossibleDuplicateShred {
     Exists(Shred), // Blockstore has another shred in its spot
@@ -940,14 +937,14 @@ impl Blockstore {
         metrics.insert_shreds_elapsed_us += start.as_us();
     }
 
-    fn try_shred_recovery(
-        &self,
-        erasure_metas: &BTreeMap<ErasureSetId, WorkingEntry<ErasureMeta>>,
-        index_working_set: &HashMap<u64, IndexMetaWorkingSetEntry>,
-        prev_inserted_shreds: &HashMap<ShredId, Shred>,
-        leader_schedule_cache: &LeaderScheduleCache,
-        reed_solomon_cache: &ReedSolomonCache,
-    ) -> Vec<Vec<Shred>> {
+    fn try_shred_recovery<'a>(
+        &'a self,
+        erasure_metas: &'a BTreeMap<ErasureSetId, WorkingEntry<ErasureMeta>>,
+        index_working_set: &'a HashMap<u64, IndexMetaWorkingSetEntry>,
+        prev_inserted_shreds: &'a HashMap<ShredId, Shred>,
+        leader_schedule_cache: &'a LeaderScheduleCache,
+        reed_solomon_cache: &'a ReedSolomonCache,
+    ) -> impl Iterator<Item = Vec<Shred>> + 'a {
         // Recovery rules:
         // 1. Only try recovery around indexes for which new data or coding shreds are received
         // 2. For new data shreds, check if an erasure set exists. If not, don't try recovery
@@ -970,11 +967,9 @@ impl Blockstore {
                             leader_schedule_cache,
                             reed_solomon_cache,
                         )
-                        .ok()
-                    })
-                    .flatten()
+                    })?
+                    .ok()
             })
-            .collect()
     }
 
     /// Attempts shred recovery and does the following for recovered data
@@ -993,7 +988,8 @@
     ) {
         let mut start = Measure::start("Shred recovery");
         if let Some(leader_schedule_cache) = leader_schedule {
-            let recovered_shreds: Vec<_> = self
+            let mut recovered_shreds = Vec::new();
+            let recovered_data_shreds: Vec<_> = self
                 .try_shred_recovery(
                     &shred_insertion_tracker.erasure_metas,
                     &shred_insertion_tracker.index_working_set,
@@ -1001,51 +997,41 @@
                     leader_schedule_cache,
                     reed_solomon_cache,
                 )
-                .into_iter()
-                .flatten()
-                .filter_map(|shred| {
-                    // Since the data shreds are fully recovered from the
-                    // erasure batch, no need to store coding shreds in
-                    // blockstore.
-                    if shred.is_code() {
-                        return Some(shred.into_payload());
-                    }
-                    metrics.num_recovered += 1;
-                    let shred_payload = shred.payload().clone();
-                    match self.check_insert_data_shred(
-                        shred,
-                        shred_insertion_tracker,
-                        is_trusted,
-                        leader_schedule,
-                        ShredSource::Recovered,
-                    ) {
-                        Err(InsertDataShredError::Exists) => {
-                            metrics.num_recovered_exists += 1;
-                            None
-                        }
-                        Err(InsertDataShredError::InvalidShred) => {
-                            metrics.num_recovered_failed_invalid += 1;
-                            None
-                        }
-                        Err(InsertDataShredError::BlockstoreError(err)) => {
-                            metrics.num_recovered_blockstore_error += 1;
-                            error!("blockstore error: {}", err);
-                            None
-                        }
-                        Ok(()) => {
-                            metrics.num_recovered_inserted += 1;
-                            Some(shred_payload)
-                        }
-                    }
+                .map(|mut shreds| {
+                    // All shreds should be retransmitted, but because there
+                    // are no more missing data shreds in the erasure batch,
+                    // coding shreds are not stored in blockstore.
+                    recovered_shreds
+                        .extend(shred::drain_coding_shreds(&mut shreds).map(Shred::into_payload));
+                    recovered_shreds.extend(shreds.iter().map(Shred::payload).cloned());
+                    shreds
                 })
-                // Always collect recovered-shreds so that above insert code is
-                // executed even if retransmit-sender is None.
                 .collect();
             if !recovered_shreds.is_empty() {
                 if let Some(retransmit_sender) = retransmit_sender {
                     let _ = retransmit_sender.send(recovered_shreds);
                 }
             }
+            for shred in recovered_data_shreds.into_iter().flatten() {
+                metrics.num_recovered += 1;
+                *match self.check_insert_data_shred(
+                    shred,
+                    shred_insertion_tracker,
+                    is_trusted,
+                    leader_schedule,
+                    ShredSource::Recovered,
+                ) {
+                    Err(InsertDataShredError::Exists) => &mut metrics.num_recovered_exists,
+                    Err(InsertDataShredError::InvalidShred) => {
+                        &mut metrics.num_recovered_failed_invalid
+                    }
+                    Err(InsertDataShredError::BlockstoreError(err)) => {
+                        error!("blockstore error: {err}");
+                        &mut metrics.num_recovered_blockstore_error
+                    }
+                    Ok(()) => &mut metrics.num_recovered_inserted,
+                } += 1;
+            }
         }
         start.stop();
         metrics.shred_recovery_elapsed_us += start.as_us();
diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs
index 84ab63ffcc6361..1c9f2902ba3fc4 100644
--- a/ledger/src/shred.rs
+++ b/ledger/src/shred.rs
@@ -69,7 +69,7 @@ use {
         signature::{Keypair, Signature, Signer, SIGNATURE_BYTES},
     },
     static_assertions::const_assert_eq,
-    std::{fmt::Debug, time::Instant},
+    std::{fmt::Debug, time::Instant, vec::Drain},
     thiserror::Error,
 };
 pub use {
@@ -513,9 +513,12 @@ impl Shred {
         ShredType::from(self.common_header().shred_variant)
     }
 
+    #[inline]
     pub fn is_data(&self) -> bool {
         self.shred_type() == ShredType::Data
     }
+
+    #[inline]
     pub fn is_code(&self) -> bool {
         self.shred_type() == ShredType::Code
     }
@@ -1296,6 +1299,28 @@ pub fn should_discard_shred(
     false
 }
 
+// Drains coding shreds out of the vector of shreds.
+// Note that the function does not preserve the order of either the retained or
+// the drained shreds.
+// TODO: Use Vec::extract_if instead once stable.
+pub(crate) fn drain_coding_shreds(shreds: &mut Vec<Shred>) -> Drain<'_, Shred> {
+    let (mut i, mut j) = (0, shreds.len().saturating_sub(1));
+    loop {
+        while i < j && shreds[i].is_data() {
+            i += 1;
+        }
+        while i < j && shreds[j].is_code() {
+            j -= 1;
+        }
+        if i < j {
+            shreds.swap(i, j);
+        } else {
+            let offset = usize::from(shreds.get(i).map(Shred::is_data).unwrap_or_default());
+            return shreds.drain(i + offset..);
+        }
+    }
+}
+
 pub fn max_ticks_per_n_shreds(num_shreds: u64, shred_data_size: Option<usize>) -> u64 {
     let ticks = create_ticks(1, 0, Hash::default());
     max_entries_per_n_shred(&ticks[0], num_shreds, shred_data_size)
@@ -1353,7 +1378,7 @@ mod tests {
         super::*,
         assert_matches::assert_matches,
         bincode::serialized_size,
-        rand::Rng,
+        rand::{seq::SliceRandom, Rng},
         rand_chacha::{rand_core::SeedableRng, ChaChaRng},
         rayon::ThreadPoolBuilder,
         solana_sdk::{shred_version, signature::Signer, signer::keypair::keypair_from_seed},
@@ -1367,6 +1392,36 @@
         bs58::decode(data).into_vec().unwrap()
     }
 
+    fn make_merkle_shreds_for_tests<R: Rng>(
+        rng: &mut R,
+        slot: Slot,
+        data_size: usize,
+        chained: bool,
+        is_last_in_slot: bool,
+    ) -> Result<Vec<Vec<merkle::Shred>>, Error> {
+        let thread_pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
+        let chained_merkle_root = chained.then(|| Hash::new_from_array(rng.gen()));
+        let parent_offset = rng.gen_range(1..=u16::try_from(slot).unwrap_or(u16::MAX));
+        let parent_slot = slot.checked_sub(u64::from(parent_offset)).unwrap();
+        let mut data = vec![0u8; data_size];
+        rng.fill(&mut data[..]);
+        merkle::make_shreds_from_data(
+            &thread_pool,
+            &Keypair::new(),
+            chained_merkle_root,
+            &data[..],
+            slot,
+            parent_slot,
+            rng.gen(), // shred_version
+            rng.gen_range(1..64), // reference_tick
+            is_last_in_slot,
+            rng.gen_range(0..671), // next_shred_index
+            rng.gen_range(0..781), // next_code_index
+            &ReedSolomonCache::default(),
+            &mut ProcessShredsStats::default(),
+        )
+    }
+
     #[test]
     fn test_shred_constants() {
         let common_header = ShredCommonHeader {
@@ -1482,37 +1537,22 @@ fn test_should_discard_shred(chained: bool, is_last_in_slot: bool) {
         solana_logger::setup();
         let mut rng = rand::thread_rng();
-        let thread_pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
-        let reed_solomon_cache = ReedSolomonCache::default();
-        let keypair = Keypair::new();
-        let chained_merkle_root = chained.then(|| Hash::new_from_array(rng.gen()));
         let slot = 18_291;
-        let parent_slot = rng.gen_range(1..slot);
-        let shred_version = rng.gen();
-        let reference_tick = rng.gen_range(1..64);
-        let next_shred_index = rng.gen_range(0..671);
-        let next_code_index = rng.gen_range(0..781);
-        let mut data = vec![0u8; 1200 * 5];
-        rng.fill(&mut data[..]);
-        let shreds = merkle::make_shreds_from_data(
-            &thread_pool,
-            &keypair,
-            chained_merkle_root,
-            &data[..],
+        let shreds = make_merkle_shreds_for_tests(
+            &mut rng,
             slot,
-            parent_slot,
-            shred_version,
-            reference_tick,
+            1200 * 5, // data_size
+            chained,
             is_last_in_slot,
-            next_shred_index,
-            next_code_index,
-            &reed_solomon_cache,
-            &mut ProcessShredsStats::default(),
         )
         .unwrap();
         assert_eq!(shreds.len(), 1);
         let shreds: Vec<_> = shreds.into_iter().flatten().map(Shred::from).collect();
+        assert_matches!(shreds[0].shred_type(), ShredType::Data);
+        let parent_slot = shreds[0].parent().unwrap();
+        let shred_version = shreds[0].common_header().version;
+
         let root = rng.gen_range(0..parent_slot);
         let max_slot = slot + rng.gen_range(1..65536);
         let mut packet = Packet::default();
@@ -2179,32 +2219,13 @@
             Shred::new_from_serialized_shred(shred).unwrap()
         }
         let mut rng = rand::thread_rng();
-        let thread_pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
-        let reed_solomon_cache = ReedSolomonCache::default();
-        let keypair = Keypair::new();
-        let chained_merkle_root = chained.then(|| Hash::new_from_array(rng.gen()));
         let slot = 285_376_049 + rng.gen_range(0..100_000);
-        let parent_slot = slot - rng.gen_range(1..=65535);
-        let shred_version = rng.gen();
-        let reference_tick = rng.gen_range(1..64);
-        let next_shred_index = rng.gen_range(0..671);
-        let next_code_index = rng.gen_range(0..781);
-        let mut data = vec![0u8; 1200 * 5];
-        rng.fill(&mut data[..]);
-        let shreds: Vec<_> = merkle::make_shreds_from_data(
-            &thread_pool,
-            &keypair,
-            chained_merkle_root,
-            &data[..],
+        let shreds: Vec<_> = make_merkle_shreds_for_tests(
+            &mut rng,
             slot,
-            parent_slot,
-            shred_version,
-            reference_tick,
+            1200 * 5, // data_size
+            chained,
             is_last_in_slot,
-            next_shred_index,
-            next_code_index,
-            &reed_solomon_cache,
-            &mut ProcessShredsStats::default(),
         )
         .unwrap()
        .into_iter()
@@ -2250,4 +2271,36 @@
             assert!(other.is_shred_duplicate(shred));
         }
     }
+
+    #[test]
+    fn test_drain_coding_shreds() {
+        let mut rng = rand::thread_rng();
+        let slot = 314_972_727 + rng.gen_range(0..100_000);
+        let (chained, is_last_in_slot) = rng.gen();
+        let mut shreds: Vec<_> = make_merkle_shreds_for_tests(
+            &mut rng,
+            slot,
+            20 * 1232, // data_size
+            chained,
+            is_last_in_slot,
+        )
+        .unwrap()
+        .into_iter()
+        .flatten()
+        .map(Shred::from)
+        .collect();
+        shreds.shuffle(&mut rng);
+        assert!(shreds.iter().filter(|shred| shred.is_data()).count() > 20);
+        assert!(shreds.iter().filter(|shred| shred.is_code()).count() > 20);
+        let num_shreds = shreds.len();
+        for offset in 0..num_shreds {
+            for size in 0..(num_shreds - offset) {
+                let mut shreds = Vec::from(&shreds[offset..offset + size]);
+                let coding_shreds: Vec<_> = drain_coding_shreds(&mut shreds).collect();
+                assert_eq!(shreds.len() + coding_shreds.len(), size);
+                assert!(shreds.iter().all(Shred::is_data));
+                assert!(coding_shreds.iter().all(Shred::is_code));
+            }
+        }
+    }
 }
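
Note on the approach: the retransmit/insert overlap works because the
retransmit stage only needs the serialized payloads, which are cloned out
before the blockstore insert begins; the channel send is cheap, so the
retransmit thread runs while this thread is still inserting. Below is a
minimal, self-contained sketch of the pattern; std mpsc and the names
retransmit_loop / slow_insert are illustrative stand-ins, not the actual
agave plumbing:

    use std::{sync::mpsc, thread, time::Duration};

    type Payload = Vec<u8>;

    // Stand-in for the retransmit stage: consumes payload batches off the channel.
    fn retransmit_loop(receiver: mpsc::Receiver<Vec<Payload>>) {
        for batch in receiver {
            println!("retransmitting {} payloads", batch.len());
        }
    }

    // Stand-in for the comparatively slow blockstore insert.
    fn slow_insert(payloads: &[Payload]) {
        thread::sleep(Duration::from_millis(payloads.len() as u64));
    }

    fn main() {
        let (sender, receiver) = mpsc::channel();
        let retransmitter = thread::spawn(move || retransmit_loop(receiver));
        let recovered: Vec<Payload> = vec![vec![0u8; 1203]; 32];
        // Send first so retransmission overlaps with the insert below.
        sender.send(recovered.clone()).unwrap();
        slow_insert(&recovered);
        drop(sender); // close the channel so the retransmit thread exits
        retransmitter.join().unwrap();
    }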
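The rewritten metrics accounting uses a match expression that evaluates to a
mutable reference, so a single `+= 1` covers all four outcomes. In isolation
the idiom looks like this, with a hypothetical Counters type for
illustration:

    struct Counters {
        ok: u64,
        err: u64,
    }

    fn bump(counters: &mut Counters, result: Result<(), ()>) {
        // The match selects which counter to bump; the outer `*... += 1`
        // then increments through the returned mutable reference.
        *match result {
            Ok(()) => &mut counters.ok,
            Err(()) => &mut counters.err,
        } += 1;
    }

    fn main() {
        let mut counters = Counters { ok: 0, err: 0 };
        bump(&mut counters, Ok(()));
        bump(&mut counters, Err(()));
        assert_eq!((counters.ok, counters.err), (1, 1));
    }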
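On the drain_coding_shreds TODO: Vec::extract_if has since been stabilized
(Rust 1.87), but there is a trade-off the comment hints at. extract_if
preserves the relative order of both partitions and therefore shifts
elements, while the swap-based loop above gives up ordering to avoid the
shifting. A sketch of what the extract_if variant might look like, using a
minimal stand-in enum rather than the real Shred type:

    #[derive(Debug, PartialEq)]
    enum Kind {
        Data,
        Code,
    }

    // Removes all Code elements, preserving the order of both partitions.
    fn extract_code(items: &mut Vec<Kind>) -> Vec<Kind> {
        items.extract_if(.., |kind| *kind == Kind::Code).collect()
    }

    fn main() {
        let mut items = vec![Kind::Data, Kind::Code, Kind::Data, Kind::Code];
        let code = extract_code(&mut items);
        assert!(items.iter().all(|kind| *kind == Kind::Data));
        assert_eq!(code.len(), 2);
    }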