Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge streaming branch into master #550

Merged
merged 7 commits into from
Nov 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/cmake.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ on:
pull_request:
branches:
- master
- streaming

env:
# Customize the CMake build type here (Release, Debug, RelWithDebInfo, etc.)
Expand Down
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,8 @@ if (CODE_COVERAGE GREATER 0)
strfmt_test
stat_mgr_test
logger_test
asio_service_stream_test
stream_functional_test
)

# lcov
Expand Down
8 changes: 8 additions & 0 deletions include/libnuraft/asio_service_options.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ struct asio_service_options {
, crc_on_entire_message_(false)
, crc_on_payload_(false)
, corrupted_msg_handler_(nullptr)
, streaming_mode_(false)
{}

/**
Expand Down Expand Up @@ -276,6 +277,13 @@ struct asio_service_options {
*/
std::function< void( std::shared_ptr<buffer>,
std::shared_ptr<buffer> ) > corrupted_msg_handler_;

/**
* If `true`, NuRaft will use streaming mode, which allows it to send
* subsequent requests without waiting for the response to previous requests.
* The order of responses will be identical to the order of requests.
*/
bool streaming_mode_;
};

}
Expand Down
49 changes: 48 additions & 1 deletion include/libnuraft/peer.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ limitations under the License.
#include "srv_config.hxx"

#include <atomic>
#include <cassert>

namespace nuraft {

Expand Down Expand Up @@ -77,6 +78,8 @@ public:
, lost_by_leader_(false)
, rsv_msg_(nullptr)
, rsv_msg_handler_(nullptr)
, last_streamed_log_idx_(0)
, bytes_in_flight_(0)
, l_(logger)
{
reset_ls_timer();
Expand Down Expand Up @@ -217,7 +220,8 @@ public:

void send_req(ptr<peer> myself,
ptr<req_msg>& req,
rpc_handler& handler);
rpc_handler& handler,
bool streaming = false);

void shutdown();

Expand Down Expand Up @@ -303,6 +307,37 @@ public:
ptr<req_msg> get_rsv_msg() const { return rsv_msg_; }
rpc_handler get_rsv_msg_handler() const { return rsv_msg_handler_; }

ulong get_last_streamed_log_idx() {
return last_streamed_log_idx_.load();
}

void set_last_streamed_log_idx(ulong expected, ulong idx) {
last_streamed_log_idx_.compare_exchange_strong(expected, idx);
}

void reset_stream() {
last_streamed_log_idx_.store(0);
}

int64_t get_bytes_in_flight() {
return bytes_in_flight_.load();
}

void bytes_in_flight_add(size_t req_size_bytes) {
bytes_in_flight_.fetch_add(req_size_bytes);
}

void bytes_in_flight_sub(size_t req_size_bytes) {
bytes_in_flight_.fetch_sub(req_size_bytes);
assert(bytes_in_flight_ >= 0);
}

void reset_bytes_in_flight() {
bytes_in_flight_.store(0);
}

void try_set_free(msg_type type, bool streaming);

bool is_lost() const { return lost_by_leader_; }
void set_lost() { lost_by_leader_ = true; }
void set_recovered() { lost_by_leader_ = false; }
Expand All @@ -312,6 +347,8 @@ private:
ptr<rpc_client> my_rpc_client,
ptr<req_msg>& req,
ptr<rpc_result>& pending_result,
bool streaming,
size_t req_size_bytes,
ptr<resp_msg>& resp,
ptr<rpc_exception>& err);

Expand Down Expand Up @@ -519,6 +556,16 @@ private:
*/
rpc_handler rsv_msg_handler_;

/**
* Last log index sent in stream mode.
*/
std::atomic<ulong> last_streamed_log_idx_;

/**
* Current bytes of in-flight append entry requests.
*/
std::atomic<int64_t> bytes_in_flight_;

/**
* Logger instance.
*/
Expand Down
18 changes: 18 additions & 0 deletions include/libnuraft/raft_params.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ struct raft_params {
, use_bg_thread_for_snapshot_io_(false)
, use_full_consensus_among_healthy_members_(false)
, parallel_log_appending_(false)
, max_log_gap_in_stream_(0)
, max_bytes_in_flight_in_stream_(0)
{}

/**
Expand Down Expand Up @@ -604,6 +606,22 @@ public:
* before returning the response.
*/
bool parallel_log_appending_;

/**
* If non-zero, streaming mode is enabled and `append_entries` requests are
* dispatched instantly without awaiting the response from the prior request.
*,
* The count of logs in-flight will be capped by this value, allowing it
* to function as a throttling mechanism, in conjunction with
* `max_bytes_in_flight_in_stream_`.
*/
int32 max_log_gap_in_stream_;

/**
* If non-zero, the volume of data in-flight will be restricted to this
* specified byte limit. This limitation is effective only in streaming mode.
*/
int64_t max_bytes_in_flight_in_stream_;
};

}
Expand Down
3 changes: 2 additions & 1 deletion include/libnuraft/raft_server.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -929,6 +929,7 @@ protected:
void request_vote(bool force_vote);
void request_append_entries();
bool request_append_entries(ptr<peer> p);
bool send_request(ptr<peer>& p, ptr<req_msg>& msg, rpc_handler& m_handler, bool streaming = false);
void handle_peer_resp(ptr<resp_msg>& resp, ptr<rpc_exception>& err);
void handle_append_entries_resp(resp_msg& resp);
void handle_install_snapshot_resp(resp_msg& resp);
Expand All @@ -946,7 +947,7 @@ protected:
void handle_join_leave_rpc_err(msg_type t_msg, ptr<peer> p);
void reset_srv_to_join();
void reset_srv_to_leave();
ptr<req_msg> create_append_entries_req(ptr<peer>& pp);
ptr<req_msg> create_append_entries_req(ptr<peer>& pp, ulong custom_last_log_idx = 0);
ptr<req_msg> create_sync_snapshot_req(ptr<peer>& pp,
ulong last_log_idx,
ulong term,
Expand Down
2 changes: 2 additions & 0 deletions scripts/test/runtests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ set -e
./tests/raft_server_test --abort-on-failure
./tests/failure_test --abort-on-failure
./tests/asio_service_test --abort-on-failure
./tests/asio_service_stream_test --abort-on-failure
./tests/stream_functional_test --abort-on-failure
Loading