Skip to content

Commit

Permalink
Use Velox filesystem for SSD cache data file
Browse files Browse the repository at this point in the history
SSD cache currently uses native functions for file operations. This
needs to be switched to Velox filesystem, so more advanced testing can
be built by leveraging features like fault injections.
  • Loading branch information
zacw7 committed Oct 30, 2024
1 parent 11e4a8e commit 21b8c81
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 72 deletions.
153 changes: 86 additions & 67 deletions velox/common/caching/SsdFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

#include "velox/common/caching/SsdFile.h"

#include <folly/Executor.h>
#include <folly/portability/SysUio.h>
#include "velox/common/base/AsyncSource.h"
#include "velox/common/base/Crc.h"
Expand Down Expand Up @@ -67,6 +66,28 @@ void disableCow(int32_t fd) {
#endif // linux
}

// TODO: Remove this function once we migrate all files to velox fs.
void disableFileCow(int32_t fd) {
#ifdef linux
int attr{0};
auto res = ioctl(fd, FS_IOC_GETFLAGS, &attr);
VELOX_CHECK_EQ(
0,
res,
"ioctl(FS_IOC_GETFLAGS) failed: {}, {}",
res,
folly::errnoStr(errno));
attr |= FS_NOCOW_FL;
res = ioctl(fd, FS_IOC_SETFLAGS, &attr);
VELOX_CHECK_EQ(
0,
res,
"ioctl(FS_IOC_SETFLAGS, FS_NOCOW_FL) failed: {}, {}",
res,
folly::errnoStr(errno));
#endif // linux
}

void addEntryToIovecs(AsyncDataCacheEntry& entry, std::vector<iovec>& iovecs) {
if (entry.tinyData() != nullptr) {
iovecs.push_back({entry.tinyData(), static_cast<size_t>(entry.size())});
Expand Down Expand Up @@ -138,35 +159,21 @@ SsdFile::SsdFile(const Config& config)
checksumReadVerificationEnabled_(
config.checksumEnabled && config.checksumReadVerificationEnabled),
shardId_(config.shardId),
fs_(filesystems::getFileSystem(fileName_, nullptr)),
checkpointIntervalBytes_(config.checkpointIntervalBytes),
executor_(config.executor) {
process::TraceContext trace("SsdFile::SsdFile");
int32_t oDirect = 0;
#ifdef linux
oDirect = FLAGS_ssd_odirect ? O_DIRECT : 0;
#endif // linux
fd_ = open(fileName_.c_str(), O_CREAT | O_RDWR | oDirect, S_IRUSR | S_IWUSR);
if (fd_ < 0) {
++stats_.openFileErrors;
}
// TODO: add fault tolerant handling for open file errors.
VELOX_CHECK_GE(
fd_,
0,
"Cannot open or create {}. Error: {}",
fileName_,
folly::errnoStr(errno));
filesystems::FileOptions fileOptions;
fileOptions.shouldThrowOnFileAlreadyExists = false;
fileOptions.bufferWrite = !FLAGS_ssd_odirect;
writeFile_ = fs_->openFileForWrite(fileName_, fileOptions);
readFile_ = fs_->openFileForRead(fileName_);

if (disableFileCow_) {
disableCow(fd_);
}

readFile_ = std::make_unique<LocalReadFile>(fd_);
const uint64_t size = lseek(fd_, 0, SEEK_END);
const uint64_t size = writeFile_->size();
numRegions_ = std::min<int32_t>(size / kRegionSize, maxRegions_);
fileSize_ = numRegions_ * kRegionSize;
if ((size % kRegionSize > 0) || (size > numRegions_ * kRegionSize)) {
::ftruncate(fd_, fileSize_);
writeFile_->truncate(fileSize_);
}
// The existing regions in the file are writable.
writableRegions_.resize(numRegions_);
Expand All @@ -178,6 +185,10 @@ SsdFile::SsdFile(const Config& config)
if (checkpointEnabled()) {
initializeCheckpoint();
}

if (disableFileCow_) {
disableFileCow();
}
}

void SsdFile::pinRegion(uint64_t offset) {
Expand Down Expand Up @@ -324,19 +335,23 @@ bool SsdFile::growOrEvictLocked() {
process::TraceContext trace("SsdFile::growOrEvictLocked");
if (numRegions_ < maxRegions_) {
const auto newSize = (numRegions_ + 1) * kRegionSize;
const auto rc = ::ftruncate(fd_, newSize);
if (rc >= 0) {
try {
writeFile_->truncate(newSize);
fileSize_ = newSize;
writableRegions_.push_back(numRegions_);
regionSizes_[numRegions_] = 0;
erasedRegionSizes_[numRegions_] = 0;
++numRegions_;
VELOX_SSD_CACHE_LOG(INFO)
<< "Grow cache file " << fileName_ << " to " << numRegions_
<< " regions (max: " << maxRegions_ << ")";
return true;
} catch (const std::exception& e) {
++stats_.growFileErrors;
VELOX_SSD_CACHE_LOG(ERROR)
<< "Failed to grow cache file " << fileName_ << " to " << numRegions_
<< " regions. Error: " << e.what();
}

++stats_.growFileErrors;
VELOX_SSD_CACHE_LOG(ERROR)
<< "Failed to grow cache file " << fileName_ << " to " << newSize;
}

const auto candidates =
Expand All @@ -349,7 +364,7 @@ bool SsdFile::growOrEvictLocked() {
logEviction(candidates);
clearRegionEntriesLocked(candidates);
stats_.regionsEvicted += candidates.size();
writableRegions_ = std::move(candidates);
writableRegions_ = candidates;
suspended_ = false;
return true;
}
Expand Down Expand Up @@ -467,20 +482,21 @@ void SsdFile::write(std::vector<CachePin>& pins) {
}

bool SsdFile::write(
uint64_t offset,
uint64_t length,
int64_t offset,
int64_t length,
const std::vector<iovec>& iovecs) {
const auto ret = folly::pwritev(fd_, iovecs.data(), iovecs.size(), offset);
if (ret == length) {
try {
writeFile_->write(iovecs, offset, length);
return true;
} catch (const std::exception& e) {
VELOX_SSD_CACHE_LOG(ERROR)
<< "Failed to write to SSD, file name: " << fileName_
<< ", size: " << iovecs.size() << ", offset: " << offset
<< ", error code: " << errno
<< ", error string: " << folly::errnoStr(errno);
++stats_.writeSsdErrors;
return false;
}
VELOX_SSD_CACHE_LOG(ERROR)
<< "Failed to write to SSD, file name: " << fileName_ << ", fd: " << fd_
<< ", size: " << iovecs.size() << ", offset: " << offset
<< ", error code: " << errno
<< ", error string: " << folly::errnoStr(errno);
++stats_.writeSsdErrors;
return false;
}

namespace {
Expand All @@ -497,8 +513,9 @@ int32_t indexOfFirstMismatch(char* x, char* y, int n) {
void SsdFile::verifyWrite(AsyncDataCacheEntry& entry, SsdRun ssdRun) {
process::TraceContext trace("SsdFile::verifyWrite");
auto testData = std::make_unique<char[]>(entry.size());
const auto rc = ::pread(fd_, testData.get(), entry.size(), ssdRun.offset());
VELOX_CHECK_EQ(rc, entry.size());
const auto rc =
readFile_->pread(ssdRun.offset(), entry.size(), testData.get());
VELOX_CHECK_EQ(rc.size(), entry.size());
if (entry.tinyData() != nullptr) {
if (::memcmp(testData.get(), entry.tinyData(), entry.size()) != 0) {
VELOX_FAIL("bad read back");
Expand Down Expand Up @@ -569,14 +586,16 @@ void SsdFile::clear() {

void SsdFile::testingDeleteFile() {
process::TraceContext trace("SsdFile::testingDeleteFile");
if (fd_) {
close(fd_);
fd_ = 0;
if (writeFile_) {
writeFile_->close();
writeFile_.reset();
}
auto rc = unlink(fileName_.c_str());
if (rc < 0) {
VELOX_SSD_CACHE_LOG(ERROR)
<< "Error deleting cache file " << fileName_ << " rc: " << rc;
try {
fs_->remove(fileName_);
} catch (const std::exception& e) {
VELOX_SSD_CACHE_LOG(ERROR) << "Failed to delete cache file " << fileName_
<< ", error code: " << errno
<< ", error string: " << folly::errnoStr(errno);
}
}

Expand Down Expand Up @@ -744,11 +763,10 @@ void SsdFile::checkpoint(bool force) {
// We schedule the potentially long fsync of the cache file on another
// thread of the cache write executor, if available. If there is none, we do
// the sync on this thread at the end.
auto fileSync = std::make_shared<AsyncSource<int>>(
[fd = fd_]() { return std::make_unique<int>(::fsync(fd)); });
if (executor_ != nullptr) {
executor_->add([fileSync]() { fileSync->prepare(); });
}
auto fileSync = std::make_shared<AsyncSource<int>>([this]() {
writeFile_->flush();
return std::make_unique<int>(0);
});

std::ofstream state;
const auto checkpointPath = getCheckpointFilePath();
Expand Down Expand Up @@ -803,8 +821,7 @@ void SsdFile::checkpoint(bool force) {

// NOTE: we need to ensure cache file data sync update completes before
// updating checkpoint file.
const auto fileSyncRc = fileSync->move();
checkRc(*fileSyncRc, "Sync of cache data file");
fileSync->move();

const auto endMarker = kCheckpointEndMarker;
state.write(asChar(&endMarker), sizeof(endMarker));
Expand Down Expand Up @@ -943,18 +960,20 @@ void SsdFile::maybeVerifyChecksum(
}
}

bool SsdFile::testingIsCowDisabled() const {
void SsdFile::disableFileCow() {
#ifdef linux
int attr{0};
const auto res = ioctl(fd_, FS_IOC_GETFLAGS, &attr);
VELOX_CHECK_EQ(
0,
res,
"ioctl(FS_IOC_GETFLAGS) failed: {}, {}",
res,
folly::errnoStr(errno));
const std::unordered_map<std::string, std::string> attributes = {
{std::string(LocalWriteFile::Attributes::kNoCow), "true"}};
writeFile_->setAttributes(attributes);
#endif // linux
}

return (attr & FS_NOCOW_FL) == FS_NOCOW_FL;
bool SsdFile::testingIsCowDisabled() const {
#ifdef linux
const auto attributes = writeFile_->getAttributes();
const auto it =
attributes.find(std::string(LocalWriteFile::Attributes::kNoCow));
return it != attributes.end() && it->second == "true";
#else
return false;
#endif // linux
Expand Down
17 changes: 12 additions & 5 deletions velox/common/caching/SsdFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "velox/common/caching/AsyncDataCache.h"
#include "velox/common/caching/SsdFileTracker.h"
#include "velox/common/file/File.h"
#include "velox/common/file/FileSystems.h"

DECLARE_bool(ssd_odirect);
DECLARE_bool(ssd_verify_write);
Expand Down Expand Up @@ -472,8 +473,7 @@ class SsdFile {

// Writes 'iovecs' to the SSD file at the 'offset'. Returns true if the write
// succeeds; otherwise, log the error and return false.
bool
write(uint64_t offset, uint64_t length, const std::vector<iovec>& iovecs);
bool write(int64_t offset, int64_t length, const std::vector<iovec>& iovecs);

// Synchronously logs that 'regions' are no longer valid in a possibly
// existing checkpoint.
Expand All @@ -499,6 +499,10 @@ class SsdFile {
const AsyncDataCacheEntry& entry,
const SsdRun& ssdRun);

// Disable 'copy on write'. Will throw if failed for any reason, including
// file system not supporting cow feature.
void disableFileCow();

// Returns true if checksum write is enabled for the given version.
static bool isChecksumEnabledOnCheckpointVersion(
const std::string& checkpointVersion) {
Expand Down Expand Up @@ -556,15 +560,18 @@ class SsdFile {
// Map of file number and offset to location in file.
folly::F14FastMap<FileCacheKey, SsdRun> entries_;

// File descriptor. 0 (stdin) means file not open.
int32_t fd_{0};
// File system.
std::shared_ptr<filesystems::FileSystem> fs_;

// Size of the backing file in bytes. Must be multiple of kRegionSize.
uint64_t fileSize_{0};

// ReadFile made from 'fd_'.
// ReadFile for cache data file.
std::unique_ptr<ReadFile> readFile_;

// WriteFile for cache data file.
std::unique_ptr<WriteFile> writeFile_;

// Counters.
SsdCacheStats stats_;

Expand Down
2 changes: 2 additions & 0 deletions velox/common/caching/tests/SsdFileTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "velox/common/base/tests/GTestUtils.h"
#include "velox/common/caching/FileIds.h"
#include "velox/common/caching/SsdCache.h"
#include "velox/common/file/FileSystems.h"
#include "velox/common/memory/Memory.h"
#include "velox/exec/tests/utils/TempDirectoryPath.h"

Expand Down Expand Up @@ -45,6 +46,7 @@ class SsdFileTest : public testing::Test {
static constexpr int64_t kMB = 1 << 20;

void SetUp() override {
filesystems::registerLocalFileSystem();
memory::MemoryManager::testingSetInstance({});
}

Expand Down

0 comments on commit 21b8c81

Please sign in to comment.