Skip to content

Commit

Permalink
use tpu-client-next in validator
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillLykov committed Dec 3, 2024
1 parent 6b2a99c commit 9a2de64
Show file tree
Hide file tree
Showing 8 changed files with 273 additions and 132 deletions.
152 changes: 121 additions & 31 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ use {
},
solana_rayon_threadlimit::{get_max_thread_count, get_thread_count},
solana_rpc::{
cluster_tpu_info::ClusterTpuInfo,
max_slots::MaxSlots,
optimistically_confirmed_bank_tracker::{
BankNotificationSenderConfig, OptimisticallyConfirmedBank,
Expand Down Expand Up @@ -120,11 +121,15 @@ use {
hard_forks::HardForks,
hash::Hash,
pubkey::Pubkey,
quic::NotifyKeyUpdate,
shred_version::compute_shred_version,
signature::{Keypair, Signer},
timing::timestamp,
},
solana_send_transaction_service::send_transaction_service,
solana_send_transaction_service::{
send_transaction_service,
transaction_client::{ConnectionCacheClient, TpuClientNextClient},
},
solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
solana_turbine::{self, broadcast_stage::BroadcastStageType},
solana_unified_scheduler_pool::DefaultSchedulerPool,
Expand All @@ -138,17 +143,29 @@ use {
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Arc, Mutex, RwLock,
Arc, LazyLock, Mutex, RwLock,
},
thread::{sleep, Builder, JoinHandle},
time::{Duration, Instant},
},
strum::VariantNames,
strum_macros::{Display, EnumCount, EnumIter, EnumString, EnumVariantNames, IntoStaticStr},
thiserror::Error,
tokio::runtime::Runtime as TokioRuntime,
tokio::runtime::{self, Runtime as TokioRuntime},
};

static GLOBAL_RUNTIME: LazyLock<TokioRuntime> = LazyLock::new(|| {
runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("Failed to create Tokio runtime")
});

// Function to get a handle to the runtime
fn get_runtime_handle() -> tokio::runtime::Handle {
GLOBAL_RUNTIME.handle().clone()
}

const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
const WAIT_FOR_SUPERMAJORITY_THRESHOLD_PERCENT: u64 = 80;
// Right now since we reuse the wait for supermajority code, the
Expand Down Expand Up @@ -286,6 +303,7 @@ pub struct ValidatorConfig {
pub replay_transactions_threads: NonZeroUsize,
pub tvu_shred_sigverify_threads: NonZeroUsize,
pub delay_leader_block_for_pending_fork: bool,
pub use_tpu_client_next: bool,
}

impl Default for ValidatorConfig {
Expand Down Expand Up @@ -358,6 +376,7 @@ impl Default for ValidatorConfig {
replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
tvu_shred_sigverify_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
delay_leader_block_for_pending_fork: false,
use_tpu_client_next: false,
}
}
}
Expand Down Expand Up @@ -989,6 +1008,9 @@ impl Validator {

let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));

// ConnectionCache might be used for JsonRpc and for Forwarding. Since
// the later is not migrated yet to the tpu-client-next, create
// ConnectionCache regardless of config.use_tpu_client_next for now.
let connection_cache = match use_quic {
true => {
let connection_cache = ConnectionCache::new_with_client_options(
Expand Down Expand Up @@ -1024,6 +1046,7 @@ impl Validator {
rpc_completed_slots_service,
optimistically_confirmed_bank_tracker,
bank_notification_sender,
client_updater,
) = if let Some((rpc_addr, rpc_pubsub_addr)) = config.rpc_addrs {
assert_eq!(
node.info
Expand All @@ -1042,31 +1065,91 @@ impl Validator {
None
};

let json_rpc_service = JsonRpcService::new(
rpc_addr,
config.rpc_config.clone(),
Some(config.snapshot_config.clone()),
bank_forks.clone(),
block_commitment_cache.clone(),
blockstore.clone(),
cluster_info.clone(),
Some(poh_recorder.clone()),
genesis_config.hash(),
ledger_path,
config.validator_exit.clone(),
exit.clone(),
rpc_override_health_check.clone(),
startup_verification_complete,
optimistically_confirmed_bank.clone(),
config.send_transaction_service_config.clone(),
max_slots.clone(),
leader_schedule_cache.clone(),
connection_cache.clone(),
max_complete_transaction_status_slot,
max_complete_rewards_slot,
prioritization_fee_cache.clone(),
)
.map_err(ValidatorError::Other)?;
let leader_info = ClusterTpuInfo::new(cluster_info.clone(), poh_recorder.clone());
let (json_rpc_service, client_updater) = if config.use_tpu_client_next {
let my_tpu_address = cluster_info
.my_contact_info()
.tpu(Protocol::QUIC)
.map_err(|err| ValidatorError::Other(format!("{err}")))?;

let client = TpuClientNextClient::new(
get_runtime_handle(),
my_tpu_address,
config.send_transaction_service_config.tpu_peers.clone(),
Some(leader_info),
config.send_transaction_service_config.leader_forward_count,
Some(identity_keypair.insecure_clone()),
);

let json_rpc_service = JsonRpcService::new(
rpc_addr,
config.rpc_config.clone(),
Some(config.snapshot_config.clone()),
bank_forks.clone(),
block_commitment_cache.clone(),
blockstore.clone(),
cluster_info.clone(),
genesis_config.hash(),
ledger_path,
config.validator_exit.clone(),
exit.clone(),
rpc_override_health_check.clone(),
startup_verification_complete,
optimistically_confirmed_bank.clone(),
config.send_transaction_service_config.clone(),
max_slots.clone(),
leader_schedule_cache.clone(),
client.clone(),
max_complete_transaction_status_slot,
max_complete_rewards_slot,
prioritization_fee_cache.clone(),
)
.map_err(ValidatorError::Other)?;
(
json_rpc_service,
Arc::new(client) as Arc<dyn NotifyKeyUpdate + Send + Sync>,
)
} else {
let my_tpu_address = cluster_info
.my_contact_info()
.tpu(connection_cache.protocol())
.map_err(|err| ValidatorError::Other(format!("{err}")))?;
let client = ConnectionCacheClient::new(
connection_cache.clone(),
my_tpu_address,
config.send_transaction_service_config.tpu_peers.clone(),
Some(leader_info),
config.send_transaction_service_config.leader_forward_count,
);
let json_rpc_service = JsonRpcService::new(
rpc_addr,
config.rpc_config.clone(),
Some(config.snapshot_config.clone()),
bank_forks.clone(),
block_commitment_cache.clone(),
blockstore.clone(),
cluster_info.clone(),
genesis_config.hash(),
ledger_path,
config.validator_exit.clone(),
exit.clone(),
rpc_override_health_check.clone(),
startup_verification_complete,
optimistically_confirmed_bank.clone(),
config.send_transaction_service_config.clone(),
max_slots.clone(),
leader_schedule_cache.clone(),
client.clone(),
max_complete_transaction_status_slot,
max_complete_rewards_slot,
prioritization_fee_cache.clone(),
)
.map_err(ValidatorError::Other)?;
(
json_rpc_service,
Arc::new(client) as Arc<dyn NotifyKeyUpdate + Send + Sync>,
)
};

let pubsub_service = if !config.rpc_config.full_api {
None
Expand Down Expand Up @@ -1142,9 +1225,10 @@ impl Validator {
rpc_completed_slots_service,
optimistically_confirmed_bank_tracker,
bank_notification_sender_config,
Some(client_updater),
)
} else {
(None, None, None, None, None, None, None)
(None, None, None, None, None, None, None, None)
};

if config.halt_at_slot.is_some() {
Expand Down Expand Up @@ -1414,7 +1498,8 @@ impl Validator {
config.wait_to_vote_slot,
accounts_background_request_sender.clone(),
config.runtime_config.log_messages_bytes_limit,
json_rpc_service.is_some().then_some(&connection_cache), // for the cache warmer only used for STS for RPC service
// for the cache warmer only used for STS for RPC service
(json_rpc_service.is_some() && config.use_tpu_client_next).then_some(&connection_cache),
&prioritization_fee_cache,
banking_tracer.clone(),
turbine_quic_endpoint_sender.clone(),
Expand Down Expand Up @@ -1507,7 +1592,12 @@ impl Validator {
);

*start_progress.write().unwrap() = ValidatorStartProgress::Running;
key_notifies.push(connection_cache);
if let Some(client_updater) = client_updater {
key_notifies.push(client_updater);
} else {
// add connection_cache because it is still used in Forwarder.
key_notifies.push(connection_cache);
}

*admin_rpc_service_post_init.write().unwrap() = Some(AdminRpcRequestMetadataPostInit {
bank_forks: bank_forks.clone(),
Expand Down
1 change: 1 addition & 0 deletions local-cluster/src/validator_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
replay_transactions_threads: config.replay_transactions_threads,
tvu_shred_sigverify_threads: config.tvu_shred_sigverify_threads,
delay_leader_block_for_pending_fork: config.delay_leader_block_for_pending_fork,
use_tpu_client_next: config.use_tpu_client_next,
}
}

Expand Down
1 change: 1 addition & 0 deletions rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ solana-runtime = { workspace = true, features = ["dev-context-only-utils"] }
solana-runtime-transaction = { workspace = true, features = [
"dev-context-only-utils",
] }
solana-send-transaction-service = { workspace = true, features = ["dev-context-only-utils"] }
solana-stake-program = { workspace = true }
spl-pod = { workspace = true }
symlink = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion rpc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#![allow(clippy::arithmetic_side_effects)]
mod cluster_tpu_info;
pub mod cluster_tpu_info;
pub mod filter;
pub mod max_slots;
pub mod optimistically_confirmed_bank_tracker;
Expand Down
Loading

0 comments on commit 9a2de64

Please sign in to comment.