-
Notifications
You must be signed in to change notification settings - Fork 261
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
use tpu-client-next in send_transaction_service #3515
use tpu-client-next in send_transaction_service #3515
Conversation
If this PR represents a change to the public RPC API:
Thank you for keeping the RPC clients in sync with the server API @KirillLykov. |
7f3996a
to
298ef4e
Compare
); | ||
|
||
SendTransactionService::new(&bank_forks, receiver, client, 5_000, exit.clone()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Out of scope of this PR, but no idea who usese this bank_server
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
actually nobody and it should be removed from agave
@@ -140,53 +138,31 @@ impl Default for Config { | |||
pub const MAX_RETRY_SLEEP_MS: u64 = 1000; | |||
|
|||
impl SendTransactionService { | |||
pub fn new<T: TpuInfo + std::marker::Send + 'static>( | |||
tpu_address: SocketAddr, | |||
pub fn new<Client: TransactionClient + Clone + std::marker::Send + 'static>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could have defined a type alias for that or, maybe, it is good to be explicit for these interface functionality (?).
} | ||
|
||
#[test] | ||
fn process_transactions() { | ||
#[tokio::test(flavor = "multi_thread")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note, that similar pattern for tests will be used in dozen other tests in rpc crate in the follow up PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious: Is this to allow tests to run in parallel, or does it change the concurrency model of tokio
itself for the duration of the test, or both?
@@ -474,15 +477,20 @@ impl JsonRpcService { | |||
|
|||
let leader_info = | |||
poh_recorder.map(|recorder| ClusterTpuInfo::new(cluster_info.clone(), recorder)); | |||
let _send_transaction_service = Arc::new(SendTransactionService::new_with_config( | |||
let client = ConnectionCacheClient::new( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is created here temporary to avoid changes in the rpc crate API. In the follow up PR I will move the creation to the core/validator.rs so rpc crate will accept Client as generic argument instead.
@@ -379,16 +380,15 @@ impl JsonRpcRequestProcessor { | |||
.tpu(connection_cache.protocol()) | |||
.unwrap(); | |||
let (sender, receiver) = unbounded(); | |||
SendTransactionService::new::<NullTpuInfo>( | |||
|
|||
let client = ConnectionCacheClient::<NullTpuInfo>::new( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the follow up PR, this method will be parametrized with the type of client.
@@ -6492,16 +6494,14 @@ pub mod tests { | |||
Arc::new(AtomicU64::default()), | |||
Arc::new(PrioritizationFeeCache::default()), | |||
); | |||
SendTransactionService::new::<NullTpuInfo>( | |||
let client = ConnectionCacheClient::<NullTpuInfo>::new( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here
1277df6
to
d234341
Compare
recorder.leader_and_slot_after_n_slots(slots_in_the_future) | ||
}) | ||
}) | ||
let leader_pubkeys: Vec<_> = (0..max_count) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I allocate here vector and filter it later because I'm not sure if filtering using hash map takes too long to be inside critical section or not. If it seems that better filter than allocate, I will fix.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is called in a tight loop with the poh read lock held. Looking at
recorder.leader_after_n_slots() it looks like it would be better to get the
current tick height from poh (requires read lock), then use the leader schedule
cache independently from poh (Arc::clone it), release the poh lock and just
query the cache.
Pls do this in a followup
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this max_count
should not be large (at least I don't understand why someone can use large value for it), I don't think that reducing the scope of lock will change much.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change can be found in https://github.com/KirillLykov/solana/blob/klykov/fix-get-leader-tpus/rpc/src/cluster_tpu_info.rs#L50
But it required adding some minor modifications to the poh_recorder, not sure if it is a good idea to add to PR that is backported for that reason.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alessandrod do you think this change is necessary to be in a PR that will be backported or a PR that is not necessary to be backported? I think that this change has minimal influence on the performance because we always use small max_count
.
Another consideration, is that I'm not sure if this fix is enough -- it looks like Arc<RwLock<PohRecorder>>
is used in many places and locks too much data unnecessarily. So there is a design problem that should be tackled.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
} | ||
|
||
#[test] | ||
fn process_transactions() { | ||
#[tokio::test(flavor = "multi_thread")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious: Is this to allow tests to run in parallel, or does it change the concurrency model of tokio
itself for the duration of the test, or both?
|
The expectations are limited to get rid of quic traffic fragmentation. So I try not to change anything else than adding an option to select a quic client implementation to be safe. |
b81e5ef
to
d5ee74c
Compare
} | ||
}); | ||
|
||
measure.stop(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't increment error counter here because there is no way to do this without changing the type of stats to Arc<SendTransactionServiceStats>
. This matches the ConnectionCache version behavior, it also doesn't increment this counter (although it pretends it does)
tpu_peers, | ||
}; | ||
let config = ConnectionWorkersSchedulerConfig { | ||
bind: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wasn't the bind address configurable somewhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right that there is --bind-address
validator argument here. But it is not used in client code (not sure if this is by design):
let client_socket = solana_net_utils::bind_in_range(
IpAddr::V4(Ipv4Addr::UNSPECIFIED),
VALIDATOR_PORT_RANGE,
)
See https://github.com/anza-xyz/agave/blob/master/quic-client/src/nonblocking/quic_client.rs#L138
Probably, I should create socket using the same function bind_in_range
here.
I asked on discord if there is any sense to use UNSPECIFIED
address even when user specified address.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After some discussions about this bind_in_range with Alex, I came to conclusion that there is no need in using bind_in_range and it is always better to use ephemeral client ports.
Regarding using bind_address
here: it is possible but requires plumbing the whole call-stack from validator/src/main.rs. Looks invasive for this PR.
// to match MAX_CONNECTIONS from ConnectionCache | ||
num_connections: 1024, | ||
skip_check_transaction_age: true, | ||
worker_channel_size: 64, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
magic number, should at least document how it was chosen
num_connections: 1024, | ||
skip_check_transaction_age: true, | ||
worker_channel_size: 64, | ||
max_reconnect_attempts: 4, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
another magic number
ca7b51d
to
c11c695
Compare
@alessandrod I've added 89b38d4 that introduces reusable Receiver to the tpu-client-next and uses this Receiver to implement |
a61a00f
to
03bdba5
Compare
leader_forward_count: usize, | ||
) -> ConnectionWorkersSchedulerConfig { | ||
ConnectionWorkersSchedulerConfig { | ||
bind: SocketAddr::new(Ipv4Addr::new(0, 0, 0, 0).into(), 0), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe bind to both V6 and V4 addresses? Not make problems for future...
Also maybe figure out if the bind address should be configurable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Requires a separate issue to do this change on the level of validator. Maybe should discuss with Greg.
return Err("TpuClientNext task panicked.".into()); | ||
}; | ||
lock.1.cancel(); | ||
lock.0.take() // Take the `join_handle` out of the critical section |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do it to avoid critical section that wraps await call
fn send_transactions_in_batch( | ||
&self, | ||
wire_transactions: Vec<Vec<u8>>, | ||
stats: &SendTransactionServiceStats, | ||
) { | ||
let mut measure = Measure::start("send-us"); | ||
self.runtime_handle.spawn({ | ||
let sender = self.sender.clone(); | ||
async move { | ||
let res = sender.send(TransactionBatch::new(wire_transactions)).await; | ||
if res.is_err() { | ||
warn!("Failed to send transaction to channel: it is closed."); | ||
} | ||
} | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code will "leak" memory, if the sender is full and blocks indefinitely, the Vecs with wire_transactions will pile up in the executor's queue, which will result in out of memory condition and validator crash. This is unacceptable.
Suggest set sender's channel capacity to be able to "smooth out" fluctuations in network load as appropriate. E.g. if network load fluctuation duration is ~50 ms, we need to be able to buffer up to 50 ms worth of transaction batches.
If channel is full suggest drop transaction batch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For clarity, this is what currently happens also with ConnectionCache, so this change doesn't make situation worse.
A solution could be to use try_send
instead with the contract that if channel is full we drop RPC traffic. Should mention that internally in tpu-client-next we don't guarantee that all the txs it receives are sent (they might be dropped internally as well) and on the level of STS we do have retry.
Alex proposed to compute the size of this channel (currently set to 128
) to the number avg_outage_sec * rpc_tps/batch_size
, where avg_outage_sec
is the average duration of network typical events when we cannot send transactions for some time, batch_size
is defined by validator cli (default is 1 which is wrong).
@alessandrod what do you think about it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We've discussed this: we need to implement proper backoff, but not in this PR, which is already too big. This PR implements the exact same queueing strategy that's already there and used in production, but with a better quic client implementation. Let's land it, backport it, then we can do backoff.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I already forgot :-( Yeah, lets postpone it for the follow up that will not be backported to 2.1
None, | ||
receiver, | ||
connection_cache, | ||
1000, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why 1000? Maybe some comment explaining this number?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is pre-existing code used only in tests, I think it is magic number which is selected from common sense considerations. Will check this out in follow ups.
/// For example, if leader schedule was `[L1, L1, L1, L1, L2, L2, L2, L2, | ||
/// L1, ...]` it will return `[L1, L2]` (the last L1 will be not added to | ||
/// the result). | ||
fn get_unique_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe change signature here to return Vec? It would not take much more memory but will likely be
- easier to work with since no borrowing is needed
- will probably work a lot faster
- will not take much more memory
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what do you mean? this already returns Vec
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, idea is it returns a Vec of pointers, but it could return a Vec of SocketAddr's. It would be maybe 20% bigger, but it would run 5x faster, and will be easier to manipulate for programmers and for the optimizer (since it will not have indirections anymore)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is pre-existing method, so to minimize changes in the scope of this PR, will implement in follow up PR.
/// addresses for these leaders. | ||
/// | ||
/// For example, if leader schedule was `[L1, L1, L1, L1, L2, L2, L2, L2, | ||
/// L1, ...]` it will return `[L1, L2, L1]`. | ||
fn get_leader_tpus(&self, max_count: u64, protocol: Protocol) -> Vec<&SocketAddr>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here, maybe return Vec?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- The unbounded accumulation of transaction batches is problematic
- Couple of small comments/suggestions related to future support of IPv6 and more complex networking setups
}; | ||
|
||
if let Some(join_handle) = join_handle { | ||
let Ok(result) = join_handle.await else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we wait with timeout here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just did another pass, the update identity thing looks ok. Left some comments and there are also unaddressed comments from my first review
recorder.leader_and_slot_after_n_slots(slots_in_the_future) | ||
}) | ||
}) | ||
let leader_pubkeys: Vec<_> = (0..max_count) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is called in a tight loop with the poh read lock held. Looking at
recorder.leader_after_n_slots() it looks like it would be better to get the
current tick height from poh (requires read lock), then use the leader schedule
cache independently from poh (Arc::clone it), release the poh lock and just
query the cache.
Pls do this in a followup
if let Some(keypair) = keypair { | ||
QuicClientCertificate::new(keypair) | ||
} else { | ||
QuicClientCertificate::new(&Keypair::new()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that this can only happen when called from the test ::create_client
method. If that's the case, instead of leaking a test detail all the way down to
here, handle the None part in the tests, and make it so that all the methods
that lead to here take &Keypair instead of Option
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not only: if client wants to create use unstaked connections, he passes None
as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then please make new take Option, no need to add another method and
"with_option" wouldn't be a good descriptive name
03bdba5
to
2af42fb
Compare
@alessandrod pushed updates, now it should address all the comments. |
c69ec43
to
4187f1e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
approving with a couple of nits
if let Some(keypair) = keypair { | ||
QuicClientCertificate::new(keypair) | ||
} else { | ||
QuicClientCertificate::new(&Keypair::new()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then please make new take Option, no need to add another method and
"with_option" wouldn't be a good descriptive name
@@ -16,11 +16,13 @@ use { | |||
streamer::StakedNodes, | |||
}, | |||
solana_tpu_client_next::{ | |||
connection_workers_scheduler::{ConnectionWorkersSchedulerConfig, Fanout}, | |||
connection_workers_scheduler::{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I don't think this file needs the _test suffix, I'd call it connection_workers_scheduler.rs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will postpone this for the later, because this change is not necessary for backporting
36449f4
to
12098e7
Compare
Backports to the beta branch are to be avoided unless absolutely necessary for fixing bugs, security issues, and perf regressions. Changes intended for backport should be structured such that a minimum effective diff can be committed separately from any refactoring, plumbing, cleanup, etc that are not strictly necessary to achieve the goal. Any of the latter should go only into master and ride the normal stabilization schedule. Exceptions include CI/metrics changes, CLI improvements and documentation updates on a case by case basis. |
* Add tpu-client-next to send_transaction_service * rename with_option to new * Update Cargo.lock (cherry picked from commit 5c0f173) # Conflicts: # banks-server/src/banks_server.rs # programs/sbf/Cargo.lock # rpc/src/rpc.rs # rpc/src/rpc_service.rs # send-transaction-service/src/send_transaction_service.rs # send-transaction-service/src/tpu_info.rs # send-transaction-service/src/transaction_client.rs # svm/examples/Cargo.lock
Problem
This PR adds tpu-client-next to send_transaction_service as an optional client.
Summary of Changes