Skip to content

Commit

Permalink
Remove the usage of result sender and receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
ksolana committed Oct 30, 2024
1 parent ba5d445 commit 976e073
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 31 deletions.
4 changes: 2 additions & 2 deletions core/src/banking_stage/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1272,10 +1272,10 @@ mod tests {
record.mixin,
record.transactions,
);
poh_recorder.write().unwrap().tick();
if record.sender.send(record_response).is_err() {
if record_response.is_err() {
panic!("Error returning mixin hash");
}
poh_recorder.write().unwrap().tick();
}
if is_exited.load(Ordering::Relaxed) {
break;
Expand Down
4 changes: 2 additions & 2 deletions poh/src/mpsc_ringbuffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -723,8 +723,8 @@ impl<T> ArrayQueue<T> {
}
pub fn pop_with_timeout(&self, t: Duration) -> Option<T> {
// Wait/Backoff and then pop.
//todo!();
error!("recv_timeout {}ms", t.as_millis());
log::debug!("TODO: Implement wait.");
assert!(self.capacity() > 0);
self.pop()
}
}
Expand Down
12 changes: 3 additions & 9 deletions poh/src/poh_recorder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,28 +90,21 @@ impl BankStart {
}
}

// Sends the Result of the record operation, including the index in the slot of the first
// transaction, if being tracked by WorkingBank
type RecordResultSender = Sender<Result<Option<usize>>>;

pub struct Record {
pub mixin: Hash,
pub transactions: Vec<VersionedTransaction>,
pub slot: Slot,
pub sender: RecordResultSender,
}
impl Record {
pub fn new(
mixin: Hash,
transactions: Vec<VersionedTransaction>,
slot: Slot,
sender: RecordResultSender,
) -> Self {
Self {
mixin,
transactions,
slot,
sender,
}
}
}
Expand Down Expand Up @@ -212,10 +205,10 @@ impl TransactionRecorder {
transactions: Vec<VersionedTransaction>,
) -> Result<Option<usize>> {
// create a new channel so that there is only 1 sender and when it goes out of scope, the receiver fails
let (result_sender, result_receiver) = bounded(1);
let (_result_sender, result_receiver) = bounded(1);
let res =
self.record_sender
.push(Record::new(mixin, transactions, bank_slot, result_sender));
.push(Record::new(mixin, transactions, bank_slot));
if res.is_err() {
// If the channel is dropped, then the validator is shutting down so return that we are hitting
// the max tick height to stop transaction processing and flush any transactions in the pipeline.
Expand Down Expand Up @@ -314,6 +307,7 @@ pub struct PohRecorder {
delay_leader_block_for_pending_fork: bool,
last_reported_slot_for_pending_fork: Arc<Mutex<Slot>>,
pub is_exited: Arc<AtomicBool>,
// idx_counter: Arc<Mutex<usize>>,
}

impl PohRecorder {
Expand Down
22 changes: 4 additions & 18 deletions poh/src/poh_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use {
},
log::*,
solana_entry::poh::Poh,
solana_measure::{measure::Measure, measure_us},
solana_measure::measure::Measure,
solana_sdk::poh_config::PohConfig,
std::{
sync::{
Expand Down Expand Up @@ -45,7 +45,6 @@ struct PohTiming {
total_tick_time_ns: u64,
last_metric: Instant,
total_record_time_us: u64,
total_send_record_result_us: u64,
}

impl PohTiming {
Expand All @@ -59,7 +58,6 @@ impl PohTiming {
total_tick_time_ns: 0,
last_metric: Instant::now(),
total_record_time_us: 0,
total_send_record_result_us: 0,
}
}
fn report(&mut self, ticks_per_slot: u64) {
Expand All @@ -76,11 +74,6 @@ impl PohTiming {
("total_lock_time_us", self.total_lock_time_ns / 1000, i64),
("total_hash_time_us", self.total_hash_time_ns / 1000, i64),
("total_record_time_us", self.total_record_time_us, i64),
(
"total_send_record_result_us",
self.total_send_record_result_us,
i64
),
);
self.total_sleep_us = 0;
self.num_ticks = 0;
Expand All @@ -90,7 +83,6 @@ impl PohTiming {
self.total_hash_time_ns = 0;
self.last_metric = Instant::now();
self.total_record_time_us = 0;
self.total_send_record_result_us = 0;
}
}
}
Expand Down Expand Up @@ -192,13 +184,11 @@ impl PohService {
) {
let record = record_receiver.pop_with_timeout(timeout);
if let Some(record) = record {
if record
.sender
.send(poh_recorder.write().unwrap().record(
if poh_recorder.write().unwrap().record(
record.slot,
record.mixin,
record.transactions,
))
)
.is_err()
{
panic!("Error returning mixin hash");
Expand Down Expand Up @@ -257,15 +247,11 @@ impl PohService {
timing.total_lock_time_ns += lock_time.as_ns();
let mut record_time = Measure::start("record");
loop {
let res = poh_recorder_l.record(
let _res = poh_recorder_l.record(
record.slot,
record.mixin,
std::mem::take(&mut record.transactions),
);
let (send_res, send_record_result_us) = measure_us!(record.sender.send(res));
debug_assert!(send_res.is_ok(), "Record wasn't sent.");

timing.total_send_record_result_us += send_record_result_us;
timing.num_hashes += 1; // note: may have also ticked inside record
if let Some(new_record) = record_receiver.pop() {
// we already have second request to record, so record again while we still have the mutex
Expand Down

0 comments on commit 976e073

Please sign in to comment.