Skip to content

Commit

Permalink
Add FORWARD_ALL Option (#125)
Browse files Browse the repository at this point in the history
  • Loading branch information
jedleggett authored Apr 17, 2024
1 parent b6058ff commit e8c2bb6
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 46 deletions.
26 changes: 13 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 44 additions & 31 deletions relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ impl RelayerImpl {
ofac_addresses: HashSet<Pubkey>,
address_lookup_table_cache: Arc<DashMap<Pubkey, AddressLookupTableAccount>>,
validator_packet_batch_size: usize,
forward_all: bool,
) -> Self {
const LEADER_LOOKAHEAD: u64 = 2;

Expand Down Expand Up @@ -453,6 +454,7 @@ impl RelayerImpl {
ofac_addresses,
address_lookup_table_cache,
validator_packet_batch_size,
forward_all,
);
warn!("RelayerImpl thread exited with result {res:?}")
})
Expand Down Expand Up @@ -488,6 +490,7 @@ impl RelayerImpl {
ofac_addresses: HashSet<Pubkey>,
address_lookup_table_cache: Arc<DashMap<Pubkey, AddressLookupTableAccount>>,
validator_packet_batch_size: usize,
forward_all: bool,
) -> RelayerResult<()> {
let mut highest_slot = Slot::default();

Expand Down Expand Up @@ -518,7 +521,7 @@ impl RelayerImpl {
},
recv(delay_packet_receiver) -> maybe_packet_batches => {
let start = Instant::now();
let failed_forwards = Self::forward_packets(maybe_packet_batches, packet_subscriptions, &slot_leaders, &mut relayer_metrics, &ofac_addresses, &address_lookup_table_cache, validator_packet_batch_size)?;
let failed_forwards = Self::forward_packets(maybe_packet_batches, packet_subscriptions, &slot_leaders, &mut relayer_metrics, &ofac_addresses, &address_lookup_table_cache, validator_packet_batch_size, forward_all)?;
Self::drop_connections(failed_forwards, packet_subscriptions, &mut relayer_metrics);
let _ = relayer_metrics.crossbeam_delay_packet_receiver_processing_us.increment(start.elapsed().as_micros() as u64);
},
Expand Down Expand Up @@ -636,6 +639,7 @@ impl RelayerImpl {
ofac_addresses: &HashSet<Pubkey>,
address_lookup_table_cache: &Arc<DashMap<Pubkey, AddressLookupTableAccount>>,
validator_packet_batch_size: usize,
forward_all: bool,
) -> RelayerResult<Vec<Pubkey>> {
let packet_batches = maybe_packet_batches?;

Expand Down Expand Up @@ -679,42 +683,51 @@ impl RelayerImpl {

let l_subscriptions = subscriptions.read().unwrap();

let senders = if forward_all {
l_subscriptions.iter().collect::<Vec<(
&Pubkey,
&TokioSender<Result<SubscribePacketsResponse, Status>>,
)>>()
} else {
slot_leaders
.iter()
.filter_map(|pubkey| l_subscriptions.get(pubkey).map(|sender| (pubkey, sender)))
.collect()
};

let mut failed_forwards = Vec::new();
for pubkey in slot_leaders {
if let Some(sender) = l_subscriptions.get(pubkey) {
for batch in &proto_packet_batches {
// NOTE: this is important to avoid divide-by-0 inside the validator if packets
// get routed to sigverify under the assumption theres > 0 packets in the batch
if batch.packets.is_empty() {
continue;
}
for batch in &proto_packet_batches {
// NOTE: this is important to avoid divide-by-0 inside the validator if packets
// get routed to sigverify under the assumption theres > 0 packets in the batch
if batch.packets.is_empty() {
continue;
}

// try send because it's a bounded channel and we don't want to block if the channel is full
match sender.try_send(Ok(SubscribePacketsResponse {
header: Some(Header {
ts: Some(Timestamp::from(SystemTime::now())),
}),
msg: Some(subscribe_packets_response::Msg::Batch(batch.clone())),
})) {
Ok(_) => {
relayer_metrics
.increment_packets_forwarded(pubkey, batch.packets.len() as u64);
}
Err(TrySendError::Full(_)) => {
error!("packet channel is full for pubkey: {:?}", pubkey);
relayer_metrics
.increment_packets_dropped(pubkey, batch.packets.len() as u64);
}
Err(TrySendError::Closed(_)) => {
error!("channel is closed for pubkey: {:?}", pubkey);
failed_forwards.push(*pubkey);
break;
}
for (pubkey, sender) in &senders {
// try send because it's a bounded channel and we don't want to block if the channel is full
match sender.try_send(Ok(SubscribePacketsResponse {
header: Some(Header {
ts: Some(Timestamp::from(SystemTime::now())),
}),
msg: Some(subscribe_packets_response::Msg::Batch(batch.clone())),
})) {
Ok(_) => {
relayer_metrics
.increment_packets_forwarded(pubkey, batch.packets.len() as u64);
}
Err(TrySendError::Full(_)) => {
error!("packet channel is full for pubkey: {:?}", pubkey);
relayer_metrics
.increment_packets_dropped(pubkey, batch.packets.len() as u64);
}
Err(TrySendError::Closed(_)) => {
error!("channel is closed for pubkey: {:?}", pubkey);
failed_forwards.push(**pubkey);
break;
}
}
}
}

Ok(failed_forwards)
}

Expand Down
11 changes: 9 additions & 2 deletions transaction-relayer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,18 @@ struct Args {
#[arg(long, env, default_value_t = false)]
disable_mempool: bool,

/// Forward all received packets to all connected validators,
/// regardless of leader schedule.
/// Note: This is required to be true for Stake Weighted Quality of Service (SWQOS)!
#[arg(long, env, default_value_t = false)]
forward_all: bool,

/// Staked Nodes Overrides Path
/// "Provide path to a yaml file with custom overrides for stakes of specific
/// Provide path to a yaml file with custom overrides for stakes of specific
/// identities. Overriding the amount of stake this validator considers as valid
/// for other peers in network. The stake amount is used for calculating the
/// number of QUIC streams permitted from the peer and vote packet sender stage.
/// Format of the file: `staked_map_id: {<pubkey>: <SOL stake amount>}"
/// Format of the file: `staked_map_id: {<pubkey>: <SOL stake amount>}
#[arg(long, env)]
staked_nodes_overrides: Option<PathBuf>,
}
Expand Down Expand Up @@ -513,6 +519,7 @@ fn main() {
ofac_addresses,
address_lookup_table_cache,
args.validator_packet_batch_size,
args.forward_all,
);

let priv_key = fs::read(&args.signing_key_pem_path).unwrap_or_else(|_| {
Expand Down

0 comments on commit e8c2bb6

Please sign in to comment.