From c8be6d9b2fb35b525814e366a8ace0d7d189134f Mon Sep 17 00:00:00 2001 From: Jed <4679729+jedleggett@users.noreply.github.com> Date: Fri, 12 Apr 2024 11:52:43 -0500 Subject: [PATCH 1/8] add forward_all --- relayer/src/relayer.rs | 75 +++++++++++++++++++-------------- transaction-relayer/src/main.rs | 5 +++ 2 files changed, 49 insertions(+), 31 deletions(-) diff --git a/relayer/src/relayer.rs b/relayer/src/relayer.rs index ff1495c4..cce160d9 100644 --- a/relayer/src/relayer.rs +++ b/relayer/src/relayer.rs @@ -426,6 +426,7 @@ impl RelayerImpl { ofac_addresses: HashSet, address_lookup_table_cache: Arc>, validator_packet_batch_size: usize, + forward_all: bool, ) -> Self { const LEADER_LOOKAHEAD: u64 = 2; @@ -453,6 +454,7 @@ impl RelayerImpl { ofac_addresses, address_lookup_table_cache, validator_packet_batch_size, + forward_all, ); warn!("RelayerImpl thread exited with result {res:?}") }) @@ -488,6 +490,7 @@ impl RelayerImpl { ofac_addresses: HashSet, address_lookup_table_cache: Arc>, validator_packet_batch_size: usize, + forward_all: bool, ) -> RelayerResult<()> { let mut highest_slot = Slot::default(); @@ -518,7 +521,7 @@ impl RelayerImpl { }, recv(delay_packet_receiver) -> maybe_packet_batches => { let start = Instant::now(); - let failed_forwards = Self::forward_packets(maybe_packet_batches, packet_subscriptions, &slot_leaders, &mut relayer_metrics, &ofac_addresses, &address_lookup_table_cache, validator_packet_batch_size)?; + let failed_forwards = Self::forward_packets(maybe_packet_batches, packet_subscriptions, &slot_leaders, &mut relayer_metrics, &ofac_addresses, &address_lookup_table_cache, validator_packet_batch_size, forward_all)?; Self::drop_connections(failed_forwards, packet_subscriptions, &mut relayer_metrics); let _ = relayer_metrics.crossbeam_delay_packet_receiver_processing_us.increment(start.elapsed().as_micros() as u64); }, @@ -636,6 +639,7 @@ impl RelayerImpl { ofac_addresses: &HashSet, address_lookup_table_cache: &Arc>, validator_packet_batch_size: usize, + forward_all: bool, ) -> RelayerResult> { let packet_batches = maybe_packet_batches?; @@ -680,41 +684,50 @@ impl RelayerImpl { let l_subscriptions = subscriptions.read().unwrap(); let mut failed_forwards = Vec::new(); - for pubkey in slot_leaders { - if let Some(sender) = l_subscriptions.get(pubkey) { - for batch in &proto_packet_batches { - // NOTE: this is important to avoid divide-by-0 inside the validator if packets - // get routed to sigverify under the assumption theres > 0 packets in the batch - if batch.packets.is_empty() { - continue; - } + for batch in &proto_packet_batches { + // NOTE: this is important to avoid divide-by-0 inside the validator if packets + // get routed to sigverify under the assumption theres > 0 packets in the batch + if batch.packets.is_empty() { + continue; + } - // try send because it's a bounded channel and we don't want to block if the channel is full - match sender.try_send(Ok(SubscribePacketsResponse { - header: Some(Header { - ts: Some(Timestamp::from(SystemTime::now())), - }), - msg: Some(subscribe_packets_response::Msg::Batch(batch.clone())), - })) { - Ok(_) => { - relayer_metrics - .increment_packets_forwarded(pubkey, batch.packets.len() as u64); - } - Err(TrySendError::Full(_)) => { - error!("packet channel is full for pubkey: {:?}", pubkey); - relayer_metrics - .increment_packets_dropped(pubkey, batch.packets.len() as u64); - } - Err(TrySendError::Closed(_)) => { - error!("channel is closed for pubkey: {:?}", pubkey); - failed_forwards.push(*pubkey); - break; - } + let senders = if forward_all { + l_subscriptions.iter().collect::>, + )>>() + } else { + slot_leaders + .iter() + .filter_map(|pubkey| l_subscriptions.get(pubkey).map(|sender| (pubkey, sender))) + .collect() + }; + + for (pubkey, sender) in senders { + // try send because it's a bounded channel and we don't want to block if the channel is full + match sender.try_send(Ok(SubscribePacketsResponse { + header: Some(Header { + ts: Some(Timestamp::from(SystemTime::now())), + }), + msg: Some(subscribe_packets_response::Msg::Batch(batch.clone())), + })) { + Ok(_) => { + relayer_metrics + .increment_packets_forwarded(pubkey, batch.packets.len() as u64); + } + Err(TrySendError::Full(_)) => { + error!("packet channel is full for pubkey: {:?}", pubkey); + relayer_metrics + .increment_packets_dropped(pubkey, batch.packets.len() as u64); + } + Err(TrySendError::Closed(_)) => { + error!("channel is closed for pubkey: {:?}", pubkey); + failed_forwards.push(*pubkey); + break; } } } } - Ok(failed_forwards) } diff --git a/transaction-relayer/src/main.rs b/transaction-relayer/src/main.rs index cf983d40..2b6c4d11 100644 --- a/transaction-relayer/src/main.rs +++ b/transaction-relayer/src/main.rs @@ -222,6 +222,10 @@ struct Args { #[arg(long, env, default_value_t = false)] disable_mempool: bool, + /// Forward all received packets, regardless of leader schedule + #[arg(long, env, default_value_t = false)] + forward_all: bool, + /// Staked Nodes Overrides Path /// "Provide path to a yaml file with custom overrides for stakes of specific /// identities. Overriding the amount of stake this validator considers as valid @@ -513,6 +517,7 @@ fn main() { ofac_addresses, address_lookup_table_cache, args.validator_packet_batch_size, + args.forward_all, ); let priv_key = fs::read(&args.signing_key_pem_path).unwrap_or_else(|_| { From 0bcd9730dde706ecb36a5c4580958207ed4ef1ed Mon Sep 17 00:00:00 2001 From: Jed <4679729+jedleggett@users.noreply.github.com> Date: Fri, 12 Apr 2024 12:05:56 -0500 Subject: [PATCH 2/8] remove collect --- relayer/src/relayer.rs | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/relayer/src/relayer.rs b/relayer/src/relayer.rs index cce160d9..84a052f8 100644 --- a/relayer/src/relayer.rs +++ b/relayer/src/relayer.rs @@ -691,19 +691,13 @@ impl RelayerImpl { continue; } - let senders = if forward_all { - l_subscriptions.iter().collect::>, - )>>() + for (pubkey, sender) in if forward_all { + l_subscriptions.iter() } else { slot_leaders .iter() .filter_map(|pubkey| l_subscriptions.get(pubkey).map(|sender| (pubkey, sender))) - .collect() - }; - - for (pubkey, sender) in senders { + } { // try send because it's a bounded channel and we don't want to block if the channel is full match sender.try_send(Ok(SubscribePacketsResponse { header: Some(Header { From 75421245d0696344c49351de50b41e84778ce114 Mon Sep 17 00:00:00 2001 From: Jed <4679729+jedleggett@users.noreply.github.com> Date: Fri, 12 Apr 2024 12:10:10 -0500 Subject: [PATCH 3/8] hoist --- relayer/src/relayer.rs | 16 +++++++++------- transaction-relayer/src/main.rs | 3 ++- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/relayer/src/relayer.rs b/relayer/src/relayer.rs index 84a052f8..0d52c9bd 100644 --- a/relayer/src/relayer.rs +++ b/relayer/src/relayer.rs @@ -683,6 +683,14 @@ impl RelayerImpl { let l_subscriptions = subscriptions.read().unwrap(); + let senders = if forward_all { + l_subscriptions.iter() + } else { + slot_leaders + .iter() + .filter_map(|pubkey| l_subscriptions.get(pubkey).map(|sender| (pubkey, sender))) + }; + let mut failed_forwards = Vec::new(); for batch in &proto_packet_batches { // NOTE: this is important to avoid divide-by-0 inside the validator if packets @@ -691,13 +699,7 @@ impl RelayerImpl { continue; } - for (pubkey, sender) in if forward_all { - l_subscriptions.iter() - } else { - slot_leaders - .iter() - .filter_map(|pubkey| l_subscriptions.get(pubkey).map(|sender| (pubkey, sender))) - } { + for (pubkey, sender) in senders { // try send because it's a bounded channel and we don't want to block if the channel is full match sender.try_send(Ok(SubscribePacketsResponse { header: Some(Header { diff --git a/transaction-relayer/src/main.rs b/transaction-relayer/src/main.rs index 2b6c4d11..2c5838ce 100644 --- a/transaction-relayer/src/main.rs +++ b/transaction-relayer/src/main.rs @@ -222,7 +222,8 @@ struct Args { #[arg(long, env, default_value_t = false)] disable_mempool: bool, - /// Forward all received packets, regardless of leader schedule + /// Forward all received packets to all connected validators, + /// regardless of leader schedule #[arg(long, env, default_value_t = false)] forward_all: bool, From 09c27763057dace44d2b3dc9512922f0a3aa1a38 Mon Sep 17 00:00:00 2001 From: Jed <4679729+jedleggett@users.noreply.github.com> Date: Fri, 12 Apr 2024 12:47:22 -0500 Subject: [PATCH 4/8] restore collect --- relayer/src/relayer.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/relayer/src/relayer.rs b/relayer/src/relayer.rs index 0d52c9bd..995bdfc5 100644 --- a/relayer/src/relayer.rs +++ b/relayer/src/relayer.rs @@ -684,11 +684,15 @@ impl RelayerImpl { let l_subscriptions = subscriptions.read().unwrap(); let senders = if forward_all { - l_subscriptions.iter() + l_subscriptions.iter().collect::>, + )>>() } else { slot_leaders .iter() .filter_map(|pubkey| l_subscriptions.get(pubkey).map(|sender| (pubkey, sender))) + .collect() }; let mut failed_forwards = Vec::new(); @@ -699,7 +703,7 @@ impl RelayerImpl { continue; } - for (pubkey, sender) in senders { + for (pubkey, sender) in &senders { // try send because it's a bounded channel and we don't want to block if the channel is full match sender.try_send(Ok(SubscribePacketsResponse { header: Some(Header { @@ -718,7 +722,7 @@ impl RelayerImpl { } Err(TrySendError::Closed(_)) => { error!("channel is closed for pubkey: {:?}", pubkey); - failed_forwards.push(*pubkey); + failed_forwards.push(**pubkey); break; } } From 9f243dae50f339903f40fc2a262cc53fe2fb2e1a Mon Sep 17 00:00:00 2001 From: Jed <4679729+jedleggett@users.noreply.github.com> Date: Tue, 16 Apr 2024 21:05:01 -0500 Subject: [PATCH 5/8] update comment --- transaction-relayer/src/main.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/transaction-relayer/src/main.rs b/transaction-relayer/src/main.rs index 2c5838ce..d94a6110 100644 --- a/transaction-relayer/src/main.rs +++ b/transaction-relayer/src/main.rs @@ -223,7 +223,8 @@ struct Args { disable_mempool: bool, /// Forward all received packets to all connected validators, - /// regardless of leader schedule + /// regardless of leader schedule. + /// Note: This is required to be true for Stake Weighted Quality of Service (SWQOS)! #[arg(long, env, default_value_t = false)] forward_all: bool, From 62e10650af82d1af42388c4be414191af96be557 Mon Sep 17 00:00:00 2001 From: Jed <4679729+jedleggett@users.noreply.github.com> Date: Tue, 16 Apr 2024 21:05:22 -0500 Subject: [PATCH 6/8] update comment --- transaction-relayer/src/main.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/transaction-relayer/src/main.rs b/transaction-relayer/src/main.rs index d94a6110..8a57cc4f 100644 --- a/transaction-relayer/src/main.rs +++ b/transaction-relayer/src/main.rs @@ -229,11 +229,11 @@ struct Args { forward_all: bool, /// Staked Nodes Overrides Path - /// "Provide path to a yaml file with custom overrides for stakes of specific + /// `Provide path to a yaml file with custom overrides for stakes of specific /// identities. Overriding the amount of stake this validator considers as valid /// for other peers in network. The stake amount is used for calculating the /// number of QUIC streams permitted from the peer and vote packet sender stage. - /// Format of the file: `staked_map_id: {: }" + /// Format of the file: `staked_map_id: {: }` #[arg(long, env)] staked_nodes_overrides: Option, } From 43c19c86e9710add3c7c6eb1a989a766ba8f6a56 Mon Sep 17 00:00:00 2001 From: Jed <4679729+jedleggett@users.noreply.github.com> Date: Tue, 16 Apr 2024 22:22:42 -0500 Subject: [PATCH 7/8] update deps --- Cargo.lock | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 80a67279..ab3d9901 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -747,7 +747,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4114279215a005bc675e386011e594e1d9b800918cea18fcadadcce864a2046b" dependencies = [ "borsh-derive 0.10.3", - "hashbrown 0.13.2", + "hashbrown 0.11.2", ] [[package]] @@ -2068,9 +2068,9 @@ dependencies = [ [[package]] name = "h2" -version = "0.3.22" +version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d6250322ef6e60f93f9a2162799302cd6f68f79f6e5d85c8c16f14d1d958178" +checksum = "81fe527a889e1532da5c525686d96d4c2e74cdd345badf8dfef9f6b39dd5f5e8" dependencies = [ "bytes", "fnv", @@ -3422,9 +3422,9 @@ checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" [[package]] name = "openssl" -version = "0.10.59" +version = "0.10.64" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a257ad03cd8fb16ad4172fedf8094451e1af1c4b70097636ef2eac9a5f0cc33" +checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f" dependencies = [ "bitflags 2.4.1", "cfg-if 1.0.0", @@ -3463,9 +3463,9 @@ dependencies = [ [[package]] name = "openssl-sys" -version = "0.9.95" +version = "0.9.102" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40a4130519a360279579c2053038317e40eff64d13fd3f004f9e1b72b8a6aaf9" +checksum = "c597637d56fbc83893a35eb0dd04b2b8e7a50c91e64e9493e398b5df4fb45fa2" dependencies = [ "cc", "libc", @@ -4728,9 +4728,9 @@ checksum = "24188a676b6ae68c3b2cb3a01be17fbf7240ce009799bb56d5b1409051e78fde" [[package]] name = "shlex" -version = "1.2.0" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7cee0529a6d40f580e7a5e6c495c8fbfe21b7b52795ed4bb5e62cdf92bc6380" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" [[package]] name = "signal-hook" @@ -8155,18 +8155,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.7.26" +version = "0.7.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e97e415490559a91254a2979b4829267a57d2fcd741a98eee8b722fb57289aa0" +checksum = "74d4d3961e53fa4c9a25a8637fc2bfaf2595b3d3ae34875568a5cf64787716be" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.7.26" +version = "0.7.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd7e48ccf166952882ca8bd778a43502c64f33bf94c12ebe2a7f08e5a0f6689f" +checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", From 99b9dab43b6a57c93544a260466eedc2041c3347 Mon Sep 17 00:00:00 2001 From: Jed <4679729+jedleggett@users.noreply.github.com> Date: Wed, 17 Apr 2024 01:49:55 -0500 Subject: [PATCH 8/8] fix comment --- transaction-relayer/src/main.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/transaction-relayer/src/main.rs b/transaction-relayer/src/main.rs index 8a57cc4f..b9fcc32d 100644 --- a/transaction-relayer/src/main.rs +++ b/transaction-relayer/src/main.rs @@ -229,11 +229,11 @@ struct Args { forward_all: bool, /// Staked Nodes Overrides Path - /// `Provide path to a yaml file with custom overrides for stakes of specific + /// Provide path to a yaml file with custom overrides for stakes of specific /// identities. Overriding the amount of stake this validator considers as valid /// for other peers in network. The stake amount is used for calculating the /// number of QUIC streams permitted from the peer and vote packet sender stage. - /// Format of the file: `staked_map_id: {: }` + /// Format of the file: `staked_map_id: {: } #[arg(long, env)] staked_nodes_overrides: Option, }