Skip to content

Commit

Permalink
Replace new_with_dummy_keypair() with traits
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun committed Aug 28, 2024
1 parent f505e4c commit 902dd5f
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 70 deletions.
1 change: 0 additions & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ rustc_version = { workspace = true, optional = true }

[features]
dev-context-only-utils = [
"solana-gossip/dev-context-only-utils",
"solana-runtime/dev-context-only-utils",
]
frozen-abi = [
Expand Down
2 changes: 1 addition & 1 deletion core/benches/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use {
struct BenchSetup {
exit: Arc<AtomicBool>,
poh_service: PohService,
forwarder: Forwarder,
forwarder: Forwarder<Arc<ClusterInfo>>,
unprocessed_packet_batches: UnprocessedTransactionStorage,
tracker: LeaderSlotMetricsTracker,
stats: BankingStageStats,
Expand Down
41 changes: 35 additions & 6 deletions core/src/banking_simulation.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#![cfg(feature = "dev-context-only-utils")]
use {
crate::{
banking_stage::BankingStage,
banking_stage::{BankingStage, LikeClusterInfo},
banking_trace::{
BankingPacketBatch, BankingTracer, ChannelLabel, TimedTracedEvent, TracedEvent,
BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT, BASENAME,
Expand All @@ -13,7 +13,10 @@ use {
itertools::Itertools,
log::*,
solana_client::connection_cache::ConnectionCache,
solana_gossip::cluster_info::{ClusterInfo, Node},
solana_gossip::{
cluster_info::{ClusterInfo, Node},
contact_info::ContactInfo,
},
solana_ledger::{
blockstore::{Blockstore, PurgeType},
leader_schedule_cache::LeaderScheduleCache,
Expand All @@ -30,7 +33,10 @@ use {
solana_sdk::{
clock::{Slot, DEFAULT_MS_PER_SLOT, HOLD_TRANSACTIONS_SLOT_OFFSET},
genesis_config::GenesisConfig,
pubkey::Pubkey,
shred_version::compute_shred_version,
signature::Signer,
signer::keypair::Keypair,
},
solana_streamer::socket::SocketAddrSpace,
solana_turbine::broadcast_stage::BroadcastStageType,
Expand Down Expand Up @@ -207,6 +213,24 @@ impl BankingTraceEvents {
}
}

#[derive(Clone)]
struct DummyClusterInfo {
id: Pubkey,
}

impl LikeClusterInfo for DummyClusterInfo {
fn id(&self) -> Pubkey {
self.id
}

fn lookup_contact_info<F, Y>(&self, _id: &Pubkey, _map: F) -> Option<Y>
where
F: FnOnce(&ContactInfo) -> Y,
{
None
}
}

impl BankingSimulator {
pub fn new(banking_trace_events: BankingTraceEvents, first_simulated_slot: Slot) -> Self {
Self {
Expand Down Expand Up @@ -318,10 +342,6 @@ impl BankingSimulator {
let (tpu_vote_sender, tpu_vote_receiver) = retracer.create_channel_tpu_vote();
let (gossip_vote_sender, gossip_vote_receiver) = retracer.create_channel_gossip_vote();

let cluster_info = Arc::new(ClusterInfo::new_with_dummy_keypair(
Node::new_localhost_with_pubkey(&simulated_leader).info,
SocketAddrSpace::Unspecified,
));
let connection_cache = Arc::new(ConnectionCache::new("connection_kache!"));
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let (retransmit_slots_sender, retransmit_slots_receiver) = unbounded();
Expand All @@ -330,6 +350,12 @@ impl BankingSimulator {
Some(&bank_forks.read().unwrap().root_bank().hard_forks()),
);
let (sender, _receiver) = tokio::sync::mpsc::channel(1);
let random_keypair = Arc::new(Keypair::new());
let cluster_info = Arc::new(ClusterInfo::new(
Node::new_localhost_with_pubkey(&random_keypair.pubkey()).info,
random_keypair,
SocketAddrSpace::Unspecified,
));
let broadcast_stage = BroadcastStageType::Standard.new_broadcast_stage(
vec![UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)).unwrap()],
cluster_info.clone(),
Expand All @@ -343,6 +369,9 @@ impl BankingSimulator {
);

info!("Start banking stage!...");
let cluster_info = DummyClusterInfo {
id: simulated_leader,
};
let prioritization_fee_cache = &Arc::new(PrioritizationFeeCache::new(0u64));
let banking_stage = BankingStage::new_num_threads(
block_production_method.clone(),
Expand Down
46 changes: 34 additions & 12 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use {
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
histogram::Histogram,
solana_client::connection_cache::ConnectionCache,
solana_gossip::cluster_info::ClusterInfo,
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
solana_ledger::blockstore_processor::TransactionStatusSender,
solana_measure::measure_us,
solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH},
Expand All @@ -40,9 +40,10 @@ use {
bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
vote_sender_types::ReplayVoteSender,
},
solana_sdk::timing::AtomicInterval,
solana_sdk::{pubkey::Pubkey, timing::AtomicInterval},
std::{
cmp, env,
ops::Deref,
sync::{
atomic::{AtomicU64, AtomicUsize, Ordering},
Arc, RwLock,
Expand Down Expand Up @@ -323,12 +324,33 @@ pub struct FilterForwardingResults {
pub(crate) total_filter_packets_us: u64,
}

pub trait LikeClusterInfo: Send + Sync + 'static + Clone {
fn id(&self) -> Pubkey;

fn lookup_contact_info<F, Y>(&self, id: &Pubkey, map: F) -> Option<Y>
where
F: FnOnce(&ContactInfo) -> Y;
}

impl LikeClusterInfo for Arc<ClusterInfo> {
fn id(&self) -> Pubkey {
self.deref().id()
}

fn lookup_contact_info<F, Y>(&self, id: &Pubkey, map: F) -> Option<Y>
where
F: FnOnce(&ContactInfo) -> Y,
{
self.deref().lookup_contact_info(id, map)
}
}

impl BankingStage {
/// Create the stage using `bank`. Exit when `verified_receiver` is dropped.
#[allow(clippy::too_many_arguments)]
pub fn new(
block_production_method: BlockProductionMethod,
cluster_info: &Arc<ClusterInfo>,
cluster_info: &impl LikeClusterInfo,
poh_recorder: &Arc<RwLock<PohRecorder>>,
non_vote_receiver: BankingPacketReceiver,
tpu_vote_receiver: BankingPacketReceiver,
Expand Down Expand Up @@ -362,7 +384,7 @@ impl BankingStage {
#[allow(clippy::too_many_arguments)]
pub fn new_num_threads(
block_production_method: BlockProductionMethod,
cluster_info: &Arc<ClusterInfo>,
cluster_info: &impl LikeClusterInfo,
poh_recorder: &Arc<RwLock<PohRecorder>>,
non_vote_receiver: BankingPacketReceiver,
tpu_vote_receiver: BankingPacketReceiver,
Expand Down Expand Up @@ -413,7 +435,7 @@ impl BankingStage {

#[allow(clippy::too_many_arguments)]
pub fn new_thread_local_multi_iterator(
cluster_info: &Arc<ClusterInfo>,
cluster_info: &impl LikeClusterInfo,
poh_recorder: &Arc<RwLock<PohRecorder>>,
non_vote_receiver: BankingPacketReceiver,
tpu_vote_receiver: BankingPacketReceiver,
Expand Down Expand Up @@ -497,7 +519,7 @@ impl BankingStage {

#[allow(clippy::too_many_arguments)]
pub fn new_central_scheduler(
cluster_info: &Arc<ClusterInfo>,
cluster_info: &impl LikeClusterInfo,
poh_recorder: &Arc<RwLock<PohRecorder>>,
non_vote_receiver: BankingPacketReceiver,
tpu_vote_receiver: BankingPacketReceiver,
Expand Down Expand Up @@ -629,15 +651,15 @@ impl BankingStage {
Self { bank_thread_hdls }
}

fn spawn_thread_local_multi_iterator_thread(
fn spawn_thread_local_multi_iterator_thread<T: LikeClusterInfo>(
id: u32,
packet_receiver: BankingPacketReceiver,
bank_forks: Arc<RwLock<BankForks>>,
decision_maker: DecisionMaker,
committer: Committer,
transaction_recorder: TransactionRecorder,
log_messages_bytes_limit: Option<usize>,
mut forwarder: Forwarder,
mut forwarder: Forwarder<T>,
unprocessed_transaction_storage: UnprocessedTransactionStorage,
) -> JoinHandle<()> {
let mut packet_receiver = PacketReceiver::new(id, packet_receiver, bank_forks);
Expand All @@ -664,9 +686,9 @@ impl BankingStage {
}

#[allow(clippy::too_many_arguments)]
fn process_buffered_packets(
fn process_buffered_packets<T: LikeClusterInfo>(
decision_maker: &DecisionMaker,
forwarder: &mut Forwarder,
forwarder: &mut Forwarder<T>,
consumer: &Consumer,
unprocessed_transaction_storage: &mut UnprocessedTransactionStorage,
banking_stage_stats: &BankingStageStats,
Expand Down Expand Up @@ -730,10 +752,10 @@ impl BankingStage {
}
}

fn process_loop(
fn process_loop<T: LikeClusterInfo>(
packet_receiver: &mut PacketReceiver,
decision_maker: &DecisionMaker,
forwarder: &mut Forwarder,
forwarder: &mut Forwarder<T>,
consumer: &Consumer,
id: u32,
mut unprocessed_transaction_storage: UnprocessedTransactionStorage,
Expand Down
12 changes: 7 additions & 5 deletions core/src/banking_stage/forward_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use {
scheduler_messages::{FinishedForwardWork, ForwardWork},
ForwardOption,
},
crate::banking_stage::LikeClusterInfo,
crossbeam_channel::{Receiver, RecvError, SendError, Sender},
thiserror::Error,
};
Expand All @@ -16,19 +17,19 @@ pub enum ForwardWorkerError {
Send(#[from] SendError<FinishedForwardWork>),
}

pub(crate) struct ForwardWorker {
pub(crate) struct ForwardWorker<T: LikeClusterInfo> {
forward_receiver: Receiver<ForwardWork>,
forward_option: ForwardOption,
forwarder: Forwarder,
forwarder: Forwarder<T>,
forwarded_sender: Sender<FinishedForwardWork>,
}

#[allow(dead_code)]
impl ForwardWorker {
impl<T: LikeClusterInfo> ForwardWorker<T> {
pub fn new(
forward_receiver: Receiver<ForwardWork>,
forward_option: ForwardOption,
forwarder: Forwarder,
forwarder: Forwarder<T>,
forwarded_sender: Sender<FinishedForwardWork>,
) -> Self {
Self {
Expand Down Expand Up @@ -90,6 +91,7 @@ mod tests {
},
crossbeam_channel::unbounded,
solana_client::connection_cache::ConnectionCache,
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{
blockstore::Blockstore, genesis_utils::GenesisConfigInfo,
get_tmp_ledger_path_auto_delete, leader_schedule_cache::LeaderScheduleCache,
Expand Down Expand Up @@ -121,7 +123,7 @@ mod tests {
forwarded_receiver: Receiver<FinishedForwardWork>,
}

fn setup_test_frame() -> (TestFrame, ForwardWorker) {
fn setup_test_frame() -> (TestFrame, ForwardWorker<Arc<ClusterInfo>>) {
let GenesisConfigInfo {
genesis_config,
mint_keypair,
Expand Down
15 changes: 8 additions & 7 deletions core/src/banking_stage/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ use {
ForwardOption,
},
crate::{
banking_stage::immutable_deserialized_packet::ImmutableDeserializedPacket,
banking_stage::{
immutable_deserialized_packet::ImmutableDeserializedPacket, LikeClusterInfo,
},
next_leader::{next_leader, next_leader_tpu_vote},
tracer_packet_stats::TracerPacketStats,
},
solana_client::connection_cache::ConnectionCache,
solana_connection_cache::client_connection::ClientConnection as TpuConnection,
solana_gossip::cluster_info::ClusterInfo,
solana_measure::measure_us,
solana_perf::{data_budget::DataBudget, packet::Packet},
solana_poh::poh_recorder::PohRecorder,
Expand All @@ -29,21 +30,21 @@ use {
},
};

pub struct Forwarder {
pub struct Forwarder<T: LikeClusterInfo> {
poh_recorder: Arc<RwLock<PohRecorder>>,
bank_forks: Arc<RwLock<BankForks>>,
socket: UdpSocket,
cluster_info: Arc<ClusterInfo>,
cluster_info: T,
connection_cache: Arc<ConnectionCache>,
data_budget: Arc<DataBudget>,
forward_packet_batches_by_accounts: ForwardPacketBatchesByAccounts,
}

impl Forwarder {
impl<T: LikeClusterInfo> Forwarder<T> {
pub fn new(
poh_recorder: Arc<RwLock<PohRecorder>>,
bank_forks: Arc<RwLock<BankForks>>,
cluster_info: Arc<ClusterInfo>,
cluster_info: T,
connection_cache: Arc<ConnectionCache>,
data_budget: Arc<DataBudget>,
) -> Self {
Expand Down Expand Up @@ -313,7 +314,7 @@ mod tests {
unprocessed_packet_batches::{DeserializedPacket, UnprocessedPacketBatches},
unprocessed_transaction_storage::ThreadType,
},
solana_gossip::cluster_info::Node,
solana_gossip::cluster_info::{ClusterInfo, Node},
solana_ledger::{blockstore::Blockstore, genesis_utils::GenesisConfigInfo},
solana_perf::packet::PacketFlags,
solana_poh::{poh_recorder::create_test_recorder, poh_service::PohService},
Expand Down
Loading

0 comments on commit 902dd5f

Please sign in to comment.