diff --git a/velox/common/caching/SsdFile.cpp b/velox/common/caching/SsdFile.cpp index 7f500973917a1..68859306e3383 100644 --- a/velox/common/caching/SsdFile.cpp +++ b/velox/common/caching/SsdFile.cpp @@ -16,7 +16,6 @@ #include "velox/common/caching/SsdFile.h" -#include #include #include "velox/common/base/AsyncSource.h" #include "velox/common/base/Crc.h" @@ -67,6 +66,28 @@ void disableCow(int32_t fd) { #endif // linux } +// TODO: Remove this function once we migrate all files to velox fs. +void disableFileCow(int32_t fd) { +#ifdef linux + int attr{0}; + auto res = ioctl(fd, FS_IOC_GETFLAGS, &attr); + VELOX_CHECK_EQ( + 0, + res, + "ioctl(FS_IOC_GETFLAGS) failed: {}, {}", + res, + folly::errnoStr(errno)); + attr |= FS_NOCOW_FL; + res = ioctl(fd, FS_IOC_SETFLAGS, &attr); + VELOX_CHECK_EQ( + 0, + res, + "ioctl(FS_IOC_SETFLAGS, FS_NOCOW_FL) failed: {}, {}", + res, + folly::errnoStr(errno)); +#endif // linux +} + void addEntryToIovecs(AsyncDataCacheEntry& entry, std::vector& iovecs) { if (entry.tinyData() != nullptr) { iovecs.push_back({entry.tinyData(), static_cast(entry.size())}); @@ -138,35 +159,21 @@ SsdFile::SsdFile(const Config& config) checksumReadVerificationEnabled_( config.checksumEnabled && config.checksumReadVerificationEnabled), shardId_(config.shardId), + fs_(filesystems::getFileSystem(fileName_, nullptr)), checkpointIntervalBytes_(config.checkpointIntervalBytes), executor_(config.executor) { process::TraceContext trace("SsdFile::SsdFile"); - int32_t oDirect = 0; -#ifdef linux - oDirect = FLAGS_ssd_odirect ? O_DIRECT : 0; -#endif // linux - fd_ = open(fileName_.c_str(), O_CREAT | O_RDWR | oDirect, S_IRUSR | S_IWUSR); - if (fd_ < 0) { - ++stats_.openFileErrors; - } - // TODO: add fault tolerant handling for open file errors. - VELOX_CHECK_GE( - fd_, - 0, - "Cannot open or create {}. Error: {}", - fileName_, - folly::errnoStr(errno)); + filesystems::FileOptions fileOptions; + fileOptions.shouldThrowOnFileAlreadyExists = false; + fileOptions.bufferWrite = !FLAGS_ssd_odirect; + writeFile_ = fs_->openFileForWrite(fileName_, fileOptions); + readFile_ = fs_->openFileForRead(fileName_); - if (disableFileCow_) { - disableCow(fd_); - } - - readFile_ = std::make_unique(fd_); - const uint64_t size = lseek(fd_, 0, SEEK_END); + const uint64_t size = writeFile_->size(); numRegions_ = std::min(size / kRegionSize, maxRegions_); fileSize_ = numRegions_ * kRegionSize; if ((size % kRegionSize > 0) || (size > numRegions_ * kRegionSize)) { - ::ftruncate(fd_, fileSize_); + writeFile_->truncate(fileSize_); } // The existing regions in the file are writable. writableRegions_.resize(numRegions_); @@ -178,6 +185,10 @@ SsdFile::SsdFile(const Config& config) if (checkpointEnabled()) { initializeCheckpoint(); } + + if (disableFileCow_) { + disableFileCow(); + } } void SsdFile::pinRegion(uint64_t offset) { @@ -324,19 +335,23 @@ bool SsdFile::growOrEvictLocked() { process::TraceContext trace("SsdFile::growOrEvictLocked"); if (numRegions_ < maxRegions_) { const auto newSize = (numRegions_ + 1) * kRegionSize; - const auto rc = ::ftruncate(fd_, newSize); - if (rc >= 0) { + try { + writeFile_->truncate(newSize); fileSize_ = newSize; writableRegions_.push_back(numRegions_); regionSizes_[numRegions_] = 0; erasedRegionSizes_[numRegions_] = 0; ++numRegions_; + VELOX_SSD_CACHE_LOG(INFO) + << "Grow cache file " << fileName_ << " to " << numRegions_ + << " regions (max: " << maxRegions_ << ")"; return true; + } catch (const std::exception& e) { + ++stats_.growFileErrors; + VELOX_SSD_CACHE_LOG(ERROR) + << "Failed to grow cache file " << fileName_ << " to " << numRegions_ + << " regions. Error: " << e.what(); } - - ++stats_.growFileErrors; - VELOX_SSD_CACHE_LOG(ERROR) - << "Failed to grow cache file " << fileName_ << " to " << newSize; } const auto candidates = @@ -349,7 +364,7 @@ bool SsdFile::growOrEvictLocked() { logEviction(candidates); clearRegionEntriesLocked(candidates); stats_.regionsEvicted += candidates.size(); - writableRegions_ = std::move(candidates); + writableRegions_ = candidates; suspended_ = false; return true; } @@ -467,20 +482,21 @@ void SsdFile::write(std::vector& pins) { } bool SsdFile::write( - uint64_t offset, - uint64_t length, + int64_t offset, + int64_t length, const std::vector& iovecs) { - const auto ret = folly::pwritev(fd_, iovecs.data(), iovecs.size(), offset); - if (ret == length) { + try { + writeFile_->write(iovecs, offset, length); return true; + } catch (const std::exception& e) { + VELOX_SSD_CACHE_LOG(ERROR) + << "Failed to write to SSD, file name: " << fileName_ + << ", size: " << iovecs.size() << ", offset: " << offset + << ", error code: " << errno + << ", error string: " << folly::errnoStr(errno); + ++stats_.writeSsdErrors; + return false; } - VELOX_SSD_CACHE_LOG(ERROR) - << "Failed to write to SSD, file name: " << fileName_ << ", fd: " << fd_ - << ", size: " << iovecs.size() << ", offset: " << offset - << ", error code: " << errno - << ", error string: " << folly::errnoStr(errno); - ++stats_.writeSsdErrors; - return false; } namespace { @@ -497,8 +513,9 @@ int32_t indexOfFirstMismatch(char* x, char* y, int n) { void SsdFile::verifyWrite(AsyncDataCacheEntry& entry, SsdRun ssdRun) { process::TraceContext trace("SsdFile::verifyWrite"); auto testData = std::make_unique(entry.size()); - const auto rc = ::pread(fd_, testData.get(), entry.size(), ssdRun.offset()); - VELOX_CHECK_EQ(rc, entry.size()); + const auto rc = + readFile_->pread(ssdRun.offset(), entry.size(), testData.get()); + VELOX_CHECK_EQ(rc.size(), entry.size()); if (entry.tinyData() != nullptr) { if (::memcmp(testData.get(), entry.tinyData(), entry.size()) != 0) { VELOX_FAIL("bad read back"); @@ -569,14 +586,16 @@ void SsdFile::clear() { void SsdFile::testingDeleteFile() { process::TraceContext trace("SsdFile::testingDeleteFile"); - if (fd_) { - close(fd_); - fd_ = 0; + if (writeFile_) { + writeFile_->close(); + writeFile_.reset(); } - auto rc = unlink(fileName_.c_str()); - if (rc < 0) { - VELOX_SSD_CACHE_LOG(ERROR) - << "Error deleting cache file " << fileName_ << " rc: " << rc; + try { + fs_->remove(fileName_); + } catch (const std::exception& e) { + VELOX_SSD_CACHE_LOG(ERROR) << "Failed to delete cache file " << fileName_ + << ", error code: " << errno + << ", error string: " << folly::errnoStr(errno); } } @@ -744,11 +763,10 @@ void SsdFile::checkpoint(bool force) { // We schedule the potentially long fsync of the cache file on another // thread of the cache write executor, if available. If there is none, we do // the sync on this thread at the end. - auto fileSync = std::make_shared>( - [fd = fd_]() { return std::make_unique(::fsync(fd)); }); - if (executor_ != nullptr) { - executor_->add([fileSync]() { fileSync->prepare(); }); - } + auto fileSync = std::make_shared>([this]() { + writeFile_->flush(); + return std::make_unique(0); + }); std::ofstream state; const auto checkpointPath = getCheckpointFilePath(); @@ -803,8 +821,7 @@ void SsdFile::checkpoint(bool force) { // NOTE: we need to ensure cache file data sync update completes before // updating checkpoint file. - const auto fileSyncRc = fileSync->move(); - checkRc(*fileSyncRc, "Sync of cache data file"); + fileSync->move(); const auto endMarker = kCheckpointEndMarker; state.write(asChar(&endMarker), sizeof(endMarker)); @@ -943,18 +960,20 @@ void SsdFile::maybeVerifyChecksum( } } -bool SsdFile::testingIsCowDisabled() const { +void SsdFile::disableFileCow() { #ifdef linux - int attr{0}; - const auto res = ioctl(fd_, FS_IOC_GETFLAGS, &attr); - VELOX_CHECK_EQ( - 0, - res, - "ioctl(FS_IOC_GETFLAGS) failed: {}, {}", - res, - folly::errnoStr(errno)); + const std::unordered_map attributes = { + {std::string(LocalWriteFile::Attributes::kNoCow), "true"}}; + writeFile_->setAttributes(attributes); +#endif // linux +} - return (attr & FS_NOCOW_FL) == FS_NOCOW_FL; +bool SsdFile::testingIsCowDisabled() const { +#ifdef linux + const auto attributes = writeFile_->getAttributes(); + const auto it = + attributes.find(std::string(LocalWriteFile::Attributes::kNoCow)); + return it != attributes.end() && it->second == "true"; #else return false; #endif // linux diff --git a/velox/common/caching/SsdFile.h b/velox/common/caching/SsdFile.h index 434d41c342b9e..31300a274a781 100644 --- a/velox/common/caching/SsdFile.h +++ b/velox/common/caching/SsdFile.h @@ -22,6 +22,7 @@ #include "velox/common/caching/AsyncDataCache.h" #include "velox/common/caching/SsdFileTracker.h" #include "velox/common/file/File.h" +#include "velox/common/file/FileSystems.h" DECLARE_bool(ssd_odirect); DECLARE_bool(ssd_verify_write); @@ -472,8 +473,7 @@ class SsdFile { // Writes 'iovecs' to the SSD file at the 'offset'. Returns true if the write // succeeds; otherwise, log the error and return false. - bool - write(uint64_t offset, uint64_t length, const std::vector& iovecs); + bool write(int64_t offset, int64_t length, const std::vector& iovecs); // Synchronously logs that 'regions' are no longer valid in a possibly // existing checkpoint. @@ -499,6 +499,10 @@ class SsdFile { const AsyncDataCacheEntry& entry, const SsdRun& ssdRun); + // Disable 'copy on write'. Will throw if failed for any reason, including + // file system not supporting cow feature. + void disableFileCow(); + // Returns true if checksum write is enabled for the given version. static bool isChecksumEnabledOnCheckpointVersion( const std::string& checkpointVersion) { @@ -556,15 +560,18 @@ class SsdFile { // Map of file number and offset to location in file. folly::F14FastMap entries_; - // File descriptor. 0 (stdin) means file not open. - int32_t fd_{0}; + // File system. + std::shared_ptr fs_; // Size of the backing file in bytes. Must be multiple of kRegionSize. uint64_t fileSize_{0}; - // ReadFile made from 'fd_'. + // ReadFile for cache data file. std::unique_ptr readFile_; + // WriteFile for cache data file. + std::unique_ptr writeFile_; + // Counters. SsdCacheStats stats_; diff --git a/velox/common/caching/tests/SsdFileTest.cpp b/velox/common/caching/tests/SsdFileTest.cpp index fd2835596ad34..0b0bef1ca8425 100644 --- a/velox/common/caching/tests/SsdFileTest.cpp +++ b/velox/common/caching/tests/SsdFileTest.cpp @@ -17,6 +17,7 @@ #include "velox/common/base/tests/GTestUtils.h" #include "velox/common/caching/FileIds.h" #include "velox/common/caching/SsdCache.h" +#include "velox/common/file/FileSystems.h" #include "velox/common/memory/Memory.h" #include "velox/exec/tests/utils/TempDirectoryPath.h" @@ -45,6 +46,7 @@ class SsdFileTest : public testing::Test { static constexpr int64_t kMB = 1 << 20; void SetUp() override { + filesystems::registerLocalFileSystem(); memory::MemoryManager::testingSetInstance({}); }