-
Notifications
You must be signed in to change notification settings - Fork 300
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add fanout to tpu-client-next #3478
add fanout to tpu-client-next #3478
Conversation
107384f
to
f62dee2
Compare
// Regardless of who is leader, add future leaders to the cache to | ||
// hide the latency of opening the connection. | ||
tokio::select! { | ||
send_res = workers.send_transactions_to_address(new_leader, transaction_batch.clone()) => match send_res { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right now this will block if the corresponding worker's channel is full, so
effectively the slowest leader will slow down all other leaders.
I think we should 1) increase the channel size and 2) add a
try_send_transactions_to_address that returns an error if the channel is full.
If the channel is full, I guess as a start we could log a warning and drop the
batch for that leader, but at least we don't slow down the other leaders.
Then longer term, we need to think of a better API so that the caller of the
crate can decide what should happen: drop the batch? increase the channel size?
slow down upstream?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes sense, I'm not sure that there is a value in having both send_transactions
and try_send_transactions
in the context of this crate. So probably just modify the code to use try_
. This also allows to reduce select!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The size of the channel worker_channel_size
is configurable through the Scheduler config. The only question what value should be used in practice for the STS, for example.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
one thing that worries me is the scenario with transaction-bench
. There we actually need to have a backpressure (which has been created with these worker_channel.send().await
) because it tries to generate as many as possible transaction batches and we rely on slowing down sending them.
So I want to add configuration flag controlling option try_send
or send
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In case of STS, try_send
is fine because even if we drop some transactions silently, there is a retry thread which sends them again anyways.
@@ -35,7 +35,7 @@ pub trait LeaderUpdater: Send { | |||
/// If the current leader estimation is incorrect and transactions are sent to | |||
/// only one estimated leader, there is a risk of losing all the transactions, | |||
/// depending on the forwarding policy. | |||
fn next_leaders(&self, lookahead_slots: u64) -> Vec<SocketAddr>; | |||
fn next_leaders(&mut self, lookahead_slots: u64) -> Vec<SocketAddr>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because to implement this trait in STS I need to able to modify self.
Backports to the beta branch are to be avoided unless absolutely necessary for fixing bugs, security issues, and perf regressions. Changes intended for backport should be structured such that a minimum effective diff can be committed separately from any refactoring, plumbing, cleanup, etc that are not strictly necessary to achieve the goal. Any of the latter should go only into master and ride the normal stabilization schedule. Exceptions include CI/metrics changes, CLI improvements and documentation updates on a case by case basis. |
263d796
to
c0fd36e
Compare
@alessandrod all addressed except for 0rtt (next time) |
// Define the non-atomic struct and the `to_non_atomic` conversion method | ||
define_non_atomic_struct!( | ||
SendTransactionStatsNonAtomic, | ||
SendTransactionStats, | ||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor
I would call the macro define_non_atoic_version_for!
. Just to be extra clear, that it creates a struct based on another struct.
// Define the non-atomic struct and the `to_non_atomic` conversion method | |
define_non_atomic_struct!( | |
SendTransactionStatsNonAtomic, | |
SendTransactionStats, | |
{ | |
// Define the non-atomic struct and the `to_non_atomic` conversion method | |
define_non_atomic_version_for!( | |
SendTransactionStats, | |
SendTransactionStatsNonAtomic, | |
{ |
tpu-client-next/src/workers_cache.rs
Outdated
mpsc::error::TrySendError::Full(_) => WorkersCacheError::FullChannel, | ||
mpsc::error::TrySendError::Closed(_) => WorkersCacheError::ReceiverDropped, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style
Consider importing mpsc::error::TrySendError
.
mpsc::error::TrySendError::Full(_) => WorkersCacheError::FullChannel, | |
mpsc::error::TrySendError::Closed(_) => WorkersCacheError::ReceiverDropped, | |
use { | |
/* ... */ | |
tokio::{sync::mpsc::{self, mpsc::error::TrySendError}, task::JoinHandle}, | |
/* ... */ | |
}; | |
/* ... */ | |
TrySendError::Full(_) => WorkersCacheError::FullChannel, | |
TrySendError::Closed(_) => WorkersCacheError::ReceiverDropped, |
} | ||
|
||
/// Sends a batch of transactions to the worker for a given peer. If the | ||
/// worker for the peer is disconnected or fails, it is removed from the | ||
/// cache. | ||
pub async fn send_transactions_to_address( | ||
pub(crate) fn try_send_transactions_to_address( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor
Unrelated to this PR, but the address
suffix in the name is probably unnecessary.
As the call will have the address as the first argument, it is probably readable enough if the function is called try_send_transactions()
or try_send_transactions_to()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This I will not touch in this PR to minimize the number of changes (to simplify the backport 2.1)
tpu-client-next/src/workers_cache.rs
Outdated
if let Some(worker) = worker { | ||
tokio::spawn(async move { | ||
let leader = worker.leader(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style
You can reduce the whole body indentation with a let/else
:
if let Some(worker) = worker { | |
tokio::spawn(async move { | |
let leader = worker.leader(); | |
let Some(worker) = worker else { | |
return; | |
}; | |
tokio::spawn(async move { | |
let leader = worker.leader(); |
tpu-client-next/src/workers_cache.rs
Outdated
|
||
pub(crate) fn maybe_shutdown_worker(worker: Option<ShutdownWorker>) { | ||
if let Some(worker) = worker { | ||
tokio::spawn(async move { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Spawning tasks without observing their execution, I think, is a bit fragile.
The executor will keep track of them, but it is not clear in the rest of the code as to what is going on.
And if those tasks take a long time to complete or even hang, it would be just reflected in the executor taking a long time to finish or hanging.
I do not fully understand the end to end logic here.
So, it could be hard to track.
But I suggest you consider putting all the shutdown tasks in an UnorderedFutures
or something like that, allowing you to track and even interrupt the shutdown process.
In particular, it would allow the shutdown errors to be processed in a centralized location, rather than them being just printed as warnings.
Though again, not sure if you really need this feature.
But, maybe for collection of the stats?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But I suggest you consider putting all the shutdown tasks in an
UnorderedFutures or something like that, allowing you to track and even
interrupt the shutdown process.
we explicitly don't want to do this tho: we don't want cleanup to delay sending
transactions. So we do need tasks. We could have a JoinSet and call
poll_join_next to poll without blocking and pop tasks off the join set, although
that seems extra work for little gain. Background tasks don't block the runtime https://docs.rs/tokio/latest/tokio/runtime/struct.Runtime.html#shutdown
@@ -433,10 +436,10 @@ async fn test_staked_connection() { | |||
|
|||
// Wait for the exchange to finish. | |||
tx_sender_shutdown.await; | |||
let localhost_stats = join_scheduler(scheduler_handle).await; | |||
let localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense to put the .to_non_atomic()
call inside the join_scheduler()
?
Or we do not want it there for non-test case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right, it allows to avoid cloning by doing this.
1341c0c
to
2dc802c
Compare
3ee1853
to
448d476
Compare
// Regardless of who is leader, add future leaders to the cache to | ||
// hide the latency of opening the connection. | ||
// add future leaders to the cache to hide the latency of opening the | ||
// connection. | ||
for peer in future_leaders { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here it established connection to future leaders
448d476
to
958de6f
Compare
/// This enum defines to how many discovered leaders we will send transactions. | ||
pub enum LeadersFanout { | ||
/// Send transactions to all the leaders discovered by the `next_leaders` | ||
/// call. | ||
All, | ||
/// Send transactions to the first selected number of leaders discovered by | ||
/// the `next_leaders` call. | ||
Next(usize), | ||
Next(Fanout), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You don't like Next { send: usize, connect: usize}
?
Co-authored-by: Illia Bobyr <[email protected]>
958de6f
to
5fbfddd
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
approving with a nit. Feel free to fix the nit in one of the followups if you don't want to do another CI run
} | ||
|
||
for new_leader in fanout_leaders { | ||
if !workers.contains(new_leader) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I don't think that this can ever happen? I'd remove the code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can happen if a connection to the leader is dropped and the worker is stopped.
This arm in the send_err
match case below:
Err(WorkersCacheError::ReceiverDropped) => {
// Remove the worker from the cache, if the peer has disconnected.
maybe_shutdown_worker(workers.pop(*new_leader));
}
It is possible for the fanout_leaders
to contain duplicates.
The duplicate would not be able to get a matching worker.
* Add tpu-client-next to the root Cargo.toml * Change LeaderUpdater trait to accept mut self * add fanout to the tpu-client-next * Shutdown in separate task * Use try_send instead, minor impromenets * fix LeaderUpdaterError traits * improve lifetimes in split_leaders Co-authored-by: Illia Bobyr <[email protected]> * address PR comments * create connections in advance * removed lookahead_slots --------- Co-authored-by: Illia Bobyr <[email protected]> (cherry picked from commit 2a618b5) # Conflicts: # Cargo.toml
/// * the second vector contains the leaders, used to warm up connections. This | ||
/// slice includes the the first set. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor
/// * the second vector contains the leaders, used to warm up connections. This | |
/// slice includes the the first set. | |
/// * the second slice contains the leaders, used to warm up connections. This | |
/// slice includes the first set. |
let worker = Self::spawn_worker( | ||
&endpoint, | ||
peer, | ||
worker_channel_size, | ||
skip_check_transaction_age, | ||
max_reconnect_attempts, | ||
stats.clone(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor
This clone()
should be unnecessary - I do not see stats
used in this block anymore.
stats.clone(), | |
stats, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
weird that it passed clippy
let (fanout_leaders, connect_leaders) = | ||
split_leaders(&updated_leaders, &leaders_fanout); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is a bit confusing that we call it send
and connect
portions elsewhere, but here they are called fanout
and connection
portions.
Maybe it would be more consistent to call it send_leaders
and connect_leaders
here as well?
let (fanout_leaders, connect_leaders) = | |
split_leaders(&updated_leaders, &leaders_fanout); | |
let (send_leaders, connect_leaders) = | |
split_leaders(&updated_leaders, &leaders_fanout); |
for new_leader in fanout_leaders { | ||
if !workers.contains(new_leader) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar to the fanout
to send
rename above, the new_leader
name is a bit confusing to me here.
I would expect the new_leader
name to imply we are going to open a connection to this leader or start a worker for it.
But we actually start workers in the block above.
Maybe call it send_to
instead?
Or some other name that indicates that this is only a destination for the next transaction batch.
It could as well be the same leader as in the previous slot group1.
for new_leader in fanout_leaders { | |
if !workers.contains(new_leader) { | |
for send_to in send_leaders { | |
if !workers.contains(send_to) { |
Footnotes
-
Is there a name for a sequence of
NUM_CONSECUTIVE_LEADER_SLOTS
slots? ↩
} | ||
|
||
for new_leader in fanout_leaders { | ||
if !workers.contains(new_leader) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can happen if a connection to the leader is dropped and the worker is stopped.
This arm in the send_err
match case below:
Err(WorkersCacheError::ReceiverDropped) => {
// Remove the worker from the cache, if the peer has disconnected.
maybe_shutdown_worker(workers.pop(*new_leader));
}
It is possible for the fanout_leaders
to contain duplicates.
The duplicate would not be able to get a matching worker.
@ilya-bobyr yeah, in the follow up these renamings and also need to think how to have a backpressure for sending transactions. Not obvious to me so far, but this backpressure is not needed for SendTransactionService which is priority for now, only for transaction-bench. |
* add fanout to tpu-client-next (#3478) * Add tpu-client-next to the root Cargo.toml * Change LeaderUpdater trait to accept mut self * add fanout to the tpu-client-next * Shutdown in separate task * Use try_send instead, minor impromenets * fix LeaderUpdaterError traits * improve lifetimes in split_leaders Co-authored-by: Illia Bobyr <[email protected]> * address PR comments * create connections in advance * removed lookahead_slots --------- Co-authored-by: Illia Bobyr <[email protected]> (cherry picked from commit 2a618b5) # Conflicts: # Cargo.toml * resolve the conflict --------- Co-authored-by: kirill lykov <[email protected]>
Problem
Crate tpu-client-next doesn't have fan out so far.
Summary of Changes
Add this feature.