From 1cd81146ae838bb5e53529ce2f410a30254c745e Mon Sep 17 00:00:00 2001 From: hzwuhongsong Date: Wed, 11 Oct 2023 15:24:37 +0800 Subject: [PATCH] curvefs/client: optimizing the read amplification problem(especial memcached) --- curvefs/conf/client.conf | 14 +- curvefs/src/client/common/config.cpp | 11 + curvefs/src/client/common/config.h | 5 + .../src/client/kvclient/kvclient_manager.cpp | 21 ++ .../src/client/kvclient/kvclient_manager.h | 37 ++- curvefs/src/client/s3/client_s3_adaptor.cpp | 40 ++- curvefs/src/client/s3/client_s3_adaptor.h | 24 ++ .../src/client/s3/client_s3_cache_manager.cpp | 146 +++++++-- .../src/client/s3/client_s3_cache_manager.h | 21 +- .../src/client/s3/disk_cache_manager_impl.h | 16 +- curvefs/test/client/BUILD | 24 ++ curvefs/test/client/client_prefetch_test.cpp | 309 ++++++++++++++++++ curvefs/test/client/mock_disk_cache_manager.h | 12 +- 13 files changed, 623 insertions(+), 57 deletions(-) create mode 100644 curvefs/test/client/client_prefetch_test.cpp diff --git a/curvefs/conf/client.conf b/curvefs/conf/client.conf index c66477926c..f9da11dfe0 100644 --- a/curvefs/conf/client.conf +++ b/curvefs/conf/client.conf @@ -204,6 +204,18 @@ s3.writeCacheMaxByte=838860800 s3.readCacheMaxByte=209715200 # file cache read thread num s3.readCacheThreads=5 + +# The data in the cache cluster download to local +s3.memClusterToLocal=true +# The data in the s3 storage download to local +s3.s3ToLocal=true +# read size bigger than this value will read until prefetch is finished +s3.bigIoSize=131072 +# retry times when read big io failed +s3.bigIoRetryTimes=100 +# retry interval when read big io failed +s3.bigIoRetryIntervalUs=100 + # http = 0, https = 1 s3.http_scheme=0 s3.verify_SSL=False @@ -247,7 +259,7 @@ diskCache.asyncLoadPeriodMs=5 # ok nearfull full # |------------|-------------------|----------------------| # 0 trimRatio*safeRatio safeRatio fullRatio -# +# # 1. 0<=oks3ClientAdaptorOpt.writeCacheMaxByte); conf->GetValueFatalIfFail("s3.readCacheMaxByte", &s3Opt->s3ClientAdaptorOpt.readCacheMaxByte); + conf->GetValueFatalIfFail("s3.memClusterToLocal", + &s3Opt->s3ClientAdaptorOpt.memClusterToLocal); + conf->GetValueFatalIfFail("s3.s3ToLocal", + &s3Opt->s3ClientAdaptorOpt.s3ToLocal); + conf->GetValueFatalIfFail("s3.bigIoSize", + &s3Opt->s3ClientAdaptorOpt.bigIoSize); + conf->GetValueFatalIfFail("s3.bigIoRetryTimes", + &s3Opt->s3ClientAdaptorOpt.bigIoRetryTimes); + conf->GetValueFatalIfFail("s3.bigIoRetryIntervalUs", + &s3Opt->s3ClientAdaptorOpt.bigIoRetryIntervalUs); conf->GetValueFatalIfFail("s3.readCacheThreads", &s3Opt->s3ClientAdaptorOpt.readCacheThreads); conf->GetValueFatalIfFail("s3.nearfullRatio", @@ -238,6 +248,7 @@ void InitS3Option(Configuration *conf, S3Option *s3Opt) { &s3Opt->s3ClientAdaptorOpt.readRetryIntervalMs); ::curve::common::InitS3AdaptorOptionExceptS3InfoOption(conf, &s3Opt->s3AdaptrOpt); + InitDiskCacheOption(conf, &s3Opt->s3ClientAdaptorOpt.diskCacheOpt); } diff --git a/curvefs/src/client/common/config.h b/curvefs/src/client/common/config.h index 6d93f5a059..30c6417ea7 100644 --- a/curvefs/src/client/common/config.h +++ b/curvefs/src/client/common/config.h @@ -127,6 +127,11 @@ struct S3ClientAdaptorOption { uint32_t flushIntervalSec; uint64_t writeCacheMaxByte; uint64_t readCacheMaxByte; + bool memClusterToLocal; + bool s3ToLocal; + uint32_t bigIoSize; + uint32_t bigIoRetryTimes; + uint32_t bigIoRetryIntervalUs; uint32_t readCacheThreads; uint32_t nearfullRatio; uint32_t baseSleepUs; diff --git a/curvefs/src/client/kvclient/kvclient_manager.cpp b/curvefs/src/client/kvclient/kvclient_manager.cpp index 4ce8f6bd78..d85cc2e5b2 100644 --- a/curvefs/src/client/kvclient/kvclient_manager.cpp +++ b/curvefs/src/client/kvclient/kvclient_manager.cpp @@ -106,5 +106,26 @@ void KVClientManager::Get(std::shared_ptr task) { }); } +void KVClientManager::Enqueue(std::shared_ptr context) { + auto task = [this, context]() { this->GetKvCache(context); }; + threadPool_.Enqueue(task); +} + +int KVClientManager::GetKvCache( + std::shared_ptr context) { + VLOG(9) << "GetKvCache start: " << context->key; + std::string error_log; + memcached_return_t retCode; + uint64_t actLength = 0; + context->retCode = + !client_->Get(context->key, context->buf, context->offset, context->len, + &error_log, &actLength, &retCode); + context->actualLen = actLength; + context->cb(nullptr, context); + VLOG(9) << "GetKvCache end: " << context->key << ", " << context->retCode + << ", " << context->actualLen; + return 0; +} + } // namespace client } // namespace curvefs diff --git a/curvefs/src/client/kvclient/kvclient_manager.h b/curvefs/src/client/kvclient/kvclient_manager.h index ad8701ec84..bad99193a8 100644 --- a/curvefs/src/client/kvclient/kvclient_manager.h +++ b/curvefs/src/client/kvclient/kvclient_manager.h @@ -46,6 +46,11 @@ namespace client { class KVClientManager; struct SetKVCacheTask; struct GetKVCacheTask; + +class GetKvCacheContext; +class SetKvCacheContext; + +using curve::common::GetObjectAsyncContext; using curve::common::TaskThreadPool; using curvefs::client::common::KVClientManagerOpt; @@ -96,6 +101,33 @@ struct GetKVCacheTask { timer(butil::Timer::STARTED) {} }; +using GetKvCacheCallBack = + std::function&)>; + +using SetKvCacheCallBack = + std::function&)>; + +struct KvCacheContext { + std::string key; + uint64_t inodeId; + uint64_t offset; + uint64_t length; + uint64_t chunkIndex; + uint64_t chunkPos; + uint64_t startTime; +}; + +struct GetKvCacheContext : KvCacheContext { + char* value; + bool res; + GetKvCacheCallBack cb; +}; + +struct SetKvCacheContext : KvCacheContext { + const char* value; + SetKvCacheCallBack cb; +}; + class KVClientManager { public: KVClientManager() = default; @@ -118,8 +150,11 @@ class KVClientManager { return kvClientManagerMetric_.get(); } + void Enqueue(std::shared_ptr context); + private: void Uninit(); + int GetKvCache(std::shared_ptr context); private: TaskThreadPool threadPool_; @@ -129,4 +164,4 @@ class KVClientManager { } // namespace client } // namespace curvefs -#endif // CURVEFS_SRC_CLIENT_KVCLIENT_KVCLIENT_MANAGER_H_ +#endif // CURVEFS_SRC_CLIENT_KVCLIENT_KVCLIENT_MANAGER_H_ diff --git a/curvefs/src/client/s3/client_s3_adaptor.cpp b/curvefs/src/client/s3/client_s3_adaptor.cpp index d6ce3a8bc9..3ec74685e2 100644 --- a/curvefs/src/client/s3/client_s3_adaptor.cpp +++ b/curvefs/src/client/s3/client_s3_adaptor.cpp @@ -20,19 +20,42 @@ * Author: huyao */ +#include "curvefs/src/client/s3/client_s3_adaptor.h" + #include #include + #include #include #include #include "absl/memory/memory.h" -#include "curvefs/src/client/s3/client_s3_adaptor.h" #include "curvefs/src/common/s3util.h" namespace curvefs { - namespace client { + +/** + * use curl -L mdsIp:port/flags/memClusterToLocal?setvalue=true + * for dynamic parameter configuration + */ +static bool pass_uint32(const char*, uint32_t) { return true; } +static bool pass_bool(const char*, bool) { return true; } +DEFINE_bool(memClusterToLocal, true, + "The data in the cache cluster download to local"); +DEFINE_validator(memClusterToLocal, &pass_bool); +DEFINE_bool(s3ToLocal, true, "The data in the s3 storage download to local"); +DEFINE_validator(s3ToLocal, &pass_bool); +DEFINE_uint32( + bigIoSize, 131072, + "read size bigger than this value will read until prefetch is finished"); +DEFINE_validator(bigIoSize, &pass_uint32); +DEFINE_uint32(bigIoRetryTimes, 100, "retry times when read big io failed"); +DEFINE_validator(bigIoRetryTimes, &pass_uint32); +DEFINE_uint32(bigIoRetryIntervalUs, 100, + "retry interval when read big io failed"); +DEFINE_validator(bigIoRetryIntervalUs, &pass_uint32); + CURVEFS_ERROR S3ClientAdaptorImpl::Init( const S3ClientAdaptorOption &option, std::shared_ptr client, @@ -55,6 +78,11 @@ S3ClientAdaptorImpl::Init( prefetchExecQueueNum_ = option.prefetchExecQueueNum; diskCacheType_ = option.diskCacheOpt.diskCacheType; memCacheNearfullRatio_ = option.nearfullRatio; + FLAGS_memClusterToLocal = option.memClusterToLocal; + FLAGS_s3ToLocal = option.s3ToLocal; + FLAGS_bigIoSize = option.bigIoSize; + FLAGS_bigIoRetryTimes = option.bigIoRetryTimes; + FLAGS_bigIoRetryIntervalUs = option.bigIoRetryIntervalUs; throttleBaseSleepUs_ = option.baseSleepUs; flushIntervalSec_ = option.flushIntervalSec; chunkFlushThreads_ = option.chunkFlushThreads; @@ -100,7 +128,13 @@ S3ClientAdaptorImpl::Init( << ", readCacheMaxByte: " << option.readCacheMaxByte << ", readCacheThreads: " << option.readCacheThreads << ", nearfullRatio: " << option.nearfullRatio - << ", baseSleepUs: " << option.baseSleepUs; + << ", baseSleepUs: " << option.baseSleepUs + << ", memClusterToLocal: " << FLAGS_memClusterToLocal + << ", s3ToLocal: " << FLAGS_s3ToLocal + << ", bigIoSize: " << FLAGS_bigIoSize + << ", bigIoRetryTimes: " << FLAGS_bigIoRetryTimes + << ", bigIoRetryIntervalUs: " << FLAGS_bigIoRetryIntervalUs; + // start chunk flush threads taskPool_.Start(chunkFlushThreads_); return CURVEFS_ERROR::OK; diff --git a/curvefs/src/client/s3/client_s3_adaptor.h b/curvefs/src/client/s3/client_s3_adaptor.h index 5f9e52a528..dcc25c3795 100644 --- a/curvefs/src/client/s3/client_s3_adaptor.h +++ b/curvefs/src/client/s3/client_s3_adaptor.h @@ -143,12 +143,15 @@ class S3ClientAdaptorImpl : public S3ClientAdaptor { CURVEFS_ERROR FlushAllCache(uint64_t inodeId); CURVEFS_ERROR FsSync(); int Stop(); + uint64_t GetBlockSize() { return blockSize_; } + uint64_t GetChunkSize() { return chunkSize_; } + uint32_t GetObjectPrefix() { return objectPrefix_; } @@ -156,32 +159,43 @@ class S3ClientAdaptorImpl : public S3ClientAdaptor { std::shared_ptr GetFsCacheManager() { return fsCacheManager_; } + uint32_t GetFlushInterval() { return flushIntervalSec_; } + std::shared_ptr GetS3Client() { return client_; } + uint32_t GetPrefetchBlocks() { return prefetchBlocks_; } + uint32_t GetDiskCacheType() { return diskCacheType_; } + bool DisableDiskCache() { return diskCacheType_ == DiskCacheType::Disable; } + bool HasDiskCache() { return diskCacheType_ != DiskCacheType::Disable; } + bool IsReadCache() { return diskCacheType_ == DiskCacheType::OnlyRead; } + bool IsReadWriteCache() { return diskCacheType_ == DiskCacheType::ReadWrite; } + std::shared_ptr GetInodeCacheManager() { return inodeManager_; } + std::shared_ptr GetDiskCacheManager() { return diskCacheManagerImpl_; } + FSStatusCode AllocS3ChunkId(uint32_t fsId, uint32_t idNum, uint64_t *chunkId); void FsSyncSignal() { @@ -189,21 +203,26 @@ class S3ClientAdaptorImpl : public S3ClientAdaptor { VLOG(3) << "fs sync signal"; cond_.notify_one(); } + void FsSyncSignalAndDataCacheInc() { std::lock_guard lk(mtx_); fsCacheManager_->DataCacheNumInc(); VLOG(3) << "fs sync signal"; cond_.notify_one(); } + void SetFsId(uint32_t fsId) { fsId_ = fsId; } + uint32_t GetFsId() { return fsId_; } + uint32_t GetPageSize() { return pageSize_; } + void InitMetrics(const std::string &fsName); void SetDiskCache(DiskCacheType type) { @@ -253,6 +272,11 @@ class S3ClientAdaptorImpl : public S3ClientAdaptor { uint32_t flushIntervalSec_; uint32_t chunkFlushThreads_; uint32_t memCacheNearfullRatio_; + bool memClusterToLocal_; + bool s3ToLocal_; + uint32_t bigIoSize_; + uint32_t bigIoRetryTimes_; + uint32_t bigIoRetryIntervalUs_; uint32_t throttleBaseSleepUs_; uint32_t maxReadRetryIntervalMs_; uint32_t readRetryIntervalMs_; diff --git a/curvefs/src/client/s3/client_s3_cache_manager.cpp b/curvefs/src/client/s3/client_s3_cache_manager.cpp index 4eab99161e..a2b3de3d40 100644 --- a/curvefs/src/client/s3/client_s3_cache_manager.cpp +++ b/curvefs/src/client/s3/client_s3_cache_manager.cpp @@ -28,6 +28,8 @@ #include #include "absl/cleanup/cleanup.h" +#include "absl/strings/str_split.h" +#include "absl/strings/string_view.h" #include "absl/synchronization/blocking_counter.h" #include "curvefs/src/client/kvclient/kvclient_manager.h" #include "curvefs/src/client/metric/client_metric.h" @@ -37,6 +39,13 @@ namespace curvefs { namespace client { + +DECLARE_bool(memClusterToLocal); +DECLARE_bool(s3ToLocal); +DECLARE_uint32(bigIoSize); +DECLARE_uint32(bigIoRetryTimes); +DECLARE_uint32(bigIoRetryIntervalUs); + namespace common { DECLARE_bool(enableCto); } // namespace common @@ -128,9 +137,9 @@ void FsCacheManager::ReleaseFileCacheManager(uint64_t inodeId) { bool FsCacheManager::Set(DataCachePtr dataCache, std::list::iterator *outIter) { std::lock_guard lk(lruMtx_); - VLOG(3) << "lru current byte:" << lruByte_ - << ",lru max byte:" << readCacheMaxByte_ - << ", dataCache len:" << dataCache->GetLen(); + VLOG(3) << "lru current byte: " << lruByte_ + << ", lru max byte: " << readCacheMaxByte_ + << ", dataCache len: " << dataCache->GetLen(); if (readCacheMaxByte_ == 0) { return false; } @@ -437,7 +446,7 @@ int FileCacheManager::Read(uint64_t inodeId, uint64_t offset, uint64_t length, if (memCacheMissRequest.empty()) { return actualReadLen; } - + VLOG(6) << "memcache miss request size: " << memCacheMissRequest.size(); // 2. read from localcache and remote cluster std::shared_ptr inodeWrapper; @@ -477,15 +486,29 @@ int FileCacheManager::Read(uint64_t inodeId, uint64_t offset, uint64_t length, return actualReadLen; } -bool FileCacheManager::ReadKVRequestFromLocalCache(const std::string &name, - char *databuf, +bool FileCacheManager::ReadKVRequestFromLocalCache(const std::string& name, + char* databuf, uint64_t offset, uint64_t len) { uint64_t start = butil::cpuwide_time_us(); + if (!s3ClientAdaptor_->HasDiskCache()) { + return false; + } + if (!IsCachedInLocal(name) && len >= FLAGS_bigIoSize && + s3ClientAdaptor_->GetPrefetchBlocks()) { + int retry = 0; + do { + VLOG(6) << "wait for download object: " << name; + bthread_usleep(FLAGS_bigIoRetryIntervalUs); + if (++retry >= FLAGS_bigIoRetryTimes) { + LOG(WARNING) << "download object: " << name << " timeout"; + return false; + } + } while (!IsCachedInLocal(name)); + } - bool mayCached = s3ClientAdaptor_->HasDiskCache() && - s3ClientAdaptor_->GetDiskCacheManager()->IsCached(name); - if (!mayCached) { + if (!IsCachedInLocal(name)) { + VLOG(9) << "not cachd in disk, " << name; return false; } @@ -591,8 +614,34 @@ void FileCacheManager::ProcessKVRequest(const S3ReadRequest &req, char *dataBuf, const uint32_t objectPrefix = s3ClientAdaptor_->GetObjectPrefix(); GetBlockLoc(req.offset, &chunkIndex, &chunkPos, &blockIndex, &blockPos); + std::string prefetchName = curvefs::common::s3util::GenObjName( + req.chunkId, blockIndex, req.compaction, req.fsId, req.inodeId, + objectPrefix); + bool waitDownloading = false; + // if obj is in downloading, wait for it. + while (true) { + { + curve::common::LockGuard lg(downloadMtx_); + if (downloadingObj_.find(prefetchName) != downloadingObj_.end()) { + VLOG(9) << "wait for obj is in downloading: " << prefetchName + << ", size: " << downloadingObj_.size() << ", " + << downloadingObj_.size(); + waitDownloading = true; + } else { + VLOG(9) << "obj is not in downloading: " << prefetchName + << ", size: " << downloadingObj_.size() << ", " + << downloadingObj_.size(); + break; + } + } + if (waitDownloading) { + bthread_usleep(FLAGS_bigIoRetryIntervalUs); + } + } + // prefetch - if (s3ClientAdaptor_->HasDiskCache()) { + if (s3ClientAdaptor_->HasDiskCache() && !waitDownloading && + !IsCachedInLocal(prefetchName)) { PrefetchForBlock(req, fileLen, blockSize, chunkSize, blockIndex); } @@ -675,9 +724,11 @@ void FileCacheManager::PrefetchForBlock(const S3ReadRequest &req, uint64_t chunkSize, uint64_t startBlockIndex) { uint32_t prefetchBlocks = s3ClientAdaptor_->GetPrefetchBlocks(); + if (prefetchBlocks == 0) { + return; + } uint32_t objectPrefix = s3ClientAdaptor_->GetObjectPrefix(); std::vector> prefetchObjs; - uint64_t blockIndex = startBlockIndex; for (uint32_t i = 0; i < prefetchBlocks; i++) { std::string name = curvefs::common::s3util::GenObjName( @@ -695,18 +746,26 @@ void FileCacheManager::PrefetchForBlock(const S3ReadRequest &req, } } - PrefetchS3Objs(prefetchObjs); + // It is configurable whether to write to local cache or not + if (!kvClientManager_ && FLAGS_s3ToLocal) { + // get from s3 directly + PrefetchS3Objs(prefetchObjs, true); + } else if (FLAGS_memClusterToLocal) { + // get from memcached first, if failed, get from s3 + PrefetchS3Objs(prefetchObjs, false); + } } class AsyncPrefetchCallback { public: - AsyncPrefetchCallback(uint64_t inode, S3ClientAdaptorImpl *s3Client) - : inode_(inode), s3Client_(s3Client) {} + AsyncPrefetchCallback(uint64_t inode, S3ClientAdaptorImpl* s3Client, + bool fromS3) + : inode_(inode), s3Client_(s3Client), fromS3_(fromS3) {} void operator()(const S3Adapter *, const std::shared_ptr &context) { VLOG(9) << "prefetch end: " << context->key << ", len " << context->len - << "actual len: " << context->actualLen; + << "actual len: " << context->actualLen << ", " << fromS3_; std::unique_ptr guard(context->buf); auto fileCache = s3Client_->GetFsCacheManager()->FindFileCacheManager(inode_); @@ -718,8 +777,18 @@ class AsyncPrefetchCallback { return; } - if (context->retCode < 0) { - LOG(WARNING) << "prefetch failed, key: " << context->key; + if (context->retCode != 0 && !fromS3_) { + VLOG(6) << "failed and then get from s3, key: " << context->key; + std::vector> prefetchObjs; + prefetchObjs.push_back(std::make_pair(context->key, context->len)); + fileCache->PrefetchS3Objs(prefetchObjs); + curve::common::LockGuard lg(fileCache->downloadMtx_); + fileCache->downloadingObj_.erase(context->key); + return; + } else if (context->retCode != 0 && fromS3_) { + curve::common::LockGuard lg(fileCache->downloadMtx_); + fileCache->downloadingObj_.erase(context->key); + LOG_EVERY_SECOND(INFO) << "prefetch failed, key: " << context->key; return; } if (s3Client_->s3Metric_ != nullptr) { @@ -744,16 +813,19 @@ class AsyncPrefetchCallback { curve::common::LockGuard lg(fileCache->downloadMtx_); fileCache->downloadingObj_.erase(context->key); } + VLOG(9) << "prefetch success: " << context->key; } private: const uint64_t inode_; S3ClientAdaptorImpl *s3Client_; + bool fromS3_; }; void FileCacheManager::PrefetchS3Objs( - const std::vector> &prefetchObjs) { - for (auto &obj : prefetchObjs) { + const std::vector>& prefetchObjs, + bool fromS3) { + for (auto& obj : prefetchObjs) { std::string name = obj.first; uint64_t readLen = obj.second; curve::common::LockGuard lg(downloadMtx_); @@ -762,28 +834,32 @@ void FileCacheManager::PrefetchS3Objs( << ", size: " << downloadingObj_.size(); continue; } - if (s3ClientAdaptor_->GetDiskCacheManager()->IsCached(name)) { + if (IsCachedInLocal(name)) { VLOG(9) << "downloading is exist in cache: " << name << ", size: " << downloadingObj_.size(); continue; } VLOG(9) << "download start: " << name - << ", size: " << downloadingObj_.size(); + << ", size: " << downloadingObj_.size() + << ", from s3: " << fromS3; downloadingObj_.emplace(name); - auto inode = inode_; - auto s3ClientAdaptor = s3ClientAdaptor_; - auto task = [name, inode, s3ClientAdaptor, readLen]() { - char *dataCacheS3 = new char[readLen]; + char* dataCacheS3 = new char[readLen]; + VLOG(9) << "prefetch start: " << name << ", len: " << readLen; + if (fromS3) { auto context = std::make_shared( name, dataCacheS3, 0, readLen, - AsyncPrefetchCallback{inode, s3ClientAdaptor}, - curve::common::ContextType::S3); - VLOG(9) << "prefetch start: " << context->key - << ", len: " << context->len; - s3ClientAdaptor->GetS3Client()->DownloadAsync(context); - }; - s3ClientAdaptor_->PushAsyncTask(task); + AsyncPrefetchCallback{inode_, s3ClientAdaptor_, true}); + auto task = [this, context]() { + s3ClientAdaptor_->GetS3Client()->DownloadAsync(context); + }; + s3ClientAdaptor_->PushAsyncTask(task); + } else { + auto context = std::make_shared( + name, dataCacheS3, 0, readLen, + AsyncPrefetchCallback{inode_, s3ClientAdaptor_, false}); + kvClientManager_->Enqueue(context); + } } return; } @@ -1141,6 +1217,10 @@ CURVEFS_ERROR FileCacheManager::Flush(bool force, bool toS3) { return ret; } +bool FileCacheManager::IsCachedInLocal(const std::string name) { + return s3ClientAdaptor_->GetDiskCacheManager()->IsCached(name); +} + void ChunkCacheManager::ReadChunk(uint64_t index, uint64_t chunkPos, uint64_t readLen, char *dataBuf, uint64_t dataBufOffset, @@ -1298,7 +1378,7 @@ void ChunkCacheManager::ReadByReadCache(uint64_t chunkPos, uint64_t readLen, ReadLockGuard readLockGuard(rwLockRead_); VLOG(9) << "ReadByReadCache chunkPos:" << chunkPos << ",readLen:" << readLen - << ",dataBufOffset:" << dataBufOffset; + << ",dataBufOffset:" << dataBufOffset << "chunkIndex: " << index_; if (dataRCacheMap_.empty()) { VLOG(9) << "dataRCacheMap_ is empty"; ReadRequest request; diff --git a/curvefs/src/client/s3/client_s3_cache_manager.h b/curvefs/src/client/s3/client_s3_cache_manager.h index d27aa308c5..df3d665ac4 100644 --- a/curvefs/src/client/s3/client_s3_cache_manager.h +++ b/curvefs/src/client/s3/client_s3_cache_manager.h @@ -27,19 +27,19 @@ #include #include #include +#include #include #include #include #include -#include #include "curvefs/proto/metaserver.pb.h" #include "curvefs/src/client/filesystem/error.h" +#include "curvefs/src/client/inode_wrapper.h" +#include "curvefs/src/client/kvclient/kvclient_manager.h" #include "curvefs/src/client/s3/client_s3.h" #include "src/common/concurrent/concurrent.h" #include "src/common/concurrent/task_thread_pool.h" -#include "curvefs/src/client/kvclient/kvclient_manager.h" -#include "curvefs/src/client/inode_wrapper.h" using curve::common::ReadLockGuard; using curve::common::RWLock; @@ -370,12 +370,13 @@ class FileCacheManager { void WriteChunk(uint64_t index, uint64_t chunkPos, uint64_t writeLen, const char *dataBuf); void GenerateS3Request(ReadRequest request, - const S3ChunkInfoList &s3ChunkInfoList, - char *dataBuf, std::vector *requests, + const S3ChunkInfoList& s3ChunkInfoList, + char* dataBuf, std::vector* requests, uint64_t fsId, uint64_t inodeId); void PrefetchS3Objs( - const std::vector> &prefetchObjs); + const std::vector>& prefetchObjs, + bool fromS3 = true); void HandleReadRequest(const ReadRequest &request, const S3ChunkInfo &s3ChunkInfo, @@ -403,9 +404,11 @@ class FileCacheManager { // miss read from memory read/write cache, need read from // kv(localdisk/remote cache/s3) - int GenerateKVRequest(const std::shared_ptr &inodeWrapper, - const std::vector &readRequest, - char *dataBuf, std::vector *kvRequest); + int GenerateKVRequest(const std::shared_ptr& inodeWrapper, + const std::vector& readRequest, + char* dataBuf, std::vector* kvRequest); + + bool IsCachedInLocal(const std::string name); enum class ReadStatus { OK = 0, diff --git a/curvefs/src/client/s3/disk_cache_manager_impl.h b/curvefs/src/client/s3/disk_cache_manager_impl.h index 372844baef..f6e02deeb1 100644 --- a/curvefs/src/client/s3/disk_cache_manager_impl.h +++ b/curvefs/src/client/s3/disk_cache_manager_impl.h @@ -79,7 +79,7 @@ class DiskCacheManagerImpl { * @param[in] option config option * @return success: 0, fail : < 0 */ - int Init(const S3ClientAdaptorOption option); + virtual int Init(const S3ClientAdaptorOption option); /** * @brief Write obj * @param[in] name obj name @@ -87,13 +87,13 @@ class DiskCacheManagerImpl { * @param[in] length write length * @return success: write length, fail : < 0 */ - int Write(const std::string name, const char *buf, uint64_t length); + virtual int Write(const std::string name, const char* buf, uint64_t length); /** * @brief whether obj is cached in cached disk * @param[in] name obj name * @return cached: true, not cached : < 0 */ - bool IsCached(const std::string name); + virtual bool IsCached(const std::string name); /** * @brief read obj * @param[in] name obj name @@ -102,17 +102,17 @@ class DiskCacheManagerImpl { * @param[in] length read length * @return success: length, fail : < length */ - int Read(const std::string name, char *buf, uint64_t offset, - uint64_t length); + virtual int Read(const std::string name, char* buf, uint64_t offset, + uint64_t length); /** * @brief umount disk cache * @return success: 0, fail : < 0 */ - int UmountDiskCache(); + virtual int UmountDiskCache(); bool IsDiskCacheFull(); - int WriteReadDirect(const std::string fileName, const char *buf, - uint64_t length); + virtual int WriteReadDirect(const std::string fileName, const char* buf, + uint64_t length); void InitMetrics(std::string fsName, std::shared_ptr s3Metric); virtual int UploadWriteCacheByInode(const std::string &inode); diff --git a/curvefs/test/client/BUILD b/curvefs/test/client/BUILD index 82adff0717..65bfb51d50 100644 --- a/curvefs/test/client/BUILD +++ b/curvefs/test/client/BUILD @@ -93,6 +93,7 @@ cc_test( "file_cache_manager_test.cpp", "chunk_cache_manager_test.cpp", "data_cache_test.cpp", + "client_prefetch_test.cpp", "client_s3_adaptor_Integration.cpp", "client_memcache_test.cpp", ], @@ -144,3 +145,26 @@ cc_test( copts = CURVE_TEST_COPTS, visibility = ["//visibility:public"], ) + +cc_test( + name = "curvefs_client_prefetch_test", + srcs = glob([ + "main.cpp", + "client_prefetch_test.cpp", + "*.h", + ]), + copts = CURVE_TEST_COPTS, + defines = ["UNIT_TEST"], + visibility = ["//visibility:public"], + deps = [ + "//external:gtest", + "//curvefs/src/client:fuse_client_lib", + "//curvefs/proto:metaserver_cc_proto", + "//curvefs/proto:mds_cc_proto", + "//curvefs/proto:space_cc_proto", + "//curvefs/test/client/rpcclient:rpcclient_test_mock", + "//test/client/mock:client_mock_lib", + ], + linkopts = [ + "-L/usr/local/lib/x86_64-linux-gnu"], +) diff --git a/curvefs/test/client/client_prefetch_test.cpp b/curvefs/test/client/client_prefetch_test.cpp new file mode 100644 index 0000000000..79b232834b --- /dev/null +++ b/curvefs/test/client/client_prefetch_test.cpp @@ -0,0 +1,309 @@ +/* + * Copyright (c) 2023 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: curve + * Created Date: Wed Mar 23 2023 + * Author: wuhongsong + */ + +#include +#include + +#include "curvefs/src/client/common/common.h" +#include "curvefs/src/client/s3/client_s3_adaptor.h" +#include "curvefs/src/client/s3/client_s3_cache_manager.h" +#include "curvefs/src/client/s3/disk_cache_manager_impl.h" +#include "curvefs/test/client/mock_client_s3.h" +#include "curvefs/test/client/mock_client_s3_cache_manager.h" +#include "curvefs/test/client/mock_disk_cache_base.h" +#include "curvefs/test/client/mock_disk_cache_manager.h" +#include "curvefs/test/client/mock_disk_cache_read.h" +#include "curvefs/test/client/mock_disk_cache_write.h" +#include "curvefs/test/client/mock_inode_cache_manager.h" +#include "curvefs/test/client/mock_kvclient.h" +#include "curvefs/test/client/mock_test_posix_wapper.h" +#include "src/common/concurrent/task_thread_pool.h" + +namespace curvefs { +namespace client { +namespace common { +DECLARE_bool(enableCto); +DECLARE_bool(supportKVcache); +} // namespace common +} // namespace client +} // namespace curvefs + +namespace curvefs { +namespace client { +using curve::common::TaskThreadPool; +using ::testing::_; +using ::testing::DoAll; +using ::testing::Invoke; +using ::testing::Return; +using ::testing::SetArgPointee; +using ::testing::SetArgReferee; +using ::testing::WithArg; + +class FileCacheManagerDiskTest : public testing::Test { + protected: + FileCacheManagerDiskTest() {} + ~FileCacheManagerDiskTest() {} + void SetUp() override { + Aws::InitAPI(awsOptions_); + uint64_t inodeId = 1; + uint64_t fsId = 2; + S3ClientAdaptorOption option; + option.blockSize = 1 * 1024 * 1024; + option.chunkSize = 4 * 1024 * 1024; + option.baseSleepUs = 500; + option.objectPrefix = 0; + option.pageSize = 64 * 1024; + option.intervalSec = 5000; + option.flushIntervalSec = 5000; + option.readCacheMaxByte = 104857600; + option.writeCacheMaxByte = 10485760000; + option.readCacheThreads = 5; + option.diskCacheOpt.diskCacheType = (DiskCacheType)2; + option.chunkFlushThreads = 5; + option.s3ToLocal = true; + s3ClientAdaptor_ = new S3ClientAdaptorImpl(); + fsCacheManager_ = std::make_shared( + s3ClientAdaptor_, option.readCacheMaxByte, option.writeCacheMaxByte, + option.readCacheThreads, nullptr); + mockInodeManager_ = std::make_shared(); + mockS3Client_ = std::make_shared(); + std::shared_ptr client = std::make_shared(); + std::shared_ptr wrapper = + std::make_shared(); + std::shared_ptr diskCacheWrite = + std::make_shared(); + std::shared_ptr diskCacheRead = + std::make_shared(); + std::shared_ptr diskCacheManager = + std::make_shared(wrapper, diskCacheWrite, + diskCacheRead); + mockDiskcacheManagerImpl_ = + std::make_shared(); + mockKVClient_ = std::make_shared(); + KVClientManagerOpt config; + kvClientManager_ = std::make_shared(); + kvClientManager_->Init(config, mockKVClient_); + s3ClientAdaptor_->Init(option, mockS3Client_, mockInodeManager_, + nullptr, fsCacheManager_, + mockDiskcacheManagerImpl_, kvClientManager_); + s3ClientAdaptor_->SetFsId(fsId); + + threadPool_->Start(option.readCacheThreads); + fileCacheManager_ = std::make_shared( + fsId, inodeId, s3ClientAdaptor_, kvClientManager_, threadPool_); + mockChunkCacheManager_ = std::make_shared(); + curvefs::client::common::FLAGS_enableCto = false; + } + + void TearDown() override { + Aws::ShutdownAPI(awsOptions_); + delete s3ClientAdaptor_; + s3ClientAdaptor_ = nullptr; + } + + protected: + Aws::SDKOptions awsOptions_; + S3ClientAdaptorImpl* s3ClientAdaptor_; + std::shared_ptr fileCacheManager_; + std::shared_ptr fsCacheManager_; + std::shared_ptr mockChunkCacheManager_; + std::shared_ptr mockInodeManager_; + std::shared_ptr mockS3Client_; + + std::shared_ptr kvClientManager_; + std::shared_ptr mockKVClient_; + std::shared_ptr mockDiskcacheManagerImpl_; + + std::shared_ptr> threadPool_ = + std::make_shared>(); +}; + +TEST_F(FileCacheManagerDiskTest, test_read_local_prefetchfail) { + const uint64_t inodeId = 1; + const uint64_t offset = 0; + const uint64_t len = 1024; + + std::vector buf(len); + std::vector tmpBuf(len, 'a'); + ReadRequest req{.index = 0, .chunkPos = offset, .len = len, .bufOffset = 0}; + std::vector requests{req}; + EXPECT_CALL(*mockChunkCacheManager_, ReadByWriteCache(_, _, _, _, _)) + .WillOnce(DoAll(SetArgPointee<4>(requests), Return())); + EXPECT_CALL(*mockChunkCacheManager_, ReadByReadCache(_, _, _, _, _)) + .WillOnce(DoAll(SetArgPointee<4>(requests), Return())); + EXPECT_CALL(*mockChunkCacheManager_, AddReadDataCache(_)) + .WillOnce(Return()); + fileCacheManager_->SetChunkCacheManagerForTest(0, mockChunkCacheManager_); + Inode inode; + inode.set_length(len); + auto* s3ChunkInfoMap = inode.mutable_s3chunkinfomap(); + auto* s3ChunkInfoList = new S3ChunkInfoList(); + auto* s3ChunkInfo = s3ChunkInfoList->add_s3chunks(); + s3ChunkInfo->set_chunkid(25); + s3ChunkInfo->set_compaction(0); + s3ChunkInfo->set_offset(offset); + s3ChunkInfo->set_len(len); + s3ChunkInfo->set_size(len); + s3ChunkInfo->set_zero(false); + s3ChunkInfoMap->insert({0, *s3ChunkInfoList}); + fsCacheManager_->SetFileCacheManagerForTest(inodeId, fileCacheManager_); + auto inodeWrapper = std::make_shared(inode, nullptr); + EXPECT_CALL(*mockInodeManager_, GetInode(_, _)) + .WillOnce( + DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK))); + EXPECT_CALL(*mockS3Client_, DownloadAsync(_)) + .WillRepeatedly( + Invoke([&](const std::shared_ptr& context) { + context->retCode = 0; + context->cb(nullptr, context); + })); + EXPECT_CALL(*mockDiskcacheManagerImpl_, IsCached(_)) + .WillOnce(Return(false)) + .WillOnce(Return(false)) + .WillOnce(Return(false)) + .WillOnce(Return(true)); + EXPECT_CALL(*mockKVClient_, Get(_, _, _, _, _, _, _)) + .WillOnce(Return(true)); + EXPECT_CALL(*mockDiskcacheManagerImpl_, Read(_, _, _, _)) + .WillOnce(Return(len)); + EXPECT_CALL(*mockDiskcacheManagerImpl_, WriteReadDirect(_, _, _)) + .WillOnce(Return(-1)); + ASSERT_EQ(len, fileCacheManager_->Read(inodeId, offset, len, buf.data())); + sleep(3); +} + +TEST_F(FileCacheManagerDiskTest, test_read_local_prefetchsuc) { + const uint64_t inodeId = 1; + const uint64_t offset = 0; + const uint64_t len = 1024; + + std::vector buf(len); + std::vector tmpBuf(len, 'a'); + + ReadRequest req{.index = 0, .chunkPos = offset, .len = len, .bufOffset = 0}; + std::vector requests{req}; + EXPECT_CALL(*mockChunkCacheManager_, ReadByWriteCache(_, _, _, _, _)) + .WillOnce(DoAll(SetArgPointee<4>(requests), Return())); + EXPECT_CALL(*mockChunkCacheManager_, ReadByReadCache(_, _, _, _, _)) + .WillOnce(DoAll(SetArgPointee<4>(requests), Return())); + EXPECT_CALL(*mockChunkCacheManager_, AddReadDataCache(_)) + .WillOnce(Return()); + fileCacheManager_->SetChunkCacheManagerForTest(0, mockChunkCacheManager_); + Inode inode; + inode.set_length(len); + auto* s3ChunkInfoMap = inode.mutable_s3chunkinfomap(); + auto* s3ChunkInfoList = new S3ChunkInfoList(); + auto* s3ChunkInfo = s3ChunkInfoList->add_s3chunks(); + s3ChunkInfo->set_chunkid(25); + s3ChunkInfo->set_compaction(0); + s3ChunkInfo->set_offset(offset); + s3ChunkInfo->set_len(len); + s3ChunkInfo->set_size(len); + s3ChunkInfo->set_zero(false); + s3ChunkInfoMap->insert({0, *s3ChunkInfoList}); + + fsCacheManager_->SetFileCacheManagerForTest(inodeId, fileCacheManager_); + auto inodeWrapper = std::make_shared(inode, nullptr); + EXPECT_CALL(*mockInodeManager_, GetInode(_, _)) + .WillOnce( + DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK))); + EXPECT_CALL(*mockS3Client_, DownloadAsync(_)) + .WillRepeatedly( + Invoke([&](const std::shared_ptr& context) { + context->retCode = 0; + context->cb(nullptr, context); + })); + EXPECT_CALL(*mockDiskcacheManagerImpl_, IsCached(_)) + .WillOnce(Return(false)) + .WillOnce(Return(false)) + .WillOnce(Return(false)) + .WillOnce(Return(true)); + EXPECT_CALL(*mockDiskcacheManagerImpl_, Read(_, _, _, _)) + .WillOnce(Return(len)); + EXPECT_CALL(*mockDiskcacheManagerImpl_, WriteReadDirect(_, _, _)) + .WillOnce(Return(0)); + EXPECT_CALL(*mockKVClient_, Get(_, _, _, _, _, _, _)) + .WillOnce(Return(true)); + ASSERT_EQ(len, fileCacheManager_->Read(inodeId, offset, len, buf.data())); + sleep(3); +} + +TEST_F(FileCacheManagerDiskTest, test_read_remote) { + const uint64_t inodeId = 1; + const uint64_t offset = 0; + const uint64_t len = 1024; + + std::vector buf(len); + std::vector tmpBuf(len, 'a'); + + ReadRequest req{.index = 0, .chunkPos = offset, .len = len, .bufOffset = 0}; + std::vector requests{req}; + EXPECT_CALL(*mockChunkCacheManager_, ReadByWriteCache(_, _, _, _, _)) + .WillOnce(DoAll(SetArgPointee<4>(requests), Return())); + EXPECT_CALL(*mockChunkCacheManager_, ReadByReadCache(_, _, _, _, _)) + .WillOnce(DoAll(SetArgPointee<4>(requests), Return())); + EXPECT_CALL(*mockChunkCacheManager_, AddReadDataCache(_)) + .WillOnce(Return()); + fileCacheManager_->SetChunkCacheManagerForTest(0, mockChunkCacheManager_); + Inode inode; + inode.set_length(len); + auto* s3ChunkInfoMap = inode.mutable_s3chunkinfomap(); + auto* s3ChunkInfoList = new S3ChunkInfoList(); + auto* s3ChunkInfo = s3ChunkInfoList->add_s3chunks(); + s3ChunkInfo->set_chunkid(25); + s3ChunkInfo->set_compaction(0); + s3ChunkInfo->set_offset(offset); + s3ChunkInfo->set_len(len); + s3ChunkInfo->set_size(len); + s3ChunkInfo->set_zero(false); + s3ChunkInfoMap->insert({0, *s3ChunkInfoList}); + + fsCacheManager_->SetFileCacheManagerForTest(inodeId, fileCacheManager_); + auto inodeWrapper = std::make_shared(inode, nullptr); + EXPECT_CALL(*mockInodeManager_, GetInode(_, _)) + .WillOnce( + DoAll(SetArgReferee<1>(inodeWrapper), Return(CURVEFS_ERROR::OK))); + EXPECT_CALL(*mockS3Client_, DownloadAsync(_)) + .WillRepeatedly( + Invoke([&](const std::shared_ptr& context) { + context->retCode = 0; + context->cb(nullptr, context); + })); + EXPECT_CALL(*mockDiskcacheManagerImpl_, IsCached(_)) + .WillOnce(Return(false)) + .WillOnce(Return(false)) + .WillOnce(Return(false)) + .WillOnce(Return(false)); + + EXPECT_CALL(*mockDiskcacheManagerImpl_, WriteReadDirect(_, _, _)) + .WillOnce(Return(0)); + + EXPECT_CALL(*mockKVClient_, Get(_, _, _, _, _, _, _)) + .WillOnce(Return(true)) + .WillOnce(Return(true)); + + ASSERT_EQ(len, fileCacheManager_->Read(inodeId, offset, len, buf.data())); + sleep(3); +} + +} // namespace client +} // namespace curvefs diff --git a/curvefs/test/client/mock_disk_cache_manager.h b/curvefs/test/client/mock_disk_cache_manager.h index c78a7ff5a1..ef66ab55c0 100644 --- a/curvefs/test/client/mock_disk_cache_manager.h +++ b/curvefs/test/client/mock_disk_cache_manager.h @@ -81,9 +81,17 @@ class MockDiskCacheManagerImpl : public DiskCacheManagerImpl { } ~MockDiskCacheManagerImpl() {} - MOCK_METHOD1(UploadWriteCacheByInode, int(const std::string &inode)); - MOCK_METHOD1(ClearReadCache, int(const std::list &files)); + MOCK_METHOD1(Init, int(const S3ClientAdaptorOption option)); + MOCK_METHOD1(UploadWriteCacheByInode, int(const std::string& inode)); + MOCK_METHOD1(ClearReadCache, int(const std::list& files)); MOCK_METHOD1(IsCached, bool(const std::string)); + MOCK_METHOD0(UmountDiskCache, int()); + MOCK_METHOD3(WriteReadDirect, int(const std::string fileName, + const char* buf, uint64_t length)); + MOCK_METHOD3(Write, + int(const std::string name, const char* buf, uint64_t length)); + MOCK_METHOD4(Read, int(const std::string name, char* buf, uint64_t offset, + uint64_t length)); }; } // namespace client