diff --git a/conf/chunkserver.conf b/conf/chunkserver.conf index 0cfc27b544..be48684d1d 100644 --- a/conf/chunkserver.conf +++ b/conf/chunkserver.conf @@ -207,6 +207,10 @@ chunkfilepool.clean.enable=true chunkfilepool.clean.bytes_per_write=4096 # The throttle iops for cleaning chunk (4KB/IO) chunkfilepool.clean.throttle_iops=500 +# The size of chunkfilepool +chunkfilepool.chunk_file_pool_size=10GB +# The thread num for format chunks +chunkfilepool.thread_num=2 # # WAL file pool diff --git a/conf/chunkserver.conf.example b/conf/chunkserver.conf.example index eb664c2fd6..113eb374e7 100644 --- a/conf/chunkserver.conf.example +++ b/conf/chunkserver.conf.example @@ -199,6 +199,10 @@ chunkfilepool.clean.enable=true chunkfilepool.clean.bytes_per_write=4096 # The throttle iops for cleaning chunk (4KB/IO) chunkfilepool.clean.throttle_iops=500 +# The size of chunkfilepool +chunkfilepool.chunk_file_pool_size=0 +# The thread num for format chunks +chunkfilepool.thread_num=2 # # WAL file pool diff --git a/proto/heartbeat.proto b/proto/heartbeat.proto index d54723dfb8..0580084022 100644 --- a/proto/heartbeat.proto +++ b/proto/heartbeat.proto @@ -89,6 +89,8 @@ message ChunkServerStatisticInfo { required uint64 chunkSizeTrashedBytes = 7; // chunkfilepool的大小 optional uint64 chunkFilepoolSize = 8; + // chunkfilepool格式化进度 + optional uint32 chunkFilepoolFormatRate = 9; }; message ChunkServerHeartbeatRequest { diff --git a/proto/topology.proto b/proto/topology.proto index 2057cafe2a..b160b92672 100644 --- a/proto/topology.proto +++ b/proto/topology.proto @@ -501,6 +501,21 @@ message GetCopySetsInChunkServerRequest { optional uint32 port = 3; } +message ChunkFormatStatus { + required string ip = 1; + required uint32 port = 2; + required uint32 chunkServerID = 3; + required uint32 formatRate = 4; +} + +message ListChunkFormatStatusRequest { + +} + +message ListChunkFormatStatusResponse { + repeated ChunkFormatStatus chunkFormatStatus = 1; +} + message GetCopySetsInChunkServerResponse { required sint32 statusCode = 1; repeated common.CopysetInfo copysetInfos = 2; @@ -595,4 +610,5 @@ service TopologyService { rpc GetClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse); rpc SetCopysetsAvailFlag(SetCopysetsAvailFlagRequest) returns (SetCopysetsAvailFlagResponse); rpc ListUnAvailCopySets(ListUnAvailCopySetsRequest) returns (ListUnAvailCopySetsResponse); + rpc ListChunkFormatStatus(ListChunkFormatStatusRequest) returns (ListChunkFormatStatusResponse); } diff --git a/src/chunkserver/chunkserver.cpp b/src/chunkserver/chunkserver.cpp index 07f1f48d5f..9c0dcfaeeb 100644 --- a/src/chunkserver/chunkserver.cpp +++ b/src/chunkserver/chunkserver.cpp @@ -44,6 +44,7 @@ #include "src/chunkserver/raftsnapshot/curve_snapshot_storage.h" #include "src/chunkserver/raftlog/curve_segment_log_storage.h" #include "src/common/curve_version.h" +#include "src/common/bytes_convert.h" using ::curve::fs::LocalFileSystem; using ::curve::fs::LocalFileSystemOption; @@ -65,6 +66,9 @@ DEFINE_string(raftSnapshotUri, "curve://./0/copysets", "raft snapshot uri"); DEFINE_string(raftLogUri, "curve://./0/copysets", "raft log uri"); DEFINE_string(recycleUri, "local://./0/recycler" , "recycle uri"); DEFINE_string(chunkFilePoolDir, "./0/", "chunk file pool location"); +DEFINE_string(chunkFilePoolSize, "1GB", "format percent for chunkfillpool."); +DEFINE_uint32(chunkFormatThreadNum, + 1, "number of threads while file pool formatting"); DEFINE_string(chunkFilePoolMetaPath, "./chunkfilepool.meta", "chunk file pool meta path"); DEFINE_string(logPath, "./0/chunkserver.log-", "log file path"); @@ -479,8 +483,6 @@ void ChunkServer::Stop() { brpc::AskToQuit(); } - - void ChunkServer::InitChunkFilePoolOptions( common::Configuration *conf, FilePoolOptions *chunkFilePoolOptions) { LOG_IF(FATAL, !conf->GetUInt32Value("global.chunk_size", @@ -513,6 +515,20 @@ void ChunkServer::InitChunkFilePoolOptions( "chunkfilepool.meta_path", &metaUri)); ::memcpy( chunkFilePoolOptions->metaPath, metaUri.c_str(), metaUri.size()); + + std::string chunkFilePoolUri; + LOG_IF(FATAL, !conf->GetStringValue( + "chunkfilepool.chunk_file_pool_dir", &chunkFilePoolUri)); + + ::memcpy(chunkFilePoolOptions->filePoolDir, + chunkFilePoolUri.c_str(), + chunkFilePoolUri.size()); + std::string pool_size; + LOG_IF(FATAL, !conf->GetStringValue("chunkfilepool.chunk_file_pool_size", &pool_size)); // NOLINT + LOG_IF(FATAL, !curve::common::ToNumbericByte(pool_size, + &chunkFilePoolOptions->filePoolSize)); + LOG_IF(FATAL, !conf->GetUInt32Value("chunkfilepool.chunk_file_pool_format_thread_num", // NOLINT + &chunkFilePoolOptions->formatThreadNum)); LOG_IF(FATAL, !conf->GetBoolValue("chunkfilepool.clean.enable", &chunkFilePoolOptions->needClean)); LOG_IF(FATAL, !conf->GetUInt32Value("chunkfilepool.clean.bytes_per_write", // NOLINT @@ -833,6 +849,20 @@ void ChunkServer::LoadConfigFromCmdline(common::Configuration *conf) { << "chunkFilePoolDir must be set when run chunkserver in command."; } + if (GetCommandLineFlagInfo("chunkFilePoolSize", &info)) { + conf->SetStringValue( + "chunkfilepool.chunk_file_pool_size", FLAGS_chunkFilePoolSize); + } else { + LOG(FATAL) + << "chunkFilePoolSize must be set when run chunkserver in command."; + } + + if (GetCommandLineFlagInfo("chunkFormatThreadNum", &info)) { + conf->SetUInt64Value( + "chunkfilepool.chunk_file_pool_format_thread_num", + FLAGS_chunkFormatThreadNum); + } + if (GetCommandLineFlagInfo("chunkFilePoolMetaPath", &info) && !info.is_default) { conf->SetStringValue( diff --git a/src/chunkserver/datastore/file_pool.cpp b/src/chunkserver/datastore/file_pool.cpp index d3f20feb6a..fcb960301d 100644 --- a/src/chunkserver/datastore/file_pool.cpp +++ b/src/chunkserver/datastore/file_pool.cpp @@ -53,7 +53,9 @@ const char* FilePoolHelper::kMetaPageSize = "metaPageSize"; const char* FilePoolHelper::kFilePoolPath = "chunkfilepool_path"; const char* FilePoolHelper::kCRC = "crc"; const char* FilePoolHelper::kBlockSize = "blockSize"; +const char* FilePoolHelper::kChunkNum = "chunkNum"; const uint32_t FilePoolHelper::kPersistSize = 4096; +const uint64_t FilePool::kMiniPersistIntervalNum_ = 0; const std::string FilePool::kCleanChunkSuffix_ = ".clean"; // NOLINT const std::chrono::milliseconds FilePool::kSuccessSleepMsec_(10); const std::chrono::milliseconds FilePool::kFailSleepMsec_(500); @@ -67,8 +69,8 @@ std::ostream& operator<<(std::ostream& os, const FilePoolMeta& meta) { << ", metapagesize: " << meta.metaPageSize << ", hasblocksize: " << meta.hasBlockSize << ", blocksize: " << meta.blockSize + << ", chunknum: " << meta.chunkNum << ", filepoolpath: " << meta.filePoolPath; - return os; } @@ -81,6 +83,7 @@ int FilePoolHelper::PersistEnCodeMetaInfo( Json::Value root; root[kFileSize] = meta.chunkSize; root[kMetaPageSize] = meta.metaPageSize; + root[kChunkNum] = meta.chunkNum; if (meta.hasBlockSize) { root[kBlockSize] = meta.blockSize; } @@ -192,6 +195,14 @@ int FilePoolHelper::DecodeMetaInfoFromMetaFile( break; } + if (!value[kChunkNum].isNull()) { + meta->chunkNum = value[kChunkNum].asUInt(); + } else { + LOG(ERROR) << "chunkfile meta file got error!" + << " no chunkNum!"; + break; + } + parse = true; } while (false); @@ -220,20 +231,33 @@ FilePool::FilePool(std::shared_ptr fsptr) memset(writeBuffer_.get(), 0, poolOpt_.bytesPerWrite); } +FilePool::~FilePool() { + UnInitialize(); +} + bool FilePool::Initialize(const FilePoolOptions &cfopt) { poolOpt_ = cfopt; if (poolOpt_.getFileFromPool) { + currentdir_ = poolOpt_.filePoolDir; + currentState_.chunkSize = poolOpt_.fileSize; + currentState_.metaPageSize = poolOpt_.metaPageSize; if (!CheckValid()) { - LOG(ERROR) << "check valid failed!"; + LOG(ERROR) << "Check vaild failed!"; return false; } - if (fsptr_->DirExists(currentdir_)) { - return ScanInternal(); - } else { - LOG(ERROR) << "chunkfile pool not exists, inited failed!" - << " chunkfile pool path = " << currentdir_.c_str(); + + if (!ScanInternal()) { + LOG(ERROR) << "Scan pool files failed!"; + return false; + } + + if (!PrepareFormat()) { + LOG(ERROR) << "Prepare format failed!"; return false; } + + formatAlived_.store(true); + formatThread_ = Thread(&FilePool::FormatWorker, this); } else { currentdir_ = poolOpt_.filePoolDir; if (!fsptr_->DirExists(currentdir_.c_str())) { @@ -245,6 +269,16 @@ bool FilePool::Initialize(const FilePoolOptions &cfopt) { bool FilePool::CheckValid() { FilePoolMeta meta; + if (!fsptr_->FileExists(poolOpt_.metaPath)) { + LOG(INFO) << "Metafile in path '" << poolOpt_.metaPath + << "'not found, it's the first initialized."; + currentState_.chunkSize = poolOpt_.fileSize; + currentState_.metaPageSize = poolOpt_.metaPageSize; + currentState_.blockSize = poolOpt_.blockSize; + currentState_.chunkNum = 0; + currentdir_ = poolOpt_.filePoolDir; + return true; + } int ret = FilePoolHelper::DecodeMetaInfoFromMetaFile( fsptr_, poolOpt_.metaPath, poolOpt_.metaFileSize, &meta); if (ret == -1) { @@ -273,6 +307,7 @@ bool FilePool::CheckValid() { currentState_.chunkSize = meta.chunkSize; currentState_.metaPageSize = meta.metaPageSize; currentState_.blockSize = meta.blockSize; + currentState_.chunkNum = meta.chunkNum; return true; } @@ -375,6 +410,133 @@ void FilePool::CleanWorker() { } } +bool FilePool::PrepareFormat() { + uint64_t bytesPerPage = poolOpt_.fileSize + poolOpt_.metaFileSize; + uint64_t needSpace = (poolOpt_.filePoolSize / + bytesPerPage - currentState_.chunkNum) * bytesPerPage; + uint64_t vaildSpace = 0; + curve::fs::FileSystemInfo finfo; + if (poolOpt_.filePoolSize / bytesPerPage < currentState_.chunkNum) { + LOG(WARNING) << "The number of chunks that are pre-allocated" + "is less than the number of initialized chunks."; + LOG(INFO) << "It is no need to format chunks."; + formatStat_.preAllocateNum = 0; + formatStat_.allocateChunkNum = 0; + return true; + } + + int r = fsptr_->Statfs(currentdir_, &finfo); + if (r != 0) { + LOG(ERROR) << "get disk usage info failed!"; + return false; + } + vaildSpace = finfo.available; + LOG(INFO) << "free space = " << finfo.available + << ", total space = " << finfo.total + << ", need space = " << needSpace; + if (vaildSpace < needSpace) { + LOG(ERROR) << "disk free space not enough."; + return false; + } + + formatStat_.preAllocateNum = needSpace / bytesPerPage; + formatStat_.allocateChunkNum = 0; + LOG(INFO) << "preAllocateNum = " << formatStat_.preAllocateNum; + return true; +} + +bool FilePool::WaitFormatDone() { + std::unique_lock lk(mtx_); + if (formatStat_.allocateChunkNum.load() != formatStat_.preAllocateNum) { + cond_.wait(lk, [&]() { + return formatStat_.allocateChunkNum.load() == + formatStat_.preAllocateNum; + }); + } + lk.unlock(); + if (formatThread_.joinable()) { + formatThread_.join(); + } + return true; +} + +bool FilePool::StopFormatting() { + if (formatAlived_.exchange(false)) { + LOG(INFO) << "Stop formatting..."; + if (formatThread_.joinable()) { + formatThread_.join(); + } + LOG(INFO) << "Stop format thread ok."; + } + return true; +} + +int FilePool::FormatWorker() { + std::vector threads; + LOG(INFO) << "format work start!"; + + std::atomic_bool is_wrong(false); + std::atomic allocatIndex{0}; + FilePoolMeta meta(currentState_.chunkSize, currentState_.metaPageSize, + currentState_.blockSize, currentdir_, + currentState_.chunkNum); + uint64_t chunksIndexOffset = + this->currentmaxfilenum_.fetch_add(formatStat_.preAllocateNum); + LOG(INFO) << "format chunk index offset: " << chunksIndexOffset + << "chunkNum: " << currentState_.chunkNum; + auto formatTask = [&]() -> int { + LOG(INFO) << "format thread has been work!"; + while (!is_wrong.load() && this->formatAlived_.load()) { + uint32_t chunkIndex = 0; + if ((chunkIndex = allocatIndex.fetch_add(1)) + >= formatStat_.preAllocateNum) { + allocatIndex.fetch_sub(1); + break; + } + std::string chunkPath = this->currentdir_ + "/" + + std::to_string(chunksIndexOffset + chunkIndex) + + kCleanChunkSuffix_; + int res = this->AllocateChunk(chunkPath); + if (res != 0) { + is_wrong.store(true); + LOG(ERROR) << "Format ERROR!"; + break; + } + this->mtx_.lock(); + cleanChunks_.push_back(chunksIndexOffset + chunkIndex); + this->currentState_.cleanChunksLeft++; + this->currentState_.preallocatedChunksLeft++; + this->currentState_.chunkNum++; + this->formatStat_.allocateChunkNum++; + meta.chunkNum++; + FilePoolHelper::PersistEnCodeMetaInfo( + this->fsptr_, meta, this->poolOpt_.metaPath); + this->mtx_.unlock(); + this->cond_.notify_all(); + } + LOG(INFO) << "format thread has done!"; + return 0; + }; + + for (uint32_t i = 0; i < poolOpt_.formatThreadNum; i++) { + threads.push_back(std::move(Thread(formatTask))); + } + this->currentmaxfilenum_ += formatStat_.preAllocateNum; + + for (auto &thread : threads) { + if (thread.joinable()) { + thread.join(); + } + } + LOG(INFO) << "format worker done"; + + if (is_wrong.load()) { + LOG(ERROR) << "Chunk format failed!"; + return -1; + } + return 0; +} + bool FilePool::StartCleaning() { if (poolOpt_.needClean && !cleanAlived_.exchange(true)) { ReadWriteThrottleParams params; @@ -402,7 +564,6 @@ bool FilePool::StopCleaning() { bool FilePool::GetChunk(bool needClean, uint64_t *chunkid, bool *isCleaned) { auto pop = [&](std::vector *chunks, uint64_t *chunksLeft, bool isCleanChunks) -> bool { - std::unique_lock lk(mtx_); if (chunks->empty()) { return false; } @@ -415,16 +576,28 @@ bool FilePool::GetChunk(bool needClean, uint64_t *chunkid, bool *isCleaned) { return true; }; - if (!needClean) { - return pop(&dirtyChunks_, ¤tState_.dirtyChunksLeft, false) || - pop(&cleanChunks_, ¤tState_.cleanChunksLeft, true); - } + auto wake_up = [&]() { + return (formatStat_.allocateChunkNum.load() == formatStat_.preAllocateNum) // NOLINT + || !(dirtyChunks_.empty() && cleanChunks_.empty()); + }; - // Need clean chunk - *isCleaned = false; - bool ret = pop(&cleanChunks_, ¤tState_.cleanChunksLeft, true) || - pop(&dirtyChunks_, ¤tState_.dirtyChunksLeft, false); + bool ret = true; + { + std::unique_lock lk(mtx_); + if (formatStat_.allocateChunkNum.load() + != formatStat_.preAllocateNum) { + cond_.wait(lk, wake_up); + } + if (!needClean) { + return pop(&dirtyChunks_, ¤tState_.dirtyChunksLeft, false) || + pop(&cleanChunks_, ¤tState_.cleanChunksLeft, true); + } + // Need clean chunk + *isCleaned = false; + ret = pop(&cleanChunks_, ¤tState_.cleanChunksLeft, true) || + pop(&dirtyChunks_, ¤tState_.dirtyChunksLeft, false); + } if (true == ret && false == *isCleaned && CleanChunk(*chunkid, true)) { *isCleaned = true; } @@ -638,7 +811,7 @@ int FilePool::RecycleFile(const std::string &chunkpath) { void FilePool::UnInitialize() { currentdir_ = ""; - + StopFormatting(); std::unique_lock lk(mtx_); dirtyChunks_.clear(); cleanChunks_.clear(); @@ -647,8 +820,16 @@ void FilePool::UnInitialize() { bool FilePool::ScanInternal() { uint64_t maxnum = 0; std::vector tmpvec; + int ret = 0; LOG(INFO) << "scan dir" << currentdir_; - int ret = fsptr_->List(currentdir_.c_str(), &tmpvec); + if (!fsptr_->DirExists(currentdir_.c_str())) { + ret = fsptr_->Mkdir(currentdir_.c_str()); + if (ret != 0) { + LOG(ERROR) << "Mkdir [" << currentdir_ <<"]" << " failed!"; + return false; + } + } + ret = fsptr_->List(currentdir_.c_str(), &tmpvec); if (ret < 0) { LOG(ERROR) << "list file pool dir failed!"; return false; @@ -715,6 +896,7 @@ bool FilePool::ScanInternal() { currentState_.cleanChunksLeft = cleanChunks_.size(); currentState_.preallocatedChunksLeft = currentState_.dirtyChunksLeft + currentState_.cleanChunksLeft; + formatStat_.allocateChunkNum = tmpvec.size(); LOG(INFO) << "scan done, pool size = " << currentState_.preallocatedChunksLeft; @@ -730,10 +912,15 @@ FilePoolState FilePool::GetState() const { return currentState_; } +const ChunkFormatStat& FilePool::GetChunkFormatStat() const { + return formatStat_; +} + uint32_t FilePoolMeta::Crc32() const { const size_t size = sizeof(kFilePoolMagic) + sizeof(chunkSize) + sizeof(metaPageSize) + filePoolPath.size() + - (hasBlockSize ? sizeof(blockSize) : 0); + (hasBlockSize ? sizeof(blockSize) : 0) + + sizeof(chunkNum); std::unique_ptr crc(new char[size]); size_t off = 0; @@ -752,6 +939,9 @@ uint32_t FilePoolMeta::Crc32() const { off += sizeof(blockSize); } + memcpy(crc.get() + off, &chunkNum, sizeof(chunkNum)); + off += sizeof(chunkNum); + memcpy(crc.get() + off, filePoolPath.c_str(), filePoolPath.size()); off += filePoolPath.size(); diff --git a/src/chunkserver/datastore/file_pool.h b/src/chunkserver/datastore/file_pool.h index f3417e160a..8a93d9aed0 100644 --- a/src/chunkserver/datastore/file_pool.h +++ b/src/chunkserver/datastore/file_pool.h @@ -32,6 +32,7 @@ #include #include #include +#include #include "src/common/concurrent/concurrent.h" #include "src/common/interruptible_sleeper.h" @@ -65,6 +66,10 @@ struct FilePoolOptions { // retry times for get file uint16_t retryTimes; + uint32_t preAllocateNum; + uint64_t filePoolSize; + uint32_t formatThreadNum; + FilePoolOptions() { getFileFromPool = true; needClean = false; @@ -75,6 +80,9 @@ struct FilePoolOptions { metaPageSize = 0; retryTimes = 5; blockSize = 0; + preAllocateNum = 0; + filePoolSize = 0; + formatThreadNum = 1; ::memset(metaPath, 0, 256); ::memset(filePoolDir, 0, 256); } @@ -87,7 +95,8 @@ struct FilePoolState { uint64_t cleanChunksLeft = 0; // How many pre-allocated chunks are not used by the datastore uint64_t preallocatedChunksLeft = 0; - + // Total num of chunks in the datastore + uint64_t chunkNum; // chunksize uint32_t chunkSize = 0; // metapage size @@ -102,30 +111,40 @@ struct FilePoolMeta { bool hasBlockSize = false; uint32_t blockSize = 0; std::string filePoolPath; + uint32_t chunkNum = 0; FilePoolMeta() = default; FilePoolMeta(uint32_t chunksize, uint32_t metapagesize, uint32_t blocksize, - const std::string& filepool) + const std::string& filepool, + uint64_t chunknum = 0) : chunkSize(chunksize), metaPageSize(metapagesize), hasBlockSize(true), blockSize(blocksize), - filePoolPath(filepool) {} + filePoolPath(filepool), + chunkNum(chunknum) {} FilePoolMeta(uint32_t chunksize, uint32_t metapagesize, - const std::string& filepool) + const std::string& filepool, + uint64_t chunknum = 0) : chunkSize(chunksize), metaPageSize(metapagesize), hasBlockSize(false), - filePoolPath(filepool) {} + filePoolPath(filepool), + chunkNum(chunknum) {} uint32_t Crc32() const; }; +typedef struct ChunkFormatStat { + std::atomic allocateChunkNum{0}; + uint32_t preAllocateNum = 0; +} ChunkFormatStat_t; + class FilePoolHelper { public: static const char* kFileSize; @@ -133,6 +152,7 @@ class FilePoolHelper { static const char* kFilePoolPath; static const char* kCRC; static const char* kBlockSize; + static const char* kChunkNum; static const uint32_t kPersistSize; /** @@ -168,7 +188,7 @@ class FilePoolHelper { class CURVE_CACHELINE_ALIGNMENT FilePool { public: explicit FilePool(std::shared_ptr fsptr); - virtual ~FilePool() = default; + virtual ~FilePool(); /** * Initialization function @@ -237,12 +257,28 @@ class CURVE_CACHELINE_ALIGNMENT FilePool { */ bool StopCleaning(); + /** + * @brief: Wait for format thread done. + * @return: Return true if success. + */ + bool WaitFormatDone(); + + /** + * @brief: Get the format status of FilePool + * @return: Return the format status. + */ + virtual const ChunkFormatStat& GetChunkFormatStat() const; + private: // Traverse the pre-allocated chunk information from the // chunkfile pool directory bool ScanInternal(); - // Check whether the chunkfile pool pre-allocation is legal + // Prepare for format. + bool PrepareFormat(); + // bool CheckValid(); + // Check whether pool file is legal. + bool CheckPoolFile(const std::string& file); /** * Perform metapage assignment for the new chunkfile * @param: sourcepath is the file path to be written @@ -282,6 +318,17 @@ class CURVE_CACHELINE_ALIGNMENT FilePool { */ bool CleaningChunk(); + /** + * @brief: The function of thread for formatting chunk + */ + int FormatWorker(); + + /** + * @brief: Stop thread for formatting chunk + * @return: Return true if success, otherwise return false + */ + bool StopFormatting(); + /** * @brief: The function of thread for cleaning chunk */ @@ -291,6 +338,10 @@ class CURVE_CACHELINE_ALIGNMENT FilePool { // The suffix of clean chunk file (".0") static const std::string kCleanChunkSuffix_; + // Minimum chunk number of intervals for metadata + // persistence during the formatting phase + static const uint64_t kMiniPersistIntervalNum_; + // Sets a pause between cleaning when clean chunk success static const std::chrono::milliseconds kSuccessSleepMsec_; @@ -300,6 +351,9 @@ class CURVE_CACHELINE_ALIGNMENT FilePool { // Protect dirtyChunks_, cleanChunks_ std::mutex mtx_; + // Wait for GetChunk + std::condition_variable cond_; + // Current FilePool pre-allocated files, folder path std::string currentdir_; @@ -331,6 +385,15 @@ class CURVE_CACHELINE_ALIGNMENT FilePool { // The throttle iops for cleaning chunk (4KB/IO) Throttle cleanThrottle_; + // Whether the format thread is alive + Atomic formatAlived_{true}; + + // Thread for format chunks. + Thread formatThread_; + + // Stat for format chunks. + ChunkFormatStat formatStat_; + // Sleeper for cleaning chunk thread InterruptibleSleeper cleanSleeper_; diff --git a/src/chunkserver/heartbeat.cpp b/src/chunkserver/heartbeat.cpp index 0e756b29c6..4150b07b92 100644 --- a/src/chunkserver/heartbeat.cpp +++ b/src/chunkserver/heartbeat.cpp @@ -268,10 +268,20 @@ int Heartbeat::BuildRequest(HeartbeatRequest* req) { * walSegmentFileSize; uint64_t chunkPoolSize = options_.chunkFilePool->Size() * options_.chunkFilePool->GetFilePoolOpt().fileSize; + + // compute format progress rate. + const ChunkFormatStat& formatStat = options_.chunkFilePool->GetChunkFormatStat(); // NOLINT + stats->set_chunkfilepoolsize(chunkPoolSize); stats->set_chunksizeusedbytes(usedChunkSize+usedWalSegmentSize); stats->set_chunksizeleftbytes(leftChunkSize+leftWalSegmentSize); stats->set_chunksizetrashedbytes(trashedChunkSize); + if (formatStat.preAllocateNum != 0) { + stats->set_chunkfilepoolformatrate( + 100 * formatStat.allocateChunkNum / formatStat.preAllocateNum); + } else { + stats->set_chunkfilepoolformatrate(100); + } req->set_allocated_stats(stats); size_t cap, avail; @@ -391,7 +401,8 @@ int Heartbeat::SendHeartbeat(const HeartbeatRequest& request, cntl.ErrorCode() == brpc::ELOGOFF || cntl.ErrorCode() == brpc::ERPCTIMEDOUT) { LOG(WARNING) << "current mds: " << mdsEps_[inServiceIndex_] - << " is shutdown or going to quit"; + << " is shutdown or going to quit, error_code: " + << cntl.ErrorCode(); inServiceIndex_ = (inServiceIndex_ + 1) % mdsEps_.size(); LOG(INFO) << "next heartbeat switch to " << mdsEps_[inServiceIndex_]; diff --git a/src/common/bytes_convert.h b/src/common/bytes_convert.h new file mode 100644 index 0000000000..0710b9123f --- /dev/null +++ b/src/common/bytes_convert.h @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2023 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: curve + * Created Date: 2023-10-06 + * Author: yyyyufeng + */ + +#ifndef SRC_COMMON_BYTES_CONVERT_H_ +#define SRC_COMMON_BYTES_CONVERT_H_ + +#include +#include + +namespace curve { +namespace common { + +constexpr uint64_t kKiB = 1024ULL; +constexpr uint64_t kMiB = 1024ULL * kKiB; +constexpr uint64_t kGiB = 1024ULL * kMiB; +constexpr uint64_t kTiB = 1024ULL * kGiB; + +/** + * @brief convert string to bytes + * @return true if success +*/ +bool ToNumbericByte(const std::string& source, uint64_t* target) { + int len = source.size(); + if (source[len - 1] >= '0' && source[len - 1] <= '9') { + *target = std::stoul(source); + return true; + } + if (len < 3 || (source[len - 1] != 'b' && source[len - 1] != 'B')) { + return false; + } + *target = 0; + for (int i = 0; i < len - 2; i++) { + char ch = source[i]; + if (ch < '0' || ch > '9') { + return false; + } + *target *= 10; + *target += (ch - '0'); + } + bool ret = true; + switch (source[len - 2]) { + case 'k': + case 'K': + *target *= kKiB; + break; + case 'M': + case 'm': + *target *= kMiB; + break; + case 'G': + case 'g': + *target *= kGiB; + break; + case 'T': + case 't': + *target *= kTiB; + break; + default: + ret = false; + break; + } + return ret; +} + +} // namespace common +} // namespace curve +#endif // SRC_COMMON_BYTES_CONVERT_H_ diff --git a/src/mds/heartbeat/heartbeat_manager.cpp b/src/mds/heartbeat/heartbeat_manager.cpp index b5100eaa73..79d42fdc90 100644 --- a/src/mds/heartbeat/heartbeat_manager.cpp +++ b/src/mds/heartbeat/heartbeat_manager.cpp @@ -137,6 +137,10 @@ void HeartbeatManager::UpdateChunkServerStatistics( stat.chunkFilepoolSize = request.stats().chunkfilepoolsize(); } + if (request.stats().has_chunkfilepoolformatrate()) { + stat.chunkFilepoolFormatRate = request.stats().chunkfilepoolformatrate(); // NOLINT + } + for (int i = 0; i < request.copysetinfos_size(); i++) { CopysetStat cstat; cstat.logicalPoolId = request.copysetinfos(i).logicalpoolid(); diff --git a/src/mds/topology/topology_service.cpp b/src/mds/topology/topology_service.cpp index 2671b5d8d5..c5ef3c6baf 100644 --- a/src/mds/topology/topology_service.cpp +++ b/src/mds/topology/topology_service.cpp @@ -1154,6 +1154,23 @@ void TopologyServiceImpl::ListUnAvailCopySets( } } +void TopologyServiceImpl::ListChunkFormatStatus( + google::protobuf::RpcController* cntl_base, + const ListChunkFormatStatusRequest* request, + ListChunkFormatStatusResponse* response, + google::protobuf::Closure* done) { + brpc::ClosureGuard done_guard(done); + brpc::Controller* cntl = + static_cast(cntl_base); + + LOG(INFO) << "Received request[log_id=" << cntl->log_id() + << "] from " << cntl->remote_side() + << " to " << cntl->local_side() + << ". [ListChunkFormatStatus] " + << request->DebugString(); + topology_->ListChunkFormatStatus(request, response); +} + } // namespace topology } // namespace mds } // namespace curve diff --git a/src/mds/topology/topology_service.h b/src/mds/topology/topology_service.h index a3eeaed257..8f4cc4b635 100644 --- a/src/mds/topology/topology_service.h +++ b/src/mds/topology/topology_service.h @@ -232,6 +232,11 @@ class TopologyServiceImpl : public TopologyService { const ListUnAvailCopySetsRequest* request, ListUnAvailCopySetsResponse* response, google::protobuf::Closure* done); + virtual void ListChunkFormatStatus( + google::protobuf::RpcController* cntl_base, + const ListChunkFormatStatusRequest* request, + ListChunkFormatStatusResponse* response, + google::protobuf::Closure* done); private: std::shared_ptr topology_; diff --git a/src/mds/topology/topology_service_manager.cpp b/src/mds/topology/topology_service_manager.cpp index ca0e87e4a8..15dbcc6a5f 100644 --- a/src/mds/topology/topology_service_manager.cpp +++ b/src/mds/topology/topology_service_manager.cpp @@ -1795,6 +1795,28 @@ void TopologyServiceManager::ConvertCopyset(const CopySetInfo& in, out->set_lastscanconsistent(in.GetLastScanConsistent()); } +void TopologyServiceManager::ListChunkFormatStatus( + const ListChunkFormatStatusRequest* request, + ListChunkFormatStatusResponse* response) { + std::vector chunkserverlist = + topology_->GetChunkServerInCluster( + [] (const ChunkServer &cs) { + return true; + }); + for (auto &id : chunkserverlist) { + ChunkServer cs; + ChunkServerStat stat; + if (topology_->GetChunkServer(id, &cs) + && topoStat_->GetChunkServerStat(id, &stat)) { + ChunkFormatStatus *status = response->add_chunkformatstatus(); + status->set_chunkserverid(id); + status->set_ip(cs.GetHostIp()); + status->set_port(cs.GetPort()); + status->set_formatrate(stat.chunkFilepoolFormatRate); + } + } +} + } // namespace topology } // namespace mds } // namespace curve diff --git a/src/mds/topology/topology_service_manager.h b/src/mds/topology/topology_service_manager.h index 2de2ab5c1d..d3be26ec33 100644 --- a/src/mds/topology/topology_service_manager.h +++ b/src/mds/topology/topology_service_manager.h @@ -198,6 +198,10 @@ class TopologyServiceManager { ChunkServerIdType id, const std::vector ©setInfos); + virtual void ListChunkFormatStatus( + const ListChunkFormatStatusRequest* request, + ListChunkFormatStatusResponse* response); + private: /** * @brief create copyset for logical pool diff --git a/src/mds/topology/topology_stat.h b/src/mds/topology/topology_stat.h index 1dfba17e00..2f70df78e7 100644 --- a/src/mds/topology/topology_stat.h +++ b/src/mds/topology/topology_stat.h @@ -85,6 +85,8 @@ struct ChunkServerStat { uint64_t chunkSizeTrashedBytes; // Size of chunkfilepool uint64_t chunkFilepoolSize; + // Rate of chunkfilepool format + uint32_t chunkFilepoolFormatRate; // Copyset statistic std::vector copysetStats; @@ -95,7 +97,8 @@ struct ChunkServerStat { readRate(0), writeRate(0), readIOPS(0), - writeIOPS(0) {} + writeIOPS(0), + chunkFilepoolFormatRate(0) {} }; /** diff --git a/src/tools/curve_format_main.cpp b/src/tools/curve_format_main.cpp index 795813cdfd..65df648e87 100644 --- a/src/tools/curve_format_main.cpp +++ b/src/tools/curve_format_main.cpp @@ -308,6 +308,7 @@ int main(int argc, char** argv) { meta.hasBlockSize = true; meta.blockSize = FLAGS_blockSize; meta.filePoolPath = FLAGS_filePoolDir; + meta.chunkNum = preAllocateChunkNum; int ret = curve::chunkserver::FilePoolHelper::PersistEnCodeMetaInfo( fsptr, meta, FLAGS_filePoolMetaPath); @@ -350,6 +351,11 @@ int main(int argc, char** argv) { break; } + if (recordMeta.chunkNum != preAllocateChunkNum) { + LOG(ERROR) << "chunkNum meta info persistency wrong!"; + break; + } + valid = true; } while (0); diff --git a/src/tools/curve_tool_define.h b/src/tools/curve_tool_define.h index a392b807bd..8800bf847c 100644 --- a/src/tools/curve_tool_define.h +++ b/src/tools/curve_tool_define.h @@ -57,6 +57,7 @@ const char kClientListCmd[] = "client-list"; const char kSnapshotCloneStatusCmd[] = "snapshot-clone-status"; const char kClusterStatusCmd[] = "cluster-status"; const char kScanStatusCmd[] = "scan-status"; +const char kFormatStatusCmd[] = "format-status"; // NameSpaceTool相关命令 const char kGetCmd[] = "get"; diff --git a/src/tools/mds_client.cpp b/src/tools/mds_client.cpp index 22e807ea9e..cd71aca5a0 100644 --- a/src/tools/mds_client.cpp +++ b/src/tools/mds_client.cpp @@ -1177,5 +1177,22 @@ int MDSClient::ListPoolset(std::vector* poolsets) { return -1; } +int MDSClient::ListChunkFormatStatus(std::vector* formatStatuses) { //NOLINT + assert(formatStatuses != nullptr); + curve::mds::topology::ListChunkFormatStatusRequest request; + curve::mds::topology::ListChunkFormatStatusResponse response; + curve::mds::topology::TopologyService_Stub stub(&channel_); + + auto fp = &curve::mds::topology::TopologyService_Stub::ListChunkFormatStatus; //NOLINT + if (0 != SendRpcToMds(&request, &response, &stub, fp)) { + std::cout << "ListChunkFormatStatus fail" << std::endl; + return -1; + } + for (auto stat : response.chunkformatstatus()) { + formatStatuses->push_back(stat); + } + return 0; +} + } // namespace tool } // namespace curve diff --git a/src/tools/mds_client.h b/src/tools/mds_client.h index 05bac69cd5..88ba05aa8e 100644 --- a/src/tools/mds_client.h +++ b/src/tools/mds_client.h @@ -69,6 +69,9 @@ using curve::mds::topology::ChunkServerStatus; using curve::mds::topology::ListChunkServerRequest; using curve::mds::topology::GetChunkServerInfoRequest; using curve::mds::topology::GetCopySetsInChunkServerRequest; +using curve::mds::topology::ChunkFormatStatus; +using curve::mds::topology::ListChunkFormatStatusRequest; + using curve::mds::schedule::RapidLeaderScheduleRequst; using curve::mds::schedule::RapidLeaderScheduleResponse; using curve::common::Authenticator; @@ -480,6 +483,8 @@ class MDSClient { int ListPoolset(std::vector* poolsets); + int ListChunkFormatStatus(std::vector* formatStatuses); + private: /** * @brief 切换mds diff --git a/src/tools/status_tool.cpp b/src/tools/status_tool.cpp index e6bfc116a4..538eb6e8fe 100644 --- a/src/tools/status_tool.cpp +++ b/src/tools/status_tool.cpp @@ -125,7 +125,8 @@ bool StatusTool::SupportCommand(const std::string& command) { || command == kClusterStatusCmd || command == kServerListCmd || command == kLogicalPoolList - || command == kScanStatusCmd); + || command == kScanStatusCmd + || command == kFormatStatusCmd); } void StatusTool::PrintHelp(const std::string& cmd) { @@ -225,6 +226,22 @@ int StatusTool::SpaceCmd() { return 0; } +int StatusTool::FormatStatusCmd() { + std::vector formatStatus; + int res = mdsClient_->ListChunkFormatStatus(&formatStatus); + if (res != 0) { + std::cout << "ListChunkserversInCluster fail!" << std::endl; + return -1; + } + for (auto stat : formatStatus) { + std::cout << "ip:" << stat.ip() + << " port:" << stat.port() + << " id:" << stat.chunkserverid() + << " rate:" << stat.formatrate() << std::endl; + } + return 0; +} + int StatusTool::ChunkServerListCmd() { std::vector chunkservers; int res = mdsClient_->ListChunkServersInCluster(&chunkservers); @@ -1137,6 +1154,8 @@ int StatusTool::RunCommand(const std::string &cmd) { return ClientListCmd(); } else if (cmd == kScanStatusCmd) { return ScanStatusCmd(); + } else if (cmd == kFormatStatusCmd) { + return FormatStatusCmd(); } else { std::cout << "Command not supported!" << std::endl; return -1; diff --git a/src/tools/status_tool.h b/src/tools/status_tool.h index 2b54d70943..82b776fa73 100644 --- a/src/tools/status_tool.h +++ b/src/tools/status_tool.h @@ -150,6 +150,7 @@ class StatusTool : public CurveTool { int PrintClientStatus(); int ClientListCmd(); int ScanStatusCmd(); + int FormatStatusCmd(); void PrintCsLeftSizeStatistics( const std::string &name, const std::map> &poolLeftSize); diff --git a/test/chunkserver/datastore/filepool_mock_unittest.cpp b/test/chunkserver/datastore/filepool_mock_unittest.cpp index f9fc0502e1..5217ffb8bd 100644 --- a/test/chunkserver/datastore/filepool_mock_unittest.cpp +++ b/test/chunkserver/datastore/filepool_mock_unittest.cpp @@ -59,6 +59,7 @@ const PageSizeType PAGE_SIZE = 4096; const uint32_t metaFileSize = 4096; const uint32_t blockSize = 4096; const uint32_t fileSize = CHUNK_SIZE + PAGE_SIZE; +const uint64_t chunkNum = 0; const std::string poolDir = "./chunkfilepool_dat"; // NOLINT const std::string poolMetaPath = "./chunkfilepool_dat.meta"; // NOLINT const std::string filePath1 = poolDir + "/1"; // NOLINT @@ -68,6 +69,7 @@ const char* kMetaPageSize = "metaPageSize"; const char* kChunkFilePoolPath = "chunkfilepool_path"; const char* kCRC = "crc"; const char* kBlockSize = "blockSize"; +const char* kChunkNum = "chunkNum"; class CSChunkfilePoolMockTest : public testing::Test { public: @@ -87,6 +89,7 @@ class CSChunkfilePoolMockTest : public testing::Test { meta.blockSize = blockSize; } meta.filePoolPath = poolDir; + meta.chunkNum = chunkNum; Json::Value jsonContent; jsonContent[kChunkSize] = CHUNK_SIZE; @@ -98,10 +101,13 @@ class CSChunkfilePoolMockTest : public testing::Test { jsonContent[kChunkFilePoolPath] = poolDir; jsonContent[kCRC] = meta.Crc32(); + jsonContent[kChunkNum] = chunkNum; return jsonContent; } void FakeMetaFile() { + EXPECT_CALL(*lfs_, FileExists(poolMetaPath)) + .WillOnce(Return(true)); EXPECT_CALL(*lfs_, Open(poolMetaPath, _)) .WillOnce(Return(100)); EXPECT_CALL(*lfs_, Read(100, NotNull(), 0, metaFileSize)) @@ -391,6 +397,8 @@ TEST_F(CSChunkfilePoolMockTest, InitializeTest) { char buf[metaFileSize] = {0}; EXPECT_CALL(*lfs_, Open(poolMetaPath, _)) .WillOnce(Return(1)); + EXPECT_CALL(*lfs_, FileExists(poolMetaPath)) + .WillOnce(Return(true)); EXPECT_CALL(*lfs_, Read(1, NotNull(), 0, metaFileSize)) .WillOnce(DoAll(SetArrayArgument<1>(buf, buf + metaFileSize), Return(metaFileSize))); @@ -404,7 +412,8 @@ TEST_F(CSChunkfilePoolMockTest, InitializeTest) { FakeMetaFile(); EXPECT_CALL(*lfs_, DirExists(_)) .WillOnce(Return(false)); - ASSERT_EQ(false, pool.Initialize(options)); + ASSERT_EQ(true, pool.Initialize(options)); + pool.WaitFormatDone(); } // 当前目录存在,list目录失败 { diff --git a/test/chunkserver/datastore/filepool_unittest.cpp b/test/chunkserver/datastore/filepool_unittest.cpp index 480f6da72a..e4e8f86969 100644 --- a/test/chunkserver/datastore/filepool_unittest.cpp +++ b/test/chunkserver/datastore/filepool_unittest.cpp @@ -169,7 +169,7 @@ bool CheckFileOpenOrNot(const std::string& filename) { return out.find("No such file or directory") != out.npos; } -TEST_P(CSFilePool_test, InitializeTest) { +TEST_P(CSFilePool_test, InitializeNomalTest) { std::string filePool = "./cspooltest/filePool.meta"; const std::string filePoolPath = FILEPOOL_DIR; @@ -216,6 +216,63 @@ TEST_P(CSFilePool_test, InitializeTest) { fsptr->Delete("./cspooltest/filePool.meta3"); } +TEST_P(CSFilePool_test, InitializeFormatTest) { + std::string filePool = "./cspooltest/filePool1.meta"; + const std::string filePoolPath = POOL1_DIR; + std::string filePath = "./cspooltest/file"; + + FilePoolOptions cfop; + cfop.fileSize = 4096; + cfop.metaPageSize = 4096; + cfop.blockSize = 4096; + + cfop.formatThreadNum = 2; + memcpy(cfop.metaPath, filePool.c_str(), filePool.size()); + memcpy(cfop.filePoolDir, filePoolPath.c_str(), filePoolPath.size()); + + char metaPage[cfop.metaPageSize]; // NOLINT + + { + // meta file not exit. + FilePool pool(fsptr); + cfop.filePoolSize = 8192 * 10; + ASSERT_TRUE(pool.Initialize(cfop)); + ASSERT_EQ(0, pool.GetFile(filePath, (const char*)&metaPage)); + ASSERT_EQ(0, pool.RecycleFile(filePath)); + pool.WaitFormatDone(); + ASSERT_EQ(10, pool.Size()); + } + + { + // filepool is not empty. + FilePool pool(fsptr); + cfop.filePoolSize = 8192 * 20; + ASSERT_TRUE(pool.Initialize(cfop)); + pool.WaitFormatDone(); + ASSERT_EQ(20, pool.Size()); + } + + { + // the chunk num of filepool is less than preAllcateNum. + FilePool pool(fsptr); + cfop.filePoolSize = 8192 * 10; + ASSERT_TRUE(pool.Initialize(cfop)); + pool.WaitFormatDone(); + ASSERT_EQ(20, pool.Size()); + } + + { + // get file while fil epool is formatting. + FilePool pool(fsptr); + std::string filePath = "./cspooltest/file"; + cfop.filePoolSize = 8192 * 100; + ASSERT_TRUE(pool.Initialize(cfop)); + ASSERT_EQ(0, pool.GetFile(filePath, (const char*)&metaPage)); + ASSERT_EQ(0, pool.RecycleFile(filePath)); + // ASSERT_GT(pool.Size(), 20); + } +} + TEST_P(CSFilePool_test, GetFileTest) { std::string filePool = "./cspooltest/filePool.meta"; FilePoolOptions cfop; diff --git a/test/integration/chunkserver/chunkserver_basic_test.cpp b/test/integration/chunkserver/chunkserver_basic_test.cpp index bc922d19e2..c897383281 100644 --- a/test/integration/chunkserver/chunkserver_basic_test.cpp +++ b/test/integration/chunkserver/chunkserver_basic_test.cpp @@ -113,7 +113,7 @@ class ChunkServerIoTest : public testing::Test { metaDir_ = "./" + std::to_string(PeerCluster::PeerToId(peer1_)) + "/chunkfilepool.meta"; - FilePoolMeta meta(kChunkSize, kPageSize, poolDir_); + FilePoolMeta meta(kChunkSize, kPageSize, poolDir_, kChunkNum); FilePoolHelper::PersistEnCodeMetaInfo(lfs_, meta, metaDir_); allocateChunk(lfs_, kChunkNum, poolDir_, kChunkSize); }