From e151482829de2e0f43b143e6b629816c720361d3 Mon Sep 17 00:00:00 2001 From: behzad nouri Date: Thu, 19 Dec 2024 19:50:57 -0600 Subject: [PATCH] dedups shreds by common-header instead of the entire payload Shreds in the retransmit stage: * don't have repair nonce (repaired shreds are not retransmitted). * are already resigned by this node as the retransmitter. * have their leader's signature verified. Therefore in order to dedup shreds, it suffices to compare: (signature, slot, shred-index, shred-type) Because ShredCommonHeader already includes all of the above tuple, the rest of the payload can be skipped. --- Cargo.lock | 1 + ledger/src/shred.rs | 5 +++++ turbine/Cargo.toml | 1 + turbine/src/retransmit_stage.rs | 37 ++++++++++++++++++++++++++++----- 4 files changed, 39 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index faef96ad0f1d03..e704175cb6e19f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9804,6 +9804,7 @@ version = "2.2.0" dependencies = [ "assert_matches", "bincode", + "bs58", "bytes", "crossbeam-channel", "futures 0.3.31", diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index c9039091813575..7b721c9407483f 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -638,6 +638,11 @@ pub mod layout { packet.buffer_mut().get_mut(..size) } + #[inline] + pub fn get_common_header_bytes(shred: &[u8]) -> Option<&[u8]> { + shred.get(..SIZE_OF_COMMON_SHRED_HEADER) + } + pub(crate) fn get_signature(shred: &[u8]) -> Option<Signature> { shred .get(..SIZE_OF_SIGNATURE) diff --git a/turbine/Cargo.toml b/turbine/Cargo.toml index 4d6ce063a3e79e..798d39062d2425 100644 --- a/turbine/Cargo.toml +++ b/turbine/Cargo.toml @@ -47,6 +47,7 @@ tokio = { workspace = true } [dev-dependencies] assert_matches = { workspace = true } +bs58 = { workspace = true } solana-logger = { workspace = true } solana-runtime = { workspace = true, features = ["dev-context-only-utils"] } test-case = { workspace = true } diff --git a/turbine/src/retransmit_stage.rs 
b/turbine/src/retransmit_stage.rs index db9d1f96662a16..5aa42f6e428c32 100644 --- a/turbine/src/retransmit_stage.rs +++ b/turbine/src/retransmit_stage.rs @@ -165,10 +165,22 @@ impl<const K: usize> ShredDeduper<K> { .maybe_reset(rng, false_positive_rate, reset_cycle); } + // Returns true if the shred is duplicate and should be discarded. + #[must_use] fn dedup(&self, key: ShredId, shred: &[u8], max_duplicate_count: usize) -> bool { + // Shreds in the retransmit stage: + // * don't have repair nonce (repaired shreds are not retransmitted). + // * are already resigned by this node as the retransmitter. + // * have their leader's signature verified. + // Therefore in order to dedup shreds, it suffices to compare: + // (signature, slot, shred-index, shred-type) + // Because ShredCommonHeader already includes all of the above tuple, + // the rest of the payload can be skipped. // In order to detect duplicate blocks across cluster, we retransmit // max_duplicate_count different shreds for each ShredId. - self.deduper.dedup(shred) + shred::layout::get_common_header_bytes(shred) + .map(|header| self.deduper.dedup(header)) + .unwrap_or(true) || (0..max_duplicate_count).all(|i| self.shred_id_filter.dedup(&(key, i))) } } @@ -630,14 +642,27 @@ mod tests { rand::SeedableRng, rand_chacha::ChaChaRng, solana_ledger::shred::{Shred, ShredFlags}, + solana_sdk::signature::Keypair, }; + fn get_keypair() -> Keypair { + const KEYPAIR: &str = "Fcc2HUvRC7Dv4GgehTziAremzRvwDw5miYu8Ahuu1rsGjA\ + 5eCn55pXiSkEPcuqviV41rJxrFpZDmHmQkZWfoYYS"; + bs58::decode(KEYPAIR) + .into_vec() + .as_deref() + .map(Keypair::from_bytes) + .unwrap() + .unwrap() + } + #[test] fn test_already_received() { let slot = 1; let index = 5; let version = 0x40; - let shred = Shred::new_from_data( + let keypair = get_keypair(); + let mut shred = Shred::new_from_data( slot, index, 0, @@ -647,14 +672,14 @@ mod tests { version, 0, ); + shred.sign(&keypair); let mut rng = ChaChaRng::from_seed([0xa5; 32]); let shred_deduper = 
ShredDeduper::<2>::new(&mut rng, /*num_bits:*/ 640_007); // unique shred for (1, 5) should pass assert!(!shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT)); // duplicate shred for (1, 5) blocked assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT)); - - let shred = Shred::new_from_data( + let mut shred = Shred::new_from_data( slot, index, 2, @@ -664,12 +689,13 @@ mod tests { version, 0, ); + shred.sign(&keypair); // first duplicate shred for (1, 5) passed assert!(!shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT)); // then blocked assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT)); - let shred = Shred::new_from_data( + let mut shred = Shred::new_from_data( slot, index, 8, @@ -679,6 +705,7 @@ mod tests { version, 0, ); + shred.sign(&keypair); // 2nd duplicate shred for (1, 5) blocked assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT)); assert!(shred_deduper.dedup(shred.id(), shred.payload(), MAX_DUPLICATE_COUNT));