From ac134400b5356c5ba3f19facee37884aa020afdc Mon Sep 17 00:00:00 2001 From: Zac Wen Date: Thu, 12 Dec 2024 11:21:43 -0800 Subject: [PATCH] feat: Use Velox fs for ssd cache checkpoint file (#11783) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/11783 Switch the ssd cache checkpoint file to use Velox filesystem for file r/w operations, so that more advanced testing can be built by leveraging features like fault injections. Reviewed By: xiaoxmeng Differential Revision: D66892136 fbshipit-source-id: d4da2df1f5da976b0a71c4a1a087a2c0ba569328 --- velox/common/caching/SsdFile.cpp | 295 ++++++++++-------- velox/common/caching/SsdFile.h | 63 +++- .../caching/tests/AsyncDataCacheTest.cpp | 14 + velox/common/caching/tests/SsdFileTest.cpp | 10 +- velox/common/file/File.cpp | 2 + velox/common/memory/Memory.cpp | 1 + velox/common/memory/Memory.h | 6 + .../common/memory/tests/MemoryManagerTest.cpp | 60 ++-- 8 files changed, 287 insertions(+), 164 deletions(-) diff --git a/velox/common/caching/SsdFile.cpp b/velox/common/caching/SsdFile.cpp index afafe78ebab5..a876a091e3d5 100644 --- a/velox/common/caching/SsdFile.cpp +++ b/velox/common/caching/SsdFile.cpp @@ -22,6 +22,7 @@ #include "velox/common/base/SuccinctPrinter.h" #include "velox/common/caching/FileIds.h" #include "velox/common/caching/SsdCache.h" +#include "velox/common/memory/Memory.h" #include "velox/common/process/TraceContext.h" #include @@ -44,31 +45,6 @@ namespace facebook::velox::cache { namespace { -// TODO: Remove this function once we migrate all files to velox fs. -// -// Disable 'copy on write' on the given file. Will throw if failed for any -// reason, including file system not supporting cow feature. -void disableCow(int32_t fd) { -#ifdef linux - int attr{0}; - auto res = ioctl(fd, FS_IOC_GETFLAGS, &attr); - VELOX_CHECK_EQ( - 0, - res, - "ioctl(FS_IOC_GETFLAGS) failed: {}, {}", - res, - folly::errnoStr(errno)); - attr |= FS_NOCOW_FL; - res = ioctl(fd, FS_IOC_SETFLAGS, &attr); - VELOX_CHECK_EQ( - 0, - res, - "ioctl(FS_IOC_SETFLAGS, FS_NOCOW_FL) failed: {}, {}", - res, - folly::errnoStr(errno)); -#endif // linux -} - void addEntryToIovecs(AsyncDataCacheEntry& entry, std::vector& iovecs) { if (entry.tinyData() != nullptr) { iovecs.push_back({entry.tinyData(), static_cast(entry.size())}); @@ -664,12 +640,9 @@ void SsdFile::deleteCheckpoint(bool keepLog) { if (evictLogWriteFile_ != nullptr) { try { if (keepLog) { - evictLogWriteFile_->truncate(0); - evictLogWriteFile_->flush(); + truncateEvictLogFile(); } else { - evictLogWriteFile_->close(); - fs_->remove(getEvictLogFilePath()); - evictLogWriteFile_.reset(); + deleteEvictLogFile(); } } catch (const std::exception& e) { ++stats_.deleteCheckpointErrors; @@ -677,14 +650,45 @@ void SsdFile::deleteCheckpoint(bool keepLog) { } } - const auto checkpointPath = getCheckpointFilePath(); - const auto checkpointRc = ::unlink(checkpointPath.c_str()); - if (checkpointRc != 0) { - VELOX_SSD_CACHE_LOG(ERROR) - << "Error in deleting checkpoint: " << checkpointRc; + if (checkpointWriteFile_ != nullptr) { + deleteCheckpointFile(); } - if (checkpointRc != 0) { +} + +void SsdFile::truncateEvictLogFile() { + VELOX_CHECK_NOT_NULL(evictLogWriteFile_); + evictLogWriteFile_->truncate(0); + evictLogWriteFile_->flush(); +} + +void SsdFile::truncateCheckpointFile() { + VELOX_CHECK_NOT_NULL(checkpointWriteFile_); + checkpointWriteFile_->truncate(0); + checkpointWriteFile_->flush(); +} + +void SsdFile::deleteEvictLogFile() { + VELOX_CHECK_NOT_NULL(evictLogWriteFile_); + evictLogWriteFile_->close(); + evictLogWriteFile_.reset(); + const auto evictLogFilePath = getEvictLogFilePath(); + if (fs_->exists(evictLogFilePath)) { + fs_->remove(evictLogFilePath); + } +} + +void SsdFile::deleteCheckpointFile() { + VELOX_CHECK_NOT_NULL(checkpointWriteFile_); + try { + checkpointWriteFile_->close(); + checkpointWriteFile_.reset(); + const auto checkpointFilePath = getCheckpointFilePath(); + if (fs_->exists(checkpointFilePath)) { + fs_->remove(checkpointFilePath); + } + } catch (const std::exception& e) { ++stats_.deleteCheckpointErrors; + VELOX_SSD_CACHE_LOG(ERROR) << "Error in deleting checkpoint: " << e.what(); } } @@ -696,17 +700,53 @@ void SsdFile::checkpointError(int32_t rc, const std::string& error) { checkpointIntervalBytes_ = 0; } -namespace { -template -inline char* asChar(T ptr) { - return reinterpret_cast(ptr); +void SsdFile::allocateCheckpointBuffer() { + VELOX_CHECK_NULL(checkpointBuffer_); + checkpointBuffer_ = memory::memoryManager()->allocator()->allocateBytes( + kCheckpointBufferSize); + checkpointBufferedDataSize_ = 0; } -template -inline const char* asChar(const T* ptr) { - return reinterpret_cast(ptr); +void SsdFile::freeCheckpointBuffer() { + VELOX_CHECK_NOT_NULL(checkpointBuffer_); + memory::memoryManager()->allocator()->freeBytes( + checkpointBuffer_, kCheckpointBufferSize); + checkpointBuffer_ = nullptr; + checkpointBufferedDataSize_ = 0; +} + +void SsdFile::appendToCheckpointBuffer(const void* source, int32_t size) { + VELOX_CHECK_NOT_NULL(checkpointBuffer_); + maybeFlushCheckpointBuffer(size); + ::memcpy( + static_cast( + static_cast(checkpointBuffer_) + checkpointBufferedDataSize_), + source, + size); + checkpointBufferedDataSize_ += size; +} + +void SsdFile::appendToCheckpointBuffer(const std::string& string) { + appendToCheckpointBuffer(string.data(), string.length()); +} + +void SsdFile::maybeFlushCheckpointBuffer(uint32_t appendBytes, bool force) { + if (checkpointBufferedDataSize_ > 0 && + (force || + checkpointBufferedDataSize_ + appendBytes >= kCheckpointBufferSize)) { + VELOX_CHECK_NOT_NULL(checkpointBuffer_); + checkpointWriteFile_->append(std::string_view( + static_cast(checkpointBuffer_), + checkpointBufferedDataSize_)); + checkpointBufferedDataSize_ = 0; + } +} + +void SsdFile::flushCheckpointFile() { + VELOX_CHECK_NOT_NULL(checkpointWriteFile_); + maybeFlushCheckpointBuffer(0, true); + checkpointWriteFile_->flush(); } -} // namespace void SsdFile::checkpoint(bool force) { process::TraceContext trace("SsdFile::checkpoint"); @@ -724,13 +764,6 @@ void SsdFile::checkpoint(bool force) { checkpointDeleted_ = false; bytesAfterCheckpoint_ = 0; try { - const auto checkRc = [&](int32_t rc, const std::string& errMsg) { - if (rc < 0) { - VELOX_FAIL("{} with rc {} :{}", errMsg, rc, folly::errnoStr(errno)); - } - return rc; - }; - // We schedule the potentially long fsync of the cache file on another // thread of the cache write executor, if available. If there is none, we do // the sync on this thread at the end. @@ -739,11 +772,11 @@ void SsdFile::checkpoint(bool force) { return std::make_unique(0); }); - std::ofstream state; - const auto checkpointPath = getCheckpointFilePath(); + executor_->add([source = fileSync]() { source->prepare(); }); + try { - state.exceptions(std::ofstream::failbit); - state.open(checkpointPath, std::ios_base::out | std::ios_base::trunc); + VELOX_CHECK_NOT_NULL(checkpointWriteFile_); + truncateCheckpointFile(); // The checkpoint state file contains: // int32_t The 4 bytes of checkpoint version, // int32_t maxRegions, @@ -753,71 +786,55 @@ void SsdFile::checkpoint(bool force) { // kMapMarker, // {fileId, offset, SSdRun} triples, // kEndMarker. - state.write(checkpointVersion().data(), sizeof(int32_t)); - state.write(asChar(&maxRegions_), sizeof(maxRegions_)); - state.write(asChar(&numRegions_), sizeof(numRegions_)); + allocateCheckpointBuffer(); + SCOPE_EXIT { + freeCheckpointBuffer(); + }; + const auto version = checkpointVersion(); + appendToCheckpointBuffer(checkpointVersion()); + appendToCheckpointBuffer(maxRegions_); + appendToCheckpointBuffer(numRegions_); // Copy the region scores before writing out for tsan. - const auto scoresCopy = tracker_.copyScores(); - state.write(asChar(scoresCopy.data()), maxRegions_ * sizeof(uint64_t)); + appendToCheckpointBuffer(tracker_.copyScores()); std::unordered_set fileNums; for (const auto& entry : entries_) { const auto fileNum = entry.first.fileNum.id(); if (fileNums.insert(fileNum).second) { - state.write(asChar(&fileNum), sizeof(fileNum)); + appendToCheckpointBuffer(fileNum); const auto name = fileIds().string(fileNum); const int32_t length = name.size(); - state.write(asChar(&length), sizeof(length)); - state.write(name.data(), length); + appendToCheckpointBuffer(length); + appendToCheckpointBuffer(name); } } - const auto mapMarker = kCheckpointMapMarker; - state.write(asChar(&mapMarker), sizeof(mapMarker)); + appendToCheckpointBuffer(kCheckpointMapMarker); for (auto& pair : entries_) { const auto id = pair.first.fileNum.id(); - state.write(asChar(&id), sizeof(id)); - state.write(asChar(&pair.first.offset), sizeof(pair.first.offset)); + appendToCheckpointBuffer(id); + appendToCheckpointBuffer(pair.first.offset); const auto offsetAndSize = pair.second.fileBits(); - state.write(asChar(&offsetAndSize), sizeof(offsetAndSize)); + appendToCheckpointBuffer(offsetAndSize); if (checksumEnabled_) { const auto checksum = pair.second.checksum(); - state.write(asChar(&checksum), sizeof(checksum)); + appendToCheckpointBuffer(checksum); } } - } catch (const std::exception& e) { - fileSync->close(); - std::rethrow_exception(std::current_exception()); - } - // NOTE: we need to ensure cache file data sync update completes before - // updating checkpoint file. - fileSync->move(); + // NOTE: we need to ensure cache file data sync update completes before + // updating checkpoint file. + fileSync->move(); - const auto endMarker = kCheckpointEndMarker; - state.write(asChar(&endMarker), sizeof(endMarker)); - - if (state.bad()) { - ++stats_.writeCheckpointErrors; - checkRc(-1, "Write of checkpoint file"); - } else { + appendToCheckpointBuffer(kCheckpointEndMarker); + flushCheckpointFile(); ++stats_.checkpointsWritten; + } catch (const std::exception& e) { + ++stats_.writeCheckpointErrors; + VELOX_SSD_CACHE_LOG(ERROR) << "Error in writing cehckpoint: " << e.what(); + fileSync->close(); + std::rethrow_exception(std::current_exception()); } - state.close(); - - // Sync checkpoint data file. ofstream does not have a sync method, so open - // as fd and sync that. - const auto checkpointFd = checkRc( - ::open(checkpointPath.c_str(), O_WRONLY), - "Open of checkpoint file for sync"); - // TODO: add this as file open option after we migrate to use velox - // filesystem for ssd file access. - if (disableFileCow_) { - disableCow(checkpointFd); - } - VELOX_CHECK_GE(checkpointFd, 0); - checkRc(::fsync(checkpointFd), "Sync of checkpoint file"); - ::close(checkpointFd); // NOTE: we shall truncate eviction log after checkpoint file sync // completes so that we never recover from an old checkpoint file without @@ -843,22 +860,35 @@ void SsdFile::initializeCheckpoint() { } bool hasCheckpoint = true; - std::ifstream state(getCheckpointFilePath()); - if (!state.is_open()) { + std::unique_ptr checkpointInputStream; + filesystems::FileOptions writeFileOptions; + writeFileOptions.shouldThrowOnFileAlreadyExists = false; + + const auto checkpointPath = getCheckpointFilePath(); + try { + checkpointWriteFile_ = + fs_->openFileForWrite(checkpointPath, writeFileOptions); + + auto checkpointReadFile = fs_->openFileForRead(checkpointPath); + checkpointInputStream = std::make_unique( + std::move(checkpointReadFile), + 1 << 20, + memory::memoryManager()->cachePool()); + } catch (std::exception& e) { hasCheckpoint = false; ++stats_.openCheckpointErrors; VELOX_SSD_CACHE_LOG(WARNING) << fmt::format( - "Starting shard {} without checkpoint, with checksum write {}, read verification {}, checkpoint file {}", + "Error openning checkpoint file {}: Starting shard {} without checkpoint, with checksum write {}, read verification {}, checkpoint file {}", + e.what(), shardId_, checksumEnabled_ ? "enabled" : "disabled", checksumReadVerificationEnabled_ ? "enabled" : "disabled", - getCheckpointFilePath()); + checkpointPath); } + const auto logPath = getEvictLogFilePath(); - filesystems::FileOptions evictLogFileOptions; - evictLogFileOptions.shouldThrowOnFileAlreadyExists = false; try { - evictLogWriteFile_ = fs_->openFileForWrite(logPath, evictLogFileOptions); + evictLogWriteFile_ = fs_->openFileForWrite(logPath, writeFileOptions); } catch (std::exception& e) { ++stats_.openLogErrors; // Failure to open the log at startup is a process terminating error. @@ -867,8 +897,7 @@ void SsdFile::initializeCheckpoint() { try { if (hasCheckpoint) { - state.exceptions(std::ifstream::failbit); - readCheckpoint(state); + readCheckpoint(std::move(checkpointInputStream)); } } catch (const std::exception& e) { ++stats_.readCheckpointErrors; @@ -936,23 +965,40 @@ void SsdFile::disableFileCow() { if (evictLogWriteFile_ != nullptr) { evictLogWriteFile_->setAttributes(attributes); } + if (checkpointWriteFile_ != nullptr) { + checkpointWriteFile_->setAttributes(attributes); + } #endif // linux } namespace { template -T readNumber(std::ifstream& stream) { +T readNumber(common::FileInputStream* stream) { T data; - stream.read(asChar(&data), sizeof(T)); + stream->readBytes(reinterpret_cast(&data), sizeof(T)); + return data; +} + +std::string readString(common::FileInputStream* stream, int32_t length) { + std::string data(length, '\0'); + stream->readBytes( + reinterpret_cast(const_cast(data.data())), length); return data; } + +template +std::vector readVector(common::FileInputStream* stream, int32_t size) { + std::vector dataVector(size); + stream->readBytes( + reinterpret_cast(dataVector.data()), size * sizeof(T)); + return dataVector; +} } // namespace -void SsdFile::readCheckpoint(std::ifstream& state) { - char versionMagic[4]; - state.read(versionMagic, sizeof(versionMagic)); +void SsdFile::readCheckpoint(std::unique_ptr stream) { + const auto versionMagic = readString(stream.get(), 4); const auto checkpoinHasChecksum = - isChecksumEnabledOnCheckpointVersion(std::string(versionMagic, 4)); + isChecksumEnabledOnCheckpointVersion(versionMagic); if (checksumEnabled_ && !checkpoinHasChecksum) { VELOX_SSD_CACHE_LOG(WARNING) << fmt::format( "Starting shard {} without checkpoint: checksum is enabled but the checkpoint was made without checksum, so skip the checkpoint recovery, checkpoint file {}", @@ -961,23 +1007,22 @@ void SsdFile::readCheckpoint(std::ifstream& state) { return; } - const auto maxRegions = readNumber(state); + const auto maxRegions = readNumber(stream.get()); VELOX_CHECK_EQ( maxRegions, maxRegions_, "Trying to start from checkpoint with a different capacity"); - numRegions_ = readNumber(state); - std::vector scores(maxRegions); - state.read(asChar(scores.data()), maxRegions_ * sizeof(double)); + numRegions_ = readNumber(stream.get()); + + const auto scores = readVector(stream.get(), maxRegions_); std::unordered_map idMap; for (;;) { - const auto id = readNumber(state); + const auto id = readNumber(stream.get()); if (id == kCheckpointMapMarker) { break; } - std::string name; - name.resize(readNumber(state)); - state.read(name.data(), name.size()); + const auto length = readNumber(stream.get()); + const auto name = readString(stream.get(), length); idMap[id] = StringIdLease(fileIds(), id, name); } @@ -998,15 +1043,15 @@ void SsdFile::readCheckpoint(std::ifstream& state) { std::vector regionCacheSizes(numRegions_, 0); for (;;) { - const auto fileNum = readNumber(state); + const auto fileNum = readNumber(stream.get()); if (fileNum == kCheckpointEndMarker) { break; } - const auto offset = readNumber(state); - const auto fileBits = readNumber(state); + const auto offset = readNumber(stream.get()); + const auto fileBits = readNumber(stream.get()); uint32_t checksum = 0; if (checkpoinHasChecksum) { - checksum = readNumber(state); + checksum = readNumber(stream.get()); } const auto run = SsdRun(fileBits, checksum); const auto region = regionIndex(run.offset()); diff --git a/velox/common/caching/SsdFile.h b/velox/common/caching/SsdFile.h index a08326af30ce..adc54fa437a7 100644 --- a/velox/common/caching/SsdFile.h +++ b/velox/common/caching/SsdFile.h @@ -22,6 +22,7 @@ #include "velox/common/caching/AsyncDataCache.h" #include "velox/common/caching/SsdFileTracker.h" #include "velox/common/file/File.h" +#include "velox/common/file/FileInputStream.h" #include "velox/common/file/FileSystems.h" DECLARE_bool(ssd_odirect); @@ -373,13 +374,7 @@ class SsdFile { /// Returns the checkpoint file path. std::string getCheckpointFilePath() const { - // Faulty file path needs to be handled manually before we switch checkpoint - // file to Velox filesystem. - const std::string faultyPrefix = "faulty:"; - std::string checkpointPath = fileName_ + kCheckpointExtension; - return checkpointPath.find(faultyPrefix) == 0 - ? checkpointPath.substr(faultyPrefix.size()) - : checkpointPath; + return fileName_ + kCheckpointExtension; } /// Resets this' to a post-construction empty state. See SsdCache::clear(). @@ -448,7 +443,7 @@ class SsdFile { // Reads a checkpoint state file and sets 'this' accordingly if read is // successful. Return true for successful read. A failed read deletes the // checkpoint and leaves the log truncated open. - void readCheckpoint(std::ifstream& state); + void readCheckpoint(std::unique_ptr stream); // Logs an error message, deletes the checkpoint and stop making new // checkpoints. @@ -493,6 +488,48 @@ class SsdFile { // file system not supporting cow feature. void disableFileCow(); + // Truncates the eviction log file to 0. + void truncateEvictLogFile(); + + // Truncates the checkpoint file to 0. + void truncateCheckpointFile(); + + // Deletes the eviction log file if it exists. + void deleteEvictLogFile(); + + // Deletes the checkpoint file if it exists. + void deleteCheckpointFile(); + + // Allocates 'kCheckpointBufferSize' buffer from cache memory pool for + // checkpointing. + void allocateCheckpointBuffer(); + + // Frees checkpoint buffer. + void freeCheckpointBuffer(); + + // Appends 'size' bytes from source buffer to the checkpoint buffer and + // flushes the buffered data to disk if necessary. + void appendToCheckpointBuffer(const void* source, int32_t size); + + void appendToCheckpointBuffer(const std::string& string); + + template + void appendToCheckpointBuffer(const std::vector& vector) { + appendToCheckpointBuffer(vector.data(), vector.size() * sizeof(T)); + } + + template + void appendToCheckpointBuffer(const T& data) { + appendToCheckpointBuffer(&data, sizeof(data)); + } + + // Flushs the buffered data to write file if the buffered data has exceeded + // 'kCheckpointBufferSize' or 'force' is set true. + void maybeFlushCheckpointBuffer(uint32_t appendBytes, bool force = false); + + // Flushs the buffered data to disk. + void flushCheckpointFile(); + // Returns true if checksum write is enabled for the given version. static bool isChecksumEnabledOnCheckpointVersion( const std::string& checkpointVersion) { @@ -501,6 +538,7 @@ class SsdFile { static constexpr const char* kLogExtension = ".log"; static constexpr const char* kCheckpointExtension = ".cpt"; + static constexpr uint32_t kCheckpointBufferSize = 1 << 20; // 1MB // Name of cache file, used as prefix for checkpoint files. const std::string fileName_; @@ -565,6 +603,9 @@ class SsdFile { // WriteFile for evict log file. std::unique_ptr evictLogWriteFile_; + // WriteFile for checkpoint file. + std::unique_ptr checkpointWriteFile_; + // Counters. SsdCacheStats stats_; @@ -581,6 +622,12 @@ class SsdFile { // True if there was an error with checkpoint and the checkpoint was deleted. bool checkpointDeleted_{false}; + // Used for checkpoint buffer and is only set during the checkpoint write. + void* checkpointBuffer_ = nullptr; + + // Buffered data size for checkpoint. + uint32_t checkpointBufferedDataSize_; + friend class test::SsdFileTestHelper; friend class test::SsdCacheTestHelper; }; diff --git a/velox/common/caching/tests/AsyncDataCacheTest.cpp b/velox/common/caching/tests/AsyncDataCacheTest.cpp index 277c574a22f1..095d1bdf4349 100644 --- a/velox/common/caching/tests/AsyncDataCacheTest.cpp +++ b/velox/common/caching/tests/AsyncDataCacheTest.cpp @@ -111,6 +111,17 @@ class AsyncDataCacheTest : public ::testing::TestWithParam { } } + void initializeMemoryManager(int64_t capacity) { + if (!memory::MemoryManager::testInstance()) { + memory::MemoryManagerOptions options; + options.useMmapAllocator = true; + options.allocatorCapacity = capacity; + options.arbitratorCapacity = capacity; + options.trackDefaultUsage = true; + memory::MemoryManager::initialize(options); + } + } + void initializeCache( uint64_t maxBytes, int64_t ssdBytes = 0, @@ -1204,6 +1215,8 @@ TEST_P(AsyncDataCacheTest, shutdown) { constexpr uint64_t kRamBytes = 16 << 20; constexpr uint64_t kSsdBytes = 64UL << 20; + initializeMemoryManager(kRamBytes); + for (const auto asyncShutdown : {false, true}) { SCOPED_TRACE(fmt::format("asyncShutdown {}", asyncShutdown)); // Initialize cache with a big checkpointIntervalBytes, giving eviction log @@ -1535,6 +1548,7 @@ TEST_P(AsyncDataCacheTest, checkpoint) { constexpr uint64_t kRamBytes = 16UL << 20; // 16 MB constexpr uint64_t kSsdBytes = 64UL << 20; // 64 MB + initializeMemoryManager(kRamBytes); initializeCache( kRamBytes, kSsdBytes, diff --git a/velox/common/caching/tests/SsdFileTest.cpp b/velox/common/caching/tests/SsdFileTest.cpp index f84d1da8e741..e9a0b6bc7a18 100644 --- a/velox/common/caching/tests/SsdFileTest.cpp +++ b/velox/common/caching/tests/SsdFileTest.cpp @@ -23,6 +23,7 @@ #include "velox/exec/tests/utils/TempDirectoryPath.h" #include +#include #include #include #include @@ -100,7 +101,8 @@ class SsdFileTest : public testing::Test { checkpointIntervalBytes, disableFileCow, checksumEnabled, - checksumReadVerificationEnabled); + checksumReadVerificationEnabled, + ssdExecutor()); ssdFile_ = std::make_unique(config); if (ssdFile_ != nullptr) { ssdFileHelper_ = @@ -167,6 +169,12 @@ class SsdFileTest : public testing::Test { } } + static folly::IOThreadPoolExecutor* ssdExecutor() { + static std::unique_ptr ssdExecutor = + std::make_unique(20); + return ssdExecutor.get(); + } + // Gets consecutive entries from file 'fileId' starting at 'startOffset' with // sizes between 'minSize' and 'maxSize'. Sizes start at 'minSize' and double // each time and go back to 'minSize' after exceeding 'maxSize'. This stops diff --git a/velox/common/file/File.cpp b/velox/common/file/File.cpp index 6a30f0a26159..deccd247198b 100644 --- a/velox/common/file/File.cpp +++ b/velox/common/file/File.cpp @@ -383,6 +383,8 @@ void LocalWriteFile::truncate(int64_t newSize) { 0, "ftruncate failed in LocalWriteFile::truncate: {}.", folly::errnoStr(errno)); + // Reposition the file offset to the end of the file for append(). + ::lseek(fd_, newSize, SEEK_SET); size_ = newSize; } diff --git a/velox/common/memory/Memory.cpp b/velox/common/memory/Memory.cpp index 83e670b948d9..cd29de95eeba 100644 --- a/velox/common/memory/Memory.cpp +++ b/velox/common/memory/Memory.cpp @@ -114,6 +114,7 @@ MemoryManager::MemoryManager(const MemoryManagerOptions& options) .coreOnAllocationFailureEnabled = options.coreOnAllocationFailureEnabled})}, spillPool_{addLeafPool("__sys_spilling__")}, + cachePool_{addLeafPool("__sys_caching__")}, tracePool_{addLeafPool("__sys_tracing__")}, sharedLeafPools_(createSharedLeafMemoryPools(*sysRoot_)) { VELOX_CHECK_NOT_NULL(allocator_); diff --git a/velox/common/memory/Memory.h b/velox/common/memory/Memory.h index f460d25ffee6..2aa3e5d93f50 100644 --- a/velox/common/memory/Memory.h +++ b/velox/common/memory/Memory.h @@ -274,6 +274,11 @@ class MemoryManager { return spillPool_.get(); } + /// Returns the process wide leaf memory pool used for ssd cache. + MemoryPool* cachePool() { + return cachePool_.get(); + } + /// Returns the process wide leaf memory pool used for query tracing. MemoryPool* tracePool() const { return tracePool_.get(); @@ -311,6 +316,7 @@ class MemoryManager { const std::shared_ptr sysRoot_; const std::shared_ptr spillPool_; + const std::shared_ptr cachePool_; const std::shared_ptr tracePool_; const std::vector> sharedLeafPools_; diff --git a/velox/common/memory/tests/MemoryManagerTest.cpp b/velox/common/memory/tests/MemoryManagerTest.cpp index b241e0f2f71d..dbfe8475feae 100644 --- a/velox/common/memory/tests/MemoryManagerTest.cpp +++ b/velox/common/memory/tests/MemoryManagerTest.cpp @@ -53,7 +53,7 @@ TEST_F(MemoryManagerTest, ctor) { const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools; { MemoryManager manager{}; - ASSERT_EQ(manager.numPools(), 2); + ASSERT_EQ(manager.numPools(), 3); ASSERT_EQ(manager.capacity(), kMaxMemory); ASSERT_EQ(0, manager.getTotalBytes()); ASSERT_EQ(manager.alignment(), MemoryAllocator::kMaxAlignment); @@ -67,7 +67,7 @@ TEST_F(MemoryManagerTest, ctor) { MemoryManager manager{ {.allocatorCapacity = kCapacity, .arbitratorCapacity = kCapacity}}; ASSERT_EQ(kCapacity, manager.capacity()); - ASSERT_EQ(manager.numPools(), 2); + ASSERT_EQ(manager.numPools(), 3); ASSERT_EQ(manager.testingDefaultRoot().alignment(), manager.alignment()); } { @@ -81,7 +81,7 @@ TEST_F(MemoryManagerTest, ctor) { ASSERT_EQ(manager.testingDefaultRoot().alignment(), manager.alignment()); // TODO: replace with root pool memory tracker quota check. ASSERT_EQ( - kSharedPoolCount + 2, manager.testingDefaultRoot().getChildCount()); + kSharedPoolCount + 3, manager.testingDefaultRoot().getChildCount()); ASSERT_EQ(kCapacity, manager.capacity()); ASSERT_EQ(0, manager.getTotalBytes()); } @@ -98,7 +98,7 @@ TEST_F(MemoryManagerTest, ctor) { ASSERT_EQ(arbitrator->stats().maxCapacityBytes, kCapacity); ASSERT_EQ( manager.toString(), - "Memory Manager[capacity 4.00GB alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity 4.00GB allocated bytes 0 allocated pages 0 mapped pages 0]\nARBITRATOR[SHARED CAPACITY[4.00GB] numRequests 0 numRunning 0 numSucceded 0 numAborted 0 numFailures 0 numNonReclaimableAttempts 0 reclaimedFreeCapacity 0B reclaimedUsedCapacity 0B maxCapacity 4.00GB freeCapacity 4.00GB freeReservedCapacity 0B]]"); + "Memory Manager[capacity 4.00GB alignment 64B usedBytes 0B number of pools 3\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity 4.00GB allocated bytes 0 allocated pages 0 mapped pages 0]\nARBITRATOR[SHARED CAPACITY[4.00GB] numRequests 0 numRunning 0 numSucceded 0 numAborted 0 numFailures 0 numNonReclaimableAttempts 0 reclaimedFreeCapacity 0B reclaimedUsedCapacity 0B maxCapacity 4.00GB freeCapacity 4.00GB freeReservedCapacity 0B]]"); } } @@ -263,10 +263,10 @@ TEST_F(MemoryManagerTest, addPoolWithArbitrator) { TEST_F(MemoryManagerTest, defaultMemoryManager) { auto& managerA = toMemoryManager(deprecatedDefaultMemoryManager()); auto& managerB = toMemoryManager(deprecatedDefaultMemoryManager()); - const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 2; - ASSERT_EQ(managerA.numPools(), 2); + const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 3; + ASSERT_EQ(managerA.numPools(), 3); ASSERT_EQ(managerA.testingDefaultRoot().getChildCount(), kSharedPoolCount); - ASSERT_EQ(managerB.numPools(), 2); + ASSERT_EQ(managerB.numPools(), 3); ASSERT_EQ(managerB.testingDefaultRoot().getChildCount(), kSharedPoolCount); auto child1 = managerA.addLeafPool("child_1"); @@ -277,38 +277,38 @@ TEST_F(MemoryManagerTest, defaultMemoryManager) { kSharedPoolCount + 2, managerA.testingDefaultRoot().getChildCount()); EXPECT_EQ( kSharedPoolCount + 2, managerB.testingDefaultRoot().getChildCount()); - ASSERT_EQ(managerA.numPools(), 4); - ASSERT_EQ(managerB.numPools(), 4); - auto pool = managerB.addRootPool(); ASSERT_EQ(managerA.numPools(), 5); ASSERT_EQ(managerB.numPools(), 5); + auto pool = managerB.addRootPool(); + ASSERT_EQ(managerA.numPools(), 6); + ASSERT_EQ(managerB.numPools(), 6); ASSERT_EQ( managerA.toString(), - "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 5\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); + "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 6\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); ASSERT_EQ( managerB.toString(), - "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 5\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); + "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 6\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); child1.reset(); EXPECT_EQ( kSharedPoolCount + 1, managerA.testingDefaultRoot().getChildCount()); child2.reset(); EXPECT_EQ(kSharedPoolCount, managerB.testingDefaultRoot().getChildCount()); + ASSERT_EQ(managerA.numPools(), 4); + ASSERT_EQ(managerB.numPools(), 4); + pool.reset(); ASSERT_EQ(managerA.numPools(), 3); ASSERT_EQ(managerB.numPools(), 3); - pool.reset(); - ASSERT_EQ(managerA.numPools(), 2); - ASSERT_EQ(managerB.numPools(), 2); ASSERT_EQ( managerA.toString(), - "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); + "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 3\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); ASSERT_EQ( managerB.toString(), - "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); + "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 3\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); const std::string detailedManagerStr = managerA.toString(true); ASSERT_THAT( detailedManagerStr, testing::HasSubstr( - "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n__sys_root__ usage 0B reserved 0B peak 0B\n")); + "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 3\nList of root pools:\n__sys_root__ usage 0B reserved 0B peak 0B\n")); ASSERT_THAT( detailedManagerStr, testing::HasSubstr("__sys_spilling__ usage 0B reserved 0B peak 0B\n")); @@ -326,7 +326,7 @@ TEST_F(MemoryManagerTest, defaultMemoryManager) { // TODO: remove this test when remove deprecatedAddDefaultLeafMemoryPool. TEST(MemoryHeaderTest, addDefaultLeafMemoryPool) { auto& manager = toMemoryManager(deprecatedDefaultMemoryManager()); - const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 2; + const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 3; ASSERT_EQ(manager.testingDefaultRoot().getChildCount(), kSharedPoolCount); { auto poolA = deprecatedAddDefaultLeafMemoryPool(); @@ -381,7 +381,7 @@ TEST_F(MemoryManagerTest, memoryPoolManagement) { MemoryManagerOptions options; options.alignment = alignment; MemoryManager manager{options}; - ASSERT_EQ(manager.numPools(), 2); + ASSERT_EQ(manager.numPools(), 3); const int numPools = 100; std::vector> userRootPools; std::vector> userLeafPools; @@ -406,14 +406,14 @@ TEST_F(MemoryManagerTest, memoryPoolManagement) { ASSERT_FALSE(rootUnamedPool->name().empty()); ASSERT_EQ(rootUnamedPool->kind(), MemoryPool::Kind::kAggregate); ASSERT_EQ(rootUnamedPool->parent(), nullptr); - ASSERT_EQ(manager.numPools(), 1 + numPools + 2 + 1); + ASSERT_EQ(manager.numPools(), 1 + numPools + 3 + 1); userLeafPools.clear(); leafUnamedPool.reset(); - ASSERT_EQ(manager.numPools(), 1 + numPools / 2 + 1 + 1); + ASSERT_EQ(manager.numPools(), 1 + numPools / 2 + 1 + 1 + 1); userRootPools.clear(); - ASSERT_EQ(manager.numPools(), 1 + 2); + ASSERT_EQ(manager.numPools(), 1 + 3); rootUnamedPool.reset(); - ASSERT_EQ(manager.numPools(), 2); + ASSERT_EQ(manager.numPools(), 3); } // TODO: when run sequentially, e.g. `buck run dwio/memory/...`, this has side @@ -430,7 +430,7 @@ TEST_F(MemoryManagerTest, globalMemoryManager) { ASSERT_NE(manager, globalManager); ASSERT_EQ(manager, memoryManager()); auto* managerII = memoryManager(); - const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 2; + const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 3; { auto& rootI = manager->testingDefaultRoot(); const std::string childIName("some_child"); @@ -464,9 +464,9 @@ TEST_F(MemoryManagerTest, globalMemoryManager) { ASSERT_EQ(userRootChild->kind(), MemoryPool::Kind::kAggregate); ASSERT_EQ(rootI.getChildCount(), kSharedPoolCount + 1); ASSERT_EQ(rootII.getChildCount(), kSharedPoolCount + 1); - ASSERT_EQ(manager->numPools(), 2 + 2); + ASSERT_EQ(manager->numPools(), 2 + 3); } - ASSERT_EQ(manager->numPools(), 2); + ASSERT_EQ(manager->numPools(), 3); } TEST_F(MemoryManagerTest, alignmentOptionCheck) { @@ -563,9 +563,9 @@ TEST_F(MemoryManagerTest, concurrentPoolAccess) { } stopCheck = true; checkThread.join(); - ASSERT_EQ(manager.numPools(), pools.size() + 2); + ASSERT_EQ(manager.numPools(), pools.size() + 3); pools.clear(); - ASSERT_EQ(manager.numPools(), 2); + ASSERT_EQ(manager.numPools(), 3); } TEST_F(MemoryManagerTest, quotaEnforcement) { @@ -681,7 +681,7 @@ TEST_F(MemoryManagerTest, disableMemoryPoolTracking) { ASSERT_EQ(manager.capacity(), 64LL << 20); ASSERT_EQ(manager.shrinkPools(), 0); // Default 1 system pool with 1 leaf child - ASSERT_EQ(manager.numPools(), 2); + ASSERT_EQ(manager.numPools(), 3); VELOX_ASSERT_THROW( leaf0->allocate(38LL << 20), "Exceeded memory pool capacity");