diff --git a/include/libnuraft/peer.hxx b/include/libnuraft/peer.hxx index b2fbc9c3..114a19cd 100644 --- a/include/libnuraft/peer.hxx +++ b/include/libnuraft/peer.hxx @@ -30,6 +30,7 @@ limitations under the License. #include "srv_config.hxx" #include +#include namespace nuraft { @@ -78,7 +79,7 @@ public: , rsv_msg_(nullptr) , rsv_msg_handler_(nullptr) , last_streamed_log_idx_(0) - , flying_bytes(0) + , bytes_in_flight_(0) , l_(logger) { reset_ls_timer(); @@ -318,20 +319,21 @@ public: last_streamed_log_idx_.store(0); } - size_t get_flying_bytes() { - return flying_bytes.load(); + int64_t get_bytes_in_flight() { + return bytes_in_flight_.load(); } - void flying_bytes_add(size_t total_size) { - flying_bytes.fetch_add(total_size); + void bytes_in_flight_add(size_t req_size_bytes) { + bytes_in_flight_.fetch_add(req_size_bytes); } - void flying_bytes_sub(size_t total_size) { - flying_bytes.fetch_sub(total_size); + void bytes_in_flight_sub(size_t req_size_bytes) { + bytes_in_flight_.fetch_sub(req_size_bytes); + assert(bytes_in_flight_ >= 0); } - void reset_flying_bytes() { - flying_bytes.store(0); + void reset_bytes_in_flight() { + bytes_in_flight_.store(0); } void try_set_free(msg_type type, bool streaming); @@ -346,7 +348,7 @@ private: ptr& req, ptr& pending_result, bool streaming, - size_t total_size, + size_t req_size_bytes, ptr& resp, ptr& err); @@ -560,9 +562,9 @@ private: std::atomic last_streamed_log_idx_; /** - * Current flying bytes of append entry requests. + * Current bytes of in-flight append entry requests. */ - std::atomic flying_bytes; + std::atomic bytes_in_flight_; /** * Logger instance. diff --git a/include/libnuraft/raft_params.hxx b/include/libnuraft/raft_params.hxx index 964a9659..8a8f0e60 100644 --- a/include/libnuraft/raft_params.hxx +++ b/include/libnuraft/raft_params.hxx @@ -98,7 +98,7 @@ struct raft_params { , use_full_consensus_among_healthy_members_(false) , parallel_log_appending_(false) , max_log_gap_in_stream_(0) - , max_flying_bytes_(0) + , max_bytes_in_flight_in_stream_(0) {} /** @@ -620,9 +620,9 @@ public: int32 max_log_gap_in_stream_; /** - * Max flying bytes we allow. If it is zero, we don't use it as throttling. + * Max in-flight bytes we allow. If it is zero, we don't use it as throttling. */ - ulong max_flying_bytes_; + int64_t max_bytes_in_flight_in_stream_; }; } diff --git a/src/handle_append_entries.cxx b/src/handle_append_entries.cxx index 88018dea..9649b0e8 100644 --- a/src/handle_append_entries.cxx +++ b/src/handle_append_entries.cxx @@ -308,8 +308,9 @@ bool raft_server::request_append_entries(ptr p) { // throttling if (max_gap_in_stream + p->get_next_log_idx() <= (last_streamed_log_idx + 1) || - ( params->max_flying_bytes_ && - p->get_flying_bytes() > params->max_flying_bytes_)) { + (params->max_bytes_in_flight_in_stream_ && + p->get_bytes_in_flight() > + params->max_bytes_in_flight_in_stream_)) { streaming = false; } else { p_tr("send following request to %d in stream mode, " @@ -352,10 +353,10 @@ bool raft_server::request_append_entries(ptr p) { p_wn("skipped sending msg to %d too long time, " "last streamed idx: %" PRIu64 "" "next log idx: %" PRIu64 "" - "flying bytes: %" PRIu64 "" + "in-flight: %" PRIu64 " bytes" "last msg sent %d ms ago", p->get_id(), p->get_last_streamed_log_idx(), - p->get_next_log_idx(), p->get_flying_bytes(), last_ts_ms); + p->get_next_log_idx(), p->get_bytes_in_flight(), last_ts_ms); } else if ( p->get_long_puase_warnings() == raft_server::raft_limits_.warning_limit_ ) { p_wn("long pause warning to %d is too verbose, " @@ -1050,8 +1051,8 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) { (int)p->get_id(), resp.get_next_idx()); int64 bs_hint = resp.get_next_batch_size_hint_in_bytes(); - p_tr("peer %d batch size hint: %" PRId64 " bytes, flying bytes: %" PRId64 "", - p->get_id(), bs_hint, p->get_flying_bytes()); + p_tr("peer %d batch size hint: %" PRId64 " bytes, in-flight: %" PRId64 " bytes", + p->get_id(), bs_hint, p->get_bytes_in_flight()); p->set_next_batch_size_hint_in_bytes(bs_hint); if (resp.get_accepted()) { diff --git a/src/peer.cxx b/src/peer.cxx index 75e80ae3..ec91104b 100644 --- a/src/peer.cxx +++ b/src/peer.cxx @@ -58,10 +58,10 @@ void peer::send_req( ptr myself, rpc_local = rpc_; } - size_t total_size = 0; + size_t req_size_bytes = 0; if (req->get_type() == append_entries_request) { for (auto& entry: req->log_entries()) { - total_size += entry->get_buf_ptr()->size(); + req_size_bytes += entry->get_buf_ptr()->size(); } } @@ -73,11 +73,11 @@ void peer::send_req( ptr myself, req, pending, streaming, - total_size, + req_size_bytes, std::placeholders::_1, std::placeholders::_2 ); if (rpc_local) { - myself->flying_bytes_add(total_size); + myself->bytes_in_flight_add(req_size_bytes); rpc_local->send(req, h); } } @@ -93,7 +93,7 @@ void peer::handle_rpc_result( ptr myself, ptr& req, ptr& pending_result, bool streaming, - size_t total_size, + size_t req_size_bytes, ptr& resp, ptr& err ) { @@ -130,7 +130,7 @@ void peer::handle_rpc_result( ptr myself, // WARNING: // `set_free()` should be protected by `rpc_protector_`, otherwise // it may free the peer even though new RPC client is already created. - flying_bytes_sub(total_size); + bytes_in_flight_sub(req_size_bytes); try_set_free(req->get_type(), streaming); } } @@ -168,7 +168,7 @@ void peer::handle_rpc_result( ptr myself, if (cur_rpc_id == given_rpc_id) { rpc_.reset(); reset_stream(); - reset_flying_bytes(); + reset_bytes_in_flight(); try_set_free(req->get_type(), streaming); } else { // WARNING (MONSTOR-9378): @@ -254,7 +254,7 @@ bool peer::recreate_rpc(ptr& config, reset_active_timer(); reset_stream(); - reset_flying_bytes(); + reset_bytes_in_flight(); set_free(); set_manual_free(); return true; diff --git a/tests/unit/stream_functional_test.cxx b/tests/unit/stream_functional_test.cxx index e49cb457..08ed0254 100644 --- a/tests/unit/stream_functional_test.cxx +++ b/tests/unit/stream_functional_test.cxx @@ -175,11 +175,11 @@ void update_stream_params(const std::vector& pkgs, } void update_max_fly_bytes_params(const std::vector& pkgs, - int max_flying_bytes) { + int max_bytes_in_flight_in_stream) { for (auto& entry: pkgs) { RaftPkg* pp = entry; raft_params param = pp->raftServer->get_current_params(); - param.max_flying_bytes_ = max_flying_bytes; + param.max_bytes_in_flight_in_stream_ = max_bytes_in_flight_in_stream; pp->raftServer->update_params(param); } } @@ -475,9 +475,9 @@ int stream_mode_flying_bytes_throttling_test() { // Set stream mode and max flying bytes params // idx 0-9, size is 13, idx 10-99, size is 14 - int max_flying_bytes = 13 * 9; + int max_bytes_in_flight = 13 * 9; update_stream_params(pkgs, 500); - update_max_fly_bytes_params(pkgs, max_flying_bytes); + update_max_fly_bytes_params(pkgs, max_bytes_in_flight); // Append 1 log to enable stream CHK_Z( activate_stream(s1, s2_addr) );