Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adds metrics to crossbeam arms #86

Merged
merged 3 commits into from
Oct 15, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions relayer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ rand = "0.8.5"
serde = "1.0.160"
sha2 = "0.10.6"
solana-client = "=1.14.18"
solana-measure = "=1.14.18"
solana-metrics = "=1.14.18"
solana-perf = "=1.14.18"
solana-sdk = "=1.14.18"
Expand Down
136 changes: 132 additions & 4 deletions relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use jito_protos::{
use jito_rpc::load_balancer::LoadBalancer;
use log::*;
use prost_types::Timestamp;
use solana_measure::measure;
use solana_metrics::datapoint_info;
use solana_perf::packet::PacketBatch;
use solana_sdk::{
Expand Down Expand Up @@ -55,6 +56,12 @@ struct RelayerMetrics {
pub num_try_send_channel_full: u64,
pub packet_latencies_us: Histogram,

pub crossbeam_slot_receiver_latency_us: Histogram,
pub crossbeam_delay_packet_receiver_latency_us: Histogram,
pub crossbeam_subscription_receiver_latency_us: Histogram,
pub crossbeam_heartbeat_tick_latency_us: Histogram,
pub crossbeam_metrics_tick_latency_us: Histogram,

// channel stats
pub slot_receiver_max_len: usize,
pub slot_receiver_capacity: usize,
Expand All @@ -81,7 +88,12 @@ impl RelayerMetrics {
max_heartbeat_tick_latency_us: 0,
metrics_latency_us: 0,
num_try_send_channel_full: 0,
packet_latencies_us: Default::default(),
packet_latencies_us: Histogram::default(),
crossbeam_slot_receiver_latency_us: Histogram::default(),
segfaultdoc marked this conversation as resolved.
Show resolved Hide resolved
crossbeam_delay_packet_receiver_latency_us: Histogram::default(),
crossbeam_subscription_receiver_latency_us: Histogram::default(),
crossbeam_heartbeat_tick_latency_us: Histogram::default(),
crossbeam_metrics_tick_latency_us: Histogram::default(),
slot_receiver_max_len: 0,
slot_receiver_capacity,
subscription_receiver_max_len: 0,
Expand Down Expand Up @@ -202,6 +214,112 @@ impl RelayerMetrics {
.unwrap_or_default(),
i64
),
// crossbeam arm latencies
(
"crossbeam_subscription_receiver_latency_us_p50",
self.crossbeam_subscription_receiver_latency_us
.percentile(50.0)
.unwrap_or_default(),
i64
),
(
"crossbeam_subscription_receiver_latency_us_p90",
self.crossbeam_subscription_receiver_latency_us
.percentile(90.0)
.unwrap_or_default(),
i64
),
(
"crossbeam_subscription_receiver_latency_us_p99",
self.crossbeam_subscription_receiver_latency_us
.percentile(99.0)
.unwrap_or_default(),
i64
),
(
"crossbeam_slot_receiver_latency_us_p50",
self.crossbeam_slot_receiver_latency_us
.percentile(50.0)
.unwrap_or_default(),
i64
),
(
"crossbeam_slot_receiver_latency_us_p90",
self.crossbeam_slot_receiver_latency_us
.percentile(90.0)
.unwrap_or_default(),
i64
),
(
"crossbeam_slot_receiver_latency_us_p99",
self.crossbeam_slot_receiver_latency_us
.percentile(99.0)
.unwrap_or_default(),
i64
),
(
"crossbeam_metrics_tick_latency_us_p50",
self.crossbeam_metrics_tick_latency_us
.percentile(50.0)
.unwrap_or_default(),
i64
),
(
"crossbeam_metrics_tick_latency_us_p90",
self.crossbeam_metrics_tick_latency_us
.percentile(90.0)
.unwrap_or_default(),
i64
),
(
"crossbeam_metrics_tick_latency_us_p99",
self.crossbeam_metrics_tick_latency_us
.percentile(99.0)
.unwrap_or_default(),
i64
),
(
"crossbeam_delay_packet_receiver_latency_us_p50",
self.crossbeam_delay_packet_receiver_latency_us
.percentile(50.0)
.unwrap_or_default(),
i64
),
(
"crossbeam_delay_packet_receiver_latency_us_p90",
self.crossbeam_delay_packet_receiver_latency_us
.percentile(90.0)
.unwrap_or_default(),
i64
),
(
"crossbeam_delay_packet_receiver_latency_us_p99",
self.crossbeam_delay_packet_receiver_latency_us
.percentile(99.0)
.unwrap_or_default(),
i64
),
(
"crossbeam_heartbeat_tick_latency_us_p50",
self.crossbeam_heartbeat_tick_latency_us
.percentile(50.0)
.unwrap_or_default(),
i64
),
(
"crossbeam_heartbeat_tick_latency_us_p90",
self.crossbeam_heartbeat_tick_latency_us
.percentile(90.0)
.unwrap_or_default(),
i64
),
(
"crossbeam_heartbeat_tick_latency_us_p99",
self.crossbeam_heartbeat_tick_latency_us
.percentile(99.0)
.unwrap_or_default(),
i64
),
// channel lengths
("slot_receiver_len", self.slot_receiver_max_len, i64),
("slot_receiver_capacity", self.slot_receiver_capacity, i64),
Expand Down Expand Up @@ -377,16 +495,23 @@ impl RelayerImpl {
while !exit.load(Ordering::Relaxed) {
crossbeam_channel::select! {
recv(slot_receiver) -> maybe_slot => {
Self::update_highest_slot(maybe_slot, &mut highest_slot, &mut relayer_metrics)?;
let (relayer_result, latency) = measure!(Self::update_highest_slot(maybe_slot, &mut highest_slot, &mut relayer_metrics));
let _ = relayer_metrics.crossbeam_slot_receiver_latency_us.increment(latency.as_us());
let _ = relayer_result?;
},
recv(delay_packet_receiver) -> maybe_packet_batches => {
let failed_forwards = Self::forward_packets(maybe_packet_batches, &packet_subscriptions, &leader_schedule_cache, &highest_slot, &leader_lookahead, &mut relayer_metrics)?;
let (relayer_result, latency) = measure!(Self::forward_packets(maybe_packet_batches, &packet_subscriptions, &leader_schedule_cache, &highest_slot, &leader_lookahead, &mut relayer_metrics));
segfaultdoc marked this conversation as resolved.
Show resolved Hide resolved
let failed_forwards = relayer_result?;
Self::drop_connections(failed_forwards, &packet_subscriptions, &mut relayer_metrics);
let _ = relayer_metrics.crossbeam_delay_packet_receiver_latency_us.increment(latency.as_us());
},
recv(subscription_receiver) -> maybe_subscription => {
Self::handle_subscription(maybe_subscription, &packet_subscriptions, &mut relayer_metrics)?;
let (relayer_result, latency) = measure!(Self::handle_subscription(maybe_subscription, &packet_subscriptions, &mut relayer_metrics));
let _ = relayer_result?;
let _ = relayer_metrics.crossbeam_subscription_receiver_latency_us.increment(latency.as_us());
}
recv(heartbeat_tick) -> time_generated => {
let start = Instant::now();
if let Ok(time_generated) = time_generated {
relayer_metrics.max_heartbeat_tick_latency_us = std::cmp::max(relayer_metrics.max_heartbeat_tick_latency_us, Instant::now().duration_since(time_generated).as_micros() as u64);
}
Expand All @@ -402,8 +527,10 @@ impl RelayerImpl {
HealthState::Unhealthy => packet_subscriptions.read().unwrap().keys().cloned().collect(),
};
Self::drop_connections(pubkeys_to_drop, &packet_subscriptions, &mut relayer_metrics);
let _ = relayer_metrics.crossbeam_heartbeat_tick_latency_us.increment(start.elapsed().as_micros() as u64);
}
recv(metrics_tick) -> time_generated => {
let start = Instant::now();
let l_packet_subscriptions = packet_subscriptions.read().unwrap();
relayer_metrics.num_current_connections = l_packet_subscriptions.len() as u64;
relayer_metrics.update_packet_subscription_total_capacity(&l_packet_subscriptions);
Expand All @@ -412,6 +539,7 @@ impl RelayerImpl {
if let Ok(time_generated) = time_generated {
relayer_metrics.metrics_latency_us = time_generated.elapsed().as_micros() as u64;
}
let _ = relayer_metrics.crossbeam_metrics_tick_latency_us.increment(start.elapsed().as_micros() as u64);

relayer_metrics.report();
relayer_metrics = RelayerMetrics::new(
Expand Down
2 changes: 2 additions & 0 deletions rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[toolchain]
channel = "1.69.0"
Loading