Skip to content

Commit

Permalink
handle comments
Browse files Browse the repository at this point in the history
  • Loading branch information
lihzeng committed Oct 16, 2024
1 parent 9f53b08 commit 3bb39af
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 33 deletions.
26 changes: 14 additions & 12 deletions include/libnuraft/peer.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ limitations under the License.
#include "srv_config.hxx"

#include <atomic>
#include <cassert>

namespace nuraft {

Expand Down Expand Up @@ -78,7 +79,7 @@ public:
, rsv_msg_(nullptr)
, rsv_msg_handler_(nullptr)
, last_streamed_log_idx_(0)
, flying_bytes(0)
, bytes_in_flight_(0)
, l_(logger)
{
reset_ls_timer();
Expand Down Expand Up @@ -318,20 +319,21 @@ public:
last_streamed_log_idx_.store(0);
}

size_t get_flying_bytes() {
return flying_bytes.load();
int64_t get_bytes_in_flight() {
return bytes_in_flight_.load();
}

void flying_bytes_add(size_t total_size) {
flying_bytes.fetch_add(total_size);
void bytes_in_flight_add(size_t req_size_bytes) {
bytes_in_flight_.fetch_add(req_size_bytes);
}

void flying_bytes_sub(size_t total_size) {
flying_bytes.fetch_sub(total_size);
void bytes_in_flight_sub(size_t req_size_bytes) {
bytes_in_flight_.fetch_sub(req_size_bytes);
assert(bytes_in_flight_ >= 0);
}

void reset_flying_bytes() {
flying_bytes.store(0);
void reset_bytes_in_flight() {
bytes_in_flight_.store(0);
}

void try_set_free(msg_type type, bool streaming);
Expand All @@ -346,7 +348,7 @@ private:
ptr<req_msg>& req,
ptr<rpc_result>& pending_result,
bool streaming,
size_t total_size,
size_t req_size_bytes,
ptr<resp_msg>& resp,
ptr<rpc_exception>& err);

Expand Down Expand Up @@ -560,9 +562,9 @@ private:
std::atomic<ulong> last_streamed_log_idx_;

/**
* Current flying bytes of append entry requests.
* Current bytes of in-flight append entry requests.
*/
std::atomic<size_t> flying_bytes;
std::atomic<int64_t> bytes_in_flight_;

/**
* Logger instance.
Expand Down
6 changes: 3 additions & 3 deletions include/libnuraft/raft_params.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ struct raft_params {
, use_full_consensus_among_healthy_members_(false)
, parallel_log_appending_(false)
, max_log_gap_in_stream_(0)
, max_flying_bytes_(0)
, max_bytes_in_flight_in_stream_(0)
{}

/**
Expand Down Expand Up @@ -620,9 +620,9 @@ public:
int32 max_log_gap_in_stream_;

/**
* Max flying bytes we allow. If it is zero, we don't use it as throttling.
* Max in-flight bytes we allow. If it is zero, we don't use it as throttling.
*/
ulong max_flying_bytes_;
int64_t max_bytes_in_flight_in_stream_;
};

}
Expand Down
13 changes: 7 additions & 6 deletions src/handle_append_entries.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,9 @@ bool raft_server::request_append_entries(ptr<peer> p) {
// throttling
if (max_gap_in_stream + p->get_next_log_idx() <=
(last_streamed_log_idx + 1) ||
( params->max_flying_bytes_ &&
p->get_flying_bytes() > params->max_flying_bytes_)) {
(params->max_bytes_in_flight_in_stream_ &&
p->get_bytes_in_flight() >
params->max_bytes_in_flight_in_stream_)) {
streaming = false;
} else {
p_tr("send following request to %d in stream mode, "
Expand Down Expand Up @@ -352,10 +353,10 @@ bool raft_server::request_append_entries(ptr<peer> p) {
p_wn("skipped sending msg to %d too long time, "
"last streamed idx: %" PRIu64 ""
"next log idx: %" PRIu64 ""
"flying bytes: %" PRIu64 ""
"in-flight: %" PRIu64 " bytes"
"last msg sent %d ms ago",
p->get_id(), p->get_last_streamed_log_idx(),
p->get_next_log_idx(), p->get_flying_bytes(), last_ts_ms);
p->get_next_log_idx(), p->get_bytes_in_flight(), last_ts_ms);
} else if ( p->get_long_puase_warnings() ==
raft_server::raft_limits_.warning_limit_ ) {
p_wn("long pause warning to %d is too verbose, "
Expand Down Expand Up @@ -1050,8 +1051,8 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
(int)p->get_id(), resp.get_next_idx());

int64 bs_hint = resp.get_next_batch_size_hint_in_bytes();
p_tr("peer %d batch size hint: %" PRId64 " bytes, flying bytes: %" PRId64 "",
p->get_id(), bs_hint, p->get_flying_bytes());
p_tr("peer %d batch size hint: %" PRId64 " bytes, in-flight: %" PRId64 " bytes",
p->get_id(), bs_hint, p->get_bytes_in_flight());
p->set_next_batch_size_hint_in_bytes(bs_hint);

if (resp.get_accepted()) {
Expand Down
16 changes: 8 additions & 8 deletions src/peer.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ void peer::send_req( ptr<peer> myself,
rpc_local = rpc_;
}

size_t total_size = 0;
size_t req_size_bytes = 0;
if (req->get_type() == append_entries_request) {
for (auto& entry: req->log_entries()) {
total_size += entry->get_buf_ptr()->size();
req_size_bytes += entry->get_buf_ptr()->size();
}
}

Expand All @@ -73,11 +73,11 @@ void peer::send_req( ptr<peer> myself,
req,
pending,
streaming,
total_size,
req_size_bytes,
std::placeholders::_1,
std::placeholders::_2 );
if (rpc_local) {
myself->flying_bytes_add(total_size);
myself->bytes_in_flight_add(req_size_bytes);
rpc_local->send(req, h);
}
}
Expand All @@ -93,7 +93,7 @@ void peer::handle_rpc_result( ptr<peer> myself,
ptr<req_msg>& req,
ptr<rpc_result>& pending_result,
bool streaming,
size_t total_size,
size_t req_size_bytes,
ptr<resp_msg>& resp,
ptr<rpc_exception>& err )
{
Expand Down Expand Up @@ -130,7 +130,7 @@ void peer::handle_rpc_result( ptr<peer> myself,
// WARNING:
// `set_free()` should be protected by `rpc_protector_`, otherwise
// it may free the peer even though new RPC client is already created.
flying_bytes_sub(total_size);
bytes_in_flight_sub(req_size_bytes);
try_set_free(req->get_type(), streaming);
}
}
Expand Down Expand Up @@ -168,7 +168,7 @@ void peer::handle_rpc_result( ptr<peer> myself,
if (cur_rpc_id == given_rpc_id) {
rpc_.reset();
reset_stream();
reset_flying_bytes();
reset_bytes_in_flight();
try_set_free(req->get_type(), streaming);
} else {
// WARNING (MONSTOR-9378):
Expand Down Expand Up @@ -254,7 +254,7 @@ bool peer::recreate_rpc(ptr<srv_config>& config,
reset_active_timer();

reset_stream();
reset_flying_bytes();
reset_bytes_in_flight();
set_free();
set_manual_free();
return true;
Expand Down
8 changes: 4 additions & 4 deletions tests/unit/stream_functional_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,11 @@ void update_stream_params(const std::vector<RaftPkg*>& pkgs,
}

void update_max_fly_bytes_params(const std::vector<RaftPkg*>& pkgs,
int max_flying_bytes) {
int max_bytes_in_flight_in_stream) {
for (auto& entry: pkgs) {
RaftPkg* pp = entry;
raft_params param = pp->raftServer->get_current_params();
param.max_flying_bytes_ = max_flying_bytes;
param.max_bytes_in_flight_in_stream_ = max_bytes_in_flight_in_stream;
pp->raftServer->update_params(param);
}
}
Expand Down Expand Up @@ -475,9 +475,9 @@ int stream_mode_flying_bytes_throttling_test() {

// Set stream mode and max flying bytes params
// idx 0-9, size is 13, idx 10-99, size is 14
int max_flying_bytes = 13 * 9;
int max_bytes_in_flight = 13 * 9;
update_stream_params(pkgs, 500);
update_max_fly_bytes_params(pkgs, max_flying_bytes);
update_max_fly_bytes_params(pkgs, max_bytes_in_flight);

// Append 1 log to enable stream
CHK_Z( activate_stream(s1, s2_addr) );
Expand Down

0 comments on commit 3bb39af

Please sign in to comment.