Skip to content

Commit

Permalink
wip. something weird going on with connectioncache imports
Browse files Browse the repository at this point in the history
  • Loading branch information
gregcusack committed Mar 18, 2024
1 parent 46ab465 commit f9e2e69
Show file tree
Hide file tree
Showing 14 changed files with 882 additions and 9,124 deletions.
1,729 changes: 839 additions & 890 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions bench-tps/src/bench_tps_client/tpu_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ use {
message::Message, pubkey::Pubkey, signature::Signature, slot_history::Slot,
transaction::Transaction,
},
// crate::solana_tpu_client::tpu_client,
// solana_client::tpu_client::TpuClient,
solana_tpu_client::tpu_client::TpuClient,
solana_transaction_status::UiConfirmedBlock,
};
Expand Down
19 changes: 8 additions & 11 deletions bench-tps/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@ use {
keypairs::get_keypairs,
send_batch::{generate_durable_nonce_accounts, generate_keypairs},
},
solana_client::{
connection_cache::ConnectionCache,
// tpu_client::{TpuClient, TpuClientConfig},
},
solana_client::connection_cache::ConnectionCacheWrapper,
solana_genesis::Base64Account,
solana_rpc_client::rpc_client::RpcClient,
solana_sdk::{
Expand Down Expand Up @@ -80,15 +77,15 @@ fn create_connection_cache(
bind_address: IpAddr,
client_node_id: Option<&Keypair>,
commitment_config: CommitmentConfig,
) -> ConnectionCache {
) -> ConnectionCacheWrapper {
if !use_quic {
return ConnectionCache::with_udp(
return ConnectionCacheWrapper::with_udp(
"bench-tps-connection_cache_udp",
tpu_connection_pool_size,
);
}
if client_node_id.is_none() {
return ConnectionCache::new_quic(
return ConnectionCacheWrapper::new_quic(
"bench-tps-connection_cache_quic",
tpu_connection_pool_size,
);
Expand All @@ -111,7 +108,7 @@ fn create_connection_cache(
Arc::new(stakes),
HashMap::<Pubkey, u64>::default(), // overrides
)));
ConnectionCache::new_with_client_options(
ConnectionCacheWrapper::new_with_client_options(
"bench-tps-connection_cache_quic",
tpu_connection_pool_size,
None,
Expand All @@ -125,7 +122,7 @@ fn create_client(
external_client_type: &ExternalClientType,
json_rpc_url: &str,
websocket_url: &str,
connection_cache: ConnectionCache,
connection_cache: ConnectionCacheWrapper,
commitment_config: CommitmentConfig,
) -> Arc<dyn BenchTpsClient + Send + Sync> {
match external_client_type {
Expand All @@ -139,7 +136,7 @@ fn create_client(
commitment_config,
));
match connection_cache {
ConnectionCache::Udp(cache) => Arc::new(
ConnectionCacheWrapper::Udp(cache) => Arc::new(
TpuClient::new_with_connection_cache(
rpc_client,
websocket_url,
Expand All @@ -151,7 +148,7 @@ fn create_client(
exit(1);
}),
),
ConnectionCache::Quic(cache) => Arc::new(
ConnectionCacheWrapper::Quic(cache) => Arc::new(
TpuClient::new_with_connection_cache(
rpc_client,
websocket_url,
Expand Down
14 changes: 7 additions & 7 deletions client/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ const DEFAULT_CONNECTION_CACHE_USE_QUIC: bool = true;
/// A thin wrapper over connection-cache/ConnectionCache to ease
/// construction of the ConnectionCache for code dealing both with udp and quic.
/// For the scenario only using udp or quic, use connection-cache/ConnectionCache directly.
pub enum ConnectionCache {
pub enum ConnectionCacheWrapper {
Quic(Arc<BackendConnectionCache<QuicPool, QuicConnectionManager, QuicConfig>>),
Udp(Arc<BackendConnectionCache<UdpPool, UdpConnectionManager, UdpConfig>>),
}
Expand All @@ -46,7 +46,7 @@ pub enum NonblockingClientConnection {
Udp(Arc<<UdpBaseClientConnection as BaseClientConnection>::NonblockingClientConnection>),
}

impl NotifyKeyUpdate for ConnectionCache {
impl NotifyKeyUpdate for ConnectionCacheWrapper {
fn update_key(&self, key: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
match self {
Self::Udp(_) => Ok(()),
Expand All @@ -55,19 +55,19 @@ impl NotifyKeyUpdate for ConnectionCache {
}
}

impl ConnectionCache {
impl ConnectionCacheWrapper {
pub fn new(name: &'static str) -> Self {
if DEFAULT_CONNECTION_CACHE_USE_QUIC {
let cert_info = (&Keypair::new(), IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)));
ConnectionCache::new_with_client_options(
ConnectionCacheWrapper::new_with_client_options(
name,
DEFAULT_CONNECTION_POOL_SIZE,
None, // client_endpoint
Some(cert_info),
None, // stake_info
)
} else {
ConnectionCache::with_udp(name, DEFAULT_CONNECTION_POOL_SIZE)
ConnectionCacheWrapper::with_udp(name, DEFAULT_CONNECTION_POOL_SIZE)
}
}

Expand Down Expand Up @@ -223,7 +223,7 @@ impl solana_connection_cache::nonblocking::client_connection::ClientConnection
mod tests {
use {
super::*,
crate::connection_cache::ConnectionCache,
crate::connection_cache::ConnectionCacheWrapper,
crossbeam_channel::unbounded,
solana_sdk::{net::DEFAULT_TPU_COALESCE, signature::Keypair},
solana_streamer::{
Expand Down Expand Up @@ -275,7 +275,7 @@ mod tests {
)
.unwrap();

let connection_cache = ConnectionCache::new_with_client_options(
let connection_cache = ConnectionCacheWrapper::new_with_client_options(
"connection_cache_test",
1, // connection_pool_size
Some(response_recv_endpoint), // client_endpoint
Expand Down
3 changes: 2 additions & 1 deletion client/src/nonblocking/tpu_client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
pub use solana_tpu_client::nonblocking::tpu_client::{LeaderTpuService, TpuSenderError};
use {
crate::{connection_cache::ConnectionCache, tpu_client::TpuClientConfig},
// crate::{connection_cache::ConnectionCache, tpu_client::TpuClientConfig},
crate::tpu_client::TpuClientConfig,
solana_connection_cache::connection_cache::{
ConnectionCache as BackendConnectionCache, ConnectionManager, ConnectionPool,
NewConnectionConfig,
Expand Down
3 changes: 2 additions & 1 deletion client/src/thin_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
//! unstable and may change in future releases.
#[allow(deprecated)]
use {
crate::connection_cache::{dispatch, ConnectionCache},
// crate::connection_cache::{dispatch, ConnectionCache},
crate::connection_cache::dispatch,
solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
solana_rpc_client::rpc_client::RpcClient,
solana_rpc_client_api::config::RpcProgramAccountsConfig,
Expand Down
8 changes: 1 addition & 7 deletions client/src/tpu_client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use {
crate::connection_cache::ConnectionCache,
// crate::connection_cache::ConnectionCache,
solana_connection_cache::connection_cache::{
ConnectionCache as BackendConnectionCache, ConnectionManager, ConnectionPool,
NewConnectionConfig,
Expand All @@ -13,7 +13,6 @@ use {
transport::Result as TransportResult,
},
solana_tpu_client::tpu_client::{Result, TpuClient as BackendTpuClient},
solana_udp_client::{UdpConfig, UdpConnectionManager, UdpPool},
std::sync::Arc,
};
pub use {
Expand All @@ -23,11 +22,6 @@ pub use {

pub type QuicTpuClient = TpuClient<QuicPool, QuicConnectionManager, QuicConfig>;

pub enum TpuClientWrapper {
Quic(TpuClient<QuicPool, QuicConnectionManager, QuicConfig>),
Udp(TpuClient<UdpPool, UdpConnectionManager, UdpConfig>),
}

/// Client which sends transactions directly to the current leader's TPU port over UDP.
/// The client uses RPC to determine the current leader and fetch node contact info
/// This is just a thin wrapper over the "BackendTpuClient", use that directly for more efficiency.
Expand Down
7 changes: 0 additions & 7 deletions connection-cache/src/connection_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ use {
sync::{atomic::Ordering, Arc, RwLock},
thread::{Builder, JoinHandle},
},
// solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
// solana_udp_client::{UdpConfig, UdpConnectionManager, UdpPool},
thiserror::Error,
};

Expand All @@ -32,11 +30,6 @@ pub enum Protocol {
QUIC,
}

// pub enum ConnectionCacheWrapper {
// Quic(Arc<ConnectionCache<QuicPool, QuicConnectionManager, QuicConfig>>),
// Udp(Arc<ConnectionCache<UdpPool, UdpConnectionManager, UdpConfig>>),
// }`

pub trait ConnectionManager: Send + Sync + 'static {
type ConnectionPool: ConnectionPool;
type NewConnectionConfig: NewConnectionConfig;
Expand Down
2 changes: 2 additions & 0 deletions dos/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ rand = { workspace = true }
serde = { workspace = true }
solana-bench-tps = { workspace = true }
solana-client = { workspace = true }
solana-connection-cache = { workspace = true }
solana-core = { workspace = true }
solana-faucet = { workspace = true }
solana-gossip = { workspace = true }
Expand All @@ -32,6 +33,7 @@ solana-rpc-client = { workspace = true }
solana-sdk = { workspace = true }
solana-streamer = { workspace = true }
solana-tpu-client = { workspace = true }
solana-udp-client = { workspace = true }
solana-version = { workspace = true }

[package.metadata.docs.rs]
Expand Down
18 changes: 9 additions & 9 deletions dos/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,7 @@ use {
log::*,
rand::{thread_rng, Rng},
solana_bench_tps::{bench::generate_and_fund_keypairs, bench_tps_client::BenchTpsClient},
solana_client::{
connection_cache::ConnectionCache, tpu_client::TpuClientWrapper,
tpu_connection::TpuConnection,
},
solana_client::{connection_cache::ConnectionCache, tpu_connection::TpuConnection},
solana_core::repair::serve_repair::{RepairProtocol, RepairRequestHeader, ServeRepair},
solana_dos::cli::*,
solana_gossip::{
Expand All @@ -72,7 +69,7 @@ use {
transaction::Transaction,
},
solana_streamer::socket::SocketAddrSpace,
solana_tpu_client::tpu_client::DEFAULT_TPU_CONNECTION_POOL_SIZE,
solana_tpu_client::tpu_client::{TpuClientWrapper, DEFAULT_TPU_CONNECTION_POOL_SIZE},
std::{
net::{SocketAddr, UdpSocket},
process::exit,
Expand Down Expand Up @@ -264,9 +261,10 @@ fn create_sender_thread(
"connection_cache_dos_quic",
DEFAULT_TPU_CONNECTION_POOL_SIZE,
),
false => {
ConnectionCache::with_udp("connection_cache_dos_udp", DEFAULT_TPU_CONNECTION_POOL_SIZE)
}
false => ConnectionCache::with_udp(
"connection_cache_dos_udp",
DEFAULT_TPU_CONNECTION_POOL_SIZE,
),
};
let connection = connection_cache.get_connection(target);

Expand Down Expand Up @@ -795,6 +793,7 @@ fn main() {
DEFAULT_TPU_CONNECTION_POOL_SIZE,
),
};

let client = get_client(&validators, Arc::new(connection_cache));
(gossip_nodes, Some(client))
} else {
Expand All @@ -818,7 +817,6 @@ fn main() {
pub mod test {
use {
super::*,
solana_client::tpu_client::{QuicTpuClient, TpuClient},
solana_core::validator::ValidatorConfig,
solana_faucet::faucet::run_local_faucet,
solana_gossip::contact_info::LegacyContactInfo,
Expand All @@ -827,8 +825,10 @@ pub mod test {
local_cluster::{ClusterConfig, LocalCluster},
validator_configs::make_identical_validator_configs,
},
solana_quic_client::{QuicConfig, QuicConnectionManager, QuicPool},
solana_rpc::rpc::JsonRpcConfig,
solana_sdk::timing::timestamp,
solana_tpu_client::tpu_client::TpuClient,
};

const TEST_SEND_BATCH_SIZE: usize = 1;
Expand Down
13 changes: 5 additions & 8 deletions gossip/src/gossip_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,7 @@ use {
crate::{cluster_info::ClusterInfo, legacy_contact_info::LegacyContactInfo as ContactInfo},
crossbeam_channel::{unbounded, Sender},
rand::{thread_rng, Rng},
solana_client::{
connection_cache::ConnectionCache,
rpc_client::RpcClient,
tpu_client::{TpuClient, TpuClientConfig, TpuClientWrapper},
},
solana_client::{connection_cache::ConnectionCacheWrapper, rpc_client::RpcClient},
solana_perf::recycler::Recycler,
solana_runtime::bank_forks::BankForks,
solana_sdk::{
Expand All @@ -19,6 +15,7 @@ use {
socket::SocketAddrSpace,
streamer::{self, StreamerReceiveStats},
},
solana_tpu_client::tpu_client::{TpuClient, TpuClientConfig, TpuClientWrapper},
std::{
collections::HashSet,
net::{SocketAddr, TcpListener, UdpSocket},
Expand Down Expand Up @@ -200,15 +197,15 @@ pub fn discover(
/// Creates a TpuClient by selecting a valid node at random
pub fn get_client(
nodes: &[ContactInfo],
connection_cache: Arc<ConnectionCache>,
connection_cache: Arc<ConnectionCacheWrapper>,
) -> TpuClientWrapper {
let select = thread_rng().gen_range(0..nodes.len());

let rpc_pubsub_url = format!("ws://{}/", nodes[select].rpc_pubsub().unwrap());
let rpc_url = format!("http://{}", nodes[select].rpc().unwrap());

match &*connection_cache {
ConnectionCache::Quic(cache) => TpuClientWrapper::Quic(
ConnectionCacheWrapper::Quic(cache) => TpuClientWrapper::Quic(
TpuClient::new_with_connection_cache(
Arc::new(RpcClient::new(rpc_url)),
rpc_pubsub_url.as_str(),
Expand All @@ -219,7 +216,7 @@ pub fn get_client(
panic!("Could not create TpuClient with Quic Cache {err:?}");
}),
),
ConnectionCache::Udp(cache) => TpuClientWrapper::Udp(
ConnectionCacheWrapper::Udp(cache) => TpuClientWrapper::Udp(
TpuClient::new_with_connection_cache(
Arc::new(RpcClient::new(rpc_url)),
rpc_pubsub_url.as_str(),
Expand Down
12 changes: 6 additions & 6 deletions local-cluster/src/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use {
log::*,
solana_accounts_db::utils::create_accounts_run_and_snapshot_dirs,
solana_client::{
connection_cache::ConnectionCache, rpc_client::RpcClient, thin_client::ThinClient,
connection_cache::ConnectionCacheWrapper, rpc_client::RpcClient, thin_client::ThinClient,
},
solana_core::{
consensus::tower_storage::FileTowerStorage,
Expand Down Expand Up @@ -145,7 +145,7 @@ pub struct LocalCluster {
pub entry_point_info: ContactInfo,
pub validators: HashMap<Pubkey, ClusterValidatorInfo>,
pub genesis_config: GenesisConfig,
pub connection_cache: Arc<ConnectionCache>,
pub connection_cache: Arc<ConnectionCacheWrapper>,
}

impl LocalCluster {
Expand Down Expand Up @@ -322,11 +322,11 @@ impl LocalCluster {
validators,
genesis_config,
connection_cache: match config.tpu_use_quic {
true => Arc::new(ConnectionCache::new_quic(
true => Arc::new(ConnectionCacheWrapper::new_quic(
"connection_cache_local_cluster_quic",
config.tpu_connection_pool_size,
)),
false => Arc::new(ConnectionCache::with_udp(
false => Arc::new(ConnectionCacheWrapper::with_udp(
"connection_cache_local_cluster_udp",
config.tpu_connection_pool_size,
)),
Expand Down Expand Up @@ -814,8 +814,8 @@ impl LocalCluster {
let rpc_url = format!("http://{}", self.entry_point_info.rpc().unwrap());

let cache = match &*self.connection_cache {
ConnectionCache::Quic(cache) => cache,
ConnectionCache::Udp(_) => {
ConnectionCacheWrapper::Quic(cache) => cache,
ConnectionCacheWrapper::Udp(_) => {
return Err(Error::new(
ErrorKind::Other,
"Expected a Quic ConnectionCache. Got UDP",
Expand Down
Loading

0 comments on commit f9e2e69

Please sign in to comment.