Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce transmission duplication by reducing propagation to other validators #3493

Draft
wants to merge 1 commit into
base: staging
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions node/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,10 @@ impl<N: Network> Consensus<N> {
// If the transaction was recently seen, return early.
return Ok(());
}
// Create transmission ID.
let transmission_id = TransmissionID::Solution(solution_id, checksum);
// Check if the solution already exists in the ledger.
if self.ledger.contains_transmission(&TransmissionID::Solution(solution_id, checksum))? {
if self.ledger.contains_transmission(&transmission_id)? {
bail!("Solution '{}' exists in the ledger {}", fmt_id(solution_id), "(skipping)".dimmed());
}
// Add the solution to the memory pool.
Expand Down Expand Up @@ -385,8 +387,10 @@ impl<N: Network> Consensus<N> {
// If the transaction was recently seen, return early.
return Ok(());
}
// Create transmission ID.
let transmission_id = TransmissionID::Transaction(transaction_id, checksum);
// Check if the transaction already exists in the ledger.
if self.ledger.contains_transmission(&TransmissionID::Transaction(transaction_id, checksum))? {
if self.ledger.contains_transmission(&transmission_id)? {
bail!("Transaction '{}' exists in the ledger {}", fmt_id(transaction_id), "(skipping)".dimmed());
}
// Add the transaction to the memory pool.
Expand Down
14 changes: 10 additions & 4 deletions node/router/src/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ use crate::{
use snarkos_node_sync_locators::BlockLocators;
use snarkos_node_tcp::protocols::Writing;
use snarkvm::prelude::Network;
use std::io;

use std::net::SocketAddr;
use rand::{rngs::OsRng, seq::IteratorRandom};
use std::{io, net::SocketAddr};
use tokio::sync::oneshot;

pub trait Outbound<N: Network>: Writing<Message = Message<N>> {
Expand Down Expand Up @@ -113,7 +113,7 @@ pub trait Outbound<N: Network>: Writing<Message = Message<N>> {
}

/// Sends the given message to every connected validator, excluding the sender and any specified IPs.
fn propagate_to_validators(&self, message: Message<N>, excluded_peers: &[SocketAddr]) {
fn propagate_to_validators(&self, message: Message<N>, excluded_peers: &[SocketAddr], num_validators: usize) {
// TODO (howardwu): Serialize large messages once only.
// // Perform ahead-of-time, non-blocking serialization just once for applicable objects.
// if let Message::UnconfirmedSolution(ref mut message) = message {
Expand All @@ -130,9 +130,15 @@ pub trait Outbound<N: Network>: Writing<Message = Message<N>> {
// }
// }

// Initialize an RNG.
let rng = &mut OsRng;

// Prepare the peers to send to.
let connected_validators = self.router().connected_validators();
let peers = connected_validators.iter().filter(|peer_ip| !excluded_peers.contains(peer_ip));
let peers = connected_validators
.iter()
.filter(|peer_ip| !excluded_peers.contains(peer_ip))
.choose_multiple(rng, num_validators);

// Iterate through all validators that are not the sender and excluded validators.
for peer_ip in peers {
Expand Down
10 changes: 6 additions & 4 deletions node/src/validator/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use snarkvm::{

use std::{io, net::SocketAddr, time::Duration};

const TRANSMISSION_PROPAGATION_RATE: usize = 3;

impl<N: Network, C: ConsensusStorage<N>> P2P for Validator<N, C> {
/// Returns a reference to the TCP instance.
fn tcp(&self) -> &Tcp {
Expand Down Expand Up @@ -279,8 +281,8 @@ impl<N: Network, C: ConsensusStorage<N>> Inbound<N> for Validator<N, C> {
return true; // Maintain the connection.
}
let message = Message::UnconfirmedSolution(serialized);
// Propagate the "UnconfirmedSolution" to the connected validators.
self.propagate_to_validators(message, &[peer_ip]);
// Propagate the "UnconfirmedSolution" to connected validators.
self.propagate_to_validators(message, &[peer_ip], TRANSMISSION_PROPAGATION_RATE);
true
}

Expand All @@ -297,8 +299,8 @@ impl<N: Network, C: ConsensusStorage<N>> Inbound<N> for Validator<N, C> {
return true; // Maintain the connection.
}
let message = Message::UnconfirmedTransaction(serialized);
// Propagate the "UnconfirmedTransaction" to the connected validators.
self.propagate_to_validators(message, &[peer_ip]);
// Propagate the "UnconfirmedTransaction" to connected validators.
self.propagate_to_validators(message, &[peer_ip], TRANSMISSION_PROPAGATION_RATE);
true
}
}