Skip to content

Commit

Permalink
增加 require_snapshot 选项
Browse files Browse the repository at this point in the history
  • Loading branch information
owent committed Nov 6, 2024
1 parent bab746b commit 29ad05c
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 1 deletion.
15 changes: 14 additions & 1 deletion include/distributed_system/wal_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class LIBATFRAME_UTILS_API_HEAD_ONLY wal_client {
using vtable_pointer = typename wal_mt_mode_data_trait<vtable_type, log_operator_type::mt_mode>::strong_ptr;

struct configure_type : public object_type::configure_type {
bool require_snapshot;
duration subscriber_heartbeat_interval;
duration subscriber_heartbeat_retry_interval;
};
Expand All @@ -104,7 +105,8 @@ class LIBATFRAME_UTILS_API_HEAD_ONLY wal_client {
: vtable_(helper.vt),
configure_(helper.conf),
wal_object_(helper.wal_object),
next_heartbeat_timepoint_(helper.next_heartbeat) {
next_heartbeat_timepoint_(helper.next_heartbeat),
received_snapshot_(false) {
if (wal_object_) {
wal_object_->set_internal_event_on_assign_logs([this](object_type& wal) {
// reset finished key
Expand Down Expand Up @@ -159,6 +161,7 @@ class LIBATFRAME_UTILS_API_HEAD_ONLY wal_client {
object_type::default_configure(*ret);
ret->accept_log_when_hash_matched = true;

ret->require_snapshot = false;
ret->subscriber_heartbeat_interval = std::chrono::duration_cast<duration>(std::chrono::minutes{3});
ret->subscriber_heartbeat_retry_interval = std::chrono::duration_cast<duration>(std::chrono::minutes{1});

Expand Down Expand Up @@ -296,6 +299,10 @@ class LIBATFRAME_UTILS_API_HEAD_ONLY wal_client {
return wal_result_code::kInvalidParam;
}

if (configure_ && configure_->require_snapshot && !received_snapshot_) {
return wal_result_code::kClientRequireSnapshot;
}

if (vtable_ && vtable_->get_log_key) {
auto log_key = vtable_->get_log_key(*wal_object_, *log);

Expand Down Expand Up @@ -350,6 +357,10 @@ class LIBATFRAME_UTILS_API_HEAD_ONLY wal_client {
return wal_result_code::kInvalidParam;
}

if (configure_ && configure_->require_snapshot && !received_snapshot_) {
return wal_result_code::kClientRequireSnapshot;
}

if (vtable_ && vtable_->get_log_key) {
auto log_key = vtable_->get_log_key(*wal_object_, *log);

Expand Down Expand Up @@ -396,6 +407,7 @@ class LIBATFRAME_UTILS_API_HEAD_ONLY wal_client {
}

wal_result_code receive_snapshot(const snapshot_type& snapshot, callback_param_type param) {
received_snapshot_ = true;
if (vtable_ && vtable_->on_receive_snapshot) {
return vtable_->on_receive_snapshot(*this, snapshot, param);
}
Expand Down Expand Up @@ -432,6 +444,7 @@ class LIBATFRAME_UTILS_API_HEAD_ONLY wal_client {
// publish-subscribe
time_point next_heartbeat_timepoint_;
std::unique_ptr<log_key_type> last_finished_log_key_;
bool received_snapshot_;
};

} // namespace distributed_system
Expand Down
1 change: 1 addition & 0 deletions include/distributed_system/wal_common_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ struct LIBATFRAME_UTILS_API_HEAD_ONLY wal_meta_type {
};

enum class wal_result_code : int32_t {
kClientRequireSnapshot = -301,
kSubscriberNotFound = -201,

kHashCodeMismatch = -106,
Expand Down

0 comments on commit 29ad05c

Please sign in to comment.