Skip to content

Commit

Permalink
some small improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillLykov committed Mar 5, 2024
1 parent 921d486 commit 23fab83
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 16 deletions.
2 changes: 1 addition & 1 deletion bench-tps/src/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -982,7 +982,7 @@ fn do_tx_transfers<T: BenchTpsClient + ?Sized>(
})
.is_err()
{
info!("Receiver has been dropped, stop sending transactions.");
error!("Receiver has been dropped, stop sending transactions.");
break;
}
}
Expand Down
28 changes: 13 additions & 15 deletions bench-tps/src/confirmations_processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use {
collections::HashMap,
fs::File,
sync::Arc,
thread::{self, Builder, JoinHandle},
thread::{self, sleep, Builder, JoinHandle},
time::Duration,
},
};
Expand Down Expand Up @@ -134,29 +134,24 @@ impl LogTransactionService {
sent_at,
compute_unit_prices
}) => {
let mut measure_send_txs = Measure::start("measure_update_map");
signatures.iter().zip(compute_unit_prices).for_each( |(sign, compute_unit_price)| {signature_to_tx_info.insert(*sign, TransactionSendInfo {
sent_at,
compute_unit_price
});});

measure_send_txs.stop();
let time_send_ns = measure_send_txs.as_ns();
info!("@@@ Time to add signatures to map: {time_send_ns}")
}
Err(e) => {
info!("Stop LogTransactionService, error message received {e}");
info!("@@@ Stop LogTransactionService, message received: {e}");
log_writer.flush();
break;
}
}
},
recv(block_processing_timer_receiver) -> _ => {
let mut measure_send_txs = Measure::start("measure_update_map");
info!("sign_receiver queue len: {}", signature_receiver.len());
let mut measure_process_blocks = Measure::start("measure_process_blocks");
info!("@@@ sign_receiver queue len: {}", signature_receiver.len());
let block_slots = get_blocks_with_retry(&client, start_block);
let Ok(block_slots) = block_slots else {
error!("Failed to get blocks");
error!("Failed to get blocks, stop LogWriterService.");
drop(signature_receiver);
break;
};
Expand All @@ -170,6 +165,7 @@ impl LogTransactionService {
rpc_block_config
)
});
let num_blocks = blocks.len();
for (block, slot) in blocks.zip(&block_slots) {
let Ok(block) = block else {
continue;
Expand All @@ -185,10 +181,9 @@ impl LogTransactionService {

// maybe ok to write every time here? Or create a separate timer
log_writer.flush();
measure_send_txs.stop();
let time_send_ns = measure_send_txs.as_ns();
info!("@@@ Time to process blocks: {time_send_ns}")

measure_process_blocks.stop();
let time_send_us = measure_process_blocks.as_us();
info!("@@@ Time to process {num_blocks} blocks: {time_send_us}")
},
}
}
Expand Down Expand Up @@ -247,7 +242,7 @@ impl LogTransactionService {
Some(block.blockhash.clone()),
Some(slot_leader.clone()),
compute_unit_price,
true,
false,
);
}
}
Expand Down Expand Up @@ -423,6 +418,7 @@ impl LogWriter {
}

const NUM_RETRY: u64 = 5;
const RETRY_EVERY_MS: u64 = 4 * DEFAULT_MS_PER_SLOT;

fn get_slot_with_retry<T>(client: &Arc<T>) -> Result<Slot>
where
Expand All @@ -437,6 +433,7 @@ where
}
Err(error) => {
warn!("Failed to get slot: {error}, retry.");
sleep(Duration::from_millis(RETRY_EVERY_MS));
}
}
}
Expand All @@ -456,6 +453,7 @@ where
}
Err(error) => {
warn!("Failed to download blocks: {error}, retry.");
sleep(Duration::from_millis(RETRY_EVERY_MS));
}
}
}
Expand Down

0 comments on commit 23fab83

Please sign in to comment.