Skip to content

Commit

Permalink
fix: zk reconnect in db sdk (#3656)
Browse files Browse the repository at this point in the history
reconnect if need in CheckZk thread
  • Loading branch information
vagetablechicken authored Dec 8, 2023
1 parent e7538bd commit 40eaf50
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 18 deletions.
19 changes: 13 additions & 6 deletions src/sdk/db_sdk.cc
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,19 @@ ClusterSDK::~ClusterSDK() {
}

void ClusterSDK::CheckZk() {
if (session_id_ == 0) {
WatchNotify();
} else if (session_id_ != zk_client_->GetSessionTerm()) {
LOG(WARNING) << "session changed, re-watch notify";
WatchNotify();
// ensure that zk client is alive
if (zk_client_->EnsureConnected()) {
if (session_id_ == 0) {
WatchNotify();
} else if (session_id_ != zk_client_->GetSessionTerm()) {
LOG(WARNING) << "session changed, re-watch notify";
WatchNotify();
}
} else {
// 5min print once
LOG_EVERY_N(WARNING, 150) << "zk client is not connected, reconnect later";
}

pool_.DelayTask(2000, [this] { CheckZk(); });
}

Expand Down Expand Up @@ -383,7 +390,7 @@ bool ClusterSDK::InitTabletClient() {
std::vector<std::string> tablets;
bool ok = zk_client_->GetNodes(tablets);
if (!ok) {
LOG(WARNING) << "fail to get tablet";
LOG(WARNING) << "fail to get tablets from zk";
return false;
}
std::map<std::string, std::string> real_ep_map;
Expand Down
3 changes: 2 additions & 1 deletion src/sdk/db_sdk.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ class ClusterSDK : public DBSDK {
std::string globalvar_changed_notify_path_;
std::string leader_path_;
std::string taskmanager_leader_path_;

// CheckZk will be called periodically, so we don't need to check zk_client_ before using it
// if failed, just retry
::openmldb::zk::ZkClient* zk_client_;
::baidu::common::ThreadPool pool_;
};
Expand Down
31 changes: 24 additions & 7 deletions src/zk/zk_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ void NodeWatcher(zhandle_t* zh, int type, int state, const char* path, void* wat
}

void ItemWatcher(zhandle_t* zh, int type, int state, const char* path, void* watcher_ctx) {
PDLOG(INFO, "node watcher with event type %d, state %d", type, state);
PDLOG(INFO, "item watcher with event type %d, state %d", type, state);
if (zoo_get_context(zh)) {
ZkClient* client = const_cast<ZkClient*>(reinterpret_cast<const ZkClient*>(zoo_get_context(zh)));
std::string path_str(path);
Expand All @@ -64,8 +64,8 @@ void ItemWatcher(zhandle_t* zh, int type, int state, const char* path, void* wat
}

ZkClient::ZkClient(const std::string& hosts, const std::string& real_endpoint, int32_t session_timeout,
const std::string& endpoint, const std::string& zk_root_path,
const std::string& auth_schema, const std::string& cert)
const std::string& endpoint, const std::string& zk_root_path, const std::string& auth_schema,
const std::string& cert)
: hosts_(hosts),
session_timeout_(session_timeout),
endpoint_(endpoint),
Expand All @@ -92,8 +92,8 @@ ZkClient::ZkClient(const std::string& hosts, const std::string& real_endpoint, i
}

ZkClient::ZkClient(const std::string& hosts, int32_t session_timeout, const std::string& endpoint,
const std::string& zk_root_path, const std::string& zone_path,
const std::string& auth_schema, const std::string& cert)
const std::string& zk_root_path, const std::string& zone_path, const std::string& auth_schema,
const std::string& cert)
: hosts_(hosts),
session_timeout_(session_timeout),
endpoint_(endpoint),
Expand Down Expand Up @@ -296,8 +296,7 @@ bool ZkClient::CreateNode(const std::string& node, const std::string& value, int
}
uint32_t size = node.size() + 11;
char path_buffer[size]; // NOLINT
int ret =
zoo_create(zk_, node.c_str(), value.c_str(), value.size(), &acl_vector_, flags, path_buffer, size);
int ret = zoo_create(zk_, node.c_str(), value.c_str(), value.size(), &acl_vector_, flags, path_buffer, size);
if (ret == ZOK) {
assigned_path_name.assign(path_buffer, size - 1);
PDLOG(INFO, "create node %s ok and real node name %s", node.c_str(), assigned_path_name.c_str());
Expand Down Expand Up @@ -583,8 +582,13 @@ void ZkClient::LogEvent(int type, int state, const char* path) {
if (type == ZOO_SESSION_EVENT) {
if (state == ZOO_CONNECTED_STATE) {
Connected();
} else if (state == ZOO_CONNECTING_STATE || state == ZOO_ASSOCIATING_STATE) {
// just wait
} else if (state == ZOO_EXPIRED_SESSION_STATE) {
connected_ = false;
} else {
// unknow state, should retry
connected_ = false;
}
}
}
Expand Down Expand Up @@ -630,5 +634,18 @@ bool ZkClient::Mkdir(const std::string& path) {
return MkdirNoLock(path);
}

bool ZkClient::EnsureConnected() {
if (!IsConnected()) {
LOG(WARNING) << "reconnect zk";
if (Reconnect()) {
LOG(INFO) << "reconnect zk ok";
} else {
LOG(WARNING) << "reconnect zk failed";
return false;
}
}
return true;
}

} // namespace zk
} // namespace openmldb
13 changes: 9 additions & 4 deletions src/zk/zk_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ class ZkClient {
// session_timeout, the session timeout
// endpoint, the client endpoint
ZkClient(const std::string& hosts, const std::string& real_endpoint, int32_t session_timeout,
const std::string& endpoint, const std::string& zk_root_path,
const std::string& auth_schema, const std::string& cert);
const std::string& endpoint, const std::string& zk_root_path, const std::string& auth_schema,
const std::string& cert);

ZkClient(const std::string& hosts, int32_t session_timeout, const std::string& endpoint,
const std::string& zk_root_path, const std::string& zone_path,
const std::string& auth_schema, const std::string& cert);
const std::string& zk_root_path, const std::string& zone_path, const std::string& auth_schema,
const std::string& cert);
~ZkClient();

// init zookeeper connections
Expand Down Expand Up @@ -138,6 +138,11 @@ class ZkClient {
// when reconnect, need Register and Watchnodes again
bool Reconnect();

// ensure that zk client is connected:
// if not, try to reconnect, return false if reconnect failed
// DON'T use zk client if this function return false
bool EnsureConnected();

private:
void Connected();

Expand Down

0 comments on commit 40eaf50

Please sign in to comment.