Skip to content

Commit

Permalink
Implement stream mode in peer and raft server
Browse files Browse the repository at this point in the history
1. This is a proposal for stream mode.
    I keep the make_busy and set_free calls for append-log requests.
2. Most of the changes in peer are used in raft server, so I merged the changes to peer and raft server to show whether this is a proper way to implement stream mode.
3. I will split it into 2 PRs and add tests separately once this proposal is accepted.
  • Loading branch information
lihzeng committed Aug 20, 2024
1 parent 680fbdf commit a7eeaa5
Show file tree
Hide file tree
Showing 5 changed files with 213 additions and 13 deletions.
99 changes: 98 additions & 1 deletion include/libnuraft/peer.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ public:
, lost_by_leader_(false)
, rsv_msg_(nullptr)
, rsv_msg_handler_(nullptr)
, last_streamed_log_idx_(0)
, stream_mode_flag_(false)
, append_log_flag_(false)
, first_append_log_flag_(false)
, writing_flag_(false)
, flying_requests_count(0)
, l_(logger)
{
reset_ls_timer();
Expand Down Expand Up @@ -215,7 +221,7 @@ public:
hb_interval_ = new_interval;
}

void send_req(ptr<peer> myself,
bool send_req(ptr<peer> myself,
ptr<req_msg>& req,
rpc_handler& handler);

Expand Down Expand Up @@ -303,6 +309,56 @@ public:
ptr<req_msg> get_rsv_msg() const { return rsv_msg_; }
rpc_handler get_rsv_msg_handler() const { return rsv_msg_handler_; }

/**
 * Get the last log index that was sent to this peer in stream mode.
 *
 * @return Last streamed log index.
 */
ulong get_last_streamed_log_idx() const {
    // `const` added for consistency with the other read-only getters
    // (e.g. `is_streaming()`, `get_rsv_msg()`); atomic load is const-safe.
    return last_streamed_log_idx_.load();
}

/**
 * Update the last log index that was sent to this peer in stream mode.
 *
 * @param idx New last streamed log index.
 */
void set_last_streamed_log_idx(ulong idx) {
    // Atomic assignment is equivalent to store() with seq_cst ordering.
    last_streamed_log_idx_ = idx;
}

/**
 * Try to acquire the exclusive right to prepare and send a request
 * to this peer.
 *
 * @return `true` if acquired (the flag was previously clear).
 */
bool start_writing() {
    // Atomically set the flag; we won the race exactly when it was
    // clear before — same outcome as a CAS from false to true.
    return !writing_flag_.exchange(true);
}

/**
 * Release the exclusive right acquired by `start_writing()`.
 */
void write_done() {
    writing_flag_ = false;
}

/**
 * Turn on stream mode for this peer.
 */
void enable_stream() {
    stream_mode_flag_ = true;
}

/**
 * Turn off stream mode for this peer.
 */
void disable_stream() {
    stream_mode_flag_ = false;
}

/**
 * Check whether this peer is currently streaming: stream mode is
 * enabled AND an append-log sequence is in progress.
 *
 * @return `true` if streaming.
 */
bool is_streaming() const {
    // Explicit loads; the implicit atomic-to-bool conversion used
    // before is the same seq_cst load. Short-circuit order preserved.
    return stream_mode_flag_.load() && append_log_flag_.load();
}

/**
 * Mark that this peer has started an append-log sequence
 * (the peer is locked by appending log).
 */
void start_append() {
    append_log_flag_ = true;
}

/**
 * Mark that this peer's append-log sequence has finished.
 */
void append_done() {
    append_log_flag_ = false;
}

/**
 * Mark that the current append-log lock was acquired in
 * non-stream mode (i.e., this is the first append request).
 */
void first_append() {
    first_append_log_flag_ = true;
}

/**
 * Atomically clear the first-append flag.
 *
 * @return `true` if the flag was set and this call cleared it,
 *         `false` if it was already clear.
 */
bool try_finish_first_append() {
    // exchange(false) returns the previous value, so this succeeds
    // exactly when the flag was set — same as a CAS from true to false.
    return first_append_log_flag_.exchange(false);
}

/**
 * Reset the count of in-flight requests sent by this peer to zero.
 *
 * NOTE(review): `flying_requests_count` is a plain (non-atomic) ulong;
 * presumably callers hold the writing flag (or another lock) while
 * touching it — confirm the synchronization against concurrent senders.
 */
void reset_flying_requests() {
    flying_requests_count = 0;
}

// Whether this peer has been marked lost by the leader.
bool is_lost() const { return lost_by_leader_; }
// Mark this peer as lost (from the leader's point of view).
void set_lost() { lost_by_leader_ = true; }
// Clear the lost mark: the peer is reachable again.
void set_recovered() { lost_by_leader_ = false; }
Expand All @@ -315,6 +371,17 @@ private:
ptr<resp_msg>& resp,
ptr<rpc_exception>& err);

void try_set_free();

/**
 * Reset all stream-mode state of this peer back to its initial idle
 * values: the streamed log index, the append and first-append flags,
 * the writing flag, stream mode itself, and the in-flight request
 * counter.
 */
void reset_stream() {
    set_last_streamed_log_idx(0);
    try_finish_first_append();
    append_done();
    write_done();
    disable_stream();
    reset_flying_requests();
}

/**
* Information (config) of this server.
*/
Expand Down Expand Up @@ -519,6 +586,36 @@ private:
*/
rpc_handler rsv_msg_handler_;

/**
* Last log index sent in stream mode.
*/
std::atomic<ulong> last_streamed_log_idx_;

/**
* `true` if we enable stream mode.
*/
std::atomic<bool> stream_mode_flag_;

/**
* `true` if peer is locked by appending log.
*/
std::atomic<bool> append_log_flag_;

/**
* `true` if the peer lock was acquired in non-stream mode.
*/
std::atomic<bool> first_append_log_flag_;

/**
* `true` if this peer is processing append log request.
*/
std::atomic<bool> writing_flag_;

/**
* Count of flying requests sent by this peer.
*/
ulong flying_requests_count;

/**
* Logger instance.
*/
Expand Down
3 changes: 3 additions & 0 deletions include/libnuraft/raft_params.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ struct raft_params {
, use_bg_thread_for_snapshot_io_(false)
, use_full_consensus_among_healthy_members_(false)
, parallel_log_appending_(false)
, enable_stream_mode_(false)
{}

/**
Expand Down Expand Up @@ -604,6 +605,8 @@ public:
* before returning the response.
*/
bool parallel_log_appending_;

/**
 * (Experimental)
 * If `true`, enable stream mode for append_entries requests.
 */
bool enable_stream_mode_;
};

}
Expand Down
1 change: 1 addition & 0 deletions include/libnuraft/raft_server.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,7 @@ protected:
void apply_to_not_responding_peers(const std::function<void(const ptr<peer>&)>&, int expiry = 0);

ptr<resp_msg> handle_append_entries(req_msg& req);
bool try_start_append(ptr<peer>& p, bool make_busy_success);
ptr<resp_msg> handle_prevote_req(req_msg& req);
ptr<resp_msg> handle_vote_req(req_msg& req);
ptr<resp_msg> handle_cli_req_prelock(req_msg& req, const req_ext_params& ext_params);
Expand Down
95 changes: 87 additions & 8 deletions src/handle_append_entries.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -271,15 +271,24 @@ bool raft_server::request_append_entries(ptr<peer> p) {
}
}

if (p->make_busy()) {
p_tr("send request to %d\n", (int)p->get_id());
bool make_busy_success = p->make_busy();
if (try_start_append(p, make_busy_success)) {
if (make_busy_success) {
p_tr("send first request to %d", (int)p->get_id());
p->start_append();
if (!p->is_streaming()) {
p->first_append();
}
} else {
p_tr("send following request to %d", (int)p->get_id());
}

// If reserved message exists, process it first.
ptr<req_msg> msg = p->get_rsv_msg();
rpc_handler m_handler = p->get_rsv_msg_handler();
bool use_rsv = false;
if (msg) {
// Clear the reserved message.
p->set_rsv_msg(nullptr, nullptr);
use_rsv = true;
p_in("found reserved message to peer %d, type %d",
p->get_id(), msg->get_type());

Expand All @@ -291,12 +300,18 @@ bool raft_server::request_append_entries(ptr<peer> p) {

if (!msg) {
// Even normal message doesn't exist.
p->set_free();
if (make_busy_success) {
p->try_finish_first_append();
p->append_done();
p->set_free();
}

if ( params->use_bg_thread_for_snapshot_io_ &&
p->get_snapshot_sync_ctx() ) {
// If this is an async snapshot request, invoke IO thread.
snapshot_io_mgr::instance().invoke();
}
p->write_done();
return true;
}

Expand Down Expand Up @@ -329,7 +344,29 @@ bool raft_server::request_append_entries(ptr<peer> p) {
p->reset_manual_free();
}

p->send_req(p, msg, m_handler);
if (msg->get_type() == msg_type::append_entries_request) {
if (p->send_req(p, msg, m_handler)) {
p->set_last_streamed_log_idx(msg->get_last_log_idx() + msg->log_entries().size());
}
} else {
p->try_finish_first_append();
p->disable_stream();
// it is not an append entry request, disable stream here, and let flying request finish
if (make_busy_success) {
// there is no flying request, send this request, clear reserved msg here
if (use_rsv) {
p->set_rsv_msg(nullptr, nullptr);
}

p->append_done();
p->send_req(p, msg, m_handler);
} else {
p_wn("there are flying append log requests, peer %d is busy now for %s", p->get_id(),
msg_type_to_string(msg->get_type()).c_str());
p->write_done();
return false;
}
}
p->reset_ls_timer();

cb_func::Param param(id_, leader_, p->get_id(), msg.get());
Expand All @@ -351,6 +388,7 @@ bool raft_server::request_append_entries(ptr<peer> p) {
msg->get_commit_idx());
}

p->write_done();
p_tr("sent\n");
return true;
}
Expand Down Expand Up @@ -398,7 +436,11 @@ ptr<req_msg> raft_server::create_append_entries_req(ptr<peer>& pp) {
p.set_next_log_idx(cur_nxt_idx);
}

last_log_idx = p.get_next_log_idx() - 1;
if (p.is_streaming() && ctx_->get_params()->enable_stream_mode_) {
last_log_idx = p.get_last_streamed_log_idx();
} else {
last_log_idx = p.get_next_log_idx() - 1;
}
}

if (last_log_idx >= cur_nxt_idx) {
Expand Down Expand Up @@ -943,6 +985,25 @@ ptr<resp_msg> raft_server::handle_append_entries(req_msg& req)
return resp;
}

/**
 * Decide whether `request_append_entries()` may proceed to send a
 * request to peer `p`, acquiring the peer's writing flag if so.
 *
 * Sending is allowed when either
 *   1) the caller successfully made the peer busy (`make_busy_success`),
 *      i.e. there is no other in-flight request, or
 *   2) the peer is already streaming and stream mode is enabled in the
 *      raft parameters, so further requests may be pipelined.
 *
 * On failure, any state acquired here (writing flag) or by the caller
 * (busy flag) is released before returning.
 *
 * @param p Peer to send to.
 * @param make_busy_success Result of the caller's `make_busy()` attempt.
 * @return `true` if the caller may proceed with sending.
 */
bool raft_server::try_start_append(ptr<peer>& p, bool make_busy_success) {
    if (p->start_writing()) {
        bool success = make_busy_success ||
                       (p->is_streaming() && ctx_->get_params()->enable_stream_mode_);
        if (!success) {
            // `make_busy_success` is necessarily false here (otherwise
            // `success` would be true), so only the writing flag needs
            // releasing; the original `set_free()` branch was dead code
            // and has been removed.
            p->write_done();
        }
        return success;
    }

    // Another thread is writing to this peer; undo our busy flag if we set it.
    if (make_busy_success) {
        p->set_free();
    }
    return false;
}

bool raft_server::try_update_precommit_index(ulong desired, const size_t MAX_ATTEMPTS) {
// If `MAX_ATTEMPTS == 0`, try forever.
size_t num_attempts = 0;
Expand Down Expand Up @@ -1022,9 +1083,21 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
// Try to commit with this response.
ulong committed_index = get_expected_committed_log_idx();
commit( committed_index );

// TODO: even if we check the streamed log index here, catch-up may send an empty request.
ulong next_sent_log = 0;
if (p->is_streaming() && ctx_->get_params()->enable_stream_mode_) {
next_sent_log = p->get_last_streamed_log_idx() + 1;
} else {
next_sent_log = resp.get_next_idx();
}
need_to_catchup = p->clear_pending_commit() ||
resp.get_next_idx() < log_store_->next_slot();
next_sent_log < log_store_->next_slot();

// Try enable stream here
if (p->try_finish_first_append()) {
p->enable_stream();
}
} else {
std::lock_guard<std::mutex> guard(p->get_lock());
ulong prev_next_log = p->get_next_log_idx();
Expand Down Expand Up @@ -1066,13 +1139,18 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
"resp next %" PRIu64 ", new next log idx %" PRIu64,
p->get_id(), prev_next_log,
resp.get_next_idx(), p->get_next_log_idx() );

// disable stream
p->try_finish_first_append();
p->disable_stream();
}

// NOTE:
// If all other followers are not responding, we may not make
// below condition true. In that case, we check the timeout of
// re-election timer in heartbeat handler, and do force resign.
ulong p_matched_idx = p->get_matched_idx();
// todo: how to handle this in stream mode
if ( write_paused_ &&
p->get_id() == next_leader_candidate_ &&
p_matched_idx &&
Expand Down Expand Up @@ -1130,6 +1208,7 @@ void raft_server::handle_append_entries_resp(resp_msg& resp) {
// to `false` to avoid sending meaningless messages continuously
// which eats up CPU. Then the leader will send heartbeats only.
need_to_catchup = false;
p->disable_stream();
}

// This may not be a leader anymore,
Expand Down
Loading

0 comments on commit a7eeaa5

Please sign in to comment.