Skip to content

Commit

Permalink
[feat]curvefs/client kvcache monitor
Browse files Browse the repository at this point in the history
1. fix base metric(bps/qps/latnecy) error
2. add hit and miss metric
3. change metric prefix to curvefs_kvclient_manager
4. add memcache client metric

Signed-off-by: Cyber-SiKu <[email protected]>
  • Loading branch information
Cyber-SiKu committed Oct 21, 2023
1 parent 712fa17 commit 5e44c53
Show file tree
Hide file tree
Showing 12 changed files with 2,179 additions and 335 deletions.
2,285 changes: 2,017 additions & 268 deletions curvefs/monitor/grafana/provisioning/dashboards/client.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions curvefs/src/client/fuse_s3_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,14 @@ bool FuseS3Client::InitKVCache(const KVClientManagerOpt &opt) {

// init kvcache client
auto memcacheClient = std::make_shared<MemCachedClient>();
if (!memcacheClient->Init(kvcachecluster)) {
if (!memcacheClient->Init(kvcachecluster, fsInfo_->fsname())) {
LOG(ERROR) << "FLAGS_supportKVcache = " << FLAGS_supportKVcache
<< ", but init memcache client fail";
return false;
}

kvClientManager_ = std::make_shared<KVClientManager>();
if (!kvClientManager_->Init(opt, memcacheClient)) {
if (!kvClientManager_->Init(opt, memcacheClient, fsInfo_->fsname())) {
LOG(ERROR) << "FLAGS_supportKVcache = " << FLAGS_supportKVcache
<< ", but init kvClientManager fail";
return false;
Expand Down
7 changes: 5 additions & 2 deletions curvefs/src/client/kvclient/kvclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#ifndef CURVEFS_SRC_CLIENT_KVCLIENT_KVCLIENT_H_
#define CURVEFS_SRC_CLIENT_KVCLIENT_KVCLIENT_H_

#include <libmemcached-1.0/types/return.h>

#include <string>

namespace curvefs {
Expand Down Expand Up @@ -49,8 +51,9 @@ class KVClient {
virtual bool Set(const std::string &key, const char *value,
const uint64_t value_len, std::string *errorlog) = 0;

virtual bool Get(const std::string &key, char *value, uint64_t offset,
uint64_t length, std::string *errorlog) = 0;
virtual bool Get(const std::string& key, char* value, uint64_t offset,
uint64_t length, std::string* errorlog,
uint64_t* actLength, memcached_return_t* retCod) = 0;
};

} // namespace client
Expand Down
72 changes: 50 additions & 22 deletions curvefs/src/client/kvclient/kvclient_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,37 @@
*/

#include "curvefs/src/client/kvclient/kvclient_manager.h"

#include <memory>

#include "absl/memory/memory.h"
#include "curvefs/src/client/metric/client_metric.h"
#include "src/client/client_metric.h"
#include "src/common/concurrent/count_down_event.h"

using curve::client::LatencyGuard;
using curve::common::CountDownEvent;
using curvefs::client::metric::KVClientMetric;
using curvefs::client::metric::KVClientManagerMetric;

namespace curvefs {
namespace client {

#define ONRETURN(TYPE, RES) \
if (RES) { \
kvClientMetric_.kvClient##TYPE.qps.count << 1; \
} else { \
kvClientMetric_.kvClient##TYPE.eps.count << 1; \
} \
template <typename Metric, typename TaskSharePtr>
void OnReturn(Metric* metric, const TaskSharePtr task) {
task->timer.stop();
if (task->res) {
metric::CollectMetrics(metric, task->length, task->timer.u_elapsed());
} else {
metric->eps.count << 1;
}
task->done(task);
}

bool KVClientManager::Init(const KVClientManagerOpt &config,
const std::shared_ptr<KVClient> &kvclient) {
bool KVClientManager::Init(const KVClientManagerOpt& config,
const std::shared_ptr<KVClient>& kvclient,
const std::string& fsName) {
client_ = kvclient;
kvClientManagerMetric_ = absl::make_unique<KVClientManagerMetric>(fsName);
return threadPool_.Start(config.setThreadPooln) == 0;
}

Expand All @@ -52,29 +62,47 @@ void KVClientManager::Uninit() {

void KVClientManager::Set(std::shared_ptr<SetKVCacheTask> task) {
threadPool_.Enqueue([task, this]() {
LatencyGuard guard(&kvClientMetric_.kvClientSet.latency);

std::string error_log;
task->res =
client_->Set(task->key, task->value, task->length, &error_log);
task->timer.stop();
ONRETURN(Set, task->res);

task->done(task);
if (task->res) {
kvClientManagerMetric_->count << 1;
}
OnReturn(&kvClientManagerMetric_->set, task);
});
}

void UpdateHitMissMetric(memcached_return_t retCode,
KVClientManagerMetric* metric) {
// https://awesomized.github.io/libmemcached/libmemcached/memcached_return_t.html
switch (retCode) {
case MEMCACHED_SUCCESS:
metric->hit << 1;
break;
case MEMCACHED_DATA_DOES_NOT_EXIST:
// The data requested with the key given was not found.
case MEMCACHED_DATA_EXISTS:
// The data requested with the key given was not found.
case MEMCACHED_DELETED:
// The object requested by the key has been deleted.
case MEMCACHED_NOTFOUND:
// The object requested was not found.
metric->miss << 1;
break;
default:
break;
}
}

void KVClientManager::Get(std::shared_ptr<GetKVCacheTask> task) {
threadPool_.Enqueue([task, this]() {
LatencyGuard guard(&kvClientMetric_.kvClientGet.latency);

std::string error_log;
memcached_return_t retCode;
task->res = client_->Get(task->key, task->value, task->offset,
task->length, &error_log);
task->timer.stop();
ONRETURN(Get, task->res);

task->done(task);
task->valueLength, &error_log, &task->length,
&retCode);
UpdateHitMissMetric(retCode, kvClientManagerMetric_.get());
OnReturn(&kvClientManagerMetric_->get, task);
});
}

Expand Down
26 changes: 16 additions & 10 deletions curvefs/src/client/kvclient/kvclient_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,20 @@

#include <bthread/condition_variable.h>

#include <thread>
#include <cstdint>
#include <memory>
#include <utility>
#include <string>
#include <thread>
#include <utility>

#include "absl/strings/string_view.h"
#include "curvefs/src/client/kvclient/kvclient.h"
#include "curvefs/src/client/common/config.h"
#include "curvefs/src/client/kvclient/kvclient.h"
#include "curvefs/src/client/metric/client_metric.h"
#include "src/common/concurrent/task_thread_pool.h"
#include "src/common/s3_adapter.h"

using curvefs::client::metric::KVClientMetric;
using curvefs::client::metric::KVClientManagerMetric;

namespace curvefs {
namespace client {
Expand Down Expand Up @@ -76,7 +77,8 @@ struct GetKVCacheTask {
const std::string& key;
char* value;
uint64_t offset;
uint64_t length;
uint64_t valueLength;
uint64_t length; // actual length of value
bool res;
GetKVCacheDone done;
butil::Timer timer;
Expand All @@ -87,7 +89,8 @@ struct GetKVCacheTask {
: key(k),
value(v),
offset(off),
length(len),
valueLength(len),
length(0),
res(false),
done(std::move(done)),
timer(butil::Timer::STARTED) {}
Expand All @@ -98,8 +101,9 @@ class KVClientManager {
KVClientManager() = default;
~KVClientManager() { Uninit(); }

bool Init(const KVClientManagerOpt &config,
const std::shared_ptr<KVClient> &kvclient);
bool Init(const KVClientManagerOpt& config,
const std::shared_ptr<KVClient>& kvclient,
const std::string& fsName);

/**
* It will get a db client and set the key value asynchronusly.
Expand All @@ -110,15 +114,17 @@ class KVClientManager {

void Get(std::shared_ptr<GetKVCacheTask> task);

KVClientMetric *GetClientMetricForTesting() { return &kvClientMetric_; }
KVClientManagerMetric* GetMetricForTesting() {
return kvClientManagerMetric_.get();
}

private:
void Uninit();

private:
TaskThreadPool<bthread::Mutex, bthread::ConditionVariable> threadPool_;
std::shared_ptr<KVClient> client_;
KVClientMetric kvClientMetric_;
std::unique_ptr<KVClientManagerMetric> kvClientManagerMetric_;
};

} // namespace client
Expand Down
36 changes: 29 additions & 7 deletions curvefs/src/client/kvclient/memcache_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@
#include <libmemcached-1.0/memcached.h>
#include <libmemcached-1.0/types/return.h>

#include <cstdint>
#include <memory>
#include <string>

#include "curvefs/src/client/kvclient/kvclient.h"
#include "absl/memory/memory.h"
#include "curvefs/proto/topology.pb.h"
#include "curvefs/src/client/kvclient/kvclient.h"
#include "curvefs/src/client/metric/client_metric.h"

namespace curvefs {

Expand Down Expand Up @@ -83,7 +87,9 @@ class MemCachedClient : public KVClient {
explicit MemCachedClient(memcached_st *cli) : client_(cli) {}
~MemCachedClient() { UnInit(); }

bool Init(const MemcacheClusterInfo &kvcachecluster) {
bool Init(const MemcacheClusterInfo& kvcachecluster,
const std::string& fsName) {
metric_ = absl::make_unique<metric::MemcacheClientMetric>(fsName);
client_ = memcached(nullptr, 0);

for (int i = 0; i < kvcachecluster.servers_size(); i++) {
Expand All @@ -106,26 +112,32 @@ class MemCachedClient : public KVClient {
}
}

bool Set(const std::string &key, const char *value,
const uint64_t value_len, std::string *errorlog) override {
bool Set(const std::string& key, const char* value,
const uint64_t value_len, std::string* errorlog) override {
uint64_t start = butil::cpuwide_time_us();
if (nullptr == tcli) {
tcli = memcached_clone(nullptr, client_);
}
auto res = memcached_set(tcli, key.c_str(), key.length(), value,
value_len, 0, 0);
if (MEMCACHED_SUCCESS == res) {
VLOG(9) << "Set key = " << key << " OK";
metric::CollectMetrics(&metric_->set, value_len,
butil::cpuwide_time_us() - start);
return true;
}
*errorlog = ResError(res);
memcached_free(tcli);
tcli = nullptr;
LOG(ERROR) << "Set key = " << key << " error = " << *errorlog;
metric_->set.eps.count << 1;
return false;
}

bool Get(const std::string &key, char *value, uint64_t offset,
uint64_t length, std::string *errorlog) override {
bool Get(const std::string& key, char* value, uint64_t offset,
uint64_t length, std::string* errorlog, uint64_t* actLength,
memcached_return_t* retCode) override {
uint64_t start = butil::cpuwide_time_us();
if (nullptr == tcli) {
// multi thread use a memcached_st* client is unsafe.
// should clone it or use memcached_st_pool.
Expand All @@ -136,11 +148,19 @@ class MemCachedClient : public KVClient {
memcached_return_t ue;
char *res = memcached_get(tcli, key.c_str(), key.length(),
&value_length, &flags, &ue);
if (actLength != nullptr) {
(*actLength) = value_length;
}
if (retCode != nullptr) {
(*retCode) = ue;
}
if (MEMCACHED_SUCCESS == ue && res != nullptr && value &&
value_length >= length) {
VLOG(9) << "Get key = " << key << " OK";
memcpy(value, res + offset, length);
free(res);
metric::CollectMetrics(&metric_->get, value_length,
butil::cpuwide_time_us() - start);
return true;
}

Expand All @@ -153,6 +173,7 @@ class MemCachedClient : public KVClient {
tcli = nullptr;
}

metric_->get.eps.count << 1;
return false;
}

Expand Down Expand Up @@ -198,7 +219,8 @@ class MemCachedClient : public KVClient {

private:
memcached_server_st *server_;
memcached_st *client_;
memcached_st* client_;
std::unique_ptr<metric::MemcacheClientMetric> metric_;
};

} // namespace client
Expand Down
5 changes: 4 additions & 1 deletion curvefs/src/client/metric/client_metric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ const std::string S3MultiManagerMetric::prefix = "curvefs_client_manager"; // N
const std::string FSMetric::prefix = "curvefs_client"; // NOLINT
const std::string S3Metric::prefix = "curvefs_s3"; // NOLINT
const std::string DiskCacheMetric::prefix = "curvefs_disk_cache"; // NOLINT
const std::string KVClientMetric::prefix = "curvefs_kvclient"; // NOLINT
const std::string KVClientManagerMetric::prefix = // NOLINT
"curvefs_kvclient_manager"; // NOLINT
const std::string MemcacheClientMetric::prefix = // NOLINT
"curvefs_memcache_client"; // NOLINT
const std::string S3ChunkInfoMetric::prefix = "inode_s3_chunk_info"; // NOLINT
const std::string WarmupManagerS3Metric::prefix = "curvefs_warmup"; // NOLINT

Expand Down
40 changes: 34 additions & 6 deletions curvefs/src/client/metric/client_metric.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,13 +304,41 @@ struct DiskCacheMetric {
trim_(prefix, fsName + "_diskcache_trim") {}
};

struct KVClientMetric {
struct KVClientManagerMetric {
static const std::string prefix;
InterfaceMetric kvClientGet;
InterfaceMetric kvClientSet;

KVClientMetric()
: kvClientGet(prefix, "get"), kvClientSet(prefix, "set") {}
std::string fsName;
InterfaceMetric get;
InterfaceMetric set;
// kvcache count
bvar::Adder<uint64_t> count;
// kvcache hit
bvar::Adder<uint64_t> hit;
// kvcache miss
bvar::Adder<uint64_t> miss;

explicit KVClientManagerMetric(const std::string& name = "")
: fsName(!name.empty() ? name
: prefix + curve::common::ToHexString(this)),
get(prefix, fsName + "_get"),
set(prefix, fsName + "_set"),
count(prefix, fsName + "_count"),
hit(prefix, fsName + "_hit"),
miss(prefix, fsName + "_miss") {}
};

struct MemcacheClientMetric {
static const std::string prefix;

std::string fsName;
InterfaceMetric get;
InterfaceMetric set;

explicit MemcacheClientMetric(const std::string& name = "")
: fsName(!name.empty() ? name
: prefix + curve::common::ToHexString(this)),
get(prefix, fsName + "_get"),
set(prefix, fsName + "_set") {}
};

struct S3ChunkInfoMetric {
Expand All @@ -332,7 +360,7 @@ struct WarmupManagerS3Metric {
warmupS3CacheSize(prefix, "s3_cache_size") {}
};

void CollectMetrics(InterfaceMetric* interface, int count, uint64_t start);
void CollectMetrics(InterfaceMetric* interface, int count, uint64_t u_elapsed);

void AsyncContextCollectMetrics(
std::shared_ptr<S3Metric> s3Metric,
Expand Down
Loading

0 comments on commit 5e44c53

Please sign in to comment.