diff --git a/send-transaction-service/src/transaction_client.rs b/send-transaction-service/src/transaction_client.rs index 29d1c3c3411944..f098c95643320b 100644 --- a/send-transaction-service/src/transaction_client.rs +++ b/send-transaction-service/src/transaction_client.rs @@ -5,7 +5,7 @@ use { solana_client::connection_cache::{ConnectionCache, Protocol}, solana_connection_cache::client_connection::ClientConnection as TpuConnection, solana_measure::measure::Measure, - solana_sdk::signature::Keypair, + solana_sdk::{quic::NotifyKeyUpdate, signature::Keypair}, solana_tpu_client_next::{ connection_workers_scheduler::{ ConnectionWorkersSchedulerConfig, Fanout, TransactionStatsAndReceiver, @@ -152,6 +152,15 @@ where } } +impl NotifyKeyUpdate for ConnectionCacheClient +where + T: TpuInfoWithSendStatic, +{ + fn update_key(&self, identity: &Keypair) -> Result<(), Box> { + self.connection_cache.update_key(identity) + } +} + #[derive(Clone)] pub struct SendTransactionServiceLeaderUpdater { leader_info_provider: CurrentLeaderInfo, @@ -192,12 +201,6 @@ type TpuClientJoinHandle = /// * Update the validator identity keypair and propagate the changes to the /// scheduler. Most of the complexity of this structure arises from this /// functionality. -/// -#[allow( - dead_code, - reason = "Unused fields will be utilized soon,\ - added in advance to avoid larger changes in the code." -)] #[derive(Clone)] pub struct TpuClientNextClient where @@ -285,6 +288,60 @@ where lock.1.cancel(); Ok(()) } + + async fn do_update_key(&self, identity: &Keypair) -> Result<(), Box> { + let runtime_handle = self.runtime_handle.clone(); + let config = Self::create_config( + Some(identity.insecure_clone()), + self.leader_forward_count as usize, + ); + let leader_updater = self.leader_updater.clone(); + let handle = self.join_and_cancel.clone(); + + let join_handle = { + let Ok(mut lock) = handle.lock() else { + return Err("TpuClientNext task panicked.".into()); + }; + lock.1.cancel(); + lock.0.take() // Take the `join_handle` out of the critical section + }; + + if let Some(join_handle) = join_handle { + let Ok(result) = join_handle.await else { + return Err("TpuClientNext task panicked.".into()); + }; + + match result { + Ok((_stats, receiver)) => { + let cancel = CancellationToken::new(); + let join_handle = runtime_handle.spawn(ConnectionWorkersScheduler::run( + config, + Box::new(leader_updater), + receiver, + cancel.clone(), + )); + + let Ok(mut lock) = handle.lock() else { + return Err("TpuClientNext task panicked.".into()); + }; + *lock = (Some(join_handle), cancel); + } + Err(error) => { + return Err(Box::new(error)); + } + } + } + Ok(()) + } +} + +impl NotifyKeyUpdate for TpuClientNextClient +where + T: TpuInfoWithSendStatic + Clone, +{ + fn update_key(&self, identity: &Keypair) -> Result<(), Box> { + self.runtime_handle.block_on(self.do_update_key(identity)) + } } impl TransactionClient for TpuClientNextClient