Skip to content

Commit

Permalink
Fix to bug on snapshot ctx of removed member (#473)
Browse files Browse the repository at this point in the history
* If a member receiving a snapshot stops responding and is then removed
by the RPC error handler, its snapshot ctx is kept open and will never
be closed.

* To avoid this issue, the RPC error handler should explicitly destroy
the snapshot ctx of the removed member.
  • Loading branch information
greensky00 authored Oct 6, 2023
1 parent f42b12c commit e66b4b6
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 4 deletions.
4 changes: 1 addition & 3 deletions src/handle_join_leave.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -564,9 +564,7 @@ void raft_server::handle_join_leave_rpc_err(msg_type t_msg, ptr<peer> p) {
if (peers_.size() == 1) {
peer_itor pit = peers_.find(p->get_id());
if (pit != peers_.end()) {
pit->second->enable_hb(false);
peers_.erase(pit);
p_in("server %d is removed from cluster", p->get_id());
remove_peer_from_peers(pit->second);
} else {
p_in("peer %d cannot be found, no action for removing",
p->get_id());
Expand Down
1 change: 1 addition & 0 deletions src/handle_timeout.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ void raft_server::handle_hb_timeout(int32 srv_id) {
if (p->is_hb_enabled()) {
// Schedule another heartbeat if heartbeat is still enabled
schedule_task(p->get_hb_task(), p->get_current_hb_interval());
p_tr("reschedule heartbeat for peer %d", p->get_id());
} else {
p_db("heartbeat is disabled for peer %d", p->get_id());
}
Expand Down
80 changes: 79 additions & 1 deletion tests/unit/raft_server_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ int add_node_error_cases_test() {
std::string s2_addr = "S2";
std::string s3_addr = "S3";
// Hard to make a server really non-existent as to fail an rpc req with a FakeNetwork
// you need to actually have a recipient. So we simulate a nonexistent server with an
// you need to actually have a recipient. So we simulate a nonexistent server with an
// offline one
std::string nonexistent_addr = "nonexistent";

Expand Down Expand Up @@ -2026,6 +2026,9 @@ int snapshot_basic_test() {
CHK_OK( s2.getTestSm()->isSame( *s1.getTestSm() ) );
CHK_OK( s3.getTestSm()->isSame( *s1.getTestSm() ) );

// There shouldn't be any open snapshot ctx.
CHK_Z( s1.getTestSm()->getNumOpenedUserCtxs() );

print_stats(pkgs);

s1.raftServer->shutdown();
Expand Down Expand Up @@ -2124,6 +2127,9 @@ int snapshot_manual_creation_test() {

CHK_EQ( committed_index, s3.getTestSm()->last_snapshot()->get_last_log_idx() );

// There shouldn't be any open snapshot ctx.
CHK_Z( s1.getTestSm()->getNumOpenedUserCtxs() );

print_stats(pkgs);

s1.raftServer->shutdown();
Expand Down Expand Up @@ -2221,6 +2227,75 @@ int snapshot_randomized_creation_test() {
return 0;
}

// Regression test: after S2 is removed from a two-node group while requests
// to it are failing, S1's state machine must report zero opened snapshot
// user contexts.
//
// Returns 0 when the function runs to completion.
// NOTE(review): the CHK_* macros presumably return a non-zero value from this
// function on the first failed check -- confirm against the test framework.
int snapshot_close_for_removed_peer_test() {
reset_log_files();
ptr<FakeNetworkBase> f_base = cs_new<FakeNetworkBase>();

std::string s1_addr = "S1";
std::string s2_addr = "S2";

// Two-node group sharing the same fake network base.
RaftPkg s1(f_base, 1, s1_addr);
RaftPkg s2(f_base, 2, s2_addr);
std::vector<RaftPkg*> pkgs = {&s1, &s2};

CHK_Z( launch_servers( pkgs ) );
CHK_Z( make_group( pkgs ) );

// Apply the same parameter overrides to every server in the group.
for (auto& entry: pkgs) {
RaftPkg* pp = entry;
raft_params param = pp->raftServer->get_current_params();
param.return_method_ = raft_params::async_handler;
// Set quorum to 1 so that S1 commits data locally.
param.custom_commit_quorum_size_ = 1;
param.custom_election_quorum_size_ = 1;
pp->raftServer->update_params(param);
}

// Number of messages appended to S1 below.
const size_t NUM = 10;

// Append messages asynchronously.
// The returned result handlers are collected in `handlers`; they are not
// inspected again in this test.
std::list< ptr< cmd_result< ptr<buffer> > > > handlers;
for (size_t ii=0; ii<NUM; ++ii) {
std::string test_msg = "test" + std::to_string(ii);
// NOTE(review): the extra byte is presumably for a trailing NUL written
// by put(std::string) -- confirm against the buffer API.
ptr<buffer> msg = buffer::alloc(test_msg.size() + 1);
msg->put(test_msg);
ptr< cmd_result< ptr<buffer> > > ret =
s1.raftServer->append_entries( {msg} );

// Each append must be accepted by S1.
CHK_TRUE( ret->get_accepted() );

handlers.push_back(ret);
}
// Wait for bg commit.
CHK_Z( wait_for_sm_exec(pkgs, COMMIT_TIMEOUT_SEC) );

// Make requests to S2 fail.
s1.fNet->makeReqFailAll("S2");

// Heartbeat, this will initiate snapshot transfer to S2.
s1.fTimer->invoke(timer_task_type::heartbeat_timer);
s1.fNet->execReqResp("S2");

// Now remove S2.
s1.raftServer->remove_srv(2);

// Heartbeat, and make request fail.
// NOTE(review): per the commit description, the failed request is expected
// to drive the RPC error handler that removes S2 -- confirm.
s1.fTimer->invoke(timer_task_type::heartbeat_timer);
s1.fNet->makeReqFailAll("S2");

// After S2 is removed, the snapshot ctx should be destroyed.
CHK_Z( s1.getTestSm()->getNumOpenedUserCtxs() );

print_stats(pkgs);

// Tear down both servers, then the shared fake network.
s1.raftServer->shutdown();
s2.raftServer->shutdown();

f_base->destroy();

return 0;
}

int join_empty_node_test() {
reset_log_files();
ptr<FakeNetworkBase> f_base = cs_new<FakeNetworkBase>();
Expand Down Expand Up @@ -3219,6 +3294,9 @@ int main(int argc, char** argv) {
ts.doTest( "snapshot randomized creation test",
snapshot_randomized_creation_test );

ts.doTest( "snapshot close for removed peer test",
snapshot_close_for_removed_peer_test );

ts.doTest( "join empty node test",
join_empty_node_test );

Expand Down

0 comments on commit e66b4b6

Please sign in to comment.