Skip to content

Commit

Permalink
curvefs/client: optimizing the read amplification problem(especial me…
Browse files Browse the repository at this point in the history
…mcached)
  • Loading branch information
wuhongsong committed Oct 23, 2023
1 parent cc5095a commit 1cd8114
Show file tree
Hide file tree
Showing 13 changed files with 623 additions and 57 deletions.
14 changes: 13 additions & 1 deletion curvefs/conf/client.conf
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,18 @@ s3.writeCacheMaxByte=838860800
s3.readCacheMaxByte=209715200
# file cache read thread num
s3.readCacheThreads=5

# The data in the cache cluster download to local
s3.memClusterToLocal=true
# The data in the s3 storage download to local
s3.s3ToLocal=true
# read size bigger than this value will read until prefetch is finished
s3.bigIoSize=131072
# retry times when read big io failed
s3.bigIoRetryTimes=100
# retry interval when read big io failed
s3.bigIoRetryIntervalUs=100

# http = 0, https = 1
s3.http_scheme=0
s3.verify_SSL=False
Expand Down Expand Up @@ -247,7 +259,7 @@ diskCache.asyncLoadPeriodMs=5
# ok nearfull full
# |------------|-------------------|----------------------|
# 0 trimRatio*safeRatio safeRatio fullRatio
#
#
# 1. 0<=ok<trimRatio*safeRatio;
# 2. trimRatio*safeRatio<=nearfull<safeRatio
# 3. safeRatio<=full<=fullRatio
Expand Down
11 changes: 11 additions & 0 deletions curvefs/src/client/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,16 @@ void InitS3Option(Configuration *conf, S3Option *s3Opt) {
&s3Opt->s3ClientAdaptorOpt.writeCacheMaxByte);
conf->GetValueFatalIfFail("s3.readCacheMaxByte",
&s3Opt->s3ClientAdaptorOpt.readCacheMaxByte);
conf->GetValueFatalIfFail("s3.memClusterToLocal",
&s3Opt->s3ClientAdaptorOpt.memClusterToLocal);
conf->GetValueFatalIfFail("s3.s3ToLocal",
&s3Opt->s3ClientAdaptorOpt.s3ToLocal);
conf->GetValueFatalIfFail("s3.bigIoSize",
&s3Opt->s3ClientAdaptorOpt.bigIoSize);
conf->GetValueFatalIfFail("s3.bigIoRetryTimes",
&s3Opt->s3ClientAdaptorOpt.bigIoRetryTimes);
conf->GetValueFatalIfFail("s3.bigIoRetryIntervalUs",
&s3Opt->s3ClientAdaptorOpt.bigIoRetryIntervalUs);
conf->GetValueFatalIfFail("s3.readCacheThreads",
&s3Opt->s3ClientAdaptorOpt.readCacheThreads);
conf->GetValueFatalIfFail("s3.nearfullRatio",
Expand All @@ -238,6 +248,7 @@ void InitS3Option(Configuration *conf, S3Option *s3Opt) {
&s3Opt->s3ClientAdaptorOpt.readRetryIntervalMs);
::curve::common::InitS3AdaptorOptionExceptS3InfoOption(conf,
&s3Opt->s3AdaptrOpt);

InitDiskCacheOption(conf, &s3Opt->s3ClientAdaptorOpt.diskCacheOpt);
}

Expand Down
5 changes: 5 additions & 0 deletions curvefs/src/client/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ struct S3ClientAdaptorOption {
uint32_t flushIntervalSec;
uint64_t writeCacheMaxByte;
uint64_t readCacheMaxByte;
bool memClusterToLocal;
bool s3ToLocal;
uint32_t bigIoSize;
uint32_t bigIoRetryTimes;
uint32_t bigIoRetryIntervalUs;
uint32_t readCacheThreads;
uint32_t nearfullRatio;
uint32_t baseSleepUs;
Expand Down
21 changes: 21 additions & 0 deletions curvefs/src/client/kvclient/kvclient_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,26 @@ void KVClientManager::Get(std::shared_ptr<GetKVCacheTask> task) {
});
}

void KVClientManager::Enqueue(std::shared_ptr<GetObjectAsyncContext> context) {
auto task = [this, context]() { this->GetKvCache(context); };
threadPool_.Enqueue(task);
}

int KVClientManager::GetKvCache(
std::shared_ptr<GetObjectAsyncContext> context) {
VLOG(9) << "GetKvCache start: " << context->key;
std::string error_log;
memcached_return_t retCode;
uint64_t actLength = 0;
context->retCode =
!client_->Get(context->key, context->buf, context->offset, context->len,
&error_log, &actLength, &retCode);
context->actualLen = actLength;
context->cb(nullptr, context);
VLOG(9) << "GetKvCache end: " << context->key << ", " << context->retCode
<< ", " << context->actualLen;
return 0;
}

} // namespace client
} // namespace curvefs
37 changes: 36 additions & 1 deletion curvefs/src/client/kvclient/kvclient_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ namespace client {
class KVClientManager;
struct SetKVCacheTask;
struct GetKVCacheTask;

class GetKvCacheContext;
class SetKvCacheContext;

using curve::common::GetObjectAsyncContext;
using curve::common::TaskThreadPool;
using curvefs::client::common::KVClientManagerOpt;

Expand Down Expand Up @@ -96,6 +101,33 @@ struct GetKVCacheTask {
timer(butil::Timer::STARTED) {}
};

using GetKvCacheCallBack =
std::function<void(const std::shared_ptr<GetKvCacheContext>&)>;

using SetKvCacheCallBack =
std::function<void(const std::shared_ptr<SetKvCacheContext>&)>;

struct KvCacheContext {
std::string key;
uint64_t inodeId;
uint64_t offset;
uint64_t length;
uint64_t chunkIndex;
uint64_t chunkPos;
uint64_t startTime;
};

struct GetKvCacheContext : KvCacheContext {
char* value;
bool res;
GetKvCacheCallBack cb;
};

struct SetKvCacheContext : KvCacheContext {
const char* value;
SetKvCacheCallBack cb;
};

class KVClientManager {
public:
KVClientManager() = default;
Expand All @@ -118,8 +150,11 @@ class KVClientManager {
return kvClientManagerMetric_.get();
}

void Enqueue(std::shared_ptr<GetObjectAsyncContext> context);

private:
void Uninit();
int GetKvCache(std::shared_ptr<GetObjectAsyncContext> context);

private:
TaskThreadPool<bthread::Mutex, bthread::ConditionVariable> threadPool_;
Expand All @@ -129,4 +164,4 @@ class KVClientManager {

} // namespace client
} // namespace curvefs
#endif // CURVEFS_SRC_CLIENT_KVCLIENT_KVCLIENT_MANAGER_H_
#endif // CURVEFS_SRC_CLIENT_KVCLIENT_KVCLIENT_MANAGER_H_
40 changes: 37 additions & 3 deletions curvefs/src/client/s3/client_s3_adaptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,42 @@
* Author: huyao
*/

#include "curvefs/src/client/s3/client_s3_adaptor.h"

#include <brpc/channel.h>
#include <brpc/controller.h>

#include <algorithm>
#include <list>
#include <utility>

#include "absl/memory/memory.h"
#include "curvefs/src/client/s3/client_s3_adaptor.h"
#include "curvefs/src/common/s3util.h"

namespace curvefs {

namespace client {

/**
* use curl -L mdsIp:port/flags/memClusterToLocal?setvalue=true
* for dynamic parameter configuration
*/
static bool pass_uint32(const char*, uint32_t) { return true; }
static bool pass_bool(const char*, bool) { return true; }
DEFINE_bool(memClusterToLocal, true,
"The data in the cache cluster download to local");
DEFINE_validator(memClusterToLocal, &pass_bool);
DEFINE_bool(s3ToLocal, true, "The data in the s3 storage download to local");
DEFINE_validator(s3ToLocal, &pass_bool);
DEFINE_uint32(
bigIoSize, 131072,
"read size bigger than this value will read until prefetch is finished");
DEFINE_validator(bigIoSize, &pass_uint32);
DEFINE_uint32(bigIoRetryTimes, 100, "retry times when read big io failed");
DEFINE_validator(bigIoRetryTimes, &pass_uint32);
DEFINE_uint32(bigIoRetryIntervalUs, 100,
"retry interval when read big io failed");
DEFINE_validator(bigIoRetryIntervalUs, &pass_uint32);

CURVEFS_ERROR
S3ClientAdaptorImpl::Init(
const S3ClientAdaptorOption &option, std::shared_ptr<S3Client> client,
Expand All @@ -55,6 +78,11 @@ S3ClientAdaptorImpl::Init(
prefetchExecQueueNum_ = option.prefetchExecQueueNum;
diskCacheType_ = option.diskCacheOpt.diskCacheType;
memCacheNearfullRatio_ = option.nearfullRatio;
FLAGS_memClusterToLocal = option.memClusterToLocal;
FLAGS_s3ToLocal = option.s3ToLocal;
FLAGS_bigIoSize = option.bigIoSize;
FLAGS_bigIoRetryTimes = option.bigIoRetryTimes;
FLAGS_bigIoRetryIntervalUs = option.bigIoRetryIntervalUs;
throttleBaseSleepUs_ = option.baseSleepUs;
flushIntervalSec_ = option.flushIntervalSec;
chunkFlushThreads_ = option.chunkFlushThreads;
Expand Down Expand Up @@ -100,7 +128,13 @@ S3ClientAdaptorImpl::Init(
<< ", readCacheMaxByte: " << option.readCacheMaxByte
<< ", readCacheThreads: " << option.readCacheThreads
<< ", nearfullRatio: " << option.nearfullRatio
<< ", baseSleepUs: " << option.baseSleepUs;
<< ", baseSleepUs: " << option.baseSleepUs
<< ", memClusterToLocal: " << FLAGS_memClusterToLocal
<< ", s3ToLocal: " << FLAGS_s3ToLocal
<< ", bigIoSize: " << FLAGS_bigIoSize
<< ", bigIoRetryTimes: " << FLAGS_bigIoRetryTimes
<< ", bigIoRetryIntervalUs: " << FLAGS_bigIoRetryIntervalUs;

// start chunk flush threads
taskPool_.Start(chunkFlushThreads_);
return CURVEFS_ERROR::OK;
Expand Down
24 changes: 24 additions & 0 deletions curvefs/src/client/s3/client_s3_adaptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,67 +143,86 @@ class S3ClientAdaptorImpl : public S3ClientAdaptor {
CURVEFS_ERROR FlushAllCache(uint64_t inodeId);
CURVEFS_ERROR FsSync();
int Stop();

uint64_t GetBlockSize() {
return blockSize_;
}

uint64_t GetChunkSize() {
return chunkSize_;
}

uint32_t GetObjectPrefix() {
return objectPrefix_;
}

std::shared_ptr<FsCacheManager> GetFsCacheManager() {
return fsCacheManager_;
}

uint32_t GetFlushInterval() { return flushIntervalSec_; }

std::shared_ptr<S3Client> GetS3Client() { return client_; }

uint32_t GetPrefetchBlocks() {
return prefetchBlocks_;
}

uint32_t GetDiskCacheType() {
return diskCacheType_;
}

bool DisableDiskCache() {
return diskCacheType_ == DiskCacheType::Disable;
}

bool HasDiskCache() {
return diskCacheType_ != DiskCacheType::Disable;
}

bool IsReadCache() {
return diskCacheType_ == DiskCacheType::OnlyRead;
}

bool IsReadWriteCache() {
return diskCacheType_ == DiskCacheType::ReadWrite;
}

std::shared_ptr<InodeCacheManager> GetInodeCacheManager() {
return inodeManager_;
}

std::shared_ptr<DiskCacheManagerImpl> GetDiskCacheManager() {
return diskCacheManagerImpl_;
}

FSStatusCode AllocS3ChunkId(uint32_t fsId, uint32_t idNum,
uint64_t *chunkId);
void FsSyncSignal() {
std::lock_guard<std::mutex> lk(mtx_);
VLOG(3) << "fs sync signal";
cond_.notify_one();
}

void FsSyncSignalAndDataCacheInc() {
std::lock_guard<std::mutex> lk(mtx_);
fsCacheManager_->DataCacheNumInc();
VLOG(3) << "fs sync signal";
cond_.notify_one();
}

void SetFsId(uint32_t fsId) {
fsId_ = fsId;
}

uint32_t GetFsId() {
return fsId_;
}

uint32_t GetPageSize() {
return pageSize_;
}

void InitMetrics(const std::string &fsName);

void SetDiskCache(DiskCacheType type) {
Expand Down Expand Up @@ -253,6 +272,11 @@ class S3ClientAdaptorImpl : public S3ClientAdaptor {
uint32_t flushIntervalSec_;
uint32_t chunkFlushThreads_;
uint32_t memCacheNearfullRatio_;
bool memClusterToLocal_;
bool s3ToLocal_;
uint32_t bigIoSize_;
uint32_t bigIoRetryTimes_;
uint32_t bigIoRetryIntervalUs_;
uint32_t throttleBaseSleepUs_;
uint32_t maxReadRetryIntervalMs_;
uint32_t readRetryIntervalMs_;
Expand Down
Loading

0 comments on commit 1cd8114

Please sign in to comment.