Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FORWARD_ALL Option #125

Merged
merged 9 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

75 changes: 44 additions & 31 deletions relayer/src/relayer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,7 @@ impl RelayerImpl {
ofac_addresses: HashSet<Pubkey>,
address_lookup_table_cache: Arc<DashMap<Pubkey, AddressLookupTableAccount>>,
validator_packet_batch_size: usize,
forward_all: bool,
) -> Self {
const LEADER_LOOKAHEAD: u64 = 2;

Expand Down Expand Up @@ -453,6 +454,7 @@ impl RelayerImpl {
ofac_addresses,
address_lookup_table_cache,
validator_packet_batch_size,
forward_all,
);
warn!("RelayerImpl thread exited with result {res:?}")
})
Expand Down Expand Up @@ -488,6 +490,7 @@ impl RelayerImpl {
ofac_addresses: HashSet<Pubkey>,
address_lookup_table_cache: Arc<DashMap<Pubkey, AddressLookupTableAccount>>,
validator_packet_batch_size: usize,
forward_all: bool,
) -> RelayerResult<()> {
let mut highest_slot = Slot::default();

Expand Down Expand Up @@ -518,7 +521,7 @@ impl RelayerImpl {
},
recv(delay_packet_receiver) -> maybe_packet_batches => {
let start = Instant::now();
let failed_forwards = Self::forward_packets(maybe_packet_batches, packet_subscriptions, &slot_leaders, &mut relayer_metrics, &ofac_addresses, &address_lookup_table_cache, validator_packet_batch_size)?;
let failed_forwards = Self::forward_packets(maybe_packet_batches, packet_subscriptions, &slot_leaders, &mut relayer_metrics, &ofac_addresses, &address_lookup_table_cache, validator_packet_batch_size, forward_all)?;
Self::drop_connections(failed_forwards, packet_subscriptions, &mut relayer_metrics);
let _ = relayer_metrics.crossbeam_delay_packet_receiver_processing_us.increment(start.elapsed().as_micros() as u64);
},
Expand Down Expand Up @@ -636,6 +639,7 @@ impl RelayerImpl {
ofac_addresses: &HashSet<Pubkey>,
address_lookup_table_cache: &Arc<DashMap<Pubkey, AddressLookupTableAccount>>,
validator_packet_batch_size: usize,
forward_all: bool,
) -> RelayerResult<Vec<Pubkey>> {
let packet_batches = maybe_packet_batches?;

Expand Down Expand Up @@ -679,42 +683,51 @@ impl RelayerImpl {

let l_subscriptions = subscriptions.read().unwrap();

let senders = if forward_all {
l_subscriptions.iter().collect::<Vec<(
&Pubkey,
&TokioSender<Result<SubscribePacketsResponse, Status>>,
)>>()
} else {
slot_leaders
.iter()
.filter_map(|pubkey| l_subscriptions.get(pubkey).map(|sender| (pubkey, sender)))
.collect()
};

let mut failed_forwards = Vec::new();
for pubkey in slot_leaders {
if let Some(sender) = l_subscriptions.get(pubkey) {
for batch in &proto_packet_batches {
// NOTE: this is important to avoid divide-by-0 inside the validator if packets
// get routed to sigverify under the assumption theres > 0 packets in the batch
if batch.packets.is_empty() {
continue;
}
for batch in &proto_packet_batches {
// NOTE: this is important to avoid divide-by-0 inside the validator if packets
// get routed to sigverify under the assumption theres > 0 packets in the batch
if batch.packets.is_empty() {
continue;
}

// try send because it's a bounded channel and we don't want to block if the channel is full
match sender.try_send(Ok(SubscribePacketsResponse {
header: Some(Header {
ts: Some(Timestamp::from(SystemTime::now())),
}),
msg: Some(subscribe_packets_response::Msg::Batch(batch.clone())),
})) {
Ok(_) => {
relayer_metrics
.increment_packets_forwarded(pubkey, batch.packets.len() as u64);
}
Err(TrySendError::Full(_)) => {
error!("packet channel is full for pubkey: {:?}", pubkey);
relayer_metrics
.increment_packets_dropped(pubkey, batch.packets.len() as u64);
}
Err(TrySendError::Closed(_)) => {
error!("channel is closed for pubkey: {:?}", pubkey);
failed_forwards.push(*pubkey);
break;
}
for (pubkey, sender) in &senders {
// try send because it's a bounded channel and we don't want to block if the channel is full
match sender.try_send(Ok(SubscribePacketsResponse {
header: Some(Header {
ts: Some(Timestamp::from(SystemTime::now())),
}),
msg: Some(subscribe_packets_response::Msg::Batch(batch.clone())),
})) {
Ok(_) => {
relayer_metrics
.increment_packets_forwarded(pubkey, batch.packets.len() as u64);
}
Err(TrySendError::Full(_)) => {
error!("packet channel is full for pubkey: {:?}", pubkey);
relayer_metrics
.increment_packets_dropped(pubkey, batch.packets.len() as u64);
}
Err(TrySendError::Closed(_)) => {
error!("channel is closed for pubkey: {:?}", pubkey);
failed_forwards.push(**pubkey);
break;
}
}
}
}

Ok(failed_forwards)
}

Expand Down
11 changes: 9 additions & 2 deletions transaction-relayer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,18 @@ struct Args {
#[arg(long, env, default_value_t = false)]
disable_mempool: bool,

/// Forward all received packets to all connected validators,
/// regardless of leader schedule.
/// Note: This is required to be true for Stake Weighted Quality of Service (SWQOS)!
#[arg(long, env, default_value_t = false)]
forward_all: bool,

/// Staked Nodes Overrides Path
/// "Provide path to a yaml file with custom overrides for stakes of specific
/// `Provide path to a yaml file with custom overrides for stakes of specific
/// identities. Overriding the amount of stake this validator considers as valid
/// for other peers in network. The stake amount is used for calculating the
/// number of QUIC streams permitted from the peer and vote packet sender stage.
/// Format of the file: `staked_map_id: {<pubkey>: <SOL stake amount>}"
/// Format of the file: `staked_map_id: {<pubkey>: <SOL stake amount>}`
#[arg(long, env)]
staked_nodes_overrides: Option<PathBuf>,
}
Expand Down Expand Up @@ -513,6 +519,7 @@ fn main() {
ofac_addresses,
address_lookup_table_cache,
args.validator_packet_batch_size,
args.forward_all,
);

let priv_key = fs::read(&args.signing_key_pem_path).unwrap_or_else(|_| {
Expand Down
Loading