Skip to content

Commit

Permalink
blockstore: Remove intermediate Database type (#3934)
Browse files Browse the repository at this point in the history
Blockstore contains a Database, which contains a Rocks, which contains
the rust-rocksdb type. Database is mostly a pass-through layer from
Blockstore to Rocks, which is unnecessary; so, this change removes it.
  • Loading branch information
steviez authored Dec 6, 2024
1 parent 0abfe27 commit 3c793d6
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 207 deletions.
68 changes: 33 additions & 35 deletions ledger-tool/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use {
solana_ledger::{
ancestor_iterator::AncestorIterator,
blockstore::{Blockstore, PurgeType},
blockstore_db::{self, Column, ColumnName, Database},
blockstore_db::{Column, ColumnName},
blockstore_options::AccessType,
shred::Shred,
},
Expand All @@ -38,21 +38,17 @@ use {
},
};

fn analyze_column<
C: solana_ledger::blockstore_db::Column + solana_ledger::blockstore_db::ColumnName,
>(
db: &Database,
name: &str,
) -> Result<()> {
fn analyze_column(blockstore: &Blockstore, column_name: &str) -> Result<()> {
let mut key_len: u64 = 0;
let mut key_tot: u64 = 0;
let mut val_hist = histogram::Histogram::new();
let mut val_tot: u64 = 0;
let mut row_hist = histogram::Histogram::new();
for (key, val) in db.column::<C>().iter(blockstore_db::IteratorMode::Start)? {
let column_iterator = blockstore.iterator_cf(column_name)?;
for (key, val) in column_iterator {
// Key length is fixed, only need to calculate it once
if key_len == 0 {
key_len = C::key(key).len() as u64;
key_len = key.len() as u64;
}
let val_len = val.len() as u64;

Expand All @@ -65,7 +61,7 @@ fn analyze_column<

let json_result = if val_hist.entries() > 0 {
json!({
"column":name,
"column":column_name,
"entries":val_hist.entries(),
"key_stats":{
"max":key_len,
Expand Down Expand Up @@ -94,7 +90,7 @@ fn analyze_column<
})
} else {
json!({
"column":name,
"column":column_name,
"entries":val_hist.entries(),
"key_stats":{
"max":key_len,
Expand All @@ -113,28 +109,28 @@ fn analyze_column<
Ok(())
}

fn analyze_storage(database: &Database) -> Result<()> {
fn analyze_storage(blockstore: &Blockstore) -> Result<()> {
use solana_ledger::blockstore_db::columns::*;
analyze_column::<SlotMeta>(database, "SlotMeta")?;
analyze_column::<Orphans>(database, "Orphans")?;
analyze_column::<DeadSlots>(database, "DeadSlots")?;
analyze_column::<DuplicateSlots>(database, "DuplicateSlots")?;
analyze_column::<ErasureMeta>(database, "ErasureMeta")?;
analyze_column::<BankHash>(database, "BankHash")?;
analyze_column::<Root>(database, "Root")?;
analyze_column::<Index>(database, "Index")?;
analyze_column::<ShredData>(database, "ShredData")?;
analyze_column::<ShredCode>(database, "ShredCode")?;
analyze_column::<TransactionStatus>(database, "TransactionStatus")?;
analyze_column::<AddressSignatures>(database, "AddressSignatures")?;
analyze_column::<TransactionMemos>(database, "TransactionMemos")?;
analyze_column::<TransactionStatusIndex>(database, "TransactionStatusIndex")?;
analyze_column::<Rewards>(database, "Rewards")?;
analyze_column::<Blocktime>(database, "Blocktime")?;
analyze_column::<PerfSamples>(database, "PerfSamples")?;
analyze_column::<BlockHeight>(database, "BlockHeight")?;
analyze_column::<ProgramCosts>(database, "ProgramCosts")?;
analyze_column::<OptimisticSlots>(database, "OptimisticSlots")
analyze_column(blockstore, SlotMeta::NAME)?;
analyze_column(blockstore, Orphans::NAME)?;
analyze_column(blockstore, DeadSlots::NAME)?;
analyze_column(blockstore, DuplicateSlots::NAME)?;
analyze_column(blockstore, ErasureMeta::NAME)?;
analyze_column(blockstore, BankHash::NAME)?;
analyze_column(blockstore, Root::NAME)?;
analyze_column(blockstore, Index::NAME)?;
analyze_column(blockstore, ShredData::NAME)?;
analyze_column(blockstore, ShredCode::NAME)?;
analyze_column(blockstore, TransactionStatus::NAME)?;
analyze_column(blockstore, AddressSignatures::NAME)?;
analyze_column(blockstore, TransactionMemos::NAME)?;
analyze_column(blockstore, TransactionStatusIndex::NAME)?;
analyze_column(blockstore, Rewards::NAME)?;
analyze_column(blockstore, Blocktime::NAME)?;
analyze_column(blockstore, PerfSamples::NAME)?;
analyze_column(blockstore, BlockHeight::NAME)?;
analyze_column(blockstore, ProgramCosts::NAME)?;
analyze_column(blockstore, OptimisticSlots::NAME)
}

fn raw_key_to_slot(key: &[u8], column_name: &str) -> Option<Slot> {
Expand Down Expand Up @@ -602,9 +598,11 @@ fn do_blockstore_process_command(ledger_path: &Path, matches: &ArgMatches<'_>) -
let verbose_level = matches.occurrences_of("verbose");

match matches.subcommand() {
("analyze-storage", Some(arg_matches)) => analyze_storage(
&crate::open_blockstore(&ledger_path, arg_matches, AccessType::Secondary).db(),
)?,
("analyze-storage", Some(arg_matches)) => analyze_storage(&crate::open_blockstore(
&ledger_path,
arg_matches,
AccessType::Secondary,
))?,
("bounds", Some(arg_matches)) => {
let output_format = OutputFormat::from_matches(arg_matches, "output_format", false);
let all = arg_matches.is_present("all");
Expand Down
95 changes: 48 additions & 47 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use {
crate::{
ancestor_iterator::AncestorIterator,
blockstore_db::{
columns as cf, Column, ColumnIndexDeprecation, Database, IteratorDirection,
IteratorMode, LedgerColumn, Result, WriteBatch,
columns as cf, Column, ColumnIndexDeprecation, IteratorDirection, IteratorMode,
LedgerColumn, Result, Rocks, WriteBatch,
},
blockstore_meta::*,
blockstore_metrics::BlockstoreRpcApiMetrics,
Expand Down Expand Up @@ -232,7 +232,7 @@ pub struct BlockstoreSignals {
// ledger window
pub struct Blockstore {
ledger_path: PathBuf,
db: Arc<Database>,
db: Arc<Rocks>,
// Column families
address_signatures_cf: LedgerColumn<cf::AddressSignatures>,
bank_hash_cf: LedgerColumn<cf::BankHash>,
Expand Down Expand Up @@ -351,10 +351,6 @@ pub fn banking_retrace_path(path: &Path) -> PathBuf {
}

impl Blockstore {
pub fn db(self) -> Arc<Database> {
self.db
}

pub fn ledger_path(&self) -> &PathBuf {
&self.ledger_path
}
Expand Down Expand Up @@ -385,7 +381,7 @@ impl Blockstore {
// Open the database
let mut measure = Measure::start("blockstore open");
info!("Opening blockstore at {:?}", blockstore_path);
let db = Database::open(&blockstore_path, options)?;
let db = Arc::new(Rocks::open(blockstore_path, options)?);

let address_signatures_cf = db.column();
let bank_hash_cf = db.column();
Expand All @@ -409,8 +405,6 @@ impl Blockstore {
let transaction_status_cf = db.column();
let transaction_status_index_cf = db.column();

let db = Arc::new(db);

// Get max root or 0 if it doesn't exist
let max_root = roots_cf
.iter(IteratorMode::End)?
Expand Down Expand Up @@ -534,7 +528,7 @@ impl Blockstore {
pub fn destroy(ledger_path: &Path) -> Result<()> {
// Rocks::destroy() fails if the root directory doesn't exist
fs::create_dir_all(ledger_path)?;
Database::destroy(&Path::new(ledger_path).join(BLOCKSTORE_DIRECTORY_ROCKS_LEVEL))
Rocks::destroy(&Path::new(ledger_path).join(BLOCKSTORE_DIRECTORY_ROCKS_LEVEL))
}

/// Returns the SlotMeta of the specified slot.
Expand Down Expand Up @@ -675,6 +669,16 @@ impl Blockstore {
self.db.live_files_metadata()
}

/// Returns an iterator over all `(key, value)` pairs of the column family
/// named `cf_name`, starting from the beginning of the column
/// (`IteratorMode::Start`).
///
/// Keys are yielded as raw bytes; callers are responsible for decoding them
/// with the appropriate `Column` impl.
///
/// NOTE(review): each underlying item is `unwrap()`ed, so iteration panics
/// if RocksDB reports an error mid-scan — acceptable for the
/// dev/tooling-only contexts this is gated to, but worth confirming.
#[cfg(feature = "dev-context-only-utils")]
pub fn iterator_cf(
&self,
cf_name: &str,
) -> Result<impl Iterator<Item = (Box<[u8]>, Box<[u8]>)> + '_> {
// Assumes `cf_name` names a column family that exists in this database —
// TODO confirm cf_handle's behavior for unknown names (likely a panic).
let cf = self.db.cf_handle(cf_name);
let iterator = self.db.iterator_cf_raw_key(cf, IteratorMode::Start);
Ok(iterator.map(|pair| pair.unwrap()))
}

pub fn slot_data_iterator(
&self,
slot: Slot,
Expand Down Expand Up @@ -1294,7 +1298,7 @@ impl Blockstore {
metrics.insert_lock_elapsed_us += start.as_us();

let mut shred_insertion_tracker =
ShredInsertionTracker::new(shreds.len(), self.db.batch()?);
ShredInsertionTracker::new(shreds.len(), self.get_write_batch()?);

self.attempt_shred_insertion(
shreds,
Expand Down Expand Up @@ -1329,7 +1333,7 @@ impl Blockstore {

// Write out the accumulated batch.
let mut start = Measure::start("Write Batch");
self.db.write(shred_insertion_tracker.write_batch)?;
self.write_batch(shred_insertion_tracker.write_batch)?;
start.stop();
metrics.write_batch_elapsed_us += start.as_us();

Expand Down Expand Up @@ -2603,22 +2607,19 @@ impl Blockstore {
end_index: u64,
max_missing: usize,
) -> Vec<u64> {
if let Ok(mut db_iterator) = self
.db
.raw_iterator_cf(self.db.cf_handle::<cf::ShredData>())
{
Self::find_missing_indexes::<cf::ShredData>(
&mut db_iterator,
slot,
first_timestamp,
defer_threshold_ticks,
start_index,
end_index,
max_missing,
)
} else {
vec![]
}
let Ok(mut db_iterator) = self.db.raw_iterator_cf(self.data_shred_cf.handle()) else {
return vec![];
};

Self::find_missing_indexes::<cf::ShredData>(
&mut db_iterator,
slot,
first_timestamp,
defer_threshold_ticks,
start_index,
end_index,
max_missing,
)
}

fn get_block_time(&self, slot: Slot) -> Result<Option<UnixTimestamp>> {
Expand Down Expand Up @@ -4078,7 +4079,7 @@ impl Blockstore {
&self,
duplicate_confirmed_slot_hashes: impl Iterator<Item = (Slot, Hash)>,
) -> Result<()> {
let mut write_batch = self.db.batch()?;
let mut write_batch = self.get_write_batch()?;
for (slot, frozen_hash) in duplicate_confirmed_slot_hashes {
let data = FrozenHashVersioned::Current(FrozenHashStatus {
frozen_hash,
Expand All @@ -4088,19 +4089,19 @@ impl Blockstore {
.put_in_batch(&mut write_batch, slot, &data)?;
}

self.db.write(write_batch)?;
self.write_batch(write_batch)?;
Ok(())
}

pub fn set_roots<'a>(&self, rooted_slots: impl Iterator<Item = &'a Slot>) -> Result<()> {
let mut write_batch = self.db.batch()?;
let mut write_batch = self.get_write_batch()?;
let mut max_new_rooted_slot = 0;
for slot in rooted_slots {
max_new_rooted_slot = std::cmp::max(max_new_rooted_slot, *slot);
self.roots_cf.put_in_batch(&mut write_batch, *slot, &true)?;
}

self.db.write(write_batch)?;
self.write_batch(write_batch)?;
self.max_root
.fetch_max(max_new_rooted_slot, Ordering::Relaxed);
Ok(())
Expand Down Expand Up @@ -4396,7 +4397,7 @@ impl Blockstore {
"Marking slot {} and any full children slots as connected",
root
);
let mut write_batch = self.db.batch()?;
let mut write_batch = self.get_write_batch()?;

// Mark both connected bits on the root slot so that the flags for this
// slot match the flags of slots that become connected the typical way.
Expand All @@ -4419,7 +4420,7 @@ impl Blockstore {
.put_in_batch(&mut write_batch, meta.slot, &meta)?;
}

self.db.write(write_batch)?;
self.write_batch(write_batch)?;
Ok(())
}

Expand Down Expand Up @@ -4772,7 +4773,7 @@ impl Blockstore {
res
}

pub fn get_write_batch(&self) -> std::result::Result<WriteBatch, BlockstoreError> {
/// Creates an empty `WriteBatch` in which writes can be accumulated and
/// later applied with [`Blockstore::write_batch`].
pub fn get_write_batch(&self) -> Result<WriteBatch> {
self.db.batch()
}

Expand Down Expand Up @@ -7578,7 +7579,7 @@ pub mod tests {
let coding_shred = coding_shreds[index as usize].clone();

let mut shred_insertion_tracker =
ShredInsertionTracker::new(coding_shreds.len(), blockstore.db.batch().unwrap());
ShredInsertionTracker::new(coding_shreds.len(), blockstore.get_write_batch().unwrap());
assert!(blockstore.check_insert_coding_shred(
coding_shred.clone(),
&mut shred_insertion_tracker,
Expand Down Expand Up @@ -7624,14 +7625,14 @@ pub mod tests {
.put(erasure_set.store_key(), working_merkle_root_meta.as_ref())
.unwrap();
}
blockstore.db.write(write_batch).unwrap();
blockstore.write_batch(write_batch).unwrap();

// Add a shred with different merkle root and index
let (_, coding_shreds, _) = setup_erasure_shreds(slot, parent_slot, 10);
let new_coding_shred = coding_shreds[(index + 1) as usize].clone();

let mut shred_insertion_tracker =
ShredInsertionTracker::new(coding_shreds.len(), blockstore.db.batch().unwrap());
ShredInsertionTracker::new(coding_shreds.len(), blockstore.get_write_batch().unwrap());

assert!(!blockstore.check_insert_coding_shred(
new_coding_shred.clone(),
Expand Down Expand Up @@ -7759,7 +7760,7 @@ pub mod tests {
let data_shred = data_shreds[0].clone();

let mut shred_insertion_tracker =
ShredInsertionTracker::new(data_shreds.len(), blockstore.db.batch().unwrap());
ShredInsertionTracker::new(data_shreds.len(), blockstore.get_write_batch().unwrap());
blockstore
.check_insert_data_shred(
data_shred.clone(),
Expand Down Expand Up @@ -7806,15 +7807,15 @@ pub mod tests {
.put(erasure_set.store_key(), working_merkle_root_meta.as_ref())
.unwrap();
}
blockstore.db.write(write_batch).unwrap();
blockstore.write_batch(write_batch).unwrap();

// Add a shred with different merkle root and index
let (data_shreds, _, _) =
setup_erasure_shreds_with_index(slot, parent_slot, 10, fec_set_index);
let new_data_shred = data_shreds[1].clone();

let mut shred_insertion_tracker =
ShredInsertionTracker::new(data_shreds.len(), blockstore.db.batch().unwrap());
ShredInsertionTracker::new(data_shreds.len(), blockstore.get_write_batch().unwrap());

assert!(blockstore
.check_insert_data_shred(
Expand Down Expand Up @@ -7956,7 +7957,7 @@ pub mod tests {
);

let mut shred_insertion_tracker =
ShredInsertionTracker::new(1, blockstore.db.batch().unwrap());
ShredInsertionTracker::new(1, blockstore.get_write_batch().unwrap());
assert!(blockstore.check_insert_coding_shred(
coding_shred.clone(),
&mut shred_insertion_tracker,
Expand Down Expand Up @@ -11962,12 +11963,12 @@ pub mod tests {
blockstore
.put_erasure_meta(coding_shred_previous.erasure_set(), &erasure_meta)
.unwrap();
let mut write_batch = blockstore.db.batch().unwrap();
let mut write_batch = blockstore.get_write_batch().unwrap();
blockstore
.merkle_root_meta_cf
.delete_range_in_batch(&mut write_batch, slot, slot)
.unwrap();
blockstore.db.write(write_batch).unwrap();
blockstore.write_batch(write_batch).unwrap();
assert!(blockstore
.merkle_root_meta(coding_shred_previous.erasure_set())
.unwrap()
Expand Down Expand Up @@ -12028,12 +12029,12 @@ pub mod tests {

// Remove the merkle root meta in order to simulate this blockstore originating from
// an older version.
let mut write_batch = blockstore.db.batch().unwrap();
let mut write_batch = blockstore.get_write_batch().unwrap();
blockstore
.merkle_root_meta_cf
.delete_range_in_batch(&mut write_batch, slot, slot)
.unwrap();
blockstore.db.write(write_batch).unwrap();
blockstore.write_batch(write_batch).unwrap();
assert!(blockstore
.merkle_root_meta(next_coding_shreds[0].erasure_set())
.unwrap()
Expand Down
Loading

0 comments on commit 3c793d6

Please sign in to comment.