diff --git a/node/consensus/src/lib.rs b/node/consensus/src/lib.rs index fbbff78f70..12752d7d8e 100644 --- a/node/consensus/src/lib.rs +++ b/node/consensus/src/lib.rs @@ -306,8 +306,10 @@ impl Consensus { // If the transaction was recently seen, return early. return Ok(()); } + // Create transmission ID. + let transmission_id = TransmissionID::Solution(solution_id, checksum); // Check if the solution already exists in the ledger. - if self.ledger.contains_transmission(&TransmissionID::Solution(solution_id, checksum))? { + if self.ledger.contains_transmission(&transmission_id)? { bail!("Solution '{}' exists in the ledger {}", fmt_id(solution_id), "(skipping)".dimmed()); } // Add the solution to the memory pool. @@ -385,8 +387,10 @@ impl Consensus { // If the transaction was recently seen, return early. return Ok(()); } + // Create transmission ID. + let transmission_id = TransmissionID::Transaction(transaction_id, checksum); // Check if the transaction already exists in the ledger. - if self.ledger.contains_transmission(&TransmissionID::Transaction(transaction_id, checksum))? { + if self.ledger.contains_transmission(&transmission_id)? { bail!("Transaction '{}' exists in the ledger {}", fmt_id(transaction_id), "(skipping)".dimmed()); } // Add the transaction to the memory pool. diff --git a/node/router/src/outbound.rs b/node/router/src/outbound.rs index 83218eebe5..ddfb1fb3d3 100644 --- a/node/router/src/outbound.rs +++ b/node/router/src/outbound.rs @@ -20,9 +20,9 @@ use crate::{ use snarkos_node_sync_locators::BlockLocators; use snarkos_node_tcp::protocols::Writing; use snarkvm::prelude::Network; -use std::io; -use std::net::SocketAddr; +use rand::{rngs::OsRng, seq::IteratorRandom}; +use std::{io, net::SocketAddr}; use tokio::sync::oneshot; pub trait Outbound: Writing> { @@ -113,7 +113,7 @@ pub trait Outbound: Writing> { } /// Sends the given message to every connected validator, excluding the sender and any specified IPs. - fn propagate_to_validators(&self, message: Message, excluded_peers: &[SocketAddr]) { + fn propagate_to_validators(&self, message: Message, excluded_peers: &[SocketAddr], num_validators: usize) { // TODO (howardwu): Serialize large messages once only. // // Perform ahead-of-time, non-blocking serialization just once for applicable objects. // if let Message::UnconfirmedSolution(ref mut message) = message { @@ -130,9 +130,15 @@ pub trait Outbound: Writing> { // } // } + // Initialize an RNG. + let rng = &mut OsRng; + // Prepare the peers to send to. let connected_validators = self.router().connected_validators(); - let peers = connected_validators.iter().filter(|peer_ip| !excluded_peers.contains(peer_ip)); + let peers = connected_validators + .iter() + .filter(|peer_ip| !excluded_peers.contains(peer_ip)) + .choose_multiple(rng, num_validators); // Iterate through all validators that are not the sender and excluded validators. for peer_ip in peers { diff --git a/node/src/validator/router.rs b/node/src/validator/router.rs index c9d1ab6d37..17c5ea30f4 100644 --- a/node/src/validator/router.rs +++ b/node/src/validator/router.rs @@ -33,6 +33,8 @@ use snarkvm::{ use std::{io, net::SocketAddr, time::Duration}; +const TRANSMISSION_PROPAGATION_RATE: usize = 3; + impl> P2P for Validator { /// Returns a reference to the TCP instance. fn tcp(&self) -> &Tcp { @@ -279,8 +281,8 @@ impl> Inbound for Validator { return true; // Maintain the connection. } let message = Message::UnconfirmedSolution(serialized); - // Propagate the "UnconfirmedSolution" to the connected validators. - self.propagate_to_validators(message, &[peer_ip]); + // Propagate the "UnconfirmedSolution" to connected validators. + self.propagate_to_validators(message, &[peer_ip], TRANSMISSION_PROPAGATION_RATE); true } @@ -297,8 +299,8 @@ impl> Inbound for Validator { return true; // Maintain the connection. } let message = Message::UnconfirmedTransaction(serialized); - // Propagate the "UnconfirmedTransaction" to the connected validators. - self.propagate_to_validators(message, &[peer_ip]); + // Propagate the "UnconfirmedTransaction" to connected validators. + self.propagate_to_validators(message, &[peer_ip], TRANSMISSION_PROPAGATION_RATE); true } }