diff --git a/Cargo.lock b/Cargo.lock index 80a67279..ab3d9901 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -747,7 +747,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4114279215a005bc675e386011e594e1d9b800918cea18fcadadcce864a2046b" dependencies = [ "borsh-derive 0.10.3", - "hashbrown 0.13.2", + "hashbrown 0.11.2", ] [[package]] @@ -2068,9 +2068,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.22" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d6250322ef6e60f93f9a2162799302cd6f68f79f6e5d85c8c16f14d1d958178" +checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" dependencies = [ "bytes", "fnv", @@ -3422,9 +3422,9 @@ checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" [[package]] name = "openssl" -version = "0.10.59" +version = "0.10.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a257ad03cd8fb16ad4172fedf8094451e1af1c4b70097636ef2eac9a5f0cc33" +checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f" dependencies = [ "bitflags 2.4.1", "cfg-if 1.0.0", @@ -3463,9 +3463,9 @@ dependencies = [ [[package]] name = "openssl-sys" -version = "0.9.95" +version = "0.9.102" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40a4130519a360279579c2053038317e40eff64d13fd3f004f9e1b72b8a6aaf9" +checksum = "c597637d56fbc83893a35eb0dd04b2b8e7a50c91e64e9493e398b5df4fb45fa2" dependencies = [ "cc", "libc", @@ -4728,9 +4728,9 @@ checksum = "24188a676b6ae68c3b2cb3a01be17fbf7240ce009799bb56d5b1409051e78fde" [[package]] name = "shlex" -version = "1.2.0" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7cee0529a6d40f580e7a5e6c495c8fbfe21b7b52795ed4bb5e62cdf92bc6380" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" [[package]] name = "signal-hook" @@ -8155,18 +8155,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.7.26" +version = "0.7.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e97e415490559a91254a2979b4829267a57d2fcd741a98eee8b722fb57289aa0" +checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.7.26" +version = "0.7.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd7e48ccf166952882ca8bd778a43502c64f33bf94c12ebe2a7f08e5a0f6689f" +checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", diff --git a/relayer/src/relayer.rs b/relayer/src/relayer.rs index ff1495c4..995bdfc5 100644 --- a/relayer/src/relayer.rs +++ b/relayer/src/relayer.rs @@ -426,6 +426,7 @@ impl RelayerImpl { ofac_addresses: HashSet, address_lookup_table_cache: Arc>, validator_packet_batch_size: usize, + forward_all: bool, ) -> Self { const LEADER_LOOKAHEAD: u64 = 2; @@ -453,6 +454,7 @@ impl RelayerImpl { ofac_addresses, address_lookup_table_cache, validator_packet_batch_size, + forward_all, ); warn!("RelayerImpl thread exited with result {res:?}") }) @@ -488,6 +490,7 @@ impl RelayerImpl { ofac_addresses: HashSet, address_lookup_table_cache: Arc>, validator_packet_batch_size: usize, + forward_all: bool, ) -> RelayerResult<()> { let mut highest_slot = Slot::default(); @@ -518,7 +521,7 @@ impl RelayerImpl { }, recv(delay_packet_receiver) -> maybe_packet_batches => { let start = Instant::now(); - let failed_forwards = Self::forward_packets(maybe_packet_batches, packet_subscriptions, &slot_leaders, &mut relayer_metrics, &ofac_addresses, &address_lookup_table_cache, validator_packet_batch_size)?; + let failed_forwards = Self::forward_packets(maybe_packet_batches, packet_subscriptions, &slot_leaders, &mut relayer_metrics, &ofac_addresses, &address_lookup_table_cache, validator_packet_batch_size, forward_all)?; Self::drop_connections(failed_forwards, packet_subscriptions, &mut relayer_metrics); let _ = relayer_metrics.crossbeam_delay_packet_receiver_processing_us.increment(start.elapsed().as_micros() as u64); }, @@ -636,6 +639,7 @@ impl RelayerImpl { ofac_addresses: &HashSet, address_lookup_table_cache: &Arc>, validator_packet_batch_size: usize, + forward_all: bool, ) -> RelayerResult> { let packet_batches = maybe_packet_batches?; @@ -679,42 +683,51 @@ impl RelayerImpl { let l_subscriptions = subscriptions.read().unwrap(); + let senders = if forward_all { + l_subscriptions.iter().collect::>, + )>>() + } else { + slot_leaders + .iter() + .filter_map(|pubkey| l_subscriptions.get(pubkey).map(|sender| (pubkey, sender))) + .collect() + }; + let mut failed_forwards = Vec::new(); - for pubkey in slot_leaders { - if let Some(sender) = l_subscriptions.get(pubkey) { - for batch in &proto_packet_batches { - // NOTE: this is important to avoid divide-by-0 inside the validator if packets - // get routed to sigverify under the assumption theres > 0 packets in the batch - if batch.packets.is_empty() { - continue; - } + for batch in &proto_packet_batches { + // NOTE: this is important to avoid divide-by-0 inside the validator if packets + // get routed to sigverify under the assumption theres > 0 packets in the batch + if batch.packets.is_empty() { + continue; + } - // try send because it's a bounded channel and we don't want to block if the channel is full - match sender.try_send(Ok(SubscribePacketsResponse { - header: Some(Header { - ts: Some(Timestamp::from(SystemTime::now())), - }), - msg: Some(subscribe_packets_response::Msg::Batch(batch.clone())), - })) { - Ok(_) => { - relayer_metrics - .increment_packets_forwarded(pubkey, batch.packets.len() as u64); - } - Err(TrySendError::Full(_)) => { - error!("packet channel is full for pubkey: {:?}", pubkey); - relayer_metrics - .increment_packets_dropped(pubkey, batch.packets.len() as u64); - } - Err(TrySendError::Closed(_)) => { - error!("channel is closed for pubkey: {:?}", pubkey); - failed_forwards.push(*pubkey); - break; - } + for (pubkey, sender) in &senders { + // try send because it's a bounded channel and we don't want to block if the channel is full + match sender.try_send(Ok(SubscribePacketsResponse { + header: Some(Header { + ts: Some(Timestamp::from(SystemTime::now())), + }), + msg: Some(subscribe_packets_response::Msg::Batch(batch.clone())), + })) { + Ok(_) => { + relayer_metrics + .increment_packets_forwarded(pubkey, batch.packets.len() as u64); + } + Err(TrySendError::Full(_)) => { + error!("packet channel is full for pubkey: {:?}", pubkey); + relayer_metrics + .increment_packets_dropped(pubkey, batch.packets.len() as u64); + } + Err(TrySendError::Closed(_)) => { + error!("channel is closed for pubkey: {:?}", pubkey); + failed_forwards.push(**pubkey); + break; } } } } - Ok(failed_forwards) } diff --git a/transaction-relayer/src/main.rs b/transaction-relayer/src/main.rs index cf983d40..b9fcc32d 100644 --- a/transaction-relayer/src/main.rs +++ b/transaction-relayer/src/main.rs @@ -222,12 +222,18 @@ struct Args { #[arg(long, env, default_value_t = false)] disable_mempool: bool, + /// Forward all received packets to all connected validators, + /// regardless of leader schedule. + /// Note: This is required to be true for Stake Weighted Quality of Service (SWQOS)! + #[arg(long, env, default_value_t = false)] + forward_all: bool, + /// Staked Nodes Overrides Path - /// "Provide path to a yaml file with custom overrides for stakes of specific + /// Provide path to a yaml file with custom overrides for stakes of specific /// identities. Overriding the amount of stake this validator considers as valid /// for other peers in network. The stake amount is used for calculating the /// number of QUIC streams permitted from the peer and vote packet sender stage. - /// Format of the file: `staked_map_id: {: }" + /// Format of the file: `staked_map_id: {: } #[arg(long, env)] staked_nodes_overrides: Option, } @@ -513,6 +519,7 @@ fn main() { ofac_addresses, address_lookup_table_cache, args.validator_packet_batch_size, + args.forward_all, ); let priv_key = fs::read(&args.signing_key_pem_path).unwrap_or_else(|_| {