diff --git a/velox/common/caching/SsdFile.cpp b/velox/common/caching/SsdFile.cpp index afafe78ebab5..1b2ab9f4710e 100644 --- a/velox/common/caching/SsdFile.cpp +++ b/velox/common/caching/SsdFile.cpp @@ -22,6 +22,7 @@ #include "velox/common/base/SuccinctPrinter.h" #include "velox/common/caching/FileIds.h" #include "velox/common/caching/SsdCache.h" +#include "velox/common/memory/Memory.h" #include "velox/common/process/TraceContext.h" #include @@ -44,31 +45,6 @@ namespace facebook::velox::cache { namespace { -// TODO: Remove this function once we migrate all files to velox fs. -// -// Disable 'copy on write' on the given file. Will throw if failed for any -// reason, including file system not supporting cow feature. -void disableCow(int32_t fd) { -#ifdef linux - int attr{0}; - auto res = ioctl(fd, FS_IOC_GETFLAGS, &attr); - VELOX_CHECK_EQ( - 0, - res, - "ioctl(FS_IOC_GETFLAGS) failed: {}, {}", - res, - folly::errnoStr(errno)); - attr |= FS_NOCOW_FL; - res = ioctl(fd, FS_IOC_SETFLAGS, &attr); - VELOX_CHECK_EQ( - 0, - res, - "ioctl(FS_IOC_SETFLAGS, FS_NOCOW_FL) failed: {}, {}", - res, - folly::errnoStr(errno)); -#endif // linux -} - void addEntryToIovecs(AsyncDataCacheEntry& entry, std::vector& iovecs) { if (entry.tinyData() != nullptr) { iovecs.push_back({entry.tinyData(), static_cast(entry.size())}); @@ -677,14 +653,16 @@ void SsdFile::deleteCheckpoint(bool keepLog) { } } - const auto checkpointPath = getCheckpointFilePath(); - const auto checkpointRc = ::unlink(checkpointPath.c_str()); - if (checkpointRc != 0) { - VELOX_SSD_CACHE_LOG(ERROR) - << "Error in deleting checkpoint: " << checkpointRc; - } - if (checkpointRc != 0) { - ++stats_.deleteCheckpointErrors; + if (checkpointWriteFile_ != nullptr) { + try { + checkpointWriteFile_->close(); + fs_->remove(getCheckpointFilePath()); + checkpointWriteFile_.reset(); + } catch (const std::exception& e) { + ++stats_.deleteCheckpointErrors; + VELOX_SSD_CACHE_LOG(ERROR) + << "Error in deleting checkpoint: " << e.what(); + } } } @@ -724,13 +702,6 @@ void SsdFile::checkpoint(bool force) { checkpointDeleted_ = false; bytesAfterCheckpoint_ = 0; try { - const auto checkRc = [&](int32_t rc, const std::string& errMsg) { - if (rc < 0) { - VELOX_FAIL("{} with rc {} :{}", errMsg, rc, folly::errnoStr(errno)); - } - return rc; - }; - // We schedule the potentially long fsync of the cache file on another // thread of the cache write executor, if available. If there is none, we do // the sync on this thread at the end. @@ -739,11 +710,12 @@ void SsdFile::checkpoint(bool force) { return std::make_unique(0); }); - std::ofstream state; - const auto checkpointPath = getCheckpointFilePath(); + // NOTE: we need to ensure cache file data sync update completes before + // updating checkpoint file. + fileSync->move(); + try { - state.exceptions(std::ofstream::failbit); - state.open(checkpointPath, std::ios_base::out | std::ios_base::trunc); + checkpointWriteFile_->truncate(0); // The checkpoint state file contains: // int32_t The 4 bytes of checkpoint version, // int32_t maxRegions, @@ -753,72 +725,57 @@ void SsdFile::checkpoint(bool force) { // kMapMarker, // {fileId, offset, SSdRun} triples, // kEndMarker. - state.write(checkpointVersion().data(), sizeof(int32_t)); - state.write(asChar(&maxRegions_), sizeof(maxRegions_)); - state.write(asChar(&numRegions_), sizeof(numRegions_)); + checkpointWriteFile_->append(checkpointVersion()); + checkpointWriteFile_->append( + folly::IOBuf::copyBuffer(&maxRegions_, sizeof(maxRegions_))); + checkpointWriteFile_->append( + folly::IOBuf::copyBuffer(&numRegions_, sizeof(numRegions_))); // Copy the region scores before writing out for tsan. const auto scoresCopy = tracker_.copyScores(); - state.write(asChar(scoresCopy.data()), maxRegions_ * sizeof(uint64_t)); + checkpointWriteFile_->append(folly::IOBuf::copyBuffer( + scoresCopy.data(), maxRegions_ * sizeof(double))); std::unordered_set fileNums; for (const auto& entry : entries_) { const auto fileNum = entry.first.fileNum.id(); if (fileNums.insert(fileNum).second) { - state.write(asChar(&fileNum), sizeof(fileNum)); + checkpointWriteFile_->append( + folly::IOBuf::copyBuffer(&fileNum, sizeof(fileNum))); const auto name = fileIds().string(fileNum); const int32_t length = name.size(); - state.write(asChar(&length), sizeof(length)); - state.write(name.data(), length); + checkpointWriteFile_->append( + folly::IOBuf::copyBuffer(&length, sizeof(length))); + checkpointWriteFile_->append(name); } } - const auto mapMarker = kCheckpointMapMarker; - state.write(asChar(&mapMarker), sizeof(mapMarker)); + checkpointWriteFile_->append(folly::IOBuf::copyBuffer( + &kCheckpointMapMarker, sizeof(kCheckpointMapMarker))); for (auto& pair : entries_) { const auto id = pair.first.fileNum.id(); - state.write(asChar(&id), sizeof(id)); - state.write(asChar(&pair.first.offset), sizeof(pair.first.offset)); + checkpointWriteFile_->append(folly::IOBuf::copyBuffer(&id, sizeof(id))); + checkpointWriteFile_->append(folly::IOBuf::copyBuffer( + &pair.first.offset, sizeof(pair.first.offset))); const auto offsetAndSize = pair.second.fileBits(); - state.write(asChar(&offsetAndSize), sizeof(offsetAndSize)); + checkpointWriteFile_->append( + folly::IOBuf::copyBuffer(&offsetAndSize, sizeof(offsetAndSize))); if (checksumEnabled_) { const auto checksum = pair.second.checksum(); - state.write(asChar(&checksum), sizeof(checksum)); + checkpointWriteFile_->append( + folly::IOBuf::copyBuffer(&checksum, sizeof(checksum))); } } + checkpointWriteFile_->append(folly::IOBuf::copyBuffer( + &kCheckpointEndMarker, sizeof(kCheckpointEndMarker))); + checkpointWriteFile_->flush(); + ++stats_.checkpointsWritten; } catch (const std::exception& e) { + ++stats_.writeCheckpointErrors; + VELOX_SSD_CACHE_LOG(ERROR) << "Error in writing cehckpoint: " << e.what(); fileSync->close(); std::rethrow_exception(std::current_exception()); } - // NOTE: we need to ensure cache file data sync update completes before - // updating checkpoint file. - fileSync->move(); - - const auto endMarker = kCheckpointEndMarker; - state.write(asChar(&endMarker), sizeof(endMarker)); - - if (state.bad()) { - ++stats_.writeCheckpointErrors; - checkRc(-1, "Write of checkpoint file"); - } else { - ++stats_.checkpointsWritten; - } - state.close(); - - // Sync checkpoint data file. ofstream does not have a sync method, so open - // as fd and sync that. - const auto checkpointFd = checkRc( - ::open(checkpointPath.c_str(), O_WRONLY), - "Open of checkpoint file for sync"); - // TODO: add this as file open option after we migrate to use velox - // filesystem for ssd file access. - if (disableFileCow_) { - disableCow(checkpointFd); - } - VELOX_CHECK_GE(checkpointFd, 0); - checkRc(::fsync(checkpointFd), "Sync of checkpoint file"); - ::close(checkpointFd); - // NOTE: we shall truncate eviction log after checkpoint file sync // completes so that we never recover from an old checkpoint file without // log evictions. The latter might lead to data consistent issue. @@ -843,22 +800,35 @@ void SsdFile::initializeCheckpoint() { } bool hasCheckpoint = true; - std::ifstream state(getCheckpointFilePath()); - if (!state.is_open()) { + std::unique_ptr stream = nullptr; + filesystems::FileOptions writeFileOptions; + writeFileOptions.shouldThrowOnFileAlreadyExists = false; + + const auto checkpointPath = getCheckpointFilePath(); + try { + checkpointWriteFile_ = + fs_->openFileForWrite(checkpointPath, writeFileOptions); + + auto checkpointReadFile = fs_->openFileForRead(checkpointPath); + stream = std::make_unique( + std::move(checkpointReadFile), + 1 << 20, + memory::memoryManager()->cachePool()); + } catch (std::exception& e) { hasCheckpoint = false; ++stats_.openCheckpointErrors; VELOX_SSD_CACHE_LOG(WARNING) << fmt::format( - "Starting shard {} without checkpoint, with checksum write {}, read verification {}, checkpoint file {}", + "Error openning checkpoint file {}: Starting shard {} without checkpoint, with checksum write {}, read verification {}, checkpoint file {}", + e.what(), shardId_, checksumEnabled_ ? "enabled" : "disabled", checksumReadVerificationEnabled_ ? "enabled" : "disabled", - getCheckpointFilePath()); + checkpointPath); } + const auto logPath = getEvictLogFilePath(); - filesystems::FileOptions evictLogFileOptions; - evictLogFileOptions.shouldThrowOnFileAlreadyExists = false; try { - evictLogWriteFile_ = fs_->openFileForWrite(logPath, evictLogFileOptions); + evictLogWriteFile_ = fs_->openFileForWrite(logPath, writeFileOptions); } catch (std::exception& e) { ++stats_.openLogErrors; // Failure to open the log at startup is a process terminating error. @@ -867,8 +837,7 @@ void SsdFile::initializeCheckpoint() { try { if (hasCheckpoint) { - state.exceptions(std::ifstream::failbit); - readCheckpoint(state); + readCheckpoint(std::move(stream)); } } catch (const std::exception& e) { ++stats_.readCheckpointErrors; @@ -936,23 +905,41 @@ void SsdFile::disableFileCow() { if (evictLogWriteFile_ != nullptr) { evictLogWriteFile_->setAttributes(attributes); } + if (checkpointWriteFile_ != nullptr) { + checkpointWriteFile_->setAttributes(attributes); + } #endif // linux } namespace { template -T readNumber(std::ifstream& stream) { - T data; - stream.read(asChar(&data), sizeof(T)); - return data; +T readNumber(common::FileInputStream* stream) { + const int32_t numBytes = sizeof(T); + uint8_t data[numBytes]; + stream->readBytes(data, numBytes); + return *reinterpret_cast(data); +} + +std::string readString(common::FileInputStream* stream, int32_t length) { + uint8_t data[length]; + stream->readBytes(data, length); + return std::string(reinterpret_cast(data), length); +} + +template +std::vector readVector(common::FileInputStream* stream, int32_t size) { + const int32_t numBytes = size * sizeof(T); + uint8_t data[numBytes]; + stream->readBytes(data, numBytes); + return std::vector( + reinterpret_cast(data), reinterpret_cast(data) + size); } } // namespace -void SsdFile::readCheckpoint(std::ifstream& state) { - char versionMagic[4]; - state.read(versionMagic, sizeof(versionMagic)); +void SsdFile::readCheckpoint(std::unique_ptr stream) { + const auto versionMagic = readString(stream.get(), 4); const auto checkpoinHasChecksum = - isChecksumEnabledOnCheckpointVersion(std::string(versionMagic, 4)); + isChecksumEnabledOnCheckpointVersion(versionMagic); if (checksumEnabled_ && !checkpoinHasChecksum) { VELOX_SSD_CACHE_LOG(WARNING) << fmt::format( "Starting shard {} without checkpoint: checksum is enabled but the checkpoint was made without checksum, so skip the checkpoint recovery, checkpoint file {}", @@ -961,23 +948,22 @@ void SsdFile::readCheckpoint(std::ifstream& state) { return; } - const auto maxRegions = readNumber(state); + const auto maxRegions = readNumber(stream.get()); VELOX_CHECK_EQ( maxRegions, maxRegions_, "Trying to start from checkpoint with a different capacity"); - numRegions_ = readNumber(state); - std::vector scores(maxRegions); - state.read(asChar(scores.data()), maxRegions_ * sizeof(double)); + numRegions_ = readNumber(stream.get()); + + const auto scores = readVector(stream.get(), maxRegions_); std::unordered_map idMap; for (;;) { - const auto id = readNumber(state); + const auto id = readNumber(stream.get()); if (id == kCheckpointMapMarker) { break; } - std::string name; - name.resize(readNumber(state)); - state.read(name.data(), name.size()); + const auto length = readNumber(stream.get()); + const auto name = readString(stream.get(), length); idMap[id] = StringIdLease(fileIds(), id, name); } @@ -998,15 +984,15 @@ void SsdFile::readCheckpoint(std::ifstream& state) { std::vector regionCacheSizes(numRegions_, 0); for (;;) { - const auto fileNum = readNumber(state); + const auto fileNum = readNumber(stream.get()); if (fileNum == kCheckpointEndMarker) { break; } - const auto offset = readNumber(state); - const auto fileBits = readNumber(state); + const auto offset = readNumber(stream.get()); + const auto fileBits = readNumber(stream.get()); uint32_t checksum = 0; if (checkpoinHasChecksum) { - checksum = readNumber(state); + checksum = readNumber(stream.get()); } const auto run = SsdRun(fileBits, checksum); const auto region = regionIndex(run.offset()); diff --git a/velox/common/caching/SsdFile.h b/velox/common/caching/SsdFile.h index a08326af30ce..bd6a54650dc2 100644 --- a/velox/common/caching/SsdFile.h +++ b/velox/common/caching/SsdFile.h @@ -22,6 +22,7 @@ #include "velox/common/caching/AsyncDataCache.h" #include "velox/common/caching/SsdFileTracker.h" #include "velox/common/file/File.h" +#include "velox/common/file/FileInputStream.h" #include "velox/common/file/FileSystems.h" DECLARE_bool(ssd_odirect); @@ -373,13 +374,7 @@ class SsdFile { /// Returns the checkpoint file path. std::string getCheckpointFilePath() const { - // Faulty file path needs to be handled manually before we switch checkpoint - // file to Velox filesystem. - const std::string faultyPrefix = "faulty:"; - std::string checkpointPath = fileName_ + kCheckpointExtension; - return checkpointPath.find(faultyPrefix) == 0 - ? checkpointPath.substr(faultyPrefix.size()) - : checkpointPath; + return fileName_ + kCheckpointExtension; } /// Resets this' to a post-construction empty state. See SsdCache::clear(). @@ -448,7 +443,7 @@ class SsdFile { // Reads a checkpoint state file and sets 'this' accordingly if read is // successful. Return true for successful read. A failed read deletes the // checkpoint and leaves the log truncated open. - void readCheckpoint(std::ifstream& state); + void readCheckpoint(std::unique_ptr stream); // Logs an error message, deletes the checkpoint and stop making new // checkpoints. @@ -479,7 +474,7 @@ class SsdFile { // Returns true if checkpoint is needed. bool needCheckpoint(bool force) const { - if (!checkpointEnabled()) { + if (!checkpointEnabled() || checkpointWriteFile_ == nullptr) { return false; } return force || (bytesAfterCheckpoint_ >= checkpointIntervalBytes_); @@ -565,6 +560,9 @@ class SsdFile { // WriteFile for evict log file. std::unique_ptr evictLogWriteFile_; + // WriteFile for checkpoint file. + std::unique_ptr checkpointWriteFile_; + // Counters. SsdCacheStats stats_; diff --git a/velox/common/caching/tests/AsyncDataCacheTest.cpp b/velox/common/caching/tests/AsyncDataCacheTest.cpp index 277c574a22f1..e319084bb51a 100644 --- a/velox/common/caching/tests/AsyncDataCacheTest.cpp +++ b/velox/common/caching/tests/AsyncDataCacheTest.cpp @@ -1204,6 +1204,15 @@ TEST_P(AsyncDataCacheTest, shutdown) { constexpr uint64_t kRamBytes = 16 << 20; constexpr uint64_t kSsdBytes = 64UL << 20; + if (!memory::MemoryManager::testInstance()) { + memory::MemoryManagerOptions options; + options.useMmapAllocator = true; + options.allocatorCapacity = kRamBytes; + options.arbitratorCapacity = kRamBytes; + options.trackDefaultUsage = true; + memory::MemoryManager::initialize(options); + } + for (const auto asyncShutdown : {false, true}) { SCOPED_TRACE(fmt::format("asyncShutdown {}", asyncShutdown)); // Initialize cache with a big checkpointIntervalBytes, giving eviction log diff --git a/velox/common/file/File.cpp b/velox/common/file/File.cpp index 6a30f0a26159..deccd247198b 100644 --- a/velox/common/file/File.cpp +++ b/velox/common/file/File.cpp @@ -383,6 +383,8 @@ void LocalWriteFile::truncate(int64_t newSize) { 0, "ftruncate failed in LocalWriteFile::truncate: {}.", folly::errnoStr(errno)); + // Reposition the file offset to the end of the file for append(). + ::lseek(fd_, newSize, SEEK_SET); size_ = newSize; } diff --git a/velox/common/memory/Memory.cpp b/velox/common/memory/Memory.cpp index 83e670b948d9..cd29de95eeba 100644 --- a/velox/common/memory/Memory.cpp +++ b/velox/common/memory/Memory.cpp @@ -114,6 +114,7 @@ MemoryManager::MemoryManager(const MemoryManagerOptions& options) .coreOnAllocationFailureEnabled = options.coreOnAllocationFailureEnabled})}, spillPool_{addLeafPool("__sys_spilling__")}, + cachePool_{addLeafPool("__sys_caching__")}, tracePool_{addLeafPool("__sys_tracing__")}, sharedLeafPools_(createSharedLeafMemoryPools(*sysRoot_)) { VELOX_CHECK_NOT_NULL(allocator_); diff --git a/velox/common/memory/Memory.h b/velox/common/memory/Memory.h index f460d25ffee6..2aa3e5d93f50 100644 --- a/velox/common/memory/Memory.h +++ b/velox/common/memory/Memory.h @@ -274,6 +274,11 @@ class MemoryManager { return spillPool_.get(); } + /// Returns the process wide leaf memory pool used for ssd cache. + MemoryPool* cachePool() { + return cachePool_.get(); + } + /// Returns the process wide leaf memory pool used for query tracing. MemoryPool* tracePool() const { return tracePool_.get(); @@ -311,6 +316,7 @@ class MemoryManager { const std::shared_ptr sysRoot_; const std::shared_ptr spillPool_; + const std::shared_ptr cachePool_; const std::shared_ptr tracePool_; const std::vector> sharedLeafPools_; diff --git a/velox/common/memory/tests/MemoryManagerTest.cpp b/velox/common/memory/tests/MemoryManagerTest.cpp index c4a7329cf8ef..d2dc685fbd70 100644 --- a/velox/common/memory/tests/MemoryManagerTest.cpp +++ b/velox/common/memory/tests/MemoryManagerTest.cpp @@ -53,7 +53,7 @@ TEST_F(MemoryManagerTest, ctor) { const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools; { MemoryManager manager{}; - ASSERT_EQ(manager.numPools(), 2); + ASSERT_EQ(manager.numPools(), 3); ASSERT_EQ(manager.capacity(), kMaxMemory); ASSERT_EQ(0, manager.getTotalBytes()); ASSERT_EQ(manager.alignment(), MemoryAllocator::kMaxAlignment); @@ -67,7 +67,7 @@ TEST_F(MemoryManagerTest, ctor) { MemoryManager manager{ {.allocatorCapacity = kCapacity, .arbitratorCapacity = kCapacity}}; ASSERT_EQ(kCapacity, manager.capacity()); - ASSERT_EQ(manager.numPools(), 2); + ASSERT_EQ(manager.numPools(), 3); ASSERT_EQ(manager.testingDefaultRoot().alignment(), manager.alignment()); } { @@ -81,7 +81,7 @@ TEST_F(MemoryManagerTest, ctor) { ASSERT_EQ(manager.testingDefaultRoot().alignment(), manager.alignment()); // TODO: replace with root pool memory tracker quota check. ASSERT_EQ( - kSharedPoolCount + 2, manager.testingDefaultRoot().getChildCount()); + kSharedPoolCount + 3, manager.testingDefaultRoot().getChildCount()); ASSERT_EQ(kCapacity, manager.capacity()); ASSERT_EQ(0, manager.getTotalBytes()); } @@ -98,7 +98,7 @@ TEST_F(MemoryManagerTest, ctor) { ASSERT_EQ(arbitrator->stats().maxCapacityBytes, kCapacity); ASSERT_EQ( manager.toString(), - "Memory Manager[capacity 4.00GB alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity 4.00GB allocated bytes 0 allocated pages 0 mapped pages 0]\nARBITRATOR[SHARED CAPACITY[4.00GB] numRequests 0 numRunning 0 numSucceded 0 numAborted 0 numFailures 0 numNonReclaimableAttempts 0 reclaimedFreeCapacity 0B reclaimedUsedCapacity 0B maxCapacity 4.00GB freeCapacity 4.00GB freeReservedCapacity 0B]]"); + "Memory Manager[capacity 4.00GB alignment 64B usedBytes 0B number of pools 3\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity 4.00GB allocated bytes 0 allocated pages 0 mapped pages 0]\nARBITRATOR[SHARED CAPACITY[4.00GB] numRequests 0 numRunning 0 numSucceded 0 numAborted 0 numFailures 0 numNonReclaimableAttempts 0 reclaimedFreeCapacity 0B reclaimedUsedCapacity 0B maxCapacity 4.00GB freeCapacity 4.00GB freeReservedCapacity 0B]]"); } } @@ -263,10 +263,10 @@ TEST_F(MemoryManagerTest, addPoolWithArbitrator) { TEST_F(MemoryManagerTest, defaultMemoryManager) { auto& managerA = toMemoryManager(deprecatedDefaultMemoryManager()); auto& managerB = toMemoryManager(deprecatedDefaultMemoryManager()); - const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 2; - ASSERT_EQ(managerA.numPools(), 2); + const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 3; + ASSERT_EQ(managerA.numPools(), 3); ASSERT_EQ(managerA.testingDefaultRoot().getChildCount(), kSharedPoolCount); - ASSERT_EQ(managerB.numPools(), 2); + ASSERT_EQ(managerB.numPools(), 3); ASSERT_EQ(managerB.testingDefaultRoot().getChildCount(), kSharedPoolCount); auto child1 = managerA.addLeafPool("child_1"); @@ -277,38 +277,38 @@ TEST_F(MemoryManagerTest, defaultMemoryManager) { kSharedPoolCount + 2, managerA.testingDefaultRoot().getChildCount()); EXPECT_EQ( kSharedPoolCount + 2, managerB.testingDefaultRoot().getChildCount()); - ASSERT_EQ(managerA.numPools(), 4); - ASSERT_EQ(managerB.numPools(), 4); - auto pool = managerB.addRootPool(); ASSERT_EQ(managerA.numPools(), 5); ASSERT_EQ(managerB.numPools(), 5); + auto pool = managerB.addRootPool(); + ASSERT_EQ(managerA.numPools(), 6); + ASSERT_EQ(managerB.numPools(), 6); ASSERT_EQ( managerA.toString(), - "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 5\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); + "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 6\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); ASSERT_EQ( managerB.toString(), - "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 5\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); + "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 6\nList of root pools:\n\t__sys_root__\n\tdefault_root_0\n\trefcount 2\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); child1.reset(); EXPECT_EQ( kSharedPoolCount + 1, managerA.testingDefaultRoot().getChildCount()); child2.reset(); EXPECT_EQ(kSharedPoolCount, managerB.testingDefaultRoot().getChildCount()); + ASSERT_EQ(managerA.numPools(), 4); + ASSERT_EQ(managerB.numPools(), 4); + pool.reset(); ASSERT_EQ(managerA.numPools(), 3); ASSERT_EQ(managerB.numPools(), 3); - pool.reset(); - ASSERT_EQ(managerA.numPools(), 2); - ASSERT_EQ(managerB.numPools(), 2); ASSERT_EQ( managerA.toString(), - "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); + "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 3\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); ASSERT_EQ( managerB.toString(), - "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); + "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 3\nList of root pools:\n\t__sys_root__\nMemory Allocator[MALLOC capacity UNLIMITED allocated bytes 0 allocated pages 0 mapped pages 0]\nARBIRTATOR[NOOP CAPACITY[UNLIMITED]]]"); const std::string detailedManagerStr = managerA.toString(true); ASSERT_THAT( detailedManagerStr, testing::HasSubstr( - "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 2\nList of root pools:\n__sys_root__ usage 0B reserved 0B peak 0B\n")); + "Memory Manager[capacity UNLIMITED alignment 64B usedBytes 0B number of pools 3\nList of root pools:\n__sys_root__ usage 0B reserved 0B peak 0B\n")); ASSERT_THAT( detailedManagerStr, testing::HasSubstr("__sys_spilling__ usage 0B reserved 0B peak 0B\n")); @@ -326,7 +326,7 @@ TEST_F(MemoryManagerTest, defaultMemoryManager) { // TODO: remove this test when remove deprecatedAddDefaultLeafMemoryPool. TEST(MemoryHeaderTest, addDefaultLeafMemoryPool) { auto& manager = toMemoryManager(deprecatedDefaultMemoryManager()); - const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 2; + const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 3; ASSERT_EQ(manager.testingDefaultRoot().getChildCount(), kSharedPoolCount); { auto poolA = deprecatedAddDefaultLeafMemoryPool(); @@ -381,7 +381,7 @@ TEST_F(MemoryManagerTest, memoryPoolManagement) { MemoryManagerOptions options; options.alignment = alignment; MemoryManager manager{options}; - ASSERT_EQ(manager.numPools(), 2); + ASSERT_EQ(manager.numPools(), 3); const int numPools = 100; std::vector> userRootPools; std::vector> userLeafPools; @@ -406,14 +406,14 @@ TEST_F(MemoryManagerTest, memoryPoolManagement) { ASSERT_FALSE(rootUnamedPool->name().empty()); ASSERT_EQ(rootUnamedPool->kind(), MemoryPool::Kind::kAggregate); ASSERT_EQ(rootUnamedPool->parent(), nullptr); - ASSERT_EQ(manager.numPools(), 1 + numPools + 2 + 1); + ASSERT_EQ(manager.numPools(), 1 + numPools + 3 + 1); userLeafPools.clear(); leafUnamedPool.reset(); - ASSERT_EQ(manager.numPools(), 1 + numPools / 2 + 1 + 1); + ASSERT_EQ(manager.numPools(), 1 + numPools / 2 + 1 + 1 + 1); userRootPools.clear(); - ASSERT_EQ(manager.numPools(), 1 + 2); + ASSERT_EQ(manager.numPools(), 1 + 3); rootUnamedPool.reset(); - ASSERT_EQ(manager.numPools(), 2); + ASSERT_EQ(manager.numPools(), 3); } // TODO: when run sequentially, e.g. `buck run dwio/memory/...`, this has side @@ -430,7 +430,7 @@ TEST_F(MemoryManagerTest, globalMemoryManager) { ASSERT_NE(manager, globalManager); ASSERT_EQ(manager, memoryManager()); auto* managerII = memoryManager(); - const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 2; + const auto kSharedPoolCount = FLAGS_velox_memory_num_shared_leaf_pools + 3; { auto& rootI = manager->testingDefaultRoot(); const std::string childIName("some_child"); @@ -464,9 +464,9 @@ TEST_F(MemoryManagerTest, globalMemoryManager) { ASSERT_EQ(userRootChild->kind(), MemoryPool::Kind::kAggregate); ASSERT_EQ(rootI.getChildCount(), kSharedPoolCount + 1); ASSERT_EQ(rootII.getChildCount(), kSharedPoolCount + 1); - ASSERT_EQ(manager->numPools(), 2 + 2); + ASSERT_EQ(manager->numPools(), 2 + 3); } - ASSERT_EQ(manager->numPools(), 2); + ASSERT_EQ(manager->numPools(), 3); } TEST_F(MemoryManagerTest, alignmentOptionCheck) { @@ -564,9 +564,9 @@ TEST_F(MemoryManagerTest, concurrentPoolAccess) { } stopCheck = true; checkThread.join(); - ASSERT_EQ(manager.numPools(), pools.size() + 2); + ASSERT_EQ(manager.numPools(), pools.size() + 3); pools.clear(); - ASSERT_EQ(manager.numPools(), 2); + ASSERT_EQ(manager.numPools(), 3); } TEST_F(MemoryManagerTest, quotaEnforcement) { @@ -682,7 +682,7 @@ TEST_F(MemoryManagerTest, disableMemoryPoolTracking) { ASSERT_EQ(manager.capacity(), 64LL << 20); ASSERT_EQ(manager.shrinkPools(), 0); // Default 1 system pool with 1 leaf child - ASSERT_EQ(manager.numPools(), 2); + ASSERT_EQ(manager.numPools(), 3); VELOX_ASSERT_THROW( leaf0->allocate(38LL << 20), "Exceeded memory pool capacity");