Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to record transactions to ledger-tool #181

Merged
merged 1 commit into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion ledger-tool/src/ledger_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ pub fn load_and_process_ledger_or_exit(
process_options: ProcessOptions,
snapshot_archive_path: Option<PathBuf>,
incremental_snapshot_archive_path: Option<PathBuf>,
transaction_status_sender: Option<TransactionStatusSender>,
) -> (Arc<RwLock<BankForks>>, Option<StartingSnapshotHashes>) {
load_and_process_ledger(
arg_matches,
Expand All @@ -108,6 +109,7 @@ pub fn load_and_process_ledger_or_exit(
process_options,
snapshot_archive_path,
incremental_snapshot_archive_path,
transaction_status_sender,
)
.unwrap_or_else(|err| {
eprintln!("Exiting. Failed to load and process ledger: {err}");
Expand All @@ -122,6 +124,7 @@ pub fn load_and_process_ledger(
process_options: ProcessOptions,
snapshot_archive_path: Option<PathBuf>,
incremental_snapshot_archive_path: Option<PathBuf>,
transaction_status_sender: Option<TransactionStatusSender>,
) -> Result<(Arc<RwLock<BankForks>>, Option<StartingSnapshotHashes>), LoadAndProcessLedgerError> {
let bank_snapshots_dir = if blockstore.is_primary_access() {
blockstore.ledger_path().join("snapshot")
Expand Down Expand Up @@ -387,7 +390,7 @@ pub fn load_and_process_ledger(
Some(transaction_status_service),
)
} else {
(None, None)
(transaction_status_sender, None)
seanyoung marked this conversation as resolved.
Show resolved Hide resolved
};

let result = blockstore_processor::process_blockstore_from_root(
Expand Down
151 changes: 135 additions & 16 deletions ledger-tool/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,17 @@ use {
solana_ledger::{
blockstore::{create_new_ledger, Blockstore},
blockstore_options::{AccessType, LedgerColumnOptions},
blockstore_processor::ProcessSlotCallback,
blockstore_processor::{
ProcessSlotCallback, TransactionStatusMessage, TransactionStatusSender,
},
use_snapshot_archives_at_startup,
},
solana_measure::{measure, measure::Measure},
solana_runtime::{
bank::{bank_hash_details, Bank, RewardCalculationEvent},
bank::{
bank_hash_details::{self, SlotDetails, TransactionDetails},
Bank, RewardCalculationEvent,
},
bank_forks::BankForks,
snapshot_archive_info::SnapshotArchiveInfoGetter,
snapshot_bank_utils,
Expand Down Expand Up @@ -73,6 +78,7 @@ use {
transaction::{MessageHash, SanitizedTransaction, SimpleAddressLoader},
},
solana_stake_program::{points::PointValue, stake_state},
solana_transaction_status::UiInstruction,
solana_unified_scheduler_pool::DefaultSchedulerPool,
solana_vote_program::{
self,
Expand All @@ -83,6 +89,7 @@ use {
ffi::OsStr,
fs::File,
io::{self, Write},
mem::swap,
path::{Path, PathBuf},
process::{exit, Command, Stdio},
str::FromStr,
Expand Down Expand Up @@ -1067,10 +1074,15 @@ fn main() {
.arg(
Arg::with_name("record_slots_config")
.long("record-slots-config")
.default_value("hash-only")
.possible_values(&["hash-only", "accounts"])
.multiple(true)
.takes_value(true)
.possible_values(&["accounts", "tx"])
.requires("record_slots")
.help("In the slot recording, include bank details or not"),
.conflicts_with_all(&[
"enable_rpc_transaction_history",
"geyser_plugin_config",
])
.help("In addition to the bank hash, optionally include accounts and/or transactions details for the slot"),
),
)
.subcommand(
Expand Down Expand Up @@ -1597,6 +1609,7 @@ fn main() {
process_options,
snapshot_archive_path,
incremental_snapshot_archive_path,
None,
);

println!(
Expand All @@ -1622,6 +1635,7 @@ fn main() {
process_options,
snapshot_archive_path,
incremental_snapshot_archive_path,
None,
);
println!("{}", &bank_forks.read().unwrap().working_bank().hash());
}
Expand Down Expand Up @@ -1654,6 +1668,9 @@ fn main() {
exit(1);
}

let mut transaction_status_sender = None;
let mut tx_receiver = None;

let (slot_callback, record_slots_file, recorded_slots) = if arg_matches
.occurrences_of("record_slots")
> 0
Expand All @@ -1665,29 +1682,61 @@ fn main() {
exit(1);
});

let include_bank =
match arg_matches.value_of("record_slots_config").unwrap() {
"hash-only" => false,
"accounts" => true,
_ => unreachable!(),
};
let mut include_bank = false;
let mut include_tx = false;

if let Some(args) = arg_matches.values_of("record_slots_config") {
for arg in args {
match arg {
"tx" => include_tx = true,
"accounts" => include_bank = true,
_ => unreachable!(),
}
}
}

let slot_hashes = Arc::new(Mutex::new(Vec::new()));

if include_tx {
let (sender, receiver) = crossbeam_channel::unbounded();

transaction_status_sender = Some(TransactionStatusSender { sender });

let slots = Arc::clone(&slot_hashes);

tx_receiver = Some(std::thread::spawn(move || {
record_transactions(receiver, slots);
}));
}

let slot_callback = Arc::new({
let slots = Arc::clone(&slot_hashes);
move |bank: &Bank| {
let slot_details = if include_bank {
bank_hash_details::BankHashSlotDetails::try_from(bank).unwrap()
let mut details = if include_bank {
bank_hash_details::SlotDetails::try_from(bank).unwrap()
} else {
bank_hash_details::BankHashSlotDetails {
bank_hash_details::SlotDetails {
slot: bank.slot(),
bank_hash: bank.hash().to_string(),
..Default::default()
}
};

slots.lock().unwrap().push(slot_details);
let mut slots = slots.lock().unwrap();

if let Some(recorded_slot) =
slots.iter_mut().find(|f| f.slot == details.slot)
{
// copy all fields except transactions
swap(
&mut recorded_slot.transactions,
&mut details.transactions,
);

*recorded_slot = details;
} else {
slots.push(details);
}
}
});

Expand Down Expand Up @@ -1722,7 +1771,7 @@ fn main() {
bank.hash()
);
} else {
let bank_hash_details::BankHashSlotDetails {
let bank_hash_details::SlotDetails {
slot: expected_slot,
bank_hash: expected_hash,
..
Expand Down Expand Up @@ -1764,6 +1813,7 @@ fn main() {
process_options,
snapshot_archive_path,
incremental_snapshot_archive_path,
transaction_status_sender,
);

if print_accounts_stats {
Expand All @@ -1779,6 +1829,10 @@ fn main() {
.ok();
}

if let Some(tx_receiver) = tx_receiver {
tx_receiver.join().unwrap();
}

if let Some(recorded_slots_file) = record_slots_file {
if let Ok(recorded_slots) = recorded_slots.clone().unwrap().lock() {
let bank_hashes =
Expand Down Expand Up @@ -1821,6 +1875,7 @@ fn main() {
process_options,
snapshot_archive_path,
incremental_snapshot_archive_path,
None,
);

let dot = graph_forks(&bank_forks.read().unwrap(), &graph_config);
Expand Down Expand Up @@ -1984,6 +2039,7 @@ fn main() {
process_options,
snapshot_archive_path,
incremental_snapshot_archive_path,
None,
);
let mut bank = bank_forks
.read()
Expand Down Expand Up @@ -2373,6 +2429,7 @@ fn main() {
process_options,
snapshot_archive_path,
incremental_snapshot_archive_path,
None,
);
let bank = bank_forks.read().unwrap().working_bank();

Expand Down Expand Up @@ -2425,6 +2482,7 @@ fn main() {
process_options,
snapshot_archive_path,
incremental_snapshot_archive_path,
None,
);
let bank_forks = bank_forks.read().unwrap();
let slot = bank_forks.working_bank().slot();
Expand Down Expand Up @@ -2947,3 +3005,64 @@ fn main() {
measure_total_execution_time.stop();
info!("{}", measure_total_execution_time);
}

fn record_transactions(
recv: crossbeam_channel::Receiver<TransactionStatusMessage>,
slots: Arc<Mutex<Vec<SlotDetails>>>,
) {
for tsm in recv {
if let TransactionStatusMessage::Batch(batch) = tsm {
let slot = batch.bank.slot();

assert_eq!(batch.transactions.len(), batch.execution_results.len());

let transactions: Vec<_> = batch
.transactions
.iter()
.zip(batch.execution_results)
.zip(batch.transaction_indexes)
.map(|((tx, execution_results), index)| {
let message = tx.message();

let accounts: Vec<String> = message
.account_keys()
.iter()
.map(|acc| acc.to_string())
.collect();

let instructions = message
.instructions()
.iter()
.map(|ix| UiInstruction::parse(ix, &message.account_keys(), None))
.collect();

let is_simple_vote_tx = tx.is_simple_vote_transaction();

TransactionDetails {
accounts,
instructions,
is_simple_vote_tx,
execution_results,
index,
}
})
.collect();

let mut slots = slots.lock().unwrap();

if let Some(recorded_slot) = slots.iter_mut().find(|f| f.slot == slot) {
recorded_slot.transactions.extend(transactions);
} else {
slots.push(SlotDetails {
slot,
transactions,
..Default::default()
});
}
}
}

for slot in slots.lock().unwrap().iter_mut() {
slot.transactions.sort_by(|a, b| a.index.cmp(&b.index));
}
}
1 change: 1 addition & 0 deletions ledger-tool/src/program.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ fn load_blockstore(ledger_path: &Path, arg_matches: &ArgMatches<'_>) -> Arc<Bank
process_options,
snapshot_archive_path,
incremental_snapshot_archive_path,
None,
);
let bank = bank_forks.read().unwrap().working_bank();
bank
Expand Down
1 change: 1 addition & 0 deletions programs/sbf/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ solana-sdk = { workspace = true }
solana-stake-program = { workspace = true }
solana-svm = { workspace = true }
solana-system-program = { workspace = true }
solana-transaction-status = { workspace = true }
solana-version = { workspace = true }
solana-vote = { workspace = true }
solana-vote-program = { workspace = true }
Expand Down
Loading