Skip to content

Commit

Permalink
add forward_all
Browse files Browse the repository at this point in the history
  • Loading branch information
jedleggett committed Apr 12, 2024
1 parent 869f7a6 commit c8be6d9
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 31 deletions.
75 changes: 44 additions & 31 deletions relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ impl RelayerImpl {
ofac_addresses: HashSet<Pubkey>,
address_lookup_table_cache: Arc<DashMap<Pubkey, AddressLookupTableAccount>>,
validator_packet_batch_size: usize,
forward_all: bool,
) -> Self {
const LEADER_LOOKAHEAD: u64 = 2;

Expand Down Expand Up @@ -453,6 +454,7 @@ impl RelayerImpl {
ofac_addresses,
address_lookup_table_cache,
validator_packet_batch_size,
forward_all,
);
warn!("RelayerImpl thread exited with result {res:?}")
})
Expand Down Expand Up @@ -488,6 +490,7 @@ impl RelayerImpl {
ofac_addresses: HashSet<Pubkey>,
address_lookup_table_cache: Arc<DashMap<Pubkey, AddressLookupTableAccount>>,
validator_packet_batch_size: usize,
forward_all: bool,
) -> RelayerResult<()> {
let mut highest_slot = Slot::default();

Expand Down Expand Up @@ -518,7 +521,7 @@ impl RelayerImpl {
},
recv(delay_packet_receiver) -> maybe_packet_batches => {
let start = Instant::now();
let failed_forwards = Self::forward_packets(maybe_packet_batches, packet_subscriptions, &slot_leaders, &mut relayer_metrics, &ofac_addresses, &address_lookup_table_cache, validator_packet_batch_size)?;
let failed_forwards = Self::forward_packets(maybe_packet_batches, packet_subscriptions, &slot_leaders, &mut relayer_metrics, &ofac_addresses, &address_lookup_table_cache, validator_packet_batch_size, forward_all)?;
Self::drop_connections(failed_forwards, packet_subscriptions, &mut relayer_metrics);
let _ = relayer_metrics.crossbeam_delay_packet_receiver_processing_us.increment(start.elapsed().as_micros() as u64);
},
Expand Down Expand Up @@ -636,6 +639,7 @@ impl RelayerImpl {
ofac_addresses: &HashSet<Pubkey>,
address_lookup_table_cache: &Arc<DashMap<Pubkey, AddressLookupTableAccount>>,
validator_packet_batch_size: usize,
forward_all: bool,
) -> RelayerResult<Vec<Pubkey>> {
let packet_batches = maybe_packet_batches?;

Expand Down Expand Up @@ -680,41 +684,50 @@ impl RelayerImpl {
let l_subscriptions = subscriptions.read().unwrap();

let mut failed_forwards = Vec::new();
for pubkey in slot_leaders {
if let Some(sender) = l_subscriptions.get(pubkey) {
for batch in &proto_packet_batches {
// NOTE: this is important to avoid divide-by-0 inside the validator if packets
// get routed to sigverify under the assumption theres > 0 packets in the batch
if batch.packets.is_empty() {
continue;
}
for batch in &proto_packet_batches {
// NOTE: this is important to avoid divide-by-0 inside the validator if packets
// get routed to sigverify under the assumption theres > 0 packets in the batch
if batch.packets.is_empty() {
continue;
}

// try send because it's a bounded channel and we don't want to block if the channel is full
match sender.try_send(Ok(SubscribePacketsResponse {
header: Some(Header {
ts: Some(Timestamp::from(SystemTime::now())),
}),
msg: Some(subscribe_packets_response::Msg::Batch(batch.clone())),
})) {
Ok(_) => {
relayer_metrics
.increment_packets_forwarded(pubkey, batch.packets.len() as u64);
}
Err(TrySendError::Full(_)) => {
error!("packet channel is full for pubkey: {:?}", pubkey);
relayer_metrics
.increment_packets_dropped(pubkey, batch.packets.len() as u64);
}
Err(TrySendError::Closed(_)) => {
error!("channel is closed for pubkey: {:?}", pubkey);
failed_forwards.push(*pubkey);
break;
}
let senders = if forward_all {
l_subscriptions.iter().collect::<Vec<(
&Pubkey,
&TokioSender<Result<SubscribePacketsResponse, Status>>,
)>>()
} else {
slot_leaders
.iter()
.filter_map(|pubkey| l_subscriptions.get(pubkey).map(|sender| (pubkey, sender)))
.collect()
};

for (pubkey, sender) in senders {
// try send because it's a bounded channel and we don't want to block if the channel is full
match sender.try_send(Ok(SubscribePacketsResponse {
header: Some(Header {
ts: Some(Timestamp::from(SystemTime::now())),
}),
msg: Some(subscribe_packets_response::Msg::Batch(batch.clone())),
})) {
Ok(_) => {
relayer_metrics
.increment_packets_forwarded(pubkey, batch.packets.len() as u64);
}
Err(TrySendError::Full(_)) => {
error!("packet channel is full for pubkey: {:?}", pubkey);
relayer_metrics
.increment_packets_dropped(pubkey, batch.packets.len() as u64);
}
Err(TrySendError::Closed(_)) => {
error!("channel is closed for pubkey: {:?}", pubkey);
failed_forwards.push(*pubkey);
break;
}
}
}
}

Ok(failed_forwards)
}

Expand Down
5 changes: 5 additions & 0 deletions transaction-relayer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,10 @@ struct Args {
#[arg(long, env, default_value_t = false)]
disable_mempool: bool,

/// Forward all received packets, regardless of leader schedule
#[arg(long, env, default_value_t = false)]
forward_all: bool,

/// Staked Nodes Overrides Path
/// "Provide path to a yaml file with custom overrides for stakes of specific
/// identities. Overriding the amount of stake this validator considers as valid
Expand Down Expand Up @@ -513,6 +517,7 @@ fn main() {
ofac_addresses,
address_lookup_table_cache,
args.validator_packet_batch_size,
args.forward_all,
);

let priv_key = fs::read(&args.signing_key_pem_path).unwrap_or_else(|_| {
Expand Down

0 comments on commit c8be6d9

Please sign in to comment.