Skip to content

Commit

Permalink
Chunk pool format asyn
Browse files Browse the repository at this point in the history
Signed-off-by: yyyyufeng <[email protected]>
  • Loading branch information
Vigor-jpg committed Oct 15, 2023
1 parent e822715 commit 1b32b35
Show file tree
Hide file tree
Showing 24 changed files with 675 additions and 48 deletions.
8 changes: 8 additions & 0 deletions conf/chunkserver.conf
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ chunkfilepool.clean.enable=true
chunkfilepool.clean.bytes_per_write=4096
# The throttle iops for cleaning chunk (4KB/IO)
chunkfilepool.clean.throttle_iops=500
# The size of chunkfilepool (None/KB/MB/GB/TB)
chunkfilepool.chunk_file_pool_size=0
# The thread num for format chunks
chunkfilepool.thread_num=2

#
# WAL file pool
Expand All @@ -229,6 +233,10 @@ walfilepool.metapage_size=4096
walfilepool.meta_file_size=4096
# WAL filepool get chunk最大重试次数
walfilepool.retry_times=5
# The size of walfilepool (None/KB/MB/GB/TB)
walfilepool.wal_file_pool_size=0
# The thread num for format chunks
walfilepool.thread_num=2

#
# trash settings
Expand Down
4 changes: 4 additions & 0 deletions conf/chunkserver.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ chunkfilepool.clean.enable=true
chunkfilepool.clean.bytes_per_write=4096
# The throttle iops for cleaning chunk (4KB/IO)
chunkfilepool.clean.throttle_iops=500
# The size of chunkfilepool
chunkfilepool.chunk_file_pool_size=0
# The thread num for format chunks
chunkfilepool.thread_num=2

#
# WAL file pool
Expand Down
4 changes: 3 additions & 1 deletion proto/heartbeat.proto
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,10 @@ message ChunkServerStatisticInfo {
required uint64 chunkSizeLeftBytes = 6;
// 回收站中chunk占用的磁盘空间
required uint64 chunkSizeTrashedBytes = 7;
// chunkfilepool的大小
// size of chunkfilepool
optional uint64 chunkFilepoolSize = 8;
// format chunkfilepool
optional uint32 chunkFilepoolFormatRate = 9;
};

message ChunkServerHeartbeatRequest {
Expand Down
16 changes: 16 additions & 0 deletions proto/topology.proto
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,21 @@ message GetCopySetsInChunkServerRequest {
optional uint32 port = 3;
}

message ChunkFormatStatus {
required string ip = 1;
required uint32 port = 2;
required uint32 chunkServerID = 3;
required uint32 formatRate = 4;
}

message ListChunkFormatStatusRequest {

}

message ListChunkFormatStatusResponse {
repeated ChunkFormatStatus chunkFormatStatus = 1;
}

message GetCopySetsInChunkServerResponse {
required sint32 statusCode = 1;
repeated common.CopysetInfo copysetInfos = 2;
Expand Down Expand Up @@ -595,4 +610,5 @@ service TopologyService {
rpc GetClusterInfo(GetClusterInfoRequest) returns (GetClusterInfoResponse);
rpc SetCopysetsAvailFlag(SetCopysetsAvailFlagRequest) returns (SetCopysetsAvailFlagResponse);
rpc ListUnAvailCopySets(ListUnAvailCopySetsRequest) returns (ListUnAvailCopySetsResponse);
rpc ListChunkFormatStatus(ListChunkFormatStatusRequest) returns (ListChunkFormatStatusResponse);
}
78 changes: 76 additions & 2 deletions src/chunkserver/chunkserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "src/chunkserver/raftsnapshot/curve_snapshot_storage.h"
#include "src/chunkserver/raftlog/curve_segment_log_storage.h"
#include "src/common/curve_version.h"
#include "src/common/bytes_convert.h"

using ::curve::fs::LocalFileSystem;
using ::curve::fs::LocalFileSystemOption;
Expand All @@ -65,6 +66,9 @@ DEFINE_string(raftSnapshotUri, "curve://./0/copysets", "raft snapshot uri");
DEFINE_string(raftLogUri, "curve://./0/copysets", "raft log uri");
DEFINE_string(recycleUri, "local://./0/recycler" , "recycle uri");
DEFINE_string(chunkFilePoolDir, "./0/", "chunk file pool location");
DEFINE_string(chunkFilePoolSize, "1GB", "format percent for chunkfillpool.");
DEFINE_uint32(chunkFormatThreadNum,
1, "number of threads while file pool formatting");
DEFINE_string(chunkFilePoolMetaPath,
"./chunkfilepool.meta", "chunk file pool meta path");
DEFINE_string(logPath, "./0/chunkserver.log-", "log file path");
Expand Down Expand Up @@ -479,8 +483,6 @@ void ChunkServer::Stop() {
brpc::AskToQuit();
}



void ChunkServer::InitChunkFilePoolOptions(
common::Configuration *conf, FilePoolOptions *chunkFilePoolOptions) {
LOG_IF(FATAL, !conf->GetUInt32Value("global.chunk_size",
Expand Down Expand Up @@ -513,13 +515,49 @@ void ChunkServer::InitChunkFilePoolOptions(
"chunkfilepool.meta_path", &metaUri));
::memcpy(
chunkFilePoolOptions->metaPath, metaUri.c_str(), metaUri.size());

std::string chunkFilePoolUri;
LOG_IF(FATAL, !conf->GetStringValue(
"chunkfilepool.chunk_file_pool_dir", &chunkFilePoolUri));

::memcpy(chunkFilePoolOptions->filePoolDir,
chunkFilePoolUri.c_str(),
chunkFilePoolUri.size());
std::string pool_size;
LOG_IF(FATAL, !conf->GetStringValue("chunkfilepool.chunk_file_pool_size", &pool_size)); // NOLINT
LOG_IF(FATAL, !curve::common::ToNumbericByte(pool_size,
&chunkFilePoolOptions->filePoolSize));
LOG_IF(FATAL, !conf->GetUInt32Value("chunkfilepool.chunk_file_pool_format_thread_num", // NOLINT
&chunkFilePoolOptions->formatThreadNum));
LOG_IF(FATAL, !conf->GetBoolValue("chunkfilepool.clean.enable",
&chunkFilePoolOptions->needClean));
LOG_IF(FATAL, !conf->GetUInt32Value("chunkfilepool.clean.bytes_per_write", // NOLINT
&chunkFilePoolOptions->bytesPerWrite));
LOG_IF(FATAL, !conf->GetUInt32Value("chunkfilepool.clean.throttle_iops",
&chunkFilePoolOptions->iops4clean));

std::string copysetUri;
LOG_IF(FATAL, !conf->GetStringValue("copyset.raft_snapshot_uri",
&copysetUri));
curve::common::UriParser::ParseUri(copysetUri,
&chunkFilePoolOptions->copysetDir);

std::string recycleUri;
LOG_IF(FATAL, !conf->GetStringValue("copyset.recycler_uri",
&recycleUri));
curve::common::UriParser::ParseUri(recycleUri,
&chunkFilePoolOptions->recycleDir);

bool useChunkFilePoolAsWalPool;
LOG_IF(FATAL, !conf->GetBoolValue(
"walfilepool.use_chunk_file_pool",
&useChunkFilePoolAsWalPool));

chunkFilePoolOptions->isAllocated = [=](const std::string& filename) {
return Trash::IsChunkOrSnapShotFile(filename)
|| (useChunkFilePoolAsWalPool && Trash::IsWALFile(filename));
};

if (0 == chunkFilePoolOptions->bytesPerWrite
|| chunkFilePoolOptions->bytesPerWrite > 1 * 1024 * 1024
|| 0 != chunkFilePoolOptions->bytesPerWrite % 4096) {
Expand Down Expand Up @@ -565,6 +603,28 @@ void ChunkServer::InitWalFilePoolOptions(
std::string metaUri;
LOG_IF(FATAL, !conf->GetStringValue(
"walfilepool.meta_path", &metaUri));
LOG_IF(FATAL, !conf->GetUInt64Value(
"walfilepool.wal_file_pool_size",
&walPoolOptions->filePoolSize));
LOG_IF(FATAL, !conf->GetUInt32Value(
"walfilepool.thread_num",
&walPoolOptions->formatThreadNum));

std::string copysetUri;
LOG_IF(FATAL, !conf->GetStringValue("copyset.raft_log_uri",
&copysetUri));
curve::common::UriParser::ParseUri(copysetUri,
&walPoolOptions->copysetDir);

std::string recycleUri;
LOG_IF(FATAL, !conf->GetStringValue("copyset.recycler_uri",
&recycleUri));
curve::common::UriParser::ParseUri(recycleUri,
&walPoolOptions->recycleDir);

walPoolOptions->isAllocated = [](const string& filename) {
return Trash::IsWALFile(filename);
};
::memcpy(
walPoolOptions->metaPath, metaUri.c_str(), metaUri.size());
}
Expand Down Expand Up @@ -833,6 +893,20 @@ void ChunkServer::LoadConfigFromCmdline(common::Configuration *conf) {
<< "chunkFilePoolDir must be set when run chunkserver in command.";
}

if (GetCommandLineFlagInfo("chunkFilePoolSize", &info)) {
conf->SetStringValue(
"chunkfilepool.chunk_file_pool_size", FLAGS_chunkFilePoolSize);
} else {
LOG(FATAL)
<< "chunkFilePoolSize must be set when run chunkserver in command.";
}

if (GetCommandLineFlagInfo("chunkFormatThreadNum", &info)) {
conf->SetUInt64Value(
"chunkfilepool.chunk_file_pool_format_thread_num",
FLAGS_chunkFormatThreadNum);
}

if (GetCommandLineFlagInfo("chunkFilePoolMetaPath", &info) &&
!info.is_default) {
conf->SetStringValue(
Expand Down
Loading

0 comments on commit 1b32b35

Please sign in to comment.