Skip to content

Commit

Permalink
Add update_key for transaction client
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillLykov committed Dec 3, 2024
1 parent 5b0e8bc commit 98973ca
Showing 1 changed file with 64 additions and 7 deletions.
71 changes: 64 additions & 7 deletions send-transaction-service/src/transaction_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use {
solana_client::connection_cache::{ConnectionCache, Protocol},
solana_connection_cache::client_connection::ClientConnection as TpuConnection,
solana_measure::measure::Measure,
solana_sdk::signature::Keypair,
solana_sdk::{quic::NotifyKeyUpdate, signature::Keypair},
solana_tpu_client_next::{
connection_workers_scheduler::{
ConnectionWorkersSchedulerConfig, Fanout, TransactionStatsAndReceiver,
Expand Down Expand Up @@ -152,6 +152,15 @@ where
}
}

impl<T> NotifyKeyUpdate for ConnectionCacheClient<T>
where
T: TpuInfoWithSendStatic,
{
fn update_key(&self, identity: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
self.connection_cache.update_key(identity)
}
}

#[derive(Clone)]
pub struct SendTransactionServiceLeaderUpdater<T: TpuInfoWithSendStatic> {
leader_info_provider: CurrentLeaderInfo<T>,
Expand Down Expand Up @@ -192,12 +201,6 @@ type TpuClientJoinHandle =
/// * Update the validator identity keypair and propagate the changes to the
/// scheduler. Most of the complexity of this structure arises from this
/// functionality.
///
#[allow(
dead_code,
reason = "Unused fields will be utilized soon,\
added in advance to avoid larger changes in the code."
)]
#[derive(Clone)]
pub struct TpuClientNextClient<T>
where
Expand Down Expand Up @@ -285,6 +288,60 @@ where
lock.1.cancel();
Ok(())
}

async fn do_update_key(&self, identity: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
let runtime_handle = self.runtime_handle.clone();
let config = Self::create_config(
Some(identity.insecure_clone()),
self.leader_forward_count as usize,
);
let leader_updater = self.leader_updater.clone();
let handle = self.join_and_cancel.clone();

let join_handle = {
let Ok(mut lock) = handle.lock() else {
return Err("TpuClientNext task panicked.".into());
};
lock.1.cancel();
lock.0.take() // Take the `join_handle` out of the critical section
};

if let Some(join_handle) = join_handle {
let Ok(result) = join_handle.await else {
return Err("TpuClientNext task panicked.".into());
};

match result {
Ok((_stats, receiver)) => {
let cancel = CancellationToken::new();
let join_handle = runtime_handle.spawn(ConnectionWorkersScheduler::run(
config,
Box::new(leader_updater),
receiver,
cancel.clone(),
));

let Ok(mut lock) = handle.lock() else {
return Err("TpuClientNext task panicked.".into());
};
*lock = (Some(join_handle), cancel);
}
Err(error) => {
return Err(Box::new(error));
}
}
}
Ok(())
}
}

impl<T> NotifyKeyUpdate for TpuClientNextClient<T>
where
T: TpuInfoWithSendStatic + Clone,
{
fn update_key(&self, identity: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
self.runtime_handle.block_on(self.do_update_key(identity))
}
}

impl<T> TransactionClient for TpuClientNextClient<T>
Expand Down

0 comments on commit 98973ca

Please sign in to comment.