use tpu-client-next in send_transaction_service #3515

Merged
6 changes: 5 additions & 1 deletion Cargo.lock


12 changes: 6 additions & 6 deletions banks-server/src/banks_server.rs
@@ -29,6 +29,7 @@ use {
solana_send_transaction_service::{
send_transaction_service::{SendTransactionService, TransactionInfo},
tpu_info::NullTpuInfo,
transaction_client::ConnectionCacheClient,
},
std::{
io,
@@ -454,17 +455,16 @@ pub async fn start_tcp_server(
.map(move |chan| {
let (sender, receiver) = unbounded();

SendTransactionService::new::<NullTpuInfo>(
let client = ConnectionCacheClient::<NullTpuInfo>::new(
connection_cache.clone(),
tpu_addr,
&bank_forks,
None,
receiver,
connection_cache.clone(),
5_000,
None,
0,
exit.clone(),
);

SendTransactionService::new(&bank_forks, receiver, client, 5_000, exit.clone());
Author: Out of scope for this PR, but I have no idea who uses this banks_server.

Author: Actually nobody, and it should be removed from agave.


let server = BanksServer::new(
bank_forks.clone(),
block_commitment_cache.clone(),
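Taken together, the hunk above replaces the old single-call construction with a two-step pattern: build a client first, then hand it to the service. A minimal sketch of the resulting shape, with argument roles inferred from this hunk and the rpc_service.rs hunk further down (the comments are my reading of the diff, not authoritative):

```rust
// Sketch only: argument roles are inferred from the surrounding hunks.
let client = ConnectionCacheClient::<NullTpuInfo>::new(
    connection_cache.clone(), // shared ConnectionCache
    tpu_addr,                 // TPU address transactions are sent to
    None,                     // tpu_peers: no additional peers
    None,                     // leader_info: none, since NullTpuInfo is used
    0,                        // leader_forward_count
);
// The service now receives a ready-made client instead of building one internally.
SendTransactionService::new(&bank_forks, receiver, client, 5_000, exit.clone());
```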
27 changes: 26 additions & 1 deletion programs/sbf/Cargo.lock


77 changes: 24 additions & 53 deletions rpc/src/cluster_tpu_info.rs
@@ -1,10 +1,7 @@
use {
solana_gossip::{cluster_info::ClusterInfo, contact_info::Protocol},
solana_poh::poh_recorder::PohRecorder,
solana_sdk::{
clock::{Slot, NUM_CONSECUTIVE_LEADER_SLOTS},
pubkey::Pubkey,
},
solana_sdk::{clock::NUM_CONSECUTIVE_LEADER_SLOTS, pubkey::Pubkey},
solana_send_transaction_service::tpu_info::TpuInfo,
std::{
collections::HashMap,
@@ -50,7 +47,7 @@ impl TpuInfo for ClusterTpuInfo {
.collect();
}

fn get_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr> {
fn get_unique_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr> {
let recorder = self.poh_recorder.read().unwrap();
let leaders: Vec<_> = (0..max_count)
.filter_map(|i| recorder.leader_after_n_slots(i * NUM_CONSECUTIVE_LEADER_SLOTS))
@@ -70,37 +67,23 @@ impl TpuInfo for ClusterTpuInfo {
unique_leaders
}

fn get_leader_tpus_with_slots(
&self,
max_count: u64,
protocol: Protocol,
) -> Vec<(&SocketAddr, Slot)> {
fn get_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr> {
let recorder = self.poh_recorder.read().unwrap();
let leaders: Vec<_> = (0..max_count)
.rev()
.filter_map(|future_slot| {
NUM_CONSECUTIVE_LEADER_SLOTS
.checked_mul(future_slot)
.and_then(|slots_in_the_future| {
recorder.leader_and_slot_after_n_slots(slots_in_the_future)
})
})
let leader_pubkeys: Vec<_> = (0..max_count)
Author: I allocate the vector here and filter it afterwards because I'm not sure whether filtering through the hash map would take too long inside the critical section. If filtering turns out to be better than allocating, I will fix it.

Reviewer: This is called in a tight loop with the poh read lock held. Looking at recorder.leader_after_n_slots(), it would be better to get the current tick height from poh (requires the read lock), take the leader schedule cache independently from poh (Arc::clone it), release the poh lock, and just query the cache.

Please do this in a follow-up.
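A hedged sketch of that suggestion (the PohRecorder accessors are illustrative assumptions, not the actual API; slot_leader_at is the existing LeaderScheduleCache lookup):

```rust
// Illustrative fragment: keep the critical section minimal by copying the
// current slot (derived from the tick height) and cloning the Arc to the
// leader schedule cache, then releasing the poh lock before any lookups.
fn upcoming_leaders(&self, max_count: u64) -> Vec<Pubkey> {
    let (current_slot, leader_schedule_cache) = {
        let recorder = self.poh_recorder.read().unwrap();
        // `slot()` and `leader_schedule_cache()` are assumed accessors.
        (recorder.slot(), recorder.leader_schedule_cache().clone())
    };
    // The potentially slow cache queries now happen outside the lock.
    (0..max_count)
        .filter_map(|i| {
            let slot = current_slot.checked_add(i.checked_mul(NUM_CONSECUTIVE_LEADER_SLOTS)?)?;
            leader_schedule_cache.slot_leader_at(slot, None)
        })
        .collect()
}
```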

Author (@KirillLykov, Dec 2, 2024): Since max_count should not be large (at least I don't understand why anyone would use a large value for it), I don't think reducing the scope of the lock will change much.

Author: The change can be found in https://github.com/KirillLykov/solana/blob/klykov/fix-get-leader-tpus/rpc/src/cluster_tpu_info.rs#L50

But it required some minor modifications to poh_recorder, so I'm not sure it's a good idea to add it to a PR that is being backported.

Author: @alessandrod do you think this change needs to go into a PR that will be backported, or can it land in one that isn't backported? I think the change has minimal influence on performance because we always use a small max_count.
Another consideration: I'm not sure this fix is enough. It looks like Arc<RwLock<PohRecorder>> is used in many places and locks too much data unnecessarily, so there is a design problem that should be tackled.

.filter_map(|i| recorder.leader_after_n_slots(i * NUM_CONSECUTIVE_LEADER_SLOTS))
.collect();
drop(recorder);
let addrs_to_slots = leaders
.into_iter()
.filter_map(|(leader_id, leader_slot)| {
leader_pubkeys
.iter()
.filter_map(|leader_pubkey| {
self.recent_peers
.get(&leader_id)
.map(|(udp_tpu, quic_tpu)| match protocol {
Protocol::UDP => (udp_tpu, leader_slot),
Protocol::QUIC => (quic_tpu, leader_slot),
.get(leader_pubkey)
.map(|addr| match protocol {
Protocol::UDP => &addr.0,
Protocol::QUIC => &addr.1,
})
})
.collect::<HashMap<_, _>>();
let mut unique_leaders = Vec::from_iter(addrs_to_slots);
unique_leaders.sort_by_key(|(_addr, slot)| *slot);
unique_leaders
.collect()
}
}
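Net effect of the changes in this file: the old get_leader_tpus (deduplicated addresses) is renamed to get_unique_leader_tpus, and get_leader_tpus_with_slots is replaced by a new get_leader_tpus that drops the slot pairing. The two touched methods of the trait now look like this (signatures copied from the hunks above; the doc comments are my reading of the behavior):

```rust
pub trait TpuInfo {
    /// Upcoming leader TPU addresses with duplicates removed.
    fn get_unique_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr>;
    /// One address per upcoming leader slot window; the same address may
    /// repeat when consecutive windows belong to the same leader.
    fn get_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr>;
    // ...other methods unchanged by this PR...
}
```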

@@ -275,12 +258,12 @@ mod test {
let first_leader =
solana_ledger::leader_schedule_utils::slot_leader_at(slot, &bank).unwrap();
assert_eq!(
leader_info.get_leader_tpus(1, Protocol::UDP),
leader_info.get_unique_leader_tpus(1, Protocol::UDP),
vec![&recent_peers.get(&first_leader).unwrap().0]
);
assert_eq!(
leader_info.get_leader_tpus_with_slots(1, Protocol::UDP),
vec![(&recent_peers.get(&first_leader).unwrap().0, 0)]
leader_info.get_leader_tpus(1, Protocol::UDP),
vec![&recent_peers.get(&first_leader).unwrap().0]
);

let second_leader = solana_ledger::leader_schedule_utils::slot_leader_at(
@@ -294,15 +277,12 @@
];
expected_leader_sockets.dedup();
assert_eq!(
leader_info.get_leader_tpus(2, Protocol::UDP),
leader_info.get_unique_leader_tpus(2, Protocol::UDP),
expected_leader_sockets
);
assert_eq!(
leader_info.get_leader_tpus_with_slots(2, Protocol::UDP),
leader_info.get_leader_tpus(2, Protocol::UDP),
expected_leader_sockets
.into_iter()
.zip([0, 4])
.collect::<Vec<_>>()
);

let third_leader = solana_ledger::leader_schedule_utils::slot_leader_at(
@@ -317,26 +297,17 @@
];
expected_leader_sockets.dedup();
assert_eq!(
leader_info.get_leader_tpus(3, Protocol::UDP),
leader_info.get_unique_leader_tpus(3, Protocol::UDP),
expected_leader_sockets
);
// Only 2 leader tpus are returned always... so [0, 4, 8] isn't right here.
// This assumption is safe. After all, leader schedule generation must be deterministic.
assert_eq!(
leader_info.get_leader_tpus_with_slots(3, Protocol::UDP),
expected_leader_sockets
.into_iter()
.zip([0, 4])
.collect::<Vec<_>>()
);

for x in 4..8 {
assert!(leader_info.get_leader_tpus(x, Protocol::UDP).len() <= recent_peers.len());
assert!(
leader_info
.get_leader_tpus_with_slots(x, Protocol::UDP)
.len()
<= recent_peers.len()
leader_info.get_unique_leader_tpus(x, Protocol::UDP).len() <= recent_peers.len()
);
assert_eq!(
leader_info.get_leader_tpus(x, Protocol::UDP).len(),
x as usize
);
}
}
37 changes: 18 additions & 19 deletions rpc/src/rpc.rs
@@ -121,6 +121,7 @@ use {
solana_runtime::commitment::CommitmentSlots,
solana_send_transaction_service::{
send_transaction_service::SendTransactionService, tpu_info::NullTpuInfo,
transaction_client::ConnectionCacheClient,
},
solana_streamer::socket::SocketAddrSpace,
};
@@ -379,16 +380,15 @@ impl JsonRpcRequestProcessor {
.tpu(connection_cache.protocol())
.unwrap();
let (sender, receiver) = unbounded();
SendTransactionService::new::<NullTpuInfo>(

let client = ConnectionCacheClient::<NullTpuInfo>::new(
Author: In a follow-up PR, this method will be parametrized with the type of client.

connection_cache.clone(),
tpu_address,
&bank_forks,
None,
receiver,
connection_cache,
1000,

Reviewer: Why 1000? Maybe add a comment explaining this number?

Author: This is pre-existing code used only in tests; I think it's a magic number chosen from common-sense considerations. Will check this in follow-ups.

None,
1,
exit.clone(),
);
SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone());

let leader_schedule_cache = Arc::new(LeaderScheduleCache::new_from_bank(&bank));
let startup_verification_complete = Arc::clone(bank.get_startup_verification_complete());
@@ -4386,7 +4386,9 @@ pub mod tests {
},
vote::state::VoteState,
},
solana_send_transaction_service::tpu_info::NullTpuInfo,
solana_send_transaction_service::{
tpu_info::NullTpuInfo, transaction_client::ConnectionCacheClient,
},
solana_transaction_status::{
EncodedConfirmedBlock, EncodedTransaction, EncodedTransactionWithStatusMeta,
TransactionDetails,
@@ -6492,16 +6494,14 @@ pub mod tests {
Arc::new(AtomicU64::default()),
Arc::new(PrioritizationFeeCache::default()),
);
SendTransactionService::new::<NullTpuInfo>(
let client = ConnectionCacheClient::<NullTpuInfo>::new(
Author: Same here.

connection_cache.clone(),
tpu_address,
&bank_forks,
None,
receiver,
connection_cache,
1000,
None,
1,
exit,
);
SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone());

let mut bad_transaction = system_transaction::transfer(
&mint_keypair,
@@ -6766,16 +6766,15 @@ pub mod tests {
Arc::new(AtomicU64::default()),
Arc::new(PrioritizationFeeCache::default()),
);
SendTransactionService::new::<NullTpuInfo>(
let client = ConnectionCacheClient::<NullTpuInfo>::new(
connection_cache.clone(),
tpu_address,
&bank_forks,
None,
receiver,
connection_cache,
1000,
None,
1,
exit,
);
SendTransactionService::new(&bank_forks, receiver, client, 1000, exit.clone());

assert_eq!(
request_processor.get_block_commitment(0),
RpcBlockCommitment {
18 changes: 13 additions & 5 deletions rpc/src/rpc_service.rs
@@ -36,7 +36,10 @@ use {
exit::Exit, genesis_config::DEFAULT_GENESIS_DOWNLOAD_PATH, hash::Hash,
native_token::lamports_to_sol,
},
solana_send_transaction_service::send_transaction_service::{self, SendTransactionService},
solana_send_transaction_service::{
send_transaction_service::{self, SendTransactionService},
transaction_client::ConnectionCacheClient,
},
solana_storage_bigtable::CredentialType,
std::{
net::SocketAddr,
@@ -474,15 +477,20 @@ impl JsonRpcService {

let leader_info =
poh_recorder.map(|recorder| ClusterTpuInfo::new(cluster_info.clone(), recorder));
let _send_transaction_service = Arc::new(SendTransactionService::new_with_config(
let client = ConnectionCacheClient::new(
Author: This is created here temporarily to avoid changes in the rpc crate API. In the follow-up PR I will move the creation to core/validator.rs, so the rpc crate will accept the client as a generic argument instead.
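A hypothetical sketch of that follow-up shape, assuming the TransactionClient trait from the new transaction_client module (the exact bounds are guesses, not the actual agave signature):

```rust
use {
    crossbeam_channel::Receiver,
    solana_runtime::bank_forks::BankForks,
    solana_send_transaction_service::{
        send_transaction_service::{self, SendTransactionService, TransactionInfo},
        transaction_client::TransactionClient,
    },
    std::sync::{atomic::AtomicBool, Arc, RwLock},
};

// The rpc crate would receive an already-built client and stay agnostic about
// whether it is backed by the connection cache or by tpu-client-next.
fn spawn_send_transaction_service<C: TransactionClient + Clone + Send + 'static>(
    bank_forks: &Arc<RwLock<BankForks>>,
    receiver: Receiver<TransactionInfo>,
    client: C,
    config: send_transaction_service::Config,
    exit: Arc<AtomicBool>,
) -> SendTransactionService {
    SendTransactionService::new_with_config(bank_forks, receiver, client, config, exit)
}
```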

connection_cache,
tpu_address,
&bank_forks,
send_transaction_service_config.tpu_peers.clone(),
leader_info,
send_transaction_service_config.leader_forward_count,
);
let _send_transaction_service = SendTransactionService::new_with_config(
&bank_forks,
receiver,
connection_cache,
client,
send_transaction_service_config,
exit,
));
);

#[cfg(test)]
let test_request_processor = request_processor.clone();
9 changes: 8 additions & 1 deletion send-transaction-service/Cargo.toml
@@ -10,20 +10,27 @@ license = { workspace = true }
edition = { workspace = true }

[dependencies]
async-trait = { workspace = true }
crossbeam-channel = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
solana-client = { workspace = true }
solana-connection-cache = { workspace = true }
solana-measure = { workspace = true }
solana-metrics = { workspace = true }
solana-quic-client = { workspace = true }
solana-runtime = { workspace = true }
solana-sdk = { workspace = true }
solana-tpu-client = { workspace = true }
solana-tpu-client-next = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio-util = { workspace = true }

[dev-dependencies]
solana-logger = { workspace = true }
solana-runtime = { workspace = true, features = ["dev-context-only-utils"] }

[features]
dev-context-only-utils = []

[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]
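The new dev-context-only-utils feature exists so that other crates can reuse the test helpers gated in lib.rs below, mirroring how this manifest already pulls solana-runtime with the same feature for its own tests. A hypothetical consumer would opt in from its dev-dependencies:

```toml
# Hypothetical downstream manifest entry; the crate name is real, the consumer is illustrative.
[dev-dependencies]
solana-send-transaction-service = { workspace = true, features = ["dev-context-only-utils"] }
```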
3 changes: 3 additions & 0 deletions send-transaction-service/src/lib.rs
@@ -4,5 +4,8 @@ pub mod send_transaction_service_stats;
pub mod tpu_info;
pub mod transaction_client;

#[cfg(any(test, feature = "dev-context-only-utils"))]
pub mod test_utils;

#[macro_use]
extern crate solana_metrics;