diff --git a/zilliqa/benches/it.rs b/zilliqa/benches/it.rs
index 75d963087..e7a395c01 100644
--- a/zilliqa/benches/it.rs
+++ b/zilliqa/benches/it.rs
@@ -256,9 +256,9 @@ pub fn produce_full(crit: &mut Criterion) {
         })
         .collect();
     for txn in txns {
-        let result = big.new_transaction(txn.clone()).unwrap();
+        let result = big.new_transaction(txn.clone(), false).unwrap();
         assert!(result.was_added());
-        let result = tiny.new_transaction(txn).unwrap();
+        let result = tiny.new_transaction(txn, false).unwrap();
         assert!(result.was_added());
     }

diff --git a/zilliqa/src/api/eth.rs b/zilliqa/src/api/eth.rs
index 6fda14fbd..c28260d41 100644
--- a/zilliqa/src/api/eth.rs
+++ b/zilliqa/src/api/eth.rs
@@ -743,6 +743,8 @@ fn send_raw_transaction(params: Params, node: &Arc<Mutex<Node>>) -> Result(
+            RPCErrorCode::RpcVerifyRejected as i32,
+            "signature",
+            None,
+        ))?
+    };
+
     let (transaction_hash, result) = node.create_transaction(transaction)?;
     let info = match result {
         TxAddResult::AddedToMempool => Ok("Txn processed".to_string()),
         TxAddResult::Duplicate(_) => Ok("Txn already present".to_string()),
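Editor's note: the API-side hunks above (partially truncated in this diff) appear to move signature verification up to the RPC boundary, so a rejected signature becomes an RPC-level error and `Node::create_transaction` only ever sees verified transactions. A minimal, self-contained sketch of that shape, using stand-in types (`SignedTxn`, `VerifiedTxn`, `RpcError`) rather than the real zilliqa API:

```rust
// Sketch only; these are stand-in types, not the real zilliqa ones.
struct SignedTxn {
    signature_ok: bool,
}
struct VerifiedTxn;
struct RpcError(&'static str);

impl SignedTxn {
    fn verify(self) -> Result<VerifiedTxn, ()> {
        // The real code checks the ECDSA signature / chain id here.
        if self.signature_ok { Ok(VerifiedTxn) } else { Err(()) }
    }
}

// Reject bad signatures with an RPC-level error before anything reaches consensus,
// instead of threading a CannotVerifySignature result out of the mempool path.
fn send_raw_transaction(raw: SignedTxn) -> Result<VerifiedTxn, RpcError> {
    raw.verify().map_err(|_| RpcError("signature"))
}

fn main() {
    assert!(send_raw_transaction(SignedTxn { signature_ok: true }).is_ok());
    assert!(send_raw_transaction(SignedTxn { signature_ok: false }).is_err());
}
```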
diff --git a/zilliqa/src/consensus.rs b/zilliqa/src/consensus.rs
index a37cc7789..479325e8a 100644
--- a/zilliqa/src/consensus.rs
+++ b/zilliqa/src/consensus.rs
@@ -164,7 +164,7 @@ pub struct Consensus {
     receipts_cache: HashMap)>,
     receipts_cache_hash: Hash,
     /// Actions that act on newly created blocks
-    transaction_pool: TransactionPool,
+    pub transaction_pool: TransactionPool,
    /// Pending proposal. Gets created as soon as we become aware that we are leader for this view.
     early_proposal: Option,
     /// Flag indicating that block creation should be postponed at least until empty_block_timeout is reached
@@ -634,9 +634,10 @@ impl Consensus {
         let head_block = self.head_block();
         let mut view = self.get_view()?;

-        trace!(
+        info!(
             block_view = block.view(),
             block_number = block.number(),
+            txns = transactions.len(),
             "handling block proposal {}",
             block.hash()
         );
@@ -1646,7 +1647,8 @@ impl Consensus {
         // need those transactions again
         for tx in opaque_transactions {
             let account_nonce = self.state.get_account(tx.signer)?.nonce;
-            self.transaction_pool.insert_transaction(tx, account_nonce);
+            self.transaction_pool
+                .insert_transaction(tx, account_nonce, false);
         }

         // finalise the proposal
@@ -1665,11 +1667,12 @@ impl Consensus {
     }

     /// Insert transaction and add to early_proposal if possible.
-    pub fn handle_new_transaction(&mut self, txn: SignedTransaction) -> Result<TxAddResult> {
-        let Ok(verified) = txn.verify() else {
-            return Ok(TxAddResult::CannotVerifySignature);
-        };
-        let inserted = self.new_transaction(verified)?;
+    pub fn handle_new_transaction(
+        &mut self,
+        verified: VerifiedTransaction,
+        from_broadcast: bool,
+    ) -> Result<TxAddResult> {
+        let inserted = self.new_transaction(verified, from_broadcast)?;
         if inserted.was_added()
             && self.create_next_block_on_timeout
             && self.early_proposal.is_some()
@@ -1884,7 +1887,11 @@ impl Consensus {
     /// Returns (flag, outcome).
     /// flag is true if the transaction was newly added to the pool - ie. if it validated correctly and has not been seen before.
-    pub fn new_transaction(&mut self, txn: VerifiedTransaction) -> Result<TxAddResult> {
+    pub fn new_transaction(
+        &mut self,
+        txn: VerifiedTransaction,
+        from_broadcast: bool,
+    ) -> Result<TxAddResult> {
         if self.db.contains_transaction(&txn.hash)? {
             debug!("Transaction {:?} already in mempool", txn.hash);
             return Ok(TxAddResult::Duplicate(txn.hash));
@@ -1919,9 +1926,9 @@ impl Consensus {
         let txn_hash = txn.hash;

-        let insert_result = self
-            .transaction_pool
-            .insert_transaction(txn, early_account.nonce);
+        let insert_result =
+            self.transaction_pool
+                .insert_transaction(txn, early_account.nonce, from_broadcast);

         if insert_result.was_added() {
             let _ = self.new_transaction_hashes.send(txn_hash);
@@ -2854,7 +2861,8 @@ impl Consensus {
         for txn in existing_txns {
             let account_nonce = self.state.get_account(txn.signer)?.nonce;
-            self.transaction_pool.insert_transaction(txn, account_nonce);
+            self.transaction_pool
+                .insert_transaction(txn, account_nonce, false);
         }

         // block transactions need to be removed from self.transactions and re-injected
@@ -2864,7 +2872,7 @@ impl Consensus {
             // Insert this unwound transaction back into the transaction pool.
             let account_nonce = self.state.get_account(orig_tx.signer)?.nonce;
             self.transaction_pool
-                .insert_transaction(orig_tx, account_nonce);
+                .insert_transaction(orig_tx, account_nonce, false);
         }
         // then purge them all from the db, including receipts and indexes
         self.db
@@ -3005,7 +3013,7 @@ impl Consensus {
         let mut touched_addresses = vec![];

         for (tx_index, txn) in verified_txns.iter().enumerate() {
-            self.new_transaction(txn.clone())?;
+            self.new_transaction(txn.clone(), false)?;
             let tx_hash = txn.hash;
             let mut inspector = TouchedAddressInspector::default();
             let result = self

diff --git a/zilliqa/src/db.rs b/zilliqa/src/db.rs
index 0ae2ba8bd..28924405e 100644
--- a/zilliqa/src/db.rs
+++ b/zilliqa/src/db.rs
@@ -232,6 +232,9 @@ impl Db {
         connection.pragma_update(None, "cache_size", (1 << 28) / page_size)?;
         let cache_size: i32 = connection.pragma_query_value(None, "cache_size", |r| r.get(0))?;

+        let mmap_size = 268435456;
+        connection.pragma_update(None, "mmap_size", mmap_size)?;
+
         tracing::info!(
             ?journal_mode,
             ?journal_size_limit,
@@ -239,6 +242,7 @@ impl Db {
             ?temp_store,
             ?page_size,
             ?cache_size,
+            ?mmap_size,
             "PRAGMA"
         );
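Editor's note: the new `mmap_size` pragma enables 268435456 bytes (256 MiB) of memory-mapped I/O per SQLite database file. A standalone sketch of the same call, assuming the `rusqlite` crate that db.rs already uses (an in-memory database here just for illustration); SQLite may clamp the request to its compile-time maximum, which is why reading the value back, as db.rs does for the other pragmas, is worthwhile:

```rust
use rusqlite::Connection;

fn main() -> rusqlite::Result<()> {
    let connection = Connection::open_in_memory()?;

    // 268435456 bytes = 256 MiB of memory-mapped I/O.
    let mmap_size: i64 = 268_435_456;
    connection.pragma_update(None, "mmap_size", mmap_size)?;

    // SQLite may clamp the request, so log the value actually in effect.
    let applied: i64 = connection.pragma_query_value(None, "mmap_size", |r| r.get(0))?;
    println!("mmap_size in effect: {applied}");
    Ok(())
}
```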
diff --git a/zilliqa/src/exec.rs b/zilliqa/src/exec.rs
index 40f60a69c..f3f58db66 100644
--- a/zilliqa/src/exec.rs
+++ b/zilliqa/src/exec.rs
@@ -670,7 +670,7 @@ impl State {
     ) -> Result {
         let hash = txn.hash;
         let from_addr = txn.signer;
-        info!(?hash, ?txn, "executing txn");
+        debug!(?hash, ?txn, "executing txn");

         let blessed = BLESSED_TRANSACTIONS.contains(&hash);

diff --git a/zilliqa/src/message.rs b/zilliqa/src/message.rs
index 40e8b3ae8..49ea73492 100644
--- a/zilliqa/src/message.rs
+++ b/zilliqa/src/message.rs
@@ -247,6 +247,7 @@ pub enum ExternalMessage {
     BlockResponse(BlockResponse),
     ProcessProposal(ProcessProposal),
     NewTransaction(SignedTransaction),
+    BatchedTransactions(Vec<SignedTransaction>),
     /// An acknowledgement of the receipt of a message. Note this is only used as a response when the caller doesn't
     /// require any data in the response.
     Acknowledgement,
@@ -314,6 +315,9 @@ impl Display for ExternalMessage {
                     write!(f, "NewTransaction(Unable to verify txn due to: {:?})", err)
                 }
             },
+            ExternalMessage::BatchedTransactions(txns) => {
+                write!(f, "BatchedTransactions(txns_count: {:?})", txns.len())
+            }
             ExternalMessage::Acknowledgement => write!(f, "RequestResponse"),
         }
     }

diff --git a/zilliqa/src/node.rs b/zilliqa/src/node.rs
index a090b893d..f83a4bdcf 100644
--- a/zilliqa/src/node.rs
+++ b/zilliqa/src/node.rs
@@ -208,20 +208,20 @@ impl Node {
     pub fn handle_broadcast(&mut self, from: PeerId, message: ExternalMessage) -> Result<()> {
         debug!(%from, to = %self.peer_id, %message, "handling broadcast");
-        // We only expect `NewTransaction`s to be broadcast.
         // `Proposals` are re-routed to `handle_request()`.
-        match message {
-            ExternalMessage::NewTransaction(t) => {
-                // Don't process again txn sent by this node (it's already in the mempool)
-                if self.peer_id != from {
-                    self.consensus.handle_new_transaction(t)?;
-                }
-            }
-            _ => {
-                warn!("unexpected message type");
-            }
-        }
+        warn!("unexpected message type");
+
+        Ok(())
+    }
+
+    pub fn handle_broadcasted_transactions(
+        &mut self,
+        transactions: Vec<VerifiedTransaction>,
+    ) -> Result<()> {
+        for txn in transactions {
+            let from_broadcast = true;
+            self.consensus.handle_new_transaction(txn, from_broadcast)?;
+        }
         Ok(())
     }
@@ -388,7 +388,7 @@ impl Node {
         };
         let verified_tx = tx.verify()?;
         trace!("Injecting intershard transaction {}", verified_tx.hash);
-        self.consensus.new_transaction(verified_tx)?;
+        self.consensus.new_transaction(verified_tx, false)?;
         Ok(())
     }
@@ -407,21 +407,29 @@ impl Node {
         Ok(false)
     }

-    pub fn create_transaction(&mut self, txn: SignedTransaction) -> Result<(Hash, TxAddResult)> {
-        let hash = txn.calculate_hash();
+    pub fn create_transaction(&mut self, txn: VerifiedTransaction) -> Result<(Hash, TxAddResult)> {
+        let hash = txn.hash;

-        info!(?hash, "seen new txn {:?}", txn);
+        debug!(?hash, "seen new txn {:?}", txn);

-        let result = self.consensus.handle_new_transaction(txn.clone())?;
-        if result.was_added() {
-            // TODO: Avoid redundant self-broadcast
-            self.message_sender
-                .broadcast_external_message(ExternalMessage::NewTransaction(txn))?;
+        let from_broadcast = false;
+        let result = self.consensus.handle_new_transaction(txn, from_broadcast)?;
+        if !result.was_added() {
+            debug!(?result, "Transaction cannot be added to mempool");
         }

         Ok((hash, result))
     }

+    pub fn process_transactions_to_broadcast(&mut self) -> Result<()> {
+        let txns_to_broadcast = self.consensus.transaction_pool.pull_txns_to_broadcast()?;
+        if txns_to_broadcast.is_empty() {
+            return Ok(());
+        }
+        self.message_sender
+            .broadcast_external_message(ExternalMessage::BatchedTransactions(txns_to_broadcast))
+    }
+
     pub fn number(&self) -> u64 {
         self.consensus.head_block().header.number
     }
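Editor's note: taken together, the message.rs and node.rs changes replace per-transaction `NewTransaction` gossip with a `BatchedTransactions(Vec<SignedTransaction>)` message. Locally submitted transactions are queued in the pool and flushed periodically by `process_transactions_to_broadcast`, while received batches are verified and handed to the pool with `from_broadcast = true` so they are not re-queued for broadcast. A self-contained sketch of the receive path, with stub types standing in for the real ones:

```rust
// Stub types; the real code uses zilliqa's SignedTransaction / VerifiedTransaction.
struct SignedTxn {
    payload: u64,
    signature_ok: bool,
}
struct VerifiedTxn {
    payload: u64,
}

impl SignedTxn {
    fn verify(self) -> Result<VerifiedTxn, &'static str> {
        if self.signature_ok {
            Ok(VerifiedTxn { payload: self.payload })
        } else {
            Err("bad signature")
        }
    }
}

// Mirrors handle_broadcasted_transactions: verify each entry, then the caller feeds the
// verified batch into the mempool with from_broadcast = true.
fn handle_batch(batch: Vec<SignedTxn>) -> Result<Vec<VerifiedTxn>, &'static str> {
    batch.into_iter().map(SignedTxn::verify).collect()
}

fn main() {
    let batch = vec![
        SignedTxn { payload: 1, signature_ok: true },
        SignedTxn { payload: 2, signature_ok: true },
    ];
    assert_eq!(handle_batch(batch).unwrap()[0].payload, 1);
}
```

Note that collecting into a `Result` is all-or-nothing, which matches the `?` in the launcher's verify loop: one bad signature aborts handling of the whole batch rather than skipping only the offending transaction.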
diff --git a/zilliqa/src/node_launcher.rs b/zilliqa/src/node_launcher.rs
index c43b734a2..a39cd1782 100644
--- a/zilliqa/src/node_launcher.rs
+++ b/zilliqa/src/node_launcher.rs
@@ -180,8 +180,11 @@ impl NodeLauncher {
             return Err(anyhow!("Node already running!"));
         }

-        let sleep = time::sleep(Duration::from_millis(5));
-        tokio::pin!(sleep);
+        let consensus_sleep = time::sleep(Duration::from_millis(5));
+        tokio::pin!(consensus_sleep);
+
+        let mempool_sleep = time::sleep(Duration::from_millis(5));
+        tokio::pin!(mempool_sleep);

         self.node_launched = true;
@@ -202,7 +205,20 @@ impl NodeLauncher {
                     ];
                     let start = SystemTime::now();

-                    if let Err(e) = self.node.lock().unwrap().handle_broadcast(source, message) {
+                    if let ExternalMessage::BatchedTransactions(transactions) = message {
+                        let my_peer_id = self.node.lock().unwrap().consensus.peer_id();
+
+                        if source == my_peer_id {
+                            continue;
+                        }
+                        let mut verified = Vec::new();
+                        for txn in transactions {
+                            let txn = txn.verify()?;
+                            verified.push(txn);
+                        }
+                        self.node.lock().unwrap().handle_broadcasted_transactions(verified)?;
+                    }
+                    else if let Err(e) = self.node.lock().unwrap().handle_broadcast(source, message) {
                         attributes.push(KeyValue::new(ERROR_TYPE, "process-error"));
                         error!("Failed to process broadcast message: {e}");
                     }
@@ -269,7 +285,7 @@ impl NodeLauncher {
                     let (_source, _message) = message.expect("message stream should be infinite");
                     todo!("Local messages will need to be handled once cross-shard messaging is implemented");
                 }
-                () = &mut sleep => {
+                () = &mut consensus_sleep => {
                     let attributes = vec![
                         KeyValue::new(MESSAGING_OPERATION_NAME, "handle"),
                         KeyValue::new(MESSAGING_SYSTEM, "tokio_channel"),
@@ -281,7 +297,7 @@ impl NodeLauncher {
                     self.node.lock().unwrap().consensus.tick().unwrap();
                     // No messages for a while, so check if consensus wants to timeout
                     self.node.lock().unwrap().handle_timeout().unwrap();
-                    sleep.as_mut().reset(Instant::now() + Duration::from_millis(500));
+                    consensus_sleep.as_mut().reset(Instant::now() + Duration::from_millis(500));
                     messaging_process_duration.record(
                         start.elapsed().map_or(0.0, |d| d.as_secs_f64()),
                         &attributes,
@@ -290,7 +306,12 @@ impl NodeLauncher {
                 r = self.reset_timeout_receiver.next() => {
                     let sleep_time = r.expect("reset timeout stream should be infinite");
                     trace!(?sleep_time, "timeout reset");
-                    sleep.as_mut().reset(Instant::now() + sleep_time);
+                    consensus_sleep.as_mut().reset(Instant::now() + sleep_time);
                 },
+
+                () = &mut mempool_sleep => {
+                    self.node.lock().unwrap().process_transactions_to_broadcast()?;
+                    mempool_sleep.as_mut().reset(Instant::now() + Duration::from_millis(100));
+                },
             }
         }
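Editor's note: the launcher now drives two independent timers through the same `tokio::select!` loop: the existing consensus tick (reset to 500 ms) and a new mempool flush that drains the broadcast queue every 100 ms. A standalone sketch of just that pattern, assuming only the tokio crate (the node and consensus calls are elided):

```rust
use tokio::time::{self, Duration, Instant};

#[tokio::main]
async fn main() {
    let consensus_sleep = time::sleep(Duration::from_millis(5));
    tokio::pin!(consensus_sleep);

    let mempool_sleep = time::sleep(Duration::from_millis(5));
    tokio::pin!(mempool_sleep);

    // Bounded here so the example terminates; the real loop runs for the node's lifetime.
    let mut iterations = 0;
    while iterations < 5 {
        tokio::select! {
            () = &mut consensus_sleep => {
                // In the node: tick consensus and check for timeouts.
                println!("consensus tick");
                consensus_sleep.as_mut().reset(Instant::now() + Duration::from_millis(500));
            }
            () = &mut mempool_sleep => {
                // In the node: process_transactions_to_broadcast().
                println!("mempool flush");
                mempool_sleep.as_mut().reset(Instant::now() + Duration::from_millis(100));
            }
        }
        iterations += 1;
    }
}
```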
diff --git a/zilliqa/src/pool.rs b/zilliqa/src/pool.rs
index 9280a153c..e754eb9bf 100644
--- a/zilliqa/src/pool.rs
+++ b/zilliqa/src/pool.rs
@@ -1,6 +1,6 @@
 use std::{
-    cmp::Ordering,
-    collections::{BTreeMap, BTreeSet, HashMap},
+    cmp::{min, Ordering},
+    collections::{BTreeMap, BTreeSet, HashMap, VecDeque},
 };

 use alloy::primitives::Address;
@@ -89,6 +89,8 @@ pub struct TransactionPool {
     /// Keeps transactions sorted by gas_price, each gas_price index can contain more than one txn
     /// These are candidates to be included in the next block
     gas_index: GasCollection,
+    /// Keeps transactions created at this node that will be broadcast
+    transactions_to_broadcast: VecDeque<SignedTransaction>,
 }

 /// A wrapper for (gas price, sender, nonce), stored in the `ready` heap of [TransactionPool].
@@ -256,6 +258,7 @@ impl TransactionPool {
         &mut self,
         txn: VerifiedTransaction,
         account_nonce: u64,
+        from_broadcast: bool,
     ) -> TxAddResult {
         if txn.tx.nonce().is_some_and(|n| n < account_nonce) {
             debug!("Nonce is too low. Txn hash: {:?}, from: {:?}, nonce: {:?}, account nonce: {account_nonce}", txn.hash, txn.signer, txn.tx.nonce());
@@ -289,14 +292,41 @@ impl TransactionPool {
             Self::add_to_gas_index(&mut self.gas_index, &txn);
         }

+        // If this is a transaction created at this node, add it to broadcast vector
+        if !from_broadcast {
+            self.store_broadcast_txn(txn.tx.clone());
+        }
+
         debug!("Txn added to mempool. Hash: {:?}, from: {:?}, nonce: {:?}, account nonce: {account_nonce}", txn.hash, txn.signer, txn.tx.nonce());
         // Finally we insert it into the tx store and the hash reverse-index
         self.hash_to_index.insert(txn.hash, txn.mempool_index());
         self.transactions.insert(txn.mempool_index(), txn);
+
         TxAddResult::AddedToMempool
     }

+    fn store_broadcast_txn(&mut self, txn: SignedTransaction) {
+        self.transactions_to_broadcast.push_back(txn);
+    }
+
+    pub fn pull_txns_to_broadcast(&mut self) -> Result<Vec<SignedTransaction>> {
+        const MAX_BATCH_SIZE: usize = 400;
+
+        if self.transactions_to_broadcast.is_empty() {
+            return Ok(Vec::new());
+        }
+
+        let max_take = min(self.transactions_to_broadcast.len(), MAX_BATCH_SIZE);
+
+        let ret_vec = self
+            .transactions_to_broadcast
+            .drain(..max_take)
+            .collect::<Vec<_>>();
+
+        Ok(ret_vec)
+    }
+
     fn remove_from_gas_index(gas_index: &mut GasCollection, txn: &VerifiedTransaction) {
         let gas_key = txn.tx.gas_price_per_evm_gas();
@@ -539,13 +569,13 @@ mod tests {
         let mut state = get_in_memory_state()?;
         create_acc(&mut state, from, 100, 0)?;

-        pool.insert_transaction(transaction(from, 1, 1), 0);
+        pool.insert_transaction(transaction(from, 1, 1), 0, false);
         let tx = pool.best_transaction(&state)?;
         assert_eq!(tx, None);

-        pool.insert_transaction(transaction(from, 2, 2), 0);
-        pool.insert_transaction(transaction(from, 0, 0), 0);
+        pool.insert_transaction(transaction(from, 2, 2), 0, false);
+        pool.insert_transaction(transaction(from, 0, 0), 0, false);

         let tx = pool.best_transaction(&state)?.unwrap().clone();
         assert_eq!(tx.tx.nonce().unwrap(), 0);
@@ -589,7 +619,7 @@ mod tests {
         nonces.shuffle(&mut rng);

         for i in 0..COUNT {
-            pool.insert_transaction(transaction(from, nonces[i as usize] as u8, 3), 0);
+            pool.insert_transaction(transaction(from, nonces[i as usize] as u8, 3), 0, false);
         }

         for i in 0..COUNT {
@@ -616,11 +646,11 @@ mod tests {
         create_acc(&mut state, from2, 100, 0)?;
         create_acc(&mut state, from3, 100, 0)?;

-        pool.insert_transaction(intershard_transaction(0, 0, 1), 0);
-        pool.insert_transaction(transaction(from1, 0, 2), 0);
-        pool.insert_transaction(transaction(from2, 0, 3), 0);
-        pool.insert_transaction(transaction(from3, 0, 0), 0);
-        pool.insert_transaction(intershard_transaction(0, 1, 5), 0);
+        pool.insert_transaction(intershard_transaction(0, 0, 1), 0, false);
+        pool.insert_transaction(transaction(from1, 0, 2), 0, false);
+        pool.insert_transaction(transaction(from2, 0, 3), 0, false);
+        pool.insert_transaction(transaction(from3, 0, 0), 0, false);
+        pool.insert_transaction(intershard_transaction(0, 1, 5), 0, false);
         assert_eq!(pool.transactions.len(), 5);

         let tx = pool.best_transaction(&state)?.unwrap().clone();
@@ -655,8 +685,8 @@ mod tests {
         let mut state = get_in_memory_state()?;
         create_acc(&mut state, from, 100, 0)?;

-        pool.insert_transaction(transaction(from, 0, 0), 0);
-        pool.insert_transaction(transaction(from, 1, 0), 0);
+        pool.insert_transaction(transaction(from, 0, 0), 0, false);
+        pool.insert_transaction(transaction(from, 1, 0), 0, false);
         pool.mark_executed(&transaction(from, 0, 0));

         state.mutate_account(from, |acc| {
@@ -679,8 +709,8 @@ mod tests {
         let mut state = get_in_memory_state()?;
         create_acc(&mut state, from, 100, 0)?;

-        pool.insert_transaction(transaction(from, 0, 1), 0);
-        pool.insert_transaction(transaction(from, 1, 200), 0);
+        pool.insert_transaction(transaction(from, 0, 1), 0, false);
+        pool.insert_transaction(transaction(from, 1, 200), 0, false);

         assert_eq!(
             pool.best_transaction(&state)?.unwrap().tx.nonce().unwrap(),
@@ -715,12 +745,12 @@ mod tests {
         let mut state = get_in_memory_state()?;
         create_acc(&mut state, from, 100, 0)?;

-        pool.insert_transaction(intershard_transaction(0, 0, 100), 0);
-        pool.insert_transaction(transaction(from, 0, 1), 0);
-        pool.insert_transaction(transaction(from, 1, 1), 1);
-        pool.insert_transaction(transaction(from, 2, 1), 2);
-        pool.insert_transaction(transaction(from, 3, 200), 3);
-        pool.insert_transaction(transaction(from, 10, 1), 3);
+        pool.insert_transaction(intershard_transaction(0, 0, 100), 0, false);
+        pool.insert_transaction(transaction(from, 0, 1), 0, false);
+        pool.insert_transaction(transaction(from, 1, 1), 1, false);
+        pool.insert_transaction(transaction(from, 2, 1), 2, false);
+        pool.insert_transaction(transaction(from, 3, 200), 3, false);
+        pool.insert_transaction(transaction(from, 10, 1), 3, false);

         let content = pool.preview_content(&state)?;
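Editor's note: `pull_txns_to_broadcast` drains at most `MAX_BATCH_SIZE` (400) transactions per call, so a single gossip message stays bounded and anything left over waits for the next mempool timer tick. The same bounded-drain logic in standalone form, using only `std` and a generic element type:

```rust
use std::collections::VecDeque;

const MAX_BATCH_SIZE: usize = 400;

// Take at most MAX_BATCH_SIZE queued items, leaving the remainder for the next call.
fn pull_batch<T>(queue: &mut VecDeque<T>) -> Vec<T> {
    let take = queue.len().min(MAX_BATCH_SIZE);
    queue.drain(..take).collect()
}

fn main() {
    let mut queue: VecDeque<u32> = (0..1000).collect();
    assert_eq!(pull_batch(&mut queue).len(), 400);
    assert_eq!(pull_batch(&mut queue).len(), 400);
    assert_eq!(pull_batch(&mut queue).len(), 200);
    assert!(pull_batch(&mut queue).is_empty());
}
```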
diff --git a/zilliqa/tests/it/main.rs b/zilliqa/tests/it/main.rs
index 09d457e99..36f4e3ebf 100644
--- a/zilliqa/tests/it/main.rs
+++ b/zilliqa/tests/it/main.rs
@@ -618,6 +618,11 @@ impl Network {
         // this could of course spin forever, but the test itself should time out.
         loop {
             for node in &self.nodes {
+                node.inner
+                    .lock()
+                    .unwrap()
+                    .process_transactions_to_broadcast()
+                    .unwrap();
                 // Trigger a tick so that block fetching can operate.
                 node.inner.lock().unwrap().consensus.tick().unwrap();
                 if node.inner.lock().unwrap().handle_timeout().unwrap() {
@@ -807,6 +812,11 @@ impl Network {
             let span = tracing::span!(tracing::Level::INFO, "handle_timeout", index);
             span.in_scope(|| {
+                node.inner
+                    .lock()
+                    .unwrap()
+                    .process_transactions_to_broadcast()
+                    .unwrap();
                 node.inner.lock().unwrap().handle_timeout().unwrap();
             });
         }
@@ -1024,6 +1034,16 @@ impl Network {
                     ResponseChannel::Local,
                 )
                 .unwrap(),
+            ExternalMessage::BatchedTransactions(transactions) => {
+                let mut verified = Vec::new();
+                for tx in transactions {
+                    let tx = tx.clone().verify().unwrap();
+                    verified.push(tx);
+                }
+                inner
+                    .handle_broadcasted_transactions(verified)
+                    .unwrap();
+            }
             _ => inner
                 .handle_broadcast(source, external_message.clone())
                 .unwrap(),
diff --git a/zilliqa/tests/it/persistence.rs b/zilliqa/tests/it/persistence.rs
index bc08b9bdf..a67b84a2c 100644
--- a/zilliqa/tests/it/persistence.rs
+++ b/zilliqa/tests/it/persistence.rs
@@ -53,11 +53,18 @@ async fn block_and_tx_data_persistence(mut network: Network) {
                 .unwrap()
                 .is_some()
         },
-        50,
+        150,
     )
     .await
     .unwrap();

+    let receipt = wallet
+        .provider()
+        .get_transaction_receipt(hash.0)
+        .await
+        .unwrap()
+        .unwrap();
+
     // make one block without txs
     network
         .run_until(
@@ -67,9 +74,9 @@ async fn block_and_tx_data_persistence(mut network: Network) {
                     .get_block(BlockId::latest())
                     .unwrap()
                     .map_or(0, |b| b.number());
-                block >= 3
+                block > receipt.block_number.unwrap().as_u64()
             },
-            50,
+            150,
         )
         .await
         .unwrap();
@@ -222,8 +229,8 @@ async fn checkpoints_test(mut network: Network) {
     let scilla_contract_address =
         deploy_scilla_contract(&mut network, &secret_key, &code, &data).await;

-    // Run until block 9 so that we can insert a tx in block 10 (note that this transaction may not *always* appear in the desired block, therefore we do not assert its presence later)
-    network.run_until_block(&wallet, 9.into(), 200).await;
+    // Run until block 19 so that we can insert a tx in block 20 (note that this transaction may not *always* appear in the desired block, therefore we do not assert its presence later)
+    network.run_until_block(&wallet, 19.into(), 400).await;

     let _hash = wallet
         .send_transaction(TransactionRequest::pay(wallet.address(), 10), None)
@@ -231,8 +238,8 @@ async fn checkpoints_test(mut network: Network) {
         .unwrap()
         .tx_hash();

-    // wait 10 blocks for checkpoint to happen - then 3 more to finalize that block
-    network.run_until_block(&wallet, 13.into(), 200).await;
+    // wait 20 blocks for checkpoint to happen - then 3 more to finalize that block
+    network.run_until_block(&wallet, 33.into(), 400).await;

     let checkpoint_files = network
         .nodes
@@ -244,7 +251,7 @@ async fn checkpoints_test(mut network: Network) {
                 .path()
                 .join(network.shard_id.to_string())
                 .join("checkpoints")
-                .join("10")
+                .join("30")
         })
         .collect::<Vec<_>>();
@@ -260,7 +267,7 @@ async fn checkpoints_test(mut network: Network) {
     // Create new node and pass it one of those checkpoint files
     let checkpoint_path = checkpoint_files[0].to_str().unwrap().to_owned();
-    let checkpoint_hash = wallet.get_block(10).await.unwrap().unwrap().hash.unwrap();
+    let checkpoint_hash = wallet.get_block(30).await.unwrap().unwrap().hash.unwrap();
     let new_node_idx = network.add_node_with_options(NewNodeOptions {
         checkpoint: Some(Checkpoint {
             file: checkpoint_path,
@@ -272,7 +279,7 @@ async fn checkpoints_test(mut network: Network) {
     // Confirm wallet and new_node_wallet have the same block and state
     let new_node_wallet = network.wallet_of_node(new_node_idx).await;
     let latest_block_number = new_node_wallet.get_block_number().await.unwrap();
-    assert_eq!(latest_block_number, 10.into());
+    assert_eq!(latest_block_number, 30.into());

     let block = wallet
         .get_block(latest_block_number)
@@ -314,7 +321,7 @@ async fn checkpoints_test(mut network: Network) {
     // check the new node catches up and keeps up with block production
     network
-        .run_until_block(&new_node_wallet, 20.into(), 200)
+        .run_until_block(&new_node_wallet, 40.into(), 400)
         .await;

     // check account nonce of old wallet
diff --git a/zilliqa/tests/it/zil.rs b/zilliqa/tests/it/zil.rs
index 21e47c8eb..fc4d5ad0a 100644
--- a/zilliqa/tests/it/zil.rs
+++ b/zilliqa/tests/it/zil.rs
@@ -1994,7 +1994,7 @@ async fn get_txns_for_tx_block_ex_1(mut network: Network) {
     let to_addr: H160 = "0x00000000000000000000000000000000deadbeef"
         .parse()
         .unwrap();
-    send_transaction(
+    let (_, txn) = send_transaction(
         &mut network,
         &secret_key,
         1,
@@ -2006,9 +2006,7 @@ async fn get_txns_for_tx_block_ex_1(mut network: Network) {
     )
     .await;

-    network.run_until_block(&wallet, 2.into(), 50).await;
-
-    let block_number = "1";
+    let block_number = txn["receipt"]["epoch_num"].as_str().unwrap();
     let page_number = "0";

     let response: Value = wallet
@@ -2040,7 +2038,7 @@ async fn get_txns_for_tx_block_0(mut network: Network) {
     let to_addr: H160 = "0x00000000000000000000000000000000deadbeef"
         .parse()
         .unwrap();
-    send_transaction(
+    let (_, txn) = send_transaction(
         &mut network,
         &secret_key,
         1,
@@ -2052,9 +2050,7 @@ async fn get_txns_for_tx_block_0(mut network: Network) {
     )
     .await;

-    network.run_until_block(&wallet, 2.into(), 50).await;
-
-    let block_number = "1";
+    let block_number = txn["receipt"]["epoch_num"].as_str().unwrap();

     let response: Value = wallet
         .provider()
@@ -2095,7 +2091,7 @@ async fn get_txn_bodies_for_tx_block_0(mut network: Network) {
     let to_addr: H160 = "0x00000000000000000000000000000000deadbeef"
         .parse()
         .unwrap();
-    send_transaction(
+    let (_, txn) = send_transaction(
         &mut network,
         &secret_key,
         1,
@@ -2107,9 +2103,7 @@ async fn get_txn_bodies_for_tx_block_0(mut network: Network) {
     )
     .await;

-    network.run_until_block(&wallet, 2.into(), 50).await;
-
-    let block_number = "1";
+    let block_number = txn["receipt"]["epoch_num"].as_str().unwrap();

     let response: Value = wallet
         .provider()
@@ -2135,7 +2129,7 @@ async fn get_txn_bodies_for_tx_block_1(mut network: Network) {
     let to_addr: H160 = "0x00000000000000000000000000000000deadbeef"
         .parse()
         .unwrap();
-    send_transaction(
+    let (_, txn) = send_transaction(
         &mut network,
         &secret_key,
         1,
@@ -2147,9 +2141,7 @@ async fn get_txn_bodies_for_tx_block_1(mut network: Network) {
     )
     .await;

-    network.run_until_block(&wallet, 2.into(), 50).await;
-
-    let block_number = "1";
+    let block_number = txn["receipt"]["epoch_num"].as_str().unwrap();

     let response: Value = wallet
         .provider()
@@ -2179,7 +2171,7 @@ async fn get_txn_bodies_for_tx_block_ex_0(mut network: Network) {
     let to_addr: H160 = "0x00000000000000000000000000000000deadbeef"
         .parse()
         .unwrap();
-    send_transaction(
+    let (_, txn) = send_transaction(
         &mut network,
         &secret_key,
         1,
@@ -2191,9 +2183,7 @@ async fn get_txn_bodies_for_tx_block_ex_0(mut network: Network) {
     )
     .await;

-    network.run_until_block(&wallet, 2.into(), 50).await;
-
-    let block_number = "1";
+    let block_number = txn["receipt"]["epoch_num"].as_str().unwrap();
     let page_number = "2";

     let response: Value = wallet
@@ -2225,7 +2215,7 @@ async fn get_txn_bodies_for_tx_block_ex_1(mut network: Network) {
     let to_addr: H160 = "0x00000000000000000000000000000000deadbeef"
         .parse()
         .unwrap();
-    send_transaction(
+    let (_, txn) = send_transaction(
         &mut network,
         &secret_key,
         1,
@@ -2237,9 +2227,7 @@ async fn get_txn_bodies_for_tx_block_ex_1(mut network: Network) {
     )
     .await;

-    network.run_until_block(&wallet, 2.into(), 50).await;
-
-    let block_number = "1";
+    let block_number = txn["receipt"]["epoch_num"].as_str().unwrap();
     let page_number = "0";

     let response: Value = wallet
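Editor's note: the zil.rs tests stop assuming the transaction lands in block 1; they read the block number back from the receipt's `epoch_num` field and query that block instead, which keeps them robust to the extra latency introduced by batched broadcasting. A tiny standalone illustration of the JSON access pattern, assuming only `serde_json` (the receipt shape mirrors what the test helper `send_transaction` returns):

```rust
use serde_json::json;

fn main() {
    // Hypothetical receipt value; real tests get this back from send_transaction.
    let txn = json!({ "receipt": { "epoch_num": "7", "success": true } });

    let block_number = txn["receipt"]["epoch_num"].as_str().unwrap();
    assert_eq!(block_number, "7");
    // The tests then pass `block_number` to GetTransactionsForTxBlock and friends.
}
```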