Skip to content

Commit

Permalink
feat: support committing fee-only transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
jstarry committed Aug 3, 2024
1 parent 1141ded commit acc60fb
Show file tree
Hide file tree
Showing 18 changed files with 581 additions and 392 deletions.
24 changes: 13 additions & 11 deletions core/src/banking_stage/committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use {
},
solana_measure::measure_us,
solana_runtime::{
bank::{Bank, ExecutedTransactionCounts, TransactionBalancesSet},
bank::{Bank, ProcessedTransactionCounts, TransactionBalancesSet},
bank_utils,
prioritization_fee_cache::PrioritizationFeeCache,
transaction_batch::TransactionBatch,
Expand All @@ -15,7 +15,9 @@ use {
solana_sdk::{hash::Hash, pubkey::Pubkey, saturating_add_assign},
solana_svm::{
transaction_commit_result::{TransactionCommitResult, TransactionCommitResultExtensions},
transaction_execution_result::TransactionExecutionResult,
transaction_processing_result::{
TransactionProcessingResult, TransactionProcessingResultExtensions,
},
},
solana_transaction_status::{
token_balances::TransactionTokenBalancesSet, TransactionTokenBalance,
Expand Down Expand Up @@ -67,27 +69,27 @@ impl Committer {
pub(super) fn commit_transactions(
&self,
batch: &TransactionBatch,
execution_results: Vec<TransactionExecutionResult>,
processing_results: Vec<TransactionProcessingResult>,
last_blockhash: Hash,
lamports_per_signature: u64,
starting_transaction_index: Option<usize>,
bank: &Arc<Bank>,
pre_balance_info: &mut PreBalanceInfo,
execute_and_commit_timings: &mut LeaderExecuteAndCommitTimings,
execution_counts: &ExecutedTransactionCounts,
processed_counts: &ProcessedTransactionCounts,
) -> (u64, Vec<CommitTransactionDetails>) {
let executed_transactions = execution_results
let processed_transactions = processing_results
.iter()
.zip(batch.sanitized_transactions())
.filter_map(|(execution_result, tx)| execution_result.was_executed().then_some(tx))
.filter_map(|(processing_result, tx)| processing_result.was_processed().then_some(tx))
.collect_vec();

let (commit_results, commit_time_us) = measure_us!(bank.commit_transactions(
batch.sanitized_transactions(),
execution_results,
processing_results,
last_blockhash,
lamports_per_signature,
execution_counts,
processed_counts,
&mut execute_and_commit_timings.execute_timings,
));
execute_and_commit_timings.commit_us = commit_time_us;
Expand All @@ -99,7 +101,7 @@ impl Committer {
// transaction committed to block. qos_service uses these information to adjust
// reserved block space.
Ok(committed_tx) => CommitTransactionDetails::Committed {
compute_units: committed_tx.execution_details.executed_units,
compute_units: committed_tx.executed_units,
loaded_accounts_data_size: committed_tx
.loaded_account_stats
.loaded_accounts_data_size,
Expand All @@ -122,7 +124,7 @@ impl Committer {
starting_transaction_index,
);
self.prioritization_fee_cache
.update(bank, executed_transactions.into_iter());
.update(bank, processed_transactions.into_iter());
});
execute_and_commit_timings.find_and_send_votes_us = find_and_send_votes_us;
(commit_time_us, commit_transaction_statuses)
Expand All @@ -145,7 +147,7 @@ impl Committer {
let batch_transaction_indexes: Vec<_> = commit_results
.iter()
.map(|commit_result| {
if commit_result.was_executed() {
if commit_result.was_committed() {
let this_transaction_index = transaction_index;
saturating_add_assign!(transaction_index, 1);
this_transaction_index
Expand Down
36 changes: 18 additions & 18 deletions core/src/banking_stage/consume_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,18 +234,18 @@ impl ConsumeWorkerMetrics {
}: &ExecuteAndCommitTransactionsOutput,
) {
self.count_metrics
.transactions_attempted_execution_count
.transactions_attempted_processing_count
.fetch_add(
transaction_counts.attempted_execution_count,
transaction_counts.attempted_processing_count,
Ordering::Relaxed,
);
self.count_metrics
.executed_transactions_count
.fetch_add(transaction_counts.executed_count, Ordering::Relaxed);
.processed_transactions_count
.fetch_add(transaction_counts.processed_count, Ordering::Relaxed);
self.count_metrics
.executed_with_successful_result_count
.processed_with_successful_result_count
.fetch_add(
transaction_counts.executed_with_successful_result_count,
transaction_counts.processed_with_successful_result_count,
Ordering::Relaxed,
);
self.count_metrics
Expand Down Expand Up @@ -410,9 +410,9 @@ impl ConsumeWorkerMetrics {
}

struct ConsumeWorkerCountMetrics {
transactions_attempted_execution_count: AtomicU64,
executed_transactions_count: AtomicU64,
executed_with_successful_result_count: AtomicU64,
transactions_attempted_processing_count: AtomicU64,
processed_transactions_count: AtomicU64,
processed_with_successful_result_count: AtomicU64,
retryable_transaction_count: AtomicUsize,
retryable_expired_bank_count: AtomicUsize,
cost_model_throttled_transactions_count: AtomicU64,
Expand All @@ -423,9 +423,9 @@ struct ConsumeWorkerCountMetrics {
impl Default for ConsumeWorkerCountMetrics {
fn default() -> Self {
Self {
transactions_attempted_execution_count: AtomicU64::default(),
executed_transactions_count: AtomicU64::default(),
executed_with_successful_result_count: AtomicU64::default(),
transactions_attempted_processing_count: AtomicU64::default(),
processed_transactions_count: AtomicU64::default(),
processed_with_successful_result_count: AtomicU64::default(),
retryable_transaction_count: AtomicUsize::default(),
retryable_expired_bank_count: AtomicUsize::default(),
cost_model_throttled_transactions_count: AtomicU64::default(),
Expand All @@ -441,19 +441,19 @@ impl ConsumeWorkerCountMetrics {
"banking_stage_worker_counts",
"id" => id,
(
"transactions_attempted_execution_count",
self.transactions_attempted_execution_count
"transactions_attempted_processing_count",
self.transactions_attempted_processing_count
.swap(0, Ordering::Relaxed),
i64
),
(
"executed_transactions_count",
self.executed_transactions_count.swap(0, Ordering::Relaxed),
"processed_transactions_count",
self.processed_transactions_count.swap(0, Ordering::Relaxed),
i64
),
(
"executed_with_successful_result_count",
self.executed_with_successful_result_count
"processed_with_successful_result_count",
self.processed_with_successful_result_count
.swap(0, Ordering::Relaxed),
i64
),
Expand Down
49 changes: 25 additions & 24 deletions core/src/banking_stage/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use {
committer::{CommitTransactionDetails, Committer, PreBalanceInfo},
immutable_deserialized_packet::ImmutableDeserializedPacket,
leader_slot_metrics::{
LeaderSlotMetricsTracker, ProcessTransactionsCounts, ProcessTransactionsSummary,
CommittedTransactionsCounts, LeaderSlotMetricsTracker, ProcessTransactionsSummary,
},
leader_slot_timing_metrics::LeaderExecuteAndCommitTimings,
qos_service::QosService,
Expand Down Expand Up @@ -34,6 +34,7 @@ use {
solana_svm::{
account_loader::{validate_fee_payer, TransactionCheckResult},
transaction_error_metrics::TransactionErrorMetrics,
transaction_processing_result::TransactionProcessingResultExtensions,
transaction_processor::{ExecutionRecordingConfig, TransactionProcessingConfig},
},
solana_timings::ExecuteTimings,
Expand Down Expand Up @@ -73,13 +74,13 @@ pub struct ExecuteAndCommitTransactionsOutput {
#[derive(Debug, Default, PartialEq)]
pub struct ExecuteAndCommitTransactionsCounts {
// Total number of transactions that were passed as candidates for execution
pub(crate) attempted_execution_count: u64,
// The number of transactions of that were executed. See description of in `ProcessTransactionsSummary`
pub(crate) attempted_processing_count: u64,
// The number of transactions of that were processed. See description of in `ProcessTransactionsSummary`
// for possible outcomes of execution.
pub(crate) executed_count: u64,
// Total number of the executed transactions that returned success/not
pub(crate) processed_count: u64,
// Total number of the processed transactions that returned success/not
// an error.
pub(crate) executed_with_successful_result_count: u64,
pub(crate) processed_with_successful_result_count: u64,
}

pub struct Consumer {
Expand Down Expand Up @@ -284,7 +285,7 @@ impl Consumer {
) -> ProcessTransactionsSummary {
let mut chunk_start = 0;
let mut all_retryable_tx_indexes = vec![];
let mut total_transaction_counts = ProcessTransactionsCounts::default();
let mut total_transaction_counts = CommittedTransactionsCounts::default();
let mut total_cost_model_throttled_transactions_count: u64 = 0;
let mut total_cost_model_us: u64 = 0;
let mut total_execute_and_commit_timings = LeaderExecuteAndCommitTimings::default();
Expand Down Expand Up @@ -622,22 +623,22 @@ impl Consumer {
execute_and_commit_timings.load_execute_us = load_execute_us;

let LoadAndExecuteTransactionsOutput {
execution_results,
execution_counts,
processing_results,
processed_counts,
} = load_and_execute_transactions_output;

let transaction_counts = ExecuteAndCommitTransactionsCounts {
executed_count: execution_counts.executed_transactions_count,
executed_with_successful_result_count: execution_counts.executed_successfully_count,
attempted_execution_count: execution_results.len() as u64,
processed_count: processed_counts.processed_transactions_count,
processed_with_successful_result_count: processed_counts.processed_successfully,
attempted_processing_count: processing_results.len() as u64,
};

let (executed_transactions, execution_results_to_transactions_us) =
measure_us!(execution_results
let (processed_transactions, execution_results_to_transactions_us) =
measure_us!(processing_results
.iter()
.zip(batch.sanitized_transactions())
.filter_map(|(execution_result, tx)| {
if execution_result.was_executed() {
.filter_map(|(processing_result, tx)| {
if processing_result.was_processed() {
Some(tx.to_versioned_transaction())
} else {
None
Expand All @@ -661,7 +662,7 @@ impl Consumer {

let (record_transactions_summary, record_us) = measure_us!(self
.transaction_recorder
.record_transactions(bank.slot(), executed_transactions));
.record_transactions(bank.slot(), processed_transactions));
execute_and_commit_timings.record_us = record_us;

let RecordTransactionsSummary {
Expand All @@ -675,8 +676,8 @@ impl Consumer {
};

if let Err(recorder_err) = record_transactions_result {
retryable_transaction_indexes.extend(execution_results.iter().enumerate().filter_map(
|(index, execution_result)| execution_result.was_executed().then_some(index),
retryable_transaction_indexes.extend(processing_results.iter().enumerate().filter_map(
|(index, processing_result)| processing_result.was_processed().then_some(index),
));

return ExecuteAndCommitTransactionsOutput {
Expand All @@ -691,22 +692,22 @@ impl Consumer {
}

let (commit_time_us, commit_transaction_statuses) =
if execution_counts.executed_transactions_count != 0 {
if processed_counts.processed_transactions_count != 0 {
self.committer.commit_transactions(
batch,
execution_results,
processing_results,
last_blockhash,
lamports_per_signature,
starting_transaction_index,
bank,
&mut pre_balance_info,
&mut execute_and_commit_timings,
&execution_counts,
&processed_counts,
)
} else {
(
0,
vec![CommitTransactionDetails::NotCommitted; execution_results.len()],
vec![CommitTransactionDetails::NotCommitted; processing_results.len()],
)
};

Expand All @@ -727,7 +728,7 @@ impl Consumer {
);

debug_assert_eq!(
transaction_counts.attempted_execution_count,
transaction_counts.attempted_processing_count,
commit_transaction_statuses.len() as u64,
);

Expand Down
Loading

0 comments on commit acc60fb

Please sign in to comment.