Skip to content

Commit

Permalink
Refactor log_messages_bytes_limit parameter usage
Browse files Browse the repository at this point in the history
Replacing `log_messages_bytes_limit` parameter inside `TransactionBatchProcessor::execute_loaded_transaction()` with `self.runtime_config.log_messages_bytes_limit` allows removing many `log_messages_bytes_limit` parameters propagated across the codebase.
  • Loading branch information
andreisilviudragnea committed Apr 24, 2024
1 parent 23905c0 commit 45334e7
Show file tree
Hide file tree
Showing 19 changed files with 28 additions and 134 deletions.
1 change: 0 additions & 1 deletion banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,6 @@ fn main() {
num_banking_threads,
None,
replay_vote_sender,
None,
Arc::new(connection_cache),
bank_forks.clone(),
&Arc::new(PrioritizationFeeCache::new(0u64)),
Expand Down
3 changes: 1 addition & 2 deletions core/benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
);
let (s, _r) = unbounded();
let committer = Committer::new(None, s, Arc::new(PrioritizationFeeCache::new(0u64)));
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let consumer = Consumer::new(committer, recorder, QosService::new(1));
// This tests the performance of buffering packets.
// If the packet buffers are copied, performance will be poor.
bencher.iter(move || {
Expand Down Expand Up @@ -299,7 +299,6 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
gossip_vote_receiver,
None,
s,
None,
Arc::new(ConnectionCache::new("connection_cache_test")),
bank_forks,
&Arc::new(PrioritizationFeeCache::new(0u64)),
Expand Down
2 changes: 1 addition & 1 deletion core/benches/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ fn create_consumer(poh_recorder: &RwLock<PohRecorder>) -> Consumer {
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(None, replay_vote_sender, Arc::default());
let transaction_recorder = poh_recorder.read().unwrap().new_recorder();
Consumer::new(committer, transaction_recorder, QosService::new(0), None)
Consumer::new(committer, transaction_recorder, QosService::new(0))
}

struct BenchFrame {
Expand Down
23 changes: 1 addition & 22 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,6 @@ impl BankingStage {
gossip_vote_receiver: BankingPacketReceiver,
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: ReplayVoteSender,
log_messages_bytes_limit: Option<usize>,
connection_cache: Arc<ConnectionCache>,
bank_forks: Arc<RwLock<BankForks>>,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
Expand All @@ -346,7 +345,6 @@ impl BankingStage {
Self::num_threads(),
transaction_status_sender,
replay_vote_sender,
log_messages_bytes_limit,
connection_cache,
bank_forks,
prioritization_fee_cache,
Expand All @@ -364,7 +362,6 @@ impl BankingStage {
num_threads: u32,
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: ReplayVoteSender,
log_messages_bytes_limit: Option<usize>,
connection_cache: Arc<ConnectionCache>,
bank_forks: Arc<RwLock<BankForks>>,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
Expand All @@ -380,7 +377,6 @@ impl BankingStage {
num_threads,
transaction_status_sender,
replay_vote_sender,
log_messages_bytes_limit,
connection_cache,
bank_forks,
prioritization_fee_cache,
Expand All @@ -395,7 +391,6 @@ impl BankingStage {
num_threads,
transaction_status_sender,
replay_vote_sender,
log_messages_bytes_limit,
connection_cache,
bank_forks,
prioritization_fee_cache,
Expand All @@ -413,7 +408,6 @@ impl BankingStage {
num_threads: u32,
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: ReplayVoteSender,
log_messages_bytes_limit: Option<usize>,
connection_cache: Arc<ConnectionCache>,
bank_forks: Arc<RwLock<BankForks>>,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
Expand Down Expand Up @@ -478,7 +472,6 @@ impl BankingStage {
decision_maker.clone(),
committer.clone(),
transaction_recorder.clone(),
log_messages_bytes_limit,
forwarder,
unprocessed_transaction_storage,
)
Expand All @@ -497,7 +490,6 @@ impl BankingStage {
num_threads: u32,
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: ReplayVoteSender,
log_messages_bytes_limit: Option<usize>,
connection_cache: Arc<ConnectionCache>,
bank_forks: Arc<RwLock<BankForks>>,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
Expand Down Expand Up @@ -533,7 +525,6 @@ impl BankingStage {
decision_maker.clone(),
committer.clone(),
transaction_recorder.clone(),
log_messages_bytes_limit,
Forwarder::new(
poh_recorder.clone(),
bank_forks.clone(),
Expand Down Expand Up @@ -565,7 +556,6 @@ impl BankingStage {
committer.clone(),
poh_recorder.read().unwrap().new_recorder(),
QosService::new(id),
log_messages_bytes_limit,
),
finished_work_sender.clone(),
poh_recorder.read().unwrap().new_leader_bank_notifier(),
Expand Down Expand Up @@ -616,17 +606,11 @@ impl BankingStage {
decision_maker: DecisionMaker,
committer: Committer,
transaction_recorder: TransactionRecorder,
log_messages_bytes_limit: Option<usize>,
forwarder: Forwarder,
unprocessed_transaction_storage: UnprocessedTransactionStorage,
) -> JoinHandle<()> {
let mut packet_receiver = PacketReceiver::new(id, packet_receiver, bank_forks);
let consumer = Consumer::new(
committer,
transaction_recorder,
QosService::new(id),
log_messages_bytes_limit,
);
let consumer = Consumer::new(committer, transaction_recorder, QosService::new(id));

Builder::new()
.name(format!("solBanknStgTx{id:02}"))
Expand Down Expand Up @@ -866,7 +850,6 @@ mod tests {
gossip_vote_receiver,
None,
replay_vote_sender,
None,
Arc::new(ConnectionCache::new("connection_cache_test")),
bank_forks,
&Arc::new(PrioritizationFeeCache::new(0u64)),
Expand Down Expand Up @@ -921,7 +904,6 @@ mod tests {
gossip_vote_receiver,
None,
replay_vote_sender,
None,
Arc::new(ConnectionCache::new("connection_cache_test")),
bank_forks,
&Arc::new(PrioritizationFeeCache::new(0u64)),
Expand Down Expand Up @@ -1000,7 +982,6 @@ mod tests {
gossip_vote_receiver,
None,
replay_vote_sender,
None,
Arc::new(ConnectionCache::new("connection_cache_test")),
bank_forks,
&Arc::new(PrioritizationFeeCache::new(0u64)),
Expand Down Expand Up @@ -1170,7 +1151,6 @@ mod tests {
3,
None,
replay_vote_sender,
None,
Arc::new(ConnectionCache::new("connection_cache_test")),
bank_forks,
&Arc::new(PrioritizationFeeCache::new(0u64)),
Expand Down Expand Up @@ -1361,7 +1341,6 @@ mod tests {
gossip_vote_receiver,
None,
replay_vote_sender,
None,
Arc::new(ConnectionCache::new("connection_cache_test")),
bank_forks,
&Arc::new(PrioritizationFeeCache::new(0u64)),
Expand Down
2 changes: 1 addition & 1 deletion core/src/banking_stage/consume_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ mod tests {
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let consumer = Consumer::new(committer, recorder, QosService::new(1));

let (consume_sender, consume_receiver) = unbounded();
let (consumed_sender, consumed_receiver) = unbounded();
Expand Down
30 changes: 13 additions & 17 deletions core/src/banking_stage/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,21 +78,18 @@ pub struct Consumer {
committer: Committer,
transaction_recorder: TransactionRecorder,
qos_service: QosService,
log_messages_bytes_limit: Option<usize>,
}

impl Consumer {
pub fn new(
committer: Committer,
transaction_recorder: TransactionRecorder,
qos_service: QosService,
log_messages_bytes_limit: Option<usize>,
) -> Self {
Self {
committer,
transaction_recorder,
qos_service,
log_messages_bytes_limit,
}
}

Expand Down Expand Up @@ -597,7 +594,6 @@ impl Consumer {
ExecutionRecordingConfig::new_single_setting(transaction_status_sender_enabled),
&mut execute_and_commit_timings.execute_timings,
None, // account_overrides
self.log_messages_bytes_limit,
true,
));
execute_and_commit_timings.load_execute_us = load_execute_us;
Expand Down Expand Up @@ -922,7 +918,7 @@ mod tests {
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let consumer = Consumer::new(committer, recorder, QosService::new(1));
let process_transactions_summary =
consumer.process_transactions(&bank, &Instant::now(), &transactions);

Expand Down Expand Up @@ -1095,7 +1091,7 @@ mod tests {
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let consumer = Consumer::new(committer, recorder, QosService::new(1));

let process_transactions_batch_output =
consumer.process_and_record_transactions(&bank, &transactions, 0);
Expand Down Expand Up @@ -1279,7 +1275,7 @@ mod tests {
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let consumer = Consumer::new(committer, recorder, QosService::new(1));

let process_transactions_batch_output =
consumer.process_and_record_transactions(&bank, &transactions, 0);
Expand Down Expand Up @@ -1380,7 +1376,7 @@ mod tests {
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let consumer = Consumer::new(committer, recorder, QosService::new(1));

let process_transactions_batch_output =
consumer.process_and_record_transactions(&bank, &transactions, 0);
Expand Down Expand Up @@ -1469,7 +1465,7 @@ mod tests {
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let consumer = Consumer::new(committer, recorder, QosService::new(1));

let get_block_cost = || bank.read_cost_tracker().unwrap().block_cost();
let get_tx_count = || bank.read_cost_tracker().unwrap().transaction_count();
Expand Down Expand Up @@ -1624,7 +1620,7 @@ mod tests {
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let consumer = Consumer::new(committer, recorder, QosService::new(1));

let process_transactions_batch_output =
consumer.process_and_record_transactions(&bank, &transactions, 0);
Expand Down Expand Up @@ -1820,7 +1816,7 @@ mod tests {
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder.clone(), QosService::new(1), None);
let consumer = Consumer::new(committer, recorder.clone(), QosService::new(1));

let process_transactions_summary =
consumer.process_transactions(&bank, &Instant::now(), &transactions);
Expand Down Expand Up @@ -1947,7 +1943,7 @@ mod tests {
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let consumer = Consumer::new(committer, recorder, QosService::new(1));

let _ = consumer.process_and_record_transactions(&bank, &transactions, 0);

Expand Down Expand Up @@ -2092,7 +2088,7 @@ mod tests {
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let consumer = Consumer::new(committer, recorder, QosService::new(1));

let _ = consumer.process_and_record_transactions(&bank, &[sanitized_tx.clone()], 0);

Expand Down Expand Up @@ -2152,7 +2148,7 @@ mod tests {
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let consumer = Consumer::new(committer, recorder, QosService::new(1));

// When the working bank in poh_recorder is None, no packets should be processed (consume will not be called)
assert!(!poh_recorder.read().unwrap().has_bank());
Expand Down Expand Up @@ -2230,7 +2226,7 @@ mod tests {
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let consumer = Consumer::new(committer, recorder, QosService::new(1));

// When the working bank in poh_recorder is None, no packets should be processed
assert!(!poh_recorder.read().unwrap().has_bank());
Expand Down Expand Up @@ -2282,7 +2278,7 @@ mod tests {
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let consumer = Consumer::new(committer, recorder, QosService::new(1));

// When the working bank in poh_recorder is None, no packets should be processed (consume will not be called)
assert!(!poh_recorder.read().unwrap().has_bank());
Expand Down Expand Up @@ -2407,7 +2403,7 @@ mod tests {
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let consumer = Consumer::new(committer, recorder, QosService::new(1));

// When the working bank in poh_recorder is None, no packets should be processed (consume will not be called)
assert!(!poh_recorder.read().unwrap().has_bank());
Expand Down
Loading

0 comments on commit 45334e7

Please sign in to comment.