diff --git a/relayer/src/relayer.rs b/relayer/src/relayer.rs index ff1495c4..cce160d9 100644 --- a/relayer/src/relayer.rs +++ b/relayer/src/relayer.rs @@ -426,6 +426,7 @@ impl RelayerImpl { ofac_addresses: HashSet<Pubkey>, address_lookup_table_cache: Arc<DashMap<Pubkey, AddressLookupTableAccount>>, validator_packet_batch_size: usize, + forward_all: bool, ) -> Self { const LEADER_LOOKAHEAD: u64 = 2; @@ -453,6 +454,7 @@ impl RelayerImpl { ofac_addresses, address_lookup_table_cache, validator_packet_batch_size, + forward_all, ); warn!("RelayerImpl thread exited with result {res:?}") }) @@ -488,6 +490,7 @@ impl RelayerImpl { ofac_addresses: HashSet<Pubkey>, address_lookup_table_cache: Arc<DashMap<Pubkey, AddressLookupTableAccount>>, validator_packet_batch_size: usize, + forward_all: bool, ) -> RelayerResult<()> { let mut highest_slot = Slot::default(); @@ -518,7 +521,7 @@ impl RelayerImpl { }, recv(delay_packet_receiver) -> maybe_packet_batches => { let start = Instant::now(); - let failed_forwards = Self::forward_packets(maybe_packet_batches, packet_subscriptions, &slot_leaders, &mut relayer_metrics, &ofac_addresses, &address_lookup_table_cache, validator_packet_batch_size)?; + let failed_forwards = Self::forward_packets(maybe_packet_batches, packet_subscriptions, &slot_leaders, &mut relayer_metrics, &ofac_addresses, &address_lookup_table_cache, validator_packet_batch_size, forward_all)?; Self::drop_connections(failed_forwards, packet_subscriptions, &mut relayer_metrics); let _ = relayer_metrics.crossbeam_delay_packet_receiver_processing_us.increment(start.elapsed().as_micros() as u64); }, @@ -636,6 +639,7 @@ impl RelayerImpl { ofac_addresses: &HashSet<Pubkey>, address_lookup_table_cache: &Arc<DashMap<Pubkey, AddressLookupTableAccount>>, validator_packet_batch_size: usize, + forward_all: bool, ) -> RelayerResult<Vec<Pubkey>> { let packet_batches = maybe_packet_batches?; @@ -680,41 +684,50 @@ impl RelayerImpl { let l_subscriptions = subscriptions.read().unwrap(); let mut failed_forwards = Vec::new(); - for pubkey in slot_leaders { - if let Some(sender) = l_subscriptions.get(pubkey) { - for batch in &proto_packet_batches { - // NOTE: this is important to avoid divide-by-0 inside the validator if packets - // get routed to sigverify under the assumption theres > 0 packets in the batch - if batch.packets.is_empty() { - continue; - } + for batch in &proto_packet_batches { + // NOTE: this is important to avoid divide-by-0 inside the validator if packets + // get routed to sigverify under the assumption theres > 0 packets in the batch + if batch.packets.is_empty() { + continue; + } - // try send because it's a bounded channel and we don't want to block if the channel is full - match sender.try_send(Ok(SubscribePacketsResponse { - header: Some(Header { - ts: Some(Timestamp::from(SystemTime::now())), - }), - msg: Some(subscribe_packets_response::Msg::Batch(batch.clone())), - })) { - Ok(_) => { - relayer_metrics - .increment_packets_forwarded(pubkey, batch.packets.len() as u64); - } - Err(TrySendError::Full(_)) => { - error!("packet channel is full for pubkey: {:?}", pubkey); - relayer_metrics - .increment_packets_dropped(pubkey, batch.packets.len() as u64); - } - Err(TrySendError::Closed(_)) => { - error!("channel is closed for pubkey: {:?}", pubkey); - failed_forwards.push(*pubkey); - break; - } + let senders = if forward_all { + l_subscriptions.iter().collect::<Vec<( + &Pubkey, + &TokioSender<Result<SubscribePacketsResponse, Status>>, + )>>() + } else { + slot_leaders + .iter() + .filter_map(|pubkey| l_subscriptions.get(pubkey).map(|sender| (pubkey, sender))) + .collect() + }; + + for (pubkey, sender) in senders { + // try send because it's a bounded channel and we don't want to block if the channel is full + match sender.try_send(Ok(SubscribePacketsResponse { + header: Some(Header { + ts: Some(Timestamp::from(SystemTime::now())), + }), + msg: Some(subscribe_packets_response::Msg::Batch(batch.clone())), + })) { + Ok(_) => { + relayer_metrics + .increment_packets_forwarded(pubkey, batch.packets.len() as u64); + } + Err(TrySendError::Full(_)) => { + error!("packet channel is full for pubkey: {:?}", pubkey); + relayer_metrics + .increment_packets_dropped(pubkey, batch.packets.len() as u64); + } + Err(TrySendError::Closed(_)) => { + error!("channel is closed for pubkey: {:?}", pubkey); + failed_forwards.push(*pubkey); + break; } } } } - Ok(failed_forwards) } diff --git a/transaction-relayer/src/main.rs b/transaction-relayer/src/main.rs index cf983d40..2b6c4d11 100644 --- a/transaction-relayer/src/main.rs +++ b/transaction-relayer/src/main.rs @@ -222,6 +222,10 @@ struct Args { #[arg(long, env, default_value_t = false)] disable_mempool: bool, + /// Forward all received packets, regardless of leader schedule + #[arg(long, env, default_value_t = false)] + forward_all: bool, + /// Staked Nodes Overrides Path /// "Provide path to a yaml file with custom overrides for stakes of specific /// identities. Overriding the amount of stake this validator considers as valid @@ -513,6 +517,7 @@ fn main() { ofac_addresses, address_lookup_table_cache, args.validator_packet_batch_size, + args.forward_all, ); let priv_key = fs::read(&args.signing_key_pem_path).unwrap_or_else(|_| {