Skip to content

Commit

Permalink
retransmits recovered shreds concurrently with blockstore insert (#4433)
Browse files Browse the repository at this point in the history
We can immediately channel recovered shreds to retransmit stage so that
inserting recovered shreds into blockstore happens concurrently with
retransmit.

This is similar to how shreds received from turbine are concurrently
inserted into blockstore while also retransmitted:
https://github.com/anza-xyz/agave/blob/525ea0c27/turbine/src/sigverify_shreds.rs#L241-L242
  • Loading branch information
behzadnouri authored Jan 15, 2025
1 parent 9f7f34b commit fd1954d
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 106 deletions.
102 changes: 44 additions & 58 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,15 @@ pub struct SignatureInfosForAddress {
}

#[derive(Error, Debug)]
pub enum InsertDataShredError {
enum InsertDataShredError {
#[error("Data shred already exists in Blockstore")]
Exists,
#[error("Invalid data shred")]
InvalidShred,
#[error(transparent)]
BlockstoreError(#[from] BlockstoreError),
}

impl std::fmt::Display for InsertDataShredError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "insert data shred error")
}
}

#[derive(Eq, PartialEq, Debug, Clone)]
pub enum PossibleDuplicateShred {
Exists(Shred), // Blockstore has another shred in its spot
Expand Down Expand Up @@ -940,14 +937,14 @@ impl Blockstore {
metrics.insert_shreds_elapsed_us += start.as_us();
}

fn try_shred_recovery(
&self,
erasure_metas: &BTreeMap<ErasureSetId, WorkingEntry<ErasureMeta>>,
index_working_set: &HashMap<u64, IndexMetaWorkingSetEntry>,
prev_inserted_shreds: &HashMap<ShredId, Shred>,
leader_schedule_cache: &LeaderScheduleCache,
reed_solomon_cache: &ReedSolomonCache,
) -> Vec<Vec<Shred>> {
fn try_shred_recovery<'a>(
&'a self,
erasure_metas: &'a BTreeMap<ErasureSetId, WorkingEntry<ErasureMeta>>,
index_working_set: &'a HashMap<u64, IndexMetaWorkingSetEntry>,
prev_inserted_shreds: &'a HashMap<ShredId, Shred>,
leader_schedule_cache: &'a LeaderScheduleCache,
reed_solomon_cache: &'a ReedSolomonCache,
) -> impl Iterator<Item = Vec<Shred>> + 'a {
// Recovery rules:
// 1. Only try recovery around indexes for which new data or coding shreds are received
// 2. For new data shreds, check if an erasure set exists. If not, don't try recovery
Expand All @@ -970,11 +967,9 @@ impl Blockstore {
leader_schedule_cache,
reed_solomon_cache,
)
.ok()
})
.flatten()
})?
.ok()
})
.collect()
}

/// Attempts shred recovery and does the following for recovered data
Expand All @@ -993,59 +988,50 @@ impl Blockstore {
) {
let mut start = Measure::start("Shred recovery");
if let Some(leader_schedule_cache) = leader_schedule {
let recovered_shreds: Vec<_> = self
let mut recovered_shreds = Vec::new();
let recovered_data_shreds: Vec<_> = self
.try_shred_recovery(
&shred_insertion_tracker.erasure_metas,
&shred_insertion_tracker.index_working_set,
&shred_insertion_tracker.just_inserted_shreds,
leader_schedule_cache,
reed_solomon_cache,
)
.into_iter()
.flatten()
.filter_map(|shred| {
// Since the data shreds are fully recovered from the
// erasure batch, no need to store coding shreds in
// blockstore.
if shred.is_code() {
return Some(shred.into_payload());
}
metrics.num_recovered += 1;
let shred_payload = shred.payload().clone();
match self.check_insert_data_shred(
shred,
shred_insertion_tracker,
is_trusted,
leader_schedule,
ShredSource::Recovered,
) {
Err(InsertDataShredError::Exists) => {
metrics.num_recovered_exists += 1;
None
}
Err(InsertDataShredError::InvalidShred) => {
metrics.num_recovered_failed_invalid += 1;
None
}
Err(InsertDataShredError::BlockstoreError(err)) => {
metrics.num_recovered_blockstore_error += 1;
error!("blockstore error: {}", err);
None
}
Ok(()) => {
metrics.num_recovered_inserted += 1;
Some(shred_payload)
}
}
.map(|mut shreds| {
// All shreds should be retransmitted, but because there
// are no more missing data shreds in the erasure batch,
// coding shreds are not stored in blockstore.
recovered_shreds
.extend(shred::drain_coding_shreds(&mut shreds).map(Shred::into_payload));
recovered_shreds.extend(shreds.iter().map(Shred::payload).cloned());
shreds
})
// Always collect recovered-shreds so that above insert code is
// executed even if retransmit-sender is None.
.collect();
if !recovered_shreds.is_empty() {
if let Some(retransmit_sender) = retransmit_sender {
let _ = retransmit_sender.send(recovered_shreds);
}
}
for shred in recovered_data_shreds.into_iter().flatten() {
metrics.num_recovered += 1;
*match self.check_insert_data_shred(
shred,
shred_insertion_tracker,
is_trusted,
leader_schedule,
ShredSource::Recovered,
) {
Err(InsertDataShredError::Exists) => &mut metrics.num_recovered_exists,
Err(InsertDataShredError::InvalidShred) => {
&mut metrics.num_recovered_failed_invalid
}
Err(InsertDataShredError::BlockstoreError(err)) => {
error!("blockstore error: {err}");
&mut metrics.num_recovered_blockstore_error
}
Ok(()) => &mut metrics.num_recovered_inserted,
} += 1;
}
}
start.stop();
metrics.shred_recovery_elapsed_us += start.as_us();
Expand Down
149 changes: 101 additions & 48 deletions ledger/src/shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ use {
signature::{Keypair, Signature, Signer, SIGNATURE_BYTES},
},
static_assertions::const_assert_eq,
std::{fmt::Debug, time::Instant},
std::{fmt::Debug, time::Instant, vec::Drain},
thiserror::Error,
};
pub use {
Expand Down Expand Up @@ -513,9 +513,12 @@ impl Shred {
ShredType::from(self.common_header().shred_variant)
}

#[inline]
pub fn is_data(&self) -> bool {
    // A shred is either a data or a coding shred, per its variant's type tag.
    matches!(self.shred_type(), ShredType::Data)
}

#[inline]
pub fn is_code(&self) -> bool {
    // Counterpart of is_data: true only for erasure-coding shreds.
    matches!(self.shred_type(), ShredType::Code)
}
Expand Down Expand Up @@ -1296,6 +1299,28 @@ pub fn should_discard_shred(
false
}

// Partitions the vector in place so that coding shreds end up at the tail,
// then drains and returns that tail, leaving only data shreds behind.
// Note that neither the retained data shreds nor the drained coding shreds
// keep their original relative order.
// TODO: Use Vec::extract_if instead once stable.
pub(crate) fn drain_coding_shreds(shreds: &mut Vec<Shred>) -> Drain<'_, Shred> {
    let mut lo = 0;
    let mut hi = shreds.len().saturating_sub(1);
    // Two-pointer partition: lo walks forward past data shreds, hi walks
    // backward past coding shreds; misplaced pairs are swapped.
    while lo < hi {
        if shreds[lo].is_data() {
            lo += 1;
        } else if shreds[hi].is_code() {
            hi -= 1;
        } else {
            // shreds[lo] is code and shreds[hi] is data: put both in place.
            shreds.swap(lo, hi);
        }
    }
    // Here lo == hi (or the vector is empty) and the pivot element has not
    // been classified yet; retain it iff it is a data shred.
    let keep_pivot = shreds.get(lo).map(Shred::is_data).unwrap_or_default();
    shreds.drain(lo + usize::from(keep_pivot)..)
}

pub fn max_ticks_per_n_shreds(num_shreds: u64, shred_data_size: Option<usize>) -> u64 {
let ticks = create_ticks(1, 0, Hash::default());
max_entries_per_n_shred(&ticks[0], num_shreds, shred_data_size)
Expand Down Expand Up @@ -1353,7 +1378,7 @@ mod tests {
super::*,
assert_matches::assert_matches,
bincode::serialized_size,
rand::Rng,
rand::{seq::SliceRandom, Rng},
rand_chacha::{rand_core::SeedableRng, ChaChaRng},
rayon::ThreadPoolBuilder,
solana_sdk::{shred_version, signature::Signer, signer::keypair::keypair_from_seed},
Expand All @@ -1367,6 +1392,36 @@ mod tests {
bs58::decode(data).into_vec().unwrap()
}

// Test helper: shreds `data_size` random bytes into Merkle shreds signed by
// a fresh random keypair, returning the erasure batches produced by
// merkle::make_shreds_from_data (each inner Vec is one erasure batch of
// data and coding shreds).
// NOTE: output is fully determined by `rng` draws in this exact order
// (merkle root, parent offset, payload, shred_version, reference_tick,
// next indexes) — do not reorder the calls.
fn make_merkle_shreds_for_tests<R: Rng>(
    rng: &mut R,
    slot: Slot,
    data_size: usize,
    chained: bool,
    is_last_in_slot: bool,
) -> Result<Vec<Vec<merkle::Shred>>, Error> {
    let thread_pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
    // Chained variants commit to the previous erasure batch's Merkle root;
    // use a random one here since there is no real predecessor.
    let chained_merkle_root = chained.then(|| Hash::new_from_array(rng.gen()));
    // parent_offset is a u16 in the wire format, so keep it within range
    // (and <= slot so parent_slot does not underflow below).
    let parent_offset = rng.gen_range(1..=u16::try_from(slot).unwrap_or(u16::MAX));
    let parent_slot = slot.checked_sub(u64::from(parent_offset)).unwrap();
    let mut data = vec![0u8; data_size];
    rng.fill(&mut data[..]);
    merkle::make_shreds_from_data(
        &thread_pool,
        &Keypair::new(),
        chained_merkle_root,
        &data[..],
        slot,
        parent_slot,
        rng.gen(), // shred_version
        rng.gen_range(1..64), // reference_tick
        is_last_in_slot,
        rng.gen_range(0..671), // next_shred_index
        rng.gen_range(0..781), // next_code_index
        &ReedSolomonCache::default(),
        &mut ProcessShredsStats::default(),
    )
}

#[test]
fn test_shred_constants() {
let common_header = ShredCommonHeader {
Expand Down Expand Up @@ -1482,37 +1537,22 @@ mod tests {
fn test_should_discard_shred(chained: bool, is_last_in_slot: bool) {
solana_logger::setup();
let mut rng = rand::thread_rng();
let thread_pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
let reed_solomon_cache = ReedSolomonCache::default();
let keypair = Keypair::new();
let chained_merkle_root = chained.then(|| Hash::new_from_array(rng.gen()));
let slot = 18_291;
let parent_slot = rng.gen_range(1..slot);
let shred_version = rng.gen();
let reference_tick = rng.gen_range(1..64);
let next_shred_index = rng.gen_range(0..671);
let next_code_index = rng.gen_range(0..781);
let mut data = vec![0u8; 1200 * 5];
rng.fill(&mut data[..]);
let shreds = merkle::make_shreds_from_data(
&thread_pool,
&keypair,
chained_merkle_root,
&data[..],
let shreds = make_merkle_shreds_for_tests(
&mut rng,
slot,
parent_slot,
shred_version,
reference_tick,
1200 * 5, // data_size
chained,
is_last_in_slot,
next_shred_index,
next_code_index,
&reed_solomon_cache,
&mut ProcessShredsStats::default(),
)
.unwrap();
assert_eq!(shreds.len(), 1);
let shreds: Vec<_> = shreds.into_iter().flatten().map(Shred::from).collect();

assert_matches!(shreds[0].shred_type(), ShredType::Data);
let parent_slot = shreds[0].parent().unwrap();
let shred_version = shreds[0].common_header().version;

let root = rng.gen_range(0..parent_slot);
let max_slot = slot + rng.gen_range(1..65536);
let mut packet = Packet::default();
Expand Down Expand Up @@ -2179,32 +2219,13 @@ mod tests {
Shred::new_from_serialized_shred(shred).unwrap()
}
let mut rng = rand::thread_rng();
let thread_pool = ThreadPoolBuilder::new().num_threads(2).build().unwrap();
let reed_solomon_cache = ReedSolomonCache::default();
let keypair = Keypair::new();
let chained_merkle_root = chained.then(|| Hash::new_from_array(rng.gen()));
let slot = 285_376_049 + rng.gen_range(0..100_000);
let parent_slot = slot - rng.gen_range(1..=65535);
let shred_version = rng.gen();
let reference_tick = rng.gen_range(1..64);
let next_shred_index = rng.gen_range(0..671);
let next_code_index = rng.gen_range(0..781);
let mut data = vec![0u8; 1200 * 5];
rng.fill(&mut data[..]);
let shreds: Vec<_> = merkle::make_shreds_from_data(
&thread_pool,
&keypair,
chained_merkle_root,
&data[..],
let shreds: Vec<_> = make_merkle_shreds_for_tests(
&mut rng,
slot,
parent_slot,
shred_version,
reference_tick,
1200 * 5, // data_size
chained,
is_last_in_slot,
next_shred_index,
next_code_index,
&reed_solomon_cache,
&mut ProcessShredsStats::default(),
)
.unwrap()
.into_iter()
Expand Down Expand Up @@ -2250,4 +2271,36 @@ mod tests {
assert!(other.is_shred_duplicate(shred));
}
}

#[test]
fn test_drain_coding_shreds() {
    // Exercises drain_coding_shreds over every contiguous window of a
    // shuffled mix of data and coding shreds, checking that the retained
    // and drained halves are exactly the data and coding shreds.
    let mut rng = rand::thread_rng();
    let slot = 314_972_727 + rng.gen_range(0..100_000);
    let (chained, is_last_in_slot) = rng.gen();
    let mut shreds: Vec<_> = make_merkle_shreds_for_tests(
        &mut rng,
        slot,
        20 * 1232, // data_size
        chained,
        is_last_in_slot,
    )
    .unwrap()
    .into_iter()
    .flatten()
    .map(Shred::from)
    .collect();
    // Shuffle so windows below contain arbitrary interleavings of types.
    shreds.shuffle(&mut rng);
    // Sanity check: the fixture has a healthy number of both shred types,
    // otherwise the window test below would be vacuous.
    assert!(shreds.iter().filter(|shred| shred.is_data()).count() > 20);
    assert!(shreds.iter().filter(|shred| shred.is_code()).count() > 20);
    let num_shreds = shreds.len();
    // Every (offset, size) window, including empty ones (size == 0).
    for offset in 0..num_shreds {
        for size in 0..(num_shreds - offset) {
            // Copy the window so draining does not disturb the fixture.
            let mut shreds = Vec::from(&shreds[offset..offset + size]);
            let coding_shreds: Vec<_> = drain_coding_shreds(&mut shreds).collect();
            // No shred is lost or duplicated, and the split is exact:
            // data shreds retained, coding shreds drained.
            assert_eq!(shreds.len() + coding_shreds.len(), size);
            assert!(shreds.iter().all(Shred::is_data));
            assert!(coding_shreds.iter().all(Shred::is_code));
        }
    }
}
}

0 comments on commit fd1954d

Please sign in to comment.