From a155b0faf0862faa8f7046853c6f9b2f4f6f7348 Mon Sep 17 00:00:00 2001 From: Zac Wen Date: Thu, 11 Apr 2024 22:09:37 -0700 Subject: [PATCH] Prevent cache write from exceeding IOV_MAX (#9438) Summary: Pull Request resolved: https://github.com/facebookincubator/velox/pull/9438 Differential Revision: D55941557 --- velox/common/caching/SsdFile.cpp | 87 +++++++++++++++------- velox/common/caching/SsdFile.h | 5 ++ velox/common/caching/tests/SsdFileTest.cpp | 16 +++- 3 files changed, 78 insertions(+), 30 deletions(-) diff --git a/velox/common/caching/SsdFile.cpp b/velox/common/caching/SsdFile.cpp index 6c104e8ae1d2d..bc21a5ba0fec2 100644 --- a/velox/common/caching/SsdFile.cpp +++ b/velox/common/caching/SsdFile.cpp @@ -81,6 +81,14 @@ void addEntryToIovecs(AsyncDataCacheEntry& entry, std::vector& iovecs) { }; } } + +// Returns the number of entries in a cache 'entry'. +int32_t numIoVectorsFromEntry(AsyncDataCacheEntry& entry) { + if (entry.tinyData() != nullptr) { + return 1; + } + return entry.data().numRuns(); +} } // namespace SsdPin::SsdPin(SsdFile& file, SsdRun run) : file_(&file), run_(run) { @@ -374,46 +382,52 @@ void SsdFile::write(std::vector& pins) { VELOX_CHECK_NULL(entry->ssdFile()); } - int32_t storeIndex = 0; - while (storeIndex < pins.size()) { - auto space = getSpace(pins, storeIndex); + int32_t writeIndex = 0; + while (writeIndex < pins.size()) { + const auto space = getSpace(pins, writeIndex); if (!space.has_value()) { // No space can be reclaimed. The pins are freed when the caller is freed. return; } auto [offset, available] = space.value(); - int32_t numWritten = 0; - int32_t bytes = 0; - std::vector iovecs; - for (auto i = storeIndex; i < pins.size(); ++i) { + int32_t numWrittenEntries = 0; + uint64_t writeOffset = offset; + int32_t writeLength = 0; + std::vector writeIovecs; + for (auto i = writeIndex; i < pins.size(); ++i) { auto* entry = pins[i].checkedEntry(); const auto entrySize = entry->size(); - if (bytes + entrySize > available) { + const auto numIovecs = numIoVectorsFromEntry(*entry); + VELOX_CHECK_LE(numIovecs, IOV_MAX); + if (writeIovecs.size() + numIovecs > IOV_MAX) { + // Writes out the accumulated iovecs if it exceeds IOV_MAX limit. + if (!write(writeOffset, writeLength, writeIovecs)) { + return; + } + writeIovecs.clear(); + writeOffset += writeLength; + writeLength = 0; + } + if (writeLength + entrySize > available) { break; } - addEntryToIovecs(*entry, iovecs); - bytes += entrySize; - ++numWritten; - } - VELOX_CHECK_GE(fileSize_, offset + bytes); - - const auto rc = folly::pwritev(fd_, iovecs.data(), iovecs.size(), offset); - if (rc != bytes) { - VELOX_SSD_CACHE_LOG(ERROR) - << "Failed to write to SSD, file name: " << fileName_ - << ", fd: " << fd_ << ", size: " << iovecs.size() - << ", offset: " << offset << ", error code: " << errno - << ", error string: " << folly::errnoStr(errno); - ++stats_.writeSsdErrors; - // If write fails, we return without adding the pins to the cache. The - // entries are unchanged. - return; + addEntryToIovecs(*entry, writeIovecs); + writeLength += entrySize; + ++numWrittenEntries; + } + if (writeLength > 0) { + VELOX_CHECK(!writeIovecs.empty()); + if (!write(writeOffset, writeLength, writeIovecs)) { + return; + } + writeOffset += writeLength; } + VELOX_CHECK_GE(fileSize_, writeOffset); { std::lock_guard l(mutex_); - for (auto i = storeIndex; i < storeIndex + numWritten; ++i) { + for (auto i = writeIndex; i < writeIndex + numWrittenEntries; ++i) { auto* entry = pins[i].checkedEntry(); entry->setSsdFile(this, offset); const auto size = entry->size(); @@ -429,7 +443,7 @@ void SsdFile::write(std::vector& pins) { bytesAfterCheckpoint_ += size; } } - storeIndex += numWritten; + writeIndex += numWrittenEntries; } if ((checkpointIntervalBytes_ > 0) && @@ -438,6 +452,25 @@ void SsdFile::write(std::vector& pins) { } } +bool SsdFile::write( + uint64_t offset, + uint64_t length, + const std::vector& iovecs) { + const auto ret = folly::pwritev(fd_, iovecs.data(), iovecs.size(), offset); + if (ret != length) { + VELOX_SSD_CACHE_LOG(ERROR) + << "Failed to write to SSD, file name: " << fileName_ << ", fd: " << fd_ + << ", size: " << iovecs.size() << ", offset: " << offset + << ", error code: " << errno + << ", error string: " << folly::errnoStr(errno); + ++stats_.writeSsdErrors; + // If write fails, we return without adding the pins to the cache. The + // entries are unchanged. + return false; + } + return true; +} + namespace { int32_t indexOfFirstMismatch(char* x, char* y, int n) { for (auto i = 0; i < n; ++i) { diff --git a/velox/common/caching/SsdFile.h b/velox/common/caching/SsdFile.h index d6e6c6277bcb6..5f90fbe2e4294 100644 --- a/velox/common/caching/SsdFile.h +++ b/velox/common/caching/SsdFile.h @@ -328,6 +328,11 @@ class SsdFile { // the files for making new checkpoints. void initializeCheckpoint(); + // Writes 'iovecs' to the SSD file at the 'offset'. Returns true if the write + // succeeds; otherwise, log the error and return false. + bool + write(uint64_t offset, uint64_t length, const std::vector& iovecs); + // Synchronously logs that 'regions' are no longer valid in a possibly xisting // checkpoint. void logEviction(const std::vector& regions); diff --git a/velox/common/caching/tests/SsdFileTest.cpp b/velox/common/caching/tests/SsdFileTest.cpp index 9cb89f2fabb92..9792b06aaf878 100644 --- a/velox/common/caching/tests/SsdFileTest.cpp +++ b/velox/common/caching/tests/SsdFileTest.cpp @@ -118,12 +118,12 @@ class SsdFileTest : public testing::Test { } } - // Gets consecutive entries from file 'fileId' starting at 'startOffset' with + // Gets consecutive entries from file 'fileId' starting at 'startOffset' with // sizes between 'minSize' and 'maxSize'. Sizes start at 'minSize' and double // each time and go back to 'minSize' after exceeding 'maxSize'. This stops // after the total size has exceeded 'totalSize'. The entries are returned as // pins. The pins are exclusive for newly created entries and shared for - // existing ones. New entries are deterministically initialized from 'fileId' + // existing ones. New entries are deterministically initialized from 'fileId' // and the entry's offset. std::vector makePins( uint64_t fileId, @@ -280,7 +280,7 @@ TEST_F(SsdFileTest, writeAndRead) { } } - // We check howmany entries are found. The earliest writes will have been + // We check how many entries are found. The earliest writes will have been // evicted. We read back the found entries and check their contents. int32_t numFound = 0; for (auto& entry : allEntries) { @@ -302,6 +302,16 @@ TEST_F(SsdFileTest, writeAndRead) { } } } + + // Test cache writes with different iobufs sizes. + for (int numPins : {0, 1, IOV_MAX - 1, IOV_MAX, IOV_MAX + 1}) { + SCOPED_TRACE(fmt::format("numPins: {}", numPins)); + auto pins = makePins(fileName_.id(), 0, 4096, 4096, 4096 * numPins); + EXPECT_EQ(pins.size(), numPins); + ssdFile_->write(pins); + readAndCheckPins(pins); + pins.clear(); + } } #ifdef VELOX_SSD_FILE_TEST_SET_NO_COW_FLAG