Skip to content

Commit

Permalink
Initial implementations of the TPU Vortexor
Browse files Browse the repository at this point in the history
  • Loading branch information
lijunwangs committed Oct 25, 2024
1 parent d6c7fd0 commit d6f0818
Show file tree
Hide file tree
Showing 5 changed files with 241 additions and 0 deletions.
38 changes: 38 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ members = [
"upload-perf",
"validator",
"version",
"vortexor",
"vote",
"watchtower",
"wen-restart",
Expand Down
53 changes: 53 additions & 0 deletions vortexor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
[package]
name = "solana-vortexor"
description = "Solana TPU Vortexor"
documentation = "https://docs.rs/solana-vortexor"
version = { workspace = true }
authors = { workspace = true }
repository = { workspace = true }
homepage = { workspace = true }
license = { workspace = true }
edition = { workspace = true }

[dependencies]
async-channel = { workspace = true }
bytes = { workspace = true }
crossbeam-channel = { workspace = true }
dashmap = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }
governor = { workspace = true }
histogram = { workspace = true }
indexmap = { workspace = true }
itertools = { workspace = true }
libc = { workspace = true }
log = { workspace = true }
nix = { workspace = true, features = ["net"] }
pem = { workspace = true }
percentage = { workspace = true }
quinn = { workspace = true }
quinn-proto = { workspace = true }
rand = { workspace = true }
rustls = { workspace = true }
smallvec = { workspace = true }
socket2 = { workspace = true }
solana-measure = { workspace = true }
solana-metrics = { workspace = true }
solana-perf = { workspace = true }
solana-sdk = { workspace = true }
solana-streamer = { workspace = true }
solana-transaction-metrics-tracker = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["full"] }
x509-parser = { workspace = true }

[dev-dependencies]
assert_matches = { workspace = true }
solana-logger = { workspace = true }

[lib]
crate-type = ["lib"]
name = "solana_vortexor"

[package.metadata.docs.rs]
targets = ["x86_64-unknown-linux-gnu"]
1 change: 1 addition & 0 deletions vortexor/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod vortexor;
148 changes: 148 additions & 0 deletions vortexor/src/vortexor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use {
crossbeam_channel::Sender,
solana_perf::packet::PacketBatch,
solana_sdk::{quic::NotifyKeyUpdate, signature::Keypair},
solana_streamer::{
quic::{spawn_server_multi, EndpointKeyUpdater},
streamer::StakedNodes,
},
std::{
net::UdpSocket,
sync::{atomic::AtomicBool, Arc, Mutex, RwLock},
thread::{self, JoinHandle},
time::Duration,
},
};

pub struct TpuSockets {
pub tpu_quic: Vec<UdpSocket>,
pub tpu_quic_fwd: Vec<UdpSocket>,
}

pub struct TpuStreamerConfig {
tpu_thread_name: &'static str,
tpu_metrics_name: &'static str,
tpu_fwd_thread_name: &'static str,
tpu_fwd_metrics_name: &'static str,
max_connections_per_peer: usize,
max_staked_connections: usize,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
max_connections_per_ipaddr_per_min: u64,
wait_for_chunk_timeout: Duration,
sender_coalesce_duration: Duration,
}

pub struct Vortexor {
thread_handles: Vec<JoinHandle<()>>,
key_update_notifier: Arc<KeyUpdateNotifier>,
}

struct KeyUpdateNotifier {
key_updaters: Mutex<Vec<Arc<EndpointKeyUpdater>>>,
}

impl KeyUpdateNotifier {
fn new(key_updaters: Vec<Arc<EndpointKeyUpdater>>) -> Self {
Self {
key_updaters: Mutex::new(key_updaters),
}
}
}

impl NotifyKeyUpdate for KeyUpdateNotifier {
fn update_key(&self, key: &Keypair) -> Result<(), Box<dyn std::error::Error>> {
let updaters = self.key_updaters.lock().unwrap();
for updater in updaters.iter() {
updater.update_key(key)?
}
Ok(())
}
}

impl Vortexor {
/// Create a new TPU Vortexor
pub fn new(
keypair: &Keypair,
tpu_sockets: TpuSockets,
tpu_sender: Sender<PacketBatch>,
tpu_fwd_sender: Sender<PacketBatch>,
staked_nodes: Arc<RwLock<StakedNodes>>,
config: TpuStreamerConfig,
exit: Arc<AtomicBool>,
) -> Self {
let TpuSockets {
tpu_quic,
tpu_quic_fwd,
} = tpu_sockets;

let TpuStreamerConfig {
tpu_thread_name,
tpu_metrics_name,
tpu_fwd_thread_name,
tpu_fwd_metrics_name,
max_connections_per_peer,
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
max_connections_per_ipaddr_per_min,
wait_for_chunk_timeout,
sender_coalesce_duration,
} = config;

let tpu_result = spawn_server_multi(
tpu_thread_name,
tpu_metrics_name,
tpu_quic,
keypair,
tpu_sender.clone(),
exit.clone(),
max_connections_per_peer,
staked_nodes.clone(),
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
max_connections_per_ipaddr_per_min,
wait_for_chunk_timeout,
sender_coalesce_duration,
)
.unwrap();

let tpu_fwd_result = spawn_server_multi(
tpu_fwd_thread_name,
tpu_fwd_metrics_name,
tpu_quic_fwd,
keypair,
tpu_fwd_sender,
exit.clone(),
max_connections_per_peer,
staked_nodes.clone(),
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
max_connections_per_ipaddr_per_min,
wait_for_chunk_timeout,
sender_coalesce_duration,
)
.unwrap();

Self {
thread_handles: vec![tpu_result.thread, tpu_fwd_result.thread],
key_update_notifier: Arc::new(KeyUpdateNotifier::new(vec![
tpu_result.key_updater,
tpu_fwd_result.key_updater,
])),
}
}

pub fn get_key_update_notifier(&self) -> Arc<dyn NotifyKeyUpdate + Sync + Send> {
self.key_update_notifier.clone()
}

pub fn join(self) -> thread::Result<()> {
for t in self.thread_handles {
t.join()?
}
Ok(())
}
}

0 comments on commit d6f0818

Please sign in to comment.