diff --git a/src/replica/replica.h b/src/replica/replica.h index f79d14fcdd..6c99b1e454 100644 --- a/src/replica/replica.h +++ b/src/replica/replica.h @@ -296,6 +296,12 @@ class replica : public serverlet, public ref_counter, public replica_ba /*out*/ learn_response &response, /*out*/ bool &delayed_replay_prepare_list); + // Prepares the files on disk that will participate in the replica's learning. + void prepare_durable_learn_state(decree learn_start_decree, + const learn_request &request, + /*out*/ learn_response &response, + /*out*/ remote_learner_state &learn_state); + // Gets the position where this round of the learning process should begin. // This method is called on primary-side. // TODO(wutao1): mark it const diff --git a/src/replica/replica_learn.cpp b/src/replica/replica_learn.cpp index 23dceb27d5..49b61e6f17 100644 --- a/src/replica/replica_learn.cpp +++ b/src/replica/replica_learn.cpp @@ -445,98 +445,7 @@ void replica::on_learn(dsn::message_ex *msg, const learn_request &request) response, delayed_replay_prepare_list); if (!should_learn_cache) { - if (learn_start_decree > _app->last_durable_decree()) { - ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn private logs, " - "because learn_start_decree(%" PRId64 ") > _app->last_durable_decree(%" PRId64 - ")", - name(), - request.signature, - request.learner.to_string(), - learn_start_decree, - _app->last_durable_decree()); - _private_log->get_learn_state(get_gpid(), learn_start_decree, response.state); - response.type = learn_type::LT_LOG; - } else if (_private_log->get_learn_state(get_gpid(), learn_start_decree, response.state)) { - ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn private logs, " - "because mutation_log::get_learn_state() returns true", - name(), - request.signature, - request.learner.to_string()); - response.type = learn_type::LT_LOG; - } else if (learn_start_decree < request.last_committed_decree_in_app + 1) { - ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn private logs, " - "because learn_start_decree steps back for duplication", - name(), - request.signature, - request.learner.to_string()); - response.type = learn_type::LT_LOG; - } else { - ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn app, " - "beacuse learn_start_decree(%" PRId64 ") <= _app->last_durable_decree(%" PRId64 - "), " - "and mutation_log::get_learn_state() returns false", - name(), - request.signature, - request.learner.to_string(), - learn_start_decree, - _app->last_durable_decree()); - response.type = learn_type::LT_APP; - response.state = learn_state(); - } - - if (response.type == learn_type::LT_LOG) { - response.base_local_dir = _private_log->dir(); - if (response.state.files.size() > 0) { - auto &last_file = response.state.files.back(); - if (last_file == learner_state.last_learn_log_file) { - ddebug( - "%s: on_learn[%016" PRIx64 - "]: learner = %s, learn the same file %s repeatedly, hint to switch file", - name(), - request.signature, - request.learner.to_string(), - last_file.c_str()); - _private_log->hint_switch_file(); - } else { - learner_state.last_learn_log_file = last_file; - } - } - // it is safe to commit to last_committed_decree() now - response.state.to_decree_included = last_committed_decree(); - ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, learn private logs succeed, " - "learned_meta_size = %u, learned_file_count = %u, " - "to_decree_included = %" PRId64, - name(), - request.signature, - request.learner.to_string(), - response.state.meta.length(), - static_cast(response.state.files.size()), - response.state.to_decree_included); - } else { - ::dsn::error_code err = _app->get_checkpoint( - learn_start_decree, request.app_specific_learn_request, response.state); - - if (err != ERR_OK) { - response.err = ERR_GET_LEARN_STATE_FAILED; - derror("%s: on_learn[%016" PRIx64 - "]: learner = %s, get app checkpoint failed, error = %s", - name(), - request.signature, - request.learner.to_string(), - err.to_string()); - } else { - response.base_local_dir = _app->data_dir(); - ddebug( - "%s: on_learn[%016" PRIx64 "]: learner = %s, get app learn state succeed, " - "learned_meta_size = %u, learned_file_count = %u, learned_to_decree = %" PRId64, - name(), - request.signature, - request.learner.to_string(), - response.state.meta.length(), - static_cast(response.state.files.size()), - response.state.to_decree_included); - } - } + prepare_durable_learn_state(learn_start_decree, request, response, learner_state); } for (auto &file : response.state.files) { @@ -551,6 +460,101 @@ void replica::on_learn(dsn::message_ex *msg, const learn_request &request) } } +void replica::prepare_durable_learn_state(decree learn_start_decree, + const learn_request &request, + /*out*/ learn_response &response, + /*out*/ remote_learner_state &learner_state) +{ + if (learn_start_decree > _app->last_durable_decree()) { + ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn private logs, " + "because learn_start_decree(%" PRId64 ") > _app->last_durable_decree(%" PRId64 ")", + name(), + request.signature, + request.learner.to_string(), + learn_start_decree, + _app->last_durable_decree()); + _private_log->get_learn_state(get_gpid(), learn_start_decree, response.state); + response.type = learn_type::LT_LOG; + } else if (_private_log->get_learn_state(get_gpid(), learn_start_decree, response.state)) { + ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn private logs, " + "because mutation_log::get_learn_state() returns true", + name(), + request.signature, + request.learner.to_string()); + response.type = learn_type::LT_LOG; + } else if (learn_start_decree < request.last_committed_decree_in_app + 1) { + ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn private logs, " + "because learn_start_decree steps back for duplication", + name(), + request.signature, + request.learner.to_string()); + response.type = learn_type::LT_LOG; + } else { + ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, choose to learn app, " + "beacuse learn_start_decree(%" PRId64 ") <= _app->last_durable_decree(%" PRId64 "), " + "and mutation_log::get_learn_state() returns false", + name(), + request.signature, + request.learner.to_string(), + learn_start_decree, + _app->last_durable_decree()); + response.type = learn_type::LT_APP; + response.state = learn_state(); + } + + if (response.type == learn_type::LT_LOG) { + response.base_local_dir = _private_log->dir(); + if (response.state.files.size() > 0) { + auto &last_file = response.state.files.back(); + if (last_file == learner_state.last_learn_log_file) { + ddebug("%s: on_learn[%016" PRIx64 + "]: learner = %s, learn the same file %s repeatedly, hint to switch file", + name(), + request.signature, + request.learner.to_string(), + last_file.c_str()); + _private_log->hint_switch_file(); + } else { + learner_state.last_learn_log_file = last_file; + } + } + // it is safe to commit to last_committed_decree() now + response.state.to_decree_included = last_committed_decree(); + ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, learn private logs succeed, " + "learned_meta_size = %u, learned_file_count = %u, " + "to_decree_included = %" PRId64, + name(), + request.signature, + request.learner.to_string(), + response.state.meta.length(), + static_cast(response.state.files.size()), + response.state.to_decree_included); + } else { + ::dsn::error_code err = _app->get_checkpoint( + learn_start_decree, request.app_specific_learn_request, response.state); + + if (err != ERR_OK) { + response.err = ERR_GET_LEARN_STATE_FAILED; + derror("%s: on_learn[%016" PRIx64 + "]: learner = %s, get app checkpoint failed, error = %s", + name(), + request.signature, + request.learner.to_string(), + err.to_string()); + } else { + response.base_local_dir = _app->data_dir(); + ddebug("%s: on_learn[%016" PRIx64 "]: learner = %s, get app learn state succeed, " + "learned_meta_size = %u, learned_file_count = %u, learned_to_decree = %" PRId64, + name(), + request.signature, + request.learner.to_string(), + response.state.meta.length(), + static_cast(response.state.files.size()), + response.state.to_decree_included); + } + } +} + void replica::on_learn_reply(error_code err, learn_request &&req, learn_response &&resp) { _checker.only_one_thread_access();