add comments and restructure code
KirillLykov committed Mar 5, 2024
1 parent 18df742 commit 921d486
Showing 2 changed files with 67 additions and 57 deletions.
4 changes: 2 additions & 2 deletions bench-tps/src/bench.rs
@@ -3,7 +3,7 @@ use {
bench_tps_client::*,
cli::{ComputeUnitPrice, Config, InstructionPaddingConfig},
confirmations_processing::{
create_log_transactions_service_and_sender, SignatureBatch, SignatureBatchSender,
create_log_transactions_service_and_sender, SignatureBatchSender, TransactionInfoBatch,
},
perf_utils::{sample_txs, SampleStats},
send_batch::*,
@@ -975,7 +975,7 @@ fn do_tx_transfers<T: BenchTpsClient + ?Sized>(

if let Some(signatures_sender) = &signatures_sender {
if signatures_sender
.send(SignatureBatch {
.send(TransactionInfoBatch {
signatures,
sent_at: Utc::now(),
compute_unit_prices,
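The change above renames the payload that do_tx_transfers hands to the logging thread: each chunk of submitted transactions is reported as a TransactionInfoBatch carrying the signatures, the send timestamp, and one optional compute-unit price per signature. Below is a minimal, self-contained sketch of that handoff; the crossbeam_channel, chrono, and solana_sdk dependencies, the unbounded() constructor, the main wrapper, and the placeholder values are assumptions for illustration and not part of the commit.

use chrono::{DateTime, Utc};
use crossbeam_channel::unbounded;
use solana_sdk::signature::Signature;

// Stand-in mirroring the struct defined in confirmations_processing.rs below.
#[derive(Clone)]
struct TransactionInfoBatch {
    signatures: Vec<Signature>,
    sent_at: DateTime<Utc>,
    compute_unit_prices: Vec<Option<u64>>,
}

fn main() {
    // Channel connecting the transaction-sending thread to LogTransactionService.
    let (signatures_sender, signature_receiver) = unbounded::<TransactionInfoBatch>();

    // After submitting a chunk of transactions, report them for confirmation
    // tracking: exactly one compute-unit price per signature.
    let signatures = vec![Signature::default()];
    let compute_unit_prices = vec![Some(1_000u64)];
    signatures_sender
        .send(TransactionInfoBatch {
            signatures,
            sent_at: Utc::now(),
            compute_unit_prices,
        })
        .expect("the logging service should still be running");

    let batch = signature_receiver.recv().unwrap();
    assert_eq!(batch.signatures.len(), batch.compute_unit_prices.len());
}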
120 changes: 65 additions & 55 deletions bench-tps/src/confirmations_processing.rs
@@ -1,3 +1,6 @@
//! `LogTransactionService` requests confirmed blocks, analyses transactions submitted by bench-tps,
//! and saves log files in csv format.
use {
crate::bench_tps_client::{BenchTpsClient, Result},
chrono::{DateTime, TimeZone, Utc},
@@ -25,58 +28,21 @@ use {
},
};

const PROCESS_BLOCKS_EVERY_MS: u64 = 16 * DEFAULT_MS_PER_SLOT;
const REMOVE_TIMEOUT_TX_EVERY_SEC: i64 = (MAX_PROCESSING_AGE as f64 * DEFAULT_S_PER_SLOT) as i64;

// Data to establish communication between sender thread and
// LogTransactionService.
#[derive(Clone)]
pub(crate) struct SignatureBatch {
pub(crate) struct TransactionInfoBatch {
pub signatures: Vec<Signature>,
pub sent_at: DateTime<Utc>,
pub compute_unit_prices: Vec<Option<u64>>,
}

#[derive(Clone)]
pub(crate) struct TransactionSendInfo {
pub sent_at: DateTime<Utc>,
pub compute_unit_price: Option<u64>,
}
pub(crate) type SignatureBatchSender = Sender<TransactionInfoBatch>;

#[derive(Clone, Serialize)]
struct BlockData {
pub block_hash: String,
pub block_slot: Slot,
pub block_leader: String,
pub block_time: Option<DateTime<Utc>>,
pub total_num_transactions: usize,
pub num_bench_tps_transactions: usize,
pub total_cu_consumed: u64,
pub bench_tps_cu_consumed: u64,
}

#[derive(Clone, Serialize)]
struct TransactionData {
pub signature: String,
pub sent_at: String, //TODO(klykov) use consistently DateTime<Utc>? I think it cane be serialized
pub confirmed_slot: Option<Slot>,
pub confirmed_at: Option<DateTime<Utc>>,
pub successful: bool,
pub slot_leader: Option<String>,
pub error: Option<String>,
pub blockhash: Option<String>,
pub timed_out: bool,
pub compute_unit_price: u64,
}

fn check_confirmations(
block_data_file: Option<&String>,
transaction_data_file: Option<&String>,
) -> bool {
block_data_file.is_some() || transaction_data_file.is_some()
pub(crate) struct LogTransactionService {
thread_handler: JoinHandle<()>,
}

pub(crate) type SignatureBatchSender = Sender<SignatureBatch>;
type SignatureBatchReceiver = Receiver<SignatureBatch>;

pub(crate) fn create_log_transactions_service_and_sender<T>(
client: &Arc<T>,
block_data_file: Option<&String>,
@@ -95,15 +61,22 @@ where
}
}

pub(crate) struct LogTransactionService {
thread_handler: JoinHandle<()>,
// How often process blocks.
const PROCESS_BLOCKS_EVERY_MS: u64 = 16 * DEFAULT_MS_PER_SLOT;
// Max age for transaction in the transaction map.
const REMOVE_TIMEOUT_TX_EVERY_SEC: i64 = (MAX_PROCESSING_AGE as f64 * DEFAULT_S_PER_SLOT) as i64;

// Map used to filter submitted transactions.
#[derive(Clone)]
struct TransactionSendInfo {
pub sent_at: DateTime<Utc>,
pub compute_unit_price: Option<u64>,
}
type MapSignatureToTxInfo = HashMap<Signature, TransactionSendInfo>;

impl LogTransactionService {
pub fn join(self) -> thread::Result<()> {
self.thread_handler.join()
}
type SignatureBatchReceiver = Receiver<TransactionInfoBatch>;

impl LogTransactionService {
fn new<T>(
client: &Arc<T>,
signature_receiver: SignatureBatchReceiver,
Expand All @@ -129,6 +102,10 @@ impl LogTransactionService {
Self { thread_handler }
}

pub fn join(self) -> thread::Result<()> {
self.thread_handler.join()
}

fn run<T>(client: Arc<T>, signature_receiver: SignatureBatchReceiver, mut log_writer: LogWriter)
where
T: 'static + BenchTpsClient + Send + Sync + ?Sized,
Expand All @@ -147,12 +124,12 @@ impl LogTransactionService {

let mut start_block = get_slot_with_retry(&client).expect("get_slot_with_retry succeed");

let mut signature_to_tx_info = HashMap::<Signature, TransactionSendInfo>::new();
let mut signature_to_tx_info = MapSignatureToTxInfo::new();
loop {
select! {
recv(signature_receiver) -> msg => {
match msg {
Ok(SignatureBatch {
Ok(TransactionInfoBatch {
signatures,
sent_at,
compute_unit_prices
@@ -219,7 +196,7 @@ impl LogTransactionService {

fn process_block(
block: UiConfirmedBlock,
signature_to_tx_info: &mut HashMap<Signature, TransactionSendInfo>,
signature_to_tx_info: &mut MapSignatureToTxInfo,
slot: u64,
log_writer: &mut LogWriter,
) {
@@ -247,7 +224,7 @@ impl LogTransactionService {
let cu_consumed = meta
.as_ref()
.map_or(0, |meta| match meta.compute_units_consumed {
OptionSerializer::Some(cu_consumed) => cu_consumed, //TODO(klykov): consider adding error info as well
OptionSerializer::Some(cu_consumed) => cu_consumed,
_ => 0,
});
let signature = &transaction.signatures[0];
@@ -288,7 +265,7 @@ impl LogTransactionService {

fn clean_transaction_map(
log_writer: &mut LogWriter,
signature_to_tx_info: &mut HashMap<Signature, TransactionSendInfo>,
signature_to_tx_info: &mut MapSignatureToTxInfo,
) {
let now: DateTime<Utc> = Utc::now();
signature_to_tx_info.retain(|signature, tx_info| {
@@ -313,6 +290,39 @@ }
}
}

fn check_confirmations(
block_data_file: Option<&String>,
transaction_data_file: Option<&String>,
) -> bool {
block_data_file.is_some() || transaction_data_file.is_some()
}

#[derive(Clone, Serialize)]
struct BlockData {
pub block_hash: String,
pub block_slot: Slot,
pub block_leader: String,
pub block_time: Option<DateTime<Utc>>,
pub total_num_transactions: usize,
pub num_bench_tps_transactions: usize,
pub total_cu_consumed: u64,
pub bench_tps_cu_consumed: u64,
}

#[derive(Clone, Serialize)]
struct TransactionData {
pub signature: String,
pub sent_at: Option<DateTime<Utc>>,
pub confirmed_slot: Option<Slot>,
pub confirmed_at: Option<DateTime<Utc>>,
pub successful: bool,
pub slot_leader: Option<String>,
pub error: Option<String>,
pub blockhash: Option<String>,
pub timed_out: bool,
pub compute_unit_price: u64,
}

type CsvFileWriter = csv::Writer<File>;
struct LogWriter {
block_log_writer: Option<CsvFileWriter>,
@@ -357,7 +367,7 @@ impl LogWriter {
.latest()
.expect("valid timestamp")
}),
sent_at: sent_at.to_string(),
sent_at: Some(sent_at),
successful: meta.as_ref().map_or(false, |m| m.status.is_ok()),
error: meta
.as_ref()
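For orientation, the restructured service drives everything from the select! loop shown in the diff: one arm receives TransactionInfoBatch messages from the sender thread, the other fires on a timer (PROCESS_BLOCKS_EVERY_MS) to fetch and process newly confirmed blocks. A reduced sketch of that event-loop pattern, assuming crossbeam_channel (which provides the select!, tick, and unbounded used here); the String payload, the 400 ms period, and the println! reporting are stand-ins for illustration.

use crossbeam_channel::{select, tick, unbounded};
use std::{thread, time::Duration};

fn main() {
    let (signatures_sender, signature_receiver) = unbounded::<Vec<String>>();
    // Stand-in for the PROCESS_BLOCKS_EVERY_MS timer in the service.
    let block_processing_timer_receiver = tick(Duration::from_millis(400));

    thread::spawn(move || {
        signatures_sender
            .send(vec!["signature-1".to_string(), "signature-2".to_string()])
            .unwrap();
        // Dropping the sender disconnects the channel and ends the loop below.
    });

    loop {
        select! {
            recv(signature_receiver) -> msg => {
                match msg {
                    Ok(batch) => println!("tracking {} new signatures", batch.len()),
                    Err(_) => {
                        println!("all senders disconnected, shutting down");
                        break;
                    }
                }
            },
            recv(block_processing_timer_receiver) -> _ => {
                println!("fetching and processing newly confirmed blocks");
            },
        }
    }
}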

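The new comment on REMOVE_TIMEOUT_TX_EVERY_SEC documents the maximum age of an entry in the transaction map; clean_transaction_map enforces it by retaining only young entries and reporting the rest as timed out. A standalone sketch of that retain-by-age pattern follows; the String keys (standing in for Signature), the 60-second threshold, the println! output, and the main driver are invented for illustration.

use chrono::{DateTime, Duration, Utc};
use std::collections::HashMap;

// Mirrors TransactionSendInfo / MapSignatureToTxInfo from the diff.
#[derive(Clone)]
struct TransactionSendInfo {
    sent_at: DateTime<Utc>,
    compute_unit_price: Option<u64>,
}
type MapSignatureToTxInfo = HashMap<String, TransactionSendInfo>;

/// Drop entries older than `max_age_sec`; the real service also logs them as timed out.
fn clean_transaction_map(map: &mut MapSignatureToTxInfo, max_age_sec: i64) {
    let now: DateTime<Utc> = Utc::now();
    map.retain(|signature, tx_info| {
        let age = now.signed_duration_since(tx_info.sent_at);
        let keep = age < Duration::seconds(max_age_sec);
        if !keep {
            println!(
                "transaction {signature} (cu price {:?}) timed out after {}s",
                tx_info.compute_unit_price,
                age.num_seconds()
            );
        }
        keep
    });
}

fn main() {
    let mut signature_to_tx_info = MapSignatureToTxInfo::new();
    signature_to_tx_info.insert(
        "stale-signature".to_string(),
        TransactionSendInfo {
            sent_at: Utc::now() - Duration::seconds(120),
            compute_unit_price: Some(100),
        },
    );
    clean_transaction_map(&mut signature_to_tx_info, 60);
    assert!(signature_to_tx_info.is_empty());
}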

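Lastly, the module-level doc comment added in this commit notes that the service saves its logs in csv format; LogWriter holds optional csv::Writer<File> handles and serializes the Serialize-derived BlockData and TransactionData rows. A small sketch of that serialization path, assuming the csv and serde crates; the abbreviated field set, file name, and sample values are illustrative only.

use serde::Serialize;
use std::fs::File;

// Abbreviated stand-in for the TransactionData rows the service serializes.
#[derive(Clone, Serialize)]
struct TransactionData {
    signature: String,
    confirmed_slot: Option<u64>,
    successful: bool,
    timed_out: bool,
    compute_unit_price: u64,
}

type CsvFileWriter = csv::Writer<File>;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // One writer per optional output file, as in LogWriter.
    let mut transaction_log_writer: CsvFileWriter = csv::Writer::from_path("transactions.csv")?;

    transaction_log_writer.serialize(TransactionData {
        signature: "example-signature".to_string(),
        confirmed_slot: Some(42),
        successful: true,
        timed_out: false,
        compute_unit_price: 1_000,
    })?;
    // Flush so the row reaches disk even if the process exits abruptly.
    transaction_log_writer.flush()?;
    Ok(())
}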