Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send transaction in batches #1910

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions zilliqa/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ pub struct Consensus {
receipts_cache: HashMap<Hash, (TransactionReceipt, Vec<Address>)>,
receipts_cache_hash: Hash,
/// Actions that act on newly created blocks
transaction_pool: TransactionPool,
pub transaction_pool: TransactionPool,
/// Pending proposal. Gets created as soon as we become aware that we are leader for this view.
early_proposal: Option<EarlyProposal>,
/// Flag indicating that block creation should be postponed at least until empty_block_timeout is reached
Expand Down Expand Up @@ -1649,7 +1649,8 @@ impl Consensus {
// need those transactions again
for tx in opaque_transactions {
let account_nonce = self.state.get_account(tx.signer)?.nonce;
self.transaction_pool.insert_transaction(tx, account_nonce);
self.transaction_pool
.insert_transaction(tx, account_nonce, false);
}

// finalise the proposal
Expand All @@ -1668,11 +1669,15 @@ impl Consensus {
}

/// Insert transaction and add to early_proposal if possible.
pub fn handle_new_transaction(&mut self, txn: SignedTransaction) -> Result<TxAddResult> {
pub fn handle_new_transaction(
&mut self,
txn: SignedTransaction,
from_broadcast: bool,
) -> Result<TxAddResult> {
let Ok(verified) = txn.verify() else {
return Ok(TxAddResult::CannotVerifySignature);
};
let inserted = self.new_transaction(verified)?;
let inserted = self.new_transaction(verified, from_broadcast)?;
if inserted.was_added()
&& self.create_next_block_on_timeout
&& self.early_proposal.is_some()
Expand Down Expand Up @@ -1887,7 +1892,11 @@ impl Consensus {

/// Returns (flag, outcome).
/// flag is true if the transaction was newly added to the pool - ie. if it validated correctly and has not been seen before.
pub fn new_transaction(&mut self, txn: VerifiedTransaction) -> Result<TxAddResult> {
pub fn new_transaction(
&mut self,
txn: VerifiedTransaction,
from_broadcast: bool,
) -> Result<TxAddResult> {
if self.db.contains_transaction(&txn.hash)? {
debug!("Transaction {:?} already in mempool", txn.hash);
return Ok(TxAddResult::Duplicate(txn.hash));
Expand Down Expand Up @@ -1922,9 +1931,9 @@ impl Consensus {

let txn_hash = txn.hash;

let insert_result = self
.transaction_pool
.insert_transaction(txn, early_account.nonce);
let insert_result =
self.transaction_pool
.insert_transaction(txn, early_account.nonce, from_broadcast);
if insert_result.was_added() {
let _ = self.new_transaction_hashes.send(txn_hash);

Expand Down Expand Up @@ -2857,7 +2866,8 @@ impl Consensus {

for txn in existing_txns {
let account_nonce = self.state.get_account(txn.signer)?.nonce;
self.transaction_pool.insert_transaction(txn, account_nonce);
self.transaction_pool
.insert_transaction(txn, account_nonce, false);
}

// block transactions need to be removed from self.transactions and re-injected
Expand All @@ -2867,7 +2877,7 @@ impl Consensus {
// Insert this unwound transaction back into the transaction pool.
let account_nonce = self.state.get_account(orig_tx.signer)?.nonce;
self.transaction_pool
.insert_transaction(orig_tx, account_nonce);
.insert_transaction(orig_tx, account_nonce, false);
}
// then purge them all from the db, including receipts and indexes
self.db
Expand Down Expand Up @@ -3008,7 +3018,7 @@ impl Consensus {

let mut touched_addresses = vec![];
for (tx_index, txn) in verified_txns.iter().enumerate() {
self.new_transaction(txn.clone())?;
self.new_transaction(txn.clone(), false)?;
let tx_hash = txn.hash;
let mut inspector = TouchedAddressInspector::default();
let result = self
Expand Down
4 changes: 4 additions & 0 deletions zilliqa/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ pub enum ExternalMessage {
BlockResponse(BlockResponse),
ProcessProposal(ProcessProposal),
NewTransaction(SignedTransaction),
BatchedTransactions(Vec<SignedTransaction>),
/// An acknowledgement of the receipt of a message. Note this is only used as a response when the caller doesn't
/// require any data in the response.
Acknowledgement,
Expand Down Expand Up @@ -314,6 +315,9 @@ impl Display for ExternalMessage {
write!(f, "NewTransaction(Unable to verify txn due to: {:?})", err)
}
},
ExternalMessage::BatchedTransactions(txns) => {
write!(f, "BatchedTransactions(txns_count: {:?})", txns.len())
}
ExternalMessage::Acknowledgement => write!(f, "RequestResponse"),
}
}
Expand Down
35 changes: 24 additions & 11 deletions zilliqa/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,13 +208,16 @@ impl Node {

pub fn handle_broadcast(&mut self, from: PeerId, message: ExternalMessage) -> Result<()> {
debug!(%from, to = %self.peer_id, %message, "handling broadcast");
// We only expect `NewTransaction`s to be broadcast.
// We only expect `BatchedTransactions`s to be broadcast.
// `Proposals` are re-routed to `handle_request()`.
match message {
ExternalMessage::NewTransaction(t) => {
// Don't process again txn sent by this node (it's already in the mempool)
if self.peer_id != from {
self.consensus.handle_new_transaction(t)?;
ExternalMessage::BatchedTransactions(transactions) => {
if self.peer_id == from {
return Ok(());
}
for txn in transactions {
let from_broadcast = true;
self.consensus.handle_new_transaction(txn, from_broadcast)?;
}
}
_ => {
Expand Down Expand Up @@ -388,7 +391,7 @@ impl Node {
};
let verified_tx = tx.verify()?;
trace!("Injecting intershard transaction {}", verified_tx.hash);
self.consensus.new_transaction(verified_tx)?;
self.consensus.new_transaction(verified_tx, false)?;
Ok(())
}

Expand All @@ -412,16 +415,26 @@ impl Node {

info!(?hash, "seen new txn {:?}", txn);

let result = self.consensus.handle_new_transaction(txn.clone())?;
if result.was_added() {
// TODO: Avoid redundant self-broadcast
self.message_sender
.broadcast_external_message(ExternalMessage::NewTransaction(txn))?;
let from_broadcast = false;
let result = self
.consensus
.handle_new_transaction(txn.clone(), from_broadcast)?;
if !result.was_added() {
debug!(?result, "Transaction cannot be added to mempool");
}

Ok((hash, result))
}

pub fn process_transactions_to_broadcast(&mut self) -> Result<()> {
let txns_to_broadcast = self.consensus.transaction_pool.pull_txns_to_broadcast()?;
if txns_to_broadcast.is_empty() {
return Ok(());
}
self.message_sender
.broadcast_external_message(ExternalMessage::BatchedTransactions(txns_to_broadcast))
}

pub fn number(&self) -> u64 {
self.consensus.head_block().header.number
}
Expand Down
18 changes: 13 additions & 5 deletions zilliqa/src/node_launcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,11 @@ impl NodeLauncher {
return Err(anyhow!("Node already running!"));
}

let sleep = time::sleep(Duration::from_millis(5));
tokio::pin!(sleep);
let consensus_sleep = time::sleep(Duration::from_millis(5));
tokio::pin!(consensus_sleep);

let mempool_sleep = time::sleep(Duration::from_millis(5));
tokio::pin!(mempool_sleep);

self.node_launched = true;

Expand Down Expand Up @@ -269,7 +272,7 @@ impl NodeLauncher {
let (_source, _message) = message.expect("message stream should be infinite");
todo!("Local messages will need to be handled once cross-shard messaging is implemented");
}
() = &mut sleep => {
() = &mut consensus_sleep => {
let attributes = vec![
KeyValue::new(MESSAGING_OPERATION_NAME, "handle"),
KeyValue::new(MESSAGING_SYSTEM, "tokio_channel"),
Expand All @@ -281,7 +284,7 @@ impl NodeLauncher {
self.node.lock().unwrap().consensus.tick().unwrap();
// No messages for a while, so check if consensus wants to timeout
self.node.lock().unwrap().handle_timeout().unwrap();
sleep.as_mut().reset(Instant::now() + Duration::from_millis(500));
consensus_sleep.as_mut().reset(Instant::now() + Duration::from_millis(500));
messaging_process_duration.record(
start.elapsed().map_or(0.0, |d| d.as_secs_f64()),
&attributes,
Expand All @@ -290,7 +293,12 @@ impl NodeLauncher {
r = self.reset_timeout_receiver.next() => {
let sleep_time = r.expect("reset timeout stream should be infinite");
trace!(?sleep_time, "timeout reset");
sleep.as_mut().reset(Instant::now() + sleep_time);
consensus_sleep.as_mut().reset(Instant::now() + sleep_time);
},

() = &mut mempool_sleep => {
self.node.lock().unwrap().process_transactions_to_broadcast()?;
mempool_sleep.as_mut().reset(Instant::now() + Duration::from_millis(100));
},
}
}
Expand Down
73 changes: 52 additions & 21 deletions zilliqa/src/pool.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{
cmp::Ordering,
collections::{BTreeMap, BTreeSet, HashMap},
cmp::{min, Ordering},
collections::{BTreeMap, BTreeSet, HashMap, VecDeque},
};

use alloy::primitives::Address;
Expand All @@ -15,6 +15,7 @@ use crate::{

/// The result of trying to add a transaction to the mempool. The argument is
/// a human-readable string to be returned to the user.
#[derive(Debug)]
pub enum TxAddResult {
/// Transaction was successfully added to the mempool
AddedToMempool,
Expand Down Expand Up @@ -88,6 +89,8 @@ pub struct TransactionPool {
/// Keeps transactions sorted by gas_price, each gas_price index can contain more than one txn
/// These are candidates to be included in the next block
gas_index: GasCollection,
/// Keeps transactions created at this node that will be broadcast
transactions_to_broadcast: VecDeque<SignedTransaction>,
}

/// A wrapper for (gas price, sender, nonce), stored in the `ready` heap of [TransactionPool].
Expand Down Expand Up @@ -256,6 +259,7 @@ impl TransactionPool {
&mut self,
txn: VerifiedTransaction,
account_nonce: u64,
from_broadcast: bool,
) -> TxAddResult {
if txn.tx.nonce().is_some_and(|n| n < account_nonce) {
debug!("Nonce is too low. Txn hash: {:?}, from: {:?}, nonce: {:?}, account nonce: {account_nonce}", txn.hash, txn.signer, txn.tx.nonce());
Expand Down Expand Up @@ -289,14 +293,41 @@ impl TransactionPool {
Self::add_to_gas_index(&mut self.gas_index, &txn);
}

// If this is a transaction created at this node, add it to broadcast vector
if !from_broadcast {
self.store_broadcast_txn(txn.tx.clone());
}

debug!("Txn added to mempool. Hash: {:?}, from: {:?}, nonce: {:?}, account nonce: {account_nonce}", txn.hash, txn.signer, txn.tx.nonce());

// Finally we insert it into the tx store and the hash reverse-index
self.hash_to_index.insert(txn.hash, txn.mempool_index());
self.transactions.insert(txn.mempool_index(), txn);

TxAddResult::AddedToMempool
}

fn store_broadcast_txn(&mut self, txn: SignedTransaction) {
self.transactions_to_broadcast.push_back(txn);
}

pub fn pull_txns_to_broadcast(&mut self) -> Result<Vec<SignedTransaction>> {
const MAX_BATCH_SIZE: usize = 100;
shawn-zil marked this conversation as resolved.
Show resolved Hide resolved

if self.transactions_to_broadcast.is_empty() {
return Ok(Vec::new());
}

let max_take = min(self.transactions_to_broadcast.len(), MAX_BATCH_SIZE);

let ret_vec = self
.transactions_to_broadcast
.drain(..max_take)
.collect::<Vec<_>>();

Ok(ret_vec)
}

fn remove_from_gas_index(gas_index: &mut GasCollection, txn: &VerifiedTransaction) {
let gas_key = txn.tx.gas_price_per_evm_gas();

Expand Down Expand Up @@ -538,13 +569,13 @@ mod tests {
let mut state = get_in_memory_state()?;
create_acc(&mut state, from, 100, 0)?;

pool.insert_transaction(transaction(from, 1, 1), 0);
pool.insert_transaction(transaction(from, 1, 1), 0, false);

let tx = pool.best_transaction(&state)?;
assert_eq!(tx, None);

pool.insert_transaction(transaction(from, 2, 2), 0);
pool.insert_transaction(transaction(from, 0, 0), 0);
pool.insert_transaction(transaction(from, 2, 2), 0, false);
pool.insert_transaction(transaction(from, 0, 0), 0, false);

let tx = pool.best_transaction(&state)?.unwrap().clone();
assert_eq!(tx.tx.nonce().unwrap(), 0);
Expand Down Expand Up @@ -588,7 +619,7 @@ mod tests {
nonces.shuffle(&mut rng);

for i in 0..COUNT {
pool.insert_transaction(transaction(from, nonces[i as usize] as u8, 3), 0);
pool.insert_transaction(transaction(from, nonces[i as usize] as u8, 3), 0, false);
}

for i in 0..COUNT {
Expand All @@ -615,11 +646,11 @@ mod tests {
create_acc(&mut state, from2, 100, 0)?;
create_acc(&mut state, from3, 100, 0)?;

pool.insert_transaction(intershard_transaction(0, 0, 1), 0);
pool.insert_transaction(transaction(from1, 0, 2), 0);
pool.insert_transaction(transaction(from2, 0, 3), 0);
pool.insert_transaction(transaction(from3, 0, 0), 0);
pool.insert_transaction(intershard_transaction(0, 1, 5), 0);
pool.insert_transaction(intershard_transaction(0, 0, 1), 0, false);
pool.insert_transaction(transaction(from1, 0, 2), 0, false);
pool.insert_transaction(transaction(from2, 0, 3), 0, false);
pool.insert_transaction(transaction(from3, 0, 0), 0, false);
pool.insert_transaction(intershard_transaction(0, 1, 5), 0, false);
assert_eq!(pool.transactions.len(), 5);

let tx = pool.best_transaction(&state)?.unwrap().clone();
Expand Down Expand Up @@ -654,8 +685,8 @@ mod tests {
let mut state = get_in_memory_state()?;
create_acc(&mut state, from, 100, 0)?;

pool.insert_transaction(transaction(from, 0, 0), 0);
pool.insert_transaction(transaction(from, 1, 0), 0);
pool.insert_transaction(transaction(from, 0, 0), 0, false);
pool.insert_transaction(transaction(from, 1, 0), 0, false);

pool.mark_executed(&transaction(from, 0, 0));
state.mutate_account(from, |acc| {
Expand All @@ -678,8 +709,8 @@ mod tests {
let mut state = get_in_memory_state()?;
create_acc(&mut state, from, 100, 0)?;

pool.insert_transaction(transaction(from, 0, 1), 0);
pool.insert_transaction(transaction(from, 1, 200), 0);
pool.insert_transaction(transaction(from, 0, 1), 0, false);
pool.insert_transaction(transaction(from, 1, 200), 0, false);

assert_eq!(
pool.best_transaction(&state)?.unwrap().tx.nonce().unwrap(),
Expand Down Expand Up @@ -714,12 +745,12 @@ mod tests {
let mut state = get_in_memory_state()?;
create_acc(&mut state, from, 100, 0)?;

pool.insert_transaction(intershard_transaction(0, 0, 100), 0);
pool.insert_transaction(transaction(from, 0, 1), 0);
pool.insert_transaction(transaction(from, 1, 1), 1);
pool.insert_transaction(transaction(from, 2, 1), 2);
pool.insert_transaction(transaction(from, 3, 200), 3);
pool.insert_transaction(transaction(from, 10, 1), 3);
pool.insert_transaction(intershard_transaction(0, 0, 100), 0, false);
pool.insert_transaction(transaction(from, 0, 1), 0, false);
pool.insert_transaction(transaction(from, 1, 1), 1, false);
pool.insert_transaction(transaction(from, 2, 1), 2, false);
pool.insert_transaction(transaction(from, 3, 200), 3, false);
pool.insert_transaction(transaction(from, 10, 1), 3, false);

let content = pool.preview_content(&state)?;

Expand Down
Loading
Loading