Skip to content

Commit

Permalink
curvefs/client: fix the delayed inode not being retrieved in nocto sc…
Browse files Browse the repository at this point in the history
…enario.

Signed-off-by: Wine93 <[email protected]>
  • Loading branch information
Wine93 committed Nov 20, 2023
1 parent d890424 commit cac262c
Show file tree
Hide file tree
Showing 11 changed files with 519 additions and 40 deletions.
124 changes: 105 additions & 19 deletions curvefs/src/client/filesystem/defer_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,94 @@ namespace curvefs {
namespace client {
namespace filesystem {

DeferSync::DeferSync(DeferSyncOption option)
: option_(option),
using ::curve::common::LockGuard;
using ::curve::common::ReadLockGuard;
using ::curve::common::WriteLockGuard;
using ::curvefs::client::filesystem::AttrCtime;

#define RETURN_FALSE_IF_CTO_ON() \
do { \
if (cto_) { \
return false; \
} \
} while (0)

DeferInodes::DeferInodes(bool cto)
: cto_(cto),
rwlock_(),
inodes_() {}

bool DeferInodes::Add(const std::shared_ptr<InodeWrapper>& inode) {
RETURN_FALSE_IF_CTO_ON();
WriteLockGuard lk(rwlock_);
Ino ino = inode->GetInodeId();
auto ret = inodes_.emplace(ino, inode);
auto iter = ret.first;
bool yes = ret.second;
if (!yes) { // already exists
iter->second = inode;
}
return true;
}

bool DeferInodes::Get(Ino ino, std::shared_ptr<InodeWrapper>* inode) {
RETURN_FALSE_IF_CTO_ON();
ReadLockGuard lk(rwlock_);
auto iter = inodes_.find(ino);
if (iter == inodes_.end()) {
return false;
}
*inode = iter->second;
return true;
}

bool DeferInodes::Remove(const std::shared_ptr<InodeWrapper>& inode) {
RETURN_FALSE_IF_CTO_ON();
WriteLockGuard lk(rwlock_);
InodeAttr attr;
inode->GetInodeAttrLocked(&attr);
auto iter = inodes_.find(attr.inodeid());
if (iter == inodes_.end()) {
return false;
}

InodeAttr defered;
iter->second->GetInodeAttrLocked(&defered);
if (AttrCtime(attr) < AttrCtime(defered)) {
// it means the old defered inode already replaced by the lastest one,
// so we can't remove it before it synced yet.
return false;
}
inodes_.erase(iter);
return true;
}

size_t DeferInodes::Size() {
ReadLockGuard lk(rwlock_);
return inodes_.size();
}

SyncInodeClosure::SyncInodeClosure(const std::shared_ptr<DeferInodes>& inodes,
const std::shared_ptr<InodeWrapper>& inode)
: inodes_(inodes), inode_(inode) {}

void SyncInodeClosure::Run() {
std::unique_ptr<SyncInodeClosure> self_guard(this);
MetaStatusCode rc = GetStatusCode();
if (rc == MetaStatusCode::OK || rc == MetaStatusCode::NOT_FOUND) {
inodes_->Remove(inode_);
}
}

DeferSync::DeferSync(bool cto, DeferSyncOption option)
: cto_(cto),
option_(option),
mutex_(),
running_(false),
thread_(),
sleeper_(),
inodes_() {
}
pending_(),
inodes_(std::make_shared<DeferInodes>(cto)) {}

void DeferSync::Start() {
if (!running_.exchange(true)) {
Expand All @@ -55,20 +135,32 @@ void DeferSync::Stop() {
}
}

SyncInodeClosure* DeferSync::NewSyncInodeClosure(
const std::shared_ptr<InodeWrapper>& inode) {
// NOTE: we only store the defer inodes in nocto scenario,
// which means we don't need to remove the inode from defer inodes
// even if the inode already synced done in cto scenario.
if (cto_) {
return nullptr;
}
return new SyncInodeClosure(inodes_, inode);
}

void DeferSync::SyncTask() {
std::vector<std::shared_ptr<InodeWrapper>> inodes;
std::vector<std::shared_ptr<InodeWrapper>> syncing;
for ( ;; ) {
bool running = sleeper_.wait_for(std::chrono::seconds(option_.delay));

{
LockGuard lk(mutex_);
inodes.swap(inodes_);
syncing.swap(pending_);
}
for (const auto& inode : inodes) {
for (const auto& inode : syncing) {
auto closure = NewSyncInodeClosure(inode);
UniqueLock lk(inode->GetUniqueLock());
inode->Async(nullptr, true);
inode->Async(closure, true);
}
inodes.clear();
syncing.clear();

if (!running) {
break;
Expand All @@ -78,18 +170,12 @@ void DeferSync::SyncTask() {

void DeferSync::Push(const std::shared_ptr<InodeWrapper>& inode) {
LockGuard lk(mutex_);
inodes_.emplace_back(inode);
pending_.emplace_back(inode);
inodes_->Add(inode);
}

bool DeferSync::IsDefered(Ino ino, InodeAttr* attr) {
LockGuard lk(mutex_);
for (const auto& inode : inodes_) {
if (inode->GetInodeId() == ino) {
inode->GetInodeAttr(attr);
return true;
}
}
return false;
bool DeferSync::IsDefered(Ino ino, std::shared_ptr<InodeWrapper>* inode) {
return inodes_->Get(ino, inode);
}

} // namespace filesystem
Expand Down
53 changes: 47 additions & 6 deletions curvefs/src/client/filesystem/defer_sync.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,42 +27,83 @@
#include <vector>
#include <memory>

#include "absl/container/btree_map.h"
#include "src/common/interruptible_sleeper.h"
#include "curvefs/src/client/common/config.h"
#include "curvefs/src/client/rpcclient/task_excutor.h"
#include "curvefs/src/client/filesystem/meta.h"

namespace curvefs {
namespace client {
namespace filesystem {

using ::curvefs::client::common::DeferSyncOption;

using ::curve::common::RWLock;
using ::curve::common::Mutex;
using ::curve::common::LockGuard;
using ::curve::common::InterruptibleSleeper;
using ::curvefs::client::common::DeferSyncOption;
using ::curvefs::client::rpcclient::MetaServerClientDone;

// NOTE: we only store the defer inodes in nocto scenario.
class DeferInodes {
public:
explicit DeferInodes(bool cto);

bool Add(const std::shared_ptr<InodeWrapper>& inode);

bool Get(Ino ino, std::shared_ptr<InodeWrapper>* inode);

bool Remove(const std::shared_ptr<InodeWrapper>& inode);

size_t Size();

private:
bool cto_;
RWLock rwlock_;
absl::btree_map<Ino, std::shared_ptr<InodeWrapper>> inodes_;
};

class SyncInodeClosure : public MetaServerClientDone {
public:
explicit SyncInodeClosure(const std::shared_ptr<DeferInodes>& inodes,
const std::shared_ptr<InodeWrapper>& inode);

void Run() override;

private:
std::shared_ptr<DeferInodes> inodes_;
std::shared_ptr<InodeWrapper> inode_;
};

class DeferSync {
public:
explicit DeferSync(DeferSyncOption option);
explicit DeferSync(bool cto, DeferSyncOption option);

void Start();

void Stop();

void Push(const std::shared_ptr<InodeWrapper>& inode);

bool IsDefered(Ino ino, InodeAttr* attr);
bool IsDefered(Ino ino, std::shared_ptr<InodeWrapper>* inode);

private:
SyncInodeClosure* NewSyncInodeClosure(
const std::shared_ptr<InodeWrapper>& inode);

void SyncTask();

private:
friend class SyncInodeClosure;

private:
bool cto_;
DeferSyncOption option_;
Mutex mutex_;
std::atomic<bool> running_;
std::thread thread_;
InterruptibleSleeper sleeper_;
std::vector<std::shared_ptr<InodeWrapper>> inodes_;
std::vector<std::shared_ptr<InodeWrapper>> pending_;
std::shared_ptr<DeferInodes> inodes_;
};

} // namespace filesystem
Expand Down
10 changes: 3 additions & 7 deletions curvefs/src/client/filesystem/filesystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ namespace filesystem {

FileSystem::FileSystem(FileSystemOption option, ExternalMember member)
: option_(option), member(member) {
deferSync_ = std::make_shared<DeferSync>(option.deferSyncOption);
deferSync_ = std::make_shared<DeferSync>(option.cto,
option.deferSyncOption);
negative_ = std::make_shared<LookupCache>(option.lookupCacheOption);
dirCache_ = std::make_shared<DirCache>(option.dirCacheOption);
openFiles_ = std::make_shared<OpenFiles>(option_.openFilesOption,
Expand Down Expand Up @@ -257,11 +258,6 @@ CURVEFS_ERROR FileSystem::Lookup(Ino parent,

CURVEFS_ERROR FileSystem::GetAttr(Ino ino, AttrOut* attrOut) {
InodeAttr attr;
if (!option_.cto && deferSync_->IsDefered(ino, &attr)) {
*attrOut = AttrOut(attr);
return CURVEFS_ERROR::OK;
}

auto rc = rpc_->GetAttr(ino, &attr);
if (rc == CURVEFS_ERROR::OK) {
*attrOut = AttrOut(attr);
Expand Down Expand Up @@ -319,7 +315,7 @@ CURVEFS_ERROR FileSystem::Open(Ino ino, FileInfo* fi) {
bool yes = openFiles_->IsOpened(ino, &inode);
if (yes) {
openFiles_->Open(ino, inode);
// fi->keep_cache = 1;
// fi->keep_cache = 1; // FIXME(Wine93): let it works.
return CURVEFS_ERROR::OK;
}

Expand Down
Loading

0 comments on commit cac262c

Please sign in to comment.