Skip to content

Commit

Permalink
Generic processing pipeline (#3467)
Browse files Browse the repository at this point in the history
  • Loading branch information
apfitzge authored Nov 11, 2024
1 parent 9d02885 commit 7037f88
Show file tree
Hide file tree
Showing 28 changed files with 280 additions and 305 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions core/src/banking_stage/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use {
transaction_batch::TransactionBatch,
vote_sender_types::ReplayVoteSender,
},
solana_runtime_transaction::svm_transaction_adapter::SVMTransactionAdapter,
solana_sdk::{pubkey::Pubkey, saturating_add_assign, transaction::SanitizedTransaction},
solana_runtime_transaction::transaction_with_meta::TransactionWithMeta,
solana_sdk::{pubkey::Pubkey, saturating_add_assign},
solana_svm::{
transaction_commit_result::{TransactionCommitResult, TransactionCommitResultExtensions},
transaction_processing_result::{
Expand Down Expand Up @@ -68,7 +68,7 @@ impl Committer {

pub(super) fn commit_transactions(
&self,
batch: &TransactionBatch<SanitizedTransaction>,
batch: &TransactionBatch<impl TransactionWithMeta>,
processing_results: Vec<TransactionProcessingResult>,
starting_transaction_index: Option<usize>,
bank: &Arc<Bank>,
Expand Down Expand Up @@ -130,7 +130,7 @@ impl Committer {
&self,
commit_results: Vec<TransactionCommitResult>,
bank: &Arc<Bank>,
batch: &TransactionBatch<SanitizedTransaction>,
batch: &TransactionBatch<impl TransactionWithMeta>,
pre_balance_info: &mut PreBalanceInfo,
starting_transaction_index: Option<usize>,
) {
Expand Down
26 changes: 13 additions & 13 deletions core/src/banking_stage/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,14 @@ use {
},
solana_runtime_transaction::{
instructions_processor::process_compute_budget_instructions,
runtime_transaction::RuntimeTransaction,
transaction_with_meta::TransactionWithMeta,
},
solana_sdk::{
clock::{FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, MAX_PROCESSING_AGE},
fee::FeeBudgetLimits,
message::SanitizedMessage,
saturating_add_assign,
timing::timestamp,
transaction::{self, SanitizedTransaction, TransactionError},
transaction::{self, TransactionError},
},
solana_svm::{
account_loader::{validate_fee_payer, TransactionCheckResult},
Expand Down Expand Up @@ -231,7 +230,7 @@ impl Consumer {
&self,
bank: &Arc<Bank>,
bank_creation_time: &Instant,
sanitized_transactions: &[RuntimeTransaction<SanitizedTransaction>],
sanitized_transactions: &[impl TransactionWithMeta],
banking_stage_stats: &BankingStageStats,
slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
) -> ProcessTransactionsSummary {
Expand Down Expand Up @@ -287,7 +286,7 @@ impl Consumer {
&self,
bank: &Arc<Bank>,
bank_creation_time: &Instant,
transactions: &[RuntimeTransaction<SanitizedTransaction>],
transactions: &[impl TransactionWithMeta],
) -> ProcessTransactionsSummary {
let mut chunk_start = 0;
let mut all_retryable_tx_indexes = vec![];
Expand Down Expand Up @@ -389,7 +388,7 @@ impl Consumer {
pub fn process_and_record_transactions(
&self,
bank: &Arc<Bank>,
txs: &[RuntimeTransaction<SanitizedTransaction>],
txs: &[impl TransactionWithMeta],
chunk_offset: usize,
) -> ProcessTransactionBatchOutput {
let mut error_counters = TransactionErrorMetrics::default();
Expand Down Expand Up @@ -432,7 +431,7 @@ impl Consumer {
pub fn process_and_record_aged_transactions(
&self,
bank: &Arc<Bank>,
txs: &[RuntimeTransaction<SanitizedTransaction>],
txs: &[impl TransactionWithMeta],
max_ages: &[MaxAge],
) -> ProcessTransactionBatchOutput {
let move_precompile_verification_to_svm = bank
Expand Down Expand Up @@ -476,7 +475,7 @@ impl Consumer {
fn process_and_record_transactions_with_pre_results(
&self,
bank: &Arc<Bank>,
txs: &[RuntimeTransaction<SanitizedTransaction>],
txs: &[impl TransactionWithMeta],
chunk_offset: usize,
pre_results: impl Iterator<Item = Result<(), TransactionError>>,
) -> ProcessTransactionBatchOutput {
Expand Down Expand Up @@ -556,7 +555,7 @@ impl Consumer {
fn execute_and_commit_transactions_locked(
&self,
bank: &Arc<Bank>,
batch: &TransactionBatch<SanitizedTransaction>,
batch: &TransactionBatch<impl TransactionWithMeta>,
) -> ExecuteAndCommitTransactionsOutput {
let transaction_status_sender_enabled = self.committer.transaction_status_sender_enabled();
let mut execute_and_commit_timings = LeaderExecuteAndCommitTimings::default();
Expand Down Expand Up @@ -755,12 +754,12 @@ impl Consumer {

pub fn check_fee_payer_unlocked(
bank: &Bank,
message: &SanitizedMessage,
message: &impl SVMMessage,
error_counters: &mut TransactionErrorMetrics,
) -> Result<(), TransactionError> {
let fee_payer = message.fee_payer();
let fee_budget_limits = FeeBudgetLimits::from(process_compute_budget_instructions(
SVMMessage::program_instructions_iter(message),
message.program_instructions_iter(),
)?);
let fee = solana_fee::calculate_fee(
message,
Expand Down Expand Up @@ -807,7 +806,7 @@ impl Consumer {
/// * `pending_indexes` - identifies which indexes in the `transactions` list are still pending
fn filter_pending_packets_from_pending_txs(
bank: &Bank,
transactions: &[RuntimeTransaction<SanitizedTransaction>],
transactions: &[impl TransactionWithMeta],
pending_indexes: &[usize],
) -> Vec<usize> {
let filter =
Expand Down Expand Up @@ -868,6 +867,7 @@ mod tests {
solana_poh::poh_recorder::{PohRecorder, Record, WorkingBankEntry},
solana_rpc::transaction_status_service::TransactionStatusService,
solana_runtime::{bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache},
solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
solana_sdk::{
account::AccountSharedData,
account_utils::StateMut,
Expand Down Expand Up @@ -2347,7 +2347,7 @@ mod tests {

let lock_account = transactions[0].message.account_keys[1];
let manual_lock_tx =
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
RuntimeTransaction::from_transaction_for_tests(system_transaction::transfer(
&Keypair::new(),
&lock_account,
1,
Expand Down
14 changes: 6 additions & 8 deletions core/src/banking_stage/forward_packet_batches_by_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ use {
},
solana_feature_set::FeatureSet,
solana_perf::packet::Packet,
solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
solana_svm_transaction::svm_message::SVMMessage,
solana_runtime_transaction::transaction_with_meta::TransactionWithMeta,
std::sync::Arc,
};

Expand Down Expand Up @@ -106,7 +105,7 @@ impl ForwardPacketBatchesByAccounts {

pub fn try_add_packet(
&mut self,
sanitized_transaction: &RuntimeTransaction<impl SVMMessage>,
sanitized_transaction: &impl TransactionWithMeta,
immutable_packet: Arc<ImmutableDeserializedPacket>,
feature_set: &FeatureSet,
) -> bool {
Expand Down Expand Up @@ -147,7 +146,7 @@ impl ForwardPacketBatchesByAccounts {
// put into batch #3 to satisfy all batch limits.
fn get_batch_index_by_updated_costs(
&self,
tx_cost: &TransactionCost<impl SVMMessage>,
tx_cost: &TransactionCost<impl TransactionWithMeta>,
updated_costs: &UpdatedCosts,
) -> usize {
let Some(batch_index_by_block_limit) =
Expand All @@ -174,6 +173,7 @@ mod tests {
lazy_static::lazy_static,
solana_cost_model::transaction_cost::{UsageCostDetails, WritableKeysTransaction},
solana_feature_set::FeatureSet,
solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
solana_sdk::{
compute_budget::ComputeBudgetInstruction,
message::Message,
Expand Down Expand Up @@ -217,8 +217,7 @@ mod tests {

fn zero_transaction_cost() -> TransactionCost<'static, WritableKeysTransaction> {
lazy_static! {
static ref DUMMY_TRANSACTION: RuntimeTransaction<WritableKeysTransaction> =
RuntimeTransaction::new_for_tests(WritableKeysTransaction(vec![]));
static ref DUMMY_TRANSACTION: WritableKeysTransaction = WritableKeysTransaction(vec![]);
};

TransactionCost::Transaction(UsageCostDetails {
Expand Down Expand Up @@ -377,8 +376,7 @@ mod tests {
ForwardPacketBatchesByAccounts::new_with_default_batch_limits();
forward_packet_batches_by_accounts.batch_vote_limit = test_cost + 1;

let dummy_transaction =
RuntimeTransaction::new_for_tests(WritableKeysTransaction(vec![]));
let dummy_transaction = WritableKeysTransaction(vec![]);
let transaction_cost = TransactionCost::SimpleVote {
transaction: &dummy_transaction,
};
Expand Down
5 changes: 2 additions & 3 deletions core/src/banking_stage/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ use {
solana_perf::{data_budget::DataBudget, packet::Packet},
solana_poh::poh_recorder::PohRecorder,
solana_runtime::bank_forks::BankForks,
solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
solana_runtime_transaction::transaction_with_meta::TransactionWithMeta,
solana_sdk::{pubkey::Pubkey, transport::TransportError},
solana_streamer::sendmmsg::batch_send,
solana_svm_transaction::svm_message::SVMMessage,
std::{
iter::repeat,
net::{SocketAddr, UdpSocket},
Expand Down Expand Up @@ -66,7 +65,7 @@ impl<T: LikeClusterInfo> Forwarder<T> {

pub fn try_add_packet(
&mut self,
sanitized_transaction: &RuntimeTransaction<impl SVMMessage>,
sanitized_transaction: &impl TransactionWithMeta,
immutable_packet: Arc<ImmutableDeserializedPacket>,
feature_set: &FeatureSet,
) -> bool {
Expand Down
56 changes: 21 additions & 35 deletions core/src/banking_stage/qos_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,12 @@ use {
solana_feature_set::FeatureSet,
solana_measure::measure::Measure,
solana_runtime::bank::Bank,
solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
solana_runtime_transaction::transaction_with_meta::TransactionWithMeta,
solana_sdk::{
clock::Slot,
saturating_add_assign,
transaction::{self, SanitizedTransaction, TransactionError},
transaction::{self, TransactionError},
},
solana_svm_transaction::svm_message::SVMMessage,
std::sync::atomic::{AtomicU64, Ordering},
};

Expand All @@ -41,15 +40,12 @@ impl QosService {
/// include in the slot, and accumulate costs in the cost tracker.
/// Returns a vector of results containing selected transaction costs, and the number of
/// transactions that were *NOT* selected.
pub fn select_and_accumulate_transaction_costs<'a>(
pub fn select_and_accumulate_transaction_costs<'a, Tx: TransactionWithMeta>(
&self,
bank: &Bank,
transactions: &'a [RuntimeTransaction<SanitizedTransaction>],
transactions: &'a [Tx],
pre_results: impl Iterator<Item = transaction::Result<()>>,
) -> (
Vec<transaction::Result<TransactionCost<'a, SanitizedTransaction>>>,
u64,
) {
) -> (Vec<transaction::Result<TransactionCost<'a, Tx>>>, u64) {
let transaction_costs =
self.compute_transaction_costs(&bank.feature_set, transactions.iter(), pre_results);
let (transactions_qos_cost_results, num_included) = self.select_transactions_per_cost(
Expand All @@ -71,12 +67,12 @@ impl QosService {

// invoke cost_model to calculate cost for the given list of transactions that have not
// been filtered out already.
fn compute_transaction_costs<'a>(
fn compute_transaction_costs<'a, Tx: TransactionWithMeta>(
&self,
feature_set: &FeatureSet,
transactions: impl Iterator<Item = &'a RuntimeTransaction<SanitizedTransaction>>,
transactions: impl Iterator<Item = &'a Tx>,
pre_results: impl Iterator<Item = transaction::Result<()>>,
) -> Vec<transaction::Result<TransactionCost<'a, SanitizedTransaction>>> {
) -> Vec<transaction::Result<TransactionCost<'a, Tx>>> {
let mut compute_cost_time = Measure::start("compute_cost_time");
let txs_costs: Vec<_> = transactions
.zip(pre_results)
Expand All @@ -97,17 +93,12 @@ impl QosService {
/// Given a list of transactions and their costs, this function returns a corresponding
/// list of Results that indicate if a transaction is selected to be included in the current block,
/// and a count of the number of transactions that would fit in the block
fn select_transactions_per_cost<'a>(
fn select_transactions_per_cost<'a, Tx: TransactionWithMeta>(
&self,
transactions: impl Iterator<Item = &'a RuntimeTransaction<SanitizedTransaction>>,
transactions_costs: impl Iterator<
Item = transaction::Result<TransactionCost<'a, SanitizedTransaction>>,
>,
transactions: impl Iterator<Item = &'a Tx>,
transactions_costs: impl Iterator<Item = transaction::Result<TransactionCost<'a, Tx>>>,
bank: &Bank,
) -> (
Vec<transaction::Result<TransactionCost<'a, SanitizedTransaction>>>,
usize,
) {
) -> (Vec<transaction::Result<TransactionCost<'a, Tx>>>, usize) {
let mut cost_tracking_time = Measure::start("cost_tracking_time");
let mut cost_tracker = bank.write_cost_tracker().unwrap();
let mut num_included = 0;
Expand Down Expand Up @@ -162,10 +153,8 @@ impl QosService {

/// Removes transaction costs from the cost tracker if not committed or recorded, or
/// updates the transaction costs for committed transactions.
pub fn remove_or_update_costs<'a>(
transaction_cost_results: impl Iterator<
Item = &'a transaction::Result<TransactionCost<'a, SanitizedTransaction>>,
>,
pub fn remove_or_update_costs<'a, Tx: TransactionWithMeta + 'a>(
transaction_cost_results: impl Iterator<Item = &'a transaction::Result<TransactionCost<'a, Tx>>>,
transaction_committed_status: Option<&Vec<CommitTransactionDetails>>,
bank: &Bank,
) {
Expand All @@ -183,10 +172,8 @@ impl QosService {

/// For recorded transactions, remove units reserved by uncommitted transaction, or update
/// units for committed transactions.
fn remove_or_update_recorded_transaction_costs<'a>(
transaction_cost_results: impl Iterator<
Item = &'a transaction::Result<TransactionCost<'a, SanitizedTransaction>>,
>,
fn remove_or_update_recorded_transaction_costs<'a, Tx: TransactionWithMeta + 'a>(
transaction_cost_results: impl Iterator<Item = &'a transaction::Result<TransactionCost<'a, Tx>>>,
transaction_committed_status: &Vec<CommitTransactionDetails>,
bank: &Bank,
) {
Expand Down Expand Up @@ -223,10 +210,8 @@ impl QosService {
}

/// Remove reserved units for transaction batch that unsuccessfully recorded.
fn remove_unrecorded_transaction_costs<'a>(
transaction_cost_results: impl Iterator<
Item = &'a transaction::Result<TransactionCost<'a, SanitizedTransaction>>,
>,
fn remove_unrecorded_transaction_costs<'a, Tx: TransactionWithMeta + 'a>(
transaction_cost_results: impl Iterator<Item = &'a transaction::Result<TransactionCost<'a, Tx>>>,
bank: &Bank,
) {
let mut cost_tracker = bank.write_cost_tracker().unwrap();
Expand Down Expand Up @@ -342,7 +327,7 @@ impl QosService {

// rollup transaction cost details, eg signature_cost, write_lock_cost, data_bytes_cost and
// execution_cost from the batch of transactions selected for block.
fn accumulate_batched_transaction_costs<'a, Tx: SVMMessage + 'a>(
fn accumulate_batched_transaction_costs<'a, Tx: TransactionWithMeta + 'a>(
transactions_costs: impl Iterator<Item = &'a transaction::Result<TransactionCost<'a, Tx>>>,
) -> BatchedTransactionDetails {
let mut batched_transaction_details = BatchedTransactionDetails::default();
Expand Down Expand Up @@ -627,6 +612,7 @@ mod tests {
itertools::Itertools,
solana_cost_model::transaction_cost::{UsageCostDetails, WritableKeysTransaction},
solana_runtime::genesis_utils::{create_genesis_config, GenesisConfigInfo},
solana_runtime_transaction::runtime_transaction::RuntimeTransaction,
solana_sdk::{
hash::Hash,
signature::{Keypair, Signer},
Expand Down Expand Up @@ -953,7 +939,7 @@ mod tests {
let programs_execution_cost = 10;
let num_txs = 4;

let dummy_transaction = RuntimeTransaction::new_for_tests(WritableKeysTransaction(vec![]));
let dummy_transaction = WritableKeysTransaction(vec![]);
let tx_cost_results: Vec<_> = (0..num_txs)
.map(|n| {
if n % 2 == 0 {
Expand Down
Loading

0 comments on commit 7037f88

Please sign in to comment.