Improve transaction inclusion when block is nearly full #111

Closed · wants to merge 2 commits

169 changes: 156 additions & 13 deletions core/src/banking_stage/consumer.rs
@@ -470,33 +470,40 @@ impl Consumer {
         chunk_offset: usize,
         pre_results: impl Iterator<Item = Result<(), TransactionError>>,
     ) -> ProcessTransactionBatchOutput {
+        // Lock accounts so that other threads cannot encode transactions that
+        // will modify the same account state
+        let (mut batch, lock_us) =
+            measure_us!(bank.prepare_sanitized_batch_with_results(txs, pre_results));
+

Review comment:

    Follow-up(?): seems these 3 operations are essentially creating a batch (lock, cost model,
    unlock). Do you think it makes sense to create a function for this?
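For illustration, the helper suggested above might look something like this (a hypothetical sketch, not code from this PR; the name prepare_qos_filtered_batch and the exact signature are made up):

    /// Hypothetical helper bundling the three steps the comment describes:
    /// lock accounts, run cost-model selection, unlock the rejected ones.
    fn prepare_qos_filtered_batch<'a>(
        &self,
        bank: &'a Bank,
        txs: &'a [SanitizedTransaction],
        pre_results: impl Iterator<Item = Result<(), TransactionError>>,
    ) -> (
        TransactionBatch<'a, 'a>,
        Vec<Result<TransactionCost, TransactionError>>,
    ) {
        // 1. Lock: take account locks for everything that passed pre_results.
        let mut batch = bank.prepare_sanitized_batch_with_results(txs, pre_results);

        // 2. Cost model: select what still fits in the block.
        let (transaction_qos_cost_results, _num_throttled) = self
            .qos_service
            .select_and_accumulate_transaction_costs(
                bank,
                batch.sanitized_transactions(),
                batch.lock_results().iter().cloned(),
            );

        // 3. Unlock: release the locks of transactions that were not selected.
        batch.unlock_failures(
            transaction_qos_cost_results
                .iter()
                .map(|r| match r {
                    Ok(_cost) => Ok(()),
                    Err(err) => Err(err.clone()),
                })
                .collect(),
        );

        (batch, transaction_qos_cost_results)
    }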
+        // After locking, select the transactions that can fit into the block
         let (
             (transaction_qos_cost_results, cost_model_throttled_transactions_count),
             cost_model_us,
         ) = measure_us!(self.qos_service.select_and_accumulate_transaction_costs(
             bank,
-            txs,
-            pre_results
+            batch.sanitized_transactions(),
+            batch.lock_results().iter().cloned(),
         ));
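For intuition, the selection step can be pictured as admitting each already-locked transaction against the bank's cost tracker. This is a simplified sketch of the idea rather than the actual QosService internals (the error conversion at the end is an assumption):

    // Simplified sketch: reserve block space per transaction, or reject it.
    let mut cost_tracker = bank.write_cost_tracker().unwrap();
    let results: Vec<Result<TransactionCost, TransactionError>> = batch
        .sanitized_transactions()
        .iter()
        .zip(batch.lock_results().iter().cloned())
        .map(|(tx, lock_result)| {
            lock_result.and_then(|_| {
                // Estimate the transaction's cost, then try to reserve that
                // much of the remaining block capacity.
                let cost = CostModel::calculate_cost(tx, &bank.feature_set);
                cost_tracker
                    .try_add(&cost)
                    .map(|_| cost)
                    // Assumed mapping into errors like WouldExceedMaxBlockCostLimit
                    .map_err(TransactionError::from)
            })
        })
        .collect();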

-        // Only lock accounts for those transactions are selected for the block;
-        // Once accounts are locked, other threads cannot encode transactions that will modify the
-        // same account state
-        let (batch, lock_us) = measure_us!(bank.prepare_sanitized_batch_with_results(
-            txs,
-            transaction_qos_cost_results.iter().map(|r| match r {
-                Ok(_cost) => Ok(()),
-                Err(err) => Err(err.clone()),
-            })
-        ));
+        // Unlock accounts for any transactions that were not selected for the block;
+        batch.unlock_failures(
+            transaction_qos_cost_results
+                .iter()
+                .map(|r| match r {
+                    Ok(_cost) => Ok(()),
+                    Err(err) => Err(err.clone()),
+                })
+                .collect(),
+        );

Review comment:

    I doubt it outweighs the benefits of this, but did you measure the impact of this change on
    performance when we have many small batches? Maybe run bench-tps as a sanity check on this,
    using --num-conflict-groups 1.

    We're grabbing the account locks lock 3 times instead of twice per batch now, so this could
    lead to more contention for that lock.

Review comment:

    banking-bench will probably be better to highlight any differences.

Author:

    I ran banking-bench with write-lock contention and this change brings in a pretty consistent
    perf hit that I can't really debug. After discussing with @sakridge I think it's best to just
    skip this change to avoid the backport risk to v1.17 and wait until v1.18 and the new
    scheduler.

    cargo run --release --manifest-path banking-bench/Cargo.toml -- --write-lock-contention full --iterations 100

    before

    [total_sent: 115200, base_tx_count: 36864, txs_processed: 140306, txs_landed: 103442, total_us: 6721644, tx_total_us: 6620406]
    {'name': 'banking_bench_total', 'median': '17138.66'}
    {'name': 'banking_bench_tx_total', 'median': '17400.75'}
    {'name': 'banking_bench_success_tx_total', 'median': '15389.39'}

    after

    [total_sent: 115200, base_tx_count: 36864, txs_processed: 142769, txs_landed: 105905, total_us: 7230649, tx_total_us: 7135781]
    {'name': 'banking_bench_total', 'median': '15932.18'}
    {'name': 'banking_bench_tx_total', 'median': '16143.99'}
    {'name': 'banking_bench_success_tx_total', 'median': '14646.68'}
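To make the contention point above concrete, the per-batch interactions with the shared account-locks state look roughly like this (schematic only; exact call paths simplified):

    // Before this change: two acquisitions of the account-locks lock per batch
    //   bank.prepare_sanitized_batch_with_results(..)  // lock the selected accounts
    //   drop(batch)                                    // unlock them
    //
    // After this change: three acquisitions per batch
    //   bank.prepare_sanitized_batch_with_results(..)  // lock all accounts
    //   batch.unlock_failures(..)                      // unlock QoS-rejected transactions
    //   drop(batch)                                    // unlock the remainder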

         // retryable_txs includes AccountInUse, WouldExceedMaxBlockCostLimit
         // WouldExceedMaxAccountCostLimit, WouldExceedMaxVoteCostLimit
         // and WouldExceedMaxAccountDataCostLimit
         let mut execute_and_commit_transactions_output =
             self.execute_and_commit_transactions_locked(bank, &batch);

-        // Once the accounts are new transactions can enter the pipeline to process them
+        // Drop the batch to unlock transaction accounts so that new
+        // transactions can enter the pipeline to process them
         let (_, unlock_us) = measure_us!(drop(batch));

         let ExecuteAndCommitTransactionsOutput {
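The final drop works because TransactionBatch releases whatever account locks it still owns when it goes out of scope, roughly like the following (a from-memory sketch; the real field and method names may differ):

    impl<'a, 'b> Drop for TransactionBatch<'a, 'b> {
        fn drop(&mut self) {
            // Releases any account locks the batch still holds; transactions
            // already released via unlock_failures() are not unlocked twice.
            self.bank.unlock_accounts(self)
        }
    }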
@@ -1656,6 +1663,142 @@ mod tests {
Blockstore::destroy(ledger_path.path()).unwrap();
}

#[test]
fn test_bank_process_and_record_transactions_unlock_cost_tracker_failures() {
solana_logger::setup();
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_slow_genesis_config(10_000);
let bank = Bank::new_no_wallclock_throttle_for_tests(&genesis_config).0;
let pubkey = solana_sdk::pubkey::new_rand();
let pubkey1 = solana_sdk::pubkey::new_rand();

let transactions = sanitize_transactions(vec![
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
system_transaction::transfer(&mint_keypair, &pubkey1, 1, genesis_config.hash()),
]);

let transactions2 = sanitize_transactions(vec![
system_transaction::transfer(&mint_keypair, &pubkey, 2, genesis_config.hash()),
system_transaction::transfer(&mint_keypair, &pubkey1, 2, genesis_config.hash()),
]);

let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let blockstore = Blockstore::open(ledger_path.path())
.expect("Expected to be able to open database ledger");
let (poh_recorder, _entry_receiver, record_receiver) = PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
bank.clone(),
Some((4, 4)),
bank.ticks_per_slot(),
&pubkey,
Arc::new(blockstore),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&PohConfig::default(),
Arc::new(AtomicBool::default()),
);
let recorder = poh_recorder.new_recorder();
let poh_recorder = Arc::new(RwLock::new(poh_recorder));

poh_recorder
.write()
.unwrap()
.set_bank_for_test(bank.clone());

let poh_simulator = simulate_poh(record_receiver, &poh_recorder);

// Hold onto poh lock so that workers cannot finish recording
// transactions and cannot unlock accounts immediately. This allows
// us to test having parallel workers that have account lock
// conflicts.
let poh_lock = poh_recorder.write().unwrap();

let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let worker1 = {
let bank = bank.clone();
let committer = Committer::new(
None,
replay_vote_sender.clone(),
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder.clone(), QosService::new(1), None);

std::thread::spawn(move || {
// Set strict cost tracker limits so that transactions will be rejected
bank.write_cost_tracker().unwrap().set_limits(0, 0, 0);

let process_transactions_batch_output =
consumer.process_and_record_transactions(&bank, &transactions, 0);

let ExecuteAndCommitTransactionsOutput {
transactions_attempted_execution_count,
executed_transactions_count,
retryable_transaction_indexes,
commit_transactions_result,
..
} = process_transactions_batch_output.execute_and_commit_transactions_output;

assert_eq!(transactions_attempted_execution_count, 2);
assert_eq!(executed_transactions_count, 0);
assert_eq!(retryable_transaction_indexes, vec![0, 1]);
assert!(commit_transactions_result.is_ok());
})
};

// Sleep so that processing in the other thread hits the poh recorder lock
std::thread::sleep(Duration::from_millis(10));

let worker2 = std::thread::spawn(move || {
let committer = Committer::new(
None,
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(2), None);

// Relax cost tracker limits so that transactions can be processed again
bank.write_cost_tracker()
.unwrap()
.set_limits(u64::MAX, u64::MAX, u64::MAX);

let process_transactions_batch_output =
consumer.process_and_record_transactions(&bank, &transactions2, 0);

let ExecuteAndCommitTransactionsOutput {
transactions_attempted_execution_count,
executed_transactions_count,
retryable_transaction_indexes,
commit_transactions_result,
..
} = process_transactions_batch_output.execute_and_commit_transactions_output;

assert_eq!(transactions_attempted_execution_count, 2);
assert_eq!(executed_transactions_count, 1);
assert_eq!(retryable_transaction_indexes, vec![1]);
assert!(commit_transactions_result.is_ok());
});

// Allow both workers to advance
drop(poh_lock);

// Wait for both workers to finish
assert!(worker1.join().is_ok());
assert!(worker2.join().is_ok());

poh_recorder
.read()
.unwrap()
.is_exited
.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
}
Blockstore::destroy(ledger_path.path()).unwrap();
}

#[test]
fn test_process_transactions_instruction_error() {
solana_logger::setup();
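To run the new test on its own, a command along these lines should work (the package name solana-core is assumed from the file path core/src/banking_stage/consumer.rs):

    cargo test -p solana-core test_bank_process_and_record_transactions_unlock_cost_tracker_failures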