diff --git a/conf/chunkserver.conf b/conf/chunkserver.conf index 0cfc27b544..d57d9561b3 100644 --- a/conf/chunkserver.conf +++ b/conf/chunkserver.conf @@ -194,7 +194,7 @@ rconcurrentapply.queuedepth=1 # 是否开启从chunkfilepool获取chunk,一般是true chunkfilepool.enable_get_chunk_from_pool=true # chunkfilepool目录 -chunkfilepool.chunk_file_pool_dir=./0/ # __CURVEADM_TEMPLATE__ ${prefix}/data __CURVEADM_TEMPLATE__ +chunkfilepool.chunk_file_pool_dir=./0/chunks # __CURVEADM_TEMPLATE__ ${prefix}/data __CURVEADM_TEMPLATE__ # chunkfilepool meta文件路径 chunkfilepool.meta_path=./chunkfilepool.meta # __CURVEADM_TEMPLATE__ ${prefix}/data/chunkfilepool.meta __CURVEADM_TEMPLATE__ # chunkfilepool meta文件大小 @@ -207,6 +207,14 @@ chunkfilepool.clean.enable=true chunkfilepool.clean.bytes_per_write=4096 # The throttle iops for cleaning chunk (4KB/IO) chunkfilepool.clean.throttle_iops=500 +# Whether allocate filePool by percent of disk size. +chunkfilepool.allocated_by_percent=true +# Preallocate storage percent of total disk +chunkfilepool.allocate_percent=10 +# Preallocate storage size of chunkfilepool (None/KB/MB/GB/TB) +chunkfilepool.chunk_file_pool_size=1GB +# The thread num for format chunks +chunkfilepool.thread_num=1 # # WAL file pool @@ -229,6 +237,14 @@ walfilepool.metapage_size=4096 walfilepool.meta_file_size=4096 # WAL filepool get chunk最大重试次数 walfilepool.retry_times=5 +# Whether allocate filePool by percent of disk size. +walfilepool.allocated_by_percent=true +# Preallocate storage percent of total disk +walfilepool.allocate_percent=90 +# Preallocate storage size size of walfilepool (None/KB/MB/GB/TB) +walfilepool.wal_file_pool_size=0 +# The thread num for format chunks +walfilepool.thread_num=1 # # trash settings diff --git a/conf/chunkserver.conf.example b/conf/chunkserver.conf.example index eb664c2fd6..2dfcb2aca9 100644 --- a/conf/chunkserver.conf.example +++ b/conf/chunkserver.conf.example @@ -186,7 +186,7 @@ rconcurrentapply.queuedepth=1 # 是否开启从chunkfilepool获取chunk,一般是true chunkfilepool.enable_get_chunk_from_pool=true # chunkfilepool目录 -chunkfilepool.chunk_file_pool_dir=./0/ +chunkfilepool.chunk_file_pool_dir=./0/chunks # chunkfilepool meta文件路径 #chunkfilepool.meta_path=./chunkfilepool.meta # chunkfilepool meta文件大小 @@ -199,6 +199,14 @@ chunkfilepool.clean.enable=true chunkfilepool.clean.bytes_per_write=4096 # The throttle iops for cleaning chunk (4KB/IO) chunkfilepool.clean.throttle_iops=500 +# Whether allocate filePool by percent of disk size. +chunkfilepool.allocated_by_percent=true +# Preallocate storage percent of total disk +chunkfilepool.allocate_percent=90 +# Preallocate storage size of chunkfilepool (None/KB/MB/GB/TB) +chunkfilepool.chunk_file_pool_size=1GB +# The thread num for format chunks +chunkfilepool.thread_num=1 # # WAL file pool @@ -221,6 +229,14 @@ walfilepool.metapage_size=4096 walfilepool.meta_file_size=4096 # WAL filepool get chunk最大重试次数 walfilepool.retry_times=5 +# Whether allocate filePool by percent of disk size. +walfilepool.allocated_by_percent=true +# Preallocate storage percent of total disk +walfilepool.allocate_percent=10 +# Preallocate storage size size of walfilepool (None/KB/MB/GB/TB) +walfilepool.wal_file_pool_size=0 +# The thread num for format chunks +walfilepool.thread_num=1 # # trash settings diff --git a/proto/heartbeat.proto b/proto/heartbeat.proto index d54723dfb8..6b51d40277 100644 --- a/proto/heartbeat.proto +++ b/proto/heartbeat.proto @@ -89,6 +89,8 @@ message ChunkServerStatisticInfo { required uint64 chunkSizeTrashedBytes = 7; // chunkfilepool的大小 optional uint64 chunkFilepoolSize = 8; + // percentage of chunkfilepool formatting + optional uint32 chunkFilepoolFormatPercent = 9; }; message ChunkServerHeartbeatRequest { diff --git a/proto/topology.proto b/proto/topology.proto index 2057cafe2a..6e88d4e102 100644 --- a/proto/topology.proto +++ b/proto/topology.proto @@ -501,6 +501,19 @@ message GetCopySetsInChunkServerRequest { optional uint32 port = 3; } +message ChunkFormatStatus { + required string ip = 1; + required uint32 port = 2; + required uint32 chunkServerID = 3; + required uint32 formatPercent = 4; +} + +message ListChunkFormatStatusRequest {} + +message ListChunkFormatStatusResponse { + repeated ChunkFormatStatus chunkFormatStatus = 1; +} + message GetCopySetsInChunkServerResponse { required sint32 statusCode = 1; repeated common.CopysetInfo copysetInfos = 2; @@ -595,4 +608,5 @@ service TopologyService { rpc GetClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse); rpc SetCopysetsAvailFlag(SetCopysetsAvailFlagRequest) returns (SetCopysetsAvailFlagResponse); rpc ListUnAvailCopySets(ListUnAvailCopySetsRequest) returns (ListUnAvailCopySetsResponse); + rpc ListChunkFormatStatus(ListChunkFormatStatusRequest) returns (ListChunkFormatStatusResponse); } diff --git a/src/chunkserver/chunkserver.cpp b/src/chunkserver/chunkserver.cpp index 07f1f48d5f..ded5550d4a 100644 --- a/src/chunkserver/chunkserver.cpp +++ b/src/chunkserver/chunkserver.cpp @@ -20,30 +20,31 @@ * Author: lixiaocui */ -#include +#include "src/chunkserver/chunkserver.h" -#include #include #include #include +#include +#include #include -#include "src/chunkserver/chunkserver.h" -#include "src/chunkserver/chunkserver_metrics.h" -#include "src/chunkserver/chunkserver_service.h" -#include "src/chunkserver/copyset_service.h" -#include "src/chunkserver/chunk_service.h" #include "src/chunkserver/braft_cli_service.h" #include "src/chunkserver/braft_cli_service2.h" +#include "src/chunkserver/chunk_service.h" #include "src/chunkserver/chunkserver_helper.h" -#include "src/common/concurrent/task_thread_pool.h" -#include "src/common/uri_parser.h" -#include "src/chunkserver/raftsnapshot/curve_snapshot_attachment.h" +#include "src/chunkserver/chunkserver_metrics.h" +#include "src/chunkserver/chunkserver_service.h" +#include "src/chunkserver/copyset_service.h" +#include "src/chunkserver/raftlog/curve_segment_log_storage.h" #include "src/chunkserver/raftsnapshot/curve_file_service.h" +#include "src/chunkserver/raftsnapshot/curve_snapshot_attachment.h" #include "src/chunkserver/raftsnapshot/curve_snapshot_storage.h" -#include "src/chunkserver/raftlog/curve_segment_log_storage.h" +#include "src/common/bytes_convert.h" +#include "src/common/concurrent/task_thread_pool.h" #include "src/common/curve_version.h" +#include "src/common/uri_parser.h" using ::curve::fs::LocalFileSystem; using ::curve::fs::LocalFileSystemOption; @@ -65,6 +66,10 @@ DEFINE_string(raftSnapshotUri, "curve://./0/copysets", "raft snapshot uri"); DEFINE_string(raftLogUri, "curve://./0/copysets", "raft log uri"); DEFINE_string(recycleUri, "local://./0/recycler" , "recycle uri"); DEFINE_string(chunkFilePoolDir, "./0/", "chunk file pool location"); +DEFINE_int32(chunkFilePoolAllocatedPercent, 90, + "format percent for chunkfillpool."); +DEFINE_uint32(chunkFormatThreadNum, 1, + "number of threads while file pool formatting"); DEFINE_string(chunkFilePoolMetaPath, "./chunkfilepool.meta", "chunk file pool meta path"); DEFINE_string(logPath, "./0/chunkserver.log-", "log file path"); @@ -479,8 +484,6 @@ void ChunkServer::Stop() { brpc::AskToQuit(); } - - void ChunkServer::InitChunkFilePoolOptions( common::Configuration *conf, FilePoolOptions *chunkFilePoolOptions) { LOG_IF(FATAL, !conf->GetUInt32Value("global.chunk_size", @@ -513,13 +516,56 @@ void ChunkServer::InitChunkFilePoolOptions( "chunkfilepool.meta_path", &metaUri)); ::memcpy( chunkFilePoolOptions->metaPath, metaUri.c_str(), metaUri.size()); + + std::string chunkFilePoolUri; + LOG_IF(FATAL, !conf->GetStringValue("chunkfilepool.chunk_file_pool_dir", + &chunkFilePoolUri)); + + ::memcpy(chunkFilePoolOptions->filePoolDir, chunkFilePoolUri.c_str(), + chunkFilePoolUri.size()); + std::string pool_size; + LOG_IF(FATAL, !conf->GetStringValue( + "chunkfilepool.chunk_file_pool_size", &pool_size)); + LOG_IF(FATAL, !curve::common::ToNumbericByte( + pool_size, &chunkFilePoolOptions->filePoolSize)); + LOG_IF(FATAL, + !conf->GetBoolValue("chunkfilepool.allocated_by_percent", + &chunkFilePoolOptions->allocatedByPercent)); + LOG_IF(FATAL, + !conf->GetUInt32Value("chunkfilepool.allocated_percent", + &chunkFilePoolOptions->allocatedPercent)); + LOG_IF(FATAL, !conf->GetUInt32Value( + "chunkfilepool.chunk_file_pool_format_thread_num", + &chunkFilePoolOptions->formatThreadNum)); LOG_IF(FATAL, !conf->GetBoolValue("chunkfilepool.clean.enable", &chunkFilePoolOptions->needClean)); - LOG_IF(FATAL, !conf->GetUInt32Value("chunkfilepool.clean.bytes_per_write", // NOLINT - &chunkFilePoolOptions->bytesPerWrite)); + LOG_IF(FATAL, + !conf->GetUInt32Value("chunkfilepool.clean.bytes_per_write", + &chunkFilePoolOptions->bytesPerWrite)); LOG_IF(FATAL, !conf->GetUInt32Value("chunkfilepool.clean.throttle_iops", &chunkFilePoolOptions->iops4clean)); + std::string copysetUri; + LOG_IF(FATAL, + !conf->GetStringValue("copyset.raft_snapshot_uri", ©setUri)); + curve::common::UriParser::ParseUri(copysetUri, + &chunkFilePoolOptions->copysetDir); + + std::string recycleUri; + LOG_IF(FATAL, + !conf->GetStringValue("copyset.recycler_uri", &recycleUri)); + curve::common::UriParser::ParseUri(recycleUri, + &chunkFilePoolOptions->recycleDir); + + bool useChunkFilePoolAsWalPool; + LOG_IF(FATAL, !conf->GetBoolValue("walfilepool.use_chunk_file_pool", + &useChunkFilePoolAsWalPool)); + + chunkFilePoolOptions->isAllocated = [=](const std::string& filename) { + return Trash::IsChunkOrSnapShotFile(filename) || + (useChunkFilePoolAsWalPool && Trash::IsWALFile(filename)); + }; + if (0 == chunkFilePoolOptions->bytesPerWrite || chunkFilePoolOptions->bytesPerWrite > 1 * 1024 * 1024 || 0 != chunkFilePoolOptions->bytesPerWrite % 4096) { @@ -565,6 +611,36 @@ void ChunkServer::InitWalFilePoolOptions( std::string metaUri; LOG_IF(FATAL, !conf->GetStringValue( "walfilepool.meta_path", &metaUri)); + + std::string pool_size; + LOG_IF(FATAL, !conf->GetStringValue("walfilepool.chunk_file_pool_size", + &pool_size)); + LOG_IF(FATAL, !curve::common::ToNumbericByte( + pool_size, &walPoolOptions->filePoolSize)); + LOG_IF(FATAL, !conf->GetUInt64Value("walfilepool.wal_file_pool_size", + &walPoolOptions->filePoolSize)); + LOG_IF(FATAL, !conf->GetBoolValue("walfilepool.allocated_by_percent", + &walPoolOptions->allocatedByPercent)); + LOG_IF(FATAL, !conf->GetUInt32Value("walfilepool.allocated_percent", + &walPoolOptions->allocatedPercent)); + LOG_IF(FATAL, !conf->GetUInt32Value("walfilepool.thread_num", + &walPoolOptions->formatThreadNum)); + + std::string copysetUri; + LOG_IF(FATAL, + !conf->GetStringValue("copyset.raft_log_uri", ©setUri)); + curve::common::UriParser::ParseUri(copysetUri, + &walPoolOptions->copysetDir); + + std::string recycleUri; + LOG_IF(FATAL, + !conf->GetStringValue("copyset.recycler_uri", &recycleUri)); + curve::common::UriParser::ParseUri(recycleUri, + &walPoolOptions->recycleDir); + + walPoolOptions->isAllocated = [](const string& filename) { + return Trash::IsWALFile(filename); + }; ::memcpy( walPoolOptions->metaPath, metaUri.c_str(), metaUri.size()); } @@ -833,6 +909,16 @@ void ChunkServer::LoadConfigFromCmdline(common::Configuration *conf) { << "chunkFilePoolDir must be set when run chunkserver in command."; } + if (GetCommandLineFlagInfo("chunkFilePoolAllocatedPercent", &info)) { + conf->SetUInt32Value("chunkfilepool.allocated_percent", + FLAGS_chunkFilePoolAllocatedPercent); + } + + if (GetCommandLineFlagInfo("chunkFormatThreadNum", &info)) { + conf->SetUInt64Value("chunkfilepool.chunk_file_pool_format_thread_num", + FLAGS_chunkFormatThreadNum); + } + if (GetCommandLineFlagInfo("chunkFilePoolMetaPath", &info) && !info.is_default) { conf->SetStringValue( diff --git a/src/chunkserver/datastore/file_pool.cpp b/src/chunkserver/datastore/file_pool.cpp index d3f20feb6a..0c6489ed80 100644 --- a/src/chunkserver/datastore/file_pool.cpp +++ b/src/chunkserver/datastore/file_pool.cpp @@ -22,8 +22,10 @@ #include "src/chunkserver/datastore/file_pool.h" +#include #include #include +#include #include #include #include @@ -33,18 +35,19 @@ #include #include #include -#include #include +#include -#include "src/common/string_util.h" -#include "src/common/throttle.h" +#include "absl/utility/utility.h" #include "src/common/configuration.h" #include "src/common/crc32.h" #include "src/common/curve_define.h" - -#include "absl/utility/utility.h" +#include "src/common/string_util.h" +#include "src/common/throttle.h" using curve::common::kFilePoolMagic; +DEFINE_int64(formatInterval, 100, "Sets a interval between formatting."); +DEFINE_validator(formatInterval, brpc::PositiveInteger); namespace curve { namespace chunkserver { @@ -68,7 +71,6 @@ std::ostream& operator<<(std::ostream& os, const FilePoolMeta& meta) { << ", hasblocksize: " << meta.hasBlockSize << ", blocksize: " << meta.blockSize << ", filepoolpath: " << meta.filePoolPath; - return os; } @@ -220,20 +222,31 @@ FilePool::FilePool(std::shared_ptr fsptr) memset(writeBuffer_.get(), 0, poolOpt_.bytesPerWrite); } +FilePool::~FilePool() { UnInitialize(); } + bool FilePool::Initialize(const FilePoolOptions &cfopt) { poolOpt_ = cfopt; if (poolOpt_.getFileFromPool) { + currentdir_ = poolOpt_.filePoolDir; + currentState_.chunkSize = poolOpt_.fileSize; + currentState_.metaPageSize = poolOpt_.metaPageSize; if (!CheckValid()) { - LOG(ERROR) << "check valid failed!"; + LOG(ERROR) << "Check vaild failed!"; return false; } - if (fsptr_->DirExists(currentdir_)) { - return ScanInternal(); - } else { - LOG(ERROR) << "chunkfile pool not exists, inited failed!" - << " chunkfile pool path = " << currentdir_.c_str(); + + if (!ScanInternal()) { + LOG(ERROR) << "Scan pool files failed!"; + return false; + } + + if (!PrepareFormat()) { + LOG(ERROR) << "Prepare format failed!"; return false; } + + formatAlived_.store(true); + formatThread_ = Thread(&FilePool::FormatWorker, this); } else { currentdir_ = poolOpt_.filePoolDir; if (!fsptr_->DirExists(currentdir_.c_str())) { @@ -245,6 +258,15 @@ bool FilePool::Initialize(const FilePoolOptions &cfopt) { bool FilePool::CheckValid() { FilePoolMeta meta; + if (!fsptr_->FileExists(poolOpt_.metaPath)) { + LOG(INFO) << "Metafile in path '" << poolOpt_.metaPath + << "'not found, it's the first initialized."; + currentState_.chunkSize = poolOpt_.fileSize; + currentState_.metaPageSize = poolOpt_.metaPageSize; + currentState_.blockSize = poolOpt_.blockSize; + currentdir_ = poolOpt_.filePoolDir; + return true; + } int ret = FilePoolHelper::DecodeMetaInfoFromMetaFile( fsptr_, poolOpt_.metaPath, poolOpt_.metaFileSize, &meta); if (ret == -1) { @@ -375,6 +397,129 @@ void FilePool::CleanWorker() { } } +bool FilePool::PrepareFormat() { + curve::fs::FileSystemInfo finfo; + int r = fsptr_->Statfs(currentdir_, &finfo); + if (r != 0) { + LOG(ERROR) << "get disk usage info failed!"; + return false; + } + + if (poolOpt_.allocatedByPercent) { + poolOpt_.filePoolSize = finfo.total * poolOpt_.allocatedPercent / 100; + } + + uint64_t bytesPerPage = poolOpt_.fileSize + poolOpt_.metaFileSize; + uint64_t needSpace = + poolOpt_.filePoolSize - currentState_.chunkNum * bytesPerPage; + uint64_t vaildSpace = 0; + + if (poolOpt_.filePoolSize / bytesPerPage < currentState_.chunkNum) { + LOG(INFO) << "It is no need to format chunks."; + formatStat_.preAllocateNum = 0; + formatStat_.allocateChunkNum = 0; + return true; + } + + vaildSpace = finfo.available; + LOG(INFO) << "free space = " << finfo.available + << ", total space = " << finfo.total + << ", need space = " << needSpace; + if (vaildSpace < needSpace) { + LOG(ERROR) << "disk free space not enough."; + return false; + } + + formatStat_.preAllocateNum = needSpace / bytesPerPage; + formatStat_.allocateChunkNum = 0; + LOG(INFO) << "preAllocateNum = " << formatStat_.preAllocateNum; + return true; +} + +bool FilePool::WaitoFormatDoneForTesting() { + std::unique_lock lk(mtx_); + if (formatStat_.allocateChunkNum.load() != formatStat_.preAllocateNum) { + cond_.wait(lk, [&]() { + return formatStat_.allocateChunkNum.load() == + formatStat_.preAllocateNum; + }); + } + lk.unlock(); + if (formatThread_.joinable()) { + formatThread_.join(); + } + return true; +} + +bool FilePool::StopFormatting() { + if (formatAlived_.exchange(false)) { + LOG(INFO) << "Stop formatting..."; + if (formatThread_.joinable()) { + formatThread_.join(); + } + LOG(INFO) << "Stop format thread ok."; + } + return true; +} +int FilePool::FormatTask(uint64_t indexOffset, + std::atomic* allocatIndex) { + LOG(INFO) << "format thread has been work!"; + while (!this->formatStat_.isWrong.load() && this->formatAlived_.load()) { + uint32_t chunkIndex = 0; + if ((chunkIndex = allocatIndex->fetch_add(1)) >= + formatStat_.preAllocateNum) { + allocatIndex->fetch_sub(1); + break; + } + std::string chunkPath = this->currentdir_ + "/" + + std::to_string(chunkIndex + indexOffset) + + kCleanChunkSuffix_; + this->formatSleeper_.wait_for( + std::chrono::milliseconds{FLAGS_formatInterval}); + int res = this->AllocateChunk(chunkPath); + if (res != 0) { + this->formatStat_.isWrong.store(true); + LOG(ERROR) << "Format ERROR!"; + break; + } + this->mtx_.lock(); + cleanChunks_.push_back(chunkIndex + indexOffset); + this->currentState_.cleanChunksLeft++; + this->currentState_.preallocatedChunksLeft++; + this->currentState_.chunkNum++; + this->formatStat_.allocateChunkNum++; + this->mtx_.unlock(); + this->cond_.notify_all(); + } + LOG(INFO) << "format thread has done!"; + return 0; +} + +int FilePool::FormatWorker() { + std::vector threads; + uint64_t offset = + this->currentmaxfilenum_.fetch_add(formatStat_.preAllocateNum); + std::atomic allocatIndex{0}; + + for (uint32_t i = 0; i < poolOpt_.formatThreadNum; i++) { + threads.emplace_back( + Thread(&FilePool::FormatTask, this, offset, &allocatIndex)); + } + + for (auto& thread : threads) { + if (thread.joinable()) { + thread.join(); + } + } + LOG(INFO) << "format worker done"; + + if (formatStat_.isWrong.load()) { + LOG(ERROR) << "Chunk format failed!"; + return -1; + } + return 0; +} + bool FilePool::StartCleaning() { if (poolOpt_.needClean && !cleanAlived_.exchange(true)) { ReadWriteThrottleParams params; @@ -400,9 +545,8 @@ bool FilePool::StopCleaning() { } bool FilePool::GetChunk(bool needClean, uint64_t *chunkid, bool *isCleaned) { - auto pop = [&](std::vector *chunks, uint64_t *chunksLeft, + auto pop = [&](std::vector* chunks, uint64_t* chunksLeft, bool isCleanChunks) -> bool { - std::unique_lock lk(mtx_); if (chunks->empty()) { return false; } @@ -415,16 +559,28 @@ bool FilePool::GetChunk(bool needClean, uint64_t *chunkid, bool *isCleaned) { return true; }; - if (!needClean) { - return pop(&dirtyChunks_, ¤tState_.dirtyChunksLeft, false) || - pop(&cleanChunks_, ¤tState_.cleanChunksLeft, true); - } + auto wake_up = [&]() { + return (formatStat_.allocateChunkNum.load() == + formatStat_.preAllocateNum) // NOLINT + || !(dirtyChunks_.empty() && cleanChunks_.empty()); + }; - // Need clean chunk - *isCleaned = false; - bool ret = pop(&cleanChunks_, ¤tState_.cleanChunksLeft, true) || - pop(&dirtyChunks_, ¤tState_.dirtyChunksLeft, false); + bool ret = true; + { + std::unique_lock lk(mtx_); + if (formatStat_.allocateChunkNum.load() != formatStat_.preAllocateNum) { + cond_.wait(lk, wake_up); + } + if (!needClean) { + return pop(&dirtyChunks_, ¤tState_.dirtyChunksLeft, false) || + pop(&cleanChunks_, ¤tState_.cleanChunksLeft, true); + } + // Need clean chunk + *isCleaned = false; + ret = pop(&cleanChunks_, ¤tState_.cleanChunksLeft, true) || + pop(&dirtyChunks_, ¤tState_.dirtyChunksLeft, false); + } if (true == ret && false == *isCleaned && CleanChunk(*chunkid, true)) { *isCleaned = true; } @@ -638,7 +794,7 @@ int FilePool::RecycleFile(const std::string &chunkpath) { void FilePool::UnInitialize() { currentdir_ = ""; - + StopFormatting(); std::unique_lock lk(mtx_); dirtyChunks_.clear(); cleanChunks_.clear(); @@ -647,8 +803,17 @@ void FilePool::UnInitialize() { bool FilePool::ScanInternal() { uint64_t maxnum = 0; std::vector tmpvec; + int ret = 0; LOG(INFO) << "scan dir" << currentdir_; - int ret = fsptr_->List(currentdir_.c_str(), &tmpvec); + if (!fsptr_->DirExists(currentdir_.c_str())) { + ret = fsptr_->Mkdir(currentdir_.c_str()); + if (ret != 0) { + LOG(ERROR) << "Mkdir [" << currentdir_ << "]" + << " failed!"; + return false; + } + } + ret = fsptr_->List(currentdir_.c_str(), &tmpvec); if (ret < 0) { LOG(ERROR) << "list file pool dir failed!"; return false; @@ -709,6 +874,10 @@ bool FilePool::ScanInternal() { } } + currentState_.chunkNum = tmpvec.size(); + currentState_.chunkNum += CountAllocatedNum(poolOpt_.copysetDir); + currentState_.chunkNum += CountAllocatedNum(poolOpt_.recycleDir); + std::unique_lock lk(mtx_); currentmaxfilenum_.store(maxnum + 1); currentState_.dirtyChunksLeft = dirtyChunks_.size(); @@ -721,6 +890,30 @@ bool FilePool::ScanInternal() { return true; } +uint64_t FilePool::CountAllocatedNum(const std::string& path) { + std::vector files; + if ("" == path || 0 != fsptr_->List(path, &files)) { + LOG(ERROR) << "FilePool failed to list files in " << path; + return 0; + } + + // Traverse subdirectories + uint32_t chunkNum = 0; + for (auto& file : files) { + std::string filePath = path + "/" + file; + bool isDir = fsptr_->DirExists(filePath); + if (!isDir) { + LOG(INFO) << "path = " << filePath; + if (poolOpt_.isAllocated(file)) { + ++chunkNum; + } + } else { + chunkNum += CountAllocatedNum(filePath); + } + } + return chunkNum; +} + size_t FilePool::Size() { std::unique_lock lk(mtx_); return currentState_.preallocatedChunksLeft; @@ -730,6 +923,10 @@ FilePoolState FilePool::GetState() const { return currentState_; } +const ChunkFormatStat& FilePool::GetChunkFormatStat() const { + return formatStat_; +} + uint32_t FilePoolMeta::Crc32() const { const size_t size = sizeof(kFilePoolMagic) + sizeof(chunkSize) + sizeof(metaPageSize) + filePoolPath.size() + diff --git a/src/chunkserver/datastore/file_pool.h b/src/chunkserver/datastore/file_pool.h index f3417e160a..691f0235ad 100644 --- a/src/chunkserver/datastore/file_pool.h +++ b/src/chunkserver/datastore/file_pool.h @@ -25,19 +25,20 @@ #include -#include +#include +#include +#include +#include #include // NOLINT -#include +#include #include -#include -#include -#include +#include +#include "include/curve_compiler_specific.h" #include "src/common/concurrent/concurrent.h" #include "src/common/interruptible_sleeper.h" #include "src/common/throttle.h" #include "src/fs/local_filesystem.h" -#include "include/curve_compiler_specific.h" using curve::fs::LocalFileSystem; using curve::common::Thread; @@ -65,6 +66,17 @@ struct FilePoolOptions { // retry times for get file uint16_t retryTimes; + bool allocatedByPercent; + uint32_t allocatedPercent; + uint32_t preAllocateNum; + uint64_t filePoolSize; + uint32_t formatThreadNum; + + std::string copysetDir; + std::string recycleDir; + + std::function isAllocated; + FilePoolOptions() { getFileFromPool = true; needClean = false; @@ -75,6 +87,11 @@ struct FilePoolOptions { metaPageSize = 0; retryTimes = 5; blockSize = 0; + allocatedByPercent = true; + allocatedPercent = 0; + preAllocateNum = 0; + filePoolSize = 0; + formatThreadNum = 1; ::memset(metaPath, 0, 256); ::memset(filePoolDir, 0, 256); } @@ -87,7 +104,8 @@ struct FilePoolState { uint64_t cleanChunksLeft = 0; // How many pre-allocated chunks are not used by the datastore uint64_t preallocatedChunksLeft = 0; - + // Total num of chunks in the datastore + uint64_t chunkNum; // chunksize uint32_t chunkSize = 0; // metapage size @@ -126,6 +144,12 @@ struct FilePoolMeta { uint32_t Crc32() const; }; +struct ChunkFormatStat { + std::atomic isWrong{false}; + std::atomic allocateChunkNum{0}; + uint32_t preAllocateNum = 0; +}; + class FilePoolHelper { public: static const char* kFileSize; @@ -168,7 +192,7 @@ class FilePoolHelper { class CURVE_CACHELINE_ALIGNMENT FilePool { public: explicit FilePool(std::shared_ptr fsptr); - virtual ~FilePool() = default; + virtual ~FilePool(); /** * Initialization function @@ -237,12 +261,30 @@ class CURVE_CACHELINE_ALIGNMENT FilePool { */ bool StopCleaning(); + /** + * @brief: Wait for format thread done. + * @return: Return true if success. + */ + bool WaitoFormatDoneForTesting(); + + /** + * @brief: Get the format status of FilePool + * @return: Return the format status. + */ + virtual const ChunkFormatStat& GetChunkFormatStat() const; + private: // Traverse the pre-allocated chunk information from the // chunkfile pool directory bool ScanInternal(); - // Check whether the chunkfile pool pre-allocation is legal + // Prepare for format. + bool PrepareFormat(); + // bool CheckValid(); + // Count the num of files that has been allocated. + uint64_t CountAllocatedNum(const std::string& path); + // Check whether pool file is legal. + bool CheckPoolFile(const std::string& file); /** * Perform metapage assignment for the new chunkfile * @param: sourcepath is the file path to be written @@ -282,6 +324,19 @@ class CURVE_CACHELINE_ALIGNMENT FilePool { */ bool CleaningChunk(); + int FormatTask(uint64_t indexOffset, std::atomic* allocatIndex); + + /** + * @brief: The function of thread for formatting chunk + */ + int FormatWorker(); + + /** + * @brief: Stop thread for formatting chunk + * @return: Return true if success, otherwise return false + */ + bool StopFormatting(); + /** * @brief: The function of thread for cleaning chunk */ @@ -300,6 +355,9 @@ class CURVE_CACHELINE_ALIGNMENT FilePool { // Protect dirtyChunks_, cleanChunks_ std::mutex mtx_; + // Wait for GetChunk + std::condition_variable cond_; + // Current FilePool pre-allocated files, folder path std::string currentdir_; @@ -331,9 +389,21 @@ class CURVE_CACHELINE_ALIGNMENT FilePool { // The throttle iops for cleaning chunk (4KB/IO) Throttle cleanThrottle_; + // Whether the format thread is alive + Atomic formatAlived_{true}; + + // Thread for format chunks. + Thread formatThread_; + + // Stat for format chunks. + ChunkFormatStat formatStat_; + // Sleeper for cleaning chunk thread InterruptibleSleeper cleanSleeper_; + // Sleeper for formatting chunk thread + InterruptibleSleeper formatSleeper_; + // The buffer for write chunk file std::unique_ptr writeBuffer_; }; diff --git a/src/chunkserver/heartbeat.cpp b/src/chunkserver/heartbeat.cpp index 0e756b29c6..b81fe6bdb3 100644 --- a/src/chunkserver/heartbeat.cpp +++ b/src/chunkserver/heartbeat.cpp @@ -268,10 +268,21 @@ int Heartbeat::BuildRequest(HeartbeatRequest* req) { * walSegmentFileSize; uint64_t chunkPoolSize = options_.chunkFilePool->Size() * options_.chunkFilePool->GetFilePoolOpt().fileSize; + + // compute format progress rate. + const ChunkFormatStat& formatStat = + options_.chunkFilePool->GetChunkFormatStat(); // NOLINT + stats->set_chunkfilepoolsize(chunkPoolSize); stats->set_chunksizeusedbytes(usedChunkSize+usedWalSegmentSize); stats->set_chunksizeleftbytes(leftChunkSize+leftWalSegmentSize); stats->set_chunksizetrashedbytes(trashedChunkSize); + if (formatStat.preAllocateNum != 0) { + stats->set_chunkfilepoolformatpercent( + 100 * formatStat.allocateChunkNum / formatStat.preAllocateNum); + } else { + stats->set_chunkfilepoolformatpercent(100); + } req->set_allocated_stats(stats); size_t cap, avail; @@ -391,7 +402,8 @@ int Heartbeat::SendHeartbeat(const HeartbeatRequest& request, cntl.ErrorCode() == brpc::ELOGOFF || cntl.ErrorCode() == brpc::ERPCTIMEDOUT) { LOG(WARNING) << "current mds: " << mdsEps_[inServiceIndex_] - << " is shutdown or going to quit"; + << " is shutdown or going to quit," + << cntl.ErrorText(); inServiceIndex_ = (inServiceIndex_ + 1) % mdsEps_.size(); LOG(INFO) << "next heartbeat switch to " << mdsEps_[inServiceIndex_]; diff --git a/src/chunkserver/trash.h b/src/chunkserver/trash.h index ff037db8a4..a3a3c89d53 100644 --- a/src/chunkserver/trash.h +++ b/src/chunkserver/trash.h @@ -73,6 +73,25 @@ class Trash { */ uint32_t GetChunkNum() {return chunkNum_.load();} + /** + * @brief is WAL or not ? + * + * @param fileName file name + * + * @retval true yes + * @retval false no + */ + static bool IsWALFile(const std::string& fileName); + + /* + * @brief IsChunkOrSnapShotFile 是否为chunk或snapshot文件 + * + * @param[in] chunkName 文件名 + * + * @return true-符合chunk或snapshot文件命名规则 + */ + static bool IsChunkOrSnapShotFile(const std::string& chunkName); + private: /* * @brief DeleteEligibleFileInTrashInterval 每隔一段时间进行trash物理空间回收 @@ -98,15 +117,6 @@ class Trash { */ bool IsCopysetInTrash(const std::string &dirName); - /* - * @brief IsChunkOrSnapShotFile 是否为chunk或snapshot文件 - * - * @param[in] chunkName 文件名 - * - * @return true-符合chunk或snapshot文件命名规则 - */ - bool IsChunkOrSnapShotFile(const std::string &chunkName); - /* * @brief Recycle Chunkfile and wal file in Copyset * @@ -134,19 +144,7 @@ class Trash { * @retval true success * @retval false failure */ - bool RecycleWAL( - const std::string &filepath, const std::string &filename); - - - /** - * @brief is WAL or not ? - * - * @param fileName file name - * - * @retval true yes - * @retval false no - */ - bool IsWALFile(const std::string &fileName); + bool RecycleWAL(const std::string& filepath, const std::string& filename); /* * @brief 统计copyset目录中的chunk个数 diff --git a/src/common/bytes_convert.h b/src/common/bytes_convert.h new file mode 100644 index 0000000000..3093567d90 --- /dev/null +++ b/src/common/bytes_convert.h @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2023 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: curve + * Created Date: 2023-10-06 + * Author: yyyyufeng + */ + +#ifndef SRC_COMMON_BYTES_CONVERT_H_ +#define SRC_COMMON_BYTES_CONVERT_H_ + +#include +#include + +namespace curve { +namespace common { + +constexpr uint64_t kKiB = 1024ULL; +constexpr uint64_t kMiB = 1024ULL * kKiB; +constexpr uint64_t kGiB = 1024ULL * kMiB; +constexpr uint64_t kTiB = 1024ULL * kGiB; + +/** + * @brief convert string to bytes + * @return true if success + */ +inline bool ToNumbericByte(const std::string& source, uint64_t* target) { + int len = source.size(); + if (source[len - 1] >= '0' && source[len - 1] <= '9') { + *target = std::stoul(source); + return true; + } + if (len < 3 || (source[len - 1] != 'b' && source[len - 1] != 'B')) { + return false; + } + *target = 0; + for (int i = 0; i < len - 2; i++) { + char ch = source[i]; + if (ch < '0' || ch > '9') { + return false; + } + *target *= 10; + *target += (ch - '0'); + } + bool ret = true; + switch (source[len - 2]) { + case 'k': + case 'K': + *target *= kKiB; + break; + case 'M': + case 'm': + *target *= kMiB; + break; + case 'G': + case 'g': + *target *= kGiB; + break; + case 'T': + case 't': + *target *= kTiB; + break; + default: + ret = false; + break; + } + return ret; +} + +} // namespace common +} // namespace curve +#endif // SRC_COMMON_BYTES_CONVERT_H_ diff --git a/src/mds/heartbeat/heartbeat_manager.cpp b/src/mds/heartbeat/heartbeat_manager.cpp index b5100eaa73..188ee03112 100644 --- a/src/mds/heartbeat/heartbeat_manager.cpp +++ b/src/mds/heartbeat/heartbeat_manager.cpp @@ -137,6 +137,11 @@ void HeartbeatManager::UpdateChunkServerStatistics( stat.chunkFilepoolSize = request.stats().chunkfilepoolsize(); } + if (request.stats().has_chunkfilepoolformatpercent()) { + stat.chunkFilepoolFormatPercent = + request.stats().chunkfilepoolformatpercent(); // NOLINT + } + for (int i = 0; i < request.copysetinfos_size(); i++) { CopysetStat cstat; cstat.logicalPoolId = request.copysetinfos(i).logicalpoolid(); diff --git a/src/mds/topology/topology_service.cpp b/src/mds/topology/topology_service.cpp index 2671b5d8d5..95ad76fcbc 100644 --- a/src/mds/topology/topology_service.cpp +++ b/src/mds/topology/topology_service.cpp @@ -1154,6 +1154,19 @@ void TopologyServiceImpl::ListUnAvailCopySets( } } +void TopologyServiceImpl::ListChunkFormatStatus( + google::protobuf::RpcController* cntl_base, + const ListChunkFormatStatusRequest* request, + ListChunkFormatStatusResponse* response, google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + brpc::Controller* cntl = static_cast(cntl_base); + + LOG(INFO) << "Received request[log_id=" << cntl->log_id() << "] from " + << cntl->remote_side() << " to " << cntl->local_side() + << ". [ListChunkFormatStatus] " << request->DebugString(); + topology_->ListChunkFormatStatus(request, response); +} + } // namespace topology } // namespace mds } // namespace curve diff --git a/src/mds/topology/topology_service.h b/src/mds/topology/topology_service.h index a3eeaed257..ff56fb6dbd 100644 --- a/src/mds/topology/topology_service.h +++ b/src/mds/topology/topology_service.h @@ -232,6 +232,11 @@ class TopologyServiceImpl : public TopologyService { const ListUnAvailCopySetsRequest* request, ListUnAvailCopySetsResponse* response, google::protobuf::Closure* done); + virtual void ListChunkFormatStatus( + google::protobuf::RpcController* cntl_base, + const ListChunkFormatStatusRequest* request, + ListChunkFormatStatusResponse* response, + google::protobuf::Closure* done); private: std::shared_ptr topology_; diff --git a/src/mds/topology/topology_service_manager.cpp b/src/mds/topology/topology_service_manager.cpp index ca0e87e4a8..ce0b6822a4 100644 --- a/src/mds/topology/topology_service_manager.cpp +++ b/src/mds/topology/topology_service_manager.cpp @@ -1795,6 +1795,25 @@ void TopologyServiceManager::ConvertCopyset(const CopySetInfo& in, out->set_lastscanconsistent(in.GetLastScanConsistent()); } +void TopologyServiceManager::ListChunkFormatStatus( + const ListChunkFormatStatusRequest* request, + ListChunkFormatStatusResponse* response) { + std::vector chunkserverlist = + topology_->GetChunkServerInCluster(); + for (auto& id : chunkserverlist) { + ChunkServer cs; + ChunkServerStat stat; + if (topology_->GetChunkServer(id, &cs) && + topoStat_->GetChunkServerStat(id, &stat)) { + ChunkFormatStatus* status = response->add_chunkformatstatus(); + status->set_chunkserverid(id); + status->set_ip(cs.GetHostIp()); + status->set_port(cs.GetPort()); + status->set_formatpercent(stat.chunkFilepoolFormatPercent); + } + } +} + } // namespace topology } // namespace mds } // namespace curve diff --git a/src/mds/topology/topology_service_manager.h b/src/mds/topology/topology_service_manager.h index 2de2ab5c1d..bcbfc98f20 100644 --- a/src/mds/topology/topology_service_manager.h +++ b/src/mds/topology/topology_service_manager.h @@ -198,6 +198,10 @@ class TopologyServiceManager { ChunkServerIdType id, const std::vector ©setInfos); + virtual void ListChunkFormatStatus( + const ListChunkFormatStatusRequest* request, + ListChunkFormatStatusResponse* response); + private: /** * @brief create copyset for logical pool diff --git a/src/mds/topology/topology_stat.h b/src/mds/topology/topology_stat.h index 1dfba17e00..f2d2b89943 100644 --- a/src/mds/topology/topology_stat.h +++ b/src/mds/topology/topology_stat.h @@ -85,17 +85,20 @@ struct ChunkServerStat { uint64_t chunkSizeTrashedBytes; // Size of chunkfilepool uint64_t chunkFilepoolSize; + // Rate of chunkfilepool format + uint32_t chunkFilepoolFormatPercent; // Copyset statistic std::vector copysetStats; - ChunkServerStat() : - leaderCount(0), - copysetCount(0), - readRate(0), - writeRate(0), - readIOPS(0), - writeIOPS(0) {} + ChunkServerStat() + : leaderCount(0), + copysetCount(0), + readRate(0), + writeRate(0), + readIOPS(0), + writeIOPS(0), + chunkFilepoolFormatPercent(0) {} }; /** diff --git a/src/tools/curve_tool_define.h b/src/tools/curve_tool_define.h index a392b807bd..8800bf847c 100644 --- a/src/tools/curve_tool_define.h +++ b/src/tools/curve_tool_define.h @@ -57,6 +57,7 @@ const char kClientListCmd[] = "client-list"; const char kSnapshotCloneStatusCmd[] = "snapshot-clone-status"; const char kClusterStatusCmd[] = "cluster-status"; const char kScanStatusCmd[] = "scan-status"; +const char kFormatStatusCmd[] = "format-status"; // NameSpaceTool相关命令 const char kGetCmd[] = "get"; diff --git a/src/tools/mds_client.cpp b/src/tools/mds_client.cpp index 7a119c77bc..4db8bb81f0 100644 --- a/src/tools/mds_client.cpp +++ b/src/tools/mds_client.cpp @@ -1176,5 +1176,24 @@ int MDSClient::ListPoolset(std::vector* poolsets) { return -1; } +int MDSClient::ListChunkFormatStatus( + std::vector* formatStatuses) { + assert(formatStatuses != nullptr); + curve::mds::topology::ListChunkFormatStatusRequest request; + curve::mds::topology::ListChunkFormatStatusResponse response; + curve::mds::topology::TopologyService_Stub stub(&channel_); + + auto fp = &curve::mds::topology::TopologyService_Stub:: + ListChunkFormatStatus; // NOLINT + if (0 != SendRpcToMds(&request, &response, &stub, fp)) { + std::cout << "ListChunkFormatStatus fail" << std::endl; + return -1; + } + for (auto stat : response.chunkformatstatus()) { + formatStatuses->push_back(stat); + } + return 0; +} + } // namespace tool } // namespace curve diff --git a/src/tools/mds_client.h b/src/tools/mds_client.h index 05bac69cd5..fbbc94ffab 100644 --- a/src/tools/mds_client.h +++ b/src/tools/mds_client.h @@ -47,28 +47,31 @@ #include "src/tools/common.h" #include "src/tools/curve_tool_define.h" +using curve::common::ChunkServerLocation; +using curve::common::CopysetInfo; using curve::mds::FileInfo; +using curve::mds::PageFileChunkInfo; using curve::mds::PageFileSegment; using curve::mds::StatusCode; -using curve::mds::PageFileChunkInfo; -using curve::mds::topology::kTopoErrCodeSuccess; +using curve::mds::topology::ChunkFormatStatus; +using curve::mds::topology::ChunkServerIdType; using curve::mds::topology::ChunkServerInfo; -using curve::common::ChunkServerLocation; +using curve::mds::topology::ChunkServerStatus; +using curve::mds::topology::CopySetIdType; using curve::mds::topology::CopySetServerInfo; -using curve::mds::topology::ServerInfo; -using curve::mds::topology::ZoneInfo; -using curve::mds::topology::PhysicalPoolInfo; +using curve::mds::topology::GetChunkServerInfoRequest; +using curve::mds::topology::GetCopySetsInChunkServerRequest; +using curve::mds::topology::kTopoErrCodeSuccess; +using curve::mds::topology::ListChunkFormatStatusRequest; +using curve::mds::topology::ListChunkServerRequest; using curve::mds::topology::LogicalPoolInfo; -using curve::common::CopysetInfo; +using curve::mds::topology::PhysicalPoolInfo; +using curve::mds::topology::PoolIdType; using curve::mds::topology::ServerIdType; +using curve::mds::topology::ServerInfo; using curve::mds::topology::ZoneIdType; -using curve::mds::topology::PoolIdType; -using curve::mds::topology::CopySetIdType; -using curve::mds::topology::ChunkServerIdType; -using curve::mds::topology::ChunkServerStatus; -using curve::mds::topology::ListChunkServerRequest; -using curve::mds::topology::GetChunkServerInfoRequest; -using curve::mds::topology::GetCopySetsInChunkServerRequest; +using curve::mds::topology::ZoneInfo; + using curve::mds::schedule::RapidLeaderScheduleRequst; using curve::mds::schedule::RapidLeaderScheduleResponse; using curve::common::Authenticator; @@ -480,6 +483,8 @@ class MDSClient { int ListPoolset(std::vector* poolsets); + int ListChunkFormatStatus(std::vector* formatStatuses); + private: /** * @brief 切换mds diff --git a/src/tools/status_tool.cpp b/src/tools/status_tool.cpp index bb9e8f97b9..4444f51fd2 100644 --- a/src/tools/status_tool.cpp +++ b/src/tools/status_tool.cpp @@ -114,18 +114,14 @@ bool StatusTool::CommandNeedSnapshotClone(const std::string& command) { } bool StatusTool::SupportCommand(const std::string& command) { - return (command == kSpaceCmd || command == kStatusCmd - || command == kChunkserverListCmd - || command == kChunkserverStatusCmd - || command == kMdsStatusCmd - || command == kEtcdStatusCmd - || command == kClientStatusCmd - || command == kClientListCmd - || command == kSnapshotCloneStatusCmd - || command == kClusterStatusCmd - || command == kServerListCmd - || command == kLogicalPoolList - || command == kScanStatusCmd); + return (command == kSpaceCmd || command == kStatusCmd || + command == kChunkserverListCmd || + command == kChunkserverStatusCmd || command == kMdsStatusCmd || + command == kEtcdStatusCmd || command == kClientStatusCmd || + command == kClientListCmd || command == kSnapshotCloneStatusCmd || + command == kClusterStatusCmd || command == kServerListCmd || + command == kLogicalPoolList || command == kScanStatusCmd || + command == kFormatStatusCmd); } void StatusTool::PrintHelp(const std::string& cmd) { @@ -225,6 +221,21 @@ int StatusTool::SpaceCmd() { return 0; } +int StatusTool::FormatStatusCmd() { + std::vector formatStatus; + int res = mdsClient_->ListChunkFormatStatus(&formatStatus); + if (res != 0) { + std::cout << "ListChunkserversInCluster fail!" << std::endl; + return -1; + } + for (auto stat : formatStatus) { + std::cout << "ip:" << stat.ip() << " port:" << stat.port() + << " id:" << stat.chunkserverid() + << " format percent:" << stat.formatpercent() << std::endl; + } + return 0; +} + int StatusTool::ChunkServerListCmd() { std::vector chunkservers; int res = mdsClient_->ListChunkServersInCluster(&chunkservers); @@ -1138,6 +1149,8 @@ int StatusTool::RunCommand(const std::string &cmd) { return ClientListCmd(); } else if (cmd == kScanStatusCmd) { return ScanStatusCmd(); + } else if (cmd == kFormatStatusCmd) { + return FormatStatusCmd(); } else { std::cout << "Command not supported!" << std::endl; return -1; diff --git a/src/tools/status_tool.h b/src/tools/status_tool.h index 2b54d70943..82b776fa73 100644 --- a/src/tools/status_tool.h +++ b/src/tools/status_tool.h @@ -150,6 +150,7 @@ class StatusTool : public CurveTool { int PrintClientStatus(); int ClientListCmd(); int ScanStatusCmd(); + int FormatStatusCmd(); void PrintCsLeftSizeStatistics( const std::string &name, const std::map> &poolLeftSize); diff --git a/test/chunkserver/datastore/filepool_mock_unittest.cpp b/test/chunkserver/datastore/filepool_mock_unittest.cpp index f9fc0502e1..5a70d46551 100644 --- a/test/chunkserver/datastore/filepool_mock_unittest.cpp +++ b/test/chunkserver/datastore/filepool_mock_unittest.cpp @@ -102,6 +102,7 @@ class CSChunkfilePoolMockTest : public testing::Test { } void FakeMetaFile() { + EXPECT_CALL(*lfs_, FileExists(poolMetaPath)).WillOnce(Return(true)); EXPECT_CALL(*lfs_, Open(poolMetaPath, _)) .WillOnce(Return(100)); EXPECT_CALL(*lfs_, Read(100, NotNull(), 0, metaFileSize)) @@ -391,6 +392,7 @@ TEST_F(CSChunkfilePoolMockTest, InitializeTest) { char buf[metaFileSize] = {0}; EXPECT_CALL(*lfs_, Open(poolMetaPath, _)) .WillOnce(Return(1)); + EXPECT_CALL(*lfs_, FileExists(poolMetaPath)).WillOnce(Return(true)); EXPECT_CALL(*lfs_, Read(1, NotNull(), 0, metaFileSize)) .WillOnce(DoAll(SetArrayArgument<1>(buf, buf + metaFileSize), Return(metaFileSize))); @@ -404,7 +406,8 @@ TEST_F(CSChunkfilePoolMockTest, InitializeTest) { FakeMetaFile(); EXPECT_CALL(*lfs_, DirExists(_)) .WillOnce(Return(false)); - ASSERT_EQ(false, pool.Initialize(options)); + ASSERT_EQ(true, pool.Initialize(options)); + pool.WaitoFormatDoneForTesting(); } // 当前目录存在,list目录失败 { diff --git a/test/chunkserver/datastore/filepool_unittest.cpp b/test/chunkserver/datastore/filepool_unittest.cpp index 480f6da72a..ea1592f62b 100644 --- a/test/chunkserver/datastore/filepool_unittest.cpp +++ b/test/chunkserver/datastore/filepool_unittest.cpp @@ -169,7 +169,7 @@ bool CheckFileOpenOrNot(const std::string& filename) { return out.find("No such file or directory") != out.npos; } -TEST_P(CSFilePool_test, InitializeTest) { +TEST_P(CSFilePool_test, InitializeNomalTest) { std::string filePool = "./cspooltest/filePool.meta"; const std::string filePoolPath = FILEPOOL_DIR; @@ -216,6 +216,64 @@ TEST_P(CSFilePool_test, InitializeTest) { fsptr->Delete("./cspooltest/filePool.meta3"); } +TEST_P(CSFilePool_test, InitializeFormatTest) { + std::string filePool = "./cspooltest/filePool1.meta"; + const std::string filePoolPath = POOL1_DIR; + std::string filePath = "./cspooltest/file"; + + FilePoolOptions cfop; + cfop.fileSize = 4096; + cfop.metaPageSize = 4096; + cfop.blockSize = 4096; + + cfop.formatThreadNum = 2; + memcpy(cfop.metaPath, filePool.c_str(), filePool.size()); + memcpy(cfop.filePoolDir, filePoolPath.c_str(), filePoolPath.size()); + + char metaPage[cfop.metaPageSize]; // NOLINT + + { + // meta file not exit. + FilePool pool(fsptr); + cfop.filePoolSize = 8192 * 10; + ASSERT_TRUE(pool.Initialize(cfop)); + ASSERT_EQ(0, pool.GetFile(filePath, (const char*)&metaPage)); + ASSERT_EQ(0, pool.RecycleFile(filePath)); + pool.WaitoFormatDoneForTesting(); + ASSERT_EQ(pool.Size(), 10); + } + + { + // filepool is not empty. + FilePool pool(fsptr); + cfop.filePoolSize = 8192 * 20; + ASSERT_TRUE(pool.Initialize(cfop)); + pool.WaitoFormatDoneForTesting(); + ASSERT_EQ(pool.Size(), 20); + } + + { + // the chunk num of filepool is less than preAllcateNum. + FilePool pool(fsptr); + cfop.filePoolSize = 8192 * 10; + ASSERT_TRUE(pool.Initialize(cfop)); + pool.WaitoFormatDoneForTesting(); + ASSERT_EQ(20, pool.Size()); + } + + { + // get file while fil epool is formatting. + FilePool pool(fsptr); + std::string filePath = "./cspooltest/file"; + cfop.filePoolSize = 8192 * 100; + ASSERT_TRUE(pool.Initialize(cfop)); + ASSERT_EQ(0, pool.GetFile(filePath, (const char*)&metaPage)); + ASSERT_EQ(0, pool.RecycleFile(filePath)); + sleep(3); + ASSERT_GT(pool.Size(), 20); + } +} + TEST_P(CSFilePool_test, GetFileTest) { std::string filePool = "./cspooltest/filePool.meta"; FilePoolOptions cfop; diff --git a/tools-v2/internal/error/error.go b/tools-v2/internal/error/error.go index d9676184db..1d8057a980 100644 --- a/tools-v2/internal/error/error.go +++ b/tools-v2/internal/error/error.go @@ -481,6 +481,9 @@ var ( ErrListWarmup = func() *CmdError { return NewInternalCmdError(74, "list warmup progress fail, err: %s") } + ErrBsGetFormatStatus = func() *CmdError { + return NewInternalCmdError(75, "get format status fail, err: %s") + } // http error ErrHttpUnreadableResult = func() *CmdError { diff --git a/tools-v2/internal/utils/row.go b/tools-v2/internal/utils/row.go index 4154203619..33590c75f8 100644 --- a/tools-v2/internal/utils/row.go +++ b/tools-v2/internal/utils/row.go @@ -129,7 +129,8 @@ const ( ROW_HEALTHY_COUNT = "healthyCount" ROW_UNHEALTHY_COUNT = "unhealthyCount" ROW_HEALTHY_RATIO = "ratio" - ROW_UNHEALTHY_RATIO = "unhealthy-Ratio" + ROW_UNHEALTHY_RATIO = "unhealthyRatio" + ROW_FORMAT_PERCENT = "formatPercent" ROW_RW_STATUS = "rwStatus" ROW_DISK_STATE = "diskState" diff --git a/tools-v2/pkg/cli/command/curvebs/list/formatstatus/formatstatus.go b/tools-v2/pkg/cli/command/curvebs/list/formatstatus/formatstatus.go new file mode 100644 index 0000000000..30b2dd2470 --- /dev/null +++ b/tools-v2/pkg/cli/command/curvebs/list/formatstatus/formatstatus.go @@ -0,0 +1,137 @@ +package formatstatus + +import ( + "context" + "fmt" + + cmderror "github.com/opencurve/curve/tools-v2/internal/error" + cobrautil "github.com/opencurve/curve/tools-v2/internal/utils" + basecmd "github.com/opencurve/curve/tools-v2/pkg/cli/command" + "github.com/opencurve/curve/tools-v2/pkg/config" + "github.com/opencurve/curve/tools-v2/pkg/output" + "github.com/opencurve/curve/tools-v2/proto/proto/topology" + "github.com/spf13/cobra" + "google.golang.org/grpc" +) + +const ( + formatExample = `$ curve bs list format-status` +) + +type GetFormatStatusRpc struct { + Info *basecmd.Rpc + Request *topology.ListChunkFormatStatusRequest + mdsClient topology.TopologyServiceClient +} + +var _ basecmd.RpcFunc = (*GetFormatStatusRpc)(nil) // check interface + +func (gRpc *GetFormatStatusRpc) NewRpcClient(cc grpc.ClientConnInterface) { + gRpc.mdsClient = topology.NewTopologyServiceClient(cc) +} + +func (gRpc *GetFormatStatusRpc) Stub_Func(ctx context.Context) (interface{}, error) { + return gRpc.mdsClient.ListChunkFormatStatus(ctx, gRpc.Request) +} + +type FormatStatusCommand struct { + Rpc *GetFormatStatusRpc + FormatStatusInfoList []*topology.ChunkFormatStatus + basecmd.FinalCurveCmd +} + +var _ basecmd.FinalCurveCmdFunc = (*FormatStatusCommand)(nil) // check interface + +func NewListFormatStatusCommand() *FormatStatusCommand { + formatCmd := &FormatStatusCommand{ + FinalCurveCmd: basecmd.FinalCurveCmd{ + Use: "format-status", + Short: "list all format status in cluster", + Example: formatExample, + }, + } + + basecmd.NewFinalCurveCli(&formatCmd.FinalCurveCmd, formatCmd) + return formatCmd +} + +func NewFormatStatusCommand() *cobra.Command { + return NewListFormatStatusCommand().Cmd +} + +func (fCmd *FormatStatusCommand) AddFlags() { + config.AddRpcTimeoutFlag(fCmd.Cmd) + config.AddRpcRetryTimesFlag(fCmd.Cmd) + config.AddBsMdsFlagOption(fCmd.Cmd) + config.AddBsFilterOptionFlag(fCmd.Cmd) +} + +func (fCmd *FormatStatusCommand) Init(cmd *cobra.Command, args []string) error { + mdsAddrs, err := config.GetBsMdsAddrSlice(fCmd.Cmd) + if err.TypeCode() != cmderror.CODE_SUCCESS { + return err.ToError() + } + timeout := config.GetFlagDuration(fCmd.Cmd, config.RPCTIMEOUT) + retrytimes := config.GetFlagInt32(fCmd.Cmd, config.RPCRETRYTIMES) + + fCmd.Rpc = &GetFormatStatusRpc{ + Info: basecmd.NewRpc(mdsAddrs, timeout, retrytimes, "ListChunkFormatStatus"), + Request: &topology.ListChunkFormatStatusRequest{}, + mdsClient: nil, + } + fCmd.SetHeader([]string{ + cobrautil.ROW_CHUNKSERVER, cobrautil.ROW_IP, cobrautil.ROW_PORT, cobrautil.ROW_FORMAT_PERCENT, + }) + return nil +} + +func (fCmd *FormatStatusCommand) Print(cmd *cobra.Command, args []string) error { + return output.FinalCmdOutput(&fCmd.FinalCurveCmd, fCmd) +} + +func (fCmd *FormatStatusCommand) RunCommand(cmd *cobra.Command, args []string) error { + result, err := basecmd.GetRpcResponse(fCmd.Rpc.Info, fCmd.Rpc) + if err.TypeCode() != cmderror.CODE_SUCCESS { + fCmd.Error = err + fCmd.Result = nil + return err.ToError() + } + res := result.(*topology.ListChunkFormatStatusResponse) + fCmd.FormatStatusInfoList = res.GetChunkFormatStatus() + + rows := make([]map[string]string, 0) + for _, info := range fCmd.FormatStatusInfoList { + fmt.Println("sssss: ", info.GetIp()) + row := make(map[string]string) + row[cobrautil.ROW_CHUNKSERVER] = fmt.Sprint(info.GetChunkServerID()) + row[cobrautil.ROW_IP] = info.GetIp() + row[cobrautil.ROW_PORT] = fmt.Sprint(uint64(info.GetPort())) + row[cobrautil.ROW_FORMAT_PERCENT] = fmt.Sprint(info.GetFormatPercent()) + rows = append(rows, row) + } + list := cobrautil.ListMap2ListSortByKeys(rows, fCmd.Header, []string{}) + fCmd.TableNew.AppendBulk(list) + fCmd.Result, fCmd.Error = rows, cmderror.Success() + return nil +} + +func (fCmd *FormatStatusCommand) ResultPlainOutput() error { + return output.FinalCmdOutputPlain(&fCmd.FinalCurveCmd) +} + +func GetFormatStatusInfoList(caller *cobra.Command) ([]*topology.ChunkFormatStatus, *cmderror.CmdError) { + getCmd := NewListFormatStatusCommand() + config.AlignFlagsValue(caller, getCmd.Cmd, []string{ + config.CURVEBS_MDSADDR, config.RPCRETRYTIMES, config.RPCTIMEOUT, config.CURVEBS_FIlTER, + }) + getCmd.Cmd.SilenceErrors = true + getCmd.Cmd.SilenceUsage = true + getCmd.Cmd.SetArgs([]string{fmt.Sprintf("--%s", config.FORMAT), config.FORMAT_NOOUT}) + err := getCmd.Cmd.Execute() + if err != nil { + retErr := cmderror.ErrBsGetFormatStatus() + retErr.Format(err.Error()) + return getCmd.FormatStatusInfoList, retErr + } + return getCmd.FormatStatusInfoList, cmderror.Success() +} diff --git a/tools-v2/pkg/cli/command/curvebs/list/list.go b/tools-v2/pkg/cli/command/curvebs/list/list.go index 58e9b433fd..0b2e4266e3 100644 --- a/tools-v2/pkg/cli/command/curvebs/list/list.go +++ b/tools-v2/pkg/cli/command/curvebs/list/list.go @@ -27,6 +27,7 @@ import ( "github.com/opencurve/curve/tools-v2/pkg/cli/command/curvebs/list/chunkserver" "github.com/opencurve/curve/tools-v2/pkg/cli/command/curvebs/list/client" "github.com/opencurve/curve/tools-v2/pkg/cli/command/curvebs/list/dir" + "github.com/opencurve/curve/tools-v2/pkg/cli/command/curvebs/list/formatstatus" logicalpool "github.com/opencurve/curve/tools-v2/pkg/cli/command/curvebs/list/logicalPool" may_broken_vol "github.com/opencurve/curve/tools-v2/pkg/cli/command/curvebs/list/may-broken-vol" "github.com/opencurve/curve/tools-v2/pkg/cli/command/curvebs/list/scanstatus" @@ -51,6 +52,7 @@ func (listCmd *ListCommand) AddSubCommands() { chunkserver.NewChunkServerCommand(), scanstatus.NewScanStatusCommand(), may_broken_vol.NewMayBrokenVolCommand(), + formatstatus.NewFormatStatusCommand(), ) }