Skip to content

Commit

Permalink
use tpu-client-next in send_transaction_service (#3515)
Browse files Browse the repository at this point in the history
* Add tpu-client-next to send_transaction_service

* rename with_option to new

* Update Cargo.lock
  • Loading branch information
KirillLykov authored Dec 3, 2024
1 parent 1c9cf0f commit 5c0f173
Show file tree
Hide file tree
Showing 17 changed files with 591 additions and 247 deletions.
6 changes: 5 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions banks-server/src/banks_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use {
solana_send_transaction_service::{
send_transaction_service::{SendTransactionService, TransactionInfo},
tpu_info::NullTpuInfo,
transaction_client::ConnectionCacheClient,
},
std::{
io,
Expand Down Expand Up @@ -454,17 +455,16 @@ pub async fn start_tcp_server(
.map(move |chan| {
let (sender, receiver) = unbounded();

SendTransactionService::new::<NullTpuInfo>(
let client = ConnectionCacheClient::<NullTpuInfo>::new(
connection_cache.clone(),
tpu_addr,
&bank_forks,
None,
receiver,
connection_cache.clone(),
5_000,
None,
0,
exit.clone(),
);

SendTransactionService::new(&bank_forks, receiver, client, 5_000, exit.clone());

let server = BanksServer::new(
bank_forks.clone(),
block_commitment_cache.clone(),
Expand Down
27 changes: 26 additions & 1 deletion programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

77 changes: 24 additions & 53 deletions rpc/src/cluster_tpu_info.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use {
solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol},
solana_poh::poh_recorder::PohRecorder,
solana_sdk::{
clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
pubkey::Pubkey,
},
solana_sdk::{clock::NUM_CONSECUTIVE_LEADER_SLOTS, pubkey::Pubkey},
solana_send_transaction_service::tpu_info::TpuInfo,
std::{
collections::HashMap,
Expand Down Expand Up @@ -50,7 +47,7 @@ impl TpuInfo for ClusterTpuInfo {
.collect();
}

fn get_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr> {
fn get_unique_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr> {
let recorder = self.poh_recorder.read().unwrap();
let leaders: Vec<_> = (0..max_count)
.filter_map(|i| recorder.leader_after_n_slots(i * NUM_CONSECUTIVE_LEADER_SLOTS))
Expand All @@ -70,37 +67,23 @@ impl TpuInfo for ClusterTpuInfo {
unique_leaders
}

fn get_leader_tpus_with_slots(
&self,
max_count: u64,
protocol: Protocol,
) -> Vec<(&SocketAddr, Slot)> {
fn get_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr> {
let recorder = self.poh_recorder.read().unwrap();
let leaders: Vec<_> = (0..max_count)
.rev()
.filter_map(|future_slot| {
NUM_CONSECUTIVE_LEADER_SLOTS
.checked_mul(future_slot)
.and_then(|slots_in_the_future| {
recorder.leader_and_slot_after_n_slots(slots_in_the_future)
})
})
let leader_pubkeys: Vec<_> = (0..max_count)
.filter_map(|i| recorder.leader_after_n_slots(i * NUM_CONSECUTIVE_LEADER_SLOTS))
.collect();
drop(recorder);
let addrs_to_slots = leaders
.into_iter()
.filter_map(|(leader_id, leader_slot)| {
leader_pubkeys
.iter()
.filter_map(|leader_pubkey| {
self.recent_peers
.get(&leader_id)
.map(|(udp_tpu, quic_tpu)| match protocol {
Protocol::UDP => (udp_tpu, leader_slot),
Protocol::QUIC => (quic_tpu, leader_slot),
.get(leader_pubkey)
.map(|addr| match protocol {
Protocol::UDP => &addr.0,
Protocol::QUIC => &addr.1,
})
})
.collect::<HashMap<_, _>>();
let mut unique_leaders = Vec::from_iter(addrs_to_slots);
unique_leaders.sort_by_key(|(_addr, slot)| *slot);
unique_leaders
.collect()
}
}

Expand Down Expand Up @@ -275,12 +258,12 @@ mod test {
let first_leader =
solana_ledger::leader_schedule_utils::slot_leader_at(slot, &bank).unwrap();
assert_eq!(
leader_info.get_leader_tpus(1, Protocol::UDP),
leader_info.get_unique_leader_tpus(1, Protocol::UDP),
vec![&recent_peers.get(&first_leader).unwrap().0]
);
assert_eq!(
leader_info.get_leader_tpus_with_slots(1, Protocol::UDP),
vec![(&recent_peers.get(&first_leader).unwrap().0, 0)]
leader_info.get_leader_tpus(1, Protocol::UDP),
vec![&recent_peers.get(&first_leader).unwrap().0]
);

let second_leader = solana_ledger::leader_schedule_utils::slot_leader_at(
Expand All @@ -294,15 +277,12 @@ mod test {
];
expected_leader_sockets.dedup();
assert_eq!(
leader_info.get_leader_tpus(2, Protocol::UDP),
leader_info.get_unique_leader_tpus(2, Protocol::UDP),
expected_leader_sockets
);
assert_eq!(
leader_info.get_leader_tpus_with_slots(2, Protocol::UDP),
leader_info.get_leader_tpus(2, Protocol::UDP),
expected_leader_sockets
.into_iter()
.zip([0, 4])
.collect::<Vec<_>>()
);

let third_leader = solana_ledger::leader_schedule_utils::slot_leader_at(
Expand All @@ -317,26 +297,17 @@ mod test {
];
expected_leader_sockets.dedup();
assert_eq!(
leader_info.get_leader_tpus(3, Protocol::UDP),
leader_info.get_unique_leader_tpus(3, Protocol::UDP),
expected_leader_sockets
);
// Only 2 leader tpus are returned always... so [0, 4, 8] isn't right here.
// This assumption is safe. After all, leader schedule generation must be deterministic.
assert_eq!(
leader_info.get_leader_tpus_with_slots(3, Protocol::UDP),
expected_leader_sockets
.into_iter()
.zip([0, 4])
.collect::<Vec<_>>()
);

for x in 4..8 {
assert!(leader_info.get_leader_tpus(x, Protocol::UDP).len() <= recent_peers.len());
assert!(
leader_info
.get_leader_tpus_with_slots(x, Protocol::UDP)
.len()
<= recent_peers.len()
leader_info.get_unique_leader_tpus(x, Protocol::UDP).len() <= recent_peers.len()
);
assert_eq!(
leader_info.get_leader_tpus(x, Protocol::UDP).len(),
x as usize
);
}
}
Expand Down
37 changes: 18 additions & 19 deletions rpc/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ use {
solana_runtime::commitment::CommitmentSlots,
solana_send_transaction_service::{
send_transaction_service::SendTransactionService, tpu_info::NullTpuInfo,
transaction_client::ConnectionCacheClient,
},
solana_streamer::socket::SocketAddrSpace,
};
Expand Down Expand Up @@ -379,16 +380,15 @@ impl JsonRpcRequestProcessor {
.tpu(connection_cache.protocol())
.unwrap();
let (sender, receiver) = unbounded();
SendTransactionService::new::<NullTpuInfo>(

let client = ConnectionCacheClient::<NullTpuInfo>::new(
connection_cache.clone(),
tpu_address,
&bank_forks,
None,
receiver,
connection_cache,
1000,
None,
1,
exit.clone(),
);
SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone());

let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let startup_verification_complete = Arc::clone(bank.get_startup_verification_complete());
Expand Down Expand Up @@ -4386,7 +4386,9 @@ pub mod tests {
},
vote::state::VoteState,
},
solana_send_transaction_service::tpu_info::NullTpuInfo,
solana_send_transaction_service::{
tpu_info::NullTpuInfo, transaction_client::ConnectionCacheClient,
},
solana_transaction_status::{
EncodedConfirmedBlock, EncodedTransaction, EncodedTransactionWithStatusMeta,
TransactionDetails,
Expand Down Expand Up @@ -6492,16 +6494,14 @@ pub mod tests {
Arc::new(AtomicU64::default()),
Arc::new(PrioritizationFeeCache::default()),
);
SendTransactionService::new::<NullTpuInfo>(
let client = ConnectionCacheClient::<NullTpuInfo>::new(
connection_cache.clone(),
tpu_address,
&bank_forks,
None,
receiver,
connection_cache,
1000,
None,
1,
exit,
);
SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone());

let mut bad_transaction = system_transaction::transfer(
&mint_keypair,
Expand Down Expand Up @@ -6766,16 +6766,15 @@ pub mod tests {
Arc::new(AtomicU64::default()),
Arc::new(PrioritizationFeeCache::default()),
);
SendTransactionService::new::<NullTpuInfo>(
let client = ConnectionCacheClient::<NullTpuInfo>::new(
connection_cache.clone(),
tpu_address,
&bank_forks,
None,
receiver,
connection_cache,
1000,
None,
1,
exit,
);
SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone());

assert_eq!(
request_processor.get_block_commitment(0),
RpcBlockCommitment {
Expand Down
18 changes: 13 additions & 5 deletions rpc/src/rpc_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,10 @@ use {
exit::Exit, genesis_config::DEFAULT_GENESIS_DOWNLOAD_PATH, hash::Hash,
native_token::lamports_to_sol,
},
solana_send_transaction_service::send_transaction_service::{self, SendTransactionService},
solana_send_transaction_service::{
send_transaction_service::{self, SendTransactionService},
transaction_client::ConnectionCacheClient,
},
solana_storage_bigtable::CredentialType,
std::{
net::SocketAddr,
Expand Down Expand Up @@ -474,15 +477,20 @@ impl JsonRpcService {

let leader_info =
poh_recorder.map(|recorder| ClusterTpuInfo::new(cluster_info.clone(), recorder));
let _send_transaction_service = Arc::new(SendTransactionService::new_with_config(
let client = ConnectionCacheClient::new(
connection_cache,
tpu_address,
&bank_forks,
send_transaction_service_config.tpu_peers.clone(),
leader_info,
send_transaction_service_config.leader_forward_count,
);
let _send_transaction_service = SendTransactionService::new_with_config(
&bank_forks,
receiver,
connection_cache,
client,
send_transaction_service_config,
exit,
));
);

#[cfg(test)]
let test_request_processor = request_processor.clone();
Expand Down
9 changes: 8 additions & 1 deletion send-transaction-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,27 @@ license = { workspace = true }
edition = { workspace = true }

[dependencies]
async-trait = { workspace = true }
crossbeam-channel = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
solana-client = { workspace = true }
solana-connection-cache = { workspace = true }
solana-measure = { workspace = true }
solana-metrics = { workspace = true }
solana-quic-client = { workspace = true }
solana-runtime = { workspace = true }
solana-sdk = { workspace = true }
solana-tpu-client = { workspace = true }
solana-tpu-client-next = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio-util = { workspace = true }

[dev-dependencies]
solana-logger = { workspace = true }
solana-runtime = { workspace = true, features = ["dev-context-only-utils"] }

[features]
dev-context-only-utils = []

[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]
3 changes: 3 additions & 0 deletions send-transaction-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@ pub mod send_transaction_service_stats;
pub mod tpu_info;
pub mod transaction_client;

#[cfg(any(test, feature = "dev-context-only-utils"))]
pub mod test_utils;

#[macro_use]
extern crate solana_metrics;
Loading

0 comments on commit 5c0f173

Please sign in to comment.