Skip to content

Commit

Permalink
addressed some feedback from Behzad
Browse files Browse the repository at this point in the history
  • Loading branch information
lijunwangs committed Dec 13, 2024
1 parent d8b7f26 commit 4509323
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 60 deletions.
2 changes: 0 additions & 2 deletions bench-vote/.gitignore

This file was deleted.

100 changes: 42 additions & 58 deletions bench-vote/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,42 +26,37 @@ use {
},
};

const SINK_REPORT_INTERVAL: Duration = Duration::from_secs(5);
const SINK_RECEIVE_TIMEOUT: Duration = Duration::from_secs(1);
const SOCKET_RECEIVE_TIMEOUT: Duration = Duration::from_secs(1);
const COALESCE_TIME: Duration = Duration::from_millis(1);

fn sink(
exit: Arc<AtomicBool>,
rvs: Arc<AtomicUsize>,
r: PacketBatchReceiver,
received_size: Arc<AtomicUsize>,
receiver: PacketBatchReceiver,
verbose: bool,
) -> JoinHandle<()> {
spawn(move || {
let mut last_report = Instant::now();
let interval: Duration = Duration::from_secs(5);
loop {
if exit.load(Ordering::Relaxed) {
return;
}
let timer = Duration::new(1, 0);
if let Ok(packet_batch) = r.recv_timeout(timer) {
rvs.fetch_add(packet_batch.len(), Ordering::Relaxed);
while !exit.load(Ordering::Relaxed) {
if let Ok(packet_batch) = receiver.recv_timeout(SINK_RECEIVE_TIMEOUT) {
received_size.fetch_add(packet_batch.len(), Ordering::Relaxed);
}

let count = rvs.load(Ordering::Relaxed);
let count = received_size.load(Ordering::Relaxed);

if verbose &&
/*count % 1000 == 0 && count != 0 &&*/
last_report.elapsed() > interval
{
if verbose && last_report.elapsed() > SINK_REPORT_INTERVAL {
println!("Received txns count: {count}");
last_report = Instant::now();
}
}
})
}

const TRANSACTIONS_PER_THREAD: u64 = 1000000; // Number of transactions per thread
const TRANSACTIONS_PER_THREAD: u64 = 1_000_000; // Number of transactions per thread

fn main() -> Result<()> {
let mut num_sockets = 1usize;

let matches = Command::new(crate_name!())
.about(crate_description!())
.version(solana_version::version!())
Expand All @@ -83,14 +78,14 @@ fn main() -> Result<()> {
Arg::new("server-only")
.long("server-only")
.takes_value(false)
.help("Use this many producer threads."),
.help("Run the bench tool as a server only."),
)
.arg(
Arg::new("client-only")
.long("client-only")
.takes_value(false)
.requires("destination")
.help("Use this many producer threads."),
.requires("server-address")
.help("Run the bench tool as a client only."),
)
.arg(
Arg::with_name("server-address")
Expand All @@ -115,6 +110,7 @@ fn main() -> Result<()> {
)
.get_matches();

let mut num_sockets = 1usize;
if let Some(n) = matches.value_of("num-recv-sockets") {
num_sockets = max(num_sockets, n.to_string().parse().expect("integer"));
}
Expand All @@ -137,7 +133,7 @@ fn main() -> Result<()> {
let port = destination.map_or(0, |addr| addr.port());
let ip_addr = destination.map_or(IpAddr::V4(Ipv4Addr::UNSPECIFIED), |addr| addr.ip());

let (exit, read_threads, sin_threads, destination) = if !client_only {
let (exit, read_threads, sink_threads, destination) = if !client_only {
let exit = Arc::new(AtomicBool::new(false));

let mut read_channels = Vec::new();
Expand All @@ -151,7 +147,7 @@ fn main() -> Result<()> {
.unwrap();
let stats = Arc::new(StreamerReceiveStats::new("bench-streamer-test"));
for read in read_sockets {
read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
read.set_read_timeout(Some(SOCKET_RECEIVE_TIMEOUT)).unwrap();

let (s_reader, r_reader) = unbounded();
read_channels.push(r_reader);
Expand All @@ -162,17 +158,17 @@ fn main() -> Result<()> {
s_reader,
recycler.clone(),
stats.clone(),
Duration::from_millis(1), // coalesce
true,
None,
false,
COALESCE_TIME, // coalesce
true, // use_pinned_memory
None, // in_vote_only_mode
false, // is_staked_service
));
}

let rvs = Arc::new(AtomicUsize::new(0));
let received_size = Arc::new(AtomicUsize::new(0));
let sink_threads: Vec<_> = read_channels
.into_iter()
.map(|r_reader| sink(exit.clone(), rvs.clone(), r_reader, verbose))
.map(|r_reader| sink(exit.clone(), received_size.clone(), r_reader, verbose))
.collect();

let destination = SocketAddr::new(ip_addr, port);
Expand All @@ -189,21 +185,13 @@ fn main() -> Result<()> {

let start = SystemTime::now();

let producer_threads = if !server_only {
let producer_threads: Vec<_> =
producer(destination, num_producers, use_connection_cache, verbose);
Some(producer_threads)
} else {
None
};
let producer_threads =
(!server_only).then(|| producer(destination, num_producers, use_connection_cache, verbose));

// sleep(Duration::new(5, 0));

if let Some(producer_threads) = producer_threads {
for t_producer in producer_threads {
t_producer.join()?;
}
}
producer_threads
.into_iter()
.flatten()
.try_for_each(JoinHandle::join)?;

if !server_only {
if let Some(exit) = exit {
Expand All @@ -213,26 +201,22 @@ fn main() -> Result<()> {
println!("To stop the server, please press ^C");
}

if let Some(read_threads) = read_threads {
for t_reader in read_threads {
t_reader.join()?;
}
}

if let Some(sink_threads) = sin_threads {
for t_sink in sink_threads {
t_sink.join()?;
}
}
read_threads
.into_iter()
.flatten()
.try_for_each(JoinHandle::join)?;
sink_threads
.into_iter()
.flatten()
.try_for_each(JoinHandle::join)?;

if !(server_only) {
let elapsed = start.elapsed().unwrap();
let time = elapsed.as_secs() * 10_000_000_000 + u64::from(elapsed.subsec_nanos());
let ftime = (time as f64) / 10_000_000_000_f64;
let ftime = elapsed.as_nanos() as f64;
let fcount = (TRANSACTIONS_PER_THREAD * num_producers) as f64;

println!(
"performance: {:?}, count: {fcount}, time: {ftime}",
"Performance: {:?}/ns, count: {fcount}, time in nano-second: {ftime}",
fcount / ftime
);
}
Expand All @@ -255,7 +239,7 @@ fn producer(
let transporter = if use_connection_cache {
Transporter::Cache(Arc::new(ConnectionCache::with_udp(
"connection_cache_vote_udp",
1,
1, // connection_pool_size
)))
} else {
Transporter::DirectSocket(Arc::new(bind_to_unspecified().unwrap()))
Expand Down

0 comments on commit 4509323

Please sign in to comment.