Skip to content

Commit

Permalink
Refactor: rename notify to notification
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Jul 12, 2024
1 parent 78a1d48 commit 3ae6b4b
Show file tree
Hide file tree
Showing 11 changed files with 89 additions and 78 deletions.
4 changes: 2 additions & 2 deletions openraft/src/core/balancer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ impl Balancer {
self.raft_msg
}

pub(crate) fn notify(&self) -> u64 {
pub(crate) fn notification(&self) -> u64 {
self.total - self.raft_msg
}

pub(crate) fn increase_notify(&mut self) {
pub(crate) fn increase_notification(&mut self) {
self.raft_msg = self.raft_msg * 15 / 16;
if self.raft_msg == 0 {
self.raft_msg = 1;
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
pub(crate) mod balancer;
pub(crate) mod command_state;
pub(crate) mod notify;
pub(crate) mod notification;
mod raft_core;
pub(crate) mod raft_msg;
mod replication_state;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::StorageError;
use crate::Vote;

/// A message coming from the internal components.
pub(crate) enum Notify<C>
pub(crate) enum Notification<C>
where C: RaftTypeConfig
{
VoteResponse {
Expand Down Expand Up @@ -59,15 +59,15 @@ where C: RaftTypeConfig
},
}

impl<C> Notify<C>
impl<C> Notification<C>
where C: RaftTypeConfig
{
pub(crate) fn sm(command_result: sm::CommandResult<C>) -> Self {
Self::StateMachine { command_result }
}
}

impl<C> fmt::Display for Notify<C>
impl<C> fmt::Display for Notification<C>
where C: RaftTypeConfig
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Expand Down
54 changes: 27 additions & 27 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::config::Config;
use crate::config::RuntimeConfig;
use crate::core::balancer::Balancer;
use crate::core::command_state::CommandState;
use crate::core::notify::Notify;
use crate::core::notification::Notification;
use crate::core::raft_msg::external_command::ExternalCommand;
use crate::core::raft_msg::AppendEntriesTx;
use crate::core::raft_msg::ClientReadTx;
Expand Down Expand Up @@ -175,10 +175,10 @@ where

/// A Sender to send callback by other components to [`RaftCore`], when an action is finished,
/// such as flushing log to disk, or applying log entries to state machine.
pub(crate) tx_notify: MpscUnboundedSenderOf<C, Notify<C>>,
pub(crate) tx_notification: MpscUnboundedSenderOf<C, Notification<C>>,

/// A Receiver to receive callback from other components.
pub(crate) rx_notify: MpscUnboundedReceiverOf<C, Notify<C>>,
pub(crate) rx_notification: MpscUnboundedReceiverOf<C, Notification<C>>,

pub(crate) tx_metrics: WatchSenderOf<C, RaftMetrics<C>>,
pub(crate) tx_data_metrics: WatchSenderOf<C, RaftDataMetrics<C>>,
Expand Down Expand Up @@ -278,7 +278,7 @@ where
let my_vote = *self.engine.state.vote_ref();
let ttl = Duration::from_millis(self.config.heartbeat_interval);
let eff_mem = self.engine.state.membership_state.effective().clone();
let core_tx = self.tx_notify.clone();
let core_tx = self.tx_notification.clone();

let mut granted = btreeset! {my_id};

Expand Down Expand Up @@ -366,7 +366,7 @@ where
vote
);

let send_res = core_tx.send(Notify::HigherVote {
let send_res = core_tx.send(Notification::HigherVote {
target,
higher: vote,
sender_vote: my_vote,
Expand Down Expand Up @@ -765,7 +765,7 @@ where
snapshot_network,
self.log_store.get_log_reader().await,
self.sm_handle.new_snapshot_reader(),
self.tx_notify.clone(),
self.tx_notification.clone(),
tracing::span!(parent: &self.span, Level::DEBUG, "replication", id=display(self.id), target=display(target)),
)
}
Expand Down Expand Up @@ -858,9 +858,9 @@ where
return Err(Fatal::Stopped);
}

notify_res = self.rx_notify.recv() => {
notify_res = self.rx_notification.recv() => {
match notify_res {
Some(notify) => self.handle_notify(notify)?,
Some(notify) => self.handle_notification(notify)?,
None => {
tracing::error!("all rx_notify senders are dropped");
return Err(Fatal::Stopped);
Expand All @@ -884,14 +884,14 @@ where
// There is a message waking up the loop, process channels one by one.

let raft_msg_processed = self.process_raft_msg(balancer.raft_msg()).await?;
let notify_processed = self.process_notify(balancer.notify()).await?;
let notify_processed = self.process_notification(balancer.notification()).await?;

// If one of the channel consumed all its budget, re-balance the budget ratio.

#[allow(clippy::collapsible_else_if)]
if notify_processed == balancer.notify() {
if notify_processed == balancer.notification() {
tracing::info!("there may be more Notify to process, increase Notify ratio");
balancer.increase_notify();
balancer.increase_notification();
} else {
if raft_msg_processed == balancer.raft_msg() {
tracing::info!("there may be more RaftMsg to process, increase RaftMsg ratio");
Expand Down Expand Up @@ -946,9 +946,9 @@ where
///
/// It returns the number of processed notifications.
/// If the input channel is closed, it returns `Fatal::Stopped`.
async fn process_notify(&mut self, at_most: u64) -> Result<u64, Fatal<C>> {
async fn process_notification(&mut self, at_most: u64) -> Result<u64, Fatal<C>> {
for i in 0..at_most {
let res = self.rx_notify.try_recv();
let res = self.rx_notification.try_recv();
let notify = match res {
Ok(msg) => msg,
Err(e) => match e {
Expand All @@ -963,7 +963,7 @@ where
},
};

self.handle_notify(notify)?;
self.handle_notification(notify)?;

// TODO: does run_engine_commands() run too frequently?
// to run many commands in one shot, it is possible to batch more commands to gain
Expand Down Expand Up @@ -995,7 +995,7 @@ where
let target_node = self.engine.state.membership_state.effective().get_node(&target).unwrap().clone();
let mut client = self.network_factory.new_client(target, &target_node).await;

let tx = self.tx_notify.clone();
let tx = self.tx_notification.clone();

let ttl = Duration::from_millis(self.config.election_timeout_min);
let id = self.id;
Expand Down Expand Up @@ -1023,7 +1023,7 @@ where

match res {
Ok(resp) => {
let _ = tx.send(Notify::VoteResponse {
let _ = tx.send(Notification::VoteResponse {
target,
resp,
sender_vote: vote,
Expand Down Expand Up @@ -1151,11 +1151,11 @@ where

// TODO: Make this method non-async. It does not need to run any async command in it.
#[tracing::instrument(level = "debug", skip_all, fields(state = debug(self.engine.state.server_state), id=display(self.id)))]
pub(crate) fn handle_notify(&mut self, notify: Notify<C>) -> Result<(), Fatal<C>> {
pub(crate) fn handle_notification(&mut self, notify: Notification<C>) -> Result<(), Fatal<C>> {
tracing::debug!("recv from rx_notify: {}", notify);

match notify {
Notify::VoteResponse {
Notification::VoteResponse {
target,
resp,
sender_vote,
Expand All @@ -1174,7 +1174,7 @@ where
}
}

Notify::HigherVote {
Notification::HigherVote {
target,
higher,
sender_vote,
Expand All @@ -1193,7 +1193,7 @@ where
}
}

Notify::Tick { i } => {
Notification::Tick { i } => {
// check every timer

let now = C::now();
Expand Down Expand Up @@ -1241,12 +1241,12 @@ where
}
}

Notify::StorageError { error } => {
Notification::StorageError { error } => {
tracing::error!("RaftCore received Notify::StorageError: {}", error);
return Err(Fatal::StorageError(error));
}

Notify::LocalIO { io_id } => {
Notification::LocalIO { io_id } => {
match io_id {
IOId::AppendLog(append_log_io_id) => {
// No need to check against membership change,
Expand All @@ -1259,7 +1259,7 @@ where
}
}

Notify::Network { response } => {
Notification::Network { response } => {
//
match response {
replication::Response::Progress {
Expand Down Expand Up @@ -1306,7 +1306,7 @@ where
}
}

Notify::StateMachine { command_result } => {
Notification::StateMachine { command_result } => {
tracing::debug!("sm::StateMachine command result: {:?}", command_result);

let seq = command_result.command_seq;
Expand Down Expand Up @@ -1554,8 +1554,8 @@ where
tracing::debug!("AppendInputEntries: {}", DisplaySlice::<_>(&entries),);

let io_id = IOId::new_append_log(vote.into_committed(), last_log_id);
let notify = Notify::LocalIO { io_id };
let callback = IOFlushed::new(notify, self.tx_notify.downgrade());
let notify = Notification::LocalIO { io_id };
let callback = IOFlushed::new(notify, self.tx_notification.downgrade());

// Submit IO request, do not wait for the response.
self.log_store.append(entries, callback).await?;
Expand All @@ -1564,7 +1564,7 @@ where
self.log_store.save_vote(&vote).await?;
self.engine.state.io_state_mut().update_vote(vote);

let _ = self.tx_notify.send(Notify::VoteResponse {
let _ = self.tx_notification.send(Notification::VoteResponse {
target: self.id,
// last_log_id is not used when sending VoteRequest to local node
resp: VoteResponse::new(vote, None),
Expand Down
20 changes: 12 additions & 8 deletions openraft/src/core/sm/worker.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::async_runtime::MpscUnboundedReceiver;
use crate::async_runtime::MpscUnboundedSender;
use crate::async_runtime::OneshotSender;
use crate::core::notify::Notify;
use crate::core::notification::Notification;
use crate::core::raft_msg::ResultSender;
use crate::core::sm::handle::Handle;
use crate::core::sm::Command;
Expand Down Expand Up @@ -46,7 +46,7 @@ where
cmd_rx: MpscUnboundedReceiverOf<C, Command<C>>,

/// Send back the result of the command to RaftCore.
resp_tx: MpscUnboundedSenderOf<C, Notify<C>>,
resp_tx: MpscUnboundedSenderOf<C, Notification<C>>,
}

impl<C, SM, LR> Worker<C, SM, LR>
Expand All @@ -56,7 +56,11 @@ where
LR: RaftLogReader<C>,
{
/// Spawn a new state machine worker, return a controlling handle.
pub(crate) fn spawn(state_machine: SM, log_reader: LR, resp_tx: MpscUnboundedSenderOf<C, Notify<C>>) -> Handle<C> {
pub(crate) fn spawn(
state_machine: SM,
log_reader: LR,
resp_tx: MpscUnboundedSenderOf<C, Notification<C>>,
) -> Handle<C> {
let (cmd_tx, cmd_rx) = C::mpsc_unbounded();

let worker = Worker {
Expand All @@ -78,7 +82,7 @@ where
if let Err(err) = res {
tracing::error!("{} while execute state machine command", err,);

let _ = self.resp_tx.send(Notify::StateMachine {
let _ = self.resp_tx.send(Notification::StateMachine {
command_result: CommandResult {
command_seq: 0,
result: Err(err),
Expand Down Expand Up @@ -124,7 +128,7 @@ where
tracing::info!("Done install complete snapshot, meta: {}", meta);

let res = CommandResult::new(cmd.seq, Ok(Response::InstallSnapshot(Some(meta))));
let _ = self.resp_tx.send(Notify::sm(res));
let _ = self.resp_tx.send(Notification::sm(res));
}
CommandPayload::BeginReceivingSnapshot { tx } => {
tracing::info!("{}: BeginReceivingSnapshot", func_name!());
Expand All @@ -137,7 +141,7 @@ where
CommandPayload::Apply { first, last } => {
let resp = self.apply(first, last).await?;
let res = CommandResult::new(cmd.seq, Ok(Response::Apply(resp)));
let _ = self.resp_tx.send(Notify::sm(res));
let _ = self.resp_tx.send(Notification::sm(res));
}
};
}
Expand Down Expand Up @@ -195,7 +199,7 @@ where
/// as applying a log entry,
/// - or it must be able to acquire a lock that prevents any write operations.
#[tracing::instrument(level = "info", skip_all)]
async fn build_snapshot(&mut self, seq: CommandSeq, resp_tx: MpscUnboundedSenderOf<C, Notify<C>>) {
async fn build_snapshot(&mut self, seq: CommandSeq, resp_tx: MpscUnboundedSenderOf<C, Notification<C>>) {
// TODO: need to be abortable?
// use futures::future::abortable;
// let (fu, abort_handle) = abortable(async move { builder.build_snapshot().await });
Expand All @@ -208,7 +212,7 @@ where
let res = builder.build_snapshot().await;
let res = res.map(|snap| Response::BuildSnapshot(snap.meta));
let cmd_res = CommandResult::new(seq, res);
let _ = resp_tx.send(Notify::sm(cmd_res));
let _ = resp_tx.send(Notification::sm(cmd_res));
});
tracing::info!("{} returning; spawned building snapshot task", func_name!());
}
Expand Down
12 changes: 8 additions & 4 deletions openraft/src/core/tick.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tracing::Level;
use tracing::Span;

use crate::async_runtime::MpscUnboundedSender;
use crate::core::notify::Notify;
use crate::core::notification::Notification;
use crate::type_config::alias::JoinHandleOf;
use crate::type_config::alias::MpscUnboundedSenderOf;
use crate::type_config::TypeConfigExt;
Expand All @@ -25,7 +25,7 @@ where C: RaftTypeConfig
{
interval: Duration,

tx: MpscUnboundedSenderOf<C, Notify<C>>,
tx: MpscUnboundedSenderOf<C, Notification<C>>,

/// Emit event or not
enabled: Arc<AtomicBool>,
Expand Down Expand Up @@ -54,7 +54,11 @@ where C: RaftTypeConfig
impl<C> Tick<C>
where C: RaftTypeConfig
{
pub(crate) fn spawn(interval: Duration, tx: MpscUnboundedSenderOf<C, Notify<C>>, enabled: bool) -> TickHandle<C> {
pub(crate) fn spawn(
interval: Duration,
tx: MpscUnboundedSenderOf<C, Notification<C>>,
enabled: bool,
) -> TickHandle<C> {
let enabled = Arc::new(AtomicBool::from(enabled));
let this = Self {
interval,
Expand Down Expand Up @@ -106,7 +110,7 @@ where C: RaftTypeConfig

i += 1;

let send_res = self.tx.send(Notify::Tick { i });
let send_res = self.tx.send(Notification::Tick { i });
if let Err(_e) = send_res {
tracing::info!("Stopping tick_loop(), main loop terminated");
break;
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,8 @@ where C: RaftTypeConfig
tx_api: tx_api.clone(),
rx_api,

tx_notify,
rx_notify,
tx_notification: tx_notify,
rx_notification: rx_notify,

tx_metrics,
tx_data_metrics,
Expand Down
Loading

0 comments on commit 3ae6b4b

Please sign in to comment.