Skip to content

Commit

Permalink
[feat] curvefs: change s3 info
Browse files Browse the repository at this point in the history
  • Loading branch information
ken90242 committed Jan 21, 2024
1 parent 88a6091 commit 6af52d7
Show file tree
Hide file tree
Showing 12 changed files with 185 additions and 0 deletions.
10 changes: 10 additions & 0 deletions curvefs/proto/mds.proto
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,15 @@ message GetFsInfoResponse {
optional FsInfo fsInfo = 2;
}

message UpdateS3InfoRequest {
required common.S3Info s3Info = 1;
}

message UpdateS3InfoResponse {
required FSStatusCode statusCode = 1;
optional FsInfo fsInfo = 2;
}

message CreateFsRequest {
required string fsName = 1;
required uint64 blockSize = 2;
Expand Down Expand Up @@ -255,6 +264,7 @@ message TsoResponse {
service MdsService {
// fs interface
rpc CreateFs(CreateFsRequest) returns (CreateFsResponse);
rpc UpdateS3Info(UpdateS3InfoRequest) returns (UpdateS3InfoResponse);
rpc MountFs(MountFsRequest) returns (MountFsResponse);
rpc UmountFs(UmountFsRequest) returns (UmountFsResponse);
// TODO(chengyi01): move to GetFssInfo
Expand Down
44 changes: 44 additions & 0 deletions curvefs/src/client/curve_fuse_op.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ using ::curvefs::client::filesystem::EntryOut;
using ::curvefs::client::filesystem::FileOut;
using ::curvefs::client::filesystem::IsListWarmupXAttr;
using ::curvefs::client::filesystem::IsWarmupXAttr;
using ::curvefs::client::filesystem::IsS3ConfigXAttr;
using ::curvefs::client::filesystem::StrAttr;
using ::curvefs::client::filesystem::StrEntry;
using ::curvefs::client::filesystem::StrMode;
Expand Down Expand Up @@ -423,6 +424,47 @@ FuseClient* Client() {
return g_ClientInstance;
}


void UpdateS3Config(fuse_req_t req,
fuse_ino_t ino,
const char* name,
const char* value,
size_t size) {

if (g_ClientInstance->GetFsInfo()->fstype() != FSType::TYPE_S3) {
LOG(ERROR) << "updating s3 config only works for s3";
return EOPNOTSUPP;
}

const std::string fsName = g_ClientInstance->GetFsInfo()->fsname();
const curvefs::common::S3Info oldS3Info = g_ClientInstance->GetFsInfo()->detail()->s3Info();
const curvefs::common::S3Info newS3Info(oldS3Info);

Json::CharReaderBuilder builder;
Json::CharReaderBuilder::strictMode(&builder.settings_);
std::unique_ptr<Json::CharReader> reader(builder.newCharReader());
Json::Value rootNode;
JSONCPP_STRING errormsg;
if (reader->parse(value, value + strlen(value), &rootNode, &errormsg)) {
newS3Info.ak = rootNode["ak"].asString();
newS3Info.sk = rootNode["sk"].asString();
newS3Info.bucket = rootNode["bucket"].asString();
newS3Info.endpoint = rootNode["endpoint"].asString();

FsInfo fsInfo;
FSStatusCode statusCode = g_ClientInstance->mdsClient_->UpdateS3Info(fsName, newS3Info, &fsInfo);
if (statusCode == FSStatusCode::OK) {
g_ClientInstance->SetFsInfo(fsInfo);
}
} else {
LOG(ERROR) << "Error parsing the input value ' "
<< value
<< " ': " << errormsg;
}

fuse_reply_err(req, statusCode);
}

void TriggerWarmup(fuse_req_t req,
fuse_ino_t ino,
const char* name,
Expand Down Expand Up @@ -921,6 +963,8 @@ void FuseOpSetXattr(fuse_req_t req,

if (IsWarmupXAttr(name)) {
return TriggerWarmup(req, ino, name, value, size);
} else if (IsS3ConfigXAttr(name)) {
return UpdateS3Config(req, ino, name, value);
}
rc = client->FuseOpSetXattr(req, ino, name, value, size, flags);
return fs->ReplyError(req, rc);
Expand Down
1 change: 1 addition & 0 deletions curvefs/src/client/curve_fuse_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <fcntl.h>
#include <unistd.h>
#include <assert.h>
#include <json/json.h>

#include "curvefs/src/client/fuse_common.h"

Expand Down
5 changes: 5 additions & 0 deletions curvefs/src/client/filesystem/xattr.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const char XATTR_DIR_RFBYTES[] = "curve.dir.rfbytes";
const char XATTR_DIR_PREFIX[] = "curve.dir";
const char XATTR_WARMUP_OP[] = "curvefs.warmup.op";
const char XATTR_WARMUP_OP_LIST[] = "curvefs.warmup.op.list";
const char XATTR_S3_CONFIG[] = "curvefs.s3.config";

inline bool IsSpecialXAttr(const std::string& key) {
static std::map<std::string, bool> xattrs {
Expand All @@ -69,6 +70,10 @@ inline bool IsListWarmupXAttr(const std::string& key) {
return key == XATTR_WARMUP_OP_LIST;
}

inline bool IsS3ConfigXAttr(const std::string& key) {
return key == XATTR_S3_CONFIG;
}

} // namespace filesystem
} // namespace client
} // namespace curvefs
Expand Down
11 changes: 11 additions & 0 deletions curvefs/src/client/rpcclient/base_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ namespace rpcclient {
using ::curvefs::mds::space::SpaceService_Stub;


void MDSBaseClient::UpdateS3Info(const std::string& fsName,
const curvefs::common::S3Info& s3Info,
UpdateS3InfoResponse* response, brpc::Controller* cntl,
brpc::Channel* channel) {
UpdateS3InfoRequest request;
request.set_fsname(fsName);
request.set_s3Info(s3Info);
curvefs::mds::MdsService_Stub stub(channel);
stub.UpdateS3Info(cntl, &request, response, nullptr);
}

void MDSBaseClient::MountFs(const std::string& fsName,
const Mountpoint& mountPt,
MountFsResponse* response, brpc::Controller* cntl,
Expand Down
4 changes: 4 additions & 0 deletions curvefs/src/client/rpcclient/base_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,10 @@ class MDSBaseClient {
public:
virtual ~MDSBaseClient() = default;

virtual void UpdateS3Info(const std::string& fsName, const curvefs::common::S3Info& s3Info,
UpdateS3InfoResponse* response, brpc::Controller* cntl,
brpc::Channel* channel);

virtual void MountFs(const std::string& fsName, const Mountpoint& mountPt,
MountFsResponse* response, brpc::Controller* cntl,
brpc::Channel* channel);
Expand Down
31 changes: 31 additions & 0 deletions curvefs/src/client/rpcclient/mds_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,37 @@ MdsClientImpl::Init(const ::curve::client::MetaServerOption &mdsOpt,
[&](int addrindex, uint64_t rpctimeoutMS, brpc::Channel *channel, \
brpc::Controller *cntl) -> int

FSStatusCode MdsClientImpl::UpdateS3Info(const std::string& fsName,
const curvefs::common::S3Info& s3Info, FsInfo* fsInfo) {
auto task = RPCTask {
(void)addrindex;
(void)rpctimeoutMS;
mdsClientMetric_.mountFs.qps.count << 1;
LatencyUpdater updater(&mdsClientMetric_.mountFs.latency);
UpdateS3InfoResponse response;
mdsbasecli_->UpdateS3Info(fsName, s3Info, &response, cntl, channel);
if (cntl->Failed()) {
mdsClientMetric_.mountFs.eps.count << 1;
LOG(WARNING) << "MountFs Failed, errorcode = " << cntl->ErrorCode()
<< ", error content:" << cntl->ErrorText()
<< ", log id = " << cntl->log_id();
return -cntl->ErrorCode();
}

FSStatusCode ret = response.statuscode();
if (ret != FSStatusCode::OK) {
LOG(WARNING) << "UpdateS3Info: fsname = " << fsName
<< ", mountPt = " << mountPt.ShortDebugString()
<< ", errcode = " << ret
<< ", errmsg = " << FSStatusCode_Name(ret);
} else if (response.has_fsinfo()) {
fsInfo->CopyFrom(response.fsinfo());
}
return ret;
};
return ReturnError(rpcexcutor_.DoRPCTask(task, mdsOpt_.mdsMaxRetryMS));
}

FSStatusCode MdsClientImpl::MountFs(const std::string& fsName,
const Mountpoint& mountPt, FsInfo* fsInfo) {
auto task = RPCTask {
Expand Down
6 changes: 6 additions & 0 deletions curvefs/src/client/rpcclient/mds_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ class MdsClient {
virtual FSStatusCode Init(const ::curve::client::MetaServerOption &mdsOpt,
MDSBaseClient *baseclient) = 0;

virtual FSStatusCode UpdateS3Info(const std::string& fsName,
const curvefs::common::S3Info& s3Info, FsInfo* fsInfo) = 0;

virtual FSStatusCode MountFs(const std::string& fsName,
const Mountpoint& mountPt, FsInfo* fsInfo) = 0;

Expand Down Expand Up @@ -171,6 +174,9 @@ class MdsClientImpl : public MdsClient {
FSStatusCode Init(const ::curve::client::MetaServerOption &mdsOpt,
MDSBaseClient *baseclient) override;

FSStatusCode UpdateS3Info(const std::string& fsName, const curvefs::common::S3Info& s3Info,
FsInfo* fsInfo) override;

FSStatusCode MountFs(const std::string& fsName, const Mountpoint& mountPt,
FsInfo* fsInfo) override;

Expand Down
5 changes: 5 additions & 0 deletions curvefs/src/mds/fs_info_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ class FsInfoWrapper {
fsInfo_.set_status(status);
}

void SetS3Info(S3Info s3info) {
const FsDetail fsdetail_ = GetFsDetail();
fsdetail_._set_s3info(s3info)
}

void SetFsName(const std::string& name) {
fsInfo_.set_fsname(name);
}
Expand Down
33 changes: 33 additions & 0 deletions curvefs/src/mds/fs_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,39 @@ FSStatusCode FsManager::DeleteFs(const std::string& fsName) {
return FSStatusCode::OK;
}

FSStatusCode FsManager::UpdateS3Info(const std::string& fsName,
const curvefs::common::S3Info& s3Info, FsInfo* fsInfo) {
NameLockGuard lock(nameLock_, fsName);

// query fs
FsInfoWrapper wrapper;
FSStatusCode ret = fsStorage_->Get(fsName, &wrapper);
if (ret != FSStatusCode::OK) {
LOG(WARNING) << "UpdateS3Info fail, get fs fail, fsName = " << fsName
<< ", errCode = " << FSStatusCode_Name(ret);
return ret;
}

// insert mountpoint
wrapper.SetS3Info(s3Info);
// for persistence consider
ret = fsStorage_->Update(wrapper);
if (ret != FSStatusCode::OK) {
LOG(WARNING) << "UpdateS3Info fail, update fs fail, fsName = " << fsName
<< ", mountpoint = " << mountpoint.ShortDebugString()
<< ", errCode = " << FSStatusCode_Name(ret);
return ret;
}
// update client alive time
UpdateClientAliveTime(mountpoint, fsName, false);

// convert fs info
FsMetric::GetInstance().OnMount(wrapper.GetFsName(), mountpoint);
*fsInfo = std::move(wrapper).ProtoFsInfo();

return FSStatusCode::OK;
}

FSStatusCode FsManager::MountFs(const std::string& fsName,
const Mountpoint& mountpoint, FsInfo* fsInfo) {
NameLockGuard lock(nameLock_, fsName);
Expand Down
6 changes: 6 additions & 0 deletions curvefs/src/mds/fs_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,12 @@ class FsManager {
*/
FSStatusCode DeleteFs(const std::string& fsName);

/**
*
*/
FSStatusCode UpdateS3Info(const std::string& fsName,
const Mountpoint& mountpoint, FsInfo* fsInfo);

/**
* @brief Mount fs, mount point can not repeat. It will increate
* mountNum.
Expand Down
29 changes: 29 additions & 0 deletions curvefs/src/mds/mds_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,35 @@ void MdsServiceImpl::CreateFs(::google::protobuf::RpcController* controller,
<< ", capacity = " << request->capacity();
}

void MdsServiceImpl::UpdateS3Info(::google::protobuf::RpcController* controller,
const UpdateS3InfoRequest* request, UpdateS3InfoResponse* response,
::google::protobuf::Closure* done) {
(void)controller;
brpc::ClosureGuard doneGuard(done);
const std::string &fsName = request->fsname();
const curvefs::common::S3Info &s3Info = request->s3info();

LOG(INFO) << "MountFs request, fsName = " << fsName
<< ", mountPoint = " << mount.ShortDebugString();

FSStatusCode status =
fsManager_->UpdateS3Info(fsName, s3Info, response->mutable_fsinfo());

if (status != FSStatusCode::OK) {
response->clear_fsinfo();
response->set_statuscode(status);
LOG(ERROR) << "UpdateS3Info fail, fsName = " << fsName
<< ", mountPoint = " << mount.ShortDebugString()
<< ", errCode = " << FSStatusCode_Name(status);
return;
}

response->set_statuscode(FSStatusCode::OK);
LOG(INFO) << "UpdateS3Info success, fsName = " << fsName
<< ", mountPoint = " << mount.ShortDebugString()
<< ", mps: " << response->mutable_fsinfo()->mountpoints_size();
}

void MdsServiceImpl::MountFs(::google::protobuf::RpcController* controller,
const MountFsRequest* request, MountFsResponse* response,
::google::protobuf::Closure* done) {
Expand Down

0 comments on commit 6af52d7

Please sign in to comment.