From 6bb780b3730e1d9805d85e1ccdb21821c2c2405e Mon Sep 17 00:00:00 2001 From: Faycel Kouteib Date: Sat, 10 Aug 2024 14:36:53 -0700 Subject: [PATCH 01/12] Batch tx status writes to db. --- ledger/src/blockstore.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 80ce6b83dce1a3..bc5aafe0c37e09 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -2906,13 +2906,22 @@ impl Blockstore { self.transaction_status_cf .put_protobuf((signature, slot), &status)?; + let mut write_batch = self.db.batch()?; + for (address, writeable) in keys_with_writable { - self.address_signatures_cf.put( + write_batch.put::( (*address, slot, transaction_index, signature), &AddressSignatureMeta { writeable }, )?; } + self.db.write(write_batch).inspect_err(|e| { + error!( + "Error: {:?} while submitting write batch of tx status for slot {:?}", + e, slot + ) + })?; + Ok(()) } From eb80aa733eff0883d0b19dc6dfb9f113125f8fe7 Mon Sep 17 00:00:00 2001 From: Faycel Kouteib Date: Tue, 24 Sep 2024 21:15:35 -0700 Subject: [PATCH 02/12] Batch tx status writes across multiple transactions to db. --- ledger/src/blockstore.rs | 39 ++++++++++++++++------- ledger/src/blockstore/blockstore_purge.rs | 3 ++ ledger/src/blockstore_db.rs | 1 + rpc/src/transaction_status_service.rs | 12 ++++++- 4 files changed, 42 insertions(+), 13 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index bc5aafe0c37e09..f85720033f787f 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -313,6 +313,9 @@ impl Blockstore { self.db } + pub fn db_ref(&self) -> &Arc { + &self.db + } pub fn ledger_path(&self) -> &PathBuf { &self.ledger_path } @@ -2899,29 +2902,32 @@ impl Blockstore { keys_with_writable: impl Iterator, status: TransactionStatusMeta, transaction_index: usize, + db_write_batch: Option<&mut WriteBatch<'_>>, ) -> Result<()> { + let batch = db_write_batch.is_some(); let status = status.into(); let transaction_index = u32::try_from(transaction_index) .map_err(|_| BlockstoreError::TransactionIndexOverflow)?; self.transaction_status_cf .put_protobuf((signature, slot), &status)?; - let mut write_batch = self.db.batch()?; + let default_wrb = &mut WriteBatch::default(); + let write_batch = db_write_batch.unwrap_or(default_wrb); for (address, writeable) in keys_with_writable { - write_batch.put::( - (*address, slot, transaction_index, signature), - &AddressSignatureMeta { writeable }, - )?; + if batch { + write_batch.put::( + (*address, slot, transaction_index, signature), + &AddressSignatureMeta { writeable }, + )?; + } else { + self.address_signatures_cf.put( + (*address, slot, transaction_index, signature), + &AddressSignatureMeta { writeable }, + )?; + } } - self.db.write(write_batch).inspect_err(|e| { - error!( - "Error: {:?} while submitting write batch of tx status for slot {:?}", - e, slot - ) - })?; - Ok(()) } @@ -8705,6 +8711,7 @@ pub mod tests { ..TransactionStatusMeta::default() }, 0, + None, ) .unwrap(); @@ -9089,6 +9096,7 @@ pub mod tests { vec![(&address0, true)].into_iter(), TransactionStatusMeta::default(), 0, + None, ) .unwrap(); blockstore @@ -9098,6 +9106,7 @@ pub mod tests { vec![(&address1, true)].into_iter(), TransactionStatusMeta::default(), 0, + None, ) .unwrap(); @@ -9466,6 +9475,7 @@ pub mod tests { vec![(&address0, true), (&address1, false)].into_iter(), TransactionStatusMeta::default(), x as usize, + None, ) .unwrap(); } @@ -9479,6 +9489,7 @@ pub mod tests { vec![(&address0, true), (&address1, false)].into_iter(), TransactionStatusMeta::default(), x as usize, + None, ) .unwrap(); } @@ -9491,6 +9502,7 @@ pub mod tests { vec![(&address0, true), (&address1, false)].into_iter(), TransactionStatusMeta::default(), x as usize, + None, ) .unwrap(); } @@ -9504,6 +9516,7 @@ pub mod tests { vec![(&address0, true), (&address1, false)].into_iter(), TransactionStatusMeta::default(), x as usize, + None, ) .unwrap(); } @@ -9591,6 +9604,7 @@ pub mod tests { .map(|key| (key, true)), TransactionStatusMeta::default(), counter, + None, ) .unwrap(); counter += 1; @@ -9622,6 +9636,7 @@ pub mod tests { .map(|key| (key, true)), TransactionStatusMeta::default(), counter, + None, ) .unwrap(); counter += 1; diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs index b2d79c2bf59672..a9c3521b3a835f 100644 --- a/ledger/src/blockstore/blockstore_purge.rs +++ b/ledger/src/blockstore/blockstore_purge.rs @@ -589,6 +589,7 @@ pub mod tests { .into_iter(), TransactionStatusMeta::default(), 0, + None, ) .unwrap(); } @@ -650,6 +651,7 @@ pub mod tests { .into_iter(), TransactionStatusMeta::default(), 0, + None, ) .unwrap(); } @@ -728,6 +730,7 @@ pub mod tests { .map(|key| (key, true)), TransactionStatusMeta::default(), 0, + None, ) .unwrap(); } diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 8c96403f20e9da..763cedcaef5fe0 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -1405,6 +1405,7 @@ impl LedgerColumn { } } +#[derive(Default)] pub struct WriteBatch<'a> { write_batch: RWriteBatch, map: HashMap<&'static str, &'a ColumnFamily>, diff --git a/rpc/src/transaction_status_service.rs b/rpc/src/transaction_status_service.rs index 314f8b9a4f5fda..4b85a9b7fd470a 100644 --- a/rpc/src/transaction_status_service.rs +++ b/rpc/src/transaction_status_service.rs @@ -73,6 +73,8 @@ impl TransactionStatusService { token_balances, transaction_indexes, }) => { + let mut batch = blockstore.db_ref().batch().unwrap(); + for ( transaction, commit_result, @@ -177,10 +179,18 @@ impl TransactionStatusService { keys_with_writable, transaction_status_meta, transaction_index, + Some(&mut batch), ) - .expect("Expect database write to succeed: TransactionStatus"); + .expect("Expect database batching to succeed: TransactionStatus"); } } + + if enable_rpc_transaction_history { + blockstore + .db_ref() + .write(batch) + .expect("Expect database batched write to succeed: TransactionStatus"); + } } TransactionStatusMessage::Freeze(slot) => { max_complete_transaction_status_slot.fetch_max(slot, Ordering::SeqCst); From 6b7c773b7f41fd3ce982cf262b32fce20955909c Mon Sep 17 00:00:00 2001 From: Faycel Kouteib Date: Fri, 27 Sep 2024 22:46:27 -0700 Subject: [PATCH 03/12] Add db batch writes to tx memos. --- ledger/src/blockstore.rs | 14 +++++++++++--- rpc/src/transaction_status_service.rs | 23 ++++++++++++++++------- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index f85720033f787f..017185d99ed33d 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -2911,8 +2911,7 @@ impl Blockstore { self.transaction_status_cf .put_protobuf((signature, slot), &status)?; - let default_wrb = &mut WriteBatch::default(); - let write_batch = db_write_batch.unwrap_or(default_wrb); + let write_batch = db_write_batch.unwrap(); for (address, writeable) in keys_with_writable { if batch { @@ -2954,8 +2953,17 @@ impl Blockstore { signature: &Signature, slot: Slot, memos: String, + db_write_batch: Option<&mut WriteBatch<'_>>, ) -> Result<()> { - self.transaction_memos_cf.put((*signature, slot), &memos) + let batch = db_write_batch.is_some(); + + if batch { + db_write_batch + .unwrap() + .put::((*signature, slot), &memos) + } else { + self.transaction_memos_cf.put((*signature, slot), &memos) + } } /// Acquires the `lowest_cleanup_slot` lock and returns a tuple of the held lock diff --git a/rpc/src/transaction_status_service.rs b/rpc/src/transaction_status_service.rs index 4b85a9b7fd470a..41aa63b6600938 100644 --- a/rpc/src/transaction_status_service.rs +++ b/rpc/src/transaction_status_service.rs @@ -73,7 +73,7 @@ impl TransactionStatusService { token_balances, transaction_indexes, }) => { - let mut batch = blockstore.db_ref().batch().unwrap(); + let mut status_and_memos_batch = blockstore.db_ref().batch().unwrap(); for ( transaction, @@ -161,8 +161,15 @@ impl TransactionStatusService { if enable_rpc_transaction_history { if let Some(memos) = extract_and_fmt_memos(transaction.message()) { blockstore - .write_transaction_memos(transaction.signature(), slot, memos) - .expect("Expect database write to succeed: TransactionMemos"); + .write_transaction_memos( + transaction.signature(), + slot, + memos, + Some(&mut status_and_memos_batch), + ) + .expect( + "Expect database batch accumulation to succeed: TransactionMemos", + ); } let message = transaction.message(); @@ -179,17 +186,19 @@ impl TransactionStatusService { keys_with_writable, transaction_status_meta, transaction_index, - Some(&mut batch), + Some(&mut status_and_memos_batch), ) - .expect("Expect database batching to succeed: TransactionStatus"); + .expect( + "Expect database batch accumulation to succeed: TransactionStatus", + ); } } if enable_rpc_transaction_history { blockstore .db_ref() - .write(batch) - .expect("Expect database batched write to succeed: TransactionStatus"); + .write(status_and_memos_batch) + .expect("Expect database batched writes to succeed: TransactionStatus + TransactionMemos"); } } TransactionStatusMessage::Freeze(slot) => { From 5439b3eb66e1c123e1edb68e74286250408f2500 Mon Sep 17 00:00:00 2001 From: Faycel Kouteib Date: Fri, 27 Sep 2024 23:15:35 -0700 Subject: [PATCH 04/12] Default derivation no longer needed. --- ledger/src/blockstore_db.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 763cedcaef5fe0..8c96403f20e9da 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -1405,7 +1405,6 @@ impl LedgerColumn { } } -#[derive(Default)] pub struct WriteBatch<'a> { write_batch: RWriteBatch, map: HashMap<&'static str, &'a ColumnFamily>, From 4f867b5af2bd6669d8142f1f0e48b083c0076b34 Mon Sep 17 00:00:00 2001 From: Faycel Kouteib Date: Sun, 29 Sep 2024 00:59:03 -0700 Subject: [PATCH 05/12] Fix blockstore unit tests. --- ledger/src/blockstore.rs | 3 ++- ledger/src/blockstore_db.rs | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 017185d99ed33d..3317a101dca447 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -2911,7 +2911,8 @@ impl Blockstore { self.transaction_status_cf .put_protobuf((signature, slot), &status)?; - let write_batch = db_write_batch.unwrap(); + let default_wrb = &mut WriteBatch::default(); + let write_batch = db_write_batch.unwrap_or(default_wrb); for (address, writeable) in keys_with_writable { if batch { diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 8c96403f20e9da..763cedcaef5fe0 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -1405,6 +1405,7 @@ impl LedgerColumn { } } +#[derive(Default)] pub struct WriteBatch<'a> { write_batch: RWriteBatch, map: HashMap<&'static str, &'a ColumnFamily>, From 20f08489b98faf7abd7a3d95b9d10e06cb6762cc Mon Sep 17 00:00:00 2001 From: Faycel Kouteib Date: Sun, 29 Sep 2024 01:59:30 -0700 Subject: [PATCH 06/12] minor fix --- ledger/src/blockstore.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 3317a101dca447..b4a42fe91aab34 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -2911,8 +2911,11 @@ impl Blockstore { self.transaction_status_cf .put_protobuf((signature, slot), &status)?; - let default_wrb = &mut WriteBatch::default(); - let write_batch = db_write_batch.unwrap_or(default_wrb); + let write_batch = if batch { + db_write_batch.unwrap() + } else { + &mut WriteBatch::default() + }; for (address, writeable) in keys_with_writable { if batch { From 8cdc3d279039fb3d28ab5d0feb0dd55a355d070a Mon Sep 17 00:00:00 2001 From: Faycel Kouteib Date: Thu, 3 Oct 2024 17:20:36 -0700 Subject: [PATCH 07/12] Add tss unit test. Split batching into new functions. --- ledger/src/blockstore.rs | 74 ++++++------ ledger/src/blockstore/blockstore_purge.rs | 3 - rpc/src/transaction_status_service.rs | 136 +++++++++++++++++++++- runtime/src/bank.rs | 2 + 4 files changed, 172 insertions(+), 43 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index b4a42fe91aab34..a123518d3cebb7 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -2902,33 +2902,43 @@ impl Blockstore { keys_with_writable: impl Iterator, status: TransactionStatusMeta, transaction_index: usize, - db_write_batch: Option<&mut WriteBatch<'_>>, ) -> Result<()> { - let batch = db_write_batch.is_some(); let status = status.into(); let transaction_index = u32::try_from(transaction_index) .map_err(|_| BlockstoreError::TransactionIndexOverflow)?; self.transaction_status_cf .put_protobuf((signature, slot), &status)?; - let write_batch = if batch { - db_write_batch.unwrap() - } else { - &mut WriteBatch::default() - }; + for (address, writeable) in keys_with_writable { + self.address_signatures_cf.put( + (*address, slot, transaction_index, signature), + &AddressSignatureMeta { writeable }, + )?; + } + + Ok(()) + } + + pub fn add_transaction_status_to_batch<'a>( + &self, + slot: Slot, + signature: Signature, + keys_with_writable: impl Iterator, + status: TransactionStatusMeta, + transaction_index: usize, + db_write_batch: &mut WriteBatch<'_>, + ) -> Result<()> { + let status = status.into(); + let transaction_index = u32::try_from(transaction_index) + .map_err(|_| BlockstoreError::TransactionIndexOverflow)?; + self.transaction_status_cf + .put_protobuf((signature, slot), &status)?; for (address, writeable) in keys_with_writable { - if batch { - write_batch.put::( - (*address, slot, transaction_index, signature), - &AddressSignatureMeta { writeable }, - )?; - } else { - self.address_signatures_cf.put( - (*address, slot, transaction_index, signature), - &AddressSignatureMeta { writeable }, - )?; - } + db_write_batch.put::( + (*address, slot, transaction_index, signature), + &AddressSignatureMeta { writeable }, + )?; } Ok(()) @@ -2957,17 +2967,18 @@ impl Blockstore { signature: &Signature, slot: Slot, memos: String, - db_write_batch: Option<&mut WriteBatch<'_>>, ) -> Result<()> { - let batch = db_write_batch.is_some(); + self.transaction_memos_cf.put((*signature, slot), &memos) + } - if batch { - db_write_batch - .unwrap() - .put::((*signature, slot), &memos) - } else { - self.transaction_memos_cf.put((*signature, slot), &memos) - } + pub fn add_transaction_memos_to_batch( + &self, + signature: &Signature, + slot: Slot, + memos: String, + db_write_batch: &mut WriteBatch<'_>, + ) -> Result<()> { + db_write_batch.put::((*signature, slot), &memos) } /// Acquires the `lowest_cleanup_slot` lock and returns a tuple of the held lock @@ -8723,7 +8734,6 @@ pub mod tests { ..TransactionStatusMeta::default() }, 0, - None, ) .unwrap(); @@ -9108,7 +9118,6 @@ pub mod tests { vec![(&address0, true)].into_iter(), TransactionStatusMeta::default(), 0, - None, ) .unwrap(); blockstore @@ -9118,7 +9127,6 @@ pub mod tests { vec![(&address1, true)].into_iter(), TransactionStatusMeta::default(), 0, - None, ) .unwrap(); @@ -9487,7 +9495,6 @@ pub mod tests { vec![(&address0, true), (&address1, false)].into_iter(), TransactionStatusMeta::default(), x as usize, - None, ) .unwrap(); } @@ -9501,7 +9508,6 @@ pub mod tests { vec![(&address0, true), (&address1, false)].into_iter(), TransactionStatusMeta::default(), x as usize, - None, ) .unwrap(); } @@ -9514,7 +9520,6 @@ pub mod tests { vec![(&address0, true), (&address1, false)].into_iter(), TransactionStatusMeta::default(), x as usize, - None, ) .unwrap(); } @@ -9528,7 +9533,6 @@ pub mod tests { vec![(&address0, true), (&address1, false)].into_iter(), TransactionStatusMeta::default(), x as usize, - None, ) .unwrap(); } @@ -9616,7 +9620,6 @@ pub mod tests { .map(|key| (key, true)), TransactionStatusMeta::default(), counter, - None, ) .unwrap(); counter += 1; @@ -9648,7 +9651,6 @@ pub mod tests { .map(|key| (key, true)), TransactionStatusMeta::default(), counter, - None, ) .unwrap(); counter += 1; diff --git a/ledger/src/blockstore/blockstore_purge.rs b/ledger/src/blockstore/blockstore_purge.rs index a9c3521b3a835f..b2d79c2bf59672 100644 --- a/ledger/src/blockstore/blockstore_purge.rs +++ b/ledger/src/blockstore/blockstore_purge.rs @@ -589,7 +589,6 @@ pub mod tests { .into_iter(), TransactionStatusMeta::default(), 0, - None, ) .unwrap(); } @@ -651,7 +650,6 @@ pub mod tests { .into_iter(), TransactionStatusMeta::default(), 0, - None, ) .unwrap(); } @@ -730,7 +728,6 @@ pub mod tests { .map(|key| (key, true)), TransactionStatusMeta::default(), 0, - None, ) .unwrap(); } diff --git a/rpc/src/transaction_status_service.rs b/rpc/src/transaction_status_service.rs index 41aa63b6600938..d859f5578b11c9 100644 --- a/rpc/src/transaction_status_service.rs +++ b/rpc/src/transaction_status_service.rs @@ -161,11 +161,11 @@ impl TransactionStatusService { if enable_rpc_transaction_history { if let Some(memos) = extract_and_fmt_memos(transaction.message()) { blockstore - .write_transaction_memos( + .add_transaction_memos_to_batch( transaction.signature(), slot, memos, - Some(&mut status_and_memos_batch), + &mut status_and_memos_batch, ) .expect( "Expect database batch accumulation to succeed: TransactionMemos", @@ -180,13 +180,13 @@ impl TransactionStatusService { .map(|(index, key)| (key, message.is_writable(index))); blockstore - .write_transaction_status( + .add_transaction_status_to_batch( slot, *transaction.signature(), keys_with_writable, transaction_status_meta, transaction_index, - Some(&mut status_and_memos_batch), + &mut status_and_memos_batch, ) .expect( "Expect database batch accumulation to succeed: TransactionStatus", @@ -439,4 +439,132 @@ pub(crate) mod tests { result.transaction.signature() ); } + + #[test] + fn test_batch_transaction_status_and_memos() { + let genesis_config = create_genesis_config(2).genesis_config; + let (bank, _bank_forks) = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); + + let (transaction_status_sender, transaction_status_receiver) = unbounded(); + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()) + .expect("Expected to be able to open database ledger"); + let blockstore = Arc::new(blockstore); + + let transaction1 = build_test_transaction_legacy(); + let transaction1 = VersionedTransaction::from(transaction1); + let transaction1 = SanitizedTransaction::try_create( + transaction1, + MessageHash::Compute, + None, + SimpleAddressLoader::Disabled, + &ReservedAccountKeys::empty_key_set(), + ) + .unwrap(); + + let transaction2 = build_test_transaction_legacy(); + let transaction2 = VersionedTransaction::from(transaction2); + let transaction2 = SanitizedTransaction::try_create( + transaction2, + MessageHash::Compute, + None, + SimpleAddressLoader::Disabled, + &ReservedAccountKeys::empty_key_set(), + ) + .unwrap(); + + let expected_transaction1 = transaction1.clone(); + let expected_transaction2 = transaction2.clone(); + + let commit_result = Ok(CommittedTransaction { + status: Ok(()), + log_messages: None, + inner_instructions: None, + return_data: None, + executed_units: 0, + fee_details: FeeDetails::default(), + rent_debits: RentDebits::default(), + loaded_account_stats: TransactionLoadedAccountsStats::default(), + }); + + let balances = TransactionBalancesSet { + pre_balances: vec![vec![123456], vec![234567]], + post_balances: vec![vec![234567], vec![345678]], + }; + + let token_balances = TransactionTokenBalancesSet { + pre_token_balances: vec![vec![], vec![]], + post_token_balances: vec![vec![], vec![]], + }; + + let slot = bank.slot(); + let transaction_index1: usize = bank.transaction_count().try_into().unwrap(); + let transaction_index2: usize = transaction_index1 + 1; + + let transaction_status_batch = TransactionStatusBatch { + slot, + transactions: vec![transaction1, transaction2], + commit_results: vec![commit_result.clone(), commit_result], + balances: balances.clone(), + token_balances: token_balances, + transaction_indexes: vec![transaction_index1, transaction_index2], + }; + + let test_notifier = Arc::new(TestTransactionNotifier::new()); + + let exit = Arc::new(AtomicBool::new(false)); + let transaction_status_service = TransactionStatusService::new( + transaction_status_receiver, + Arc::new(AtomicU64::default()), + true, + Some(test_notifier.clone()), + blockstore, + false, + exit.clone(), + ); + + transaction_status_sender + .send(TransactionStatusMessage::Batch(transaction_status_batch)) + .unwrap(); + sleep(Duration::from_millis(5000)); + + exit.store(true, Ordering::Relaxed); + transaction_status_service.join().unwrap(); + assert_eq!(test_notifier.notifications.len(), 2); + + let key1 = TestNotifierKey { + slot, + transaction_index: transaction_index1, + signature: *expected_transaction1.signature(), + }; + let key2 = TestNotifierKey { + slot, + transaction_index: transaction_index2, + signature: *expected_transaction2.signature(), + }; + + assert!(test_notifier.notifications.contains_key(&key1)); + assert!(test_notifier.notifications.contains_key(&key2)); + + let result1 = test_notifier.notifications.get(&key1).unwrap(); + let result2 = test_notifier.notifications.get(&key2).unwrap(); + + assert_eq!( + expected_transaction1.signature(), + result1.transaction.signature() + ); + assert_eq!( + expected_transaction1.message_hash(), + result1.transaction.message_hash() + ); + + assert_eq!( + expected_transaction2.signature(), + result2.transaction.signature() + ); + assert_eq!( + expected_transaction2.message_hash(), + result2.transaction.message_hash() + ); + } } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 0a4cb785f2c091..3cadbc6bfd4dfb 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -346,6 +346,8 @@ pub struct TransactionSimulationResult { pub return_data: Option, pub inner_instructions: Option>, } + +#[derive(Clone)] pub struct TransactionBalancesSet { pub pre_balances: TransactionBalances, pub post_balances: TransactionBalances, From 42e86640c7dccc814951793b56fa6b7679cbd8cf Mon Sep 17 00:00:00 2001 From: Faycel Kouteib Date: Thu, 3 Oct 2024 22:11:00 -0700 Subject: [PATCH 08/12] Fix CI issue. --- ledger/src/blockstore.rs | 1 + ledger/src/blockstore_db.rs | 1 - rpc/src/transaction_status_service.rs | 2 +- 3 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index a123518d3cebb7..07935e52ec6847 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -316,6 +316,7 @@ impl Blockstore { pub fn db_ref(&self) -> &Arc { &self.db } + pub fn ledger_path(&self) -> &PathBuf { &self.ledger_path } diff --git a/ledger/src/blockstore_db.rs b/ledger/src/blockstore_db.rs index 763cedcaef5fe0..8c96403f20e9da 100644 --- a/ledger/src/blockstore_db.rs +++ b/ledger/src/blockstore_db.rs @@ -1405,7 +1405,6 @@ impl LedgerColumn { } } -#[derive(Default)] pub struct WriteBatch<'a> { write_batch: RWriteBatch, map: HashMap<&'static str, &'a ColumnFamily>, diff --git a/rpc/src/transaction_status_service.rs b/rpc/src/transaction_status_service.rs index d859f5578b11c9..3678a1ee3c59d6 100644 --- a/rpc/src/transaction_status_service.rs +++ b/rpc/src/transaction_status_service.rs @@ -506,7 +506,7 @@ pub(crate) mod tests { transactions: vec![transaction1, transaction2], commit_results: vec![commit_result.clone(), commit_result], balances: balances.clone(), - token_balances: token_balances, + token_balances, transaction_indexes: vec![transaction_index1, transaction_index2], }; From b07da7f4547f57891cd08781594accdb74b7b76a Mon Sep 17 00:00:00 2001 From: Faycel Kouteib Date: Tue, 15 Oct 2024 16:22:49 -0700 Subject: [PATCH 09/12] Add cargo benches and more unit tests. --- ledger/benches/blockstore.rs | 121 +++++++++++++++++++++++++++++- ledger/src/blockstore.rs | 141 +++++++++++++++++++++++++++++++++++ 2 files changed, 261 insertions(+), 1 deletion(-) diff --git a/ledger/benches/blockstore.rs b/ledger/benches/blockstore.rs index 27296d412d7ab3..7e4b0d4d66d1d4 100644 --- a/ledger/benches/blockstore.rs +++ b/ledger/benches/blockstore.rs @@ -10,7 +10,8 @@ use { blockstore::{entries_to_test_shreds, Blockstore}, get_tmp_ledger_path_auto_delete, }, - solana_sdk::{clock::Slot, hash::Hash}, + solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey, signature::Signature}, + solana_transaction_status::TransactionStatusMeta, std::path::Path, test::Bencher, }; @@ -154,3 +155,121 @@ fn bench_insert_data_shred_big(bench: &mut Bencher) { blockstore.insert_shreds(shreds, None, false).unwrap(); }); } + +#[bench] +#[ignore] +fn bench_write_transaction_memos(b: &mut Bencher) { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = + Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger"); + let signatures: Vec = (0..64).map(|_| Signature::new_unique()).collect(); + b.iter(|| { + for (i, signature) in signatures.iter().enumerate() { + blockstore + .write_transaction_memos( + signature, + i as u64, + "bench_write_transaction_memos".to_string(), + ) + .unwrap(); + } + }); +} + +#[bench] +#[ignore] +fn bench_add_transaction_memos_to_batch(b: &mut Bencher) { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = + Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger"); + let signatures: Vec = (0..64).map(|_| Signature::new_unique()).collect(); + b.iter(|| { + let mut memos_batch = blockstore.db_ref().batch().unwrap(); + + for (i, signature) in signatures.iter().enumerate() { + blockstore + .add_transaction_memos_to_batch( + signature, + i as u64, + "bench_write_transaction_memos".to_string(), + &mut memos_batch, + ) + .unwrap(); + } + + blockstore.db_ref().write(memos_batch).unwrap(); + }); +} + +#[bench] +#[ignore] +fn bench_write_transaction_status(b: &mut Bencher) { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = + Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger"); + let signatures: Vec = (0..64).map(|_| Signature::new_unique()).collect(); + let keys_with_writable: Vec> = (0..64) + .map(|_| { + vec![ + (Pubkey::new_unique(), true), + (Pubkey::new_unique(), false), + (Pubkey::new_unique(), true), + (Pubkey::new_unique(), false), + ] + }) + .collect(); + let slot = 5; + + b.iter(|| { + for (i, signature) in signatures.iter().enumerate() { + blockstore + .write_transaction_status( + slot, + *signature, + keys_with_writable[i].iter().map(|(k, v)| (k, *v)), + TransactionStatusMeta::default(), + i, + ) + .unwrap(); + } + }); +} + +#[bench] +#[ignore] +fn bench_add_transaction_status_to_batch(b: &mut Bencher) { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = + Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger"); + let signatures: Vec = (0..64).map(|_| Signature::new_unique()).collect(); + let keys_with_writable: Vec> = (0..64) + .map(|_| { + vec![ + (Pubkey::new_unique(), true), + (Pubkey::new_unique(), false), + (Pubkey::new_unique(), true), + (Pubkey::new_unique(), false), + ] + }) + .collect(); + let slot = 5; + + b.iter(|| { + let mut status_batch = blockstore.db_ref().batch().unwrap(); + + for (i, signature) in signatures.iter().enumerate() { + blockstore + .add_transaction_status_to_batch( + slot, + *signature, + keys_with_writable[i].iter().map(|(k, v)| (k, *v)), + TransactionStatusMeta::default(), + i, + &mut status_batch, + ) + .unwrap(); + } + + blockstore.db_ref().write(status_batch).unwrap(); + }); +} diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 07935e52ec6847..45afb74d3615b0 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -12292,4 +12292,145 @@ pub mod tests { ); } } + + #[test] + fn test_write_transaction_memos() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()) + .expect("Expected to be able to open database ledger"); + let signature: Signature = Signature::new_unique(); + + blockstore + .write_transaction_memos(&signature, 4, "test_write_transaction_memos".to_string()) + .unwrap(); + + let memo = blockstore + .read_transaction_memos(signature, 4) + .expect("Expected to find memo"); + assert_eq!(memo, Some("test_write_transaction_memos".to_string())); + } + + #[test] + fn test_add_transaction_memos_to_batch() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()) + .expect("Expected to be able to open database ledger"); + let signatures: Vec = (0..2).map(|_| Signature::new_unique()).collect(); + let mut memos_batch = blockstore.db_ref().batch().unwrap(); + + blockstore + .add_transaction_memos_to_batch( + &signatures[0], + 4, + "test_write_transaction_memos1".to_string(), + &mut memos_batch, + ) + .unwrap(); + + blockstore + .add_transaction_memos_to_batch( + &signatures[1], + 5, + "test_write_transaction_memos2".to_string(), + &mut memos_batch, + ) + .unwrap(); + + blockstore.db_ref().write(memos_batch).unwrap(); + + let memo1 = blockstore + .read_transaction_memos(signatures[0], 4) + .expect("Expected to find memo"); + assert_eq!(memo1, Some("test_write_transaction_memos1".to_string())); + + let memo2 = blockstore + .read_transaction_memos(signatures[1], 5) + .expect("Expected to find memo"); + assert_eq!(memo2, Some("test_write_transaction_memos2".to_string())); + } + + #[test] + fn test_write_transaction_status() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()) + .expect("Expected to be able to open database ledger"); + let signatures: Vec = (0..2).map(|_| Signature::new_unique()).collect(); + let keys_with_writable: Vec<(Pubkey, bool)> = + vec![(Pubkey::new_unique(), true), (Pubkey::new_unique(), false)]; + let slot = 5; + + blockstore + .write_transaction_status( + slot, + signatures[0], + keys_with_writable + .iter() + .map(|&(ref pubkey, writable)| (pubkey, writable)), + TransactionStatusMeta { + fee: 4200, + ..TransactionStatusMeta::default() + }, + 0, + ) + .unwrap(); + + let tx_status = blockstore + .read_transaction_status((signatures[0], slot)) + .unwrap() + .unwrap(); + assert_eq!(tx_status.fee, 4200); + } + + #[test] + fn test_add_transaction_status_to_batch() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + let blockstore = Blockstore::open(ledger_path.path()) + .expect("Expected to be able to open database ledger"); + let signatures: Vec = (0..2).map(|_| Signature::new_unique()).collect(); + let keys_with_writable: Vec> = (0..2) + .map(|_| vec![(Pubkey::new_unique(), true), (Pubkey::new_unique(), false)]) + .collect(); + let slot = 5; + let mut status_batch = blockstore.db_ref().batch().unwrap(); + + for (i, signature) in signatures.iter().enumerate() { + blockstore + .add_transaction_status_to_batch( + slot, + *signature, + keys_with_writable[i].iter().map(|(k, v)| (k, *v)), + TransactionStatusMeta { + fee: 5700 + i as u64, + status: if i % 2 == 0 { + Ok(()) + } else { + Err(TransactionError::InsufficientFundsForFee) + }, + ..TransactionStatusMeta::default() + }, + i, + &mut status_batch, + ) + .unwrap(); + } + + blockstore.db_ref().write(status_batch).unwrap(); + + let tx_status1 = blockstore + .read_transaction_status((signatures[0], slot)) + .unwrap() + .unwrap(); + assert_eq!(tx_status1.fee, 5700); + assert_eq!(tx_status1.status.unwrap(), ()); + + let tx_status2 = blockstore + .read_transaction_status((signatures[1], slot)) + .unwrap() + .unwrap(); + assert_eq!(tx_status2.fee, 5701); + assert_eq!( + tx_status2.status, + Err(TransactionError::InsufficientFundsForFee) + ); + } } From 90b6c5a537f27ea34b3187572a1f7e21c550d0b9 Mon Sep 17 00:00:00 2001 From: Faycel Kouteib Date: Tue, 15 Oct 2024 16:51:32 -0700 Subject: [PATCH 10/12] Fix clippy error. --- ledger/src/blockstore.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index 45afb74d3615b0..d1f738e463da3e 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -12421,7 +12421,7 @@ pub mod tests { .unwrap() .unwrap(); assert_eq!(tx_status1.fee, 5700); - assert_eq!(tx_status1.status.unwrap(), ()); + assert_eq!(tx_status1.status, Ok(())); let tx_status2 = blockstore .read_transaction_status((signatures[1], slot)) From e3127843bd4d5cbd8d8c6ac509a4cc13665a0658 Mon Sep 17 00:00:00 2001 From: Faycel Kouteib Date: Wed, 16 Oct 2024 10:27:02 -0700 Subject: [PATCH 11/12] Better loop indexer names. Better batching functions abstraction. --- ledger/benches/blockstore.rs | 28 ++++++++++++------------- ledger/src/blockstore.rs | 30 +++++++++++++++------------ rpc/src/transaction_status_service.rs | 6 ++---- 3 files changed, 33 insertions(+), 31 deletions(-) diff --git a/ledger/benches/blockstore.rs b/ledger/benches/blockstore.rs index 7e4b0d4d66d1d4..853f4f0ff11e31 100644 --- a/ledger/benches/blockstore.rs +++ b/ledger/benches/blockstore.rs @@ -164,11 +164,11 @@ fn bench_write_transaction_memos(b: &mut Bencher) { Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger"); let signatures: Vec = (0..64).map(|_| Signature::new_unique()).collect(); b.iter(|| { - for (i, signature) in signatures.iter().enumerate() { + for (slot, signature) in signatures.iter().enumerate() { blockstore .write_transaction_memos( signature, - i as u64, + slot as u64, "bench_write_transaction_memos".to_string(), ) .unwrap(); @@ -184,20 +184,20 @@ fn bench_add_transaction_memos_to_batch(b: &mut Bencher) { Blockstore::open(ledger_path.path()).expect("Expected to be able to open database ledger"); let signatures: Vec = (0..64).map(|_| Signature::new_unique()).collect(); b.iter(|| { - let mut memos_batch = blockstore.db_ref().batch().unwrap(); + let mut memos_batch = blockstore.get_write_batch().unwrap(); - for (i, signature) in signatures.iter().enumerate() { + for (slot, signature) in signatures.iter().enumerate() { blockstore .add_transaction_memos_to_batch( signature, - i as u64, + slot as u64, "bench_write_transaction_memos".to_string(), &mut memos_batch, ) .unwrap(); } - blockstore.db_ref().write(memos_batch).unwrap(); + blockstore.write_batch(memos_batch).unwrap(); }); } @@ -221,14 +221,14 @@ fn bench_write_transaction_status(b: &mut Bencher) { let slot = 5; b.iter(|| { - for (i, signature) in signatures.iter().enumerate() { + for (tx_idx, signature) in signatures.iter().enumerate() { blockstore .write_transaction_status( slot, *signature, - keys_with_writable[i].iter().map(|(k, v)| (k, *v)), + keys_with_writable[tx_idx].iter().map(|(k, v)| (k, *v)), TransactionStatusMeta::default(), - i, + tx_idx, ) .unwrap(); } @@ -255,21 +255,21 @@ fn bench_add_transaction_status_to_batch(b: &mut Bencher) { let slot = 5; b.iter(|| { - let mut status_batch = blockstore.db_ref().batch().unwrap(); + let mut status_batch = blockstore.get_write_batch().unwrap(); - for (i, signature) in signatures.iter().enumerate() { + for (tx_idx, signature) in signatures.iter().enumerate() { blockstore .add_transaction_status_to_batch( slot, *signature, - keys_with_writable[i].iter().map(|(k, v)| (k, *v)), + keys_with_writable[tx_idx].iter().map(|(k, v)| (k, *v)), TransactionStatusMeta::default(), - i, + tx_idx, &mut status_batch, ) .unwrap(); } - blockstore.db_ref().write(status_batch).unwrap(); + blockstore.write_batch(status_batch).unwrap(); }); } diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index d1f738e463da3e..bc29b372b4077b 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -313,10 +313,6 @@ impl Blockstore { self.db } - pub fn db_ref(&self) -> &Arc { - &self.db - } - pub fn ledger_path(&self) -> &PathBuf { &self.ledger_path } @@ -4612,6 +4608,14 @@ impl Blockstore { *index_meta_time_us += total_start.as_us(); res } + + pub fn get_write_batch(&self) -> std::result::Result, BlockstoreError> { + self.db.batch() + } + + pub fn write_batch(&self, write_batch: WriteBatch) -> Result<()> { + self.db.write(write_batch) + } } // Update the `completed_data_indexes` with a new shred `new_shred_index`. If a @@ -12316,7 +12320,7 @@ pub mod tests { let blockstore = Blockstore::open(ledger_path.path()) .expect("Expected to be able to open database ledger"); let signatures: Vec = (0..2).map(|_| Signature::new_unique()).collect(); - let mut memos_batch = blockstore.db_ref().batch().unwrap(); + let mut memos_batch = blockstore.get_write_batch().unwrap(); blockstore .add_transaction_memos_to_batch( @@ -12336,7 +12340,7 @@ pub mod tests { ) .unwrap(); - blockstore.db_ref().write(memos_batch).unwrap(); + blockstore.write_batch(memos_batch).unwrap(); let memo1 = blockstore .read_transaction_memos(signatures[0], 4) @@ -12391,30 +12395,30 @@ pub mod tests { .map(|_| vec![(Pubkey::new_unique(), true), (Pubkey::new_unique(), false)]) .collect(); let slot = 5; - let mut status_batch = blockstore.db_ref().batch().unwrap(); + let mut status_batch = blockstore.get_write_batch().unwrap(); - for (i, signature) in signatures.iter().enumerate() { + for (tx_idx, signature) in signatures.iter().enumerate() { blockstore .add_transaction_status_to_batch( slot, *signature, - keys_with_writable[i].iter().map(|(k, v)| (k, *v)), + keys_with_writable[tx_idx].iter().map(|(k, v)| (k, *v)), TransactionStatusMeta { - fee: 5700 + i as u64, - status: if i % 2 == 0 { + fee: 5700 + tx_idx as u64, + status: if tx_idx % 2 == 0 { Ok(()) } else { Err(TransactionError::InsufficientFundsForFee) }, ..TransactionStatusMeta::default() }, - i, + tx_idx, &mut status_batch, ) .unwrap(); } - blockstore.db_ref().write(status_batch).unwrap(); + blockstore.write_batch(status_batch).unwrap(); let tx_status1 = blockstore .read_transaction_status((signatures[0], slot)) diff --git a/rpc/src/transaction_status_service.rs b/rpc/src/transaction_status_service.rs index 3678a1ee3c59d6..3ae6e2f130d1dd 100644 --- a/rpc/src/transaction_status_service.rs +++ b/rpc/src/transaction_status_service.rs @@ -73,7 +73,7 @@ impl TransactionStatusService { token_balances, transaction_indexes, }) => { - let mut status_and_memos_batch = blockstore.db_ref().batch().unwrap(); + let mut status_and_memos_batch = blockstore.get_write_batch().unwrap(); for ( transaction, @@ -195,9 +195,7 @@ impl TransactionStatusService { } if enable_rpc_transaction_history { - blockstore - .db_ref() - .write(status_and_memos_batch) + blockstore.write_batch(status_and_memos_batch) .expect("Expect database batched writes to succeed: TransactionStatus + TransactionMemos"); } } From a214b7c1221fac13cde6002630e4819cbe4cd9f9 Mon Sep 17 00:00:00 2001 From: Faycel Kouteib Date: Wed, 16 Oct 2024 18:09:53 -0700 Subject: [PATCH 12/12] Review feedback. Refactor common code. Co-authored-by: Lijun Wang <83639177+lijunwangs@users.noreply.github.com> --- ledger/src/blockstore.rs | 64 +++++++++++++++++++++++++++------------- 1 file changed, 44 insertions(+), 20 deletions(-) diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index bc29b372b4077b..07d9a6d0cb5404 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -2892,14 +2892,19 @@ impl Blockstore { } } - pub fn write_transaction_status<'a>( + #[inline] + fn write_transaction_status_helper<'a, F>( &self, slot: Slot, signature: Signature, keys_with_writable: impl Iterator, status: TransactionStatusMeta, transaction_index: usize, - ) -> Result<()> { + mut write_fn: F, + ) -> Result<()> + where + F: FnMut(&Pubkey, Slot, u32, Signature, bool) -> Result<()>, + { let status = status.into(); let transaction_index = u32::try_from(transaction_index) .map_err(|_| BlockstoreError::TransactionIndexOverflow)?; @@ -2907,15 +2912,35 @@ impl Blockstore { .put_protobuf((signature, slot), &status)?; for (address, writeable) in keys_with_writable { - self.address_signatures_cf.put( - (*address, slot, transaction_index, signature), - &AddressSignatureMeta { writeable }, - )?; + write_fn(address, slot, transaction_index, signature, writeable)?; } Ok(()) } + pub fn write_transaction_status<'a>( + &self, + slot: Slot, + signature: Signature, + keys_with_writable: impl Iterator, + status: TransactionStatusMeta, + transaction_index: usize, + ) -> Result<()> { + self.write_transaction_status_helper( + slot, + signature, + keys_with_writable, + status, + transaction_index, + |address, slot, tx_index, signature, writeable| { + self.address_signatures_cf.put( + (*address, slot, tx_index, signature), + &AddressSignatureMeta { writeable }, + ) + }, + ) + } + pub fn add_transaction_status_to_batch<'a>( &self, slot: Slot, @@ -2925,20 +2950,19 @@ impl Blockstore { transaction_index: usize, db_write_batch: &mut WriteBatch<'_>, ) -> Result<()> { - let status = status.into(); - let transaction_index = u32::try_from(transaction_index) - .map_err(|_| BlockstoreError::TransactionIndexOverflow)?; - self.transaction_status_cf - .put_protobuf((signature, slot), &status)?; - - for (address, writeable) in keys_with_writable { - db_write_batch.put::( - (*address, slot, transaction_index, signature), - &AddressSignatureMeta { writeable }, - )?; - } - - Ok(()) + self.write_transaction_status_helper( + slot, + signature, + keys_with_writable, + status, + transaction_index, + |address, slot, tx_index, signature, writeable| { + db_write_batch.put::( + (*address, slot, tx_index, signature), + &AddressSignatureMeta { writeable }, + ) + }, + ) } pub fn read_transaction_memos(