Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor log_messages_bytes_limit parameter usage #949

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,6 @@ fn main() {
num_banking_threads,
None,
replay_vote_sender,
None,
Arc::new(connection_cache),
bank_forks.clone(),
&Arc::new(PrioritizationFeeCache::new(0u64)),
Expand Down
3 changes: 1 addition & 2 deletions core/benches/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
);
let (s, _r) = unbounded();
let committer = Committer::new(None, s, Arc::new(PrioritizationFeeCache::new(0u64)));
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let consumer = Consumer::new(committer, recorder, QosService::new(1));
// This tests the performance of buffering packets.
// If the packet buffers are copied, performance will be poor.
bencher.iter(move || {
Expand Down Expand Up @@ -299,7 +299,6 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
gossip_vote_receiver,
None,
s,
None,
Arc::new(ConnectionCache::new("connection_cache_test")),
bank_forks,
&Arc::new(PrioritizationFeeCache::new(0u64)),
Expand Down
2 changes: 1 addition & 1 deletion core/benches/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ fn create_consumer(poh_recorder: &RwLock<PohRecorder>) -> Consumer {
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(None, replay_vote_sender, Arc::default());
let transaction_recorder = poh_recorder.read().unwrap().new_recorder();
Consumer::new(committer, transaction_recorder, QosService::new(0), None)
Consumer::new(committer, transaction_recorder, QosService::new(0))
}

struct BenchFrame {
Expand Down
23 changes: 1 addition & 22 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,6 @@ impl BankingStage {
gossip_vote_receiver: BankingPacketReceiver,
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: ReplayVoteSender,
log_messages_bytes_limit: Option<usize>,
connection_cache: Arc<ConnectionCache>,
bank_forks: Arc<RwLock<BankForks>>,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
Expand All @@ -351,7 +350,6 @@ impl BankingStage {
Self::num_threads(),
transaction_status_sender,
replay_vote_sender,
log_messages_bytes_limit,
connection_cache,
bank_forks,
prioritization_fee_cache,
Expand All @@ -370,7 +368,6 @@ impl BankingStage {
num_threads: u32,
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: ReplayVoteSender,
log_messages_bytes_limit: Option<usize>,
connection_cache: Arc<ConnectionCache>,
bank_forks: Arc<RwLock<BankForks>>,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
Expand All @@ -387,7 +384,6 @@ impl BankingStage {
num_threads,
transaction_status_sender,
replay_vote_sender,
log_messages_bytes_limit,
connection_cache,
bank_forks,
prioritization_fee_cache,
Expand All @@ -402,7 +398,6 @@ impl BankingStage {
num_threads,
transaction_status_sender,
replay_vote_sender,
log_messages_bytes_limit,
connection_cache,
bank_forks,
prioritization_fee_cache,
Expand All @@ -421,7 +416,6 @@ impl BankingStage {
num_threads: u32,
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: ReplayVoteSender,
log_messages_bytes_limit: Option<usize>,
connection_cache: Arc<ConnectionCache>,
bank_forks: Arc<RwLock<BankForks>>,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
Expand Down Expand Up @@ -486,7 +480,6 @@ impl BankingStage {
decision_maker.clone(),
committer.clone(),
transaction_recorder.clone(),
log_messages_bytes_limit,
forwarder,
unprocessed_transaction_storage,
)
Expand All @@ -505,7 +498,6 @@ impl BankingStage {
num_threads: u32,
transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: ReplayVoteSender,
log_messages_bytes_limit: Option<usize>,
connection_cache: Arc<ConnectionCache>,
bank_forks: Arc<RwLock<BankForks>>,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
Expand Down Expand Up @@ -542,7 +534,6 @@ impl BankingStage {
decision_maker.clone(),
committer.clone(),
transaction_recorder.clone(),
log_messages_bytes_limit,
Forwarder::new(
poh_recorder.clone(),
bank_forks.clone(),
Expand Down Expand Up @@ -574,7 +565,6 @@ impl BankingStage {
committer.clone(),
poh_recorder.read().unwrap().new_recorder(),
QosService::new(id),
log_messages_bytes_limit,
),
finished_work_sender.clone(),
poh_recorder.read().unwrap().new_leader_bank_notifier(),
Expand Down Expand Up @@ -636,17 +626,11 @@ impl BankingStage {
decision_maker: DecisionMaker,
committer: Committer,
transaction_recorder: TransactionRecorder,
log_messages_bytes_limit: Option<usize>,
mut forwarder: Forwarder,
unprocessed_transaction_storage: UnprocessedTransactionStorage,
) -> JoinHandle<()> {
let mut packet_receiver = PacketReceiver::new(id, packet_receiver, bank_forks);
let consumer = Consumer::new(
committer,
transaction_recorder,
QosService::new(id),
log_messages_bytes_limit,
);
let consumer = Consumer::new(committer, transaction_recorder, QosService::new(id));

Builder::new()
.name(format!("solBanknStgTx{id:02}"))
Expand Down Expand Up @@ -881,7 +865,6 @@ mod tests {
gossip_vote_receiver,
None,
replay_vote_sender,
None,
Arc::new(ConnectionCache::new("connection_cache_test")),
bank_forks,
&Arc::new(PrioritizationFeeCache::new(0u64)),
Expand Down Expand Up @@ -937,7 +920,6 @@ mod tests {
gossip_vote_receiver,
None,
replay_vote_sender,
None,
Arc::new(ConnectionCache::new("connection_cache_test")),
bank_forks,
&Arc::new(PrioritizationFeeCache::new(0u64)),
Expand Down Expand Up @@ -1017,7 +999,6 @@ mod tests {
gossip_vote_receiver,
None,
replay_vote_sender,
None,
Arc::new(ConnectionCache::new("connection_cache_test")),
bank_forks.clone(), // keep a local-copy of bank-forks so worker threads do not lose weak access to bank-forks
&Arc::new(PrioritizationFeeCache::new(0u64)),
Expand Down Expand Up @@ -1188,7 +1169,6 @@ mod tests {
3,
None,
replay_vote_sender,
None,
Arc::new(ConnectionCache::new("connection_cache_test")),
bank_forks,
&Arc::new(PrioritizationFeeCache::new(0u64)),
Expand Down Expand Up @@ -1379,7 +1359,6 @@ mod tests {
gossip_vote_receiver,
None,
replay_vote_sender,
None,
Arc::new(ConnectionCache::new("connection_cache_test")),
bank_forks,
&Arc::new(PrioritizationFeeCache::new(0u64)),
Expand Down
2 changes: 1 addition & 1 deletion core/src/banking_stage/consume_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,7 @@ mod tests {
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let consumer = Consumer::new(committer, recorder, QosService::new(1));

let (consume_sender, consume_receiver) = unbounded();
let (consumed_sender, consumed_receiver) = unbounded();
Expand Down
29 changes: 13 additions & 16 deletions core/src/banking_stage/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,21 +88,18 @@ pub struct Consumer {
committer: Committer,
transaction_recorder: TransactionRecorder,
qos_service: QosService,
log_messages_bytes_limit: Option<usize>,
}

impl Consumer {
pub fn new(
committer: Committer,
transaction_recorder: TransactionRecorder,
qos_service: QosService,
log_messages_bytes_limit: Option<usize>,
) -> Self {
Self {
committer,
transaction_recorder,
qos_service,
log_messages_bytes_limit,
}
}

Expand Down Expand Up @@ -935,7 +932,7 @@ mod tests {
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let consumer = Consumer::new(committer, recorder, QosService::new(1));
let process_transactions_summary =
consumer.process_transactions(&bank, &Instant::now(), &transactions);

Expand Down Expand Up @@ -1110,7 +1107,7 @@ mod tests {
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let consumer = Consumer::new(committer, recorder, QosService::new(1));

let process_transactions_batch_output =
consumer.process_and_record_transactions(&bank, &transactions, 0);
Expand Down Expand Up @@ -1300,7 +1297,7 @@ mod tests {
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let consumer = Consumer::new(committer, recorder, QosService::new(1));

let process_transactions_batch_output =
consumer.process_and_record_transactions(&bank, &transactions, 0);
Expand Down Expand Up @@ -1404,7 +1401,7 @@ mod tests {
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let consumer = Consumer::new(committer, recorder, QosService::new(1));

let process_transactions_batch_output =
consumer.process_and_record_transactions(&bank, &transactions, 0);
Expand Down Expand Up @@ -1483,7 +1480,7 @@ mod tests {
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let consumer = Consumer::new(committer, recorder, QosService::new(1));

let get_block_cost = || bank.read_cost_tracker().unwrap().block_cost();
let get_tx_count = || bank.read_cost_tracker().unwrap().transaction_count();
Expand Down Expand Up @@ -1646,7 +1643,7 @@ mod tests {
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let consumer = Consumer::new(committer, recorder, QosService::new(1));

let process_transactions_batch_output =
consumer.process_and_record_transactions(&bank, &transactions, 0);
Expand Down Expand Up @@ -1851,7 +1848,7 @@ mod tests {
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder.clone(), QosService::new(1), None);
let consumer = Consumer::new(committer, recorder.clone(), QosService::new(1));

let process_transactions_summary =
consumer.process_transactions(&bank, &Instant::now(), &transactions);
Expand Down Expand Up @@ -1980,7 +1977,7 @@ mod tests {
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let consumer = Consumer::new(committer, recorder, QosService::new(1));

let _ = consumer.process_and_record_transactions(&bank, &transactions, 0);

Expand Down Expand Up @@ -2125,7 +2122,7 @@ mod tests {
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let consumer = Consumer::new(committer, recorder, QosService::new(1));

let _ = consumer.process_and_record_transactions(&bank, &[sanitized_tx.clone()], 0);

Expand Down Expand Up @@ -2185,7 +2182,7 @@ mod tests {
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let consumer = Consumer::new(committer, recorder, QosService::new(1));

// When the working bank in poh_recorder is None, no packets should be processed (consume will not be called)
assert!(!poh_recorder.read().unwrap().has_bank());
Expand Down Expand Up @@ -2270,7 +2267,7 @@ mod tests {
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let consumer = Consumer::new(committer, recorder, QosService::new(1));

// When the working bank in poh_recorder is None, no packets should be processed
assert!(!poh_recorder.read().unwrap().has_bank());
Expand Down Expand Up @@ -2322,7 +2319,7 @@ mod tests {
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let consumer = Consumer::new(committer, recorder, QosService::new(1));

// When the working bank in poh_recorder is None, no packets should be processed (consume will not be called)
assert!(!poh_recorder.read().unwrap().has_bank());
Expand Down Expand Up @@ -2454,7 +2451,7 @@ mod tests {
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let consumer = Consumer::new(committer, recorder, QosService::new(1));

// When the working bank in poh_recorder is None, no packets should be processed (consume will not be called)
assert!(!poh_recorder.read().unwrap().has_bank());
Expand Down
Loading