Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send transaction in batches #1910

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 21 additions & 11 deletions zilliqa/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ pub struct Consensus {
receipts_cache: HashMap<Hash, (TransactionReceipt, Vec<Address>)>,
receipts_cache_hash: Hash,
/// Actions that act on newly created blocks
transaction_pool: TransactionPool,
pub transaction_pool: TransactionPool,
/// Pending proposal. Gets created as soon as we become aware that we are leader for this view.
early_proposal: Option<EarlyProposal>,
/// Flag indicating that block creation should be postponed at least until empty_block_timeout is reached
Expand Down Expand Up @@ -1649,7 +1649,8 @@ impl Consensus {
// need those transactions again
for tx in opaque_transactions {
let account_nonce = self.state.get_account(tx.signer)?.nonce;
self.transaction_pool.insert_transaction(tx, account_nonce);
self.transaction_pool
.insert_transaction(tx, account_nonce, false);
}

// finalise the proposal
Expand All @@ -1668,11 +1669,15 @@ impl Consensus {
}

/// Insert transaction and add to early_proposal if possible.
pub fn handle_new_transaction(&mut self, txn: SignedTransaction) -> Result<TxAddResult> {
pub fn handle_new_transaction(
&mut self,
txn: SignedTransaction,
from_broadcast: bool,
) -> Result<TxAddResult> {
let Ok(verified) = txn.verify() else {
return Ok(TxAddResult::CannotVerifySignature);
};
let inserted = self.new_transaction(verified)?;
let inserted = self.new_transaction(verified, from_broadcast)?;
if inserted.was_added()
&& self.create_next_block_on_timeout
&& self.early_proposal.is_some()
Expand Down Expand Up @@ -1887,7 +1892,11 @@ impl Consensus {

/// Returns (flag, outcome).
/// flag is true if the transaction was newly added to the pool - ie. if it validated correctly and has not been seen before.
pub fn new_transaction(&mut self, txn: VerifiedTransaction) -> Result<TxAddResult> {
pub fn new_transaction(
&mut self,
txn: VerifiedTransaction,
from_broadcast: bool,
) -> Result<TxAddResult> {
if self.db.contains_transaction(&txn.hash)? {
debug!("Transaction {:?} already in mempool", txn.hash);
return Ok(TxAddResult::Duplicate(txn.hash));
Expand Down Expand Up @@ -1922,9 +1931,9 @@ impl Consensus {

let txn_hash = txn.hash;

let insert_result = self
.transaction_pool
.insert_transaction(txn, early_account.nonce);
let insert_result =
self.transaction_pool
.insert_transaction(txn, early_account.nonce, from_broadcast);
if insert_result.was_added() {
let _ = self.new_transaction_hashes.send(txn_hash);

Expand Down Expand Up @@ -2857,7 +2866,8 @@ impl Consensus {

for txn in existing_txns {
let account_nonce = self.state.get_account(txn.signer)?.nonce;
self.transaction_pool.insert_transaction(txn, account_nonce);
self.transaction_pool
.insert_transaction(txn, account_nonce, false);
}

// block transactions need to be removed from self.transactions and re-injected
Expand All @@ -2867,7 +2877,7 @@ impl Consensus {
// Insert this unwound transaction back into the transaction pool.
let account_nonce = self.state.get_account(orig_tx.signer)?.nonce;
self.transaction_pool
.insert_transaction(orig_tx, account_nonce);
.insert_transaction(orig_tx, account_nonce, false);
}
// then purge them all from the db, including receipts and indexes
self.db
Expand Down Expand Up @@ -3008,7 +3018,7 @@ impl Consensus {

let mut touched_addresses = vec![];
for (tx_index, txn) in verified_txns.iter().enumerate() {
self.new_transaction(txn.clone())?;
self.new_transaction(txn.clone(), false)?;
let tx_hash = txn.hash;
let mut inspector = TouchedAddressInspector::default();
let result = self
Expand Down
4 changes: 4 additions & 0 deletions zilliqa/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ pub enum ExternalMessage {
BlockResponse(BlockResponse),
ProcessProposal(ProcessProposal),
NewTransaction(SignedTransaction),
BatchedTransactions(Vec<SignedTransaction>),
/// An acknowledgement of the receipt of a message. Note this is only used as a response when the caller doesn't
/// require any data in the response.
Acknowledgement,
Expand Down Expand Up @@ -314,6 +315,9 @@ impl Display for ExternalMessage {
write!(f, "NewTransaction(Unable to verify txn due to: {:?})", err)
}
},
ExternalMessage::BatchedTransactions(txns) => {
write!(f, "BatchedTransactions(txns_count: {:?})", txns.len())
}
ExternalMessage::Acknowledgement => write!(f, "RequestResponse"),
}
}
Expand Down
37 changes: 25 additions & 12 deletions zilliqa/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,15 +208,18 @@ impl Node {

pub fn handle_broadcast(&mut self, from: PeerId, message: ExternalMessage) -> Result<()> {
debug!(%from, to = %self.peer_id, %message, "handling broadcast");
// We only expect `Proposal`s and `NewTransaction`s to be broadcast.
// We only expect `Proposal`s and `BatchedTransactions` to be broadcast.
match message {
ExternalMessage::Proposal(m) => {
self.handle_proposal(from, m)?;
}
ExternalMessage::NewTransaction(t) => {
// Don't process again txn sent by this node (it's already in the mempool)
if self.peer_id != from {
self.consensus.handle_new_transaction(t)?;
ExternalMessage::BatchedTransactions(transactions) => {
if self.peer_id == from {
return Ok(());
}
for txn in transactions {
let from_broadcast = true;
self.consensus.handle_new_transaction(txn, from_broadcast)?;
}
}
_ => {
Expand Down Expand Up @@ -390,7 +393,7 @@ impl Node {
};
let verified_tx = tx.verify()?;
trace!("Injecting intershard transaction {}", verified_tx.hash);
self.consensus.new_transaction(verified_tx)?;
self.consensus.new_transaction(verified_tx, false)?;
Ok(())
}

Expand All @@ -412,18 +415,28 @@ impl Node {
pub fn create_transaction(&mut self, txn: SignedTransaction) -> Result<(Hash, TxAddResult)> {
let hash = txn.calculate_hash();

info!(?hash, "seen new txn {:?}", txn);
debug!(?hash, "seen new txn {:?}", txn);

let result = self.consensus.handle_new_transaction(txn.clone())?;
if result.was_added() {
// TODO: Avoid redundant self-broadcast
self.message_sender
.broadcast_external_message(ExternalMessage::NewTransaction(txn))?;
let from_broadcast = false;
let result = self
.consensus
.handle_new_transaction(txn.clone(), from_broadcast)?;
if !result.was_added() {
debug!(?result, "Transaction cannot be added to mempool");
}

Ok((hash, result))
}

pub fn process_transactions_to_broadcast(&mut self) -> Result<()> {
let txns_to_broadcast = self.consensus.transaction_pool.pull_txns_to_broadcast()?;
if txns_to_broadcast.is_empty() {
return Ok(());
}
self.message_sender
.broadcast_external_message(ExternalMessage::BatchedTransactions(txns_to_broadcast))
}

pub fn number(&self) -> u64 {
self.consensus.head_block().header.number
}
Expand Down
18 changes: 13 additions & 5 deletions zilliqa/src/node_launcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,11 @@ impl NodeLauncher {
return Err(anyhow!("Node already running!"));
}

let sleep = time::sleep(Duration::from_millis(5));
tokio::pin!(sleep);
let consensus_sleep = time::sleep(Duration::from_millis(5));
tokio::pin!(consensus_sleep);

let mempool_sleep = time::sleep(Duration::from_millis(5));
tokio::pin!(mempool_sleep);

self.node_launched = true;

Expand Down Expand Up @@ -269,7 +272,7 @@ impl NodeLauncher {
let (_source, _message) = message.expect("message stream should be infinite");
todo!("Local messages will need to be handled once cross-shard messaging is implemented");
}
() = &mut sleep => {
() = &mut consensus_sleep => {
let attributes = vec![
KeyValue::new(MESSAGING_OPERATION_NAME, "handle"),
KeyValue::new(MESSAGING_SYSTEM, "tokio_channel"),
Expand All @@ -281,7 +284,7 @@ impl NodeLauncher {
self.node.lock().unwrap().consensus.tick().unwrap();
// No messages for a while, so check if consensus wants to timeout
self.node.lock().unwrap().handle_timeout().unwrap();
sleep.as_mut().reset(Instant::now() + Duration::from_millis(500));
consensus_sleep.as_mut().reset(Instant::now() + Duration::from_millis(500));
messaging_process_duration.record(
start.elapsed().map_or(0.0, |d| d.as_secs_f64()),
&attributes,
Expand All @@ -290,7 +293,12 @@ impl NodeLauncher {
r = self.reset_timeout_receiver.next() => {
let sleep_time = r.expect("reset timeout stream should be infinite");
trace!(?sleep_time, "timeout reset");
sleep.as_mut().reset(Instant::now() + sleep_time);
consensus_sleep.as_mut().reset(Instant::now() + sleep_time);
},

() = &mut mempool_sleep => {
self.node.lock().unwrap().process_transactions_to_broadcast()?;
mempool_sleep.as_mut().reset(Instant::now() + Duration::from_millis(20));
shawn-zil marked this conversation as resolved.
Show resolved Hide resolved
},
}
}
Expand Down
Loading